热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

在容器化环境中扩展分布式流式处理器\n

本文介绍了我们在Kubernetes中扩展分布式流处理器的经验。流处理器应该支持维护最佳的并行性。然而,添加更多的资源会带来额外的成本,但却不能保证性能的提升。相反,流处理器应该识别出资源需求级别,并进行相应的扩展。

关键要点

  • 流式处理器应该是可扩展的,用以满足流数据处理不断增长的业务需求。
  • 在容器化环境中扩展流式处理器必须在服务质量与相关成本之间做出权衡。
  • 流式处理器应该能够通过水平扩展在容器化环境(例如运行在云服务供应商上的Kubernetes)中利用这种权衡。
  • 在容器化环境中成功运行流式处理应用程序取决于为每个流式处理器配置的硬件资源。
  • 为容器化环境添加更多的硬件资源并不会带来性能的提升。

由于对事件流式处理(即流式处理)应用程序的需求在不断增长,数据流式处理近来已成为数据分析领域的主要范式之一。各种流式处理应用已经出现在各个行业中,如电信、交通流量管理、人群管理、健康信息学、网络安全、金融,等等。

流式处理器是一种软件平台,让用户能够更快地处理和响应传入的数据流。市场上有很多流式处理器可供选择,比如Flink、Heron、Kafka、Samza、Spark Streaming、Storm和WSO2 Stream Processor,它们都是开源流式处理器。

流式处理器的实时操作为提升系统性能提供了高质量的服务。大多数现代流式处理器只需要少数几个计算节点就可以处理90%的流式场景。但是,随着业务的扩展,大多数有利可图的企业需要处理越来越多的工作负载。因此,在选择流式处理器时,必须选择容易扩展且能够处理更大工作负载的解决方案。

流式处理器越来越多地被部署成云端的软件即服务(SaaS),例如Amazon Kinesis Data Analytics、Microsoft Azure Stream Analytics、Google Cloud Dataflow,等等。基于云的流式处理器为在其上运行的流式处理应用程序提供了弹性扩展能力。以容器为中心的管理环境(即容器化环境,例如Kubernetes)可以以可伸缩的方式运行流式处理应用程序。然而,由于硬件或软件环境的异构性和数据流的特性,在容器化环境中何时以及如何进行分布式流式处理器的缩放就成了一个非常重要的问题。本文介绍了一个数据密集型流式处理应用程序的实际应用场景,解释了如何在Kubernetes中进行系统的扩展,以及相关的权衡。我们使用WSO2 Stream Processor(WSO2 SP)作为示例流式处理器,因为它是一个开源的适合实现此类应用程序的云原生流式处理器。不过,我们认为相同的概念也同样适用于市场上其他云原生流式处理器。

我们将提供一个真实的流式处理示例,与检测恶意攻击有关,在这个示例中,有人试图进行未授权的Web服务器登录,并造成服务器拒绝服务(DoS攻击)。一旦检测到此类DoS攻击,系统就会向系统管理员发送警报,以便采取必要的安全措施。

使用Web服务器日志检测恶意攻击

可以使用Web服务器日志中捕获的HTTP日志事件来监控Web服务器的运行状态。如果连续出现相同的HTTP状态码(例如401未授权或403禁止访问),说明有人在尝试恶意登录Web服务器。401响应说明外部第三方尝试访问Web服务器的凭证已经被拒绝。状态码403说明服务器拒绝接受请求,尽管服务器知道如何处理该请求。

在容器化环境中扩展分布式流式处理器\n

图1:检测对Web服务器的恶意攻击

有几种不同的方法可用来处理这种情况。但是,对于这种情况,信息安全专家更喜欢收到实时警报。如果这种恶意请求在三秒钟内超过30次,并且访问率((未授权请求次数+禁止访问计数)/总请求次数)为1.0,则需要抛出警报。我们使用流式处理器开发了一个警报生成程序(如图1所示),这个处理器接收并处理来自Web服务器的日志事件。为了让系统具备可扩展性,它被部署在运行在Google Compute Engine(GCE)上的Kubernetes集群中。清单1显示了用Siddhi查询语言编写的流式SQL代码。我们将它称为Siddhi应用程序。

