pyzmq Experiment Notes

Introduction

ZeroMQ, which pyzmq wraps for Python, is a fairly low-level networking library:

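  • Can be used across languages and platforms.
  • Can carry data over several transports: inproc, IPC, TCP, TIPC and multicast.
  • Provides common messaging patterns such as pub-sub (publish-subscribe), push-pull and router-dealer.
  • Has a fast asynchronous I/O engine.
  • Is open source and free.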
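The transport point above can be illustrated with a minimal sketch, assuming pyzmq is installed. The same PUSH/PULL code runs over inproc and over TCP, and only the endpoint string changes. `send_once` and the endpoint name `inproc://transport-demo` are hypothetical names chosen for this illustration. IPC and TIPC are left out because their availability depends on the platform and on how libzmq was built.

```python
import zmq


def send_once(transport: str, message: str) -> str:
    """Send one message PUSH -> PULL over "inproc" or "tcp" and return what arrived."""
    context = zmq.Context()
    pull = context.socket(zmq.PULL)
    push = context.socket(zmq.PUSH)
    try:
        if transport == "inproc":
            # inproc: in-process transport between sockets of the same context
            endpoint = "inproc://transport-demo"
            pull.bind(endpoint)
        else:
            # tcp: let pyzmq pick a free port on the loopback interface
            port = pull.bind_to_random_port("tcp://127.0.0.1")
            endpoint = f"tcp://127.0.0.1:{port}"
        push.connect(endpoint)
        push.send_string(message)
        return pull.recv_string()
    finally:
        push.close(linger=0)
        pull.close(linger=0)
        context.term()


if __name__ == "__main__":
    for transport in ("inproc", "tcp"):
        print(transport, send_once(transport, "hello"))
```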

Documentation

Common API

Context

Creates a ZeroMQ context. Sockets are created from it.

Socket

Creates a socket in the context, for example context.socket(zmq.PAIR).

socket.bind(addr)

Makes the socket listen on an address, such as a TCP port. Other sockets that want to talk to it call socket.connect(addr).

socket.connect(addr)

Connects to a remote socket at the given address.

bind vs connect

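  • One side (usually the server) must bind and the other (usually the client) must connect. Two sockets that both only call connect cannot talk to each other.
  • Several processes can connect to the same port, but only one can bind to it.
  • Make sure the client has connected before the server sends data. A PUB socket, for example, silently drops messages sent while no subscriber is connected.
  • Before data is actually exchanged, it does not matter whether the client calls connect first or the server calls bind first.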
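Two of the rules above can be checked with a small sketch. It assumes pyzmq with a bundled libzmq 4.x; as far as I know, inproc accepts connect-before-bind only from libzmq 4.0 on. The function names and endpoints are hypothetical. The first function connects before anything is bound and still receives the message. The second binds one TCP port twice and expects the second bind to fail.

```python
import zmq


def connect_then_bind() -> str:
    """The connecting socket may be created and connected before the binding one exists."""
    context = zmq.Context()
    push = context.socket(zmq.PUSH)
    pull = context.socket(zmq.PULL)
    try:
        push.connect("inproc://order-demo")   # nothing is bound to this name yet
        pull.bind("inproc://order-demo")      # bind afterwards; the pending connect completes
        push.send_string("order does not matter")
        return pull.recv_string()
    finally:
        push.close(linger=0)
        pull.close(linger=0)
        context.term()


def second_bind_fails() -> bool:
    """Two sockets cannot bind the same TCP port."""
    context = zmq.Context()
    first = context.socket(zmq.PULL)
    second = context.socket(zmq.PULL)
    try:
        port = first.bind_to_random_port("tcp://127.0.0.1")
        try:
            second.bind(f"tcp://127.0.0.1:{port}")
        except zmq.ZMQError:
            return True   # typically "Address already in use"
        return False
    finally:
        first.close(linger=0)
        second.close(linger=0)
        context.term()
```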
Setup shared by the examples below:

```python
import random
import time

import zmq

context = zmq.Context()
port = "5557"
```

Common patterns

PAIR

Traditional sockets support:

  • Point-to-point communication.
  • Many-to-one communication (several clients, one server).
  • One-to-many communication (broadcast).

A PAIR socket behaves much like a traditional socket:

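  • Communication is bidirectional.
  • The socket stores no particular state.
  • Only one peer can be connected (one-to-one, hence the name "pair").
  • The server listens on a specific port and a single client connects to it.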
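The bidirectional, one-peer behaviour of PAIR can also be seen in a single process. In this minimal sketch, which assumes pyzmq, the client runs in a thread and the two sockets are joined over inproc instead of TCP. `pair_ping_pong` and `inproc://pair-demo` are made up for this illustration.

```python
import threading

import zmq


def pair_ping_pong(rounds: int = 3) -> list:
    """The server sends a ping, the client answers; both directions use the same PAIR link."""
    context = zmq.Context()
    server = context.socket(zmq.PAIR)
    server.bind("inproc://pair-demo")
    replies = []

    def client() -> None:
        # Sockets are not thread-safe, so the client creates its own in this thread
        sock = context.socket(zmq.PAIR)
        sock.connect("inproc://pair-demo")
        for _ in range(rounds):
            msg = sock.recv_string()            # server -> client
            sock.send_string("ack: " + msg)     # client -> server
        sock.close()

    t = threading.Thread(target=client)
    t.start()
    for i in range(rounds):
        server.send_string(f"ping {i}")         # blocks until the client has connected
        replies.append(server.recv_string())
    t.join()
    server.close()
    context.term()
    return replies
```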
```python
# Server
socket = context.socket(zmq.PAIR)
socket.bind("tcp://*:%s" % port)

for i in range(5):
    socket.send_string("Server message to client3")
    msg = socket.recv_string()
    print(msg)
    time.sleep(1)
```

Server output:

```
client message to server1
client message to server2
client message to server1
client message to server2
client message to server1
```

Client code:

```python
import time

import zmq

port = "5557"
context = zmq.Context()
socket = context.socket(zmq.PAIR)
socket.connect("tcp://localhost:%s" % port)
for i in range(5):
    msg = socket.recv_string()
    print(msg)
    socket.send_string("client message to server1")
    socket.send_string("client message to server2")
    time.sleep(1)
```

Client output:

```
Server message to client3
Server message to client3
Server message to client3
Server message to client3
Server message to client3
```

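Observation: when the server has bound a socket, the client only receives the server's data once it has connected to that socket.

Note: what happens to messages sent before any peer has connected depends on the socket type. According to the libzmq zmq_socket documentation, a PUB socket silently drops them. PAIR, PUSH and REQ sockets instead block in send until a peer is available, so with PAIR the server's first send simply waits for the client. In either case, the order in which the client calls connect and the server calls bind makes no difference, as long as both have happened before data is actually exchanged.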
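The difference described in the note can be shown with a sketch, assuming pyzmq. Both sockets are bound with nobody connected, and each sends once with `zmq.NOBLOCK`, so a would-block condition surfaces as `zmq.Again` instead of hanging. The function name and endpoints are hypothetical.

```python
import zmq


def send_with_no_peer() -> dict:
    """What a bound PAIR and a bound PUB socket do when nobody has connected yet."""
    context = zmq.Context()
    pair = context.socket(zmq.PAIR)
    pub = context.socket(zmq.PUB)
    outcome = {}
    try:
        pair.bind("inproc://no-peer-pair")
        pub.bind("inproc://no-peer-pub")
        try:
            # NOBLOCK turns "would block" into a zmq.Again exception instead of waiting forever
            pair.send_string("hello", flags=zmq.NOBLOCK)
            outcome["PAIR"] = "sent"
        except zmq.Again:
            outcome["PAIR"] = "would block"
        # PUB never blocks: with no subscribers the message is silently discarded
        pub.send_string("hello", flags=zmq.NOBLOCK)
        outcome["PUB"] = "no error (message dropped)"
    finally:
        pair.close(linger=0)
        pub.close(linger=0)
        context.term()
    return outcome
```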

Client / Server

Also called the Request/Reply pattern.

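  • A REQ socket can connect to several servers.
  • Its requests are then distributed among those servers in turn.
  • After sending a request, a REQ socket must receive the REP socket's reply before it can send another request.
  • A REP socket must receive a request from a REQ socket before it can send a reply.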

In short, the two sides must strictly take turns: one request, then one reply.
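The following single-threaded sketch shows how one REQ socket spreads requests over several servers. It assumes pyzmq, uses two REP sockets over inproc, and relies on a `zmq.Poller` to find which server received each request. `req_to_two_servers` and the endpoint names are hypothetical. Because both connections are in place before the first request, the requests should alternate between the two servers.

```python
import zmq


def req_to_two_servers(n_requests: int = 4) -> list:
    """One REQ socket connected to two REP sockets; count the requests each REP answers."""
    context = zmq.Context()
    servers = [context.socket(zmq.REP) for _ in range(2)]
    for i, rep in enumerate(servers):
        rep.bind(f"inproc://rep-{i}")
    req = context.socket(zmq.REQ)
    for i in range(len(servers)):
        req.connect(f"inproc://rep-{i}")

    poller = zmq.Poller()
    for rep in servers:
        poller.register(rep, zmq.POLLIN)

    counts = [0] * len(servers)
    try:
        for n in range(n_requests):
            req.send_string(f"request {n}")
            ready = dict(poller.poll(1000))   # wait up to 1 s for a server to see the request
            if not ready:
                raise RuntimeError("no server received the request")
            for i, rep in enumerate(servers):
                if rep in ready:
                    rep.recv_string()
                    rep.send_string(f"reply from server {i}")
                    counts[i] += 1
            req.recv_string()                 # REQ must read the reply before the next send
    finally:
        req.close(linger=0)
        for rep in servers:
            rep.close(linger=0)
        context.term()
    return counts
```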

```python
# Server
socket = context.socket(zmq.REP)
socket.bind("tcp://*:%s" % port)

for i in range(5):
    # Wait for the next request from the client
    message = socket.recv_string()
    print("Received request:", message)
    time.sleep(1)
    socket.send_string("World from %s" % port)
```

Server output:

```
Received request: Hello
Received request: Hello
Received request: Hello
Received request: Hello
Received request: Hello
```

Client code:

```python
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:%s" % port)
# Do 5 requests, waiting each time for a response
for request in range(5):
    print("Sending request", request, "...")
    socket.send_string("Hello")
    # Get the reply
    message = socket.recv_string()
    print("Received reply", request, "[", message, "]")
```

Client output:

```
Sending request 0 ...
Received reply 0 [ World from 5557 ]
Sending request 1 ...
Received reply 1 [ World from 5557 ]
Sending request 2 ...
Received reply 2 [ World from 5557 ]
Sending request 3 ...
Received reply 3 [ World from 5557 ]
Sending request 4 ...
Received reply 4 [ World from 5557 ]
```

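Observation: a REP socket must wait for a REQ request before it can send a reply. After sending a request, a REQ socket must wait for the REP reply before it can send the next request.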
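The sockets themselves enforce this turn-taking. It is not just a convention. This sketch assumes pyzmq, and its function and endpoint names are hypothetical. A second send on REQ and a send on REP before any request both raise `zmq.ZMQError`. After that, one normal request/reply round trip succeeds.

```python
import zmq


def req_rep_lockstep() -> list:
    """Record which out-of-turn operations are rejected, then do one normal round trip."""
    context = zmq.Context()
    rep = context.socket(zmq.REP)
    req = context.socket(zmq.REQ)
    events = []
    try:
        rep.bind("inproc://lockstep")
        req.connect("inproc://lockstep")
        req.send_string("Hello")
        try:
            req.send_string("Hello again")   # second send before receiving the reply
        except zmq.ZMQError:
            events.append("REQ: second send rejected")
        try:
            rep.send_string("World")          # reply before any request was received
        except zmq.ZMQError:
            events.append("REP: send before recv rejected")
        events.append(rep.recv_string())     # the first request is still delivered
        rep.send_string("World")
        events.append(req.recv_string())
    finally:
        req.close(linger=0)
        rep.close(linger=0)
        context.term()
    return events
```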

Publish/Subscribe

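The publish/subscribe pattern works like following official accounts on WeChat. You receive every article from the accounts you follow, and each account sends its articles to all of its followers.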
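In ZeroMQ, a "subscription" is a prefix filter set on the SUB socket with `zmq.SUBSCRIBE`, as the client example in this section does. The sketch below assumes pyzmq; its function name and endpoint are hypothetical. It shows that the filter is a plain prefix match: subscribing to `9999` also lets `99990 1` through. Topics that are prefixes of other topics may therefore need a separator.

```python
import time

import zmq


def subscribe_demo() -> list:
    """Publish three messages; return the ones a SUB subscribed to "9999" receives."""
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    sub = context.socket(zmq.SUB)
    received = []
    try:
        pub.bind("inproc://weather")
        sub.connect("inproc://weather")
        sub.setsockopt_string(zmq.SUBSCRIBE, "9999")   # prefix filter
        # Over tcp the subscription needs time to reach the publisher ("slow joiner");
        # a short pause keeps this sketch transport-agnostic.
        time.sleep(0.2)
        for msg in ("10000 5", "9999 7", "99990 1"):
            pub.send_string(msg)
        while sub.poll(200):   # collect whatever arrives within 200 ms
            received.append(sub.recv_string())
    finally:
        sub.close(linger=0)
        pub.close(linger=0)
        context.term()
    return received
```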

```python
# Server
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:%s" % port)

for i in range(5):
    topic = random.randrange(9999, 10001)
    messagedata = random.randrange(1, 215) - 80
    print("%d %d" % (topic, messagedata))
    socket.send_string("%d %d" % (topic, messagedata))
    time.sleep(1)
```

Server output:

```
10000 -68
10000 65
9999 110
10000 -21
10000 114
```

Client code:

```python
socket = context.socket(zmq.SUB)
print("Collecting updates from weather server...")
socket.connect("tcp://localhost:%s" % port)
# Subscribe to topic 9999
topicfilter = "9999"
socket.setsockopt_string(zmq.SUBSCRIBE, topicfilter)
# Process 1 update
total_value = 0
for update_nbr in range(1):
    string = socket.recv_string()
    topic, messagedata = string.split()
    total_value += int(messagedata)
    print(topic, messagedata)
```

Client output:

```
9999 -76
```

Push/Pull

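Also called the Pipeline pattern. It resembles the producer-consumer pattern. The difference is that a consumer does not return its results upstream; it passes them further downstream.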

Start-up order:

  1. Start the result collector;
  2. Start the workers;
  3. Start the producer.
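One reason for this order is that a PUSH socket round-robins messages only among the peers connected at the moment it sends. A worker that connects late gets no share of tasks already handed out. The sketch below assumes pyzmq; its names and endpoints are hypothetical. It connects two PULL workers before any task is sent and collects what each one receives. With both connected, the tasks should be split between them.

```python
import zmq


def fan_out(n_tasks: int = 4) -> list:
    """One PUSH producer, two PULL workers that connect before any task is sent."""
    context = zmq.Context()
    producer = context.socket(zmq.PUSH)
    workers = [context.socket(zmq.PULL) for _ in range(2)]
    received = []
    try:
        producer.bind("inproc://tasks")
        for w in workers:               # workers connect first
            w.connect("inproc://tasks")
        for num in range(n_tasks):      # then the producer starts sending
            producer.send_json({"num": num})
        for w in workers:
            nums = []
            while w.poll(200):          # read until nothing arrives for 200 ms
                nums.append(w.recv_json()["num"])
            received.append(nums)
    finally:
        producer.close(linger=0)
        for w in workers:
            w.close(linger=0)
        context.term()
    return received
```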

1. Start the result collector

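```python
import zmq

context = zmq.Context()
# Result collector: gathers the results pushed by the workers
results_receiver = context.socket(zmq.PULL)
results_receiver.bind("tcp://127.0.0.1:5559")
for x in range(10):
    result = results_receiver.recv_json()
    print(result)
```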

2. Start the workers

```python
import zmq
from multiprocessing import Process


class Worker(Process):
    def __init__(self, worker_id):
        super().__init__()
        self.worker_id = worker_id

    def run(self):
        # A ZeroMQ context should not be shared across processes,
        # so each worker creates its own context in the child process.
        context = zmq.Context()
        # Receive work from the producer
        consumer_receiver = context.socket(zmq.PULL)
        consumer_receiver.connect("tcp://127.0.0.1:5558")
        # Send results to the result collector
        consumer_sender = context.socket(zmq.PUSH)
        consumer_sender.connect("tcp://127.0.0.1:5559")
        print(f"worker[{self.worker_id}] stand by ...")

        for _ in range(5):
            work = consumer_receiver.recv_json()
            data = work['num']
            print(f"worker[{self.worker_id}] data: {data}")
            result = {'consumer': self.worker_id, 'num': data}
            consumer_sender.send_json(result)


if __name__ == "__main__":
    # Start two consumer (worker) processes
    for i in range(2):
        process = Worker(i)
        process.start()
```
Worker output:

```
worker[0] stand by ...
worker[1] stand by ...
worker[0] data: 0
worker[1] data: 1
worker[0] data: 2
worker[1] data: 3
worker[0] data: 4
worker[1] data: 5
worker[0] data: 6
worker[1] data: 7
worker[0] data: 8
worker[1] data: 9
```

3. Start the producer and begin producing:

```python
import zmq

context = zmq.Context()
# Producer
producer_socket = context.socket(zmq.PUSH)
producer_socket.bind("tcp://127.0.0.1:5558")

for num in range(10):
    work_message = {'num': num}
    producer_socket.send_json(work_message)
```

The workers report the data they received.

Output of the result collector:

```
{'consumer': 0, 'num': 0}
{'consumer': 1, 'num': 1}
{'consumer': 0, 'num': 2}
{'consumer': 1, 'num': 3}
{'consumer': 0, 'num': 4}
{'consumer': 1, 'num': 5}
{'consumer': 0, 'num': 6}
{'consumer': 1, 'num': 7}
{'consumer': 0, 'num': 8}
{'consumer': 1, 'num': 9}
```
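For experimenting, the three steps can also be collapsed into one process. The sketch below assumes pyzmq and uses threads and inproc endpoints instead of separate processes and TCP ports. `run_pipeline`, the endpoint names and the 500 ms idle timeout that ends each worker are choices made for this illustration, not part of the original setup. As in the result collector's output, each result is a dict with `consumer` and `num`.

```python
import threading

import zmq


def run_pipeline(n_tasks: int = 6, n_workers: int = 2) -> list:
    """Producer -> workers -> result collector in one process, over inproc."""
    context = zmq.Context()   # contexts are thread-safe; sockets are not
    # 1. The result collector binds first
    collector = context.socket(zmq.PULL)
    collector.bind("inproc://results")
    collector.setsockopt(zmq.RCVTIMEO, 5000)   # raise zmq.Again instead of hanging forever
    # The producer also binds; the workers connect to both ends
    producer = context.socket(zmq.PUSH)
    producer.bind("inproc://work")

    def worker(worker_id: int) -> None:
        receiver = context.socket(zmq.PULL)    # each thread creates its own sockets
        receiver.connect("inproc://work")
        sender = context.socket(zmq.PUSH)
        sender.connect("inproc://results")
        while receiver.poll(500):              # stop after 500 ms without new work
            work = receiver.recv_json()
            sender.send_json({"consumer": worker_id, "num": work["num"]})
        receiver.close()
        sender.close()

    # 2. Start the workers
    threads = [threading.Thread(target=worker, args=(i,)) for i in range(n_workers)]
    for t in threads:
        t.start()
    # 3. Produce the work (send blocks until at least one worker is connected)
    for num in range(n_tasks):
        producer.send_json({"num": num})

    results = [collector.recv_json() for _ in range(n_tasks)]
    for t in threads:
        t.join()
    collector.close()
    producer.close(linger=0)
    context.term()
    return results
```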