C++11的多线程开发

多线程并发

总览

  • 创建线程
  • 使用互斥量
  • 异步线程
  • 原子类型
  • 生产者消费者模型
  • 线程池

创建线程

#include <iostream>
#include <thread>
#include <mutex>
#include <functional>

void task_func(int &n) {
    std::this_thread::sleep_for(std::chrono::microseconds(n));
    printf("in task_func, thread id: %d\n", std::this_thread::get_id());
    n = 30;
}

int main() {

    int n = 50;
    auto func = std::bind(task_func, std::ref(n));
    //std::thread th(func);
    std::thread th(func, &n);  //也行

    printf("in main, son thread id: %d\n", th.get_id());
    printf("support num: %d\n", th.hardware_concurrency());
    printf("cur n: %d\n", n);
    th.join();
    return 0;
}

互斥量

#include <iostream>
#include <thread>
#include <mutex>
#include <functional>

int globalVal = 0;

std::mutex lock;

void task_func() {
    for (int i = 0; i < 100000; ++i) {
	lock.lock();
        globalVal++;
	lock.unlock();
    }
}

int main() {

    std::thread t1(task_func);
    std::thread t2(task_func);

    t1.join();
    t2.join();
    std::cout << globalVal << std::endl;
    return 0;
}

运行结果符合预期, 这就解决了吗? 问题一: 未能正常解锁. 真实场景下, lock和unlock之间还有大量逻辑, 出现异常后, unlock是不会执行的, 因此可能造成死锁. 该问题可以通过lock_guard/ unique_lock​解决, 利用C++的RAII机制, 中途抛出异常, 会调用析构函数.

#include<iostream>

using namespace std;

class A {
public:
    A() {
        printf("initialize\n");
    }

    ~A() {
        printf("destroy\n");
    }
};

int exception_maker(int) {
    try {
        throw -1;
    }
    catch (int) {
        throw -1;

    }
}

int main(void) {
    A obj;
    try{
        exception_maker(0);
    }catch(int){
        printf("exception occurs\n");
    }

    return 0;
}
-------------------------------------
initialize
exception occurs
destroy

另一种问题: 上锁顺序的不同造成死锁

void task_func1() {
    for (int i = 0; i < 100000; ++i) {
        lock1.lock();
        lock2.lock();
        globalVal++;
        lock2.unlock();
        lock1.unlock();
    }
}

void task_func2() {
    for (int i = 0; i < 100000; ++i) {
        lock2.lock();
        lock1.lock();
        globalVal++;
        lock1.unlock();
        lock2.unlock();
    }
}

第二个问题解决: std::lock(lock1, lock2); 顺序无所谓

条件变量

#include <iostream>
#include <thread>
#include <mutex>
#include <functional>
#include <queue>
#include <condition_variable>

std::mutex mtx;
std::queue<int> queue;
std::condition_variable cv;


void task_producer() {
    int i = 0;
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        queue.push(i);
        std::cout << "put data: " << i << std::endl;
        lock.unlock();
    
        cv.notify_all();
        if (i < 1000000) {
            i++;
        } else {
            i = 0;
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(50));
    }
}

void task_consumer1() {
    int data;
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);

        while(queue.empty()) {
            cv.wait(lock);
        }

        data = queue.front();
        queue.pop();
        std::cout << "Get data: " << data << std::endl;
        lock.unlock();

        std::this_thread::sleep_for(std::chrono::milliseconds (50));
    }
}

void task_consumer2() {
    int data;
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);

        while(queue.empty()) {
            cv.wait(lock);
            // lock.unlock();  cv.wait();
        }

        data = queue.front();
        queue.pop();
        std::cout << "Get data: " << data << std::endl;
        lock.unlock();
        std::this_thread::sleep_for(std::chrono::milliseconds (50));
    }
}

int main() {
    std::thread t1(task_producer);
    std::thread t2(task_consumer1);
    std::thread t3(task_consumer2);

    t1.join();
    t2.join();
    t3.join();
    return 0;
}

promise和future

解决线程不知何时返回问题

先看一个简单的案例: 子线程计算a, b的平方和, 返回给主线程定义的变量ret. 由于主线程和子线程都可能操作ret, 故上锁.

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;

void task(int a, int b, int &ret){
    // 模拟准备工作
    std::this_thread::sleep_for(std::chrono::milliseconds (100));
    std::unique_lock<std::mutex> lock(mtx);
    ret = a * a + b * b;
    lock.unlock();
    cv.notify_one();
    // 模拟之后处理的任务
    std::this_thread::sleep_for(std::chrono::milliseconds (2000));
}

int main() {
    int ret = 0;
    std::cout << "ret before:" << ret << std::endl;
    std::thread t(task,1,2,std::ref(ret));
    std::unique_lock<std::mutex>lock(mtx);
    cv.wait(lock);
    std::cout << "current ret is:" << ret << std::endl;
    t.join();
    return 0;
}

为何以这么麻烦, 又加锁又用条件变量? 因为主线程不知道子线程什么时候会将计算结果给ret, 故需要条件变量通知主线程. 可如此简单的任务, 实现也太麻烦了, 解决方法: promise和future结合

#include <iostream>
#include <thread>
#include <future>

