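Recently, the tool people have suddenly been pulled into a lot of work WeChat groups. These groups have a few things in common:
1. In these groups, everyone who was added without asking is a tool, while the group owner and his friends are the tool users;
2. In these groups, the tool users differ from group to group, but the tools are always the same few people;
3. These groups are quiet only on holidays, and when they are active the discussion is about which tool to sacrifice;
4. Most of these groups are named something like: XX Requirements Crunch Group, XX Issue Scapegoat Group, XX Team Flattery Group.
So from the moment they are forced into a group, the tool people behave as if a round of PUBG has begun: without any coordination, they all mute notifications and try to stay invisible. And the tool people are puzzled: meetings already exist as a time-consuming way to communicate, so why grind the tools down on WeChat as well?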
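Something in Flink puzzles the tool people in the same way. Earlier we studied how data is passed between Flink operators, which Flink implements with Netty (for related reading see 「Flink」工具人之初学Flink-Task算子通信(一) and 「Flink」工具人之初学Flink-Task算子通信(二)). The communication between Flink components (such as JobMaster and TaskManager) that we study today, however, is implemented with Akka.
The design of this communication model revolves around a few main concepts:
I. RpcService
The only implementation of RpcService is AkkaRpcService, which is the Akka entity behind communication between components.
In the figure below, we can see that AkkaRpcService holds the process's address and service port, along with some of the core services it provides.
At process startup, taking JobManager as an example: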
ClusterEntrypoint.class
private void runCluster(Configuration configuration) throws Exception {
    synchronized (lock) {
        // initialize all services, including the AkkaRpcService
        initializeServices(configuration);
        .....
    }
}
protected void initializeServices(Configuration configuration) throws Exception {
    LOG.info("Initializing cluster services.");
    synchronized (lock) {
        final String bindAddress = configuration.getString(JobManagerOptions.ADDRESS);
        final String portRange = getRPCPortRange(configuration);
        // create the RpcService
        commonRpcService = createRpcService(configuration, bindAddress, portRange);
        ......
    }
}
@Nonnull
private RpcService createRpcService(Configuration configuration, String bindAddress, String portRange) throws Exception {
    // this creates the AkkaRpcService
    return AkkaRpcServiceUtils.createRpcService(bindAddress, portRange, configuration);
}
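II. RpcEndpoint is the service provider, and RpcGateway is the proxy class used by the service caller.
In the class diagram below, we take JobMaster as the example:
JobMaster is the core service in JobManager, and it is a service provider. Because it is protected by high availability and several instances may exist at the same time, it extends FencedRpcEndpoint and exposes the corresponding RPC services.
JobMaster also implements JobMasterGateway, which fixes the interface contract that JobMaster must provide. On the TaskManager side, JobMasterGateway is the proxy for JobMaster, and it is what TaskManager calls.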
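To make the idea behind a fenced endpoint more concrete, here is a minimal sketch in plain Java. It assumes the simplest possible model: every message carries a token, and the endpoint only handles messages whose token matches its current one. The names FencingSketch, FencedMessage and FencedEndpoint are hypothetical and are not Flink classes.

```java
import java.util.UUID;

final class FencingSketch {

    /** Hypothetical message that carries the fencing token of the leader it was meant for. */
    static final class FencedMessage {
        final UUID token;
        final String payload;

        FencedMessage(UUID token, String payload) {
            this.token = token;
            this.payload = payload;
        }
    }

    /** Hypothetical endpoint that only accepts messages addressed to its current token. */
    static final class FencedEndpoint {
        private UUID fencingToken; // null until the endpoint has been granted leadership

        void setFencingToken(UUID newToken) {
            this.fencingToken = newToken;
        }

        String handle(FencedMessage message) {
            if (fencingToken == null || !fencingToken.equals(message.token)) {
                // The sender is talking to an old (or not yet elected) leader.
                return "rejected";
            }
            return "handled:" + message.payload;
        }
    }

    public static void main(String[] args) {
        FencedEndpoint endpoint = new FencedEndpoint();
        UUID oldLeader = UUID.randomUUID();
        UUID newLeader = UUID.randomUUID();
        endpoint.setFencingToken(newLeader);
        System.out.println(endpoint.handle(new FencedMessage(oldLeader, "submit")));
        System.out.println(endpoint.handle(new FencedMessage(newLeader, "submit")));
    }
}
```

With several candidate instances around, a token check like this is one way to make sure a caller that still holds a stale reference cannot drive the wrong instance.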
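When a service provider (such as JobMaster) is initialized, the constructor of its base class RpcEndpoint starts the provider's own Akka service: it creates the ActorRef, binds the dynamic proxy for the remote gateway, and returns an RpcServer object.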
RpcEndpoint.class
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    this.rpcService = checkNotNull(rpcService, "rpcService");
    this.endpointId = checkNotNull(endpointId, "endpointId");
    // ask AkkaRpcService to create this service's ActorRef and bind the dynamic proxy for the remote gateway
    this.rpcServer = rpcService.startServer(this);
    this.mainThreadExecutor = new MainThreadExecutor(rpcServer, this::validateRunsInMainThread);
}
Here is an excerpt from rpcService::startServer with a few notes:
AkkaRpcService.class
@Override
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
    checkNotNull(rpcEndpoint, "rpc endpoint");
    .......
    ActorRef actorRef;
    // the actorRef is created here
    synchronized (lock) {
        checkState(!stopped, "RpcService is stopped");
        actorRef = actorSystem.actorOf(akkaRpcActorProps, rpcEndpoint.getEndpointId());
        actors.put(actorRef, rpcEndpoint);
    }
    LOG.info("Starting RPC endpoint for {} at {} .", rpcEndpoint.getClass().getName(), actorRef.path());
    final String akkaAddress = AkkaUtils.getAkkaURL(actorSystem, actorRef);
    final String hostname;
    Option<String> host = actorRef.path().address().host();
    if (host.isEmpty()) {
        hostname = "localhost";
    } else {
        hostname = host.get();
    }
    Set<Class<?>> implementedRpcGateways = new HashSet<>(RpcUtils.extractImplementedRpcGateways(rpcEndpoint.getClass()));
    implementedRpcGateways.add(RpcServer.class);
    implementedRpcGateways.add(AkkaBasedEndpoint.class);
    final InvocationHandler akkaInvocationHandler;
    if (rpcEndpoint instanceof FencedRpcEndpoint) {
        // a FencedRpcEndpoint needs a FencedAkkaInvocationHandler
        akkaInvocationHandler = new FencedAkkaInvocationHandler<>(
            akkaAddress,
            hostname,
            actorRef,
            configuration.getTimeout(),
            configuration.getMaximumFramesize(),
            terminationFuture,
            ((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);
        implementedRpcGateways.add(FencedMainThreadExecutable.class);
    } else {
        akkaInvocationHandler = new AkkaInvocationHandler(
            akkaAddress,
            hostname,
            actorRef,
            configuration.getTimeout(),
            configuration.getMaximumFramesize(),
            terminationFuture);
    }
    // Rather than using the System ClassLoader directly, we derive the ClassLoader
    // from this class . That works better in cases where Flink runs embedded and all Flink
    // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader
    ClassLoader classLoader = getClass().getClassLoader();
    // the dynamic proxy is created here
    @SuppressWarnings("unchecked")
    RpcServer server = (RpcServer) Proxy.newProxyInstance(
        classLoader,
        implementedRpcGateways.toArray(new Class<?>[implementedRpcGateways.size()]),
        akkaInvocationHandler);
    return server;
}
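The last step of the excerpt, building one proxy object out of a set of interfaces, can be tried with nothing but the JDK. The sketch below is a simplified illustration, not Flink code: GreeterGateway, Startable and RecordingHandler are hypothetical names, and the handler only records each call in a list where a real handler would send a message to an actor.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class GatewayProxySketch {

    /** Hypothetical business gateway, standing in for something like JobMasterGateway. */
    interface GreeterGateway {
        String greet(String name);
    }

    /** Hypothetical control interface, playing the role that RpcServer plays in the excerpt. */
    interface Startable {
        void start();
    }

    /** Records every proxied call as a text "message" instead of sending it to an actor. */
    static final class RecordingHandler implements InvocationHandler {
        final List<String> outbox = new ArrayList<>();

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if (method.getDeclaringClass() == Object.class) {
                // toString, hashCode and equals are answered by the handler itself.
                return method.invoke(this, args);
            }
            // args is null for methods without parameters.
            outbox.add(method.getName() + (args == null ? "[]" : Arrays.toString(args)));
            return method.getReturnType() == String.class ? "sent:" + method.getName() : null;
        }
    }

    /** Collects the interfaces in a set, then builds one proxy that implements all of them. */
    static Object createServer(RecordingHandler handler) {
        Set<Class<?>> gateways = new LinkedHashSet<>();
        gateways.add(GreeterGateway.class);
        gateways.add(Startable.class);
        return Proxy.newProxyInstance(
            GatewayProxySketch.class.getClassLoader(),
            gateways.toArray(new Class<?>[0]),
            handler);
    }

    public static void main(String[] args) {
        RecordingHandler handler = new RecordingHandler();
        Object server = createServer(handler);
        ((Startable) server).start();
        System.out.println(((GreeterGateway) server).greet("flink"));
        System.out.println(handler.outbox);
    }
}
```

The same object can be cast to either interface, which is why the caller of startServer can treat the result as an RpcServer while other code treats it as a gateway.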
When JobMaster starts, it eventually calls the start method of rpcServer:
JobMaster.class
/**
 * Start the rpc service and begin to run the job.
 *
 * @param newJobMasterId The necessary fencing token to run the job
 * @return Future acknowledge if the job could be started. Otherwise the future contains an exception
 */
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
    // make sure we receive RPC and async calls
    super.start();
    return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
Here rpcEndpoint is the actual ActorRef object. A START command is sent to the actor anonymously (with no sender), and this starts the endpoint.
AkkaInvocationHandler.class
@Override
public void start() {
    rpcEndpoint.tell(Processing.START, ActorRef.noSender());
}
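Why send a START message at all? One way to picture it: the actor exists as soon as the endpoint is constructed, but it should not process business messages until the endpoint is ready. The sketch below models that gate in plain Java. It is a simplified illustration with hypothetical names (StartGateSketch, EndpointActor, Control) and no Akka dependency. That early messages are discarded rather than queued is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class StartGateSketch {

    /** Hypothetical control messages, standing in for Processing.START and its counterpart. */
    enum Control { START, STOP }

    /** A toy actor: business messages are only handled while it is running. */
    static final class EndpointActor {
        private boolean running;
        final List<Object> handled = new ArrayList<>();
        final List<Object> discarded = new ArrayList<>();

        void tell(Object message) {
            if (message == Control.START) {
                running = true;
            } else if (message == Control.STOP) {
                running = false;
            } else if (running) {
                handled.add(message);
            } else {
                // Assumption of this sketch: messages that arrive too early are dropped.
                discarded.add(message);
            }
        }
    }

    public static void main(String[] args) {
        EndpointActor actor = new EndpointActor();
        actor.tell("too early");
        actor.tell(Control.START);
        actor.tell("registerTaskManager");
        System.out.println("handled=" + actor.handled + " discarded=" + actor.discarded);
    }
}
```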
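III. The connection process
After TaskManager receives the resource-creation message from ResourceManager, it proactively notifies JobMaster, and JobMaster then initiates the connection to TaskExecutor.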
JobLeaderService.class
/**
 * Retrying registration for the job manager <--> task manager connection.
 */
private static final class JobManagerRetryingRegistration
        extends RetryingRegistration<JobMasterId, JobMasterGateway, JMTMRegistrationSuccess> {
    private final String taskManagerRpcAddress;
    private final TaskManagerLocation taskManagerLocation;
    JobManagerRetryingRegistration(
            Logger log,
            RpcService rpcService,
            String targetName,
            Class<JobMasterGateway> targetType,
            String targetAddress,
            JobMasterId jobMasterId,
            RetryingRegistrationConfiguration retryingRegistrationConfiguration,
            String taskManagerRpcAddress,
            TaskManagerLocation taskManagerLocation) {
        super(
            log,
            rpcService,
            targetName,
            targetType,
            targetAddress,
            jobMasterId,
            retryingRegistrationConfiguration);
        this.taskManagerRpcAddress = taskManagerRpcAddress;
        this.taskManagerLocation = Preconditions.checkNotNull(taskManagerLocation);
    }
    @Override
    protected CompletableFuture<RegistrationResponse> invokeRegistration(
            JobMasterGateway gateway,
            JobMasterId jobMasterId,
            long timeoutMillis) throws Exception {
        // notify the JobMaster so that it connects to this TaskManager
        return gateway.registerTaskManager(taskManagerRpcAddress, taskManagerLocation, Time.milliseconds(timeoutMillis));
    }
}
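This is in fact a remote call issued through the dynamic proxy AkkaInvocationHandler.
Inside AkkaInvocationHandler, the declaring class of the invoked method determines whether the call is handled locally on the handler or sent as an RPC.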
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    Class<?> declaringClass = method.getDeclaringClass();
    Object result;
    if (declaringClass.equals(AkkaBasedEndpoint.class) ||
        declaringClass.equals(Object.class) ||
        declaringClass.equals(RpcGateway.class) ||
        declaringClass.equals(StartStoppable.class) ||
        declaringClass.equals(MainThreadExecutable.class) ||
        declaringClass.equals(RpcServer.class)) {
        result = method.invoke(this, args);
    } else if (declaringClass.equals(FencedRpcGateway.class)) {
        throw new UnsupportedOperationException("AkkaInvocationHandler does not support the call FencedRpcGateway#" +
            method.getName() + ". This indicates that you retrieved a FencedRpcGateway without specifying a " +
            "fencing token. Please use RpcService#connect(RpcService, F, Time) with F being the fencing token to " +
            "retrieve a properly FencedRpcGateway.");
    } else {
        result = invokeRpc(method, args);
    }
    return result;
}
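The dispatch rule in invoke can be reproduced in a few lines. In the sketch below (hypothetical names throughout: DispatchSketch, Lifecycle, EchoGateway, Handler), the handler itself implements the control interface, so a control call is simply reflected back onto the handler with method.invoke(this, args), while every other call goes through a stand-in for invokeRpc. It is a simplified illustration under those assumptions, not the Flink implementation.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class DispatchSketch {

    /** Hypothetical control interface, answered by the handler itself (like StartStoppable). */
    interface Lifecycle {
        void start();
        boolean isStarted();
    }

    /** Hypothetical business gateway whose calls are turned into RPC messages. */
    interface EchoGateway {
        String echo(String text);
    }

    static final class Handler implements InvocationHandler, Lifecycle {
        private boolean started;
        final List<String> rpcLog = new ArrayList<>();

        @Override
        public void start() {
            started = true;
        }

        @Override
        public boolean isStarted() {
            return started;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Class<?> declaringClass = method.getDeclaringClass();
            if (declaringClass.equals(Lifecycle.class) || declaringClass.equals(Object.class)) {
                // Control methods never leave the process: run them on the handler.
                return method.invoke(this, args);
            }
            return invokeRpc(method, args);
        }

        private Object invokeRpc(Method method, Object[] args) {
            // A real handler would wrap the call in a message and send it to the actor.
            rpcLog.add(method.getName());
            return "rpc:" + args[0];
        }
    }

    static Object newProxy(Handler handler) {
        return Proxy.newProxyInstance(
            DispatchSketch.class.getClassLoader(),
            new Class<?>[] {Lifecycle.class, EchoGateway.class},
            handler);
    }

    public static void main(String[] args) {
        Handler handler = new Handler();
        Object proxy = newProxy(handler);
        ((Lifecycle) proxy).start();
        System.out.println(((EchoGateway) proxy).echo("ping"));
        System.out.println(handler.rpcLog);
    }
}
```

Calling start() leaves rpcLog empty, while echo("ping") adds an entry, which mirrors the two branches of the original method.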
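In Flink, some scenarios look like an RPC call but actually invoke a local service, for example requesting resources from ResourceManager in Standalone mode. For this reason, invokeRpc wraps all call parameters uniformly in a data structure called RpcInvocation.
The only difference between its two implementations, LocalRpcInvocation and RemoteRpcInvocation, is that the data in RemoteRpcInvocation is serialized so that it can be sent over the network.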
protected RpcInvocation createRpcInvocationMessage(
        final String methodName,
        final Class<?>[] parameterTypes,
        final Object[] args) throws IOException {
    final RpcInvocation rpcInvocation;
    if (isLocal) {
        rpcInvocation = new LocalRpcInvocation(
            methodName,
            parameterTypes,
            args);
    } else {
        try {
            RemoteRpcInvocation remoteRpcInvocation = new RemoteRpcInvocation(
                methodName,
                parameterTypes,
                args);
            if (remoteRpcInvocation.getSize() > maximumFramesize) {
                throw new IOException("The rpc invocation size exceeds the maximum akka framesize.");
            } else {
                rpcInvocation = remoteRpcInvocation;
            }
        } catch (IOException e) {
            LOG.warn("Could not create remote rpc invocation message. Failing rpc invocation because...", e);
            throw e;
        }
    }
    return rpcInvocation;
}
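The local/remote split above is easy to experiment with outside Flink. The following sketch assumes plain Java serialization and uses hypothetical names (InvocationSketch, LocalInvocation, RemoteInvocation); it keeps the arguments by reference in the local case, serializes them to bytes in the remote case, and rejects payloads larger than a frame limit.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

final class InvocationSketch {

    /** Common shape of both invocation kinds. */
    interface Invocation {
        String methodName();
    }

    /** Local call: arguments are passed along by reference, nothing is serialized. */
    static final class LocalInvocation implements Invocation {
        final String methodName;
        final Object[] args;

        LocalInvocation(String methodName, Object[] args) {
            this.methodName = methodName;
            this.args = args;
        }

        @Override
        public String methodName() {
            return methodName;
        }
    }

    /** Remote call: arguments are serialized up front so their size is known before sending. */
    static final class RemoteInvocation implements Invocation {
        final String methodName;
        final byte[] payload;

        RemoteInvocation(String methodName, Object[] args) throws IOException {
            this.methodName = methodName;
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(args); // throws NotSerializableException for unsupported arguments
            }
            this.payload = bytes.toByteArray();
        }

        long size() {
            return payload.length;
        }

        @Override
        public String methodName() {
            return methodName;
        }
    }

    /** Picks the invocation kind and enforces the frame limit for remote calls. */
    static Invocation create(boolean isLocal, String methodName, Object[] args, long maxFrameSize)
            throws IOException {
        if (isLocal) {
            return new LocalInvocation(methodName, args);
        }
        RemoteInvocation remote = new RemoteInvocation(methodName, args);
        if (remote.size() > maxFrameSize) {
            throw new IOException("The invocation size exceeds the maximum frame size.");
        }
        return remote;
    }

    public static void main(String[] args) throws IOException {
        Object[] callArgs = {"tm-1", 42};
        System.out.println(create(true, "register", callArgs, 1024).getClass().getSimpleName());
        System.out.println(create(false, "register", callArgs, 1024 * 1024).getClass().getSimpleName());
    }
}
```

Serializing eagerly is what makes the size check possible before anything is put on the wire, which seems to be the point of the getSize() comparison in the original method.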
After receiving the request, JobMaster connects to TaskExecutor and gets back a TaskExecutorGateway:
JobMaster.class
@Override
public CompletableFuture<RegistrationResponse> registerTaskManager(
        final String taskManagerRpcAddress,
        final TaskManagerLocation taskManagerLocation,
        final Time timeout) {
    final ResourceID taskManagerId = taskManagerLocation.getResourceID();
    if (registeredTaskManagers.containsKey(taskManagerId)) {
        final RegistrationResponse response = new JMTMRegistrationSuccess(resourceId);
        return CompletableFuture.completedFuture(response);
    } else {
        // connect to the TaskManager and obtain the TaskExecutorGateway
        return getRpcService()
            .connect(taskManagerRpcAddress, TaskExecutorGateway.class)
            .handleAsync(
                (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> {
                    ......
                },
                getMainThreadExecutor());
    }
}
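The connect(...).handleAsync(..., getMainThreadExecutor()) pattern deserves a second look: the connection completes on some other thread, but the callback is handed to a specific executor, presumably so that the endpoint's state is only touched from one thread. The sketch below imitates this with the JDK alone. The connect method, the executor and the thread name are all hypothetical stand-ins, not Flink APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class MainThreadCallbackSketch {

    /** Hypothetical stand-in for RpcService#connect: completes on a pool thread. */
    static CompletableFuture<String> connect(String address) {
        return CompletableFuture.supplyAsync(() -> "gateway@" + address);
    }

    /** Connects, then runs the callback on a dedicated single "main" thread. */
    static String registerOnMainThread(String address) throws Exception {
        ExecutorService mainThread =
            Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-sketch"));
        try {
            return connect(address)
                .handleAsync(
                    (gateway, failure) -> {
                        if (failure != null) {
                            return "declined: " + failure.getMessage();
                        }
                        // Runs on the executor passed below, not on the thread that connected.
                        return gateway + " registered on " + Thread.currentThread().getName();
                    },
                    mainThread)
                .get(5, TimeUnit.SECONDS);
        } finally {
            mainThread.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(registerOnMainThread("tm-1"));
    }
}
```

Because handleAsync receives both the result and the failure, one callback covers the success path and the error path, which matches the two-argument lambda in the JobMaster excerpt.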
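To sum up, the overall process described above looks roughly like the figure below:
Finally, the tool people would like to point out that running themselves ragged across countless WeChat groups hurts both the efficiency and the quality of their work. Tool people are engineers who drink coffee and concentrate on thinking and creating, not nannies who slurp malatang while waiting on one person after another.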