热门标签 | HotTags
当前位置:  开发笔记 > 运维 > 正文

Akka2使用探索4(Actors)

ask异步发送一条消息并返回一个Future代表一个可能的回应。需要采用Future的处理模式。每一个消息发送者分别保证自己的消息的次序.try{Stringresultoperation();getSender().tell(result);}catch(Exceptione){getSender().tell(newakka.a

ask 异步发送一条消息并返回一个 Future 代表一个可能的回应。需要采用Future的处理模式。 每一个消息发送者分别保证自己的消息的次序. try { String result = operation(); getSender().tell(result); } catch (Exception e) { getSender().tell(new akka.a

ask 异步发送一条消息并返回一个 Future代表一个可能的回应。需要采用Future的处理模式。

每一个消息发送者分别保证自己的消息的次序. try {
String result = operation();
getSender().tell(result);
} catch (Exception e) {
getSender().tell(new akka.actor.Status.Failure(e));
throw e;
}

ask使用方式如下:

    List futures = []
        AkkaClientNoReply client = new AkkaClientNoReply("akka://xw@127.0.0.1:8888/user/server")
        client.send("hello")
        0.upto(15) {
            futures <
        //模拟客户端给服务端发0——15消息,服务器处理(把数&#20540;&#43;1返回给客户端)
        }

        final Future aggregate = Futures.sequence(futures, client.system.dispatcher());
        final Future transformed = aggregate.map(new Mapper() {
            public Integer apply(Iterable coll) {
                final Iterator it = coll.iterator();
                int count = 0;
                while (it.hasNext()) {
                    int x = (Integer) it.next();
                    count = count &#43; x
                }
                return new Integer(count);
            }
        });

        AkkaServerApp app = new AkkaServerApp("resultHandler", "127.0.0.1", 6666, "result")
        app.messageProcessor = {msg, UntypedActorContext context ->
            log.info("1到16之和为" &#43; msg)
        }
        app.startup()

        akka.pattern.Patterns.pipe(transformed).to(app.serverActor)


如果服务端处理消息时发生了异常而导致没有给客户端回应,那么客户端收到的结果将会收到Timeout的Failure:Failure(akka.pattern.AskTimeoutException: Timed out)。可以将异常捕获用Failure封装异常发给客户端:actor.tell(new akka.actor.Status.Failure(e))。

Future的onComplete, onResult, 或 onTimeout 方法可以用来注册一个回调,以便在Future完成时得到通知。从而提供一种避免阻塞的方法。

警告

在使用future回调如 onComplete, onSuccess, and onFailure时, 在actor内部你要小心避免捕捉该actor的引用, i.e. 不要在回调中调用该actor的方法或访问其可变状态。这会破坏actor的封装,会引用同步bug和race condition, 因为回调会与此actor一同被并发调度。 不幸的是目前还没有一种编译时的方法能够探测到这种非法访问。

转发消息

你可以将消息从一个actor转发给另一个。虽然经过了一个‘中转’,但最初的发送者地址/引用将保持不变。当实现功能类&#20284;路由器、负载均衡器、备份等的actor时会很有用。

myActor.forward(message, getContext());

回应消息

getSender().tell(replyMsg)

如果没有sender (不是从actor发送的消息或者没有future上下文) 那么 sender 缺省为“dead-letter” actor的引用.

初始化接收消息超时

设置receiveTimeout 属性并声明一个处理 ReceiveTimeout 对象的匹配分支。

public class MyReceivedTimeoutUntypedActor extends UntypedActor {
public MyReceivedTimeoutUntypedActor() {
getContext().setReceiveTimeout(Duration.parse("30 seconds"));
}
public void onReceive(Object message) {
if (message.equals("Hello")) {
getSender().tell("Hello world");
} else if (message == Actors.receiveTimeout()) {
throw new RuntimeException("received timeout");
} else {
unhandled(message);
}
}
}

终止Actor

通过调用ActorRefFactory i.e. ActorContextActorSystemstop 方法来终止一个actor , 通常 context 用来终止子actor,而 system 用来终止顶级actor. 实际的终止操作是异步执行的, i.e. stop 可能在actor被终止之前返回。

如果当前有正在处理的消息,对该消息的处理将在actor被终止之前完成,但是邮箱中的后续消息将不会被处理。缺省情况下这些消息会被送到 ActorSystem死信, 但是这取决于邮箱的实现。

actor的终止分两步: 第一步actor将停止对邮箱的处理,向所有子actor发送终止命令,然后处理来自子actor的终止消息直到所有的子actor都完成终止, 最后终止自己 (调用 postStop, 销毁邮箱, 向 DeathWatch 发布 Terminated, 通知其监管者). 这个过程保证actor系统中的子树以一种有序的方式终止, 将终止命令传播到叶子结点并收集它们回送的确认消息给被终止的监管者。如果其中某个actor没有响应 (i.e. 由于处理消息用了太长时间以至于没有收到终止命令), 整个过程将会被阻塞。

ActorSystem.shutdown被调用时, 系统根监管actor会被终止,以上的过程将保证整个系统的正确终止。

postStop hook 是在actor被完全终止以后调用。

PoisonPill

你也可以向actor发送 akka.actor.PoisonPill 消息, 这个消息处理完成后actor会被终止。 PoisonPill 与普通消息一样被放进队列,因此会在已经入队列的其它消息之后被执行。

优雅地终止

如果你想等待终止过程的结束,或者组合若干actor的终止次序,可以使用gracefulStop:

try {

Future stopped = akka.pattern.Patterns.gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system);

Await.result(stopped, Duration.create(6, TimeUnit.SECONDS));

// the actor has been stopped

} catch (ActorTimeoutException e) {

// the actor wasn't stopped within 5 seconds

}

热拔插 Become/Unbecome

升级 Upgrade

Akka支持在运行时对Actor消息循环 (e.g. 的实现)进行实时替换: 在actor中调用 context.become 方法。

Become 要求一个 akka.japi.Procedure 参数作为新的消息处理实现。 被替换的代码被存在一个栈中,可以被push和pop。

降级

由于被热替换掉的代码存在栈中,你也可以对代码进行降级,只需要在actor中调用 context.unbecome 方法。

Killing actor

发送Kill消息给actor

Actor 与 异常

在消息被actor处理的过程中可能会抛出异常,例如数据库异常。

消息会怎样

如果消息处理过程中(即从邮箱中取出并交给receive后)发生了异常,这个消息将被丢失。必须明白它不会被放回到邮箱中。所以如果你希望重试对消息的处理,你需要自己抓住异常然后在异常处理流程中重试. 请确保你限制重试的次数,因为你不会希望系统产生活锁 (从而消耗大量CPU而于事无补)。

邮箱会怎样

如果消息处理过程中发生异常,邮箱没有任何变化。如果actor被重启,邮箱会被保留。邮箱中的所有消息不会丢失。

actor会怎样

如果抛出了异常,actor实例将被丢弃而生成一个新的实例。这个新的实例会被该actor的引用所引用(所以这个过程对开发人员来说是不可见的)。注意这意味着如果你不在preRestart 回调中进行保存,并在postRestart回调中恢复,那么失败的actor实例的当前状态会被丢失。

推荐阅读
  • 在JavaWeb项目架构中,NFS(网络文件系统)的实现与优化是关键环节。NFS允许不同主机系统通过局域网共享文件和目录,提高资源利用率和数据访问效率。本文详细探讨了NFS在JavaWeb项目中的应用,包括配置、性能优化及常见问题的解决方案,旨在为开发者提供实用的技术参考。 ... [详细]
  • 第二章:Kafka基础入门与核心概念解析
    本章节主要介绍了Kafka的基本概念及其核心特性。Kafka是一种分布式消息发布和订阅系统,以其卓越的性能和高吞吐量而著称。最初,Kafka被设计用于LinkedIn的活动流和运营数据处理,旨在高效地管理和传输大规模的数据流。这些数据主要包括用户活动记录、系统日志和其他实时信息。通过深入解析Kafka的设计原理和应用场景,读者将能够更好地理解其在现代大数据架构中的重要地位。 ... [详细]
  • 2016-2017学年《网络安全实战》第三次作业
    2016-2017学年《网络安全实战》第三次作业总结了教材中关于网络信息收集技术的内容。本章主要探讨了网络踩点、网络扫描和网络查点三个关键步骤。其中,网络踩点旨在通过公开渠道收集目标信息,为后续的安全测试奠定基础,而不涉及实际的入侵行为。 ... [详细]
  • 解读中台架构:微服务与分布式技术的区别及应用
    中心化与去中心化是长期讨论的话题。中心化架构的优势在于部署和维护相对简单,尤其在服务负载较为稳定的情况下,能够提供高效稳定的性能。然而,随着业务规模的扩大和技术需求的多样化,中心化架构的局限性逐渐显现,如扩展性和故障恢复能力较差。相比之下,微服务和分布式技术通过解耦系统组件,提高了系统的灵活性和可扩展性,更适合处理复杂多变的业务场景。本文将深入探讨中台架构中微服务与分布式技术的区别及其应用场景,帮助读者更好地理解和选择适合自身业务的技术方案。 ... [详细]
  • Nginx不仅是一款轻量级的高性能Web服务器,还具备出色的负载均衡和反向代理功能。它支持复杂的正则匹配规则、动静内容分离以及灵活的URL重写功能,使得配置和管理更加便捷高效。此外,Nginx提供了多种负载均衡算法,如轮询、加权轮询、最少连接数等,以满足不同应用场景的需求。 ... [详细]
  • 近年来,BPM(业务流程管理)系统在国内市场逐渐普及,多家厂商在这一领域崭露头角。本文将对当前主要的BPM厂商进行概述,并分析其各自的优势。目前,市场上较为成熟的BPM产品主要分为两类:一类是综合型厂商,如IBM和SAP,这些企业在整体解决方案方面具有明显优势;另一类则是专注于BPM领域的专业厂商,它们在特定行业或应用场景中表现出色。通过对比分析,本文旨在为企业选择合适的BPM系统提供参考。 ... [详细]
  • 人人租机作为国内领先的信用免押租赁平台,为企业和个人提供全方位的新租赁服务。通过接入支付宝小程序功能,该平台实现了从零到百的迅猛增长,成为全国首家推出“新租赁小程序”开发服务的阿里巴巴小程序服务商(ISV)。这一创新举措不仅提升了用户体验,还显著增强了平台的市场竞争力。 ... [详细]
  • 期末Web开发综合实践项目:运用前端技术打造趣味小游戏体验
    期末Web开发综合实践项目中,学生通过运用HTML、CSS和JavaScript等前端技术,设计并实现了一款趣味性十足的小游戏。该项目不仅检验了学生对前端基础知识的掌握情况,还提升了他们的实际操作能力和创意设计水平。视频链接展示了项目的最终成果,直观呈现了游戏的互动性和视觉效果。 ... [详细]
  • 负载均衡基础概念与技术解析
    随着互联网应用的不断扩展,用户流量激增,业务复杂度显著提升,单一服务器已难以应对日益增长的负载需求。负载均衡技术应运而生,通过将请求合理分配到多个服务器,有效提高系统的可用性和响应速度。本文将深入探讨负载均衡的基本概念和技术原理,分析其在现代互联网架构中的重要性及应用场景。 ... [详细]
  • IIS 7及7.5版本中应用程序池的最佳配置策略与实践
    在IIS 7及7.5版本中,优化应用程序池的配置是提升Web站点性能的关键步骤。具体操作包括:首先定位到目标Web站点的应用程序池,然后通过“应用程序池”菜单找到对应的池,右键选择“高级设置”。在一般优化方案中,建议调整以下几个关键参数:1. **基本设置**: - **队列长度**:默认值为1000,可根据实际需求调整队列长度,以提高处理请求的能力。此外,还可以进一步优化其他参数,如处理器使用限制、回收策略等,以确保应用程序池的高效运行。这些优化措施有助于提升系统的稳定性和响应速度。 ... [详细]
  • ZeroMQ在云计算环境下的高效消息传递库第四章学习心得
    本章节深入探讨了ZeroMQ在云计算环境中的高效消息传递机制,涵盖客户端请求-响应模式、最近最少使用(LRU)队列、心跳检测、面向服务的队列、基于磁盘的离线队列以及主从备份服务等关键技术。此外,还介绍了无中间件的请求-响应架构,强调了这些技术在提升系统性能和可靠性方面的应用价值。个人理解方面,ZeroMQ通过这些机制有效解决了分布式系统中常见的通信延迟和数据一致性问题。 ... [详细]
  • 如何在主服务器响应变慢时,自动切换至备用服务器地址以确保服务连续性
    在主服务器响应速度下降时,如何通过编程实现自动切换至备用服务器地址,以确保服务的连续性和稳定性。本文将介绍一种基于PHP和MySQL的解决方案,通过监测主服务器的响应时间,当检测到延迟过高时,自动切换至备用服务器接口,从而保障系统的高可用性。 ... [详细]
  • 黄聪:MySQL主从复制配置,实现高效读写分离
    大型网站为应对高并发访问,不仅需要在前端实现分布式负载均衡,还需在数据业务和访问层采取有效措施。采用传统的数据结构已无法满足需求,通过配置MySQL主从复制,可实现高效的读写分离,显著提升系统性能和稳定性。 ... [详细]
  • 双因子安全机制与WiFi万能钥匙的较量:解析其背后的对抗策略
    几乎所有智能手机用户都熟悉类似“WiFi万能钥匙”的应用程序。这款应用凭借庞大的下载量,不仅在各大应用商店中占据显著位置,还长期稳居下载排行榜前列。然而,随着双因子认证等高级安全机制的普及,这类应用面临着前所未有的挑战。本文将深入探讨双因子安全机制与WiFi万能钥匙之间的对抗策略,分析其背后的技术原理和安全风险。 ... [详细]
  • 程序员们欢呼雀跃:微软正式推出GitHub安卓版应用预览版,新增暗黑模式支持
    近日,GitHub正式发布了面向Android用户的移动应用预览版,这一消息让众多程序员兴奋不已。新版本不仅支持暗黑模式,还优化了用户体验,为开发者提供了更加便捷的代码管理和协作工具。此前,GitHub已为iOS用户推出了相应的移动应用,此次Android版的发布进一步扩大了其在移动开发领域的影响力。 ... [详细]
author-avatar
心星Lover
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有