Coder Legacy

In-Proc Agents Communications
Hello, this is my first article on Aspect Coder Network and I decide to start with some very modern subject of software architecture I am working on by a very long time before on implementation on this idea on .NET/C# technology stack. It takes me about four years to find best practices of this subject. Which is modeling inter-process communications as a key of aspects designing for modeling multi-agent environment? It is useful technique, especially when you want to create your backend sub-system or a part of grid computing or even a multi-core desktop application modeled as a group of independent agents working together. Of course such kind of modeling can be very clean design because you can start with a group of agents, you can give to all of the names and then you can only focus on functional specialization of every one of them separately. In this article I will cover some techniques that answers for the following questions. How to create such kind of design with white clean paper and pen? How to prepare testing environment for agents you created with stubbing technique? How many operations you can have per second? How you can calculate all computing bandwidth of solution? All subjects that I am trying to cover in this article are results of my experiments and knowledge.
So, lets begin, I can show you now first example that came to my mind just now as a kind of easy to understand brief solution of financial risk calculation sub-system design. Main functionality of this sub-system will be calculation of the risk of financial operations. I am not trying to cover all aspects of this kind of solutions to you now. I am only wanting to show you how to design such kind of system inefficient way. And also how to calculate number of operations you will be able to create in it. Of course all in details of this kind of calculations, data storing and predicting algorithms are not subjects here. This can be next step to you when you make sure all things work together fast and efficient. You can code your agents in details and focus only on algorithms with all business code logic. So let me show you an example of such kind of high-level system design on a diagram.

In-ProcAgentsCommunication

Now, let me explain it. As you can see we have following agents in our design. ControllerAgent, AuditLogAgent, ValidationAgent, CalculationAgent, RepositoryAgent and CachingAgent. All agents coexist and work together and I draw all communication messages that need to be exchanged. And I hope that all messages name and all agents name will help you to understand functional role of each of them, but I will try to explain what is particular role of every of them. ControllerAgent can be used by any controller of your ASP.NET MVC application or by WebAPI application or even WCF application it is responsible for interpreting needs of all requests and please not combine it with any technology yet, because it can be also used separately in operating system service as well. So, ControllerAgent creates requests in synchronous manner. That mean it needs to wait on response, but as you already know requests can be produced in many threads in all mentioned technology. For example by Thread Pool or TPL abstractions based engines as well. And controller main simple job is making sure that every request was processed and every response was created and returning back to the client from our sub-system solution.
Above mentioned agent cooperate with AuditLogAgent and it has audit logging role, that is mean every single requests will be logged. If you are wondering about good logging technique is Trace and Debug objects from System.Diagnostics namespace, if you use them correctly you can have about 50 millions logs per second stored. You not need any other logging library, everything is build in .NET Framework and waiting for you, but that was only small digression. So, lets return to the subject which is that AuditLogAgent role that is responsibility for log every single requests, it should not slow down the process but this is non-functional requirement, so all data came to this agent asynchronically.
Next you see validation agent and it is responsible for validate every requests, because of any kind of business rules you want, created to check if request should or should not be forbidden because is invalid. That also means that response cannot be created, so agent can immediately after validation returns back response to ControllerAgent with the reason of validation and then ControllerAgent can create forbidden response to this particular client request. On the other hand if request is valid it can be transferred to CalculationAgent. And this agent is a main business logic agent in entire application. Let’s say that this agent is responsible for calculation data for risk of the financial operations, to do so it gets possible transactions predictions data that it needs to calculate and store risk. Before CalculationAgent give a response for example as a group of the risks of particular financial operation in different moments of time for future, it needs to get calculation data to resolve this issue. To get that data CalculationAgent asking RepositoryAgent about all needed information. And then RepositoryAgent can be smart and ask CachingAgent about already get data for the same subject. When CachingAgent has all portion of data or any portion of data that can helps RepositoryAgent to give data faster. But if there is no data in memory of CachingAgent then Repository agent needs to get data from storage, and it can by relational database server or in memory storage solution server or object database or anything else. When RepositoryAgent has data it send that to the CachingAgent and CalculationAgent as well. Moreover RepositoryAgent can notify CachingAgent that some of its data should be collected because information need to be calculated again, for example when RepositoryAgent gets notification from storage. Other possibility for collecting data from CachingAgent memory is own decision of CachingAgent that can be made from all statistic logic of agent. Of course when CalculationAgent receive all necessary data it creates response for ControllerAgent and then ControllerAgent will create response for the client.
Ok, so you may be wondering how to make this design possible. We all living as software engineers, coders and designers in layer world. Am I right? Everyone told us we need layers with separations. In my opinion, it depends, it is time for layers and it is time of agents. Solution of this entry will show you how to use agents and how to model its communication. And I like agents idea because of very easily modeling of functionality of each agent and also because of natural design of that. As you can see on my design draw it is easy to create it in naturally way. And I will also show you how easily you can create system that working in exactly the same way. All you need for that is five following aspects, RequesterAspect with defined request only message type, RequesterAspect with defined both request and response message types, ReceiverAspect with defined response only message type, ReceiverAspet with defined both request and response message types and to have all possibility you can also create ResponderAspect with both request and response message types. Main difference between ReceiverAspect and ResponderAspect with two message types is that, ReceiverAspect can response but do not have to and ResponderAspect needs to create response all the time no matter what. All mentioned aspects should be disposable. So that is bunch of contracts that describe all of mentioned aspects as a solution for every agent.

