ParallelExecutorAspect in C#

imageHi, today I would like to share with you idea of right parallel execution of actions/methods. Imagine at the beginning that you have queue of messages and even some number of threads that in parallel get messages from queue and then there are doing background work. Now if that messages executes actions on relational database you will quickly find out that some of your work is done right, but because of some reason you get timeouts or deadlocks or any other errors dificult to repeat. That mostly it is because you parallel actions that want to use the same type of data resources. Now you can of course go to your data architect and application architect and ask them for start using some strong transactions isolation level, but only thing you get will be a lot of resources consuming on your database environment. For solving that kind of issues I want to present to you ParallelExecutorAspect that is able to automatically figure out what type of work can be parallelised. Of course different actions can be and cannot be executed in parallel, right? And to say to this aspect what is different and what is the same you can just passing to Execute method some number of keys that characterise your piece of work. For example in order, Queue Name, Customer Name and Action Name. After that all magic start to happen and you quickly find out that all of your work is done without database interaction errors.

namespace ParallelExecutorAspect
{
    using System;
    using System.Collections.Concurrent;
    using System.Globalization;
    using System.Linq;
    using System.Runtime.CompilerServices;
    using System.Threading;

    public class ParallelExecutorAspect<T, U>
    {
        class ParallelExecutorUnit<TU, TW>
        {
            private readonly ParallelExecutorUnit<TU, TW> parent;
            private readonly ConcurrentDictionary<string, ParallelExecutorUnit<TU, TW>> node;
            private readonly ConcurrentQueue<Tuple<Func<TU, TW>, TU>> queue;
            private readonly int maxThreadLifeTimeout;
            private readonly int maxThreadCount;
            private readonly Action<Exception> exceptionAction;

            private bool unitIsActive;
            private bool unitIsWorking;
            private int unitCount;

            internal ParallelExecutorUnit(
                int maxThreadLifeTimeout,
                int maxThreadCount,
                Action<Exception> exceptionAction,
                ParallelExecutorUnit<TU, TW> parent)
            {
                this.maxThreadLifeTimeout = maxThreadLifeTimeout;
                this.maxThreadCount = maxThreadCount;
                this.exceptionAction = exceptionAction;
                this.parent = parent ?? this;
                this.unitCount = 0;
                this.node = new ConcurrentDictionary<string, ParallelExecutorUnit<TU, TW>>();
                this.queue = new ConcurrentQueue<Tuple<Func<TU, TW>, TU>>();
            }

            TW ExecuteMethod()
            {
                TW returnValue = default(TW);
                Tuple<Func<TU, TW>, TU> method;
                if (this.queue.TryDequeue(out method))
                {
                    // if you want to try-catch exceptions and implement
                    // the retry solution that is right place to do that.
                    try
                    {
                        returnValue = method.Item1.Invoke(method.Item2);
                    }
                    catch (Exception exception)
                    {
                        try
                        {
                            var exceptionInvoker = this.exceptionAction;
                            if (exceptionInvoker != null)
                            {
                                exceptionInvoker.Invoke(exception);
                            }
                        }
                        catch { }
                    }
                }
                else
                {
                    this.unitIsWorking = false;
                }
                return returnValue;
            }

            void ExecuteQueue(object state)
            {
                try
                {
                    this.unitIsActive = true;
                    Interlocked.Increment(ref this.parent.unitCount);

                    while (this.unitIsWorking)
                    {
                        if (SpinWait.SpinUntil(
                                () => !this.queue.IsEmpty || this.parent.unitCount == this.maxThreadCount,
                                this.maxThreadLifeTimeout
                            )
                        )
                        {
                            if (!this.queue.IsEmpty)
                            {
                                ExecuteMethod();
                            }
                            else
                            {
                                this.unitIsWorking = false;
                            }
                        }
                        else
                        {
                            this.unitIsWorking = false;
                        }
                    }
                }
                finally
                {
                    Interlocked.Decrement(ref this.parent.unitCount);
                    this.unitIsActive = false;
                }
            }

            [MethodImpl(MethodImplOptions.Synchronized)]
            void ReCreateThread()
            {
                if (this.unitIsWorking)
                {
                    return;
                }

                SpinWait.SpinUntil(
                    () => !this.unitIsActive &&
                    this.parent.unitCount < this.maxThreadCount);

                this.unitIsActive = false;
                this.unitIsWorking = true;

                new Thread(ExecuteQueue) { IsBackground = true }.Start();

                SpinWait.SpinUntil(() => this.unitIsActive);
            }

            [MethodImpl(MethodImplOptions.Synchronized)]
            internal ParallelExecutorUnit<TU, TW> GetOrSet(string key)
            {
                ParallelExecutorUnit<TU, TW> value;
                if (this.node.TryGetValue(key, out value))
                {
                    return value;
                }
                value = new ParallelExecutorUnit<TU, TW>(
                    this.maxThreadLifeTimeout,
                    this.maxThreadCount,
                    this.exceptionAction,
                    this.parent);
                this.node.TryAdd(key, value);
                return value;
            }

            internal void ExecuteAsync(Func<TU, TW> executeMethod, TU args)
            {
                SpinWait.SpinUntil(() => this.queue.IsEmpty);
                this.queue.Enqueue(new Tuple<Func<TU, TW>, TU>(executeMethod, args));
                ReCreateThread();
            }

            internal TW ExecuteSync(Func<TU, TW> executeMethod, TU args)
            {
                SpinWait.SpinUntil(() => this.queue.IsEmpty);
                TW returnValue = default(TW);
                bool finished = false;
                this.queue.Enqueue(new Tuple<Func<TU, TW>, TU>(
                    (eargs) =>
                    {
                        returnValue = executeMethod(eargs);
                        finished = true;
                        return returnValue;
                    },
                    args));
                ReCreateThread();
                SpinWait.SpinUntil(() => finished);
                return returnValue;
            }
        }

