并发背后的故事

当服务器遭遇海量的并发请求时……

等等,什么是并发?

并发(Concurrency)与并行(Parallelism)

并发代表应用程序能够同时(并发)处理多个任务。 尽管严格意义上在单核CPU中应用程序不可能真正“同时”取得进展,但对外看来其内部一次正在处理多个任务,并且在之前的任务完成之前能够继续接受并开始一个新的任务。

并行代表应用程序能够真正同时在多个任务上取得进展。 从本质上讲,并行需要具有多个处理单元。在单核 CPU 中可能做到并发,但不可能做到并行。

并发更加强调多个任务在重叠的时间段内交替推进,通过任务切换提高资源利用率。 并行更加强调多个任务在同一时间段内同时推进,利用多个处理单元缩短总时间。 并行是一种特殊的并发,其中任务实际上是同时执行的。

---
displayMode: compact
---

gantt
    title 并发
    axisFormat  %S秒

    section 任务A
    CPU核心1 :a1, 00, 2s
    CPU核心1 :a2, after b1, 2s

    section 任务B
    CPU核心1 :b1, after a1, 2s
    CPU核心1 :b2, after a2, 2s
---
displayMode: compact
---
gantt
    title 并行
    axisFormat  %S秒

    section 任务A
    CPU核心1 :pa1, 00, 4s

    section 任务B
    CPU核心2 :pb1, 00, 4s

很显然,并发只能让程序看上去在同时处理多个任务,只有并行才能真正让程序的处理能力上一个台阶……对吗?

对于CPU密集型的任务而言,确实如此。 在CPU密集型的任务中,CPU需要进行大量的计算,CPU的计算能力决定了程序的处理能力。

但是对于IO密集型的任务而言,CPU的计算能力并不是瓶颈,反而是IO操作的延迟才是瓶颈。 在IO密集型的任务中,CPU需要等待IO操作完成才能继续执行。而在这个等待的过程中,CPU是空闲的,完全可以将CPU的计算能力用于处理其他任务,进而提高程序的处理能力。

通常而言,并行任务的并行度为 CPU 核心数,一般能达到 128 倍就是非常不错的性能了。 而并发任务的并发度则是 CPU 核心数乘以 IO 等待时间与 CPU 计算时间的比值。对于 IO 密集型的任务而言,同样的机器能够以上万的并发每秒处理数千万个并发请求

早期的并发方案:线程与同步 IO

在20世纪80年代,随着UNIX系统的普及,计算机开始从批处理模式转向交互式应用。 此时CPU仍处于单核时代,但操作系统设计者已经意识到了IO的重要性。

在IO操作中,CPU往往需要等待某个事件的发生,例如等待数据从磁盘读取完成,或者等待网卡接受到远端的数据。 在这个等待的过程中,CPU的闲置是对宝贵计算资源的巨大浪费。

为了充分利用CPU的资源,操作系统会将当前正在等待IO事件的线程挂起,并将CPU的控制权交给其他需要CPU进行计算的线程。当IO操作完成后,操作系统会唤醒挂起的线程,让其继续执行。

通常而言,CPU 进行线程上下文切换的时间在 1-5 微秒之间1,机械硬盘的寻道时间在 5-10 毫秒之间,网络延迟在 1-100 毫秒之间。 因此在 IO 密集型的任务中,使用上下文切换的方式运行其他线程是非常划算的。

1:1 线程模型

由于操作系统的设计者已经设计了线程和线程的挂起机制,因此在应用程序中使用多个线程来处理IO密集型的任务是非常自然的选择。 一种常见的模型是 1:1 的线程模型,其核心思想是为每个连接分配专属线程,通过阻塞式系统调用实现同步IO。在处理请求的过程中如果需要等待IO事件,则使用阻塞式的系统调用挂起当前线程,在IO事件发生后由操作系统唤醒线程继续执行。

阻塞与非阻塞、同步与异步

在概念上,阻塞(blocking)与非阻塞(non-blocking)通常用于描述模块或API在等待事件时的行为。 阻塞意味着模块将会一直等待事件发生,等待时间内不会做其他事情。模块将会在事件发生后恢复执行。 非阻塞意味着模块并不会等待事件发生。无论当前是否有可处理的事件,模块都会继续执行。

另一组常见的概念是同步(synchronous)与异步(asynchronous),通常用于描述两个模块之间的交互方式。 模块A与模块B是同步的意味着在B完成任务之前A会一直等待,在B完成任务后A才会继续执行。 而异步意味着A不会等待B完成,

阻塞与非阻塞、同步与异步在概念上有着十分微妙的不同。大多数语境下阻塞与同步、非阻塞与异步是可以互换的。 为了避免混淆,我们将在后面的内容中使用阻塞与非阻塞来描述 API 的行为,使用同步与异步来描述服务程序与用户的交互方式。 阻塞代表如果当前没有可处理的事件,API 会一直等待事件发生。非阻塞代表 API 不会等待事件发生,但是通常意义下若存在可处理的事件时 API 仍然会等待任务处理完成之后再返回(例如数据拷贝)。 同步代表当前线程在发起操作后,会通过挂起的方式等待对方响应。异步代表当前线程会继续执行其他任务,由其他模块在完成操作后通知当前线程。

class Server {
public:
    // 主线程
    void run() {
        while (!stop_) {
            Connection conn = accept_connection_blocking();     // 阻塞式调用,等待新的连接
            std::thread([worker = Worker{conn}] {
                worker.run();   // 每个任务(处理客户端的请求)独占一个线程
            }).detach();
        }
    }
};
 
class Worker {
    Connection conn_;
 
    bool authenticate() {
        std::string auth_token = conn->receive_blocking();      // 阻塞式调用,从网络中读取请求
        bool is_authenticated = !auth_token.empty();
        return is_authenticated;
    }
 
    std::string process_request(std::string request) {
        // 处理请求,此处使用 echo 服务作为示例
        return "Echo: " + request;
    }
 
public:
    // 每个 worker 都在独立的线程中执行
    void run() {
        if (!authenticate()) {
            conn->close();
            return;
        }
        while (conn_->is_connected()) {
            std::string request = conn->receive_blocking();     // 阻塞式调用,从网络中读取请求
            std::string response = process_request(request);
            conn->send_blocking(response);      // 阻塞式调用,将响应发送到网络
        }
    }
};

在这个模型中,主线程会执行 Server::run() 方法,并在 accept_connection_blocking() 方法中阻塞等待新的连接。 当有新的连接到来时,主线程会创建一个新的线程来处理这个连接。新的线程会执行 Worker::run() 方法,同样阻塞式地等待客户端的请求。

线程池

在操作系统中,线程是一个非常重量级的资源,创建和销毁线程都需要耗费大量的资源,通常需要 10-20 微秒2。 因此为了提高性能,一种朴素的思想是在一个任务完成后重用线程,避免频繁创建和销毁线程。

线程池的基本思想是预先创建一定数量的线程,并将其放入一个线程池中。当有新的任务需要处理时,从线程池中取出一个空闲的线程来执行任务。任务执行完成后,线程会被放回线程池中,等待下一个任务的到来。 一种参考实现如下:

class ThreadPool {
    std::vector<std::thread> threads_;
    std::queue<std::function<void()>> tasks_;
    std::mutex mutex_;
    std::condition_variable condition_;
    int running_tasks = 0;
 
    // 由线程池中的线程调用
    void run() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                condition_.wait(lock, [this] { return !tasks_.empty(); });
                // 当线程长时间没有需要执行的任务时,也可以考虑销毁线程释放资源
                task = tasks_.front();
                tasks_.pop();
            }
            task();
        }
    }
 
public:
    ThreadPool(int num_threads) {
        for (int i = 0; i < num_threads; ++i) {
            threads_.emplace_back([this] { run(); });
        }
    }
 
    void add_task(std::function<void()> task) {
        std::unique_lock<std::mutex> lock(mutex_);
        ensure_enough_threads();    // 确保线程池中有足够的线程,避免任务等待
        tasks_.push_back(task);
        condition_.notify_one();
    }
};

于是 Server 可以改成:

class Server {
public:
    // 主线程
    void run() {
        while (!stop_) {
            Connection conn = accept_connection_blocking();     // 阻塞式调用,等待新的连接
            std::function<void()> task = [worker = Worker{conn}] {
                worker.run();
            };
            thread_pool_.add_task(task);    // 将任务添加到线程池中
        }
    }
};

释放 CPU 的潜能:异步 IO

2003年,Apache HTTP 服务器凭借 prefork + 线程池模型成为互联网霸主,此时典型服务器配置为单核 CPU + 512MB 内存。但随着 Web 2.0 浪潮兴起,社交网站用户量突破5000万,C10K问题成为业界梦魇。

线程池虽然避免了频繁创建和销毁线程的开销,但是请求和线程池中的线程仍然是 1:1 的关系。每当任务需要等待IO事件时,线程就会被挂起,操作系统需要保存当前线程的上下文,将上下文切换到其他就绪线程的上下文。

假设线程池配置100线程,每个请求包含:50μs CPU计算 + 10ms 数据库IO(线程挂起,时间上和其他请求重叠) 则理论最大吞吐量:

但是线程上下文切换仍然是一个非常耗费资源的操作。在切换线程时,cpu必须完成:

  1. 切换页表全局目录:页表代表了当前线程可以访问的内存空间,而操作系统需要访问当前线程无法访问的内存空间,因此需要切换页表。同时 TLB 和 Cache 可能也需要刷新,避免内核态的敏感信息泄露到用户态。
  2. 切换内核态堆栈:操作系统不能信任用户态的代码和堆栈,因此在内核态中使用独立的堆栈来执行内核态的函数调用。
  3. 切换硬件上下文:将当前的所有硬件状态保存到内存中,以在下次切换到该线程时完美恢复所有的硬件状态。CPU 功能和指令集扩展越复杂,需要保存的状态就越多。在 x86_64 中硬件状态可以达到 1KB。
  4. 系统调度器的调度:操作系统需要选择一个新的线程来执行,并将其上下文切换到 CPU 上。现代 Linux 可以在 O(1) 的时间复杂度内完成调度,但仍然有不可忽视的开销。

除了这些直接的开销外,线程上下文切换还会导致 CPU 的 Cache 和 TLB 失效,导致 CPU 需要重新加载数据到 Cache 中。CPU 的 Cache 和 TLB 是 CPU 性能的关键因素之一,因此在高并发的场景下,线程上下文切换的开销可能会非常大。

