热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

聊聊flink的ActorGateway

为什么80%的码农都做不了架构师?序本文主要研究一下flink的ActorGatewayActorGatewayflink-1.7.2flink-runtime

为什么80%的码农都做不了架构师?>>>   hot3.png

本文主要研究一下flink的ActorGateway

ActorGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/ActorGateway.java

public interface ActorGateway extends Serializable {/*** Sends a message asynchronously and returns its response. The response to the message is* returned as a future.** @param message Message to be sent* @param timeout Timeout until the Future is completed with an AskTimeoutException* @return Future which contains the response to the sent message*/Future ask(Object message, FiniteDuration timeout);/*** Sends a message asynchronously without a result.** @param message Message to be sent*/void tell(Object message);/*** Sends a message asynchronously without a result with sender being the sender.** @param message Message to be sent* @param sender Sender of the message*/void tell(Object message, ActorGateway sender);/*** Forwards a message. For the receiver of this message it looks as if sender has sent the* message.** @param message Message to be sent* @param sender Sender of the forwarded message*/void forward(Object message, ActorGateway sender);/*** Retries to send asynchronously a message up to numberRetries times. The response to this* message is returned as a future. The message is re-sent if the number of retries is not yet* exceeded and if an exception occurred while sending it.** @param message Message to be sent* @param numberRetries Number of times to retry sending the message* @param timeout Timeout for each sending attempt* @param executionContext ExecutionContext which is used to send the message multiple times* @return Future of the response to the sent message*/Future retry(Object message,int numberRetries,FiniteDuration timeout,ExecutionContext executionContext);/*** Returns the path of the remote instance.** @return Path of the remote instance.*/String path();/*** Returns the underlying actor with which is communicated** @return ActorRef of the target actor*/ActorRef actor();/*** Returns the leaderSessionID associated with the remote actor or null.** @return Leader session ID if its associated with this gateway, otherwise null*/UUID leaderSessionID();
}

  • ActorGateway接口定义了ask、tell、forward、retry、path、actor、leaderSessionID方法;它有一个实现类为AkkaActorGateway

AkkaActorGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/instance/AkkaActorGateway.java

public class AkkaActorGateway implements ActorGateway, Serializable {private static final long serialVersionUID = 42L;// ActorRef of the remote instanceprivate final ActorRef actor;// Associated leader session ID, which is used for RequiresLeaderSessionID messagesprivate final UUID leaderSessionID;// Decorator for messagesprivate final MessageDecorator decorator;public AkkaActorGateway(ActorRef actor, UUID leaderSessionID) {this.actor = Preconditions.checkNotNull(actor);this.leaderSessionID = Preconditions.checkNotNull(leaderSessionID);// we want to wrap RequiresLeaderSessionID messages in a LeaderSessionMessagethis.decorator = new LeaderSessionMessageDecorator(leaderSessionID);}/*** Sends a message asynchronously and returns its response. The response to the message is* returned as a future.** @param message Message to be sent* @param timeout Timeout until the Future is completed with an AskTimeoutException* @return Future which contains the response to the sent message*/@Overridepublic Future ask(Object message, FiniteDuration timeout) {Object newMessage = decorator.decorate(message);return Patterns.ask(actor, newMessage, new Timeout(timeout));}/*** Sends a message asynchronously without a result.** @param message Message to be sent*/@Overridepublic void tell(Object message) {Object newMessage = decorator.decorate(message);actor.tell(newMessage, ActorRef.noSender());}/*** Sends a message asynchronously without a result with sender being the sender.** @param message Message to be sent* @param sender Sender of the message*/@Overridepublic void tell(Object message, ActorGateway sender) {Object newMessage = decorator.decorate(message);actor.tell(newMessage, sender.actor());}/*** Forwards a message. For the receiver of this message it looks as if sender has sent the* message.** @param message Message to be sent* @param sender Sender of the forwarded message*/@Overridepublic void forward(Object message, ActorGateway sender) {Object newMessage = decorator.decorate(message);actor.tell(newMessage, sender.actor());}/*** Retries to send asynchronously a message up to numberRetries times. The response to this* message is returned as a future. The message is re-sent if the number of retries is not yet* exceeded and if an exception occurred while sending it.** @param message Message to be sent* @param numberRetries Number of times to retry sending the message* @param timeout Timeout for each sending attempt* @param executionContext ExecutionContext which is used to send the message multiple times* @return Future of the response to the sent message*/@Overridepublic Future retry(Object message,int numberRetries,FiniteDuration timeout,ExecutionContext executionContext) {Object newMessage = decorator.decorate(message);return AkkaUtils.retry(actor,newMessage,numberRetries,executionContext,timeout);}/*** Returns the ActorPath of the remote instance.** @return ActorPath of the remote instance.*/@Overridepublic String path() {return actor.path().toString();}/*** Returns {@link ActorRef} of the target actor** @return ActorRef of the target actor*/@Overridepublic ActorRef actor() {return actor;}@Overridepublic UUID leaderSessionID() {return leaderSessionID;}@Overridepublic String toString() {return String.format("AkkaActorGateway(%s, %s)", actor.path(), leaderSessionID);}
}

  • AkkaActorGateway实现了ActorGateway接口,它的构造器要求输入ActorRef及leaderSessionID,同时基于leaderSessionID创建了LeaderSessionMessageDecorator;ask、tell、forward、retry方法均首先调用LeaderSessionMessageDecorator.decorate方法包装message参数,然后再去调用ActorRef的相应方法

