import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Set up the streaming execution environment and a Blink-planner table environment.
// The debezium-json changelog format requires the Blink planner, so useOldPlanner() would fail here.
StreamExecutionEnvironment ssEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings ssSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment ssTableEnv = StreamTableEnvironment.create(ssEnv, ssSettings);
// DDL for a Kafka source table that consumes Debezium CDC records in debezium-json format.
String sql = "create table test_table ( \n" +
"id int,\n" +
"fk_drug_info int,\n" +
"fk_professional_id string,\n" +
"urine_time TIMESTAMP,\n" +
"urine_place string,\n" +
"urine_result int,\n" +
"test_type int,\n" +
"test_time TIMESTAMP,\n" +
"seal_time TIMESTAMP,\n" +
"urine_image string,\n" +
"urine_video string,\n" +
"urine_cause string,\n" +
"help_professional_id int,\n" +
"latitude string,\n" +
"fk_auth_manager int,\n" +
"table_date int,\n" +
"urine_state_type int,\n" +
"annex string,\n" +
"update_time int,\n" +
"is_del int\n"+
") with ( \n" +
"'connector' = 'kafka',\n" +
"'topic' = 'LG831_test.dbo.urine_upload',\n" +
"'properties.bootstrap.servers' = 'hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092',\n" +
"'properties.group.id' = 'connect-cluster',\n" +
"'format' = 'debezium-json'"+
")";
// Register the table, then run a continuous query over the changelog stream and print the results.
ssTableEnv.executeSql(sql);
Table result = ssTableEnv.sqlQuery("select * from test_table");
result.execute().print();