为了解决这个问题,出现了一种影响十分深远的设计模式——事件驱动,它的一个更为出名的实现方式是事件循环(Event Loop)。 2003年NGINX首次采用事件驱动架构,仅用单线程处理数万并发连接。解决了C10K问题,成为互联网最流行的Web服务器之一。

事件循环

在 1:1 线程模型中,我们使用阻塞式系统调用向操作系统发起 IO 请求,让操作系统等待事件的发生。 而在事件驱动模型中,我们将任务中“等待事件发生”这一部分抽象出来,在一个统一的调度器中使用少量的线程来监听大量的事件,并在事件发生时调用相应的处理函数执行任务的剩余部分。 由于线程数量大大减少,在相同请求数的情况下,操作系统不再需要频繁地挂起线程并执行上下文切换,极大地提高了 CPU 的利用率。

事件循环的基本结构如下:

void event_loop() {
    while (true) {
        Event event = poll_event();
        dispatch_event(event);
    }
}

其中,poll_event 函数通过某些方式获取当前已经发生的事件,例如磁盘读取缓冲区已完成填充,网络接收缓冲区已经有数据等。dispatch_event 函数根据事件的类型调用相应的处理函数。

poll_event

针对 poll_event,操作系统提供了多种实现方式。最简单的实现方式之一是使用非阻塞的系统调用进行轮询。 例如在 Linux 中,可以使用 fcntl 函数设置文件描述符为非阻塞模式,然后调用 read 函数读取数据。如果没有数据可读,read 函数会立即返回,而不会阻塞线程。

例如基于轮询的 poll_event 的实现如下:

Event poll_event(std::vector<Connection> connections) {
    while (true) {
        for (auto& conn : connections) {
            if (conn->is_readable_non_blocking()) {
                return Event{conn, EventType::Readable};
            }
            if (conn->is_writable_non_blocking()) {
                return Event{conn, EventType::Writable};
            }
        }
    }
}

尽管非阻塞式的系统调用不再挂起线程,节省了线程上下文切换的开销,但是轮询每一个文件描述符的状态仍然需要发起大量系统调用,程序仍然会频繁地进入和退出内核态,导致 CPU 的资源浪费。

为了进一步节省时间,可以考虑使用操作系统提供的IO多路复用,在一次系统调用中监听多个文件描述符的状态。 select、poll、epoll 等系统调用都是操作系统提供的IO多路复用的方式,仅在 API 设计和执行效率上有些不同。epoll 是目前 Linux 中最高效的 IO 多路复用方式。

// 操作系统抽象出的 IO 事件源,例如网络连接、文件描述符等
class EventSource {
    // 实现细节,与操作系统提供的 IO 多路复用 API 相关
};
 
enum class EventType {
    Readable,
    Writable,
    Error,
};
 
// 针对某一个具体连接的事件,例如有数据可读、设备空闲可写等
class Event {
    EventSource *source;
    EventType type;
};
 
class EventLoop {
    std::map<int, EventSource*> event_sources_;
    int epoll_fd;
 
    // 在 EventLoop 内部调用,注册事件源
    void add_event_source(EventSource *source) {
        if (event_sources_.find(source->get_fd()) != event_sources_.end()) {
            return;  // 已经注册过了
        }
        event_sources_.emplace(source->get_fd(), source);
        // 向操作系统注册事件源
        epoll_event ev;
        ev.events = EPOLLIN | EPOLLOUT;
        ev.data.fd = source->get_fd();
        epoll_ctl(epoll_fd, EPOLL_CTL_ADD, source->get_fd(), &ev);
    }
 
    // 在 EventLoop 内部调用,注销事件源
    void remove_connection(EventSource *source) {
        event_sources_.erase(source->get_fd());
        // 从操作系统注销事件源
        epoll_ctl(epoll_fd, EPOLL_CTL_DEL, source->get_fd(), nullptr);
    }
 
    Event poll_event() {
        epoll_event events[1];
        int nfds = epoll_wait(epoll_fd, events, 1, -1);   // 阻塞式调用,等待任意一个事件发生
        for (int i = 0; i < nfds; ++i) {
            if (events[i].events & EPOLLIN) {
                return Event{event_sources_[events[i].data.fd], EventType::Readable};
            }
            if (events[i].events & EPOLLOUT) {
                return Event{event_sources_[events[i].data.fd], EventType::Writable};
            }
        }
    }
 
    void dispatch_event(Event event);
 
public:
    EventLoop() {
        epoll_fd = epoll_create1(0);
    }
 
    // 在主线程中调用,启动事件循环
    void run() {
        while (true) {
            Event event = poll_event();
            dispatch_event(event);
        }
    }
};
dispatch_event

在事件循环监听到事件发生后,需要调用 dispatch_event 派发事件,调用相应的处理函数。 一种常见的方式是向事件循环注册回调函数,每当任务需要等待事件发生时,便向事件循环注册针对该事件的处理函数,等待事件发生后由事件循环调用处理函数。

class EventLoop {
    // ...
    std::map<Event, std::vector<std::function<void()>>> event_handlers_;    // 一次性的事件处理函数,当事件发生时调用并清空,需要重复注册
    std::mutex mutex_;
 
    void dispatch_event(Event event) {
        std::vector<std::function<void()>> handlers;
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if (event_handlers_.find(event) == event_handlers_.end()) {
                // 没有应用注册的事件处理函数,可以考虑向操作系统注销事件源
                remove_event_source(event.source);
                return;
            }
            swap(handlers, event_handlers_[event]); // 获取已注册的事件处理函数,并清空
        }
        for (auto& handler : handlers) {
            handler();  // 也可以使用线程池在独立的线程中执行,避免阻塞事件循环
        }
    }
 
public:
    // 由 worker 调用,注册事件处理函数
    void register_event_handler(Event event, std::function<void()> handler) {
        std::unique_lock<std::mutex> lock(mutex_);
        add_event_source(event.source);    // 注册事件源
        event_handlers_[event].emplace_back(handler);
    }
}
 
class Server {
    EventLoop& ev;
    ServerSocket server_socket_;
 
public:
    void start() {
        ev.register_event_handler(Event{server_socket_, EventType::Readable}, std::bind(&Server::on_connection, this));
    }
 
protected:
    // 当有新的连接时,事件循环会调用这个函数
    void on_connection() {
        Connection conn = server_socket_.accept_connection_non_blocking();     // 非阻塞式调用,等待新的连接
        Worker worker{conn};
        worker.run();
    }
}
 
class Worker {
    EventLoop& ev;
    Connection conn_;
public:
    void run() {
        // 以非常别扭的方式实现 if (!authenticate()) { conn_->close(); return; }
        // 因为 authenticate 是异步函数,其调用后会立即返回,其结果会以参数的形式传递给回调函数。
        authenticate([this] (bool is_authenticated) {
            if (!is_authenticated) {
                conn_->close();
                return;
            }
            
            // 以非常别扭的方式实现 while (true) { handle_request(); }
            // deducing this 需要 C++23,低版本需要实现一个 y-combinator 让 lambda 函数能够递归调用自身。
            // 见 https://stackoverflow.com/a/40873657
            auto loop = [this](this auto const& loop) {
                handle_request(loop);
            };
            loop();
        });
    }
 
protected:
    // 仅注册事件处理函数,不会执行实际业务逻辑,调用后立即返回
    void authenticate(std::function<void(bool)> callback) {
        ev.register_event_handler(Event{conn, EventType::Readable}, [this] {    // 调用后立即返回
            // 因为事件循环仅在事件发生时才调用 authenticate,因此此处的 receive_non_blocking 调用一定能够返回期望的数据。
            std::string auth_token = conn->receive_non_blocking();      // 非阻塞式调用,从网络中读取请求
            bool is_authenticated = auth_token.empty();
            callback(is_authenticated);
        });
    }
 
    void handle_request(std::function<void()> callback) {
        ev.register_event_handler(Event{conn, EventType::Readable}, [this] {
            std::string request = conn->receive_non_blocking();                 // 非阻塞式调用,从网络中读取请求
            std::string response = process_request(request);
            ev.register_event_handler(Event{conn, EventType::Writable}, [this, response, callback] {
                conn->send_non_blocking(response);      // 非阻塞式调用,将响应发送到网络
                callback();                             // 递归调用,继续处理下一个请求
            });
        });
    }
};

回调函数的线程安全

需要注意回调函数是由事件循环执行的,其执行线程可能会与注册回调函数的线程不同。因此当任务的不同部分之间存在共享的状态时需要注意线程安全问题。 此外,由于IO是异步非阻塞的,函数返回后回调函数可能还未执行,因此需要注意回调函数及其变量的生命周期,避免出现垂悬指针。

CPS 变换

在事件驱动编程中,需要将原本同步写法的业务逻辑改写为使用回调函数异步写法的业务逻辑。这种将后续的业务逻辑作为回调函数传递给前面的函数的方式被称为 Continuation-Passing Style(CPS)。 例如

a = f(x);
output(a);

可以改写为

f(x, [](auto a) {
    output(a);
});

如果逻辑比较简单,这种转换还能够人工完成。但是当业务逻辑变得复杂时,例如存在 if/switchfor/whiletry/catch 等控制流语句时,人工转换的工作量会非常大,业务逻辑也会变得非常难以阅读和维护。

Continuation-Passing Style(CPS)变换是一种将函数调用转换为传递函数的风格的方法。 它可以作为编译中的一个阶段,自动将函数调用转换为传递函数的风格。如果编译器支持 CPS 变换,那么即使不需要协程也可以以同步编程的风格编写异步执行的代码。

CPS 变换更常见于函数式编程语言中,例如 Haskell、Scheme 等。对于 C++、Rust 等命令式为主的编程语言中,他们的编译器更倾向于实现更高效的无栈协程(Stackless Coroutine)来实现异步编程。

回调地狱与协程

基于异步的事件循环模型可以实现高效的并发处理,但需要将任务拆分为多个独立的回调函数。 在同步IO的版本中原本非常简单的 bool authenticate() 函数,在异步版本中需要拆分为 void authenticate(std::function<void(bool)> callback) 和一个 lambda 函数,并且 authenticate 函数的返回值也需要作为回调函数的参数传递。 与之类似,所有需要进行异步IO的函数都需要拆分为两个部分:发起IO操作的函数和处理IO结果的回调函数。 这在代码逻辑变得复杂的同时增大了编写和维护的难度。这种因为异步IO而导致的回调地狱(callback hell)是事件驱动编程中最常见的问题之一。

为了解决这个问题,可以使用协程来简化异步编程。

用户视角的协程

