如何监控 Node.js 线程的 CPU 负载?

虽然 Node.js 本身是单线程应用,但是也支持创建额外的线程。在一个单进程多线程的应用中,观测线程的 CP…

虽然 Node.js 本身是单线程应用,但是也支持创建额外的线程。在一个单进程多线程的应用中,观测线程的 CPU 负载是非常有意义且必要的,因为通过进程 CPU 负载我们看到的只是进程内所有线程的 CPU 负载之和,但是无法知道每个线程的负载,这样在 CPU 负载高时,我们就无法知道是哪个线程导致的。为了更好地了解各个线程的 CPU 负载,需要提供线程级别的 CPU 负载数据。目前,Libuv 已经支持该能力,在比较新的 Node.js 版本中也引入了该能力,本文介绍线程 CPU 负载获取的相关内容。

在做 Node.js APM 时,我们已经通过 Addon + getrusage 获取了线程 CPU 负载,其原理很简单,getrusage 本身是支持获取调用线程的 CPU 负载的,只不过之前因为平台兼容性问题,Libuv 没有支持该能力,现在 Libuv 兼容了更多平台后,也是使用了类似的方式实现的。但是 Addon 一来比较麻烦,二来需要把代码注入到目的线程,因为在目的线程调用上面的函数才能获取该线程的 CPU 负载,相对来说有一定的成本。

现在 Node.js 原生支持该能力后,首先解决了 Addon 的问题,我们只需要在目的线程调用 process.threadCpuUsage() 就能获得当前线程的 CPU 负载,但是问题二还是没解决,还是需要进行代码注入,为了解决这个问题,我最近提交了一个 PR,支持在主线程中获取子线程的 CPU 负载,大致的用法如下。复制

const worker = new Worker(...);
await worker.cpuUsage();

这样我们就可以通过 process 的 worker 事件获取每个 worker(或者通过 diagnostics_channel),从而获取 worker 的 CPU 负载,不需要在每个线程里注入代码。实现如下。复制

const { Worker } = require('worker_threads');

process.on('worker', (worker) => {
  setInterval(async () => {
   const data = await worker.cpuUsage();
   console.log(data);
  }, 1000);
});

new Worker("setInterval(() => {}, 10000)", { eval:true });

上面代码就可以统一获取所有线程的 CPU 负载,实现简单并且逻辑解耦。

最后介绍下实现细节。复制

cpuUsage() {
  const taker = this[kHandle]?.cpuUsage();
  return new Promise((resolve, reject) => {
    if (!taker) return reject(new ERR_WORKER_NOT_RUNNING());
    taker.ondone = (err, current) => {
      if (err !== null) {
        return reject(err);
      }
      resolve({
        user: current.user,
        system: current.system,
      });
    };
  });
}

因为操作是在目的线程完成的,所以实现上采用的是异步方式,同步会阻塞调用 cpuUsage 的线程,完全没有必要。cpuUsage 依赖 C++ 层的实现。复制

void Worker::CpuUsage(const FunctionCallbackInfo<Value>& args) {
  Worker* w;
  ASSIGN_OR_RETURN_UNWRAP(&w, args.This());

  Environment* env = w->env();
  AsyncHooks::DefaultTriggerAsyncIdScope trigger_id_scope(w);
  Local<Object> wrap;
  if (!env->worker_cpu_usage_taker_template()
           ->NewInstance(env->context())
           .ToLocal(&wrap)) {
    return;
  }

  BaseObjectPtr<WorkerCpuUsageTaker> taker =
      MakeDetachedBaseObject<WorkerCpuUsageTaker>(env, wrap);
  // 给子线程提交一个任务
  bool scheduled = w->RequestInterrupt([taker = std::move(taker),
                                        env](Environment* worker_env) mutable {
    auto cpu_usage_stats = std::make_unique<uv_rusage_t>();
    // 在子线程执行 uv_getrusage_thread 获取其 CPU 负载
    int err = uv_getrusage_thread(cpu_usage_stats.get());
    // 获取完毕,给调用线程提交一个任务
    env->SetImmediateThreadsafe(
        [taker = std::move(taker),
         cpu_usage_stats = std::move(cpu_usage_stats),
         err = err](Environment* env) mutable {
          
          Local<Value> argv[] = {
              Null(isolate),
              Undefined(isolate),
          };

          if (err) {
            argv[0] = UVException(
                isolate, err, "uv_getrusage_thread", nullptr, nullptr, nullptr);
          } else {
            Local<Name> names[] = {
                FIXED_ONE_BYTE_STRING(isolate, "user"),
                FIXED_ONE_BYTE_STRING(isolate, "system"),
            };
            Local<Value> values[] = {
                Number::New(isolate,
                            1e6 * cpu_usage_stats->ru_utime.tv_sec +
                                cpu_usage_stats->ru_utime.tv_usec),
                Number::New(isolate,
                            1e6 * cpu_usage_stats->ru_stime.tv_sec +
                                cpu_usage_stats->ru_stime.tv_usec),
            };
            argv[1] = Object::New(
                isolate, Null(isolate), names, values, arraysize(names));
          }
          // 调用者线程执行 JS 回调,即 JS 的 ondone
          taker->MakeCallback(env->ondone_string(), arraysize(argv), argv);
        },
        CallbackFlags::kUnrefed);
  });

  if (scheduled) {
    args.GetReturnValue().Set(wrap);
  }
}

C++ 的实现有一点复杂,主要是因为涉及到多线程之前的操作,有兴趣的同学可以参考

关于作者: hqwt

为您推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注