文章说明

本文提及C++和GO语言的并发,并发一向是一门语言编程的难点,对于C++和JAVA尤其是这样。笔者在整理C++的并发相关内容时,发现其极其庞杂,且涉及内容交错引用,于是,迫不及待打算将其单独整理出来,以供后期的复习使用。

基础

并行和并发

并行

一个CPU在同一时刻只能被一个进程使用,但是如果有两个CPU,同时就可以执行两个进程,那么这两个进程在同一时刻被同时执行,那么就称这两个进程是并行的。

并发

一个时间段,有多个进程都处于从开始运行到运行完毕的状态,但是每一个时刻只有一个程序在运行。

同步和异步

同步

两个事物相互依赖,并且一个事物必须以依赖于另一事物的执行结果。比如在事物 A->B 事件模型中,你需要先完成事物 A 才能执行事物 B。也就是说,同步调用在被调用者未处理完请求之前,调用不返回,调用者会一直等待结果的返回。

异步

两个事物完全独立,一个事物的执行不需要等待另外一个事物的执行。也就是说,异步调用可以返回结果不需要等待结果返回,当结果返回的时候通过回调函数或者其他方式带着调用结果再做相关事情。

阻塞和非阻塞

进程、线程和协程

进程

对于操作系统来说,一个任务就是一个进程。比如打开一个浏览器就是启动一个浏览器进程。

  • 进程是操作系统进行资源分配的基本单位,每个进程都有自己的独立内存空间。
  • 进程比较重量,占据独立内存,上下文切换开销大,但是比较稳定和安全。

线程

  • 线程又叫做轻量级进程,是进程的一个实体,是处理器任务调度和执行的基本单位。
  • 线程只拥有少量资源,程序计数器、寄存器、栈。但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。
  • 一个进程可以包含多个线程,一个线程通常只能属于一个进程。

协程

协程,是一种用户态的轻量级线程,协程的调度完全由用户控制(也就是在用户态执行)。

  • 一个线程可以执行多个协程,协程在线程内并发执行。

线程详解

用户线程 & 内核线程

用户线程与内核线程之间可以有一对一、多对一或多对多的映射关系。

线程安全 & 线程不安全

线程安全是指在多线程环境下,共享的数据结构或代码段可以被多个线程安全地访问和操作,而不会导致数据不一致或程序错误的情况。

死锁

其中两个或多个线程或进程相互等待对方所持有的资源,导致程序无法继续执行;死锁的四个必要条件:互斥、持有等待、循环等待、不可强占用。

Example 死锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <iostream>
#include <mutex>
#include <thread>
#include <time.h>

std::mutex mtxa;
std::mutex mtxb;

void threadA(){
std::cout << "Thread A" << std::endl;
mtxa.lock();
std::this_thread::sleep_for(std::chrono::seconds(10));
std::cout << "Thread A acquired lock B" << std::endl;
mtxb.lock();
mtxb.unlock();
mtxa.unlock();
}

void threadB(){
std::cout << "Thread B" << std::endl;
mtxb.lock();
std::this_thread::sleep_for(std::chrono::seconds(10));
std::cout << "Thread B acquired lock A" << std::endl;
mtxa.lock();
mtxa.unlock();
mtxb.unlock();
}

int main() {
std::thread t1(threadA);
std::thread t2(threadB);
t1.join();
t2.join();
return 0;
}

Example2 生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <iostream>
#include <mutex>
#include <thread>
#include <condition_variable>

const int repository_size = 10;
const int item_total = 20;

std::mutex mtx;
int item_buffer[repository_size];

int read_position = 0;
int write_position = 0;

std::condition_variable repo_not_full;
std::condition_variable repo_not_empty;

void produce_item(int item) {
std::unique_lock<std::mutex> lck(mtx);
while(((write_position + 1) % repository_size) == read_position) {
std::cout << "Producer is waiting for an empty slot...\n";
repo_not_full.wait(lck);
}
item_buffer[write_position] = item;
write_position = (write_position + 1) % repository_size;
repo_not_empty.notify_all();
lck.unlock(); //可以省略 lck.unlock(),代码仍然能够正确工作,并且互斥锁将在 std::unique_lock 对象的析构函数中被解锁
}

