Data Ingestion in Druid – Overview

Hey folks, Big Data is one of the hottest topics right now, and most of us are familiar with it and with the technologies around it. One of them is Druid, a distributed, column-oriented, real-time analytical data store. A quick recap of Druid can be found in the earlier blog posts. In this blog post, we will discuss how data ingestion works in Druid.

Before discussing how Druid does it, let’s understand what we mean by data ingestion. Data ingestion is the process of obtaining and importing data for immediate use or storage in a database. To ingest something is to “take something in or absorb something.”

Now, let’s walk through how Druid does it. Data being ingested arrives in some format, and it needs to be stored under some sort of schema so that it can be queried easily. Druid uses datasources to store the data and to provide that schema.

Datasources and Segments

In Druid, data is stored in datasources, which are similar to tables in a traditional RDBMS. Each datasource is partitioned by time and, optionally, further partitioned by other attributes. Each time range is called a chunk (for example, a single day, if the datasource is partitioned by day). Within a chunk, data is partitioned into one or more segments. Each segment is a single file, comprising up to a few million rows of data.
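
To make time chunking concrete, here is a minimal sketch in Python, assuming day-long chunks. The function names and the sample rows are made up for illustration and are not part of Druid.

```python
from collections import defaultdict
from datetime import datetime, timezone


def chunk_start(ts: datetime) -> datetime:
    """Truncate a timestamp to the start of its day-long time chunk."""
    return ts.replace(hour=0, minute=0, second=0, microsecond=0)


def group_into_chunks(rows):
    """Group rows by the time chunk that their 'timestamp' falls into."""
    chunks = defaultdict(list)
    for row in rows:
        chunks[chunk_start(row["timestamp"])].append(row)
    return dict(chunks)


# Hypothetical rows: two events on 1 May 2019 and one on 2 May 2019.
rows = [
    {"timestamp": datetime(2019, 5, 1, 9, 30, tzinfo=timezone.utc), "page": "A"},
    {"timestamp": datetime(2019, 5, 1, 17, 5, tzinfo=timezone.utc), "page": "B"},
    {"timestamp": datetime(2019, 5, 2, 0, 15, tzinfo=timezone.utc), "page": "A"},
]

chunks = group_into_chunks(rows)
for start, members in sorted(chunks.items()):
    print(start.date(), len(members))
```

Each group here plays the role of one chunk; in Druid, the rows of a chunk would then be split further into one or more segments.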

A datasource can have anywhere from a few segment files to a great many. A segment is created on a MiddleManager, and at that point it is mutable and uncommitted. Building a segment file involves the following steps –

  • Converting to columnar format
  • Indexing with bitmap indexes
  • Compression

After this, segments are committed and written to deep storage, and they move from MiddleManagers to the Historical processes. An entry for the segment is also written to the metadata store. This entry records the schema of the segment, its size, and its location on deep storage, and it helps the Coordinator node know which data should be available on the cluster.

Segment Format

A segment corresponds to a file in Druid. The recommended size of a segment is 300-700 MB. Each column is stored separately within the segment file, which means that Druid can save time by reading only the columns that are actually needed to process a query. Timestamps, dimensions, and metrics are the three basic column types in Druid.

Timestamp and metric columns are stored as compressed integer or floating point values. Dimensions are the columns on which filtering is applied. For each dimension column, Druid keeps the following three data structures:

  • A dictionary that maps values (which are always treated as strings) to integer IDs
  • A list of the column’s values encoded using the dictionary described above.
  • For each distinct value in the column, a bitmap that indicates which rows contain that value.
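
The three structures listed above can be sketched in a few lines of Python. This is only an illustration under simplifying assumptions: plain Python integers stand in for the bitmaps, the function names are made up, and the sample column values are hypothetical.

```python
def encode_dimension(values):
    """Build a dictionary, an encoded value list, and per-value bitmaps."""
    # 1. Dictionary: sorted distinct values (treated as strings) -> integer IDs.
    dictionary = {v: i for i, v in enumerate(sorted(set(map(str, values))))}
    # 2. The column's values, encoded using the dictionary.
    encoded = [dictionary[str(v)] for v in values]
    # 3. One bitmap per distinct value; bit n is set when row n holds that value.
    bitmaps = {v: 0 for v in dictionary}
    for row_number, v in enumerate(values):
        bitmaps[str(v)] |= 1 << row_number
    return dictionary, encoded, bitmaps


def rows_matching(bitmap):
    """Return the row numbers whose bit is set in the bitmap."""
    return [n for n in range(bitmap.bit_length()) if (bitmap >> n) & 1]


# Hypothetical "page" dimension with four rows.
page = ["Justin Bieber", "Justin Bieber", "Ke$ha", "Ke$ha"]
dictionary, encoded, bitmaps = encode_dimension(page)

# A filter such as page = 'Ke$ha' only needs that value's bitmap.
kesha_rows = rows_matching(bitmaps["Ke$ha"])
# An OR filter combines bitmaps with a bitwise OR.
either_rows = rows_matching(bitmaps["Justin Bieber"] | bitmaps["Ke$ha"])
```

The point of the bitmaps is that a filter can be answered with bitwise operations, without scanning the encoded values row by row.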

Druid also supports multi-value columns.

Segment identifiers

A segment identifier has the following four components –

  • Datasource name
  • Time interval, i.e. the chunk of time containing that segment.
  • Version number, i.e. a timestamp which specifies when the segment set was first started.
  • Partition number, i.e. an integer that is unique within a (datasource + interval + version) and may not be contiguous.

Below is a sample segment identifier –
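
As a hedged illustration, the sketch below assembles an identifier from the four components. It assumes the parts are joined with underscores in the order listed above and that a partition number of 0 is left off. The datasource name `wikipedia`, the timestamps, and the `segment_id` helper are all made-up values for illustration.

```python
def segment_id(datasource, interval_start, interval_end, version, partition=0):
    """Join the identifier components with underscores.

    Assumption: partition number 0 is omitted from the identifier.
    """
    parts = [datasource, interval_start, interval_end, version]
    if partition != 0:
        parts.append(str(partition))
    return "_".join(parts)


# Hypothetical segment: second partition of a one-day chunk.
sample = segment_id(
    "wikipedia",
    "2019-05-01T00:00:00.000Z",
    "2019-05-02T00:00:00.000Z",
    "2019-05-01T10:00:00.000Z",
    partition=1,
)
print(sample)
```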


Segment versioning

Segment versioning comes into play when you overwrite data. When you only append data, there is a single version for each time chunk. When you overwrite, a new set of segments is created for the same datasource and time interval, but with a higher version number. This tells the system that the new version should replace the older one, which is then removed from the cluster.
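
The "highest version wins" rule can be sketched as follows. This is a simplified model, not Druid's implementation: segments are plain dictionaries, versions are ISO-8601 timestamp strings (which sort correctly as text), and `visible_segments` is a hypothetical helper name.

```python
def visible_segments(segments):
    """Keep, for each (datasource, interval), only the highest-version segments."""
    latest = {}
    for seg in segments:
        key = (seg["datasource"], seg["interval"])
        if key not in latest or seg["version"] > latest[key]:
            latest[key] = seg["version"]
    return [
        seg for seg in segments
        if seg["version"] == latest[(seg["datasource"], seg["interval"])]
    ]


# Hypothetical segments: 1 May was ingested, then overwritten two days later.
segments = [
    {"datasource": "wikipedia", "interval": "2019-05-01/2019-05-02",
     "version": "2019-05-01T10:00:00.000Z", "partition": 0},
    {"datasource": "wikipedia", "interval": "2019-05-01/2019-05-02",
     "version": "2019-05-01T10:00:00.000Z", "partition": 1},
    {"datasource": "wikipedia", "interval": "2019-05-01/2019-05-02",
     "version": "2019-05-03T08:00:00.000Z", "partition": 0},
    {"datasource": "wikipedia", "interval": "2019-05-02/2019-05-03",
     "version": "2019-05-02T10:00:00.000Z", "partition": 0},
]

current = visible_segments(segments)
```

Both partitions of the older version for 1 May are overshadowed by the single newer segment, while the 2 May chunk is untouched because it has only one version.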

Segment states

A segment can have the following states –

  • Available or unavailable – refers to whether the segment is currently served by some Druid server process or not.
  • Published or unpublished – refers to whether the segment has been written to deep storage and the metadata store or not.
  • Used or unused – refers to whether Druid considers the segment active so that it should be served. It applies only to the published segments.

Combining the above, a segment can be in one of the following states –

  • Published, available and used
  • Published, available and unused
  • Published, unavailable and used
  • Published, unavailable and unused
  • Unpublished and unavailable
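
One way to see why the list above has five entries rather than eight is to enumerate the combinations. The sketch below assumes, following the list, that "used or unused" only applies to published segments and that an unpublished segment is listed only as unavailable. The function name is made up for illustration.

```python
from itertools import product


def segment_states():
    """Enumerate the combined segment states from the list above."""
    states = []
    # A published segment can independently be available or not, used or not.
    for available, used in product([True, False], repeat=2):
        states.append((
            "published",
            "available" if available else "unavailable",
            "used" if used else "unused",
        ))
    # "Used or unused" does not apply to unpublished segments.
    states.append(("unpublished", "unavailable"))
    return states


for state in segment_states():
    print(", ".join(state))
```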

Indexing and handoff

Indexing: the process through which new segments are created. It works as follows –

  • First, the indexing task determines the identifier of the segment before it starts building the segment. For appending tasks, an allocate API is called on the Overlord to add a new partition to an existing set of segments. For overwriting tasks, a new segment is created with a new version.
  • If the indexing task is a real-time task, the segment is immediately available for querying. Once the task has finished reading data for the segment, it pushes the segment to deep storage first and then publishes it to the metadata store.
  • A real-time task then waits for a Historical process to load the segment, whereas a non-real-time task exits.

Handoff: the process through which segments are published and start being served by Historical processes. The following mechanism is performed on the Coordinator/Historical side –

  • The Coordinator node polls the metadata store for newly published segments periodically, by default every 1 minute.
  • When it finds a segment that is published and used but unavailable, it chooses a Historical node to load the segment.
  • After that, the Historical node loads the segment and starts serving it.
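
The Coordinator side of the handoff can be sketched as a single polling pass. This is a toy model under stated assumptions: the metadata store and the set of served segments are plain dictionaries, the choice of Historical is left to a pluggable function, and all names (`coordinator_pass`, `historical-1`, the segment ids) are hypothetical.

```python
import random

POLL_PERIOD_SECONDS = 60  # the default polling period mentioned above


def coordinator_pass(metadata_store, served_by, historicals, choose=random.choice):
    """Assign every published, used, but unavailable segment to a Historical.

    metadata_store: segment id -> {"used": bool}, holding published segments only
    served_by:      segment id -> name of the Historical currently serving it
    """
    assignments = {}
    for seg_id, record in metadata_store.items():
        if record["used"] and seg_id not in served_by:
            assignments[seg_id] = choose(historicals)
    # Once the chosen Historical has loaded the segment, it is available.
    served_by.update(assignments)
    return assignments


# Hypothetical cluster state before one polling pass.
metadata_store = {
    "seg-a": {"used": True},   # already served
    "seg-b": {"used": True},   # published and used, but unavailable
    "seg-c": {"used": False},  # unused, so it should not be loaded
}
served_by = {"seg-a": "historical-1"}
historicals = ["historical-1", "historical-2"]

# A deterministic chooser keeps the example reproducible.
assignments = coordinator_pass(
    metadata_store, served_by, historicals, choose=lambda candidates: candidates[0]
)
```

In a real deployment this pass would repeat every polling period; the sketch runs it once to show which segments get picked up.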

Data Formats for Ingestion

Druid can ingest denormalized data in JSON, CSV, a delimited form such as TSV, or any custom format. CSV and TSV data do not contain column headers, so the columns must be listed carefully when specifying the data for ingestion. The format of the data to be ingested is specified using the parseSpec entry in the data schema.
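
As a rough sketch, a parseSpec for headerless CSV data might look like the structure below, written as a Python dictionary and printed as JSON. The column names (`timestamp`, `page`, `language`, `added`) are hypothetical, and the exact set of fields should be checked against the documentation for the Druid version in use.

```python
import json

# Hypothetical parseSpec for CSV rows such as:
#   2019-05-01T09:30:00Z,Main_Page,en,57
parse_spec = {
    "format": "csv",
    # Which column holds the event time, and how to parse it.
    "timestampSpec": {"column": "timestamp", "format": "auto"},
    # CSV has no header row here, so the column order is spelled out.
    "columns": ["timestamp", "page", "language", "added"],
    # Columns to be treated as dimensions.
    "dimensionsSpec": {"dimensions": ["page", "language"]},
}

print(json.dumps({"parseSpec": parse_spec}, indent=2))
```

Because the file carries no header, the order of `columns` has to match the order of fields in every row.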

Types of Ingestion Methods

There are two main types of ingestion methods –

  • Batch File Ingestion
    • Native Batch Ingestion
    • Hadoop Batch Ingestion
  • Stream Ingestion
    • Kafka Indexing Service (Stream Pull)
    • Stream Push

In the coming blogs, we will discuss each of these ingestion methods in detail and see how ingestion can be performed with them.

Stay tuned for more posts. Thank you for reading, and I hope the blog was helpful!


Written by 

Vidisha Gupta is a software consultant with more than 0.5 years of experience. She likes to keep up with trending technologies. She is familiar with languages such as C, C++, Java, and Scala, and is currently working on reactive technologies like Spark, Lagom, Kafka, and Cassandra. Her hobbies include exploring new things, watching web series, and listening to music.