namespace AspectCoder.Communication.Contracts
{
    using System;

    public interface IMessage
    {
    }
}
namespace AspectCoder.Communication.Contracts
{
    using System;

    interface ITransactId
    {
        long Value { get; }
    }
}
namespace AspectCoder.Communication.Contracts
{
    using System;

    public interface IRequesterAspect<TMessageReq> : IDisposable
    where TMessageReq : IMessage, new()
    {
        void RequestAsync(TMessageReq message);
    }

    public interface IRequesterAspect<TMessageReq, TMessageRes> : IDisposable
    where TMessageReq : IMessage, new()
    where TMessageRes : IMessage, new()
    {
        Action<TMessageRes> ResponseAction { get; set; }
        void RequestAsync(TMessageReq message);
        void RequestSync(TMessageReq message);
        void RequestAsync(TMessageReq message, Action<TMessageRes> responseAction);
        void RequestSync(TMessageReq message, Action<TMessageRes> responseAction);
    }
}
namespace AspectCoder.Communication.Contracts
{
    using System;

    public interface IReceiverAspect<TMessageReq> : IDisposable
    where TMessageReq : IMessage, new()
    {
        Action<TMessageReq> RecieveAction { get; set; }
    }

    public interface IReceiverAspect<TMessageReq, TMessageRes>
    where TMessageReq : IMessage, new()
    where TMessageRes : IMessage, new()
    {
        Action<Guid, TMessageReq> RecieveAction { get; set; }
        void ResponseAsync(Guid id, TMessageRes message);
    }
}
namespace AspectCoder.Communication.Contracts
{
    using System;

    public interface IResponderAspect<TMessageReq, TMessageRes>
    where TMessageReq : IMessage, new()
    where TMessageRes : IMessage, new()
    {
        Func<TMessageReq, TMessageRes> ResponseFunc { get; set; }
    }
}

Now, you may be wonder how to make it happened and how to make implementation of above interfaces? To be honest it can be done in good and in bad way. Hope you are not wondering how to make it bad. So, let me explain good implementation of this solution. First you need to protect all invocation with Publisher-Consumer pattern. It can be done by using ConcurrentQueue collection from .NET 4.0 and one Thread to consume messages. That allows you to have solution without any locking in all production code. Maybe you will needs some synchronization technique in your messaging aspects, but I am not sure. I was using read an write slim blockers form .NET 4.0 and there are works great. Second thing is to remember that all aspects should somehow to be able to find each other and also that you can have different number of requesters than receivers and so on. If you want to store transactions in progress you can use ConcurrentDictionary collection from .NET 4.0, but remember to have some timeouts and collecting old transactions from this dictionary. Ok, now it is time to challenge. This is my benchmark project for you, you can download it from here: New OpenSource NoLock InProc Communication Challenge (2170 downloads). And if you want you can try make something faster and better. All code here is also for showing you how to test aspects, how to stub agents communication and how to measure number of messages that you can transfer in your sub-system. My results you can find below, this is also a challenge, you can try to get better performance if you can. Or prepare better library then library inside this challenge. To be sure you won you need to extend benchmark tests and add both usage of AspectCoder.dll included assembly and your assembly, then you can compare results. Good luck!

Testing communication... 
Syncronously sending 10 events:
(1)Requester(msg)->(2)Receiver(msg)
and NO BACK...
PASS.

