热门标签 | HotTags
当前位置:  开发笔记 > 编程语言 > 正文

Kafka成长3:Producer元数据拉取源码原理(上)

Kafka成长3:Producer元数据拉取源码原理(上)-上一节我们分析了Producer的核心组件,我们得到了一张关键的组件图。你还记得么?简单概括下上面的图就是:创建

上一节我们分析了Producer的核心组件,我们得到了一张关键的组件图。你还记得么?

简单概括下上面的图就是:

创建了Metadata组件,内部通过Cluster维护元数据

初始化了发送消息的内存缓冲器RecordAccumulator

创建了NetworkClient,内部最重要的是创建了NIO的Selector组件

启动了一个Sender线程,Sender引用了上面的所有组件,开始执行run方法。

图的最下方可以看到,上一节截止到了run方法的执行,这一节我们首先会看看run方法核心脉络做了什么。接着分析下Producer第一个核心流程:元数据拉取的源码原理。

让我们开始吧!

Sender的run方法在做什么?

这一节我们就继续分析下,sender的run方法开始执行会做什么。

 public void run() {
        log.debug("Starting Kafka producer I/O thread.");

        // main loop, runs until close is called
        while (running) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }

        log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        // okay we stopped accepting requests but there may still be
        // requests in the accumulator or waiting for acknowledgment,
        // wait until these are completed.
        while (!forceClose && (this.accumulator.hasUnsent() || this.client.inFlightRequestCount() > 0)) {
            try {
                run(time.milliseconds());
            } catch (Exception e) {
                log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        if (forceClose) {
            // We need to fail all the incomplete batches and wake up the threads waiting on
            // the futures.
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e) {
            log.error("Failed to close network client", e);
        }

        log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

这个run方法的核心脉络很简单。主要就是2个while循环+线程的close,而2个while循环,他们都调用了run(long time)的这个方法。

通过注释你可以看到,第二个while是处理特殊情况的,当第一个while退出后,还有未发送的请求,需要第二个while循环处理完成,才会关闭线程。

整体脉络如下图所示:

接着其实就该看下run方法主要在干什么了?

   /**
     * Run a single iteration of sending
     * 
     * @param now
     *            The current POSIX time in milliseconds
     */
    void run(long now) {
        Cluster cluster = metadata.fetch();
        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (result.unknownLeadersExist)
            this.metadata.requestUpdate();

        // remove any nodes we aren't ready to send to
        Iterator iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
            }
        }

        // create produce requests
        Map> batches = this.accumulator.drain(cluster,
                                                                         result.readyNodes,
                                                                         this.maxRequestSize,
                                                                         now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List batchList : batches.values()) {
                for (RecordBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        List expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
        // update sensors
        for (RecordBatch expiredBatch : expiredBatches)
            this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);

        sensors.updateProduceRequestMetrics(batches);
        List requests = createProduceRequests(batches, now);
        // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
        // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
        // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
        // with sendable data that aren't ready to send since they would cause busy looping.
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        if (result.readyNodes.size() > 0) {
            log.trace("Nodes with data ready to send: {}", result.readyNodes);
            log.trace("Created {} produce requests: {}", requests.size(), requests);
            pollTimeout = 0;
        }
        for (ClientRequest request : requests)
            client.send(request, now);

        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        this.client.poll(pollTimeout, now);
    }

上面的代码,你如果第一次看,你肯定会觉得,这个脉络非常不清晰,不知道重点在哪里。不过还好有些注释,你能大体猜到他在干嘛。

accumulator的ready,networkclient的ready、networkclient的send、networkclient的poll

这些好像是在准备内存区域、准备网络连接的node节点、发送数据、拉取响应结果的意思。

可是如果你猜不到,该怎么办呢?

这时候就可以祭出debug这个杀器了。由于是producer,我们可以在Hellowolrd的这个客户端打断点,一步一步看下。

当你对run方法一步一步打了断点之后你会发现:

accumulator的ready,networkclient的ready、networkclient的send 这些的逻辑几乎都没有执行,全部都是初始化空对象,或者方法内部直接return。

直接一路执行到了client.poll方法。如下图所示:

那么,你可以得出一个结论,while第一次循环这个run方法的核心逻辑,其实只有一句话:

client.poll(pollTimeout, now)

整体脉络如下所示:

看来接下来,这个NetworkClient的poll方法,就是关键中的关键了:

    /**
     * Do actual reads and writes to sockets.
     * 对套接字进行实际读取和写入
     *
     * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately,
     *                must be non-negative. The actual timeout will be the minimum of timeout, request timeout and
     *                metadata timeout
     * @param now The current time in milliseconds
     * @return The list of responses received
     */
    @Override
    public List poll(long timeout, long now) {
        long metadataTimeout = metadataUpdater.maybeUpdate(now);
        try {
            this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs));
        } catch (IOException e) {
            log.error("Unexpected error during I/O", e);
        }

        // process completed actions
        long updatedNow = this.time.milliseconds();
        List respOnses= new ArrayList<>();
        handleCompletedSends(responses, updatedNow);
        handleCompletedReceives(responses, updatedNow);
        handleDisconnections(responses, updatedNow);
        handleConnections();
        handleTimedOutRequests(responses, updatedNow);

        // invoke callbacks
        for (ClientResponse response : responses) {
            if (response.request().hasCallback()) {
                try {
                    response.request().callback().onComplete(response);
                } catch (Exception e) {
                    log.error("Uncaught error in request completion:", e);
                }
            }
        }

        return responses;
    }

