Pivotal Greenplum®-Spark® Connector v1.1

Example - Reading a Greenplum Database Table into Spark (Scala)

This example utilizes the public Airline On-Time Statistics and Delay Cause data set. This data set records flights by date, airline, originating and destination airports, and many other flight details.

In this example, you:

  • Follow Greenplum Database tutorials to load the flight record data set into Greenplum Database.
  • Use spark-shell and the Greenplum-Spark Connector to read a fact table from Greenplum Database into Spark.
  • Perform transformations and actions on the data within Spark.

Prerequisites

Before starting this exercise, ensure that you are able to:

  • Access your Greenplum Database and Spark clusters
  • Identify your Greenplum Database master node hostname or IP address and port
  • Identify your Greenplum Database user/role name and password (must be a SUPERUSER role)
  • Locate the full file system path to the Greenplum-Spark Connector JAR file on your system

Procedure

Perform the following procedure to load flight record data into Greenplum Database, read this data into Spark, and use Spark to transform and view the table data.

  1. Log in to your Greenplum Database master node and set up your environment. For example:

    $ ssh gpadmin@<gpmaster>
    gpadmin@gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
    
  2. Download and unpack the flight record example data set:

    gpadmin@gpmaster$ git clone https://github.com/greenplum-db/gpdb-sandbox-tutorials.git
    gpadmin@gpmaster$ cd gpdb-sandbox-tutorials
    gpadmin@gpmaster$ tar zxf faa.tar.gz
    
  3. Perform the Get Started and Data Loading and Unloading exercises in the Greenplum Database tutorial. When you complete these exercises, you will have created:

    • A database named tutorial.
    • A schema named faa.
    • Several Greenplum Database tables in the faa schema, loaded with flight data.
  4. Verify that the flight data loaded correctly:

    1. Connect to the tutorial database:

      gpadmin@gpmaster$ psql -d tutorial
      
    2. The Greenplum-Spark Connector requires that the schema named public be the first schema listed in your search_path. Additionally, the table you specify in a Greenplum Database read operation must be accessible from the schema search_path. Add the public and faa schemas to your search_path. For example:

      tutorial=# SHOW search_path;
      -------------------------------------
       path1, path2, path3
      (1 row)
      tutorial=# ALTER ROLE gpadmin SET search_path TO public, path1, path2, path3, faa;
      
    3. List the tables in the faa schema of the tutorial database:

      tutorial=# \dt faa.*
                                 List of relations
       Schema |         Name         | Type  |  Owner  |       Storage        
      --------+----------------------+-------+---------+----------------------
       faa    | d_airlines           | table | user1   | heap
       faa    | d_airports           | table | user1   | heap
       faa    | d_cancellation_codes | table | user1   | heap
       faa    | d_delay_groups       | table | user1   | heap
       faa    | d_distance_groups    | table | user1   | heap
       faa    | d_wac                | table | user1   | heap
       faa    | faa_load_errors      | table | user1   | heap
       faa    | faa_otp_load         | table | gpadmin | heap
       faa    | otp_c                | table | gpadmin | append only columnar
       faa    | otp_c_1_prt_mth_1    | table | gpadmin | append only columnar
       faa    | otp_c_1_prt_mth_10   | table | gpadmin | append only columnar
       faa    | otp_c_1_prt_mth_11   | table | gpadmin | append only columnar
      ...
      (27 rows)
      

      If the flight data loaded correctly, the \dt output lists 27 tables.

    4. Examine the definition of the table named otp_c:

      tutorial=#  \d faa.otp_c
             Append-Only Columnar Table "faa.otp_c"
              Column        |       Type       | Modifiers 
      ----------------------+------------------+-----------
       flt_year             | smallint         | 
       flt_quarter          | smallint         | 
       flt_month            | smallint         | 
       flt_dayofmonth       | smallint         | 
       flt_dayofweek        | smallint         | 
       flightdate           | date             | 
       uniquecarrier        | text             | 
       airlineid            | integer          | 
       carrier              | text             | 
       flightnum            | text             | 
       origin               | text             | 
       origincityname       | text             | 
       originstate          | text             | 
       originstatename      | text             | 
       dest                 | text             | 
       destcityname         | text             | 
       deststate            | text             | 
       deststatename        | text             | 
       crsdeptime           | text             | 
       deptime              | integer          | 
       depdelay             | double precision | 
       depdelayminutes      | double precision | 
       departuredelaygroups | smallint         | 
       taxiout              | smallint         | 
       wheelsoff            | text             | 
       wheelson             | text             | 
       taxiin               | smallint         | 
       crsarrtime           | text             | 
       arrtime              | text             | 
       arrdelay             | double precision | 
       arrdelayminutes      | double precision | 
       arrivaldelaygroups   | smallint         | 
       cancelled            | smallint         | 
       cancellationcode     | text             | 
       diverted             | smallint         | 
       crselapsedtime       | integer          | 
       actualelapsedtime    | double precision | 
       airtime              | double precision | 
       flights              | smallint         | 
       distance             | double precision | 
       distancegroup        | smallint         | 
       carrierdelay         | smallint         | 
       weatherdelay         | smallint         | 
       nasdelay             | smallint         | 
       securitydelay        | smallint         | 
       lateaircraftdelay    | smallint         | 
      Checksum: t
      Number of child tables: 17 (Use \d+ to list them.)
      Distributed by: (uniquecarrier, flightnum)
      Partition by: (flightdate)
      

      The table named otp_c is a column-oriented, partitioned fact table.

  5. Log in to your Spark client node:

    $ ssh user@<spark-client>
    user@spark-client$ 
    
  6. Construct the JDBC connection string URL to access Greenplum Database. For example, if your Greenplum Database master host name is gpmaster.domain, the Greenplum Database master process is running on the default port, and you want to access the Greenplum database named tutorial:

    jdbc:postgresql://gpmaster.domain/tutorial
    

    Save this URL string; you will use it in an upcoming step.
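
    The URL above uses the default Greenplum Database master port, which may be omitted. If your master process listens on a non-default port, the port must appear after the host name. A minimal Scala sketch of how the pieces fit together (the host, port, and database names are example values, and greenplumJdbcUrl is a hypothetical helper, not part of the Connector API):

    ```scala
    // Sketch: assembling the JDBC connection string URL from its parts.
    // Assumes 5432 is the default master port; hedge: verify your own deployment's port.
    def greenplumJdbcUrl(host: String, dbName: String, port: Int = 5432): String =
      if (port == 5432) s"jdbc:postgresql://$host/$dbName"   // default port may be omitted
      else s"jdbc:postgresql://$host:$port/$dbName"          // non-default port must be explicit

    val url = greenplumJdbcUrl("gpmaster.domain", "tutorial")
    println(url)  // jdbc:postgresql://gpmaster.domain/tutorial
    ```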

  7. Start spark-shell. Replace <gsc-jar> with the full path to your Greenplum-Spark Connector JAR file:

    user@spark-client$ spark-shell --jars <gsc-jar>
    < ... spark-shell startup output messages ... >
    scala>
    

    You are now in the spark-shell interactive Scala shell.

  8. Prepare to read the otp_c table into Spark. In a text editor, construct a Map of read options for the GreenplumRelationProvider data source. You want to load the Greenplum Database table named otp_c, specifying airlineid as the partition column. For example, if you are the user gpadmin with password changeme:

    val gscOptionMap = Map(
          "url" -> "jdbc:postgresql://gpmaster.domain/tutorial",
          "user" -> "gpadmin",
          "password" -> "changeme",
          "dbtable" -> "otp_c",
          "partitionColumn" -> "airlineid"
    )
    
  9. Copy/paste the option map into your scala> shell terminal window. You must first enter :paste to switch the shell to paste mode:

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    val gscOptionMap = Map(
          "url" -> "jdbc:postgresql://gpmaster.domain/tutorial",
          "user" -> "gpadmin",
          "password" -> "changeme",
          "dbtable" -> "otp_c",
          "partitionColumn" -> "airlineid"
    )
    
  10. Exit scala> paste mode by entering control-D:

    control-D
    // Exiting paste mode, now interpreting.
    
    gscOptionMap: scala.collection.immutable.Map[String,String] = Map(url -> jdbc:postgresql://gpmaster.domain/tutorial, partitionColumn -> airlineid, dbtable -> otp_c, user -> gpadmin, password -> changeme) 
    

    The scala shell displays the option map you just constructed.

  11. Load the data from Greenplum Database table otp_c into a Spark DataFrame. Enter paste mode and copy/paste the command:

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    val gpdf = spark.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider")
            .options(gscOptionMap)
            .load()
    
    control-D
    // Exiting paste mode, now interpreting.
    
    gpdf: org.apache.spark.sql.DataFrame = [flt_year: smallint, flt_quarter: smallint ... 44 more fields]
    

    The Greenplum Database table is not actually loaded until you perform an action on the returned DataFrame.
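
    This deferred behavior resembles lazy evaluation in Scala itself: declaring the value does no work, and the computation runs only when the value is first used. A plain-Scala sketch of the idea (illustrative only, not Connector code; the Seq stands in for the Greenplum Database read):

    ```scala
    // Sketch of deferred evaluation: the "load" does not run until first use,
    // mirroring how the DataFrame read executes only when an action is invoked.
    var loaded = false
    lazy val table: Seq[Int] = { loaded = true; Seq(1, 2, 3) }  // stand-in for the table read

    println(loaded)     // false: declaring the lazy val did not trigger the load
    val n = table.size  // the "action": forces evaluation
    println(loaded)     // true: the load ran on first use
    ```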

  12. Print the schema of the Greenplum Database table:

    scala> gpdf.printSchema()
    root
     |-- flt_year: short (nullable = true)
     |-- flt_quarter: short (nullable = true)
     |-- flt_month: short (nullable = true)
     |-- flt_dayofmonth: short (nullable = true)
     |-- flt_dayofweek: short (nullable = true)
     |-- flightdate: date (nullable = true)
     |-- uniquecarrier: string (nullable = true)
     |-- airlineid: integer (nullable = true)
     |-- carrier: string (nullable = true)
     |-- flightnum: string (nullable = true)
     |-- origin: string (nullable = true)
     |-- origincityname: string (nullable = true)
     |-- originstate: string (nullable = true)
     |-- originstatename: string (nullable = true)
     |-- dest: string (nullable = true)
     |-- destcityname: string (nullable = true)
     |-- deststate: string (nullable = true)
     |-- deststatename: string (nullable = true)
     |-- crsdeptime: string (nullable = true)
     |-- deptime: integer (nullable = true)
     |-- depdelay: double (nullable = true)
     |-- depdelayminutes: double (nullable = true)
     |-- departuredelaygroups: short (nullable = true)
     |-- taxiout: short (nullable = true)
     |-- wheelsoff: string (nullable = true)
     |-- wheelson: string (nullable = true)
     |-- taxiin: short (nullable = true)
     |-- crsarrtime: string (nullable = true)
     |-- arrtime: string (nullable = true)
     |-- arrdelay: double (nullable = true)
     |-- arrdelayminutes: double (nullable = true)
     |-- arrivaldelaygroups: short (nullable = true)
     |-- cancelled: short (nullable = true)
     |-- cancellationcode: string (nullable = true)
     |-- diverted: short (nullable = true)
     |-- crselapsedtime: integer (nullable = true)
     |-- actualelapsedtime: double (nullable = true)
     |-- airtime: double (nullable = true)
     |-- flights: short (nullable = true)
     |-- distance: double (nullable = true)
     |-- distancegroup: short (nullable = true)
     |-- carrierdelay: short (nullable = true)
     |-- weatherdelay: short (nullable = true)
     |-- nasdelay: short (nullable = true)
     |-- securitydelay: short (nullable = true)
     |-- lateaircraftdelay: short (nullable = true)
    

    Compare this Spark output with that of the Greenplum Database \d faa.otp_c command that you invoked earlier. Note that the Greenplum Database data type names differ from those of Spark. For example, the distancegroup column is of Greenplum Database type smallint, while the Spark data type is short. For detailed information about how the Greenplum-Spark Connector maps data types between Greenplum Database and Spark, refer to the Greenplum Database -> Spark Data Type Mapping documentation.
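
    The correspondences visible in the two listings above can be tabulated. A quick Scala sketch, with values taken only from the \d and printSchema() output shown here (this is not the Connector's full mapping table; consult the data type mapping documentation for that):

    ```scala
    // Greenplum Database -> Spark type names, as observed in the otp_c listings above.
    // Illustrative subset only.
    val typeMapping = Map(
      "smallint"         -> "short",
      "integer"          -> "integer",
      "text"             -> "string",
      "date"             -> "date",
      "double precision" -> "double"
    )

    println(typeMapping("smallint"))          // short
    println(typeMapping("double precision"))  // double
    ```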

  13. Use the .count() method to count the number of rows loaded:

    scala> gpdf.count()
    res2: Long = 1024552
    
  14. Use the .select() and .filter() methods to show the origin city, month, and carrier of all flights cancelled in the month of December. Order the results by airline ID and origin city. Enter paste mode and copy/paste the command:

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    gpdf.select("origincityname", "flt_month", "airlineid", "carrier")
               .filter("cancelled = 1").filter("flt_month = 12")
               .orderBy("airlineid", "origincityname")
               .show()
    
    control-D
    // Exiting paste mode, now interpreting.
    
    +---------------+---------+---------+-------+
    | origincityname|flt_month|airlineid|carrier|
    +---------------+---------+---------+-------+
    |    Detroit, MI|       12|    19386|     NW|
    |    Detroit, MI|       12|    19386|     NW|
    |  Milwaukee, WI|       12|    19386|     NW|
    |Minneapolis, MN|       12|    19386|     NW|
    |    Phoenix, AZ|       12|    19386|     NW|
    |    Houston, TX|       12|    19393|     WN|
    |    Houston, TX|       12|    19393|     WN|
    |  Las Vegas, NV|       12|    19393|     WN|
    |  Las Vegas, NV|       12|    19393|     WN|
    | Manchester, NH|       12|    19393|     WN|
    |      Omaha, NE|       12|    19393|     WN|
    |    Phoenix, AZ|       12|    19393|     WN|
    |   San Jose, CA|       12|    19393|     WN|
    |      Tampa, FL|       12|    19393|     WN|
    | Washington, DC|       12|    19393|     WN|
    |  Anchorage, AK|       12|    19704|     CO|
    |  Anchorage, AK|       12|    19704|     CO|
    |     Austin, TX|       12|    19704|     CO|
    |    Houston, TX|       12|    19704|     CO|
    |    Houston, TX|       12|    19704|     CO|
    +---------------+---------+---------+-------+
    only showing top 20 rows
    
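    The select/filter/orderBy pipeline above can be sketched with plain Scala collections to show the logic Spark applies, here against a tiny in-memory stand-in for the flight rows (the rows below are made-up sample values, not the actual data set):

    ```scala
    // Plain-Scala sketch of step 14's pipeline: keep cancelled December flights,
    // order by airline ID then origin city, and project the four columns of interest.
    case class Flight(origincityname: String, fltMonth: Int, airlineid: Int,
                      carrier: String, cancelled: Int)

    val flights = Seq(
      Flight("Detroit, MI", 12, 19386, "NW", 1),
      Flight("Houston, TX", 12, 19393, "WN", 1),
      Flight("Austin, TX",  11, 19704, "CO", 1),  // wrong month: filtered out
      Flight("Tampa, FL",   12, 19393, "WN", 0)   // not cancelled: filtered out
    )

    val dec = flights
      .filter(f => f.cancelled == 1 && f.fltMonth == 12)       // the two .filter() calls
      .sortBy(f => (f.airlineid, f.origincityname))            // .orderBy("airlineid", "origincityname")
      .map(f => (f.origincityname, f.fltMonth, f.airlineid, f.carrier))  // .select(...)

    dec.foreach(println)
    ```

    In Spark, the same chain runs distributed over the DataFrame's partitions rather than in local memory.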
  15. Use the .groupBy() and .agg() methods with the avg() function to identify the average departure delay for each day of the week, sorting on the day of the week with the .sort() method:

    scala> gpdf.groupBy("flt_dayofweek").agg(avg("depdelayminutes")).sort("flt_dayofweek").show()
    +-------------+--------------------+
    |flt_dayofweek|avg(depdelayminutes)|
    +-------------+--------------------+
    |            1|  14.738491569779914|
    |            2|  11.237272024020244|
    |            3|  11.198198256252295|
    |            4|  12.056892575385985|
    |            5|  12.455024249521957|
    |            6|   12.69586361271813|
    |            7|  14.818271192603715|
    +-------------+--------------------+
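
    The grouped-average logic in step 15 can likewise be sketched with plain Scala collections, using made-up (dayOfWeek, depDelayMinutes) pairs in place of the real table:

    ```scala
    // Plain-Scala sketch of step 15's pipeline: group rows by day of week,
    // average the delay minutes within each group, and sort by day.
    val delays = Seq((1, 10.0), (1, 20.0), (2, 5.0), (2, 15.0), (3, 12.0))

    val avgByDay = delays
      .groupBy(_._1)                              // groupBy("flt_dayofweek")
      .map { case (day, rows) =>
        (day, rows.map(_._2).sum / rows.size)     // agg(avg("depdelayminutes"))
      }
      .toSeq
      .sortBy(_._1)                               // sort("flt_dayofweek")

    avgByDay.foreach { case (day, avg) => println(s"$day -> $avg") }
    ```

    Spark performs the same aggregation per partition and merges the partial results, so the average is computed without collecting the rows to the driver.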