Rust 非同步機制
此筆記是對 Rust async book Under the hood: Executing Futures and Tasks 章節作解析。
閱讀本文必須先備知識
Arc
的作用- Rust channel 基本概念
- Rust thread 的基本使用
- Future trait 的基本概念
前言
Future
,是 Rust 運作非同步程式的核心零件,但是單單只有 Future
整個非同步程式是無法運作的,必須要有個機制去運行跟調度,而這個機制就是 Rust 給予各個到套件跟函式庫去作自由發揮跟設計,每種用法跟效能上都有差異,但 async book 帶著我們做了一個最單純的版本,讓我們透過這個範例去了解實際上 Future
是怎麼被運用,以及 waker
扮演什麼樣的角色。
建議先照書上全部寫一遍,再來這裡看解說。
Executor
Executor 是負責對已經實作 Future poll
method 的區塊或函式做 poll,並傳遞一個 waker
給該區塊或函式,做下次喚醒。
你會看到範例中會使用 channel 去接收傳來的 task,因為要持續的接收跟執行所以用 while。
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
impl Executor {
fn run(&self) {
while let Ok(task) = self.ready_queue.recv() {
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
let waker = waker_ref(&task);
let context = &mut Context::from_waker(&*waker);
if future.as_mut().poll(context).is_pending() {
*future_slot = Some(future);
}
}
}
}
}
Spawner
主要作為執行 async 區塊作第一次的 poll(讓 Executor 傳遞 waker
),不然永遠無法有機會被喚醒。
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
impl Spawner {
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
self.task_sender.send(task).expect("too many tasks queued")
}
}
Task
每次執行一個 async 區塊或函式就會產生一個對應的 task 做儲存,當 async
區塊或函式完成時會呼叫 wake
,而對應的 task (實作 ArcWake
trait)將重新把自己丟到 executor 作 poll
,完成一個非同步操作。
struct Task {
future: Mutex<Option<BoxFuture<'static, ()>>>,
task_sender: SyncSender<Arc<Task>>,
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("too many task queued");
}
}
因為這種機制,所以我們都說 Rust 的非同步是被動式的,由 async
區塊或函式本身主動通知 executor 重新 poll
來取得當前進展。
流程
所以一整個流程從開始到結束是這樣子的
- spawner 將
async
轉成一個 Task 丟給 executor 作第一次的poll
。 - 由於非同步還沒完成所以回傳
Poll::Pending
,並同時傳遞一個waker
用於下次完成時喚醒。 - 呼叫
wake
並且對應的 Task 執行wake_by_ref
,重新將自己帶到 executor 作poll
。 - 收到
Poll::Ready(())
這個非同步操作完整結束
Waker
既然我們知道整個流程後,那其中最關鍵的當然是 waker
,所以我們可以單純把 waker
抽出來看他怎麼運作。
struct Task {
name: String,
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
println!("{} wake up", arc_self.name);
}
}
fn main() {
let task = Arc::new(Task {
name: "task".to_string(),
});
let waker = waker_ref::<Task>(&task);
let cloned_waker = waker.clone();
thread::spawn(move || {
thread::sleep(Duration::from_secs(1));
cloned_waker.wake();
println!("done")
})
.join()
.unwrap();
}
以上就是針對 Rust 非同步的簡易解析,如果想知道更詳細,我推薦去看 Rust RFC 2592 futures,裡面完整的說明 Future 的設計思路,並且該 RFC 也在翻譯中,敬請期待。