
Kafka – KafkaConsumer


ConsumerRecords<String, String> records = consumer.poll(100);
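For context, this call normally sits inside a subscribe-then-poll loop. A minimal sketch of such a loop against the poll(long) API shown here (the broker address, group id, and topic name are placeholder assumptions):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PollLoopSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic"));          // placeholder topic
        try {
            while (true) {
                // wait up to 100 ms for buffered or newly fetched records
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
            }
        } finally {
            consumer.close();
        }
    }
}

The rest of this post traces what happens inside that poll(100) call.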

 

/**
 * Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have
 * subscribed to any topics or partitions before polling for data.
 *
 * On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last
 * consumed offset can be manually set through {@link #seek(TopicPartition, long)} or automatically set as the last committed
 * offset for the subscribed list of partitions
 *
 * @param timeout The time, in milliseconds, spent waiting in poll if data is not available in the buffer.
 *            If 0, returns immediately with any records that are available currently in the buffer, else returns empty.
 *            Must not be negative.
 * @return map of topic to records since the last fetch for the subscribed list of topics and partitions
 *
 * @throws org.apache.kafka.clients.consumer.InvalidOffsetException if the offset for a partition or set of
 *             partitions is undefined or out of range and no offset reset policy has been configured
 * @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
 *             function is called
 * @throws org.apache.kafka.common.errors.InterruptException if the calling thread is interrupted before or while
 *             this function is called
 * @throws org.apache.kafka.common.errors.AuthorizationException if caller lacks Read access to any of the subscribed
 *             topics or to the configured groupId
 * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. invalid groupId or
 *             session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
 * @throws java.lang.IllegalArgumentException if the timeout value is negative
 * @throws java.lang.IllegalStateException if the consumer is not subscribed to any topics or manually assigned any
 *             partitions to consume from
 */
@Override
public ConsumerRecords<K, V> poll(long timeout) {
    acquire(); // guard against concurrent access; paired with release() in the finally block
    try {
        if (timeout < 0)
            throw new IllegalArgumentException("Timeout must not be negative");
        if (this.subscriptions.hasNoSubscriptionOrUserAssignment())
            throw new IllegalStateException("Consumer is not subscribed to any topics or assigned any partitions");

        // poll for new data until the timeout expires
        long start = time.milliseconds();
        long remaining = timeout;
        do {
            Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining); // one round of polling
            if (!records.isEmpty()) {
                // before returning the fetched records, we can send off the next round of fetches
                // and avoid block waiting for their responses to enable pipelining while the user
                // is handling the fetched records.
                //
                // NOTE: since the consumed position has already been updated, we must not allow
                // wakeups or any other errors to be triggered prior to returning the fetched records.
                if (fetcher.sendFetches() > 0) { // pipeline: eagerly send the next round of fetches to save time
                    client.pollNoWakeup();
                }

                if (this.interceptors == null)
                    return new ConsumerRecords<>(records);
                else
                    return this.interceptors.onConsume(new ConsumerRecords<>(records)); // run the interceptors first, if any are configured
            }

            long elapsed = time.milliseconds() - start;
            remaining = timeout - elapsed; // keep retrying pollOnce within the timeout
        } while (remaining > 0);

        return ConsumerRecords.empty(); // data never became ready; return empty
    } finally {
        release();
    }
}

 

pollOnce

/**
 * Do one round of polling. In addition to checking for new data, this does any needed offset commits
 * (if auto-commit is enabled), and offset resets (if an offset reset policy is defined).
 *
 * @param timeout The maximum time to block in the underlying call to {@link ConsumerNetworkClient#poll(long)}.
 * @return The fetched records (may be empty)
 */
private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
    coordinator.poll(time.milliseconds()); // drives heartbeats and group membership with the ConsumerCoordinator

    // fetch positions if we have partitions we're subscribed to that we
    // don't know the offset for
    if (!subscriptions.hasAllFetchPositions())
        updateFetchPositions(this.subscriptions.missingFetchPositions()); // sync offsets

    // if data is available already, return it immediately
    Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords(); // already-fetched data
    if (!records.isEmpty())
        return records; // return it directly

    // send any new fetches (won't resend pending fetches)
    fetcher.sendFetches(); // nothing on hand, so issue fetch requests

    long now = time.milliseconds();
    long pollTimeout = Math.min(coordinator.timeToNextPoll(now), timeout);

    client.poll(pollTimeout, now, new PollCondition() {
        @Override
        public boolean shouldBlock() {
            // since a fetch might be completed by the background thread, we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        }
    });

    // after the long poll, we should check whether the group needs to rebalance
    // prior to returning data so that the group can stabilize faster
    if (coordinator.needRejoin())
        return Collections.emptyMap();

    return fetcher.fetchedRecords();
}

 

Now look at the Fetcher's constructor:

public Fetcher(ConsumerNetworkClient client,
               int minBytes,
               int maxBytes,
               int maxWaitMs,
               int fetchSize,
               int maxPollRecords,
               boolean checkCrcs,
               Deserializer<K> keyDeserializer,
               Deserializer<V> valueDeserializer,
               Metadata metadata,
               SubscriptionState subscriptions,
               Metrics metrics,
               String metricGrpPrefix,
               Time time,
               long retryBackoffMs) {

It is constructed like this when the KafkaConsumer is created:

this.fetcher = new Fetcher<>(this.client,
config.getInt(ConsumerConfig.FETCH_MIN_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_BYTES_CONFIG),
config.getInt(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG),
config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
this.keyDeserializer,
this.valueDeserializer,
this.metadata,
this.subscriptions,
metrics,
metricGrpPrefix,
this.time,
this.retryBackoffMs);

This shows exactly which config entry feeds each constructor parameter.
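As a sketch, here is how those knobs would be set explicitly when building the consumer; the values shown are just the usual defaults, listed only to illustrate the mapping:

Properties props = new Properties();
// ... bootstrap.servers, group.id, deserializers as usual ...
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);                      // -> minBytes
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);       // -> maxBytes
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);                  // -> maxWaitMs
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);  // -> fetchSize
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);                   // -> maxPollRecords
props.put(ConsumerConfig.CHECK_CRCS_CONFIG, true);                        // -> checkCrcs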

 

fetcher.fetchedRecords

/**
 * Return the fetched records, empty the record buffer and update the consumed position.
 *
 * NOTE: returning empty records guarantees the consumed position are NOT updated.
 *
 * @return The fetched records per partition
 * @throws OffsetOutOfRangeException If there is OffsetOutOfRange error in fetchResponse and
 *         the defaultResetPolicy is NONE
 */
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
    int recordsRemaining = maxPollRecords; // cap on how many records one poll may return

    while (recordsRemaining > 0) {
        if (nextInLineRecords == null || nextInLineRecords.isDrained()) { // nextInLineRecords is empty, no records left in it
            CompletedFetch completedFetch = completedFetches.poll(); // take one completed fetch off the completedFetches queue
            if (completedFetch == null)
                break;

            nextInLineRecords = parseFetchedData(completedFetch); // parse the fetch into nextInLineRecords
        } else {
            TopicPartition partition = nextInLineRecords.partition;
            List<ConsumerRecord<K, V>> records = drainRecords(nextInLineRecords, recordsRemaining); // drain up to recordsRemaining records from nextInLineRecords
            if (!records.isEmpty()) {
                List<ConsumerRecord<K, V>> currentRecords = drained.get(partition); // existing record list for this partition
                if (currentRecords == null) {
                    drained.put(partition, records); // first records for this partition
                } else {
                    // this case shouldn't usually happen because we only send one fetch at a time per partition,
                    // but it might conceivably happen in some rare cases (such as partition leader changes).
                    // we have to copy to a new list because the old one may be immutable
                    List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
                    newRecords.addAll(currentRecords);
                    newRecords.addAll(records);
                    drained.put(partition, newRecords);
                }
                recordsRemaining -= records.size();
            }
        }
    }
    return drained;
}

So fetchedRecords only reads data out of fetches that have already completed; it never touches the network itself.
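Stripped of the parsing and per-partition bookkeeping, the drain boils down to: pop completed batches off a queue and return at most maxPollRecords records, re-buffering any leftover for the next call. A simplified, self-contained model of that shape (hypothetical types, not the real Fetcher API):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class DrainSketch {
    // Drain up to maxPollRecords records from a queue of completed batches.
    // A partially consumed batch is pushed back to the head for the next call.
    static <T> List<T> drain(Deque<List<T>> completed, int maxPollRecords) {
        List<T> drained = new ArrayList<>();
        int remaining = maxPollRecords;
        while (remaining > 0 && !completed.isEmpty()) {
            List<T> batch = completed.pollFirst();
            if (batch.size() <= remaining) {
                drained.addAll(batch);                        // take the whole batch
                remaining -= batch.size();
            } else {
                drained.addAll(batch.subList(0, remaining));  // take only what fits...
                completed.addFirst(new ArrayList<>(batch.subList(remaining, batch.size()))); // ...re-buffer the rest
                remaining = 0;
            }
        }
        return drained;
    }
}

This buffering is why one large completed fetch can satisfy several successive poll() calls without going back to the network.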

 

fetcher.sendFetches

First, look at

createFetchRequests

/**
 * Create fetch requests for all nodes for which we have assigned partitions
 * that have no existing requests in flight.
 */
private Map<Node, FetchRequest> createFetchRequests() {
    // create the fetch info
    Cluster cluster = metadata.fetch();
    Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
    for (TopicPartition partition : fetchablePartitions()) {
        Node node = cluster.leaderFor(partition); // find the node hosting this partition's leader
        if (node == null) {
            metadata.requestUpdate();
        } else if (this.client.pendingRequestCount(node) == 0) { // only fetch if there is no request already in flight to this node
            // if there is a leader and no in-flight requests, issue a new fetch
            LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
            if (fetch == null) {
                fetch = new LinkedHashMap<>();
                fetchable.put(node, fetch);
            }

            long position = this.subscriptions.position(partition);
            fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize)); // position = where to start reading, fetchSize = how much to read
            log.trace("Added fetch request for partition {} at offset {}", partition, position);
        } else {
            log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
        }
    }

    // create the fetches
    Map<Node, FetchRequest> requests = new HashMap<>();
    for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
        Node node = entry.getKey();
        FetchRequest fetch = new FetchRequest(this.maxWaitMs, this.minBytes, this.maxBytes, entry.getValue()); // wrap into one FetchRequest per node
        requests.put(node, fetch);
    }
    return requests;
}

 

/**
 * Set-up a fetch request for any node that we have assigned partitions for which doesn't already have
 * an in-flight fetch or pending fetch data.
 *
 * @return number of fetches sent
 */
public int sendFetches() {
    Map<Node, FetchRequest> fetchRequestMap = createFetchRequests(); // build the fetch requests
    for (Map.Entry<Node, FetchRequest> fetchEntry : fetchRequestMap.entrySet()) {
        final FetchRequest request = fetchEntry.getValue();
        final Node fetchTarget = fetchEntry.getKey();
        client.send(fetchTarget, ApiKeys.FETCH, request) // send the request
                .addListener(new RequestFutureListener<ClientResponse>() {
                    @Override
                    public void onSuccess(ClientResponse resp) { // on success
                        FetchResponse response = (FetchResponse) resp.responseBody();
                        Set<TopicPartition> partitions = new HashSet<>(response.responseData().keySet());
                        FetchResponseMetricAggregator metricAggregator = new FetchResponseMetricAggregator(sensors, partitions);

                        for (Map.Entry<TopicPartition, FetchResponse.PartitionData> entry : response.responseData().entrySet()) {
                            TopicPartition partition = entry.getKey();
                            long fetchOffset = request.fetchData().get(partition).offset;
                            FetchResponse.PartitionData fetchData = entry.getValue();
                            completedFetches.add(new CompletedFetch(partition, fetchOffset, fetchData, metricAggregator)); // wrap fetchData as a CompletedFetch and queue it on completedFetches
                        }

                        sensors.fetchLatency.record(resp.requestLatencyMs());
                        sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                    }

                    @Override
                    public void onFailure(RuntimeException e) {
                        log.debug("Fetch request to {} failed", fetchTarget, e);
                    }
                });
    }
    return fetchRequestMap.size();
}

 

client.send

ConsumerNetworkClient

/**
 * Send a new request. Note that the request is not actually transmitted on the
 * network until one of the {@link #poll(long)} variants is invoked. At this
 * point the request will either be transmitted successfully or will fail.
 * Use the returned future to obtain the result of the send. Note that there is no
 * need to check for disconnects explicitly on the {@link ClientResponse} object;
 * instead, the future will be failed with a {@link DisconnectException}.
 *
 * @param node The destination of the request
 * @param api The Kafka API call
 * @param request The request payload
 * @return A future which indicates the result of the send.
 */
public RequestFuture<ClientResponse> send(Node node,
                                          ApiKeys api,
                                          AbstractRequest request) {
    return send(node, api, ProtoUtils.latestVersion(api.id), request);
}

private RequestFuture<ClientResponse> send(Node node,
                                           ApiKeys api,
                                           short version,
                                           AbstractRequest request) {
    long now = time.milliseconds();
    RequestFutureCompletionHandler completionHandler = new RequestFutureCompletionHandler();
    RequestHeader header = client.nextRequestHeader(api, version);
    ClientRequest clientRequest = new ClientRequest(node.idString(), now, true, header, request, completionHandler); // wrap into a ClientRequest
    put(node, clientRequest); // not actually sent yet; just queued on the unsent list

    // wakeup the client in case it is blocking in poll so that we can send the queued request
    client.wakeup();
    return completionHandler.future;
}

private void put(Node node, ClientRequest request) {
    synchronized (this) {
        List<ClientRequest> nodeUnsent = unsent.get(node);
        if (nodeUnsent == null) {
            nodeUnsent = new ArrayList<>();
            unsent.put(node, nodeUnsent);
        }
        nodeUnsent.add(request);
    }
}

 

NetworkClient.wakeup

/**
* Interrupt the client if it is blocked waiting on I/O.
*/
@Override
public void wakeup() {
this.selector.wakeup();
}

wakeup() simply knocks the client out of its blocking wait on the selector so it can go handle other requests.
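The same mechanism backs the public KafkaConsumer.wakeup(), which is the documented way for another thread to break a consumer out of a blocking poll(), e.g. for a clean shutdown. A typical pattern, sketched here assuming consumer is the KafkaConsumer from the loop above:

import org.apache.kafka.common.errors.WakeupException;

// from another thread (here, a shutdown hook), interrupt the poll loop
Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.wakeup()));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        // process records ...
    }
} catch (WakeupException e) {
    // expected: poll() throws this once wakeup() has been called
} finally {
    consumer.close();
}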

 

As the javadoc above states, the request is only actually transmitted when poll is invoked. So where does poll get called?

 

Back in pollOnce above, there is this logic:

client.poll(pollTimeout, now, new PollCondition() {
    @Override
    public boolean shouldBlock() {
        // since a fetch might be completed by the background thread, we need this poll condition
        // to ensure that we do not block unnecessarily in poll()
        return !fetcher.hasCompletedFetches();
    }
});

This means the call to poll blocks for at most pollTimeout. PollCondition.shouldBlock decides whether we need to block waiting at all: when hasCompletedFetches() is true there is already data on hand, so there is nothing to wait for; we only block when no data is ready yet.
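The callback interface itself is tiny; roughly this, as defined inside ConsumerNetworkClient:

public interface PollCondition {
    /**
     * Return whether the caller is still awaiting an IO event.
     * @return true if so, false otherwise
     */
    boolean shouldBlock();
}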

 

ConsumerNetworkClient.poll

/**
 * Poll for any network IO.
 *
 * @param timeout timeout in milliseconds
 * @param now current time in milliseconds
 */
public void poll(long timeout, long now, PollCondition pollCondition) {
    // there may be handlers which need to be invoked if we woke up the previous call to poll
    firePendingCompletedRequests();

    synchronized (this) {
        // send all the requests we can send now
        trySend(now);

        // check whether the poll is still needed by the caller. Note that if the expected completion
        // condition becomes satisfied after the call to shouldBlock() (because of a fired completion
        // handler), the client will be woken up.
        if (pollCondition == null || pollCondition.shouldBlock()) {
            // if there are no requests in flight, do not block longer than the retry backoff
            if (client.inFlightRequestCount() == 0)
                timeout = Math.min(timeout, retryBackoffMs);
            client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
            now = time.milliseconds();
        } else {
            client.poll(0, now);
        }

        // handle any disconnects by failing the active requests. note that disconnects must
        // be checked immediately following poll since any subsequent call to client.ready()
        // will reset the disconnect status
        checkDisconnects(now);

        // trigger wakeups after checking for disconnects so that the callbacks will be ready
        // to be fired on the next call to poll()
        maybeTriggerWakeup();

        // throw InterruptException if this thread is interrupted
        maybeThrowInterruptException();

        // try again to send requests since buffer space may have been
        // cleared or a connect finished in the poll
        trySend(now);

        // fail requests that couldn't be sent if they have expired
        failExpiredRequests(now);
    }

    // called without the lock to avoid deadlock potential if handlers need to acquire locks
    firePendingCompletedRequests();
}

 

trySend

private boolean trySend(long now) {
    // send any requests that can be sent now
    boolean requestsSent = false;
    for (Map.Entry<Node, List<ClientRequest>> requestEntry : unsent.entrySet()) { // requests were queued on unsent by send() earlier
        Node node = requestEntry.getKey();
        Iterator<ClientRequest> iterator = requestEntry.getValue().iterator();
        while (iterator.hasNext()) {
            ClientRequest request = iterator.next();
            if (client.ready(node, now)) { // begin connecting to the node; true if already connected and ready to send
                client.send(request, now); // hand the request down to NetworkClient.send
                iterator.remove();
                requestsSent = true;
            }
        }
    }
    return requestsSent;
}

 

NetworkClient.send

/**
 * Queue up the given request for sending. Requests can only be sent out to ready nodes.
 *
 * @param request The request
 * @param now The current timestamp
 */
@Override
public void send(ClientRequest request, long now) {
    doSend(request, false, now);
}

 

private void doSend(ClientRequest request, boolean isInternalRequest, long now) {
    String nodeId = request.destination();
    if (request.header().apiKey() == ApiKeys.API_VERSIONS.id) {
        if (!canSendApiVersionsRequest(nodeId))
            throw new IllegalStateException("Attempt to send API Versions request to node " + nodeId + " which is not ready.");
    } else if (!canSendRequest(nodeId)) {
        throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
    }
    Send send = request.body().toSend(nodeId, request.header());
    InFlightRequest inFlightRequest = new InFlightRequest(
            request.header(),
            request.createdTimeMs(),
            request.destination(),
            request.callback(),
            request.expectResponse(),
            isInternalRequest,
            send,
            now);

    this.inFlightRequests.add(inFlightRequest); // track the request as in-flight
    selector.send(inFlightRequest.send);
}

Ultimately Selector.send is used to send the Send object:

/**
 * Queue the given request for sending in the subsequent {@link #poll(long)} calls
 *
 * @param send The request to send
 */
public void send(Send send) {
    String connectionId = send.destination();
    if (closingChannels.containsKey(connectionId))
        this.failedSends.add(connectionId);
    else {
        KafkaChannel channel = channelOrFail(connectionId, false); // look up this connection's channel in the channels map

        try {
            channel.setSend(send);
        } catch (CancelledKeyException e) {
            this.failedSends.add(connectionId);
            close(channel, false);
        }
    }
}

KafkaChannel.setSend

public void setSend(Send send) {
    if (this.send != null)
        throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}

So Selector.send also just parks the Send on the channel and registers write interest; the actual transmission has to wait until NetworkClient.poll runs the selector loop. The bare-NIO sketch below shows the shape of this pattern.
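To make the mechanics concrete, here is the same two-step pattern in raw NIO; names like nioSelector, socketChannel, pendingBuffer, and timeoutMs are assumptions of this sketch, not Kafka code:

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

// 1. the "setSend" half: attach the pending bytes and register write interest
SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_WRITE, pendingBuffer);

// 2. the "poll" half: the selector loop performs the actual write once the socket is writable
nioSelector.select(timeoutMs);
Iterator<SelectionKey> it = nioSelector.selectedKeys().iterator();
while (it.hasNext()) {
    SelectionKey k = it.next();
    it.remove();
    if (k.isWritable()) {
        ByteBuffer buf = (ByteBuffer) k.attachment();
        ((SocketChannel) k.channel()).write(buf);   // the bytes actually hit the network here
        if (!buf.hasRemaining())                    // fully flushed: stop watching for writability
            k.interestOps(k.interestOps() & ~SelectionKey.OP_WRITE);
    }
}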

Back in ConsumerNetworkClient.poll:

if (pollCondition == null || pollCondition.shouldBlock()) {
    // if there are no requests in flight, do not block longer than the retry backoff
    if (client.inFlightRequestCount() == 0)
        timeout = Math.min(timeout, retryBackoffMs);
    client.poll(Math.min(MAX_POLL_TIMEOUT_MS, timeout), now);
    now = time.milliseconds();
} else {
    client.poll(0, now);
}

If blocking is needed (or no pollCondition was given), it blocks up to the timeout waiting for data.

Otherwise it calls client.poll(0, now), meaning return immediately even if no data is ready.

NetworkClient.poll in turn drives Selector.poll, which is where the real I/O happens:

@Override
public void poll(long timeout) throws IOException {
    if (timeout < 0)
        throw new IllegalArgumentException("timeout should be >= 0");

    clear();

    if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
        timeout = 0;

    /* check ready keys */
    long startSelect = time.nanoseconds();
    int readyKeys = select(timeout);
    long endSelect = time.nanoseconds();
    this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());

    if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
        pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
        pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
    }

    addToCompletedReceives();

    long endIo = time.nanoseconds();
    this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());

    // we use the time at the end of select to ensure that we don't close any connections that
    // have just been processed in pollSelectionKeys
    maybeCloseOldestConnection(endSelect);
}

 

select

private int select(long ms) throws IOException {
    if (ms < 0L)
        throw new IllegalArgumentException("timeout should be >= 0");

    if (ms == 0L)
        return this.nioSelector.selectNow();
    else
        return this.nioSelector.select(ms);
}

 

pollSelectionKeys

private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                               boolean isImmediatelyConnected,
                               long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);

        try {
            /* complete any connections that have finished their handshake (either normally or immediately) */
            if (isImmediatelyConnected || key.isConnectable()) {
                if (channel.finishConnect()) {
                    this.connected.add(channel.id());
                    this.sensors.connectionCreated.record();
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
                            socketChannel.socket().getReceiveBufferSize(),
                            socketChannel.socket().getSendBufferSize(),
                            socketChannel.socket().getSoTimeout(),
                            channel.id());
                } else
                    continue;
            }

            /* if channel is not ready finish prepare */
            if (channel.isConnected() && !channel.ready())
                channel.prepare();

            /* if channel is ready read from any connections that have readable data */
            if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
                NetworkReceive networkReceive;
                while ((networkReceive = channel.read()) != null)
                    addToStagedReceives(channel, networkReceive);
            }

            /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
            if (channel.ready() && key.isWritable()) {
                Send send = channel.write(); // this is where the data is actually written out
                if (send != null) {
                    this.completedSends.add(send);
                }
            }

            /* cancel any defunct sockets */
            if (!key.isValid())
                close(channel, true);
        } catch (Exception e) {
            /* exception handling (logging and closing the channel) elided in this excerpt */
        }
    }
}

 

Writing an application directly on top of NIO takes real courage.


