How to retain slf4j MDC logging context in CompletableFuture?

When a CompletableFuture runs asynchronously, the parent thread's context is lost, and with it the org.slf4j.MDC context.

This is a problem because I use "fish tagging" to trace the logs of a single request across multiple log files.

MDC.put("fishid", randomId())

Question: how can I retain that id during the tasks of CompletableFutures in general?

List<CompletableFuture<UpdateHotelAllotmentsRsp>> futures =
    tasks.stream()
        .map(task -> CompletableFuture.supplyAsync(
            () -> businesslogic(task)))
        .collect(Collectors.toList());

List<UpdateHotelAllotmentsRsp> results = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

public void businesslogic(Task task) {
       LOGGER.info("mdc fishtag context is lost here");
}
Hastate answered 5/3, 2018 at 12:40 Comment(3)
Wouldn't the solution from "How to use MDC with thread pools?" work? The only restriction is that you need to pass the executor in all *Async() calls. If you use Spring @Async, you just have to configure that executor for @Async. - Cilo
Also closely related, but didn't get an answer: "Does CompletableFuture have a corresponding Local context?" - Cilo
Any solution will only work inside your own code. It won't work through libraries that use futures themselves. More and more Java libraries use futures, and MDC won't be carried into them correctly until Java fixes this. - Auspicious
T
31

The most readable way I found to solve this problem is shown below.

---------------Thread utils class--------------------

public static Runnable withMdc(Runnable runnable) {
    // Capture the submitting thread's MDC; the copy may be null if nothing was set.
    Map<String, String> mdc = MDC.getCopyOfContextMap();
    return () -> {
        if (mdc != null) {
            MDC.setContextMap(mdc);
        }
        runnable.run();
    };
}

public static <U> Supplier<U> withMdc(Supplier<U> supplier) {
    Map<String, String> mdc = MDC.getCopyOfContextMap();
    return () -> {
        if (mdc != null) {
            MDC.setContextMap(mdc);
        }
        return supplier.get();
    };
}

---------------Usage--------------

CompletableFuture.supplyAsync(withMdc(() -> someSupplier()))
                 .thenRunAsync(withMdc(() -> someRunnable()))
                 ....

withMdc in ThreadUtils would have to be overloaded for the other functional interfaces that CompletableFuture accepts (Function, Consumer, BiFunction and so on).

Please note that the withMdc() method is statically imported to improve readability.
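
As a rough sketch of what such an overload could look like, here is a Function variant next to the Supplier one. It also puts the worker thread's previous context back once the stage has run, which the version above does not do. To keep it runnable with the JDK alone, the nested Mdc class is a hypothetical stand-in for org.slf4j.MDC (same method names, backed by a ThreadLocal); the class name WithMdcSketch and the demo method are likewise only for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.Supplier;

class WithMdcSketch {

    /** Hypothetical stand-in for org.slf4j.MDC so the sketch runs without slf4j. */
    static final class Mdc {
        private static final ThreadLocal<Map<String, String>> CTX = new ThreadLocal<>();
        static void put(String key, String value) {
            Map<String, String> map = CTX.get() == null ? new HashMap<>() : CTX.get();
            map.put(key, value);
            CTX.set(map);
        }
        static String get(String key) { return CTX.get() == null ? null : CTX.get().get(key); }
        static Map<String, String> getCopyOfContextMap() {
            return CTX.get() == null ? null : new HashMap<>(CTX.get());
        }
        static void setContextMap(Map<String, String> map) { CTX.set(new HashMap<>(map)); }
        static void clear() { CTX.remove(); }
    }

    /** Installs the given context on the current thread (null means "no context"). */
    private static void install(Map<String, String> context) {
        if (context == null) {
            Mdc.clear();
        } else {
            Mdc.setContextMap(context);
        }
    }

    static <T> Supplier<T> withMdc(Supplier<T> supplier) {
        Map<String, String> captured = Mdc.getCopyOfContextMap(); // caller's context
        return () -> {
            Map<String, String> previous = Mdc.getCopyOfContextMap(); // worker's own context
            install(captured);
            try {
                return supplier.get();
            } finally {
                install(previous); // do not leak the caller's context into the pool thread
            }
        };
    }

    /** Overload for stages that take a Function, e.g. thenApplyAsync. */
    static <T, R> Function<T, R> withMdc(Function<T, R> function) {
        Map<String, String> captured = Mdc.getCopyOfContextMap();
        return input -> {
            Map<String, String> previous = Mdc.getCopyOfContextMap();
            install(captured);
            try {
                return function.apply(input);
            } finally {
                install(previous);
            }
        };
    }

    /** Both stages run on a pool thread and still see the caller's fishid. */
    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Mdc.put("fishid", "req-42");
            return CompletableFuture
                    .supplyAsync(withMdc(() -> Mdc.get("fishid")), pool)
                    .thenApplyAsync(withMdc((String id) -> id + "/" + Mdc.get("fishid")), pool)
                    .join();
        } finally {
            Mdc.clear();
            pool.shutdown();
        }
    }
}
```

In real code you would call org.slf4j.MDC directly instead of the stand-in. Overloads for Consumer, BiFunction and the rest follow the same capture, install, restore pattern.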

Titular answered 1/6, 2019 at 4:17 Comment(4)
Thanks for your improvement. This is even better than my original answer, so I will accept yours instead. It's also more consistent, even when using only a single .supplyAsync() statement. - Hastate
I think the context remains on the worker thread afterwards. Shouldn't it be reset back to the original context afterwards? - Continuate
With this code / solution, aren't you creating blocking calls (the thread will get blocked in the get() or run() calls)? If you're using async / non-blocking IO you shouldn't do that, right? - Bouleversement
Using get or join is not a proper solution... you're forcing the caller thread to wait on the future. - Bouleversement
H
7

In the end I created a Supplier wrapper that retains the MDC. If anyone has a better idea, feel free to comment.

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
    return CompletableFuture.supplyAsync(new SupplierMDC<>(supplier), executor);
}

private static class SupplierMDC<T> implements Supplier<T> {
    private final Supplier<T> delegate;
    private final Map<String, String> mdc;

    public SupplierMDC(Supplier<T> delegate) {
        this.delegate = delegate;
        this.mdc = MDC.getCopyOfContextMap();
    }

    @Override
    public T get() {
        // getCopyOfContextMap() may have returned null if no context was set
        if (mdc != null) {
            MDC.setContextMap(mdc);
        }
        return delegate.get();
    }
}
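
A usage sketch for a wrapper of this shape, following the task list from the question. The nested Mdc class is a hypothetical stand-in for org.slf4j.MDC so that the example runs with the JDK alone, and the task names and the class name are made up for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SupplierMdcUsageSketch {

    /** Hypothetical stand-in for org.slf4j.MDC so the sketch runs without slf4j. */
    static final class Mdc {
        private static final ThreadLocal<Map<String, String>> CTX = new ThreadLocal<>();
        static void put(String key, String value) {
            Map<String, String> map = CTX.get() == null ? new HashMap<>() : CTX.get();
            map.put(key, value);
            CTX.set(map);
        }
        static String get(String key) { return CTX.get() == null ? null : CTX.get().get(key); }
        static Map<String, String> getCopyOfContextMap() {
            return CTX.get() == null ? null : new HashMap<>(CTX.get());
        }
        static void setContextMap(Map<String, String> map) { CTX.set(new HashMap<>(map)); }
        static void clear() { CTX.remove(); }
    }

    /** Same idea as the wrapper above: capture on the caller, install on the worker. */
    static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
        Map<String, String> captured = Mdc.getCopyOfContextMap();
        return CompletableFuture.supplyAsync(() -> {
            if (captured != null) {
                Mdc.setContextMap(captured);
            }
            return supplier.get();
        }, executor);
    }

    static List<String> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            Mdc.put("fishid", "req-1"); // set once on the request thread
            List<CompletableFuture<String>> futures = Stream.of("a", "b", "c")
                    .map(task -> supplyAsync(() -> task + ":" + Mdc.get("fishid"), pool))
                    .collect(Collectors.toList());
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            Mdc.clear();
            pool.shutdown();
        }
    }
}
```

Only the first stage is covered here. Stages chained onto the returned future with plain lambdas are not wrapped.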
Hastate answered 5/3, 2018 at 14:42 Comment(6)
Trying it, looks good. I was thinking of something similar, got the code handy :) - Honour
Can you please share the code showing how you have used it? - Hypoderma
Please note: CompletableFuture is usually used as a fluent API, so if you chain another supplier onto the static supplyAsync method above, it will be a normal supplier and not a SupplierMDC. Thus the MDC context will be lost for subsequent async processing. - Titular
@Titular that's true. Feel free to add your suggestions on how that could be solved. - Hastate
@Membersound Thanks to your original answer, I refactored it a little and added some syntactic sugar so that it is compatible with the fluent style of CompletableFuture: https://mcmap.net/q/633458/-how-to-retain-slf4j-mdc-logging-context-in-completablefuture - Titular
Using get or join is not a proper solution... you're forcing the caller thread to wait on the future. - Bouleversement
D
3

My approach is to make the complete ecosystem aware of MDC. It works with JDK 9+, because a couple of the overridable methods it relies on have only been exposed since that version.

And for that, we need to address the following scenarios:

  • Where does this class itself create new instances of CompletableFuture? → It must return an MDC-aware version instead.
  • Where do we receive instances of CompletableFuture from outside this class? → We must convert them to an MDC-aware version.
  • Which executor does CompletableFuture use, and when? → In all circumstances, we need to make sure that every executor is MDC-aware.

For that, let's create an MDC-aware version of CompletableFuture by extending it. My version looks like this:

import org.slf4j.MDC;

import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.function.Supplier;

public class MDCAwareCompletableFuture<T> extends CompletableFuture<T> {

    public static final ExecutorService MDC_AWARE_ASYNC_POOL = new MDCAwareForkJoinPool();

    @Override
    public <U> CompletableFuture<U> newIncompleteFuture() {
        return new MDCAwareCompletableFuture<>();
    }

    @Override
    public Executor defaultExecutor() {
        return MDC_AWARE_ASYNC_POOL;
    }

    public static <T> CompletionStage<T> getMDCAwareCompletionStage(CompletableFuture<T> future) {
        return new MDCAwareCompletableFuture<>()
                .completeAsync(() -> null)
                .thenCombineAsync(future, (aVoid, value) -> value);
    }

    public static <T> CompletionStage<T> getMDCHandledCompletionStage(CompletableFuture<T> future,
                                                                Function<Throwable, T> throwableFunction) {
        Map<String, String> contextMap = MDC.getCopyOfContextMap();
        return getMDCAwareCompletionStage(future)
                .handle((value, throwable) -> {
                    MDCUtility.setMDCContext(contextMap);
                    if (throwable != null) {
                        return throwableFunction.apply(throwable);
                    }
                    return value;
                });
    }
}
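
To see in isolation what the two overrides in the class above do, here is a small sketch without any MDC involved. It only checks which thread runs each stage. The names CustomFuture, POOL and "custom-pool" are made up for the example, and JDK 9+ is assumed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DefaultExecutorSketch {

    // Every thread of this pool carries the same recognizable name.
    static final ExecutorService POOL = Executors.newFixedThreadPool(2, runnable -> {
        Thread thread = new Thread(runnable, "custom-pool");
        thread.setDaemon(true);
        return thread;
    });

    static class CustomFuture<T> extends CompletableFuture<T> {
        @Override
        public <U> CompletableFuture<U> newIncompleteFuture() {
            // Dependent stages (thenApply, thenCompose, ...) are created through this
            // method, so they are CustomFuture instances too.
            return new CustomFuture<>();
        }

        @Override
        public Executor defaultExecutor() {
            // Used by the *Async methods that are called without an explicit executor.
            return POOL;
        }
    }

    static String demo() {
        return new CustomFuture<String>()
                .completeAsync(() -> Thread.currentThread().getName())
                .thenApplyAsync(first -> first + "," + Thread.currentThread().getName())
                .join();
    }
}
```

Neither call passes an executor, yet both stages run on the custom pool rather than on ForkJoinPool.commonPool(). That is the hook the MDC-aware subclass relies on.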

The MDCAwareForkJoinPool class would look like this (I have skipped the methods with ForkJoinTask parameters for simplicity):

public class MDCAwareForkJoinPool extends ForkJoinPool {
    //Override constructors which you need

    @Override
    public <T> ForkJoinTask<T> submit(Callable<T> task) {
        return super.submit(MDCUtility.wrapWithMdcContext(task));
    }

    @Override
    public <T> ForkJoinTask<T> submit(Runnable task, T result) {
        return super.submit(MDCUtility.wrapWithMdcContext(task), result);
    }

    @Override
    public ForkJoinTask<?> submit(Runnable task) {
        return super.submit(MDCUtility.wrapWithMdcContext(task));
    }

    @Override
    public void execute(Runnable task) {
        super.execute(MDCUtility.wrapWithMdcContext(task));
    }
}

The wrapping utility methods (in the MDCUtility class) would look like this:

public static <T> Callable<T> wrapWithMdcContext(Callable<T> task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            return task.call();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static Runnable wrapWithMdcContext(Runnable task) {
    //save the current MDC context
    Map<String, String> contextMap = MDC.getCopyOfContextMap();
    return () -> {
        setMDCContext(contextMap);
        try {
            // Runnable.run() returns void, so there is nothing to return here
            task.run();
        } finally {
            // once the task is complete, clear MDC
            MDC.clear();
        }
    };
}

public static void setMDCContext(Map<String, String> contextMap) {
    MDC.clear();
    if (contextMap != null) {
        MDC.setContextMap(contextMap);
    }
}

Below are some guidelines for usage:

  • Use the class MDCAwareCompletableFuture rather than the class CompletableFuture.
  • Several methods of CompletableFuture (most of the public static ones) instantiate a plain CompletableFuture internally. For those, use an alternative that yields an instance of MDCAwareCompletableFuture. For example, rather than CompletableFuture.supplyAsync(...), you can use new MDCAwareCompletableFuture<>().completeAsync(...).
  • When you get stuck with a plain CompletableFuture, say because an external library returns one, convert it with the method getMDCAwareCompletionStage. You can't retain the context within that library, but this method still retains the context once execution returns to your application code.
  • When supplying an executor as a parameter, make sure that it is MDC-aware, such as MDCAwareForkJoinPool. You could also create an MDCAwareThreadPoolExecutor by overriding the execute method to serve your use case. You get the idea!
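
A minimal sketch of the MDCAwareThreadPoolExecutor idea mentioned in the last point, assuming a fixed-size pool. The nested Mdc class is a hypothetical stand-in for org.slf4j.MDC so that the example runs with the JDK alone. The constructor arguments and the demo are illustrative only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class MdcAwareExecutorSketch {

    /** Hypothetical stand-in for org.slf4j.MDC so the sketch runs without slf4j. */
    static final class Mdc {
        private static final ThreadLocal<Map<String, String>> CTX = new ThreadLocal<>();
        static void put(String key, String value) {
            Map<String, String> map = CTX.get() == null ? new HashMap<>() : CTX.get();
            map.put(key, value);
            CTX.set(map);
        }
        static String get(String key) { return CTX.get() == null ? null : CTX.get().get(key); }
        static Map<String, String> getCopyOfContextMap() {
            return CTX.get() == null ? null : new HashMap<>(CTX.get());
        }
        static void setContextMap(Map<String, String> map) { CTX.set(new HashMap<>(map)); }
        static void clear() { CTX.remove(); }
    }

    static class MdcAwareThreadPoolExecutor extends ThreadPoolExecutor {
        MdcAwareThreadPoolExecutor(int threads) {
            super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        }

        @Override
        public void execute(Runnable task) {
            // Runs on the submitting thread: capture its context here.
            Map<String, String> captured = Mdc.getCopyOfContextMap();
            super.execute(() -> {
                // Runs on the pool thread: install the captured context.
                if (captured == null) {
                    Mdc.clear();
                } else {
                    Mdc.setContextMap(captured);
                }
                try {
                    task.run();
                } finally {
                    Mdc.clear(); // leave the pool thread clean for the next task
                }
            });
        }
    }

    static String demo() {
        MdcAwareThreadPoolExecutor pool = new MdcAwareThreadPoolExecutor(2);
        try {
            Mdc.put("fishid", "req-7");
            // Plain lambdas: the executor does the propagation.
            return CompletableFuture
                    .supplyAsync(() -> Mdc.get("fishid"), pool)
                    .thenApplyAsync(id -> id + "|" + Mdc.get("fishid"), pool)
                    .join();
        } finally {
            Mdc.clear();
            pool.shutdown();
        }
    }
}
```

Overriding execute is enough for ThreadPoolExecutor, because its submit methods delegate to execute.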

With that, your code would look like

List<CompletableFuture<UpdateHotelAllotmentsRsp>> futures =
    tasks.stream()
        .map(task -> new MDCAwareCompletableFuture<UpdateHotelAllotmentsRsp>().completeAsync(
            () -> businesslogic(task)))
        .collect(Collectors.toList());

List<UpdateHotelAllotmentsRsp> results = futures.stream()
    .map(CompletableFuture::join)
    .collect(Collectors.toList());

public UpdateHotelAllotmentsRsp businesslogic(Task task) {
       LOGGER.info("mdc fishtag context is not lost here");
}

You can find a detailed explanation of all of the above here in a post about the same.

Dichasium answered 30/12, 2019 at 4:4 Comment(0)
A
0

Yes, Twitter's Future did this correctly. It has a class Local.scala that Future.scala knows about.

The real fix is for the Java authors to address this, so that your Local state travels through ALL libraries that use CompletableFuture. Basically, Local.scala is used by Future and internally uses a ThreadLocal. At each .thenApply or .thenAccept it captures the state and transfers it to the next stage, and so on. This works through all third-party libraries with ZERO changes to those libraries.

There is more here, but poke the Java authors to fix this: http://mail.openjdk.java.net/pipermail/core-libs-dev/2017-May/047867.html

Until then, MDC will NEVER work through third-party libraries.

My SO post on this: Does CompletableFuture have a corresponding Local context?

Auspicious answered 13/2, 2019 at 23:38 Comment(1)
The other option is to have EVERY third-party library that uses CompletableFuture switch to a new kind of Future that we all use. Heh, good luck with that one though! - Auspicious
