热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

使用Flink对数据进行Join连接

Join数据是得有2个数据源情况下,用一定的条件连接2条数据,生成一个新的数据源类似数据库中的Join查询例如:我们有支付数据和下单的数据,我们需要通过ored

Join数据是得有2个数据源情况下,用一定的条件连接2条数据,生成一个新的数据源
类似数据库中的Join查询

例如:我们有支付数据和下单的数据,我们需要通过orederId来关联到一起。

首先要对数据源进行处理,主要是进行分组(keyBy),保证orderId相同的数据是进到了同一个窗口内
数据源1是订单数据:

DataStream keyByedOrderSource = orderSource.keyBy(new OrderSerialKey());

static class OrderSerialKey implements KeySelector {
        private static final long serialVersiOnUID= 1L;
        @Override
        public String getKey(Order e) {
            if(e == null || e.getOrderId() == null){
                return "0";
            }
            String key = MD5Util.string2MD5(e.getOrderId().trim().toUpperCase());
            return key;
        }
    }

同样的要对支付消息进行keyBy操作

第二步就是直接在窗口内进行连接了

DataStream joinStream = keyByedOrderSource.join(keyByedPayinfoSource )
                .where(new OrderSerialKey())
                .equalTo(new PayinfoSerialKey())
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .trigger(JoinProcessTimeTrigger.of(Time.minutes(1)))
                .apply(new JoinFunction() {
                    private static final long serialVersiOnUID= 1L;
                    @Override
                    public String join(Order order, Payinfo payinfo) {
                        //构建一个PayedOrder
                        PayedOrder po = new PayedOrder();
                        po.setXXXX.....
                        return po;
                    }
                });

注意的是,当存在多个orderId相同的订单或者多个orderId相同的支付消息时,那么join出的结果就是cube计算。
例如订单A和订单B的orderId都是1,而支付消息X和支付消息Y的orderId都是1时,
那么就会join出来4条消息
A X
A Y
B X
B Y

这个时候如果不想要这样的结果,那么可以重写JoinFunction进行去重。

以上。


推荐阅读
author-avatar
霸气的艳子_612
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有