这个方法的脉络就清晰多了,通过方法名和注释,我们几乎可以猜出他的一些作用主要有:

1)注释说:对套接字进行实际读取和写入

2)metadataUpdater.maybeUpdate(),你还记得NetworkClient的组件DefaultMetadataUpdater么,方法名意思是可能进行元数据更新。这个好像很关键的样子

3)接着执行了Selector的poll方法,这个是NetworkClient的另一个组件Selector,还记得么?它底层封装了原生的NIO Selector。这个方法应该也比较关键。

4)后续对response执行了一系列的方法,从名字上看, handleCompletedSends 处理完成发送的请求、handleCompletedReceives处理完成接受的请求、handleDisconnections处理断开连接的请求、handleConnections处理连接成功的请求、处理超时的请求handleTimedOutRequests。根据不同情况有不同的处理。

5)最后还有一个response的相关的回调处理,如果注册了回调函数,会执行下。这个应该不是很关键的逻辑

也就是简单的说就是NetworkClient执行poll方法,主要通过selector处理请求的读取和写入,对响应结果做不同的处理而已。

如下图所示:

到这里其实我们基本摸清出了run方法主要在做的一件事情了,由于是第一次循环,之前的accumulator的ready,networkclient的ready、networkclient的send 什么都没做,第一次while循环run方法核心执行的是networkclient.poll方法。而poll方法的主要逻辑就是上面图中所示的了。

maybeUpdate可能在在拉取元数据?

刚才我们分析到,poll方法首先执行的是DefaultMetadataUpdater的maybeUpdate方法,它是可能更新的意思。我们来一起看下他的逻辑吧。

        public long maybeUpdate(long now) {
            // should we update our metadata?
            long timeTOnextMetadataUpdate= metadata.timeToNextUpdate(now);
            long timeTOnextReconnectAttempt= Math.max(this.lastNoNodeAvailableMs + metadata.refreshBackoff() - now, 0);
            long waitForMetadataFetch = this.metadataFetchInProgress ? Integer.MAX_VALUE : 0;
            // if there is no node available to connect, back off refreshing metadata
            long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt),
                    waitForMetadataFetch);

            if (metadataTimeout == 0) {
                // Beware that the behavior of this method and the computation of timeouts for poll() are
                // highly dependent on the behavior of leastLoadedNode.
                Node node = leastLoadedNode(now);
                maybeUpdate(now, node);
            }

            return metadataTimeout;
        }   

   /**
     * The next time to update the cluster info is the maximum of the time the current info will expire and the time the
     * current info can be updated (i.e. backoff time has elapsed); If an update has been request then the expiry time
     * is now
     */
    public synchronized long timeToNextUpdate(long nowMs) {
        long timeToExpire = needUpdate ? 0 : Math.max(this.lastSuccessfulRefreshMs + this.metadataExpireMs - nowMs, 0);
        long timeToAllowUpdate = this.lastRefreshMs + this.refreshBackoffMs - nowMs;
        return Math.max(timeToExpire, timeToAllowUpdate);
    }

