How to use Reactor context to share and log contextual data

Switching from blocking to non-blocking code is not always straightforward. One of the many challenges I have found along the way is how to implement contextual logging in reactive code.

Using the Mapped Diagnostic Context (MDC) is usually the preferred solution, but how does that work with Reactor?

MDC implementations normally rely on ThreadLocal to store contextual information. Reactor, however, has its own thread pools, and processing may hop from one thread to another when subscribing to a publisher. Let’s have a look at the following scenario:

Suppose we have an application that has a thread pool. Thread 1 is assigned by our application thread pool to process the incoming request. As a first step we want to store the contextual information about the request (MDC) in ThreadLocal. The request handler then invokes and subscribes to a method that returns a publisher. When subscribing to the publisher, Thread 1 is released and the internal processing of that publisher runs on threads provided by Reactor. Any variables previously stored in Thread 1 via ThreadLocal are not visible on those threads.
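The loss described above can be reproduced with plain JDK classes. The snippet below is only an illustrative sketch: a single-thread executor stands in for a thread provided by Reactor, and REQUEST_USER is a hypothetical stand-in for an MDC entry.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalHopDemo {

    // Hypothetical stand-in for an MDC entry: each thread gets its own slot.
    static final ThreadLocal<String> REQUEST_USER = new ThreadLocal<>();

    /** Returns the value of REQUEST_USER as seen from a separate worker thread. */
    static String readOnWorkerThread() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // The lambda runs on the worker thread, not on the caller's thread.
            return worker.submit(() -> REQUEST_USER.get()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        REQUEST_USER.set("alice"); // stored by the "request" thread
        System.out.println("request thread sees: " + REQUEST_USER.get());
        System.out.println("worker thread sees:  " + readOnWorkerThread());
        REQUEST_USER.remove();
    }
}
```

The request thread still reads "alice", while the worker thread reads null, because a ThreadLocal value belongs to the thread that set it.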

This model is what lets Reactor serve many requests with only a few threads, which typically makes it lighter than a classic thread-per-request application. A Reactor publisher provides some methods for logging, such as log(), but in this article I will describe how to leverage the Context to achieve contextual logging.

Say that for each log line we want to log the user associated with an incoming request. The user object might look like:

public class User {

    private final UUID id;
    private final String name;

    public User(final String name) {
        // for the purpose of this example the id is going to be randomly generated
        this.id = UUID.randomUUID(); 
        this.name = name;
    }

    public UUID getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    // ...
}

Now suppose we want to fetch the email of a user given their id. We might have a method that looks like:

/**
 * Get the email for a user and log the content of the {@link Context}.
 *
 * @param userId the user id to find the email for
 * @return the user email
 * @throws NoSuchElementException when the {@link ContextType#USER} is not found in the stream
 */
public Mono<String> getUserEmail(final UUID userId) {
    // get the user email
    return Mono.just(emailFor(userId))
            // log the context info
            .doOnEach(stringSignal -> {
                // log a line for the next signal
                if (stringSignal.isOnNext()) {
                    final Context context = stringSignal.getContext();
                    log.info("email for: {}", context.get(ContextType.USER.name()).toString());
                }
            });
}

The method calls doOnEach and accesses the context from the signal. The line is only logged for the onNext signal, so we are purposely ignoring any other signal type. If you wanted to log an error, for instance, you could change the if statement to check signal.isOnError() instead. Given the above snippet, when I run the following test:

@Test
@DisplayName("It should get the user email and log the context to the console")
void getUserEmail_withLogging() {
    StepVerifier.create(userService.getUserEmail(user.getId())
            .subscriberContext(Context.of(ContextType.USER.name(), user)))
            .expectNext(String.format("%[email protected]", user.getId()))
            .verifyComplete();
}

I will get the following line logged:

16:13:40.554 [Test worker] INFO com.codingbit.context.UserService - email for: User{id=a5fbd15e-abdf-434d-a08e-00c9b7db85aaname=***}
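As noted earlier, the same doOnEach hook can react to the error signal instead of onNext. The following is a minimal sketch, not the code from the repository. It assumes reactor-core 3.4 or later on the classpath, where getContextView() and contextWrite(...) replace the getContext() and subscriberContext(...) calls used in this article. The in-memory LOG list, the plain "USER" string key and the always-failing lookup are hypothetical and only keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

final class ErrorSignalLogging {

    // Hypothetical in-memory log so the example needs no logging framework.
    static final List<String> LOG = new ArrayList<>();

    /** A lookup that always fails, standing in for a broken email service. */
    static Mono<String> failingLookup() {
        return Mono.<String>error(new IllegalStateException("lookup failed"))
                .doOnEach(signal -> {
                    // React only to the error signal; onNext and onComplete are ignored.
                    if (signal.isOnError()) {
                        String user = signal.getContextView().getOrDefault("USER", "unknown");
                        LOG.add("error for: " + user + " - " + signal.getThrowable().getMessage());
                    }
                });
    }

    /** Subscribes with a user in the context and returns what was logged. */
    static List<String> run() {
        LOG.clear();
        failingLookup()
                .contextWrite(Context.of("USER", "alice"))
                .onErrorResume(error -> Mono.empty()) // swallow the error so block() returns
                .block();
        return LOG;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [error for: alice - lookup failed]
    }
}
```

The error itself still travels down the stream. doOnEach only observes it, so the caller remains free to handle or propagate it.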

Reactor makes sure that a Context attached by the subscriber is visible to every operator above the point where it was attached, whichever thread those operators run on. This solves the problem that ThreadLocal solves in imperative programming. The Reactor Context is a very handy feature and it can be useful for more than just contextual logging. Say, for instance, you need access to the authenticated user to perform some operations. If you normally store your authenticated user in ThreadLocal, then again the Reactor Context is your friend. Let’s have a look at the following method:

/**
 * Get email for a user via {@link Context}.
 *
 * @return the user email
 * @throws NoSuchElementException when the {@link ContextType#USER} is not found in the stream
 */
public Mono<String> getUserEmail() {
    // access the context
    return Mono.subscriberContext()
            .map(context -> {
                // return the user id
                final User user = context.get(ContextType.USER.name());
                return user.getId();
            })
            // get the email for this user
            .map(this::emailFor);
}

The method tries to access the authenticated user via the context. It then fetches the email for that user and returns it. It’s important to remember that if the returned Mono is subscribed to without the context being defined, it terminates with a NoSuchElementException. The following two tests verify the behaviour with and without the context definition.

@Test
@DisplayName("It should throw the NoSuchElementException when the user context is not defined")
void getUserEmail_noContext() {
    StepVerifier.create(userService.getUserEmail())
            .expectError(NoSuchElementException.class)
            .verify();
}

@Test
@DisplayName("It should get the user email via context")
void getUserEmail_withContext() {
    StepVerifier.create(userService.getUserEmail()
            .subscriberContext(Context.of(ContextType.USER.name(), user)))
            .expectNext(String.format("%[email protected]", user.getId()))
            .verifyComplete();
}
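When a missing user should not fail the stream, the context can be read with a fallback instead. The sketch below is an assumption-laden illustration rather than the repository code. It assumes reactor-core 3.4 or later, where Mono.deferContextual and contextWrite are the newer counterparts of Mono.subscriberContext() and subscriberContext(...), and it uses a hypothetical "USER" string key and greeting() method. It also shows that the key counts as missing when the context is written above the operator that reads it.

```java
import java.util.Optional;

import reactor.core.publisher.Mono;
import reactor.util.context.Context;

final class OptionalContextLookup {

    /** Greets the user found in the context, or an anonymous user when the key is absent. */
    static Mono<String> greeting() {
        return Mono.deferContextual(ctx -> {
            Optional<String> user = ctx.getOrEmpty("USER");
            return Mono.just("hello " + user.orElse("anonymous"));
        });
    }

    /** The context is written below the read, so the read sees it. */
    static String withContext() {
        return greeting()
                .contextWrite(Context.of("USER", "alice"))
                .block();
    }

    /** Nothing writes the context, so the fallback is used. */
    static String withoutContext() {
        return greeting().block();
    }

    /** The context is written above the read, so the read does not see it. */
    static String writtenAboveRead() {
        return Mono.just("ignored")
                .contextWrite(Context.of("USER", "alice"))
                .flatMap(ignored -> greeting())
                .block();
    }

    public static void main(String[] args) {
        System.out.println(withContext());      // hello alice
        System.out.println(withoutContext());   // hello anonymous
        System.out.println(writtenAboveRead()); // hello anonymous
    }
}
```

Whether a fallback or a NoSuchElementException is the better choice depends on whether the value is required. For an authenticated user, failing fast as in the tests above is often the safer default.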

So whenever you are writing reactive code and wish you could store contextual values, look no further than the Reactor Context.

The complete source code for the snippets illustrated in this article can be found in the java-series repo.