以blockingUnaryCall为例, 通过
ClientCall call = channel.newCall(method,
callOptions.withExecutor(executor));
得到一个 ClientCall(实现一般为 ClientCallImpl),
最终通过 call.start() 发起调用,参考 startCall:
private static void
startCall(ClientCall call,
ClientCall.Listener responseListener, boolean streamingResponse)
{
call.start(responseListener, new Metadata());
if (streamingResponse)
{
call.request(1);
} else {
//
Initially ask for two responses from flow-control so that if a
misbehaving server sends
//
more than one responses, we can catch it and fail it in the
listener.
call.request(2);
}
}
ClientCallImpl.start 的实现如下:
public void start(final Listener observer, Metadata headers)
{
checkState(stream ==
null, "Already started");
checkNotNull(observer,
"observer");
checkNotNull(headers,
"headers");
if
(context.isCancelled()) {
//
Context is already cancelled so no need to create a real stream,
just notify the observer
// of
cancellation via callback on the executor
stream = NoopClientStream.INSTANCE;
class
ClosedByContext extends ContextRunnable {
ClosedByContext() {
super(context);
}
@Override
public void runInContext() {
closeObserver(observer,
statusFromCancelled(context), new Metadata());
}
}
callExecutor.execute(new ClosedByContext());
return;
}
final String
compressorName = callOptions.getCompressor();
Compressor compressor =
null;
if (compressorName !=
null) {
compressor =
compressorRegistry.lookupCompressor(compressorName);
if
(compressor == null) {
stream = NoopClientStream.INSTANCE;
class ClosedByNotFoundCompressor extends
ContextRunnable {
ClosedByNotFoundCompressor()
{
super(context);
}
@Override
public void runInContext()
{
closeObserver(
observer,
Status.INTERNAL.withDescription(
String.format("Unable to find compressor by name
%s", compressorName)),
new Metadata());
}
}
callExecutor.execute(new
ClosedByNotFoundCompressor());
return;
}
} else {
compressor = Codec.Identity.NONE;
}
prepareHeaders(headers,
decompressorRegistry, compressor, statsTraceCtx);
Deadline
effectiveDeadline = effectiveDeadline();
boolean deadlineExceeded
= effectiveDeadline != null &&
effectiveDeadline.isExpired();
if (!deadlineExceeded)
{
updateTimeoutHeaders(effectiveDeadline,
callOptions.getDeadline(),
context.getDeadline(),
headers);
ClientTransport transport =
clientTransportProvider.get(callOptions);
Context origContext = context.attach();
try
{
stream = transport.newStream(method, headers,
callOptions, statsTraceCtx);
}
finally {
context.detach(origContext);
}
} else {
stream = new FailingClientStream(DEADLINE_EXCEEDED);
}
if
(callOptions.getAuthority() != null) {
stream.setAuthority(callOptions.getAuthority());
}
stream.setCompressor(compressor);
stream.start(new
ClientStreamListenerImpl(observer));
// Delay any sources of
cancellation after start(), because most of the transports are
broken if
// they receive cancel
before start. Issue #1343 has more details
// Propagate later
Context cancellation to the remote side.
context.addListener(this, directExecutor());
if (effectiveDeadline !=
null
// If the context has the effective deadline, we
don't need to schedule an extra task.
&& context.getDeadline() !=
effectiveDeadline) {
deadlineCancellationFuture =
startDeadlineTimer(effectiveDeadline);
}
if
(cancelListenersShouldBeRemoved) {
//
Race detected! ClientStreamListener.closed may have been called
before
//
deadlineCancellationFuture was set / context listener added,
thereby preventing the future
//
and listener from being cancelled. Go ahead and cancel again, just
to be sure it
//
was cancelled.
removeContextListenerAndCancelDeadlineFuture();
}
}
如果是正常流程(没有发生错误),会通过
stream = transport.newStream(method, headers, callOptions, statsTraceCtx);
生成一个 stream 实例,并设置超时定时器:
deadlineCancellationFuture = startDeadlineTimer(effectiveDeadline);
一旦超时,就调用
stream.cancel(DEADLINE_EXCEEDED);
private class DeadlineTimer implements
Runnable {
@Override
public void run()
{
//
DelayedStream.cancel() is safe to call from a thread that is
different from where the
//
stream is created.
stream.cancel(DEADLINE_EXCEEDED);
}
}
stream 实例会是 OkHttpClientStream 或 NettyClientStream。
下面是 OkHttpClientStream 的 cancel 实现:
@Override
public final void
cancel(Status reason) {
checkArgument(!reason.isOk(), "Should not cancel
with OK status");
cancelled = true;
sendCancel(reason);
dispose();
}
@Override
protected void sendCancel(Status reason) {
synchronized (lock) {
if (cancelSent) {
return;
}
cancelSent = true;
if (pendingData != null)
{
// stream
is pending.
transport.removePendingStream(this);
// release
holding data, so they can be GCed or returned to pool
earlier.
requestHeaders = null;
for
(PendingData data : pendingData) {
data.buffer.clear();
}
pendingData
= null;
transportReportStatus(reason, true, new
Metadata());
} else {
// If
pendingData is null, start must have already been called, which
means synStream has
// been
called as well.
transport.finishStream(id(), reason,
ErrorCode.CANCEL);
}
}
}
如果还有 pendingData(stream 仍处于 pending 状态),则先从 transport 中移除该 pending stream,
释放持有的数据,然后通过 transportReportStatus 上报状态;
否则调用 transport.finishStream(id(), reason, ErrorCode.CANCEL); 发送 RST_STREAM:
/**
 * Removes the stream with the given id and finishes it, all under {@code lock}. Does nothing
 * if no stream is registered under {@code streamId}.
 *
 * @param streamId id of the stream to finish
 * @param status if non-null, reported to the stream; the "cancelled" flag passed along is true
 *     only for {@code CANCELLED} and {@code DEADLINE_EXCEEDED}
 * @param errorCode if non-null, a RST_STREAM is written for the stream. Note that the frame is
 *     always written with {@code ErrorCode.CANCEL}; the value of this argument only decides
 *     whether a frame is sent at all
 */
void finishStream(int streamId, @Nullable Status status, @Nullable ErrorCode errorCode) {
  synchronized (lock) {
    OkHttpClientStream finished = streams.remove(streamId);
    if (finished == null) {
      return;
    }

    if (errorCode != null) {
      frameWriter.rstStream(streamId, ErrorCode.CANCEL);
    }

    if (status != null) {
      Code code = status.getCode();
      boolean isCancelled = code == Code.CANCELLED || code == Code.DEADLINE_EXCEEDED;
      finished.transportReportStatus(status, isCancelled, new Metadata());
    }

    // When no pending stream could be started, run the stop / in-use checks.
    boolean startedPending = startPendingStreams();
    if (!startedPending) {
      stopIfNecessary();
      maybeClearInUse();
    }
  }
}
如果是 NettyClientStream,cancel 会向 writeQueue
入队一个 CancelClientStreamCommand:
/**
 * Cancels this stream by enqueueing a {@code CancelClientStreamCommand} on the write queue.
 *
 * @param status the status carried by the cancel command
 */
@Override
public void cancel(Status status) {
  CancelClientStreamCommand cancelCommand =
      new CancelClientStreamCommand(transportState(), status);
  writeQueue.enqueue(cancelCommand, true);
}
而 writeQueue 会通过 channel 的 eventloop 运行 later,
由 later 调用 flush 把 cmd 写出去:
/** Task that drains the queue by calling {@link #flush()}. */
private final Runnable later =
    new Runnable() {
      @Override
      public void run() {
        flush();
      }
    };
private void flush() {
try
{
QueuedCommand cmd;
int i = 0;
boolean flushedOnce = false;
while ((cmd = queue.poll()) != null) {
channel.write(cmd,
cmd.promise());
if (++i == DEQUE_CHUNK_SIZE)
{
i =
0;
// Flush
each chunk so we are releasing buffers periodically. In theory this
loop
// might
never end as new events are continuously added to the queue, if we
never
// flushed
in that case we would be guaranteed to OOM.
channel.flush();
flushedOnce
= true;
}
}
// Must flush at least once, even if there were
no writes.
if (i != 0 || !flushedOnce) {
channel.flush();
}
}
finally {
// Mark the write as done, if the queue is
non-empty after marking trigger a new write.
scheduled.set(false);
if (!queue.isEmpty()) {
scheduleFlush();
}
}
}
write 的实现是:
@Override
public void
write(ChannelHandlerContext ctx, Object msg, ChannelPromise
promise)
throws
Exception {
if
(msg instanceof CreateStreamCommand) {
createStream((CreateStreamCommand) msg,
promise);
} else
if (msg instanceof SendGrpcFrameCommand) {
sendGrpcFrame(ctx, (SendGrpcFrameCommand) msg,
promise);
} else
if (msg instanceof CancelClientStreamCommand) {
cancelStream(ctx, (CancelClientStreamCommand)
msg, promise);
} else
if (msg instanceof SendPingCommand) {
sendPingFrame(ctx, (SendPingCommand) msg,
promise);
} else
if (msg instanceof GracefulCloseCommand) {
gracefulClose(ctx, (GracefulCloseCommand) msg,
promise);
} else
if (msg instanceof ForcefulCloseCommand) {
forcefulClose(ctx, (ForcefulCloseCommand) msg,
promise);
} else
if (msg == NOOP_MESSAGE) {
ctx.write(Unpooled.EMPTY_BUFFER, promise);
} else
{
throw new AssertionError("Write called for
unexpected type: " + msg.getClass().getName());
}
}
如果 msg 是 CancelClientStreamCommand,
则调用 cancelStream(ctx, (CancelClientStreamCommand) msg, promise);
/**
 * Handles a cancel command: reports the command's reason to the stream's transport state, then
 * writes a RST_STREAM with the {@code CANCEL} error code for that stream.
 *
 * @param ctx the channel handler context used for the write
 * @param cmd the cancel command carrying the stream and the reason
 * @param promise the promise passed to the RST_STREAM write
 */
private void cancelStream(
    ChannelHandlerContext ctx, CancelClientStreamCommand cmd, ChannelPromise promise) {
  NettyClientStream.TransportState state = cmd.stream();
  // Report the status first, then emit the reset frame.
  state.transportReportStatus(cmd.reason(), true, new Metadata());
  encoder().writeRstStream(ctx, state.id(), Http2Error.CANCEL.code(), promise);
}
最终也是写出一个 RST_STREAM 帧。