Posts tagged messaging

Using EventStore as a messaging platform – Joining streams

In my previous post I sketched how one could implement a messaging solution on top of the EventStore. This time I’d like to elaborate on one implementation detail. The code I presented in the previous post silently assumed that more than one projection can emit events to a single stream. This is actually not the case. A projection claims an exclusive lock on the target stream, and if anybody else (another projection or an external client) writes there, nasty errors happen. This is by design because a projection needs to control the stream version, e.g. in order to handle failures. What can we do then to allow multiple other components to send messages to our input queue/stream?

We can solve this problem, like any software-related one, by introducing another layer of abstraction. We can have each component write to its own queue/stream and have an additional projection that joins the incoming streams. The problem here is that we can’t just use fromStreams('A','B','C') as a source of events because we, as a receiver, don’t know the number and names of the incoming streams. The way to solve this is to use categories. What are they? If the name of a stream contains a hyphen, the part before the hyphen is treated in a special way, as the name of the category the stream belongs to. This alone is of course not enough. There is a special built-in projection, $by_category, which allows other projections to use fromCategory('SomeCategory') as a source of events. Here’s a sample of how it works:

fromStream('VideoStore.ECommerce_intermediate_out')
.when({
	$any: function (s, e) {
		if (typeof e.metadata.destinationComponent !== 'undefined') {
			linkTo('in_'+e.metadata.destinationComponent+'-VideoStore.ECommerce', e);
		} else {
			emit('VideoStore.ECommerce_out', e.eventType, e.data, e.metadata);
		}
	}
})

The above projection is the router projection of the VideoStore.ECommerce component. As you can see, if a message specifies its destination (a command message), it is routed to a stream whose name consists of two parts: the name of the destination component as a category (before the hyphen) and the name of the source (VideoStore.ECommerce). Because of that, the projection below (the receiver sink of the VideoStore.ContentManagement component) can gather all the events from its category into a single stream that the message processor can subscribe to.

fromCategory('in_VideoStore.ContentManagement.Retries')
.when({
	$any: function (s, e) {
		emit('VideoStore.ContentManagement.Retries_in', e.eventType, e.data, e.metadata);
	}
})

This scheme is illustrated in the following picture:

Joining
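To make the naming convention concrete, here is a small sketch in plain JavaScript (it runs outside EventStore and is not projection code). It assumes, as described above, that the category is everything before the first hyphen. The helper names categoryOf and inputStreamFor and the second sender, VideoStore.Billing, are made up for illustration.

```javascript
// Assumption: the category is the part of the stream name before the first hyphen.
function categoryOf(streamName) {
    var idx = streamName.indexOf('-');
    return idx === -1 ? null : streamName.substring(0, idx);
}

// Hypothetical helper that builds the target stream name the same way
// the router projection above does: 'in_' + destination + '-' + source.
function inputStreamFor(destinationComponent, sourceComponent) {
    return 'in_' + destinationComponent + '-' + sourceComponent;
}

// Two different senders writing to the same destination component.
// VideoStore.Billing is a made-up component name.
var fromECommerce = inputStreamFor('VideoStore.ContentManagement', 'VideoStore.ECommerce');
var fromBilling = inputStreamFor('VideoStore.ContentManagement', 'VideoStore.Billing');

console.log(fromECommerce);             // in_VideoStore.ContentManagement-VideoStore.ECommerce
console.log(categoryOf(fromECommerce)); // in_VideoStore.ContentManagement
console.log(categoryOf(fromBilling));   // in_VideoStore.ContentManagement
```

Each sender gets its own stream, so no two writers compete for one stream, yet both streams fall into the same category and a single fromCategory projection on the receiving side can join them.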


Using EventStore as a messaging platform

It’s November and I am again experimenting with using EventStore as a messaging platform. Last year I built this, with the main focus on researching automated deployment and upgrade of components communicating via pub/sub. This time I am more focused on doing messaging in a way that is most natural to EventStore, trying to leverage its full potential (e.g. projections).

Message processors

In the centre of each messaging agent sits a message processor. It is a stand-alone application (in my case written in C#) that connects to the messaging platform. On one end it receives incoming messages and on the other it outputs messages, either by sending commands or by publishing events (both commands and events are messages; the difference is that a command has a destination address while an event does not). I won’t focus for now on how those message processors are implemented. The only important thing at this point is the assumption that all of them connect to the same instance of EventStore, so EventStore acts as a bus in this scenario.

MessageProcessor

Receiving

On the receive side each component maintains an input stream/queue (I will use these two terms interchangeably, as technically it is an EventStore stream but in our scenario it acts as a queue). The following picture illustrates how messages get into this queue:

Receive

First, other components might send messages directly to our component. In such a case they need to put (append, in EventStore lingo) a message into our input queue. Quite easy. But how about the case when we want to subscribe to messages (events) published by some other component? This is the responsibility of the Subscription Projection. I could not find a matching Enterprise Integration Patterns name for this guy, and after some discussion on Twitter the name I like best is Message Sink. The idea is that it moves messages from various output queues of other components into our input queue. The Subscription Projection also plays the role of a Message Filter, as from each output stream it only takes the messages our component is interested in.

options({
	reorderEvents: true
});
fromStreams(['comp2_out','comp3_out'])
.when({
	// Event1 is published to comp2_out
	'NServiceBus.AddIn.Tests.EventStoreSubscriptionManagerTests+Event1': function (s, e) {
		linkTo('comp1_in', e);
	},
	// Event2 is published to comp3_out
	'NServiceBus.AddIn.Tests.EventStoreSubscriptionManagerTests+Event2': function (s, e) {
		linkTo('comp1_in', e);
	}
})

Sending and publishing

Let’s face the truth, EventStore is not a message queueing product and this fact of life can make some things more difficult than they usually are. An example would be receiving and sending messages in one transaction which simply cannot be done as there is no native concept of receiving in the EventStore. We need to rely on our message handlers to be smart enough to handle this problem. There are two basic rules we need to follow:

  • message handlers are idempotent, which means processing the same logical message more than once does not change the outcome
  • message handlers can actually detect when the message is re-processed and in such case not only ensure that no new updates are done (idempotency) but also do re-send and re-publish all the messages that were supposed to be generated during this message processing

These two rules ensure that, given an infinite number of retries, eventually the message will be processed and all the messages that result from processing it will be sent/published at least once. The simple variant of this scheme is shown in the following picture.

NonTransactionalSend

The component sends and publishes each message separately, which means that some recipients might get their messages long before others if the component goes down in the middle of message processing. It also means that some messages might be sent even though the internal data store update has failed for some reason. The eventual consistency guarantee here is very loose, but for some cases it can still be useful.
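The two rules for message handlers can be sketched roughly as follows. This is only an illustration in plain JavaScript with in-memory stand-ins: the processed map, the balance state, the send function and the BalanceChanged message type are all made up, and a real component would keep the record of processed messages in its durable data store, updated together with the business data.

```javascript
// In-memory stand-ins (assumptions, not part of any real API).
var processed = {};   // incoming message id -> outgoing messages produced for it
var balance = 0;      // the "internal data store"
var sent = [];        // everything handed to the bus, duplicates included

function send(message) {
    sent.push(message);
}

function handle(message) {
    var outgoing = processed[message.id];
    if (outgoing === undefined) {
        // First delivery: update the state and record what has to be sent.
        balance += message.amount;
        outgoing = [{ id: message.id + '/1', type: 'BalanceChanged', balance: balance }];
        processed[message.id] = outgoing;
    }
    // First delivery or re-delivery: (re-)send everything that was recorded.
    // Ids derived from the incoming id let receivers recognise duplicates.
    outgoing.forEach(send);
}

handle({ id: 'm1', amount: 10 });
handle({ id: 'm1', amount: 10 }); // the same message delivered again after a crash

console.log(balance);     // 10
console.log(sent.length); // 2
```

The second delivery does not change the balance (idempotency) but does repeat the outgoing message, which is what gives the at-least-once guarantee on the sending side.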

If your component needs a stronger guarantee, EventStore transactions can be used. The following picture presents the transactional send scheme.

TransactionalSend

All the messages (both commands and events) resulting from processing an incoming message are sent in one transaction to an intermediate queue. This queue is an input for a routing projection (Message Router pattern) that forwards command messages to their destination queues and forwards event messages to the actual output queue.

fromStream('comp2_intermediate_out')
.when({
	$any: function (s, e) {
		if (typeof e.metadata.destinationStream !== "undefined") {
			emit(e.metadata.destinationStream, e.eventType, e.data, e.metadata);
		} else {
			emit('comp2_out', e.eventType, e.data, e.metadata);
		}
	}
})
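The flow through the intermediate queue can be tried out without EventStore. The sketch below is a plain JavaScript simulation, not projection code: the streams object, the append and route functions and the message types ShipOrder and OrderAccepted are invented for the example, and writing the batch in a loop only stands in for a real EventStore transaction.

```javascript
// In-memory stand-in for EventStore streams (an assumption for illustration).
var streams = {};

function append(stream, event) {
    (streams[stream] = streams[stream] || []).push(event);
}

// Same decision as the router projection above: commands carry a
// destination stream in their metadata, events do not.
function route(event) {
    var destination = event.metadata.destinationStream;
    append(typeof destination !== 'undefined' ? destination : 'comp2_out', event);
}

// Everything produced while handling one incoming message goes to the
// intermediate queue as one batch (a transaction in the real system).
var batch = [
    { eventType: 'ShipOrder', data: {}, metadata: { destinationStream: 'comp1_in' } },
    { eventType: 'OrderAccepted', data: {}, metadata: {} }
];
batch.forEach(function (e) { append('comp2_intermediate_out', e); });

// The router then forwards each message to its final stream.
streams['comp2_intermediate_out'].forEach(route);

console.log(streams['comp1_in'][0].eventType);  // ShipOrder
console.log(streams['comp2_out'][0].eventType); // OrderAccepted
```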

Sending and publishing, event sourced

The previous section assumed that a message processor uses a traditional data store to persist data and a messaging library API to send or publish messages. If it uses the event sourcing approach instead, publishing events is more of a persistence concern than a messaging one. In such a scenario, processing a message results in one or more events being appended (directly via the EventStore API) to the stream of the aggregate that processed the message. We could relax this rule and also allow command messages to be put into such a stream, although these would not be used when reconstituting the aggregate. You can now see that we have almost exactly the same situation here as with transactional send/publish.

EventSourced

The only difference is that in the normal transactional send we have one intermediate queue and here we have any number of aggregate streams. With EventStore we can handle this easily by having a projection whose input are all streams of some category.

fromCategory('comp2_aggregate')
.when({
	$any: function (s, e) {
		if (typeof e.metadata.destinationStream !== "undefined") {
			emit(e.metadata.destinationStream, e.eventType, e.data, e.metadata);
		} else {
			emit('comp2_out', e.eventType, e.data, e.metadata);
		}
	}
})

Greg Young’s Advanced Class

Last week, from Wednesday to Friday, I attended Greg Young’s new advanced class. The event took place in the Pod Wawelem hotel, 50 meters away from the Castle. I haven’t been to Greg’s previous training but I heard it was about Domain-Driven Design and CQRS. I was expecting something similar, but more in-depth. It turned out I was wrong, but certainly not disappointed. The new content is great! Here’s a brief summary.

Day 1 – Alarm clock

We spent almost the whole day discussing one topic: time. We learnt how hard it is to unit test time-dependent logic. Those who have tried putting Thread.Sleep or ManualResetEvent in a test know how painful it is. Then we discovered that every time-dependent piece of logic (e.g. if the cook does not respond in 5 minutes then…) can be transformed into an order-of-messages problem. Instead of waiting for some period of time, we send a message to our future selves that will inform us we should take some action. I don’t have to convince you that sending and receiving messages is super easy to test, right?
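Here is a rough sketch of that idea. It is written in JavaScript rather than the C# used in class, and everything in it (FakeBus, sendLater, advance, the CookTimeout and OrderCooked message types) is invented for illustration, not taken from the training material. The point is that the test never sleeps: it advances a fake clock and checks which messages were handled.

```javascript
// Hypothetical in-memory bus with a manually advanced clock; no real timers.
function FakeBus() {
    this.now = 0;
    this.pending = [];   // messages scheduled for the future
}

// "Send a message to our future selves": deliver `message` after `delay` ms.
FakeBus.prototype.sendLater = function (delay, message) {
    this.pending.push({ dueAt: this.now + delay, message: message });
};

// Move the clock forward and deliver everything that has become due.
FakeBus.prototype.advance = function (ms, handler) {
    this.now += ms;
    var now = this.now;
    var due = this.pending.filter(function (p) { return p.dueAt <= now; });
    this.pending = this.pending.filter(function (p) { return p.dueAt > now; });
    due.forEach(function (p) { handler(p.message); });
};

var cooked = {};    // orderId -> true once the cook has responded
var retried = [];   // orders for which the timeout fired first

// The logic under test only cares about the order of messages.
function onMessage(message) {
    if (message.type === 'OrderCooked') {
        cooked[message.orderId] = true;
    } else if (message.type === 'CookTimeout' && !cooked[message.orderId]) {
        retried.push(message.orderId);
    }
}

var bus = new FakeBus();
var FIVE_MINUTES = 5 * 60 * 1000;
bus.sendLater(FIVE_MINUTES, { type: 'CookTimeout', orderId: 1 });
bus.sendLater(FIVE_MINUTES, { type: 'CookTimeout', orderId: 2 });
onMessage({ type: 'OrderCooked', orderId: 1 });   // the cook answers for order 1 only
bus.advance(FIVE_MINUTES, onMessage);             // "five minutes later", instantly

console.log(retried); // [ 2 ]
```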

To prove it, we spent some time doing one of the well-known katas. I won’t tell you which one so as not to spoil the surprise.

The last assignment of the day was a modelling task. We were modelling the business process of a restaurant. The funny/new/inspiring part of this assignment was that we did it by trying to figure out what a Word template for a document describing an order would look like and how it would be passed between various people in the restaurant. We analysed which parts of the document are filled in by each person and in what order.

Day 2 – Restaurant

We spent the whole day implementing the solution for the restaurant according to the model we had created the day before. We went through quite a few patterns from Enterprise Integration Patterns:

  • document message – that’s how we represented our order
  • adapter – waiter
  • enricher – cook and assistant manager, adding, respectively, preparation time and prices
  • competing consumer – when we have more than one cook
  • message dispatcher – a better (does not require locks) solution for load balancing cooks
  • message broker – centralize setup of queues

Day 3 – More restaurant & Event Store

Before lunch we managed to cover some more advanced messaging patterns, namely:

  • correlation id – how we trace orders
  • routing slip – move responsibility for order flow from broker to a dedicated entity to add flexibility (e.g. normal flow & dodgy customer flow)
  • process manager – when routing slip is not enough (e.g. smart failure handling)
  • publish-subscribe – how to distribute information about changes in order state
  • event message – as above
  • wire tap – centralize diagnostics

Quite impressive, isn’t it? After lunch we started playing with the Event Store. We managed to implement a simple chat application using Event Store’s ATOM APIs. Cool, but the rest was even better. We learnt how all the concepts we had covered during the previous two days can be easily implemented inside the Event Store using Projections. This stuff is really amazing, and I say it despite my antipathy for JavaScript.

Summary

The cost of the training for early birds was just 300 euros. In exchange we got

  • three days of super high quality workshops with over 60 percent of coding time
  • gorgeous view of Wawel Castle during coffee breaks
  • three course lunches every day
  • after-party & beer

If this was not enough, Greg managed to give away three free tickets for CS students. Kudos man, you rock!


Mechanics of durable subscription

<Announcement>

There are still three spots left on Greg Young’s Advanced Class in Krakow (February 6th-8th). Go register here before they are gone.

</Announcement>

Today I’d like to talk a bit about the mechanics of a robust durable subscription system. First, what is a durable subscription? Here’s a description from Enterprise Integration Patterns (I really recommend this book):

Use a Durable Subscriber to make the messaging system save messages published while the subscriber is disconnected.

The goal is not to lose messages while the subscriber is offline (for whatever reason). There are many possible implementations based on the specifics of the messaging system we are working with. The main distinction between them is who, publisher or subscriber, is responsible for maintaining the state of the subscription.

In theory, the mechanics are simple, no matter who’s responsible for the state. If the subscriber goes down, the state is simply not updated while new messages are generated (and stored). When the subscriber is up again, delivery picks up where it left off based on the state and all the missed messages are sent. If the publisher is responsible for the state, the only additional step is an acknowledgement that has to be sent to the publisher every N messages (where N depends on how much you don’t like duplicates).

In practice however, many messaging platforms (including EventStore which is recently my favourite) offer two distinct APIs for implementing subscriptions:

  • push API where a client (subscriber) opens a persistent channel (TCP connection) via which a server (publisher) sends messages immediately as they are generated
  • pull API where a client (subscriber) periodically polls a server (publisher) asking if there are any messages newer than X where X is the current state of the subscription

The advantage of the push API is that it is usually a lot faster (less effort on the publisher side and minimum latency). The problem is that it works only when both parties are up and running and there is no lag. If the subscriber processes messages more slowly than the publisher generates them, sooner or later the buffers on the publisher side become too big to fit in memory. EventStore is smart enough to drop the subscription automatically in such a case.

There is no such problem with the pull API because the client requests messages at its own pace and can even crash every now and then without the publisher ever noticing. This comes at a price, however. The pull API is inherently slower (meaning higher latency) and it requires additional reads on the publisher side.

As usual, the best solution combines both approaches. The idea is simple: when the subscriber is up to date with the message stream, it switches to the push API, and whenever it is not, it switches back to pull. The actual working algorithm is slightly more complex:

  1. The subscriber always starts with pull assuming there were some messages generated while it was offline
  2. The subscriber pulls messages until there’s nothing left to pull (it is up to date with the stream)
  3. Push subscription is started, but arriving messages are not processed immediately; they are temporarily redirected to a buffer
  4. One last pull is done to ensure nothing happened between step 2 and 3
  5. Messages from this last pull are processed
  6. Processing messages from the push buffer is started. While messages are processed, they are checked against the IDs of messages processed in step 5 to ensure there are no duplicates.
  7. The system works in push mode until the subscriber is killed or the publisher drops the push subscription.

This algorithm is the single most important thing in my new project.
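The steps above can be sketched as a small, synchronous simulation. This is not EventStore client code: Publisher, readAfter, subscribe and startSubscriber are hypothetical in-memory stand-ins, and the hooks parameter exists only so the example can publish messages at the awkward moments between steps.

```javascript
// Hypothetical in-memory publisher: a log plus at most one live (push) listener.
function Publisher() {
    this.log = [];
    this.listener = null;
}
Publisher.prototype.publish = function (message) {
    this.log.push(message);
    if (this.listener) { this.listener(message); }
};
// Pull API: everything stored after the given position.
Publisher.prototype.readAfter = function (position) {
    return this.log.slice(position);
};
// Push API: register a callback for newly published messages.
Publisher.prototype.subscribe = function (listener) {
    this.listener = listener;
};

function startSubscriber(publisher, checkpoint, handle, hooks) {
    var position = checkpoint;
    function pull() {
        var batch = publisher.readAfter(position);
        batch.forEach(handle);
        position += batch.length;
        return batch;
    }

    // Steps 1-2: always start with pull and repeat until nothing is left.
    while (pull().length > 0) { }
    hooks.afterCatchUp();

    // Step 3: start the push subscription, but only buffer what arrives.
    var buffer = [];
    var live = false;
    publisher.subscribe(function (m) {
        if (live) { handle(m); } else { buffer.push(m); }
    });
    hooks.afterSubscribe();

    // Steps 4-5: one last pull; remember the ids it processed.
    var seen = {};
    pull().forEach(function (m) { seen[m.id] = true; });

    // Step 6: drain the push buffer, skipping messages already processed.
    buffer.forEach(function (m) { if (!seen[m.id]) { handle(m); } });

    // Step 7: from now on messages are handled as they are pushed.
    live = true;
}

var publisher = new Publisher();
var handled = [];
publisher.publish({ id: 'a' });   // published while the subscriber was offline
startSubscriber(publisher, 0, function (m) { handled.push(m.id); }, {
    afterCatchUp: function () { publisher.publish({ id: 'b' }); },   // between steps 2 and 3
    afterSubscribe: function () { publisher.publish({ id: 'c' }); }  // between steps 3 and 4
});
publisher.publish({ id: 'd' });   // normal push delivery

console.log(handled.join(',')); // a,b,c,d
```

Message b is caught only by the last pull, and message c shows up in both the last pull and the push buffer, so it is the de-duplication in step 6 that keeps it from being handled twice.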