协程(coroutine)是一种用户态实现的、轻量级的线程。基于这些特点协程有时也会被称为用户态线程(user-mode thread)、轻量级线程(lightweight thread)、纤程(fiber)、虚拟线程(virtual thread)、绿色线程(green thread)等。

  • 用户态:与操作系统提供的线程不同,协程的调度完全在用户态完成,不需要执行操作系统的上下文切换。
  • 轻量级:协程不具有线程的任意位置中断、信号处理等特性,因此协程需要保存的上下文远小于线程的上下文,切换速度也远快于线程的切换,协程的创建和销毁也比线程更轻量级。
  • 线程:协程保留了线程对于执行控制流的抽象,允许用户以同步的形式调用 API 并挂起当前协程,等待事件发生后被唤醒并恢复执行。

协程与线程最大的不同是协程之间的切换是协作式而非抢占式的,是可预知和可控的。这也是协程名字的由来。

对于线程而言,操作系统都需要不定时中断当前线程的执行,将CPU转让给其他线程以保证不会有线程长时间未得到执行。 操作系统在收到硬件中断时也会临时中断当前线程的执行,切换到操作系统内核的中断处理程序中执行,执行完毕后再切换回当前线程(甚至是其他线程)。 这些切换可能发生在任何一个时间点,程序完全无法预知线程切换并进行准备。同时操作系统无法感知线程上的程序究竟依赖于上下文中的哪些状态,因此为了保障正确性,操作系统只能完整地保存线程当前的上下文并在唤醒时恢复。

对于协程而言,协程之间的切换能且只能由程序显式地发起,这些可能会挂起协程的位置被称为挂起点(suspension point)。编译器和协程库知道程序中的挂起点,在挂起点前后保存和恢复程序依赖的上下文,并在挂起点直接生成相应的跳转代码。协程的上下文远小于线程的上下文,在一些场景下协程库还允许由编译器感知或程序员指定程序真正依赖的上下文,进一步减少协程切换时所需要保存的状态。 当协程被挂起时,协程的上下文会被保存在一个特殊的数据结构中。这个过程不需要操作系统的介入,也不需要保存和恢复线程的上下文,因此协程的切换速度远远快于线程的切换。

协程的实现方式有很多,大体上可以分为两种:有栈协程(Stackful Coroutine)和无栈协程(Stackless Coroutine)。

有栈协程的运行方式与操作系统的线程非常相似,每一个有栈协程都运行在独立的栈上。有栈协程在执行到挂起点时,其所有的状态都会被保存在协程栈上。 因此栈就是有栈协程的完整运行时上下文,切换协程就是在切换线程使用的运行时栈。 挂起点前后的代码会自动地处理协程上下文的保存和恢复。

无栈协程则不需要独立的栈,其运行方式与操作系统的线程完全不同,其更像是经典的回调函数。 无栈协程执行到挂起点时,栈中不再保存任何状态,其所有状态都会保存在堆中由编译器生成的数据结构中。因此不同的无栈协程可以直接共用执行线程栈,也允许调度到不同线程上基于不同的栈运行。不过这种设计在实现上更加复杂,会在后续的章节中介绍。

有栈协程由于符合已有的线程模型,因此在实现上更加简单,所使用的接口也会更加地符合现有的编程习惯。在经过封装后使用有栈协程编写的代码与使用阻塞式同步API的代码没有任何区别。

对于一个简单的 echo 服务,一种使用有栈协程库的 Worker::run() 实现如下:

class Worker {
// ...
 
// 协程,使用阻塞式的API发起IO操作
void run() {
    if (!authenticate()) {                          // 阻塞式调用,内部会发起异步请求挂起当前协程
        conn_->close();
        return;
    }
    std::string request = co_receive(conn_);        // 阻塞式调用,挂起当前协程但不会阻塞线程,请求到达后恢复执行
    std::string response = process_request(request);
    co_send(conn_, response);                       // 阻塞式调用,挂起当前协程但不会阻塞线程,响应发送完成后恢复执行
}
 
// 另一个使用协程的函数
bool authenticate() {
    std::string auth_token = co_receive(conn_);     // 阻塞式调用,从网络中读取请求
    bool is_authenticated = !auth_token.empty();
    return is_authenticated;
}
 
public:
 
// 封装,将协程的派发指调度器中执行,避免阻塞调用者。
void start() {
    Coroutine co(run);      // 创建协程,协程在创建时通常将自动向调度器注册
    co.detach();            // 显式指定协程 co 与当前协程分离,避免阻塞当前协程
}
 
}

上面代码中的 run 函数是一个普通函数,经过协程库的 Coroutine 对象封装之后就会创建一个新的协程,由新协程执行 run 函数。 在创建协程后,协程库通常要求开发者调用 Coroutine::join()Coroutine::detach() 二者之一,以指定协程的运行方式。 co.join() 会将当前协程挂起,等待 co 协程执行完成后当前协程才会继续执行。在当前协程挂起的过程中,当前线程仍然可以根据调度器的安排继续运行其他的协程,例如 co 协程。 co.detach() 则会将 co 协程与当前协程分离,继续执行后续的代码。此时 co 协程会进入调度器的调度队列,根据调度器的不同策略,可能在当前线程中执行,也可能在其他线程中执行。

此外,有栈协程通常不必区分普通的函数或者“协程函数”。 例如 co_receive 内部的实现可能是一个和协程无关的普通函数,也可能会调用协程库的 API 创建新的协程和挂起当前协程。 但是对于调用者而言,若其期望像普通函数一样以阻塞式的方式执行,那么只需要直接调用即可。只有当调用者需要以其他方式执行时,才需要使用协程库的 API 来创建新的协程和挂起当前协程。


timeline
        title 运行方式
        section 协程 1
          worker<br>(栈1)
                    : run()
          worker<br>(栈1)
                    : run()
                    : authenticate()
          worker<br>(栈1)<br>(挂起协程,切换至栈2)
                    : run()
                    : authenticate()
                    : co_receive()
        section 协程 2
          server<br>(栈2)<br>(处理完毕,切换至栈1)
                    : accept_connection()
        section 协程 1 
          worker<br>(栈1)<br>(恢复协程)
                    : run()
                    : authenticate()
                    : co_receive()
          worker<br>(栈1)
                    : run()
                    : authenticate()
          worker<br>(栈1)
                    : run()
          worker<br>(栈1)<br>(挂起协程,切换至栈2)
                    : run()
                    : co_receive()
        section 协程 2 
          server<br>(栈2): accept_connection()

上图展示了有栈协程的运行方式,其中时间线上的每一列代表当前时间点上线程栈的函数调用栈。此处线程栈不特指某一个栈,而是线程当前使用的栈。更加具体的说,在 x86_64 架构上是 rsp 寄存器指向的栈。 在时间线的最左侧,worker 协程在执行 run() 函数时调用了 authenticate() 函数,而 authenticate() 函数又调用了 co_receive() 函数。可以看到无论是在代码中还是在调用栈中调用协程函数和普通函数没有任何区别。 然而当 co_receive() 函数发现当前没有数据可读时,它会挂起当前协程并将执行流和运行时栈切换到另一个协程。当网络数据到达且协程 2 执行到挂起点后,调度器会将当前线程的执行流和运行时栈切换到 worker 协程中,继续执行 co_receive() 函数。 对于 worker 协程而言,co_receive() 函数的调用就是一个普通的函数调用,甚至是一个阻塞式的函数调用。但是对于底层的执行线程而言,在等待数据到达的过程中继续执行了其他的协程,既没有浪费 CPU 忙等,也没有进行内核态线程的上下文切换。

C++ 引入 M:N 线程模型(协程)的潜在问题

M:N 线程模型是指在用户态的线程(协程)和操作系统的线程之间存在多对多的关系。其中 M 代表用户态线程的数量,N 代表操作系统线程的数量。通常而言 M > N 并且用户态线程可以通过调度器在不同的操作系统线程中执行。

与之对应的 1:1 线程模型是指每一个用户态线程都对应一个操作系统线程。此时用户态线程与操作系统线程通常是一对一绑定的,一个操作系统线程只能执行一个用户态线程。 应用程序无需关心线程的调度和切换,由操作系统自动地完成线程的调度和切换。

C++ Proposal P3620 提出了 M:N 线程模型在与现有的基于 1:1 线程模型的第三方库结合使用时可能会出现的问题。

第三方库代码使用了操作系统线程的同步机制,例如 thread_local、信号处理、锁等,并且要求用户只能在特定的线程中调用第三方库代码。 但是用户可能会将协程误认为线程,认为自己始终在同一个协程中调用第三方库,但忽略了协程调度器可能会将协程调度到其他线程中执行。 这会破坏第三方库的约定,导致不可预知的错误。

即使用户正确配置调度器在同一个线程中执行使用了第三方库的协程,仍然可能导致意料之外的问题。考虑如下代码:

struct UserExtensibleType
{
    virtual void some_method() = 0;
};
std::mutex giant;
void library::someFunction(UserExtensibleType &object)
{
    std::lock_guard g{giant};
    //...
    object.some_method();
}

其中用户传入第三方库的 UserExtensibleType 对象在执行时可能直接或间接的调用了异步的协程函数,导致当前 someFunction 函数的执行作为协程被挂起。 但是 someFunction 函数完全可能在此期间被新协程在同一个线程上第二次调用,导致 giant 锁被多次加锁。 多线程情况下同时多次调用 someFunction 是正常的,第二次调用会挂起当前线程并等待第一次调用完成。 但是第一个 someFunction 的执行已经被协程挂起,且必须由同一个线程继续执行。而第二次调用又挂起了当前线程,因此没有任何协程能够继续执行,导致死锁。

其核心问题在于 C++ 并没有像 Java、Go、JavaScript、Python 等语言一样拥有一个语言虚拟机,C++ 运行时也无法区分用户编写的代码和外部第三方库的代码。

Java 也经历了像 C++ 一样由 1:1 线程模型演变为 M:N 线程模型的过程。Java 原先的线程模型是 1:1 的,每一个 Thread 对象都是一个平台线程(platform thread),与操作系统线程 1:1 绑定。 用户态线程则由继承自 Thread 的 VirtualThread 对象表示,被称为虚拟线程(virtual thread)。虚拟线程与操作系统线程之间的关系是 M:N 的,不具有绑定关系。

