It’s November and I am again experimenting with using EventStore as a messaging platform. Last year I built this, with the main focus on researching automated deployment and upgrade of components communicating via pub/sub. This time I am more focused on doing messaging in a way that is most natural to EventStore, trying to leverage it’s full potential (e.g. projections).

Message processors

In  the centre of each messaging agent sits a message processor. It is a stand-alone application (in my case written in C#) that connects to the messaging platform. On one end if receives incoming messages and on the other it outputs messages either by sending commands or publishing events (both commands and events are messages, the difference is a command has a destination address while an event does not). I won’t focus for now on how those message processors are implemented. The only important thing at this point is the assumption that all of them connect to the same instance of EventStore so EventStore acts as a bus in this scenario.

MessageProcessor

Receiving

On the receive side each component maintains an input stream/queue (I will use these two terms interchangeably as technically it is EventStore stream but in our scenario acting as a queue). Following picture illustrates how messages get into this queue

Receive

First, other components might send messages directly to our component. In such case they need to put (append in EventStore lingo) a message into our input queue. Quite easy. But how about the case when we want to subscribe for messages (events) published by some other component? This is the responsibility of Subscription Projection. I could not find a matching Enterprise Integration Patterns name for this guy and after some discussion on twitter the name I like best is Message Sink. The idea is that is moves messages from various output queues of other components into our input queue. Subscription Projection plays also a role of Message Filter as from each output stream it only takes the messages our component is interested in.

options({
    reorderEvents: true
});
fromStreams(['comp2_out','comp3_out'])
.when({
	'NServiceBus.AddIn.Tests.EventStoreSubscriptionManagerTests+Event1'/*comp2_out*/: function (s, e) {
        linkto('comp1_in',e);
    }
	'NServiceBus.AddIn.Tests.EventStoreSubscriptionManagerTests+Event2'/*comp3_out*/: function (s, e) {
        linkto('comp1_in',e);
    }
})

Sending and publishing

Let’s face the truth, EventStore is not a message queueing product and this fact of life can make some things more difficult than they usually are. An example would be receiving and sending messages in one transaction which simply cannot be done as there is no native concept of receiving in the EventStore. We need to rely on our message handlers to be smart enough to handle this problem. There are two basic rules we need to follow:

  • message handlers are idempotent, which means processing the same logical message more than once does not change the outcome
  • message handlers can actually detect when the message is re-processed and in such case not only ensure that no new updates are done (idempotency) but also do re-send and re-publish all the messages that were supposed to be generated during this message processing

These two ensure that, given infinite amount of retries, eventually the message will be processed and all the messages that are the result of processing will be sent/published at least once. The simple variant of this schema is shown in the following picture.

NonTransactionalSend

The component is sending and publishing each message separately which means that some recipients might get their messages long before others if the components goes down in the middle of message processing. It also means that some messages might be sent even though the internal data store update has failed for some reason. The eventual consistency guarantee here is very loose but still for some cases it can be useful.

If your component need a stronger guarantee, EventStore transactions can be used. The following picture presents transactional send scheme.

TransactionalSend

All the messages (both commands and events) resulting from processing an incoming message are sent in one transaction to an intermediate queue. This queue is an input for a routing projection (Message Router pattern) that forwards command messages to their destination queues and forwards event messages to the actual output queue.

fromStream('comp2_intermediate_out')
.when({
	$any: function (s, e) {
		if (typeof e.metadata.destinationStream !== "undefined") {
			emit(e.metadata.destinationStream, e.eventType, e.data, e.metadata);
		} else {
			emit('comp2_out', e.eventType, e.data, e.metadata);
		}
	}
})

Sending and publishing, event sourced

The previous section assumed that a message processor uses a traditional data store to persist data and messaging library API to send or publish messages. In case it uses the event sourcing approach, publishing events is more a persistence concern rather than messaging one. In such scenario processing a message would result in one or more events appended to a stream (directly via EventStore API) corresponding to an aggregate that processed the message. We could relax this rule and allow also to put command messages into such stream although these would not be used when reconstituting an aggregate. You can now see that we actually have here almost exact same situation as with transactional send/publish.

EventSourced

The only difference is that in the normal transactional send we have one intermediate queue and here we have any number of aggregate streams. With EventStore we can handle this easily by having a projection whose input are all streams of some category.

fromCategory('comp2_aggregate')
.when({
	$any: function (s, e) {
		if (typeof e.metadata.destinationStream !== "undefined") {
			emit(e.metadata.destinationStream, e.eventType, e.data, e.metadata);
		} else {
			emit('comp2_out', e.eventType, e.data, e.metadata);
		}
	}
})
VN:F [1.9.22_1171]
Rating: 5.0/5 (4 votes cast)
Using EventStore as a messaging platform, 5.0 out of 5 based on 4 ratings