清单1:使用Siddhi流式SQL开发的恶意攻击检测程序

@App:name(\u0026quot;MaliciousAttacksDetection\u0026quot;)@App:description(\u0026quot;HTTP Log Processor for detecting malicious DoS attacks\u0026quot;)@source(type = 'kafka', partition.no.list='0', threading.option='single.thread', group.id=\u0026quot;group\u0026quot;, bootstrap.servers='kafka-service:9092', topic.list = 'attackDetectionTopic',        @map(type = 'json'))define stream inputStream ( iij_timestamp long, ip string, timestamp long, zone float, cik double, accession string, doc string, code float, size double, idx float, norefer float, noagent float, find float, crawler float, groupID int, browser string);--Output of query 1: I want to know the IP of the malicious hosts which try to make unauthorized login attempts in short [email protected](type='log')define stream outputStreamDoSAlert(ip string ,groupID int);--The Actual latency of parallel siddhi apps are getting started to measure at this [email protected](name = \u0026quot;Query1\u0026quot;)@dist(execGroup='group11' ,parallel ='1')from inputStreamselect iij_timestamp, ip, timestamp, zone, cik, accession, doc, code, size, idx, norefer, noagent, find, crawler, groupID, browser, convert(time:timestampInMilliseconds(),'long') as injected_iijtimestampinsert into interimInputStream;--Query 2: Here all the accesses are either 401 or 403 and they have been done at least 30 times within 3 seconds time [email protected](name = \u0026quot;Query2\u0026quot;)@dist(execGroup='group3', parallel ='12')partition with (groupID of interimInputStream)begin   from interimInputStream#window.timeBatch(timestamp, 3 sec)   select  ip, count() as totalAccessCount, sum(ifThenElse(code == 401F, 1, 0)) as unauthorizedCount, sum(ifThenElse(code == 403F, 1, 0)) as forbiddenCount,injected_iijtimestamp as iijtimestamp,groupID,timestamp   insert into #interimStream3;   from #interimStream3#throughput:throughput(iijtimestamp,\u0026quot;throughput\u0026quot;,3,6,\u0026quot;outputStreamDoSAlert\u0026quot;,30)   select ip, totalAccessCount, (unauthorizedCount + forbiddenCount)/totalAccessCount as accessPercentage ,groupID   insert into #interimStream5;   from #interimStream5 [totalAccessCount \u0026gt; 30L and accessPercentage == 1.0]   select ip ,groupID   insert into outputStreamDoSAlert;end;

我们将这个应用程序部署在分布式流式处理器中,如图2的部署架构图所示。每个组件(如Worker-1、Worker-2……)都部署为单个容器和单个Kubernetes pod。表1中列出了每个容器类别及其执行的任务。图2显示了整个系统被部署在六个Kubernetes节点中的方案。

表1:Kubernetes环境中不同类型容器执行的任务

在容器化环境中扩展分布式流式处理器\n
我们将应用程序部署在Google Compute Engine的Kubernetes环境中。对于特定的工作负载P,系统应该提供特定的服务质量(QoS)值Q。我们根据Worker的延迟数量来测量Q的值(延迟是指事件进入Worker和事件退出Worker之间的时间差)。下面列出了集群中部署的每个组件。

  • Web服务器托管在Node 1上;
  • 生产者组件托管在Node 2上,这个组件负责生成工作负载。它读取Web服务器日志并将它们发布到运行在Node 4上的Kafka实例;
  • Node 2和Node 4运行流式处理器的两个管理器;
  • 由gcloud自动生成的NFS托管在Node 3上;
  • Worker运行在Node 5和Node 6上,并负责处理实际的工作负载。它们从Kafka实例读取数据,应用流式处理操作,并将结果写回Kafka实例。

在容器化环境中扩展分布式流式处理器\n

图2:Kubernetes集群的部署架构。

