Batch Processing Aspect in C#

Hi, today I want to share with you the idea of a batch processing aspect. It solves the problem of calling SQL Server stored procedures one request at a time, for example for insert calls. Instead of executing the calls 1-by-1, I prepared an aspect that you still call 1-by-1, but which executes the requests in batches, for example up to 100-by-100 as in my test example. Below you can find code that includes the aspect and a simple test of it. The most important quality factor of this aspect is execution performance. For example, on a local MS SQL Server machine, where network latency is minimal, executing 1000 inserts 1-by-1 took more than 6 seconds, while 100-by-100 took about 0.8 seconds. It scales very well, especially when network latency increases the total time of all the round trips. For 1 million requests in my test I needed only 193 seconds to put them into the database, which is 10 times faster than using the NHibernate or Entity Framework ORMs. Most importantly, using the aspect only requires requesting entity changes 1-by-1; the aspect does the work and sends them 100-by-100. Of course, this depends on how the constructor of the aspect is configured. In the code below it is up to 100-by-100 with a delay of up to 1000 milliseconds.

namespace BatchProcessingAspectSandbox
{
    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Data.SqlClient;
    using System.Diagnostics;
    using System.Threading;

    /// <summary>
    /// Marker interface for entity types that may be queued in a
    /// <see cref="BatchProcessingAspect{T}"/>. It declares no members.
    /// </summary>
    public interface IBatchProcessingAspectEntity { }

    /// <summary>
    /// Accepts entities one at a time and passes them to a callback in arrays
    /// of at most <c>maxCount</c> items. The callback runs on a dedicated
    /// background thread that is started by the constructor.
    /// </summary>
    /// <typeparam name="T">Entity type carried through the queue.</typeparam>
    public class BatchProcessingAspect<T> where T : IBatchProcessingAspectEntity
    {
        readonly Action<T[]> dbAction;
        readonly int maxTimeout;
        readonly int maxCount;
        readonly ConcurrentQueue<T> entities = new ConcurrentQueue<T>();

        // Loop condition of the consumer thread. Nothing in this class clears it;
        // the thread is a background thread, so it does not keep the process alive.
        bool working;

        // Entities accepted by BatchInvocation whose batch callback has not
        // returned yet (still queued or currently inside dbAction).
        int pending;

        /// <summary>
        /// Creates the aspect and starts its consumer thread.
        /// </summary>
        /// <param name="dbAction">Callback that receives each batch.</param>
        /// <param name="maxTimeout">
        /// Longest time in milliseconds to wait for a full batch before a
        /// partial one is flushed; -1 waits until the batch is full.
        /// </param>
        /// <param name="maxCount">Largest number of entities per batch.</param>
        /// <exception cref="ArgumentNullException"><paramref name="dbAction"/> is null.</exception>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="maxTimeout"/> is below -1 or <paramref name="maxCount"/> is below 1.
        /// </exception>
        public BatchProcessingAspect (
            Action<T[]> dbAction,
            int maxTimeout = 1000,
            int maxCount = 100
        )
        {
            if (dbAction == null)
            {
                throw new ArgumentNullException("dbAction");
            }
            // SpinWait.SpinUntil rejects negative timeouts other than -1; failing
            // here is better than an unhandled exception on the consumer thread.
            if (maxTimeout < -1)
            {
                throw new ArgumentOutOfRangeException("maxTimeout");
            }
            // With maxCount < 1 the consumer could never dequeue anything.
            if (maxCount < 1)
            {
                throw new ArgumentOutOfRangeException("maxCount");
            }

            this.dbAction = dbAction;
            this.maxTimeout = maxTimeout;
            this.maxCount = maxCount;
            this.working = true;
            new Thread(Consume) { IsBackground = true }.Start();
        }

        /// <summary>
        /// Consumer loop: waits for a full batch or the timeout, drains up to
        /// <c>maxCount</c> entities and invokes the callback with them.
        /// </summary>
        void Consume()
        {
            var values = new List<T>();

            while (working)
            {
                SpinWait.SpinUntil(() => entities.Count >= maxCount, maxTimeout);

                values.Clear();

                T value;
                while (values.Count < maxCount && entities.TryDequeue(out value))
                {
                    values.Add(value);
                }

                if (values.Count == 0)
                {
                    continue;
                }

                try
                {
                    dbAction(values.ToArray());
                }
                catch (Exception exception)
                {
                    // Best effort: a failing batch is reported and dropped so
                    // that later batches are still processed.
                    Console.WriteLine(exception.ToString());
                }
                finally
                {
                    // Release the batch only after the callback has returned,
                    // so IsEmpty cannot flip while work is still in flight.
                    Interlocked.Add(ref pending, -values.Count);
                }
            }
        }