不同的是,Java 原先已经对线程模型进行了抽象和封装,在引入虚拟线程之后无论是还是虚拟线程都可以使用相同的 API 来创建和管理线程,二者仅在创建时有所不同,由平台线程迁移至虚拟线程不需要任何修改代码。 针对上面 C++ 的例子,Java 并不会出现上述问题。因为 Java Mutex 的行为取决于当前是否在虚拟线程中执行,而当前线程是否是虚拟线程是在线程启动时就确定的。因此当 library::someFunction 在虚拟线程中执行且被挂起时,操作系统线程仍然会继续运行。即使第二次调用 library::someFunction 尝试对 giant 加锁,Java Mutex 会识别到当前线程是虚拟线程,底层的操作系统线程仍然会继续运行并且不会被挂起。

协程除了能够节省上下文切换的开销外,还可以将异步编程的代码简化为同步的形式,避免了回调地狱的问题。

例如 co_receive 可以和事件循环结合使用,调用协程库提供的API将协程挂起并在事件发生后恢复执行。

std::string co_receive(Connection conn) {
    std::string request;
    auto current_coroutine = CO_CURRENT();      // 1. 获取当前协程的上下文
    auto waker = current_coroutine->waker();    // 2. 获取当前协程的唤醒器
	ev.register_event_handler(                  // 3. 注册回调函数(线程A)
	    Event{conn, EventType::Readable},
	    [&waker] {                              // 此处可以使用 waker 的引用,因为 waker 属于当前协程上下文的一部分,在挂起后恢复前将一直有效
	        waker->wake();                      // 5. 唤醒协程(线程B)
	    });
    current_coroutine->suspend();               // 4. 挂起当前协程,等待回调函数执行(线程A)
    // 6. 从 suspend 后恢复执行(线程C)
	request = conn->receive_non_blocking();
    return request;
}

协程的调度

协程在和异步IO库配合时有多种不同的调度策略。 例如在上面的例子中:

  • 线程 A 在执行到 4 挂起当前协程后,线程 A 应该挂起、继续执行当前线程中的其他协程还是从其他线程的任务队列中偷取已就绪的协程执行?
  • 异步库的线程 B 在 5 处唤醒协程后,线程 B 应该立即返回还是直接执行被唤醒的协程?
  • 协程执行到 6 处时,底层的执行线程应该是进入函数时的线程 A、异步库中唤醒协程的线程 B、还是调度器线程池中的其他线程 C?

理论上,这些行为都是合法的。 boost::fibers 库中就允许自定义协程的调度策略,既可以将协程的调度限制在启动协程的线程中(boost::fibers 的默认行为),也可以基于任务偷取在全局初始化一个线程池,允许协程在不同的线程中执行。

一个协程也可以没有独立的调度器,而是直接由调用者自己管理和调用。在其他语言中常见的生成器(Generator)就是一个例子。

Generator 是一种特殊的函数,它可以在内部函数的执行过程中暂停并返回一个值,然后在稍后恢复执行。同时内部函数的执行过程是按需的、由调用者控制的。 例如,一个按需生成斐波那契数列的函数可以这样实现:

// 需要注意,函数的返回类型并不一定与协程 yield 的值类型相同
// 此处函数的返回类型为 void,但 yield 的值类型为 int
void fibonacci() {
    // 3. 进入生成器函数内部
    int a = 0, b = 1;
    while (true) {
        CO_YIELD(a);        // 4. 挂起当前协程,返回 a 的值给调用者
        // 7. 从 yield 后恢复执行,a 和 b 的值仍然保持 yield 前的状态
        int c = a + b;
        a = b;
        b = c;
        // 8. while 循环,返回到 4 处重新挂起协程并返回 a 的值
    }
}
 
int main() {
    Coroutine gen(fibonacci);               // 1. 创建协程
 
    auto value = gen();                     // 2. 恢复协程,获取下一个值。此时协程内部从 3 开始执行
    // 5. 协程从 4 处退出,继续执行主函数
    std::cout << value << std::endl;        // 输出 0
 
    value = gen();                          // 6. 恢复协程,获取下一个值。此时协程内部从 7 开始执行,a=0,b=1
    // 9. 协程从 4 处退出,继续执行主函数
    std::cout << value << std::endl;        // 输出 1
    
    // 可以无限次调用 gen(),每次都会从上次挂起的地方继续执行
    std::cout << gen() << std::endl;        // 输出 1
    std::cout << gen() << std::endl;        // 输出 2
    std::cout << gen() << std::endl;        // 输出 3
    std::cout << gen() << std::endl;        // 输出 5
    std::cout << gen() << std::endl;        // 输出 8
    
    Coroutine gen2(fibonacci);              // 创建另一个协程,两个协程的状态互不影响
    std::cout << gen2() << std::endl;       // 输出 0
    std::cout << gen2() << std::endl;       // 输出 1
    std::cout << gen2() << std::endl;       // 输出 1
    std::cout << gen() << std::endl;        // 输出 13
    std::cout << gen2() << std::endl;       // 输出 2
    std::cout << gen() << std::endl;        // 输出 21
    std::cout << gen2() << std::endl;       // 输出 3
    // ...
}

在这个例子中,协程的调度完全由调用者控制,不需要额外的协程调度器。调用者可以在任意时刻调用协程的 T operator ()() 方法来恢复协程的执行,并且协程可以在任意位置挂起并返回一个值给协程 T operator ()() 的调用者。

boost::context 库提供了一个不带有调度器的有栈协程实现,使用 boost::context 可以非常快速地实现上面的斐波那契数列生成器

虽然在上面的例子中,我们总是以同步阻塞的方式来调用协程,但是协程并不总是需要阻塞执行的。 例如当我们已知需要同时发起多个异步操作时,我们可以使用协程异步非阻塞的 API 一次性创建多个协程并发地执行它们。 再通过 when_all() 等函数来同步地等待所有操作完成。

// 并发地下载数组中的所有 URL
void download_files(std::vector<URL> urls) {
    std::vector<Coroutine> coroutines;
    for (const auto& url : urls) {
        // 注意此处对每一个待下载的 URL 都创建了一个新协程,并且并没有调用 `Coroutine::join()` 或 `Coroutine::detach()` 执行协程
        // 因此下面的代码将会立即返回,并将创建的协程放入 coroutines 数组中。
        coroutines.push_back(Coroutine([url] {
            auto data = co_download(url);
            co_save(data);
        }));
    }
    co_when_all(coroutines);        // 同时执行所有协程,并等待所有协程执行完成
}
 
// 同时尝试对所有 server 发起请求,但是只要有任意一个请求成功就直接返回,不再等待其他请求甚至取消其他请求
std::string query_dns(std::vector<std::string> server, std::string domain) {
    std::vector<Coroutine> coroutines;
    for (const auto& s : server) {
        // 注意此处对每一个待下载的 URL 都创建了一个新协程,并且并没有调用 `Coroutine::join()` 或 `Coroutine::detach()` 执行协程
        // 因此下面的代码将会立即返回,并将创建的协程放入 coroutines 数组中。
        coroutines.push_back(Coroutine([&s, &domain] {
            auto result = co_query(s, domain);
            if (result) {
                CO_RETURN(result);  // 直接返回结果,结束当前协程
            }
        }));
    }
    return co_when_any(coroutines); // 同时执行所有协程,并等待任意一个协程执行完成
}

实际上,利用协程库提供的挂起和唤醒机制可以非常方便地实现上面使用到的 co_when_all()co_when_any() 函数。

template<typename T>
std::vector<T> co_when_all(std::vector<Coroutine<T>> coroutines) {
    std::vector<T> results(coroutines.size());
 
    auto current_coroutine = CO_CURRENT();      // 1. 获取当前协程的上下文
    auto waker = current_coroutine->waker();    // 2. 获取当前协程的唤醒器
    std::atomic<int> count = 0;                 // 3. 记录已完成协程的数量
 
    for (auto& c : coroutines) {
        Coroutine co([c=std::move(c), &results, &waker]() mutable {
            c.join();
            auto result = c.get_result();       // 5. 获取协程的结果
            results.push_back(result);          // 6. 将结果保存到结果数组中
            if (++count == coroutines.size()) { // 7. 如果所有协程都已完成
                waker->wake();                  // 8. 唤醒当前协程
            }
        });
    }
    
    current_coroutine->suspend();               // 4. 挂起当前协程,等待所有协程完成
    // 9. 从 suspend 后恢复执行
    return results;
}
 
template<typename T>
std::optional<T> co_when_any(std::vector<Coroutine<T>> coroutines) {
    auto current_coroutine = CO_CURRENT();      // 1. 获取当前协程的上下文
    auto waker = current_coroutine->waker();    // 2. 获取当前协程的唤醒器
 
    std::atomic<bool> done = false;             // 3. 记录是否有协程已完成
    std::optional<T> result;                    // 4. 保存已完成协程的结果
 
    for (auto& c : coroutines) {
        Coroutine co([c=std::move(c), &waker, &result]() mutable {
            c.join();
            auto result = c.get_result();       // 6. 获取协程的结果
            if (!done.test_and_set()) {         // 7. 如果已经有协程完成
                done = true;                    // 8. 标记协程已完成
                waker->wake();                  // 9. 唤醒当前协程
            }
        });
    }
    
    current_coroutine->suspend();               // 5. 挂起当前协程,等待任意一个协程完成
    // 10. 从 suspend 后恢复执行
    return result;
}

对于异步的应用场景而言,调度器仍然是必须的。上述场景中的 co_when_allco_when_any 在实际执行中就需要调度器的参与。 在斐波那契数列生成器的例子中,协程创建者 main() 函数对 gen() 的多次调用本身就可以视为一个简单的调度器。 在更复杂的场景中,往往需要轮转(round-robin)或者任务偷取(work-stealing)等调度策略来调度协程的执行。

一种基于 boost::fibers 提供的有栈协程和调度器实现协程挂起和异步回调唤醒的方式如链接。 其核心在于其中的 fiber::suspendwaker::wake 函数。suspend 将挂起协程,并将当前线程的控制流交给协程调度器,而 wake 将唤醒协程,通知协程调度器在适当的时候继续执行协程。

auto current_coroutine = fibers::context::active();
auto waker = current_coroutine->create_waker();
timer.call_after(std::chrono::seconds(sleep_time),
			     [&waker]() { waker.wake(); });
current_coroutine->suspend();

boost::fibers::fiber 提供的 create_waker()suspend()wake() 方法和调度器高度相关。 suspend() 实际上执行的工作是从调度器中获取一个协程并将当前线程的控制流切换至这个协程。因为并没有将当前协程重新放回至调度器,所以相当于当前协程被挂起了,不会被调度器调度执行。 而 wake() 方法则是简单地将协程重新放回至调度器中,等待调度器在适当的时候继续执行这个协程。

