Christmas Tweets Sentiment Analysis with Spark on ElasticSearch and Kibana

Hi everyone, in my first post on this blog I’ll show you how I’ve been trying to get more experience with Scala and Apache Spark over the last few months.

Apache Spark, as many of you will know, is a fast engine for big data processing written in Scala. Over the last couple of years it has been widely adopted by companies that need to process large-scale data. More information is available on the Apache Spark website.

You can use Spark with Scala, Java or Python. I preferred Scala because I’m still studying it and this was a good excuse to learn (Scala is also the language recommended by Databricks). Anyway, if you’re a Java programmer, as I am, and don’t want to learn Scala, I recommend using Java 8: its functional programming approach will help you a lot. Otherwise you’ll be swamped in boilerplate code and you’re going to throw your PC out of the window.

Ok, let’s get to the point of the post: gathering tweets about Christmas with the Spark Streaming library, then using Spark SQL and Spark Core NLP to analyze their sentiment.

To gather tweets, you’ll need to create a Twitter Application on https://dev.twitter.com/, get the credentials and set them in your program as system properties, with something like this:

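  import scala.io.Source

  // Reads "key value" pairs, one per line, and sets them as twitter4j OAuth system properties
  def setupTwitter(): Unit = {
    val source = Source.fromFile("../twitter.txt")
    try {
      for (line <- source.getLines()) {
        val fields = line.split(" ")
        if (fields.length == 2) {
          System.setProperty("twitter4j.oauth." + fields(0), fields(1))
        }
      }
    } finally {
      source.close() // release the file handle even if reading fails
    }
  }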
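
For reference, here is a minimal sketch of the parsing step on its own. It assumes that twitter.txt holds one space-separated key/value pair per line and that the keys are the four standard twitter4j OAuth property names (consumerKey, consumerSecret, accessToken, accessTokenSecret). The object name, the parse helper and the sample values are made up for illustration.

```scala
object TwitterCredentials {
  // Turns "key value" lines into twitter4j OAuth property names and values.
  // Lines that do not have exactly two space-separated fields are skipped,
  // like in setupTwitter.
  def parse(lines: Seq[String]): Map[String, String] =
    lines.map(_.split(" ")).collect {
      case Array(key, value) => ("twitter4j.oauth." + key) -> value
    }.toMap

  // Hypothetical file content; real values come from your Twitter application page.
  val sample: Seq[String] = Seq(
    "consumerKey AAAA",
    "consumerSecret BBBB",
    "accessToken CCCC",
    "accessTokenSecret DDDD",
    "this line is malformed"
  )
}
```

Each entry of the resulting Map can then be passed to System.setProperty, which is what setupTwitter does line by line.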

Now we can access the Twitter stream and filter it to get only Christmas-related tweets:

// Filter the live stream, serialize each tweet to JSON and write every non-empty batch to disk
TwitterUtils.createStream(ssc, None)
  .filter { filterTweets }
  .map(gson.toJson(_))
  .foreachRDD((rdd, time) => {
    val count = rdd.count()
    if (count > 0) {
      val outputRDD = rdd.repartition(partitionsEachInterval)
      outputRDD.saveAsTextFile(outputDirectory + "/tweets_" + time.milliseconds.toString)
      numTweetsCollected += count
      // Stop the application once enough tweets have been collected
      if (numTweetsCollected > numTweetsToCollect) {
        System.exit(0)
      }
    }
  })

The snippet above does the following:

  • Access the Twitter stream using the Spark StreamingContext (ssc)
  • Filter the stream to keep only Christmas-related tweets. The filterTweets function just returns true if the tweet text contains words like “christmas”, “xmas” and so on.
  • Convert every Tweet object to JSON using the Google Gson library
  • Save each non-empty batch to text files and stop once more than numTweetsToCollect tweets have been collected
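
The filterTweets function is not shown in this post, so here is a minimal sketch of the keyword check it describes. It works on a plain String to stay self-contained, whereas the real function receives a twitter4j Status and would presumably apply the same check to its text. The object name, the function name and the keyword list are assumptions.

```scala
object ChristmasFilter {
  // Hypothetical keyword list; the post only mentions "christmas" and "xmas".
  val keywords: Seq[String] = Seq("christmas", "xmas")

  // Case-insensitive check: true if the text contains at least one keyword.
  def isChristmasTweet(text: String): Boolean = {
    val lower = text.toLowerCase
    keywords.exists(keyword => lower.contains(keyword))
  }
}
```

A plain substring match like this one also accepts hashtags such as #Xmas2016, which is usually what you want here.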

Now we can use Spark SQL to read these JSON files and analyze them with Spark Core NLP.

Spark Core NLP is a wrapper that exposes the famous Stanford CoreNLP annotators as Spark SQL functions:

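import org.apache.spark.sql.functions._
import com.databricks.spark.corenlp.functions._

val tweets = spark.read.json(fromDir + "/*")
  .select("text", "user", "geolocation", "place", "lang")
  .where("lang == 'en'")
  .where("geolocation is not null") // "!= null" evaluates to NULL in Spark SQL and would drop every row
  // Build the geo point and split the URL-free text into one row per sentence
  .select(col("text"), col("user"), getGeoPointUDF(col("geolocation")) as "location", col("place"),
    explode(ssplit(removeUrlUDF(col("text")))) as "sentences", col("lang"))
  // Annotate each sentence with CoreNLP: sentiment, tokens, POS tags, lemmas and named entities
  .select(col("text"), col("user"), col("location"), col("place"), col("sentences"), col("lang"),
    sentiment(col("sentences")) as "sentiment", tokenize(col("sentences")) as "words",
    pos(col("sentences")) as "pos", lemma(col("sentences")) as "lemmas", ner(col("sentences")) as "nerTags")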

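This snippet reads the JSON from all files in “fromDir” into a Dataset and keeps only the text, user, geolocation, place and lang fields of English tweets that carry a geolocation. The second select builds the location and splits the cleaned text into sentences, one row per sentence. The last select analyzes the sentiment of each sentence and extracts further Core NLP features (tokens, part-of-speech tags, lemmas and named-entity tags) for later use.

The removeUrlUDF function is a User-Defined Function that cleans the tweet text by lowercasing it and removing URLs, while the getGeoPointUDF function takes the latitude and longitude as passed from Twitter and rearranges them into a “lat,lon” String that ElasticSearch can index as a GeoPoint.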

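import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.udf

// Lowercases the tweet text and strips any URL from it
def removeUrl(commentstr: String): String = {
  val urlPattern = "((https?|ftp|gopher|telnet|file|Unsure|http):((//)|(\\\\))+[\\w\\d:#@%/;$()~_?\\+-=\\\\\\.&]*)".r
  urlPattern.replaceAllIn(commentstr.toLowerCase(), "").trim()
}

// Builds the "lat,lon" string; assumes field 0 of the geolocation struct is the latitude and field 1 the longitude
def getGeoPoint(geo: Row): String = {
  val lat = geo.get(0).toString()
  val lon = geo.get(1).toString()
  lat + "," + lon
}

// Wrap both helpers as UDFs so they can be used inside select
val removeUrlUDF = udf((s: String) => removeUrl(s))
val getGeoPointUDF = udf((geo: Row) => getGeoPoint(geo))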
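
To see what these two helpers produce without starting Spark, here is a small self-contained sketch. It is not the code from the post: the URL pattern is a deliberately simplified assumption (http and https links only), toGeoPoint takes two plain doubles instead of a Row, and the object and function names are hypothetical.

```scala
object TweetHelpers {
  // Simplified URL pattern (an assumption): http/https links up to the next whitespace.
  private val urlPattern = "https?://\\S+".r

  // Lowercases the text, removes the links and trims what is left.
  def removeUrl(text: String): String =
    urlPattern.replaceAllIn(text.toLowerCase, "").trim

  // Elasticsearch accepts a geo_point given as a string in "lat,lon" order.
  def toGeoPoint(latitude: Double, longitude: Double): String =
    s"$latitude,$longitude"
}
```

For example, TweetHelpers.removeUrl("Merry Christmas everyone https://t.co/abc123") gives "merry christmas everyone", and TweetHelpers.toGeoPoint(45.4642, 9.19) gives "45.4642,9.19". Note the order: in the string form latitude comes first, while the array form of a geo_point expects longitude first.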

After this we just store the Dataset in a new ElasticSearch index:

import org.elasticsearch.spark.sql._ // adds saveToEs to Datasets

tweets.saveToEs("twitter-sentiment-xmas/tweet")

Of course you don’t have to save to text files first and then read them again to analyze the JSON. This is just the way I did it because I was studying along the way. You can easily store directly to ElasticSearch after the analysis is done, as you can see in the ElasticSearch documentation.

On my GitHub you’ll also find methods to store the tweets on Amazon S3 or directly in ElasticSearch!

Now that we have created a new ElasticSearch index, we can log into Kibana to use this data for further analysis.

I created a few visualizations to show, for example:

  • The country that loves Christmas the most
  • The country with the highest number of tweets
  • A Map visualization of the tweets

and so on.
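
To make the first two visualizations more concrete, here is a rough sketch of the aggregations behind them, written with plain Scala collections rather than Kibana. It assumes that “loves Christmas the most” means the highest average sentiment score and that each indexed sentence can be reduced to a country plus a score. The ScoredTweet type, the function names and the sample data are all made up for illustration.

```scala
object SentimentByCountry {
  // Hypothetical, flattened view of one indexed sentence.
  final case class ScoredTweet(country: String, sentiment: Int)

  // Average sentiment per country (the CoreNLP score goes from 0, very negative, to 4, very positive).
  def averageSentiment(tweets: Seq[ScoredTweet]): Map[String, Double] =
    tweets.groupBy(_.country).map { case (country, ts) =>
      country -> ts.map(_.sentiment).sum.toDouble / ts.size
    }

  // Number of tweets per country.
  def tweetCount(tweets: Seq[ScoredTweet]): Map[String, Int] =
    tweets.groupBy(_.country).map { case (country, ts) => country -> ts.size }

  // Made-up sample data, for illustration only.
  val sample: Seq[ScoredTweet] = Seq(
    ScoredTweet("Italy", 4), ScoredTweet("Italy", 3),
    ScoredTweet("Germany", 2), ScoredTweet("Germany", 3), ScoredTweet("Germany", 1)
  )
}
```

In Kibana the same idea roughly maps to a terms aggregation on the country field, combined with an average metric on the sentiment field or a simple document count.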