Reading from Greenplum Database into Spark
Reading a Greenplum Database table into Spark loads all of the table rows into a Spark DataFrame. You can read a Greenplum Database table that you created with the
CREATE TABLE SQL command using the Spark Scala API or within the
spark-shell interactive shell.
The Greenplum-Spark Connector provides a Spark data source optimized for reading Greenplum Database data into Spark. To read a Greenplum Database table into Spark, you must identify the Greenplum-Spark Connector data source name and provide read options for the import.
A Spark data source provides an access point to structured data. Spark provides several pre-defined data sources to support specific file types and databases. You specify a Spark data source using either its fully qualified name or its short name.
The Greenplum-Spark Connector exposes a Spark data source named
greenplum to access and read data from Greenplum Database into a Spark
DataFrame. The Greenplum-Spark Connector supports specifying the data source only with this short name.
You use the DataFrameReader.format(datasource: String) Scala method to identify the data source. You must provide the Greenplum-Spark Connector data source short name greenplum to the .format() method. For example:
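A minimal sketch of identifying the data source, assuming an active SparkSession named spark (as in spark-shell):

```scala
spark.read.format("greenplum")
```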
You provide the Greenplum Database connection and read options required by the greenplum data source via generic key-value pairs. The greenplum data source supports the read options identified in the table below. An option is required unless otherwise specified.
| Option Key | Value Description |
|------------|-------------------|
| url | The JDBC connection string URL; see Constructing the Greenplum Database JDBC URL. |
| dbschema | The name of the Greenplum Database schema in which the table resides. This option also identifies the name of the schema in which the Greenplum-Spark Connector creates temporary external tables. Optional; the default schema is the schema named public. |
| dbtable | The name of the Greenplum Database table. This table must reside in the Greenplum Database schema identified in the dbschema option. |
| driver | The fully qualified class path of the custom JDBC driver. Optional; specify only when using a custom JDBC driver. |
| user | The Greenplum Database user/role name. |
| password | (Optional.) The Greenplum Database password for the user. You can omit the password if Greenplum Database is configured to not require a password for the specified user, or if you use Kerberos authentication and provide the required authentication properties in the JDBC connection string URL. |
| partitionColumn | The name of the Greenplum Database table column to use for Spark partitioning. This column must be one of the Greenplum Database data types integer, bigint, serial, or bigserial. |
| partitionsPerSegment | The number of Spark partitions per Greenplum Database segment. Optional; the default value is 1 partition. |
You can specify options to the greenplum data source individually or in an options Map. The option-related DataFrameReader class methods of interest for the Greenplum-Spark Connector are:

- .option(key: String, value: String) for specifying an individual option, and
- .options(options: Map[String, String]) for specifying an options map.
To specify an option individually, provide <option_key> and <value> strings to the DataFrameReader.option() method. For example:
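A minimal sketch of passing a single option (the dbtable key is taken from the options table above; the table name here is illustrative):

```scala
spark.read.format("greenplum")
  .option("dbtable", "table1")
```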
To construct a
scala.collection.Map comprising more than one option, you provide the <option_key> and <value> strings for each option. For example:
```scala
val gscOptionMap = Map(
  "url" -> "jdbc:postgresql://gpdb-master:5432/testdb",
  "user" -> "bill",
  "password" -> "changeme",
  "dbschema" -> "myschema",
  "dbtable" -> "table1",
  "partitionColumn" -> "id"
)
```
To provide an options map to the data source, specify it in the
DataFrameReader.options() method. For example, to provide the
gscOptionMap map created above to the data source:
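A sketch of passing the map, assuming the gscOptionMap defined earlier and an active SparkSession named spark:

```scala
spark.read.format("greenplum")
  .options(gscOptionMap)
```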
A Spark DataFrame is a fault-tolerant collection of elements partitioned across the Spark cluster nodes. Spark operates on these elements in parallel.
Greenplum Database distributes its table data across segments running on segment hosts.
The Greenplum-Spark Connector provides two options to configure the mapping between Spark partitions and Greenplum Database segment data, partitionColumn and partitionsPerSegment. The partitionColumn option that you specify must have the integer, bigint, serial, or bigserial Greenplum Database data type. The partitionColumn you identify need not be the column specified with the DISTRIBUTED BY (<column>) clause when you created the Greenplum Database table.
By default, the Greenplum-Spark Connector creates one Spark partition per Greenplum Database segment. You can set the
partitionsPerSegment option to specify a larger number of Spark partitions.
Spark partitions have a 2 GB size limit. If you are using the Connector to move more than 2 GB of data per Greenplum Database segment, you must increase the
partitionsPerSegment option value appropriately.
The Greenplum-Spark Connector supports a connector.port option to specify the port number that the Greenplum-Spark Connector uses for data transfer between Greenplum Database and Spark worker nodes.
| Option Key | Value Description |
|------------|-------------------|
| connector.port | The port number, or the name of an environment variable that identifies the port number, that the Greenplum-Spark Connector should use for data transfer to Spark worker nodes. Optional; by default, the Connector uses a random port. |
You can specify a port number or an environment variable name for the connector.port option. To use a specific port, set the option value to that port number. For example, to set connector.port as a single option:
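A sketch of setting the port individually; the port number 12900 is an arbitrary example value:

```scala
.option("connector.port", "12900")
```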
Or, to set it within a Map of options:
"connector.port" -> "12900"
If you choose to specify the port number with an environment variable, prefix the environment variable name with
env.. For example, to identify the environment variable named
GSC_EXTERNAL_PORT as the
connector.port option value:
"connector.port" -> "env.GSC_EXTERNAL_PORT",
Note: Setting the connector.port option value to env.GPFDIST_PORT results in the same behavior as that of Greenplum-Spark Connector version 1.2.0.
Refer to Configuring Spark Worker Ports for additional information about the connector.port option.
The Greenplum-Spark Connector provides connection pool configuration options. These options are named with the pool. prefix.
| Option Key | Value Description |
|------------|-------------------|
| pool.maxSize | The maximum number of connections in the connection pool. Optional; the default value is 64. |
| pool.timeoutMs | The amount of time, in milliseconds, after which an inactive connection is considered idle. Optional; the default value is 10,000 (10 seconds). |
| pool.minIdle | The minimum number of idle connections maintained in the connection pool. Optional; the default value is 0. |
To set each connection pool option as a single option:
.option("pool.maxSize", "50") .option("pool.minIdle", "5") .option("pool.timeoutMs", "7500")
The first DataFrame that you create with a specific connection string URL, username, and password combination defines the configuration of that connection pool. The Greenplum-Spark Connector ignores connection pool options specified on subsequent DataFrames created with the same URL/username/password combination.
Refer to JDBC Connection Pooling for additional information about connection pool configuration options.
When you read a Greenplum Database table into Spark, you identify the Greenplum-Spark Connector data source, provide the read options, and invoke the
DataFrameReader.load() method. For example:
```scala
val gpdf = spark.read.format("greenplum")
  .options(gscOptionMap)
  .load()
```
The .load() method returns a DataFrame. A DataFrame is a set of rows, i.e. a Dataset[Row].
Note that the
.load() operation does not initiate the movement of data from Greenplum Database to Spark. Spark employs lazy evaluation for transformations; it does not compute the results until the application performs an action on the
DataFrame, such as displaying or filtering the data or counting the number of rows.
Actions and transformations you can perform on the returned DataFrame include:

- Viewing the contents of the table with .show()
- Counting the number of rows with .count()
- Filtering the data using .filter()
- Grouping and ordering the data using .groupBy() and .orderBy()
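The actions above can be sketched against the gpdf DataFrame created earlier; the column name id is illustrative and assumes the example table has such a column:

```scala
gpdf.show()                               // display the first rows of the table
gpdf.count()                              // count the number of rows
gpdf.filter(gpdf("id") > 100).show()      // filter rows on a column predicate
gpdf.groupBy("id").count().orderBy("id")  // group and order the data
    .show()
```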
Refer to the Spark DataSet Scala API docs for additional information about this class and the other actions and transformations you can perform.
Note: By default, Spark recomputes a transformed
DataFrame each time you run an action on it. If you have a large data set on which you want to perform multiple transformations, you may choose to keep the
DataFrame in memory for performance reasons. You can use the Dataset.persist() method for this purpose. Keep in mind that there are memory implications to persisting large data sets.
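A sketch of persisting the gpdf DataFrame across multiple actions; the filter column id is illustrative:

```scala
gpdf.persist()                        // keep the DataFrame in memory across actions
gpdf.count()                          // first action computes and caches the data
gpdf.filter(gpdf("id") > 100).count() // subsequent actions reuse the cached data
gpdf.unpersist()                      // release executor memory when done
```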