Syncronously sending 10 events:
(1)Requester(msg1)->(1)Responder(msg1,msg2)->(1)Receiver(msg1,msg2)
and NO BACK...
PASS.

Syncronously sending 10 events:
(1)Requester(msg1,msg2)->Responder(msg1,msg2)
and BACK...
PASS.

Asyncronously sending 10 events:
(1)Receiver(msg)->(1)Requester(msg)
and NO BACK...
PASS.

Asyncronously sending 10 * 50 000 events:
(1)Requester(msg1,msg2)->(1)Responder(msg1,msg2)
and BACK...
Benchmark start 50 000 events: 53 milliseconds, 943396.2264 per second.
Benchmark start 50 000 events: 64 milliseconds, 781250 per second.
Benchmark start 50 000 events: 83 milliseconds, 602409.6386 per second.
Benchmark start 50 000 events: 53 milliseconds, 943396.2264 per second.
Benchmark start 50 000 events: 66 milliseconds, 757575.7576 per second.
Benchmark start 50 000 events: 80 milliseconds, 625000 per second.
Benchmark start 50 000 events: 54 milliseconds, 925925.9259 per second.
Benchmark start 50 000 events: 43 milliseconds, 1162790.6977 per second.
Benchmark start 50 000 events: 78 milliseconds, 641025.641 per second.
Benchmark start 50 000 events: 78 milliseconds, 641025.641 per second.
Disposing... done after 0 milliseconds.
PASS.

Syncronously sending 10 * 100 000 events:
(1)Requester(msg1,msg2)->(1)Receiver(msg1,msg2)->
(1)Requester(msg3,msg4)->(1)Receiver(msg3,msg4)->
(1)Requester(msg5)->(1)Receiver(msg5)
and (2)BACK(msg2,msg4)...
Benchmark start 100 000 events: 934 milliseconds, 107066.3812 per second.
Benchmark start 100 000 events: 801 milliseconds, 124843.9451 per second.
Benchmark start 100 000 events: 901 milliseconds, 110987.7913 per second.
Benchmark start 100 000 events: 902 milliseconds, 110864.745 per second.
Benchmark start 100 000 events: 896 milliseconds, 111607.1429 per second.
Benchmark start 100 000 events: 910 milliseconds, 109890.1099 per second.
Benchmark start 100 000 events: 953 milliseconds, 104931.7943 per second.
Benchmark start 100 000 events: 979 milliseconds, 102145.046 per second.
Benchmark start 100 000 events: 738 milliseconds, 135501.355 per second.
Benchmark start 100 000 events: 968 milliseconds, 103305.7851 per second.
Disposing... done after 0 milliseconds.
PASS.

Press any key to continue...

In this article I focused mostly on inside process communication between agents in solution. In next one article I will show you solution for create agents or agents group that communicate between many processes and between many hosts as well for example on computing grid farm or on multi-core computing system. Good luck with a challenge and see you next time on Aspect Coder Network.

Coder

Out-Proc and In-Net Agents Communications
Hello, this will be my next article on Aspect Coder Network and I am very exciting to show you this subject. There is a many ways to communicate across processes and inside the network. True be told I spend more than last 2 years to find best way to do that in most efficient and secure way the same time. Implementation that I will be presenting here is based on the best truly interoperability I know implemented in Apache Thrift in version 0.9.0. Best information of is that you can communicate with this solution across process and inside the network and across all modern languages and technology the same time. For example between: Objective C, C++,  C# , Cocoa, D,  Delphi, Erlang, Haskell, Java, JavaScript, OCaml, Perl, PHP, Python, Ruby, Smalltalk and probably more. And this is a really fun make generic design for common usage with this. Today I want to start with following Cloud usage example. This will be notification monitoring system for few agents inside Cloud implementation on hosting web farm. Very often we need to build agents solution inside very fast secured network for example by Layer-3 VLAN based separation. And for build best backend we very often need good and very fast notifications from many resources from our cloud application. So we can imagine following design.

