ForeachREST in WCF Generic Implementation

Hi, do you remember my previous post about Foreach in WCF? Today I would like to show only code samples for the same problem, but using REST, because I searched the Internet and found no example in which both the server and client sides are set up properly in .NET 4.0. Very often you find implementations based on the HttpClient class, which is a waste of time in my opinion. I will present a project split into three assemblies: ForeachREST.Contracts, ForeachREST.Server and ForeachREST.Client. If you have trouble with the code examples, let me know in the comments. It is quite an advanced topic and I do not want to write a saga about every mechanism. I only try to focus on the correct usage of the relevant parts of the .NET 4.0 Framework through a clear example.

ForeachREST.Contracts

using System;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.ServiceModel.Web;

namespace ForeachREST.Contracts
{
  /// <summary>
  /// Sessionless WCF service contract with a single HTTP GET operation that
  /// hands out the values of an enumeration in array-sized batches.
  /// </summary>
  /// <typeparam name="TKey">Type of the key passed to <see cref="GetNext"/>.</typeparam>
  /// <typeparam name="TValue">Type of the elements returned in each batch.</typeparam>
  [ServiceContract(SessionMode = SessionMode.NotAllowed)]
  public interface IForeachREST<TKey, TValue>
  {
    /// <summary>
    /// Returns the next batch of values for the given key. The response is
    /// formatted as XML.
    /// </summary>
    /// <param name="key">Key selecting which enumeration to read from.</param>
    /// <returns>An array holding the next values.</returns>
    [OperationContract]
    [WebGet(ResponseFormat = WebMessageFormat.Xml)]
    TValue[] GetNext(TKey key);
  }

  /// <summary>
  /// Sample payload type exchanged between server and client; it carries a
  /// single string. Marked both as a data contract and as serializable.
  /// </summary>
  [DataContract]
  [Serializable]
  public class Entity
  {
    /// <summary>The string payload of this entity.</summary>
    [DataMember]
    public string Value { get; set; }
  }
}

ForeachREST.Server.Service

using System;
using System.Collections.Generic;
using System.Collections.Concurrent;
using System.ServiceModel;
using System.Threading;
using ForeachREST.Contracts;

