
Usage and Code Examples of org.apache.kafka.clients.consumer.ConsumerRecords.iterator()

This article collects code examples of the Java method org.apache.kafka.clients.consumer.ConsumerRecords.iterator(), showing how ConsumerRecords.iterator() is used in practice. The examples are drawn from curated projects on platforms such as GitHub, Stack Overflow, and Maven, so they carry real reference value and should be of some help. Details of the ConsumerRecords.iterator() method are as follows:
Package path: org.apache.kafka.clients.consumer.ConsumerRecords
Class name: ConsumerRecords
Method name: iterator

About ConsumerRecords.iterator

No description is provided by the source.
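Since the source gives no description, here is a brief summary and a minimal, hedged usage sketch. ConsumerRecords.iterator() returns an Iterator<ConsumerRecord<K, V>> over every record fetched by the last poll, across all assigned topic-partitions; within a single partition, records appear in offset order. The bootstrap address, group id, and topic name below are hypothetical placeholders, not values taken from the examples that follow.

import java.time.Duration;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerRecordsIteratorExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Hypothetical connection settings -- adjust for your cluster.
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "example-group");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singleton("example-topic"));
      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
      // iterator() walks all records from this poll, across every partition.
      Iterator<ConsumerRecord<String, String>> it = records.iterator();
      while (it.hasNext()) {
        ConsumerRecord<String, String> record = it.next();
        System.out.printf("%s-%d@%d: %s=%s%n",
            record.topic(), record.partition(), record.offset(), record.key(), record.value());
      }
    }
  }
}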

Code Examples

Code example source: apache/flume

ConsumerAndRecords(KafkaConsumer consumer, String uuid) {
  this.consumer = consumer;
  this.uuid = uuid;
  this.records = ConsumerRecords.empty();
  this.recordIterator = records.iterator();
}
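Note the initialization: starting from ConsumerRecords.empty() gives recordIterator a valid (empty) iterator from the outset, so callers can check hasNext() without null guards before the first real poll.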

Code example source: QNJR-GROUP/EasyTransaction

private synchronized void pollAndDispatchMessage() throws InterruptedException {
  // Consumer settings must not be modified while records are being processed.
  // Poll the records that need handling.
  ConsumerRecords allRecords = consumer.poll(10000);
  // Wrap every message as a Callable-style job and dispatch it.
  Iterator<ConsumerRecord> iterator = allRecords.iterator();
  List<MessageHandler> listJob = new LinkedList<>();
  while (iterator.hasNext()) {
    listJob.add(new MessageHandler(iterator.next()));
  }
  executeJobs(listJob);
  // All jobs succeeded; commit the consumed offsets.
  consumer.commitAsync();
}
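Because the offsets are committed only after every dispatched job has completed, this pattern gives at-least-once delivery: a crash between processing and commitAsync() means the same records are re-delivered on the next poll.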

Code example source: apache/flume

private void poll() {
  logger.trace("Polling with timeout: {}ms channel-{}", pollTimeout, getName());
  try {
    records = consumer.poll(Duration.ofMillis(pollTimeout));
    recordIterator = records.iterator();
    logger.debug("{} returned {} records from last poll", getName(), records.count());
  } catch (WakeupException e) {
    logger.trace("Consumer woken up for channel {}.", getName());
  }
}

Code example source: linkedin/kafka-monitor

@Override
public BaseConsumerRecord receive() {
  if (_recordIter == null || !_recordIter.hasNext())
    _recordIter = _consumer.poll(Long.MAX_VALUE).iterator();
  ConsumerRecord record = _recordIter.next();
  return new BaseConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), record.value());
}

Code example source: OryxProject/oryx

@Override
protected KeyMessage computeNext() {
  if (iterator == null || !iterator.hasNext()) {
    try {
      long timeout = MIN_POLL_MS;
      ConsumerRecords records;
      while ((records = consumer.poll(timeout)).isEmpty()) {
        timeout = Math.min(MAX_POLL_MS, timeout * 2);
      }
      iterator = records.iterator();
    } catch (Exception e) {
      consumer.close();
      return endOfData();
    }
  }
  ConsumerRecord mm = iterator.next();
  return new KeyMessageImpl<>(mm.key(), mm.value());
}
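A design choice worth noting in this snippet: on every empty poll the timeout doubles, capped at MAX_POLL_MS. This simple exponential backoff avoids busy-polling a quiet topic while still reacting quickly once traffic resumes.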

Code example source: apache/incubator-gobblin

@Override
public Iterator consume(KafkaPartition partition, long nextOffset, long maxOffset) {
  if (nextOffset > maxOffset) {
    return null;
  }
  this.consumer.assign(Lists.newArrayList(new TopicPartition(partition.getTopicName(), partition.getId())));
  this.consumer.seek(new TopicPartition(partition.getTopicName(), partition.getId()), nextOffset);
  ConsumerRecords consumerRecords = consumer.poll(super.fetchTimeoutMillis);
  return Iterators.transform(consumerRecords.iterator(), new Function<ConsumerRecord, KafkaConsumerRecord>() {
    @Override
    public KafkaConsumerRecord apply(ConsumerRecord input) {
      return new Kafka09ConsumerRecord<>(input);
    }
  });
}

Code example source: apache/kylin

consumer.seek(topicPartition, watermark);
messages = consumer.poll(timeOut);
iterator = messages.iterator();
if (!iterator.hasNext()) {
  log.info("No more messages, stop");

Code example source: apache/incubator-gobblin

/**
 * Return the next record when available. Will never time out since this is a streaming source.
 */
@Override
public RecordEnvelope readRecordEnvelopeImpl()
    throws DataRecordException, IOException {
  if (!_isStarted.get()) {
    throw new IOException("Streaming extractor has not been started.");
  }
  while ((_records == null) || (!_records.hasNext())) {
    synchronized (_consumer) {
      if (_close.get()) {
        throw new ClosedChannelException();
      }
      _records = _consumer.poll(this.fetchTimeOut).iterator();
    }
  }
  ConsumerRecord record = _records.next();
  _rowCount.getAndIncrement();
  return new RecordEnvelope(record.value(), new KafkaWatermark(_partition, new LongWatermark(record.offset())));
}

Code example source: apache/flink

@Override
public Collection<ConsumerRecord> getAllRecordsFromTopic(Properties properties, String topic, int partition, long timeout) {
  List<ConsumerRecord> result = new ArrayList<>();
  try (KafkaConsumer consumer = new KafkaConsumer<>(properties)) {
    consumer.assign(Arrays.asList(new TopicPartition(topic, partition)));
    while (true) {
      boolean processedAtLeastOneRecord = false;
      // wait for new records with timeout and break the loop if we didn't get any
      Iterator<ConsumerRecord> iterator = consumer.poll(timeout).iterator();
      while (iterator.hasNext()) {
        ConsumerRecord record = iterator.next();
        result.add(record);
        processedAtLeastOneRecord = true;
      }
      if (!processedAtLeastOneRecord) {
        break;
      }
    }
    consumer.commitSync();
  }
  return UnmodifiableList.decorate(result);
}

Code example source: apache/hive

/**
 * Poll more records from the Kafka Broker.
 *
 * @throws PollTimeoutException if poll returns 0 records while the consumer's position is still before the requested end offset.
 */
private void pollRecords() {
  if (LOG.isTraceEnabled()) {
    stopwatch.reset().start();
  }
  records = consumer.poll(pollTimeoutDurationMs);
  if (LOG.isTraceEnabled()) {
    stopwatch.stop();
    LOG.trace("Pulled [{}] records in [{}] ms", records.count(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
  }
  // Fail if we can not poll within one lap of pollTimeoutMs.
  if (records.isEmpty() && consumer.position(topicPartition) < endOffset) {
    throw new PollTimeoutException(String.format(ERROR_POLL_TIMEOUT_FORMAT,
        pollTimeoutMs,
        topicPartition.toString(),
        startOffset,
        consumer.position(topicPartition),
        endOffset));
  }
  consumerRecordIterator = records.iterator();
  consumerPosition = consumer.position(topicPartition);
}

Code example source: apache/drill

totalFetchTime += lastFetchTime;
recordIter = consumerRecords.iterator();
return recordIter.hasNext();

Code example source: apache/flume

Duration duration = Duration.ofMillis(durMs);
ConsumerRecords records = consumer.poll(duration);
it = records.iterator();

Code example source: apache/kafka

@Test
public void iterator() throws Exception {
  Map<TopicPartition, List<ConsumerRecord<Integer, String>>> records = new LinkedHashMap<>();
  String topic = "topic";
  records.put(new TopicPartition(topic, 0), new ArrayList<ConsumerRecord<Integer, String>>());
  ConsumerRecord<Integer, String> record1 = new ConsumerRecord<>(topic, 1, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 1, "value1");
  ConsumerRecord<Integer, String> record2 = new ConsumerRecord<>(topic, 1, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, 2, "value2");
  records.put(new TopicPartition(topic, 1), Arrays.asList(record1, record2));
  records.put(new TopicPartition(topic, 2), new ArrayList<ConsumerRecord<Integer, String>>());
  ConsumerRecords<Integer, String> consumerRecords = new ConsumerRecords<>(records);
  Iterator<ConsumerRecord<Integer, String>> iter = consumerRecords.iterator();
  int c = 0;
  for (; iter.hasNext(); c++) {
    ConsumerRecord<Integer, String> record = iter.next();
    assertEquals(1, record.partition());
    assertEquals(topic, record.topic());
    assertEquals(c, record.offset());
  }
  assertEquals(2, c);
}

Code example source: apache/flume

private void checkMessageArrived(String msg, String topic) {
  ConsumerRecords recs = pollConsumerRecords(topic);
  assertNotNull(recs);
  assertTrue(recs.count() > 0);
  ConsumerRecord consumerRecord = (ConsumerRecord) recs.iterator().next();
  assertEquals(msg, consumerRecord.value());
}

Code example source: apache/kafka

@Test
public void testSimpleMock() {
  consumer.subscribe(Collections.singleton("test"));
  assertEquals(0, consumer.poll(Duration.ZERO).count());
  consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));
  // Mock consumers need to seek manually since they cannot automatically reset offsets
  HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
  beginningOffsets.put(new TopicPartition("test", 0), 0L);
  beginningOffsets.put(new TopicPartition("test", 1), 0L);
  consumer.updateBeginningOffsets(beginningOffsets);
  consumer.seek(new TopicPartition("test", 0), 0);
  ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
  ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
  consumer.addRecord(rec1);
  consumer.addRecord(rec2);
  ConsumerRecords<String, String> recs = consumer.poll(Duration.ofMillis(1));
  Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
  assertEquals(rec1, iter.next());
  assertEquals(rec2, iter.next());
  assertFalse(iter.hasNext());
  assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
  consumer.commitSync();
  assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
}

Code example source: apache/kafka

@SuppressWarnings("deprecation")
@Test
public void testSimpleMockDeprecated() {
  consumer.subscribe(Collections.singleton("test"));
  assertEquals(0, consumer.poll(1000).count());
  consumer.rebalance(Arrays.asList(new TopicPartition("test", 0), new TopicPartition("test", 1)));
  // Mock consumers need to seek manually since they cannot automatically reset offsets
  HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
  beginningOffsets.put(new TopicPartition("test", 0), 0L);
  beginningOffsets.put(new TopicPartition("test", 1), 0L);
  consumer.updateBeginningOffsets(beginningOffsets);
  consumer.seek(new TopicPartition("test", 0), 0);
  ConsumerRecord<String, String> rec1 = new ConsumerRecord<>("test", 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key1", "value1");
  ConsumerRecord<String, String> rec2 = new ConsumerRecord<>("test", 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, "key2", "value2");
  consumer.addRecord(rec1);
  consumer.addRecord(rec2);
  ConsumerRecords<String, String> recs = consumer.poll(1);
  Iterator<ConsumerRecord<String, String>> iter = recs.iterator();
  assertEquals(rec1, iter.next());
  assertEquals(rec2, iter.next());
  assertFalse(iter.hasNext());
  assertEquals(2L, consumer.position(new TopicPartition("test", 0)));
  consumer.commitSync();
  assertEquals(2L, consumer.committed(new TopicPartition("test", 0)).offset());
}
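The two mock-consumer tests above differ mainly in the poll overload they exercise: poll(long) was deprecated in Kafka 2.0 (KIP-266) in favor of poll(Duration), which bounds the total time the call may block, including metadata fetches.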

Code example source: apache/flume

assertNotNull(recs);
assertTrue(recs.count() > 0);
ConsumerRecord consumerRecord = recs.iterator().next();
ByteArrayInputStream in = new ByteArrayInputStream(consumerRecord.value().getBytes());
BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(in, null);