int consume_item() {
std::unique_lock<std::mutex> lck(mtx);
while(write_position == read_position) {
std::cout << "Consumer is waiting for items...\n";
repo_not_empty.wait(lck);
}
int item = item_buffer[read_position];
read_position = (read_position + 1) % repository_size;
repo_not_full.notify_all();
lck.unlock();
return item;
}

void Producer_thread() {
for(int i = 1; i <= item_total; i++) {
std::cout << "Producer thread " << std::this_thread::get_id() << " is producing the " << i << "^th item..." << std::endl;
produce_item(i);
}
}

void Consumer_thread() {
int cnt = 0;
while(1) {
int item = consume_item();
std::cout << "Consumer thread " << std::this_thread::get_id() << " is consuming the " << item << "^th item..." << std::endl;
if(++cnt == item_total) break;
}
}

int main() {
std::thread producer(Producer_thread);
std::thread consumer(Consumer_thread);
producer.join();
consumer.join();
}

协程详解

协程就是用户态的线程,但是上下文切换的时机是靠调用方(程序员)自身去控制的。

为什么要使用协程 & 线程的开销问题

线程切换开销

  • 线程有自己的寄存器和栈,切换时需要保存和恢复这些信息。
  • 线程的切换需要内核态的帮助,而内核态中的一些数据是共享的,读写时需要同步机制,所以操作一旦陷入内核态就会消耗更多的时间。

线程内存开销

线程需要栈和寄存器,能够创建的数量存在限制,据说在2000左右。

协程优势 & 协程对于CPU和IO的影响

  • 协程能够剔除线程的阻塞,线程的一个协程阻塞后,可以切换其他协程继续执行,从而实现协程在线程中的并发执行。
  • 协程虽然不一定能减少一次业务请求的耗时,但一定可以提升系统的吞吐量。

有栈协程 & 无栈协程

有栈协程

这类协程的实现类似于内核态线程的实现,不同协程间切换还是要切换对应的栈上下文,只是不用陷入内核而已。例如goroutine。

无栈协程

无栈协程的上下文都会放到公共内存中,在协程切换时使用状态机来切换,而不用切换对应的上下文,因此相比有栈协程要轻量许多。例如Coroutinue。

对称协程和非对称协程

对称协程 Symmetric Coroutinue

任何一个协程都是相互独立且平等的,调度权可以在任意协程之间转移。

非对称协程 Asymmetric Coroutine

协程出让调度权的目标只能是它的调用者,即协程之间存在调用和被调用关系

Go

注:笔者对Go的了解有限。
个人感觉Go并发的编程难度远低于C++。提到Go的并发,第一时间就会想到goroutinue。

goroutine 并发

Go 语言支持并发,只需要通过 go 关键字来开启 goroutine 即可。

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package main

import (
"fmt"
"time"
)

func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}

func main() {
go say("world")
say("hello")
}
// 这段代码一般是交错输出5个Hello和5个World,但是偶尔也会输出5+4。
// 原因是main执行结束后立即退出,协程没有足够的时间执行完毕。

channel 同步

Goroutinue可以在没有显式的锁或竞态变量的情况下进行同步。

Example

1
2
3
4
ch := make(chan int) // 创建channel
ch := make(chan int, 100) // 创建channel并设置通道缓冲区
ch <- v // 把 v 发送到通道 ch
v := <-ch // 从 ch 接收数据

channel的底层结构

hchan结构体,包含缓冲链表、互斥锁、发送和接收的协程队列等。

select关键字

按照随机顺序检测scase中的channel是否ready。

C++并发

C++多进程并发

fork()

创建子进程

Example1

1
2
3
4
5
6
7
8
9
int main() {
pid_t pid = fork(); // 调用fork后,子进程和父进程都从fork后执行
if(pid == 0) { // 在子进程pid = 0
cout << getpid() << " " << getppid() << endl; // 126905 126904
} else { //在父进程中,pid是新创建子进程的进程ID
cout << getpid() << endl; //126904
}
return 0;
}

Example2

