undefined

bokuweb.me

Denoを読む(1)

正月にDenoを読んでたメモです。いろいろ間違ってる可能性が高いのでご注意ください。

Denoとは

deno.land

Node.jsの作者Ryan Dahl氏による新しいTypeSciprtのランタイム。Node.jsの反省点を生かして作られてる。 おおきく分けてTypeScript、V8、Rustの三層で構成されていてTypeScriptとRust間はFlatBuffersでやり取りされ、仲介としてC++で書かれたlibdenoが存在する。

参考資料

scrapbox.io

denolib.gitbook.io

yosuke-furukawa.hatenablog.com

読んでいく

前提

実装は日に日に変化しているのでひとまず以下のバージョンについてのメモとする

github.com

Cargo.toml

まずはCargo.tomlを眺めてみる。package.jsonみたいなやつです。dependenciesは以下のような感じ。特段目を引くようなものは見当たらないようにみえる。

[dependencies]
atty = "=0.2.11"
dirs = "=1.0.4"
flatbuffers = "=0.5.0"
futures = "=0.1.25"
getopts = "=0.2.18"
http = "=0.1.14"
hyper = "=0.12.19"
hyper-rustls = "=0.15.0"
kernel32-sys = "=0.2.2"
lazy_static = "=1.2.0"
libc = "=0.2.46"
log = "=0.4.6"
rand = "=0.6.3"
remove_dir_all = "=0.5.1"
ring = "=0.13.5"
rustyline = "=2.1.0"
serde_json = "1.0.34"
source-map-mappings = "0.5.0"
tempfile = "=3.0.5"
tokio = "=0.1.13"
tokio-executor = "=0.1.5"
tokio-fs = "=0.1.4"
tokio-io = "=0.1.10"
tokio-process = "=0.2.3"
tokio-threadpool = "=0.1.9"
url = "=1.7.2"
winapi = "=0.3.6"

Rust側を見てく

エントリポイントはsrc/main.rsぽいのでここから読んでいく。

  • src/main.rs
fn main() {
  // ... ommited ... 基本的にはロガーの設定

  let state = Arc::new(isolate::IsolateState::new(flags, rest_argv, None));
  let snapshot = snapshot::deno_snapshot();
  let isolate = isolate::Isolate::new(snapshot, state, ops::dispatch);
  tokio_util::init(|| {
    isolate
      .execute("denoMain();")
      .unwrap_or_else(print_err_and_exit);
    isolate.event_loop().unwrap_or_else(print_err_and_exit);
  });
}

前半はロガーの設定などをぼちぼちやる感じ。

isolate::IsolateStateisolate用のフラグやworker用channelsの保持用ぽい。まずこいつを作る。そもそもisolateは何かというとコンテキストが隔離されたJS実行環境と思えばいいのだろうか。chromeでのタブやworkerをイメージすれば良さそう(多分)。実際、最近入ったworker対応でもやはりworker作成時にisolateを作成している。

github.com

let snapshot = snapshot::deno_snapshot()ではv8のsnapshotを作成している。deno_snapshot()は以下。

  • src/snapshot.rs
pub fn deno_snapshot() -> deno_buf {
  #[cfg(not(feature = "check-only"))]
  let data =
    include_bytes!(concat!(env!("GN_OUT_DIR"), "/gen/snapshot_deno.bin"));
  // ... ommited .../

  unsafe { deno_buf::from_raw_parts(data.as_ptr(), data.len()) }
}

deno_snapshotはこれだけでinclude_bytes!でファイルをごそっと読んでそのポインタと長さを返しているだけの様子。snapshotはなんぞやという話は以下を読むと良さそう。

v8.dev

