A few ideas on building a Twitter feed and statistics about the most frequent words entered in the last few minutes

A few weeks ago, while browsing the Twitter page, I started wondering what it would involve to build a Twitter event feed on my own. Naively, it looks like the tweets are simply imported into a few big database tables (tweets, users) and then distributed to the users of the website and the Twitter mobile clients.

So I started building the website, and I envisioned a few independent modules (building bricks) for the system:

– a Spring Boot standalone application dedicated to importing real tweets from Twitter via a Twitter stream into Postgres tables

– a Spring Boot REST service application used to expose the tweets that were imported into the database

– a Spring MVC web application which displays the tweets with infinite scrolling (like the Twitter page does)

While working on the website I thought it would be interesting to build some kind of statistic on top of the imported tweets, so I decided to add the functionality of displaying a statistic of the popular words gathered in the last 5 minutes. For implementing this feature I used the Redis sorted sets functionality to structure the word statistics. For displaying them, I found the d3.js pack layout a way to express this statistic easily in a visual fashion.

Below is the current outcome of the functionalities exposed by the website.

twitter-feed

words-statistics

The source code for the site is available on GitHub:

https://github.com/mariusneo/twitter-feed

In the following part of this post I'll describe some of the things I find relevant to think about while building a similar functionality:

Tweets importer application

This application depends heavily on the twitter4j library; it simply opens a Twitter stream on a set of given topics (cities from the German-speaking countries in this case) and imports each tweet into the Postgres database:

        // ...
        StatusListener listener = new StatusListener() {
            public void onStatus(Status status) {
                tweetImporterService.importTweet(status);
            }
            // ....
        };

        TwitterStream twitterStream = new TwitterStreamFactory().getInstance();
        twitterStream.setOAuthConsumer(consumerKey, consumerSecret);
        twitterStream.setOAuthAccessToken(new AccessToken(accessToken, accessTokenSecret));
        twitterStream.addListener(listener);

        FilterQuery capitalsQuery = new FilterQuery();
        capitalsQuery.track(new String[]{"Wien", "wien", "Berlin", "berlin", "Bern", "bern", "Vaduz", "vaduz"});
        twitterStream.filter(capitalsQuery);

One important thing to note here is that the import of the tweets into the database should happen asynchronously, in order to be able to process the incoming tweets in parallel:

    @Async
    @Transactional
    public void importTweet(Status status){
         // ... 
    }
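Note that `@Async` only takes effect if asynchronous processing is enabled in the Spring configuration. A minimal sketch of such a configuration class (the class name and pool sizes are illustrative assumptions, not taken from the project):

```java
@Configuration
@EnableAsync
public class AsyncConfig {

    // Hypothetical executor tuning; without a custom bean Spring falls back
    // to its default executor for @Async methods.
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(4);
        executor.setMaxPoolSize(8);
        executor.initialize();
        return executor;
    }
}
```

Also keep in mind that `@Async` methods run asynchronously only when invoked through the Spring proxy, i.e. from another bean, not via a direct call within the same class.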

Service application

Spring Boot and Jersey make it very easy nowadays to expose REST web service functionality in Java. One of the problems that arise when dealing with feeds of data is their pagination.

The classic listing of items in a table over multiple pages, typical of business web applications, can't be used effectively for Twitter, because new tweets are constantly added to the start of the event feed. When moving from the first page to the second and then, a few seconds later, back to the first, the user would be confused because the content of the first page changed in the meantime (new tweets arrived). For this reason, an infinite feed fits the flow of data much better.

The details on how the API is to be built are described with nice examples by the Twitter engineers on this page:

https://dev.twitter.com/rest/public/timelines

In the case of my service application, I implemented service methods that use the following parameters:

– since_id: for retrieving the tweets that occurred since the tweet with this id

– max_id: for retrieving the tweets that were entered before the tweet with this id

See the implementation of the service here:

https://github.com/mariusneo/twitter-feed/blob/master/service/src/main/java/mg/twitter/feed/service/resource/TweetsResource.java

In order to provide efficient methods for retrieving content with very little duplication in the feed, it would make sense to add a method that combines the since_id and max_id parameters.
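The filtering behind since_id/max_id is plain keyset pagination; here is a minimal in-memory sketch of the logic (the `Tweet` class and method names are hypothetical, not the project's API):

```java
import java.util.ArrayList;
import java.util.List;

public class KeysetPagination {

    public static class Tweet {
        public final long id;
        public final String text;

        public Tweet(long id, String text) {
            this.id = id;
            this.text = text;
        }
    }

    // Returns at most `limit` tweets with sinceId < id <= maxId, newest first.
    // Combining both bounds lets a client fill a gap in its feed without
    // receiving tweets it already has.
    public static List<Tweet> page(List<Tweet> newestFirst, long sinceId, long maxId, int limit) {
        List<Tweet> page = new ArrayList<>();
        for (Tweet t : newestFirst) {
            if (t.id > sinceId && t.id <= maxId) {
                page.add(t);
                if (page.size() == limit) {
                    break;
                }
            }
        }
        return page;
    }
}
```

In a real service the same predicate would of course be pushed down into the SQL query rather than applied in memory.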

Web application

The web application is a classic MVC application with separation between the view (I used Thymeleaf) and the controller.

Most of the logic for displaying new tweets or retrieving the previous tweets from the flow is implemented in JavaScript, with jQuery routines and AJAX JSONP calls made directly to the service application.

Relevant source code:

https://github.com/mariusneo/twitter-feed/blob/master/web/src/main/resources/static/js/feed.js

One TODO still on my list is the consistent retrieval of tweets when there are more new tweets than the defined bucket limit.

The solution for this issue is covered in the explanations for the Twitter timeline API.

Word statistics functionality

Every time a new tweet gets imported into the database, a database trigger creates an entry in a separate table, marking the tweet's words for processing.

CREATE OR REPLACE FUNCTION insert_tweet_into_count_words_tweets()
RETURNS trigger AS
$BODY$
BEGIN

INSERT INTO count_words_tweets(tweet_id,created_at, updated_at, status)
VALUES(NEW.id,now(),now(),'INITIAL');

RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;
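The function above only becomes active once it is attached to the tweets table with a trigger; assuming the table is simply named `tweets` (an assumption, the actual table name may differ), the wiring would look roughly like this:

```sql
CREATE TRIGGER count_words_tweets_trigger
AFTER INSERT ON tweets
FOR EACH ROW
EXECUTE PROCEDURE insert_tweet_into_count_words_tweets();
```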

Each tweet is then processed by a Quartz job running every second, which splits the tweet into words and sends the words to the corresponding Redis sorted-set bucket and to the Redis sorted set containing the total word statistic.

The site currently displays the words from the last 5 minutes, so I decided to keep 6 sorted-set buckets and to fill them in a circular fashion.

Another Quartz job kicks in at the beginning of each minute, subtracts the content of the obsolete sorted-set bucket from the totals and then clears that bucket. During the next minute, the bucket that was just cleared is filled with the words from the tweets of the current minute. These operations are executed in a circular fashion, ensuring that the words.totals sorted set always contains only the statistic of the words entered in the last five minutes.
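The rotation described above can be sketched with plain in-memory maps standing in for the Redis sorted sets (a deliberately simplified model, not the project's Jedis-based implementation; all names are illustrative):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WordBuckets {

    static final int BUCKETS = 6; // 5 minutes of live data + 1 bucket being recycled

    private final List<Map<String, Integer>> minuteBuckets = new ArrayList<>();
    private final Map<String, Integer> totals = new HashMap<>();

    public WordBuckets() {
        for (int i = 0; i < BUCKETS; i++) {
            minuteBuckets.add(new HashMap<>());
        }
    }

    // Per-second job: count a word both in the current minute's bucket
    // and in the running totals.
    public void addWord(int minute, String word) {
        minuteBuckets.get(minute % BUCKETS).merge(word, 1, Integer::sum);
        totals.merge(word, 1, Integer::sum);
    }

    // Per-minute job: subtract the obsolete bucket from the totals and clear
    // it, so the totals always cover only the last five minutes.
    public void rotate(int newMinute) {
        Map<String, Integer> obsolete = minuteBuckets.get(newMinute % BUCKETS);
        obsolete.forEach((word, count) -> totals.merge(word, -count, Integer::sum));
        totals.values().removeIf(count -> count <= 0);
        obsolete.clear();
    }

    public Integer total(String word) {
        return totals.get(word);
    }
}
```

In Redis the same bookkeeping is done with sorted-set operations instead of map updates, but the circular-buffer logic is identical.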

count_words_workflow

Note in the image above that the words in the sorted sets are ordered by their scores (in this case the number of times they were used within the tweets created during a given minute).

The implementation details for this functionality can be found here:

https://github.com/mariusneo/twitter-feed/tree/master/jobs/count-words-job

Interaction with Redis is done using JedisPool resources offered by the jedis library:

@Configuration
public class JedisConfig {
    @Value("${redis.datasource.url}")
    private String redisUrl;

    @Bean
    public JedisPool jedisPool() {
        return new JedisPool(redisUrl);
    }
}


// usage of the Jedis resources
Jedis jedis = jedisPool.getResource();
try {
    // ... jedis operations
} finally {
    // it's important to return the Jedis instance to the pool once you've finished using it
    jedisPool.returnResource(jedis);
}

Getting the statistic of the 50 most popular words at any time, so that they can be displayed, is easily achieved in the following fashion:

        Jedis jedis = jedisPool.getResource();
        try {
            // zrevrange indexes are inclusive, so 0..49 covers the top 50 words
            Set<Tuple> range = jedis.zrevrangeWithScores("words.totals", 0, 49);

            Map<String, Integer> result = new LinkedHashMap<>();
            range.forEach(t -> result.put(t.getElement(), (int) t.getScore()));

            return result;
        } finally {
            // it's important to return the Jedis instance to the pool once you've finished using it
            jedisPool.returnResource(jedis);
        }

Although far from fully functional, this application can be used as a tutorial on building a website of greater complexity than the ones we usually come across in general technology tutorials.

MongoDb 3.0 Replica Set for development and testing

Yesterday I attended a MongoDb meetup where Norberto Leite gave a pretty interesting introduction to the new features of MongoDb 3.0.

The presentation that was used in the meetup can be seen here.

One of the major changes in the new release is the introduction of modular storage engines. While up to version 2.6 the storage engine of choice was MMAPv1 (the name comes from memory mapping), this release introduces the WiredTiger storage engine.

The new storage engine provides document-level locking, compared to the collection-level locking of MMAPv1. To make things clear: with collection-level locking, while there is a write operation on the collection, no other process can read from or write to it.

Until now, MongoDb offered the guarantee that a document is either written successfully or not written at all. With the introduction of the WiredTiger storage engine this gets relaxed a bit, because documents are kept in memory and flushed at a specified interval (1 min by default) or when they exceed a certain threshold (2 GB). Writing to a journal file is not done for this storage engine, but if the database is part of a replica set there is a high chance that the data gets copied right away to its neighbours in the replica set.

Since I had not used a replica set in Mongo or Postgres before, I found it very interesting that a replica set can be installed without too much effort on a development machine.

Below I'll reproduce a diagram of the replica set presented during the meetup.

mongo-replica-set

And below are the commands required to create, on a development machine, the replica set shown in the picture above:

mongodb-linux-x86_64-3.0.0-rc8/bin/mongod --port 30001 --fork --logpath mongo1-mmapv1/mongodb.log --dbpath mongo1-mmapv1/ --replSet "RSET"

mongodb-linux-x86_64-3.0.0-rc8/bin/mongod --port 30002 --fork --logpath mongo2-wt/mongodb.log --dbpath mongo2-wt/ --storageEngine=wiredTiger --replSet "RSET"

mongodb-linux-x86_64-3.0.0-rc8/bin/mongod --port 30003 --fork --logpath mongo3-wt/mongodb.log --dbpath mongo3-wt/ --storageEngine=wiredTiger --wiredTigerJournalCompressor=zlib --replSet "RSET"

I've created the directories:

– mongo1-mmapv1

– mongo2-wt

– mongo3-wt

in which the mongo instances will store their data.

Notice that no storage engine is specified for the mongo1-mmapv1 instance. This is because MMAPv1 is still the default storage engine.

Through the replSet parameter we specify the name of the replica set, which must be the same for all instances that are part of it.

Once this is done, the replica set needs to be initiated and configured on the master (in our case the mongo1-mmapv1 instance). More details about these operations can be found on the Mongo documentation page.


# be sure to replace myhostname with the name of your computer
$ mongo myhostname:30001

> rs.initiate()
> rs.add("myhostname:30002")
> rs.add("myhostname:30003")
> rs.conf()

RSET:PRIMARY> rs.status()
{
  "set" : "RSET",
  "date" : ISODate("2015-02-06T16:39:10.203Z"),
  "myState" : 1,
  "members" : [
    {
      "_id" : 0,
      "name" : "myhostname:30001",
      "health" : 1,
      "state" : 1,
      "stateStr" : "PRIMARY",
      "uptime" : 4135,
      "optime" : Timestamp(1423237503, 1134),
      "optimeDate" : ISODate("2015-02-06T15:45:03Z"),
      "electionTime" : Timestamp(1423236911, 2),
      "electionDate" : ISODate("2015-02-06T15:35:11Z"),
      "configVersion" : 3,
      "self" : true
    },
    {
      "_id" : 1,
      "name" : "myhostname:30002",
      "health" : 1,
      "state" : 2,
      "stateStr" : "SECONDARY",
      "uptime" : 2467,
      "optime" : Timestamp(1423237503, 1134),
      "optimeDate" : ISODate("2015-02-06T15:45:03Z"),
      "lastHeartbeat" : ISODate("2015-02-06T16:39:09.236Z"),
      "lastHeartbeatRecv" : ISODate("2015-02-06T16:39:09.285Z"),
      "pingMs" : 0,
      "configVersion" : 3
    },
    {
      "_id" : 2,
      "name" : "myhostname:30003",
      "health" : 1,
      "state" : 2,
      "stateStr" : "SECONDARY",
      "uptime" : 3399,
      "optime" : Timestamp(1423237503, 1134),
      "optimeDate" : ISODate("2015-02-06T15:45:03Z"),
      "lastHeartbeat" : ISODate("2015-02-06T16:39:08.818Z"),
      "lastHeartbeatRecv" : ISODate("2015-02-06T16:39:09.007Z"),
      "pingMs" : 0,
      "syncingTo" : "myhostname:30001",
      "configVersion" : 3
    }
  ],
  "ok" : 1
}


That’s about it for setting up the replica set on your development machine.
To make some experiments with it you can import some data into the master database :

$ mongoimport --port 30001 --db tweets --collection tweets < tweets.js

or into a slave database, but make sure to specify the replica set in this case:

$ mongoimport --host RSET/myhostname --port 30003 --db tweets --collection tweets < tweets.js

After the import operation you can check whether the instances of the replica set are synchronized in the following manner:

$ mongo myhostname:30001

> use tweets
> db.stats()

RSET:PRIMARY> db.stats()
{
  "db" : "tweets",
  "collections" : 3,
  "objects" : 3790,
  "avgObjSize" : 252.8,
  "dataSize" : 958112,
  "storageSize" : 2805760,
  "numExtents" : 7,
  "indexes" : 1,
  "indexSize" : 130816,
  "fileSize" : 67108864,
  "nsSizeMB" : 16,
  "extentFreeList" : {
    "num" : 0,
    "totalSize" : 0
  },
  "dataFileVersion" : {
    "major" : 4,
    "minor" : 22
  },
  "ok" : 1
}

$ mongo "myhostname:30002"
> db.stats()

RSET:SECONDARY> db.stats()
{
  "db" : "tweets",
  "collections" : 1,
  "objects" : 3786,
  "avgObjSize" : 204.07527733755944,
  "dataSize" : 772629,
  "storageSize" : 491520,
  "numExtents" : 0,
  "indexes" : 1,
  "indexSize" : 53248,
  "ok" : 1
}


Notice that the dataSize is visibly smaller with the WiredTiger storage engine than with MMAPv1. It is even smaller on the instance using zlib compression.

When trying to read from a slave, you will have to enable this operation explicitly:


$ mongo "myhostname:30002"
RSET:SECONDARY> use tweets
switched to db tweets
RSET:SECONDARY> db.tweets.count()
2015-02-06T18:01:25.532+0100 count failed: { "note" : "from execCommand", "ok" : 0, "errmsg" : "not master" } at src/mongo/shell/query.js:191
RSET:SECONDARY> rs.slaveOk()
RSET:SECONDARY> db.tweets.count()
3786

Combinations of how to arrange blocks on a row

Recently I came across an algorithmic problem dealing with arranging blocks of different sizes on a row composed of multiple cells.

One constraint that was specified on the problem was to separate two blocks having the same colour with at least one blank space between them.

To make the problem statement easier to grasp I'll add some visual examples:

– one row with 5 cells capacity containing one block of 3 cells

1b-0-2   0 spaces | block 3 cells | 2 spaces

1b-1-1    1 space | block 3 cells | 1 space

1b-2-0    2 spaces | block 3 cells | 0 spaces

– one row with 5 cells capacity containing two blocks – first having 2 cells and the second having 1 cell

2b-0-1-1

2b-0-2-0

2b-1-1-0

As can be seen from the visual representations above, the problem is about finding the combinations of blank-cell lengths under which the blocks can be arranged on the row.

In the first example we have to find the combinations in which white spaces can be filled into the two possible locations (before and after the block):

– 0 2

– 1 1

– 2 0

The count of white spaces to be filled is given by count(row cells) – sum(block cells).

The Java code for solving this problem is attached below:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.junit.Test;

public class Puzzles {
    /**
     * Fills `solutions` with all the ways of distributing `target` blank
     * cells over the positions of the `partial` array.
     */
    public void sumSubsets(int target, int elementIndex, int[] partial, List<int[]> solutions) {
        int partialSum = 0;
        for (int i = 0; i < elementIndex; i++) {
            partialSum += partial[i];
        }

        if (partialSum <= target) {
            if (partial.length == elementIndex + 1) {
                // last position: it has to take all the remaining blanks
                partial[partial.length - 1] = target - partialSum;
                solutions.add(Arrays.copyOf(partial, partial.length));
            } else {
                // try every possible number of blank cells for this position
                for (int i = 0; i <= target - partialSum; i++) {
                    partial[elementIndex] = i;

                    sumSubsets(target, elementIndex + 1, partial, solutions);
                }
            }
        }
    }


    @Test
    public void testSumSubsets() {
        List<int[]> solutions = new ArrayList<>();

        Puzzles puzzles = new Puzzles();
        puzzles.sumSubsets(2, 0, new int[2], solutions);

        for (int[] solution : solutions) {
            for (int e : solution) {
                System.out.print(e + " ");
            }
            System.out.println();
        }
    }
}

This is just a tiny piece of algorithmics that can be used for solving the Puzzles problem described here:

https://github.com/mariusneo/algorithmics/tree/master/catcoder/src/main/resources/org/mg/catcoder/puzzles

After successfully solving the problem, your algorithm will be able to solve puzzles like the following:

puzzles

The implementation for the problem can be found here:

https://github.com/mariusneo/algorithmics/blob/master/catcoder/src/main/java/org/mg/catcoder/puzzles/Puzzles.java

Notes about concurrency in Java

From “Thinking in Java” – Bruce Eckel

Thread states

A thread can be in any one of four states:

  • New: The thread object has been created, but it hasn’t been started yet, so it cannot run.
  • Runnable: This means that a thread can be run when the time-slicing mechanism has CPU cycles available for the thread. Thus, the thread might or might not be running at any moment, but there’s nothing to prevent it from being run if the scheduler can arrange it; it’s not dead or blocked.
  • Dead: The normal way for a thread to die is by returning from its run( ) method. Before it was deprecated in Java 2, you could also call stop( ), but this could easily put your program into an unstable state. There’s also a destroy( ) method (which has never been implemented, and probably never will be, so it’s effectively deprecated). You’ll learn about an alternative way to code a stop( ) equivalent later in the chapter.
  • Blocked: The thread could be run, but there’s something that prevents it. While a thread is in the blocked state, the scheduler will simply skip over it and not give it any CPU time. Until a thread reenters the runnable state, it won’t perform any operations.


Pipes


It’s often useful for threads to communicate with each other by using I/O. Threading libraries may provide support for inter-thread I/O in the form of pipes.
These exist in the Java I/O library as the classes PipedWriter (which allows a thread to write into a pipe) and PipedReader (which allows a different thread to read from the same pipe).
This can be thought of as a variation of the producer-consumer problem, where the pipe is the canned solution.
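A minimal runnable example of the two classes cooperating (only java.io is needed; the method name is ours):

```java
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;

public class PipeDemo {

    // A producer thread writes a message into the pipe; the calling thread
    // reads it back from the connected PipedReader.
    public static String sendThroughPipe(final String message)
            throws IOException, InterruptedException {
        final PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader(writer); // connect the two ends

        Thread producer = new Thread(() -> {
            try {
                writer.write(message);
                writer.close(); // signals end-of-stream to the reader
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        producer.start();

        StringBuilder received = new StringBuilder();
        int c;
        while ((c = reader.read()) != -1) {
            received.append((char) c);
        }
        producer.join();
        return received.toString();
    }
}
```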


Becoming blocked

When a thread is blocked, there’s some reason that it cannot continue running. A thread can become blocked for the following reasons: 

  • You’ve put the thread to sleep by calling sleep(milliseconds), in which case it will not be run for the specified time. 
  • You’ve suspended the execution of the thread with wait( ). It will not become runnable again until the thread gets the notify( ) or notifyAll( ) message. We’ll examine these in the next section. 
  • The thread is waiting for some I/O to complete. 
  • The thread is trying to call a synchronized method on another object, and that object’s lock is not available. 


In fact, the only place you can call wait( ), notify( ), or notifyAll( ) is within a synchronized method or block (sleep( ) can be called within non-synchronized methods since it doesn’t manipulate the lock). 
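The wait()/notify() handshake can be sketched as a tiny one-slot mailbox (the class and its names are ours, for illustration):

```java
public class Mailbox {

    private final Object lock = new Object();
    private boolean ready = false;
    private String message;

    // Blocks until another thread has published a message via send().
    public String receive() throws InterruptedException {
        synchronized (lock) {          // wait() must be called while owning the lock
            while (!ready) {           // the loop guards against spurious wakeups
                lock.wait();           // releases the lock while waiting
            }
            return message;
        }
    }

    public void send(String msg) {
        synchronized (lock) {          // notifyAll() also requires owning the lock
            message = msg;
            ready = true;
            lock.notifyAll();
        }
    }
}
```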


Deadlocks

Deadlock can occur if four conditions are simultaneously met: 

  1. Mutual exclusion: At least one resource used by the threads must not be shareable. In this case, a chopstick can be used by only one philosopher at a time. 
  2. At least one process must be holding a resource and waiting to acquire a resource currently held by another process. That is, for deadlock to occur, a philosopher must be holding one chopstick and waiting for the other one. 
  3. A resource cannot be preemptively taken away from a process. All processes must only release resources as a normal event. Our philosophers are polite and they don’t grab chopsticks from other philosophers.
  4. A circular wait must happen, whereby a process waits on a resource held by another process, which in turn is waiting on a resource held by another process, and so on, until one of the processes is waiting on a resource held by the first process, thus gridlocking everything. In this example, the circular wait happens because each philosopher tries to get the left chopstick first and then the right. In the preceding example, the deadlock is broken by swapping the initialization order in the constructor for the last philosopher, causing that last philosopher to actually get the right chopstick first, then the left. 
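Condition 4 is the easiest one to break in practice: if every thread acquires the locks in the same global order, no circular wait can form. A minimal sketch of this lock-ordering discipline (the class is ours, not from the book):

```java
public class LockOrdering {

    private final Object chopstickA = new Object();
    private final Object chopstickB = new Object();
    private int meals = 0;

    // Every "philosopher" takes chopstickA before chopstickB, so no thread
    // can hold one chopstick while waiting for the other in a cycle.
    public void eat() {
        synchronized (chopstickA) {
            synchronized (chopstickB) {
                meals++;
            }
        }
    }

    // Safe to read after joining the worker threads (join() establishes
    // a happens-before relationship).
    public int meals() {
        return meals;
    }
}
```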


From Tutorial Concurrency – Vogella

http://www.vogella.com/articles/JavaConcurrency/article.html

– Callable/Future

     – http://www.vogella.com/articles/JavaConcurrency/article.html#futures

     – a special construct which gives the caller of the thread the possibility to get a result synchronously (when Future.get() is called, the calling thread will wait, if necessary, until the execution is complete, and then retrieve the result).

– ForkJoin (divide et impera via threads)

     – http://www.vogella.com/articles/JavaConcurrency/article.html#forkjoin

     – a kind of divide et impera for threads, introduced in Java 7, which allows the programmer to distribute a complex task to several workers and then wait for the results (see JSR 166 for details).
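Both constructs can be exercised with nothing but the JDK; a small sketch (the tasks themselves, squaring a number and summing a range, are just illustrations):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class FutureForkJoinDemo {

    // Callable/Future: submit a task, then block on get() for its result.
    public static int square(final int n) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<Integer> future = executor.submit(() -> n * n);
            return future.get(); // waits, if necessary, until the task completes
        } finally {
            executor.shutdown();
        }
    }

    // Fork/join: recursively split a range sum across the worker threads.
    static class RangeSum extends RecursiveTask<Long> {
        final long from, to;

        RangeSum(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= 1000) {          // small enough: compute directly
                long sum = 0;
                for (long i = from; i <= to; i++) sum += i;
                return sum;
            }
            long mid = (from + to) / 2;       // otherwise divide...
            RangeSum left = new RangeSum(from, mid);
            RangeSum right = new RangeSum(mid + 1, to);
            left.fork();                      // ...and conquer in parallel
            return right.compute() + left.join();
        }
    }

    public static long sum(long from, long to) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(from, to));
    }
}
```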


From Concurrency tutorial Oracle

http://docs.oracle.com/javase/tutorial/essential/concurrency

Note that constructors cannot be synchronized — using the synchronized keyword with a constructor is a syntax error.
Synchronizing constructors doesn’t make sense, because only the thread that creates an object should have access to it while it is being constructed.

Warning: When constructing an object that will be shared between threads, be very careful that a reference to the object does not “leak” prematurely.
For example, suppose you want to maintain a List called instances containing every instance of a class.
You might be tempted to add the following line to your constructor:

instances.add(this);

But then other threads can use instances to access the object before construction of the object is complete.
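A common fix is to keep the constructor private and perform the registration in a static factory method, only after the constructor has returned (the class below is a hypothetical illustration):

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class Tracked {

    private static final List<Tracked> instances = new CopyOnWriteArrayList<>();

    private final String name;

    private Tracked(String name) {
        this.name = name;
        // deliberately no `instances.add(this)` here: `this` must not escape
        // before construction is complete
    }

    // The instance is published to other threads only once it is fully built.
    public static Tracked create(String name) {
        Tracked tracked = new Tracked(name);
        instances.add(tracked);
        return tracked;
    }

    public static int count() {
        return instances.size();
    }

    public String name() {
        return name;
    }
}
```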


Intrinsic Locks and Synchronization

Synchronization is built around an internal entity known as the intrinsic lock or monitor lock. (The API specification often refers to this entity simply as a “monitor.”)
A thread is said to own the intrinsic lock between the time it has acquired the lock and released the lock.

Synchronized statements are also useful for improving concurrency with fine-grained synchronization.
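For instance, two independent counters can each get their own lock object, so that updating one never blocks threads working on the other (this mirrors the tutorial's idea; the class itself is ours):

```java
public class TwoCounters {

    private long c1 = 0;
    private long c2 = 0;

    // Separate lock objects: incrementing c1 and c2 can proceed in parallel,
    // while each counter on its own is still updated atomically.
    private final Object lock1 = new Object();
    private final Object lock2 = new Object();

    public void inc1() {
        synchronized (lock1) {
            c1++;
        }
    }

    public void inc2() {
        synchronized (lock2) {
            c2++;
        }
    }

    public long c1() {
        synchronized (lock1) {
            return c1;
        }
    }

    public long c2() {
        synchronized (lock2) {
            return c2;
        }
    }
}
```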


Reentrant synchronization

Recall that a thread cannot acquire a lock owned by another thread. But a thread can acquire a lock that it already owns.
Allowing a thread to acquire the same lock more than once enables reentrant synchronization.
This describes a situation where synchronized code, directly or indirectly, invokes a method that also contains synchronized code,
and both sets of code use the same lock. Without reentrant synchronization, synchronized code would have to take many additional precautions  to avoid having a thread cause itself to block.
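A two-method sketch of the situation described above: an outer synchronized method calls an inner one that synchronizes on the same lock, and the call simply proceeds because the thread already owns that lock:

```java
public class ReentrantDemo {

    private int depth = 0;

    // Both methods lock on `this`. Entering inner() from outer() re-acquires
    // a lock the current thread already holds, so there is no self-deadlock.
    public synchronized int outer() {
        depth++;
        return inner();
    }

    public synchronized int inner() {
        depth++;
        return depth;
    }
}
```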


Starvation

Starvation describes a situation where a thread is unable to gain regular access to shared resources and is unable to make progress.

Livelock

A thread often acts in response to the action of another thread. If the other thread’s action is also a response to the action of another thread, then livelock may result. 


Immutable objects

An object is considered immutable if its state cannot change after it is constructed.
Maximum reliance on immutable objects is widely accepted as a sound strategy for creating simple, reliable code.

A Strategy for defining immutable objects

  1. Don’t provide “setter” methods — methods that modify fields or objects referred to by fields.
  2. Make all fields final and private.
  3. Don’t allow subclasses to override methods. The simplest way to do this is to declare the class as final. A more sophisticated approach is to make the constructor private and construct instances in factory methods.
  4. If the instance fields include references to mutable objects, don’t allow those objects to be changed:
     – Don’t provide methods that modify the mutable objects.
     – Don’t share references to the mutable objects. Never store references to external, mutable objects passed to the constructor; if necessary, create copies, and store references to the copies. Similarly, create copies of your internal mutable objects when necessary to avoid returning the originals in your methods.
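Applying the four rules above to a small example class (the class is a hypothetical illustration):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Rule 3: a final class, so its methods cannot be overridden.
public final class Route {

    // Rules 1 and 2: private final fields and no setter methods.
    private final String name;
    private final List<String> stops;

    public Route(String name, List<String> stops) {
        this.name = name;
        // Rule 4: store a defensive copy of the mutable argument...
        this.stops = new ArrayList<>(stops);
    }

    public String name() {
        return name;
    }

    // ...and never hand out a reference to the internal list itself.
    public List<String> stops() {
        return Collections.unmodifiableList(stops);
    }
}
```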

High level concurrency objects

ReentrantLock

  • tryLock: backs out immediately if the lock is not available.
  • lockInterruptibly: acquires the lock, but the wait for it can be interrupted (an InterruptedException is thrown if the thread is interrupted while waiting).
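The difference shows up as soon as another thread already holds the lock: tryLock() simply reports failure instead of blocking. A small sketch (the helper class is ours):

```java
import java.util.concurrent.locks.ReentrantLock;

public class TryLockDemo {

    // Attempts the lock from a separate thread and reports whether it
    // could be acquired; tryLock() backs out immediately when it cannot.
    public static boolean tryFromOtherThread(final ReentrantLock lock)
            throws InterruptedException {
        final boolean[] acquired = {false};
        Thread t = new Thread(() -> {
            if (lock.tryLock()) {
                acquired[0] = true;
                lock.unlock();
            }
        });
        t.start();
        t.join();
        return acquired[0];
    }
}
```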

Executor interfaces

  • Executor, a simple interface that supports launching new tasks.
  • ExecutorService, a subinterface of Executor, which adds features that help manage the lifecycle, both of the individual tasks and of the executor itself.
  • ScheduledExecutorService, a subinterface of ExecutorService, supports future and/or periodic execution of tasks.

First blog

The purpose of this blog is to have a better overview on what I will be doing in the future in programming, algorithmics and web design.


I hope there will be a steady flow of information coming.