如果还是你好奇它背后是如何实现的,可以尝试不使用 boost::fibers 提供的调度器,而是仅基于 boost::context 提供的无调度器有栈协程自行实现一个简易的调度器。代码如链接。 其核心在于其中的 Executor::add_taskExecutor::suspend 函数,分别定义了协程挂起和执行结束时的行为。

Call with Current Continuation

Call with current continuation (callcc)是函数式编程中的一个概念,指的是在函数调用时将当前的执行上下文(continuation)作为参数传递给函数。这样,函数可以在执行完毕后,通过调用这个上下文来恢复到原来的执行点。C++ 中的 boost::context 库的 resume_with() 函数实际上就是实现了 callcc 来实现协程的挂起和恢复。

C++ 20 无栈协程

相对于侵入性较小的有栈协程,无栈协程需要编译器和运行时的支持,同时需要对函数的签名进行修改,因此在使用上会有一定的限制。C++20 引入了对无栈协程的支持,使用无栈协程实现的 worker 代码如下:

StacklessCoroutine<std::string> co_receive(Connection conn);
StacklessCoroutine<void> co_send(Connection conn, std::string response);
 
StacklessCoroutine<> async_run() {
    if (!co_await async_authenticate()) {               // 嵌套调用其他协程函数时需要使用 co_await 显式地挂起当前协程,等待内层协程的执行结果
        co_return;                                      // 协程的返回值使用 co_return 语句返回
    }
    std::string request = co_await co_receive(conn_);   // 阻塞式调用,挂起当前协程但不会阻塞线程,请求到达后恢复执行
    std::string response = process_request(request);
    co_await co_send(conn_, response);                  // 阻塞式调用,挂起当前协程但不会阻塞线程,响应发送完成后恢复执行
}
 
void start() {
    async_run().detach();       // 协程在创建时通常将自动向调度器注册,同时将协程与当前线程分离,避免阻塞当前线程
}

可以看到,无栈协程需要修改函数的签名,在原本返回值的基础上套了一层协程对象,用来保存协程的状态。 此时函数的含义已经发生了改变,其不再代表一个普通的计算函数 T f(int a, int b),而是一个协程的构造函数 Coroutine<T> f(int a, int b)。 这类似于有栈协程中的 Coroutine::Coroutine(std::function<T(int a, int b)> f)

因此,无栈协程函数在嵌套调用另一个无栈协程函数时,需要使用 co_await 显式地挂起当前协程并等待内层协程的执行结果。类似于有栈协程中的 Coroutine::join() 方法。 此外,通常而言协程的创建并不代表协程的执行。对于无需等待协程执行结果的场景,调用者通常仍然需要调用 Coroutine::detach() 方法指明启动协程并将协程与当前线程分离。

对于异步回调函数的实现,使用无栈协程的方式与有栈协程类似。我们可以使用协程库的 API 暂时挂起当前协程,并在异步操作完成后恢复协程的执行。

StacklessCoroutine<std::string> co_receive(Connection conn) {
    auto awaitable = callback_awaitable([conn](auto resume_handle){
        ev.register_event_handler(Event{conn, EventType::Readable}, [resume_handle, conn] {
            resume(conn->receive_non_blocking());    // 恢复当前协程
        });
    })
    std::string request = co_await awaitable;    // 挂起当前协程,等待回调函数执行
    co_return request;
}

C++ 20 的无栈协程标准只是提供了编译器和运行时的支持,具体的协程实现以及调度算法仍然需要依赖于第三方库。

C++ 20 无栈协程的编译器转换

无栈协程并没有任何的魔法,它通过编译器将原本顺序执行的代码拆散为多个零散的片段作为协程恢复时执行的回调函数,再以状态机的形式将其集中在一处定义。 例如下面的代码是一个看上去非常简单的协程创建和调用的例子:

// 定义一个代表协程的对象
struct MyCoroutine {
  // promise_type 为固定名称,更改会导致编译错误
  struct promise_type {
    promise_type(const promise_type &) = delete;
    promise_type &operator=(const promise_type &) = delete;
 
    MyCoroutine get_return_object() { return {}; }
    std::suspend_never initial_suspend() { return {}; }
    std::suspend_never final_suspend() noexcept { return {}; }
    void return_void() {}
    void unhandled_exception() {
      std::terminate();
    }
  };
};
 
MyCoroutine myCoroutineFunction(Argument arg) {
    Trace::print("coroutine 1");
    co_await std::suspend_always{};
    Trace::print("coroutine 2");
}
 
void caller() {
    Trace::print("before coroutine");
    MyCoroutine myCoroutine = myCoroutineFunction(Argument{});
    Trace::print("after coroutine");
    myCoroutine.resume();
    Trace::print("after resume");
}

C++ 20 规定当某个函数使用了 co_awaitco_return 或者 co_yield 关键字时,该函数会被当成一个协程函数。 此时协程函数的返回值类型需要拥有一个名为 promise_type 的嵌套类型,并且该类型需要实现一些特定的函数,并且被编译器在不同的阶段调用。在后面的内容中,我们将协程函数返回的类型统称为 Coroutine。 此外,co_await 的类型同样需要实现一些特定的函数,并且被编译器调用。这个类型我们称之为 Awaiter

接下来我们将尝试分析一下编译器是如何将上面的代码转换为状态机的,并且在转换后上述 CoroutineAwaiter 对象是如何工作的。

首先看函数的初始化部分。代码中 MyCoroutine myCoroutineFunction(Argument) 函数会被展开为如下代码:

struct myCoroutineFunction_Frame {
    // 协程的状态对象,保存协程的状态
    MyCoroutine::promise_type promise;
    // 协程的参数
    Argument arg0;
    // 协程的状态机索引
    int resume_index;
    // 协程中跨越 co_await 的变量
    // ...
    // 协程的 awaiter 对象
    union {
        std::suspend_never awaiter0;
        std::suspend_always awaiter1;
        std::suspend_never awaiter2;
        // 其他 Awaiter 对象
    }
}
 
MyCoroutine myCoroutineFunction(Argument arg0) {
    // 创建协程的状态对象 myCoroutineFunction_Frame,之后所有有关协程的状态,例如函数参数、函数内部变量等都会保存在这个对象中。
    // 由于协程可以被传递到其他线程中执行,因此通常需要在堆上创建协程的状态对象。
    auto frame_ptr = new myCoroutineFunction_Frame();
 
    // 初始化 Frame
    // Frame 中含有一个类型为 MyCoroutine::promise_type 的 promise 字段,需要调用 promise_type 的无参构造函数
    // 之前提到过编译器会将无栈协程转换为状态机,此处设置状态机的初始状态
    frame_ptr->resume_index = 0;
    // 因为“构造函数”在构造完协程对象之后就会返回,其参数也会在此时被析构。
    // 因此需要保存函数的参数到 state 对象中。
    // 若函数参数是引用类型或者指针类型,则可能会造成悬空指针。需要开发者自行管理。
    frame_ptr->arg0 = std::move(arg0);
    // 初始化 Frame 中其他状态,与编译器实现细节相关,此处不再展开。
 
    // 调用 promise_type::get_return_object(),返回 MyCoroutine 对象,即当前函数的返回值。
    MyCoroutine result_object = frame_ptr->promise.get_return_object(&frame_ptr->promise);
 
    // 调用“协程构造函数”,初始化协程的状态
    myCoroutineFunction(frame_ptr);
 
    return result_object;
}

此外,编译器会生成一个类型为 void (myCoroutineFunction_Frame*) 的函数,包含 myCoroutineFunction 函数原本的逻辑,并在进入函数和使用 co_return 返回时由编译器隐式地填充一些代码。

// 注意此处的函数参数和返回值
void myCoroutineFunction(myCoroutineFunction_Frame *frame) {
  try {
    // 调用 promise_type::initial_suspend(),得到一个 Awaiter 对象并立即 await。
    // 该函数通常用于在协程开始执行之前进行一些初始化操作,
    // 并和调度器共同决定是在创建协程后立即执行协程还是仅将协程放入调度队列,在后续通过调度器执行。
    co_await frame->promise.initial_suspend();
    
    Trace::print("coroutine 1");
 
    co_await std::suspend_always{};
 
    Trace::print("coroutine 2");
 
    // 当协程执行完毕时隐式调用 `co_return;` 时,调用 promise_type::return_void(),表示协程的执行已经完成。
    // 这需要 promise_type 实现对应类型的 return_void 函数。
    frame->promise.return_void();
 
    // 若函数使用 `co_return value;` 显式地返回了一个值,则会调用 promise_type::return_value(T value),将返回值传递给 promise 对象。
    // 这需要 promise_type 实现接受对应类型的 return_value 函数。
 
    // co_return 的处理流程仍在继续。
    goto final_suspend;
  } catch (...) {
    // 协程执行过程中发生异常时,调用 promise_type::unhandled_exception(),处理异常。
    frame->promise.unhandled_exception();
 
    // 由于 C++ 的异常处理机制,在 catch 块中中断执行流并返回会导致错误。因此需要在离开 catch 块后再调用 final_suspend。
    goto final_suspend;
  }
 
final_suspend:
  // 调用 promise_type::final_suspend(),得到一个 Awaiter 对象并立即 await。
  // 该函数通常用于在协程结束时进行一些清理操作。
  // 或者向调度器协议标记协程已结束,可获取协程的返回值。
  co_await frame->promise.final_suspend();
 
  delete frame;
  return;
}

此时的 void myCoroutineFunction(myCoroutineFunction_Frame *frame) 仍然不是最终的形态。 编译器会继续展开 co_await 语句,将 void myCoroutineFunction(myCoroutineFunction_Frame *frame) 转换为一个状态机,并在每个 co_await 语句处插入对 Awaiter 对象的调用。

