热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

flink异步iomysql缓存_Flink异步IO实战

基本概念首先通过官网的一个图片了解一下AsynchronousIOOperationFlinksource收到一条数据就会进行处理,如果需要通过这条数据关联外部数据源

基本概念

首先通过官网的一个图片了解一下Asynchronous I/O Operation

cefc245e51c7

Flink source收到一条数据就会进行处理,如果需要通过这条数据关联外部数据源,例如mysql,在发出查询请求后,同步IO的方式是会等待查询结果再处理下一条数据的查询,也就是每一条数据都要等待上一个查询结束。而异步IO是指数据来了以后发出查询请求,先不等查询结果,直接继续发送下一条的查询请求,对于查询结果是异步返回的,返回结果之后再进入下一个算子的计算。这两种方式性能差距请看下的样例。

样例

代码传送门

生成6条数据,从0开始递增的6个数字。模拟异步查询之后,加上时间戳输出

public class AsyncIODemo {

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setParallelism(1);

final int maxCount = 6;

final int taskNum = 1;

final long timeout = 40000;

DataStream inputStream = env.addSource(new SimpleSource(maxCount));

AsyncFunction function = new SampleAsyncFunction();

DataStream result = AsyncDataStream.unorderedWait(

inputStream,

function,

timeout,

TimeUnit.MILLISECONDS,

10).setParallelism(taskNum);

result.map(new MapFunction() {

@Override

public String map(String value) throws Exception {

return value + "," + System.currentTimeMillis();

}

}).print();

env.execute("Async IO Demo");

}

private static class SimpleSource implements SourceFunction {

private volatile boolean isRunning = true;

private int counter = 0;

private int start = 0;

public SimpleSource(int maxNum) {

this.counter = maxNum;

}

@Override

public void run(SourceContext ctx) throws Exception {

while ((start

synchronized (ctx.getCheckpointLock()) {

System.out.println("send data:" + start);

ctx.collect(start);

++start;

}

Thread.sleep(10L);

}

}

@Override

public void cancel() {

isRunning = false;

}

}

}

异步方法

public class SampleAsyncFunction extends RichAsyncFunction {

private long[] sleep = {100L, 1000L, 5000L, 2000L, 6000L, 100L};

@Override

public void open(Configuration parameters) throws Exception {

super.open(parameters);

}

@Override

public void close() throws Exception {

super.close();

}

@Override

public void asyncInvoke(final Integer input, final ResultFuture resultFuture) {

System.out.println(System.currentTimeMillis() + "-input:" + input + " will sleep " + sleep[input] + " ms");

query(input, resultFuture);

}

private void query(final Integer input, final ResultFuture resultFuture) {

try {

Thread.sleep(sleep[input]);

resultFuture.complete(Collections.singletonList(String.valueOf(input)));

} catch (InterruptedException e) {

resultFuture.complete(new ArrayList<>(0));

}

}

private void asyncQuery(final Integer input, final ResultFuture resultFuture) {

CompletableFuture.supplyAsync(new Supplier() {

&#64;Override

public Integer get() {

try {

Thread.sleep(sleep[input]);

return input;

} catch (Exception e) {

return null;

}

}

}).thenAccept((Integer dbResult) -> {

resultFuture.complete(Collections.singleton(String.valueOf(dbResult)));

});

}

}

上面的代码中有两个方法query()和asyncQuery()&#xff0c;其中Thread.sleep(sleep[input]);用来模拟查询需要等待的时间&#xff0c;每条数据等待的时间分别为100L, 1000L, 5000L, 2000L, 6000L, 100L毫秒。

结果分析

运行query()的结果为

send data:0

send data:1

send data:2

send data:3

send data:4

send data:5

1577801193230-input:0 will sleep 100 ms

1577801193331-input:1 will sleep 1000 ms

0,1577801194336

1,1577801194336

1577801194336-input:2 will sleep 5000 ms

1577801199339-input:3 will sleep 2000 ms

2,1577801201341

1577801201342-input:4 will sleep 6000 ms

3,1577801207345

4,1577801207345

1577801207346-input:5 will sleep 100 ms

5,1577801207451

可以看到第一条数据进入到map算子的时间与最后一条相差了13115毫秒&#xff0c;执行的顺序与source中数据的顺序一致&#xff0c;并且是串行的。

运行asyncQuery()的结果为

send data:0

send data:1

send data:2

send data:3

1577802161755-input:0 will sleep 100 ms

1577802161756-input:1 will sleep 1000 ms

1577802161757-input:2 will sleep 5000 ms

send data:4

send data:5

1577802161783-input:3 will sleep 2000 ms

1577802161784-input:4 will sleep 6000 ms

1577802161785-input:5 will sleep 100 ms

0,1577802161859

1,1577802162759

3,1577802163862

5,1577802163962

2,1577802166760

4,1577802168762

同样第一条数据进入map算子的时间与最后一条仅相差了6903毫秒&#xff0c;而且输出结果的顺序并不是source中的顺序&#xff0c;而是按照查询时间递增的顺序输出&#xff0c;并且查询请求几乎是同一时间发出的。

通过上面的例子可以看出&#xff0c;flink所谓的异步IO&#xff0c;并不是只要实现了asyncInvoke方法就是异步了&#xff0c;这个方法并不是异步的&#xff0c;而是要依靠这个方法里面所写的查询是异步的才可以。否则像是上面query()方法那样&#xff0c;同样会阻塞查询相当于同步IO。在实现flink异步IO的时候一定要注意。官方文档也给出了相关的说明。