namespace ForeachREST.Server.Service
{
  /// <summary>
  /// Singleton, multi-threaded service that streams the items of an injected
  /// enumeration to clients in fixed-size batches. Each key identifies one
  /// running enumeration: the first <see cref="GetNext"/> call for a key
  /// starts a background producer thread that fills a per-key queue, and
  /// every call drains up to <c>BufferSize</c> items from that queue.
  /// A <c>null</c> item marks the end of an enumeration.
  /// </summary>
  /// <typeparam name="TKey">Type of the key identifying an enumeration.</typeparam>
  /// <typeparam name="TValue">Type of the streamed items.</typeparam>
  [ServiceBehavior(
    InstanceContextMode = InstanceContextMode.Single,
    ConcurrencyMode = ConcurrencyMode.Multiple
  )]
  public class ForeachREST<TKey, TValue> : IForeachREST<TKey, TValue>
    where TValue : class, new()
  {
    private readonly Func<IEnumerable<TValue>> Enumerator;
    private readonly int BufferSize;
    private readonly int MaxTimeout;

    // One queue per running enumeration, shared between the producer thread
    // and the service calls that drain it.
    private readonly ConcurrentDictionary<TKey, ConcurrentQueue<TValue>>
      enumerators = new ConcurrentDictionary<TKey, ConcurrentQueue<TValue>>();

    /// <summary>
    /// Creates the service.
    /// </summary>
    /// <param name="enumerator">Factory invoked once per key to obtain the
    /// sequence to stream.</param>
    /// <param name="bufferSize">Batch size returned by <see cref="GetNext"/>
    /// and the fill level at which the producer waits for the queue to be
    /// drained.</param>
    /// <param name="maxTimeout">Milliseconds the producer waits for room in
    /// the queue before it abandons the enumeration.</param>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="enumerator"/> is null, or <paramref name="bufferSize"/>
    /// or <paramref name="maxTimeout"/> is not positive.
    /// </exception>
    public ForeachREST(Func<IEnumerable<TValue>> enumerator,
      int bufferSize,
      int maxTimeout) {
      if (enumerator == null)
        throw new InvalidOperationException(
        "Enumerator cannot be null.");
      if (bufferSize <= 0)
        throw new InvalidOperationException(
        "BufferSize cannot be less or equal than 0.");
      if (maxTimeout <= 0)
        throw new InvalidOperationException(
        "MaxTimeout cannot be less or equal than 0.");

      Enumerator = enumerator;
      BufferSize = bufferSize;
      MaxTimeout = maxTimeout;
    }

    /// <summary>
    /// Returns the next batch for <paramref name="key"/>, starting the
    /// enumeration if the key is not known yet. Blocks (spinning) until the
    /// batch is full or the end-of-stream marker is read.
    /// </summary>
    /// <param name="key">Key identifying the enumeration.</param>
    /// <returns>
    /// An array of exactly <c>BufferSize</c> slots. When the enumeration ends
    /// inside the batch, the remaining slots are <c>null</c>.
    /// </returns>
    public TValue[] GetNext(TKey key) {
      // Reference-type array: unfilled slots are already null, which is the
      // end-of-stream marker the client looks for.
      var data = new TValue[BufferSize];
      var queue = GetOrStartEnumeration(key);

      var fetched = 0;
      while (fetched < data.Length) {
        TValue value;
        if (!queue.TryDequeue(out value)) {
          // The producer always finishes with a null marker, so this wait
          // ends once the producer thread has made progress.
          SpinWait.SpinUntil(() => !queue.IsEmpty);
          continue;
        }

        if (value == null) {
          // End of stream: forget the enumeration. A separate out variable
          // keeps the captured 'queue' local intact.
          ConcurrentQueue<TValue> removed;
          enumerators.TryRemove(key, out removed);
          break;
        }

        data[fetched++] = value;
      }

      return data;
    }

    /// <summary>
    /// Returns the queue registered for <paramref name="key"/>, registering a
    /// new one and starting its producer thread if none exists. GetOrAdd
    /// decides atomically which queue is stored, so exactly one producer is
    /// started per registered queue even under concurrent calls.
    /// </summary>
    private ConcurrentQueue<TValue> GetOrStartEnumeration(TKey key) {
      ConcurrentQueue<TValue> existing;
      if (enumerators.TryGetValue(key, out existing))
        return existing;

      var candidate = new ConcurrentQueue<TValue>();
      var registered = enumerators.GetOrAdd(key, candidate);

      // Only the caller whose candidate was actually stored starts a producer.
      if (ReferenceEquals(registered, candidate)) {
        new Thread(
          () => FetchDataFromDataStore(key, candidate)
        ) { IsBackground = true }.Start();
      }

      return registered;
    }

    /// <summary>
    /// Producer loop run on a background thread: copies items from the
    /// injected enumeration into <paramref name="queue"/>, waiting up to
    /// <c>MaxTimeout</c> milliseconds whenever the queue already holds
    /// <c>BufferSize</c> items. Always enqueues a terminating <c>null</c> so
    /// that a waiting <see cref="GetNext"/> call cannot spin forever.
    /// </summary>
    private void FetchDataFromDataStore(
      TKey key,
      ConcurrentQueue<TValue> queue) {
      try {
        foreach (var next in Enumerator()) {
          // A null from the source is the end-of-stream marker; the finally
          // block enqueues it.
          if (next == null)
            break;

          if (!SpinWait.SpinUntil(
            () => queue.Count < BufferSize, MaxTimeout)) {
            // Nobody drained the queue in time: abandon this enumeration.
            ConcurrentQueue<TValue> removed;
            enumerators.TryRemove(key, out removed);
            break;
          }

          queue.Enqueue(next);
        }
      } finally {
        // Signal termination on every exit path (natural end, source without
        // a trailing null, or timeout).
        queue.Enqueue(null);
      }
    }
  }
}

ForeachREST.Server

using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.ServiceModel.Description;
using ForeachREST.Contracts;

namespace ForeachREST.Server
{
  /// <summary>
  /// Console host for the ForeachREST service: exposes a singleton
  /// <c>ForeachREST&lt;Guid, Entity&gt;</c> instance over a WebHttpBinding
  /// endpoint and keeps it running until a key is pressed.
  /// </summary>
  class Server
  {
    /// <summary>
    /// Builds the service instance, opens the host and waits for a key press.
    /// Any exception is written to the console; the host is always shut down.
    /// </summary>
    static void Main(string[] args) {
      ServiceHost serviceHost = null;

      try {
        string addressStr = "http://localhost:27272/ForeachREST";

        var binding = new WebHttpBinding(WebHttpSecurityMode.None);

        var addressUri = new Uri(addressStr);

        var service = new Service.ForeachREST<Guid, Entity>(
          // Enumerator Injection.
          GetEnumeratorFromDataStore,
          // 1000 entities is max buffer size for one enumeration.
          1000,
          // 10 seconds is max timeout for getting next value from server.
          10000
        );

        serviceHost = new ServiceHost(service, addressUri);

        var endpoint = serviceHost.AddServiceEndpoint(
            typeof(IForeachREST<Guid, Entity>),
            binding,
            addressUri);

        endpoint.Behaviors.Add(new WebHttpBehavior());

        serviceHost.Open();

        Console.WriteLine(
          "ForeachREST service is running, press any key to stop.");
        Console.ReadKey();

      } catch (Exception ex) {
        Console.WriteLine(ex.ToString());
      } finally {
        Shutdown(serviceHost);
      }
    }

