生产者消费者模型和线程同步问题

线程同步概念

互斥能保证安全,但是仅有安全不够,同步可以更高效的使用资源

生产者消费者模型

下面就基于生产者消费者来深入线程同步等概念:
在这里插入图片描述在这里插入图片描述如何理解生产消费者模型:
以函数调用为例:
在这里插入图片描述
在这里插入图片描述

两个线程之间要进行信息交互就需要引入一段内存空间(交易场所)
线程a将数据放入缓冲区(交易场所),线程b从缓冲区进行读取
这样线程a完成数据存放后就继续做自己的事情,线程b去读取数据
这样就能很好的实现多执行流之间的执行解耦
特点:很好的提高了处理数据的能力
支持忙闲不均

条件变量

条件变量:为了不让消费者的每次消费为无效消费.
所以对于生产者,在每次完成自己的任务之后对条件做出改变,当条件的变量达到一定条件后,消费者才进行有效消费
无效消费过程: 消费前(加锁)----尝试消费(无效消费)—消费结束(解锁)

条件变量的目的:
1.不做无效的锁申请
2.假设消费者很多,让他们有执行顺序
相当于条件变量给各个线程在调度他之前给一个提醒

条件变量本质是数据:可以理解为:

在这里插入图片描述

使用条件变量

认识接口
在这里插入图片描述
与互斥锁的创建和使用非常相似

pthread_cond_destroy();//创建布局条件变量要后要进行销毁
pthread_cond_init();//对局部的条件变量进行初始化
pthread_cond_t;//关键字 创建布局变量
全局就要提供PTHREAD_COND_INITIALIZER的宏来进行初始化

条件变量创建的前提是有线程安全,所以条件变量的接口和互斥锁的接口大致类似.

条件创建了还要有一个接口来等待条件成立:

在这里插入图片描述

pthread_cond_wait();//等待条件成立,参数为条件变量和互斥锁
上述所有的参数返回值都是在成功时返回0
失败返回错误原因

唤醒条件变量

在这里插入图片描述

pthread_cond_signal();//唤醒指定的条件变量,并唤醒一个线程
pthread_cond_broadcast();//是条件变量成立,并唤醒所有的线程

在没有条件变量的时候,打印信息如图:

在这里插入图片描述
可以看到线程的调度是不确定的,我们想让这个线程按照我们想要的顺序(如:一次Thread-1,Thread-2,Thread-3,这样)进行打印,那么就需要用到条件变量.
代码:

#include <iostream>
#include <unistd.h>
#include <string>
#include <pthread.h>

//创建条件变量和互斥锁
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

void *ThreadRoutine(void* args)
{
    std::string name = static_cast<const char *>(args);
    while(true)
    {
        sleep(1);
        pthread_mutex_lock(&mutex);
        pthread_cond_wait(&cond, &mutex);
        std::cout << "I am a new Thread, my name is " << name << std::endl;
        pthread_mutex_unlock(&mutex);
    }
}
int main()
{

    pthread_t t1, t2, t3;
    pthread_create(&t1, nullptr, ThreadRoutine, (void*)"Thread-1");
    pthread_create(&t2, nullptr, ThreadRoutine, (void*)"Thread-2");
    pthread_create(&t3, nullptr, ThreadRoutine, (void*)"Thread-3");


    while(true)
    {
        sleep(1);
        pthread_cond_signal(&cond);
    }
    pthread_join(t1, nullptr);
    pthread_join(t2, nullptr);
    pthread_join(t3, nullptr);



    return 0;
}

我们让代码按照t1, t2, t3的线程顺序来执行子线程的任务
在这里插入图片描述
换一个顺序验证也是如此
如果没有条件变量,这个也是按照顺序打印,不过是批次进行,和CPU时钟机制有关,所有使用条件变量更好

使用pthread_cond_broadcast();//唤醒全部线程
就跟加锁机制一样,不过每次是各个线程只执行一次后就会等待,并不想无锁那样批次打印

在这里插入图片描述

单纯的互斥能保证线程的安全, 但不一定合理或者高效.
pthread_cond_wait();//
在等待的时候,会释放这把锁(等待是在临界区内,释放锁是为了资源高效利用,再次加锁是不允许在有锁的临界区内有无锁的线程存在)

再被唤醒的时候,又会再次加锁
当被唤醒的时候,重新申请也是需要参与锁的竞争的(未解决这个问题, 看下main阻塞队列部分的讲解)

阻塞队列

这个队列只有为空,为满两种状态

为空:消费线程不能再消费
为满:生产线程不能在生产

这个场景也满足上述说明的所需的321原则
(3种关系,生产–生产,消费–消费
2中角色:生产者,消费者
1个环境(这个阻塞队列就是一个临界区))
单生产,单消费:
基于队列实现,阻塞队列的操作(消费者生产者实例):

