Foreach in WCF Generic Implementation (New)

Hi, today I try to solve, in an elegant and generic way, the problem of getting a very large amount of data from a WCF server. My solution is based on implementing the generic IEnumerable interface on both the client and the server side. On the server side I try to provide a generic way of getting any kind of data from any kind of data source. The server also keeps only a small data buffer, because I do not want to hold all of the data in memory: the data set may simply be too big to store at once. So I need to fetch the data in an elegant way, by simply injecting an enumerator, and consume this elephant amount of data piece by piece.

So let's start with the ServiceContract and the DataContract. Everything is generic because I need to serve any kind of data row by row, and I want to decide on the WCF host which kind of data I serve. Generics let me create whatever enumerator the real implementation needs.

using System;
using System.Runtime.Serialization;
using System.ServiceModel;

namespace ForeachWCFContracts.Service
{
  [ServiceContract]
  public interface IForeachWCF<TKey, TValue>
  {
    [OperationContract]
    TValue[] GetNext(TKey key);
  }

  [DataContract]
  [Serializable]
  public class Entity
  {
    [DataMember]
    public string Value { get; set; }
  }
}

Now I need the IForeachWCF implementation, and I will try to make it as simple as I can, but with some extra benefits. I want to inject any IEnumerable implementation into this service, and the enumerator has to be generic so that it can serve anything I want. I also need the server to release buffers that are no longer needed automatically, when a client stops fetching data or its connection dies.
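The implementation below handles abandoned clients from the producer side: the producer thread gives up when its buffer stays full for too long. A different way to meet the same requirement, sketched here under my own assumptions and not used by ForeachWCF, is to remember when each client key last called the service and periodically sweep idle buffers. IdleBufferRegistry, Touch and Sweep are hypothetical names, and the current time is passed in so that the sketch is easy to test:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper, not part of ForeachWCF: per-client buffers that are
// evicted after a period of inactivity.
public sealed class IdleBufferRegistry<TKey, TValue>
{
    private sealed class Entry
    {
        public readonly ConcurrentQueue<TValue> Queue = new ConcurrentQueue<TValue>();
        public long LastAccessTicks; // read and written with Interlocked
    }

    private readonly ConcurrentDictionary<TKey, Entry> entries =
        new ConcurrentDictionary<TKey, Entry>();
    private readonly TimeSpan idleTimeout;

    public IdleBufferRegistry(TimeSpan idleTimeout)
    {
        this.idleTimeout = idleTimeout;
    }

    public int Count { get { return entries.Count; } }

    // Returns the buffer for a client key (creating it on first use) and
    // records the time of this request.
    public ConcurrentQueue<TValue> Touch(TKey key, DateTime nowUtc)
    {
        var entry = entries.GetOrAdd(key, _ => new Entry());
        Interlocked.Exchange(ref entry.LastAccessTicks, nowUtc.Ticks);
        return entry.Queue;
    }

    // Removes every buffer whose last request is older than the idle timeout.
    // Meant to be called periodically (e.g. from a System.Threading.Timer);
    // a Touch that races with Sweep may still lose its buffer in this sketch.
    public int Sweep(DateTime nowUtc)
    {
        var removed = 0;
        foreach (var pair in entries) // enumerating while removing is safe here
        {
            var idleTicks = nowUtc.Ticks - Interlocked.Read(ref pair.Value.LastAccessTicks);
            Entry ignored;
            if (idleTicks > idleTimeout.Ticks && entries.TryRemove(pair.Key, out ignored))
                removed++;
        }
        return removed;
    }
}
```

Compared with the producer-side timeout, a sweep also frees buffers of clients whose producer has already finished, at the cost of one extra timer.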

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.ServiceModel;
using System.Threading;
using ForeachWCFContracts.Service;

namespace ForeachWCFServer.Service
{
  [ServiceBehavior(
    InstanceContextMode = InstanceContextMode.Single,
    ConcurrencyMode = ConcurrencyMode.Multiple
  )]
  public class ForeachWCF<TKey, TValue> : IForeachWCF<TKey, TValue>
    where TValue : class, new()
  {
    private readonly Func<IEnumerable<TValue>> Enumerator;
    private readonly int BufferSize;
    private readonly int MaxTimeout;

    public ForeachWCF(Func<IEnumerable<TValue>> enumerator,
      int bufferSize,
      int maxTimeout) {
      if (enumerator == null)
        throw new ArgumentNullException("enumerator");
      if (bufferSize <= 0)
        throw new ArgumentOutOfRangeException(
        "bufferSize", "BufferSize must be greater than 0.");
      if (maxTimeout <= 0)
        throw new ArgumentOutOfRangeException(
        "maxTimeout", "MaxTimeout must be greater than 0.");

      Enumerator = enumerator;
      BufferSize = bufferSize;
      MaxTimeout = maxTimeout;
    }

    private readonly ConcurrentDictionary<TKey, ConcurrentQueue<TValue>>
      enumerators = new ConcurrentDictionary<TKey, ConcurrentQueue<TValue>>();

    public TValue[] GetNext(TKey key) {
      var count = BufferSize;
      var data = new TValue[count];

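      // GetOrAdd is atomic, so only the first request for a key creates the
      // buffer and starts its producer thread. ContainsKey followed by TryAdd
      // could start two producers for the same key under concurrent calls.
      var created = new ConcurrentQueue<TValue>();
      var queue = enumerators.GetOrAdd(key, created);

      if (ReferenceEquals(queue, created)) {
        new Thread(
          () => FetchDataFromDataStore(key, created)
        ) { IsBackground = true }.Start();
      }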

      SpinWait.SpinUntil(() => !queue.IsEmpty);

      TValue value;
      var fetched = 0;
      RETRY:      
      if (queue.TryDequeue(out value)) {
        if (value == null) {
          enumerators.TryRemove(key, out queue);
          for(var pos = fetched; pos < count; ++pos)
            data[pos] = value;
          return data;
        }

        data[fetched++] = value;

        if (fetched < count) {
          SpinWait.SpinUntil(() => !queue.IsEmpty);
          goto RETRY;
        }
      } else {
        SpinWait.SpinUntil(() => !queue.IsEmpty);
        goto RETRY;
      }

      return data;
    }

    private void FetchDataFromDataStore(
      TKey key,
      ConcurrentQueue<TValue> queue) {
      foreach (var next in Enumerator()) {
        if (SpinWait.SpinUntil(
          () => queue.Count < BufferSize, MaxTimeout)) {
          queue.Enqueue(next);
        } else {
          enumerators.TryRemove(key, out queue);
          break;
        }
      }
    }
  }
}

For every new client key I start a new background thread that fetches the data. This implementation could be extended to use the ThreadPool, but in this example I try to keep everything as simple as possible. As you can see, I use the SpinWait class to keep the server buffer at no more than BufferSize elements, and the producer stops fetching data (and removes the client's buffer) when the buffer stays full for longer than MaxTimeout milliseconds, that is, when the client has stopped asking for the next elements. Everything is parameterized by the generic types and the constructor parameters, and the enumerator implementation is injected through the constructor as a Func delegate.
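For the ThreadPool extension mentioned above, one option is to replace the SpinWait polling with a bounded BlockingCollection<T>. This is a sketch of that alternative, not the code ForeachWCF uses; BoundedProducerSketch and its parameters are hypothetical, and it marks the end of the data with CompleteAdding instead of a null sentinel:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Alternative producer (sketch): runs on the ThreadPool and uses a bounded
// BlockingCollection<T> as the per-client buffer. TryAdd blocks while the
// buffer is full and returns false after timeoutMs if nobody takes items.
public static class BoundedProducerSketch
{
    public static BlockingCollection<T> Start<T>(
        Func<IEnumerable<T>> source, int bufferSize, int timeoutMs)
    {
        var buffer = new BlockingCollection<T>(bufferSize);

        ThreadPool.QueueUserWorkItem(_ =>
        {
            try
            {
                foreach (var item in source())
                {
                    if (!buffer.TryAdd(item, timeoutMs))
                        return; // consumer stopped taking items: give up
                }
            }
            finally
            {
                // Signals the end of the data, so no null sentinel is needed.
                // A timed-out producer also ends up here. Exceptions thrown by
                // source() are not caught and would terminate the process.
                buffer.CompleteAdding();
            }
        });

        return buffer;
    }
}
```

A consumer then simply iterates buffer.GetConsumingEnumerable(), which blocks while the buffer is empty and ends once CompleteAdding has been called and the buffer is drained, so neither side spins.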

So, next, we can host this service implementation on a WCF endpoint. I chose NetTcpBinding with the security mode set to None, but feel free to use another binding if you need one. The code below also shows the injection of the enumerator. You can imagine that this enumerator reads data from an RDBMS in batches of 1,000 rows or so and then hands the rows one by one, with a yield return statement, to our ForeachWCF service implementation.

using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.ServiceModel.Description;
using ForeachWCFContracts.Service;

namespace ForeachWCFServer
{
  class Server
  {
    static void Main(string[] args) {
      ServiceHost serviceHost = null;

      try {
        string addressStr = "net.tcp://localhost:27272/ForeachWCF";
        string addressMexStr = "net.tcp://localhost:27272/ForeachWCF/MEX";

        var netTcpBinding = new NetTcpBinding(SecurityMode.None);

        var addressUri = new Uri(addressStr);

        var service = new Service.ForeachWCF<Guid, Entity>(
          // Enumerator injection.
          GetEnumeratorFromDataStore,
          // 1000 entities is the max buffer size for one enumeration.
          1000,
          // 10 seconds is the max time the producer waits for free buffer space.
          10000
        );

        // Hosting a singleton instance requires InstanceContextMode.Single.
        serviceHost = new ServiceHost(service, addressUri);

        var metadataBehavior =
          serviceHost.Description.Behaviors.Find<ServiceMetadataBehavior>();

        if (metadataBehavior == null) {
          metadataBehavior = new ServiceMetadataBehavior();
          serviceHost.Description.Behaviors.Add(metadataBehavior);
        }

        serviceHost.AddServiceEndpoint(
          typeof(IForeachWCF<Guid, Entity>),
          netTcpBinding,
          addressStr);

        serviceHost.AddServiceEndpoint(
          typeof(IMetadataExchange),
          netTcpBinding,
          addressMexStr);

        serviceHost.Open();

        Console.WriteLine(
          "ForeachWCF service is running, press any key to stop.");
        Console.ReadKey();

      } catch (Exception ex) {
        Console.WriteLine(ex.ToString());
      }
      finally {
        // serviceHost is still null if construction failed, and Close()
        // throws on a faulted host, so a faulted host is aborted instead.
        if (serviceHost != null) {
          if (serviceHost.State == CommunicationState.Faulted)
            serviceHost.Abort();
          else
            serviceHost.Close();
        }
      }
    }

    #region Enumerator for Injection

    // The last element must be null: ForeachWCF treats it as the end-of-data marker.
    private static IEnumerable<Entity> GetEnumeratorFromDataStore() {
      for (int i = 0; i < 1000000; ++i) {
        yield return new Entity { Value = i.ToString() };
      }
      yield return null;
    }
    #endregion
  }
}

As you can see, GetEnumeratorFromDataStore is injected into the service, and it generates 1 million elements to fetch. The fetching is regulated automatically by the server because of the buffer size, and in a real GetEnumeratorFromDataStore method we could read the data in batches of 1,000 rows and then enumerate them one by one with yield return.
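A batch-reading enumerator of that kind could look roughly like the following sketch. fetchPage is a hypothetical delegate standing in for a real query (for example one using OFFSET/FETCH, or a keyset condition such as WHERE Id > @lastId); only the paging logic is the point here:

```csharp
using System;
using System.Collections.Generic;

// Sketch: read a data store page by page and yield rows one at a time, so
// at most one page of rows is held in memory by the enumerator.
public static class PagedSource
{
    // fetchPage(offset, pageSize) is a hypothetical query returning at most
    // pageSize rows starting at offset.
    public static IEnumerable<T> ReadAll<T>(
        Func<int, int, IList<T>> fetchPage, int pageSize)
    {
        var offset = 0;
        while (true)
        {
            var page = fetchPage(offset, pageSize);
            foreach (var row in page)
                yield return row;

            if (page.Count < pageSize)
                yield break; // a short (or empty) page means no more rows

            offset += page.Count;
        }
    }
}
```

To plug it into ForeachWCF as written, the injected method would still have to yield a final null after these rows, as GetEnumeratorFromDataStore does.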

And now we need only a simple client. Our service host exposes a regular endpoint and a MEX endpoint, so clients that want to consume the service can create their own proxy classes by adding a service reference to their project. The client below takes a shorter route: it references the shared contracts assembly and creates the channel with ChannelFactory.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.ServiceModel;
using System.Text;
using ForeachWCFContracts.Service;

namespace ForeachWCFClient
{
  class Client
  {
    static void Main(string[] args) {
      var count = 0;
      var meter = Stopwatch.StartNew();

      foreach (var element in ForeachWCFConsumer.GetEnumerator()) {
        var value = element.Value;
        ++count;
        //Console.WriteLine(value);
      }

      meter.Stop();
      Console.WriteLine("{0} elements was fetched in {1} ms.",
        count, meter.ElapsedMilliseconds);
      Console.ReadKey();
    }
  }

  public static class ForeachWCFConsumer
  {
    static IForeachWCF<Guid, Entity> client;

    static ForeachWCFConsumer() {
      var binding = new NetTcpBinding(SecurityMode.None);
      var address = new EndpointAddress("net.tcp://localhost:27272/ForeachWCF");
      var channelFactory = new ChannelFactory<IForeachWCF<Guid, Entity>>(binding, address);

      client = channelFactory.CreateChannel();
    }

    public static IEnumerable<Entity> GetEnumerator() {
      var key = Guid.NewGuid();

      Entity[] nexts = null;

      while ((nexts = client.GetNext(key)) != null) {
        for (int i = 0; i < nexts.Length; ++i) {
          var value = nexts[i];
          if (value == null)
            yield break;
          yield return value;
        }
      }
    }
  }
}
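Because the end of the stream is signaled only by null elements, it can be handy to check the batch protocol without a running host. The following is a minimal in-process sketch that mirrors the behavior of GetNext and ForeachWCFConsumer.GetEnumerator above; SentinelProtocolSketch and its methods are hypothetical names:

```csharp
using System;
using System.Collections.Generic;

// In-process simulation of the ForeachWCF batch protocol (sketch): the server
// fills arrays of batchSize items from a stream whose last item is null, and
// the client stops at the first null it sees.
public static class SentinelProtocolSketch
{
    // Server side: like GetNext, the final array is padded with nulls.
    public static IEnumerable<T[]> ServerBatches<T>(IEnumerable<T> source, int batchSize)
        where T : class
    {
        using (var e = source.GetEnumerator())
        {
            while (true)
            {
                var data = new T[batchSize]; // unused slots stay null
                var fetched = 0;
                while (fetched < batchSize && e.MoveNext())
                {
                    if (e.Current == null)
                    {
                        yield return data;   // the null sentinel ends the stream
                        yield break;
                    }
                    data[fetched++] = e.Current;
                }
                if (fetched == 0)
                    yield break;             // source ended without a sentinel
                yield return data;
            }
        }
    }

    // Client side: like ForeachWCFConsumer.GetEnumerator.
    public static IEnumerable<T> ClientItems<T>(IEnumerable<T[]> batches)
        where T : class
    {
        foreach (var batch in batches)
        {
            foreach (var value in batch)
            {
                if (value == null)
                    yield break;
                yield return value;
            }
        }
    }
}
```

With 2,000 items plus the null sentinel and a batch size of 1,000, the third batch consists only of nulls, which is what GetNext returns when the item count is an exact multiple of BufferSize.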

The client output, with the client and the server running on the same machine, was something like this:

1 000 000 elements was fetched in 9 400 ms.

I also measured the performance of the WCF server with 4 clients: the 4 clients took about 10% of CPU time and the server about 5%, which I think is pretty good.


Memory usage stayed constant the whole time, and so did the number of threads.


Ok, let me know if you enjoyed this example, and feel free to leave me any comments. I wonder about your experience with consuming a big amount of data in a simple manner. Can you share it in the comments?

P ;).

4 Replies to “Foreach in WCF Generic Implementation (New)”

  1. All good, but isn’t sending petabytes of SOAP-encoded data an anti-pattern? Web services (and WCF is a web service technology, no matter which binding is used) are not meant to send big data. Period. This is the domain of ETL solutions.

  2. Hi Szymon, I have focused mainly on the design pattern, which keeps an eye on memory usage. I agree that this is the domain of ETL solutions, but the only difference would be in the transport layer. WCF is something fundamental to me; it gives a view of how much better it is to use a specialized transport layer. Version 4.0 of WCF REST (WebHttpBinding and WebHttpBehavior plus ChannelFactory on the client side) is also very interesting work.

  3. The goto and the inline implementation obscure the server code a lot :P, but I guess that was done to keep the code compact for the sake of the example. Usually fetching big data is done through a TCP binding, and on the server side I tend to do it by splitting the data into chunks, where each chunk contains its own id that also gets sent; an id of -1 means that there is no data left to fetch. This is serviced by a thread pool and glued together. Now, if the data chunks need to arrive in the correct order as a business requirement, I set up wait handles on the chunks to improve performance, so that each chunk (n) only needs to wait for a signal from chunk (n - 1) before it is sent. This solution is generally acceptable if we can spare memory to improve performance; if we can’t, I leverage the threads so that they only fetch the next 1-2 chunks of data to send.

    On the front end I simply do: do { // call web service } while (response.hasnextchunk);

    Another, more complex, thing to do in such a case is to implement a chunk scheduler built around a heap that keeps the most “hot” data at the root and sends it right away; this way, statistically, we process data right away. But that all depends on the use case, and in the situation you provided it’s simply not applicable, as there is not enough data.

  4. Pingback: .NET Rulez! Blog » ForeachREST in WCF Generic Implementation
