多线程编程
![image-20230315184531017](/2023/03/16/%E5%A4%9A%E7%BA%BF%E7%A8%8B%E7%BC%96%E7%A8%8B/C:%5CUsers%5CHenry%5CAppData%5CRoaming%5CTypora%5Ctypora-user-images%5Cimage-20230315184531017.png)
long t0 = time(NULL);
获取从1970年1月1日到当前经过的秒数。
sleep(3);
休眠3秒
long t1 = t0 + 3;
t0时间的3秒后
usleep(3000000);
休眠3000000微秒
1 2 3 4 5 6 7 8 9 10 11
| #include<iostream> #include<chrono>
signed main(){ auto t0 = std::chrono::steady_clock::now(); auto t1 = t0 + std::chrono::seconds(30); auto dt = t1 - t0; std::int64_t sec = std::chrono::duration_cast<std::chrono::seconds>(dt).count(); std :: cout << "time sep=" << sec << "ms\n"; return 0; }
|
time sep=30ms
- 跨平台的sleep:
std::this_thread::sleep_for(std::chrono::milliseconds(400));
- 当前线程休眠400ms
多线程
1 2 3 4 5 6 7 8
| #include<thread> main(){ std::thread t1([&]{ download("hello.zip"); }); interact(); return 0; }
|
- 由于
std::thread
的实现背后是基于pthreads
的,而且CMake提供了Threads包:
1 2 3 4 5 6 7 8 9 10
| cmake_minimum_required(VERSION 3.10)
set(CMAKE_CXX_STANDARD 17)
project(cpptest LANGUAGES CXX)
add_executable(cpptest main.cpp)
find_package(Threads REQUIRED) target_link_libraries(cpptest PUBLIC Threads::Threads)
|
- 主线程等待子线程结束
t1.join()
:会等待t1进程结束
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| #include<iostream> #include<chrono> #include<thread> #include<string>
void download(std::string file){ for(int i=0;i<10;++i){ std::cout << "Downloading " << file << " (" << i*10 << "%)..." << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(300)); } std::cout<<"Download complete: " << file << std::endl; }
void interact(){ std::string name; std::cout << "Please enter your name: " << std::endl; std::cin >> name; std::cout << "Hi, " << name << std::endl; }
signed main(){ std::thread t1([&]{ download("hello.zip"); }); interact(); std::cout<<"Waiting for child thread ... " << std::endl; t1.join(); std::cout << "Child thread exited!" << std::endl; return 0; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Please enter your name: Downloading hello.zip ( 0%)... Downloading hello.zip (10%)... wyDownloading hello.zip (20%)... h Hi, wyh Waiting for child thread ... Downloading hello.zip (30%)... Downloading hello.zip (40%)... Downloading hello.zip (50%)... Downloading hello.zip (60%)... Downloading hello.zip (70%)... Downloading hello.zip (80%)... Downloading hello.zip (90%)... Download complete: hello.zip Child thread exited!
|
异步
std::async
接受一个带返回值的lambda函数
1 2 3 4 5
| main(){ std::function<int> fret = std::sync([&] { return download("hello.zip"); }); }
|
自身返回一个std::future
对象
- lambda的函数体在另一个线程里执行
- 最后调用
furt.get()
方法。如果此时线程没有运行完,会等待线程运行完后获取其返回值。
显示的等待:futr.wait();
std::wait_for(std::chrono::milliseconds(1000));
std::lock_guard
:符合RAII思想的上锁和解锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| #include<iostream> #include<thread> #include<mutex> #include<vector> #include<string>
signed main(){ std::vector<int> arr; std::mutex mtx; std::thread t1 ([&]{ for(int i=0;i<1000;++i){ std::lock_guard grd(mtx); arr.push_back(i); } }); std::thread t2([&]{ for(int i=0;i<1000;++i){ std::lock_guard grd(mtx); arr.push_back(i); } }); t1.join(); t2.join(); return 0; }
|
一个自由度更高的锁sdt::unique_lock()
接受一个参数std::defer_lock
,指定了这个参数之后不会在构造函数中调用mtx.lock()
。需要之后自己手动进行grd.lock()
返回true
表示上锁成功,否则上锁失败
mtx.try_lock_for(std::chrono::milliseconds());
尝试在一个时间内等待是否上锁成功
-
在之前已经上过锁的情况下:mtx.lock()
可以使用如下参数:std::unique_lock grd(mtx, std::adopt_lock);
-
std::unique_lock grd(mtx, std::try_to_lock);
-
任何具有lock()
和unlock()
的类都可以作为std::lock_guard
类型的参数,例如std::lock_guard grd2(grd1);
可以这样嵌套
python中的鸭子类型,C++称为concept
死锁解决
AB和BA的上锁会导致死锁问题,C++提供std::lock(mtx1, mtx2)
来避免死锁问题。
std::lock
的RAII版本std::scoped_lock
同一个线程重复调用lock()
也会导致死锁问题。
- 解决方法:
std::recursive_mutex
,会自动进行识别,如果是同一个线程对这个锁进行重复上锁,会计数器+1,解锁后计数器-1.但是会有性能损失。
数据结构的多线程安全
互斥量
vector<int>
不是多线程安全的容器
- 因此需要封装一个多线程安全的
MyVector
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| #include<iostream> #include<thread> #include<mutex> #include<vector> #include<string>
struct MyVector{ std::vector<int> m_arr; mutable std::mutex m_mtx; public: void push_back(int x){ m_mtx.lock(); m_arr.push_back(x); m_mtx.unlock(); } size_t size() const{ m_mtx.lock(); size_t res = m_arr.size(); m_mtx.unlock(); return res; } };
signed main(){ MyVector arr; std::thread t1([&](){ for(int i=0;i<1000;++i){ arr.push_back(i); } }); std::thread t2([&](){ for(int i=0;i<1000;++i){ arr.push_back(i+1000); } }); t1.join(); t2.join(); std::cout<<arr.size() <<std::endl; return 0; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| #include<iostream> #include<thread> #include<mutex> #include<vector> #include<string>
struct MyVector{ std::vector<int> m_arr; std::mutex m_mtx; public: class Accessor{ MyVector &m_that; std::unique_lock<std::mutex> m_guard; public: Accessor(MyVector &that) : m_that(that),m_guard(that.m_mtx) {} void push_back(int x)const{ return (void)(m_that.m_arr.push_back(x)); } size_t size()const{ return m_that.m_arr.size(); } }; Accessor access(){ return {*this}; } };
signed main(){ MyVector arr; std::thread t1([&](){ auto tmp = arr.access(); for(int i=0;i<1000;++i){ tmp.push_back(i); } }); std::thread t2([&](){ auto tmp = arr.access(); for(int i=0;i<1000;++i){ tmp.push_back(i+1000); } }); t1.join(); t2.join(); std::cout<<arr.access().size() <<std::endl; return 0; }
|
条件变量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| #include<iostream> #include<thread> #include<mutex> #include<vector> #include<string> #include<condition_variable>
signed main(){ std::condition_variable cv; std::mutex mtx;
std::thread t1([&]{ std::unique_lock lck(mtx); cv.wait(lck); std::cout<<"t1 is awake"<<std::endl; }); std::this_thread::sleep_for(std::chrono::milliseconds(400)); std::cout<<"notifying"<<std::endl; cv.notify_one(); t1.join(); return 0; }
|
原子操作
避免多线程使用同一个变量导致的错误:
- 上锁:影响效率
std::atomic<T>
,对其的操作+=,-= &= |= *= /=
会被编译器转换为专门的指令。
std::atomic<int> counter
counter.store(0);
赋值的原子操作
counter.fetch_add(1);
+=
的原子操作,而且还能返回旧值
counter.load();
获取值的原子操作;
counter.exchange(x);
读取的同时写入,返回的是旧值
counter.exchange_strong(old, value);
读取原子变量的值和old进行比较,如果不相等将old值写入变量,否则将value值写入变量