Closure in onFailure continuation for a scala.concurrent.Future not working as expected

I'm encountering an issue where I have two methods, the first calling the second in a loop and the second creating a Future as follows:

public class WorkersCoordinator {
    private static Logger LOGGER = 
        LoggerFactory.getLogger(WorkersCoordinator.class);

    private final Timeout timeout;

    private final ActorSystem system;

    private final List<Class<? extends BaseWorker>> workers;

    private final Map<Class, ActorRef> refMap;

    private final WorkResultPackageQueue pkgQueue;

    private final ActorFactory actorFactory;

    @Autowired
    public WorkersCoordinator(final ApplicationConfiguration config,
                             final ActorSystem system,
                             final ActorFactory actorFactory, 
                             final WorkerFactory workerFactory,
                             final WorkResultPackageQueue pkgQueue) {
        timeout = new Timeout(config.getTimeoutInMilliseconds(), 
                              TimeUnit.MILLISECONDS);

        this.system = system;
        this.actorFactory = actorFactory;
        this.pkgQueue = pkgQueue;

        refMap = Maps.newHashMap();
        workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
    }

    public void delegateWorkers() {
        for (Class<? extends BaseWorker> worker : workers) {
            if (refMap.containsKey(worker)) continue;
            sendWork(worker);
        }
    }

    private void sendWork(Class<? extends BaseWorker> worker) {
        // GetDataActor extends AbstractActor
        ActorRef actorRef = actorFactory.create(GetDataActor.class);
        Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);

        responseRef.onFailure(new OnFailure() {
            @Override
            public void onFailure(Throwable failure) throws Throwable {
                LOGGER.error("Worker {} encountered a problem - cancelling.", 
                             worker.getSimpleName());
                if (refMap.containsKey(worker)) {
                    refMap.remove(worker);
                }
            }
        }, system.dispatcher());

        responseRef.onSuccess(new OnSuccess<Object>() {
            @Override
            public void onSuccess(Object msg) throws Throwable {
                if (msg instanceof WorkResultPackage) {
                    final WorkResultPackage reportPackage = (WorkResultPackage) msg;
                    LOGGER.info(
                        "Received AssetDataPackage from {}, containing {} items",
                        reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                        reportPackage.getPkg().getData().size());

                    pkgQueue.enqueue(reportPackage);
                } else {
                    LOGGER.error(
                        "Expected to receive WorkResultPackage Object but received: {}",
                        msg.getClass());
                    throw new UnknownObjectReceived(msg);
                }
            }
        }, system.dispatcher());

        refMap.put(worker, actorRef);
    }
}

The issue is that the closure I believe responseRef.onFailure creates doesn't behave as I would expect. If I call this with 3 workers, one of which I configure to fail, a failure is handled, but the log is nondeterministic about which worker it reports as failed, even though it should consistently be the one I marked to fail. I'm new to this tech stack (Java, Scala Futures, and Akka) and to the established code base in which I found this pattern, so I don't know whether I'm overlooking something or misunderstanding closures in Java/Scala Futures. The sticking point is that I need to report which worker failed and remove it from refMap so that it is no longer considered in process. Even stranger, all workers appear to complete and to be removed from refMap even while the wrong worker is reported as failed.

Update: After having no luck with getting an answer why the closure wasn't working, I did some investigation and found another post answering whether Java 8 even supports closures:

Does Java 8 Support Closures?

Short answer: I believe it does. However, that post spoke of final or effectively final variables, so I updated my code as follows. Hopefully this helps folks who understand closures explain why they don't work the way I'm used to (in C# and JavaScript). I'm only posting the updated sendWork(...) to show what I tried, to no avail.

private void sendWork(Class<? extends BaseWorker> worker) {
    // GetDataActor extends AbstractActor
    ActorRef actorRef = actorFactory.create(GetDataActor.class);
    Future<Object> responseRef = Patterns.ask(actorRef, worker, timeout);

    Consumer<Throwable> onFailureClosure = (ex) -> {
            LOGGER.error("Worker {} encountered a problem - cancelling.", 
                         worker.getSimpleName());
            if (refMap.containsKey(worker)) {
                refMap.remove(worker);
            }
    };

    responseRef.onFailure(new OnFailure() {
        @Override
        public void onFailure(Throwable failure) throws Throwable {
            onFailureClosure.accept(failure);
        }
    }, system.dispatcher());

    responseRef.onSuccess(new OnSuccess<Object>() {
        @Override
        public void onSuccess(Object msg) throws Throwable {
            if (msg instanceof WorkResultPackage) {
                final WorkResultPackage reportPackage = (WorkResultPackage) msg;
                LOGGER.info(
                    "Received AssetDataPackage from {}, containing {} items",
                    reportPackage.getWorkReport().getFromWorker().getSimpleName(),
                    reportPackage.getPkg().getData().size());

                pkgQueue.enqueue(reportPackage);
            } else {
                LOGGER.error(
                    "Expected to receive WorkResultPackage Object but received: {}",
                    msg.getClass());
                throw new UnknownObjectReceived(msg);
            }
        }
    }, system.dispatcher());

    refMap.put(worker, actorRef);
}
Gluten answered 14/12, 2017 at 1:16 Comment(3)
What does this have to do with completable-future? - Maitilde
Perhaps nothing, but there was no tag for scala.concurrent.Future, which is why I made sure to call it out in the subject. If you feel strongly about it, edit to remove it. - Gluten
From experimentation, I think what this boils down to is that, despite how the fine folks who pointed to the aforementioned Java doc understood it, closures are not implemented in Java, at least not fully. Rather, variable lifting is being done but, despite the final requirement, it does not support asynchrony. - Gluten

There is a fundamental problem with the code that might be contributing to the behavior that you're seeing: the code is mutating data in a concurrent environment without any safeguards for that data. Future callbacks can be executed at any time and can potentially run in parallel. Having multiple future callbacks inspecting and mutating the same data can cause weird behavior.

The typical approach in Java to deal with concurrent access to mutable data is to use synchronization and locks. Fortunately, since you're using Akka, there is a better approach. Basically, refactor WorkersCoordinator to be an actor, and use the actor behavior of sequential message processing to safely handle the mutable state.
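For comparison, here is a minimal sketch of the plain-Java route, assuming a thread-safe map is enough for the bookkeeping. It is not the asker's code: java.util.concurrent.CompletableFuture stands in for scala.concurrent.Future, and SafeRegistryDemo, run, and inFlight are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SafeRegistryDemo {

    // Starts `tasks` futures whose callbacks all mutate one shared map,
    // then returns how many entries are left once every callback has run.
    static int run(int tasks) {
        // ConcurrentHashMap tolerates concurrent put/remove; a plain HashMap does not.
        Map<Integer, String> inFlight = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Void>> done = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                // Register before attaching the callback, so the callback can
                // never try to remove an entry that has not been added yet.
                inFlight.put(id, "task-" + id);
                done.add(CompletableFuture
                    .runAsync(() -> { /* the work would go here */ }, pool)
                    .whenCompleteAsync((result, failure) -> inFlight.remove(id), pool));
            }
            // Wait until every callback has finished.
            CompletableFuture.allOf(done.toArray(new CompletableFuture[0])).join();
            return inFlight.size();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("entries left: " + run(1000));
    }
}
```

This only protects individual map operations. Anything that needs several steps to be atomic (check, log, then remove) still needs a lock or, as suggested here, an actor that owns the state.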

To further simplify matters, you can forgo the use of ask in this case and instead use tell to communicate between actors. I'm guessing that futures are used here in an attempt to capture errors, but a better approach to handling errors is Akka's supervisor strategy. That is, if WorkersCoordinator were an actor and if each GetDataActor instance were a child of WorkersCoordinator, then the latter could decide how to deal with errors that arise in the former. For example, if an exception is thrown in a GetDataActor, the coordinator could decide to log the error, then stop the child.

The following is an alternative WorkersCoordinator that incorporates the above ideas:

public class WorkersCoordinator extends AbstractActor {
  private static Logger LOGGER = ...
  private final List<Class<? extends BaseWorker>> workers;
  private final Map<ActorRef, Class> refMap;
  private final WorkResultPackageQueue pkgQueue;

  public WorkersCoordinator(final WorkerFactory workerFactory,
                            final WorkResultPackageQueue pkgQueue) {
    this.pkgQueue = pkgQueue;
    this.refMap = Maps.newHashMap();
    this.workers = Lists.newArrayList(workerFactory.getAllAvailableWorkers());
  }

  static Props props(WorkerFactory factory, WorkResultPackageQueue queue) {
    return Props.create(WorkersCoordinator.class, factory, queue);
  }

  static public class Delegate {}

  // Not static: the decider reads actor state (refMap) and uses getSender(),
  // which refers to the failing child while a failure is being handled.
  @Override
  public SupervisorStrategy supervisorStrategy() {
    return new OneForOneStrategy(10, Duration.create(1, TimeUnit.MINUTES), DeciderBuilder
      .matchAny(t -> {
         ActorRef failedChild = getSender();
         LOGGER.error("This child failed: {}", failedChild);
         refMap.remove(failedChild);
         return SupervisorStrategy.stop();
      })
      .build());
  }

  @Override
  public void preStart() {
    for (Class<? extends BaseWorker> worker : workers) {
       ActorRef child = getContext().actorOf(Props.create(GetDataActor.class));
       refMap.put(child, worker);
    }
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(Delegate.class, d -> {
        refMap.forEach((actor, msg) -> actor.tell(msg, getSelf()));
      })
      .match(WorkResultPackage.class, p -> {
        LOGGER.info("Received AssetDataPackage from {}, containing {} items",
                    p.getWorkReport().getFromWorker().getSimpleName(),
                    p.getPkg().getData().size());

        pkgQueue.enqueue(p);
        ActorRef dataActor = getSender();
        refMap.remove(dataActor);
      })
      .matchAny(
        msg -> LOGGER.error("Expected to receive WorkResultPackage Object but received: {}", msg)
      )
      .build();
  }
}

Some notes about the above code:

  • ActorFactory, which appears to be some custom class, is replaced with Props.
  • refMap is inverted so that the ActorRef is now the key, and the class type of the work is the value. This allows us to remove an entry from refMap based on the ActorRef, both in the event of a successful response from a child actor and in the event of a child throwing an exception.
  • I removed the @Autowired annotation for simplicity. More information on dependency injection with actors is found here.
  • To create and start a WorkersCoordinator, invoke its props method. To initiate the work, the actor expects a custom Delegate message: once the actor receives this message, it iterates through refMap, sending each child in the map the unit of work that is associated with that child.
WorkerFactory factory = ...
WorkResultPackageQueue queue = ...
ActorRef coordinator = actorSystem.actorOf(WorkersCoordinator.props(factory, queue));

WorkersCoordinator.Delegate doWork = new WorkersCoordinator.Delegate();
coordinator.tell(doWork, ActorRef.noSender());
Pernicious answered 18/12, 2017 at 21:1 Comment(1)
I suspected this was the Akka answer: that we needed to refactor to use a supervisor strategy. However, the current architecture wraps ActorRef and won't support that until we refactor, which will need to wait until after the holidays. I will likely mark yours as the answer. However, I do wonder one thing. Aside from needing to use Maps.newConcurrentMap() instead of newHashMap() so that the mutation of refMap doesn't have concurrency issues, why isn't the worker ending up in a closure within the continuation? That way, when onFailure is called, it would have the one it was defined with. - Gluten
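
On that closing question, a minimal Akka-free sketch suggests that Java does capture a method parameter per invocation, even when the callback runs later on another thread. It uses java.util.concurrent.CompletableFuture as a stand-in for scala.concurrent.Future, and ClosureCaptureDemo, start, run, and outcomes are hypothetical names. It does not diagnose the original code; it only indicates that the capture itself behaves as expected, so the cause is more likely elsewhere, for example in the unsynchronized access to refMap described in the answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ClosureCaptureDemo {

    // Each call has its own `name` parameter. The lambdas capture that value,
    // so whichever thread runs them later still sees the name of this call.
    static CompletableFuture<String> start(String name, Map<String, String> outcomes) {
        return CompletableFuture
            .supplyAsync(() -> {
                if (name.equals("B")) {
                    throw new IllegalStateException("simulated failure in " + name);
                }
                return name;
            })
            .whenComplete((result, failure) ->
                outcomes.put(name, failure == null ? "ok" : "failed"));
    }

    // Starts three "workers", B being the one configured to fail,
    // and returns what each callback recorded under its captured name.
    static Map<String, String> run() {
        Map<String, String> outcomes = new ConcurrentHashMap<>();
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (String name : Arrays.asList("A", "B", "C")) {
            futures.add(start(name, outcomes));
        }
        for (CompletableFuture<String> f : futures) {
            // Swallow the failure here; it was already recorded by the callback.
            f.exceptionally(t -> null).join();
        }
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The failure is always recorded under B, never under A or C, regardless of which thread runs the callback.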