void myCoroutineFunction(myCoroutineFunction_Frame *frame) {
  try {
    switch (frame->resume_index) {
    case 0:
      // 协程的初始状态
      // 从函数开头截止到函数的第一个 co_await 语句,即 co_await frame->promise.initial_suspend()
 
      // co_await frame->promise.initial_suspend() 将会展开成以下代码:
      frame->awaiter1 = frame->promise.initial_suspend();
      // 判断 awaiter 是否已经准备就绪,可以直接恢复执行
      if (awaiter.await_ready()) {
        frame->resume_index = 1; // 设置协程的恢复状态
        // 已经准备就绪,直接恢复执行
        // 注意此处 case 0 的结尾没有 break 语句,此处会 fallthrough 到 case 1
      } else {
        // coroutine_handle 是协程的句柄,内部指向当前协程的状态对象,可用于恢复执行协程
        auto coroutine_handle =
            std::coroutine_handle<MyCoroutine::promise_type>::from_promise(
                *promise);
 
        // 未准备就绪,调用 awaiter.await_suspend(coroutine_handle) 注册回调函数
        // 当 awaiter 准备就绪时,由其他模块调用 coroutine_handle.resume() 恢复执行
        awaiter.await_suspend(coroutine_handle);
        // await_suspend 应当立即返回并继续执行
 
        // 函数返回,控制流交给调用方
        // 注意 awaiter 保存在 frame 中,将在协程恢复时调用其 await_resume() 函数后析构
        return;
      }
    case 1:
      // 此时 frame->awaiter 为 frame->promise.initial_suspend() 返回的 Awaiter 对象
      // 调用其 await_resume() 函数,通知 Awaiter 协程已恢复执行
      frame->awaiter1.await_resume();
      // 析构 frame->awaiter
      frame->awaiter1.~suspend_never();
 
      // 此段为函数的第一个 co_await 语句到函数的第二个 co_await 语句
      Trace::print("coroutine 1");
 
      // co_await std::suspend_always{} 会以同样的方式展开
      frame->awaiter2 = std::suspend_always{};
      // 判断 awaiter 是否已经准备就绪,可以直接恢复执行
      if (awaiter.await_ready()) {
        frame->resume_index = 2; // 设置协程的恢复状态
        // 已经准备就绪,直接恢复执行
        // 注意此处 case 1 的结尾没有 break 语句,此处会 fallthrough 到 case 2
      } else {
        // coroutine_handle 是协程的句柄,内部指向当前协程的状态对象,可用于恢复执行协程
        auto coroutine_handle =
            std::coroutine_handle<MyCoroutine::promise_type>::from_promise(
                *promise);
 
        // 未准备就绪,调用 awaiter.await_suspend(coroutine_handle) 注册回调函数
        // 当 awaiter 准备就绪时,由其他模块调用 coroutine_handle.resume() 恢复执行
        awaiter.await_suspend(coroutine_handle);
        // await_suspend 应当立即返回并继续执行
 
        // 函数返回,控制流交给调用方
        // 注意 awaiter 保存在 frame 中,将在协程恢复时调用其 await_resume() 函数后析构
        return;
      }
    case 2:
      // 此时 frame->awaiter 为 std::suspend_always{} 返回的 Awaiter 对象
      // 调用其 await_resume() 函数,通知 Awaiter 协程已恢复执行
      frame->awaiter2.await_resume();
      // 析构 frame->awaiter
      frame->awaiter2.~suspend_always();
 
      // 此段为函数的第二个 co_await 语句到函数的第三个 co_await 语句
      Trace::print("coroutine 2");
 
      frame->promise.return_void();
 
      goto final_suspend;
    case 3:
      // 此时 frame->awaiter 为 frame->promise.final_suspend{} 返回的 Awaiter 对象
      // 调用其 await_resume() 函数,通知 Awaiter 协程已恢复执行
      frame->awaiter3.await_resume();
      // 析构 frame->awaiter
      frame->awaiter3.~suspend_never();
 
      // 协程结束,退出
      delete frame;
      return;
    default:
      // 非法状态,正常情况下不会到达
      // ...
    }
  } catch (...) {
    // 协程异常处理
    frame->promise.unhandled_exception();
 
    goto final_suspend;
  }
 
final_suspend:
  // 展开 co_await frame->promise.final_suspend(),与之前一致,此处省略
  // ...
 
  delete frame;
  return;
}

这可以通过 CppInsights 查看转换后的代码(非编译器实现),或者通过 godbolt 编译并在 dogblot 查看真正的编译器如何进行转换的。

通过转换后的代码,可以比较清晰地看到 Coroutine::promise_typeAwaiter 中各个函数的调用关系和发挥的作用。 每当遇到 co_await 语句时,程序实际上执行了普通的函数 return 返回到调用者处。并且可以通过 Awaiter::await_suspend(std::coroutine_handle) 的参数来随时恢复协程的执行。 然而对于调用者而言,它只拥有一个 Coroutine 对象,并不知道 Awaiter 对象的存在。它该如何恢复协程的执行呢?

答案是通过 promise_type::get_return_object() 在构造 Coroutine 对象时注入一个 std::coroutine_handle 对象。 std::coroutine_handle 内部是一个指向 myCoroutineFunction_Frame 的指针,因此即使一直不更新 Coroutine::handler,其状态也是最新且正确的,可以用于恢复协程的执行。

struct MyCoroutine {
  struct promise_type;
  // 当前协程的句柄,其内部指向当前协程的状态对象,内部状态会随着协程运行自动更新
  std::coroutine_handle<promise_type> handle_;
 
  MyCoroutine(std::coroutine_handle<promise_type> handle) : handle_(handle) {}
 
  // promise_type 为固定名称,更改会导致编译错误
  struct promise_type {
    promise_type() {}
    promise_type(const promise_type &) = delete;
    promise_type &operator=(const promise_type &) = delete;
 
    MyCoroutine get_return_object() {
      return {std::coroutine_handle<promise_type>::from_promise(*this)};
    }
    std::suspend_never initial_suspend() { return {}; }
    std::suspend_never final_suspend() noexcept { return {}; }
    void return_void() {}
    void unhandled_exception() { std::terminate(); }
  };
 
  void operator()() {
    handle_.resume();
  }
};
 
MyCoroutine myCoroutineFunction() {
  std::cout << "myCoroutineFunction 1" << std::endl;
  co_await std::suspend_always{};
  std::cout << "myCoroutineFunction 2" << std::endl;
  co_await std::suspend_always{};
  std::cout << "myCoroutineFunction 3" << std::endl;
}
 
int main() {
  std::cout << "main 1" << std::endl;
  MyCoroutine f = myCoroutineFunction();
  std::cout << "main 2" << std::endl;
  f();
  std::cout << "main 3" << std::endl;
  f();
  std::cout << "main 4" << std::endl;
}

接下来考虑使用无栈协程实现斐波那契数列生成器,生成器需要在函数的正常执行过程中临时中断执行并返回一个值。

按照上面的分析,我们需要通过 co_await 返回一个 Awaiter 对象,此时能够协程中断并返回到调用者处。对于调用者而言,它只能通过 Coroutine 对象来获取协程的返回值。 然而,我们既无法在协程内部直接访问编译器展开协程时使用的 framereturn_objectpromise 等对象,也无法在协程外部通过 Coroutine 访问协程的 Awaiter 或内部状态。

C++ 20 针对这个情况引入了 co_yield 关键字,co_yield value 等价于调用 co_await promise.yield_value(value) 函数,允许 promise_type::yield_value 函数将值保存在 promise 对象中,并通过 Coroutine::handle_.promise() 访问。

使用无栈协程实现的斐波那契数列生成器代码如下:

struct MyCoroutine {
  struct promise_type;
  std::coroutine_handle<promise_type> handle_;
 
  MyCoroutine(std::coroutine_handle<promise_type> handle) : handle_(handle) {}
 
  // promise_type 为固定名称,更改会导致编译错误
  struct promise_type {
    // 保存 yield 的值
    int value_;
 
    promise_type() {}
    promise_type(const promise_type &) = delete;
    promise_type &operator=(const promise_type &) = delete;
 
    MyCoroutine get_return_object() {
      return {std::coroutine_handle<promise_type>::from_promise(*this)};
    }
    std::suspend_always initial_suspend() { return {}; }
    std::suspend_never final_suspend() noexcept { return {}; }
    std::suspend_always yield_value(int value) {
      value_ = value;
      return {};
    }
    void return_void() {}
    void unhandled_exception() { std::terminate(); }
  };
 
  int operator()() {
    handle_.resume();
    return handle_.promise().value_;
  }
};
 
MyCoroutine fibonacci() {
  int a = 0, b = 1;
  while (true) {
    co_yield a;
    int c = a + b;
    a = b;
    b = c;
  }
}
 
int main() {
  MyCoroutine gen = fibonacci();
  for (int i = 0; i < 10; ++i) {
    std::cout << gen() << std::endl;
  }
}

完整的代码可以在 godbolt 上查看。

现在我们实现了从协程暂停并返回值到调用者的过程,那么如何在恢复协程执行的时候想协程传递参数呢,例如 co_await async_read()? 答案是通过 Awaiter::await_resume 的返回值传递。编译器允许 await_resume 返回任意类型的值,并且这个值会作为 co_await 表达式的值返回给调用者。 当异步操作执行完成后,将结果保存至 promise_type 对象中作为中转,然后在 Awaiter::await_resume 中返回。 其关键在于 promise_type 可以在 co_await value 执行前对 value 进行一系列转换。此处的 value 类型甚至可以不是 Awaiter。因此转换后的 Awaiter 可以保存一份对 promise_type 的引用。 并在 await_resume 中访问 promise_type 对象获得期望的返回值。

struct SuspendByYield {};
struct SuspendByAwait {};
 
struct AwaiterWithPromise {
  promise_type &promise;
  bool await_ready() { return false; }
  void await_suspend(std::coroutine_handle<promise_type>) {}
  int await_resume() { return promise.value_; }
};
 
struct MyCoroutine {
  struct promise_type {
    int value_;
    // ...
    AwaiterWithPromise yield_value(SuspendByYield) { return {*this}; }
    AwaiterWithPromise await_transform(SuspendByAwait) { return {*this}; }
  }
 
  void resume(int value) {
    handle_.promise().value_ = value;
    handle_.resume();
  }
 
  int get() { return handle_.promise().value_; }
 
  std::coroutine_handle<promise_type> handle_;
};
 
MyCoroutine plus() {
  int a = co_yield SuspendByYield{};
  std::cout << "Coroutine got " << a << std::endl;
  int b = co_await SuspendByAwait{};
  std::cout << "Coroutine got " << b << std::endl;
  co_return a + b;
}
 
int main() {
  MyCoroutine f = plus();
  f.resume(1);
  f.resume(2);
  std::cout << "Coroutine returned " << f.get() << std::endl;
}

