Data Model Async

Hi, I saw million times solution with ThreadPool or sometimes event with new async keyword in C# 5.0. And so many times I want to tell people that their doing it in wrong way. about a half of year ago someone told me that I have tendency to keep too much only for me. I am trying to change this. So, what is best way do do async? it is by using data model and publisher consumer dispatching pattern with ConcurrentQueue collection and SpinWait.SpinUntil method. And how to wait for async invocation? Best way is using ConcurrentDictionary with Guid key as a transaction id. So let me show you simple example and analyze all benefits for you.

namespace DataModelAsyncSandbox
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading;
    using System.Threading.Tasks;

    public class AsyncInvoker<t , TResult> : IDisposable
    {
        readonly ConcurrentQueue<tuple <Guid, T>> consumeProcessingQueue;
        readonly Func<guid , T, TResult> consumeBegin;
        readonly Action</guid><guid , TResult> consumeEnd;
        readonly Action<exception> consumeThrow;

        public AsyncInvoker(
            Func<guid , T, TResult> consumeBegin,
            Action</guid><guid , TResult> consumeEnd,
            Action<exception> consumeThrow)
        {
            if (consumeBegin == null)
            {
                throw new ArgumentNullException("consumeBegin");
            }
            if (consumeEnd == null)
            {
                throw new ArgumentNullException("consumeEnd");
            }
            if (consumeThrow == null)
            {
                throw new ArgumentNullException("consumeThrow");
            }
            this.consumeProcessingQueue = new ConcurrentQueue<tuple <Guid, T>>();
            this.consumeBegin = consumeBegin;
            this.consumeEnd = consumeEnd;
            this.consumeThrow = consumeThrow;
            new Thread(Consume){IsBackground=true}.Start();
        }

        bool disposed;

        void Consume()
        {
            while (!disposed)
            {
                if (SpinWait.SpinUntil(() => !consumeProcessingQueue.IsEmpty, 1000))
                {
                    while (!consumeProcessingQueue.IsEmpty)
                    {
                        Tuple<guid , T> data;
                        if (consumeProcessingQueue.TryDequeue(out data))
                        {
                            try
                            {
                                TResult result = consumeBegin.Invoke(
                                    data.Item1,
                                    data.Item2);
                                consumeEnd.Invoke(
                                    data.Item1,
                                    result);
                            }
                            catch (Exception exception)
                            {
                                try
                                {
                                    consumeThrow.Invoke(exception);
                                }
                                catch { }
                            }
                        }
                    }
                }
            }
        }      

        public void InvokeAsync(Guid transactionId, T data)
        {
            consumeProcessingQueue.Enqueue(new Tuple</guid><guid , T>(transactionId, data));
        }

        ~AsyncInvoker()
        {
            disposed = true;
        }

        public void Dispose()
        {
            GC.SuppressFinalize(this);
            disposed = true;
        }
    }

    class TestProgram
    {
        static int ProcessData(Guid transactionId, int input)
        {
            var output = input + 1;
            return input;
        }
        static int count;
        static void ProcessDataEnd(Guid transactionId, int output)
        {
            ++count;
        }
        static void CatchException(Exception exception)
        {
        }
        static void Main(string[] args)
        {
            var invoker = new AsyncInvoker<int , int>(
                ProcessData,ProcessDataEnd,CatchException);
            for (var t = 0; t < 10; ++t)
            {
                count = 0;
                var meter = Stopwatch.StartNew();
                var meterAll = Stopwatch.StartNew();
                Parallel.For(0, 1000000, i =>
                {
                    invoker.InvokeAsync(Guid.NewGuid(), i);
                });
                meter.Stop();
                Console.WriteLine(
                    "1 million async invocations took about {0} ms.",
                    meter.ElapsedMilliseconds);
                SpinWait.SpinUntil(() => count == 1000000);
                meterAll.Stop();
                Console.WriteLine(
                    "1 million async invocations processing took about {0} ms.",
                    meter.ElapsedMilliseconds);
            }
        }
    }
}

As you can see I am using only ConcurrentQueue as a data model for async operations. And it scale very good. You may say. Ok, but you invoke everything synchronically. That’s right, moreover it guarantee not only invocation with order, but also invocation in thread safe mode. So, I do not need to put any lock in my code and it is truly async. So, I hope you have idea how to build things that correspond with previous post? I almost forget… is that truly multi-core solution? It is when you have for example 64 such kind of invokers in entire application it scale for ever. And the best part is that you have constant number of threads during entire run time of your apps. So it is very cool, but if you want to have many thread but still constant number just create them all in constructor of AsyncInvoker as a loop of last like that creates new Thread… At the end you can see the result of invocation above code.

1 million async invocations took about 188 ms.
1 million async invocations processing took about 188 ms.
1 million async invocations took about 186 ms.
1 million async invocations processing took about 186 ms.
1 million async invocations took about 176 ms.
1 million async invocations processing took about 176 ms.
1 million async invocations took about 175 ms.
1 million async invocations processing took about 175 ms.
1 million async invocations took about 178 ms.
1 million async invocations processing took about 178 ms.
1 million async invocations took about 181 ms.
1 million async invocations processing took about 181 ms.
1 million async invocations took about 196 ms.
1 million async invocations processing took about 196 ms.
1 million async invocations took about 186 ms.
1 million async invocations processing took about 186 ms.
1 million async invocations took about 185 ms.
1 million async invocations processing took about 185 ms.
1 million async invocations took about 180 ms.
1 million async invocations processing took about 180 ms.

P ;).

Leave a Reply

Your email address will not be published. Required fields are marked *

*