Data integration in the hybrid cloud with Apache Spark and Open Data Hub

In this post, we introduce the basics of reading and writing Apache Spark DataFrames to and from an SQL database, using Apache Spark's JDBC API.

Apache Spark’s Structured Streaming data model is a framework for federating data from heterogeneous sources. Structured Streaming unifies columnar data from differing underlying formats and even completely different modalities – for example streaming data and data at rest – under Spark’s DataFrame API.

SQL databases are one of the most ubiquitous data platforms. They form the backbone of many data centers. Spark makes it easy to read, write, and compute with SQL data using its DataFrame API, allowing it to be federated with Structured Streaming. One of the best ways to work with SQL databases in Spark is via the Java Database Connectivity (JDBC) API.

Both Apache Spark and SQL databases are easy to install on Red Hat OpenShift via OpenShift’s OperatorHub Catalog. Apache Spark is available through the Open Data Hub operator, which you can install from the community section of the Catalog. OperatorHub also provides a variety of database operator choices that expose an SQL interface, including PostgreSQL, Percona and PlanetScale. In this blog post we’ll be using PostgreSQL as an example, but you’ll see how to adapt to other SQL vendors as well.

Working with JDBC Drivers

In order to work with Spark’s JDBC API, you’ll need to provide Spark with a JDBC driver. Many drivers are available as Maven-style packages, such as the driver for PostgreSQL in the example code below.

Consuming JDBC drivers via Maven coordinates and spark.jars.packages is convenient, since Spark will automatically download such packages and install them on Spark executors.

from pyspark import SparkConf

# Instantiate a spark configuration object to receive settings
spark_conf = SparkConf()

# Maven coordinates for package containing JDBC drivers
jdbc_driver_packages = 'org.postgresql:postgresql:42.2.9'

# Configure spark to see the postgresql driver package
spark_conf.set('spark.jars.packages', jdbc_driver_packages)

In some cases, database vendors may provide a “raw” jar file instead of a Maven package. The code below shows a Spark connection alternative that specifies the driver as a jar file using the spark.jars configuration parameter. Installing and configuring jar files directly is also well suited for building container images that can run on platforms like Kubernetes or OpenShift.

jdbc_driver_jars = '/path/to/postgresql-42.2.9.jar'
spark_conf.set('spark.jars', jdbc_driver_jars)

Obtaining a Spark Session

Before we can begin, we need to attach to a running Apache Spark cluster. Continuing with the Spark configuration above, we set some basic cluster sizing parameters, specify the hostname of the cluster, and obtain a Spark session.

from pyspark.sql import SparkSession

# The hostname or IP address of your Spark cluster
spark_cluster = 'spark-cluster-eje'

# Configure some basic spark cluster sizing parameters
spark_conf.set('spark.cores.max', 1)
spark_conf.set('spark.executor.cores', '1')

spark = SparkSession.builder \
       .master('spark://{cluster}:7077'.format(cluster=spark_cluster)) \
       .appName('Spark-JDBC-Demo') \
       .config(conf = spark_conf) \
       .getOrCreate()

Spark can receive its configuration parameters from a variety of channels. In general, configurations set via a SparkConf object will override all other configurations. However, Spark has some quirks in its normal configuration precedence ordering when it comes to spark.jars.packages and spark.jars. You can use the getConf() method to sanity-check the final settings that Spark is using. This example checks Spark's configuration to verify that the jar files for our PostgreSQL JDBC driver are visible on Spark's classpath.

'postgresql' in spark.sparkContext.getConf().get('spark.jars')

The code above will return True if our PostgreSQL driver is visible to Spark as intended.

In this post, we're working with a small example data table. The first column contains consecutive integers, and the second column contains the squares of the first:

data_raw = [(x, x*x) for x in range(1000)]
data_df = spark.createDataFrame(data_raw, ['x', 'xsq'])
data_df.show(5)

The code above will output the following if it is executed in a Jupyter notebook or other pyspark REPL:

+---+---+
|  x|xsq|
+---+---+
|  0|  0|
|  1|  1|
|  2|  4|
|  3|  9|
|  4| 16|
+---+---+
only showing top 5 rows

Spark JDBC URL and Properties

Apache Spark JDBC calls take two data structures to specify a database connection. The first is a string containing a JDBC connection URL. Such a URL typically includes the following DB connect info:

  • vendor (for example, ‘postgresql’)

  • hostname

  • port

  • database name

The second structure is a property map. In Python this is a dict structure, containing:

  • db user name

  • password

  • Java class name of the JDBC driver

For some vendors, other properties are expected. A common additional property is sslConnection, as shown below in the comments.

The exact syntax of the JDBC URL varies from vendor to vendor. Refer to the vendor’s JDBC driver documentation for connection specifics. Vendors will often publish examples that illustrate how to use their JDBC drivers with Spark. Community resources such as GitHub and StackOverflow frequently contain examples as well.
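For instance, the same connection information maps onto differently shaped URLs for different vendors. The templates below are illustrative sketches only (the PostgreSQL form is the one used in the working example that follows); always check your driver's documentation for the exact form it expects.

# Illustrative URL templates; exact syntax depends on your vendor's JDBC driver
postgresql_url_template = 'jdbc:postgresql://{host}:{port}/{database}'
mysql_url_template = 'jdbc:mysql://{host}:{port}/{database}'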

To keep things simple, our example code includes a visible username and password. Remember, it is best practice to store sensitive values such as username and password in environment variables or other forms that can be set without exposing security information in your code!

spark_jdbc_url = 'jdbc:postgresql://{host}:{port}/{database}'.format( 
       host = 'postgresql', 
       port = '5432', 
       database = 'demodb')

spark_jdbc_prop = {
       'user': 'eje',
       'password': 'eje12345',
       'driver': 'org.postgresql.Driver',
       # Some DB vendors expect other connection properties;
       # 'sslConnection' is one common vendor-specific parameter:
       # 'sslConnection': 'false',
}
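If you want to avoid hard-coding credentials as we did above, a minimal sketch of the same property map built from environment variables might look like this. It assumes you have set DB_USER and DB_PASSWORD (hypothetical names) in the environment where your driver program runs:

import os

# Hypothetical environment variable names; use whatever your deployment provides
spark_jdbc_prop = {
       'user': os.environ['DB_USER'],
       'password': os.environ['DB_PASSWORD'],
       'driver': 'org.postgresql.Driver'
}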

Writing a DataFrame with JDBC

The following Spark call uses our database connection info above to write our example data to a database table. The two write modes are overwrite and append. Note that in overwrite mode you must have both write and table-creation privileges on your database.

data_df.write.jdbc( 
       table = 'demo', 
       mode = 'overwrite', 
       url = spark_jdbc_url, 
       properties = spark_jdbc_prop 
)
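To add rows to an existing table instead of replacing it, the same call can be used with mode = 'append'. This is a sketch; it assumes the demo table already exists with a compatible schema:

data_df.write.jdbc(
       table = 'demo',
       mode = 'append',
       url = spark_jdbc_url,
       properties = spark_jdbc_prop
)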

Reading a DataFrame with JDBC

The basics of reading a DataFrame from a JDBC query are (almost) the same as writing. The database connection information is the same. To read data, we must specify a database query, written in the vendor’s supported dialect of SQL. A query can be very simple, as in the example below, or hundreds of lines of complex SQL code. It is generally best practice to construct the query separately in its own variable, as our example code does.

Notice that in the read.jdbc call below, we have enclosed the raw query in parentheses and given it the alias tmp, which the subquery syntax requires. You may use any alias name you choose.

query = 'select * from demo'

query_df = spark.read.jdbc( 
       table = '({q}) tmp'.format(q=query), 
       url = spark_jdbc_url, 
       properties = spark_jdbc_prop 
)

query_df.show(5)

The code above will output the following if it is executed in a pyspark REPL:

+---+---+
|  x|xsq|
+---+---+
|  0|  0|
|  1|  1|
|  2|  4|
|  3|  9|
|  4| 16|
+---+---+
only showing top 5 rows

The Perils of Partitioning

Apache Spark’s scalable compute model depends on being able to break data into multiple partitions so that it can parallelize work across each partition. Let’s look at how our dataframe got partitioned when we read it above. The code below prints out the number of partitions and the number of records in each partition.

As the output below shows, Spark put all the records in our query into a single partition. For our small example data, this is fine. However, if we are working with large volumes of data, this can cause serious problems. With all our data in a single partition, Spark cannot process our data in parallel. Worse, if a sufficiently large query result is loaded into a single Spark executor, it may cause an out-of-memory error and crash that executor.

print("partitions: {np}nsizes: {sz}".format( 
       np = query_df.rdd.getNumPartitions(), 
       sz = query_df.rdd.mapPartitions(lambda itr: [len(list(itr))]).collect() 
))

The code above will output the following if it is executed in a pyspark REPL:

partitions: 1
sizes: [1000]

Proper DataFrame Partitioning with JDBC

Fortunately, Spark provides a way to perform JDBC reads and correctly partition the result.

When you read data into a Spark DataFrame using a JDBC query, Spark can split the query results into multiple partitions. It does this by internally generating one query for each partition, by modifying the logical query you provide it. To enable this, you must provide your JDBC read with a list of “partitioning predicates.” Spark will generate its DataFrame with one partition for each predicate you give it; the data that goes into each partition is the data that is true for the corresponding predicate.

For example, if we wanted to load our data into 3 partitions, we might want Spark to use the following 3 queries:

select x, xsq from demo where mod(x, 3) = 0 /* query for 1st partition */
select x, xsq from demo where mod(x, 3) = 1 /* query for 2nd partition */
select x, xsq from demo where mod(x, 3) = 2 /* query for 3rd partition */

Spark expects us to provide a list that looks like this:

[ 'mod(x, 3) = 0', 'mod(x, 3) = 1', 'mod(x, 3) = 2' ]

These clauses have been designed so that every record ends up in exactly one of our partitions, and also that our partition sizes should be roughly equal, with 1 out of 3 records satisfying each.

In practice, we may very well want to create a large number of these partitioning predicates, and so it is a good idea to generate them with a function, such as the qpreds function below:

def qpreds(n, rowcol):
       return ["mod({rc}, {np}) = {mk}".format(mk=k, np=n, rc=rowcol) for k in range(n)]

qpreds(3, 'x')

The code above will output the following if it is executed in a pyspark REPL:

['mod(x, 3) = 0', 'mod(x, 3) = 1', 'mod(x, 3) = 2']

With our qpreds function, we can easily add the additional predicates parameter to our JDBC read so that Spark can create a well-partitioned DataFrame from our query. In our example below, we configure our predicates for 5 partitions:

# Perform a JDBC read with proper partitioning
query = 'select * from demo'


query_df_pp = spark.read.jdbc( 
       table = '({q}) tmp'.format(q=query), 
       url = spark_jdbc_url, 
       properties = spark_jdbc_prop, 
       predicates = qpreds(5, 'x') 
)
query_df_pp.show(5)

The code above will output the following if it is executed in a pyspark REPL:

+---+---+
|  x|xsq|
+---+---+
|  0|  0|
|  5| 25|
| 10|100|
| 15|225|
| 20|400|
+---+---+
only showing top 5 rows

Now, when we check our partitions, we see a properly partitioned DataFrame that has the 5 partitions we desired, with the query results evenly distributed among the partitions.

print("partitions: {np}nsizes: {sz}".format( 
       np = query_df_pp.rdd.getNumPartitions(), 
       sz = query_df_pp.rdd.mapPartitions(lambda itr: [len(list(itr))]).collect() 
))

The code above will output the following if it is executed in a pyspark REPL:

partitions: 5
sizes: [200, 200, 200, 200, 200]

Other Partitioning Techniques

In our previous examples we took advantage of having a column “x” in our data that was both an integer and had a uniform distribution of values that was easy to generate equal-sized partitions from. In real data we may not have this kind of convenient data to partition with, but there are a couple techniques that we can use with any data.

The first technique is hashing. For SQL dialects that support a hashing function, you can pick a column (or columns) to apply a hash to, and then take the modulus of the resulting hash value. A hypothetical example of such predicates might look like this:

mod(vendor_hash(my_column), 3) = 0
mod(vendor_hash(my_column), 3) = 1
mod(vendor_hash(my_column), 3) = 2

If you use this technique, you may need to tweak your qpreds function to generate predicates of this form.
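For example, a variant of our qpreds function that generates hash-based predicates might look like the sketch below. Here vendor_hash is a hypothetical placeholder for your database's actual hash function, and note that some hash functions can return negative values, which may require wrapping the expression in abs() or similar.

def qpreds_hash(n, col, hash_fn='vendor_hash'):
       # hash_fn is a placeholder; substitute your vendor's hash function name
       return ["mod({h}({c}), {np}) = {mk}".format(h=hash_fn, c=col, np=n, mk=k) for k in range(n)]

qpreds_hash(3, 'my_column')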

Not all SQL dialects have this kind of hash function, but there is almost always some variation on assigning a unique integer to each query output row. In PostgreSQL, this function is row_number(), and we can add it to our query's select list. In the following example, we have added a row_number clause to our query. In PostgreSQL, this must include an over clause that tells it what ordering you wish the numbering to use. When you refer to this new column, you just use row_number, as you can see in the qpreds call:

# The additional row_number clause is not necessary if you are partitioning via 
# an existing integer field, or hashing, etc.
query = 'select *, row_number() over (order by x) from demo'

query_df_pp2 = spark.read.jdbc( 
       table = '({q}) tmp'.format(q=query), 
       url = spark_jdbc_url, 
       properties = spark_jdbc_prop, 
       predicates = qpreds(5, 'row_number') 
)
query_df_pp2.show(5)

The code above will output the following if it is executed in a pyspark REPL:

+---+---+----------+
|  x|xsq|row_number|
+---+---+----------+
|  4| 16|         5|
|  9| 81|        10|
| 14|196|        15|
| 19|361|        20|
| 24|576|        25|
+---+---+----------+
only showing top 5 rows

You can see from the output above that the row-numbering technique causes that column to be included in your query results. You may want to drop this column afterward if you are writing the results to some other destination.
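For example, a copy of the DataFrame without the helper column can be produced with Spark's drop method:

# Drop the helper column once it is no longer needed for partitioning
query_df_final = query_df_pp2.drop('row_number')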

Lastly, we check our partitioning to see that it worked correctly:

print("partitions: {np}nsizes: {sz}".format( 
       np = query_df_pp2.rdd.getNumPartitions(), 
       sz = query_df_pp2.rdd.mapPartitions(lambda itr: [len(list(itr))]).collect() 
))

The code above will output the following if it is executed in a pyspark REPL:

partitions: 5
sizes: [200, 200, 200, 200, 200]

Conclusion

With these basic techniques, you can apply the power of Spark's Structured Streaming to almost any SQL database and easily federate your different data sources.
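As a parting sketch of what that federation can look like, the following joins a streaming source with the JDBC-backed DataFrame we read above. The built-in rate source is used only for illustration; in practice you would read from Kafka or another streaming system.

# A minimal sketch of federation: join a streaming source with our JDBC DataFrame.
# The 'rate' source is a built-in test stream that emits (timestamp, value) rows.
stream_df = spark.readStream.format('rate').option('rowsPerSecond', 10).load()

# Stream-static join: enrich streaming rows with the SQL data we read earlier
joined_df = stream_df.join(query_df_pp, stream_df.value == query_df_pp.x)

# Write the joined stream to the console for demonstration purposes
stream_query = joined_df.writeStream.format('console').start()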