How to use MDC with parallelStream in Java and logback

I need to log a few attributes of the request, like the request ID and the locale, but when I use parallelStream, the MDC's ThreadLocal seems to lose the information.

I've looked at passing the MDC context between threads when creating the parallelStream, but that seems dirty, and I use parallelStream in a lot of places.

Is there any other way of doing this?

Thank you

Triad answered 20/12, 2018 at 19:54

The only solution I found is to copy the context into a final variable outside the stream and apply it on every single iteration:

import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;
import org.slf4j.MDC;

Map<String, String> contextMap = MDC.getCopyOfContextMap();
Optional<Integer> result = Stream.iterate(0, i -> i + 1).parallel()
    // copy the caller's context into the thread that processes this element
    .peek(i -> MDC.setContextMap(contextMap))
    // ...logic...
    // if you use a filter, combine its predicate (yourPredicate is a placeholder)
    // with a clear step, because rejected elements never reach the final peek:
    .filter(yourPredicate.or(i -> {
                MDC.clear();
                return false;
            }))
    // clear right before the terminal operation
    .peek(i -> MDC.clear())
    .findFirst();

// the calling thread also processes elements and its context is cleared there,
// so set it back to its initial state
MDC.setContextMap(contextMap);

The cost of this solution is 1) about 12 µs per 100-element stream in the benchmark below (roughly 28 µs vs. 40 µs per operation) and 2) worse readability, but both are acceptable:

  1. This is a JMH benchmark comparing IntStream.range(0, 100).parallel().sum() (the baseline) with the same stream using the MDC copy logic:
Benchmark               Mode  Cnt   Score   Error   Units
MDC_CopyTest.baseline  thrpt    5   0,038 ± 0,005  ops/us
MDC_CopyTest.withMdc   thrpt    5   0,024 ± 0,001  ops/us
MDC_CopyTest.baseline   avgt    5  28,239 ± 1,308   us/op
MDC_CopyTest.withMdc    avgt    5  40,178 ± 0,761   us/op
  2. To improve readability, the logic can be wrapped in a small helper class:
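import java.util.Map;
import org.slf4j.MDC;

public class MDCCopyHelper {
    // snapshot of the creating thread's MDC (may be null if that MDC was empty)
    private final Map<String, String> contextMap = MDC.getCopyOfContextMap();

    // the varargs parameter lets this serve as a method reference for peek(...)
    // and also be called without arguments
    public void set(Object... any) {
        if (contextMap == null) {
            MDC.clear();
        } else {
            MDC.setContextMap(contextMap);
        }
    }

    public void clear(Object... any) {
        MDC.clear();
    }

    // also varargs, so that mdcHelper::clearAndFail fits Predicate<T>
    public boolean clearAndFail(Object... any) {
        MDC.clear();
        return false;
    }
}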

The streaming code then looks a bit nicer:

MDCCopyHelper mdcHelper = new MDCCopyHelper();
try {
    Optional<Integer> findFirst = Stream.iterate(0, i -> i + 1)
        .parallel()
        .peek(mdcHelper::set)
        // ...logic...
        // filter predicates should be combined with the clear step (yourPredicate is a placeholder)
        .filter(yourPredicate.or(mdcHelper::clearAndFail))
        // before terminal call:
        .peek(mdcHelper::clear)
        .findFirst();
} finally {
    // set the correct MDC at the main thread again
    mdcHelper.set();
}
Bonniebonns answered 3/9, 2019 at 13:36
What if an exception is thrown inside the logic? Wouldn't it leave the MDC dirty? Did you manage to come up with a solution that handles exceptions? - Pippy
Nope, I don't know how to handle that. Streams are awful in terms of exception handling. - Bonniebonns
Be careful with the .peek(mdcHelper::clear) at the end. Everything after parallel() is executed in multiple threads, and this includes the main thread: it does not sit idle but is actively used. So you can end up with an empty MDC in the main thread. - Anderaanderea
@Anderaanderea yes, you are right. That's why I call mdcHelper.set(); after the stream execution. The mdcHelper holds a copy of the MDC and puts the data back into the main thread's MDC. - Bonniebonns
I wasn't aware of that, so I skipped the last call in my code, since I thought it was a useless extra step. When the parallel stream can throw an exception, there is also a chance that the MDC is not restored in the main thread after the stream execution. - Anderaanderea
That's correct, so if exceptions are expected, that part should also go into a catch/finally block. - Bonniebonns
1. Will buffering intermediate operations such as sorted and distinct use the same thread? If not, those special cases have to be handled the same way filter is. 2. A single outer catch block will still leave the MDC of the common-pool threads dirty; every operation in the stream needs its own try/catch block. - Quadrature
@sanjaypatel: good points. Regarding 1: I don't know; it could be implementation-specific. Regarding 2: if something like the MDCCopyHelper is used, a separate method that handles potential exceptions could be added for every consumer, etc. - Bonniebonns
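Following up on the exception-handling discussion in these comments, here is a minimal sketch (not part of the answer above) of a per-element wrapper that saves whatever MDC the executing thread had, applies the caller's snapshot, and restores the saved context in a finally block. It assumes slf4j with logback-classic on the classpath; the class name MdcStreams and the method wrap are hypothetical. Because it restores the previous context instead of calling MDC.clear(), the calling thread keeps its own MDC and no final set() is needed, even when the wrapped function throws.

```java
import java.util.Map;
import java.util.function.Function;
import org.slf4j.MDC;

// Hypothetical helper: wrap() snapshots the caller's MDC and applies it
// around every invocation of the wrapped function.
public class MdcStreams {

    public static <T, R> Function<T, R> wrap(Function<T, R> fn) {
        // taken on the thread that builds the pipeline (the caller)
        Map<String, String> captured = MDC.getCopyOfContextMap();
        return t -> {
            // whatever the executing thread had before: the caller's own MDC
            // on the calling thread, usually nothing on a pool thread
            Map<String, String> previous = MDC.getCopyOfContextMap();
            setOrClear(captured);
            try {
                return fn.apply(t);
            } finally {
                // runs even if fn throws, so no thread keeps a wrong MDC
                setOrClear(previous);
            }
        };
    }

    // getCopyOfContextMap() may return null for an empty MDC; some
    // setContextMap implementations reject null, so clear instead
    private static void setOrClear(Map<String, String> map) {
        if (map == null) {
            MDC.clear();
        } else {
            MDC.setContextMap(map);
        }
    }
}
```

Usage would look like `stream.parallel().map(MdcStreams.wrap(fn))`. Each stage that logs needs its own wrapped function (a similar overload could be written for Predicate when using filter); unwrapped lambdas still run without the MDC.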

My solution is to wrap the functional interfaces passed to the stream, similar to the static proxy pattern.
For example:

public static void main(String[] args) {
    System.err.println(Thread.currentThread().getName());
    String traceId = "100";
    MDC.put("id", traceId);
    System.err.println("------------------------");
    // plain lambda: common-pool worker threads typically print false, since the MDC is thread-local
    Stream.of(1, 2, 3, 4)
          .parallel()
          .forEach(num -> {
              System.err.println(Thread.currentThread().getName() + " " + traceId.equals(MDC.get("id")));
          });
    System.err.println("------------------------");
    Stream.of(1, 2, 3, 4)
          .parallel()
          // the key is the TraceableConsumer
          .forEach(new TraceableConsumer<>(num -> {
              System.err.println(Thread.currentThread().getName() + " " + traceId.equals(MDC.get("id")));
          }));
}

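import java.util.function.Consumer;

public class TraceableConsumer<T> extends MDCTraceable implements Consumer<T> {

    private final Consumer<T> target;

    public TraceableConsumer(Consumer<T> target) {
        // the MDCTraceable constructor runs first and snapshots the caller's MDC
        this.target = target;
    }

    @Override
    public void accept(T t) {
        // copy the snapshot into the current (possibly pool) thread's MDC;
        // cleanMDC() is not called here, so pool threads keep these values afterwards
        setMDC();
        target.accept(t);
    }
}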

import java.util.Optional;
import org.slf4j.MDC;

public abstract class MDCTraceable {

    private final Long id;

    private final Long userId;

    public MDCTraceable() {
        id = Optional.ofNullable(MDC.get("id")).map(Long::parseLong).orElse(0L);
        userId = Optional.ofNullable(MDC.get("userId")).map(Long::parseLong).orElse(0L);
    }

    public void setMDC(){
        MDC.put("id", id.toString());
        MDC.put("userId", userId.toString());
    }

    public void cleanMDC(){
        MDC.clear();
    }
}
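The same proxy idea extends to other functional interfaces. Below is a hypothetical TraceablePredicate for filter, written as a standalone sketch rather than on top of MDCTraceable: it copies the whole MDC map instead of two fixed Long fields, and it restores each thread's previous MDC after every call, so pool threads are not left with stale values and the calling thread keeps its own context. It assumes slf4j with logback-classic on the classpath.

```java
import java.util.Map;
import java.util.function.Predicate;
import org.slf4j.MDC;

// Hypothetical companion to TraceableConsumer, for use with filter(...).
public class TraceablePredicate<T> implements Predicate<T> {

    private final Predicate<T> target;
    // snapshot taken on the thread that constructs the wrapper (the caller)
    private final Map<String, String> captured = MDC.getCopyOfContextMap();

    public TraceablePredicate(Predicate<T> target) {
        this.target = target;
    }

    @Override
    public boolean test(T t) {
        // remember this thread's own MDC so it can be put back afterwards
        Map<String, String> previous = MDC.getCopyOfContextMap();
        setOrClear(captured);
        try {
            return target.test(t);
        } finally {
            setOrClear(previous);
        }
    }

    // getCopyOfContextMap() may return null for an empty MDC
    private static void setOrClear(Map<String, String> map) {
        if (map == null) {
            MDC.clear();
        } else {
            MDC.setContextMap(map);
        }
    }
}
```

Usage mirrors TraceableConsumer, e.g. `.filter(new TraceablePredicate<Integer>(i -> ...))`. Restoring the previous map rather than clearing is the design choice that keeps the main thread's MDC intact, since that thread also executes stream elements.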
Lifeguard answered 12/1, 2021 at 13:20

© 2022 - 2024 — McMap. All rights reserved.