PublisherConsumerCollection<T>

Hi, today I will show you a special concurrent collection created with Publisher->Consumer pattern. Below I will show you a C# 4.0 code with a small benchmark. Enjoy :).

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
 
namespace PublisherConsumerCollection
{
  public class PublisherConsumerCollection<T> : IList<T>, IDisposable
  {
    private enum MessageKind
    {
      Insert,
      InsertAt,
      RemoveAt,
      Add,
      Remove,
      Clear,
      CopyTo,
    }
 
    private class ChangeMessage<TItem>
    {
      public MessageKind Message { get; set; }
      public TItem Item { get;set; }
      public TItem[] ItemsArray { get; set; }
      public int Index { get; set; }
 
      private bool Invoked { get; set; }
      public object ReturnValue { get; set; }
 
      public void Invoke(Action method)
      {
        try
        {
          method();
        }
        finally
        {
          Invoked = true;
        }
      }
 
      public void Invoke<TReturnValue>(Func<TReturnValue> method)
      {
        try
        {
          ReturnValue = method();
        }
        finally
        {
          Invoked = true;
        }
      }
 
      public void Wait()
      {
        SpinWait.SpinUntil(() => Invoked);
      }
    }
 
    private readonly ConcurrentQueue<ChangeMessage<T>> ChangeMessages
      = new ConcurrentQueue<ChangeMessage<T>>();
 
    private readonly IList<T> data
      = new List<T>();
 
    private bool disposed = false;
 
    public void Consumer()
    {
      while (!disposed)
      {
        if (SpinWait.SpinUntil(() => !ChangeMessages.IsEmpty, 100))
        {
          while (!ChangeMessages.IsEmpty)
          {
            ChangeMessage<T> message;
            if (ChangeMessages.TryDequeue(out message))
            {
              switch (message.Message)
              {
                case MessageKind.Insert:
                  message.Invoke(() => data.Insert(message.Index, message.Item));
                  break;
                case MessageKind.InsertAt:
                  message.Invoke(() => data[message.Index] = message.Item);
                  break;
                case MessageKind.RemoveAt:
                  message.Invoke(() => data.RemoveAt(message.Index));
                  break;
                case MessageKind.Add:
                  message.Invoke(() => data.Add(message.Item));
                  break;
                case MessageKind.Remove:
                  message.Invoke(() => data.Remove(message.Item));
                  break;
                case MessageKind.CopyTo:
                  message.Invoke(() => data.CopyTo(message.ItemsArray, message.Index));
                  break;
                case MessageKind.Clear:
                  message.Invoke(() => data.Clear());
                  break;
              }
            }
          }
        }
      }
    }
 
    public PublisherConsumerCollection()
    {
      new Thread(Consumer){IsBackground = true}.Start();
    }
 
    public void Dispose()
    {
      disposed = true;
    }
 
    public int IndexOf(T item)
    {
      return data.IndexOf(item);
    }
 
    public void Insert(int index, T item)
    {
      var message = new ChangeMessage<T>
      {
        Message = MessageKind.Insert,
        Index = index,
        Item = item
      };
      ChangeMessages.Enqueue(message);
      message.Wait();
    }
 
    public void RemoveAt(int index)
    {
      var message = new ChangeMessage<T>
      {
        Message = MessageKind.RemoveAt,
        Index = index
      };
      ChangeMessages.Enqueue(message);
      message.Wait();
    }
 
    public T this[int index]
    {
      get
      {
        return data[index];
      }
      set
      {
        var message = new ChangeMessage<T>
        {
          Message = MessageKind.InsertAt,
          Index = index,
          Item = value
        };
        ChangeMessages.Enqueue(message);
        message.Wait();
      }
    }
 
    public void Add(T item)
    {
      var message = new ChangeMessage<T>
      {
        Message = MessageKind.Add,
        Item = item
      };
      ChangeMessages.Enqueue(message);
      message.Wait();
    }
 