For example, the following patterns result in a blocking asyncInvoke(...) functions and thus void the asynchronous behavior:Using a database client whose lookup/query method call blocks until the result has been received back

总结

本文基于flink 1.9.0。通过样例介绍了如何实现flink异步IO&#xff0c;读者可以修改本文样例体验异步IO其他的特性&#xff0c;例如Order of Results或者Event Time。



推荐阅读
  • Java太阳系小游戏分析和源码详解
    本文介绍了一个基于Java的太阳系小游戏的分析和源码详解。通过对面向对象的知识的学习和实践,作者实现了太阳系各行星绕太阳转的效果。文章详细介绍了游戏的设计思路和源码结构,包括工具类、常量、图片加载、面板等。通过这个小游戏的制作,读者可以巩固和应用所学的知识,如类的继承、方法的重载与重写、多态和封装等。 ... [详细]
  • [大整数乘法] java代码实现
    本文介绍了使用java代码实现大整数乘法的过程,同时也涉及到大整数加法和大整数减法的计算方法。通过分治算法来提高计算效率,并对算法的时间复杂度进行了研究。详细代码实现请参考文章链接。 ... [详细]
  • Java学习笔记之面向对象编程(OOP)
    本文介绍了Java学习笔记中的面向对象编程(OOP)内容,包括OOP的三大特性(封装、继承、多态)和五大原则(单一职责原则、开放封闭原则、里式替换原则、依赖倒置原则)。通过学习OOP,可以提高代码复用性、拓展性和安全性。 ... [详细]
  • 本文介绍了iOS数据库Sqlite的SQL语句分类和常见约束关键字。SQL语句分为DDL、DML和DQL三种类型,其中DDL语句用于定义、删除和修改数据表,关键字包括create、drop和alter。常见约束关键字包括if not exists、if exists、primary key、autoincrement、not null和default。此外,还介绍了常见的数据库数据类型,包括integer、text和real。 ... [详细]
  • 本文介绍了如何在给定的有序字符序列中插入新字符,并保持序列的有序性。通过示例代码演示了插入过程,以及插入后的字符序列。 ... [详细]
  • JavaSE笔试题-接口、抽象类、多态等问题解答
    本文解答了JavaSE笔试题中关于接口、抽象类、多态等问题。包括Math类的取整数方法、接口是否可继承、抽象类是否可实现接口、抽象类是否可继承具体类、抽象类中是否可以有静态main方法等问题。同时介绍了面向对象的特征,以及Java中实现多态的机制。 ... [详细]
  • Spring特性实现接口多类的动态调用详解
    本文详细介绍了如何使用Spring特性实现接口多类的动态调用。通过对Spring IoC容器的基础类BeanFactory和ApplicationContext的介绍,以及getBeansOfType方法的应用,解决了在实际工作中遇到的接口及多个实现类的问题。同时,文章还提到了SPI使用的不便之处,并介绍了借助ApplicationContext实现需求的方法。阅读本文,你将了解到Spring特性的实现原理和实际应用方式。 ... [详细]
  • 本文讨论了一个关于cuowu类的问题,作者在使用cuowu类时遇到了错误提示和使用AdjustmentListener的问题。文章提供了16个解决方案,并给出了两个可能导致错误的原因。 ... [详细]
  • 本文介绍了Java高并发程序设计中线程安全的概念与synchronized关键字的使用。通过一个计数器的例子,演示了多线程同时对变量进行累加操作时可能出现的问题。最终值会小于预期的原因是因为两个线程同时对变量进行写入时,其中一个线程的结果会覆盖另一个线程的结果。为了解决这个问题,可以使用synchronized关键字来保证线程安全。 ... [详细]
  • 个人学习使用:谨慎参考1Client类importcom.thoughtworks.gauge.Step;importcom.thoughtworks.gauge.T ... [详细]
  • 本文详细介绍了Java中vector的使用方法和相关知识,包括vector类的功能、构造方法和使用注意事项。通过使用vector类,可以方便地实现动态数组的功能,并且可以随意插入不同类型的对象,进行查找、插入和删除操作。这篇文章对于需要频繁进行查找、插入和删除操作的情况下,使用vector类是一个很好的选择。 ... [详细]
  • 从零学Java(10)之方法详解,喷打野你真的没我6!
    本文介绍了从零学Java系列中的第10篇文章,详解了Java中的方法。同时讨论了打野过程中喷打野的影响,以及金色打野刀对经济的增加和线上队友经济的影响。指出喷打野会导致线上经济的消减和影响队伍的团结。 ... [详细]
  • 猜字母游戏
    猜字母游戏猜字母游戏——设计数据结构猜字母游戏——设计程序结构猜字母游戏——实现字母生成方法猜字母游戏——实现字母检测方法猜字母游戏——实现主方法1猜字母游戏——设计数据结构1.1 ... [详细]
  • Html5-Canvas实现简易的抽奖转盘效果
    本文介绍了如何使用Html5和Canvas标签来实现简易的抽奖转盘效果,同时使用了jQueryRotate.js旋转插件。文章中给出了主要的html和css代码,并展示了实现的基本效果。 ... [详细]
  • Java中包装类的设计原因以及操作方法
    本文主要介绍了Java中设计包装类的原因以及操作方法。在Java中,除了对象类型,还有八大基本类型,为了将基本类型转换成对象,Java引入了包装类。文章通过介绍包装类的定义和实现,解答了为什么需要包装类的问题,并提供了简单易用的操作方法。通过本文的学习,读者可以更好地理解和应用Java中的包装类。 ... [详细]
author-avatar
七楼居民_651
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有