
Hola Amigos! If you are wandering around looking for a data processing engine with excellent support for the data streaming platform Kafka, this blog will help you get started quickly with Apache Samza. We will take a short tour of Apache Samza here, along with an example hello-Samza application.
What is Apache Samza?
Apache Samza was born at LinkedIn to provide real-time analytics on streaming data. In 2014, it became a top-level Apache project, and it now powers thousands of applications in production at LinkedIn. Apache Samza is a scalable data processing engine that enables us to process and analyze data in real time.
Architecture
Apache Samza supports multiple streaming sources and sinks, Kafka being one of them. It supports both standalone and YARN deployments. To explore the design motivation and use cases further, you can refer to the docs.

When to use Samza?
Use Samza when your application needs close attention to resource utilization and management: Samza provides strong support for all of these factors, letting you properly plan the resources consumed by your jobs.
Use-cases:
- detecting anomalies
- combating fraud
- monitoring performance
- notifications
- real-time analytics
Let’s get started with the Example.
We’ll see how to set things up so that our application reads from a Kafka input stream and writes the processed data to another Kafka output stream (specifically, a topic).
Getting Kafka and ZooKeeper running
The only prerequisite is to have Kafka installed.
In the terminal,
- start ZooKeeper (always provide the paths to the script file and properties file according to your system)
bin/zookeeper-server-start.sh config/zookeeper.properties
- start the Kafka server
bin/kafka-server-start.sh config/server.properties
- create an input stream (topic) named “test”
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
- create a producer that produces data to the input stream
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
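The application will write to an output topic named “testOut”. Depending on your broker settings, Kafka may auto-create it on first write, but it does no harm to create it up front and attach a console consumer to it so you can watch the results later (assuming the same single-broker setup as above):
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testOut
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testOut --from-beginning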
Writing Samza Application
In the example below, we will write the data processing logic in a class HelloWorld that extends StreamApplication, the primary abstraction for a streaming application in Apache Samza.
- Adding dependencies (in the build.sbt file): make sure to add a version compatible with your scalaVersion.
val samzaVersion = "1.4.0"
val samzaCore = "org.apache.samza" %% "samza-core" % samzaVersion
val samzaKafka = "org.apache.samza" %% "samza-kafka" % samzaVersion
libraryDependencies ++= Seq(samzaCore, samzaKafka)
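For context, these dependency lines sit alongside your usual project settings. A minimal sketch of the surrounding build.sbt (the project name and exact Scala version here are assumptions; Samza 1.4.0 artifacts are published for Scala 2.11 and 2.12):
name := "hello-samza" // hypothetical project name
scalaVersion := "2.12.10" // assumption: any 2.12.x works with the %% artifacts above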
- Properties file: under src/main/config/, create a file named HelloWorld.properties. In a Samza application, we need to provide properties for the job:
job.name=HelloWorld
# Use a PassthroughJobCoordinator since there is no coordination needed
job.coordinator.factory=org.apache.samza.standalone.PassthroughJobCoordinatorFactory
job.coordination.utils.factory=org.apache.samza.standalone.PassthroughCoordinationUtilsFactory
job.changelog.system=kafka
# Use a single container to process all of the data
task.name.grouper.factory=org.apache.samza.container.grouper.task.SingleContainerGrouperFactory
processor.id=0
# Read from the beginning of the topic
systems.kafka.default.stream.samza.offset.default=oldest
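These properties run the job as a single standalone processor with no external coordination. If you later want several processor instances to coordinate with each other, Samza’s standalone mode also ships a ZooKeeper-based coordinator; a sketch of the properties you would swap in (not needed for this example):
job.coordinator.factory=org.apache.samza.zk.ZkJobCoordinatorFactory
job.coordinator.zk.connect=localhost:2181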
- Adding the system descriptor and processing logic for our task: create a Scala class named HelloWorld.scala under src/main/scala/
import com.google.common.collect.{ImmutableList, ImmutableMap}
import org.apache.samza.application.StreamApplication
import org.apache.samza.application.descriptors.StreamApplicationDescriptor
import org.apache.samza.serializers.{KVSerde, StringSerde}
import org.apache.samza.system.kafka.descriptors.KafkaSystemDescriptor

// StreamApplication is the primary abstraction of a streaming application in Samza;
// we describe the job's inputs, outputs, and processing logic in describe().
class HelloWorld extends StreamApplication {
  override def describe(appDescriptor: StreamApplicationDescriptor): Unit = {
    val zookeeper = ImmutableList.of("localhost:2181")
    val server = ImmutableList.of("localhost:9092")
    val streamConfig = ImmutableMap.of("replication.factor", "1")
    val inputTopicName = "test"     // the input topic we created earlier
    val outputTopicName = "testOut" // the topic the results are written to
    // keys and values of our messages are plain strings
    val serdeForKV = KVSerde.of(new StringSerde(), new StringSerde())
    // "kafka" must match the system name used in HelloWorld.properties
    val ksd = new KafkaSystemDescriptor("kafka")
      .withConsumerZkConnect(zookeeper)
      .withProducerBootstrapServers(server)
      .withDefaultStreamConfigs(streamConfig)
    val kid = ksd.getInputDescriptor(inputTopicName, serdeForKV)
    val kod = ksd.getOutputDescriptor(outputTopicName, serdeForKV)
    appDescriptor.withDefaultSystem(ksd)
    val inputStream = appDescriptor.getInputStream(kid)
    val outputStream = appDescriptor.getOutputStream(kod)
    // we only forward those messages from the input topic that contain "hi"
    inputStream.filter(kv => kv.value.contains("hi")).sendTo(outputStream)
  }
}
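The filter above is just one of the operators MessageStream offers. If you also wanted to transform matching messages before sending them on, you could chain a map; a hypothetical variation, not part of the example above (KV comes from org.apache.samza.operators.KV):
// uppercase the value of every matching message before forwarding it
inputStream
  .filter(kv => kv.value.contains("hi"))
  .map(kv => KV.of(kv.key, kv.value.toUpperCase))
  .sendTo(outputStream)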
- Creating the runner class: create a Scala class “AppRunner.scala” in the same directory where you created “HelloWorld.scala”.
import org.apache.samza.runtime.LocalApplicationRunner
import org.apache.samza.util.CommandLine

object AppRunner extends App {
  val cmdLine = new CommandLine
  // point --config-path at your HelloWorld.properties file
  val options = cmdLine.parser.parse("--config-factory=org.apache.samza.config.factories.PropertiesConfigFactory", "--config-path=<<PATH TO THE .properties file>>")
  val conf = cmdLine.loadConfig(options)
  val runner = new LocalApplicationRunner(new HelloWorld, conf)
  runner.run()           // starts the application
  runner.waitForFinish() // blocks until it terminates
}
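With everything in place, you can launch the job straight from sbt (assuming AppRunner is the only main class in your project, sbt will pick it up automatically):
sbt run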
Now, produce some messages to the input stream, some of which contain “hi” and some of which don’t. When you start consuming data from the output topic “testOut”, you will see that only the messages containing “hi” are available there.
For the complete code template, you may check it out here.
Thanks and Happy Coding!!
References: https://samza.apache.org/
