[Linux]进程间通信—管道通信

1.进程通信简介

  • 首先两个进程之间是不可以直接进行"数据"的传递的,因为进程具有独立性。
  • 1.1进程通信的目的:
  • 数据传输:一个进程需要将它的数据发送给另一个进程
    资源共享:多个进程之间共享同样的资源。
    通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
    进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
    本质就是需要多个进程协同,完成一些事情。
  • 1.2进程通信是什么
  • 进程通信是一个进程把自己的数据交给另一个进程。
  • 1.3进程通信怎么实现
  • 首先,一定是需要一块交换数据的空间(内存),而且由于进程的独立性,这块空间不能由交换数据的进程中任何一个提供。
  • 进程通信的本质:先让不同的进程看到相同的资源(一般由操作系统提供)。
  • 所以根据操作系统提供的"空间"的样式不同,就决定了不同的通信方式。

2.管道

  • 在这里插入图片描述

  • 基于文件的,让不同进程看到同一份资源的通信方式,叫做管道。管道只能被设计成单向通信。如果子进程write,父进程进行read,那么在父进程中关闭w端,也就是关闭下标为4的文件描述符,同理关闭子进程3号文件描述符。具体过程如下图所示。所以说struct file是允许多个进程通过指针指向他的。

  • 在这里插入图片描述

  • int pipe(int pipefd[2]);这个函数用于创建管道,创建的这个管道不需要向磁盘中进行刷新,而且是磁盘中并不存在的文件,是一个内存级别的文件,只能用于具有血缘关系的通信,常用于父子进程。

2.1验证代码
//pipetest.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

void writer(int wfd)
{
    const char *str = "hello father, I am child";
    char buffer[128];
    int cnt = 0;
    pid_t pid = getpid();
    while(1)
    {
        snprintf(buffer, sizeof(buffer), "message: %s, pid: %d, count: %d\n", str, pid, cnt);
        write(wfd, buffer, strlen(buffer));
        cnt++;
        sleep(1);
    }
}
void reader(int rfd)
{
    char buffer[1024];
    while(1)
    {
        ssize_t n = read(rfd, buffer, sizeof(buffer)-1);
        (void)n;
        printf("father get a message: %s", buffer);
    }
}


int main()
{
    // 1. 
    int pipefd[2];
    int n = pipe(pipefd);
    if(n < 0) return 1;
    printf("pipefd[0]: %d, pipefd[1]: %d\n", pipefd[0]/*read*/, pipefd[1]/*write*/); // 3, 4

    // 2. 
    pid_t id = fork();
    if(id == 0)
    {
        //child: w
        close(pipefd[0]);

        writer(pipefd[1]);

        exit(0);
    }

    // father: r
    close(pipefd[1]);

    reader(pipefd[0]);
    wait(NULL);

    return 0;
}
//Makefile
testpipe:testpipe.c
	gcc -o $@ $^
.PHONY:clean
clean:
	rm -f  testpipe
2.2管道通信的四种情况
  • 管道里面没有数据&&子进程的不关闭自己的写端文件fd,读端(父)就要阻塞等待,直到管道里面有数据。
  • 管道内部写满数据&&父进程的读端不关闭,写进程写满之后,就要阻塞等待。
  • 如果写端不写了&&写端还关闭了pipe,读端会把pipe中的数据读完,最后会读到返回值为0,表示读结束,类似读到文件的结尾
  • 读端不读&&关闭,写端在写,OS会直接终止写入的程序,通过(13)SIGPIPE信号杀掉进程
2.3管道的五种特性
  • 自带同步机制
  • 血缘关系进行通信,常见于父子进程
  • pipe是面向字节流的
  • 父子进程退出,管道自动关闭,因为文件生命周期是随进程的。
  • 管道只能进行单向通信

在这里插入图片描述

3.功能代码(进程池)

//processpool.cc
#include <iostream>
#include <string>
#include <cstdlib>
#include <vector>
#include <unistd.h>
#include <ctime>
#include "task.hpp"

using namespace std;

enum
{
    UsageError = 1,
    ArgError,
    PipeError
};

void Usage(const std::string &proc)
{
    cout << "Usage: " << proc << " subprocess-num" << endl;
}

class Channel
{
public:
    Channel(int wfd, pid_t sub_id, const std::string &name)
        : _wfd(wfd), _sub_process_id(sub_id), _name(name)
    {
    }
    void PrintDebug()
    {
        cout << "_wfd: " << _wfd;
        cout << ",_sub_process_id: " << _sub_process_id;
        cout << ", _name: " << _name << endl;
    }
    string name() {return _name;}
    int wfd() {return _wfd;}
    pid_t pid() { return _sub_process_id; }
    ~Channel()
    {
    }

private:
    int _wfd;
    pid_t _sub_process_id;
    string _name;
};

