BlockingCollectionConsumer<T>

imagetry

namespace BlockingCollectionConsumerSandbox {
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;  

  class Program {

    static int count;

    static void Main(string[] args) {
      var consumer = new BlockingCollectionConsumer<int>(Consume);

      var c = 1000000;
      Stopwatch meter = new Stopwatch();

      for (int i = 0; i < 10; ++i) {
        meter.Start();
        Parallel.For(0, c, item => consumer.Publish(item));
        SpinWait.SpinUntil(() => c == count);
        meter.Stop();
        Console.WriteLine("1 mln trys took about {0} ms.",
          meter.ElapsedMilliseconds);
        meter.Reset();
        count = 0;
      }

      consumer.StopAndWait();

      Console.WriteLine("Press any key to continue...");

      Console.ReadKey();
    }    

    static void Consume(int item) {
      ++count;
    }
  }
}

an output

1 mln trys took about 400 ms.
1 mln trys took about 411 ms.
1 mln trys took about 397 ms.
1 mln trys took about 402 ms.
1 mln trys took about 411 ms.
1 mln trys took about 408 ms.
1 mln trys took about 420 ms.
1 mln trys took about 411 ms.
1 mln trys took about 409 ms.
1 mln trys took about 399 ms.
Press any key to continue...

implementation

namespace BlockingCollectionConsumerSandbox {
using System.Collections.Concurrent;
using System.Threading.Tasks;

  public class BlockingCollectionConsumer<T> {
    private readonly BlockingCollection<T> collection = new BlockingCollection<T>();
    private readonly Task consumerTask;

    public BlockingCollectionConsumer(System.Action<T> consumeAction) {
      consumerTask = Task.Factory.StartNew(() => Consumer(consumeAction));
    }

    private void Consumer(System.Action<T> consumeAction) {
      foreach (T item in collection.GetConsumingEnumerable())
        consumeAction.Invoke(item);
    }

    public void Publish(T item) {
      collection.Add(item);
    }

    public void StopAndWait() {
      collection.CompleteAdding();
      consumerTask.Wait();
    }
  }
}

no comments, enjoy,

P ;).

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.