简介 稍微偏底层的网络框架:
可以跨语言跨平台调用。
可以用inproc
、IPC
、TCP
、TIPC
、multicast
多途径传递数据。
提供常用的模式,如pub-sub(发布-订阅)
、push-pull(推送-拉取)
、router-dealer(路由器-经销商)
等模式。
快速的异步IO引擎。
开源、免费。
文档
常用api Context 创建一个zmq的context。
Socekt 在context中创建一个socket。
socket.bind(addr) 让某个socket监听某个端口。其他socket想连接这个socket,需要使用socket.connect(addr)
。
socket.connect(addr) 通过端口地址,连接到远程的socket。
bind vs connect
服务端和客户端必须有bind
和connect
,不能只用connect
。
不同的进程可以connect
同一个端口,但不能bind
同一个端口。
必须要保证客户端先connect
,服务端再发送数据,否则数据会丢失。
在实际传输数据之前,是客户端先调用connect
还是服务端先调用bind
,这二者的顺序没有影响。
1 2 3 4 5 6 7 8 import zmqimport randomimport sysimport timecontext = zmq.Context() port = "5557"
常用模式 PAIR 传统soket:
点到点。
多对一(多个客服端,一个服务端)。
一对多(广播)。
PAIR
socket的行为类似于传统soket:
通讯是双向的。
socket不保存特定状态。
只能连接一个(一对一,所以叫配对)
服务端监听一个特定的端口,一个客户端连接到上面。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 socket = context.socket(zmq.PAIR) socket.bind("tcp://*:%s" % port) for i in range (5 ): socket.send_string("Server message to client3" ) msg = socket.recv_string() print (msg) time.sleep(1 )
1 2 3 4 5 6 client message to server1 client message to server2 client message to server1 client message to server2 client message to server1
客户端代码:
1 2 3 4 5 6 7 8 9 10 port = "5557" context = zmq.Context() socket = context.socket(zmq.PAIR) socket.connect("tcp://localhost:%s" % port) for i in range (5 ): msg = socket.recv_string() print (msg) socket.send_string("client message to server1" ) socket.send_string("client message to server2" ) time.sleep(1 )
客户端输出:
1 2 3 4 5 Server message to client3 Server message to client3 Server message to client3 Server message to client3 Server message to client3
可以观察到:如果服务端绑定了bind
某个socket,需要在客户端连接connect
上这个socket以后,服务端传输的数据才能被客户端接收到,否则服务端发出去的数据会丢失 。
注意: 上面这点对于所有类型的socket都成立,必须要保证客户端先connect
,服务端再发送数据。另外,在实际传输数据之前,是客户端先调用connect
还是服务端先调用bind
,这二者的顺序没有影响。
Client / Server 又称Request/Reply
模式。
REQ
socket可以连接到多个服务端。
请求可以被同时分发到多个服务端上。
REQ
在REP
返回消息之前,会被阻塞。
REP
在REQ
返回消息之前,会被阻塞。
简单说必须你一言我一语 。
1 2 3 4 5 6 7 8 9 10 11 socket = context.socket(zmq.REP) socket.bind("tcp://*:%s" % port) for i in range (5 ): message = socket.recv_string() print ("Received request: " , message) time.sleep(1 ) socket.send_string("World from %s" % port)
1 2 3 4 5 Received request: Hello Received request: Hello Received request: Hello Received request: Hello Received request: Hello
客户端代码:
1 2 3 4 5 6 7 8 9 10 socket = context.socket(zmq.REQ) socket.connect ("tcp://localhost:%s" % port) for request in range (5 ): print ("Sending request " , request,"..." ) socket.send_string ("Hello" ) message = socket.recv_string() print ("Received reply " , request, "[" , message, "]" )
客户端输出:
1 2 3 4 5 6 7 8 9 10 Sending request 0 ... Received reply 0 [ World from 5557 ] Sending request 1 ... Received reply 1 [ World from 5557 ] Sending request 2 ... Received reply 2 [ World from 5557 ] Sending request 3 ... Received reply 3 [ World from 5557 ] Sending request 4 ... Received reply 4 [ World from 5557 ]
可以观察到:每个REP
都会等待收到一个REQ
请求,然后才能继续发送响应;每个REQ
在发送请求后,必须等待收到一个REP
响应,才可以继续发送请求。
Publish/Subscribe 订阅/发布模式,就像微信上订阅公众号一样,你会收到所有你订阅的公众号发送的文章;同时每个公众号会把文章发给所有订阅这个公众号的用户。
1 2 3 4 5 6 7 8 9 10 11 socket = context.socket(zmq.PUB) socket.bind("tcp://*:%s" % port) for i in range (5 ): topic = random.randrange(9999 ,10001 ) messagedata = random.randrange(1 ,215 ) - 80 print ("%d %d" % (topic, messagedata)) socket.send_string("%d %d" % (topic, messagedata)) time.sleep(1 )
1 2 3 4 5 10000 -68 10000 65 9999 110 10000 -21 10000 114
客户端代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 socket = context.socket(zmq.SUB) print ("Collecting updates from weather server..." )socket.connect ("tcp://localhost:%s" % port) topicfilter = "9999" socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter) total_value = 0 for update_nbr in range (1 ): string = socket.recv_string() topic, messagedata = string.split() total_value += int (messagedata) print (topic, messagedata)
客户端输出:
Push/Pull 又称Pipeline
模式,很像生产者-消费者
模式。不一样的是,消费者的结果不会返回给上游,而是传给下游。
顺序:
启动结果管理器;
启动worker;
启动生产者。
1.启动结果管理器
1 2 3 4 5 results_receiver = context.socket(zmq.PULL) results_receiver.bind("tcp://127.0.0.1:5559" ) for x in range (10 ): result = results_receiver.recv_json() print (result)
2.启动worker
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import multiprocessing as mpfrom multiprocessing import Processclass Worker (Process ): def __init__ (self, worker_id ): super ().__init__() self .worker_id = worker_id def run (self ): consumer_receiver = context.socket(zmq.PULL) consumer_receiver.connect("tcp://127.0.0.1:5558" ) consumer_sender = context.socket(zmq.PUSH) consumer_sender.connect("tcp://127.0.0.1:5559" ) print (f"worker[{i} ] stand by ..." ) for _ in range (5 ): work = consumer_receiver.recv_json() data = work['num' ] print (f"worker[{i} ] data: {data} " ) result = { 'consumer' : self .worker_id, 'num' : data} consumer_sender.send_json(result) for i in range (2 ): process = Worker(i) process.start()
1 2 3 4 5 6 7 8 9 10 11 12 worker[0] stand by ... worker[1] stand by ... worker[0] data: 0 worker[1] data: 1 worker[0] data: 2 worker[1] data: 3 worker[0] data: 4 worker[1] data: 5 worker[0] data: 6 worker[1] data: 7 worker[0] data: 8 worker[1] data: 9
3.启动生产者,开始生产:
1 2 3 4 5 6 7 8 producer_socket = context.socket(zmq.PUSH) producer_socket.bind("tcp://127.0.0.1:5558" ) for num in range (10 ): work_message = { 'num' : num } producer_socket.send_json(work_message)
worker端显示收到了数据。
结果管理器的输出:
1 2 3 4 5 6 7 8 9 10 {'consumer': 0, 'num': 0} {'consumer': 1, 'num': 1} {'consumer': 0, 'num': 2} {'consumer': 1, 'num': 3} {'consumer': 0, 'num': 4} {'consumer': 1, 'num': 5} {'consumer': 0, 'num': 6} {'consumer': 1, 'num': 7} {'consumer': 0, 'num': 8} {'consumer': 1, 'num': 9}