阿里技术 06月11日 13:07
浅析 rust 大明星 Tokio
index_new5.html
../../../zaker_core/zaker_tpl_static/wap/tpl_guoji1.html

 

本文深入探讨了Rust生态中最热门的异步运行时库Tokio。文章首先从Rust的异步机制入手,阐述了Future、async/await等关键概念,并以此为基础详细分析了Tokio的架构设计,包括任务池、调度器、Worker线程、Driver以及任务的生命周期。此外,文章还对比了Tokio与Nginx的调度机制,并展望了Tokio未来的发展方向,如对io_uring的支持和协议栈的扩展。通过对Tokio的全面解读,帮助读者深入理解其内部工作原理,为Rust异步编程提供参考。

⚙️ Tokio是Rust异步编程的核心运行时,它构建于Rust的异步基础之上,负责任务的调度与执行。

🏗️ Tokio的架构由Runtime、Scheduler、Driver和Worker组成,其中Scheduler是核心,负责任务的管理和调度。

🔄 Worker线程通过循环从任务队列中获取任务并执行,Driver则负责I/O事件的监听和触发,从而驱动异步任务的进展。

🚦 Tokio的任务调度机制包括全局队列、本地队列、LIFO slot和任务窃取,以实现负载均衡和提高效率。

🆚 与Nginx相比,Tokio更适合处理复杂场景,而Nginx则在高并发的简单请求方面表现更优。

蓝堇 2025-06-11 08:31 浙江



这是2025年的第66篇文章

( 本文阅读时间:15分钟 )


Tokio可以说是rust中最热门的库,对于异步与并发进行了很好的支持。大多数基于rust的开源框架都使用到了Tokio,因此在介绍这些实现开源框架时经常会被问到:底层的异步和并发是怎么实现的?我只能回答:底层的异步和并发都是由Tokio控制的。这显然不是一个令人满意的回答。因此本文章将对于Tokio的基本方法和底层逻辑进行分析。



01



概述

一句话概括

Tokio 可以理解成一个“任务池”和一个“调度器”,负责把所有在任务池中的任务调度运行起来。

更具体一点,Tokio 可以类比为一个“异步操作系统”:

Tokio 的优势主要体现在以下方面:


通过内部的优化机制(调度算法、无锁队列与内存池管理等)与 rust 的语言优势,Tokio 效率较高,在早期的实验中,官方给出了性能对比图:

在 rust 发展之初,社区出现了很多运行时库,但是,大浪淘沙,随着时间的流逝,Tokio 越来越亮眼,无论是性能、功能还是社区、文档,它在各个方面都异常优秀,时至今日,可以说已成为事实上的标准。新出现的rust运行时库(例如 Bytedace 的 monoio)宣传性能优于 Tokio,但还是雷声大雨点小,没有被广泛应用。


Tokio 的身影遍布在各种类型的 rust 库中,例如HTTP库(Hyper)、Web框架(Axum / Warp)、gRPC(Tonic)、TLS库(Rustls)、数据库支持(SeaORM)等。同时各大厂商也广泛使用,例如AWS、Azure、Google等。

Tokio 更加适合频繁切换的场景,例如网络服务、微服务、代理、数据库连接池、实时通信系统等。而不适合并行计算或密集计算等场景。


因此,如果需要使用 rust 中的高性能异步并发,且对于 Tokio 内部工作原理不敏感,看到这里放心使用就好了。后文将从rust语言的异步来着手,分析Tokio的架构以及具体调度的生命周期,最后分析与Nginx的对比以未来的一些方向。



02



rust 语言的异步

对 Future/async/await 非常熟悉可以跳过。


Golang/Nodejs等语言的异步内置于语言本身,做了很好的封装且开箱即用,虽然能够简化使用但不灵活无法更改。rust作为系统级语言,并不想把异步的具体实现单一化与局限化,因此在rust std中只实现了异步的基本功能与框架(例如 Future/async/await 等),而把异步调度进行了开放,由第三方库来具体实现。


Future

Future 是 Rust 异步编程的核心抽象,它是一个状态机,通过多次 poll 推进其状态,直到完成。它与 Waker、Runtime、I/O 驱动紧密配合,构成了整个非阻塞异步系统的基础


