長時間の I/O を走らせつつ定期的に短時間の I/O を走らせるやつ
長時間のタスクを走らせつつ定期的に短時間の I/O をしたいパターンがよくある。いわゆる heartbeat 的なことをやりたいときとか。 例えば長時間の外部コマンドを実行するとき、Go だと
package main import ( "fmt" "log" "os/exec" "time" ) func main() { ch := make(chan error) go func() { ch <- exec.Command("sleep", "10").Run() }() ticker := time.NewTicker(2 * time.Second) defer ticker.Stop() var err error L: for { select { case <-ticker.C: fmt.Println("Waiting...") case err = <-ch: break L } } if err != nil { log.Fatal(err) } fmt.Println("OK") }
みたいに time.Ticker と select/channel で書きやすいんだけど、Rust だと stream だと考えて futures::stream::select() を使うと良さそうだった。 https://docs.rs/futures/0.3.7/futures/stream/fn.select.html
use futures::StreamExt as _; use tokio::io::AsyncWriteExt as _; #[tokio::main] async fn main() -> Result<(), anyhow::Error> { let interval = tokio::time::interval(tokio::time::Duration::from_secs(2)) .map(|_| futures::future::Either::Left(())); let long_running_command = futures::stream::once(tokio::process::Command::new("sleep").arg("10").status()) .map(|result| futures::future::Either::Right(result)); tokio::pin!(long_running_command); let mut stream = futures::stream::select(interval, long_running_command); let status = loop { let item = stream.next().await.unwrap(); match item { futures::future::Either::Left(_) => { // Perform some lightweight I/O tokio::io::stdout().write_all(b"Waiting...\n").await?; } futures::future::Either::Right(result) => { break result; } } }?; if !status.success() { return Err(anyhow::anyhow!("command failed")); } println!("OK"); Ok(()) }