Pub/Sub Event Model for DNX and ASP.NET 5 with Dependency Injection

I've been continuing my adventures with the new DNX and ASP.NET 5 platform as it progresses through its various beta stages heading towards RC. I am really liking a lot of the new features that are being built, and how those features open up new scenarios for us developers.

The new stack is incredibly versatile and it is already offering up a myriad of ways of plugging into the new framework. And it is this extensibility, and the way things get composed together, that gets the cogs whirring again. It is an exciting time to be a .NET developer, if you have the chance.

One thing which I feel is missing, and not from the ASP.NET 5 stack specifically, is the concept of an evented programming model: the ability to raise events and have your own code respond to them. Stateful applications, such as desktop applications, commonly use an evented programming model. WinForms is one example and WPF is another, so the concept of implementing events is not new. But in a stateless application, like a website, events become more limited in functionality, because I guess you could consider the lifetime of an event to be the lifetime of the request itself.

I wanted to investigate whether or not we could bring in an event model for ASP.NET 5 applications that uses a little trickery from the DI system to support a dynamic composition of event subscribers. And I started by looking at Prism.

The Event Provider

If you haven't discovered Prism before, it's a toolkit for building composite WPF applications - WPF-based applications composed of loosely coupled components. One feature of Prism is an event aggregator component, which is designed to provision events that a subject can subscribe to dynamically. This seemed like a good basis for a dynamic pub/sub event model, so I took what we have there and defined a new type in my project, the EventProvider. I wasn't overly sure the *Aggregator naming was correct - it's not really aggregating anything, and it's not really a *Factory either, because in my model it's not explicitly creating events - so I went with *Provider. Here is my contract:

public interface IEventProvider
{
    IEventSubscription<TPayload> CreateEventSubscription<TPayload>(
      SubscriptionToken token, 
      Func<TPayload, CancellationToken, Task> notificationFunc, 
      Func<TPayload, CancellationToken, Task<bool>> filterFunc = null
    );

    TEvent GetEvent<TEvent>() 
      where TEvent : IEvent;

    TEvent GetEvent<TEvent>(Func<TEvent> factory) 
      where TEvent : IEvent;

    IEnumerable<IEventSubscriber<TEvent, TPayload>> GetExternalEventSubscribers<TEvent, TPayload>() 
      where TEvent : IEvent;
}

It follows the original EventAggregator quite closely, but there are a few differences:

  • CreateEventSubscription is a factory method used to create a subscription - it is not the subscriber, but more of the binding between an event and a subscriber.
  • async/await subscribers. Because we don't know what subscribers may be doing on notification, we treat all subscribers as asynchronous. We give them a CancellationToken, they give us back a Task. This means we can await each subscriber while it performs its work.
  • GetExternalEventSubscribers is our hook into the DI system to return event subscribers provided through the IoC container.

The Event

So, what is an event? An event is a .NET class that implements the IEvent<T> contract, where T is our payload type. We split the concerns here, with IEvent being the event and T being the data for our event. For example, the event could be UserRegistered, with the User being the contextual data. The non-generic IEvent contract provides a marker interface used mainly for generic constraints, but also provides a reference to the source IEventProvider.

public interface IEvent<TPayload> : IEvent
{
    Task PublishAsync(
      TPayload payload, 
      CancellationToken cancellationToken = default(CancellationToken));

    SubscriptionToken Subscribe(
      Func<TPayload, CancellationToken, Task> notificationFunc, 
      Func<TPayload, CancellationToken, Task<bool>> filterFunc = null);

    void Unsubscribe(SubscriptionToken token);
}

An individual event controls its own set of subscribers, and obtaining a reference to the event from the provider means you can trigger your publications as part of your workflow.
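To make the idea of an event owning its subscribers a little more concrete, here is a minimal sketch of how such an event might work internally. This is not the code from the sample project: SimpleEvent<TPayload> and the disposable SubscriptionToken below are stand-ins assumed purely for illustration, and the real types may well be shaped differently. It shows subscriptions being stored against a token, the optional filter being awaited before each notification, and disposing the token removing the subscription.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in: disposing the token unsubscribes.
public sealed class SubscriptionToken : IDisposable
{
    private readonly Action<SubscriptionToken> _unsubscribe;
    public SubscriptionToken(Action<SubscriptionToken> unsubscribe) { _unsubscribe = unsubscribe; }
    public void Dispose() { _unsubscribe(this); }
}

// Hypothetical event type that owns its own set of subscriptions.
public class SimpleEvent<TPayload>
{
    private sealed class Subscription
    {
        public Func<TPayload, CancellationToken, Task> Notify;
        public Func<TPayload, CancellationToken, Task<bool>> Filter;
    }

    private readonly object _lock = new object();
    private readonly Dictionary<SubscriptionToken, Subscription> _subscriptions =
        new Dictionary<SubscriptionToken, Subscription>();

    public SubscriptionToken Subscribe(
        Func<TPayload, CancellationToken, Task> notificationFunc,
        Func<TPayload, CancellationToken, Task<bool>> filterFunc = null)
    {
        if (notificationFunc == null) throw new ArgumentNullException(nameof(notificationFunc));
        var token = new SubscriptionToken(Unsubscribe);
        lock (_lock)
        {
            _subscriptions[token] = new Subscription { Notify = notificationFunc, Filter = filterFunc };
        }
        return token;
    }

    public void Unsubscribe(SubscriptionToken token)
    {
        lock (_lock) { _subscriptions.Remove(token); }
    }

    public async Task PublishAsync(
        TPayload payload,
        CancellationToken cancellationToken = default(CancellationToken))
    {
        // Take a snapshot so subscribers can unsubscribe while we publish.
        Subscription[] snapshot;
        lock (_lock) { snapshot = _subscriptions.Values.ToArray(); }

        foreach (var subscription in snapshot)
        {
            cancellationToken.ThrowIfCancellationRequested();
            // A missing filter means "always notify".
            if (subscription.Filter == null || await subscription.Filter(payload, cancellationToken))
            {
                await subscription.Notify(payload, cancellationToken);
            }
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var received = new List<string>();
        var registered = new SimpleEvent<string>();

        using (registered.Subscribe(
            (name, ct) => { received.Add(name); return Task.CompletedTask; },
            (name, ct) => Task.FromResult(name.Length > 0)))
        {
            await registered.PublishAsync("Matt"); // passes the filter
            await registered.PublishAsync("");     // rejected by the filter
        }

        await registered.PublishAsync("late");     // token disposed, nobody listening
        Console.WriteLine(string.Join(",", received)); // prints "Matt"
    }
}
```

Subscribers are awaited one after another here, which keeps ordering predictable; awaiting them together with Task.WhenAll would be a reasonable alternative if subscribers are independent of each other.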

What this enables is a classic pub/sub event mechanism that you can scope with a using block, because the subscription returned by Subscribe is disposable:

var provider = new EventProvider();
var @event = provider.GetEvent<UserRegistered>();
using (
  @event.Subscribe(async (user, ct) => await SendRegistrationEmailAsync(user, ct))
) {
  var user = new User { Name = "Matt" };
  await @event.PublishAsync(user, cancellationToken);
}

While this is great because it means we can subscribe to those events we care about, it does make our workflow more complex, because the publishing code now has to set up the subscriptions as well - which violates the single responsibility principle. We want to keep our code simple, which makes it more predictable, easier to debug and easier to test. So how do we go about doing this?

Providing Subscribers through DI

In stateless applications, like websites, the actual application composition occurs at startup. I can't think of a great many solutions that offer dynamic composition during the lifetime of a web application. Typically you have a set of components you wire up at the start, and the application really doesn't change from that point onwards. By that, I mean code doesn't change - obviously data does.

We can take advantage of this by allowing our event provider to use the IoC container to resolve our event subscribers. In this model, I've called event subscribers provided through the DI system external event subscribers; any others are direct subscribers. So how do we do this? Firstly, we define another contract:

public interface IEventSubscriber<TEvent, TPayload> where TEvent : IEvent
{
    Task<bool> FilterAsync(
      TPayload payload, 
      CancellationToken cancellationToken = default(CancellationToken));

    Task NotifyAsync(
      TPayload payload, 
      CancellationToken cancellationToken = default(CancellationToken));
}

This interface contract provides the subscriber's methods for notification (called when an item is published) and filtering (so the subscriber can be notified of only the payloads it cares about).
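It may help to see how a provider could pull these subscribers out of the container and drive them. The following is only a sketch under assumptions: the ExternalSubscribers helper and the FakeServiceProvider are hypothetical and not part of the sample project, and it relies on the container resolving IEnumerable<T> to every registration of T (the built-in ASP.NET container does this). Only System.IServiceProvider is used, so it is not tied to a particular container.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public interface IEvent { }

public interface IEventSubscriber<TEvent, TPayload> where TEvent : IEvent
{
    Task<bool> FilterAsync(TPayload payload, CancellationToken cancellationToken = default(CancellationToken));
    Task NotifyAsync(TPayload payload, CancellationToken cancellationToken = default(CancellationToken));
}

// Hypothetical helper showing one way to resolve and notify external subscribers.
public static class ExternalSubscribers
{
    public static IEnumerable<IEventSubscriber<TEvent, TPayload>> Resolve<TEvent, TPayload>(
        IServiceProvider services) where TEvent : IEvent
    {
        // Ask the container for every registered subscriber of this event/payload pair.
        var resolved = services.GetService(typeof(IEnumerable<IEventSubscriber<TEvent, TPayload>>));
        return resolved as IEnumerable<IEventSubscriber<TEvent, TPayload>>
            ?? Enumerable.Empty<IEventSubscriber<TEvent, TPayload>>();
    }

    public static async Task NotifyAllAsync<TEvent, TPayload>(
        IServiceProvider services,
        TPayload payload,
        CancellationToken cancellationToken = default(CancellationToken)) where TEvent : IEvent
    {
        foreach (var subscriber in Resolve<TEvent, TPayload>(services))
        {
            // Only notify subscribers whose filter accepts the payload.
            if (await subscriber.FilterAsync(payload, cancellationToken))
            {
                await subscriber.NotifyAsync(payload, cancellationToken);
            }
        }
    }
}

public class UserRegistered : IEvent { }

public class RecordingSubscriber : IEventSubscriber<UserRegistered, string>
{
    public List<string> Seen { get; } = new List<string>();

    public Task<bool> FilterAsync(string payload, CancellationToken cancellationToken = default(CancellationToken))
    {
        return Task.FromResult(!string.IsNullOrEmpty(payload));
    }

    public Task NotifyAsync(string payload, CancellationToken cancellationToken = default(CancellationToken))
    {
        Seen.Add(payload);
        return Task.CompletedTask;
    }
}

// Tiny fake container so the sketch runs without any DI package.
public class FakeServiceProvider : IServiceProvider
{
    private readonly Dictionary<Type, object> _services = new Dictionary<Type, object>();
    public void Add<T>(T instance) { _services[typeof(T)] = instance; }
    public object GetService(Type serviceType)
    {
        object service;
        return _services.TryGetValue(serviceType, out service) ? service : null;
    }
}

public static class Program
{
    public static async Task Main()
    {
        var subscriber = new RecordingSubscriber();
        var services = new FakeServiceProvider();
        services.Add<IEnumerable<IEventSubscriber<UserRegistered, string>>>(
            new IEventSubscriber<UserRegistered, string>[] { subscriber });

        await ExternalSubscribers.NotifyAllAsync<UserRegistered, string>(services, "Matt");
        await ExternalSubscribers.NotifyAllAsync<UserRegistered, string>(services, ""); // filtered out

        Console.WriteLine(string.Join(",", subscriber.Seen)); // prints "Matt"
    }
}
```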

So let's implement an external event subscriber for the ASP.NET 5 starter template project. Firstly, let's define our event:

public class ApplicationUserCreatedEvent : Event<ApplicationUser>
{
}

The event will be triggered when an ApplicationUser is saved. Next, let's define our event subscriber:

public class SendAuthorisationEmailEventSubscriber : EventSubscriber<ApplicationUserCreatedEvent, ApplicationUser>
{
    private readonly IEmailSender _emailSender;

    public SendAuthorisationEmailEventSubscriber(IEmailSender emailSender)
    {
        _emailSender = emailSender;
    }

    public override Task<bool> FilterAsync(ApplicationUser payload, CancellationToken cancellationToken = default(CancellationToken))
    {
        return Task.FromResult(payload.Email != null);
    }

    public override async Task NotifyAsync(ApplicationUser payload, CancellationToken cancellationToken = default(CancellationToken))
    {
        await _emailSender.SendEmailAsync(payload.Email, "Welcome to the site", $"Hi {payload.UserName}, this is your welcome email.");
    }
}

What this example event subscriber does is send confirmation (authorisation) emails to new users. The subscriber itself is provided through the DI system, which means it can have its own dependencies provided the same way. There are a number of benefits to this approach:

  • Event subscribers define their own dependencies.
  • Event publishers are simplified because they don't need to know about the dependencies of subscribers.
  • The code surface of both subscribers and publishers is a lot smaller - it's easier to test, very decoupled, and can be focused on that single responsibility.

We can register our services in Startup:

public void ConfigureServices(IServiceCollection services)
{
    // Other services...

    // Register event provider and subscribers.
    services.AddScoped<IEventProvider>(sp => new EventProvider(() => sp));
    services.AddTransientEventSubscriber<ApplicationUserCreatedEvent, ApplicationUser, SendAuthorisationEmailEventSubscriber>();
}
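For readers wondering what a helper like AddTransientEventSubscriber might do, the sketch below shows one plausible shape: it registers the concrete subscriber against the closed IEventSubscriber<TEvent, TPayload> interface with a transient lifetime. This is an assumption about the helper, not the sample project's actual implementation, and it is written against the Microsoft.Extensions.DependencyInjection package; the namespace in your build of the framework may differ. The interfaces are repeated in minimal form, and UserRegistered and NoOpSubscriber are hypothetical, so the block stands alone.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

public interface IEvent { }

public interface IEventSubscriber<TEvent, TPayload> where TEvent : IEvent
{
    Task<bool> FilterAsync(TPayload payload, CancellationToken cancellationToken = default(CancellationToken));
    Task NotifyAsync(TPayload payload, CancellationToken cancellationToken = default(CancellationToken));
}

// Hypothetical sketch of the registration helper.
public static class EventSubscriberRegistration
{
    public static IServiceCollection AddTransientEventSubscriber<TEvent, TPayload, TSubscriber>(
        this IServiceCollection services)
        where TEvent : IEvent
        where TSubscriber : class, IEventSubscriber<TEvent, TPayload>
    {
        // Register against the interface so the provider can ask for all
        // subscribers of a given event/payload pair.
        return services.AddTransient<IEventSubscriber<TEvent, TPayload>, TSubscriber>();
    }
}

public class UserRegistered : IEvent { }

public class NoOpSubscriber : IEventSubscriber<UserRegistered, string>
{
    public Task<bool> FilterAsync(string payload, CancellationToken cancellationToken = default(CancellationToken))
    {
        return Task.FromResult(true);
    }

    public Task NotifyAsync(string payload, CancellationToken cancellationToken = default(CancellationToken))
    {
        return Task.CompletedTask;
    }
}

public static class Program
{
    public static void Main()
    {
        var services = new ServiceCollection();
        services.AddTransientEventSubscriber<UserRegistered, string, NoOpSubscriber>();

        var provider = services.BuildServiceProvider();
        var subscribers = provider
            .GetServices<IEventSubscriber<UserRegistered, string>>()
            .ToList();

        Console.WriteLine(subscribers.Count); // prints 1
    }
}
```

A scoped variant would look the same with AddScoped in place of AddTransient.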

We make the IEventProvider scoped to the request, but event subscribers can be either scoped or transient - depending on the services you want them to consume. In my demo project, I've taken the basic ASP.NET 5 starter template, and modified the AccountController:

[Authorize]
public class AccountController : Controller
{
    // Code removed for brevity...
    private readonly IEventProvider _eventProvider;
    private readonly ApplicationUserCreatedEvent _userCreatedEvent;

    public AccountController(
        // Code removed for brevity...
        ApplicationDbContext applicationDbContext,
        IEventProvider eventProvider)
    {
        // Code removed for brevity...
        _applicationDbContext = applicationDbContext;
        _eventProvider = eventProvider;

        _userCreatedEvent = eventProvider.GetEvent<ApplicationUserCreatedEvent>();
    }

    // Code removed for brevity...

    [HttpPost]
    [AllowAnonymous]
    [ValidateAntiForgeryToken]
    public async Task<IActionResult> Register(RegisterViewModel model)
    {
        if (ModelState.IsValid)
        {
            var user = new ApplicationUser { UserName = model.Email, Email = model.Email };
            var result = await _userManager.CreateAsync(user, model.Password);
            if (result.Succeeded)
            {
                await _userCreatedEvent.PublishAsync(user);

                // Code removed for brevity...
            }
            AddErrors(result);
        }

        // If we got this far, something failed, redisplay form
        return View(model);
    }
}

Now all the AccountController needs to do is publish the event and allow the framework to take care of the rest.

I've pushed this sample project to GitHub. Comments, criticisms welcome.