Future 其实就是一个trait,定义如下:

    pub trait Future {

        type Output;


        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;

    }

    Future 与 任务(task)的区别: Future 是 rust 原生支持的异步 trait,许多第三方异步库在此基础上将 Future 封装为 task 用来完成调度。


    async

    通常不会用上述的trait来创建 Future,而是结合使用 async,编译器会将被 async 修饰的函数或代码块转化为 Future。也就是说调用 async fn 的具体函数并不会立刻执行,而只是创建 Future 等待 poll 来推动状态机。

      // async fn

      async fn fetch_data() -> Result<StringError> {

          let resp = reqwest::get("https://example.com").await?;

          Ok(resp.text().await?)

      }


      // async 代码块

      let future = async {

          // 异步逻辑

          let data = expensive_computation().await;

          format!("Result: {}", data)

      };


      await

      上述 Future trait中,poll 是核心方法,用于推进状态机的进行。我们的代码不会直接调用 poll,而是通过 Rust 的关键字 .await 来执行这个 Future,await 会被 rust 在编译时生成代码来调用 poll,返回 Poll(见下),如果是 Pending 则被 runtime 挂起(比如重新放到任务队列中)。当有 event 产生时,挂起的 future 会被唤醒,Rust 会再次调用 future 的 poll,如果此时返回 Ready 就执行完成。

        pub enumPoll<T> {

            Ready(T),

            Pending,

        }

        多级 Future 嵌套时,只有遇到类似 .await 才会推动执行,是协同式调度而不是抢占式调度(Tokio 1.x版本引入抢占机制来缓解饥饿问题,但rust原生基础是协同式调度)。因此 rust 无需提前为 Future 分配独立的栈或堆上内存,是一种零成本抽象。


        总结

        如下图所示,rust std 中的异步只维护 Future 以及内部方法 poll,具体的任务队列和调度方法由第三方的 runtime 来实现。每次代码执行到 .await 时会进行一次poll,poll 若 ready 则直接退出表示执行完成,poll 若遇到阻塞,则挂起等待事件池来唤醒。当有事件(例如I/O等)唤醒之后,会把该挂起 Future 封装为 task,加入到任务队列中等待调度,runtime 会不断地从任务队列中拿出任务来执行。





        03



        Tokio 架构与构造过程

        架构

        承接上文,这一部分主要介绍 Tokio 实现的 runtime架构,如下图所示:




        构造(build)

        Tokio 中的 Runtime 结构体如下:

          pub structRuntime {

              /// Task scheduler

              schedulerScheduler,


              /// Handle to runtime, also contains driver handles

              handleHandle,


              /// Blocking pool handle, used to signal shutdown

              blocking_poolBlockingPool,

          }

          blocking线程 和 worker线程:worker线程是我们要重点关注的运行时轻量级线程,负责调度和任务执行;blocking线程是在这个过程中的所有的阻塞任务,其数量等于所有的worker线程数量+其他控制线程数量,原因是worker线程本身就是一个blocking任务,其他控制线程又包括信号与通道等。


          其中 BlockingPool 是专门用来运行阻塞任务的线程池,上述解释已简单概括;Handle 维护了过程中各种handler,本文不重点关注这两项。Scheduler 是“任务池”和“调度器”的封装,也是 Runtime 最核心的部分。

          想要使用 Runtime 必须要经过初始化:

            tokio::runtime::Builder::new_multi_thread()

                            .enable_all().worker_threads(threads).thread_name(name)

                            .build().unwrap(),


            build() 即构造了 Runtime 结构,其中最重要的是 Driver 和 Worker 。


            Driver 构造的过程

            Driver 封装了 I/O 和 Timer 的驱动,并加入了部分机制(例如内存slab),下面以I/O Driver为例,详细说明:

              pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {

                  let poll = mio::Poll::new()?;

                  #[cfg(not(target_os = "wasi"))]

                  let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;

                  let registry = poll.registry().try_clone()?;


                  let driver = Driver {

                      signal_ready: false,

                      events: mio::Events::with_capacity(nevents),

                      poll,

                  };


                  let (registrations, synced) = RegistrationSet::new();


                  let handle = Handle {

                      registry,

                      registrations,

                      synced: Mutex::new(synced),

                      #[cfg(not(target_os = "wasi"))]

                      waker,

                      metrics: IoDriverMetrics::default(),

                  };


                  Ok((driver, handle))

              }


              worker构造的过程

                pub(super) fn create(

                    size: usize,

                    park: Parker,

                    driver_handle: driver::Handle,

                    blocking_spawner: blocking::Spawner,

                    seed_generator: RngSeedGenerator,

                    config: Config,

                ) -> (Arc<Handle>, Launch) {

                    let mut cores = Vec::with_capacity(size);

                    let mut remotes = Vec::with_capacity(size);

                    let mut worker_metrics = Vec::with_capacity(size);


                    // Create the local queues

                    for _ in 0..size {

                        let (steal, run_queue) = queue::local();


                        let park = park.clone();

                        let unpark = park.unpark();

                        let metrics = WorkerMetrics::from_config(&config);

                        let stats = Stats::new(&metrics);


                        cores.push(Box::new(Core {

                            tick0,

                            lifo_slot: None,

                            lifo_enabled: !config.disable_lifo_slot,

                            run_queue,

                            is_searchingfalse,

                            is_shutdownfalse,

                            is_tracedfalse,

                            parkSome(park),

                            global_queue_interval: stats.tuned_global_queue_interval(&config),

                            stats,

                            randFastRand::from_seed(config.seed_generator.next_seed()),

                        }));


                        remotes.push(Remote { steal, unpark });

                        worker_metrics.push(metrics);

                    }


                    let (idle, idle_synced) = Idle::new(size);

                    let (inject, inject_synced) = inject::Shared::new();


                    let remotes_len = remotes.len();

                    let handle = Arc::new(Handle {

                        task_hooks: TaskHooks::from_config(&config),

                        shared: Shared {

                            remotes: remotes.into_boxed_slice(),

                            inject,

                            idle,

                            owned: OwnedTasks::new(size),

                            synced: Mutex::new(Synced {

                                idle: idle_synced,

                                inject: inject_synced,

                            }),

                            shutdown_cores: Mutex::new(vec![]),

                            trace_status: TraceStatus::new(remotes_len),

                            config,

                            scheduler_metrics: SchedulerMetrics::new(),

                            worker_metrics: worker_metrics.into_boxed_slice(),

                            _counters: Counters,

                        },

                        driver: driver_handle,

                        blocking_spawner,

                        seed_generator,

                    });


                    let mut launch = Launch(vec![]);


                    for (index, core) in cores.drain(..).enumerate() {

                        launch.0.push(Arc::new(Worker {

                            handle: handle.clone(),

                            index,

                            core: AtomicCell::new(Some(core)),

                        }));

                    }


                    (handle, launch)

                }


                为 Tokio 多线程运行时创建一组 Worker 线程,每个 Worker 都绑定了一个本地任务队列(Local Queue)、I/O 和定时器驱动(Driver),并准备好参与异步任务调度。其中:




                04



                Tokio task 生命周期

                构造出基本的 runtime 架构后,就等待有任务被加入到 runtime 中被调度与执行,这一部分详细说明任务从被构造到执行完成的流程。



                如上图所示,Worker 在创建后执行调度循环,不断地从任务队列中取任务,执行poll,若结果为阻塞则注册waker并挂起,之后取新任务poll。当 Driver 有新事件时会调用 waker 来唤醒任务,重新加入到任务队列中待新一轮调用。


                任务队列



                最终,任务队列的执行顺序是:本地LIFO slot --->> 本地任务队列 --->> 全局任务队列 --->> 窃取其他线程的任务。


                任务饥饿问题

                饥饿问题有以下两种场景:

                1.某任务是密集计算型任务,不断占据cpu而不释放;

                2.本地任务更新太快太频繁,全局任务无法被执行到;


                第一种场景,Tokio 从 1.x开始,引入了抢占式调度来缓解饿死问题,简单来说就是会定期强制任务挂起来让出资源。但是需要说明的是,这种场景本质上和rust的异步并发冲突,更加推荐使用tokio::task::spawn_blocking,来将任务转化为并行计算任务。


                第二种场景经常会遇到,本质上是由于三种不同的队列有优先级,可能会导致低优先级的队列被饿死。例如I/O频繁的TCP连接会不断地加入到本地队列,而无法处理全局队列任务。Tokio 的解决的方法是为每个任务加入循环次数,当其循环加入队列次数超过一定上限后会先搁置,优先处理低优先级(例如全局队列)的任务。


                总之,Tokio 设计了相关的机制来平衡公平性和效率,同时还有一些其他算法或异步runtime优化了部分过程,取得了更好的效果(例如horaedb(Apache、Ant)Monoio(bytedance))。虽然并非完美,但是各方都在努力完善公平性和性能,这或许就是rust设计开放runtime的初衷。



                05



                其他

                与Nginx的调度对比

                Nginx采用 多worker进程单线程 + 非阻塞I/O + 事件驱动 的模型。



                两者的对比如下:




                绑核问题

                Tokio本身并不支持绑CPU核,要实现绑核有以下三种方法:

                  taskset -c [CPU NUMBER] -p PID


                    docker run --cpuset-cpus [CPU NUMBER]


                      runtime::Builder::new_multi_thread()

                      .on_thread_start(move || {

                          core_affinity::set_for_current(core_id.clone());

                      })


                      Tokio 未来的一些方向

                      io_uring

                      是 Linux(>5.1) 上新一代的高性能异步 I/O 框架,其主要针对 epoll 进行了性能和功能上的提升。其设计了一种用户态和内核的环形缓冲区,解决线程竞争,实现无锁设计。减少了系统调用的次数,实现零拷贝。支持批量操作。

                      当前Tokio官方还没有全面支持 io_uring,但是在社区中已经出现了不少支持:例如tokio-uring,其基本思想是在IO Driver中的mio 进行封装,注册在 Tokio(mio) 上一个 uring fd,而基于这个 fd 和一套自己的 Pending Op 管理系统又对外作为 Reactor 暴露了事件源的能力。


                      首先需要明确,若想使用IO或sleep等操作,必须要使用Tokio::net::TcpStream 来进行 Tokio的封装,主要原因是需要将IO或Timer事件注册到Reactor上,因此协议的支持也是非常重要的部分。当前 Tokio支持常见的TCP、UDP等协议,同时也在积极探索新协议以及更好的适配性,例如:更好的 TLS 支持(如整合 

                      rustls

                      );支持 HTTP/3、QUIC 等协议栈(底层支持);对IPV6多路复用的增强支持。


                      Tokio 正在推动对测试工具的支持,例如:提供 mock I/O 接口;支持 determinism 测试;提供 tracing、instrumentation 集成,便于调试异步程序。相关的社区实践包括 tokio-tesing等。


                      前文也说过,内部的调度算法决定了不同场景下的效果,当前已经有许多 runtime库提出了新的算法来优化某些过程,Tokio 也在积极探索和演进。


                      Tokio 也在积极探索在不同平台的支持,例如嵌入式平台。同时也在开发WASM支持,可以在WASM中运行Tokio。


                      声明:Tokio更新频繁,本文主要针对1.44.1版本分析。

                      后续我们也会基于 Qwen3 实验哪种方法在复杂任务的工具调用效果最好,敬请期待。

                      参考链接

                      Tokio源码:https://github.com/tokio-rs/tokio

                      Tokio官方文档:https://tokio.rs/tokio/tutorial

                      字节monoio:https://rustmagazine.github.io/rust_magazine_2021/chapter_12/monoio.html

                      apache-haoraedb:https://github.com/apache/horaedb

                      https://tony612.github.io/tokio-internals/01_intro_async.html

                      https://tidb.net/blog/18804515




                      欢迎留言一起参与讨论~

                      阅读原文

                      跳转微信打开

                      Fish AI Reader

                      Fish AI Reader

                      AI辅助创作,多种专业模板,深度分析,高质量内容生成。从观点提取到深度思考,FishAI为您提供全方位的创作支持。新版本引入自定义参数,让您的创作更加个性化和精准。

                      FishAI

                      FishAI

                      鱼阅,AI 时代的下一个智能信息助手,助你摆脱信息焦虑

                      联系邮箱 441953276@qq.com

                      相关标签

                      Tokio Rust 异步编程 并发 运行时
                      相关文章