因为部门里Storm使用的多了,即使要更换应该也会使用相近的框架,这里因为对Storm了解的多了,使用Storm作为java管理python的框架。
Storm使用2.1.0版本
考虑了几个方法
1:Jython:对Python支持不完全,远远不如CPython的好用,如果是要做一些胶水语言的功能尚可,但是完全不能使用其他散发套件,且仅支持Python2。
2:自己起一个进程:不如Storm bolt管理的功能完整。
3. Storm多语言的支持,ShellBolt。
感谢Sugeesh Chandraweera的文章https://medium.com/@Sugeesh/connecting-python-bolt-for-apache-storm-topology-af6c2e3f2200,这边做了整理并修改。(国外的服务器,看不到不影响以下阅读)
这套框架管理的Python是进程而非线程,可以避开GIL的性能瓶颈,需注意Storm多语言的支持并没有成熟到开箱即用,开发者需要有修改源码的能力。
也可以使用我根据面向对象重构的代码
https://github.com/AlbertLiang1994/Storm-Modification/blob/master/Storm.py。
我加了一个套件,override,如果不想引用可以拿掉。
继承BasicBolt后就可以在process里实现业务逻辑了,也可以在initialize里初始化需要用到的对象,PythonBolt.py(命名习惯,实际上名字不一定要和类名一样,也不需要以Bolt结尾)。
import Storm
class PythonBolt(Storm.BasicBolt):
def initialize(self, conf, context):
self._conf = conf
self._context = context
def process(self, tuple):
word = tuple.values[0]
self.emit([word]) # return list object
PythonBolt().run()
然后在java里写一个对应的类。
public class PythonBolt extends ShellBolt implements IRichBolt {
public PythonBolt() {
super(System.getenv().get("PYTHON_EXECUTABLE"), System.getenv().get("STORM_BASE_DIR") + "/PythonBolt.py");
Map env = new HashMap();
env.put("PYTHONPATH", System.getenv().get("STORM_BASE_DIR"));
this.setEnv(env);
this.changeChildCWD(false);
}
}
PYTHON_EXECUTABLE是我在Storm目录下的conf/storm-env.sh里设定的环境变量,因为我不想更改服务器上的python版本,也不想使用python2,所以装了python3,并使用PYTHON_EXECUTABLE指向执行路径。
STORM_BASE_DIR是Storm的bin目录下storm启动脚本里设定的路径,获取后就不用在代码里写死成绝对路径。
changeChildCWD一定要设成false,否则会获取runtime路径。
如果要作为沙箱执行远端上传的代码,有以下注意事项:Storm的Python子进程使用匿名管道,也就是std in,std out,代码里如果有print会导致topology直接崩溃重启,在远端代码不可控的情况下,可以把sys.stdout指向别的对象,因为我们想提供user print内容展示的功能,把对象指向redis连接,然后在storm内部内获取展示。
如果要禁止user import os、sys 等有关操作系统的依赖,可以在执行远端上传代码前这样写:
sys.modules[mod] = None
指定的mod就无法import。
3. 如果要避免用户的method执行时间过长,超过Storm的心跳导致topology直接崩溃重启,可以使用pebble的ThreadPool,设定timeout。
4. 使用importlib import动态上传的python档,要记得del sys.modules[module_name],否则只要sys.modules有这个对象,多次上传并不会引发更新引用。
PS: 如果看完了相关源码,python调用java也可以游刃有余。