
Moving data in Java. 3 ways of solving my data problem. Part 2

So the code is complete (for now)! I finally settled on one way of moving the data into MongoDB.

There are 4 main parts to the code:

Should I move the data to MongoDB?

Ok I want to move it, how should I?

Holding the data until it's ready to move to MongoDB

Actually moving the data to MongoDB


Should I move the data to MongoDB?

This section deals with deciding which data we should move to MongoDB.

We are mainly storing 3 different sets of data (for now).

  • Sakai Events
  • Sakai Sessions
  • Sakai Sites

To decide whether we should move that data into Mongo, I have a field in our configuration files. That field is simply a boolean: true if we want to store the data, false if we don't. I then use that boolean in the code.

    private boolean logEvents = ServerConfigurationService.getBoolean("logEvents", false);
    private boolean logSessions = ServerConfigurationService.getBoolean("logSessions", false);
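For reference, `ServerConfigurationService.getBoolean` reads these flags from Sakai's configuration, so the corresponding entries would look something like the fragment below. The property names match the code above; the values are just an example.

```properties
# Push Sakai events/sessions into MongoDB? Defaults to false if unset.
logEvents=true
logSessions=true
```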

Ok I want to move it, how should I?

This one is just a simple logic check followed by adding the object to a queue.

This example is for inserting a SakaiEvent into Mongo. I 'catch' the original method and create a new one that calls the original first and then adds the event to the queue.

    private void catchEvent(Event ee) {
        postEvent(ee);    // call the original handler first
        if (logEvents) {  // config flag from above
            man.addToQueue(new SakaiEvent(ee));
        }
    }

Holding the data until it's ready to move to MongoDB

Now we get to the fun part. Once an item reaches this stage, the process is:

  • Check whether the queue is more than half full (its remaining capacity has dropped below half the threshold).
  • If it is, and our AtomicBoolean flips from false to true, dump the queue to Mongo.
  • Either way, add the item to the queue.

My boss and I worked on this bit to make sure that there were no deadlocks.

The Code

    public void addToQueue(Object item) {

        if (item instanceof SakaiEvent) {
            // Queue is more than half full and no dump is in flight: dump it.
            if ((eventQueue.remainingCapacity() < eventsThreshold / 2) && (atomicEvent.compareAndSet(false, true))) {
                dumpQueue(eventQueue);
            }
            // Skip load-balancer test traffic.
            if (!(((SakaiEvent) item).getEventResource().contains("lbtest"))) {
                eventQueue.offer((SakaiEvent) item);
            }

        } else if (item instanceof SakaiSession) {
            if ((sessionQueue.remainingCapacity() < sessionsThreshold / 2) && (atomicSession.compareAndSet(false, true))) {
                dumpQueue(sessionQueue);
            }
            sessionQueue.offer((SakaiSession) item);
        }
    }
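For context, addToQueue() leans on a few fields the post never shows. Here is a minimal sketch of what they might look like; the names match the code above, but the types, capacities, and the stand-alone class are my assumptions (a bounded LinkedBlockingQueue per data set, with an AtomicBoolean guarding each dump):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: the post does not show these declarations, so the
// capacities and element types below are guesses, not the production values.
class QueueFields {
    static final int eventsThreshold = 1000;  // assumed queue capacity
    static final int sessionsThreshold = 500; // assumed queue capacity

    // Bounded queues: remainingCapacity() is what addToQueue() checks.
    static final LinkedBlockingQueue<Object> eventQueue =
            new LinkedBlockingQueue<>(eventsThreshold);
    static final LinkedBlockingQueue<Object> sessionQueue =
            new LinkedBlockingQueue<>(sessionsThreshold);

    // compareAndSet(false, true) succeeds for exactly one caller at a time,
    // so only one thread can kick off a dump of a given queue.
    static final AtomicBoolean atomicEvent = new AtomicBoolean(false);
    static final AtomicBoolean atomicSession = new AtomicBoolean(false);
}
```

Because offer() is non-blocking and the dump is handed off to another thread, no caller of addToQueue() ever waits on a full queue, which is part of how the deadlock risk is kept down.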

Actually moving the data to MongoDB

This is the part of the code that actually sends the data to the Mongo database. It runs inside another thread, so the program does not have to wait for it to finish.

This loops through each queue with an iterator, adds every item to an instance of MongoDB's BulkWriteOperation, and then executes the bulk insert against the Mongo database. Since we don't care about the order of the documents in the database, the operation is unordered and Mongo can insert them in parallel.

One part of the code that is probably going to change is the following snippet.

bulkSession.insert(((SakaiSession) ittyTheIterator.next()).convertToDBObject());

I just found out that Mongo's API allows you to extend the DBObject class. I'm going to be looking into this to cut down on the casting and to make the code a little cleaner. This will essentially make each instance of a SakaiEvent, SakaiSession or SakaiSite a DBObject which can be directly written to a MongoDB collection.
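As a rough sketch of where that could go: if SakaiEvent extended the driver's BasicDBObject (the standard implementation of DBObject), each event would itself be a document, and both the cast and the convertToDBObject() call would disappear from the bulk loop. To keep this example runnable without the Mongo driver, java.util.LinkedHashMap stands in for BasicDBObject below, and the field names are made up for illustration:

```java
import java.util.LinkedHashMap;

// Sketch only: in real code this would extend com.mongodb.BasicDBObject.
// LinkedHashMap is purely a driver-free stand-in here, and the
// "event"/"resource" keys are hypothetical field names.
class SakaiEventDoc extends LinkedHashMap<String, Object> {
    SakaiEventDoc(String event, String resource) {
        put("event", event);
        put("resource", resource);
    }
}
```

With BasicDBObject as the superclass, the loop body could shrink to something like bulkEvent.insert(ittyTheIterator.next()) once the queue is typed to the event class.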

    public void run() {
        // Peek at the head to work out which kind of data this queue holds.
        Object item = itemQueue.peek();
        Iterator ittyTheIterator = itemQueue.iterator();

        if (item instanceof SakaiEvent) {
            // Unordered bulk write: Mongo is free to insert in parallel.
            BulkWriteOperation bulkEvent = eventsCollection.initializeUnorderedBulkOperation();
            while (ittyTheIterator.hasNext()) {
                bulkEvent.insert(((SakaiEvent) ittyTheIterator.next()).convertToDBObject());
            }
            eventQueue.clear();
            atomicEvent.set(false); // let the next dump start
            bulkEvent.execute();
        }
        if (item instanceof SakaiSession) {
            BulkWriteOperation bulkSession = sessionsCollection.initializeUnorderedBulkOperation();
            while (ittyTheIterator.hasNext()) {
                bulkSession.insert(((SakaiSession) ittyTheIterator.next()).convertToDBObject());
            }
            sessionQueue.clear();
            atomicSession.set(false);
            bulkSession.execute();
        }
    }

Conclusion

We've been running this code for a few weeks now and haven't seen any problems. The CPU/RAM/network load on each of the production machines and on the database machine has stayed well within expected bounds. Just like every other programming project, this code will either be improved or completely forgotten about over time.
