Example - Transferring Data Between Greenplum Database and Spark (Python)

This example utilizes the public Airline On-Time Statistics and Delay Cause data set. This data set records flights by date, airline, originating and destination airports, and many other flight details.

In this example, you:

  • Follow Greenplum Database tutorials to load the flight record data set into Greenplum Database.
  • Use the pyspark shell and the Greenplum-Spark Connector to read a fact table from Greenplum Database into Spark.
  • Perform transformations and actions on the data within Spark.
  • Write transformed Spark data into a new Greenplum Database table.

Prerequisites

Before starting this exercise, ensure that you are able to:

  • Access your Greenplum Database and Spark clusters
  • Identify your Greenplum Database master node hostname or IP address and port
  • Identify your Greenplum Database user/role name and password
  • Identify the absolute path to the Greenplum-Spark Connector JAR file on your system

Procedure 1: Read from Greenplum Database

Perform the following procedure to load flight record data into Greenplum Database, read this data into Spark, and use Spark to transform and view the table data.

Greenplum Database Operations

  1. Log in to your Greenplum Database master node and set up your environment. For example:

    $ ssh gpadmin@<gpmaster>
    gpadmin@gpmaster$ . /usr/local/greenplum-db/greenplum_path.sh
    
  2. Download and unpack the flight record example data set:

    gpadmin@gpmaster$ git clone https://github.com/greenplum-db/gpdb-sandbox-tutorials.git
    gpadmin@gpmaster$ cd gpdb-sandbox-tutorials
    gpadmin@gpmaster$ tar zxf faa.tar.gz
    
  3. Perform the following exercises in the Greenplum Database tutorial in the specified order:

    1. Create Users and Roles
    2. Create and Prepare Database
    3. Create Tables
    4. Data Loading
  4. When you complete Step 3, you will have created:

    • A user named user2.
    • A Greenplum Database database named tutorial.
    • A schema named faa.
    • Several Greenplum Database tables in the faa schema, loaded with flight data.
  5. Identify your Greenplum Database user/role name and password:

    If you are performing this exercise in the Greenplum Sandbox VM, you will use the Greenplum-Spark Connector as user2 (password pivotal). If you are performing this exercise in your own Greenplum Database instance, you may have a different user name.

  6. Assign required privileges to the Greenplum Database user to access the tutorial database, faa schema, and relevant tables. For example, if your Greenplum Database user name is user2, the Greenplum Database administrator would execute the commands:

    gpadmin@gpmaster$ psql -d tutorial
    
    tutorial=# GRANT USAGE, CREATE ON SCHEMA faa TO user2;
    tutorial=# GRANT SELECT ON faa.otp_c TO user2;
    tutorial=# ALTER USER user2 CREATEEXTTABLE(type = 'writable', protocol = 'gpfdist');
    tutorial=# \q
    
  7. Verify that the flight data loaded correctly:

    1. Connect to the tutorial database as user user2. Recall that the password for user2 is pivotal:

      gpadmin@gpmaster$ psql -d tutorial -U user2
      Password for user user2:
      
    2. List all of the tables in the tutorial database schema named faa:

      tutorial=> \dt faa.*
                                 List of relations
       Schema |         Name         | Type  |  Owner  |       Storage        
      --------+----------------------+-------+---------+----------------------
       faa    | d_airlines           | table | user1   | heap
       faa    | d_airports           | table | user1   | heap
       faa    | d_cancellation_codes | table | user1   | heap
       faa    | d_delay_groups       | table | user1   | heap
       faa    | d_distance_groups    | table | user1   | heap
       faa    | d_wac                | table | user1   | heap
       faa    | faa_load_errors      | table | user1   | heap
       faa    | faa_otp_load         | table | gpadmin | heap
       faa    | otp_c                | table | gpadmin | append only columnar
       faa    | otp_c_1_prt_mth_1    | table | gpadmin | append only columnar
       faa    | otp_c_1_prt_mth_10   | table | gpadmin | append only columnar
       faa    | otp_c_1_prt_mth_11   | table | gpadmin | append only columnar
      ...
      (27 rows)
      

      The \dt output lists 27 tables when the flight data is loaded correctly.

    3. Examine the definition of the table named otp_c:

      tutorial=>  \d faa.otp_c
             Append-Only Columnar Table "faa.otp_c"
              Column        |       Type       | Modifiers 
      ----------------------+------------------+-----------
       flt_year             | smallint         | 
       flt_quarter          | smallint         | 
       flt_month            | smallint         | 
       flt_dayofmonth       | smallint         | 
       flt_dayofweek        | smallint         | 
       flightdate           | date             | 
       uniquecarrier        | text             | 
       airlineid            | integer          | 
       carrier              | text             | 
       flightnum            | text             | 
       origin               | text             | 
       origincityname       | text             | 
       originstate          | text             | 
       originstatename      | text             | 
       dest                 | text             | 
       destcityname         | text             | 
       deststate            | text             | 
       deststatename        | text             | 
       crsdeptime           | text             | 
       deptime              | integer          | 
       depdelay             | double precision | 
       depdelayminutes      | double precision | 
       departuredelaygroups | smallint         | 
       taxiout              | smallint         | 
       wheelsoff            | text             | 
       wheelson             | text             | 
       taxiin               | smallint         | 
       crsarrtime           | text             | 
       arrtime              | text             | 
       arrdelay             | double precision | 
       arrdelayminutes      | double precision | 
       arrivaldelaygroups   | smallint         | 
       cancelled            | smallint         | 
       cancellationcode     | text             | 
       diverted             | smallint         | 
       crselapsedtime       | integer          | 
       actualelapsedtime    | double precision | 
       airtime              | double precision | 
       flights              | smallint         | 
       distance             | double precision | 
       distancegroup        | smallint         | 
       carrierdelay         | smallint         | 
       weatherdelay         | smallint         | 
       nasdelay             | smallint         | 
       securitydelay        | smallint         | 
       lateaircraftdelay    | smallint         | 
      Checksum: t
      Number of child tables: 17 (Use \d+ to list them.)
      Distributed by: (uniquecarrier, flightnum)
      Partition by: (flightdate)
      

      The table named otp_c is a column-oriented, partitioned fact table.

Spark Operations

  1. Open a terminal window and log in to your Spark client node:

    $ ssh user@<spark-client>
    user@spark-client$ 
    
  2. Construct the JDBC connection string URL to access Greenplum Database. For example, this JDBC connection string accesses a database named tutorial using the Greenplum Database master host gpmaster.domain at the default connection port:

    jdbc:postgresql://gpmaster.domain/tutorial
    

    Save this URL string; you will use it in an upcoming step.
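
    The URL follows the pattern jdbc:postgresql://<host>[:<port>]/<database>. As a minimal sketch, a small helper can assemble it. The function name build_jdbc_url is hypothetical, and 5432 appears only as an illustrative explicit port:

```python
def build_jdbc_url(host, database, port=None):
    """Return a PostgreSQL-style JDBC URL for a Greenplum Database master host.

    When port is None, the port is omitted from the URL so that the
    default connection port is used.
    """
    if not host or not database:
        raise ValueError("host and database are both required")
    authority = host if port is None else f"{host}:{int(port)}"
    return f"jdbc:postgresql://{authority}/{database}"


# Same URL as the example above (default port).
print(build_jdbc_url("gpmaster.domain", "tutorial"))
# Explicit port; 5432 is only an illustrative value.
print(build_jdbc_url("gpmaster.domain", "tutorial", port=5432))
```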

  3. Start pyspark. Replace <gsc-jar> with the full path to your Greenplum-Spark Connector JAR file:

    user@spark-client$ pyspark --jars <gsc-jar>
    < ... pyspark startup output messages ... >
    >>>
    

    You enter the pyspark interactive Python shell.

  4. Prepare to read the otp_c table into Spark. In a text editor, construct a map of read options for the greenplum data source. You want to load the Greenplum Database table named otp_c in the schema named faa, specifying airlineid as the partition column. For example, if you are the user user2 with password pivotal:

    gscPythonOptions = {
        "url": "jdbc:postgresql://gpmaster.domain/tutorial",
        "user": "user2",
        "password": "pivotal",
        "dbschema": "faa",
        "dbtable": "otp_c",
        "partitionColumn": "airlineid"
    }
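
    If you prefer not to type the password into the options map, one possible variation reads it from an environment variable. This is only a sketch: the variable name GSC_PASSWORD and the helper name read_options are hypothetical and are not defined by the Connector.

```python
import os


def read_options(url, user, schema, table, partition_column,
                 password_env="GSC_PASSWORD"):
    """Build the read options map, taking the password from the environment.

    GSC_PASSWORD is a hypothetical variable name chosen for this sketch.
    """
    password = os.environ.get(password_env)
    if password is None:
        raise RuntimeError(f"environment variable {password_env} is not set")
    return {
        "url": url,
        "user": user,
        "password": password,
        "dbschema": schema,
        "dbtable": table,
        "partitionColumn": partition_column,
    }


# Example call, assuming GSC_PASSWORD was exported before starting pyspark:
# gscPythonOptions = read_options(
#     "jdbc:postgresql://gpmaster.domain/tutorial",
#     "user2", "faa", "otp_c", "airlineid")
```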
    
  5. Copy/paste the options map to your pyspark shell terminal window, and press Return to submit the command. For example:

    >>> gscPythonOptions = {
    ...         "url": "jdbc:postgresql://gpmaster.domain/tutorial",
    ...         "user": "user2",
    ...         "password": "pivotal",
    ...         "dbschema": "faa",
    ...         "dbtable": "otp_c",
    ...         "partitionColumn": "airlineid"
    ...     } <return>
    >>> 
    
  6. Load the data from the Greenplum Database table otp_c into a Spark DataFrame, providing the options that you constructed. For example:

    >>> gpdf = spark.read.format("greenplum").options(**gscPythonOptions).load()
    

    The Greenplum Database table is not actually loaded until you perform an action on the returned DataFrame.

  7. Print the schema of the Greenplum Database table:

    >>> gpdf.printSchema()
    
    root
     |-- flt_year: short (nullable = true)
     |-- flt_quarter: short (nullable = true)
     |-- flt_month: short (nullable = true)
     |-- flt_dayofmonth: short (nullable = true)
     |-- flt_dayofweek: short (nullable = true)
     |-- flightdate: date (nullable = true)
     |-- uniquecarrier: string (nullable = true)
     |-- airlineid: integer (nullable = true)
     |-- carrier: string (nullable = true)
     |-- flightnum: string (nullable = true)
     |-- origin: string (nullable = true)
     |-- origincityname: string (nullable = true)
     |-- originstate: string (nullable = true)
     |-- originstatename: string (nullable = true)
     |-- dest: string (nullable = true)
     |-- destcityname: string (nullable = true)
     |-- deststate: string (nullable = true)
     |-- deststatename: string (nullable = true)
     |-- crsdeptime: string (nullable = true)
     |-- deptime: integer (nullable = true)
     |-- depdelay: double (nullable = true)
     |-- depdelayminutes: double (nullable = true)
     |-- departuredelaygroups: short (nullable = true)
     |-- taxiout: short (nullable = true)
     |-- wheelsoff: string (nullable = true)
     |-- wheelson: string (nullable = true)
     |-- taxiin: short (nullable = true)
     |-- crsarrtime: string (nullable = true)
     |-- arrtime: string (nullable = true)
     |-- arrdelay: double (nullable = true)
     |-- arrdelayminutes: double (nullable = true)
     |-- arrivaldelaygroups: short (nullable = true)
     |-- cancelled: short (nullable = true)
     |-- cancellationcode: string (nullable = true)
     |-- diverted: short (nullable = true)
     |-- crselapsedtime: integer (nullable = true)
     |-- actualelapsedtime: double (nullable = true)
     |-- airtime: double (nullable = true)
     |-- flights: short (nullable = true)
     |-- distance: double (nullable = true)
     |-- distancegroup: short (nullable = true)
     |-- carrierdelay: short (nullable = true)
     |-- weatherdelay: short (nullable = true)
     |-- nasdelay: short (nullable = true)
     |-- securitydelay: short (nullable = true)
     |-- lateaircraftdelay: short (nullable = true)
    

    Compare this Spark output with that of the Greenplum Database \d faa.otp_c command that you invoked earlier. Note that the Greenplum Database data type names differ from those of Spark. For example, the distancegroup column is of Greenplum Database type smallint, while the Spark data type is short. For detailed information about how the Greenplum-Spark Connector maps data types between Greenplum Database and Spark, refer to the Greenplum Database -> Spark Data Type Mapping documentation.
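
    The type pairs visible in the two listings can be collected into a small lookup table. This sketch covers only the five types that appear in faa.otp_c. It is derived from the output above, it is not the Connector's complete mapping, and the names GPDB_TO_SPARK and spark_type are hypothetical:

```python
# Greenplum Database type -> Spark type, as observed for faa.otp_c above.
GPDB_TO_SPARK = {
    "smallint": "short",
    "integer": "integer",
    "double precision": "double",
    "text": "string",
    "date": "date",
}


def spark_type(gpdb_type):
    """Return the Spark type name observed for a Greenplum Database type."""
    try:
        return GPDB_TO_SPARK[gpdb_type.strip().lower()]
    except KeyError:
        raise KeyError(f"type not covered by this sketch: {gpdb_type!r}") from None


# A few columns from the faa.otp_c table definition shown earlier.
for column, gpdb_type in [("distancegroup", "smallint"),
                          ("flightdate", "date"),
                          ("depdelay", "double precision")]:
    print(f" |-- {column}: {spark_type(gpdb_type)} (nullable = true)")
```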

  8. Use the .count() method to count the number of rows loaded:

    >>> gpdf.count()
    1024552
    
  9. Use the .select() and .filter() methods to show the origin city, month, airline ID, and carrier of all flights cancelled in the month of December. Order the results by airline ID and origin city. Copy/paste the command:

    >>> gpdf.select("origincityname", "flt_month", "airlineid", "carrier") \
               .filter("cancelled = 1").filter("flt_month = 12") \
               .orderBy("airlineid", "origincityname") \
               .show()
    
    +---------------+---------+---------+-------+
    | origincityname|flt_month|airlineid|carrier|
    +---------------+---------+---------+-------+
    |    Detroit, MI|       12|    19386|     NW|
    |    Detroit, MI|       12|    19386|     NW|
    |  Milwaukee, WI|       12|    19386|     NW|
    |Minneapolis, MN|       12|    19386|     NW|
    |    Phoenix, AZ|       12|    19386|     NW|
    |    Houston, TX|       12|    19393|     WN|
    |    Houston, TX|       12|    19393|     WN|
    |  Las Vegas, NV|       12|    19393|     WN|
    |  Las Vegas, NV|       12|    19393|     WN|
    | Manchester, NH|       12|    19393|     WN|
    |      Omaha, NE|       12|    19393|     WN|
    |    Phoenix, AZ|       12|    19393|     WN|
    |   San Jose, CA|       12|    19393|     WN|
    |      Tampa, FL|       12|    19393|     WN|
    | Washington, DC|       12|    19393|     WN|
    |  Anchorage, AK|       12|    19704|     CO|
    |  Anchorage, AK|       12|    19704|     CO|
    |     Austin, TX|       12|    19704|     CO|
    |    Houston, TX|       12|    19704|     CO|
    |    Houston, TX|       12|    19704|     CO|
    +---------------+---------+---------+-------+
    only showing top 20 rows
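
    When you run the same query for several months, it can help to wrap it in a function. The following is a sketch only. The function names cancelled_filter and cancelled_in_month are hypothetical, and the sketch combines the two .filter() calls into a single SQL expression, which selects the same rows:

```python
def cancelled_filter(month):
    """Return a SQL filter expression for flights cancelled in one month."""
    month = int(month)
    if not 1 <= month <= 12:
        raise ValueError("month must be between 1 and 12")
    return f"cancelled = 1 AND flt_month = {month}"


def cancelled_in_month(df, month):
    """Return cancelled flights for one month, ordered as in the step above.

    df is expected to be a DataFrame with the otp_c columns, such as gpdf.
    """
    return (
        # Filter first so that the cancelled column is still available.
        df.filter(cancelled_filter(month))
          .select("origincityname", "flt_month", "airlineid", "carrier")
          .orderBy("airlineid", "origincityname")
    )
```

    In the pyspark shell, cancelled_in_month(gpdf, 12).show() should display the December cancellations in the same order as the command above.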
    
  10. Use the .groupBy() and .agg() methods with the avg() function to identify the average departure delay for each day of the week, sorting by the day of the week with the .sort() method. The avg() function is located in the pyspark.sql.functions module; you must first import it:

    >>> from pyspark.sql.functions import avg
    >>> gpdf.groupBy("flt_dayofweek").agg(avg("depdelayminutes")).sort("flt_dayofweek").show()
    
    +-------------+--------------------+
    |flt_dayofweek|avg(depdelayminutes)|
    +-------------+--------------------+
    |            1|  14.738491569779914|
    |            2|  11.237272024020244|
    |            3|  11.198198256252295|
    |            4|  12.056892575385985|
    |            5|  12.455024249521957|
    |            6|   12.69586361271813|
    |            7|  14.818271192603715|
    +-------------+--------------------+
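
    To make clear what the aggregation computes, the following plain-Python sketch performs the same grouping and averaging on a handful of in-memory rows. The function name average_by_key is hypothetical, and the sample rows are made up for illustration and are not taken from the flight data:

```python
from collections import defaultdict


def average_by_key(rows, key, value):
    """Average row[value] per row[key], skipping None as avg() skips NULL.

    Unlike Spark, a group whose values are all None is omitted entirely.
    """
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        if row[value] is None:
            continue
        totals[row[key]] += row[value]
        counts[row[key]] += 1
    return {k: totals[k] / counts[k] for k in sorted(counts)}


# Made-up sample rows for illustration only; these are not values from otp_c.
sample = [
    {"flt_dayofweek": 1, "depdelayminutes": 10.0},
    {"flt_dayofweek": 1, "depdelayminutes": None},
    {"flt_dayofweek": 1, "depdelayminutes": 20.0},
    {"flt_dayofweek": 2, "depdelayminutes": 5.0},
]
print(average_by_key(sample, "flt_dayofweek", "depdelayminutes"))
```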
    
  11. Use the like() method to display the cancelled flights for the month of December for all origin cities whose name starts with the letters Mi. Copy/paste the command:

    >>> gpdf.select("origincityname", "destcityname", "flightnum", "carrier", "airlineid", "flt_month") \
               .filter("cancelled = 1") \
               .filter("flt_month = 12") \
               .filter(gpdf.origincityname.like("Mi%")) \
               .orderBy("origincityname", "destcityname") \
               .show()
    
    +--------------------+--------------------+---------+-------+---------+---------+
    |      origincityname|        destcityname|flightnum|carrier|airlineid|flt_month|
    +--------------------+--------------------+---------+-------+---------+---------+
    |           Miami, FL|         Chicago, IL|      846|     AA|    19805|       12|
    |           Miami, FL|Greensboro/High P...|     4197|     MQ|    20398|       12|
    |           Miami, FL|      Washington, DC|     1068|     AA|    19805|       12|
    |       Milwaukee, WI|       Baltimore, MD|      817|     FL|    20437|       12|
    |       Milwaukee, WI|          Denver, CO|     5838|     OO|    20304|       12|
    |       Milwaukee, WI|         Memphis, TN|     3799|     9E|    20363|       12|
    |       Milwaukee, WI|     Minneapolis, MN|     7177|     NW|    19386|       12|
    |       Milwaukee, WI|          Newark, NJ|     2504|     OO|    20304|       12|
    |     Minneapolis, MN|         Atlanta, GA|     1073|     DL|    19790|       12|
    |     Minneapolis, MN|     Grand Forks, ND|     4003|     9E|    20363|       12|
    |     Minneapolis, MN|         Houston, TX|     2399|     XE|    20374|       12|
    |     Minneapolis, MN|         Lincoln, NE|     3798|     9E|    20363|       12|
    |     Minneapolis, MN|           Miami, FL|      925|     AA|    19805|       12|
    |     Minneapolis, MN|       Milwaukee, WI|     7165|     NW|    19386|       12|
    |     Minneapolis, MN|      Rapid City, SD|     3931|     9E|    20363|       12|
    |Mission/Mcallen/E...|         Memphis, TN|     4066|     9E|    20363|       12|
    |        Missoula, MT|  Salt Lake City, UT|     4461|     OO|    20304|       12|
    +--------------------+--------------------+---------+-------+---------+---------+
    
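  12. If you do not plan to continue with Procedure 2, exit the pyspark shell. Procedure 2 reuses the gpdf DataFrame and the avg import from this session: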

    >>> quit()
    

Procedure 2: Write from Spark to Greenplum Database

Perform the following procedure to write Spark data that you transformed in the previous procedure into a new Greenplum Database table.

Note: This procedure assumes that you have completed Procedure 1 of this example and have retained the example runtime environment.

Greenplum Database Operations

  1. Locate your Greenplum Database terminal window.

  2. Assign the Greenplum privileges required to write to a Greenplum Database table. For example, if your Greenplum Database user name is user2, the Greenplum Database administrator would execute the commands:

    gpadmin@gpmaster$ psql -d tutorial
    
    tutorial=# ALTER USER user2 CREATEEXTTABLE(type = 'readable', protocol = 'gpfdist');
    
  3. You will return to the Greenplum terminal window at the end of this procedure.

Spark Operations

  1. Identify the average departure delay for each day of the week with the statement you specified in Procedure 1, this time saving, rather than displaying, the DataFrame. Assign the data to a variable named delaydf:

    >>> delaydf = gpdf.groupBy("flt_dayofweek").agg(avg("depdelayminutes")).sort("flt_dayofweek")
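
    Spark names the aggregate column avg(depdelayminutes), and that name is carried into the Greenplum Database table, where it must be double-quoted in SQL. If you would rather write a plain identifier, one option is to rename the column before writing. The helper below is a hypothetical sketch that derives such a name; the remaining steps of this procedure keep the default column name.

```python
import re


def plain_identifier(name):
    """Turn a generated column name into a lowercase SQL-friendly identifier."""
    # Replace every run of characters other than letters, digits, and
    # underscores with a single underscore, then trim underscores at the ends.
    cleaned = re.sub(r"[^0-9A-Za-z_]+", "_", name).strip("_").lower()
    if not cleaned:
        raise ValueError(f"cannot derive an identifier from {name!r}")
    # Prefix names that would otherwise start with a digit.
    return cleaned if not cleaned[0].isdigit() else f"c_{cleaned}"


print(plain_identifier("avg(depdelayminutes)"))  # avg_depdelayminutes
```

    In the pyspark shell, delaydf.withColumnRenamed("avg(depdelayminutes)", plain_identifier("avg(depdelayminutes)")) returns a DataFrame with the renamed column.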
    
  2. Copy/paste the following options map setting to your pyspark shell terminal window, and press Return to submit the command. For example:

    gscPythonWriteOptions = {
        "url": "jdbc:postgresql://gpmaster.domain/tutorial",
        "user": "user2",
        "password": "pivotal",
        "dbschema": "faa",
        "dbtable": "avgdelay"
    }
    
  3. Write delaydf to the Greenplum Database table named avgdelay. If the table does not exist, the Greenplum-Spark Connector will create the table for you before loading the data. Specify the Append SaveMode to instruct the Connector to append the data to the table if it already exists.

    >>> delaydf.write.format("greenplum").options(**gscPythonWriteOptions).mode("Append").save()
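
    The mode string is passed to Spark's DataFrameWriter.mode() method. As a sketch, the write can be wrapped in a function that rejects misspelled mode names before any data is sent. The names SPARK_SAVE_MODES, normalize_save_mode, and write_table are hypothetical. The set lists mode names that Spark itself accepts; whether the Connector supports each of them is an assumption to check against the Connector documentation:

```python
# Mode names accepted by Spark's DataFrameWriter.mode() (case-insensitive).
SPARK_SAVE_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def normalize_save_mode(mode):
    """Return the lowercase mode name, or raise if it is not a known mode."""
    normalized = mode.strip().lower()
    if normalized not in SPARK_SAVE_MODES:
        raise ValueError(f"unknown save mode: {mode!r}")
    return normalized


def write_table(df, options, mode="append"):
    """Write df through the greenplum data source using the given options map."""
    (df.write.format("greenplum")
       .options(**options)
       .mode(normalize_save_mode(mode))
       .save())
```

    In the pyspark shell, write_table(delaydf, gscPythonWriteOptions, "Append") performs the same write as the command above.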
    
  4. Exit the pyspark shell:

    >>> quit()
    

Greenplum Database Operations

  1. Run psql and connect to Greenplum Database as user user2:

    gpadmin@gpmaster$ psql -d tutorial -U user2
    
  2. Examine the schema of Greenplum Database table avgdelay:

    tutorial=>  \d+ faa.avgdelay
                                Table "faa.avgdelay"
            Column        |       Type       | Modifiers | Storage | Description
    ----------------------+------------------+-----------+---------+-------------
     flt_dayofweek        | smallint         |           | plain   |
     avg(depdelayminutes) | double precision |           | plain   |
    Has OIDs: no
    Distributed by: (flt_dayofweek)
    
  3. Examine the table contents:

    tutorial=> SELECT * FROM faa.avgdelay ORDER BY flt_dayofweek;
     flt_dayofweek | avg(depdelayminutes)
    ---------------+----------------------
                 1 |     14.7384915697799
                 2 |     11.2372720240202
                 3 |     11.1981982562523
                 4 |      12.056892575386
                 5 |      12.455024249522
                 6 |     12.6958636127181
                 7 |     14.8182711926037
    (7 rows)
    

    The table contents differ slightly from those displayed for the DataFrame in the pyspark shell. Greenplum Database displays double precision values with 15 significant digits, while Spark displays up to 17.
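
    You can reproduce the difference in plain Python, whose float type is also a 64-bit double. This sketch assumes that the psql output corresponds to formatting with 15 significant digits. The values are copied from the pyspark output in Procedure 1, and the function name fifteen_digits is hypothetical:

```python
# Average delays exactly as printed by the pyspark shell in Procedure 1.
spark_values = [
    14.738491569779914,
    11.237272024020244,
    11.198198256252295,
    12.056892575385985,
    12.455024249521957,
    12.69586361271813,
    14.818271192603715,
]


def fifteen_digits(value):
    """Format a float with at most 15 significant digits."""
    return f"{value:.15g}"


for day, value in enumerate(spark_values, start=1):
    # repr() prints enough digits to round-trip the double.
    print(day, repr(value), fifteen_digits(value))
```

    The third column of the printed output matches the values returned by the SELECT statement above.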