class ProcessPool
{
public:
    ProcessPool(int sub_process_num) : _sub_process_num(sub_process_num)
    {
    }
    int CreateProcess(work_t work) // 回调函数
    {
        for (int number = 0; number < _sub_process_num; number++)
        {
            int pipefd[2]{0};
            int n = pipe(pipefd);
            if (n < 0)
                return PipeError;

            pid_t id = fork();
            if (id == 0)
            {
                // child -> r
                close(pipefd[1]);
                // 执行任务
                dup2(pipefd[0], 0);
                work();
                exit(0);
            }
            string cname = "channel-" + to_string(number);
            // father
            close(pipefd[0]);
            channels.push_back(Channel(pipefd[1], id, cname));
        }
        return 0;
    }
    int NextChannel()
    {
        static int next = 0;
        int c = next;
        next++;
        next %= channels.size();
        return c;
    }
    void SendTaskCode(int index, uint32_t code)
    {
        cout << "send code: " << code << " to " << channels[index].name() << " sub prorcess id: " <<  channels[index].pid() << endl;
        write(channels[index].wfd(), &code, sizeof(code));
    }
    void Debug()
    {
        for (auto &channel : channels)
        {
            channel.PrintDebug();
        }
    }
    ~ProcessPool()
    {
    }

private:
    int _sub_process_num;
    vector<Channel> channels;
};

// ./processpool 5
int main(int argc, char *argv[])
{
    if (argc != 2)
    {
        Usage(argv[0]);
        return UsageError;
    }
    int sub_process_num = std::stoi(argv[1]);
    if (sub_process_num <= 0)
        return ArgError;

    srand((uint64_t)time(nullptr));

    // 1. 创建通信信道和子进程 -- bug
    ProcessPool *processpool_ptr = new ProcessPool(sub_process_num);
    processpool_ptr->CreateProcess(worker);
    // processpool_ptr->Debug();

    // 2. 控制子进程
    while(true)
    {
        // a. 选择一个进程和通道
        int channel = processpool_ptr->NextChannel();
        // cout << channel.name() << endl;

        // b. 你要选择一个任务
        uint32_t code = NextTask();

        // c. 发送任务
        processpool_ptr->SendTaskCode(channel, code);

        sleep(1);
    }

    sleep(100);

    // 3. 回收子进程
    // wait sub process;

    delete processpool_ptr;
    return 0;
}
//Makefile
processpool:processpool.cc
	g++ -o $@ $^ -std=c++11 -g
.PHONY:clean
clean:
	rm -f processpool
//task.hpp
#pragma once

#include <iostream>
#include <unistd.h>

using namespace std;

typedef void(*work_t)();  //函数指针类型
typedef void(*task_t)();  //函数指针类型

void PrintLog()
{
    cout << "printf log task" << endl;
}

void ReloadConf()
{
    cout << "reload conf task" << endl;
}

void ConnectMysql()
{
    cout << "connect mysql task" << endl;
}

task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};

uint32_t NextTask()
{
    return rand() % 3;
}

void worker()
{
    // 从0中读取任务即可!
    while(true)
    {
        uint32_t command_code = 0;
        ssize_t n = read(0, &command_code, sizeof(command_code));
        if(n == sizeof(command_code))
        {
            if(command_code >= 3) continue;
            tasks[command_code]();
        }
        cout << "I am worker: " << getpid() << endl;
        sleep(1);
    }
}
uint32_t command_code = 0;
        ssize_t n = read(0, &command_code, sizeof(command_code));
        if(n == sizeof(command_code))
        {
            if(command_code >= 3) continue;
            tasks[command_code]();
        }
        cout << "I am worker: " << getpid() << endl;
        sleep(1);
    }
}

相关推荐

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-04 01:14:03       5 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-04 01:14:03       5 阅读
  3. 在Django里面运行非项目文件

    2024-04-04 01:14:03       4 阅读
  4. Python语言-面向对象

    2024-04-04 01:14:03       6 阅读

热门阅读

  1. 零基础入门多媒体音频(6)-alsa(2)

    2024-04-04 01:14:03       21 阅读
  2. Spring Boot Actuator

    2024-04-04 01:14:03       21 阅读
  3. Collection中常用方法

    2024-04-04 01:14:03       21 阅读
  4. redis

    2024-04-04 01:14:03       17 阅读
  5. Yocto理论基础之layer

    2024-04-04 01:14:03       29 阅读
  6. 网络安全专业术语

    2024-04-04 01:14:03       19 阅读