Performance Manager Aspects (New)

imageHi today I created powerful combination of performance aspects. First MemoryManagerAspect<T> and second ThreadsManagerAspect. Both of that ideas are for very fast services created on multicore systems. I will show you code that produce following output that can be a proof that it is possible to create very low latency and asynchronous code in C#.

Async invoke 2500000 times took 821 ms,
means 3045067 per second.
Async invoke 2500000 times took 811 ms,
means 3082614 per second.
Async invoke 2500000 times took 813 ms,
means 3075031 per second.
Async invoke 2500000 times took 800 ms,
means 3125000 per second.
Async invoke 2500000 times took 836 ms,
means 2990431 per second.
Async invoke 2500000 times took 814 ms,
means 3071253 per second.
Async invoke 2500000 times took 822 ms,
means 3041363 per second.
Async invoke 2500000 times took 826 ms,
means 3026634 per second.
Async invoke 2500000 times took 810 ms,
means 3086420 per second.
Async invoke 2500000 times took 813 ms,
means 3075031 per second.
Press any key to close...

This code was produced by Idea of two performance aspects. First manages memory to help in a garbage collector work and second manage threads. Test Code looks like below.

namespace PerformanceManagersSandbox {
  using System;
  using System.Diagnostics;
  using System.Threading;

  class TestProgram {
    static void Main(string[] args) {
      var pool = new ThreadsManagerAspect(BackgroundAction);
      var count = 2500000;

      for (var i = 0; i < 10; ++i)
        Test(ref pool, count);

      Console.WriteLine("Press any key to close...");
      Console.ReadKey();
    }

    static int invokeCount = 0;

    static void Wait(int count) {
      SpinWait.SpinUntil(() => invokeCount == count);
      invokeCount = 0;
    }

    static void Test(ref ThreadsManagerAspect pool, int count) {
      var meter = Stopwatch.StartNew();
      for (var i = 0; i < count; ++i)
        pool.InvokeAsync();
      Wait(count);
      meter.Stop();

      Console.WriteLine(
        "Async invoke {0} times took {1} ms,{2}means {3} per second.",
        count,
        meter.ElapsedMilliseconds,
        Environment.NewLine,
        Math.Round((1d * count / meter.ElapsedMilliseconds) * 1000d), 4);
    }    

    static void BackgroundAction() {
      Interlocked.Increment(ref invokeCount);
    }
  }
}

It is very similar to code from yesterday, but implementation is completely new fresh stuff coming from my head about an hour ago and it is looks like below. I will show both of that piece of code into one section because the real strong is in correct combination of two of them.

namespace PerformanceManagersSandbox {
  using System;
  using System.Collections.Concurrent;
  using System.Collections.Generic;
  using System.Threading;

  public class MemoryManagerAspect<T> {
    private readonly ConcurrentQueue<T>
      dataStore = new ConcurrentQueue<T>();

    private readonly Func<T> funcCreate;
    private readonly int poolSize;

    public MemoryManagerAspect(Func<T> funcCreate, int poolSize) {
      this.funcCreate = funcCreate;
      this.poolSize = poolSize;
      for (var i = 0; i < poolSize; ++i) {
        var data = funcCreate.Invoke();
        dataStore.Enqueue(data);
      }
    }

    public void Create(out T data) {
      T dataRestored;
    RETRY:
      if (dataStore.IsEmpty)
        SpinWait.SpinUntil(() => !dataStore.IsEmpty);
      if (dataStore.TryDequeue(out dataRestored)) {
        data = dataRestored;
      }
      else {
        goto RETRY;
      }
    }

    public void Reuse(ref T data) {
      dataStore.Enqueue(data);
    }
  }

  public class ThreadsManagerAspect {
    private static readonly MemoryManagerAspect<NestedInvoker>
      invokeStore;
    private static int
      workerThreads,
      completionPortThreads,
      instancesCount,
      procesorCount;

    static ThreadsManagerAspect() {
      procesorCount = Environment.ProcessorCount;
      invokeStore = new MemoryManagerAspect<NestedInvoker>
        (InvokerCreate, 512 * procesorCount);
      ThreadPool.GetMaxThreads(
        out workerThreads,
        out completionPortThreads);
    }

    private static List<Action<Exception>>
      ActionExceptions = new List<Action<Exception>>();

    private static NestedInvoker InvokerCreate() {
      return new NestedInvoker(ReuseThread, ActionExceptions);
    }

    private readonly Action actionAsync;

    public ThreadsManagerAspect(
      Action actionAsync,
      Action<Exception> actionException = null) {
      this.actionAsync = actionAsync;
      if (actionException != null)
        ActionExceptions.Add(actionException);
      Interlocked.Increment(ref instancesCount);
      ThreadPool.SetMaxThreads(
        32 * procesorCount * instancesCount,
        completionPortThreads);
    }

    ~ThreadsManagerAspect() {
      ThreadPool.SetMaxThreads(
        32 * procesorCount * instancesCount,
        completionPortThreads);
      Interlocked.Decrement(ref instancesCount);
    }

    private static void ReuseThread(NestedInvoker invoker) {
      invokeStore.Reuse(ref invoker);
    }

    private void CreateThread(out NestedInvoker invoker) {
      invokeStore.Create(out invoker);
    }

    private void Invoker(ref NestedInvoker invoker, ref Action action) {
      invoker.InvokeAsync(action);
    }

    public void InvokeAsync() {
      if (actionAsync == null)
        throw new InvalidOperationException(
          "There is no default Action defined in a constructor.");
      NestedInvoker invoker;
      Action action = actionAsync;
      CreateThread(out invoker);
      Invoker(ref invoker, ref action);
    }

    public void InvokeAsync(ref Action invokeAction) {
      NestedInvoker invoker;
      Action action = invokeAction;
      CreateThread(out invoker);
      Invoker(ref invoker, ref action);
    }

    private class NestedInvoker {
      private readonly Action<NestedInvoker> actionReuse;
      private readonly List<Action<Exception>> actionExceptions;

      public NestedInvoker(
        Action<NestedInvoker> actionReuse,
        List<Action<Exception>> actionExceptions) {
        this.actionReuse = actionReuse;
        this.actionExceptions = actionExceptions;
      }

      private void InvokeSync(object action) {
        Action actionAsync = (Action)action;
        try {
          actionAsync.Invoke();
          actionReuse.Invoke(this);
        }
        catch (Exception ex) {
          foreach (var actionException in actionExceptions) {
            try { actionException.Invoke(ex); }
            catch { }
          }
        }
      }

      public void InvokeAsync(Action action) {
        ThreadPool.QueueUserWorkItem(InvokeSync, action);
      }
    }
  }
}

P ;).

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.