在上面的代码中,SuspendByYieldSuspendByAwait 是两个空的结构体,分别用于 co_yieldco_await。在实际的代码中完全可以仅保留一个或者合并二者。 对于 SuspendByYieldco_yield 会调用 promise_type::yield_value。代码中的 yield_value 则趁机构造了一个保存了 promise 引用的 AwaiterWithPromise 对象。 对于 SuspendByAwaitco_await 会自动尝试调用 promise_type::await_transform,代码中的 await_transform 同样返回了一个保存了 promise 引用的 AwaiterWithPromise 对象。 在这之后,主协程在代用 handle_.resume() 之前设置好 promise.value_ 的值。协程恢复之后,await_resume 会返回 promise.value_ 的值,进而传递给 co_awaitco_yield 表达式。 完整的代码可以在 godbolt 上查看。

事实上,对于需要通过第三方异步库唤醒的情况往往不需要这么麻烦。因为通常而言可以在 Awaiter::await_suspend 中发起异步操作、传递唤醒句柄以及 Awaiter 对象自身的引用,将需要返回的值直接保存至 Awaiter 对象中,不需要通过 promise_type 中转。

struct TimeoutAwaiter {
  std::chrono::milliseconds timeout_;
  std::chrono::milliseconds real_time_passed_;
 
  bool await_ready() { return false; }
  void await_suspend(std::coroutine_handle<> handle) {
    // 将唤醒句柄传递给第三方异步库,此处使用 std::thread 模拟第三方异步库执行回调函数
    std::thread([handle = std::move(handle), timeout_, start_time = std::chrono::steady_clock::now(), awaiter = this] () mutable {
      std::this_thread::sleep_for(timeout_);
      // 计算实际经过的时间,将结果直接保存至 awaiter 对象中
      std::chrono::milliseconds real_time_passed = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start_time);
      awaiter->real_time_passed_ = real_time_passed;
      // 需要注意,在执行 handle.resume() 时,会在当前线程(std::thread)中恢复协程的执行,而不是原先创建协程的线程。
      handle.resume();
    }).detach();
  }
  std::chrono::milliseconds await_resume() { 
    return real_time_passed_;
  }
};
 
MyCoroutine clock() {
    std::cout << "clock start on thread " << std::this_thread::get_id() << std::endl;
    while (true) {
        co_await TimeoutAwaiter{std::chrono::milliseconds(1000)};
        std::cout << "clock tick on thread " << std::this_thread::get_id() << std::endl;
    }
}
 
int main() {
    std::cout << "main start on thread " << std::this_thread::get_id() << std::endl;
    clock();
    std::cout << "clock returned" << std::endl;
    // 等待 10 秒后退出。
    std::this_thread::sleep_for(std::chrono::seconds(10));
}

在上面的代码中,协程后续的执行会在 std::thread 中恢复而不是原先创建协程的主线程中恢复。因此如果第三方异步库的线程只负责唤醒,而不应该执行协程的后续操作时,则可以直接将 std::coroutine_handle 传递给应用的线程池恢复执行。 一种可能的实现方式如下:

ThreadPool pool;
 
void await_suspend(std::coroutine_handle<> handle) {
  // 将唤醒句柄传递给第三方异步库,此处使用 std::thread 模拟第三方异步库执行回调函数
  std::thread([handle = std::move(handle), timeout_] () mutable {
    std::this_thread::sleep_for(timeout_);
    // 在线程池中恢复
    pool.enqueue([handle = std::move(handle)] () mutable {
      handle.resume();
    });
  }).detach();
}

和 P0876 草案提出的无调度器有栈协程类似,C++ 20 的无栈协程不需要调度器的参与即可完成协程的挂起和恢复,给予开发者最大程度的自由。同时协程的调度部分可以和其他库自由地配合,开发者也只需要编写数十行代码即可实现一个简单够用的调度器。 上面定时异步任务的完整代码可以通过 Godbolt 查看。

通过上面的例子可以发现,Awaiter 通常用于描述需要完成的异步操作,例如 async_read()async_sleep() 等函数通常就需要返回一个 Awaiter,供调用者 co_await 等待异步操作完成。 当某个异步操作内部同时也使用了协程时,那么返回值类型需要既是 Awaiter 也是 Coroutine,例如 echo 服务中的 authenticate() 函数。

Coroutine 则用于组织和管理当前协程函数,其基于 Coroutine::promise_type 实现了协程的状态管理和数据传递的工作,允许协程库精细地控制协程的行为。

总结一下,variable = co_await value 可以等价为以下代码:

auto get_awaitable(auto &&value) {
    if constexpr (requires { promise_type::await_transform(value) }) {
        return promise_type::await_transform(value);
    } else {
        return value;
    }
}
 
template<typename Awaitable>
auto get_awaiter(Awaitable &&awaitable) {
    if constexpr (requires { value.operator co_await(); }) {
        return value.operator co_await();
    } else if constexpr (requires { operator co_await(static_cast<Awaitable&&>(awaitable)); }) {
        return operator co_await(static_cast<Awaitable&&>(awaitable));
    } else {
        static_assert(is_awaitable_v<Awaitable>, "Awaiter must be awaitable");
        return awaitable;
    }
}
 
auto awaitable = get_awaitable(value);
auto awaiter = get_awaiter(awaitable);
// 快速路径1:当 awaiter 已经准备好时,直接返回结果,避免中断协程执行
if (!awaiter.await_ready()) {
    // promise 代表当前协程 promise_type 的实例
    auto handle = std::coroutine_handle<promise_type>::from_promise(promise);
    bool suspend;
    if constexpr (requires { {awaiter.await_suspend(handle)} -> std::same_as<bool>; }) {
        suspend = awaiter.await_suspend(handle);
    } else {
        suspend = true;
    }
    // 快速路径2:当 await_suspend 返回 bool 且值为 false 时,直接恢复执行,避免中断协程执行
    if (suspend) {
        // 保存协程状态
        // frame->local_var_xxx = xxx;
        // 记录下次恢复执行的位置
        frame->resume_index = 1;
        // 中断协程执行,控制流返回到调用者
        return;
    }
}
resume_point_1;
variable = awaiter.await_resume();

C++ 20 的 Coroutine 中还有大量的隐藏特性,例如可以通过特化 std::coroutine_traits<Coroutine, Args...>::promise_type 来修改 Coroutine func(Args...) 对应的 promise_type 的类型。 有关 C++ 20 Coroutine 的更多细节可以参考 CppReference

异步编程框架

尽管上面讨论的众多异步编程模型都可以实现高并发地、异步地执行任务,但它们的开发和使用都高度依赖于底层的实现细节,抽象程度较低。

然而在实际的异步编程中,操作的超时取消、错误处理、结果的传递等都是非常常见的需求。这些功能通常是异步编程的基础设施,与具体的异步实现无关。 但是目前无论是使用回调函数还是协程,用户都需要自己实现这些功能。这显然不是一个理想的解决方案。

因此,我们需要一个更高层次的抽象来简化异步编程的复杂性。

promise 与 future

在 C++ 11 中,一种用于实现异步操作的高层次抽象是 std::promisestd::future

promise 对象和 future 对象之间是一对一的关系,promise 代表一个将会在未来某个时间完成的异步操作,future 代表这个异步操作的结果。

promise 和 future 相当于一个只能够发送一次消息的管道。通常而言 promise 对象由被调用者持有,在异步操作执行结束时调用 promise 的 API 设置异步操作的执行结果。此后异步操作的发起者就可以通过 future 对象的方法获知异步操作已经结束并且取回结果。

sequenceDiagram
    participant 任务1 as 任务1 (主线程)
    participant Promise as Promise
    participant Future as Future
    participant 任务2 as 任务2 (异步线程)

    任务1 ->> Promise: 1. 创建promise
    Promise -->> 任务1: 
    任务1 ->> Promise: 2. 获取关联的future
    Promise ->> Future: 自动绑定
    Future -->> 任务1: 

    任务1 ->> 任务2: 3. 启动异步任务,移动promise
    activate 任务2
    任务2 -->> 任务1: 

    任务2 ->> 任务2: 4. 执行计算任务
    任务2 ->> Promise: 5. 设置结果值(set_value)
    activate Promise
    Promise ->> Future: 6. 自动同步结果
    deactivate Promise
    deactivate 任务2

    note left of 任务2: 7. 异步操作完成

    任务1 ->> Future: 8. get() 获取结果
    activate Future
    Future -->> 任务1: 9. 返回计算结果
    deactivate Future

在 C++ 11 中,用户可以通过以下方式使用 promise 和 future:

// 代表一个异步操作,通常对外隐藏 promise,只返回 future
std::future<int> do_work() {
    std::promise<int> promise;
    std::future<int> future = promise.get_future();
    std::thread([promise = std::move(promise)] () mutable {
        try {
            int result = do_actual_work();
            promise.set_value(result);
        } catch (...) {
            promise.set_exception(std::current_exception());
        }
    }).detach();
    return future;
}
 
int main() {
    std::future<int> future = do_work();
 
    try {
        int result = future.get();
        std::cout << "Result: " << result << std::endl;
    } catch (std::string error) {
        std::cout << "Error: " << error << std::endl;
    }
}

在实现中,promise 和 future 之间通常通过共享一个对象来传递数据。一种非常简单的实现方式如下:

template <typename T>
class State {
    T value_;
    std::exception_ptr error_;
    std::atomic<bool> ready_ = false;
}
 
template <typename T>
class Promise {
    std::shared_ptr<State<T>> state_;
 
public:
    Promise():state_(std::make_shared<State<T>>()) {}
 
    Promise(const Promise&) = delete;
    Promise& operator=(const Promise&) = delete;
 
    // 获取与 promise 关联的 future 对象
    Future get_future() {
        return Future(this);
    }
 
    // 设置正常结束的返回值,只能调用一次
    void set_value(T value) {
        try {
            // operator =() 可能抛出异常
            state_->value_ = std::move(value);
        } catch (...) {
            state_->error_ = std::current_exception();
        }
        bool already_set = state_->ready_.exchange(true, std::memory_order_release);
        if (already_set) {
            throw std::logic_error("Promise already set");
        }
    }
 
    // 设置异常结束的返回值,只能调用一次
    void set_exception(std::exception_ptr error) {
        state_->error_ = error;
        bool already_set = state_->ready_.exchange(true, std::memory_order_release);
        if (already_set) {
            throw std::logic_error("Promise already set");
        }
    }
}
 
template <typename T>
class Future {
    std::shared_ptr<State<T>> state_;
 
public:
    Future(Promise<T>* promise) {
        state_ = promise->state_;
    }
    
    Future(const Future&) = delete;
    Future& operator=(const Future&) = delete;
 
