
Using EventStore as a messaging platform

It’s November and I am again experimenting with using EventStore as a messaging platform. Last year I built this, with the main focus on researching automated deployment and upgrade of components communicating via pub/sub. This time I am more focused on doing messaging in a way that is most natural to EventStore, trying to leverage its full potential (e.g. projections).

Message processors

At the centre of each messaging agent sits a message processor. It is a stand-alone application (in my case written in C#) that connects to the messaging platform. On one end it receives incoming messages and on the other it outputs messages, either by sending commands or publishing events (both commands and events are messages; the difference is that a command has a destination address while an event does not). I won’t focus for now on how those message processors are implemented. The only important thing at this point is the assumption that all of them connect to the same instance of EventStore, so EventStore acts as a bus in this scenario.
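
The distinction can be pictured with two trivial message contracts. These types and names are hypothetical, used only to illustrate that an event carries no address while a command names the input queue it should be appended to.

using System;

// Hypothetical message contracts, for illustration only.
public class OrderAccepted            // event: no destination, anyone may subscribe to it
{
    public Guid OrderId;
}

public class AcceptOrder              // command: addressed to one component's input queue
{
    public Guid OrderId;
    public string DestinationQueue = "comp1_in";
}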

[Figure: MessageProcessor]

Receiving

On the receive side each component maintains an input stream/queue (I will use these two terms interchangeably: technically it is an EventStore stream, but in our scenario it acts as a queue). The following picture illustrates how messages get into this queue.

[Figure: Receive]

First, other components might send messages directly to our component. In such a case they need to put (append, in EventStore lingo) a message into our input queue. Quite easy. But how about the case when we want to subscribe to messages (events) published by some other component? This is the responsibility of the Subscription Projection. I could not find a matching Enterprise Integration Patterns name for this guy, and after some discussion on Twitter the name I like best is Message Sink. The idea is that it moves messages from various output queues of other components into our input queue. The Subscription Projection also plays the role of a Message Filter, as from each output stream it takes only the messages our component is interested in.

options({
    reorderEvents: true
});
fromStreams(['comp2_out', 'comp3_out'])
.when({
    'NServiceBus.AddIn.Tests.EventStoreSubscriptionManagerTests+Event1' /*comp2_out*/: function (s, e) {
        linkTo('comp1_in', e);
    },
    'NServiceBus.AddIn.Tests.EventStoreSubscriptionManagerTests+Event2' /*comp3_out*/: function (s, e) {
        linkTo('comp1_in', e);
    }
});

Sending and publishing

Let’s face the truth: EventStore is not a message queueing product, and this fact of life can make some things more difficult than they usually are. An example is receiving and sending messages in one transaction, which simply cannot be done as there is no native concept of receiving in EventStore. We need to rely on our message handlers being smart enough to handle this problem. There are two basic rules we need to follow:

  • message handlers are idempotent, which means processing the same logical message more than once does not change the outcome
  • message handlers can detect when a message is being re-processed and in such a case not only ensure that no new updates are made (idempotency) but also re-send and re-publish all the messages that were supposed to be generated during this message processing

These two rules ensure that, given an infinite number of retries, the message will eventually be processed and all the messages that result from processing it will be sent/published at least once. The simple variant of this scheme is shown in the following picture.

[Figure: NonTransactionalSend]

The component sends and publishes each message separately, which means that some recipients might get their messages long before others if the component goes down in the middle of message processing. It also means that some messages might be sent even though the internal data store update has failed for some reason. The eventual consistency guarantee here is very loose, but it can still be useful in some cases.
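
A rough sketch of what the two rules above can look like inside a message handler is shown below. The handler, the in-memory stores and the message shape are all illustrative stand-ins, not part of any particular library; the only point is the shape: detect re-processing, skip the update, but still re-send whatever was produced the first time.

using System;
using System.Collections.Generic;

// Illustrative only: the in-memory stores stand in for whatever persistence the component really uses.
public class PlaceOrderHandler
{
    // Outgoing messages remembered per incoming message id (rule 2).
    private readonly Dictionary<Guid, List<object>> processed = new Dictionary<Guid, List<object>>();
    // Business state; adding an already placed order changes nothing (rule 1).
    private readonly HashSet<Guid> placedOrders = new HashSet<Guid>();

    public void Handle(Guid incomingMessageId, Guid orderId)
    {
        List<object> alreadyProduced;
        if (processed.TryGetValue(incomingMessageId, out alreadyProduced))
        {
            // Re-processing detected: no new updates, but re-send/re-publish everything produced before.
            foreach (var message in alreadyProduced)
                Send(message);
            return;
        }

        var outgoing = new List<object>();
        if (placedOrders.Add(orderId))   // idempotent state change
            outgoing.Add(new { Type = "OrderPlaced", OrderId = orderId });

        // Record what was produced before sending, so that a crash in between is healed by a retry.
        processed[incomingMessageId] = outgoing;
        foreach (var message in outgoing)
            Send(message);
    }

    private void Send(object message)
    {
        // Here the message would be appended to its destination or output stream in EventStore.
    }
}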

If your component needs a stronger guarantee, EventStore transactions can be used. The following picture presents the transactional send scheme.

[Figure: TransactionalSend]

All the messages (both commands and events) resulting from processing an incoming message are sent in one transaction to an intermediate queue. This queue is an input for a routing projection (Message Router pattern) that forwards command messages to their destination queues and forwards event messages to the actual output queue.

fromStream('comp2_intermediate_out')
.when({
	$any: function (s, e) {
		if (typeof e.metadata.destinationStream !== "undefined") {
			emit(e.metadata.destinationStream, e.eventType, e.data, e.metadata);
		} else {
			emit('comp2_out', e.eventType, e.data, e.metadata);
		}
	}
})
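
On the sending side, appending all the resulting messages to the intermediate queue in one transaction could look roughly like this with the EventStore .NET client (EventStore.ClientAPI). This is a sketch only: the stream names follow the projection above, while the message types and payloads are made up.

using System;
using System.Text;
using System.Threading.Tasks;
using EventStore.ClientAPI;

public static class TransactionalSend
{
    public static async Task SendAsync(IEventStoreConnection connection)
    {
        var transaction = await connection.StartTransactionAsync("comp2_intermediate_out", ExpectedVersion.Any);

        // A command: its metadata names the destination stream, so the router forwards it there.
        await transaction.WriteAsync(new EventData(
            Guid.NewGuid(), "DoSomething", true,
            Encoding.UTF8.GetBytes("{\"value\":42}"),
            Encoding.UTF8.GetBytes("{\"destinationStream\":\"comp3_in\"}")));

        // An event: no destination, so the router moves it to the component's output stream.
        await transaction.WriteAsync(new EventData(
            Guid.NewGuid(), "SomethingHappened", true,
            Encoding.UTF8.GetBytes("{\"value\":42}"),
            Encoding.UTF8.GetBytes("{}")));

        // Either both messages become visible to the projection, or neither does.
        await transaction.CommitAsync();
    }
}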

Sending and publishing, event sourced

The previous section assumed that a message processor uses a traditional data store to persist data and a messaging library API to send or publish messages. In case it uses the event sourcing approach, publishing events is more of a persistence concern than a messaging one. In such a scenario, processing a message results in one or more events being appended (directly via the EventStore API) to a stream corresponding to the aggregate that processed the message. We could relax this rule and also allow command messages to be put into such a stream, although these would not be used when reconstituting an aggregate. You can now see that we have here almost exactly the same situation as with the transactional send/publish.

[Figure: EventSourced]

The only difference is that in the normal transactional send we have one intermediate queue, while here we have any number of aggregate streams. With EventStore we can handle this easily by having a projection whose input is all the streams of some category.

fromCategory('comp2_aggregate')
.when({
	$any: function (s, e) {
		if (typeof e.metadata.destinationStream !== "undefined") {
			emit(e.metadata.destinationStream, e.eventType, e.data, e.metadata);
		} else {
			emit('comp2_out', e.eventType, e.data, e.metadata);
		}
	}
})
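
For completeness, the write side of this variant could look as follows with the EventStore .NET client. Again a sketch only: the aggregate’s new events go straight into its own stream, named so that the fromCategory projection above picks them up; the event type and payload are made up.

using System;
using System.Text;
using System.Threading.Tasks;
using EventStore.ClientAPI;

public static class EventSourcedPublish
{
    public static Task AppendAsync(IEventStoreConnection connection, Guid aggregateId, long loadedVersion)
    {
        var evt = new EventData(
            Guid.NewGuid(),
            "OrderAccepted",                                    // hypothetical domain event
            true,
            Encoding.UTF8.GetBytes("{\"orderId\":\"" + aggregateId + "\"}"),
            Encoding.UTF8.GetBytes("{}"));                      // no destinationStream: it is an event

        // The stream name follows the category-dash-id convention ('comp2_aggregate-...').
        // Passing the version the aggregate was loaded at doubles as an optimistic concurrency check.
        return connection.AppendToStreamAsync("comp2_aggregate-" + aggregateId, loadedVersion, evt);
    }
}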

Do we still need an OS anyway?

Years pass by, every now and then a new user interface paradigm is created, but on the server side nothing really changes. No matter if we use Java, .NET or Ruby, the same old story is repeated again and again. A packet comes in, it goes to some kernel-level TCP library, then an HTTP library or a whole web server, to finally reach the managed world of a virtual machine. During its processing a considerable amount of garbage gets generated and left to be collected later by the GC. This whole interaction happens under the control of an operating system that hosts a myriad of other processes (including a graphical user interface). The OS itself is virtual, working under the control of a hypervisor.

Multiple layers of indirection cause heavy indeterminism. And I am not even mentioning the metal level, where x86 instructions are mapped to actual RISC instructions. I think we are slowly reaching the limits of such an approach. We can’t increase the complexity of the architecture indefinitely. At some point we need to remove one moving part in order to be able to add another.

What can we do? Let’s imagine the runtime environment of the future. It does not need an OS. It boots directly as a guest VM. Because it is a virtual machine that runs managed code, it needs neither memory protection nor processes to achieve proper isolation. It can statically verify that the code does not violate the security restrictions on memory access (as implemented in Singularity). As RAM is super cheap these days, there is no need for any form of memory paging.

When a request comes in, it is redirected to a lightweight process (consisting of a thread of control, a stack and a heap) that has both the HTTP library and the user code loaded. After the request is handled, the whole process is destroyed and its heap disposed of. No garbage accumulates over time, hence no ‘stop the world’ collection is ever necessary.

Everything happens inside one address space that runs in kernel mode (as the whole VM runs as the OS). There is no need for any device drivers other than one for the network adapter. The number of moving parts is reduced to a minimum.

Because there is no OS and hence no shell of any form, how do we start our server program? We don’t. The VM image contains the actual VM binary, the user code binaries and all the third-party binaries it references. The image is assembled at build time and it automatically boots the user code when started.

I would love to see this implemented with the CLR and C#, but it does not seem possible. There is, however, a similar concept for Erlang (Erlang on Xen), and I think it would be relatively easy to implement such a thing with node/V8 as the VM. What do you think?


Event sourced process managers

In the comments to one of my previous posts Jarek pointed out that the set of concepts I use (based on excellent work by Rinat Abdullin) reminds him, rightfully, of the Boundary-Control-Entity pattern. A much better summary of the pattern (and related patterns) than I would ever be capable of writing has already been provided by Uncle Bob. TL;DR: put technology-agnostic things in the centre and technology-aware things on the outside, and use the Dependency Inversion Principle to wire things up.

What I want to clarify is the place of the Control component in my model. Intuitively it seems like a Command Handler plays this role, but that turns out to be impossible for a couple of reasons. A Command Handler is only responsible for translating commands (messages) to method calls on Aggregates. It can’t even touch more than one aggregate when processing a command (due to scalability requirements). Last but not least, it is stateless. So if not a Command Handler, then what?

Meet the Process Manager (a.k.a. Saga). A Process Manager coordinates long-running processes by means of message exchange. In some technology stacks (e.g. NServiceBus) Sagas are a built-in mechanism that programmers can use out of the box. In my custom-built stack, however, a Saga is just another Aggregate. There is absolutely no difference in technology. The difference is in behaviour. While normal Aggregates are usually passive (they execute the business logic and emit events for state changes), a Process Manager is active in the sense that it usually expects a response to the events it emits. A Process Manager-as-an-aggregate needs to be accompanied by a bunch of helper components to perform its job, namely:

  • Receptor that reacts to outside events and turns them into commands understood by a Process Manager
  • Command Handler that translates those commands to method calls
  • Gateway that translates Process Manager’s state changes (events) into commands sent to other Aggregates

And so we arrive at Jarek’s question: why do we need Gateways at all? We could just as well use a Receptor on the receiving Aggregate’s side, right? Their raison d’être is subtle. In the case of a Process Manager, the responsibility for generating a command (an order, if you will) is on the PM’s side and the receiving Aggregate can’t ignore it. If we used a Receptor (logically belonging to that other Aggregate), it would mean that the PM’s event can be safely ignored.
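
To make the distinction concrete, here is a minimal sketch. All the event, command and class names are hypothetical; they only illustrate on whose side the translation happens.

using System;

// Hypothetical message types.
public class InvoicePaid { public Guid InvoiceId; }        // event published by another component
public class StartSettlement { public Guid InvoiceId; }    // command understood by the Process Manager
public class SettlementStarted { public Guid InvoiceId; }  // event emitted by the Process Manager
public class BookSettlement { public Guid InvoiceId; }     // command addressed to another Aggregate

// Receptor: sits on the Process Manager's side and reacts to outside events;
// the PM is free to decide that an event is not interesting and ignore it.
public class SettlementReceptor
{
    public StartSettlement React(InvoicePaid e)
    {
        return new StartSettlement { InvoiceId = e.InvoiceId };
    }
}

// Gateway: also owned by the Process Manager, but it translates the PM's own state changes
// into commands (orders) for other Aggregates, and a command, unlike an event, cannot be ignored.
public class SettlementGateway
{
    public BookSettlement Translate(SettlementStarted e)
    {
        return new BookSettlement { InvoiceId = e.InvoiceId };
    }
}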


Event sourcing and prototyping

Recently I started working on a completely new project. The client insisted on having a separate analysis and design phase before the main project starts, so we are now writing lots of documents. Although I don’t like this kind of approach, I kind of understand them. It is their first project with our company, so they don’t have any reason to trust us.

Why is it that I don’t like such an analysis and design phase? Simply because you can’t get the architecture right just by reading the requirements document. You need to actually see it working to evaluate various trade-offs. That’s why I like the agile approach — doing just enough design when it’s necessary and adapting as new requirements are implemented (or even discovered).

One of the trade-offs I found particularly difficult to verify was whether we should use the event sourcing pattern as the default persistence approach or stick to a classic relational database. Before you tell me that persistence is an implementation detail of each service, think about what consequences a lack of standardisation would have here. This is a greenfield project that will consist of about ten services (or bounded contexts, if you will). I would argue that in such a case having one suboptimal persistence approach that most services use is much better than having each service use an optimal mechanism and then dealing with maintaining five different data stores (think about monitoring, backups, migrations etc.).

Back to the main subject: use event sourcing or not? Or let me rephrase the question: are the business requirements easy to model using the event sourcing approach (compared to an object-oriented1 approach)?

I cannot answer this question just by looking at the requirements. I need to actually see it working (or not working). I decided to use the following set of primitives (based on my previous research):

  • Receptor — transforming events to commands
  • Command handler — transforming commands to aggregate method calls
  • Aggregate — implementing business logic, emitting events for state changes
  • Views — providing lookups based on processed events

Sagas (or, more precisely, process managers) are no different from aggregates in this model. With these tools in my hands, in just two days I was able to build a walking skeleton which can run one of the most complex processes in the new system (matching, fulfilling, charging fees and generating accounting entries for a purchase order). I was actually surprised by my productivity, so I spent a moment trying to reflect on what happened.

Event sourcing is great for prototyping because it makes you focus on behaviour rather than state. You start with just enough state to allow for proper behaviour. In my case I deliberately omitted currencies in the first draft; the command-event chains were not affected by them. Only when I thought the model was fine did I add currencies to my amounts to make it more realistic. You start with commands and events and then form aggregates where consistency is needed. Later you can easily move aggregates around, remove some and add new ones, usually without changes in commands and events.

After two days I am pretty sure we’ll stick with event sourcing as the default approach in the system. Of course having specified a default approach does not mean we won’t use any other way when necessary. Hell, we may even use Microsoft CRM in one bounded context!


1 Notice how I contrasted event sourcing with object-oriented. The fact that I use C# to code it does not imply the code is actually object-oriented. In fact event sourcing is a purely functional approach defined by two functions: f(state, command) → event and f(state, event) → state. For details, ask Greg Young or Jérémie Chassaing.
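
A sketch of those two functions in C#, with a deliberately trivial account balance as the state; all type and member names here are made up, only the shape of the two functions matters.

using System.Collections.Generic;
using System.Linq;

public static class EventSourcedAccount
{
    // Illustrative state, command and event types.
    public class State { public decimal Balance; }
    public class Deposit { public decimal Amount; }      // command
    public class Deposited { public decimal Amount; }    // event

    // f(state, command) -> events: decide what happened, without mutating anything.
    public static IEnumerable<Deposited> Decide(State state, Deposit command)
    {
        yield return new Deposited { Amount = command.Amount };
    }

    // f(state, event) -> state: produce the next state from the previous one and the event.
    public static State Evolve(State state, Deposited e)
    {
        return new State { Balance = state.Balance + e.Amount };
    }

    // The current state is just a left fold of the event history over Evolve.
    public static State Replay(IEnumerable<Deposited> history)
    {
        return history.Aggregate(new State(), Evolve);
    }
}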


Modelling accounts, event driven part 2

In the previous post I discussed various approaches to modelling account operations. The outcome of this discussion was the insight that, when using event sourcing, aggregates are the derived concept, with the primary concept being the domain invariants. I used a money transfer as an example. As Maksim pointed out, in such a scenario there is usually another invariant, which was missing in my model:

The balance of an account should not be less than zero

It is clear that this invariant cannot be satisfied by our Transaction aggregate. We must re-introduce the Account as an aggregate. By the way, what do you think about the new requirement? It is pretty specific, isn’t it? This might be a sign that we are missing a more general concept in the model. How about this:

Account can reject participation in a transaction

In Domain-Driven Design this is called refactoring towards deeper insight. Thanks to it we can now implement all kinds of accounts, e.g. credit-only accounts, debit-only accounts (useful on the system boundary), credit card accounts (accounts whose balance cannot be positive) and many more, and we have a much better understanding of the domain concepts.

Let’s try to sketch the event-command chain for a successful transaction that would honour both consistency requirements:

  1. The Transfer command creates a new instance of a Transaction aggregate. As a result, a Prepared event is emitted.
  2. The Account receptor reacts to the Prepared event by emitting two commands, one for each account involved in the transaction: PrepareDebit and PrepareCredit.
  3. The destination Account processes the PrepareCredit command based on its internal state and rules. In the case of a normal (non-negative balance) account, it does nothing. As a result, a CreditPrepared event is emitted.
  4. The source Account processes the PrepareDebit command based on its internal state and rules. In the case of a normal (non-negative balance) account, it decrements the available funds value. As a result, a DebitPrepared event is emitted. Alternatively, a DebitPreparationFailed event can be emitted if the account rejects participating in the transaction.
  5. The Transaction receptor reacts to the CreditPrepared and DebitPrepared events by emitting appropriate notification commands.
  6. The Transaction aggregate processes the notification. If any account rejected the operation, it immediately emits a TransactionCancelled event; accounts which successfully prepared themselves for this transaction should react to it by undoing any changes. When an account completes preparation, the appropriate Confirmed event is emitted to update the Transaction state. If both accounts confirmed, a pair of Debited/Credited events is emitted in addition to the Confirmed event. All three events are emitted in one transaction, ensuring that the funds transfer is done atomically.
  7. The Account receptor reacts to the Debited and Credited events by emitting appropriate notification commands.
  8. The destination Account processes the notification command based on its internal state and rules. In the case of a normal (non-negative balance) account, it increments both the available funds and the balance values.
  9. The source Account processes the notification command based on its internal state and rules. In the case of a normal (non-negative balance) account, it decrements the balance value.

The double-entry accounting rule is satisfied by the Transaction aggregate emitting both the Credited and Debited events in one transaction, while the veto requirement (a generalisation of the non-negative balance invariant) is satisfied by using a variation of the two-phase commit protocol. Despite the fact that the whole interaction is asynchronous (events causing commands to be sent to other aggregates), the externally visible state of the system is always consistent: either there are no entries or there is a pair of Credited and Debited entries. A consequence of eventual consistency is that, in theory, if the settlement phase is lagging for some reason, the available funds value is lower than it could be. This means there might be some false-positive rejections, which is of course not desirable, but it shows how important monitoring is in asynchronous systems.
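
A sketch of how a normal (non-negative balance) Account from the chain above could track the two values. The event names follow the post; the structure of the class, the method names and the use of anonymous objects as events are illustrative only.

using System;

// Illustrative aggregate: decisions return events, state changes happen only in Apply,
// which is also what would run when replaying the account's stream.
public class Account
{
    private decimal balance;
    private decimal availableFunds;

    // Veto point of the two-phase protocol: the account may reject participation in a transaction.
    public object PrepareDebit(Guid transactionId, decimal amount)
    {
        if (availableFunds < amount)
            return new { Type = "DebitPreparationFailed", TransactionId = transactionId };
        return new { Type = "DebitPrepared", TransactionId = transactionId, Amount = amount };
    }

    // Applying events is where the numbers actually move.
    public void Apply(string eventType, decimal amount)
    {
        switch (eventType)
        {
            case "DebitPrepared":
                availableFunds -= amount;   // reserve the funds during preparation
                break;
            case "Debited":
                balance -= amount;          // settlement of the source account
                break;
            case "Credited":
                balance += amount;          // settlement of the destination account
                availableFunds += amount;
                break;
        }
    }
}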
