2

Hi Axon Framework community,

I'd like to have your opinion on how to solve the following problem properly.

My Axon Test Setup

  • Two instances of the same Spring Boot application (using axon-spring-boot-starter 4.4 without Axon Server)
  • Every instance publishes the same events on a regular interval
  • Both instances are connected to the same EventSource (single SQL Server instance using JpaEventStorageEngine)
  • Every instance is configured to use TrackingEventProcessors
  • Every instances has the same event handlers registered

What I want to achieve

I'd like that events published by one instance are only handled by the very same instance

If instance1 publishes eventX then only instance1 should handle eventX

What I've tried so far

  • I can achieve the above scenario using SubscribingEventProcessor. Unfortunately this is not an option in my case, since we'd like to have the option to replay events for rebuilding / adding new query models.
  • I could assign the event handlers of every instance to differed processing groups. Unfortunately this didn't worked. Maybe because every TrackingEventProcessors instance processes the same EventStream ? - not so sure about this though.
  • I could implement a MessageHandlerInterceptor which only proceeds in case the event origin is from the same instance. This is what I implemented so far and which works properly: MessageHandlerInterceptor
class StackEventInterceptor(private val stackProperties: StackProperties) : MessageHandlerInterceptor<EventMessage<*>> {

    override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?): Any? {
        val stackId = (unitOfWork?.message?.payload as SomeEvent).stackId
        if(stackId == stackProperties.id){
            interceptorChain?.proceed()
        }
        return null
    }
}
@Configuration
class AxonConfiguration {

    @Autowired
    fun configure(eventProcessingConfigurer: EventProcessingConfigurer, stackProperties: StackProperties) {
        val processingGroup = "processing-group-stack-${stackProperties.id}"
        eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
        eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { StackEventInterceptor(stackProperties) }
    }
}

Is there a better solution ?

I have the impression that my current solution is properly not the best one, since ideally I'd like that only the event handlers which belongs to a certain instance are triggered by the TrackingEventProcessor instances.

How would you solve that ?

2 Answers 2

2

Interesting scenario you're having here @thowimmer. My first hunch would be to say "use the SubscribingEventProcessor instead". However, you pointed out that that's not an option in your setup. I'd argue it's very valuable for others who're in the same scenario to know why that's not an option. So, maybe you can elaborate on that (to be honest, I am curious about that too).

Now for your problem case to ensure events are only handled within the same JVM. Adding the origin to the events is definitely a step you can take, as this allows for a logical way to filter. "Does this event originate from my.origin()?" If not, you'd just ignore the event and be done with it, simple as that. There is another way to achieve this though, to which I'll come to in a bit.

The place to filter is however what you're looking for mostly I think. But first, I'd like to specify why you need to filter in the first place. As you've noticed, the TrackingEventProcessor (TEP) streams events from a so called StreamableMessageSource. The EventStore is an implementation of such a StreamableMessageSource. As you are storing all events in the same store, well, it'll just stream everything to your TEPs. As your events are part of a single Event Stream, you are required to filter them at some stage. Using a MessageHandlerInterceptor would work, you could even go and write a HandlerEnhacnerDefinition allowing you to add additional behaviour to your Event Handling functions. However you put it though, with the current setup, filtering needs to be done somewhere. The MessageHandlerInterceptor is arguably the simplest place to do this at.

However, there is a different way of dealing with this. Why not segregate your Event Store, into two distinct instances for both applications? Apparently they do not have the need to read from one another, so why share the same Event Store at all? Without knowing further background of your domain, I'd guess you are essentially dealing with applications residing in distinct bounded contexts. Very shortly put, there is zero interest to share everything with both applications/contexts, you just share specifics portions of your domain language very consciously with one another.

Note that support for multiple contexts, using a single communication hub in the middle, is exactly what Axon Server can achieve for you. I am not here to say you cant configure this yourself though, I have done this in the past. But leaving that work to somebody or something else, freeing you from the need to configure infrastructure, that would be a massive timesaver.