    // 尝试获取结果,如果结果未准备好则返回 std::nullopt,在成功获取结果后,future 对象失效
    std::optional<T> try_get() {
        if (!state_) {
            throw std::logic_error("Future not valid");
        }
        if (!state_->ready_.load(std::memory_order_acquire)) {
            return std::nullopt;
        }
        if (state_->error_) {
            auto error = state_->error_;
            state_ = nullptr; // 防止多次调用
            std::rethrow_exception(error);
        }
        state_ = nullptr; // 防止多次调用
        return state_->value_;
    }
 
    // 获取结果,如果结果未准备好则等待直至结果准备好,只能调用一次
    T get() {
        if (!state_) {
            throw std::logic_error("Future not valid");
        }
        while (!state_->ready_.load(std::memory_order_acquire)) {
            // 可以使用条件变量或者其他方式等待
            std::this_thread::yield();
        }
        if (state_->error_) {
            auto error = state_->error_;
            state_ = nullptr; // 防止多次调用
            std::rethrow_exception(error);
        }
        state_ = nullptr; // 防止多次调用
        return state_->value_;
    }
}

promise 本身只表示一个将来完成的异步操作,并不关心异步操作的具体实现。在 C++ 中,async 对此进行了封装,可以更方便地实现异步操作。async 实际上就是对 do_work() 的封装。

template <typename T>
std::future<T> async(std::function<T()> func) {
    std::promise<T> promise;
    std::future<T> future = promise.get_future();
    std::thread([promise = std::move(promise), func = std::move(func)] () mutable {
        try {
            T result = func();
            promise.set_value(result);
        } catch (...) {
            promise.set_exception(std::current_exception());
        }
    }).detach();
    return future;
}

可以发现,promise 和 future 的仍然是一个非常低级的抽象,只针对异步操作中的结果传递进行了封装。对于操作的组合、取消、错误处理等功能仍然无能为力。

sender 与 receiver

在 C++ 提案 P2300 中,提出了一种新的异步编程模型,sender 和 receiver。

promise 和 future 模型只定义了异步任务结果传递的通路,具体如何生成结果、如何处理结果仍然暴露在外,需要用户自己实现。 与之不同的是 sender 和 receiver 是自包含的,在定义完成后它们就包含了完成任务所需的所有信息,不需要也不允许外界再进行任何操作,它们会按照预定的方式完成任务并传递结果。

receiver 是一个接收异步任务结果的对象,通常由异步任务的发起者构造,并在创建异步任务执行流时传递给 sender。receiver 接口包含以下方法:

template <typename Receiver, typename... Results>
concept receiver = requires(Receiver receiver, Results... results, std::exception_ptr error) {
    { set_value(receiver, results...) } -> std::same_as<void>;
    { set_error(receiver, error) } -> std::same_as<void>;
    { set_stopped(receiver) } -> std::same_as<void>;
};

下面是一个将结果直接输出到标准输出的 receiver,代码最后的 static_assert 可以验证 cout_receiver 是否符合 receiver 的要求。 虽然 cout_receiver 实现了支持任意个数参数的 set_value 方法,但在实际使用中,receiver 通常只会接收一个参数,并不需要处理传入任意个数参数的情况

struct cout_receiver {
    template <typename... Results>
    friend void set_value(cout_receiver self, Results... results) {
        std::cout << "Result: " << ( ... << results) << std::endl;
    }
 
    friend void set_error(cout_receiver self, std::exception_ptr err) {
        std::terminate();
    }
 
    friend void set_stopped(cout_receiver self) {
        std::terminate();
    }
};
 
static_assert(receiver<cout_receiver, int>);

sender 代表一个自包含的异步任务,其中包含了完成任务所需的所有信息。sender 在构造完成后对外仅公开一个 Operation connect(Sender, Receiver) 方法,表示将 sender 和 receiver 连接在一起。 当 sender 完成任务时,会将结果发送给通过 connect 绑定的 receiver。connect 方法返回的 Operation 对象则用来控制操作的执行,例如启动、取消等。

template <typename Operation>
concept operation = requires(Operation op) {
    { start(op) } -> std::same_as<void>;
};
 
template <typename Sender, typename Receiver, typename Operation>
concept sender = operation<Operation> && requires(Sender sender, Receiver receiver) {
    { connect(sender, receiver) } -> std::same_as<Operation>;
};

下面是一个简单的 sender 的实现,just 方法用于创建一个 sender 对象,这个对象会立即完成并将构造时传入的值发送给 receiver。

template <typename Receiver, typename Result>
struct just_operation : immovable {
    Receiver rec;
    Result value;
 
    friend void start(just_operation& self) {
        // 在操作开始后立即完成并将缓存的值发送给 receiver
        set_value(self.rec, self.value);
    }
};
static_assert(operation<just_operation<cout_receiver, int>>);
 
template <typename Result>
struct just_sender {
    Result value;
 
    // connect 方法用于将 sender 和 receiver 连接在一起,返回一个 operation 对象
    template <receiver<Result> Receiver>
    friend just_operation<Receiver, Result> connect(just_sender self, Receiver &&rec) {
        return { {}, rec, self.value };
    }
};
static_assert(sender<just_sender<int>, cout_receiver, just_operation<cout_receiver, int>>);
 
template <typename Result>
just_sender<Result> just(Result result) {
    return { result };
}

此时,我们可以通过以下方式使用 sender 和 receiver:

int main() {
    auto s = just(42);
    auto r = cout_receiver();
    auto op = connect(s, r);
    start(op);
}

当我们进一步重载 operator | 时,可以得到更简洁的语法:

template <typename Sender, typename Receiver>
auto operator|(Sender &&s, Receiver &&r) {
    return connect(std::forward<Sender>(s), std::forward<Receiver>(r));
}
 
int main() {
    auto op = just(42) | cout_receiver{};
    start(op);
}

上述代码可以在 Godbolt 上验证。

定制点(Customization Point):使用函数重载而非多态定义 sender 与 receiver 接口

你可能注意到了,在上述代码中,sender 和 receiver 的接口是通过友元函数而非成员方法来定义的。 因此实际上 set_value() 并不是 receiver 的成员方法,而是一个普通的函数。代码中定义了多个 receiver 时实际上发生了函数重载,而不是基于类对象的多态。 这主要是为了在 C++ 中实现定制点(Customization Point),在规定完成特定功能所需要的接口的同时,允许库提供一个通用的默认实现并允许用户针对特定类型提供一个定制的实现。 例如 C++ 标准库的 std::swapstd::hash 等接口均是定制点,允许用户为自定义类型提供一个定制的实现。

相比于基于多态,基于函数重载的性能更优。函数重载在编译期就可以确定调用的函数,因此没有运行时开销。而 C++ 没有像 Java 的 interface 或者 Rust 的 trait 一样的概念,使用纯虚函数和多态模拟的 interface 需要在运行时通过虚函数表查找函数,性能较差。

C++ 20 的 concept 也是一个可选的方案,使用 concept 表示 receiver 和 sender 的接口,并以成员函数的方式实现。这样可以避免多态的性能开销。 但是这个的方案具有非侵入性,扩展性更好。C++ 允许为一个 primitive 类型或者第三方库中的类型定义一个函数重载,而不需要修改原有的类定义。例如我们可以不用构造 justcout_receiver,直接将 int 定义为 sender,将 std::cout 定义为 receiver。完整的代码可以在 Godblot 中查看。

事实上这个方案并不是完美的,只是在目前 C++ 标准上效果最好的方案。截至 P2300R10,sender 和 receiver 接口的形式仍在讨论和迭代中,P2279比较了现有方案的优劣。有关 C++ 定制点的更多信息可以参考 这个知乎答案

接下来考虑实现 then,表示在前一个任务完成之后继续执行下一个任务,其函数签名为 then_sender then(sender s, function f)。因为 then 将一个 sender 通过某种方式(此处为参数 f)转换为另一个 sender,所以 then 也被称为 sender adapter。

then_sender 的构造是平凡的,直接将参数中的 senderfunction 保存在成员变量中即可。 接下来考虑 then_senderconnect 方法,它需要返回一个 then_operation,这一步几乎也是平凡的,在 then_sender 的基础上额外保存一个 receiver r 即可。 问题在于 then_operationstart 方法。它需要在 sender s 完成的时候将结果传递给 f,再将 f 的结果传递给 receiver。 而唯一能够从 sender s 中获取结果的方式是将 s connect 到某个 receiver 上,且这个 receiver 不能是最终的 receiver r。 因此我们必须在 then_operation 中定义一个新的 receiver 并 connect 到 s 上,这个 receiver 的 set_value 方法会在 sender s 完成时被调用,并将结果作为参数调用 f,最后将 f 的结果传递给最终的 receiver r

template <typename Operation>
struct then_operation {
    Operation inner_op;
 
    friend void start(then_operation &&self) {
        return start(inner_op);
    }
};
 
template <typename Sender, typename Function>
struct then_sender {
    Sender s;
    Function f;
 
    template <typename Receiver>
    struct s_receiver {
        Function f;
        Receiver r;
 
        template <typename Result>
        friend void set_value(s_receiver self, Result result) {
            auto result = self.f(result);
            set_value(self.r, result);
        }
 
        friend void set_error(s_receiver self, std::exception_ptr err) {
            set_error(self.r, err);
        }
 
        friend void set_stopped(s_receiver self) {
            set_stopped(self.r);
        }
    };
 
    template <typename Receiver>
    friend then_operation connect(then_sender &&self, Receiver &&r) {
        auto inner_op = connect(std::move(self.s), s_receiver{std::move(self.f), std::forward<Receiver>(r)});
        return { std::move(inner_op) };
    }
};

完整代码见 Godbolt

到了这一步,我们已经实现了一个简单的 sender 和 receiver 的框架。一些常用的算法例如 when_allwhen_any 都可以仿照 then 的实现方式来实现。

但是这套 std::execution 框架的能力还远不止于此。 直到现在我们还没有提到的事情是 scheduler。回忆一下 justthen 的实现,juststart 立即在在调用 start 的线程上向 receiver 发送结果,而以 then 为代表的 sender adapter 则很有可能在收到结果后直接调用后续的 receiver,继而继续使用 just 的执行线程。 因此我们在第一步就实现的 just 就是一个 scheduler,它决定了当前任务甚至之后的任务应该在哪一个计算资源上运行。 如果我们使用的 scheduler 是基于线程池的 pool.scheduler(),那么后续的计算很可能也会在线程池中执行。

Godblot

Footnotes

  1. Linux 系统上下文切换与系统调用开销:https://www.sobyte.net/post/2022-06/ctx-switch/

  2. Linux 系统线程创建与销毁开销:https://lemire.me/blog/2020/01/30/cost-of-a-thread-in-c-under-linux/