ThreadPoolAbstractionAspect<T>

Hi, this is an implementation,

namespace ThreadPoolAbstractionAspectSandbox {
  using System;
  using System.Diagnostics;
  using System.Threading;

  /// <summary>
  /// Runs a fixed action on the shared <see cref="ThreadPool"/> for every
  /// value passed to <see cref="InvokeAsync"/>, and nudges the pool's
  /// maximum worker-thread count up or down according to how many queued
  /// invocations have not finished yet.
  /// </summary>
  /// <remarks>
  /// The pool limit is process-wide, so resizing done here affects every
  /// other thread-pool user in the process.
  /// </remarks>
  /// <typeparam name="T">Type of the value handed to the action.</typeparam>
  public class ThreadPoolAbstractionAspect<T> {
    // Number of live instances. NOTE: a static field of a generic class is
    // separate for each closed type, so ThreadPoolAbstractionAspect<int> and
    // ThreadPoolAbstractionAspect<double> are counted independently.
    static int instancesCount = 0;
    readonly Action<T> action;
    readonly Action<Exception> exceptionAction;
    // Worker-thread maximum observed at construction; the limit is never
    // lowered below this value.
    readonly int baselineWorkerThreads;
    // Completion-port maximum observed at construction; passed back
    // unchanged whenever the worker-thread maximum is adjusted.
    readonly int completionPortThreads;
    // Worker-thread maximum this instance last requested successfully.
    int workerThreads;
    // Invocations queued through this instance that have not finished yet.
    int inProgress;

    /// <summary>Creates a wrapper around <paramref name="action"/>.</summary>
    /// <param name="action">Callback executed on a pool thread for every queued value.</param>
    /// <param name="exceptionAction">
    /// Optional callback that receives any exception thrown by
    /// <paramref name="action"/>; when null such exceptions are discarded.
    /// </param>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    public ThreadPoolAbstractionAspect(
      Action<T> action,
      Action<Exception> exceptionAction = null) {
      if (action == null) {
        // The finalizer decrements instancesCount; it must not run for an
        // object that never got as far as incrementing it.
        GC.SuppressFinalize(this);
        throw new ArgumentNullException("action");
      }
      this.action = action;
      this.exceptionAction = exceptionAction;
      ThreadPool.GetMaxThreads(
        out workerThreads,
        out completionPortThreads);
      baselineWorkerThreads = workerThreads;
      inProgress = 0;
      Interlocked.Increment(ref instancesCount);
    }

    // Keeps instancesCount in step with the number of reachable instances.
    // Finalization is non-deterministic, so the count can lag behind.
    ~ThreadPoolAbstractionAspect() {
      Interlocked.Decrement(ref instancesCount);
    }

    // Thread-pool entry point: unboxes the argument, runs the action and
    // always releases the in-progress slot taken by InvokeAsync.
    void Invoke(object dataT) {
      try {
        action.Invoke((T) dataT);
      }
      catch (Exception ex) {
        // Nothing may escape from here: an unhandled exception on a
        // thread-pool thread would take the whole process down.
        if (exceptionAction != null) {
          try {
            exceptionAction.Invoke(ex);
          }
          catch {
            // Deliberately ignored: a failing handler has nowhere left
            // to report to.
          }
        }
      }
      finally {
        Interlocked.Decrement(ref inProgress);
      }
    }

    // Raises the pool's worker-thread maximum when the backlog gets within
    // 4 * instancesCount of it, and lowers it again (never below the
    // baseline) once the backlog has dropped well clear of it. The gap
    // between the two thresholds stops the limit from flapping on every call.
    void AdjustMaxThreads() {
      int step = 4 * instancesCount;
      int limit = workerThreads;
      int busy = inProgress;
      int target;

      if (busy > limit - step) {
        target = limit + 2 * step;
        if (target < limit) return; // int overflow: leave the limit alone
      }
      else if (limit > baselineWorkerThreads && busy < limit - 3 * step) {
        target = Math.Max(baselineWorkerThreads, limit - step);
      }
      else {
        return;
      }

      // Only the caller that wins this race applies the new limit.
      if (Interlocked.CompareExchange(ref workerThreads, target, limit) != limit)
        return;

      // Preserve the completion-port limit; roll the cached value back if
      // the pool rejects the request so it keeps matching the real limit.
      if (!ThreadPool.SetMaxThreads(target, completionPortThreads))
        Interlocked.CompareExchange(ref workerThreads, limit, target);
    }

    /// <summary>
    /// Queues one invocation of the action with <paramref name="data"/> on
    /// the thread pool and returns without waiting for it to run.
    /// </summary>
    /// <param name="data">Value passed to the action.</param>
    public void InvokeAsync(T data){
      AdjustMaxThreads();

      // Count the item before it is queued: a fast work item could
      // otherwise finish and decrement first, driving the counter negative.
      Interlocked.Increment(ref inProgress);
      bool queued = false;
      try {
        queued = ThreadPool.QueueUserWorkItem(Invoke, data);
      }
      finally {
        if (!queued) Interlocked.Decrement(ref inProgress);
      }
    }
  }

  /// <summary>
  /// Console benchmark: pushes <c>count</c> ints and <c>count</c> doubles
  /// through two ThreadPoolAbstractionAspect instances and reports how long
  /// it took for every callback to run.
  /// </summary>
  class TestProgram {
    static void Main(string[] args) {
      var poolI
        = new ThreadPoolAbstractionAspect<int>(BackgroundActionInt);
      var poolD
        = new ThreadPoolAbstractionAspect<double>(BackgroundActionDouble);

      var count = 5000000;

      var meter = Stopwatch.StartNew();

      for (var i = 0; i < count; ++i) {
        poolI.InvokeAsync(i);
        poolD.InvokeAsync((double)i);
      }

      // Wait until every queued callback has bumped the shared counter.
      SpinWait.SpinUntil(() => invokeCount == 2 * count);
      meter.Stop();

      // Read the elapsed time once so both reported figures agree.
      var elapsedMs = meter.ElapsedMilliseconds;

      // The rate is rounded to a whole number. A stray fifth argument that
      // no placeholder referenced has been removed.
      Console.WriteLine(
        "Background invocation {0} times took {1} ms,{2}means {3} per second.",
        2 * count,
        elapsedMs,
        Environment.NewLine,
        Math.Round((2d * count / elapsedMs) * 1000d));

      Console.WriteLine("Press any key to close...");
      Console.ReadKey();
    }

    // Total number of callbacks executed so far, across both pools.
    static int invokeCount = 0;

    // Callback for the int pool; the payload itself is ignored.
    static void BackgroundActionInt(int data) {
      Interlocked.Increment(ref invokeCount);
    }

    // Callback for the double pool; the payload itself is ignored.
    static void BackgroundActionDouble(double data) {
      Interlocked.Increment(ref invokeCount);
    }
  }
}

with an output,

Background invocation 10000000 times took 11580 ms,
means 863558 per second.
Press any key to close...

and no comments.

P ;).

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.