Real-time Data Analytics Engine

Reading Time: 2 minutes

In this System, we are going to process Real-time data or server logs and perform analysis on them using Apache Flink. Instead of using the batch processing system we are using event processing system on a new event trigger.

Whenever a new event occurs, the Flink Streaming Application performs search analysis on the consumed event. Source of data here can be Hadoop, MySql, HTTP logs, proxy logs etc.

The benefits of using Real-time Analysis on the records are:

  • Get insights right away
  • Make decisions based on timely data
  • React without delay

As you can see in the below diagram, whenever a new event is triggered in fink data pipeline, This data is passed to elastic for data processing. Data analyzing is processed in Elastic and result visualised in Kibana.

This is what the application does,

if (reader.ready) {
      val line = reader.readLine
      if (line != null) {
        print(line)
        val ride = KnolxSession.fromString(line)
        // set time of first event
        dataStartTime = ride.sessionDate.getMillis
        // initialize watermarks
        nextWatermark = dataStartTime + watermarkDelayMSecs
        nextWatermarkServingTime = toServingTime(servingStartTime, dataStartTime, nextWatermark)
        // emit first event
        sourceContext.collectWithTimestamp(ride, ride.sessionDate.getMillis)
      }
    }

Make sure Elastic and Kibana is up and running (version 7.4.0 )

class KnolxSessionUpsert(host: String, port: Int)
      extends ElasticsearchUpsertSink[KnolxSession](
        host,
        port,
        "elasticsearch",
        "knolx-sessions-most-popular",
        "knolx-sessions") {

      override def insertJson(r: (KnolxSession)): Map[String, AnyRef] 
     = {
        Map(
          "session-name" -> r.sessionName.asInstanceOf[AnyRef],
          "audience-count" -> r.audienceCount.asInstanceOf[AnyRef]
        )
      }

      override def updateJson(r: KnolxSession): Map[String, AnyRef] = 
     {
        Map[String, AnyRef](
          "knolx-sessions" -> r.asInstanceOf[AnyRef]
        )
      }
      override def indexKey(r: KnolxSession): String = {
        // index by location
        r.sessionDate.toString
      }
    }


Step 3) Processing records to get the total count of Knolx sessions

val env: StreamExecutionEnvironment = DemoStreamEnvironment.env
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // Define the data source
    val sessions: DataStream[KnolxSession] = env.addSource(new KnolxPortalSource(
      data, maxServingDelay, servingSpeedFactor))

    val totalKnolxThatAreNotMeetup: DataStream[KnolxSession] = sessions
      .filter(!_.isMeetup)

    totalKnolxThatAreNotMeetup.print()

Step 5) Data is visualization in Kibana 

Here, we are visualizing the popularity count of Knolx Sessions with the help of Kibana notebook.

References:

Discover more from Knoldus Blogs

Subscribe now to keep reading and get access to the full archive.

Continue reading