長時間の I/O を走らせつつ定期的に短時間の I/O を走らせるやつ

長時間のタスクを走らせつつ定期的に短時間の I/O をしたいパターンがよくある。いわゆる heartbeat 的なことをやりたいときとか。 例えば長時間の外部コマンドを実行するとき、Go だと

package main

import (
    "fmt"
    "log"
    "os/exec"
    "time"
)

func main() {
    ch := make(chan error)
    go func() {
        ch <- exec.Command("sleep", "10").Run()
    }()

    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()

    var err error
L:
    for {
        select {
        case <-ticker.C:
            fmt.Println("Waiting...")
        case err = <-ch:
            break L
        }
    }
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("OK")
}

みたいに time.Ticker と select/channel で書きやすいんだけど、Rust だと stream だと考えて futures::stream::select() を使うと良さそうだった。 https://docs.rs/futures/0.3.7/futures/stream/fn.select.html

use futures::StreamExt as _;
use tokio::io::AsyncWriteExt as _;

#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    let interval = tokio::time::interval(tokio::time::Duration::from_secs(2))
        .map(|_| futures::future::Either::Left(()));
    let long_running_command =
        futures::stream::once(tokio::process::Command::new("sleep").arg("10").status())
            .map(|result| futures::future::Either::Right(result));
    tokio::pin!(long_running_command);

    let mut stream = futures::stream::select(interval, long_running_command);
    let status = loop {
        let item = stream.next().await.unwrap();
        match item {
            futures::future::Either::Left(_) => {
                // Perform some lightweight I/O
                tokio::io::stdout().write_all(b"Waiting...\n").await?;
            }
            futures::future::Either::Right(result) => {
                break result;
            }
        }
    }?;
    if !status.success() {
        return Err(anyhow::anyhow!("command failed"));
    }
    println!("OK");
    Ok(())
}