void task(int a, int b, std::promise<int> &p) {
    // 模拟准备工作
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    p.set_value(a * a + b * b);
    // 模拟之后处理的任务
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
}

int main() {
    std::promise<int> p;
    std::future<int> f = p.get_future();  // 联系起来
    std::cout << f.valid() << std::endl;
    std::thread t(task, 1, 2, std::ref(p));
    std::cout << f.get()<<std::endl;   //阻塞,直到set_value被调用,只能get一次
    std::cout << f.valid() << std::endl;
    t.join();
    return 0;
}

解决入参滞后传递, 线程先创建的问题

线程有个参数需要通过计算, 之后才能得到, 但想先创建线程, 在线程里等待参数传递过来, 怎么办? 还是借助promise和future

void task(int a, std::future<int>& b, std::promise<int> &p) {
    // 模拟准备工作
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    if(b.valid()){
        int tmp = b.get();
        p.set_value(a * a + tmp * tmp);
    }
    // 模拟之后处理的任务
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
}

int main() {
    std::promise<int> pout;
    std::future<int> fout = pout.get_future();  // 联系起来

    std::promise<int> pin;
    std::future<int> fin = pin.get_future();  // 联系起来

    std::thread t(task,1,std::ref(fin),std::ref(pout));

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    pin.set_value(3);  // 给入参赋值

    std::cout << "ret value:" << fout.get() << std::endl; // 阻塞等待结果

    t.join();
    return 0;
}

----------------------------
ret value:10

一个滞后入参需传递给多个线程-- shared_future

注意promise的set_value也只能调用一次, 否则抛异常.

#include <iostream>
#include <thread>
#include <future>

void task(int a, std::shared_future<int> &s_f, std::promise<int> &p) {
    // 模拟准备工作
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    int tmp = s_f.get();
    p.set_value(a * a + tmp * tmp);

    // 模拟之后处理的任务
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
}

int main() {
    std::promise<int> pout1;
    std::future<int> fout1 = pout1.get_future();  // 联系起来

    std::promise<int> pout2;
    std::future<int> fout2 = pout2.get_future();  // 联系起来

    std::promise<int> pout3;
    std::future<int> fout3 = pout3.get_future();  // 联系起来

    std::promise<int> pin;
    std::future<int> fin = pin.get_future();
    std::shared_future<int> s_f = fin.share();  // 联系起来

    std::thread t1(task, 1, std::ref(s_f), std::ref(pout1));
    std::thread t2(task, 1, std::ref(s_f), std::ref(pout2));
    std::thread t3(task, 1, std::ref(s_f), std::ref(pout3));


    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    pin.set_value(3);

    std::cout << "ret value:" << fout1.get() + fout2.get() + fout3.get() << std::endl;

    t1.join();
    t2.join();
    t3.join();

    return 0;
}

async创建异步任务

#include <iostream>
#include <thread>
#include <future>

int task(int a, int b) {
    std::cout << std::this_thread::get_id() << std::endl;
    return a * a + b * b;
}

int main() {
    std::cout << std::this_thread::get_id() << std::endl;

    // 必须放在新线程
    // std::future<int> f = std::async(std::launch::async, task, 3, 4);  

    // 延迟调用, 不会在线程中, 调用get时才执行
    //   std::future<int> f = std::async(std::launch::deferred, task, 3, 4);

    // 默认等价于 std::launch::async | std::launch::deferred
    //std::future<int> f = std::async(std::launch::async | std::launch::deferred, task, 3, 4);
    std::future<int> f = std::async(task, 3, 4);
    std::cout << f.get() << std::endl;
    return 0;
}

packaged_task

作用: 任务打包. 主要就是和future联系起来.

#include <iostream>
#include <thread>
#include <future>
#include <functional>

int task(int a, int b) {
    std::cout << "son thread: " << std::this_thread::get_id() << std::endl;
    return a * a + b * b;
}


int main() {
    std::cout << "main thread: " << std::this_thread::get_id() << std::endl;
    // 打包任务
    std::packaged_task<int()> pt(std::bind(task, 3, 4));
    std::future<int> fu = pt.get_future();
    pt();
    std::cout << fu.get() << std::endl;

    // 重置
    pt.reset();
    fu = pt.get_future();
    std::thread t(std::move(pt));
    //  pt();
    std::cout << fu.get() << std::endl;

    t.join();

    std::packaged_task<int()> pt2(std::bind(task, 3, 4));
    fu = pt2.get_future();
    std::async(std::move(pt2));
    std::cout << fu.get() << std::endl;

    return 0;
}


原文链接: https://www.cnblogs.com/shmilyt/p/17054572.html

欢迎关注

微信关注下方公众号,第一时间获取干货硬货;公众号内回复【pdf】免费获取数百本计算机经典书籍

    C++11的多线程开发

原创文章受到原创版权保护。转载请注明出处:https://www.ccppcoding.com/archives/312388

非原创文章文中已经注明原地址,如有侵权,联系删除

关注公众号【高性能架构探索】,第一时间获取最新文章

转载文章受原作者版权保护。转载请注明原作者出处!

(0)
上一篇 2023年2月16日 下午12:22
下一篇 2023年2月16日 下午12:23

相关推荐