Reading Avro files using Apache Flink

In this blog, we will see how to read Avro files using Flink.

Before reading the files, let’s get an overview of Flink.

There are two types of processing – batch and real-time.

  • Batch Processing: Processing based on the data collected over time.
  • Real-time Processing: Processing based on immediate data for an instant result.

Real-time processing is in demand, and Apache Flink is a framework built for it: it processes data streams as they arrive and can also run batch jobs.

Some of Flink’s features include:

  • High processing speed
  • Support for Scala and Java
  • Low latency
  • Fault tolerance
  • Scalability

Let’s get started.

Step 1:

Add the required dependencies in build.sbt:

name := "flink-demo"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.10.0",
  "org.apache.flink" % "flink-avro" % "1.10.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.10.0"
)

Step 2:

The next step is to get a handle to the execution environment in which the program runs. It plays a role similar to SparkContext in Spark.

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

Step 3:

Setting a parallelism of x on the environment makes every operator (such as join, map, reduce) run with x parallel instances by default.

I am using 1 as it is a demo application.

env.setParallelism(1)
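
As a minimal sketch of how this default interacts with individual operators: the parallelism set on the environment applies to every operator unless an operator overrides it with its own setParallelism call. The object name ParallelismSketch, the build helper, and the sample elements are assumptions made for illustration only.

```scala
import org.apache.flink.streaming.api.scala._

object ParallelismSketch {
  // Builds a tiny pipeline; nothing runs until env.execute is called.
  def build(env: StreamExecutionEnvironment): DataStream[Int] = {
    // Default for every operator created from this environment
    env.setParallelism(1)

    env
      .fromElements(1, 2, 3) // illustrative sample data (assumption)
      .map(_ * 2)
      .setParallelism(2)     // overrides the default for this map only
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val doubled = build(env)
    println(s"environment default: ${env.getParallelism}")
    println(s"map operator: ${doubled.parallelism}")
  }
}
```

For a demo that only prints records, the environment-level default of 1 is enough; the per-operator override matters once a single stage becomes the bottleneck.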

Step 4:

Defining the Input Format.

public AvroInputFormat(Path filePath, Class<E> type)

The constructor takes two parameters: the path to the Avro file and the class of the records to produce. We will read the file as GenericRecord.

Later, if we want, we can convert each record to a specific type using case classes.

val avroInputFormat = new AvroInputFormat[GenericRecord](new org.apache.flink.core.fs.Path("path to avro file"), classOf[GenericRecord])
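
The conversion to a case class mentioned in this step could look roughly like the sketch below. The User case class, its fields name and age, and the UserConversion object are all hypothetical; the real field names depend on the schema of your Avro file. To keep the example self-contained, the record is built in memory with Avro's SchemaBuilder rather than read from a file.

```scala
import org.apache.avro.SchemaBuilder
import org.apache.avro.generic.{GenericData, GenericRecord}

// Hypothetical target type; the field names "name" and "age" are assumptions.
final case class User(name: String, age: Int)

object UserConversion {
  def fromRecord(record: GenericRecord): User =
    User(
      // Avro usually returns strings as Utf8 (a CharSequence), so convert explicitly
      name = record.get("name").toString,
      // An Avro int comes back as a boxed java.lang.Integer
      age = record.get("age").asInstanceOf[Int]
    )

  def main(args: Array[String]): Unit = {
    // Build a matching record in memory instead of reading a file
    val schema = SchemaBuilder.record("User").fields()
      .requiredString("name")
      .requiredInt("age")
      .endRecord()

    val record = new GenericData.Record(schema)
    record.put("name", "Ada")
    record.put("age", Int.box(36))

    println(fromRecord(record))
  }
}
```

In the pipeline, a function like this would typically be applied with map on the record stream created in Step 5, with org.apache.flink.streaming.api.scala._ imported so that Flink can derive the TypeInformation for the case class.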

Step 5:

Creating input data stream with a specific input format.

def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T]

The createInput function takes the input format as a parameter, so we pass avroInputFormat to it. It also requires an implicit TypeInformation for the element type.

implicit val typeInfo: TypeInformation[GenericRecord] = TypeInformation.of(classOf[GenericRecord])

val recordStream: DataStream[GenericRecord] = env.createInput(avroInputFormat)

Step 6:

Let’s print the data that we will read from Avro files. The print function will act as a sink.

recordStream.print()

Step 7:

Flink programs are executed lazily: the steps so far only describe the dataflow. Let’s now trigger the program execution using execute.

env.execute(jobName = "flink-avro-demo")
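
Putting the steps together, the whole job might look like the following minimal sketch. The object name FlinkAvroDemo, the avroInput helper, and taking the file path from the first program argument are assumptions made for illustration; the import paths are the ones I would expect for Flink 1.10, the version used in build.sbt.

```scala
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.formats.avro.AvroInputFormat
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object FlinkAvroDemo {
  // Step 4: input format that reads each Avro entry as a GenericRecord
  def avroInput(path: String): AvroInputFormat[GenericRecord] =
    new AvroInputFormat[GenericRecord](new Path(path), classOf[GenericRecord])

  def main(args: Array[String]): Unit = {
    // Assumption: the Avro file path is passed as the first program argument
    val avroPath = args(0)

    // Steps 2 and 3: execution environment with a default parallelism of 1
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Step 5: type information required by createInput, then the input stream
    implicit val typeInfo: TypeInformation[GenericRecord] =
      TypeInformation.of(classOf[GenericRecord])
    val recordStream: DataStream[GenericRecord] = env.createInput(avroInput(avroPath))

    // Step 6: print acts as the sink
    recordStream.print()

    // Step 7: nothing runs until execute is called
    env.execute("flink-avro-demo")
  }
}
```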

To download the complete code, visit flink-avro-demo

Conclusion:
I hope that after reading this blog you understand how to read Avro files using Flink.

Thanks for reading.

Written by 

Jyoti Sachdeva is a software consultant with more than 6 months of experience. She likes to keep up with trending technologies. She is familiar with languages such as C, C++, Java and Scala, and is currently working on Akka, Akka HTTP and Scala. Her hobbies include watching TV series and movies, reading novels and dancing.
