2019独角兽企业重金招聘Python工程师标准>>>
/*** Tests the basic functionality of the AsyncWaitOperator: Processing a limited stream of* elements by doubling their value. This is tested in for the ordered and unordered mode.*/
@Test
public void testAsyncWaitOperator() throws Exception {final int numElements &#61; 5;final long timeout &#61; 1000L;StreamExecutionEnvironment env &#61; StreamExecutionEnvironment.getExecutionEnvironment();DataStream> input &#61; env.addSource(new NonSerializableTupleSource(numElements));AsyncFunction, Integer> function &#61; new RichAsyncFunction, Integer>() {private static final long serialVersionUID &#61; 7000343199829487985L;transient ExecutorService executorService;&#64;Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);executorService &#61; Executors.newFixedThreadPool(numElements);}&#64;Overridepublic void close() throws Exception {super.close();executorService.shutdownNow();}&#64;Overridepublic void asyncInvoke(final Tuple2 input, final AsyncCollector collector) throws Exception {executorService.submit(new Runnable() {&#64;Overridepublic void run() {collector.collect(Collections.singletonList(input.f0 &#43; input.f0));}});}};DataStream orderedResult &#61; AsyncDataStream.orderedWait(input, function, timeout, TimeUnit.MILLISECONDS, 2).setParallelism(1);// save result from ordered processfinal MemorySinkFunction sinkFunction1 &#61; new MemorySinkFunction(0);final List actualResult1 &#61; new ArrayList<>(numElements);MemorySinkFunction.registerCollection(0, actualResult1);orderedResult.addSink(sinkFunction1).setParallelism(1);DataStream unorderedResult &#61; AsyncDataStream.unorderedWait(input, function, timeout, TimeUnit.MILLISECONDS, 2);// save result from unordered processfinal MemorySinkFunction sinkFunction2 &#61; new MemorySinkFunction(1);final List actualResult2 &#61; new ArrayList<>(numElements);MemorySinkFunction.registerCollection(1, actualResult2);unorderedResult.addSink(sinkFunction2);Collection expected &#61; new ArrayList<>(10);for (int i &#61; 0; i }