Zeromq : 异步和多线程
在上一小节中,任务分发的时候,Worker一边连接着任务分发的服务器,一边连接着结果收集的服务器(在不同的端口),这种需要多个Socket的情况下,如果仅仅是按照队列的方式来处理,在上一个模型还比较容易理解,必须得到任务以后,才有计算结果,但是如果是并行的毫无交集的多个Socket呢? 观察下面两段代码:
<?php /* * Reading from multiple sockets * This version uses a simple recv loop * @author Ian Barber <ian(dot)barber(at)gmail(dot)com> */ // Prepare our context and sockets $context = new ZMQContext(); // Connect to task ventilator $receiver = new ZMQSocket($context, ZMQ::SOCKET_PULL); $receiver->connect("tcp://localhost:5557"); // Connect to weather server $subscriber = new ZMQSocket($context, ZMQ::SOCKET_SUB); $subscriber->connect("tcp://localhost:5556"); $subscriber->setSockOpt(ZMQ::SOCKOPT_SUBSCRIBE, "10001"); // Process messages from both sockets // We prioritize traffic from the task ventilator while(true) { // Process any waiting tasks try { for($rc = 0; !$rc;) { if($rc = $receiver->recv(ZMQ::MODE_NOBLOCK)) { // process task } } } catch (ZMQSocketException $e) { // do nothing // } try { // Process any waiting weather updates for($rc = 0; !$rc;) { if($rc = $subscriber->recv(ZMQ::MODE_NOBLOCK)) { // process weather update } } } catch (ZMQSocketException $e) { // do nothing } // No activity, so sleep for 1 msec// usleep(1); }
在第二段代码中,有一处改进是,当多个socket的内容准备好的时候,会自动调用处理程序,类似于异步中的回调函数,这个便是ZMQ的poll。
下面我们再来看看ZMQ的多线程,以及线程间的通信问题:
we don't need mutexes, locks, or any other form of inter-thread communication except messages sent across ØMQ sockets.
具体的模型是这样的,代码如下:
http://zguide.zeromq.org/php:mtserver
在代码中,可以看到流程,首先是启动5个php进程,每个进程的工作是打印收到的信息,并响应world,5555端口用来接收客户端的请求,而workers.ipc交换文件,则用于主进程向worker子php进程间的通信。程序的模型如下 :
子进程执行Step2函数,而在Step2函数中做的是,将Step产生的数据,通过ipc://step2.ipc,将step1的进程的数据,step2进程中,然后再通过ipc://step3.ipc,由进程2传到进程3,最后由进程3进行输出。输出的结果实际上由进程step1产生。通过这个示例,让我们很清楚的了解,zmq通过交换文件进行进程间的通信。
了解了这些以后,接下来,我们来处理上面留下的问题 : 如何防止后来加入的订阅者丢失数据?这个就是zmq的节点间的协作,有一个方法是,预先知道订阅者的数量,然后等待所有的订阅者都加入以后,开始分发数据。
如何实现 ? 在订阅者和发布者通过pub模式连接的时候,再开一个端口,进行应答服务。订阅者发送信息给发布者,发布者收到以后,代表一个订阅者在线,并加入,等到收集到足够数量的订阅者以后,代表人都到齐了,开始分发数据。那么如果,仅仅是如果,世界上有很多如果,如果在传送的数据过程中,某一个订阅者掉线了。。。。 怎么处理这个情况呢?
Zmq提供了一个Identities身份标识,能够记住掉线者的状态,等掉线者重新连接上来的时候,能够从上次断开的时候,接着传送数据。
与前面所讲的订阅者模式,不同的是,加入了id标识。那么产生的效果如何呢?
但是这个有个问题,是当订阅者结束以后,发布者的内存使用会越来越多,当订阅者重启以后,内存使用率又降低,对于这个问题,因为程序需要空间保留订阅者以后的数据,以及状态。我们可以通过将队列信息传入到文件中存储,以减少内存的使用率。这个设置是比较有技巧的,要根据实际情况来设置,如果过小,则会导致丢失信息。如果过多,会导致读取很慢。