        /// <summary>
        /// Queues one entity for the next batch. Safe to call from several threads.
        /// </summary>
        /// <param name="entity">Entity to hand to the batch callback.</param>
        public void BatchInvocation(T entity)
        {
            // Count first, enqueue second: the counter may briefly overstate
            // the outstanding work but never understates it.
            Interlocked.Increment(ref pending);
            entities.Enqueue(entity);
        }

        /// <summary>
        /// True when every entity accepted so far has been passed through the
        /// batch callback (whether it succeeded or threw), i.e. nothing is
        /// queued and no batch is currently being processed.
        /// </summary>
        public bool IsEmpty
        {
            get { return Interlocked.CompareExchange(ref pending, 0, 0) == 0; }
        }
    }

    /// <summary>
    /// Console test that pushes chat posts through a
    /// <see cref="BatchProcessingAspect{T}"/> into the <c>PostsChat</c> stored
    /// procedure and checks that every entity received a non-zero ID.
    /// </summary>
    class PostChatTest
    {
        /// <summary>
        /// One chat post. <see cref="ID"/> stays 0 until <c>PostsChat</c> has
        /// written the value returned by the database back into it.
        /// </summary>
        class PostChatEntity : IBatchProcessingAspectEntity
        {
            public int ID { get; set; }
            public string Room { get; set; }
            public string Chat { get; set; }
            public string User { get; set; }
            public string Message { get; set; }
        }

        // Separator used to pack one column of a batch into a single
        // procedure argument; the SQL side splits on the same character.
        const string SqlSeparator = "·";

        /// <summary>
        /// Batch callback: sends all entities in one procedure call and copies
        /// the returned IDs back, matching them by position.
        /// </summary>
        /// <param name="entities">Batch to insert.</param>
        /// <exception cref="InvalidOperationException">
        /// The procedure returned a different number of IDs than entities sent.
        /// </exception>
        static void PostsChat(PostChatEntity[] entities)
        {
            var rooms = new string[entities.Length];
            var chats = new string[entities.Length];
            var users = new string[entities.Length];
            var messages = new string[entities.Length];

            for (var i = 0; i < entities.Length; ++i)
            {
                rooms[i] = entities[i].Room;
                chats[i] = entities[i].Chat;
                users[i] = entities[i].User;
                messages[i] = entities[i].Message;
            }

            var results = DbPostsChat(rooms, chats, users, messages);

            // IDs are matched to entities purely by position, so a different
            // count means the mapping cannot be trusted; assign nothing.
            if (results.Length != entities.Length)
            {
                throw new InvalidOperationException(
                    "PostsChat returned " + results.Length
                    + " IDs for " + entities.Length + " entities.");
            }

            for (var i = 0; i < entities.Length; ++i)
            {
                entities[i].ID = results[i];
            }
        }

        /// <summary>
        /// Adds one separator-joined column as a procedure parameter; a null
        /// array adds no parameter.
        /// </summary>
        /// <exception cref="ArgumentException">
        /// A value contains the separator, which would shift every later field
        /// of the batch by one position.
        /// </exception>
        static void AddJoinedParameter(SqlCommand command, string name, string[] values)
        {
            if (values == null)
            {
                return;
            }

            foreach (var value in values)
            {
                if (value != null
                    && value.IndexOf(SqlSeparator, StringComparison.Ordinal) >= 0)
                {
                    throw new ArgumentException(
                        "A value for '" + name + "' contains the batch separator '"
                        + SqlSeparator + "'.", name);
                }
            }

            command.Parameters.AddWithValue(name, string.Join(SqlSeparator, values));
        }