Hope this helps you set the context a little of my thoughts on the matter @thowimmer.

Sign up to request clarification or add additional context in comments.

9 Comments

Hi @Steven, first of all many thanks your great support ! Axon is and outstanding team when it comes to supporting the community, building great tech and being respectful to each other - kudos ! A little more background to my problem: We are using blue/green deployments in an active/passive setup. This means that during a deployment an event originated by a working node could be handled by a potentially broken node. Each of both nodes have a dedicated DB instance which are mirrored, so that we can patch DB instances with zero downtime.
To your suggestions: 1) Subscribing vs Tracking: We'd like to replay events to add/rebuild query models later. (edited my question) 2) Segregate EventStore: I've actually thought about that but wanted to evaluate other options first - Nevertheless it's probably the way to go as outlined by you. 3) Different bounded contexts: In fact it's the very same app, but just different instances for operational purposes. 4) Axon Server: On the mid term we will switch probably to Axon Server. I will further evaluate its options :-)
First of, thanks for the compliment! We strive to provide thorough and clear explanation to any questions on any of the Axon products, so it is always pleasant to hear the experience is as such. :-)
About your scenario, gotcha! If you're talking about a blue-green deployment strategy, the requirements are slightly different of course. What is the exact issue you are trying to mitigate by segregating the events from the blue and green app?
Yes this is an other option -> register evenhandlers once everything is ready & in a healthy state. This would of course be possible but would require some additional step during deployments which I would like to avoid to reduce deployment complexity. I will try out the application level approach: github.com/thowimmer/axon-eventhandling-same-instance - its easy to understand and solves our problem. I'm not yet sure whether the PID approach is the best one though, since we might loose events when PID change (e.g. during respawning/switching instances)... I will keep you up to date ;-)
|
0

Sumup:

Using the same EventStore for both instances is probably no an ideal setup in case we want to use the capabilities of the TrackingEventProcessor.

Options to solve it:

  • Dedicated (not mirrored) DB instance for each application instance.
  • Using multiple contexts using AxonServer.

If we decide to solve the problem on application level filtering using MessageHandlerInterceptor is the most simplest solution.

Thanks @Steven for exchanging ideas.


EDIT:

Solution on application level using CorrelationDataProvider & MessageHandlerInterceptor by filtering out events not originated in same process.

AxonConfiguration.kt

const val METADATA_KEY_PROCESS_ID = "pid"
const val PROCESSING_GROUP_PREFIX = "processing-group-pid"

@Configuration
class AxonConfiguration {

    @Bean
    fun processIdCorrelationDataProvider() = ProcessIdCorrelationDataProvider()

    @Autowired
    fun configureProcessIdEventHandlerInterceptor(eventProcessingConfigurer: EventProcessingConfigurer) {
        val processingGroup = "$PROCESSING_GROUP_PREFIX-${ApplicationPid()}"
        eventProcessingConfigurer.byDefaultAssignTo(processingGroup)
        eventProcessingConfigurer.registerHandlerInterceptor(processingGroup) { ProcessIdEventHandlerInterceptor() }
    }
}

class ProcessIdCorrelationDataProvider() : CorrelationDataProvider {
    override fun correlationDataFor(message: Message<*>?): MutableMap<String, *> {
        return mutableMapOf(METADATA_KEY_PROCESS_ID to ApplicationPid().toString())
    }
}

class ProcessIdEventHandlerInterceptor : MessageHandlerInterceptor<EventMessage<*>> {
    override fun handle(unitOfWork: UnitOfWork<out EventMessage<*>>?, interceptorChain: InterceptorChain?) {
        val currentPid = ApplicationPid().toString()
        val originPid = unitOfWork?.message?.metaData?.get(METADATA_KEY_PROCESS_ID)
        if(currentPid == originPid){
            interceptorChain?.proceed()
        }
    }
}

See full demo project on GitHub

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.