原来这里有一个时间的判断,当判断满足才会执行maybeUpdate。

这个时间计算好像比较复杂,但是大体可以看出来,metadataTimeout是根据三个时间综合判断出来的,如果是0才会执行真正的maybeUpdate()。

像这种时候,我们可以直接在metadataTimeout这里打一个断点,看下它的值是如何计算的,比如下图:

你会发现,当第一次执行while循环,执行到poll方法,执行到这个maybeUpdate的时候,决定metadataTimeout的3个值,有两个是0,其中一个是非0,是一个299720的值。最终导致metadataTimeout也是非0,是299720。

也就是说,第一次while循环不会执行maybeUpdate的任何逻辑。

那么接着向下执行 Selector的poll()方法。

   /**
     * Do whatever I/O can be done on each connection without blocking. This includes completing connections, completing
     * disconnections, initiating new sends, or making progress on in-progress sends or receives.
     * 在不阻塞的情况下,在每个连接上做任何可以做的 I/O。这包括完成连接完成、断开连接,启动新的发送,或在进行中的发送或接收请求
     */
    @Override
    public void poll(long timeout) throws IOException {
        if (timeout <0)
            throw new IllegalArgumentException("timeout should be >= 0");

        clear();

        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
            timeout = 0;

        /* check ready keys */
        long startSelect = time.nanoseconds();
        //这个方法是NIO底层Selector.select(),会阻塞监听
        int readyKeys = select(timeout);
        long endSelect = time.nanoseconds();
        currentTimeNanos = endSelect;
        this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

        //如果监听到有操作的SelectionKeys,也就是readyKeys>0<会执行一些操作
        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
            pollSelectionKeys(this.nioSelector.selectedKeys(), false);
            pollSelectionKeys(immediatelyConnectedKeys, true);
        }

        addToCompletedReceives();

        long endIo = time.nanoseconds();
        this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
        maybeCloseOldestConnection();
    }

    private int select(long ms) throws IOException {
        if (ms <0L)
            throw new IllegalArgumentException("timeout should be >= 0");

        if (ms == 0L)
            return this.nioSelector.selectNow();
        else
            return this.nioSelector.select(ms);
    }

上面的脉络主要是2步:

1)select(timeout): NIO底层selector.select(),会阻塞监听

2)pollSelectionKeys(): 监听到有操作的SelectionKeys,做了一些操作

也就是说,最终,Sender线程的run方法,第一次while循环执行poll方法,最后什么都没干,会被selector.select()阻塞住。

如下图所示:

new KafkaProducer之后

分析完了run方法的执行 ,我们分析的KafkaProducerHelloWorld第一步new KafkaProducer()基本就完成了。

大家经历了一节半的时间,终于分析清楚了KafkaProducer创建的原理。不不知道你对Kafka的Producer是不是有了更深的理解了。

分析了new KafkaProducer()之后呢?

我们继续接着KafkaProducerHelloWorld往下分析,你还记得KafkaProducerHelloWorld的代码么?

public class KafkaProducerHelloWorld {

    public static void main(String[] args) throws Exception {
        //配置Kafka的一些参数
        Properties props = new Properties();
        props.put("bootstrap.servers", "mengfanmao.org:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 创建一个Producer实例
        KafkaProducer producer = new KafkaProducer<>(props);

        // 封装一条消息
        ProducerRecord record = new ProducerRecord<>(
                "test-topic", "test-key", "test-value");

        // 同步方式发送消息,会阻塞在这里,直到发送完成
        // producer.send(record).get();

        // 异步方式发送消息,不阻塞,设置一个监听回调函数即可
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(exception == null) {
                    System.out.println("消息发送成功");
                } else {
                    exception.printStackTrace();
                }
            }
        });

        Thread.sleep(5 * 1000);

        // 退出producer
        producer.close();
    }

KafkaProducerHelloWorld主要就3步:

1)new KafkaProducer 这个我们已经分析完了,主要分析了配置文件的解析、各个组件是什么、有什么,还有就是刚才分析的run线程第一次循环到底执行了什么。

2) new ProducerRecord 创建待发送的消息

3) producer.send() 发送消息