コンテキスト作成時にV8のヒープにロードするのには時間がかかるので、ロード後のsnapshotを撮っておいてそれを使用することで起動を速くする仕組みっぽい。上の記事でもまさにTypeScriptのコンパイラの話をしている。Denoではtools/build.py実行時にdeno/js配下のファイルがトランスパイルかつV8のヒープロードされた状態でスナップショットにされるぽい。なのでjs/*.tsを変更した場合は再ビルドしないと反映されない。ちなみにnew Date()Math.random()は値が焼き付くようなことが書いてある。

あとはtokioの中でdenoMainを実行して、isolate.event_loop()でタスクがなくなるまで待つことになっているぽい。タスクがなくなったらループを抜けて終了する。

tokio_util::init(|| {
  isolate
    .execute("denoMain();")
    .unwrap_or_else(print_err_and_exit);
  isolate.event_loop().unwrap_or_else(print_err_and_exit);
});

tokioの初期化は以下のようになっている。tokioのチュートリアルもやったがこの辺何をやってるのかまだちゃんとわかってない。宿題。

pub fn init<F>(f: F)
where
  F: FnOnce(),
{
  let rt = tokio::runtime::Runtime::new().unwrap();
  let mut executor = rt.executor();
  let mut enter = tokio_executor::enter().expect("Multiple executors at once");
  tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f());
}

そもそもtokioってなにかというとRustの非同期I/Oライブラリで、イベントループを作ってLinuxであればepoll、BSDであればkqueueを使ってディスクリプタを監視し適宜処理を行うやつでNode.jsでいうところのlibuvの役割を果たしているようにみえる。違ったら指摘いただけると。。。 denoを読み始めたんだけど、結局tokioを学ばなければならないとなって正月はほぼ以下を読んでいた。以下はtokioの学習用の簡易実装でいろいろ勉強になる。ひとまずこれを読めばどんなことをやっているかはわかる。(tokioのおもちゃ実装ということで昔はtoykioという名前だった) fahrenheitでは簡素化とポータビリティのためepollではなくselectを使用している。と書いてある。

github.com

ブログ記事もある。

rust-lang-nursery.github.io

ただ、まだ理解できていないのでもう少し勉強して理解できたら別途まとめたい。

isolate.event_loop()がどうなってるかというと以下のようになっていて、self.is_idle()が真になるまでループを抜けてこない。self.is_idle()は非同期タスクが0かつ設定されたtimeoutがなくなると真となる。なので非同期タスクがない(たとえば、console.log("hello");などを実行した)場合は待ちタスクがないのですぐアイドルと判定されループを抜けて終了する。

  pub fn event_loop(&self) -> Result<(), JSError> {
    while !self.is_idle() {
      match recv_deadline(&self.rx, self.get_timeout_due()) {
        Ok((req_id, buf)) => self.complete_op(req_id, buf),
        Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
        Err(e) => panic!("recv_deadline() failed: {:?}", e),
      }
      // ommited... promise error check
    }
    // ommited... promise error check
    Ok(())
  }

ループ内では、recv_deadline(&self.rx, self.get_timeout_due())で非同期タスク完了のメッセージを待ち続けることになる。

では送信元はどこかというとdeno/src/isolate.rsextern "C" fn pre_dispatchの以下の箇所っぽい。タスクを登録して、その完了時にsender.sendでメッセージを送信している。

let task = op
  .and_then(move |buf| {
    let sender = tx; // tx is moved to new thread
    sender.send((req_id, buf)).expect("tx.send error");
    Ok(())
  }).map_err(|_| ());
  tokio::spawn(task);

extern "C"がついていることからもC++で書かれたlibsenoから叩かれる箇所だと推測できる。追ってみるとIsorate::newlibdeno::configに受信コールバックとして渡されている。

let config = libdeno::deno_config {
  will_snapshot: 0,
  load_snapshot: snapshot,
  shared: libdeno::deno_buf::empty(), // TODO Use for message passing.
  recv_cb: pre_dispatch,
  resolve_cb,
};

let task = op.and_then(...)opは何かというと、以下のようなシグネチャになってる。

pub type Op = Future<Item = Buf, Error = DenoError> + Send;

deno/src/ops.rsdispatchの返り値となっており、dispatchでメッセージのデシリアライズ後matchでファイルの読み書きやフェッチなどの処理に振り分けられる。例えばメッセージの種別がReadFileであれば以下のようにop_read_fileに振り分けられる。

pub fn dispatch(
  isolate: &Isolate,
  control: libdeno::deno_buf,
  data: libdeno::deno_buf,
) -> (bool, Box<Op>) {
  let base = msg::get_root_as_base(&control);
  let is_sync = base.sync();
  let inner_type = base.inner_type();
  let cmd_id = base.cmd_id();
  let op: Box<Op> = if inner_type == msg::Any::SetTimeout {
    // ... ommited ...
  } else {
    // Handle regular ops.
    let op_creator: OpCreator = match inner_type {
      msg::Any::ReadFile => op_read_file,
      // ... 他の実処理に分岐される ...

たとえば一番シンプルな処理っぽいchdirであれば以下のような感じ。該当する処理を行ってBox<Op>を返すという感じ。

fn op_chdir(
  _state: &Arc<IsolateState>,
  base: &msg::Base,
  data: libdeno::deno_buf,
) -> Box<Op> {
  assert_eq!(data.len(), 0);
  let inner = base.inner_as_chdir().unwrap();
  let directory = inner.directory().unwrap();
  Box::new(futures::future::result(|| -> OpResult {
    std::env::set_current_dir(&directory)?;
    Ok(empty_buf())
  }()))

ここでの結果がpre_dispatchis_syncフラグと一緒に戻されて、非同期/同期で処理が分岐される。

例えば同期モードであれば(https://github.com/denoland/deno/blob/6f79ad721a9f8c9d66d79f21ea479286f3ca5374/src/isolate.rs#L416-L425) のようにbloking_onで処理の完了を待ってからレスポンスメッセージが送られる。

let buf = tokio_util::block_on(op).unwrap();
let buf_size = buf.len();

if buf_size == 0 {
  // FIXME
 isolate.state.metrics_op_completed(buf.len());
} else {
  // Set the synchronous response, the value returned from isolate.send().
  isolate.respond(req_id, buf);

非同期の場合は先に記載したように処理の完了を待って完了後、完了が通知される。この通知は先のisolate.event_loop()内で受信されて非同期タスクの完了処理が実行される。完了処理は現在待機中のタスク数のデクリメント(tokio側のAPIを使いたい旨のコメントがあったが、問題があるのか現在は手動で行っている。)とV8側へのレスポンス

let tx = isolate.tx.clone();
isolate.ntasks_increment();
let task = op
  .and_then(move |buf| {
    let sender = tx; // tx is moved to new thread
    sender.send((req_id, buf)).expect("tx.send error");
    Ok(())
  }).map_err(|_| ());
tokio::spawn(task);

TypeScript側を見てく

Rust側の大枠の流れはわかったのでTypeScript側を見てみる エントリポイントはjs/main.ts。ここにRust側から呼ばれていたdenoMainがある。

export default function denoMain() {
  libdeno.recv(handleAsyncMsgFromRust);
  const startResMsg = sendStart();

  // ... ommited ...

  os.setPid(startResMsg.pid());

  const cwd = startResMsg.cwd();
  log("cwd", cwd);

  for (let i = 1; i < startResMsg.argvLength(); i++) {
    args.push(startResMsg.argv(i));
  }
  log("args", args);
  Object.freeze(args);
  const inputFn = args[0];

  compiler.recompile = startResMsg.recompileFlag();

  if (inputFn) {
    compiler.run(inputFn, `${cwd}/`);
  } else {
    replLoop();
  }
}

まずはlibdeno.recv(handleAsyncMsgFromRust);でRust側からの受信コールバックを設定する。

const promiseTable = new Map<number, util.Resolvable<msg.Base>>();

export function handleAsyncMsgFromRust(ui8: Uint8Array) {
  // If a the buffer is empty, recv() on the native side timed out and we
  // did not receive a message.
  if (ui8.length) {
    const bb = new flatbuffers.ByteBuffer(ui8);
    const base = msg.Base.getRootAsBase(bb);
    const cmdId = base.cmdId();
    const promise = promiseTable.get(cmdId);
    util.assert(promise != null, `Expecting promise in table. ${cmdId}`);
    promiseTable.delete(cmdId);
    const err = errors.maybeError(base);
    if (err != null) {
      promise!.reject(err);
    } else {
      promise!.resolve(base);
    }
  }
  // Fire timers that have become runnable.
  fireTimers();
}

基本的にここに到達するのはTypeScript側から非同期処理を呼んでその応答がRust側から返ってきたケース(だと思う)。非同期処理開始メッセージを送る祭にcommandIdをキーにPromisepromiseTableに登録しておいて、返ってきたメッセージのcommandIdをキーにそれを回収、resolve/rejectを実行してるっぽい。

ちょうど下にsendAsyncというのがいた。promiseTable.set(cmdId, promise);してる。

export function sendAsync(
  builder: flatbuffers.Builder,
  innerType: msg.Any,
  inner: flatbuffers.Offset,
  data?: ArrayBufferView
): Promise<msg.Base> {
  const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, false);
  util.assert(resBuf == null);
  const promise = util.createResolvable<msg.Base>();
  promiseTable.set(cmdId, promise);
  return promise;
}

次にconst startResMsg = sendStart(); でスタートメッセージを同期モードで送信している。Rust側で各メッセージに対して何をやっているかはops.rsを見ればいいのがわかったいるので覗いてみる。

let inner = msg::StartRes::create(
  &mut builder,
  &msg::StartResArgs {
    cwd: Some(cwd_off),
    pid: std::process::id(),
    argv: Some(argv_off),
    debug_flag: state.flags.log_debug,
    recompile_flag: state.flags.recompile,
    types_flag: state.flags.types,
    version_flag: state.flags.version,
    v8_version: Some(v8_version_off),
    deno_version: Some(deno_version_off),
    ..Default::default()
  },
);

基本的にはRust側でもている基本的情報を返信しているだけっぽい。返してるメッセージは上記のようなものでフラグとか引数、プロセスIDやバージョンなどを詰めて返している模様。 あとは返ってきたメッセージの引数やフラグなどを処理して以下のようにファイルが指定されていればコンパイルして実行。なければREPLモードに入るっぽい。

if (inputFn) {
  compiler.run(inputFn, `${cwd}/`);
} else {
  replLoop();
}

compiler.runの先はどうなってるかまだちゃんとみてないけど、CodeFetchというメッセージが同期が飛んでるのでRust側で該当ファイルを読んで返却後トランスパイルしてどこかにキャッシュしてるのかな。今度みる。

FlatBuffers

メッセージのやり取りにはFlatBuffersが使用されているが、定義はsrc/msg.fbsにいる。

github.com

tools/build.pyを実行するとTypeScriptとRustのコードがtarget/debug/gen/配下にmsg_generated.rsmsg_generated.tsとして生成される。

たとえば先のstartメッセージのレスポンスであれば以下のように定義されている。

table StartRes {
  cwd: string;
  argv: [string];
  debug_flag: bool;
  deps_flag: bool;
  recompile_flag: bool;
  types_flag: bool;
  version_flag: bool;
  deno_version: string;
  v8_version: string;
}

FlatBuffersはロード時にパースせず値が必要なときまで後回しするなどオーバーヘッドが少なく速いらしい。 このへんもまた今度詳しく調べてみる。

qiita.com

setTimeoutを実行してみる

だいたいの流れはわかったのでひとまず何か非同期処理を実行してみる。まずはsetTimeoutを試してみる。あとこの辺試してて気づいたんですが、microtaskのqueueはV8側で面倒見てくれるっぽい。知らなかった。

setTimeout(() => console.log("hello"), 1000);

を実行してみてその流れをみてみる。

setTimeoutjs/timer.tsに定義されている。

export function setTimeout(
  cb: (...args: Args) => void,
  delay: number,
  ...args: Args
): number {
  return setTimer(cb, delay, args, false);
}

これをたどっていくとsetGlobalTimeoutでメッセージを作って送信しているのがわかる。ただし、sendSync で送られている。timeout周りは非同期ながら若干特別扱いされてるっぽい。

function setGlobalTimeout(due: number | null, now: number) {
  // ... ommitted...
  // Send message to the backend.
  const builder = flatbuffers.createBuilder();
  msg.SetTimeout.startSetTimeout(builder);
  msg.SetTimeout.addTimeout(builder, timeout);
  const inner = msg.SetTimeout.endSetTimeout(builder);
  const res = sendSync(builder, msg.Any.SetTimeout, inner);

  globalTimeoutDue = due;
}

これはsrc/ops.rsdispatchのメッセージから各処理への分岐部分に書いてあった。例外的に同期処理として扱われメインスレッドで更新されるとのこと。

let op: Box<Op> = if inner_type == msg::Any::SetTimeout {
  // SetTimeout is an exceptional op: the global timeout field is part of the
  // Isolate state (not the IsolateState state) and it must be updated on the
  // main thread.
  assert_eq!(is_sync, true);
  op_set_timeout(isolate, &base, data)
}

op_set_timeoutを見るとどうもisolate側にtimeout値を設定しているだけのよう。そして同期モードのメッセージなのでdummyの空bufferをひとまず返してTypeScript側がブロックしないようにしてるっぽい。

fn op_set_timeout(
  isolate: &Isolate,
  base: &msg::Base,
  data: libdeno::deno_buf,
) -> Box<Op> {
  let inner = base.inner_as_set_timeout().unwrap();
  let val = inner.timeout() as i64;
  let timeout_due = if val >= 0 {
    Some(Instant::now() + Duration::from_millis(val as u64))
  } else {
    None
  };
  isolate.set_timeout_due(timeout_due);
  ok_future(empty_buf())
}

timeout_dueがセットされると最初の方で記載したisolate.eventloopself.is_idleが偽になってrecv_deadlineで受信待ちになる。

pub fn event_loop(&self) -> Result<(), JSError> {
  while !self.is_idle() {
    match recv_deadline(&self.rx, self.get_timeout_due()) {
      Ok((req_id, buf)) => self.complete_op(req_id, buf),
      Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(),
      Err(e) => panic!("recv_deadline() failed: {:?}", e),
    }
    // ... ommited ...
 }

recv_deadlineは以下のようになっている。dueが設定されていれば、rx.recv_timeout(timeout)でタイムアウトを待つ。が、その後非同期メッセージを受信した場合は一旦rx.recv_timeoutから抜けてきてしまうので、後続の非同期タスクを登録したあと、次ループで再度rx.recv_timeoutで待つんだと思う。

fn recv_deadline<T>(
  rx: &mpsc::Receiver<T>,
  maybe_due: Option<Instant>,
) -> Result<T, mpsc::RecvTimeoutError> {
  match maybe_due {
    None => rx.recv().map_err(|e| e.into()),
    Some(due) => {
        let now = Instant::now();
      let timeout = if due > now {
        due - now
      } else {
        Duration::new(0, 0)
      };
      rx.recv_timeout(timeout)
    }
  }
}
let now = Instant::now();
let timeout = if due > now {
  due - now
} else {
  Duration::new(0, 0)
};

とやっているのは一度ループから抜けた際に経過してしまった時間を吸収してるっぽい。なので、常に設定されるタイマーは一個になる気がする。 なので以下を実行した場合も1,000msと2,000msのタイマーが設定されるわけではなく1,000msのタイマーを待ったあと再度差分の1,000ms(実際には997とか微妙に減った値だと思う)が設定されるぽい。

setTimeout(() => {...}, 1000);
setTimeout(() => {...}, 2000);

そうするとTypeScript側も工夫が必要で複数のタイマーはひとまずconst dueMap: { [due: number]: Timer[] } = Object.create(null); に管理されるっぽい。稼働中のタイマーのみglobalTimeoutDueにセットされて管理される。現在のタイマーが完了前に次のタイマー設定が来た場合はglobalTimeoutDueが未設定、もしくは globalTimeoutDueより期限が近いタイマーが新たに設定されるっぽい。そのへんをやってるのが以下。

function schedule(timer: Timer, now: number) {
  assert(!timer.scheduled);
  assert(now <= timer.due);
  let list = dueMap[timer.due];
  if (list === undefined) {
    list = dueMap[timer.due] = [];
  }
  list.push(timer);
  timer.scheduled = true;
  if (globalTimeoutDue === null || globalTimeoutDue > timer.due) {
    setGlobalTimeout(timer.due, now);
  }
}

なのでタイマーがセットされると同時にタイムアウト完了コールバックも同時に設定され、コールバック内で次に設定すべきタイムアウトがあれば経過時間を調整して設定、なければ完了メッセージ(timeout = -1)を送っている。Rust側では完了メッセージを受けたら、timeout_dueNoneを設定して(他にまちタスクがなければ)isolate.eventloopを抜けて終了という流れっぽい。

なのでhandleAsyncMsgFromRustfireTimersはまさにそれようなんですね。バッファが空の場合はタイムアウトという取り決めのもと次のタイマーをセットしにいってるんだと思う。

export function handleAsyncMsgFromRust(ui8: Uint8Array) {
  if (ui8.length) {
    // ... ommitted ...
  }
  // Fire timers that have become runnable.
  fireTimers();
}

かなり昔にtokio_timerを使ってタイマーを実装するというissueを見かけた気がしたけど、そのような作りではなく、どのような議論を経てこの実装になっているのかはちょっとわからん。

非同期処理を確認したかったんだけどsetTimeoutはちょっと特殊だったっぽい。次はreadFileとかcodeFetch周りを読めたら読みたい。

ひとまずここまで。