Posts tagged patterns

Mechanics of durable subscription

<Announcement>

There are still three spots left on Greg Young’s Advanced Class in Krakow (February 6th-8th). Go register here before they are gone.

</Announcement>

Today I’d like to talk a bit about the mechanics of a robust durable subscription system. First, what is a durable subscription? Here’s a description from Enterprise Integration Patterns (I really recommend this book):

Use a Durable Subscriber to make the messaging system save messages published while the subscriber is disconnected.

The goal is not to loose messages while subscriber is offline (for whatever reason). There are many possible implementations based on the specifics of a messaging system we are working with. The main distinction between them is who, publisher or subscriber, is responsible for maintaining the state of the subscription.

In theory, the mechanics are simple, no matter who’s responsible for the state. In case subscriber goes down, the state is simply not updated while new messages are generated (and stored). When subscriber is up again the publisher picks up where he left based on the state and sends all the missed messages. In case where publisher is responsible the only additional step is an acknowledgement that has to be sent to publisher every N messages (where N depends on how much you don’t like duplicates).

In practice however, many messaging platforms (including EventStore which is recently my favourite) offer two distinct APIs for implementing subscriptions:

  • push API where a client (subscriber) opens a persistent channel (TCP connection) via which a server (publisher) sends messages immediately as they are generated
  • pull API where a client (subscriber)  periodically polls a server (publisher) asking if there are any messages newer than X where X is the current state of the subscription

The advantage of push API is, it is usually a lot faster (less effort on publisher side and minimum latency). The problem is, it works only when both parties are up and running and there is no lag. If the subscriber processes messages slower than the publisher generates, them sooner or later buffers on the publisher side become too big to fit in the memory. EventStore is smart enough to drop the subscription automatically in such case.

There  is no such problem with the pull API because the client requests messages in its own pace and can even crash every now and then without the publisher ever noticing. This comes with a price however. Pull API is inherently slower (meaning higher latency) and it requires additional reads on the publisher side.

As usual, the best solution combines both approaches. The idea is simple: when the subscriber is up to date with the message stream, it switches to push API and whenever it is not — switches back to pull. The actual working algorithm is slightly more complex:

  1. The subscriber always starts with pull assuming there were some messages generated while it was offline
  2. The subscriber pulls messages until there’s nothing left to pull (it is up to date with the stream)
  3. Push subscription is started  but arriving messages are not processed immediately but temporarily redirected to a buffer
  4. One last pull is done to ensure nothing happened between step 2 and 3
  5. Messages from this last pull are processed
  6. Processing messages from push buffer is started. While messages are processed, they are checked against IDs of messages processed in step 5 to ensure there’s no duplicates.
  7. System works in push model until subscriber is killed or subscription is dropped by publisher drops push subscription.

This algorithm is the single most important thing in my new project.

VN:F [1.9.22_1171]
Rating: 5.0/5 (4 votes cast)

The handler pattern

Today I want to tell you about a pattern I see more and more frequently in my codebases. It is not a pattern in the sense of Gang of Four book. At least I haven’t thought about it in terms of a design pattern. It’s just a piece of code that shows here and there in slightly different versions. Let’s call it ‘handler’ because I can’t come up with a better name.

The idea is to have a set of classes that do something, whatever, and a class (façade) that allows you to execute all these classes for a given set of arguments. The classes that do something declare their capability of doing it by implementing a generic interface such as this one

public interface IHandler<in T>
{
    void Handle(T argument);
}

In various versions the Handle can have various arguments. This example is the simplest one. In my current project I used this pattern to implement at least 4 concepts:

  • service request validators (instead of Handle, there is a Validate method)
  • service filters (with two methods, namely BeforeProcessing and AfterProcessing)
  • request handlers (these look exactly the same as the example)
  • event handlers

It may be the case that if you have a hammer, everything looks like a nail, but I find this pattern extremely useful. Because of that, I want to share with you a simple reference implementation of it. I know it’s neither optimal nor very clean but it can give you some ideas and you can make it better. Unfortunately I can’t share the original code because of copyright issues.

public class Handlers
{
    private readonly IEnumerable<Type> _handlerTypes;

    public Handlers(params Type[] handlerTypes)
    {
        _handlerTypes = handlerTypes;
    }

    public void Handle(object argument)
    {
        var matchingHandlerTypes = FindMatchingHandlerTypes(argument.GetType());
        var handlers = matchingHandlerTypes.Select(CreateHandlerInstance);

        dynamic dynamicArgument = argument;
        foreach (dynamic handler in handlers)
        {
            handler.Handle(dynamicArgument);
        }
    }

    private IEnumerable<Type> FindMatchingHandlerTypes(Type argumentType)
    {
        return _handlerTypes.Where(x => ImplementsHandlerOf(x, argumentType));
    }

    private static bool ImplementsHandlerOf(Type candicateHandlerType, Type argumentType)
    {
        return candicateHandlerType.GetInterfaces().Any(x => IsHandlerOf(x, argumentType));
    }

    private static bool IsHandlerOf(Type interfaceType, Type argumentType)
    {
        return interfaceType.IsGenericType
            && interfaceType.GetGenericTypeDefinition() == typeof (IHandler<>)
            && IsMatch(interfaceType.GetGenericArguments()[0], argumentType);
    }

    private static bool IsMatch(Type handlerInterfaceGenericArgument, Type argumentType)
    {
        return IsExactMatch(argumentType, handlerInterfaceGenericArgument)
            || IsInheritanceMatch(argumentType, handlerInterfaceGenericArgument);
    }

    private static bool IsInheritanceMatch(Type argumentType, Type handlerInterfaceGenericArgument)
    {
        return argumentType.IsSubclassOf(handlerInterfaceGenericArgument);
    }

    private static bool IsExactMatch(Type argumentType, Type handlerInterfaceGenericArgument)
    {
        return argumentType == handlerInterfaceGenericArgument;
    }

    private static object CreateHandlerInstance(Type handlerType)
    {
        return Activator.CreateInstance(handlerType);
    }
}

Three things worth notice here are

  • I really, really like functional programming with LINQ
  • You can (and should) integrate your version with your favourite Inversion of Control container instead of using Activator.CreaterInstance
  • Dynamic feature of C# is used as a replacement of some ugly wrapper classes and/or reflection. It can be easily rewritten in a way it compiles with C# 3.0.

Happy handling!

VN:F [1.9.22_1171]
Rating: 0.0/5 (0 votes cast)