Posts tagged architecture

Using EventStore as a messaging platform

It’s November and I am again experimenting with using EventStore as a messaging platform. Last year I built this, with the main focus on researching automated deployment and upgrade of components communicating via pub/sub. This time I am more focused on doing messaging in a way that is most natural to EventStore, trying to leverage its full potential (e.g. projections).

Message processors

In the centre of each messaging agent sits a message processor. It is a stand-alone application (in my case written in C#) that connects to the messaging platform. On one end it receives incoming messages and on the other it outputs messages, either by sending commands or publishing events (both commands and events are messages; the difference is that a command has a destination address while an event does not). I won’t focus for now on how those message processors are implemented. The only important thing at this point is the assumption that all of them connect to the same instance of EventStore, so EventStore acts as a bus in this scenario.

MessageProcessor
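To make the shape of such a processor a bit more concrete, here is a minimal C# sketch; the interface and type names are mine, not taken from any specific library.

using System;

// Hypothetical shape of a message processor; all names here are illustrative.
public interface IMessageProcessor
{
    // Invoked for every message that appears in the component's input stream.
    void Handle(IncomingMessage message, IMessagingContext context);
}

public interface IMessagingContext
{
    // Appends a command to the destination component's input stream.
    void Send(string destinationStream, object command);

    // Appends an event to this component's output stream.
    void Publish(object @event);
}

public class IncomingMessage
{
    public Guid Id { get; set; }          // used later for deduplication
    public string Type { get; set; }      // type name of the payload
    public byte[] Data { get; set; }      // serialized payload
    public byte[] Metadata { get; set; }  // headers, e.g. destination stream
}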

Receiving

On the receive side each component maintains an input stream/queue (I will use these two terms interchangeably: technically it is an EventStore stream, but in our scenario it acts as a queue). The following picture illustrates how messages get into this queue.

Receive

First, other components might send messages directly to our component. In such a case they need to put (append, in EventStore lingo) a message into our input queue. Quite easy. But how about the case when we want to subscribe to messages (events) published by some other component? This is the responsibility of the Subscription Projection. I could not find a matching Enterprise Integration Patterns name for this guy and, after some discussion on Twitter, the name I like best is Message Sink. The idea is that it moves messages from the various output queues of other components into our input queue. The Subscription Projection also plays the role of a Message Filter, as from each output stream it takes only the messages our component is interested in.

options({
    reorderEvents: true
});
fromStreams(['comp2_out', 'comp3_out'])
.when({
    // Event1 comes from comp2_out, Event2 from comp3_out; both are linked
    // (not copied) into our component's input queue.
    'NServiceBus.AddIn.Tests.EventStoreSubscriptionManagerTests+Event1': function (s, e) {
        linkTo('comp1_in', e);
    },
    'NServiceBus.AddIn.Tests.EventStoreSubscriptionManagerTests+Event2': function (s, e) {
        linkTo('comp1_in', e);
    }
})

Sending and publishing

Let’s face the truth: EventStore is not a message queueing product, and this fact of life can make some things more difficult than they usually are. An example would be receiving and sending messages in one transaction, which simply cannot be done as there is no native concept of receiving in EventStore. We need to rely on our message handlers being smart enough to handle this problem. There are two basic rules we need to follow:

  • message handlers are idempotent, which means processing the same logical message more than once does not change the outcome
  • message handlers can detect when a message is being re-processed and in such a case not only ensure that no new updates are made (idempotency) but also re-send and re-publish all the messages that were supposed to be generated during the original processing

These two ensure that, given an infinite number of retries, the message will eventually be processed and all the messages resulting from that processing will be sent/published at least once. The simple variant of this scheme is shown in the following picture.

NonTransactionalSend

The component sends and publishes each message separately, which means that some recipients might get their messages long before others if the component goes down in the middle of message processing. It also means that some messages might be sent even though the internal data store update has failed for some reason. The eventual consistency guarantee here is very loose, but it can still be useful in some cases.
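Here is a minimal C# sketch of a handler that follows the two rules above in this simple variant. The IDataStore and message types are hypothetical placeholders (IMessagingContext is the one sketched earlier); the point is that a retried command re-publishes everything the original processing was supposed to produce.

using System;
using System.Collections.Generic;

public class PlaceOrderHandler
{
    private readonly IDataStore store;          // hypothetical persistent store
    private readonly IMessagingContext context; // send/publish API sketched above

    public PlaceOrderHandler(IDataStore store, IMessagingContext context)
    {
        this.store = store;
        this.context = context;
    }

    public void Handle(PlaceOrder command)
    {
        // Idempotency: if this command was already processed, do not touch the
        // store again, but DO re-publish the resulting messages, because the
        // previous attempt might have crashed before sending all of them.
        IReadOnlyList<object> outgoing;
        if (!store.TryGetProcessedResult(command.Id, out outgoing))
        {
            outgoing = new object[] { new OrderPlaced { OrderId = command.OrderId } };
            store.SaveResult(command.Id, outgoing); // single atomic store update
        }

        // Each message goes out separately (no transaction), so any of them
        // may be delivered more than once; recipients must cope with that.
        foreach (var message in outgoing)
            context.Publish(message);
    }
}

public interface IDataStore
{
    // Returns true, and the previously generated messages, if this command ID
    // has already been processed.
    bool TryGetProcessedResult(Guid commandId, out IReadOnlyList<object> messages);

    // Atomically applies the state change and records the generated messages.
    void SaveResult(Guid commandId, IReadOnlyList<object> messages);
}

public class PlaceOrder { public Guid Id; public Guid OrderId; }
public class OrderPlaced { public Guid OrderId; }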

If your component needs a stronger guarantee, EventStore transactions can be used. The following picture presents the transactional send scheme.

TransactionalSend

All the messages (both commands and events) resulting from processing an incoming message are sent in one transaction to an intermediate queue. This queue is the input for a routing projection (the Message Router pattern) that forwards command messages to their destination queues and event messages to the actual output queue.

fromStream('comp2_intermediate_out')
.when({
    $any: function (s, e) {
        // Commands carry a destinationStream in their metadata; events do not
        // and therefore end up in the component's output stream.
        if (e.metadata && typeof e.metadata.destinationStream !== "undefined") {
            emit(e.metadata.destinationStream, e.eventType, e.data, e.metadata);
        } else {
            emit('comp2_out', e.eventType, e.data, e.metadata);
        }
    }
})
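For completeness, a sketch of the sending side, written against the EventStore .NET ClientAPI of that era (StartTransactionAsync / WriteAsync / CommitAsync); exact signatures may differ between client versions, and the OutgoingMessage type is my own placeholder. The destinationStream metadata field is the same one the routing projection above looks at.

using System;
using System.Text;
using System.Threading.Tasks;
using EventStore.ClientAPI;

public static class TransactionalSender
{
    // Appends all outgoing messages atomically to the intermediate stream;
    // the routing projection above then forwards each one based on metadata.
    public static async Task SendAsync(IEventStoreConnection connection,
                                       params OutgoingMessage[] messages)
    {
        using (var tx = await connection.StartTransactionAsync(
                   "comp2_intermediate_out", ExpectedVersion.Any))
        {
            foreach (var message in messages)
            {
                // Commands carry a destinationStream in their metadata,
                // events do not (so the projection routes them to comp2_out).
                // Metadata is serialized by hand here just for brevity.
                var metadata = message.DestinationStream == null
                    ? "{}"
                    : "{\"destinationStream\":\"" + message.DestinationStream + "\"}";

                await tx.WriteAsync(new EventData(
                    Guid.NewGuid(),
                    message.Type,
                    true,
                    Encoding.UTF8.GetBytes(message.JsonBody),
                    Encoding.UTF8.GetBytes(metadata)));
            }
            await tx.CommitAsync();
        }
    }
}

public class OutgoingMessage
{
    public string Type;               // message type name
    public string JsonBody;           // serialized payload
    public string DestinationStream;  // set for commands, null for events
}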

Sending and publishing, event sourced

The previous section assumed that a message processor uses a traditional data store to persist data and a messaging library API to send or publish messages. If it uses the event sourcing approach instead, publishing events is more a persistence concern than a messaging one. In such a scenario, processing a message results in one or more events being appended (directly via the EventStore API) to a stream corresponding to the aggregate that processed the message. We could relax this rule and also allow command messages to be put into such a stream, although these would not be used when reconstituting the aggregate. You can see that we have here almost exactly the same situation as with the transactional send/publish.

EventSourced

The only difference is that in the normal transactional send we have one intermediate queue, while here we have any number of aggregate streams. With EventStore we can handle this easily by having a projection whose input is all the streams of a given category.

fromCategory('comp2_aggregate')
.when({
    $any: function (s, e) {
        // Same routing rule as before, but applied across every aggregate
        // stream in the 'comp2_aggregate' category.
        if (e.metadata && typeof e.metadata.destinationStream !== "undefined") {
            emit(e.metadata.destinationStream, e.eventType, e.data, e.metadata);
        } else {
            emit('comp2_out', e.eventType, e.data, e.metadata);
        }
    }
})

Do we still need an OS anyway?

Years pass by, and every now and then a new user interface paradigm is created, but on the server side nothing really changes. No matter if we use Java, .NET or Ruby, the same old story is repeated again and again. A packet comes in, it goes to some kernel-level TCP library, then to an HTTP library or a whole web server, to finally reach the managed world of a virtual machine. During its processing a considerable amount of garbage gets generated and left to be collected later by the GC. This whole interaction happens under the control of an operating system that hosts a myriad of other processes (including a graphical user interface). The OS itself is virtual, working under the control of a hypervisor.

Multiple layers of indirection cause heavy indeterminism. And I am not even mentioning the metal level, where x86 instructions are mapped to RISC-like micro-operations. I think we are slowly reaching the limits of such an approach. We can’t increase the complexity of the architecture indefinitely. At some point we need to remove one moving part in order to be able to add another.

What can we do? Let’s imagine the runtime environment of the future. It does not need an OS. It boots directly as a guest VM. Because it is a virtual machine that runs managed code, it needs neither memory protection nor processes to achieve proper isolation. It can statically verify that the code does not violate the security restrictions on memory access (as implemented in Singularity). And as RAM is super cheap these days, there is no need for any form of memory paging.

When a request comes in, it is redirected to a lightweight process (consisting of a thread of control, a stack and a heap) that has both the HTTP library and the user code loaded. After the request is handled, the whole process is destroyed and its heap disposed of. No garbage accumulates over time, hence no ‘stop the world’ collection is ever necessary.

Everything happens inside one address space that runs in kernel mode (as the whole VM acts as the OS). There is no need for any device drivers other than one for the network adapter. The number of moving parts is reduced to a minimum.

Because there is no OS and hence no shell of any form, how do we start our server program? We don’t. The VM image contains the actual VM binary, the user code binaries and all the third-party binaries they reference. The image is assembled at build time and it automatically boots the user code when started.

I would love to see this implemented with the CLR and C#, but it does not seem possible. There is, however, a similar concept for Erlang (Erlang on Xen), and I think it would be relatively easy to implement such a thing with node/V8 as the VM. What do you think?


Event sourcing and prototyping

Recently I started working on a completely new project. The client insisted on having a separate analysis and design phase before the main project starts, so we are now writing lots of documents. Although I don’t like this kind of approach, I kind of understand them. It is their first project with our company, so they don’t have any reason to trust us yet.

Why is it that I don’t like such an analysis and design phase? Simply because you can’t get the architecture right just by reading the requirements document. You need to actually see it working to evaluate the various trade-offs. That’s why I like the agile approach: doing just enough design when it’s necessary and adapting as new requirements are implemented (or even discovered).

One of the trade-offs I found particularly difficult to verify was whether we should use the event sourcing pattern as the default persistence approach or stick to a classic relational database. Before you tell me that persistence is an implementation detail of each service, think about what consequences a lack of standardisation would have here. This is a greenfield project that will consist of about ten services (or bounded contexts, if you will). I would argue that in such a case having one suboptimal persistence approach that most services use is much better than having each service use an optimal mechanism and then dealing with maintaining five different data stores (think about monitoring, backups, migrations etc.).

Back to the main subject: use event sourcing or not? Or let me rephrase the question: are the business requirements easy to model using the event sourcing approach (compared to an object-oriented1 approach)?

I cannot answer this question just by looking at the requirements. I need to actually see it working (or not working). I decided to use the following set of primitives (based on my previous research):

  • Receptor — transforming events to commands
  • Command handler — transforming commands to aggregate method calls
  • Aggregate — implementing business logic, emitting events for state changes
  • Views — providing lookups based on processed events

Sagas (or, more precisely, process managers) are no different from aggregates in this model. With these tools in my hands I was able, in just two days, to build a walking skeleton which can run one of the most complex processes in the new system (matching, fulfilling, charging fees and generating accounting entries for a purchase order). I was actually surprised by my productivity, so I spent a moment reflecting on what happened.
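For illustration, here is a minimal C# sketch of how these primitives might be declared. The shapes are my own simplification, not the API of any particular framework.

using System.Collections.Generic;

// Receptor: turns an observed event into zero or more commands.
public interface IReceptor<TEvent>
{
    IEnumerable<object> React(TEvent @event);
}

// Command handler: maps a command to a method call on an aggregate.
public interface ICommandHandler<TCommand>
{
    void Handle(TCommand command);
}

// Aggregate: implements business logic and emits events for state changes.
public abstract class Aggregate
{
    private readonly List<object> uncommitted = new List<object>();
    public IReadOnlyList<object> UncommittedEvents { get { return uncommitted; } }

    protected void Emit(object @event)
    {
        Apply(@event);            // update in-memory state
        uncommitted.Add(@event);  // remember for persistence and publishing
    }

    protected abstract void Apply(object @event);
}

// View: provides lookups based on processed events.
public interface IView<TEvent>
{
    void Project(TEvent @event);
}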

Event sourcing is great for prototyping because it makes you focus on behaviour rather than state. You start with just enough state to allow for proper behaviour. In my case I deliberately omitted currencies in the first draft; the command-event chains were not affected by them. Only when I thought the model was fine did I add currencies to my amounts to make it more realistic. You start with commands and events and then form aggregates where consistency is needed. Later you can easily move aggregates around, remove some and add new ones, usually without changes to commands and events.

After two days I am pretty sure we’ll stick with event sourcing as the default approach in the system. Of course, having a default approach does not mean we won’t use any other way when necessary. Hell, we may even use Microsoft CRM in one bounded context!


1 Notice how I contrasted event sourcing with object-oriented. The fact that I use C# to code it does not imply the code is actually object-oriented. In fact event sourcing is a purely functional approach defined by two functions: f(state, command) → event and f(state, event) → state. For details, ask Greg Young or Jérémie Chassaing.
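A minimal C# sketch of those two functions, using a trivial counter as the state, just to illustrate the shape:

using System.Collections.Generic;
using System.Linq;

public static class Counter
{
    // f(state, command) -> event(s)
    public static IEnumerable<object> Decide(int state, object command)
    {
        if (command is Increment && state < 10)
            yield return new Incremented();
    }

    // f(state, event) -> state
    public static int Evolve(int state, object @event)
    {
        return @event is Incremented ? state + 1 : state;
    }

    // The current state is simply a left fold of the past events.
    public static int Replay(IEnumerable<object> history)
    {
        return history.Aggregate(0, Evolve);
    }
}

public class Increment { }
public class Incremented { }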


Event sourcing the past and today

It’s been quite some time since I first read about event sourcing on Mark Nijhof‘s blog (Mark has moved his blog several times since then). My understanding of the subject has been evolving for a few years, as has that of other people in the community. Greg Young, the father of the event sourcing and CQRS ideas, might tell you that the biggest mistake the community made during these years was building frameworks. To some extent I agree, but on the other hand I think that bashing all frameworks is like throwing out the baby with the bathwater.

Problems with frameworks

The biggest problem with the CQRS frameworks of the past (and I know this because I contributed to one of them) was, in my opinion, the fact that we focused on the wrong things. We, as a community, didn’t understand the whole concept well enough. Sure, there were always people who did and who tried to warn the others, but we were not listening because we were too busy coding The Most Beautiful And Complete Implementation Of Aggregate. A standard CQRS framework from the past looked like this:

The most important concept is the aggregate. This is logical, right? It is the place where business logic lives, so it must be the most important concept. Aggregates produce events, which are also quite important because they are used to maintain the state of the aggregate. The events themselves are stored in an event store. In those early days there was no mature ready-made event store implementation, so a framework had to include one. Event versioning and commands managed to attract some attention, but not a lot. In the case of commands, this attention was mostly focused not on the commands themselves but rather on how we handle them. Concepts like messages, integration, duplication/transactions and read models were in most cases out of scope.

One can summarize this approach as storing business object state by serializing state-mutating method calls. We surely missed the whole point.

Value of frameworks

This is the baby and bathwater part. Although, as I wrote, we missed the whole point, I still see value in what we created. Frameworks do lower the entry barrier for people who want to try a new approach. NCQRS, a framework I contributed to (kudos to Pieter for building it), did help some people grasp the concept of event sourcing. Frameworks also (at least the good ones) remove the need to write boilerplate code and let us focus on the business problem we are trying to solve. In the case of event sourcing and CQRS, the problem was that we initially misidentified the areas where a framework would have the most value. Frameworks provided ready-made solutions for things that most people wanted to be custom-made, while failing to help with tough and/or generic problems such as:

  • read models
  • integration
  • transactions and coordination

A better approach

While I won’t say that I know The Best Approach To CQRS now, I would argue that I at least know one that is better than the ones we used in the past. The diagram of today’s framework should look like this:

The whole thing is more balanced. No longer does a single concept dominate the whole picture. The event store has been moved out of the scope of the framework, as we have at least two production-ready implementations, provided by Jonathan Oliver and Greg Young. The former is an abstraction layer built on top of other data stores (e.g. an RDBMS) while the latter is an event store built from the ground up.

The key concept is the message: an atomic piece of information we can package and send between processes, machines and systems. Not all messages are created equal, though. A smart framework would distinguish commands (messages that tell the recipient to perform some action) from events (ones that inform everybody that something has happened). Handling duplication of messages is key to building a robust message-driven system. A good framework should provide a solution for this.

Event versioning should also get some attention, as it is critical for the maintainability of the system in the long run. Another interesting bit is integration: how do we communicate with other systems? The general answer is that we allow them to read our event store. This requires, however, that we have a durable subscription feature somewhere in our stack, either within the store or within the framework.
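As an illustration, this is roughly what a durable subscription boils down to: a catch-up reader built on the ClientAPI’s ReadStreamEventsForwardAsync that remembers its position in a persistent checkpoint store (a hypothetical abstraction here); exact client signatures vary between versions.

using System;
using System.Threading.Tasks;
using EventStore.ClientAPI;

public class CatchUpReader
{
    private readonly IEventStoreConnection connection;
    private readonly ICheckpointStore checkpoints; // hypothetical: persists the last processed position

    public CatchUpReader(IEventStoreConnection connection, ICheckpointStore checkpoints)
    {
        this.connection = connection;
        this.checkpoints = checkpoints;
    }

    // Reads the publisher's stream in batches, starting from the last durably
    // stored position, so processing can resume after a restart.
    public async Task CatchUpAsync(string stream, Func<ResolvedEvent, Task> handle)
    {
        var next = checkpoints.Load(stream);
        StreamEventsSlice slice;
        do
        {
            slice = await connection.ReadStreamEventsForwardAsync(stream, next, 200, true);
            foreach (var @event in slice.Events)
            {
                await handle(@event);
                checkpoints.Save(stream, @event.OriginalEventNumber + 1);
            }
            next = slice.NextEventNumber;
        } while (!slice.IsEndOfStream);
    }
}

public interface ICheckpointStore
{
    long Load(string stream);
    void Save(string stream, long position);
}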

If we are building a CQRS framework, there needs to be decent support for the read side as well. Problems like automatically rebuilding views when the code changes are the ones a framework should be able to take care of.

The last thing in the picture is the aggregate. Noticed the difference from the previous diagram? In the modern approach it has a servant role. In the modelling phase aggregates are a corollary of command-event chains: we simply group related command-event pairs into aggregates according to their consistency requirements. In the implementation phase aggregates provide deduplication capabilities, which are essential to most business logic.

And beyond

All the things I’ve just mentioned can be built in less than a month by a single person, so the effort can easily be accommodated by even a medium-sized project. So far not much value added, at least for people who know messaging and event sourcing inside out. True, but the thing I described, if done right, would be a very solid platform on top of which we can build value-added services. I am thinking of things like:

  • enforcing SOA/EDA architecture using best practices preached by Udi Dahan
  • being able to tell the software to upgrade service A from version X to Y and having all dependencies upgrade automatically (and one-by-one, to limit any downtime)
  • monitoring any aspect of both the infrastructure and the user code running on top of it
  • and many, many others

Really Simple Architecture components

In the previous post I mentioned the idea of a Really Simple Architecture. This time I’d like to discuss the components of RSA in more detail. Because the whole concept is still evolving in my mind, by no means treat it as any form of reference architecture. At least not yet ;-) This is just a thought experiment for now.

Components

First, I must admit I borrowed most of the ideas from Rinat’s blog.

Receptor

Receptors come in two flavours: internal and external. Both look and work exactly the same.

The difference is the source of the events they receive. An external receptor processes events coming from another service (or services), while an internal receptor reads from its own event store. External receptors are the only way services can exchange data: one service publishes events that the other service reads and acts upon. Internal receptors can be used to integrate multiple command handlers into a pipeline: they turn events published by one handler into commands for another.
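A minimal sketch of an internal receptor, with illustrative names only; note how the event’s unique ID is carried over as the command ID, which will matter for deduplication later.

using System;

// Hypothetical internal receptor: turns an event published by one handler
// into a command for another, reusing the event's unique ID as the command ID
// so the command can be deduplicated downstream.
public class PaymentReceivedReceptor
{
    private readonly ICommandSender sender; // hypothetical command queue API

    public PaymentReceivedReceptor(ICommandSender sender)
    {
        this.sender = sender;
    }

    public void React(PaymentReceived @event)
    {
        sender.Send(new ReleaseOrder
        {
            Id = @event.Id,        // event ID carried over as the command ID
            OrderId = @event.OrderId
        });
    }
}

public interface ICommandSender { void Send(object command); }
public class PaymentReceived { public Guid Id; public Guid OrderId; }
public class ReleaseOrder { public Guid Id; public Guid OrderId; }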

Command handler

Unfortunately I could not find a better name for this thing. At least it says exactly what it does: process commands and publish events as a result. In general a command handler can do anything to handle a command, but there are two specializations that are especially worth mentioning.

A stateless handler executes logic that does not depend on any persistent state. It just transforms commands into events using some kind of algorithm.

An aggregate-based handler uses Domain-Driven Design’s Aggregate pattern to structure the logic. An incoming command is mapped to a method call on an aggregate object. The method results in one or more events being generated. These events are then persisted in the event store in one transaction. The aggregate’s state is loaded into memory each time a command is processed, so the processing logic can be based on past events.
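A sketch of what such a handler could look like, assuming a hypothetical per-stream event store interface; this is my simplification rather than a prescribed implementation.

using System;
using System.Collections.Generic;

public class ShipParcelHandler
{
    private readonly IEventStreamStore streams; // hypothetical per-aggregate stream access

    public ShipParcelHandler(IEventStreamStore streams)
    {
        this.streams = streams;
    }

    public void Handle(ShipParcel command)
    {
        // Rebuild the aggregate's state from its past events.
        var parcel = new ParcelAggregate(streams.Load("parcel-" + command.ParcelId));

        // Map the command to a method call; the method emits new events.
        parcel.Ship(command.Id);

        // Persist all resulting events in one transaction (a single append).
        streams.Append("parcel-" + command.ParcelId, parcel.UncommittedEvents);
    }
}

public class ParcelAggregate
{
    private bool shipped;
    private readonly List<object> uncommitted = new List<object>();
    public IEnumerable<object> UncommittedEvents { get { return uncommitted; } }

    public ParcelAggregate(IEnumerable<object> history)
    {
        foreach (var e in history) Apply(e); // replay past events into state
    }

    public void Ship(Guid commandId)
    {
        if (shipped) return; // already shipped: nothing new to emit
        var e = new ParcelShipped { CausationId = commandId };
        Apply(e);
        uncommitted.Add(e);
    }

    private void Apply(object e) { if (e is ParcelShipped) shipped = true; }
}

public interface IEventStreamStore
{
    IEnumerable<object> Load(string streamId);
    void Append(string streamId, IEnumerable<object> events);
}

public class ShipParcel { public Guid Id; public Guid ParcelId; }
public class ParcelShipped { public Guid CausationId; }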

View projection

This component updates a so-called view model (or read model) based on processed events.

As with receptors, view projections can be internal (as in the picture above) or external (not shown).

Gateway

They too come in two versions. The first transforms commands into events using some external system.

When coupled with an internal receptor, this type of gateway can be used to send commands to an external system. The flow would be as follows:

  • a command handler publishes a SomethingRequested event
  • an internal receptor transforms this event into a RequestSomething command
  • a gateway receives the command, sends a message to an external system (e.g. via SOAP), receives a response and publishes the result as a SomethingDone event
  • an internal receptor transforms this event into a HandleSomethingDone command targeted at the command handler that initiated the communication

In short, this type of gateway can be used when the external party we are dealing with can reject the message we are sending. Otherwise, the second version of the gateway, which is easier to plug into the system, can be used.

This type of gateway combines two features: it processes an event stream and publishes the data to an external system, and it can also receive data from that system and inject it into our service in the form of commands.

The above list is neither complete nor final. It evolves constantly as I try to implement more and more use cases using my RSA. There are, however, much more stable concepts in the architecture. Let’s call them primitives. There are only four of them.

Primitives

Command

A command is a message that instructs somebody to perform some action. The architecture acknowledges that commands do get duplicated from time to time. Command handlers are expected to deal with this fact of life. To make it possible, a command always carries an ID.

Event

An event is a message that denotes something that happened in the past. An event may (but is not required to) point to the command that caused it. Events are organized in streams that have business meaning, e.g. responses from external party X or events related to service Y. Event streams may or may not guarantee to be free of duplicates. Allowing duplication is a property of an event stream that depends on the kind of command handler(s) that emit events to the stream.

Event Handler

An event handler is a thing that processes events from one or more logical event stores. As a result it can either emit a command or update a view. Event handlers can read from views in order to provide context for the processing. An event handler is responsible for assigning proper IDs to the commands it generates. In the case of an event stream that guarantees no event duplication (most published event streams are like that), the easiest way to assign an ID to a command is to use the event’s unique ID. Otherwise, a business-level ID has to be used.

Command Handler

A command handler is a thing that processes commands from a queue. As a result it emits events to some stream. Command handlers can read from views in order to provide context for the processing. A command handler might not guarantee that the event stream(s) it generates are free of duplicates. Deduplication can be achieved by loading the events generated in the past and checking whether any of them is related to the command being processed. If so, the command is considered a duplicate.

Usually handlers can’t afford to load the whole history for each processed command, which leaves two optimization options. A command handler can use a business-level ID carried by the command to decide which stream to emit the resulting events to; in such a case the resulting streams can be short enough to be loaded before processing each command. In other words, the concept of an aggregate can be used. As an alternative, a sliding deduplication window can be used to limit the number of past events that need to be loaded.
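A sketch of the sliding-window option: before emitting anything, the handler inspects only the last N events of the target stream and treats the command as a duplicate if one of them already references its ID. The stream access interface is a hypothetical placeholder.

using System;
using System.Collections.Generic;
using System.Linq;

public class DeduplicatingCommandHandler
{
    private const int DeduplicationWindow = 500; // how many past events to inspect

    private readonly IStreamStore store; // hypothetical stream access

    public DeduplicatingCommandHandler(IStreamStore store)
    {
        this.store = store;
    }

    public void Handle(Command command, string targetStream)
    {
        // Load only the tail of the stream instead of the whole history.
        var recent = store.ReadLast(targetStream, DeduplicationWindow);

        // If any recent event was caused by this command, it is a duplicate.
        if (recent.Any(e => e.CausationId == command.Id))
            return;

        store.Append(targetStream, new RecordedEvent
        {
            CausationId = command.Id, // link back to the command for future checks
            Type = "SomethingHappened"
        });
    }
}

public class Command { public Guid Id; }
public class RecordedEvent { public Guid CausationId; public string Type; }

public interface IStreamStore
{
    IReadOnlyList<RecordedEvent> ReadLast(string stream, int maxCount);
    void Append(string stream, params RecordedEvent[] events);
}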

View store

Anything that holds persistent state.

The Idea

As you may expect, these four primitives are based on something even more fundamental. The Idea behind RSA is that any message in a distributed system (as in real life) can be duplicated. RSA does not try to create an illusion that this is not the case. Rather, it provides means for creating deduplicated bubbles within the solution where it makes sense.
