Pivotal Greenplum®-Spark® Connector v1.1

Reading from Greenplum Database into Spark

Reading a Greenplum Database table into Spark loads all of the table rows into a Spark DataFrame. You can use the Spark Scala API or the spark-shell interactive shell to read a Greenplum Database table that you created with the CREATE TABLE SQL command.

The Greenplum-Spark Connector provides a Spark data source optimized for reading Greenplum Database data into Spark. To read a Greenplum Database table into Spark, you must identify the Greenplum-Spark Connector data source name and provide read options for the import.

Greenplum-Spark Connector Data Source

A Spark data source provides an access point to structured data. Spark provides several pre-defined data sources to support specific file types and databases. You specify a Spark data source using its fully qualified name.

The Greenplum-Spark Connector exposes a Spark data source named io.pivotal.greenplum.spark.GreenplumRelationProvider to access and read data from Greenplum Database into a Spark DataFrame.

Use the .format(datasource: String) Scala method to identify the data source. You must provide the fully qualified Greenplum-Spark Connector data source name to the .format() method. For example:

spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")

Connector Read Options

You provide the Greenplum Database connection and read options required by the GreenplumRelationProvider data source via generic key-value String pairs.

GreenplumRelationProvider supports the read options identified in the table below. Each option is required unless otherwise specified.

Option Key            Value Description
url                   The JDBC connection string URL; see Constructing the Greenplum Database JDBC URL.
dbtable               The name of the Greenplum Database table. This table must be accessible from the user’s default Greenplum Database schema search_path.
user                  The Greenplum Database user/role name.
password              The Greenplum Database password for the user.
partitionColumn       The name of the Greenplum Database table column to use for Spark partitioning. This column must have one of the Greenplum Database data types integer, bigint, serial, or bigserial. The column name must be lowercase.
partitionsPerSegment  The number of Spark partitions per Greenplum Database segment. Optional; the default value is 1 partition.

You can specify GreenplumRelationProvider options individually or in an options Map. The option-related DataFrameReader class methods of interest for the Greenplum-Spark Connector are:

  • .option(key: String, value: String) to specify an individual option
  • .options(options: Map[String, String]) to specify an options map

To specify an option individually, provide <option_key> and <value> strings to the DataFrameReader.option() method. For example, to provide the user option:

.option("user", "gpdb_role_name")

To construct a scala.collection.Map comprising more than one option, you provide the <option_key> and <value> strings for each option. For example:

val gscOptionMap = Map(
      "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
      "user" -> "gpadmin",
      "password" -> "changeme",
      "dbtable" -> "table1",
      "partitionColumn" -> "id"
)

To provide an options map to the data source, specify it in the DataFrameReader.options() method. For example, to provide the gscOptionMap map created above to the data source:

.options(gscOptionMap)

Specifying Partition Options

A Spark DataFrame is a fault-tolerant collection of elements partitioned across the Spark cluster nodes. Spark operates on these elements in parallel.

Greenplum Database distributes its table data across segments running on segment hosts.

The Greenplum-Spark Connector provides two options to configure the mapping between Spark partitions and Greenplum Database segment data: partitionColumn and partitionsPerSegment.

partitionColumn

The partitionColumn option that you specify must have the integer, bigint, serial, or bigserial Greenplum Database data type. The partitionColumn you identify need not be the column specified with the DISTRIBUTED BY (<column>) clause when you created the Greenplum Database table.

partitionsPerSegment

By default, the Greenplum-Spark Connector creates one Spark partition per Greenplum Database segment. You can set the partitionsPerSegment option to specify a larger number of Spark partitions.

Spark partitions have a 2 GB size limit. If you are using the Connector to move more than 2 GB of data per Greenplum Database segment, you must increase the partitionsPerSegment option value appropriately.
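
As a sketch, you could extend the gscOptionMap constructed earlier; the gscOptionMapWithPartitions name, the value 4, and the roughly 6 GB per-segment estimate in the comment are only illustrative:

// Illustrative only: with roughly 6 GB of table data per segment, 4 Spark
// partitions per segment keeps each partition well under the 2 GB limit
val gscOptionMapWithPartitions = gscOptionMap + ("partitionsPerSegment" -> "4")

You would then pass gscOptionMapWithPartitions to the DataFrameReader.options() method in place of gscOptionMap.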

Reading Greenplum Data

When you read a Greenplum Database table into Spark, you identify the Greenplum-Spark Connector data source, provide the read options, and invoke the DataFrameReader.load() method. For example:

val gpdf = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
      .options(gscOptionMap)
      .load()

The .load() method returns a DataFrame. A DataFrame is a set of rows, i.e. a Dataset[Row].
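
For example, to confirm the column names and data types that the Connector mapped from the Greenplum Database table, you can print the schema of the returned DataFrame; printSchema() reports metadata only and does not read the table rows:

gpdf.printSchema()   // prints each column name together with its Spark data type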

Note that the .load() operation does not initiate the movement of data from Greenplum Database to Spark. Spark employs lazy evaluation for transformations; it does not compute the results until the application performs an action on the DataFrame, such as displaying the data or counting the number of rows.

Actions and transformations you can perform on the returned DataFrame include the following (a short sketch follows the list):

  • Viewing the contents of the table with .show()
  • Counting the number of rows with .count()
  • Filtering the data using .filter()
  • Grouping the data using .groupBy()
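
For example, the following minimal sketch exercises several of these operations on the gpdf DataFrame created above; the filter condition and the grouping on the id column are only illustrative:

gpdf.show()                          // display the first rows of the table
gpdf.count()                         // count the total number of rows

// transformations remain lazy; the trailing action triggers the computation
gpdf.filter("id > 100").count()      // count the rows whose id exceeds 100
gpdf.groupBy("id").count().show()    // row counts grouped by the id column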

Refer to the Spark Dataset Scala API docs for additional information about this class and the other actions and transformations you can perform.

Note: By default, Spark recomputes a transformed DataFrame each time you run an action on it. If you have a large data set on which you want to perform multiple transformations, you may choose to keep the DataFrame in memory for performance reasons. You can use the Dataset.persist() method for this purpose. Keep in mind that there are memory implications to persisting large data sets.
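
As a sketch, persisting the DataFrame before running several actions lets Spark reuse the data read from Greenplum Database instead of recomputing it for each action; call unpersist() when you no longer need the cached data:

gpdf.persist()                       // keep the DataFrame data around (default storage level)

gpdf.count()                         // the first action computes and caches the data
gpdf.filter("id > 100").show()       // later actions reuse the cached data

gpdf.unpersist()                     // release the cached data when it is no longer needed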