Spring Boot 3 TaskExecutor context propagation in micrometer tracing
5

7

Spring Boot 3 changed how context propagation works in tracing: https://github.com/micrometer-metrics/tracing/wiki/Spring-Cloud-Sleuth-3.1-Migration-Guide#async-instrumentation

There is now a dedicated library for this, but I guess I don't quite understand how it works. I have created a taskExecutor as described in the guide.

@Bean(name = "taskExecutor")
ThreadPoolTaskExecutor threadPoolTaskScheduler() {
    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor() {
        @Override
        protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
            ExecutorService executorService = super.initializeExecutor(threadFactory, rejectedExecutionHandler);
            return ContextExecutorService.wrap(executorService, ContextSnapshot::captureAll);
        }
    };
    threadPoolTaskExecutor.initialize();
    return threadPoolTaskExecutor;
}

And I have marked @Async like this:

@Async("taskExecutor")
public void run() {
    // invoke some service
}

But the tracing context is not propagated to the thread that runs the task in taskExecutor.

Jelene answered 9/2, 2023 at 16:4 Comment(0)
9

You can inject your ThreadPoolTaskExecutor and wrap it with ContextExecutorService in an AsyncConfigurer.

import io.micrometer.context.ContextExecutorService;
import io.micrometer.context.ContextSnapshot;
import java.util.concurrent.Executor;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration(proxyBeanMethods = false)
@RequiredArgsConstructor
public class AsyncTraceContextConfig implements AsyncConfigurer {
  
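  // NOTE: By design you can only have one AsyncConfigurer, thus only one executor pool is
  // configurable.
  // The qualifier is only needed if you have more than one task executor pool. With Lombok's
  // @RequiredArgsConstructor it reaches the constructor parameter only if lombok.config lists
  // @Qualifier under lombok.copyableAnnotations.
  @Qualifier("taskExecutor")
  private final ThreadPoolTaskExecutor taskExecutor;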

  @Override
  public Executor getAsyncExecutor() {
    return ContextExecutorService.wrap(
        taskExecutor.getThreadPoolExecutor(), ContextSnapshot::captureAll);
  }
}
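
To make it clearer what the wrapping buys you, here is a minimal plain-Java sketch of the capture-and-restore idea. It is not Micrometer's implementation: PropagationSketch, TRACE_ID, wrap and readOnWorker are hypothetical names, and a single ThreadLocal stands in for the tracing context. The point is only that the value has to be captured on the submitting thread and restored on the worker thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PropagationSketch {
    // Stand-in for the tracing context (hypothetical; real tracers manage their own storage).
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Captures the caller's value now and restores it around the task on the worker thread.
    static Runnable wrap(Runnable task) {
        String captured = TRACE_ID.get(); // evaluated on the submitting thread
        return () -> {
            String previous = TRACE_ID.get(); // whatever the worker thread held before
            TRACE_ID.set(captured);
            try {
                task.run();
            } finally {
                // Put the worker thread back into its previous state.
                if (previous == null) TRACE_ID.remove(); else TRACE_ID.set(previous);
            }
        };
    }

    // Runs a task on a pool thread and reports the TRACE_ID value that the task observed.
    static String readOnWorker(boolean wrapped) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("trace-123");
            String[] seen = new String[1];
            Runnable task = () -> seen[0] = TRACE_ID.get();
            pool.submit(wrapped ? wrap(task) : task).get(); // get() waits for completion
            return seen[0];
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("unwrapped: " + readOnWorker(false)); // prints "unwrapped: null"
        System.out.println("wrapped:   " + readOnWorker(true));  // prints "wrapped:   trace-123"
    }
}
```

The unwrapped task sees nothing because a plain ThreadLocal is never copied to pool threads; the wrapped one sees the caller's value.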

UPDATE

If you have more than one executor pool and want to add tracing to all of them, use a TaskDecorator that wraps each task in a context snapshot:

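import io.micrometer.context.ContextSnapshotFactory;
import java.util.concurrent.Executor;
import org.springframework.boot.task.TaskExecutorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;

@Configuration
public class AsyncConfig {
  @Bean
  public TaskDecorator otelTaskDecorator() {
    // ContextSnapshot.captureAll(...) is deprecated, and passing new Object[0] to it has been
    // reported to fail on context-propagation 1.1.x, so build a factory once and reuse it.
    ContextSnapshotFactory snapshotFactory = ContextSnapshotFactory.builder().build();
    // The snapshot is captured on the submitting thread, when the task is decorated.
    return (runnable) -> snapshotFactory.captureAll().wrap(runnable);
  }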

  @Bean("asyncExecutorPool1")
  public Executor asyncExecutorPool1(TaskDecorator otelTaskDecorator) {
    return new TaskExecutorBuilder()
        .corePoolSize(5)
        .maxPoolSize(10)
        .queueCapacity(10)
        .threadNamePrefix("threadPoolExecutor1-")
        .taskDecorator(otelTaskDecorator)
        .build();
  }

  @Bean("asyncExecutorPool2")
  public Executor asyncExecutorPool2(TaskDecorator otelTaskDecorator) {
    return new TaskExecutorBuilder()
        .corePoolSize(5)
        .maxPoolSize(10)
        .queueCapacity(10)
        .threadNamePrefix("threadPoolExecutor2-")
        .taskDecorator(otelTaskDecorator)
        .build();
  }
}

NOTE: You can follow this blog for more setup details and sample github project code.
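
Why one decorator is enough for several pools: the decorator runs on the submitting thread, so it can capture the caller's context before the task is handed to whichever pool executes it. Below is a rough plain-Java sketch of that idea, not Spring's or Micrometer's code. SharedDecoratorSketch, TRACE_ID, DECORATOR and decorated are made-up names, UnaryOperator<Runnable> plays the role of TaskDecorator, and a ThreadLocal stands in for the trace context.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

public class SharedDecoratorSketch {
    // Stand-in for the tracing context (hypothetical).
    static final ThreadLocal<String> TRACE_ID = new ThreadLocal<>();

    // Plays the role of a TaskDecorator: invoked on the submitting thread,
    // returns the Runnable that the pool will actually run.
    static final UnaryOperator<Runnable> DECORATOR = task -> {
        String captured = TRACE_ID.get(); // captured at decoration time
        return () -> {
            TRACE_ID.set(captured);
            try {
                task.run();
            } finally {
                TRACE_ID.remove(); // do not leak the value into the next task on this thread
            }
        };
    };

    // Returns an Executor that applies the shared decorator before delegating to the pool.
    static Executor decorated(ExecutorService pool) {
        return task -> pool.execute(DECORATOR.apply(task));
    }

    // Submits one task to each of two pools and returns what each task observed.
    static String[] runOnTwoPools() {
        ExecutorService pool1 = Executors.newFixedThreadPool(2);
        ExecutorService pool2 = Executors.newFixedThreadPool(2);
        try {
            TRACE_ID.set("trace-abc");
            String[] seen = new String[2];
            CountDownLatch done = new CountDownLatch(2);
            decorated(pool1).execute(() -> { seen[0] = TRACE_ID.get(); done.countDown(); });
            decorated(pool2).execute(() -> { seen[1] = TRACE_ID.get(); done.countDown(); });
            done.await(); // wait until both tasks have finished
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            TRACE_ID.remove();
            pool1.shutdown();
            pool2.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = runOnTwoPools();
        System.out.println(seen[0] + " / " + seen[1]); // prints "trace-abc / trace-abc"
    }
}
```

If the capture were moved inside the returned Runnable, it would run on the pool thread and find nothing to capture.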

Performing answered 11/4, 2023 at 0:58 Comment(9)
Yes, I can do it like this, but I need a way to cover all the executors in my application. - Jelene
Updated my answer to suit your exact needs. - Performing
ContextSnapshot::captureAll is deprecated. ContextSnapshotFactory.builder().build().captureAll() should be used. - Morningglory
I suppose otelTaskDecorator is a method call in .taskDecorator(otelTaskDecorator)? If yes, do we need to make that decorator a bean? - Disreputable
This method in the config above creates that bean: public TaskDecorator otelTaskDecorator() - Performing
Thanks Amith. But for some reason adding a task decorator as you've mentioned in solution 2 isn't working for me. I don't see the traceId being propagated. However, following https://mcmap.net/q/134252/-how-to-use-mdc-with-thread-pools did help with propagating the traceIds. What am I missing? - Disreputable
I have also written this blog; check it out for more details, and there is a link to a GitHub sample project there too. Maybe compare your code to see what is missing. If nothing works out, share your sample code through GitHub and I can take a look to see what's missing. - Performing
@Jelene can you share what works for you? I struggled with the accepted answer with no luck. - Caprice
I tried it, but I get an exception: 2024-09-24 10:35:49.630 [,,][JobExecutor[org.camunda.bpm.engine.spring.components.jobexecutor.SpringJobExecutor]] ERROR org.camunda.bpm.engine.jobexecutor - ENGINE-14019 Exception during job acquisition No ContextAccessor for contextType: class [Ljava.lang.Object; java.lang.IllegalStateException: No ContextAccessor for contextType: class [Ljava.lang.Object; at io.micrometer.context.ContextRegistry.getContextAccessorForRead(ContextRegistry.java:206) ~[context-propagation-1.1.1.jar:1.1.1] - Klockau
1

Register a ContextPropagatingTaskDecorator bean (the class ships with Spring Framework 6.1 and later). The auto-configuration picks it up and wires it into the ThreadPoolTaskExecutorBuilder.

See: org.springframework.boot.autoconfigure.task.TaskExecutorConfigurations.ThreadPoolTaskExecutorBuilderConfiguration#threadPoolTaskExecutorBuilder

   @Bean
   public TaskDecorator decorator(){
       return new ContextPropagatingTaskDecorator();
   }

You can then use the auto-configured executor like this:


   @Autowired
   ThreadPoolTaskExecutor scheduler;


   void test() {
        scheduler.submit(() -> log.info("Running task using scheduler "));
   }
Mancy answered 18/10, 2023 at 19:23 Comment(0)
0

I was facing the same problem. Adding this code to my configuration made everything work as expected.

  @Configuration(proxyBeanMethods = false)
  static class AsyncConfig implements AsyncConfigurer, WebMvcConfigurer {

    @Override
    public Executor getAsyncExecutor() {
      return ContextExecutorService.wrap(Executors.newCachedThreadPool(), ContextSnapshot::captureAll);
    }

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
      configurer.setTaskExecutor(new SimpleAsyncTaskExecutor(r -> new Thread(ContextSnapshot.captureAll().wrap(r))));
    }
  }
Houseclean answered 13/2, 2023 at 15:41 Comment(1)
Yes, but only for executors from autoconfiguration. - Jelene
0

You can try it this way, though I suspect the fact that this is needed at all is a bug.

ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(){
            @Override
            protected ExecutorService initializeExecutor(ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) {
                ExecutorService executorService = super.initializeExecutor(threadFactory, rejectedExecutionHandler);
                return ContextExecutorService.wrap(executorService, ContextSnapshot::captureAll);
            }
            @Override
            public void execute(Runnable task) {
                super.execute(ContextSnapshot.captureAll().wrap(task));
            }

            @Override
            public Future<?> submit(Runnable task) {
                return super.submit(ContextSnapshot.captureAll().wrap(task));
            }

            @Override
            public <T> Future<T> submit(Callable<T> task) {
                return super.submit(ContextSnapshot.captureAll().wrap(task));
            }
        };
Epanorthosis answered 17/3, 2023 at 12:27 Comment(0)
0

ContextSnapshot::captureAll is deprecated. This is the code that worked for me.

@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {

    @Value("${setting.threads:2}")
    private int numOfThreads;

    @Override
    public Executor getAsyncExecutor() {
        return ContextExecutorService.wrap(Executors.newFixedThreadPool(numOfThreads), ContextSnapshotFactory.builder().build()::captureAll);
    }
}
Mitchiner answered 9/1 at 18:17 Comment(0)
