This is my current CQRS use-case setup:
Command DTO is received in application layer handler where we map Command DTO to appropriate domain objects if needed, re-hydrate aggregate root from repo and call some method on AR
class CommandHandler{ handle(Command command){ Dom1 dom1 = new Dom1(command.d1); Dom2 dom2 = new Dom2(command.d2); AggregateRoot ar = repo.rehydrate(command.arId); ar.doSmth(dom1, dom2, command.int1); } }Inside doSmth method in AR, I don't have immediately event to apply, but I have to pass some args to entity that is part of this AR. So inside doSmth method I have
class AggregateRoot{ void doSmth(Dom1 dom1, Dom2 dom2, Integer int1){ subEnt = new OrherSubEntity(); subEnt.doSmth(int1); } }Now I have sub entity that is activated and generates one DomainEvent. This domain event should be part of event stream that is saved in EventStore and at same time, it is propagated to any listener in this aggregate root.
class SubEntity{ void doSmth(Integer int1){ //validate int1 apply(new SubEntityEvent(int1)); } void when(SubEntityEvent event){ //just modify local fields //used also in event sourcing rehydration } }Since second sub entity that is listening to SubEntityEvent is now activated, it performs CPU intense operation and generates SomeCPUIntenseCalcHappened DomainEvent. This event should also be part of the event stream that is saved in EventStore. It is also propagated to other sub-entities in same AR but since nobody else listens to it then we have no further processing.
class OtherSubEntity{ void listen(SubEntityEvent event){ data = SomeCPUIntenseCalc(); apply(SomeCPUIntenseCalcHappened(data)); } void when(SomeCPUIntenseCalcHappened event){ //just modify local fields //used also in event sourcing rehydration } }
Questions:
- Since AggreagateRoot is responsible for saving all of the events to event store, how can it know for existence of all of this sub events that happened in it's tree.
- In this transaction we generated 2 events. Should we store them inside event store as array of events, or we should store them simply one event after another. In order for AR to be in consistent state, both events must be applied in single transaction.
- In OtherSubEntity we have listen method that is listening to SubEntityEvent. Since on re-hydration from event store we will call SubEntity.when(SubEntityEvent event), how to prevent OtherSubEntity.listen to be called at the same time?
- Since aggregate root needs to send 1 resulting message down the pipeline (in my case AMQP message), I suppose that this message will actually be projection of this 2 domain events - basically a read model DTO. Where should I create this model? Something similar as read model just with propagation?