Out-ProcAgentsCommunication
We have in center of our design LongRunProcessing agent/service and we have 3 sources of long run processing tasks, WebApp, DbTriggers and BackendAgents. We want to use NotificationAgent for all processing notification. When any of 3 task sources send task to LongRunProcessing agent we need to send notification that we try to contact with this agent.  Also LongRunProcessing agent needs to notify about begin of work and end of work. All information need to go to the NotoficationAgent. Our NotificationAgent store all notification that came into Relational DataBase Server for WebApp for future analyze. And there is two kind of principles here. First is that all nodes need to know what is going on, so notification not only go to NotificationAgent but also there are broadcasted to the all nodes that send them or in this solution publish notifications. All elements of this design are independent processes and all of them coexist in the same TCP/IP network. I think I can tell you a lot about implementation I was doing inside this solution for NotificationAgent, but I think that will be most efficient to share with you only a few information for basic understanding the solution and present you working implementation challenge for your self experimenting. So, implementation using LongPooling pattern forget back all published notifications and broadcast them all to nodes. And also this implementation allow Publish operation for all nodes that connect to the service. Responsibility for getting all notifications from NotoficationAgent is on node site. Because notifications are prepared but not sanded, you need to get them. And if NotificationAgent has not any notification it use long wait to have client connected and if even after long wait there is no notifications the empty array of notifications is getting. This is for holding connection alive. What is more important getting notification not need to open connection for SYN packets on node site only on NotificationAgent site. Ok, I hope you are interesting a bit how it can be implemented and try to challenge with my implementation and benchmark. So here it is New OpenSource NoLock OutProc Communication Challenge (1901 downloads). That is whole library refactored a bit since first article to hold all inside process and outside process communication in the same place for reuse. And I will be happy if you find best solution for this and let me know about that. Here is my results of benchmark.

COMMUNICATION OUT-PROC CHALLENGES
Syncronously sending 10 * 100 000 messages each with 10 characters:
(1)LongPoolServer->(1)LongPoolClient...
100k messagess, each with 10 characters received in 490196.0784 per second.
100k messagess, each with 10 characters received in 495049.505 per second.
100k messagess, each with 10 characters received in 537634.4086 per second.
100k messagess, each with 10 characters received in 531914.8936 per second.
100k messagess, each with 10 characters received in 495049.505 per second.
100k messagess, each with 10 characters received in 492610.8374 per second.
100k messagess, each with 10 characters received in 537634.4086 per second.
100k messagess, each with 10 characters received in 492610.8374 per second.
100k messagess, each with 10 characters received in 534759.3583 per second.
100k messagess, each with 10 characters received in 534759.3583 per second.

Syncronously sending 10 * 100 000 messages each with 100 characters:
(1)LongPoolServer->(1)LongPoolClient...
100k messagess, each with 100 characters received in 401606.4257 per second.
100k messagess, each with 100 characters received in 495049.505 per second.
100k messagess, each with 100 characters received in 492610.8374 per second.
100k messagess, each with 100 characters received in 427350.4274 per second.
100k messagess, each with 100 characters received in 458715.5963 per second.
100k messagess, each with 100 characters received in 456621.0046 per second.
100k messagess, each with 100 characters received in 429184.5494 per second.
100k messagess, each with 100 characters received in 458715.5963 per second.
100k messagess, each with 100 characters received in 400000 per second.
100k messagess, each with 100 characters received in 458715.5963 per second.

Syncronously sending 10 * 100 000 messages each with 1000 characters:
(1)LongPoolServer->(1)LongPoolClient...
100k messagess, each with 1000 characters received in 114155.2511 per second.
100k messagess, each with 1000 characters received in 112485.9393 per second.
100k messagess, each with 1000 characters received in 106723.5859 per second.
100k messagess, each with 1000 characters received in 118483.4123 per second.
100k messagess, each with 1000 characters received in 116550.1166 per second.
100k messagess, each with 1000 characters received in 114285.7143 per second.
100k messagess, each with 1000 characters received in 112359.5506 per second.
100k messagess, each with 1000 characters received in 112359.5506 per second.
100k messagess, each with 1000 characters received in 112485.9393 per second.
100k messagess, each with 1000 characters received in 116414.4354 per second.

Press any key to continue...

This is not the end, I want to show you how simple NotificationAgent can be build. There are two simple code examples. I think It not need to be explained, it is almost trivial. Remember only to split code samples to two Console Application projects based on namespace names.

