Batch Processing Aspect in Java

code_puzzleHi, today I made port of my Batch Processing Aspect in Java. It is quite a bit different aspect than I made previously in C#. In C# I made something that is fully asynchronous and in Java I made aspect that does sync invocations from many threads. I have in my mind uses of this aspect to web application or web api or web service implementation that has static controller field for example named “batchProcessingAspect” and invokes changes in database in sync way. However the aspect make processing of the database request and transform them into batches. Most of the solution is a simulation of the multithreading environment that is by nature in mentioned kinds of application where clients requests are “attacking” the aspect. In multi-threading solution designs I like attacking metaphor where all you have to do is making your solution threads-(attacking)-safe.

package net.codingbytodesign;

import java.util.*;
import java.util.concurrent.*;

interface Action<T> {
    public void invoke(T arg);
}

class ActionImpl<T> implements Action<T> {
    Action<T> action;
    public ActionImpl(Action<T> action) {
        this.action = action;
    }
    @Override
    public void invoke(T arg) {
        this.action.invoke(arg);
    }
}

class WaitNotify {
    private Boolean bool = true;
    private final Object lock = new Object();

    public Boolean waitDone(){
        if (bool) {
            synchronized (lock) {
                while (bool) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        return bool;
    }
    public void setDone(){
        synchronized(lock){
            bool = false;
            lock.notify();
        }
    }
}

class BatchProcessingAspectEntity {
    public WaitNotify waitNotify = new WaitNotify();
}

class BatchProcessingAspect<T extends BatchProcessingAspectEntity> {
    Action<List<T>> actionBatch;
    Consumer<T> consumer;

    public BatchProcessingAspect(
            Action<List<T>> actionBatch,
            int maxCount,
            int maxTimeout) {
        this.actionBatch = actionBatch;
        this.consumer = new Consumer<T>(
                this.actionBatch, maxCount, maxTimeout);
        Thread thread = new Thread(this.consumer);
        thread.start();
    }

    class Consumer<T extends BatchProcessingAspectEntity> extends Thread {
        Action<List<T>> actionBatch;
        int maxCount;
        int maxTimeout;
        Date current = new Date(System.currentTimeMillis() + maxTimeout);

        ConcurrentLinkedQueue<T> entities = new ConcurrentLinkedQueue<T>();

        public Consumer(
                Action<List<T>> actionBatch,
                int maxCount,
                int maxTimeout) {
            super("Consumer");
            this.actionBatch = actionBatch;
            this.maxCount = maxCount;
            this.maxTimeout = maxTimeout;
        }

        @Override
        public void run() {
            while (true) {
                while(entities.isEmpty()) {
                    try {
                        Thread.sleep(10);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }

                List<T> list = new ArrayList<T>();

                while (!entities.isEmpty() &&
                        list.size() < maxCount &&
                        new Date(System.currentTimeMillis()).before(current)) {
                    T data = entities.poll();
                    list.add(data);
                }

                if (list.size() > 0) {
                    actionBatch.invoke(list);

                    for (T entity : list) {
                        entity.waitNotify.setDone();
                    }
                }
                current = new Date(System.currentTimeMillis() + maxTimeout);
            }
        }

        void batchInvocation(T entity)
        {
            entities.add(entity);
            entity.waitNotify.waitDone();
        }

        Boolean isEmpty()
        {
            return entities.isEmpty();
        }
    }

    public void batchInvocation(T entity)
    {
        this.consumer.batchInvocation(entity);
    }

    public Boolean isEmpty()
    {
        return this.consumer.isEmpty();
    }
}

class Task extends Thread {
    public boolean running;
    public void run(){
        return;
    }
}

class MultiThreaded {
    public HashSet<Task> tasks = new HashSet<Task>();

    public void startTask(Task task) {
        task.start();
        tasks.add(task);
    }

    public Task getTask(int id) {
        for(Task task : tasks){
            if(task.getId() == id)
                return task;
        }
        return null;
    }

    public boolean processing() {

        boolean processing = false;

        for(Task task: tasks){
            if(task.isAlive())
                processing = true;
        }
        return processing;
    }
}

class Entity extends BatchProcessingAspectEntity {
    public int value = 0;
    public Entity(int value) {
        this.value = value;
    }
    public void print() {
        System.out.println(value);
    }
}

public class MainTester {
    static void processEntity(List<Entity> entiies) {
        System.out.println("Entities Count: " + entiies.size());
    }

    static BatchProcessingAspect<Entity> batchProcessingAspect
            = new BatchProcessingAspect<Entity>(
            new ActionImpl<List<Entity>>(
                    entities -> processEntity(entities)),
            20,
            200
    );

    public static void main(String[] args) {
        List<Entity> values = new ArrayList<Entity>();

        for(int i = 0; i < 1000; ++i)
            values.add(new Entity(i));

        int block = 2;
        int cursor = 0;

        List<Entity> objectList = new ArrayList<Entity>();

        MultiThreaded multiThreaded = new MultiThreaded();

        for(Entity v : values) {
            cursor++;
            objectList.add(v);

            if(cursor < block)
                continue;

            final List<Entity> objectListCopy = new ArrayList<Entity>();
            objectListCopy.addAll(objectList);

            Task task = new Task() {
                @Override
                public void run() {
                    for(Entity entity : objectListCopy)
                        batchProcessingAspect.batchInvocation(entity);
                }
            };

            multiThreaded.startTask(task);
            objectList.clear();

            cursor = 0;
        }

        while (multiThreaded.processing() || !batchProcessingAspect.isEmpty()) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            continue;
        }

        System.out.println("move on");
    }
}

Thanks for reading!

p ;).

Leave a Reply

Your email address will not be published. Required fields are marked *

*