伪唤醒:
在这里插入图片描述

在这份代码中,将来如果因为productor慢不满足生产, 多个线程在一个阻塞队列中等待,而有一个Push达到(有一个生产刚产出), 假设此时的代码是将全部线程都唤醒,那么除了第一个线程得到条件变量的满足和锁的满足,其他线程会在条件变量下的等待转化为竞争锁等待的情况, 假设此时若第一个线程完成pop且unlock速度快,那么这时后续的线程会在得到锁之后直接对空队列进行Pop操作,这是就会出现错误,这个状态就是伪唤醒(条件不满足,但是线程被唤醒了)

(虽然上述只是假设, 但是cpu的运行速度很快, 我们不防会有这样的情况发生)

所以修改Pop内的if(IsFull)代码和Push内的(IsEmpty)代码,还可以使用之前的锁封装的代码

在这里插入图片描述
此时的消费者生产者代码:

BlockQueue.hpp

#pragma once
#include <iostream>
#include <pthread.h>
#include <queue>
#include "LockGuard.hpp"

int defaultcap = 5; // for test

template <class T>
class BlockQueue
{
public:
    BlockQueue(int cap = defaultcap)
        : _capacity(cap)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_p_cond, nullptr);
        pthread_cond_init(&_c_cond, nullptr);
    }
    bool IsEmpty()
    {
        return _q.size() == 0;//查看队列状态
    }
    bool IsFull()
    {
        return _q.size() == _capacity;//此时为满,
    }
    bool Pop(T *out)
    {
        LockGuard lg(&_mutex);
        while(IsEmpty())
        {
            //为空, 进行等待
            pthread_cond_wait(&_c_cond, &_mutex);
        }
        *out = _q.front();
        _q.pop();
        //可以生产//可增加水准线进行响应的操作
        pthread_cond_signal(&_p_cond);
        //pthread_mutex_unlock(&_mutex);
        return true;
    }
    bool Push(const T &in)
    {
        // 当前变量进行加锁
        LockGuard lg(&_mutex);
        //pthread_mutex_lock(&_mutex);
        while(IsFull())
        {
            // 为满,进行阻塞等待
            pthread_cond_wait(&_p_cond, &_mutex);
        }
        // 进行生产等待
        _q.push(in);
        //可以进行消费
        pthread_cond_signal(&_c_cond);
        //pthread_mutex_unlock(&_mutex);
        return true;
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_p_cond);
        pthread_cond_destroy(&_c_cond);
    }

private:
    std::queue<T> _q;
    int _capacity; // 为空时,不能再消费,为满时,不能再生产,状态是capacity与size进行比较
    pthread_mutex_t _mutex;
    pthread_cond_t _p_cond;
    pthread_cond_t _c_cond;
};

main.cc

#include "BlockQueue.hpp"
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>

void *productor(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        int data = rand() % 10 + 1;//[1,10];
        bq->Push(data);

        std::cout << "consumer data: " << data << std::endl;

        sleep(1);
    }
}
void *consumer(void *args)
{
    BlockQueue<int> *bq = static_cast<BlockQueue<int> *>(args);
    while(true)
    {
        int data = 0;
        bq->Pop(&data);

        std::cout << "product data: " << data << std::endl;
    }
}
int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());
    BlockQueue<int> * bq = new BlockQueue<int>();
    pthread_t c, p;//两个线程
    pthread_create(&p, nullptr, productor, bq);
    pthread_create(&c, nullptr, consumer, bq);


    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    return 0;
}

利用生产者消费者模型实现分派任务的操作()
在此基础上构建一个任务类型:

Task.hpp

#pragma once
#include <iostream>
#include <string>
#include <unistd.h>
enum //设置退出码
{
    ok = 0,
    div_zero,
    mod_zero,
    unknow
};
const std::string opts = "+-*/%)[()]"; //设置随机运算
class Task
{
public:
    Task()//无参构造函数用于生成无参临时对象,如果有参构造是全缺省那么可以不用写这个无参构造函数
    {}
    Task(int x, int y, char op)
    :data_x(x), data_y(y), opt(op), result(0), code(ok)
    {}
    void Run()//任务主题内容
    {
        switch(opt)
        {
            case '+':
                result = data_x + data_y;
                break;
            case '-':
                result = data_x - data_y;
                break;
            case '*':
                result = data_x * data_y;
                break;
            case '/':
            {
                if(data_y == 0)
                {
                    code = div_zero;
                }
                else
                {
                    result = data_x / data_y;
                }
                break;
            }
            case '%':
            {
                if(data_y == 0)
                {
                    code = mod_zero;
                }
                else
                {
                    result = data_x % data_y;
                }
                break;
            }
            default:
                code = unknow;
                break;
        }
    }
    void operator()()
    {
        Run();
    }
    ~Task()
    {}
    //打印任务,用于更清晰的认识
    std::string print_task()
    {
        std::string s;
        s = std::to_string(data_x) + opt + std::to_string(data_y) + "=?\n";
        return s;
    }
    std::string print_result()
    {
        std::string s;
        s = std::to_string(data_x) + opt + std::to_string(data_y) + "=" + std::to_string(result) + "[" + std::to_string(code) + "]" + "\n";
        return s;
    }
private:
    int data_x;
    int data_y;
    char opt;