但是,随着时间的推移,Web服务器上的工作负载也会增加,这是大多数Web服务器的典型特征。工作负载可能会从P增加到到2P、4P、……、16P,等等。在这种情况下,运行Web服务器监控系统的企业需要维护Q’(QoS属性的观察值),让它与Q相当。流式处理器应该能够扩展到足以维持预期的QoS级别。请注意,可伸缩性是系统处理不断增加的工作负载的能力。在本文中,我们着重关注负载可伸缩性,即随着流量的增加,系统能够正常运行。

有两种方法可以实现负载可伸缩性:强伸缩和弱伸缩。强伸缩在保持问题规模不变的同时增加处理器的数量,弱伸缩也会增加处理器数量,但保持每个处理器的问题规模不变。在本文中,我们采用了弱伸缩,因为我们遇到的是工作负载增加的情况。

实验

我们使用了一个运行在Google Compute Engine(GCE)上的Kubernetes集群。此外,我们使用的节点配备了2核CPU和7.5GB内存,用来托管pod。每个pod都有一个容器,每个容器中都部署了一个流式处理器(SP)组件。我们使用JDK 1.8、Kafka 2.0.1、WSO2-SP 4.3.0和MYSQL 5.7.4来构建docker镜像。它们都作为容器化的应用程序部署在集群中。每个实验需要40分钟时间,包括10分钟的预热。请注意,我们使用符号x-y-z来表示(节点数)-(Worker数)-(实例数)。

我们使用EDGAR日志文件数据集作为实验数据集,因为它已经包含CSV格式的Web服务器日志数据集。我们使用了EDGAR日志文件数据集log20170325.csv。这个CSV文件中包含了22,146,382个事件,文件总大小约为2.4GB,平均消息大小为144字节,每条记录有15个字段。清单2显示了EDGAR数据集中的头两个记录。

清单2:来自EDGAR日志文件数据集的头两个记录

ip,date,time,zone,cik,accession,extention,code,size,idx,norefer,noagent,find,crawler,browser100.14.44.eca,2017-03-25,00:00:00,0.0,1031093.0,0001137171-10-000013,-index.htm,200.0,7926.0,1.0,0.0,0.0,10.0,0.0,

我们分别测量了三个级别的性能,即节点级别、容器级别和流式处理器级别。不过,本文得出的结论是基于流式处理器级别的延迟和吞吐量。我们在group3 Siddhi执行组中测量了这些值。我们使用Kubernetes集群进行了六次不同的实验,得出了表2所示的结果。

在表2中,ID对应于唯一的实验标识符。节点数量对应于Kubernetes Worker的总数。实例数量是指Siddhi实例的数量。生产者数据速率(线程数)对应于生产者的数量,这些生产者通过从EDGAR数据文件读取HTTP日志事件来生成流事件。

结果

两个节点、6个Worker、6个实例(2-6-6)导致单个工作负载生产者P的平均延迟为390毫秒。这是最基本的情况。随着工作负载的增加,延迟显著增加,而吞吐量会随之降低。我们将工作负载生成器线程增加到16,用以生成很大的工作负载。场景2显示了这种情况。当我们将生成器线程增加16时,延迟增加了28.2%,吞吐量减少了29%。场景3表明,减少输入数据项中唯一组ID的数量会使事情变得更糟(特别是在吞吐量方面)。这是因为减少唯一组ID的数量会导致应用程序的并行性也减少。

如果我们将每个节点的Worker数量加倍,如场景4所示,我们将每个Worker的内存量减半。与情况1相比,吞吐量降低了三分之二,延迟增加了9倍。因此,Worker的内存使用量对应用程序的延迟起着重要作用。有时候,如果没有提供足够的内存,甚至无法部署Worker。

对于第5种场景,我们将Kubernetes Worker节点的数量增加到3个,总共有12个Worker,从而消除了性能瓶颈。现在添加了另一个节点(Node 7),每个节点有四个流式处理器。每个Worker需要1 GB内存,因此每个节点可以有4到5个Worker。因此,即使我们有3-6-6的系统设置(即三个节点,六个Worker和六个实例),只能获得2-6-6的运行性能。但是,2-12-12和3-12-12的性能特征会不一样。

添加更多的硬件资源也无助于是。我们可以从实验场景6中观察到这一点。虽然与场景5相比,节点数量增加了一倍,但我们得到了与场景5相似的平均延迟。如果为Worker添加更多的Siddhi部分应用程序,场景6的方法可能会有用,但这需要额外的内存。

