
[BUG] Requests with Grpc Reactive Client are completed at the end of the flow #222

Open
jorgerod opened this issue Sep 1, 2020 · 4 comments


@jorgerod

jorgerod commented Sep 1, 2020

Describe the Bug

Requests made with the reactive gRPC client (the Reactor integration) complete at the end of the flow, not when the response to the request is received.

This behaviour makes the instrumentation of other components (metrics or traces) incorrect.

 grpcClient.findById(productId, storeId) // <1>
     .map(product -> {                   // <2>
         try {
             Thread.sleep(5000);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
         return product.getName();
     })

In this example, the call to the onComplete function is made at the end of the flow.
Brave tracing instrumentation and metrics instrumentation are therefore incorrect, because the reported gRPC request time is the sum of the gRPC client operation <1> plus the map operation <2>.

[Zipkin trace screenshot]

Expected Behaviour

The onComplete method should be called when the grpcClient request ends, not when the flow ends.

What do you think?

@rmichela
Collaborator

rmichela commented Sep 1, 2020

Thanks for pointing this out @jorgerod. It's a great example of some subtle and unintuitive interaction between gRPC and Rx.

In gRPC, unary blocking RPC is a lie. The only part of gRPC that understands unary blocking RPC is the topmost layer of the generated client stub. Below that, everything in gRPC is streaming and asynchronous. A unary blocking request is really just a request and response stream of exactly one item each, where the client blocks until exactly one response message or error is received. Similarly, a Mono<T> has the same reactive semantics as a Flowable<T>, with the exception that it can produce at most one object.

In Reactor and streaming gRPC, message production and stream termination are separate actions, with separate timelines. By design, reactive operators like map() and the gRPC client and server runtimes make no assumptions about the number of messages that may be received, or the timing of those messages.
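To make the two timelines concrete, here is a minimal, self-contained sketch using the JDK's java.util.concurrent.Flow API as a stand-in for Reactor (illustration only, not reactive-grpc code). The message signal (onNext) and the termination signal (onComplete) are delivered separately and strictly in order, so completion cannot be observed until the message is fully processed:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class SignalOrdering {
    static final List<String> events = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) {
                events.add("onNext received");
                sleep(200); // simulate slow downstream work, like the map() above
                events.add("onNext finished");
            }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() {
                events.add("onComplete");
                done.countDown();
            }
        });
        publisher.submit("response"); // the single "unary" message
        publisher.close();            // completion signal, sent right behind it
        done.await(5, TimeUnit.SECONDS);
        System.out.println(events);   // [onNext received, onNext finished, onComplete]
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}
```

Even though close() is called immediately after submit(), the onComplete signal is queued behind the in-flight message and only arrives once onNext has returned.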

With that background out of the way, here's what's happening in your example, and why it is the "correct" behavior from a reactive perspective. I assume grpcClient.findById(productId, storeId) is a wrapper around a Mono.just() operation, so the expanded example looks like this.

Mono.just(new FindRequest(productId, storeId)) // <1>
    .transform(productStub::findById)          // <2>
    .map(product -> {                          // <3>
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return product.getName();
    })
    .subscribe(System.out::println);           // <4>
  1. When the chain is subscribed to, a subscribe() call propagates backwards from <4> to <1>. This triggers the gRPC client stub in <2> to establish a connection to your server, but no request object is sent. Remember, these are streams, even if the service appears unary. This is where your trace span begins.
  2. Next, the subscriber sends a request(1) message up the chain from the subscriber <4> to <1>, which signals the mono to emit its single message.
  3. <1> calls onNext() on <2>, passing in the FindRequest.
  4. The reactive stub in <2> "transforms" the FindRequest to a FindResponse with two reactive streams. The first accepts messages from its upstream <1> and sends them to the server as they are received. The second receives messages from the server as they are received and passes them downstream to its subscriber <3>. The two streams are bound together so that cancellation, completion, and error propagation work as expected.
  5. When the response stream in <2> finally produces a message, <3> does its map operation before handing the result off to <4>. At this point, the message is still in flight. It hasn't been consumed yet by a subscriber.
  6. After the sleep, the subscriber <4> receives the message and consumes it, performing its println action.
  7. Since <1> is a Mono, it followed up its call to onNext() with an immediate call to onComplete(), but reactive messages are handled in order, so the complete signal is not processed until the entire first message is done processing.
  8. The complete signal passes through <2>, where the reactive gRPC client stub ends the request. Because gRPC is all streaming internally, the client stub doesn't actually know how many messages it needs to send, so it cannot preemptively close the connection after the first message. This is where your trace span ends.
  9. The onComplete signal continues to propagate through <3> and <4>, where it finally terminates the subscriber.
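The span placement described in steps 1 and 8 can be sketched the same way, again with the JDK Flow API as a stand-in and hypothetical timing logic in place of a real tracer: a "span" opened at subscription and closed at onComplete necessarily includes any slow downstream processing, which is exactly why the traced gRPC time absorbs the map() delay:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class SpanTiming {
    static volatile long spanMillis = -1;

    public static void main(String[] args) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        final long spanStart = System.nanoTime(); // "span" opens when the chain is subscribed (step 1)
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(String item) {
                sleep(300); // slow downstream map() work, unrelated to the RPC itself
            }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() {
                // "span" closes only here (step 8), so it includes the 300 ms above
                spanMillis = (System.nanoTime() - spanStart) / 1_000_000;
                done.countDown();
            }
        });
        publisher.submit("response");
        publisher.close();
        done.await(5, TimeUnit.SECONDS);
        System.out.println("span lasted ~" + spanMillis + " ms"); // at least 300 ms
    }

    static void sleep(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }
}
```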

TL;DR, the behavior you are seeing is the expected behavior for a reactive system because reactive doesn't follow traditional imperative method call semantics.

@jorgerod
Author

jorgerod commented Sep 3, 2020

Hello

First of all, thank you very much for your answer.

I have continued looking into the issue in more depth and have tested the behavior in the following cases:

  • Streaming
  • Futures
  • Reactive

Grpc client Streaming:

With the example below, I tried streaming.

    @Autowired
    private GreeterGrpc.GreeterStub greeterStreamingService;
...
        greeterStreamingService.sayHelloRespStream(request, new StreamObserver<HelloResponse>() {
            @Override
            public void onNext(HelloResponse value) {
                Span mySpan = tracer.buildSpan("my-span").start();
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    mySpan.finish();
                }
            }

            @Override
            public void onError(Throwable t) {
            }

            @Override
            public void onCompleted() {
                LOG.info("Finish");
            }
        });

I agree with you: in streaming, the span should be closed when the onComplete event is emitted, because the number of messages is not known in advance.

Traces created are the following:
[Zipkin trace screenshot: grpc_streaming]

Grpc client with future stub

    @Autowired
    private GreeterGrpc.GreeterFutureStub greeterFutureService;

    ....

   Futures.addCallback(greeterFutureService.sayHello(request), new FutureCallback<>() {

            @Override
            public void onSuccess(@Nullable HelloResponse helloResponse) {
                    Span mySpan = tracer.buildSpan("my-span").start(); //<1>
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        mySpan.finish();
                    }
            }

            @Override
            public void onFailure(Throwable throwable) {
            }
        }, MoreExecutors.directExecutor());
   

As you can see in the following image, the grpc-client span is closed when the response is received, and my-span <1> is opened in the callback. I think this is the right behavior.
[Zipkin trace screenshot: grpc_future]

Grpc client reactive

Finally, the reactive flavour

    @Autowired
    private ReactorGreeterGrpc.ReactorGreeterStub greeterReactiveService;

    ...

   return greeterReactiveService.sayHello(request)
                .map(helloResponse -> {
                    Span mySpan = tracer.buildSpan("my-span").start();
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        mySpan.finish();
                    }
                    return helloResponse.getMessage();
                });

Traces:
[Zipkin trace screenshot]

I think this behavior is incorrect and should be similar to the future stub's.
In reactive, when executing a unary call, the client stub should assume that there will be a single response and close the connection when it arrives (like FutureStub).
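A rough sketch of what that could look like, once more with the JDK Flow API as a stand-in (the eager-completion logic here is hypothetical, not reactive-grpc code): for a unary call, the consumer treats the first message as the whole result, completes immediately, and cancels the rest of the stream instead of waiting for the upstream onComplete:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class UnaryEarlyComplete {
    static final List<String> events = new ArrayList<>();

    public static void main(String[] args) throws Exception {
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<String> upstream = new SubmissionPublisher<>();
        upstream.subscribe(new Flow.Subscriber<String>() {
            Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // a unary call needs exactly one message
            }
            @Override public void onNext(String item) {
                events.add("response: " + item);
                events.add("complete");  // complete as soon as the single response arrives...
                subscription.cancel();   // ...and cancel instead of waiting for upstream onComplete
                done.countDown();
            }
            @Override public void onError(Throwable t) { done.countDown(); }
            @Override public void onComplete() {
                // not reached: we completed eagerly in onNext and cancelled the subscription
            }
        });
        upstream.submit("hello");
        done.await(5, TimeUnit.SECONDS);
        upstream.close();
        System.out.println(events); // [response: hello, complete]
    }
}
```

With this shape, a span closed on "complete" would end when the response is received, matching the future stub's timing.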

@rmichela What do you think?

Thank you very much

@jorgerod
Author

Hi

@rmichela What do you think?

This topic is very important to us.

Thank you

@rmichela
Collaborator

I'm really on the fence about this issue because it changes the internal behavior of reactive-grpc. Do you have an example for how this would look in code?