首先创建待发送的消息:

ProducerRecord record = new ProducerRecord<>("test-topic", "test-key", "test-value");

public ProducerRecord(String topic, K key, V value) {
    this(topic, null, null, key, value);
}
    /**
     * Creates a record with a specified timestamp to be sent to a specified topic and partition
     * 创建具有指定时间戳的记录以发送到指定主题和分区
     * @param topic The topic the record will be appended to
     * @param partition The partition to which the record should be sent
     * @param timestamp The timestamp of the record
     * @param key The key that will be included in the record
     * @param value The record contents
     */
    public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {
        if (topic == null)
            throw new IllegalArgumentException("Topic cannot be null");
        if (timestamp != null && timestamp <0)
            throw new IllegalArgumentException("Invalid timestamp " + timestamp);
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
    }

我们之前提过,Record表示了一条消息的抽象封装。这个ProducerRecord其实就表示了一条消息。

从构造函数的注释可以看出来,ProducerRecord可以指定往哪个topic,哪一个分区partition,并且消息可以设置一个时间戳。分区和时间戳默认可以不指定

其实看这块源码,我们主要得到的信息就是这些了,这些都比较简单。就不画图了。

发送消息时的元数据拉取触发

当Producer和Record都创建好了之后,可以用同步或者异步的方式发送消息。

// 同步方式发送消息,会阻塞在这里,直到发送完成
// producer.send(record).get();

// 异步方式发送消息,不阻塞,设置一个监听回调函数即可
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if(exception == null) {
            System.out.println("消息发送成功");
        } else {
            exception.printStackTrace();
        }
    }
});

    //同步发送
    @Override
    public Future send(ProducerRecord record) {
        return send(record, null);
    }
    //异步发送
    public Future send(ProducerRecord record, Callback callback) {
        // intercept the record, which can be potentially modified; this method does not throw exceptions
        ProducerRecord interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);
        return doSend(interceptedRecord, callback);
    }

同步和异步的整个发送逻辑如下图所示:

从上图你会发现,但是无论同步发送还是异步底层都会调用同一个方法doSend()。区别就是有没有callBack回调函数而已,他们还都在调用前注册一些拦截器,这里我们抓大放小下,我们重点还是关注doSend方法。

doSend方法如下:

/**
 * Implementation of asynchronously send a record to a topic. Equivalent to send(record, null).
 * See {@link #send(ProducerRecord, Callback)} for details.
 */
private Future doSend(ProducerRecord record, Callback callback) {
    TopicPartition tp = null;
    try {
        // first make sure the metadata for the topic is available
        long waitedOnMetadataMs= waitOnMetadata(record.topic(), this.maxBlockTimeMs);
        long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer");
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer");
        }
        int partition = partition(record, serializedKey, serializedValue, metadata.fetch());
        int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);
        ensureValidRecordSize(serializedSize);
        tp = new TopicPartition(record.topic(), partition);
        long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
        log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (BufferExhaustedException e) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (KafkaException e) {
        this.errors.record();
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        if (this.interceptors != null)
            this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

这个方法的脉络虽然比较长,但是脉络还是比较清晰,主要先执行了:

1)waitOnMetadata 应该是等待元数据拉取

2)keySerializer.serialize和valueSerializer.serialize,很明显就是将Record序列化成byte字节数组

3)通过partition进行路由分区,按照一定路由策略选择Topic下的某个分区

4)accumulator.append将消息放入缓冲器中

5)唤醒Sender线程的selector.select()的阻塞,开始处理内存缓冲器中的数据。

用图来表示如下所示:

这两节我们重点分析元数据拉取的这个场景的源码原理。

所以这里我们着重先看下步骤1 ,之后的4步我们之后会分析到的。

waitOnMetadata 如何等待元数据拉取的?

