热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Spark+Kafka+WebSocket+eCharts实时分析完全记录(4)

本系列内容:Kafka环境搭建与测试Python生产者消费者测试Spark接收Kafka消息处理,然后回传到KafkaFlask引入消费者WebSocket实时显示版本:spark

本系列内容:

  1. Kafka环境搭建与测试
  2. Python生产者/消费者测试
  3. Spark接收Kafka消息处理,然后回传到Kafka
  4. Flask引入消费者
  5. WebSocket实时显示

版本:

spark-2.4.3-bin-hadoop2.7.tgz

kafka_2.11-2.1.0.tgz

—————————–第4小节:Flask引入消费者———————————

步骤01:使用pip安装flask与flask_socketio

步骤02:编写后台处理程序

import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
cOnsumer= KafkaConsumer('result', bootstrap_servers=['192.168.147.128:9092'])
# 接收到消息就调用test_message方法,test_message是定义在web_socket对象上的js函数
def background_thread():
for msg in consumer:
data_json = msg.value.decode('utf8')
socketio.emit('test_message', {'data': data_json})
# JS代码中可以调用这个装饰器下的视图函数,以初始化消费者监听kafka
@socketio.on('test_connect')
def connect(message):
print(message)
global thread
if thread is None:
thread = socketio.start_background_task(target=background_thread)
socketio.emit('connected', {'data': 'Connected'})
# 返回一个html页面
@app.route("/")
def handle_mes():
return render_template("index.html")
if __name__ == '__main__':
socketio.run(app, debug=True)


推荐阅读
author-avatar
MySeptember
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有