Hi, I'm using the TaskDecorator interface with ThreadPoolTaskExecutor to keep track of the MDC context in parallel operations by copying it in the TaskDecorator. I've encountered an issue with thenApplyAsync in CompletableFuture using this approach.

In the basic case, I would like to carry my MDC context over from the first executor to the second executor when doing this:

CompletableFuture.supplyAsync(() -> foo(), ioHeavyExecutor)
    .thenApplyAsync(foo -> bar(foo), businessLogicExecutor);
  • Both executors are decorated with the same MDC context copying decorator.
  • Most of the time, the second executor's execute method (and therefore the creation of the decorated runnable) for the thenApplyAsync stage is called by a thread from the first executor. This is the expected behaviour, and the context is passed httpRequestThread -> firstExecutorThread -> secondExecutorThread.
  • Sometimes, instead of that flow, I get httpRequestThread -> firstExecutorThread for supplyAsync and httpRequestThread -> secondExecutorThread for thenApplyAsync.

In that case I get a copy of the original context in the second executor, instead of a copy of the context from the first executor. I have a hunch that this is not a ThreadPoolTaskExecutor issue but more of a Java-level problem, perhaps a lack of some intrinsic execution guarantees. The payload always executes on the correct thread pool. Here's a minimal example (it fails about 3 times per 10k executions, or even less often):

package com.foo;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.RepeatedTest;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class DecorationTest {

    record WhoCalled(String decorator, String runnable) {}

    static class RecordCallerDecorator implements TaskDecorator {

        List<WhoCalled> callers = new ArrayList<>();

        @Override
        public synchronized Runnable decorate(final Runnable runnable) {
            String decoratingThread = Thread.currentThread().getName();
            return () -> {
                String runningThread = Thread.currentThread().getName();
                callers.add(new WhoCalled(decoratingThread, runningThread));
                runnable.run();
            };
        }
    }


    ThreadPoolTaskExecutor singleThreadExecutor(RecordCallerDecorator recordCallerDecorator, String threadNamePrefix) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(1);
        executor.setMaxPoolSize(1);
        executor.setTaskDecorator(recordCallerDecorator);
        executor.setThreadNamePrefix(threadNamePrefix);
        executor.initialize();
        return executor;
    }

    ThreadPoolTaskExecutor firstExecutor;
    ThreadPoolTaskExecutor secondExecutor;
    RecordCallerDecorator decorator;

    @BeforeEach
    void beforeEach() {
        decorator = new RecordCallerDecorator();
        firstExecutor = singleThreadExecutor(decorator, "firstExecutor-");
        secondExecutor = singleThreadExecutor(decorator, "secondExecutor-");
    }

    @AfterEach
    void afterEach() {
        firstExecutor.shutdown();
        secondExecutor.shutdown();
    }


    @RepeatedTest(10_000)
    void testWhoCalled() throws Exception {
        Integer result = CompletableFuture.supplyAsync(() -> 1, firstExecutor)
                .thenApplyAsync(supplyResult -> supplyResult, secondExecutor)
                .get();

        assert result == 1;

        WhoCalled firstCallers = decorator.callers.get(0);
        assert firstCallers.decorator().equals("main");
        assert firstCallers.runnable().equals("firstExecutor-1");

        WhoCalled secondCallers = decorator.callers.get(1);
        assert secondCallers.decorator().equals("firstExecutor-1") : "Unexpected second decorator: " + secondCallers.decorator();
        assert secondCallers.runnable().equals("secondExecutor-1");
    }
}
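
A likely explanation for the rare failures, offered as a sketch rather than a confirmed diagnosis: if the first stage has already completed by the time thenApplyAsync is called, CompletableFuture submits the dependent stage from the calling thread; otherwise the thread that completes the first stage submits it. The example below uses only java.util.concurrent and forces both orderings with a latch. The class name SubmitterDemo, the method submitterOfSecondStage and the recording Executor (a stand-in for a decorated ThreadPoolTaskExecutor) are hypothetical names made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class SubmitterDemo {

    /** Returns the name of the thread that called execute() on the second executor. */
    static String submitterOfSecondStage(boolean firstStageAlreadyDone) throws Exception {
        ExecutorService first = Executors.newSingleThreadExecutor(r -> new Thread(r, "first-1"));
        ExecutorService second = Executors.newSingleThreadExecutor(r -> new Thread(r, "second-1"));
        AtomicReference<String> submitter = new AtomicReference<>();

        // Stand-in for a decorated executor: a TaskDecorator would run at this
        // point, on whichever thread calls execute().
        Executor recordingSecond = task -> {
            submitter.set(Thread.currentThread().getName());
            second.execute(task);
        };

        // The latch keeps the first stage pending until we decide to release it.
        CountDownLatch gate = new CountDownLatch(1);
        try {
            CompletableFuture<Integer> stage1 = CompletableFuture.supplyAsync(() -> {
                try {
                    gate.await();
                } catch (InterruptedException e) {
                    throw new IllegalStateException(e);
                }
                return 1;
            }, first);

            if (firstStageAlreadyDone) {
                // Let stage 1 finish BEFORE the dependent stage is registered.
                gate.countDown();
                stage1.join();
            }

            CompletableFuture<Integer> stage2 = stage1.thenApplyAsync(x -> x, recordingSecond);

            // No-op if the gate was already opened above.
            gate.countDown();
            stage2.get();
            return submitter.get();
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("registered before completion: " + submitterOfSecondStage(false));
        System.out.println("registered after completion:  " + submitterOfSecondStage(true));
    }
}
```

Run from main, the first call should report first-1 and the second should report the calling thread. In the test above nothing stops supplyAsync(() -> 1, ...) from finishing before main reaches thenApplyAsync, which would match the occasional "main" decorator.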

Comment From: bclozel

Sorry this got overlooked.

I agree, this is not strictly a Spring issue but a broader problem about propagating context information between Threads. We have a dedicated project to solve such issues, the Context Propagation project. We are using it in Spring Framework, Spring for GraphQL and other libraries to propagate context information between Threads/custom context objects/Reactor context.

There are also implementations that might interest you in the io.micrometer.context package there.

I'm closing this issue as a result. Thanks!