多线程编程

多线程编程

image-20230315184531017

  • C语言处理时间

long t0 = time(NULL); 获取从1970年1月1日到当前经过的秒数。

sleep(3);休眠3秒

long t1 = t0 + 3; t0时间的3秒后

usleep(3000000); 休眠3000000微秒

1
2
3
4
5
6
7
8
9
10
11
#include<iostream>
#include<chrono>

signed main(){
auto t0 = std::chrono::steady_clock::now(); //获取当前时间点
auto t1 = t0 + std::chrono::seconds(30);
auto dt = t1 - t0; //获取两个时间点的时间差
std::int64_t sec = std::chrono::duration_cast<std::chrono::seconds>(dt).count(); //时间差的秒数
std :: cout << "time sep=" << sec << "ms\n";
return 0;
}

time sep=30ms

  • 跨平台的sleep:
    • std::this_thread::sleep_for(std::chrono::milliseconds(400));
    • 当前线程休眠400ms

多线程

  • 现代C++中的多线程:std::thread
1
2
3
4
5
6
7
8
#include<thread>
main(){
std::thread t1([&]{
download("hello.zip");
});
interact();
return 0;
}
  • 由于std::thread的实现背后是基于pthreads的,而且CMake提供了Threads包:
1
2
3
4
5
6
7
8
9
10
cmake_minimum_required(VERSION 3.10)

set(CMAKE_CXX_STANDARD 17)

project(cpptest LANGUAGES CXX)

add_executable(cpptest main.cpp)

find_package(Threads REQUIRED)
target_link_libraries(cpptest PUBLIC Threads::Threads)
  • 主线程等待子线程结束t1.join():会等待t1进程结束
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#include<iostream>
#include<chrono>
#include<thread>
#include<string>

