Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[reactor-grpc] Reactor context does not propagate if stub is used #195

Open
krakowski opened this issue Oct 11, 2019 · 18 comments · May be fixed by #326
Open

[reactor-grpc] Reactor context does not propagate if stub is used #195

krakowski opened this issue Oct 11, 2019 · 18 comments · May be fixed by #326

Comments

@krakowski
Copy link
Contributor

Description

Whenever a stub is used in a chain together with Reactor's Context API the provided context does not propagate correctly. My guess is, that using subscribe() within ClientCalls is preventing the context from propagating up the chain.

Example

package com.salesforce.reactorgrpc;

import com.salesforce.grpc.testing.contrib.NettyGrpcServerRule;
import org.junit.Rule;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
import reactor.util.context.Context;

import java.util.stream.Collectors;

import static org.assertj.core.api.Assertions.assertThat;

public class ReactorContextPropagationTest {

    @Rule
    public NettyGrpcServerRule serverRule = new NettyGrpcServerRule();

    private static class SimpleGreeter extends ReactorGreeterGrpc.GreeterImplBase {
        @Override
        public Mono<HelloResponse> sayHello(Mono<HelloRequest> request) {
            return request.map(HelloRequest::getName)
                    .map(name -> HelloResponse.newBuilder().setMessage("Hello " + name).build());
        }

        @Override
        public Mono<HelloResponse> sayHelloReqStream(Flux<HelloRequest> request) {
            return request.map(HelloRequest::getName)
                    .collect(Collectors.joining("and"))
                    .map(names -> HelloResponse.newBuilder().setMessage("Hello " + names).build());
        }
    }

    @Test
    public void contextDoesNotPropagate() {
        serverRule.getServiceRegistry().addService(new SimpleGreeter());

        ReactorGreeterGrpc.ReactorGreeterStub stub = ReactorGreeterGrpc.newReactorStub(serverRule.getChannel());
        Mono<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactor").build());

        Mono<HelloResponse> resp = req
                // This assertion will fail 
                .doOnEach(signal -> assertThat(signal.getContext().getOrEmpty("key")).isNotEmpty())
                .compose(stub::sayHello)
                // This assertion won't fail
                .doOnEach(signal -> assertThat(signal.getContext().getOrEmpty("key")).isNotEmpty())
                .subscriberContext(Context.of("key", "dummy"));

        StepVerifier.create(resp)
                .expectNextMatches(response -> response.getMessage().equals("Hello reactor"))
                .verifyComplete();
    }
}
@rmichela rmichela added the bug label Oct 11, 2019
@krakowski
Copy link
Contributor Author

krakowski commented Oct 11, 2019

I could fix this in the one-to-one case by using flatMap instead of subscribe, though I'm not sure if this is correct since I removed the SubscribeOnlyOnceLifter.

public static <TRequest, TResponse> Mono<TResponse> oneToOne(
        Mono<TRequest> monoSource,
        BiConsumer<TRequest, StreamObserver<TResponse>> delegate) {
    try {
        return monoSource.flatMap(request -> Mono.create(callback -> {
            delegate.accept(request, new StreamObserver<TResponse>() {
                @Override
                public void onNext(TResponse tResponse) {
                    callback.success(tResponse);
                }

                @Override
                public void onError(Throwable throwable) {
                    callback.error(throwable);
                }

                @Override
                public void onCompleted() {
                    // do nothing
                }
            });
        }));
    } catch (Throwable throwable) {
        return Mono.error(throwable);
    }
}

@krakowski
Copy link
Contributor Author

I implemented a failing test for this issue.

  • oneToMany - ✔️
  • oneToOne - ❌
  • manyToOne - ❌
  • manyToMany - ❌

@dyangelo-grullon
Copy link

dyangelo-grullon commented May 22, 2020

@rmichela a team of mine is thinking of using this library, but it's concerning that this bug has been opened since October 2019 without a response from the maintainers. Is there any documentation on how to properly propagate the reactor context? Is this an actual bug?

@rmichela
Copy link
Collaborator

rmichela commented May 23, 2020

Is there any documentation on how to properly propagate the reactor context?

I've found the documentation and examples for using reactor context to be sorely lacking.

Is this an actual bug?

I'm not entirely sure. The absence of "me too" reports has me wondering if this is a real issue.

@rmichela
Copy link
Collaborator

Do you use reactor context? If so, are you experiencing this issue?

@rmichela
Copy link
Collaborator

Following up. Is this still an issue for you?

@krakowski
Copy link
Contributor Author

Hi all,

just rebased my tests from last year on the current master branch and ran them again.

tests

I use Reactor's Context API to propagate metadata (e.g. JWT or tracing information) inside reactive sequences. Using the gRPC Context API directly within reactive sequences is not possible for me, since the context is stored within a ThreadLocal.

The default implementation will put the current context in a ThreadLocal.

After switching threads the context would thus no longer be accessible.