    int result;
    int code;
};

对上述代码的修改:

#include "BlockQueue.hpp"
#include "Task.hpp"
#include <pthread.h>
#include <time.h>
#include <sys/types.h>
#include <unistd.h>

void *productor(void *args)//生产者
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);//阻塞队列的对象
    while(true)
    {
        int x = rand() % 11;//[0,10];
        int y = rand() % 11;//[0,10];
        char opt = opts[rand() % opts.size()];
        Task t(x, y, opt);
        std::cout << t.print_task() << std::endl;;
        bq->Push(t);//放入队列,队列size+1

        //usleep(1000);
    }
}
void *consumer(void *args)
{
    BlockQueue<Task> *bq = static_cast<BlockQueue<Task> *>(args);
    while(true)
    {
        sleep(1);
        Task t;
        //1.拿到消费数据
        bq->Pop(&t);

        //2.执行任务
        t();
        
        //3.打印任务信息
        std::cout << t.print_result() << std::endl;;
    }
}
int main()
{
    srand((uint64_t)time(nullptr) ^ getpid() ^ pthread_self());//伪随机种子
    BlockQueue<Task> * bq = new BlockQueue<Task>();//创建一个阻塞队列
    pthread_t c, p;//两个线程
    pthread_create(&p, nullptr, productor, bq);//两个线程模拟消费者生产者模型
    pthread_create(&c, nullptr, consumer, bq);


    pthread_join(p, nullptr);
    pthread_join(c, nullptr);

    return 0;
}

针对上述代码,生产者和消费者本身就是互斥的,也就是串行执行,怎么会高效呢?

探究这个问题,首先从消费者消费后去哪里?生产在生产之前从哪来?来考虑.
生产者的数据,在产生时是花费时间,消费者消费也要花时间.
在消费者处理数据时花时间的同时生产者在某个时刻刚好将数据传给临界区,生产者只需要保证自己完成传送就可以做其他自己的事,消费者自己继续处理数据
所以高效,并发不体现在同步互斥,而是在拿数据,处理数据这里.

多线程任务下的消费者生产者模型多对多:
在这里插入图片描述将bq和线程名字一起封装可以更好的观察:
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本篇结束,下篇更精彩,关注我,带你飞~~~~

相关推荐

最近更新

  1. 【编程语言】C++和C的异同点

    2024-07-10 08:44:07       0 阅读
  2. 【React Hooks原理 - useSyncExternalStore】

    2024-07-10 08:44:07       0 阅读
  3. Ubuntu22.04:安装并配置nfs

    2024-07-10 08:44:07       0 阅读
  4. udp和tcp区别

    2024-07-10 08:44:07       0 阅读
  5. Leetcode 383. 赎金信

    2024-07-10 08:44:07       0 阅读
  6. 接口加密方案

    2024-07-10 08:44:07       0 阅读

热门阅读

  1. 使用YOLO5进行模型训练机器学习【教程篇】

    2024-07-10 08:44:07       10 阅读
  2. IT专业入门,高考假期预习指南

    2024-07-10 08:44:07       6 阅读
  3. 强化OT安全英国发布工控网络事件响应实践指南

    2024-07-10 08:44:07       13 阅读
  4. 使用静态图加速

    2024-07-10 08:44:07       3 阅读
  5. 修改ES索引名称

    2024-07-10 08:44:07       5 阅读
  6. asp.netWebForm(.netFramework) CSRF漏洞

    2024-07-10 08:44:07       8 阅读
  7. Redis的使用(三)常见使用场景-session共享

    2024-07-10 08:44:07       6 阅读
  8. DS200CVMAG1AEB处理器 控制器 模块

    2024-07-10 08:44:07       6 阅读
  9. 插8张显卡的服务器有哪些?

    2024-07-10 08:44:07       4 阅读
  10. react antd table拖拽

    2024-07-10 08:44:07       7 阅读
  11. VB 关键字

    2024-07-10 08:44:07       8 阅读
  12. 前端面试题(13)答案版

    2024-07-10 08:44:07       5 阅读
  13. 智能警卫:Conda包依赖的自动监控之道

    2024-07-10 08:44:07       6 阅读