ds4all_seminar_Fall_2021_Results(Python)


This Notebook

This notebook was created to help introduce students to the wonderful world of data science using Spark and Jupyter notebooks. The target audience is students who have no background in data science, programming, distributed computing, Python, or Apache Spark.

This notebook is designed to run on a Databricks Community account (our thanks to Databricks for making community accounts available!).

This notebook was created as part of the Data Science for All Seminar Series funded by NSF Grant #1829622
PI: Leslie Albert, Ph.D., Co-PIs: Scott Jensen, Ph.D. and Esperanza Huerta, Ph.D.

Copyright Scott Jensen, San Jose State University

Creative Commons License
This ds4all notebook by Scott Jensen, Ph.D. is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.

Part 1: The Data for This Workshop

In this workshop we will be using data from the USASpending.gov website. We will be looking at U.S. government contracts from 2014 through 2018. This period spans two different U.S. administrations, under two different presidents, from different political parties. The active members of these two parties have very different perspectives on governing and spending priorities. Are these differences reflected in the contracting being done? Do different agencies spend more? Are contract lengths different? Are the vendors for contracts different? Are the states where contracts are awarded different? Possibly, or it could be that the administration of contracts by career government employees does not change that much from administration to administration.

The USASpending site grew out of a government initiative on transparency as to how the government is spending money. Details regarding the application programming interface (API) for this data can be found here. That page includes a brief tutorial, and the Endpoints link on that page shows a list of the methods available to download data.

Some summary data can be downloaded directly in a JSON format and we will briefly cover an example at the end of this notebook. JSON has become the language of the web, so if you are downloading data from social media sites (e.g., Twitter, Yelp, etc.) or from other sources, JSON is a common format. Government and financial data often use an XML format, and scientific data usually has different formats.

For other larger data sets on the USASpending site, you can send an asynchronous request and your data is later available for download in a comma-separated value (CSV) format.

The APIs at the USASpending website will allow you to download data about contracts awarded, assistance (grants and loans) awarded, spending on those contracts and awards, federal accounts, and account balances.

Downloading the contracts data

For purposes of this seminar, we are going to start with data on U.S. government spending (federal) on contracts and who was being paid in each of the years mentioned above. Due to some quirks in the API (there is a 1GB limit that is silently imposed), we downloaded these by month, filtered the columns we wanted, combined the months by year, compressed the data, and staged the data files on the web so you can access them directly from your notebook (so we don't need to move large files over the Wi-Fi). Storing your data on S3 is relatively cheap - it costs pennies per GB per month.

Keep in mind that Spark can read compressed data files. As an example, the government spending data we will be using today compresses by a factor of 10 as a bzip2 file compared to the raw data, so the storage cost drops by a factor of 10.

For this workshop we filtered out some of the columns. The original contract data contained 250 columns and this was reduced to the 74 columns relevant to the questions we will ask of the data today (or other questions you may want to ask later). The actual downloading, filtering, compressing, etc. was done using a separate Jupyter notebook that does not require Spark (you can run that from Jupyter as part of Anaconda's free desktop distro). In that notebook we specify which columns of data we want, so you can specify different columns. If you want to download the data yourself or explore different columns in the USASpending data, see the "Tell Me More!" module in Canvas for this seminar.

Spinning Up a Cluster

To be able to calculate any cells in your notebook, you will need to be attached to a cluster. From the toolbar on the left-hand side, click on the Compute icon. You won't have any clusters to start with, so click on the +Create Cluster button.

There are two fields you need to specify:

  • You need to enter a name for your cluster
  • There is a Databricks Runtime (DBR) that is the default. From the drop-down list for the DBR, select version 9.0 or higher, but do not select the "ML" version. If you select a version lower than 9.0, the data entry widget you will be creating in the next cell will not appear (and that's not fun). The drop-down list is shown in the image below.

    After entering a name for your cluster and specifying the DBR version, click the "Create Cluster" button. Initially you will see a spinning green circle next to the name of your cluster. That indicates that the cluster is starting (what's also referred to as "spinning up"). Once your cluster is up and running (it will have a solid green circle next to it), come back here to your notebook and from the drop-down list in the top-left corner that says "Detached", select your running cluster (listed with the solid green circle next to it). Your notebook will now be attached to your running cluster and can execute code cells, so let's get started!

Creating The Input Widget

When you run the following cell, Databricks will display a widget at the top of the notebook that prompts you for the URL for a data manifest. The URL is the location on the web where you can find the file, and the URL for the seminar is provided in Canvas. The data manifest lists the files you will need and tells the notebook where the files are located and some information that the notebook will use to verify that the files were retrieved without any problems.

To run the following cell, hover your mouse over the cell and some controls will appear in the upper right-hand corner of the cell, as shown in the image below. Select the Run cell option.

The following cell is Step 1a

dbutils.widgets.text(name='manifest_url',defaultValue='xxxxxxxxxxxxxxx',label='Enter the manifest URL:')

Got Data? Downloading the data for 2014 - 2018

To analyze trends across the 5-year span from 2014 - 2018 (inclusive), we will load files in a format known as Parquet, which is a popular data file format across Big Data tools and platforms. We can then create a table in Spark - it does not change our Parquet files, but stores metadata about the files in the Hive Metastore. If the table already exists, we will load a DataFrame (in memory) from the table. If it does not exist, we will first load the Parquet files into your account from the web (about 3-4 minutes).

We will be working with almost 20 million rows of data, so to download it quickly, we have zipped up and staged the files out on Amazon Web Services (AWS), so the download will be transferring data from our account on AWS to your Databricks account, which is also hosted on AWS, without ever going through your laptop or over the local network. Since we may update the data in the future, we first download a file from our website that provides a manifest of the files to download and the hashed values of those files so we can check that they were accurately downloaded.
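For illustration only, the manifest is a small JSON document shaped roughly like the Python dictionary below. The name:value pairs (id, data_list, download_list, and the name, md5, and zipped values for each download) are the ones the code in Step 1b looks for; the bucket id, file names, and hash values shown here are made-up placeholders, not the seminar's actual values.

# Illustrative shape of the download manifest after json.loads() --
# every value below is a placeholder, not the real seminar manifest.
example_manifest = {
  "id": "example-s3-bucket-name",      # S3 bucket the staged files are downloaded from
  "data_list": [                       # names expected in the data directory once loading is done
    "example_data_folder_1",
    "example_data_folder_2"
  ],
  "download_list": [                   # files to download, each with an MD5 hash used to verify it
    {"name": "example_file_1.bz2", "md5": "placeholder-md5-hash-1", "zipped": "true"},
    {"name": "example_file_2.bz2", "md5": "placeholder-md5-hash-2", "zipped": "true"}
  ]
}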

If the code in the following cell seems daunting, don't worry. If you wanted to use your Databricks account and Spark to do some analysis for a class project, you can load data easily through the GUI interface available from the Data option on the toolbar. The GUI interface will allow you to upload files up to 2GB in size (subject to the speed limitations of your Internet provider).

The following cell is Step 1b

import pyspark.sql.functions as f
import requests
import re
import tarfile
import zipfile
import json
import hashlib
 
FORCE_DOWNLOAD = False
 
MANIFEST_URL = 'https://www.sjsu.edu/datascienceforall/seminars/spark-jupyter-manifest.json'
URL_HOST = ".s3-us-west-2.amazonaws.com/"
TEMP_DIR = "/ds4all/"
ZIPPED_DIR = "/ds4all/zipped/"
UNZIPPED_DIR = "/ds4all/unzipped/"
DATA_DIR = "/user/hive/warehouse/usa_transactions_table"
DB_NAME = "usa_transactions_table"
 
 
 
def cleanAll():
  ' Removes the database directory if it exists on DBFS'
  spark.sql(f"DROP TABLE IF EXISTS {DB_NAME}")
  try:
    dbutils.fs.rm(DATA_DIR,recurse=True)
  except Exception:
    pass
 
  
def get_manifest():
  ''' Returns the dictionary that's the download manifest based on the URL
      entered in the URL widget.
      If it's not a valid URL or returns a status code other than 200, an exception is raised.
      If the manifest is not valid JSON, or does not contain name:value pairs named
      id, data_list, and download_list, an exception is raised.  
  ''' 
  manifest_url = dbutils.widgets.get("manifest_url")
  response = requests.get(manifest_url)
  if response.status_code != 200:
    raise Exception(f"The manifest URL {manifest_url} returned a status of " + str(response.status))
  manifest = response.text
  try:
    manifest_dict = json.loads(manifest)
    # The manifest should have a data_list element and a download_list element
    if "data_list" not in manifest_dict:
      raise Exception("The manifest does not contain a data_list.")
    if "download_list" not in manifest_dict:
      raise Exception("The manifest does not contain a download_list.")
    return(manifest_dict)
  except json.JSONDecodeError as err:
    raise Exception("The manifest is not a valid JSON document.", err)
  
  
def check_data(manifest):
  ''' Function used to check if the data directory contains valid
      copies of all of the files or directories in the download.
      The manifest dictionary is passed as a parameter and is expected 
      to contain a data_list containing the names of each file or
      directory expected in the data directory. If a file or directory 
      is missing, this method returns False. If all of the files or 
      directories exist, it returns True.
  '''
  try:
    # List what is already in the data directory; if it is empty we need to download
    existing_list = dbutils.fs.ls(DATA_DIR)
    if len(existing_list) == 0:
      return(False)
    # Check that every name in the manifest's data_list is present in the data directory
    data_list = manifest["data_list"]
    for file_name in data_list:
      found = False
      for info in existing_list:
        if info.name == file_name:
          found = True
          break
      if not found:
        return(False)
    # looped through all of the required files and they are there
    return(True)
  except Exception:
    # The directory does not exist or does not match the manifest
    return(False)
  
  
def get_bucket_id(manifest):
  ''' The manifest is expected to contain a name:value pair named id
      where the value is the bucket name on S3 where the files are
      staged.  If the id is missing or is a blank string, then an
      exception is raised, otherwise the bucket id is returned.
  '''
  try:
    bucket = manifest['id'].strip()
    if len(bucket) == 0:
      raise Exception("The id provided in the manifest was an empty string, but should be the name of the bucket being downloaded from.")
    else:
      return(bucket)
  except Exception as e:
    raise Exception("An error occurred in retrieving the bucket id from the manifest", e)
 
    
def download_file(manifest_item, bucket_id):
  ''' Given a dictionary from the download list, download the file to the
      temporary directory for downloading the file and check the
      MD5 sum to make sure it matches.
      If the MD5 sum does not match, an exception is raised, otherwise it prints
      that the file was successfully downloaded.
  '''
  file_name = manifest_item["name"]
  item_md5sum = manifest_item["md5"]
  request_url = "https://" + bucket_id + URL_HOST + file_name
  local_name = ZIPPED_DIR + file_name 
  print("requesting file from:", request_url)
  r = requests.get(request_url, stream=True)
  status_code = r.status_code
  # If the status code is 200, then we successfully retrieved the file
  if status_code != 200:
    raise Exception(f"The {file_name} download failed. A status code of {str(status_code)} was returned from the URL:{request_url}.")
  else: # write the file
    with open(local_name, 'wb') as file:
      for chunk in r.iter_content(chunk_size=4096):
        file.write(chunk)
  #check if the hash of the file downloaded matches the md5 sum in the manifest
  with open(local_name, 'rb') as data_file:
    md5sum = hashlib.md5( data_file.read() ).hexdigest()
    if md5sum.lower() != item_md5sum.lower():
      raise Exception(f"The file {file_name} downloaded from Google Drive generated a MD5 sum of {md5sum} instead of the MD5 sum in the manifest ({item_md5sum}) so it may be corrupted and the processing was terminated.")
    else:
      print ("successfully downloaded:", file_name)
    
    
def process_file(manifest_item):
    ''' The file is now downloaded.  If the file is zipped,
        it first needs to be unzipped; either way, it is then moved
        to the DBFS data directory.
    '''
    local_name = ZIPPED_DIR + manifest_item["name"]
    local_path = "file:" + local_name
    is_zipped = manifest_item["zipped"] == "true" # The manifest stores "true"/"false" as strings; this comparison yields a boolean
    if is_zipped:
      with tarfile.open(name=local_name,mode='r:bz2') as tar_ref:
        tar_ref.extractall(path=UNZIPPED_DIR)
      untar_info = dbutils.fs.ls("file:" + UNZIPPED_DIR)
      # The zip file could contain a directory, a file, or more than 1 file,
      # so we loop through the file list, moving all of them to DBFS
      for info in untar_info:
        destination = DATA_DIR + "/" + info.name
        dbutils.fs.mv(info.path, destination, recurse=True)  
      dbutils.fs.rm(local_path)
    else: # file was not zipped (or should remain zipped), so just move it
        destination = DATA_DIR + "/" + manifest_item["name"]
        dbutils.fs.mv(local_path, destination)  
    print ("processed:", local_name)
    
 
def load_data(manifest_list, bucket_id):
  ''' Loops through the files in the download list from the manifest and 
      downloads the file, verifies the MD5 sum is correct, unzips it if needed,  
      and moves the file or folder that was in it to the data directory.'''
  # Create the empty temporary directories
  try:
    dbutils.fs.rm("file:" + TEMP_DIR,recurse=True)
  except Exception:
    pass
  # Create the temporary local directory and sub-directories
  dbutils.fs.mkdirs("file:" + TEMP_DIR)
  dbutils.fs.mkdirs("file:" + ZIPPED_DIR)
  dbutils.fs.mkdirs("file:" + UNZIPPED_DIR)
  # Loop through the files to download
  for item in manifest_list:
    download_file(item, bucket_id)
    process_file(item)
  # Remove the temp directory used to unzip the files
  dbutils.fs.rm("file:" + TEMP_DIR, recurse=True) 
  
 
def createTable():
  spark.sql(f"DROP TABLE IF EXISTS {DB_NAME}")
  spark.sql(f"""
    CREATE TABLE {DB_NAME} 
    USING PARQUET 
    LOCATION '{DATA_DIR}' 
  """)
  spark.sql(f"MSCK REPAIR TABLE {DB_NAME}")
  
  
# *******************************************  
# Run the Actual Routine to Load the Data
# This code uses the above defined functions
# *******************************************
if FORCE_DOWNLOAD == True:
  cleanAll()
if ( spark.catalog._jcatalog.tableExists(DB_NAME) != True):
  manifest_dict = get_manifest()
  if check_data(manifest_dict) == False:
    cleanAll()
    bucket_id = get_bucket_id(manifest_dict)
    download_list = manifest_dict["download_list"]
    load_data(download_list, bucket_id)
  else:
    print("All of the required files exist in the data directory already, so the download was not processed.")
  # The table definition is deleted when the cluster shuts down, but the files remain.
  # The call to createTable will create the table if we just loaded the files or if they were already there
  createTable()
else:
  print("The table already exists.  If you want to recreate it, set the FORCE_DOWNLOAD constant to true and re-run this cell.")
  
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.1.bz2
successfully downloaded: file.1.bz2
processed: /ds4all/zipped/file.1.bz2
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.2.bz2
successfully downloaded: file.2.bz2
processed: /ds4all/zipped/file.2.bz2
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.3.bz2
successfully downloaded: file.3.bz2
processed: /ds4all/zipped/file.3.bz2
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.4.bz2
successfully downloaded: file.4.bz2
processed: /ds4all/zipped/file.4.bz2
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.5.bz2
successfully downloaded: file.5.bz2
processed: /ds4all/zipped/file.5.bz2
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.6.bz2
successfully downloaded: file.6.bz2
processed: /ds4all/zipped/file.6.bz2

Some initial profiling of the data: creating a DataFrame for 2014 - 2018

We need to know what we loaded, so we are going to:

  • Create a DataFrame
  • show the schema

First we will use Spark SQL to write a query of our table. Every SQL query requires a SELECT and a FROM clause:

  • SELECT just says what columns we want from the data. Think of a spreadsheet - you would just be giving a comma-separated list of the column names you want.
  • FROM provides the name of the temporary view, or in our case we created a table so we provide the name of the table. Using the example of a spreadsheet, the FROM clause would be like saying which tab your data is on.

Since every Spark SQL query generates a new DataFrame, if we want to create a DataFrame for our data, we can just select all of the columns in our data. Since there are 29 columns in our data, that may seem sort of tedious, but SQL has a shorthand way of saying we want all of the columns - we can just use an asterisk * to indicate we want all of the columns. The asterisk is often referred to as a "wildcard".

We will run Spark's printSchema() method on the resulting DataFrame to show us the schema for the data. This will show us the names of the fields in the DataFrame and the data type inferred by Spark when loading the data. It made a guess as to the data type when we first loaded the data from CSV files (before creating the table).

Since we originally loaded the data from a CSV file with headers, determining the column names may seem trivial, but Spark can also do this if we import semi-structured data such as the JSON format that's the language of data on the web. Unlike our CSV file where each column/row intersection is a single value, JSON can contain nested structures and Spark will make a pass through the entire file to determine the schema.
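If you get stuck, a query of the following shape would do it. This is only a sketch (it assumes the usa_transactions_table table created in Step 1b); your own version in the cell below, followed by the show() call that is already in that cell, should produce the output shown after it.

df_transactions = spark.sql("""
  SELECT *
  FROM usa_transactions_table
""")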

The following cell is Step 1c

df_transactions = spark.sql("""
 
 
""")
 
df_transactions.show(2,truncate=False, vertical=True)
-RECORD 0-------------------------------------------------------------
 award_id_piid | N6883615F0074
 federal_action_obligation | 3445.66
 action_date | 2015-01-31
 awarding_agency_code | 097
 awarding_agency_name | DEPARTMENT OF DEFENSE (DOD)
 recipient_state_code | FL
 recipient_state_name | FLORIDA
 type_of_contract_pricing_code | J
 type_of_contract_pricing | FIRM FIXED PRICE
 award_description | IGF::OT::IGF FARO CAM2 MEASURE TRAINING SESSION
 product_or_service_code | J019
 product_or_service_code_description | MAINT/REPAIR/REBUILD OF EQUIPMENT- SHIPS, SMALL CRAFT, PONTOONS, AND FLOATING DOCKS
 naics_code | 334513
 naics_description | INSTRUMENTS AND RELATED PRODUCTS MANUFACTURING FOR MEASURING, DISPLAYING, AND CONTROLLING INDUSTRIAL PROCESS VARIABLES
 multi_year_contract_code | N
 multi_year_contract | NO
 veteran_owned_business | f
 woman_owned_business | f
 minority_owned_business | f
 subcontinent_asian_asian_indian_american_owned_business | f
 asian_pacific_american_owned_business | f
 black_american_owned_business | f
 hispanic_american_owned_business | f
 native_american_owned_business | f
 other_minority_owned_business | f
 data_year | 2015
 action_month | 1
 action_dom | 31
 action_year | 2015
-RECORD 1-------------------------------------------------------------
 award_id_piid | 47XM
 federal_action_obligation | 514.95
 action_date | 2015-03-15
 awarding_agency_code | 097
 awarding_agency_name | DEPARTMENT OF DEFENSE (DOD)
 recipient_state_code | NC
 recipient_state_name | NORTH CAROLINA
 type_of_contract_pricing_code | J
 type_of_contract_pricing | FIRM FIXED PRICE
 award_description | 8501907413!LIGHT, CHEMILUMINESCENT
 product_or_service_code | 6260
 product_or_service_code_description | NONELECTRICAL LIGHTING FIXTURES
 naics_code | 335129
 naics_description | OTHER LIGHTING EQUIPMENT MANUFACTURING
 multi_year_contract_code | N
 multi_year_contract | NO
 veteran_owned_business | f
 woman_owned_business | f
 minority_owned_business | f
 subcontinent_asian_asian_indian_american_owned_business | f
 asian_pacific_american_owned_business | f
 black_american_owned_business | f
 hispanic_american_owned_business | f
 native_american_owned_business | f
 other_minority_owned_business | f
 data_year | 2015
 action_month | 3
 action_dom | 15
 action_year | 2015
only showing top 2 rows

What are the fields in our data?

We told you there were 29 fields, but what if this were a dataset you downloaded and it did not have a lot of metadata?

What is Metadata?
Metadata is "data about data" and tells us things like the type of data - is it text or numeric? Also, the size of a field (such as how long a name or address can be). This information is known as the schema for our DataFrame.

Since we started with a table, we could click on our table in the Data tab in the toolbar on the left-hand side and see the schema and some sample data, but we want an approach that works with any DataFrame, so we will call the printSchema() method on the DataFrame we generated from the table.

The following cell is Step 1d

df_transactions.printSchema()
root
 |-- award_id_piid: string (nullable = true)
 |-- federal_action_obligation: string (nullable = true)
 |-- action_date: string (nullable = true)
 |-- awarding_agency_code: string (nullable = true)
 |-- awarding_agency_name: string (nullable = true)
 |-- recipient_state_code: string (nullable = true)
 |-- recipient_state_name: string (nullable = true)
 |-- type_of_contract_pricing_code: string (nullable = true)
 |-- type_of_contract_pricing: string (nullable = true)
 |-- award_description: string (nullable = true)
 |-- product_or_service_code: string (nullable = true)
 |-- product_or_service_code_description: string (nullable = true)
 |-- naics_code: string (nullable = true)
 |-- naics_description: string (nullable = true)
 |-- multi_year_contract_code: string (nullable = true)
 |-- multi_year_contract: string (nullable = true)
 |-- veteran_owned_business: string (nullable = true)
 |-- woman_owned_business: string (nullable = true)
 |-- minority_owned_business: string (nullable = true)
 |-- subcontinent_asian_asian_indian_american_owned_business: string (nullable = true)
 |-- asian_pacific_american_owned_business: string (nullable = true)
 |-- black_american_owned_business: string (nullable = true)
 |-- hispanic_american_owned_business: string (nullable = true)
 |-- native_american_owned_business: string (nullable = true)
 |-- other_minority_owned_business: string (nullable = true)
 |-- data_year: string (nullable = true)
 |-- action_month: integer (nullable = true)
 |-- action_dom: integer (nullable = true)
 |-- action_year: integer (nullable = true)

How Many Transaction Records do We Have?

We told you we had about 20 million rows, but if you had to figure it out, how would you do that? Write a query!

In the next cell, write a query to get a count of the records. We know we need a SELECT and FROM clause, so start with that.

For the column, we really only want one column - that's the count of how many rows there are in the data.

Here we can use what are known as aggregate functions, which allow us to get a count, sum, min, max, avg, and other aggregate values. We will use COUNT, and since we just want to count rows, we can use our wildcard field again.
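As a reference point, a query of the following shape would return the count. It is a sketch that assumes the usa_transactions_table table from Step 1b and uses the column alias transactions that appears in the output below; your own version should be equivalent.

spark.sql("""
  SELECT COUNT(*) AS transactions
  FROM usa_transactions_table
""").show()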

The following cell is Step 1e

spark.sql("""
 
 
""").show()
+------------+
|transactions|
+------------+
|    19833810|
+------------+

Data Wrangling: Learning About and Cleaning Our Data

Before we start a project, we need to understand our data.

  • Is our data good?
    • What number of records do we have for each year? Is it fairly consistent?
    • The agency ID we need to look for is the "awarding_agency_code". But are the agency codes the same every year?
    • Do we have errors? Are there numbers where there should be text? Is there text where there should be numbers?
    • We would want to ask a lot of questions to see if the data has errors
  • Are there missing values?
    • Are there contracts with no amounts? Is that valid? What does it mean?

We already did some profiling and transformations on the transactions you loaded:

  • We looked at the agency codes - they were similar across the years
  • We looked for missing amounts - some but not a huge problem
  • We looked at differences in agency names or codes between years, but the $$$ amounts were small
  • When loading the above data, we filtered out some of the rows where there were problematic records, so our data is cleaner.

Part 2: Let's Try Some Wrangling

We have run an aggregate to get a COUNT of all of the rows in the data, but what if we wanted to get a count for each of the 5 years? We can use COUNT again, but now we will add an optional GROUP BY clause. Have you ever seen a friend eat a bag of M&M candies, but first they sorted them into piles by color? Possibly they are a data scientist at heart. In that situation, the M&Ms were their data and they did the following SQL:

GROUP BY color

We can do the same to group the data based on a field named action_year (but unlike your friend with the M&Ms, we won't eat our data).

Since years come in a specific order (time always seems to move forward, even if theoretical physicists say it does not have to), we are also going to include an optional ORDER BY clause, but that has to be at the end of our query since it applies to the result of our query.

We are going to aggregate our data by year and ask the following question:

  • Is the number of transactions similar each year?
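One query of this shape would answer that question. It is a sketch that assumes the same usa_transactions_table table and uses the trans_count alias shown in the output below; your own version in the Step 2a cell (followed by the show() call already in that cell) should be equivalent.

df_trans_summary = spark.sql("""
  SELECT action_year, COUNT(*) AS trans_count
  FROM usa_transactions_table
  GROUP BY action_year
  ORDER BY action_year
""")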

The following cell is Step 2a

df_trans_summary = spark.sql("""
 
 
 
""")
 
df_trans_summary.show()
+-----------+-----------+
|action_year|trans_count|
+-----------+-----------+
|       2014|    2527125|
|       2015|    3760139|
|       2016|    4244542|
|       2017|    4468861|
|       2018|    4833137|
+-----------+-----------+

Cleaning up the NULLS

In the initial query there are six transactions that don't have a date.

Those that don't have a date actually don't really have any value - they are NULL. This is not the same as zero. When a field is NULL, it cannot be compared to anything (even NULL), because it has no value. NULL is the absence of value (please don't call someone a NULL, it's not nice).

However, we can check if the value is NULL.
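You can see this for yourself with a one-line Spark SQL query: comparing NULL to NULL with = does not return true (it returns NULL), while the IS NULL test returns true.

spark.sql("SELECT NULL = NULL AS null_equals_null, NULL IS NULL AS null_is_null").show()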

Go back up to the prior cell and add a WHERE clause after the FROM clause. Keep in mind that the WHERE clause filters for the rows we want. This means we want the rows where the action_year is not null.
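For example, the Step 2a query sketched earlier would become the following; the only change is the WHERE clause added between the FROM and GROUP BY clauses, and it assumes the same table name and alias as before.

df_trans_summary = spark.sql("""
  SELECT action_year, COUNT(*) AS trans_count
  FROM usa_transactions_table
  WHERE action_year IS NOT NULL
  GROUP BY action_year
  ORDER BY action_year
""")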