正月にDenoを読んでたメモです。いろいろ間違ってる可能性が高いのでご注意ください。
Denoとは
Node.jsの作者Ryan Dahl氏による新しいTypeSciprtのランタイム。Node.jsの反省点を生かして作られてる。 おおきく分けてTypeScript、V8、Rustの三層で構成されていてTypeScriptとRust間はFlatBuffersでやり取りされ、仲介としてC++で書かれたlibdenoが存在する。
参考資料
yosuke-furukawa.hatenablog.com
読んでいく
前提
実装は日に日に変化しているのでひとまず以下のバージョンについてのメモとする
Cargo.toml
まずはCargo.tomlを眺めてみる。package.jsonみたいなやつです。dependenciesは以下のような感じ。特段目を引くようなものは見当たらないようにみえる。
[dependencies] atty = "=0.2.11" dirs = "=1.0.4" flatbuffers = "=0.5.0" futures = "=0.1.25" getopts = "=0.2.18" http = "=0.1.14" hyper = "=0.12.19" hyper-rustls = "=0.15.0" kernel32-sys = "=0.2.2" lazy_static = "=1.2.0" libc = "=0.2.46" log = "=0.4.6" rand = "=0.6.3" remove_dir_all = "=0.5.1" ring = "=0.13.5" rustyline = "=2.1.0" serde_json = "1.0.34" source-map-mappings = "0.5.0" tempfile = "=3.0.5" tokio = "=0.1.13" tokio-executor = "=0.1.5" tokio-fs = "=0.1.4" tokio-io = "=0.1.10" tokio-process = "=0.2.3" tokio-threadpool = "=0.1.9" url = "=1.7.2" winapi = "=0.3.6"
Rust側を見てく
エントリポイントはsrc/main.rs
ぽいのでここから読んでいく。
- src/main.rs
fn main() { // ... ommited ... 基本的にはロガーの設定 let state = Arc::new(isolate::IsolateState::new(flags, rest_argv, None)); let snapshot = snapshot::deno_snapshot(); let isolate = isolate::Isolate::new(snapshot, state, ops::dispatch); tokio_util::init(|| { isolate .execute("denoMain();") .unwrap_or_else(print_err_and_exit); isolate.event_loop().unwrap_or_else(print_err_and_exit); }); }
前半はロガーの設定などをぼちぼちやる感じ。
isolate::IsolateState
はisolate
用のフラグやworker用channels
の保持用ぽい。まずこいつを作る。そもそもisolate
は何かというとコンテキストが隔離されたJS実行環境と思えばいいのだろうか。chromeでのタブやworkerをイメージすれば良さそう(多分)。実際、最近入ったworker対応でもやはりworker作成時にisolate
を作成している。
let snapshot = snapshot::deno_snapshot()
ではv8のsnapshotを作成している。deno_snapshot()
は以下。
- src/snapshot.rs
pub fn deno_snapshot() -> deno_buf { #[cfg(not(feature = "check-only"))] let data = include_bytes!(concat!(env!("GN_OUT_DIR"), "/gen/snapshot_deno.bin")); // ... ommited .../ unsafe { deno_buf::from_raw_parts(data.as_ptr(), data.len()) } }
deno_snapshot
はこれだけでinclude_bytes!
でファイルをごそっと読んでそのポインタと長さを返しているだけの様子。snapshotはなんぞやという話は以下を読むと良さそう。
コンテキスト作成時にV8のヒープにロードするのには時間がかかるので、ロード後のsnapshotを撮っておいてそれを使用することで起動を速くする仕組みっぽい。上の記事でもまさにTypeScriptのコンパイラの話をしている。Denoではtools/build.py
実行時にdeno/js配下のファイルがトランスパイルかつV8のヒープロードされた状態でスナップショットにされるぽい。なのでjs/*.tsを変更した場合は再ビルドしないと反映されない。ちなみにnew Date()
やMath.random()
は値が焼き付くようなことが書いてある。
あとはtokioの中でdenoMain
を実行して、isolate.event_loop()でタスクがなくなるまで待つことになっているぽい。タスクがなくなったらループを抜けて終了する。
tokio_util::init(|| { isolate .execute("denoMain();") .unwrap_or_else(print_err_and_exit); isolate.event_loop().unwrap_or_else(print_err_and_exit); });
tokioの初期化は以下のようになっている。tokioのチュートリアルもやったがこの辺何をやってるのかまだちゃんとわかってない。宿題。
pub fn init<F>(f: F) where F: FnOnce(), { let rt = tokio::runtime::Runtime::new().unwrap(); let mut executor = rt.executor(); let mut enter = tokio_executor::enter().expect("Multiple executors at once"); tokio_executor::with_default(&mut executor, &mut enter, move |_enter| f()); }
そもそもtokioってなにかというとRustの非同期I/Oライブラリで、イベントループを作ってLinuxであればepoll
、BSDであればkqueue
を使ってディスクリプタを監視し適宜処理を行うやつでNode.jsでいうところのlibuv
の役割を果たしているようにみえる。違ったら指摘いただけると。。。
denoを読み始めたんだけど、結局tokio
を学ばなければならないとなって正月はほぼ以下を読んでいた。以下はtokioの学習用の簡易実装でいろいろ勉強になる。ひとまずこれを読めばどんなことをやっているかはわかる。(tokioのおもちゃ実装ということで昔はtoykioという名前だった)
fahrenheitでは簡素化とポータビリティのためepollではなくselectを使用している。と書いてある。
ブログ記事もある。
ただ、まだ理解できていないのでもう少し勉強して理解できたら別途まとめたい。
isolate.event_loop()
がどうなってるかというと以下のようになっていて、self.is_idle()
が真になるまでループを抜けてこない。self.is_idle()
は非同期タスクが0かつ設定されたtimeoutがなくなると真となる。なので非同期タスクがない(たとえば、console.log("hello");
などを実行した)場合は待ちタスクがないのですぐアイドルと判定されループを抜けて終了する。
pub fn event_loop(&self) -> Result<(), JSError> { while !self.is_idle() { match recv_deadline(&self.rx, self.get_timeout_due()) { Ok((req_id, buf)) => self.complete_op(req_id, buf), Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(), Err(e) => panic!("recv_deadline() failed: {:?}", e), } // ommited... promise error check } // ommited... promise error check Ok(()) }
ループ内では、recv_deadline(&self.rx, self.get_timeout_due())
で非同期タスク完了のメッセージを待ち続けることになる。
では送信元はどこかというとdeno/src/isolate.rs
のextern "C" fn pre_dispatch
の以下の箇所っぽい。タスクを登録して、その完了時にsender.send
でメッセージを送信している。
let task = op .and_then(move |buf| { let sender = tx; // tx is moved to new thread sender.send((req_id, buf)).expect("tx.send error"); Ok(()) }).map_err(|_| ()); tokio::spawn(task);
extern "C"
がついていることからもC++で書かれたlibsenoから叩かれる箇所だと推測できる。追ってみるとIsorate::new
でlibdeno::config
に受信コールバックとして渡されている。
let config = libdeno::deno_config { will_snapshot: 0, load_snapshot: snapshot, shared: libdeno::deno_buf::empty(), // TODO Use for message passing. recv_cb: pre_dispatch, resolve_cb, };
let task = op.and_then(...)
のop
は何かというと、以下のようなシグネチャになってる。
pub type Op = Future<Item = Buf, Error = DenoError> + Send;
deno/src/ops.rs
のdispatch
の返り値となっており、dispatch
でメッセージのデシリアライズ後matchでファイルの読み書きやフェッチなどの処理に振り分けられる。例えばメッセージの種別がReadFile
であれば以下のようにop_read_file
に振り分けられる。
pub fn dispatch( isolate: &Isolate, control: libdeno::deno_buf, data: libdeno::deno_buf, ) -> (bool, Box<Op>) { let base = msg::get_root_as_base(&control); let is_sync = base.sync(); let inner_type = base.inner_type(); let cmd_id = base.cmd_id(); let op: Box<Op> = if inner_type == msg::Any::SetTimeout { // ... ommited ... } else { // Handle regular ops. let op_creator: OpCreator = match inner_type { msg::Any::ReadFile => op_read_file, // ... 他の実処理に分岐される ...
たとえば一番シンプルな処理っぽいchdir
であれば以下のような感じ。該当する処理を行ってBox<Op>
を返すという感じ。
fn op_chdir( _state: &Arc<IsolateState>, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { assert_eq!(data.len(), 0); let inner = base.inner_as_chdir().unwrap(); let directory = inner.directory().unwrap(); Box::new(futures::future::result(|| -> OpResult { std::env::set_current_dir(&directory)?; Ok(empty_buf()) }()))
ここでの結果がpre_dispatch
にis_sync
フラグと一緒に戻されて、非同期/同期で処理が分岐される。
例えば同期モードであれば(https://github.com/denoland/deno/blob/6f79ad721a9f8c9d66d79f21ea479286f3ca5374/src/isolate.rs#L416-L425) のようにbloking_on
で処理の完了を待ってからレスポンスメッセージが送られる。
let buf = tokio_util::block_on(op).unwrap(); let buf_size = buf.len(); if buf_size == 0 { // FIXME isolate.state.metrics_op_completed(buf.len()); } else { // Set the synchronous response, the value returned from isolate.send(). isolate.respond(req_id, buf);
非同期の場合は先に記載したように処理の完了を待って完了後、完了が通知される。この通知は先のisolate.event_loop()
内で受信されて非同期タスクの完了処理が実行される。完了処理は現在待機中のタスク数のデクリメント
(tokio側のAPIを使いたい旨のコメントがあったが、問題があるのか現在は手動で行っている。)とV8側へのレスポンス
。
let tx = isolate.tx.clone(); isolate.ntasks_increment(); let task = op .and_then(move |buf| { let sender = tx; // tx is moved to new thread sender.send((req_id, buf)).expect("tx.send error"); Ok(()) }).map_err(|_| ()); tokio::spawn(task);
TypeScript側を見てく
Rust側の大枠の流れはわかったのでTypeScript側を見てみる
エントリポイントはjs/main.ts
。ここにRust側から呼ばれていたdenoMain
がある。
export default function denoMain() { libdeno.recv(handleAsyncMsgFromRust); const startResMsg = sendStart(); // ... ommited ... os.setPid(startResMsg.pid()); const cwd = startResMsg.cwd(); log("cwd", cwd); for (let i = 1; i < startResMsg.argvLength(); i++) { args.push(startResMsg.argv(i)); } log("args", args); Object.freeze(args); const inputFn = args[0]; compiler.recompile = startResMsg.recompileFlag(); if (inputFn) { compiler.run(inputFn, `${cwd}/`); } else { replLoop(); } }
まずはlibdeno.recv(handleAsyncMsgFromRust);
でRust側からの受信コールバックを設定する。
const promiseTable = new Map<number, util.Resolvable<msg.Base>>(); export function handleAsyncMsgFromRust(ui8: Uint8Array) { // If a the buffer is empty, recv() on the native side timed out and we // did not receive a message. if (ui8.length) { const bb = new flatbuffers.ByteBuffer(ui8); const base = msg.Base.getRootAsBase(bb); const cmdId = base.cmdId(); const promise = promiseTable.get(cmdId); util.assert(promise != null, `Expecting promise in table. ${cmdId}`); promiseTable.delete(cmdId); const err = errors.maybeError(base); if (err != null) { promise!.reject(err); } else { promise!.resolve(base); } } // Fire timers that have become runnable. fireTimers(); }
基本的にここに到達するのはTypeScript側から非同期処理を呼んでその応答がRust側から返ってきたケース(だと思う)。非同期処理開始メッセージを送る祭にcommandId
をキーにPromise
をpromiseTable
に登録しておいて、返ってきたメッセージのcommandId
をキーにそれを回収、resolve/reject
を実行してるっぽい。
ちょうど下にsendAsync
というのがいた。promiseTable.set(cmdId, promise);
してる。
export function sendAsync( builder: flatbuffers.Builder, innerType: msg.Any, inner: flatbuffers.Offset, data?: ArrayBufferView ): Promise<msg.Base> { const [cmdId, resBuf] = sendInternal(builder, innerType, inner, data, false); util.assert(resBuf == null); const promise = util.createResolvable<msg.Base>(); promiseTable.set(cmdId, promise); return promise; }
次にconst startResMsg = sendStart();
でスタートメッセージを同期モードで送信している。Rust側で各メッセージに対して何をやっているかはops.rs
を見ればいいのがわかったいるので覗いてみる。
let inner = msg::StartRes::create( &mut builder, &msg::StartResArgs { cwd: Some(cwd_off), pid: std::process::id(), argv: Some(argv_off), debug_flag: state.flags.log_debug, recompile_flag: state.flags.recompile, types_flag: state.flags.types, version_flag: state.flags.version, v8_version: Some(v8_version_off), deno_version: Some(deno_version_off), ..Default::default() }, );
基本的にはRust側でもている基本的情報を返信しているだけっぽい。返してるメッセージは上記のようなものでフラグとか引数、プロセスIDやバージョンなどを詰めて返している模様。 あとは返ってきたメッセージの引数やフラグなどを処理して以下のようにファイルが指定されていればコンパイルして実行。なければREPLモードに入るっぽい。
if (inputFn) { compiler.run(inputFn, `${cwd}/`); } else { replLoop(); }
compiler.run
の先はどうなってるかまだちゃんとみてないけど、CodeFetch
というメッセージが同期が飛んでるのでRust側で該当ファイルを読んで返却後トランスパイルしてどこかにキャッシュしてるのかな。今度みる。
FlatBuffers
メッセージのやり取りにはFlatBuffers
が使用されているが、定義はsrc/msg.fbs
にいる。
tools/build.py
を実行するとTypeScriptとRustのコードがtarget/debug/gen/
配下にmsg_generated.rs
、msg_generated.ts
として生成される。
たとえば先のstartメッセージのレスポンスであれば以下のように定義されている。
table StartRes { cwd: string; argv: [string]; debug_flag: bool; deps_flag: bool; recompile_flag: bool; types_flag: bool; version_flag: bool; deno_version: string; v8_version: string; }
FlatBuffers
はロード時にパースせず値が必要なときまで後回しするなどオーバーヘッドが少なく速いらしい。
このへんもまた今度詳しく調べてみる。
setTimeoutを実行してみる
だいたいの流れはわかったのでひとまず何か非同期処理を実行してみる。まずはsetTimeout
を試してみる。あとこの辺試してて気づいたんですが、microtaskのqueueはV8側で面倒見てくれるっぽい。知らなかった。
setTimeout(() => console.log("hello"), 1000);
を実行してみてその流れをみてみる。
setTimeout
はjs/timer.ts
に定義されている。
export function setTimeout( cb: (...args: Args) => void, delay: number, ...args: Args ): number { return setTimer(cb, delay, args, false); }
これをたどっていくとsetGlobalTimeout
でメッセージを作って送信しているのがわかる。ただし、sendSync
で送られている。timeout
周りは非同期ながら若干特別扱いされてるっぽい。
function setGlobalTimeout(due: number | null, now: number) { // ... ommitted... // Send message to the backend. const builder = flatbuffers.createBuilder(); msg.SetTimeout.startSetTimeout(builder); msg.SetTimeout.addTimeout(builder, timeout); const inner = msg.SetTimeout.endSetTimeout(builder); const res = sendSync(builder, msg.Any.SetTimeout, inner); globalTimeoutDue = due; }
これはsrc/ops.rs
のdispatch
のメッセージから各処理への分岐部分に書いてあった。例外的に同期処理として扱われメインスレッドで更新されるとのこと。
let op: Box<Op> = if inner_type == msg::Any::SetTimeout { // SetTimeout is an exceptional op: the global timeout field is part of the // Isolate state (not the IsolateState state) and it must be updated on the // main thread. assert_eq!(is_sync, true); op_set_timeout(isolate, &base, data) }
op_set_timeout
を見るとどうもisolate側にtimeout値を設定しているだけのよう。そして同期モードのメッセージなのでdummyの空bufferをひとまず返してTypeScript側がブロックしないようにしてるっぽい。
fn op_set_timeout( isolate: &Isolate, base: &msg::Base, data: libdeno::deno_buf, ) -> Box<Op> { let inner = base.inner_as_set_timeout().unwrap(); let val = inner.timeout() as i64; let timeout_due = if val >= 0 { Some(Instant::now() + Duration::from_millis(val as u64)) } else { None }; isolate.set_timeout_due(timeout_due); ok_future(empty_buf()) }
timeout_due
がセットされると最初の方で記載したisolate.eventloop
のself.is_idle
が偽になってrecv_deadline
で受信待ちになる。
pub fn event_loop(&self) -> Result<(), JSError> { while !self.is_idle() { match recv_deadline(&self.rx, self.get_timeout_due()) { Ok((req_id, buf)) => self.complete_op(req_id, buf), Err(mpsc::RecvTimeoutError::Timeout) => self.timeout(), Err(e) => panic!("recv_deadline() failed: {:?}", e), } // ... ommited ... }
recv_deadline
は以下のようになっている。due
が設定されていれば、rx.recv_timeout(timeout)
でタイムアウトを待つ。が、その後非同期メッセージを受信した場合は一旦rx.recv_timeout
から抜けてきてしまうので、後続の非同期タスクを登録したあと、次ループで再度rx.recv_timeout
で待つんだと思う。
fn recv_deadline<T>( rx: &mpsc::Receiver<T>, maybe_due: Option<Instant>, ) -> Result<T, mpsc::RecvTimeoutError> { match maybe_due { None => rx.recv().map_err(|e| e.into()), Some(due) => { let now = Instant::now(); let timeout = if due > now { due - now } else { Duration::new(0, 0) }; rx.recv_timeout(timeout) } } }
let now = Instant::now(); let timeout = if due > now { due - now } else { Duration::new(0, 0) };
とやっているのは一度ループから抜けた際に経過してしまった時間を吸収してるっぽい。なので、常に設定されるタイマーは一個になる気がする。 なので以下を実行した場合も1,000msと2,000msのタイマーが設定されるわけではなく1,000msのタイマーを待ったあと再度差分の1,000ms(実際には997とか微妙に減った値だと思う)が設定されるぽい。
setTimeout(() => {...}, 1000); setTimeout(() => {...}, 2000);
そうするとTypeScript側も工夫が必要で複数のタイマーはひとまずconst dueMap: { [due: number]: Timer[] } = Object.create(null);
に管理されるっぽい。稼働中のタイマーのみglobalTimeoutDue
にセットされて管理される。現在のタイマーが完了前に次のタイマー設定が来た場合はglobalTimeoutDue
が未設定、もしくは
globalTimeoutDue
より期限が近いタイマーが新たに設定されるっぽい。そのへんをやってるのが以下。
function schedule(timer: Timer, now: number) { assert(!timer.scheduled); assert(now <= timer.due); let list = dueMap[timer.due]; if (list === undefined) { list = dueMap[timer.due] = []; } list.push(timer); timer.scheduled = true; if (globalTimeoutDue === null || globalTimeoutDue > timer.due) { setGlobalTimeout(timer.due, now); } }
なのでタイマーがセットされると同時にタイムアウト完了コールバックも同時に設定され、コールバック内で次に設定すべきタイムアウトがあれば経過時間を調整して設定、なければ完了メッセージ(timeout = -1)を送っている。Rust側では完了メッセージを受けたら、timeout_due
にNone
を設定して(他にまちタスクがなければ)isolate.eventloop
を抜けて終了という流れっぽい。
なのでhandleAsyncMsgFromRust
のfireTimers
はまさにそれようなんですね。バッファが空の場合はタイムアウトという取り決めのもと次のタイマーをセットしにいってるんだと思う。
export function handleAsyncMsgFromRust(ui8: Uint8Array) { if (ui8.length) { // ... ommitted ... } // Fire timers that have become runnable. fireTimers(); }
かなり昔にtokio_timer
を使ってタイマーを実装するというissueを見かけた気がしたけど、そのような作りではなく、どのような議論を経てこの実装になっているのかはちょっとわからん。
非同期処理を確認したかったんだけどsetTimeout
はちょっと特殊だったっぽい。次はreadFile
とかcodeFetch
周りを読めたら読みたい。
ひとまずここまで。