表2:不同Kubernetes集群的性能
在容器化环境中扩展分布式流式处理器\n

当我们在Siddhi应用程序中使用分区时,Siddhi应用程序被分成若干个Siddhi应用程序,这些应用程序将根据分区属性的唯一值分布来获得工作负载。表2最右边的一列显示了groupID字段使用的属性惟一值的数量。在Siddhi应用程序中,我们使用groupID作为分区属性。因此,使用6个惟一值意味着只有6个部分Siddhi应用程序可以根据Siddhi的哈希映射获得工作负载。我们将惟一groupID的数量增加到12来增加并行性。这意味着流将被定向到12个Siddhi部分应用程序。由于这些原因,并行性增加了,我们在场景1中获得的延迟比场景3中的更好。

总结

可伸缩性是容器环境中的流式处理器面临的一个重大挑战,因为应用程序的工作负载会随着时间的推移而增加。本文分享了在Kubernetes环境中扩展此类分布式流式处理器的经验。为此,流式处理器应该提供某种编程语言或查询结构,以维持最佳的并行级别,而不管应用程序的初始规模如何。随着工作负载的增加,需要提供足够数量的硬件资源,让系统可以保持足够的QoS水平。添加更多资源会产生额外成本,但添加更多的资源并不能保证一定会获得性能的提升。流式处理器需要能够识别出资源需求级别,并扩展到可以保持最佳性能与成本比的级别。

关于作者

Sarangan Janakan是WSO2的实习软件工程师,目前是斯里兰卡莫拉图瓦大学计算机科学与工程系的三年级本科生。他的研究兴趣包括数据流式处理和云计算。

Miyuru Dayarathna是WSO2的高级技术主管。他是一名计算机科学家,在流式计算、图形数据管理和挖掘、云计算、性能工程、信息安全等方面做出了贡献。他还是斯里兰卡莫拉图瓦大学计算机科学与工程系的顾问。他最近在WSO2的研究重点是流式处理器和身份识别服务器。他已经在知名的国际期刊和会议上发表过技术论文。

查看英文原文:https://www.infoq.com/articles/distributed-stream-processor-container