MessageDecorator

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/messages/MessageDecorator.java

public interface MessageDecorator extends java.io.Serializable {/*** Decorates a message** @param message Message to decorate* @return Decorated message*/Object decorate(Object message);
}

  • MessageDecorator接口定义了decorate方法用于包装message,它有一个实现类为LeaderSessionMessageDecorator

LeaderSessionMessageDecorator

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/messages/LeaderSessionMessageDecorator.java

public class LeaderSessionMessageDecorator implements MessageDecorator {private static final long serialVersionUID = 5359618147408392706L;/** Leader session ID with which the RequiresLeaderSessionID messages will be decorated */private final UUID leaderSessionID;/*** Sets the leader session ID with which the messages will be decorated.** @param leaderSessionID Leader session ID to be used for decoration*/public LeaderSessionMessageDecorator(UUID leaderSessionID) {this.leaderSessionID = leaderSessionID;}@Overridepublic Object decorate(Object message) {if (message instanceof RequiresLeaderSessionID) {return new JobManagerMessages.LeaderSessionMessage(leaderSessionID, message);} else {return message;}}
}

  • LeaderSessionMessageDecorator实现了MessageDecorator接口,其decorate方法判断message是RequiresLeaderSessionID类型的话,则返回JobManagerMessages.LeaderSessionMessage,否则返回原始的message

JobManagerMessages.LeaderSessionMessage

flink-1.7.2/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala

object JobManagerMessages {/** Wrapper class for leader session messages. Leader session messages implement the* [[RequiresLeaderSessionID]] interface and have to be wrapped in a [[LeaderSessionMessage]],* which also contains the current leader session ID.** @param leaderSessionID Current leader session ID* @param message [[RequiresLeaderSessionID]] message to be wrapped in a [[LeaderSessionMessage]]*/case class LeaderSessionMessage(leaderSessionID: UUID, message: Any)//......
}

  • JobManagerMessages.LeaderSessionMessage是一个case class,它有两个属性,分别是leaderSessionID及message

小结

  • ActorGateway接口定义了ask、tell、forward、retry、path、actor、leaderSessionID方法;它有一个实现类为AkkaActorGateway
  • AkkaActorGateway实现了ActorGateway接口,它的构造器要求输入ActorRef及leaderSessionID,同时基于leaderSessionID创建了LeaderSessionMessageDecorator;ask、tell、forward、retry方法均首先调用LeaderSessionMessageDecorator.decorate方法包装message参数,然后再去调用ActorRef的相应方法
  • MessageDecorator接口定义了decorate方法用于包装message,它有一个实现类为LeaderSessionMessageDecorator;LeaderSessionMessageDecorator实现了MessageDecorator接口,其decorate方法判断message是RequiresLeaderSessionID类型的话,则返回JobManagerMessages.LeaderSessionMessage,否则返回原始的message;JobManagerMessages.LeaderSessionMessage是一个case class,它有两个属性,分别是leaderSessionID及message

doc

  • ActorGateway

转:https://my.oschina.net/go4it/blog/3023351



推荐阅读
author-avatar
柔柔的爱2502880187
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有