In this system, we process real-time data, such as server logs, and perform analysis on it using Apache Flink. Instead of a batch processing system, we use an event processing system that is triggered on every new event. Whenever a new event occurs, the Flink streaming application performs analysis on the consumed event. The source of data here can be Hadoop, MySQL, HTTP logs, proxy logs, etc.
The benefits of running real-time analysis on the records are:
- Get insights right away
- Make decisions based on timely data
- React without delay
As you can see in the diagram below, whenever a new event is triggered in the Flink data pipeline, the data is passed to Elasticsearch, where it is processed and analysed, and the result is visualised in Kibana.
This is what the application does:

Step 1) Getting logs in the Flink Streaming Application
if (reader.ready) {
  val line = reader.readLine
  if (line != null) {
    print(line)
    val ride = KnolxSession.fromString(line)
    // set time of first event
    dataStartTime = ride.sessionDate.getMillis
    // initialize watermarks
    nextWatermark = dataStartTime + watermarkDelayMSecs
    nextWatermarkServingTime = toServingTime(servingStartTime, dataStartTime, nextWatermark)
    // emit first event
    sourceContext.collectWithTimestamp(ride, ride.sessionDate.getMillis)
  }
}
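For context, a fragment like the one above typically lives inside the run method of a custom Flink SourceFunction. The sketch below is a minimal, simplified version of such a source; it assumes the KnolxSession case class from this project (with fromString and a sessionDate exposing getMillis) and a plain-text log file path, and it leaves out the serving-speed and watermark scheduling that the real source performs.

import java.io.{BufferedReader, FileReader}
import org.apache.flink.streaming.api.functions.source.SourceFunction

// Minimal sketch: read log lines and emit them as timestamped KnolxSession events.
class SimpleKnolxSource(path: String) extends SourceFunction[KnolxSession] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[KnolxSession]): Unit = {
    val reader = new BufferedReader(new FileReader(path))
    var line = reader.readLine()
    while (running && line != null) {
      val session = KnolxSession.fromString(line)
      // attach the event time taken from the log record itself
      ctx.collectWithTimestamp(session, session.sessionDate.getMillis)
      line = reader.readLine()
    }
    reader.close()
  }

  override def cancel(): Unit = { running = false }
}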
Step 2) Setting up Flink and Elastic operations
Make sure Elasticsearch and Kibana are up and running (version 7.4.0).
class KnolxSessionUpsert(host: String, port: Int)
  extends ElasticsearchUpsertSink[KnolxSession](
    host,
    port,
    "elasticsearch",
    "knolx-sessions-most-popular",
    "knolx-sessions") {

  override def insertJson(r: KnolxSession): Map[String, AnyRef] = {
    Map(
      "session-name" -> r.sessionName.asInstanceOf[AnyRef],
      "audience-count" -> r.audienceCount.asInstanceOf[AnyRef]
    )
  }

  override def updateJson(r: KnolxSession): Map[String, AnyRef] = {
    Map[String, AnyRef](
      "knolx-sessions" -> r.asInstanceOf[AnyRef]
    )
  }

  override def indexKey(r: KnolxSession): String = {
    // index by session date
    r.sessionDate.toString
  }
}
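Once the sink class is defined, attaching it to a stream is a one-liner. The host and port below are assumptions for a local Elasticsearch instance; ElasticsearchUpsertSink is a project helper wrapping Flink's Elasticsearch connector.

// Write every processed KnolxSession to Elasticsearch
// (localhost:9200 is assumed for a local setup).
sessions.addSink(new KnolxSessionUpsert("localhost", 9200))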
Step 3) Processing records to get the total count of Knolx sessions
val env: StreamExecutionEnvironment = DemoStreamEnvironment.env
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

// Define the data source
val sessions: DataStream[KnolxSession] = env.addSource(
  new KnolxPortalSource(data, maxServingDelay, servingSpeedFactor))

// Keep only proper Knolx sessions (drop meetups)
val totalKnolxThatAreNotMeetup: DataStream[KnolxSession] = sessions
  .filter(!_.isMeetup)

totalKnolxThatAreNotMeetup.print()

// Trigger the job; nothing runs until execute() is called
env.execute("Total Knolx Sessions")
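The step title mentions a total count, while the fragment above only filters out meetups and prints the remaining sessions. One way to turn the filtered stream into an actual count is sketched below; it assumes the usual import org.apache.flink.streaming.api.scala._ is already in scope, and the one-minute tumbling window is an arbitrary choice for illustration.

import org.apache.flink.streaming.api.windowing.time.Time

// Count non-meetup Knolx sessions per one-minute event-time window.
val sessionCounts = totalKnolxThatAreNotMeetup
  .map(_ => ("knolx-sessions", 1))   // one record per session under a single key
  .keyBy(_._1)
  .timeWindow(Time.minutes(1))
  .sum(1)

sessionCounts.print()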
Step 4) Data visualization in Kibana
Here, we visualize the popularity count of Knolx sessions with the help of a Kibana dashboard.
References:
- https://ci.apache.org/projects/flink/flink-docs-stable/dev/datastream_api.html
- https://www.elastic.co/guide/en/kibana/current/index.html