Apache Calcite: Evaluating REX Expressions

Apache Calcite is a dynamic data management framework. It provides a SQL parser, validator and optimizer. Its sub-project Avatica, a framework for building database drivers, also gives us a way to execute our optimized queries against external databases.

Every row expression (rex) in a SQL query is represented internally as a ‘RexNode’, which can be an identifier, a literal or a function call. In this blog we will illustrate how to evaluate functions that involve only literals and fold the results into the ‘Values’ operator, so that we get an improved logical plan.

Use Case

For simple queries like “SELECT 5*6,5+6”, or any scalar function involving literals only, the logical plan is:

LogicalProject(EXPR$0=[*(5, 6)], EXPR$1=[+(5, 6)])
  LogicalValues(tuples=[[{ 0 }]])

Optimized Plan

  1. We want to minimize the total number of instructions executed by our execution engine.
  2. We want to remove redundant operators.

Our final plan should look like this:

LogicalValues(tuples=[[{ 30, 11 }]])

Code Implementation

We’ll use the Volcano planner for this optimization. Let’s wire it together.

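Custom Convention

import org.apache.calcite.plan.Convention
import org.apache.calcite.rel.RelNode

// Marker trait for every operator that belongs to our calling convention.
sealed trait MyRel extends RelNode

object MyRel {
  // The convention (a planner trait) that our converter rules target.
  val CONVENTION = new Convention.Impl("MyRelTrait", classOf[MyRel])
}
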
ConverterRules

// Each rule converts a logical operator from Convention.NONE to MyRel.CONVENTION.
// LogicalProjectConverter and LogicalValuesConverter are the ConverterRule
// implementations (not shown here).
val PROJECT: ConverterRule.Config = ConverterRule.Config.INSTANCE
  .withConversion(classOf[LogicalProject], Convention.NONE, MyRel.CONVENTION, "LogicalProjectToMyProject")
  .withRuleFactory(_ => LogicalProjectConverter)

val VALUES: ConverterRule.Config = ConverterRule.Config.INSTANCE
  .withConversion(classOf[LogicalValues], Convention.NONE, MyRel.CONVENTION, "LogicalValuesToMyLogicalValues")
  .withRuleFactory(_ => LogicalValuesConverter)

RelRule

// Match a MyLogicalProject whose only input is a leaf MyLogicalValues.
val EVAL_VALUES: EvalValuesRule.Config = RelRule.Config.EMPTY
  .withDescription("Evaluate Literals")
  .withOperandSupplier(b0 =>
    b0.operand(classOf[MyLogicalProject])
      .oneInput(b1 =>
        b1.operand(classOf[MyLogicalValues])
          .noInputs()
      )
  ).as(classOf[EvalValuesRule.Config])

object EvalValuesRule {

  // Rule configuration; toRule instantiates the rule from this config.
  trait Config extends RelRule.Config {
    override def toRule: RelOptRule = new EvalValuesRule(this)
  }

}

JaninoRexCompiler

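Calcite provides a built-in Rex compiler, JaninoRexCompiler, which can evaluate an expression. It’s pretty straightforward to use:

// expression is a java.util.List[RexNode]; inputRowType is the row type of its input
val compiler = new JaninoRexCompiler(rexBuilder)
val scalar = compiler.compile(expression, inputRowType)
// execute runs the compiled code and returns the computed value
scalar.execute(context)
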
Detailed EVAL_VALUES rule
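onMatch

override def onMatch(call: RelOptRuleCall): Unit = {
  // rel(0) and rel(1) follow the operand order declared in EVAL_VALUES.
  val project = call.rel(0).asInstanceOf[Project]
  val values = call.rel(1).asInstanceOf[Values]

  val rexBuilder = project.getCluster.getRexBuilder

  // Evaluate every projected expression down to a literal.
  val newLiterals = evaluate(project.getProjects, project.getRowType, values, rexBuilder)
  // Replace Project(Values) with a single Values that keeps the project's row type.
  call.transformTo(
    new MyLogicalValues(
      values.getCluster,
      project.getRowType,
      newLiterals,
      project.getTraitSet
    )
  )
}
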
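evaluate

def evaluate(
    exprs: java.util.List[RexNode],
    outputRowType: RelDataType, // row type of the Project, i.e. of the folded Values
    values: Values,
    rexBuilder: RexBuilder
): ImmutableList[ImmutableList[RexLiteral]] = {
  val compiler = new JaninoRexCompiler(rexBuilder)
  // asScala / asJava need scala.collection.JavaConverters._ in scope.
  val results = exprs.asScala.map {
    case ref: RexInputRef =>
      // A plain input reference: read the value from the first tuple of Values.
      values.getTuples.get(0).get(ref.getIndex).getValue2
    case expr =>
      // A literal or a call: compile it against the row type of its input.
      // A null context is only safe when the expression reads nothing from
      // the context, as with literal-only expressions.
      val scalar = compiler.compile(ImmutableList.of(expr), values.getRowType)
      scalar.execute(null)
  }
  // Turn each computed value back into a literal of the projected field's type.
  val literalValues = results.zipWithIndex.map {
    case (value, index) =>
      rexBuilder.makeLiteral(value, outputRowType.getFieldList.get(index).getType, true)
        .asInstanceOf[RexLiteral]
  }.asJava
  ImmutableList.of(ImmutableList.copyOf(literalValues))
}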

The logic here is simple:
1) If the expression is a RexInputRef, read the referenced value from the Values tuple.
2) If the expression is a RexLiteral or a RexCall, evaluate it with the compiler.
3) Rebuild RexLiterals from the results using RexBuilder.
4) Remove the “Project” operator and put all the RexLiterals in the “Values” operator.
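
The four steps can also be sketched without Calcite on the classpath. The block below is a toy model, not Calcite's API: the names Rex, Literal, InputRef, Call, Values, Project, eval and fold are hypothetical stand-ins invented for this illustration, and only integer "+" and "*" are supported. Unlike the rule above, which reads only the first tuple, this sketch folds every tuple.

```scala
// Toy stand-ins for Calcite's RexNode and RelNode hierarchies (hypothetical names).
object ConstantFoldingSketch {
  sealed trait Rex
  final case class Literal(value: Int) extends Rex
  final case class InputRef(index: Int) extends Rex
  final case class Call(op: String, operands: List[Rex]) extends Rex

  sealed trait Rel
  final case class Values(tuples: List[List[Literal]]) extends Rel
  final case class Project(exprs: List[Rex], input: Rel) extends Rel

  // Steps 1 and 2: resolve input references against the row and
  // evaluate literals and calls recursively.
  def eval(expr: Rex, row: List[Literal]): Int = expr match {
    case Literal(v)            => v
    case InputRef(i)           => row(i).value
    case Call("+", List(a, b)) => eval(a, row) + eval(b, row)
    case Call("*", List(a, b)) => eval(a, row) * eval(b, row)
    case Call(op, operands) =>
      throw new IllegalArgumentException(s"unsupported call: $op with ${operands.size} operands")
  }

  // Steps 3 and 4: rebuild literals from the results and replace
  // Project(Values) with a single Values. Other shapes are left untouched.
  def fold(rel: Rel): Rel = rel match {
    case Project(exprs, Values(tuples)) =>
      Values(tuples.map(row => exprs.map(e => Literal(eval(e, row)))))
    case other => other
  }

  def main(args: Array[String]): Unit = {
    // Models SELECT 5*6, 5+6 over the single dummy row { 0 }.
    val plan = Project(
      List(
        Call("*", List(Literal(5), Literal(6))),
        Call("+", List(Literal(5), Literal(6)))
      ),
      Values(List(List(Literal(0))))
    )
    println(fold(plan)) // Values(List(List(Literal(30), Literal(11))))
  }
}
```

The real rule differs mainly in who does the work: the planner performs the pattern match through the operands declared in EVAL_VALUES, and JaninoRexCompiler replaces the hand-written eval.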

Putting it together

Let’s define a method to optimize our RelNode:

def optimize(relNode: RelNode): RelNode = {
  val costPlanner = relNode.getCluster.getPlanner
  // Register Calcite's default rules, without materializations or bindable rules.
  RelOptUtil.registerDefaultRules(costPlanner, false, false)
  costPlanner.addRule(Rules.VALUES.toRule)
  costPlanner.addRule(Rules.PROJECT.toRule)
  costPlanner.addRule(Rules.EVAL_VALUES.toRule)
  // Ask the planner for a plan whose root is in our convention.
  val myRel = costPlanner.changeTraits(relNode, relNode.getTraitSet.replace(MyRel.CONVENTION))
  costPlanner.setRoot(myRel)
  costPlanner.findBestExp()
}

Result

BEFORE:
LogicalProject(EXPR$0=[*(5, 6)], EXPR$1=[+(5, 6)])
  LogicalValues(tuples=[[{ 0 }]])

AFTER:
MyLogicalValues(tuples=[[{ 30, 11 }]])

Conclusion

The full code can be found here.

With the Volcano planner we can apply cost-based optimization using hundreds of existing rules, or add our own rules for different use cases. A well-optimized plan is one that:

  1. Reduces the number of rows to process as early as possible.
  2. Removes redundant operations (doing the same thing repeatedly).
  3. Has the minimum number of operations to perform (scalar functions, aggregate functions, etc.).
