实际上,在服务器和客户机上实现text/event-stream相对容易。假设位于example.com的服务器是在端口80(默认端口)上运行的简单服务器:import http.server
import json
import socketserver
import time
class RequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Connection', 'Keep-Alive')
self.send_header('Content-Type', 'text/event-stream')
self.end_headers()
name = self.path.strip('/').encode('UTF-8')
old_value = None
new_value = None
while True:
old_value, new_value = new_value, time.time()
data = json.dumps({
'old_value': old_value,
'new_value': new_value,
}).encode('UTF-8')
self.wfile.write(b'name: ')
self.wfile.write(name)
self.wfile.write(b'\r\n')
self.wfile.write(b'data: ')
self.wfile.write(data)
self.wfile.write(b'\r\n')
self.wfile.write(b'\r\n')
self.wfile.flush()
time.sleep(30)
server = socketserver.TCPServer(('', 80), RequestHandler)
server.serve_forever()
每隔30秒,它输出一个事件:
^{pr2}$
然后,客户端可以连接到端口http://example.com/value121,并解析每个事件:import http.client
import json
connection = http.client.HTTPConnection('http://example.com', 80)
connection.request('GET', '/value121', headers={
'Accept': 'text/event-source',
'Connection': 'Keep-Alive',
})
with connection.getresponse() as response:
while not response.closed:
event = {}
for line in response:
line = line.decode('UTF-8')
if line == '\r\n':
# End of event.
break
elif line.startswith(':'):
# Comment, ignore.
pass
else:
# Data line.
key, value = line.split(':', 1)
value = value.strip()
if key == 'data':
value = json.loads(value)
event[key] = value
# Handle event, extract values, etc.
print(event)
每隔30秒客户就会打印出:'data': {'old_value': , 'new_value': }, 'name': 'value121'}