    /// <summary>
    /// Releases the host without throwing: does nothing when the host was
    /// never created, aborts a faulted host, and falls back to Abort when a
    /// graceful Close fails.
    /// </summary>
    /// <param name="serviceHost">Host to shut down; may be null.</param>
    private static void Shutdown(ServiceHost serviceHost) {
      // Setup may have failed before the host was constructed.
      if (serviceHost == null)
        return;

      try {
        // Close() throws on a faulted communication object; Abort() does not.
        if (serviceHost.State == CommunicationState.Faulted)
          serviceHost.Abort();
        else
          serviceHost.Close();
      } catch (CommunicationException) {
        serviceHost.Abort();
      } catch (TimeoutException) {
        serviceHost.Abort();
      }
    }

    #region Enumerator for Injection

    /// <summary>
    /// Sample data source: yields one million entities whose Value is the
    /// running index, followed by a final null that marks the end.
    /// </summary>
    // Last yield return shall be null;
    private static IEnumerable<Entity> GetEnumeratorFromDataStore() {
      for (int i = 0; i < 1000000; ++i) {
        yield return new Entity { Value = i.ToString() };
      }
      yield return null;
    }

    #endregion
  }
}

ForeachREST.Client

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.ServiceModel;
using System.ServiceModel.Description;
using System.Text;
using ForeachREST.Contracts;

namespace ForeachREST.Client
{
  /// <summary>
  /// Console client that pulls every entity from the service through
  /// <c>ForeachRESTConsumer</c> and reports how many arrived and how long
  /// the whole enumeration took.
  /// </summary>
  class Client
  {
    /// <summary>
    /// Enumerates the remote sequence, counts the elements, prints the count
    /// with the elapsed milliseconds and waits for a key press.
    /// </summary>
    static void Main(string[] args) {
      var stopwatch = new Stopwatch();
      stopwatch.Start();

      var received = 0;
      foreach (var entity in ForeachRESTConsumer.GetEnumerator()) {
        // Read the payload as a real consumer would; it is not printed here.
        var payload = entity.Value;
        received++;
      }

      stopwatch.Stop();

      var summary = string.Format(
        "{0} elements was feched in {1} ms.",
        received,
        stopwatch.ElapsedMilliseconds);
      Console.WriteLine(summary);
      Console.ReadKey();
    }
  }

  /// <summary>
  /// Client-side wrapper that turns the batched <c>GetNext</c> service
  /// operation into a plain <see cref="IEnumerable{T}"/> of entities.
  /// </summary>
  public static class ForeachRESTConsumer
  {
    // Single channel shared by all enumerations; created in the static
    // constructor.
    private static readonly IForeachREST<Guid, Entity> client;

    /// <summary>
    /// Creates the WebHttpBinding channel to the service endpoint.
    /// </summary>
    static ForeachRESTConsumer() {
      var endpointAddress =
        new EndpointAddress("http://localhost:27272/ForeachREST");
      var factory = new ChannelFactory<IForeachREST<Guid, Entity>>(
        new WebHttpBinding(WebHttpSecurityMode.None),
        endpointAddress);
      factory.Endpoint.Behaviors.Add(new WebHttpBehavior());

      client = factory.CreateChannel(endpointAddress);
    }

    /// <summary>
    /// Lazily enumerates the remote sequence under a freshly generated key,
    /// requesting one batch at a time.
    /// </summary>
    /// <returns>
    /// The entities in the order received. Enumeration stops at the first
    /// null element inside a batch, or when the service returns a null batch.
    /// </returns>
    public static IEnumerable<Entity> GetEnumerator() {
      var enumerationId = Guid.NewGuid();

      while (true) {
        var batch = client.GetNext(enumerationId);
        if (batch == null)
          yield break;

        foreach (var entity in batch) {
          // A null slot is the end-of-stream marker.
          if (entity == null)
            yield break;
          yield return entity;
        }
      }
    }
  }
}

Again, if you need more information, let me know in the comments.

P ;).

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.