In my previous post I sketched how one could implement a messaging solution on top of the EventStore. This time I’d like to elaborate on one implementation detail. The code I presented in the previous post silently assumed that more than one projection can emit events to a single stream. This is actually not the case. A projection claims an exclusive lock on its target stream, and if anybody else (another projection or an external client) writes there, nasty errors happen. This is by design: a projection needs to control the stream version, e.g. in order to handle failures. What can we do, then, to allow multiple components to send messages to our input queue/stream?

We can solve this problem, like any software-related one, by introducing another layer of abstraction. We can have each component write to its own queue/stream and add a projection that joins the incoming streams. The problem here is that we can’t just use fromStreams('A','B','C') as a source of events because we, as a receiver, don’t know the number and names of the incoming streams. The way to solve this is to use categories. What are they? If the name of a stream contains a hyphen, the part before the hyphen is treated in a special way: as the name of the category the stream belongs to. The naming convention alone is of course not enough. There is a special built-in projection, $by_category, which allows other projections to use fromCategory('SomeCategory') as a source of events. Here’s a sample of how it works:

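// Router projection of the VideoStore.ECommerce component.
fromStream('VideoStore.ECommerce_intermediate_out')
.when({
	$any: function (s, e) {
		if (typeof e.metadata.destinationComponent !== 'undefined') {
			// Command message: link it into the destination's category,
			// using a stream that only this component writes to.
			linkTo('in_'+e.metadata.destinationComponent+'-VideoStore.ECommerce', e);
		} else {
			// No destination specified: emit to this component's output stream.
			emit('VideoStore.ECommerce_out', e.eventType, e.data, e.metadata);
		}
	}
})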

The projection above is the router projection of the VideoStore.ECommerce component. As you can see, if a message specifies its destination (a command message), it is routed to a stream whose name consists of two parts: the name of the destination component, prefixed with in_, as the category (before the hyphen) and the name of the source (VideoStore.ECommerce) after it. Because of that, the projection below (the receiver sink of the VideoStore.ContentManagement component) can gather all the events from its category into a single stream that the message processor can subscribe to.
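To make the naming convention concrete, here is a minimal plain-JavaScript sketch. The helpers inputStreamName and categoryOf are hypothetical names of mine, not part of the EventStore API, and the sketch assumes the category is everything before the first hyphen, as described above.

```javascript
// Hypothetical helpers for illustration only; not part of the EventStore API.

// Builds the per-sender input stream name used by the router projection:
// 'in_<destination>-<source>'.
function inputStreamName(destination, source) {
	return 'in_' + destination + '-' + source;
}

// Returns the category of a stream: the part before the first hyphen,
// or null when the name contains no hyphen at all.
function categoryOf(streamName) {
	var idx = streamName.indexOf('-');
	return idx === -1 ? null : streamName.substring(0, idx);
}

var stream = inputStreamName('VideoStore.ContentManagement', 'VideoStore.ECommerce');
var category = categoryOf(stream);
console.log(stream);
console.log(category);
```

One consequence of this convention is that component names must not contain a hyphen themselves, otherwise the category would be cut in the wrong place.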

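// Receiver sink: reads every stream in the category, whoever the sender is.
fromCategory('in_VideoStore.ContentManagement.Retries')
.when({
	$any: function (s, e) {
		// Copy each incoming event into the single input stream.
		emit('VideoStore.ContentManagement.Retries_in', e.eventType, e.data, e.metadata);
	}
})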
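The joining step can also be mimicked outside the EventStore. The following is only an in-memory sketch of the idea, not how the store implements it: the streams object, append, readCategory and joinCategory are all hypothetical, as are the second sender VideoStore.Sales and the target stream name. It runs once over the data, whereas a real projection keeps processing new events as they arrive.

```javascript
// In-memory stand-in for the store: stream name -> array of events.
var streams = {};
var sequence = 0; // global write order across all streams

function append(streamName, eventType, data) {
	if (!streams[streamName]) streams[streamName] = [];
	streams[streamName].push({ seq: sequence++, eventType: eventType, data: data });
}

// Collects the events of every stream named '<category>-<anything>',
// ordered by the moment they were written.
function readCategory(category) {
	var prefix = category + '-';
	var events = [];
	Object.keys(streams).forEach(function (name) {
		if (name.indexOf(prefix) === 0) events = events.concat(streams[name]);
	});
	return events.sort(function (a, b) { return a.seq - b.seq; });
}

// Copies all events of a category into one target stream.
function joinCategory(category, targetStream) {
	readCategory(category).forEach(function (e) {
		append(targetStream, e.eventType, e.data);
	});
	return streams[targetStream] || [];
}

// Two senders, each writing only to its own stream in the receiver's category.
append('in_VideoStore.ContentManagement-VideoStore.ECommerce', 'A', {});
append('in_VideoStore.ContentManagement-VideoStore.Sales', 'B', {});
append('in_VideoStore.ContentManagement-VideoStore.ECommerce', 'C', {});

var joined = joinCategory('in_VideoStore.ContentManagement', 'VideoStore.ContentManagement_in');
var joinedTypes = joined.map(function (e) { return e.eventType; });
console.log(joinedTypes.join(','));
```

Each sender stream has exactly one writer, and the target stream is written only by the join, which is the property the exclusive lock requires.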

This scheme is illustrated in the following picture:

Joining
