热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

《高级》Flink异步io链接RedisJava和Scala版

最近发现好多小伙伴不知道如何异步链接redis我准备了两个版本java版本和scala版本直接上代码,大部分同学看了应该会懂刚开始学习flink的同学中间细节的

最近发现好多小伙伴不知道如何异步链接redis

我准备了两个版本 java版本和scala版本

直接上代码,大部分同学看了应该会懂

刚开始学习flink的同学中间细节的东西,不明白的可以微信联系我,可以进入我的flink微信交流群。

 

喜欢flink的朋友,支持一下原创,可以关注我的公众号:

 

先看java版本

import com.alibaba.fastjson.JSON;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.configuration.Configuration;import org.apache.flink.streaming.api.datastream.AsyncDataStream;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.async.ResultFuture;import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPool;import redis.clients.jedis.JedisPoolConfig;import java.util.Collections;import java.util.Properties;import java.util.concurrent.CompletableFuture;import java.util.concurrent.TimeUnit;import java.util.function.Supplier;
public class AsynFlinkRedisJava { public static void main(String args[]) throws Exception{ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(500); Properties props = new Properties(); props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("zookeeper.connect", "localhost:2181"); props.setProperty("group.id", "flink-kafka"); FlinkKafkaConsumer08 consumer = new FlinkKafkaConsumer08("flink1", new SimpleStringSchema(), props); DataStream stream = env.addSource(consumer); DataStream resultStream = AsyncDataStream.unorderedWait(stream, new AsyncRedis(),1000, TimeUnit.MICROSECONDS, 100); resultStream.print(); env.execute("kaishi"); }}class AsyncRedis extends RichAsyncFunction { private transient JedisPool pool; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); pool= new JedisPool(new JedisPoolConfig(), "localhost", 6379);    } @Override public void asyncInvoke(String input, final ResultFuture resultFuture) throws Exception {        CompletableFuture.supplyAsync(new Supplier() { @Override public String get() { try { String imei = JSON.parseObject(input).get("imei").toString(); Jedis jedis = pool.getResource(); String result = jedis.hget("DC_IMEI_APPID",imei); pool.returnResource(jedis); return result; } catch (Exception e) { System.out.println(e); return null; } } }).thenAccept( (String dbResult) -> { resultFuture.complete(Collections.singleton(dbResult)); }); }}

再看scala版本:

