C++20 新特性(15):协程(Coroutines )
ztj100 2024-12-14 16:12 75 浏览 0 评论
C++20 终于加入了协程(Coroutines )。协程是一种轻量级的用户态线程,线程的创建、切换、销毁等都不需要进出内核态,从而带来巨大的性能提升,特别是当线程数量巨大的时候。
下面从传统的C语言的 pthread 开始,到 C++11 的 thread 类,到 C++20 的协程,通过一个简单的加法的例子,比较一下协程和普通线程之间的异同。
Pthread 线程的例子
Pthread 线程的使用,通过 pthread_create() 来进行创建,通过 pthread_join() 来等待线程结束。线程之间可以通过创建线程的参数来共享一些变量,通过 mutex 和 condition value 来进行互斥和通知。
下面通过一个例子来说明Pthread线程的使用,四个整数,先通过两个线程进行两两求和,然后再由第三个线程对求和的结果再求和,最终得到四个整数之和。
#include <iostream>
#include <pthread.h>
#include <chrono>
using std::cout, std::endl;
// <1> 用来计算当前时间和指定开始时间的时间差,浮点数,单位秒
double time_diff( std::chrono::time_point< std::chrono::high_resolution_clock > * start )
{
return static_cast< std::chrono::duration<double> >( std::chrono::high_resolution_clock::now() - *start ).count();
}
// <2> 线程参数,包括求和相关信息,线程通信用的条件变量,以及起始时间
struct add_args
{
volatile int * p1;
volatile int * p2;
volatile int * ret;
pthread_mutex_t * mutex;
pthread_cond_t * cond;
std::chrono::time_point< std::chrono::high_resolution_clock > * start;
};
// <3> 线程函数,首先等待被加数准备好(简单以非0为准备好),然后执行加法,通知结果就绪
void * th_add( void * args )
{
struct add_args * aa = static_cast<struct add_args *>( args );
pthread_mutex_lock( aa->mutex );
while( *aa->p1 == 0 || *aa->p2 == 0 ) {
pthread_cond_wait( aa->cond, aa->mutex );
}
*aa->ret = *aa->p1 + *aa->p2;
cout << time_diff( aa->start ) << " sum is " << *aa->ret << endl;
pthread_mutex_unlock( aa->mutex );
pthread_cond_broadcast( aa->cond );
return static_cast<void *>( 0 );
}
int main( int argc, char * argv[] )
{
int sum[7] = { 0 };
auto start = std::chrono::high_resolution_clock::now();
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
struct add_args aa1, aa2, aa3;
aa1 = { sum + 0, sum + 1, sum + 4, &mutex, &cond, &start };
aa2 = { sum + 2, sum + 3, sum + 5, &mutex, &cond, &start };
aa3 = { sum + 4, sum + 5, sum + 6, &mutex, &cond, &start };
// <4> 启动线程,此时被加数还未准备好,线程都在等待
pthread_t t1, t2, t3;
pthread_create( &t1, NULL, th_add, &aa1 );
pthread_create( &t2, NULL, th_add, &aa2 );
pthread_create( &t3, NULL, th_add, &aa3 );
// <5> 准备被加数,通知线程工作
pthread_mutex_lock( &mutex );
sum[0] = 1, sum[1] = 2; sum[2] = 3;
pthread_cond_broadcast( &cond );
pthread_mutex_unlock( &mutex );
cout << time_diff( &start ) << " broadcast (1) " << endl;
pthread_yield();
pthread_mutex_lock( &mutex );
sum[3] = 4;
pthread_cond_broadcast( &cond );
pthread_mutex_unlock( &mutex );
cout << time_diff( &start ) << " broadcast (2) " << endl;
// <6> 等待线程结束,获取线程执行结果,
void * ret = NULL;
pthread_join( t1, &ret );
pthread_join( t2, &ret );
pthread_join( t3, &ret );
cout << time_diff( &start ) << " final " << sum[4] << " " << sum[5] << " " << sum[6] << endl;
pthread_mutex_destroy( &mutex );
pthread_cond_destroy( &cond );
return 0;
}
编译和运行结果为:
[smlc@test code]$ g++ -std=c++20 a15.cpp -lpthread
[smlc@test code]$ ./a.out
0.000127409 broadcast (1)
0.000454453 sum is 3
0.000562961 broadcast (2)
0.000610694 sum is 7
0.000679388 sum is 10
0.000708396 final 3 7 10
C++11 的 thread 类的线程例子
C++11 的 thread 类,除了可以使用 mutex 和 condition value 进行线程通信外,也封装了 promise 类和 future 类,进行更简单的线程间通信。
#include <iostream>
#include <thread>
#include <future>
#include <utility>
#include <chrono>
using std::cout, std::endl;
// <1> 用来计算当前时间和指定开始时间的时间差,浮点数,单位秒
double time_diff( std::chrono::time_point< std::chrono::high_resolution_clock > * start )
{
return static_cast< std::chrono::duration<double> >( std::chrono::high_resolution_clock::now() - *start ).count();
}
// <2> 线程函数,可以是任意参数类型
void th_add( std::promise<int> && sum, std::future<int> p1, std::future<int> p2,
std::chrono::time_point< std::chrono::high_resolution_clock > * start )
{
int ret = p1.get() + p2.get();
sum.set_value( ret );
cout << time_diff( start ) << " sum is " << ret << endl;
return;
}
// <3> 函数对象也可以作为线程函数,重载 operator () 运算符,同样支持任意参数类型
struct th_add_2
{
std::chrono::time_point< std::chrono::high_resolution_clock > * m_start;
void operator () ( std::promise<int> && sum, std::future<int> p1, std::future<int> p2 )
{
int ret = p1.get() + p2.get();
sum.set_value( ret );
cout << time_diff( m_start ) << " sum is " << ret << endl;
return;
}
};
int main( int argc, char * argv[] )
{
auto start = std::chrono::high_resolution_clock::now();
std::promise<int> sum[7];
std::future<int> ret6 = sum[6].get_future();
// <4> 启动线程,此时被加数还未准备好,线程都在等待
std::thread t1( th_add, std::move( sum[6] ), sum[5].get_future(), sum[4].get_future(), &start );
std::thread t2( th_add_2 { &start }, std::move( sum[5] ), sum[3].get_future(), sum[2].get_future() );
std::thread t3( th_add, std::move( sum[4] ), sum[1].get_future(), sum[0].get_future(), &start );
// <5> 准备被加数,通知线程工作
sum[0].set_value( 1 );
sum[1].set_value( 2 );
sum[2].set_value( 3 );
cout << time_diff( &start ) << " broadcast (1) " << endl;
std::this_thread::yield();
sum[3].set_value( 4 );
cout << time_diff( &start ) << " broadcast (2) " << endl;
// <6> 等待线程结束,获取线程执行结果,
t1.join();
t2.join();
t3.join();
cout << time_diff( &start ) << " final " << ret6.get() << endl;
return 0;
}
编译和运行结果为:
[smlc@test code]$ g++ -std=c++20 a15-2.cpp
[smlc@test code]$ ./a.out
0.000179768 broadcast (1)
0.000511978 sum is 3
0.000619311 broadcast (2)
0.000675313 sum is 7
0.000761572 sum is 10
0.000815648 final 10
协程(Coroutines)的基础
首先,协程是用户态的线程,没有对应的底层操作系统的线程,也就是说,连续启动的多个协程实际是在同一个底层操作系统的线程中运行,这些协程是串行执行的。当然了,可以在不同的底层操作系统线程中分别启动协程,甚至可以在协程暂停后,迁移到另一个底层操作系统线程中再恢复运行。
其次,底层操作系统的线程是由操作系统进行调度的,可以是进入内核态之后由内核进行调度切换,也可以是因为时间片到期后由操作系统通过中断技术强行进行调度切换。而协程则是在用户态进行切换,需要在用户态,由协程内部显式要求暂停,以及由外部显式将指定协程恢复运行。
因此 C++ 的协程的函数和普通的函数调用有差别:
- 协程的函数和普通函数调用的主要差别,是协程函数内部必须有 co_await 、 co_yield 、 co_return 等关键字的一个或多个。
- 一个函数因为上述关键字成为协程之后,就不能再 return 语句,也不能有可变参数。另外常量函数、构造函数、析构函数、 main 函数等也不能定义成协程。
- 普通函数调用一次只会返回一个结果,但协程调用一次,在执行过程中,可以多次暂停然后按需恢复执行,每次暂停都可以产生一个结果。所以生成器(generator)式协程可以源源不断的通过 co_yield 来产生数据。
协程为了实现在用户态控制的暂停和恢复执行,需要保存执行状态,同时还需要和外部交换是数据,因此定义了一系列的对象和接口,假设协程定义如下:
R coroutine_func( Args ... )
{
// ... coroutine body ...
co_await XXXX; // or : co_yield xxxx; // or : co_return xxxx;
// ... other coroutine body ...
}
首先是 Promise 类型,负责和外界进行数据交换,定义为:coroutine_traits< R, Args ... >::promise_type,如果未进行模板特化,默认为:R::promise_type,因此在定义协程的返回值类型时,要么在返回值类型(例如 R)里面定义 promise_type 类型,要么给进行模板 coroutine_traits< R, Args ... > 的特化,里面定义相应的 promise_type 类型。
这个 promise_type 类型要提供下列函数:
- auto initial_suspend() :初始化时的协程挂起
- auto final_suspend():结束时的协程挂起
- R get_return_object():获取返回结果对象,每次挂起时调用,返回结果给外部
- auto yield_value():用于 co_yield 设置结果对象的值,供后续挂起时返回结果使用
- auto return_value():用于 co_return 设置结果对象的值,供后续挂起时返回结果使用
然后是 awaitable 接口,用于 co_await 表达式, 用来控制是否挂起,以及在挂起和恢复时执行额外操作。这个 awaitable 接口类型,这个不是接口类,而是定义了一些接口函数的要求,需要提供下列函数:
- bool await_ready():判断是否可以运行,返回false表示需要挂起
- auto await_suspend(coroutine_handle<> h):需要挂起时执行此函数,其中 handle 是协程体的控制句柄,在后面描述
- void await_resume():从挂起中恢复时执行此函数
标准系统库中定义了两个缺省的 awaitable 接口的实现供使用:
- struct suspend_never :从不挂起,他的 await_ready() 返回 true
- struct suspend_always :总是挂起,他的 await_ready() 返回 false
最后是协程的控制句柄类 coroutine_handle< promise_type >,用于获取协程信息和控制协程的运行,例如恢复执行等。当协程需要挂起时,在 await_suspend() 函数中的参数会提供当前协程的控制句柄,供后续进行控制。控制句柄类提供下列接口:
- resume() 或者 operator () () :用于恢复协程运行
- Promise & promise() :用于获取协程对应的 Promise 对象
上述这些类型之间以及和协程关系,大致如下图所示:
这些类型中, promise_type 一般是需要自行定义的,因为这个是和协程的返回值相关,如果不使用协程的返回值(例如通过协程函数的参数来返回信息),也可以考虑使用标准库中定义的 std::noop_coroutine_promise 类型,awaitable 接口在需要对协程的挂起和恢复进行额外控制时,可以自行定义,否则也可以直接使用标准库中定义的 std::suspend_never 和 std::suspend_always 类型。而 coroutine_handle< promise_type > 则是标准库中已有定义,不需要额外定义。
协程的例子
C++20 的协程功能很强大很灵活,提供了很多可以定制的地方,但从另一个角度来看,也使得 C++20 的使用过于复杂,而且选择太多,同时没有推荐的实现方式,也容易写出各种古怪的代码。
除了实现单个协程本身的功能外,还需要实现多个协程之间的调度,这个调度还需要和协程本身配合实现,这样进一步增加了协程的复杂度,因此一个简单的协程程序,也需要很多代码来实现。
这里的例子不使用协程的返回值以及promise类型,只使用协程的参数,通过awaitable接口来进行协程的调度。
#include <iostream>
#include <chrono>
#include <coroutine>
#include <unistd.h>
using std::cout, std::endl;
// <1> 用来计算当前时间和指定开始时间的时间差,浮点数,单位秒
double time_diff( std::chrono::time_point< std::chrono::high_resolution_clock > * start )
{
return static_cast< std::chrono::duration<double> >( std::chrono::high_resolution_clock::now() - *start ).count();
}
// <2> 协程参数,包括求和相关信息,以及起始时间,以及协程句柄(进行调度使用),
// 同时也是 awaitable 接口对象,可以用于 co_await 调用
struct add_args
{
int * p1;
int * p2;
int * ret;
std::chrono::time_point< std::chrono::high_resolution_clock > * start;
std::coroutine_handle<> hc;
// <3> 总是要求挂起
bool await_ready() { return false; }
// <4> 挂起时,保存协程句柄,供后续调度使用
void await_suspend( std::coroutine_handle<> h ) { hc = h; return; }
// <5> 协程恢复运行时,清理协程句柄,避免后续再次调度
void await_resume() { hc = nullptr; return; }
};
// <6> 协程返回参数,主要是定义 promise_type ,虽然业务逻辑不需要用到,但协程框架需要
struct add_coro
{
struct promise_type
{
struct add_coro get_return_object() { return {}; }
std::suspend_never initial_suspend() { return {}; }
std::suspend_never final_suspend() { return {}; }
void return_void() {}
void unhandled_exception() {}
};
};
// <7> 协程函数运行体,里面有 co_await 和 co_return 表明这是协程,因此不能有 return 语句
struct add_coro th_add( struct add_args * args )
{
// <8> co_await 需要直接访问变量,如果写成 co_await *args; 会产生临时变量
struct add_args & args_ref = *args;
while( *args->p1 == 0 || *args->p2 == 0 ) {
co_await args_ref;
}
*args->ret = *args->p1 + *args->p2;
cout << time_diff( args->start ) << " sum is " << *args->ret << endl;
co_return;
}
// <9> 简单的协程调度程序,全部协程如果处于挂起的,都重新调度一下。
void schedule_add( int n, struct add_args * aa )
{
for( int i = 0; i < n; ++i ) {
if( aa[i].hc ) {
aa[i].hc.resume();
}
}
return;
}
int main( int argc, char * argv[] )
{
auto start = std::chrono::high_resolution_clock::now();
int sum[7] = { 0 };
struct add_args aa[] = {
{ sum + 0, sum + 1, sum + 4, &start },
{ sum + 2, sum + 3, sum + 5, &start },
{ sum + 4, sum + 5, sum + 6, &start } };
// <10> 启动协程,此时被加数还未准备好,协程都在挂起
th_add( &aa[0] );
th_add( &aa[1] );
th_add( &aa[2] );
// <11> 准备被加数,尝试调度协程工作
sum[0] = 1, sum[1] = 2; sum[2] = 3;
cout << time_diff( &start ) << " broadcast (1) " << endl;
schedule_add( 3, aa );
sum[3] = 4;
cout << time_diff( &start ) << " broadcast (2) " << endl;
schedule_add( 3, aa );
// <6> 等待协程结束,获取协程执行结果,
while( sum[6] == 0 ) {
schedule_add( 3, aa );
sleep( 1 );
}
cout << time_diff( &start ) << " final " << sum[4] << " " << sum[5] << " " << sum[6] << endl;
return 0;
}
编译和执行结果为:
[smlc@test code]$ g++ -std=c++20 a15-3.cpp -fcoroutines -lpthread
[smlc@test code]$ ./a.out
1.771e-06 broadcast (1)
0.000179137 sum is 3
0.000194825 broadcast (2)
0.000198804 sum is 7
0.000202648 sum is 10
0.000206225 final 3 7 10
从时间上也可以看到,由于不需要进入内核态,不需要创建底层操作系统线程,所以执行花费的时间少很多。
【往期回顾】
相关推荐
- sharding-jdbc实现`分库分表`与`读写分离`
-
一、前言本文将基于以下环境整合...
- 三分钟了解mysql中主键、外键、非空、唯一、默认约束是什么
-
在数据库中,数据表是数据库中最重要、最基本的操作对象,是数据存储的基本单位。数据表被定义为列的集合,数据在表中是按照行和列的格式来存储的。每一行代表一条唯一的记录,每一列代表记录中的一个域。...
- MySQL8行级锁_mysql如何加行级锁
-
MySQL8行级锁版本:8.0.34基本概念...
- mysql使用小技巧_mysql使用入门
-
1、MySQL中有许多很实用的函数,好好利用它们可以省去很多时间:group_concat()将取到的值用逗号连接,可以这么用:selectgroup_concat(distinctid)fr...
- MySQL/MariaDB中如何支持全部的Unicode?
-
永远不要在MySQL中使用utf8,并且始终使用utf8mb4。utf8mb4介绍MySQL/MariaDB中,utf8字符集并不是对Unicode的真正实现,即不是真正的UTF-8编码,因...
- 聊聊 MySQL Server 可执行注释,你懂了吗?
-
前言MySQLServer当前支持如下3种注释风格:...
- MySQL系列-源码编译安装(v5.7.34)
-
一、系统环境要求...
- MySQL的锁就锁住我啦!与腾讯大佬的技术交谈,是我小看它了
-
对酒当歌,人生几何!朝朝暮暮,唯有己脱。苦苦寻觅找工作之间,殊不知今日之事乃我心之痛,难道是我不配拥有工作嘛。自面试后他所谓的等待都过去一段时日,可惜在下京东上的小金库都要见低啦。每每想到不由心中一...
- MySQL字符问题_mysql中字符串的位置
-
中文写入乱码问题:我输入的中文编码是urf8的,建的库是urf8的,但是插入mysql总是乱码,一堆"???????????????????????"我用的是ibatis,终于找到原因了,我是这么解决...
- 深圳尚学堂:mysql基本sql语句大全(三)
-
数据开发-经典1.按姓氏笔画排序:Select*FromTableNameOrderByCustomerNameCollateChinese_PRC_Stroke_ci_as//从少...
- MySQL进行行级锁的?一会next-key锁,一会间隙锁,一会记录锁?
-
大家好,是不是很多人都对MySQL加行级锁的规则搞的迷迷糊糊,一会是next-key锁,一会是间隙锁,一会又是记录锁。坦白说,确实还挺复杂的,但是好在我找点了点规律,也知道如何如何用命令分析加...
- 一文讲清怎么利用Python Django实现Excel数据表的导入导出功能
-
摘要:Python作为一门简单易学且功能强大的编程语言,广受程序员、数据分析师和AI工程师的青睐。本文系统讲解了如何使用Python的Django框架结合openpyxl库实现Excel...
- 用DataX实现两个MySQL实例间的数据同步
-
DataXDataX使用Java实现。如果可以实现数据库实例之间准实时的...
- MySQL数据库知识_mysql数据库基础知识
-
MySQL是一种关系型数据库管理系统;那废话不多说,直接上自己以前学习整理文档:查看数据库命令:(1).查看存储过程状态:showprocedurestatus;(2).显示系统变量:show...
- 如何为MySQL中的JSON字段设置索引
-
背景MySQL在2015年中发布的5.7.8版本中首次引入了JSON数据类型。自此,它成了一种逃离严格列定义的方式,可以存储各种形状和大小的JSON文档,例如审计日志、配置信息、第三方数据包、用户自定...
你 发表评论:
欢迎- 一周热门
-
-
MySQL中这14个小玩意,让人眼前一亮!
-
旗舰机新标杆 OPPO Find X2系列正式发布 售价5499元起
-
【VueTorrent】一款吊炸天的qBittorrent主题,人人都可用
-
面试官:使用int类型做加减操作,是线程安全吗
-
C++编程知识:ToString()字符串转换你用正确了吗?
-
【Spring Boot】WebSocket 的 6 种集成方式
-
PyTorch 深度学习实战(26):多目标强化学习Multi-Objective RL
-
pytorch中的 scatter_()函数使用和详解
-
与 Java 17 相比,Java 21 究竟有多快?
-
基于TensorRT_LLM的大模型推理加速与OpenAI兼容服务优化
-
- 最近发表
- 标签列表
-
- idea eval reset (50)
- vue dispatch (70)
- update canceled (42)
- order by asc (53)
- spring gateway (67)
- 简单代码编程 贪吃蛇 (40)
- transforms.resize (33)
- redisson trylock (35)
- 卸载node (35)
- np.reshape (33)
- torch.arange (34)
- npm 源 (35)
- vue3 deep (35)
- win10 ssh (35)
- vue foreach (34)
- idea设置编码为utf8 (35)
- vue 数组添加元素 (34)
- std find (34)
- tablefield注解用途 (35)
- python str转json (34)
- java websocket客户端 (34)
- tensor.view (34)
- java jackson (34)
- vmware17pro最新密钥 (34)
- mysql单表最大数据量 (35)