循环创建n个子进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int main() {
int n = 5; // 要创建的子进程数量
for (int i = 0; i < n; i++) {
pid_t pid = fork();
if (pid == -1) {
std::cerr << "无法创建子进程" << std::endl;
return 1;
} else if (pid == 0) {
// 子进程代码
std::cout << "这是子进程,进程ID为:" << getpid() << std::endl;
return 0; //此处不退出,子进程也会创建子进程。
} else {
// 父进程代码
std::cout << "创建子进程,进程ID为:" << pid << std::endl;
wait(NULL); // 等待子进程结束
}
}
return 0;
}

C++多线程并发

thread

用于创建一个执行的线程实例。

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void hello() {
std::cout << "hello world" << std::endl;
}

int main()
{
std::thread t1(hello);
// join函数等待子线程运行结束
// t1.join();
// detach将线程分离,使其称为独立的线程,也无法获取执行的结果。
t1.detach();
// 如果在C++中创建了一个线程对象,但没有调用join或detach函数,而是让线程对象析构,会导致程序崩溃和内存泄露。
// 程序崩溃:如果主线程退出,而子线程仍在运行,主线程析构时会销毁线程对象,而线程对象的析构函数会检查线程是否已经执行完毕。如果线程仍在运行,析构函数将调用std::terminate函数终止程序。
// 内存泄露:如果主线程退出,但子线程仍在运行,那么线程对象和线程相关的资源将无法被正确释放,从而导致内存泄漏。
std::cout << "Main" << std::endl;
return 0;
}

mutex

互斥锁

condition_variable

condition_variable实例被创建出现主要就是用于唤醒等待线程从而避免死锁。
std::condition_variable的notify_one()用于唤醒一个线程;notify_all()则是通知所有线程。

lock_guard & unique_lock

超出作用域自动释放

lock_guard

不能显式的调用lock和unlock

unique_lock

可以在声明后的任意位置调用,可以缩小锁的作用范围,提供更高的并发度。

join

std::thread::join创建同步任务,主线程等待子线程执行结束。

future

提供一个访问异步结果的途径

async & detach

std::async 和 std::thread::detach 都可以用于创建异步任务。

asyc

  • std::async 返回一个 std::future 对象,用于获取异步任务的结果;
  • std::async 的任务调度是由实现决定的,可以在后台线程池中执行任务,也可以在当前线程中执行任务;
  • std::async在某些情况下是阻塞的,调用get函数时会等待异步任务的完成,并返回结果。

detach

  • std::thread::detach 将线程与当前线程分离,使它们可以并行执行,并且无法获取线程的执行结果;
  • std::thread::detach 创建的线程会在后台独立执行;
  • std::thread::detach 是一个非阻塞调用,它将线程与当前线程分离后,继续执行后续的代码,不会等待线程的完成。

线程池

当进行并行的任务作业操作时,线程的建立与销毁的开销是,阻碍性能进步的关键,因此线程池,由此产生。使用多个线程,无限制循环等待队列,进行计算和操作。帮助快速降低和减少性能损耗。
实现过程中:让每一个thread创建后,就去执行调度函数:循环获取task,然后执行。当线程池停止使用时,循环停止。

线程池的组成

  • 线程池管理器:初始化创建线程、启动和停止线程、调配任务,管理线程池
  • 工作线程
  • 任务接口
  • 任务队列

C++多协程并发

C++协程包含C++20 coroutinue实现和一些其他的共享库,本文重点在于coroutinue的相关内容学习。当然,如果目前使用C++协程,最好使用C++开源协程库,例如libgo等。GCC等编译器目前可能还不支持C++20。

coroutine

C++20的协程是一个特殊函数,具有挂起和恢复的能力。

co_await

co_await触发一个挂起点,在触发挂起点之后,执行流程会返回到调用者继续执行,同时异步执行co_await所等待对象。

co_yield

用于在协程中产生一个值,并将执行流程暂停。

co_return

设置返回值的结果,触发挂起点的恢复。

libgo

Example

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <stdio.h>
#include <libgo/coroutine.h>

int main(int argc, char **argv)
{
go []{
printf("1\n");
co_yield;
printf("2\n");
};

go []{
printf("3\n");
co_yield;
printf("4\n");
};

return 0;
}

参考