作者:霸气的艳子_612 | 来源:互联网 | 2023-07-23 22:21
Join数据是得有2个数据源情况下,用一定的条件连接2条数据,生成一个新的数据源类似数据库中的Join查询例如:我们有支付数据和下单的数据,我们需要通过ored
Join数据是得有2个数据源情况下,用一定的条件连接2条数据,生成一个新的数据源
类似数据库中的Join查询
例如:我们有支付数据和下单的数据,我们需要通过orederId来关联到一起。
首先要对数据源进行处理,主要是进行分组(keyBy),保证orderId相同的数据是进到了同一个窗口内
数据源1是订单数据:
DataStream keyByedOrderSource = orderSource.keyBy(new OrderSerialKey());
static class OrderSerialKey implements KeySelector {
private static final long serialVersiOnUID= 1L;
@Override
public String getKey(Order e) {
if(e == null || e.getOrderId() == null){
return "0";
}
String key = MD5Util.string2MD5(e.getOrderId().trim().toUpperCase());
return key;
}
}
同样的要对支付消息进行keyBy操作
第二步就是直接在窗口内进行连接了
DataStream joinStream = keyByedOrderSource.join(keyByedPayinfoSource )
.where(new OrderSerialKey())
.equalTo(new PayinfoSerialKey())
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.trigger(JoinProcessTimeTrigger.of(Time.minutes(1)))
.apply(new JoinFunction() {
private static final long serialVersiOnUID= 1L;
@Override
public String join(Order order, Payinfo payinfo) {
//构建一个PayedOrder
PayedOrder po = new PayedOrder();
po.setXXXX.....
return po;
}
});
注意的是,当存在多个orderId相同的订单或者多个orderId相同的支付消息时,那么join出的结果就是cube计算。
例如订单A和订单B的orderId都是1,而支付消息X和支付消息Y的orderId都是1时,
那么就会join出来4条消息
A X
A Y
B X
B Y
这个时候如果不想要这样的结果,那么可以重写JoinFunction进行去重。
以上。