MultiThreadingProtectorAspect<T, TResult>

Hi, today I would like to share with you another aspect for protecting a service method in a multi-threaded environment. I also try to examine how fast this solution is and how much CPU time it consumes. Today's implementation is extended for the most popular usage: a method that returns a value and is still protected. First I will show you a test program, a console application that acts as the invoker; this tester uses a multi-threaded service invoker. Then I will show you two example services that implement the same service interface, and after that my protector class implementation. At the end I will present the benchmark results and make some notes and conclusions. There is a lot to cover, so let's get started with our test application benchmark. It looks like this:

using System;
using System.Diagnostics;

namespace MultiThreadingProtectorAspectSandbox
{
  class Program
  {
    static void Main()
    {
      int numThreads = 250;
      int count = 1000000;

      Console.WriteLine("==> Experiment with {0} threads <==", numThreads);

      var meter = new Stopwatch();

      Console.WriteLine("==> 1 mln of locked invocations start.");
      meter.Start();

      var countWithLock = ServiceInvokerTester<TestServiceWithLock>.RunServiceTest(
            new TestServiceWithLock(),
            numThreads,
            count);

      meter.Stop();
      Console.WriteLine("==> locked invocation took {0} ms, count field equals {1}.",
        meter.ElapsedMilliseconds,
        countWithLock);

      meter.Reset();

      Console.WriteLine("==> 1 mln of unlocked invocations start.");
      meter.Start();

      var countWithoutLock = ServiceInvokerTester<TestServiceWithoutLock>.RunServiceTest(
            new TestServiceWithoutLock(),
            numThreads,
            count);

      meter.Stop();
      Console.WriteLine("==> unlocked invocation took {0} ms, count field equals {1}.",
        meter.ElapsedMilliseconds,
        countWithoutLock);

      meter.Reset();

      Console.WriteLine("Press any key to close...");
      Console.ReadKey();
    }
  }
}

As you can see, I use a ServiceInvokerTester that calls the service method from 250 threads (numThreads). It is a kind of lightweight simulator of a service invoker, like the one in WCF or in other interoperability and open source implementations such as Thrift, ZeroC Ice, ZeroMQ or something similar. My tester invoker code is shown below.

using System;
using System.Threading;

namespace MultiThreadingProtectorAspectSandbox
{
  public class ServiceInvokerTester<T> where T : ITestService
  {
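    public static int RunServiceTest(T service, int numThreads, int count)
    {
      var threads = ThreadRunner(service.ServiceMethod, numThreads, count);

      // Wait until every worker thread has finished its share of invocations.
      foreach (var thread in threads)
        thread.Join();

      return service.GetCount();
    }

    static Thread[] ThreadRunner(Func<int, int> invoker, int numThreads, int count)
    {
      var threads = new Thread[numThreads];
      // Assumes count is divisible by numThreads (1000000 / 250 = 4000).
      var chunk = count / numThreads;

      for (var i = 0; i < numThreads; ++i)
      {
        // Copy the loop variable: the lambda would otherwise capture the single
        // shared i, which equals numThreads by the time the threads start, and
        // every thread would process the same argument range.
        var index = i;
        threads[i] = new Thread(
          () => Runner(invoker, chunk * index, chunk * (index + 1)));
      }

      for (var i = numThreads - 1; i >= 0; --i)
        threads[i].Start();

      return threads;
    }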

    static void Runner(Func<int, int> invoker, int min, int max)
    {
      for (var arg = min; arg < max; ++arg)
        invoker(arg);
    }
  }
}
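The partitioning in ThreadRunner only works if every thread gets its own index. In C#, a for loop declares one variable that is shared by all iterations (the per-iteration semantics introduced in C# 5 apply only to foreach), so lambdas that capture i and run after the loop all see its final value. The small self-contained sketch below demonstrates this; ClosureCaptureDemo and Capture are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class ClosureCaptureDemo
{
    // Creates one delegate per loop iteration and runs them only after the
    // loop has finished, the same way ThreadRunner starts its threads in a
    // second loop.
    public static List<int> Capture(int n, bool copyLoopVariable)
    {
        var delegates = new List<Func<int>>();
        for (var i = 0; i < n; ++i)
        {
            var index = i; // a new variable for every iteration
            if (copyLoopVariable)
                delegates.Add(() => index);
            else
                delegates.Add(() => i); // every delegate shares the single i
        }

        var seen = new List<int>();
        foreach (var d in delegates)
            seen.Add(d());
        return seen;
    }
}
```

Capture(3, false) returns 3, 3, 3, while Capture(3, true) returns 0, 1, 2. In a thread runner that captures i directly, every thread processes the range starting at count; the total number of invocations stays the same, so the final count still looks right. This is consistent with the exception messages in the sample run further below, whose arguments are above 1,000,000.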

As you can see, I use trivial partitioning of the invocations across the threads and then wait until all service invocations have finished. You are probably wondering what the ITestService interface looks like. It is shown below.

namespace MultiThreadingProtectorAspectSandbox
{
  public interface ITestService
  {
    int GetCount();
    int ServiceMethod(int arg);
  }
}

This is a kind of public service contract with two methods. The first one, GetCount, reads the state of the service, and the second one, ServiceMethod, does some work; as you may guess, ServiceMethod changes the state that GetCount returns. This may be a little abstract at the moment, but I am sure it will be very clear once you see the two implementations. The first one, which uses locking, is shown below.

using System;
using System.Runtime.CompilerServices;

namespace MultiThreadingProtectorAspectSandbox
{
  public class TestServiceWithLock : ITestService
  {
    private int _count;

    public TestServiceWithLock()
    {
      _count = 0;
    }

    [MethodImpl(MethodImplOptions.Synchronized)]
    public int ServiceMethod(int arg)
    {
      try
      {
        return ProtectForManyThreads(arg);
      }
      catch (Exception exception)
      {
        HandleException(exception);
      }
      return default(int);
    }

    public int GetCount()
    {
      return _count;
    }

    private void HandleException(Exception exception)
    {
      Console.WriteLine(exception.ToString());
    }

    private int ProtectForManyThreads(int arg)
    {
      // Short busy loop that simulates a little work before the state change.
      for (var i = 0; i < 1000; ++i)
        ;

      _count++;

      if (_count == 250000)
        throw new InvalidOperationException(string.Format("Booo! at {0}", arg));

      return arg + 1;
    }
  }
}

I think the implementation above is very trivial. As you can see, the state of the service is kept in the _count field, which has to be protected against concurrent changes, here with a lock as the protector. The locking is done by the MethodImpl attribute with MethodImplOptions.Synchronized, which is the simplest way to lock an entire method: the runtime takes a lock on the service instance for the duration of every call. I also prepared a service implementation with the protector; it is shown below.
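For an instance method, MethodImplOptions.Synchronized locks on this, just like lock (this), and the MethodImplOptions documentation recommends against locking on the instance for public types, because any code holding a reference to the service can take the same lock. A common alternative is a private lock object. The sketch below, a hypothetical CounterWithPrivateLock that is not part of the sandbox, applies that pattern to the same counter logic.

```csharp
using System;

public class CounterWithPrivateLock
{
    // Only this class can reach _sync, so outside code cannot block or
    // deadlock the service by locking on the service instance itself.
    private readonly object _sync = new object();
    private int _count;

    public int ServiceMethod(int arg)
    {
        lock (_sync)
        {
            _count++;
            return arg + 1;
        }
    }

    public int GetCount()
    {
        // Taking the same lock also guarantees the latest value is read.
        lock (_sync)
        {
            return _count;
        }
    }
}
```

The behaviour is the same as with the attribute; the difference is only who else can take the lock.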

using System;

namespace MultiThreadingProtectorAspectSandbox
{
  public class TestServiceWithoutLock : ITestService
  {
    private readonly MultiThreadingProtectorAspect<int, int>
      _protector;
    private int _count;

    public TestServiceWithoutLock()
    {
      _protector = new MultiThreadingProtectorAspect<int, int>
                   (ProtectForManyThreads, HandleException);
      _count = 0;
    }

    public int ServiceMethod(int arg)
    {
      return _protector.Invoke(arg);
    }

    public int GetCount()
    {
      return _count;
    }

    private void HandleException(Exception exception)
    {
      Console.WriteLine(exception.ToString());
    }

    private int ProtectForManyThreads(int arg)
    {
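      // Short busy loop that simulates a little work before the state change.
      for (var i = 0; i < 1000; ++i)
        ;

      // No lock needed here: the protector runs every call on its single
      // consumer thread, so _count is never changed by two threads at once.
      _count++;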

      if (_count == 250000)
        throw new InvalidOperationException(string.Format("Booo! at {0}", arg));

      return arg + 1;
    }
  }
}

Ok, and that is it. That is the test implementation. I hope you are curious about the results :). The output of a run is shown below.

==> Experiment with 250 threads <==
==> 1 mln of locked invocations start.
System.InvalidOperationException: Booo! at 1000470
   at MultiThreadingProtectorAspectSandbox.TestServiceWithLock.ProtectForManyThreads(Int32 arg) in C:\Stuff\Projects\MultiThreadingProtectorAspectSandbox\MultiThreadingProtectorAspectSandbox\TestServiceWithLock.cs:line 47
   at MultiThreadingProtectorAspectSandbox.TestServiceWithLock.ServiceMethod(Int32 arg) in C:\Stuff\Projects\MultiThreadingProtectorAspectSandbox\MultiThreadingProtectorAspectSandbox\TestServiceWithLock.cs:line 20
==> locked invocation took 893 ms, count field equals 1000000.
==> 1 mln of unlocked invocations start.
System.InvalidOperationException: Booo! at 1000663
   at MultiThreadingProtectorAspectSandbox.TestServiceWithoutLock.ProtectForManyThreads(Int32 arg) in C:\Stuff\Projects\MultiThreadingProtectorAspectSandbox\MultiThreadingProtectorAspectSandbox\TestServiceWithoutLock.cs:line 45
   at MultiThreadingProtectorAspectSandbox.MultiThreadingProtectorAspect`2.NestedInvoker`2.Invoke() in C:\Stuff\Projects\MultiThreadingProtectorAspectSandbox\MultiThreadingProtectorAspectSandbox\MultiThreadingProtectorAspect.cs:line 94
==> unlocked invocation took 1077 ms, count field equals 1000000.
Press any key to close...
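The totals above come from Stopwatch.ElapsedMilliseconds, so each one is in milliseconds for the whole run of 1,000,000 invocations. Dividing by the number of calls gives the average cost per call, and comparing the two totals gives the relative overhead of the protector. The helper below is a hypothetical sketch (BenchmarkMath is not part of the sandbox) that does this arithmetic.

```csharp
using System;

public static class BenchmarkMath
{
    // Average wall-clock time per call in microseconds, given the total
    // elapsed time in milliseconds for all calls.
    public static double MicrosecondsPerCall(long elapsedMilliseconds, int calls)
    {
        return elapsedMilliseconds * 1000.0 / calls;
    }

    // How much slower the candidate run is than the baseline, in percent.
    public static double SlowdownPercent(long baselineMilliseconds, long candidateMilliseconds)
    {
        return (candidateMilliseconds - baselineMilliseconds) * 100.0 / baselineMilliseconds;
    }
}
```

MicrosecondsPerCall(893, 1000000) gives 0.893 and MicrosecondsPerCall(1077, 1000000) gives 1.077, while SlowdownPercent(893, 1077) gives about 20.6. These are averages of wall-clock time across all 250 threads, not per-call CPU time.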

Before the conclusions, here is the protector implementation.

using System;
using System.Collections.Concurrent;
using System.Threading;

namespace MultiThreadingProtectorAspectSandbox
{
  public class MultiThreadingProtectorAspect<T, TResult> : IDisposable
  {
    private readonly Func<T, TResult> _invoker;
    private readonly ThreadLocal<NestedInvoker<T, TResult>> _localNestedInvoker;
    private readonly Action<Exception> _exceptionHandler;
    // volatile: set by Dispose on one thread, polled by the consumer thread.
    private volatile bool _disposed;

    public MultiThreadingProtectorAspect(
        Func<T, TResult> invoker,
        Action<Exception> exceptionHandler = null) {

      _invoker = invoker;
      _exceptionHandler = exceptionHandler;
      _disposed = false;

      _localNestedInvoker = new ThreadLocal<NestedInvoker<T, TResult>>
          (() => new NestedInvoker<T, TResult>(_invoker, _exceptionHandler));

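      // A single background consumer thread runs every queued call, so the
      // wrapped method never executes on two threads at the same time.
      new Thread(Consume) { IsBackground = true }.Start();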
    }

    private readonly ConcurrentQueue<NestedInvoker<T, TResult>> _nestedInvokers
        = new ConcurrentQueue<NestedInvoker<T, TResult>>();

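    // Synchronous call: each calling thread reuses its own thread-local
    // NestedInvoker, enqueues it and spins until the consumer has run it.
    public TResult Invoke(T arg) {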
      var nestedInvoker = _localNestedInvoker.Value;
      nestedInvoker.SetItem(arg);
      _nestedInvokers.Enqueue(nestedInvoker);
      nestedInvoker.Wait();
      return nestedInvoker.GetResult();
    }

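    // Asynchronous variant: a new NestedInvoker per call, so one thread can
    // have several calls pending; EndInvoke waits for and returns the result.
    public Tuple<Action, Func<TResult>> BeginInvoke(T arg) {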
      var nestedInvoker = new NestedInvoker<T, TResult>(_invoker, _exceptionHandler);
      nestedInvoker.SetItem(arg);
      _nestedInvokers.Enqueue(nestedInvoker);
      return new Tuple<Action,Func<TResult>>(nestedInvoker.Wait,
        nestedInvoker.GetResult);
    }

    public TResult EndInvoke(Tuple<Action, Func<TResult>> asyncState) {
      var waiter = asyncState.Item1;
      var result = asyncState.Item2;
      waiter.Invoke();
      return result.Invoke();
    }

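    private void Consume() {
      NestedInvoker<T, TResult> invoker;
      // Spin (up to 100 ms per round) until work arrives, then run it.
      while (!_disposed)
        if (SpinWait.SpinUntil(() => !_nestedInvokers.IsEmpty, 100) &&
            _nestedInvokers.TryDequeue(out invoker))
          invoker.Invoke();
    }

    // Stops the consumer thread; calls still queued, or enqueued later,
    // are never executed.
    public void Dispose() {
      _disposed = true;
      _localNestedInvoker.Dispose();
    }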

    private class NestedInvoker<TItem, TItemResult>
    {
      private TItem _item;
      private TItemResult _result;
      private readonly Func<TItem, TItemResult> _invoker;
      private readonly Action<Exception> _exceptionHandler;
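      // volatile: written by the consumer thread after _result is stored,
      // read by the waiting caller, so the caller also sees the result.
      private volatile bool _invoked;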

      public NestedInvoker(
          Func<TItem, TItemResult> invoker,
          Action<Exception> exceptionHandler = null) {
        _invoker = invoker;
        _exceptionHandler = exceptionHandler;
      }

      public void Invoke() {
        try {
          _result = _invoker.Invoke(_item);
        } catch (Exception exception) {
          _result = default(TItemResult);
          if (_exceptionHandler != null)
            _exceptionHandler.Invoke(exception);
        } finally {
          _invoked = true;
        }
      }

      public void SetItem(TItem value) { _item = value; }

      public TItemResult GetResult() { return _result; }

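      // Spins until the consumer thread has run this invoker, then resets the
      // flag so the thread-local instance can be reused for the next call.
      public void Wait() {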
        SpinWait.SpinUntil(() => _invoked);
        _invoked = false;
      }
    }
  }
}

Ok, that last piece of code is the protector aspect. I will try not to write a saga about it and focus only on the results. A very experienced technical architect once told me that if I use SpinWait.SpinUntil and ConcurrentQueue<T>, I am probably doing something wrong. I cannot agree with that. The times come from Stopwatch.ElapsedMilliseconds, so they are milliseconds for all 1,000,000 invocations: 893 ms with the lock and 1077 ms with the protector, which is roughly 0.9 µs and 1.1 µs per invocation. In this run the protector was about 20% slower than locking, which is still close, so this approach can give you quite fast service implementations. Keep in mind that these are wall-clock times, not CPU time: the consumer thread and the waiting callers all use SpinWait.SpinUntil, which spins before it yields, so they consume CPU while they wait. I think about 1 µs per invocation on my old, slow laptop is a good result, and on production servers it should be better. I hope you enjoyed this blog entry. Maybe someday I will win the Ultimate edition of Visual Studio to check the performance with its concurrency profiling tools. Until then, I have only intuition and console application tests :).
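The architect's remark is mostly about spinning: SpinWait.SpinUntil keeps a thread busy while it waits. For comparison, the same single-consumer idea can be expressed with BlockingCollection<T>, whose GetConsumingEnumerable blocks while the queue is empty, and one TaskCompletionSource<TResult> per call for the caller to wait on. The sketch below (BlockingSerializer is a hypothetical name, not part of the sandbox) keeps the protector's contract of passing exceptions to a handler and returning default; it has not been benchmarked here, so no claim is made about its speed relative to the numbers above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical alternative to the spinning protector: one consumer thread
// still runs every call, but it blocks while the queue is empty and each
// caller blocks on its own TaskCompletionSource instead of spinning.
public sealed class BlockingSerializer<T, TResult> : IDisposable
{
    private readonly Func<T, TResult> _invoker;
    private readonly Action<Exception> _exceptionHandler;
    private readonly BlockingCollection<(T Arg, TaskCompletionSource<TResult> Done)> _queue =
        new BlockingCollection<(T Arg, TaskCompletionSource<TResult> Done)>();
    private readonly Thread _consumer;

    public BlockingSerializer(Func<T, TResult> invoker, Action<Exception> exceptionHandler = null)
    {
        _invoker = invoker;
        _exceptionHandler = exceptionHandler;
        _consumer = new Thread(Consume) { IsBackground = true };
        _consumer.Start();
    }

    public TResult Invoke(T arg)
    {
        var done = new TaskCompletionSource<TResult>();
        _queue.Add((arg, done));
        // Blocks this caller until the consumer thread has run the call.
        return done.Task.GetAwaiter().GetResult();
    }

    private void Consume()
    {
        // Blocks while the queue is empty; ends after CompleteAdding is called.
        foreach (var (arg, done) in _queue.GetConsumingEnumerable())
        {
            var result = default(TResult);
            try
            {
                result = _invoker(arg);
            }
            catch (Exception exception)
            {
                // Same contract as the protector: report the error, return default.
                _exceptionHandler?.Invoke(exception);
            }
            finally
            {
                done.SetResult(result);
            }
        }
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // the consumer drains the queue and exits
        _consumer.Join();
        _queue.Dispose();
    }
}
```

Unlike the protector's Dispose, this version lets already queued calls finish before the consumer thread exits.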

P ;).