void download(std::string file){
for(int i=0;i<10;++i){
std::cout << "Downloading " << file
<< " (" << i*10 << "%)..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
std::cout<<"Download complete: " << file << std::endl;
}

void interact(){
std::string name;
std::cout << "Please enter your name: " << std::endl;
std::cin >> name;
std::cout << "Hi, " << name << std::endl;
}

signed main(){
std::thread t1([&]{
download("hello.zip");
});
interact();
std::cout<<"Waiting for child thread ... " << std::endl;
t1.join();
std::cout << "Child thread exited!" << std::endl;
return 0;
}
  • 输出
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Please enter your name: Downloading hello.zip (
0%)...
Downloading hello.zip (10%)...
wyDownloading hello.zip (20%)...
h
Hi, wyh
Waiting for child thread ...
Downloading hello.zip (30%)...
Downloading hello.zip (40%)...
Downloading hello.zip (50%)...
Downloading hello.zip (60%)...
Downloading hello.zip (70%)...
Downloading hello.zip (80%)...
Downloading hello.zip (90%)...
Download complete: hello.zip
Child thread exited!
  • std::thread的析构函数会销毁线程

    • 遵循三五法则,std::thread同样遵循RAII思想和三五法则,自定义了析构函数,删除了拷贝构造/赋值函数,保留了移动构造/赋值函数
    • 所以会出现一个函数中某个线程在运行时,函数结束,线程调用其析构函数,会导致正在运行的线程出错
  • thread.detach():分离线程

    • 线程的声明周期不再由当前std::thread管理,而是在线程退出以后自动销毁
    • 但是进程结束后线程还是会自动退出的
  • 另外一种方法:在某个函数创建线程后用std::move()提交到全局变量vector<std::thread>中,然后在进程结束之前每个都join()一遍。

  • 还可以使用单例模式:

    • image-20230316094158213

异步

std::async接受一个带返回值的lambda函数

1
2
3
4
5
main(){
std::function<int> fret = std::sync([&] {
return download("hello.zip");
});
}

自身返回一个std::future对象

  • lambda的函数体在另一个线程里执行
  • 最后调用furt.get()方法。如果此时线程没有运行完,会等待线程运行完后获取其返回值。

显示的等待:futr.wait();

std::wait_for(std::chrono::milliseconds(1000));

  • std::mutex互斥锁

std::lock_guard :符合RAII思想的上锁和解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include<iostream>
#include<thread>
#include<mutex>
#include<vector>
#include<string>

signed main(){
std::vector<int> arr;
std::mutex mtx;
std::thread t1 ([&]{
for(int i=0;i<1000;++i){
std::lock_guard grd(mtx);
arr.push_back(i);
}
});
std::thread t2([&]{
for(int i=0;i<1000;++i){
std::lock_guard grd(mtx);
arr.push_back(i);
}
});
t1.join();
t2.join();
return 0;
}
  • 析构函数会调用grd.unlock();

一个自由度更高的锁sdt::unique_lock()接受一个参数std::defer_lock,指定了这个参数之后不会在构造函数中调用mtx.lock()。需要之后自己手动进行grd.lock()

  • mtx.try_lock()

返回true表示上锁成功,否则上锁失败

  • mtx.try_lock_for(std::chrono::milliseconds());

尝试在一个时间内等待是否上锁成功

  • 在之前已经上过锁的情况下:mtx.lock()可以使用如下参数:std::unique_lock grd(mtx, std::adopt_lock);

  • std::unique_lock grd(mtx, std::try_to_lock);

  • 任何具有lock()unlock()的类都可以作为std::lock_guard类型的参数,例如std::lock_guard grd2(grd1);可以这样嵌套

    python中的鸭子类型,C++称为concept

死锁解决

AB和BA的上锁会导致死锁问题,C++提供std::lock(mtx1, mtx2)来避免死锁问题。

  • std::lock的RAII版本std::scoped_lock

同一个线程重复调用lock()也会导致死锁问题。

  • 解决方法:std::recursive_mutex,会自动进行识别,如果是同一个线程对这个锁进行重复上锁,会计数器+1,解锁后计数器-1.但是会有性能损失。

数据结构的多线程安全

互斥量
  • vector<int>不是多线程安全的容器
  • 因此需要封装一个多线程安全的MyVector
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#include<iostream>
#include<thread>
#include<mutex>
#include<vector>
#include<string>

struct MyVector{
std::vector<int> m_arr;
mutable std::mutex m_mtx;
public:
void push_back(int x){
m_mtx.lock();
m_arr.push_back(x);
m_mtx.unlock();
}
size_t size() const{
m_mtx.lock();
size_t res = m_arr.size();
m_mtx.unlock();
return res;
}
};

signed main(){
MyVector arr;
std::thread t1([&](){
for(int i=0;i<1000;++i){
arr.push_back(i);
}
});
std::thread t2([&](){
for(int i=0;i<1000;++i){
arr.push_back(i+1000);
}
});
t1.join();
t2.join();
std::cout<<arr.size() <<std::endl;
return 0;
}
  • 如上为代理模式(设计模式)

  • 由于size()函数为了与vector保持一致使用了const,但是函数内对锁的内容有修改,因此需要在mtx之前加入mutable

  • std::shared_mutex读写锁

    • 对于读操作,使用mtx.lock_shared()mtx.unlock_shared()
    • 对于写操作,使用mtx.lock()mtx.unlock()
  • 符合RAII的lock_shared()

    • std::unique_lock针对lock()
    • std::shared_lock针对lock_shared()
      • shared_lock()支持参数defer_lockowns_lock()
  • 符合RAII思想的访问者模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include<iostream>
#include<thread>
#include<mutex>
#include<vector>
#include<string>

struct MyVector{
std::vector<int> m_arr;
std::mutex m_mtx;
public:
class Accessor{
MyVector &m_that;
std::unique_lock<std::mutex> m_guard;
public:
Accessor(MyVector &that) :
m_that(that),m_guard(that.m_mtx) {}
void push_back(int x)const{
return (void)(m_that.m_arr.push_back(x));
}
size_t size()const{
return m_that.m_arr.size();
}
};
Accessor access(){
return {*this};
}
};

signed main(){
MyVector arr;
std::thread t1([&](){
auto tmp = arr.access();
for(int i=0;i<1000;++i){
tmp.push_back(i);
}
});
std::thread t2([&](){
auto tmp = arr.access();
for(int i=0;i<1000;++i){
tmp.push_back(i+1000);
}
});
t1.join();
t2.join();
std::cout<<arr.access().size() <<std::endl;
return 0;
}
  • 分离存储类和访问类,通过存储类的访问类进行访问。
条件变量
  • 等待被唤醒
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include<iostream>
#include<thread>
#include<mutex>
#include<vector>
#include<string>
#include<condition_variable>

signed main(){
std::condition_variable cv;
std::mutex mtx;

std::thread t1([&]{
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout<<"t1 is awake"<<std::endl;
});
std::this_thread::sleep_for(std::chrono::milliseconds(400));
std::cout<<"notifying"<<std::endl;
cv.notify_one();
t1.join();
return 0;
}
  • notify_all()唤醒全部在等待的锁

原子操作

避免多线程使用同一个变量导致的错误:

  • 上锁:影响效率
  • std::atomic<T>,对其的操作+=,-= &= |= *= /=会被编译器转换为专门的指令。

std::atomic<int> counter

  • counter.store(0); 赋值的原子操作
  • counter.fetch_add(1); +=的原子操作,而且还能返回旧值
  • counter.load();获取值的原子操作;
  • counter.exchange(x); 读取的同时写入,返回的是旧值
  • counter.exchange_strong(old, value);读取原子变量的值和old进行比较,如果不相等将old值写入变量,否则将value值写入变量