虽然 Node.js 本身是单线程应用,但是也支持创建额外的线程。在一个单进程多线程的应用中,观测线程的 CPU 负载是非常有意义且必要的,因为通过进程 CPU 负载我们看到的只是进程内所有线程的 CPU 负载之和,但是无法知道每个线程的负载,这样在 CPU 负载高时,我们就无法知道是哪个线程导致的。为了更好地了解各个线程的 CPU 负载,需要提供线程级别的 CPU 负载数据。目前,Libuv 已经支持该能力,在比较新的 Node.js 版本中也引入了该能力,本文介绍线程 CPU 负载获取的相关内容。
在做 Node.js APM 时,我们已经通过 Addon + getrusage 获取了线程 CPU 负载,其原理很简单,getrusage 本身是支持获取调用线程的 CPU 负载的,只不过之前因为平台兼容性问题,Libuv 没有支持该能力,现在 Libuv 兼容了更多平台后,也是使用了类似的方式实现的。但是 Addon 一来比较麻烦,二来需要把代码注入到目的线程,因为在目的线程调用上面的函数才能获取该线程的 CPU 负载,相对来说有一定的成本。
现在 Node.js 原生支持该能力后,首先解决了 Addon 的问题,我们只需要在目的线程调用 process.threadCpuUsage() 就能获得当前线程的 CPU 负载,但是问题二还是没解决,还是需要进行代码注入,为了解决这个问题,我最近提交了一个 PR,支持在主线程中获取子线程的 CPU 负载,大致的用法如下。复制
const worker = new Worker(...);
await worker.cpuUsage();
这样我们就可以通过 process 的 worker 事件获取每个 worker(或者通过 diagnostics_channel),从而获取 worker 的 CPU 负载,不需要在每个线程里注入代码。实现如下。复制
const { Worker } = require('worker_threads');
process.on('worker', (worker) => {
setInterval(async () => {
const data = await worker.cpuUsage();
console.log(data);
}, 1000);
});
new Worker("setInterval(() => {}, 10000)", { eval:true });
上面代码就可以统一获取所有线程的 CPU 负载,实现简单并且逻辑解耦。
最后介绍下实现细节。复制
cpuUsage() {
const taker = this[kHandle]?.cpuUsage();
return new Promise((resolve, reject) => {
if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
taker.ondone = (err, current) => {
if (err !== null) {
return reject(err);
}
resolve({
user: current.user,
system: current.system,
});
};
});
}
因为操作是在目的线程完成的,所以实现上采用的是异步方式,同步会阻塞调用 cpuUsage 的线程,完全没有必要。cpuUsage 依赖 C++ 层的实现。复制
void Worker::CpuUsage(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
Environment* env = w->env();
AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
Local<Object> wrap;
if (!env->worker_cpu_usage_taker_template()
->NewInstance(env->context())
.ToLocal(&wrap)) {
return;
}
BaseObjectPtr<WorkerCpuUsageTaker> taker =
MakeDetachedBaseObject<WorkerCpuUsageTaker>(env, wrap);
// 给子线程提交一个任务
bool scheduled = w->RequestInterrupt([taker = std::move(taker),
env](Environment* worker_env) mutable {
auto cpu_usage_stats = std::make_unique<uv_rusage_t>();
// 在子线程执行 uv_getrusage_thread 获取其 CPU 负载
int err = uv_getrusage_thread(cpu_usage_stats.get());
// 获取完毕,给调用线程提交一个任务
env->SetImmediateThreadsafe(
[taker = std::move(taker),
cpu_usage_stats = std::move(cpu_usage_stats),
err = err](Environment* env) mutable {
Local<Value> argv[] = {
Null(isolate),
Undefined(isolate),
};
if (err) {
argv[0] = UVException(
isolate, err, "uv_getrusage_thread", nullptr, nullptr, nullptr);
} else {
Local<Name> names[] = {
FIXED_ONE_BYTE_STRING(isolate, "user"),
FIXED_ONE_BYTE_STRING(isolate, "system"),
};
Local<Value> values[] = {
Number::New(isolate,
1e6 * cpu_usage_stats->ru_utime.tv_sec +
cpu_usage_stats->ru_utime.tv_usec),
Number::New(isolate,
1e6 * cpu_usage_stats->ru_stime.tv_sec +
cpu_usage_stats->ru_stime.tv_usec),
};
argv[1] = Object::New(
isolate, Null(isolate), names, values, arraysize(names));
}
// 调用者线程执行 JS 回调,即 JS 的 ondone
taker->MakeCallback(env->ondone_string(), arraysize(argv), argv);
},
CallbackFlags::kUnrefed);
});
if (scheduled) {
args.GetReturnValue().Set(wrap);
}
}
C++ 的实现有一点复杂,主要是因为涉及到多线程之前的操作,有兴趣的同学可以参考
微信扫一扫打赏
支付宝扫一扫打赏