import java.util.Propertiesimport org.apache.flink.api.common.serialization.SimpleStringSchemaimport org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment, _}import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08import com.alibaba.fastjson.JSONimport redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig}import java.util.concurrent.TimeUnitimport org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}import scala.concurrent.{ExecutionContext, Future}object AsynFlinkRedis { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val properties = new Properties() //kafka位置 老版本的 kafka是配置zookeeper地址 properties.setProperty("bootstrap.servers","localhost:9092") properties.setProperty("zookeeper.connect","localhost:2181") val topic = "flink1" properties.setProperty("group.id", "test-flink") val kafkStream = new FlinkKafkaConsumer08(topic,new SimpleStringSchema(),properties) val stream = env.addSource(kafkStream) stream.print() val resultStream=AsyncDataStream.unorderedWait(stream,new RedisAsyncFunction(), 1000, TimeUnit.MILLISECONDS, 100) resultStream.print() env.execute()  }}class RedisAsyncFunction extends  AsyncFunction[String,String]{ lazy val pool = new JedisPool(new JedisPoolConfig,"localhost",6379) override def asyncInvoke(input: String, resultFuture: ResultFuture[String]): Unit = { Future { //获取kafka日志的imei号 val imei = JSON.parseObject(input).get("imei").toString //从redis中获取imei对应的userid println(pool.getNumActive) val jedis = pool.getResource val useridJson =jedis.hget("DC_IMEI_APPID",imei) print(useridJson) resultFuture.complete(Seq(useridJson)) pool.returnResource(jedis) }(ExecutionContext.global)}}

kafka练习日志:

{"accStatus":"NULL","addr":"","alertType":"stayAlert","fenceId":"NULL","gpsTime":"2019-01-29 23:45:01","iccid":"NULL","imei":"868620190220000","imsi":"NULL","lat":"46.795862","lng":"134.011538","offlineTime":"NULL","postTime":"2019-01-30 00:00:00","time":"NULL","type":"DEVICE"}

redis测试数据:

hset DC_IMEI_APPID 868620190220000 "{\"allFullId\":\"1,130396,130395,129659\",\"appId\":\"TRACKER\",\"userId\":\"129659\",\"mcType\":\"GT06N\",\"timeZone\":\"UTCA08:00\"}"


推荐阅读
  • 浅析python实现布隆过滤器及Redis中的缓存穿透原理_python
    本文带你了解了位图的实现,布隆过滤器的原理及Python中的使用,以及布隆过滤器如何应对Redis中的缓存穿透,相信你对布隆过滤 ... [详细]
  • 从0到1搭建大数据平台
    从0到1搭建大数据平台 ... [详细]
  • 秒建一个后台管理系统?用这5个开源免费的Java项目就够了
    秒建一个后台管理系统?用这5个开源免费的Java项目就够了 ... [详细]
  • 本文介绍了如何使用Python的Paramiko库批量更新多台服务器的登录密码。通过示例代码展示了具体实现方法,确保了操作的高效性和安全性。Paramiko库提供了强大的SSH2协议支持,使得远程服务器管理变得更加便捷。此外,文章还详细说明了代码的各个部分,帮助读者更好地理解和应用这一技术。 ... [详细]
  • 在本地环境中部署了两个不同版本的 Flink 集群,分别为 1.9.1 和 1.9.2。近期在尝试启动 1.9.1 版本的 Flink 任务时,遇到了 TaskExecutor 启动失败的问题。尽管 TaskManager 日志显示正常,但任务仍无法成功启动。经过详细分析,发现该问题是由 Kafka 版本不兼容引起的。通过调整 Kafka 客户端配置并升级相关依赖,最终成功解决了这一故障。 ... [详细]
  • 本章节在上一章的基础上,深入探讨了如何通过引入机器人实现自动聊天、表情包回应以及Adidas官方账号的自动抽签功能。具体介绍了使用wxpy库进行微信机器人的开发,优化了智能回复系统的性能和用户体验。通过详细的代码示例和实践操作,展示了如何实现这些高级功能,进一步提升了机器人的智能化水平。 ... [详细]
  • 本文全面解析了 gRPC 的基础知识与高级应用,从 helloworld.proto 文件入手,详细阐述了如何定义服务接口。例如,`Greeter` 服务中的 `SayHello` 方法,该方法在客户端和服务器端的消息交互中起到了关键作用。通过实例代码,读者可以深入了解 gRPC 的工作原理及其在实际项目中的应用。 ... [详细]
  • 本题库精选了Java核心知识点的练习题,旨在帮助学习者巩固和检验对Java理论基础的掌握。其中,选择题部分涵盖了访问控制权限等关键概念,例如,Java语言中仅允许子类或同一包内的类访问的访问权限为protected。此外,题库还包括其他重要知识点,如异常处理、多线程、集合框架等,全面覆盖Java编程的核心内容。 ... [详细]
  • 在稀疏直接法视觉里程计中,通过优化特征点并采用基于光度误差最小化的灰度图像线性插值技术,提高了定位精度。该方法通过对空间点的非齐次和齐次表示进行处理,利用RGB-D传感器获取的3D坐标信息,在两帧图像之间实现精确匹配,有效减少了光度误差,提升了系统的鲁棒性和稳定性。 ... [详细]
  • 本文探讨了如何在 Java 中将多参数方法通过 Lambda 表达式传递给一个接受 List 的 Function。具体分析了 `OrderUtil` 类中的 `runInBatches` 方法及其使用场景。 ... [详细]
  • 【实例简介】本文详细介绍了如何在PHP中实现微信支付的退款功能,并提供了订单创建类的完整代码及调用示例。在配置过程中,需确保正确设置相关参数,特别是证书路径应根据项目实际情况进行调整。为了保证系统的安全性,存放证书的目录需要设置为可读权限。值得注意的是,普通支付操作无需证书,但在执行退款操作时必须提供证书。此外,本文还对常见的错误处理和调试技巧进行了说明,帮助开发者快速定位和解决问题。 ... [详细]
  • 当前,众多初创企业对全栈工程师的需求日益增长,但市场中却存在大量所谓的“伪全栈工程师”,尤其是那些仅掌握了Node.js技能的前端开发人员。本文旨在深入探讨全栈工程师在现代技术生态中的真实角色与价值,澄清对这一角色的误解,并强调真正的全栈工程师应具备全面的技术栈和综合解决问题的能力。 ... [详细]
  • 在使用sbt构建项目时,遇到了“对象apache不是org软件包的成员”的错误。本文详细分析了该问题的原因,并提供了有效的解决方案,包括检查依赖配置、清理缓存和更新sbt插件等步骤,帮助开发者快速解决问题。 ... [详细]
  • 在处理大文件上传时,服务端为何无法直接接收?这主要与 PHP 配置文件 `php.ini` 中的几个关键参数有关,如 `upload_max_filesize` 和 `post_max_size`。这些参数分别限制了单个文件的最大上传大小和整个 POST 请求的数据量。为了实现大文件的高效上传,可以通过文件分割与分片上传的方法来解决。本文将详细介绍这一实现方法,并提供相应的代码示例,帮助开发者更好地理解和应用这一技术。 ... [详细]
  • Matplotlib在数据科学中的可视化应用与技术解析
    Matplotlib和数据可视化 数据的处理、分析和可视化已经成为Python近年来最为重要的应用领域之一,其中数据的可视化指的是将数据呈现为漂亮的统计图表ÿ ... [详细]
author-avatar
大市低开_127
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有