既然send的第一步是执行waitOnMetadata方法,首先看下它的代码:

 /**
     * Wait for cluster metadata including partitions for the given topic to be available.
     * @param topic The topic we want metadata for
     * @param maxWaitMs The maximum time in ms for waiting on the metadata
     * @return The amount of time we waited in ms
     */
    private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already.
        if (!this.metadata.containsTopic(topic))
            this.metadata.add(topic);

        if (metadata.fetch().partitionsForTopic(topic) != null)
            return 0;

        long begin = time.milliseconds();
        long remainingWaitMs = maxWaitMs;
        while (metadata.fetch().partitionsForTopic(topic) == null) {
            log.trace("Requesting metadata update for topic {}.", topic);
            int version = metadata.requestUpdate();
            sender.wakeup();
            metadata.awaitUpdate(version, remainingWaitMs);
            long elapsed = time.milliseconds() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            if (metadata.fetch().unauthorizedTopics().contains(topic))
                throw new TopicAuthorizationException(topic);
            remainingWaitMs = maxWaitMs - elapsed;
        }
        return time.milliseconds() - begin;
    }

    /**
     * Get the current cluster info without blocking
     */
    public synchronized Cluster fetch() {
        return this.cluster;
    }

    public synchronized int requestUpdate() {
        this.needUpdate = true;
        return this.version;
    }

    /**
     * Wait for metadata update until the current version is larger than the last version we know of
     */
    public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
        if (maxWaitMs <0) {
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be <0 milli seconds");
        }
        long begin = System.currentTimeMillis();
        long remainingWaitMs = maxWaitMs;
        while (this.version <= lastVersion) {
            if (remainingWaitMs != 0)
                wait(remainingWaitMs);
            long elapsed = System.currentTimeMillis() - begin;
            if (elapsed >= maxWaitMs)
                throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
            remainingWaitMs = maxWaitMs - elapsed;
        }
    }

这个方法核心就是判断了是否有Cluster元数据信息,如果没有,进行了如下操作:

1)metadata.requestUpdate(); 更新了一个needUpdate标记,这个值会影响之前maybeUpdate的metadataTimeout的计算,可以让metadataTimeout为0

2)sender.wakeup();唤醒之前nioSelector.select()的阻塞,继续执行

3)metadata.awaitUpdate(version, remainingWaitMs); 主要进行了版本比较,如果不是最新版本,调用了Metadata.wait()方法(wait方法是每个Object都会有的方法,一般和notify或者notifyAll组合使用)

整个过程我直接用图给大家表示一下,如下所示:

整个图就是今天我们分析的关键结果了,这里通过两种阻塞和唤醒机制,一个是NIO中Selector的select()和wakeUp(),一个是MetaData对象的wait()和notifyAll()机制。所以这里要结合之前Sender线程的阻塞逻辑一起来理解。

是不是很有意思一种使用,这里没有用任何线程的join、sleep、wait、park、unpark、notify这些方法。

小结

最后我们简单小结下,这里一节我们主要分析了如下Producer的源码原理:

初始化KafkaProducer时并没有去拉取元数据,但是创建了Selector组件,启动了Sender线程,select阻塞等待请求响应。由于还没有发送任何请求,所以初始化时并没有去真正拉取元数据。

真正拉取元数据是在第一次send方法调用时,会唤醒唤醒Selector之前阻塞的select(),进入第二次while循环,从而发送拉取元数据请求,并且通过Obejct.wait的机制等待60s,等到从Broker拉取元数据成功后,才会继续执行真正的生产消息的请求,否则会报拉取元数据超时异常。

这一节我们只是看到了进行了wait如何等待元数据拉取。

而唤醒Selector的select之后应该会进入第二次while循环

那第二次while循环如何发送请求拉取元数据请求,并且在成功后notifyAll()进行唤醒操作的呢?

让我们下一节继续分析,大家敬请期待! 我们下一节见!

本文由博客一文多发平台 OpenWrite 发布!