    public bool Remove(T item)
    {
      var message = new ChangeMessage<T>
      {
        Message = MessageKind.Remove,
        Item = item
      };
      ChangeMessages.Enqueue(message);
      message.Wait();
      return (bool)message.ReturnValue;
    }
 
    public void Clear()
    {
      var message = new ChangeMessage<T>
      {
        Message = MessageKind.Clear
      };
      ChangeMessages.Enqueue(message);
      message.Wait();
    }
 
    public bool Contains(T item)
    {
      return data.Contains(item);
    }
 
    public void CopyTo(T[] array, int arrayIndex)
    {
      var message = new ChangeMessage<T>
      {
        Message = MessageKind.CopyTo,
        ItemsArray = array,
        Index = arrayIndex
      };
      ChangeMessages.Enqueue(message);
      message.Wait();
    }
 
    public int Count
    {
      get { return data.Count; }
    }
 
    public bool IsReadOnly
    {
      get { return data.IsReadOnly; }
    }
 
    public IEnumerator<T> GetEnumerator()
    {
      for (int i = 0; i < data.Count; i++)
        yield return data[i];
    }
 
    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
      return this.GetEnumerator();
    }
  }
 
  class Program
  {
    static void Main(string[] args)
    {
      using (PublisherConsumerCollection<string> collection
        = new PublisherConsumerCollection<string>())
      {
        collection.Add("A");
        collection.Add("B");
        collection.Add("C");
 
        Console.Write("Collection with 3 elements: ");
        Console.WriteLine(string.Join<string>(",", collection));
 
        collection[1] = "X";
 
        Console.Write("Collection after change 'B' with 'X' with 3 elements: ");
        Console.WriteLine(string.Join<string>(",", collection));
 
        collection.Remove("C");
        Console.Write("Collection after remove 'C' with with 2 elements: ");
        Console.WriteLine(string.Join<string>(",", collection));
 
        collection.Clear();
 
        var stopwatchCollection = Stopwatch.StartNew();
        Parallel.For(0, 1000000, (i) =>
        {
          collection.Add(i.ToString());
        });
        stopwatchCollection.Stop();
        Console.WriteLine("Adding (PublisherConsumer)   1 000 000 elements took {0} ms.",
          stopwatchCollection.ElapsedMilliseconds);
 
        IDictionary<string, string> dictionary
          = new ConcurrentDictionary<string,string>();
 
        var stopwatchConcurrentDictionary = Stopwatch.StartNew();
        Parallel.For(0, 1000000, (i) =>
        {
          dictionary.Add(i.ToString(), i.ToString());
        });
        stopwatchConcurrentDictionary.Stop();
        Console.WriteLine("Adding (ConcurrentDictionary) 1 000 000 elements took {0} ms.",
          stopwatchConcurrentDictionary.ElapsedMilliseconds);
 
        Console.ReadKey();
      }
    }
  }
}

Benchmark output:

Collection with 3 elements: A,B,C
Collection after change 'B' with 'X' with 3 elements: A,X,C
Collection after remove 'C' with with 2 elements: A,X
Adding (PublisherConsumer)   1 000 000 elements took 1461 ms.
Adding (ConcurrentDictionary) 1 000 000 elements took 1427 ms.

And when you comment out 177 line with waiting in Add method:

Collection with 3 elements: A,B,C
Collection after change 'B' with 'X' with 3 elements: A,X,C
Collection after remove 'C' with with 2 elements: A,X
Adding (PublisherConsumer)   1 000 000 elements took 674 ms.
Adding (ConcurrentDictionary) 1 000 000 elements took 1712 ms.

It is up to you how to change this example, but you can see that using Publisher->Customer pattern is in some case twice faster than very good .NET 4.0 ConcurrentDictionary<TKey,TValue>, :).

Regards,

P ;).

P.S. You can try to benchmark another methods than Add :).

0 Replies to “PublisherConsumerCollection<T>”

  1. I think you have observed some very interesting details , appreciate it for the post.

    • Thanks Barnel,
      I know, it is kind of example that have many layers and my point was only show this example not technical aspects of every layers. I did not like show you answers but give only examples for think :).

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.