Recently, I’ve been working on a stand-alone Spark SQL related project where I needed to support Spatial queries. Luckily, Spark 2.2 added extension points that allow injecting a customized parser, analyzer or optimizer into Spark session. In this blog post, I will walk through adding the following KNN join query (you can access the full code HERE)

select * from table1 knn join table2 using POINT(x2, y2) knnPred(POINT(x1, y1), 5)

First, let’s take a look at how the extension looks like (you may also check the examples in SparkSessionExtensionSuite):

lazy val spark = {
val conf = new SparkConf().setAppName("example").setMaster("local[1]")
type ExtensionsBuilder = SparkSessionExtensions => Unit
def create(builder: ExtensionsBuilder): ExtensionsBuilder = builder
val extension = create { extensions =>
extensions.injectParser((_, _) => MyCatalystSqlParser)
}
val session = SparkSession.builder().config(conf).withExtensions(extension).getOrCreate()
session.sparkContext.setLogLevel("ERROR")
session
}

The key point is to create an object similar to CatalystSqlParser by isolating then modifying the code in ParseDriver and AstBuilder. Also, we need to update then recompile SqlBase.g4 grammar.

The Grammar

Spark SQL uses ANTLR4 to generate its SQL parser. I recommend skimming through the first four chapters of The Definitive ANTLR 4 Reference or any equivalent material to understand ANTLR grammar file and the generated parser files.

Starting with a blank Scala project, I copied Spark SQL’s grammar file SqlBase.g4. After that, I added the new keywords (KNN, POINT, and PREDKNN), updated joinType and joinCriteria I also defined a new rule named spatialpredicated to get my KNN query through the parser. After compiling SqlBase.g4 I got the updated Parser files. Refer to this script to compile and post process the grammar file.

The AstBuilder

The AstBuilder in Spark SQL, processes the ANTLR ParseTree to obtain a Logical Plan. I copied this file into my project and renamed it as MyAstBuilder. I only needed to update withJoinRelations method to handle the KNN join type and implement visitSpatialpredicated for the spatialpredicated rule I added to the grammar file. Furthermore, I defined an operator named SpatialJoin and a dummy prdicate to model PredKnn.

A Small Test

Finally, inside the main, I print the optimized plan for the KNN join query.

System.out.println(spark.sql("select * from table1 knn join table2 using POINT(x2, y2) knnPred(POINT(x1, y1), 5)").queryExecution.optimizedPlan)

SpatialJoin KNNJoin, PredKnn (POINT(x2#8,y2#9), POINT(x1#1,y1#2), 5)
:- Relation[id1#0,x1#1,y1#2] csv
+- Relation[id2#7,x2#8,y2#9] csv

Lessons

Here’s a few lessons I learned while working on customizing Spark SQL parser:

  • The comment # in the grammar file means parser generates a visitor method for this rule.
  • It’s very important to understand the Expression class in Spark SQL as it used to model various types of nodes in the query plan.
  • Pay attention to Spark SQL package hierarchy. For instance, MyAstBuilder should be defined under org.apache.spark.sql.catalyst.parser in order to avoid dependency errors.

Last Word

This blog post is only about customizing Spark SQL’s parser. Readers who are interested in implementing various spatial queries in Spark SQL may check Simba. For customizing Spark SQL optimizer you may check Sunitha’s blog post.