This article collects Java code examples for the org.apache.kafka.clients.consumer.ConsumerRecords.iterator() method, showing how ConsumerRecords.iterator() is used in practice. The examples are extracted from a selection of open-source projects on platforms such as GitHub, Stack Overflow, and Maven, so they carry real reference value and should be helpful. Details of the ConsumerRecords.iterator() method:
Package: org.apache.kafka.clients.consumer
Class: ConsumerRecords
Method: iterator
Description: none provided
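Before the project examples, here is a minimal, self-contained usage sketch of ConsumerRecords.iterator(). It is not taken from any of the projects below; the bootstrap address, group id, and topic name are placeholder assumptions:

import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class IteratorExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder address
    props.put("group.id", "example-group");           // placeholder group id
    props.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singleton("example-topic")); // placeholder topic
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
      // iterator() walks all fetched records across all assigned partitions
      Iterator<ConsumerRecord<String, String>> it = records.iterator();
      while (it.hasNext()) {
        ConsumerRecord<String, String> record = it.next();
        System.out.printf("partition=%d offset=%d value=%s%n",
            record.partition(), record.offset(), record.value());
      }
    }
  }
}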
Code example source: apache/flume

ConsumerAndRecords(KafkaConsumer<String, byte[]> consumer, String uuid) {
  this.consumer = consumer;
  this.uuid = uuid;
  this.records = ConsumerRecords.empty();
  this.recordIterator = records.iterator();
}
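A note on the design: initializing the fields with ConsumerRecords.empty() and its iterator gives the object a safe starting state. recordIterator is never null, and its hasNext() simply returns false until a real poll() replaces records, so callers need no null checks.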
Code example source: QNJR-GROUP/EasyTransaction

private synchronized void pollAndDispatchMessage() throws InterruptedException {
  // consumer settings must not be modified while records are being processed
  // fetch the records that need processing
  ConsumerRecords<String, byte[]> records = consumer.poll(10000);
  // wrap every message into a Callable and invoke them
  Iterator<ConsumerRecord<String, byte[]>> iterator = records.iterator();
  List<MessageHandler> listJob = new LinkedList<>();
  while (iterator.hasNext()) {
    listJob.add(new MessageHandler(iterator.next()));
  }
  executeJobs(listJob);
  // once every job has succeeded, commit the consume offsets
  consumer.commitAsync();
}
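The executeJobs helper is elided from the snippet above. As a rough, hypothetical sketch (not EasyTransaction's actual implementation; the pool size, error handling, and the assumption that MessageHandler implements Callable<Void> are mine), it could submit the handlers to a thread pool and wait for all of them before the offsets are committed:

import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch only; pool size is an assumption.
private final ExecutorService pool = Executors.newFixedThreadPool(8);

private void executeJobs(List<MessageHandler> jobs) throws InterruptedException {
  // invokeAll blocks until every handler has finished (or failed)
  for (Future<Void> f : pool.invokeAll(jobs)) {
    try {
      f.get(); // surface handler failures before commitAsync() is reached
    } catch (ExecutionException e) {
      throw new RuntimeException("message handling failed", e);
    }
  }
}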
Code example source: apache/flume
private void poll() {
logger.trace("Polling with timeout: {}ms channel-{}", pollTimeout, getName());
try {
records = consumer.poll(Duration.ofMillis(pollTimeout));
recordIterator = records.iterator();
logger.debug("{} returned {} records from last poll", getName(), records.count());
} catch (WakeupException e) {
logger.trace("Consumer woken up for channel {}.", getName());
}
}
Code example source: linkedin/kafka-monitor

@Override
public BaseConsumerRecord receive() {
  if (_recordIter == null || !_recordIter.hasNext())
    _recordIter = _consumer.poll(Long.MAX_VALUE).iterator();
  ConsumerRecord<String, String> record = _recordIter.next();
  return new BaseConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
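Note that poll(Long.MAX_VALUE) blocks essentially forever until at least one record arrives (or wakeup() is called). The long-based poll overload was deprecated in Kafka 2.0; newer code should prefer poll(Duration), which bounds the time spent in the call.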
Code example source: OryxProject/oryx

@Override
protected KeyMessage<K, M> computeNext() {
  if (iterator == null || !iterator.hasNext()) {
    try {
      long timeout = MIN_POLL_MS;
      ConsumerRecords<K, M> records;
      while ((records = consumer.poll(timeout)).isEmpty()) {
        timeout = Math.min(MAX_POLL_MS, timeout * 2);
      }
      iterator = records.iterator();
    } catch (Exception e) {
      consumer.close();
      return endOfData();
    }
  }
  ConsumerRecord<K, M> mm = iterator.next();
  return new KeyMessageImpl<>(mm.key(), mm.value());
}
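The inner loop doubles the poll timeout after every empty result, capping it at MAX_POLL_MS: a simple exponential backoff that stays responsive while data is flowing yet avoids tight busy-polling on an idle topic.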
Code example source: apache/incubator-gobblin

@Override
public Iterator<KafkaConsumerRecord> consume(KafkaPartition partition, long nextOffset, long maxOffset) {
  if (nextOffset > maxOffset) {
    return null;
  }
  this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
  this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
  ConsumerRecords<K, V> consumerRecords = consumer.poll(super.fetchTimeoutMillis);
  return Iterators.transform(consumerRecords.iterator(), new Function<ConsumerRecord<K, V>, KafkaConsumerRecord>() {
    @Override
    public KafkaConsumerRecord apply(ConsumerRecord<K, V> input) {
      return new Kafka09ConsumerRecord<>(input);
    }
  });
}
Code example source: apache/kylin
consumer.seek(topicPartition, watermark);
messages = consumer.poll(timeOut);
iterator = messages.iterator();
if (!iterator.hasNext()) {
log.info("No more messages, stop");
Code example source: apache/incubator-gobblin

/**
 * Return the next record when available. Will never time out since this is a streaming source.
 */
@Override
public RecordEnvelope<D> readRecordEnvelopeImpl()
    throws DataRecordException, IOException {
  if (!_isStarted.get()) {
    throw new IOException("Streaming extractor has not been started.");
  }
  while ((_records == null) || (!_records.hasNext())) {
    synchronized (_consumer) {
      if (_close.get()) {
        throw new ClosedChannelException();
      }
      _records = _consumer.poll(this.fetchTimeOut).iterator();
    }
  }
  ConsumerRecord record = _records.next();
  _rowCount.getAndIncrement();
  return new RecordEnvelope<>((D) record.value(), new KafkaWatermark(_partition, new LongWatermark(record.offset())));
}
Code example source: apache/flink

@Override
public <K, V> List<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
  List<ConsumerRecord<K, V>> result = new ArrayList<>();
  try (KafkaConsumer<K, V> consumer = new KafkaConsumer<>(properties)) {
    consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
    while (true) {
      boolean processedAtLeastOneRecord = false;
      // wait for new records with timeout and break the loop if we didn't get any
      Iterator<ConsumerRecord<K, V>> iterator = consumer.poll(timeout).iterator();
      while (iterator.hasNext()) {
        ConsumerRecord<K, V> record = iterator.next();
        result.add(record);
        processedAtLeastOneRecord = true;
      }
      if (!processedAtLeastOneRecord) {
        break;
      }
    }
    consumer.commitSync();
  }
  return UnmodifiableList.decorate(result);
}
Code example source: apache/hive

/**
 * Poll more records from the Kafka Broker.
 *
 * @throws PollTimeoutException if poll returns 0 record and consumer's position is less than the requested end offset.
 */
private void pollRecords() {
  if (LOG.isTraceEnabled()) {
    stopwatch.reset().start();
  }
  records = consumer.poll(pollTimeoutDurationMs);
  if (LOG.isTraceEnabled()) {
    stopwatch.stop();
    LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
  }
  // Fail if we can not poll within one lap of pollTimeoutMs.
  if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
    throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
        pollTimeoutMs,
        topicPartition.toString(),
        startOffset,
        consumer.position(topicPartition),
        endOffset));
  }
  consumerRecordIterator = records.iterator();
  consumerPosition = consumer.position(topicPartition);
}
Code example source: apache/drill
totalFetchTime += lastFetchTime;
recordIter = consumerRecords.iterator();
return recordIter.hasNext();
Code example source: apache/flume

Duration duration = Duration.ofMillis(durMs);
ConsumerRecords<String, String> records = consumer.poll(duration);
it = records.iterator();
Code example source: apache/kafka

@Test
public void iterator() throws Exception {
  Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();
  String topic = "topic";
  records.put(new TopicPartition(topic, 0), new ArrayList<ConsumerRecord<Integer, String>>());
  ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, "value1");
  ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, "value2");
  records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
  records.put(new TopicPartition(topic, 2), new ArrayList<ConsumerRecord<Integer, String>>());
  ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
  Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();
  int c = 0;
  for (; iter.hasNext(); c++) {
    ConsumerRecord<Integer, String> record = iter.next();
    assertEquals(1, record.partition());
    assertEquals(topic, record.topic());
    assertEquals(c, record.offset());
  }
  assertEquals(2, c);
}
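The test relies on iteration order: ConsumerRecords.iterator() walks the fetched records partition by partition, and since partitions 0 and 2 were given empty lists, the iterator yields exactly the two records of partition 1, with offsets 0 and 1.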
Code example source: apache/flume

private void checkMessageArrived(String msg, String topic) {
  ConsumerRecords recs = pollConsumerRecords(topic);
  assertNotNull(recs);
  assertTrue(recs.count() > 0);
  ConsumerRecord consumerRecord = (ConsumerRecord) recs.iterator().next();
  assertEquals(msg, consumerRecord.value());
}
Code example source: apache/kafka

@Test
public void testSimpleMock() {
  consumer.subscribe(Collections.singleton("test"));
  assertEquals(0, consumer.poll(Duration.ZERO).count());
  consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));
  // Mock consumers need to seek manually since they cannot automatically reset offsets
  HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
  beginningOffsets.put(new TopicPartition("test", 0), 0L);
  beginningOffsets.put(new TopicPartition("test", 1), 0L);
  consumer.updateBeginningOffsets(beginningOffsets);
  consumer.seek(new TopicPartition("test", 0), 0);
  ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
  ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
  consumer.addRecord(rec1);
  consumer.addRecord(rec2);
  ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(1));
  Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
  assertEquals(rec1, iter.next());
  assertEquals(rec2, iter.next());
  assertFalse(iter.hasNext());
  assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
  consumer.commitSync();
  assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
}
Code example source: apache/kafka

@SuppressWarnings("deprecation")
@Test
public void testSimpleMockDeprecated() {
  consumer.subscribe(Collections.singleton("test"));
  assertEquals(0, consumer.poll(1000).count());
  consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));
  // Mock consumers need to seek manually since they cannot automatically reset offsets
  HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
  beginningOffsets.put(new TopicPartition("test", 0), 0L);
  beginningOffsets.put(new TopicPartition("test", 1), 0L);
  consumer.updateBeginningOffsets(beginningOffsets);
  consumer.seek(new TopicPartition("test", 0), 0);
  ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
  ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
  consumer.addRecord(rec1);
  consumer.addRecord(rec2);
  ConsumerRecords<String, String> recs = consumer.poll(1);
  Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
  assertEquals(rec1, iter.next());
  assertEquals(rec2, iter.next());
  assertFalse(iter.hasNext());
  assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
  consumer.commitSync();
  assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
}
Code example source: apache/flume

assertNotNull(recs);
assertTrue(recs.count() > 0);
ConsumerRecord<String, String> consumerRecord = recs.iterator().next();
ByteArrayInputStream in = new ByteArrayInputStream(consumerRecord.value().getBytes());
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);