        private readonly Func<T, U> executeMethod;
        private readonly ParallelExecutorUnit<T, U> tree;

        public ParallelExecutorAspect(
            Func<T, U> executeMethod,
            int maxThreadLifeTimeout = 30000,
            int maxThreadCount = 100,
            Action<Exception> exceptionAction = null)
        {
            this.executeMethod = executeMethod;
            this.tree = new ParallelExecutorUnit<T, U>(
                maxThreadLifeTimeout,
                maxThreadCount,
                exceptionAction,
                null);
        }

        public void ExecuteAsync(T args, params string[] keys)
        {
            var node = keys.Aggregate(this.tree, (current, key) => current.GetOrSet(key));
            node.ExecuteAsync(this.executeMethod, args);
        }

        public U ExecuteSync(T args, params string[] keys)
        {
            var node = keys.Aggregate(this.tree, (current, key) => current.GetOrSet(key));
            return node.ExecuteSync(this.executeMethod, args);
        }
    }

    class TestProgram
    {
        internal class TestEntity
        {
            public string Queue { get; set; }
            public string Customer { get; set; }
            public string Action { get; set; }
            public string Params { get; set; }
        }

        static readonly ParallelExecutorAspect<TestEntity, bool>
            ExecutorTest = new ParallelExecutorAspect<TestEntity, bool>(
            executeMethod: ExecuteTestAction,
            maxThreadLifeTimeout: 5000,
            maxThreadCount: 10,
            exceptionAction: ExceptionTestAction);

        static readonly ParallelExecutorAspect<TestEntity, bool>
            ExecutorPerformanceTest = new ParallelExecutorAspect<TestEntity, bool>(
            executeMethod: ExecutePerformanceAction,
            maxThreadLifeTimeout: 5000,
            maxThreadCount: 10,
            exceptionAction: ExceptionTestAction);

        private static bool ExecuteTestAction(TestEntity entity)
        {
            // lets say that it takes a bit more than 1 second.
            Thread.Sleep(1000);
            // you can do do something with entity.Params
            // in this example or just write the action data.
            Console.WriteLine(
                "Q: {0}\tC: {1}\tA: {2}",
                entity.Queue,
                entity.Customer,
                entity.Action);

            return true;
        }

        private static int _counter;

        private static bool ExecutePerformanceAction(TestEntity entity)
        {
            Interlocked.Increment(ref _counter);
            Console.WriteLine(
                "Q: {0}\tC: {1}\tA: {2}",
                entity.Queue,
                entity.Customer,
                entity.Action);

            return true;
        }

        // can be sahred between invokers.
        private static void ExceptionTestAction(Exception exception)
        {
            // you can log exception here, or send alter, or aggregate it.
        }

        private static void Test()
        {
            var entity = new TestEntity { Queue = "A", Customer = "X", Action = "Add" };

            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);

            entity = new TestEntity { Queue = "A", Customer = "Z", Action = "Upd1" };
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            entity = new TestEntity { Queue = "A", Customer = "Z", Action = "Upd2" };
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            entity = new TestEntity { Queue = "A", Customer = "Z", Action = "Upd3" };
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);

            entity = new TestEntity { Queue = "A", Customer = "X", Action = "Del" };
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);

            // even if all threads was finished after 7 seconds.
            Thread.Sleep(7000);
            // you can still use new one for below action.
            ExecutorTest.ExecuteAsync(entity,
                entity.Queue, entity.Customer, entity.Action);

            // now lets switch between Threads 20 times.
            Thread.Sleep(2000);
            for (var i = 0; i < 20; ++i)
            {
                var tentity = new TestEntity
                {
                    Queue = "A",
                    Customer = "X",
                    Action =
                    "Action"
                    + i.ToString(CultureInfo.InvariantCulture).PadLeft(2, '0')
                };

                ExecutorTest.ExecuteAsync(tentity,
                    tentity.Queue, tentity.Customer, tentity.Action);
            }
        }

        private static void TestPerformance()
        {
            _counter = 0;

            const int count = 2000;

            for (var i = 0; i < count; ++i)
            {
                var entity = new TestEntity
                {
                    Queue = "A",
                    Customer = "X",
                    Action =
                    "PerfAction"
                    + i.ToString(CultureInfo.InvariantCulture).PadLeft(4, '0')
                };
                ExecutorPerformanceTest.ExecuteAsync(entity,
                    entity.Queue, entity.Customer);
            }

            SpinWait.SpinUntil(() => _counter == count, 30 * 1000);
            Console.WriteLine("Performance Test {0}.",
                              _counter == count ? "PASS" : "FAIL");
        }

        static void Main()
        {
            Console.WriteLine("Test 01 - Proof of Concept");
            Test();

            Thread.Sleep(2000);

            Console.WriteLine("Test 02 - Performance Test");
            TestPerformance();

            Console.ReadKey(true);
        }
    }
}

There is one more thing. ParallelExecutorAspect automatically releases not used Threads and reuse Treads if it is possible, so your overall performance of execution is also thanks to that solution better. And you can replace your classic throttling with parallel threads and use this automatic solution with only 1 thread per queue and maxThreadCount that decide in short how many threads you have for this aspect, for example in my test shown above you can control how many parallel customers you have. Last one thing is that this aspect is pure orthogonal solution and does not depends on any functional requirements. Enjoy!

p ;).

Leave a Reply

Your email address will not be published. Required fields are marked *

*