I am trying to implement a simple example/demo for a state machine using Automatonymous with RabbitMQ. Unfortunately I could not find one to rebuild / learn from (I found the ShoppingWeb, but in my eyes it's anything but simple). Also in my opinion the documentation is lacking information.
This is the state machine example I thought of (sorry, it's pretty ugly): Please note that this example is completely made up, and it's not important whether it makes sense or not. The purpose of this project is to get familiar with Automatonymous.
What I want to do / to have is:
- Four applications running:
- The state machine itself
- The "requester" sending requests to be interpreted
- The "validator" or "parser" checking if the provided request is valid
- The "interpreter" interpreting the given request
- An example of this could be:
- Requester sends "x=5"
- Validator checks if a "=" is contained
- Interpreter says "5"
My implementation of the state machine looks like this:
/// <summary>
/// State machine that moves an <see cref="InterpreterInstance"/> through the
/// Requested, RequestValid, AnswerReady and Error states in response to the
/// Requesting, Validating and Interpreting events.
/// </summary>
public class InterpreterStateMachine : MassTransitStateMachine<InterpreterInstance>
{
    /// <summary>
    /// Declares the event correlations and the behavior of every state.
    /// </summary>
    public InterpreterStateMachine()
    {
        InstanceState(x => x.CurrentState);

        // Every event is matched to an instance by comparing the request string
        // stored on the instance with the request string carried by the message.
        // Only the Requesting event supplies an id selector (a fresh Guid).
        Event(() => Requesting, x => x.CorrelateBy(request => request.Request.RequestString, context => context.Message.Request.RequestString)
            .SelectId(context => Guid.NewGuid()));
        Event(() => Validating, x => x.CorrelateBy(request => request.Request.RequestString, context => context.Message.Request.RequestString));
        Event(() => Interpreting, x => x.CorrelateBy(request => request.Request.RequestString, context => context.Message.Request.RequestString));

        Initially(
            When(Requesting)
                .Then(context =>
                {
                    // Keep a copy of the incoming request on the instance.
                    context.Instance.Request = new Request(context.Data.Request.RequestString);
                })
                .ThenAsync(context => Console.Out.WriteLineAsync($"Request received: {context.Data.Request.RequestString}"))
                .Publish(context => new ValidationNeededEvent(context.Instance))
                .TransitionTo(Requested)
        );

        During(Requested,
            When(Validating)
                .ThenAsync(async context =>
                {
                    context.Instance.Request.IsValid = context.Data.Request.IsValid;

                    // TransitionToState returns a Task. It must be awaited so that the
                    // state change has completed (and any failure is observed) before
                    // the following activities run.
                    State nextState = context.Data.Request.IsValid ? RequestValid : Error;
                    await this.TransitionToState(context.Instance, nextState);
                })
                .ThenAsync(context => Console.Out.WriteLineAsync($"Request '{context.Data.Request.RequestString}' validated with {context.Instance.Request.IsValid}"))
                // NOTE(review): this is published for invalid requests as well - confirm that is intended.
                .Publish(context => new InterpretationNeededEvent(context.Instance))
            ,
            Ignore(Requesting),
            Ignore(Interpreting)
        );

        During(RequestValid,
            When(Interpreting)
                .Then((context) =>
                {
                    //do something
                })
                .ThenAsync(context => Console.Out.WriteLineAsync($"Request '{context.Data.Request.RequestString}' interpreted with {context.Data.Answer}"))
                .Publish(context => new AnswerReadyEvent(context.Instance))
                .TransitionTo(AnswerReady)
                .Finalize(),
            Ignore(Requesting),
            Ignore(Validating)
        );

        SetCompletedWhenFinalized();
    }

    /// <summary>State entered after a Requesting event was handled.</summary>
    public State Requested { get; private set; }

    /// <summary>State entered when validation reported the request as valid.</summary>
    public State RequestValid { get; private set; }

    /// <summary>State entered after an Interpreting event was handled.</summary>
    public State AnswerReady { get; private set; }

    /// <summary>State entered when validation reported the request as invalid.</summary>
    public State Error { get; private set; }

    /// <summary>Someone is sending a request to interpret.</summary>
    public Event<IRequesting> Requesting { get; private set; }

    /// <summary>The request has been validated.</summary>
    public Event<IValidating> Validating { get; private set; }

    /// <summary>The request has been interpreted.</summary>
    public Event<IInterpreting> Interpreting { get; private set; }

    /// <summary>
    /// Message published after a request was received; exposes the instance's
    /// correlation id and request.
    /// </summary>
    class ValidationNeededEvent : IValidationNeeded
    {
        readonly InterpreterInstance _instance;

        public ValidationNeededEvent(InterpreterInstance instance)
        {
            _instance = instance;
        }

        public Guid RequestId => _instance.CorrelationId;
        public Request Request => _instance.Request;
    }

    /// <summary>
    /// Message published after validation; exposes the instance's correlation id.
    /// </summary>
    class InterpretationNeededEvent : IInterpretationNeeded
    {
        readonly InterpreterInstance _instance;

        public InterpretationNeededEvent(InterpreterInstance instance)
        {
            _instance = instance;
        }

        public Guid RequestId => _instance.CorrelationId;
    }

    /// <summary>
    /// Message published after interpretation; exposes the instance's correlation id.
    /// </summary>
    class AnswerReadyEvent : IAnswerReady
    {
        readonly InterpreterInstance _instance;

        public AnswerReadyEvent(InterpreterInstance instance)
        {
            _instance = instance;
        }

        public Guid RequestId => _instance.CorrelationId;
    }
}
Then I have services like this:
/// <summary>
/// Service that hosts the <see cref="InterpreterStateMachine"/> saga on a
/// RabbitMQ bus and runs a Quartz scheduler alongside it.
/// </summary>
public class RequestService : ServiceControl
{
    readonly IScheduler scheduler;
    IBusControl busControl;
    BusHandle busHandle;
    InterpreterStateMachine machine;
    InMemorySagaRepository<InterpreterInstance> repository;

    /// <summary>
    /// Creates the service and its scheduler; the bus is created in <see cref="Start"/>.
    /// </summary>
    public RequestService()
    {
        scheduler = CreateScheduler();
    }

    /// <summary>
    /// Configures the bus and its receive endpoints, then starts the bus and the scheduler.
    /// </summary>
    /// <param name="hostControl">Host control handle; not used by this method.</param>
    /// <returns><c>true</c> once the bus and the scheduler have been started.</returns>
    public bool Start(HostControl hostControl)
    {
        Console.WriteLine("Creating bus...");
        machine = new InterpreterStateMachine();
        repository = new InMemorySagaRepository<InterpreterInstance>();

        busControl = Bus.Factory.CreateUsingRabbitMq(x =>
        {
            IRabbitMqHost host = x.Host(new Uri(/*rabbitMQ server*/), h =>
            {
                /*credentials*/
            });

            // NOTE(review): both the in-memory scheduler and a Quartz-backed
            // message scheduler endpoint are configured - confirm both are needed.
            x.UseInMemoryScheduler();

            // Endpoint on which the state machine saga receives its events.
            x.ReceiveEndpoint(host, "interpreting_answer", e =>
            {
                e.PrefetchCount = 5; //?
                e.StateMachineSaga(machine, repository);
            });

            // Endpoint hosting the schedule/cancel message consumers.
            x.ReceiveEndpoint(host, "2", e =>
            {
                e.PrefetchCount = 1;
                x.UseMessageScheduler(e.InputAddress);
                //Scheduling !?
                e.Consumer(() => new ScheduleMessageConsumer(scheduler));
                e.Consumer(() => new CancelScheduledMessageConsumer(scheduler));
            });
        });

        Console.WriteLine("Starting bus...");
        try
        {
            busHandle = MassTransit.Util.TaskUtil.Await<BusHandle>(() => busControl.StartAsync());
            scheduler.JobFactory = new MassTransitJobFactory(busControl);

            // The scheduler API is Task-based; wait for the start to finish so
            // that a failure is thrown here instead of being silently dropped.
            scheduler.Start().GetAwaiter().GetResult();
        }
        catch (Exception)
        {
            scheduler.Shutdown().GetAwaiter().GetResult();
            throw;
        }

        return true;
    }

    /// <summary>
    /// Puts the scheduler in standby, stops the bus and shuts the scheduler down.
    /// </summary>
    /// <param name="hostControl">Host control handle; not used by this method.</param>
    /// <returns><c>true</c> once everything has been stopped.</returns>
    public bool Stop(HostControl hostControl)
    {
        Console.WriteLine("Stopping bus...");

        // Wait for each scheduler operation so the service does not report
        // "stopped" while the scheduler is still shutting down.
        scheduler.Standby().GetAwaiter().GetResult();

        if (busHandle != null)
        {
            busHandle.Stop();
        }

        scheduler.Shutdown().GetAwaiter().GetResult();
        return true;
    }

    /// <summary>
    /// Obtains a scheduler from a <see cref="StdSchedulerFactory"/>, blocking until it is available.
    /// </summary>
    static IScheduler CreateScheduler()
    {
        ISchedulerFactory schedulerFactory = new StdSchedulerFactory();
        IScheduler scheduler = MassTransit.Util.TaskUtil.Await<IScheduler>(() => schedulerFactory.GetScheduler());
        return scheduler;
    }
}
My questions are:
- How do I send the "initial" request, so that the state machine will transition to my initial state?
- How do I "react" within the consumers to check the data that were sent and then send new data like in 1?