MultiThreadingProtectorAspect<T>

392735_2313298187894_683652837_n[1]Hi, Have you ever wants to protect your method against many concurrent thread in multi-threading environment? Let’s say you have WCF service implementation and in that implementation best practice is create static class with static methods because of performance (static is about 40% faster than non-static). Oh right and let’s say that you need to access to for example file store in that file something or you have field that should be protect or you can simply want to write everything without locking. That last reason is key of success in concurrent backend systems. Of course you can use locking. That’s the easiest and lazy way. But you can also think twice and create protector that do not buffer anything but it is ready for concurrent invocation and it can protect your resource from access by many threads in the same time. So I think about such kind of usage you can see below.

using System;
using System.Diagnostics;
using System.Globalization;
using System.Threading.Tasks;

namespace MultiThreadingProtectorAspect
{
  internal class Program
  {
    private static int _count;

    private static readonly MultiThreadingProtectorAspect<string>
      _protector = new MultiThreadingProtectorAspect<string>
                   (ProtectForManyThreads, HandleException);

    static void Main() {
      _count = 0;

      Console.WriteLine("==> 1 mln of invocations start.");

      var meter = Stopwatch.StartNew();
      Parallel.For(
          0,
          1000000,
          i => ServiceMethod(i.ToString()));
      meter.Stop();

      Console.WriteLine("==> 1 mln of invocations took {0} ms, count field equals {1}.",
        meter.ElapsedMilliseconds,
        _count);

      Console.WriteLine("Press any key to close...");
      Console.ReadKey();
    }

    public static void ServiceMethod(string arg)
    {
      _protector.Invoke(arg);
    }    

    static void ProtectForManyThreads(string arg) {
      _count++;

      if (_count == 250000)
        throw new InvalidOperationException("Booo!");
    }

    static void HandleException(Exception exception) {
      var color = Console.ForegroundColor;

      Console.ForegroundColor = ConsoleColor.Yellow;

      Console.WriteLine(exception.ToString());

      Console.ForegroundColor = color;
    }
  }
}

As you can see I have a _count filed lets say that I need to protect this counter for some reason. If you invoke method ProtectForManyThreads in Parallel.For 1 mln times you can be sure that incrementing counter by ++ operator gives you wrong count value. It is a classic problem very often resolution is by interlocked invocation. But I need something much nicer for protect everything not only this one method. And for that I invented this aspect. as you can see aspect hide original invocation and takes responsibility in ServiceMethod method for invocation and protect our static method. It can be a method in WCF service and Paraller.For was created for simulation of concurrent invocation. So you are probably wonder about output of this program and about implementation. I start with output and it is look like is shown below.

==> 1 mln of invocations start.
System.InvalidOperationException: Booo!
   at MultiThreadingProtectorAspect.Program.ProtectForManyThreads(String arg) in D:\Projekty\MultiThreadingProtectorAspect\MultiThreadingProtectorAspect\Program.cs:line 42
   at MultiThreadingProtectorAspect.MultiThreadingProtectorAspect`1.NestedInvoker`1.Invoke() in D:\Projekty\MultiThreadingProtectorAspect\MultiThreadingProtectorAspect\MultiThreadingProtectorAspect.cs:line 69
==> 1 mln of invocations took 608 ms, count field equals 1000000.
Press any key to close...

I have very slow laptop but event on it 1 million invocations took about 600 milliseconds that means that per 1 invocation you need about 0,0006 milliseconds. So, it is pretty fast. And moreover you can see that count filed is concurrent, so nothing was loosed. Ok, so as I promised just before this is implementation of this aspect below.

using System;
using System.Collections.Concurrent;
using System.Threading;

namespace MultiThreadingProtectorAspect
{
  public class MultiThreadingProtectorAspect<T> : IDisposable
  {
    private readonly Action<T> _invoker;
    private readonly Action<Exception> _exceptionHandler;
    private bool _disposed;

    public MultiThreadingProtectorAspect(
        Action<T> invoker,
        Action<Exception> exceptionHandler = null) {

      _invoker = invoker;
      _exceptionHandler = exceptionHandler;
      _disposed = false;

      new Thread(Consume) { IsBackground = true }.Start();
    }

    private readonly ConcurrentQueue<NestedInvoker<T>> _invokeQueue
        = new ConcurrentQueue<NestedInvoker<T>>();

    public void Invoke(T arg) {
      var nestedInvoker = new NestedInvoker<T>(arg, _invoker, _exceptionHandler);
      _invokeQueue.Enqueue(nestedInvoker);
      nestedInvoker.Wait();
    }

    public void InvokeAsync(T arg) {
      var nestedInvoker = new NestedInvoker<T>(arg, _invoker, _exceptionHandler);
      _invokeQueue.Enqueue(nestedInvoker);
    }

    private void Consume() {
      NestedInvoker<T> invoker;
      while (!_disposed)
        if (SpinWait.SpinUntil(() => !_invokeQueue.IsEmpty, 100) &&
            _invokeQueue.TryDequeue(out invoker))
          invoker.Invoke();
    }

    public void Dispose() {
      _disposed = true;
    }

    private class NestedInvoker<TItem>
    {
      private readonly TItem _item;
      private readonly Action<TItem> _invoker;
      private readonly Action<Exception> _exceptionHandler;
      private bool _invoked;

      public NestedInvoker(
          TItem item,
          Action<TItem> invoker,
          Action<Exception> exceptionHandler = null) {

        _invoker = invoker;
        _item = item;
        _exceptionHandler = exceptionHandler;
      }

      public void Invoke() {
        try {
          _invoker.Invoke(_item);
        } catch (Exception exception) {
          if (_exceptionHandler != null)
            _exceptionHandler.Invoke(exception);
        } finally {
          _invoked = true;
        }
      }

      public void Wait() {
        SpinWait.SpinUntil(() => _invoked);
      }
    }
  }
}

How it is build? Source of the solution is based on ConcurrentQueue<T> that is my favorite concurrent collection in .NET 4.0 because of one pretty amazing feature that you can add to the queue elements without locking and it works always, except of course of OutOfMemmory exception situation :). Ok, let me return to the subject. I used this queue and I free the queue one by one elements synchronically. If you are study this example carefully you can see that there is no buffer at all even with queue. Of course you can said it is not parallel because it work synchronically, but it is only part of the true. Most important that order of the invocation is protection without locking. Now it is time for you, please examine how fast your solution will be with lock and 100 or later 500 concurrent threads? Please let me know about result of that experiment and than I told you why you observe such kind of behavior. Another exercise is to protect method that returns value. But I am sure that I was shown you a way that you can easily pass through with this last problem. I hope you enjoy this entry.

P ;).

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.