        /// <summary>
        /// Calls the <c>PostsChat</c> stored procedure with the four joined
        /// columns and returns the values of its second result set parsed as
        /// integers. Database errors propagate to the caller.
        /// </summary>
        static int[] DbPostsChat (
            string[] rooms, string[] chats, string[] users, string[] messages
        )
        {
            var sqlProcedure = "PostsChat";

            var connectionStringBuilder = new SqlConnectionStringBuilder();
            connectionStringBuilder.DataSource = "MACBOOKPRO";
            connectionStringBuilder.ApplicationName = "BatchProcessingAspect";
            connectionStringBuilder.InitialCatalog = "CrisisManagement";
            connectionStringBuilder.IntegratedSecurity = true;
            connectionStringBuilder.ConnectTimeout = 0;

            using (var connection
                   = new SqlConnection(connectionStringBuilder.ToString()))
            {
                connection.Open();

                using (var command = new SqlCommand(sqlProcedure, connection))
                {
                    command.CommandType = System.Data.CommandType.StoredProcedure;

                    AddJoinedParameter(command, "Rooms", rooms);
                    AddJoinedParameter(command, "Chats", chats);
                    AddJoinedParameter(command, "Users", users);
                    AddJoinedParameter(command, "Messages", messages);

                    var errors = new List<string>();
                    var ids = new List<string>();
                    var names = new List<string>();
                    var values = new List<string>();

                    // Result sets are read in this order; only the IDs are used.
                    var flow = new List<string>[] { errors, ids, names, values };
                    var flowStep = 0;

                    using (var reader = command.ExecuteReader())
                    {
                        do
                        {
                            // Ignore any result sets beyond the four known ones.
                            if (flowStep < flow.Length)
                            {
                                while (reader.Read())
                                {
                                    flow[flowStep].Add(reader.GetString(0));
                                }
                            }
                            ++flowStep;
                        }
                        while (reader.NextResult());
                    }

                    var response = new int[ids.Count];

                    for (var i = 0; i < ids.Count; ++i)
                    {
                        response[i] = int.Parse(ids[i]);
                    }

                    return response;
                }
            }
        }

        static BatchProcessingAspect<PostChatEntity> batchProcessingAspect
        = new BatchProcessingAspect<PostChatEntity>
        (dbAction: PostsChat, maxTimeout: 1000, maxCount: 100);

        /// <summary>
        /// Queues 1000 posts one by one, waits for the aspect to report empty,
        /// prints the elapsed time and PASS/FAIL depending on whether every
        /// entity got a non-zero ID.
        /// </summary>
        static void Main(string[] args)
        {
            var count = 1000;

            Console.WriteLine("Test Start");

            var entities = new PostChatEntity[count];

            for (var i = 0; i < entities.Length; ++i)
            {
                entities[i] =
                new PostChatEntity
                {
                    Room = "Room1",
                    Chat = "Chat_1_1",
                    User = "user1",
                    Message = "Message+" + i.ToString().PadLeft(6, '0')
                };
            }

            Stopwatch measure = Stopwatch.StartNew();

            for (var i = 0; i < entities.Length; ++i)
            {
                batchProcessingAspect.BatchInvocation (
                    entities[i]
                );
            }

            SpinWait.SpinUntil(() => batchProcessingAspect.IsEmpty);
            measure.Stop();

            Console.Write("Test Stop: took {0} ms... ", measure.ElapsedMilliseconds);

            // Grace period for any batch still being written when the wait
            // above returns, before the IDs are inspected.
            Thread.Sleep(2500);

            var pass = true;

            for (var i = 0; i < entities.Length; ++i)
            {
                if (entities[i].ID == 0)
                {
                    pass = false;
                    break;
                }
            }

            Console.WriteLine(pass ? "PASS" : "FAIL");
            Console.ReadKey(true);
        }
    }
}

And here is an extension of the database code from the Simple Service Bus Training database I posted some time ago. The procedure below is ready for bulk execution with many arguments. The only limitation is the request size accepted by MS SQL Server.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

-- =============================================
-- Author: 	 	  Piotr Sowa
-- Create Date: July 10th, 2015
-- Description:	Split Arguments Util Function
--
-- Splits @Arguments on the middle dot (N'·') and returns one row per
-- field, numbered from 0 in [Index]. An empty field (leading, trailing or
-- between two delimiters) is returned as a NULL [Argument], so a string
-- with k delimiters always yields k + 1 rows. A NULL input yields no rows.
-- =============================================
CREATE FUNCTION [dbo].[SplitArguments] (
@Arguments [NVARCHAR](MAX)
)
RETURNS
@ReturnArguments TABLE (
[Index] [INT] IDENTITY(0,1) PRIMARY KEY,
[Argument] [NVARCHAR](4000))
AS
BEGIN
	DECLARE @delimiter NVARCHAR(1)
	SET @delimiter = N'·'

	IF @Arguments IS NULL RETURN

	DECLARE	@iStart INT,
			@iPos INT,
			@iLength INT

	-- LEN() ignores trailing spaces, which would drop or trim a final field
	-- made of spaces; DATALENGTH counts bytes, two per NVARCHAR code unit.
	SET @iLength = DATALENGTH(@Arguments) / 2
	SET @iStart = 1

	WHILE 1=1
	BEGIN
		SET @iPos = CHARINDEX(@delimiter, @Arguments, @iStart)

		-- No further delimiter: the current field runs to the end of the input.
		IF @iPos = 0
			SET @iPos = @iLength + 1

		IF @iPos - @iStart > 0
			INSERT INTO @ReturnArguments
			VALUES (SUBSTRING(@Arguments, @iStart, @iPos - @iStart))
		ELSE
			INSERT INTO @ReturnArguments
			VALUES (NULL)

		-- Stop only after the last field has been emitted, so a trailing
		-- delimiter still produces its (empty, hence NULL) final field.
		IF @iPos > @iLength
			BREAK

		SET @iStart = @iPos + 1
	END

	RETURN
