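I was recently reading an expert's blog and remembered that Async I/O is one of the major features Blink contributed to the Flink community: it lets a job fetch external data asynchronously. I decided to implement it myself, so that I have a working example on hand when a project needs it.
At first I wanted to write a Scala demo that reads data from HBase, following the demo in the official documentation: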
/**
 * An implementation of the 'AsyncFunction' that sends requests and sets the callback.
 */
class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {

    /** The database specific client that can issue concurrent requests with callbacks */
    lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)

    /** The context used for the future callbacks */
    implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())

    override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
        // issue the asynchronous request, receive a future for the result
        val resultFutureRequested: Future[String] = client.query(str)

        // set the callback to be executed once the request by the client is complete
        // the callback simply forwards the result to the result future
        resultFutureRequested.onSuccess {
            case result: String => resultFuture.complete(Iterable((str, result)))
        }
    }
}

// create the original stream
val stream: DataStream[String] = ...

// apply the async I/O transformation
val resultStream: DataStream[(String, String)] =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
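This failed. I could not get two parts of the code above to work (they were highlighted in red in the original screenshot):
1. I could not find a usable implementation class for Future
2. unorderedWait kept reporting errors
The examples in the Flink source code also include a Scala one: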
def main(args: Array[String]) {
    val timeout = 10000L

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val input = env.addSource(new SimpleSource())

    val asyncMapped = AsyncDataStream.orderedWait(input, timeout, TimeUnit.MILLISECONDS, 10) {
        (input, collector: ResultFuture[Int]) =>
            Future {
                collector.complete(Seq(input))
            } (ExecutionContext.global)
    }

    asyncMapped.print()

    env.execute("Async I/O job")
}
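That is the main part. As a beginner I could not bend it into what I wanted, which was to extend RichAsyncFunction so that the connection can be initialized in the open method.
I went through quite a few blog posts. Most of them just translate the principles from the official documentation, and none of their examples actually run, which was frustrating.
So the Scala attempt failed.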
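The two snippets above call unorderedWait and orderedWait respectively. As a rough illustration of the difference, here is a plain-Java sketch with no Flink dependency. The class OrderingSketch and its methods are made up for this post, and the real operators are more involved (they emit incrementally and deal with watermarks and checkpoints); this only shows completion order versus submission order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, not part of Flink.
class OrderingSketch {

    /** Collects each result the moment its future completes (completion order). */
    static List<String> unordered(List<CompletableFuture<String>> pending) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        for (CompletableFuture<String> f : pending) {
            f.thenAccept(out::add);
        }
        return out; // filled later, as the futures complete
    }

    /** Collects results in the order the requests were submitted. */
    static List<String> ordered(List<CompletableFuture<String>> pending) {
        List<String> out = new ArrayList<>();
        for (CompletableFuture<String> f : pending) {
            out.add(f.join()); // waits for this request before looking at the next one
        }
        return out;
    }

    /** Submits three requests and completes them in the order 2, 0, 1. */
    static List<List<String>> demo() {
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            pending.add(new CompletableFuture<>());
        }
        List<String> unorderedOut = unordered(pending);
        pending.get(2).complete("c");
        pending.get(0).complete("a");
        pending.get(1).complete("b");
        List<String> orderedOut = ordered(pending);
        return Arrays.asList(unorderedOut, orderedOut);
    }

    public static void main(String[] args) {
        List<List<String>> result = demo();
        System.out.println("unordered: " + result.get(0)); // unordered: [c, a, b]
        System.out.println("ordered:   " + result.get(1)); // ordered:   [a, b, c]
    }
}
```

Unordered mode can hand a result downstream as soon as it arrives, while ordered mode has to hold a finished result back until everything submitted before it has finished.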
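Below is the source code of the MySQL version (the HBase version has not been tested yet, because HBase on my machine is down):
The business logic is as follows:
Receive data from Kafka, convert it into a User object, call the async function to look up the phone by user.id, set it back on the User object, and output the result.
Main class: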
import com.alibaba.fastjson.JSON;
import com.venn.common.Common;
import org.apache.flink.formats.json.JsonNodeDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.concurrent.TimeUnit;

public class AsyncMysqlRequest {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        FlinkKafkaConsumer<ObjectNode> source = new FlinkKafkaConsumer<>("async", new JsonNodeDeserializationSchema(), Common.getProp());

        // Receive Kafka data and convert it into a User object
        DataStream<User> input = env.addSource(source).map(value -> {
            String id = value.get("id").asText();
            String username = value.get("username").asText();
            String password = value.get("password").asText();
            return new User(id, username, password);
        });

        // Async I/O lookup against MySQL: timeout 1 s, capacity 100
        // (with more than 100 requests in flight, the upstream operator is backpressured)
        DataStream<User> async = AsyncDataStream.unorderedWait(input, new AsyncFunctionForMysqlJava(), 1000, TimeUnit.MILLISECONDS, 100);

        async.map(user -> {
            return JSON.toJSON(user).toString();
        })
        .print();

        env.execute("asyncForMysql");
    }
}
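The timeout argument of unorderedWait is a number plus a TimeUnit, and the unit is easy to get wrong: 1000 with MILLISECONDS is one second, while 1000 with MICROSECONDS is only one millisecond, which almost no database round trip will meet. A tiny sketch using only java.util.concurrent.TimeUnit (the class name TimeoutUnitSketch is made up for illustration):

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper to compare timeout settings; not part of Flink.
class TimeoutUnitSketch {

    /** Converts a (timeout, unit) pair to milliseconds. */
    static long toMillis(long timeout, TimeUnit unit) {
        return unit.toMillis(timeout);
    }

    public static void main(String[] args) {
        System.out.println(toMillis(1000, TimeUnit.MILLISECONDS)); // 1000
        System.out.println(toMillis(1000, TimeUnit.MICROSECONDS)); // 1
        System.out.println(toMillis(1, TimeUnit.SECONDS));         // 1000
    }
}
```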
Function class:
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.List;

public class AsyncFunctionForMysqlJava extends RichAsyncFunction<User, User> {

    // Connection settings
    private static String jdbcUrl = "jdbc:mysql://192.168.229.128:3306?useSSL=false";
    private static String username = "root";
    private static String password = "123456";
    private static String driverName = "com.mysql.jdbc.Driver";

    java.sql.Connection conn;
    PreparedStatement ps;
    Logger logger = LoggerFactory.getLogger(AsyncFunctionForMysqlJava.class);

    /**
     * Initialize the connection in the open method
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        logger.info("async function for mysql java open ...");
        super.open(parameters);

        Class.forName(driverName);
        conn = DriverManager.getConnection(jdbcUrl, username, password);
        ps = conn.prepareStatement("select phone from async.async_test where id = ?");
    }

    /**
     * use user.getId async get user phone
     *
     * @param user
     * @param resultFuture
     * @throws Exception
     */
    @Override
    public void asyncInvoke(User user, ResultFuture<User> resultFuture) throws Exception {
        // Query by user id.
        // Note: executeQuery() is a blocking JDBC call, so this method only returns
        // after the query has finished.
        ps.setString(1, user.getId());
        ResultSet rs = ps.executeQuery();
        String phone = null;
        if (rs.next()) {
            phone = rs.getString(1);
        }
        user.setPhone(phone);
        List<User> list = new ArrayList<>();
        list.add(user);
        // Hand the result back to the result queue
        resultFuture.complete(list);
    }

    @Override
    public void timeout(User input, ResultFuture<User> resultFuture) throws Exception {
        // On timeout, emit the input record unchanged
        logger.info("Async function for mysql timeout");
        List<User> list = new ArrayList<>();
        list.add(input);
        resultFuture.complete(list);
    }

    /**
     * close function
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        logger.info("async function for mysql java close ...");
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
}
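One caveat about the function class above: JDBC's executeQuery() blocks, so asyncInvoke only returns once the query is done. A common way to keep the calling thread free is to push the blocking call onto a thread pool and complete the callback from there. Below is a minimal plain-Java sketch of that idea, without Flink or JDBC on the classpath. Everything in it is hypothetical: OffloadSketch is a made-up class, the Function stands in for the MySQL query, and the Consumer stands in for ResultFuture.complete.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-in for an async function; not the Flink API.
class OffloadSketch {

    private final ExecutorService pool;
    private final Function<String, String> blockingLookup; // stands in for the JDBC query

    OffloadSketch(int threads, Function<String, String> blockingLookup) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.blockingLookup = blockingLookup;
    }

    /**
     * Returns immediately. The lookup runs on a pool thread, and the callback
     * (the stand-in for ResultFuture.complete) is invoked once it finishes.
     */
    CompletableFuture<Void> asyncInvoke(String id, Consumer<Collection<String>> callback) {
        return CompletableFuture
                .supplyAsync(() -> blockingLookup.apply(id), pool)
                .thenAccept(phone -> callback.accept(Collections.singleton(id + ":" + phone)));
    }

    void close() {
        pool.shutdown();
    }

    public static void main(String[] args) {
        OffloadSketch sketch = new OffloadSketch(4, id -> "phone-" + id);
        // join() is only here so the demo waits before shutting the pool down
        sketch.asyncInvoke("2", result -> System.out.println(result)).join(); // prints [2:phone-2]
        sketch.close();
    }
}
```

A real version would also need one connection per worker thread or a connection pool, because setting parameters on a single shared PreparedStatement from several threads at once would mix up the requests.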
Test data:
{"id" : 1, "username" : "venn", "password" : 1561709530935}
{"id" : 2, "username" : "venn", "password" : 1561709536029}
{"id" : 3, "username" : "venn", "password" : 1561709541033}
{"id" : 4, "username" : "venn", "password" : 1561709546037}
{"id" : 5, "username" : "venn", "password" : 1561709551040}
{"id" : 6, "username" : "venn", "password" : 1561709556044}
{"id" : 7, "username" : "venn", "password" : 1561709561048}
Execution results:
1> {"password":"1561709536029","phone":"12345678911","id":"2","username":"venn"}
1> {"password":"1561709541033","phone":"12345678912","id":"3","username":"venn"}
1> {"password":"1561709546037","phone":"12345678913","id":"4","username":"venn"}
1> {"password":"1561709551040","id":"5","username":"venn"} # no match found, returned as-is
1> {"password":"1561709556044","id":"6","username":"venn"}
1> {"password":"1561709561048","id":"7","username":"venn"}
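A record whose lookup finds nothing and a record whose request times out look the same in the output: the User comes back without a phone, because the timeout method in the function class emits the input unchanged. The timeout half of that behavior can be sketched in plain Java with CompletableFuture.completeOnTimeout (Java 9 or later). The class TimeoutFallbackSketch and the string records are made up for illustration; Flink itself handles this through the timeout callback, not through this API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of "on timeout, emit the input as-is"; not the Flink API.
class TimeoutFallbackSketch {

    /** If the lookup has not finished within timeoutMs, the future yields the fallback instead. */
    static CompletableFuture<String> withFallback(CompletableFuture<String> lookup,
                                                  String fallback,
                                                  long timeoutMs) {
        return lookup.completeOnTimeout(fallback, timeoutMs, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        // A lookup that has already answered
        CompletableFuture<String> fast = CompletableFuture.completedFuture("id=2,phone=12345678911");
        // A lookup that never answers
        CompletableFuture<String> stuck = new CompletableFuture<>();

        System.out.println(withFallback(fast, "id=2", 50).join());  // id=2,phone=12345678911
        System.out.println(withFallback(stuck, "id=5", 50).join()); // id=5
    }
}
```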
HBase, Redis, and other stores can be handled in a similar way: extend RichAsyncFunction and implement its methods.