namespace NotificationAgentSimpleServer
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using AspectCoder.CommunicationOutProc;
    using AspectCoder.CommunicationOutProc.Generated;

    class SimpleServer
    {
        static void Main(string[] args)
        {
            var address = args.Length == 1 ? args[0] : "net.tcp://127.27.27.27:2727/NotificationAgentServer";

            Uri uri;
            if (!Uri.TryCreate(address, UriKind.RelativeOrAbsolute, out uri)) return;

            var serviceServer = new LongPoolingNotificationEntityImplementationThrift(5000, 1000000);

            var communicatorServer = new LongPoolingNotificationEntityPublishServiceImplementationThrift();

            var server = communicatorServer.CreateServer(uri.Port);
            try
            {
                Console.WriteLine("Hello, what is server name?");
                var name = Console.ReadLine();

                Console.WriteLine("Welcome '{0}' as notification server.", name);
                Console.WriteLine("write 'quit' command to quit.");

                new Thread(server.Serve) { IsBackground = true }.Start();
                Console.WriteLine("Server is running...");

                string text = string.Empty;
                while ((text = Console.ReadLine()) != "quit")
                {
                    serviceServer.Publish(
                        new List<LongPoolingNotificationEntityThrift> {
                            new LongPoolingNotificationEntityThrift {
                                DataString = string.Format("{0}> {1}", name, text)
                            }
                        });
                }
            }
            finally
            {
                server.Stop();
            }
        }
    }
}
namespace NotificationAgentSimpleClient
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading;
    using AspectCoder.CommunicationOutProc;
    using AspectCoder.CommunicationOutProc.Generated;
    using AspectCoder.CommunicationOutProc.Consumers;
    using Thrift.Transport;

    class SimpleClient
    {
        static void Main(string[] args)
        {
            var address = args.Length == 1 ? args[0] : "net.tcp://127.27.27.27:2727/NotificationAgentServer";

            Uri uri;
            if (!Uri.TryCreate(address, UriKind.RelativeOrAbsolute, out uri)) return;

            var communicatorClient = new LongPoolingNotificationEntityPublishServiceImplementationThrift();
            var consumer = new LongPoolingNotificationPublishEntityConsumerThrift();

            TTransport transport = null;
            Func<LongPoolingNotificationEntityPublishServiceThrift.Client> clientCreator =
              () =>
              {
                  if (transport != null && transport.IsOpen)
                      transport.Close();
                  transport = null;
                  var clientCreated = communicatorClient.CreateClient(uri.Host, uri.Port, out transport);
                  SpinWait.SpinUntil(() => transport.IsOpen, 1000);
                  return clientCreated;
              };

            TTransport transportPublish = null;
            Func<LongPoolingNotificationEntityPublishServiceThrift.Client> clientCreatorPublish =
              () =>
              {
                  if (transportPublish != null && transportPublish.IsOpen)
                      transportPublish.Close();
                  transportPublish = null;
                  var clientCreated = communicatorClient.CreateClient(uri.Host, uri.Port, out transportPublish);
                  SpinWait.SpinUntil(() => transportPublish.IsOpen, 1000);
                  return clientCreated;
              };

            var clientPublish = clientCreatorPublish();

            consumer.Notified += notification => Console.WriteLine(notification.DataString);

            try
            {
                consumer.StartWorking(clientCreator);

                Console.WriteLine("Hello, what is your client name?");
                var name = Console.ReadLine();

                Console.WriteLine("Welcome '{0}' on notification server.", name);
                Console.WriteLine("write 'quit' command to quit.");

                string text = string.Empty;
                while ((text = Console.ReadLine()) != "quit")
                {
                    Publish(
                      new List<LongPoolingNotificationEntityThrift> {
                          new LongPoolingNotificationEntityThrift { DataString = string.Format("{0}> {1}", name, text) }
                      },
                      ref clientPublish,
                      clientCreatorPublish,
                      transportPublish);
                }
            }
            finally
            {
                consumer.StopWorking();
            }
        }

        static void Publish(List<LongPoolingNotificationEntityThrift> data,
          ref LongPoolingNotificationEntityPublishServiceThrift.Client clientP,
          Func<LongPoolingNotificationEntityPublishServiceThrift.Client> clientCreatorP,
          TTransport transportP)
        {
        RETRY:
            try
            {
                clientP.Publish(data);
            }
            catch (Exception ex)
            {
                Thread.Sleep(1000);
                clientP = clientCreatorP();
                goto RETRY;
            }
        }
    }
}

In this article I focused mostly on outside process and inside network communication between agents in internal cloud environment. Good luck with a challenge and see you next time on Aspect Coder Network!

Coder

Leave a Reply

Your email address will not be published. Required fields are marked *

*

This site uses Akismet to reduce spam. Learn how your comment data is processed.