推荐阅读
  • Java高并发与多线程(二):线程的实现方式详解
    本文将深入探讨Java中线程的三种主要实现方式,包括继承Thread类、实现Runnable接口和实现Callable接口,并分析它们之间的异同及其应用场景。 ... [详细]
  • 本文介绍了 Android 开发中常用的滚动视图组件 ScrollView 和 HorizontalScrollView 的基本用法和注意事项,帮助开发者更好地处理屏幕内容超出显示范围的情况。 ... [详细]
  • iOS snow animation
    CTSnowAnimationView.hCTMyCtripCreatedbyalexon1614.Copyright©2016年ctrip.Allrightsreserved.# ... [详细]
  • 使用Tkinter构建51Ape无损音乐爬虫UI
    本文介绍了如何使用Python的内置模块Tkinter来构建一个简单的用户界面,用于爬取51Ape网站上的无损音乐百度云链接。虽然Tkinter入门相对简单,但在实际开发过程中由于文档不足可能会带来一些不便。 ... [详细]
  • 普通树(每个节点可以有任意数量的子节点)级序遍历 ... [详细]
  • 2022年2月 微信小程序 app.json 配置详解:启用调试模式
    本文将详细介绍如何在微信小程序的 app.json 文件中启用调试模式(debug),并通过实际案例展示其配置方法和应用场景。 ... [详细]
  • Android 构建基础流程详解
    Android 构建基础流程详解 ... [详细]
  • Java Socket 关键参数详解与优化建议
    Java Socket 的 API 虽然被广泛使用,但其关键参数的用途却鲜为人知。本文详细解析了 Java Socket 中的重要参数,如 backlog 参数,它用于控制服务器等待连接请求的队列长度。此外,还探讨了其他参数如 SO_TIMEOUT、SO_REUSEADDR 等的配置方法及其对性能的影响,并提供了优化建议,帮助开发者提升网络通信的稳定性和效率。 ... [详细]
  • 本文深入解析了WCF Binding模型中的绑定元素,详细介绍了信道、信道管理器、信道监听器和信道工厂的概念与作用。从对象创建的角度来看,信道管理器负责信道的生成。具体而言,客户端的信道通过信道工厂进行实例化,而服务端则通过信道监听器来接收请求。文章还探讨了这些组件之间的交互机制及其在WCF通信中的重要性。 ... [详细]
  • Python 伦理黑客技术:深入探讨后门攻击(第三部分)
    在《Python 伦理黑客技术:深入探讨后门攻击(第三部分)》中,作者详细分析了后门攻击中的Socket问题。由于TCP协议基于流,难以确定消息批次的结束点,这给后门攻击的实现带来了挑战。为了解决这一问题,文章提出了一系列有效的技术方案,包括使用特定的分隔符和长度前缀,以确保数据包的准确传输和解析。这些方法不仅提高了攻击的隐蔽性和可靠性,还为安全研究人员提供了宝贵的参考。 ... [详细]
  • 基于Dubbo与Zipkin的微服务调用链路监控解决方案
    本文提出了一种基于Dubbo与Zipkin的微服务调用链路监控解决方案。通过抽象配置层,支持HTTP和Kafka两种数据上报方式,实现了灵活且高效的调用链路追踪。该方案不仅提升了系统的可维护性和扩展性,还为故障排查提供了强大的支持。 ... [详细]
  • 微信小程序实现类似微博的无限回复功能,内置云开发数据库支持
    本文详细介绍了如何利用微信小程序实现类似于微博的无限回复功能,并充分利用了微信云开发的数据库支持。文中不仅提供了关键代码片段,还包含了完整的页面代码,方便开发者按需使用。此外,HTML页面中包含了一些示例图片,开发者可以根据个人喜好进行替换。文章还将展示详细的数据库结构设计,帮助读者更好地理解和实现这一功能。 ... [详细]
  • 此更新支持将 Cognito User Pools 作为 API Gateway 授权器的类型 ... [详细]
  • Ubuntu 22.04 安装搜狗输入法详细指南及常见问题解决方案
    本文将详细介绍如何在 Ubuntu 22.04 上安装搜狗输入法,并提供常见问题的解决方法。包括下载安装包、更新源、安装依赖项等步骤。 ... [详细]
  • 事件是程序各部分之间的一种通信方式,也是异步编程的一种实现形式。本文将详细介绍EventTarget接口及其相关方法,以及如何使用监听函数处理事件。 ... [详细]
author-avatar
手机用户2502873667
这个家伙很懒,什么也没留下!
PHP1.CN | 中国最专业的PHP中文社区 | DevBox开发工具箱 | json解析格式化 |PHP资讯 | PHP教程 | 数据库技术 | 服务器技术 | 前端开发技术 | PHP框架 | 开发工具 | 在线工具
Copyright © 1998 - 2020 PHP1.CN. All Rights Reserved | 京公网安备 11010802041100号 | 京ICP备19059560号-4 | PHP1.CN 第一PHP社区 版权所有