I also tried to propagate tracing information between separate services over gRPC. For this purpose I wrote a ClientInterceptor which transfers tracing information between Reactor context and gRPC headers (TracingInterceptor). Unfortunately, accessing the Reactor context within the ClientInterceptor always yielded an empty Mono.

@rmichela rmichela added reactor and removed reactor labels Apr 16, 2021
@krakowski
Copy link
Contributor Author

krakowski commented Apr 30, 2021

Since I will most likely be using this library again in the near future, I took another look at the issue. Shortly after my last message last year, the following post was written regarding context loss detection, which discusses in which cases the Reactor context can be lost.

Basically, in case of a subscription, the context must be passed on via the CoreSubscriber#currentContext() method, otherwise the interface's default implementation takes effect, which returns an empty context.

In addition, there is a hook that detects the loss of a context and can be enabled using Hooks.enableContextLossTracking();.

Therefore, I think this issue could be solved easily by implementing the mentioned method in ReactorSubscriberAndClientProducer, ReactorSubscriberAndServerProducer and SubscribeOnlyOnceLifter.

@rmichela What do you think?


Just tried to run the included integration tests using mvn -pl :reactor-grpc-test test which does not execute any tests (other modules work fine). Am I missing something? I rarely use Maven.

mvn test -pl :reactor-grpc-test
...
[INFO] -------------------------------------------------------
[INFO]  T E S T S
[INFO] -------------------------------------------------------
[INFO] 
[INFO] Results:
[INFO] 
[INFO] Tests run: 0, Failures: 0, Errors: 0, Skipped: 0

@Freddv2
Copy link

Freddv2 commented Mar 9, 2022

Hello, we're having the same issue here. Activating enableContextLossTracking() show that the context was lost at:

.transform(Operators.lift(new SubscribeOnlyOnceLifter<TResponse>()));
	at reactor.core.publisher.ContextTrackingFunctionWrapper.lambda$apply$0(ContextTrackingFunctionWrapper.java:50)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoLift] :
	reactor.core.publisher.Mono.transform(Mono.java:4705)
	com.salesforce.reactorgrpc.stub.ClientCalls.oneToOne(ClientCalls.java:57)
Error has been observed at the following site(s):
	|_     Mono.transform ⇢ at com.salesforce.reactorgrpc.stub.ClientCalls.oneToOne(ClientCalls.java:57)

@code-uri
Copy link

code-uri commented Nov 4, 2022

Hello everyone,

Any Update one this. any work around?

@code-uri
Copy link

code-uri commented Nov 4, 2022

Hello again,
I tried reproducing it but the test is passing. am I missing something?

@code-uri
Copy link

code-uri commented Nov 4, 2022

I tried the test again using subscribeOn() the test failed.

@krakowski
Copy link
Contributor Author

Hey @code-uri,

I rebased my tests from 3 years ago again on top of the current master to check if this was fixed (120 commits since then) and they still seem to fail. You can try out the tests in my personal branch which reproduces this issue.

In my case the results are the same as 3 years ago:

Screenshot from 2022-11-04 13-29-45

I haven't found any workaround for this issue.

@denychen
Copy link

Currently on version 1.2.4, and I'm running into this bug as well.

@rmichela
Copy link
Collaborator

I've got to be honest, I've tried my best with my he edge cases of Reactor context propagation, and its interactions with gRPC thread pools. But I'm out of my depth.

I could really use some help with this issue.

@scottslewis scottslewis pinned this issue Oct 31, 2023
@code-uri
Copy link

code-uri commented Nov 2, 2023

I've got to be honest, I've tried my best with my he edge cases of Reactor context propagation, and its interactions with gRPC thread pools. But I'm out of my depth.

I could really use some help with this issue.

I agree that the context propagation is not working. Apart from the test case you mentioned, Could you explain the use case?
for example in my case I wanted to pass on the information to grpc layer to in the form of headers like this. I hope this will help.

 ReactorGreeterGrpc.ReactorGreeterStub targetStub = ...
    Mono<HelloRequest> req = Mono.just(HelloRequest.newBuilder().setName("reactor").build());
    req.flatMap(helloRequest -> {
        return Mono.deferContextual(contextView -> {
            Metadata header = new Metadata();
            Metadata.Key<String> key =
                    Metadata.Key.of("KEY", Metadata.ASCII_STRING_MARSHALLER);
            header.put(key, contextView.getOrDefault("key", "dummy"));


            targetStub = targetStub.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(header));
            targetStub.sayHello(helloRequest);
        }
    }).contextWrite(context -> Context.of("key", "dummy")).block();          

@code-uri
Copy link

code-uri commented Nov 2, 2023

@rmichela Also reactor now supports ThreadLocal propagation magic you can use that aswell to read the values in from ThreadLocal in grpc layer.

@rmichela
Copy link
Collaborator

rmichela commented Nov 2, 2023

Is there a doc link you can point me to?

@svametcalf svametcalf linked a pull request Dec 28, 2023 that will close this issue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

6 participants