推荐阅读
  • 关于我们EMQ是一家全球领先的开源物联网基础设施软件供应商,服务新产业周期的IoT&5G、边缘计算与云计算市场,交付全球领先的开源物联网消息服务器和流处理数据 ... [详细]
  • 本文介绍了如何使用php限制数据库插入的条数并显示每次插入数据库之间的数据数目,以及避免重复提交的方法。同时还介绍了如何限制某一个数据库用户的并发连接数,以及设置数据库的连接数和连接超时时间的方法。最后提供了一些关于浏览器在线用户数和数据库连接数量比例的参考值。 ... [详细]
  • 搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的详细步骤
    本文详细介绍了搭建Windows Server 2012 R2 IIS8.5+PHP(FastCGI)+MySQL环境的步骤,包括环境说明、相关软件下载的地址以及所需的插件下载地址。 ... [详细]
  • 如何使用Java获取服务器硬件信息和磁盘负载率
    本文介绍了使用Java编程语言获取服务器硬件信息和磁盘负载率的方法。首先在远程服务器上搭建一个支持服务端语言的HTTP服务,并获取服务器的磁盘信息,并将结果输出。然后在本地使用JS编写一个AJAX脚本,远程请求服务端的程序,得到结果并展示给用户。其中还介绍了如何提取硬盘序列号的方法。 ... [详细]
  • 知识图谱——机器大脑中的知识库
    本文介绍了知识图谱在机器大脑中的应用,以及搜索引擎在知识图谱方面的发展。以谷歌知识图谱为例,说明了知识图谱的智能化特点。通过搜索引擎用户可以获取更加智能化的答案,如搜索关键词"Marie Curie",会得到居里夫人的详细信息以及与之相关的历史人物。知识图谱的出现引起了搜索引擎行业的变革,不仅美国的微软必应,中国的百度、搜狗等搜索引擎公司也纷纷推出了自己的知识图谱。 ... [详细]
  • 本文介绍了RPC框架Thrift的安装环境变量配置与第一个实例,讲解了RPC的概念以及如何解决跨语言、c++客户端、web服务端、远程调用等需求。Thrift开发方便上手快,性能和稳定性也不错,适合初学者学习和使用。 ... [详细]
  • 加密世界下一个主流叙事领域:L2、跨链桥、GameFi等
    本文介绍了加密世界下一个主流叙事的七个潜力领域,包括L2、跨链桥、GameFi等。L2作为以太坊的二层解决方案,在过去一年取得了巨大成功,跨链桥和互操作性是多链Web3中最重要的因素。去中心化的数据存储领域也具有巨大潜力,未来云存储市场有望达到1500亿美元。DAO和社交代币将成为购买和控制现实世界资产的重要方式,而GameFi作为数字资产在高收入游戏中的应用有望推动数字资产走向主流。衍生品市场也在不断发展壮大。 ... [详细]
  • 如何在服务器主机上实现文件共享的方法和工具
    本文介绍了在服务器主机上实现文件共享的方法和工具,包括Linux主机和Windows主机的文件传输方式,Web运维和FTP/SFTP客户端运维两种方式,以及使用WinSCP工具将文件上传至Linux云服务器的操作方法。此外,还介绍了在迁移过程中需要安装迁移Agent并输入目的端服务器所在华为云的AK/SK,以及主机迁移服务会收集的源端服务器信息。 ... [详细]
  • SpringBoot整合SpringSecurity+JWT实现单点登录
    SpringBoot整合SpringSecurity+JWT实现单点登录,Go语言社区,Golang程序员人脉社 ... [详细]
  • 云原生边缘计算之KubeEdge简介及功能特点
    本文介绍了云原生边缘计算中的KubeEdge系统,该系统是一个开源系统,用于将容器化应用程序编排功能扩展到Edge的主机。它基于Kubernetes构建,并为网络应用程序提供基础架构支持。同时,KubeEdge具有离线模式、基于Kubernetes的节点、群集、应用程序和设备管理、资源优化等特点。此外,KubeEdge还支持跨平台工作,在私有、公共和混合云中都可以运行。同时,KubeEdge还提供数据管理和数据分析管道引擎的支持。最后,本文还介绍了KubeEdge系统生成证书的方法。 ... [详细]
  • PHP图片截取方法及应用实例
    本文介绍了使用PHP动态切割JPEG图片的方法,并提供了应用实例,包括截取视频图、提取文章内容中的图片地址、裁切图片等问题。详细介绍了相关的PHP函数和参数的使用,以及图片切割的具体步骤。同时,还提供了一些注意事项和优化建议。通过本文的学习,读者可以掌握PHP图片截取的技巧,实现自己的需求。 ... [详细]
  • PHP设置MySQL字符集的方法及使用mysqli_set_charset函数
    本文介绍了PHP设置MySQL字符集的方法,详细介绍了使用mysqli_set_charset函数来规定与数据库服务器进行数据传送时要使用的字符集。通过示例代码演示了如何设置默认客户端字符集。 ... [详细]
  • Java序列化对象传给PHP的方法及原理解析
    本文介绍了Java序列化对象传给PHP的方法及原理,包括Java对象传递的方式、序列化的方式、PHP中的序列化用法介绍、Java是否能反序列化PHP的数据、Java序列化的原理以及解决Java序列化中的问题。同时还解释了序列化的概念和作用,以及代码执行序列化所需要的权限。最后指出,序列化会将对象实例的所有字段都进行序列化,使得数据能够被表示为实例的序列化数据,但只有能够解释该格式的代码才能够确定数据的内容。 ... [详细]
  • 怀疑是每次都在新建文件,具体代码如下 ... [详细]
  • 在重复造轮子的情况下用ProxyServlet反向代理来减少工作量
    像不少公司内部不同团队都会自己研发自己工具产品,当各个产品逐渐成熟,到达了一定的发展瓶颈,同时每个产品都有着自己的入口,用户 ... [详细]
author-avatar
手机用户2502858383_827
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有