1.进程通信简介
- 首先两个进程之间是不可以直接进行"数据"的传递的,因为进程具有独立性。
1.1进程通信的目的:
- 数据传输:一个进程需要将它的数据发送给另一个进程
资源共享:多个进程之间共享同样的资源。
通知事件:一个进程需要向另一个或一组进程发送消息,通知它(它们)发生了某种事件(如进程终止时要通知父进程)。
进程控制:有些进程希望完全控制另一个进程的执行(如Debug进程),此时控制进程希望能够拦截另一个进程的所有陷入和异常,并能够及时知道它的状态改变。
本质就是需要多个进程协同,完成一些事情。 1.2进程通信是什么
- 进程通信是一个进程把自己的数据交给另一个进程。
1.3进程通信怎么实现
- 首先,一定是需要一块交换数据的空间(内存),而且由于进程的独立性,这块空间不能由交换数据的进程中任何一个提供。
- 进程通信的本质:先让不同的进程看到相同的资源(一般由操作系统提供)。
- 所以根据操作系统提供的"空间"的样式不同,就决定了不同的通信方式。
2.管道
基于文件的,让不同进程看到同一份资源的通信方式,叫做管道。管道只能被设计成单向通信。如果子进程write,父进程进行read,那么在父进程中关闭w端,也就是关闭下标为4的文件描述符,同理关闭子进程3号文件描述符。具体过程如下图所示。所以说struct file是允许多个进程通过指针指向他的。
int pipe(int pipefd[2]);这个函数用于创建管道,创建的这个管道不需要向磁盘中进行刷新,而且是磁盘中并不存在的文件,是一个内存级别的文件,只能用于具有血缘关系的通信,常用于父子进程。
2.1验证代码
//pipetest.c
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
void writer(int wfd)
{
const char *str = "hello father, I am child";
char buffer[128];
int cnt = 0;
pid_t pid = getpid();
while(1)
{
snprintf(buffer, sizeof(buffer), "message: %s, pid: %d, count: %d\n", str, pid, cnt);
write(wfd, buffer, strlen(buffer));
cnt++;
sleep(1);
}
}
void reader(int rfd)
{
char buffer[1024];
while(1)
{
ssize_t n = read(rfd, buffer, sizeof(buffer)-1);
(void)n;
printf("father get a message: %s", buffer);
}
}
int main()
{
// 1.
int pipefd[2];
int n = pipe(pipefd);
if(n < 0) return 1;
printf("pipefd[0]: %d, pipefd[1]: %d\n", pipefd[0]/*read*/, pipefd[1]/*write*/); // 3, 4
// 2.
pid_t id = fork();
if(id == 0)
{
//child: w
close(pipefd[0]);
writer(pipefd[1]);
exit(0);
}
// father: r
close(pipefd[1]);
reader(pipefd[0]);
wait(NULL);
return 0;
}
//Makefile
testpipe:testpipe.c
gcc -o $@ $^
.PHONY:clean
clean:
rm -f testpipe
2.2管道通信的四种情况
- 管道里面没有数据&&子进程的不关闭自己的写端文件fd,读端(父)就要阻塞等待,直到管道里面有数据。
- 管道内部写满数据&&父进程的读端不关闭,写进程写满之后,就要阻塞等待。
- 如果写端不写了&&写端还关闭了pipe,读端会把pipe中的数据读完,最后会读到返回值为0,表示读结束,类似读到文件的结尾
- 读端不读&&关闭,写端在写,OS会直接终止写入的程序,通过(13)SIGPIPE信号杀掉进程
2.3管道的五种特性
- 自带同步机制
- 血缘关系进行通信,常见于父子进程
- pipe是面向字节流的
- 父子进程退出,管道自动关闭,因为文件生命周期是随进程的。
- 管道只能进行单向通信
3.功能代码(进程池)
//processpool.cc
#include <iostream>
#include <string>
#include <cstdlib>
#include <vector>
#include <unistd.h>
#include <ctime>
#include "task.hpp"
using namespace std;
enum
{
UsageError = 1,
ArgError,
PipeError
};
void Usage(const std::string &proc)
{
cout << "Usage: " << proc << " subprocess-num" << endl;
}
class Channel
{
public:
Channel(int wfd, pid_t sub_id, const std::string &name)
: _wfd(wfd), _sub_process_id(sub_id), _name(name)
{
}
void PrintDebug()
{
cout << "_wfd: " << _wfd;
cout << ",_sub_process_id: " << _sub_process_id;
cout << ", _name: " << _name << endl;
}
string name() {return _name;}
int wfd() {return _wfd;}
pid_t pid() { return _sub_process_id; }
~Channel()
{
}
private:
int _wfd;
pid_t _sub_process_id;
string _name;
};
class ProcessPool
{
public:
ProcessPool(int sub_process_num) : _sub_process_num(sub_process_num)
{
}
int CreateProcess(work_t work) // 回调函数
{
for (int number = 0; number < _sub_process_num; number++)
{
int pipefd[2]{0};
int n = pipe(pipefd);
if (n < 0)
return PipeError;
pid_t id = fork();
if (id == 0)
{
// child -> r
close(pipefd[1]);
// 执行任务
dup2(pipefd[0], 0);
work();
exit(0);
}
string cname = "channel-" + to_string(number);
// father
close(pipefd[0]);
channels.push_back(Channel(pipefd[1], id, cname));
}
return 0;
}
int NextChannel()
{
static int next = 0;
int c = next;
next++;
next %= channels.size();
return c;
}
void SendTaskCode(int index, uint32_t code)
{
cout << "send code: " << code << " to " << channels[index].name() << " sub prorcess id: " << channels[index].pid() << endl;
write(channels[index].wfd(), &code, sizeof(code));
}
void Debug()
{
for (auto &channel : channels)
{
channel.PrintDebug();
}
}
~ProcessPool()
{
}
private:
int _sub_process_num;
vector<Channel> channels;
};
// ./processpool 5
int main(int argc, char *argv[])
{
if (argc != 2)
{
Usage(argv[0]);
return UsageError;
}
int sub_process_num = std::stoi(argv[1]);
if (sub_process_num <= 0)
return ArgError;
srand((uint64_t)time(nullptr));
// 1. 创建通信信道和子进程 -- bug
ProcessPool *processpool_ptr = new ProcessPool(sub_process_num);
processpool_ptr->CreateProcess(worker);
// processpool_ptr->Debug();
// 2. 控制子进程
while(true)
{
// a. 选择一个进程和通道
int channel = processpool_ptr->NextChannel();
// cout << channel.name() << endl;
// b. 你要选择一个任务
uint32_t code = NextTask();
// c. 发送任务
processpool_ptr->SendTaskCode(channel, code);
sleep(1);
}
sleep(100);
// 3. 回收子进程
// wait sub process;
delete processpool_ptr;
return 0;
}
//Makefile
processpool:processpool.cc
g++ -o $@ $^ -std=c++11 -g
.PHONY:clean
clean:
rm -f processpool
//task.hpp
#pragma once
#include <iostream>
#include <unistd.h>
using namespace std;
typedef void(*work_t)(); //函数指针类型
typedef void(*task_t)(); //函数指针类型
void PrintLog()
{
cout << "printf log task" << endl;
}
void ReloadConf()
{
cout << "reload conf task" << endl;
}
void ConnectMysql()
{
cout << "connect mysql task" << endl;
}
task_t tasks[3] = {PrintLog, ReloadConf, ConnectMysql};
uint32_t NextTask()
{
return rand() % 3;
}
void worker()
{
// 从0中读取任务即可!
while(true)
{
uint32_t command_code = 0;
ssize_t n = read(0, &command_code, sizeof(command_code));
if(n == sizeof(command_code))
{
if(command_code >= 3) continue;
tasks[command_code]();
}
cout << "I am worker: " << getpid() << endl;
sleep(1);
}
}
uint32_t command_code = 0;
ssize_t n = read(0, &command_code, sizeof(command_code));
if(n == sizeof(command_code))
{
if(command_code >= 3) continue;
tasks[command_code]();
}
cout << "I am worker: " << getpid() << endl;
sleep(1);
}
}