X

ZMQ的异步和进程间通信

Zeromq : 异步和多线程

在上一小节中,任务分发的时候,Worker一边连接着任务分发的服务器,一边连接着结果收集的服务器(在不同的端口),这种需要多个Socket的情况下,如果仅仅是按照队列的方式来处理,在上一个模型还比较容易理解,必须得到任务以后,才有计算结果,但是如果是并行的毫无交集的多个Socket呢? 观察下面两段代码:

<?php
/*
* Reading from multiple sockets
* This version uses a simple recv loop
* @author Ian Barber <ian(dot)barber(at)gmail(dot)com>
*/

// Prepare our context and sockets
$context = new ZMQContext();

// Connect to task ventilator
$receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL);
$receiver->connect("tcp://localhost:5557");

// Connect to weather server
$subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB);
$subscriber->connect("tcp://localhost:5556");
$subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "10001");

// Process messages from both sockets
// We prioritize traffic from the task ventilator
while(true) {
// Process any waiting tasks
try {
for($rc = 0; !$rc;) {
if($rc = $receiver->recv(ZMQ::MODE_NOBLOCK)) {
// process task
}
}
} catch (ZMQSocketException $e) {
// do nothing //
}

try {
// Process any waiting weather updates
for($rc = 0; !$rc;) {
if($rc = $subscriber->recv(ZMQ::MODE_NOBLOCK)) {
// process weather update
}
}
} catch (ZMQSocketException $e) {
// do nothing
}

// No activity, so sleep for 1 msec//
usleep(1);
}


 

 

在第二段代码中,有一处改进是,当多个socket的内容准备好的时候,会自动调用处理程序,类似于异步中的回调函数,这个便是ZMQ的poll。

下面我们再来看看ZMQ的多线程,以及线程间的通信问题:

we don't need mutexes, locks, or any other form of inter-thread communication except messages sent across ØMQ sockets.

 

具体的模型是这样的,代码如下:

http://zguide.zeromq.org/php:mtserver

在代码中,可以看到流程,首先是启动5个php进程,每个进程的工作是打印收到的信息,并响应world,5555端口用来接收客户端的请求,而workers.ipc交换文件,则用于主进程向worker子php进程间的通信。程序的模型如下 :

 

子进程执行Step2函数,而在Step2函数中做的是,将Step产生的数据,通过ipc://step2.ipc,将step1的进程的数据,step2进程中,然后再通过ipc://step3.ipc,由进程2传到进程3,最后由进程3进行输出。输出的结果实际上由进程step1产生。通过这个示例,让我们很清楚的了解,zmq通过交换文件进行进程间的通信。

了解了这些以后,接下来,我们来处理上面留下的问题 : 如何防止后来加入的订阅者丢失数据?这个就是zmq的节点间的协作,有一个方法是,预先知道订阅者的数量,然后等待所有的订阅者都加入以后,开始分发数据。

如何实现 ? 在订阅者和发布者通过pub模式连接的时候,再开一个端口,进行应答服务。订阅者发送信息给发布者,发布者收到以后,代表一个订阅者在线,并加入,等到收集到足够数量的订阅者以后,代表人都到齐了,开始分发数据。那么如果,仅仅是如果,世界上有很多如果,如果在传送的数据过程中,某一个订阅者掉线了。。。。 怎么处理这个情况呢?

Zmq提供了一个Identities身份标识,能够记住掉线者的状态,等掉线者重新连接上来的时候,能够从上次断开的时候,接着传送数据。

 

 

与前面所讲的订阅者模式,不同的是,加入了id标识。那么产生的效果如何呢?

但是这个有个问题,是当订阅者结束以后,发布者的内存使用会越来越多,当订阅者重启以后,内存使用率又降低,对于这个问题,因为程序需要空间保留订阅者以后的数据,以及状态。我们可以通过将队列信息传入到文件中存储,以减少内存的使用率。这个设置是比较有技巧的,要根据实际情况来设置,如果过小,则会导致丢失信息。如果过多,会导致读取很慢。

Categories: 系统架构学习
Tags: ZeroMQzmq
龙安_任天兵: 不忘初心,方得始终!