END
GO

SET ANSI_NULLS ON
GO

SET QUOTED_IDENTIFIER ON
GO

-- =============================================
-- Author: 	 	  Piotr Sowa
-- Create Date: July 21th, 2016
-- Description:	PostChat in CrisisManagement
--
-- Bulk variant: each parameter carries one column of the batch, packed
-- with the delimiter understood by [dbo].[SplitArguments]; fields at the
-- same position form one post. Returns four single-column result sets in
-- this order: errors, IDs, names, values. Only the IDs set is populated
-- here; the other three are always empty.
-- =============================================
CREATE PROCEDURE [dbo].[PostsChat]
@Rooms NVARCHAR(MAX),
@Chats NVARCHAR(MAX),
@Users NVARCHAR(MAX),
@Messages NVARCHAR(MAX)
AS
BEGIN
	DECLARE @ErrorsTable TABLE(
[Index] [INT] IDENTITY(1,1) PRIMARY KEY,
[Error] [NVARCHAR](4000))
	DECLARE @IDsTable TABLE(
[Index] [INT] IDENTITY(1,1) PRIMARY KEY,
[ID] [NVARCHAR](4000))
	DECLARE @NamesTable TABLE(
[Index] [INT] IDENTITY(1,1) PRIMARY KEY,
[Name] [NVARCHAR](4000))
	DECLARE @ValuesTable TABLE(
[Index] [INT] IDENTITY(1,1) PRIMARY KEY,
[Value] [NVARCHAR](4000))

	DECLARE @ArgumentsInTable TABLE(
[Index] [INT] PRIMARY KEY,
[C_ID] INT,
[User] [NVARCHAR](4000),
[Message] [NVARCHAR](4000))

	-- Line the four argument lists up by position and resolve the chat name
	-- to its ID. Because these are INNER JOINs, a position whose room or chat
	-- name is unknown, or that is missing from any list, is silently left out.
	-- NOTE(review): the room match only filters on the room existing; nothing
	-- visible here ties the chat to that room - confirm against the schema.
	INSERT INTO @ArgumentsInTable([Index], [C_ID], [User], [Message])
	SELECT [R].[Index], [CT].[ID], [U].[Argument], [M].[Argument]
	FROM SplitArguments(@Rooms) AS [R]
	INNER JOIN [dbo].[Rooms] AS [RT] ON [RT].[Name] = [R].Argument
	INNER JOIN SplitArguments(@Chats) AS [C] ON [R].[Index] = [C].[Index]
	INNER JOIN [dbo].[Chats] AS [CT] ON [CT].[Name] = [C].[Argument]
	INNER JOIN SplitArguments(@Users) AS [U] ON [R].[Index] = [U].[Index]
	INNER JOIN SplitArguments(@Messages) AS [M] ON [R].[Index] = [M].[Index]
	ORDER BY [R].[Index]

	-- ORDER BY on INSERT ... SELECT fixes the order in which identity values
	-- are handed out, so posts are numbered in argument order.
	INSERT INTO [ChatPosts]([ChatID], [UserName], [Message], [Status])
	OUTPUT CAST(INSERTED.ID AS NVARCHAR(4000))
        INTO @IDsTable([ID])
	SELECT
	[A].[C_ID], [A].[User], [A].[Message], 0
	FROM @ArgumentsInTable [A]
	ORDER BY [A].[Index]

	SELECT [Error] FROM @ErrorsTable ORDER BY [Index]
	-- The order in which OUTPUT rows reach @IDsTable is not guaranteed to
	-- follow the ORDER BY above, so its [Index] cannot be used to line the IDs
	-- up with the arguments. Sort by the generated ID instead.
	-- NOTE(review): assumes [ChatPosts].[ID] is an ascending integer IDENTITY
	-- column - confirm against the table definition.
	SELECT [ID] FROM @IDsTable ORDER BY CAST([ID] AS BIGINT)
	SELECT [Name] FROM @NamesTable ORDER BY [Index]
	SELECT [Value] FROM @ValuesTable ORDER BY [Index]
END
GO

p ;).

One Reply to “Batch Processing Aspect in C#”

  1. Pingback: Batch Processing Aspect in Java - coding by to design

Leave a Reply

Your email address will not be published. Required fields are marked *

*