SimpleDatabaseBroker

Hi, today I would like to share the implementation of the new SimpleServiceBus, which now uses Apache Thrift and protobuf-net and is faster because of that. The first reason for using those libraries was of course performance, but I also wanted to build something completely new: SimpleDatabaseBroker. To run this code inside SQL Server 2012 I needed something other than a dependency on System.ServiceModel.dll, because that assembly cannot be used in SQL CLR. In other words, you cannot use WCF from SQL CLR. So I replaced the communication model, moving from WCF to Apache Thrift. Thrift sends simple messages that carry a generic List of byte arrays, and those byte arrays contain the messages serialized with protobuf-net. Unfortunately, in some complicated communication scenarios protobuf-net deserialized messages with null properties, but this does not happen with the communication model I used for SimpleDatabaseBroker.

Let me start with a simple example to show how it works. Suppose you want to run the following T-SQL on SQL Server 2012. It invokes the RequestAsync stored procedure with two arguments: a preconfigured queue name and the message content.

USE SimpleDatabaseBroker;
EXEC [dbo].[RequestAsync] '*', 'Hello SimpleDatabaseBroker!';
EXEC [dbo].[RequestAsync] 'emails', 'some.email@domain.somwhere.com';
EXEC [dbo].[RequestAsync] 'notifs', 'Hi!';
EXEC [dbo].[RequestAsync] 'emails', 'This is queue for emails notifis';
EXEC [dbo].[RequestAsync] 'notifs', 'This is queue for notification';
EXEC [dbo].[RequestAsync] '*', 'This is queue for rest messages';
EXEC [dbo].[RequestAsync] '*', 'key:A val:1';
EXEC [dbo].[RequestAsync] '*', 'key:B val:2';
EXEC [dbo].[RequestAsync] '*', 'key:C val:3';
EXEC [dbo].[RequestAsync] '*', 'key:D val:4';
EXEC [dbo].[RequestAsync] '*', 'key:E val:5';

While this runs, you can consume the queued messages at the same time in any service, for long-running query operations or background processing in your backend cloud environment. The screenshot below shows the tester consumers handling the T-SQL above. A very important thing to remember is that SimpleDatabaseBroker is truly asynchronous: sending or requesting a message in fact just puts that message into the data model for later consumption. So when you use it in, for example, a trigger that sends messages to customers, the request is truly asynchronous and your trigger is not blocked. For the code above I started 4 consumer testers, and every one of them received all the messages. The order within each queue is correct, but the messages are consumed on separate threads, so the lines written with the message content can appear in a different order.

[Image: output of the four consumer testers]

To use this solution, all you need is a SQL CLR database and the new version of SimpleServiceBus, which is an integrated part of SimpleDatabaseBroker. Below is the T-SQL code that creates this database. I am not sure whether the UNSAFE permission set is required in this case; you can experiment with removing it if you like.

/* -- drop the database while developing the solution
USE master
GO
DROP DATABASE SimpleDatabaseBroker
GO
*/
CREATE DATABASE SimpleDatabaseBroker
GO
USE SimpleDatabaseBroker
GO
sp_configure 'clr enabled', 1
GO
RECONFIGURE
GO
ALTER DATABASE SimpleDatabaseBroker
SET TRUSTWORTHY ON
GO
RECONFIGURE
GO
DECLARE @FPATH AS VARCHAR(100)
-- check this path on your system
SET @FPATH = 'C:\Windows\Microsoft.NET\Framework64\v4.0.30319';
CREATE ASSEMBLY System_Web
AUTHORIZATION [dbo]
FROM @FPATH + '\System.Web.dll'
WITH PERMISSION_SET = UNSAFE;
DECLARE @PATH AS VARCHAR(100)
-- check this path on your system
SET @PATH = 'C:\Projects\SimpleDatabaseBroker\SimpleDatabaseBroker\bin\Release';
CREATE ASSEMBLY CodingByToDesign_SimpleServiceBusThrift
AUTHORIZATION [dbo]
FROM @PATH + '\CodingByToDesign.SimpleServiceBusThrift.dll'
WITH PERMISSION_SET = UNSAFE;
CREATE ASSEMBLY CodingByToDesign_SimpleServiceBusProtobuf
AUTHORIZATION [dbo]
FROM @PATH + '\CodingByToDesign.SimpleServiceBusProtobuf.dll'
WITH PERMISSION_SET = UNSAFE;
CREATE ASSEMBLY CodingByToDesign_SimpleServiceBus
AUTHORIZATION [dbo]
FROM @PATH + '\CodingByToDesign.SimpleServiceBus.dll'
WITH PERMISSION_SET = UNSAFE;
CREATE ASSEMBLY CodingByToDesign_SimpleDatabaseBrokerProtocol
AUTHORIZATION [dbo]
FROM @PATH + '\CodingByToDesign.SimpleDatabaseBrokerProtocol.dll'
WITH PERMISSION_SET = UNSAFE;
CREATE ASSEMBLY CodingByToDesign_SimpleDatabaseBroker
AUTHORIZATION [dbo]
FROM @PATH + '\CodingByToDesign.SimpleDatabaseBroker.dll'
WITH PERMISSION_SET = UNSAFE;
GO
CREATE PROCEDURE RequestAsync (
 @queue nvarchar(max),
 @message nvarchar(max)
)
AS
EXTERNAL
NAME
CodingByToDesign_SimpleDatabaseBroker.[CodingByToDesign.SimpleDatabaseBroker.Broker].RequestAsync
GO
/* -- trivial test
EXEC dbo.RequestAsync '*', 'Hello SimpleDatabaseBroker!';
*/

I also want to show the implementation of the key elements of SimpleDatabaseBroker, which are almost trivial. First comes the Broker class from the CodingByToDesign.SimpleDatabaseBroker.dll assembly. Behind the scenes it opens listeners and accepts connections for, in this case, 3 configured Requesters, one per queue. The important thing is that it works truly asynchronously: it puts messages into the data model, where they are queued and can be, but do not have to be, picked up by consumers. I decided to limit that queue to a buffer of 1 million messages. If you want to use this solution on production servers, consider adding Start, Stop and GetState methods; the last one has to use an output parameter rather than a return value to be exposed as a stored procedure.

namespace CodingByToDesign.SimpleDatabaseBroker
{
    using System;
    using System.Collections.Generic;
    using Microsoft.SqlServer.Server;
    using CodingByToDesign.SimpleDatabaseBrokerProtocol;
    using CodingByToDesign.SimpleServiceBus.Communication.Contracts;
    using CodingByToDesign.SimpleServiceBus.Communication;

    public static class Broker
    {
        static readonly IRequester<SimpleDatabaseBrokerMessage>[] requesters
            = new IRequester<SimpleDatabaseBrokerMessage>
              [SimpleDatabaseBrokerConfigs.Configs.Length];
        static readonly IDictionary<string, int> indexes
            = new Dictionary<string, int>
              (SimpleDatabaseBrokerConfigs.Configs.Length);

        static Broker()
        {
            for (var index = 0; 
                 index < SimpleDatabaseBrokerConfigs.Configs.Length;
                 ++index)
            {
                var config = SimpleDatabaseBrokerConfigs.Configs[index];
                indexes.Add(config.QueueName, index);
                requesters[index] = new Requester<SimpleDatabaseBrokerMessage>
                (config.QueueNetTcpAddress);
            }
        }
        
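        // Exposed to T-SQL as the dbo.RequestAsync stored procedure.
        [SqlProcedure]
        public static void RequestAsync(string queue, string message)
        {
            int index;
            // Messages for queue names that are not configured are silently dropped.
            if (indexes.TryGetValue(queue, out index))
            {
                requesters[index].RequestAsync(new SimpleDatabaseBrokerMessage
                { Message = message });
            }
        }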
    }
}
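
The Start, Stop and GetState methods suggested above are not part of the shared code, so here is only a minimal sketch of how they might look. The class name BrokerControlSketch, its namespace and the integer state flag are my assumptions for illustration; a real version would also have to open and close the requesters. Note that a writable static field like the one below is, as far as I know, only accepted in assemblies registered with the UNSAFE permission set.

```csharp
namespace BrokerControlSketch
{
    using System.Data.SqlTypes;
    using System.Threading;
    using Microsoft.SqlServer.Server;

    // Hypothetical companion to the Broker class; not part of the original source.
    public static class BrokerControl
    {
        // 0 = stopped, 1 = started. Assumed representation of the broker state.
        static int state;

        [SqlProcedure]
        public static void Start()
        {
            // A real implementation would also open the requesters here.
            Interlocked.Exchange(ref state, 1);
        }

        [SqlProcedure]
        public static void Stop()
        {
            // A real implementation would also close the requesters here.
            Interlocked.Exchange(ref state, 0);
        }

        // The state is handed back through an out parameter, which maps to an
        // OUTPUT parameter of the stored procedure.
        [SqlProcedure]
        public static void GetState(out SqlInt32 current)
        {
            // CompareExchange with identical values is used as an atomic read.
            current = new SqlInt32(Interlocked.CompareExchange(ref state, 0, 0));
        }
    }
}
```

On the T-SQL side, GetState would be registered with CREATE PROCEDURE in the same way as RequestAsync, with its parameter declared as int OUTPUT.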

The Consumer code, which consumes messages from the Broker, is also interesting. If you want to use it in production, keep in mind that the Consumer machine in your backend cloud system should pick up messages as fast as possible. You can use the Publisher-Consumer pattern: enqueue all incoming messages into concurrent queues and then process them on separate threads. That way you stay in control of your queues and can, for example, purge messages from processing very quickly. Alternatively, you can use IRequester and IReceiver from SimpleServiceBus in a few lines of code, because both of those components use the Publisher-Consumer pattern as well.

namespace CodingByToDesign.SimpleDatabaseBrokerConsumer
{
    using System;
    using System.Collections.Generic;
    using CodingByToDesign.SimpleDatabaseBrokerProtocol;
    using CodingByToDesign.SimpleServiceBus.Communication.Contracts;
    using CodingByToDesign.SimpleServiceBus.Communication;

    public static class Consumer
    {
        static readonly IReceiver<SimpleDatabaseBrokerMessage>[] receivers
            = new IReceiver<SimpleDatabaseBrokerMessage>
              [SimpleDatabaseBrokerConfigs.Configs.Length];
        static readonly IDictionary<string, int> indexes
            = new Dictionary<string, int>
              (SimpleDatabaseBrokerConfigs.Configs.Length);

        static Consumer()
        {
            for (var index = 0;
                 index < SimpleDatabaseBrokerConfigs.Configs.Length;
                 ++index)
            {
                var config = SimpleDatabaseBrokerConfigs.Configs[index];
                indexes.Add(config.QueueName, index);
                receivers[index] = new Receiver<SimpleDatabaseBrokerMessage>
                                   (config.QueueNetTcpAddress)
                {
                    RecieveAction = new Action<SimpleDatabaseBrokerMessage>(
                        message =>
                        {
                            var messageReceived = MessageReceived;
                            var queueName = config.QueueName;
                            if (messageReceived != null)
                            {
                                messageReceived.Invoke(queueName, message.Message);
                            }
                        }
                    )
                };
            }
        }

        public static Action<string, string> MessageReceived;
    }
}
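
The advice above about enqueuing messages into a concurrent queue and processing them on a separate thread can be sketched roughly as follows. BufferedHandler is a hypothetical helper of mine, not a class from SimpleServiceBus or SimpleDatabaseBroker, and the capacity and the drop-when-full policy are assumptions you would tune for your own system.

```csharp
namespace CodingByToDesign.SimpleDatabaseBrokerConsumer
{
    using System;
    using System.Collections.Concurrent;
    using System.Threading.Tasks;

    // Hypothetical helper: buffers messages and processes them on one worker thread.
    public sealed class BufferedHandler : IDisposable
    {
        readonly BlockingCollection<string> buffer;
        readonly Task worker;

        public BufferedHandler(Action<string> process, int capacity)
        {
            buffer = new BlockingCollection<string>(capacity);
            // The worker drains the buffer until CompleteAdding is called.
            worker = Task.Factory.StartNew(() =>
            {
                foreach (var message in buffer.GetConsumingEnumerable())
                {
                    process(message);
                }
            }, TaskCreationOptions.LongRunning);
        }

        // Returns immediately; false means the buffer was full and the message was dropped.
        public bool Enqueue(string message)
        {
            return buffer.TryAdd(message);
        }

        // Removes everything that has not been processed yet and returns the count.
        public int Purge()
        {
            var dropped = 0;
            string ignored;
            while (buffer.TryTake(out ignored))
            {
                ++dropped;
            }
            return dropped;
        }

        // Stops accepting messages and waits for the worker to finish the rest.
        public void Dispose()
        {
            buffer.CompleteAdding();
            worker.Wait();
            buffer.Dispose();
        }
    }
}
```

With such a helper, the receive callback only has to hand the message over, for example Consumer.MessageReceived = (queue, text) => handler.Enqueue(text);, so the receiving thread is never held up by slow processing.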

For your own experiments I am sharing the source code. Here it is: Source Code of SimpleServiceBus with Thrift and Protobuf and Source Code of SimpleDatabaseBroker. One more thing: I decided to share the source code under the Apache License 2.0, so it can be reused in commercial software. Enjoy!

p ;).

2 Replies to “SimpleDatabaseBroker”

  1. I asked the author of protobuf-net, Marc Gravell, for help. Maybe he will help me pass the last performance test on SimpleServiceBus, which fails because a deserialized string property is null when it should have content. I will update this post if he helps me or when I find out where the issue is.

  2. Pingback: Happy Holidays at The End of 2014 @ coding by to design
