import pyspark.sql.functions as f
import requests
import re
import tarfile
import zipfile
import json
import hashlib
FORCE_DOWNLOAD = False
MANIFEST_URL = 'https://www.sjsu.edu/datascienceforall/seminars/spark-jupyter-manifest.json'
URL_HOST = ".s3-us-west-2.amazonaws.com/"
TEMP_DIR = "/ds4all/"
ZIPPED_DIR = "/ds4all/zipped/"
UNZIPPED_DIR = "/ds4all/unzipped/"
DATA_DIR = "/user/hive/warehouse/usa_transactions_table"
DB_NAME = "usa_transactions_table"
def cleanAll():
    ''' Drops the table if it exists and removes its data directory from DBFS. '''
spark.sql(f"DROP TABLE IF EXISTS {DB_NAME}")
try:
dbutils.fs.rm(DATA_DIR,recurse=True)
except Exception:
pass
def get_manifest():
''' Returns the dictionary that's the download manifest based on the URL
entered in the URL widget.
If it's not a valid URL or returns a status code other than 200, an exception is raised.
If the manifest is not valid JSON, or does not contain name:value pairs named
id, data_list, and download_list, an exception is raised.
'''
manifest_url = dbutils.widgets.get("manifest_url")
response = requests.get(manifest_url)
if response.status_code != 200:
raise Exception(f"The manifest URL {manifest_url} returned a status of " + str(response.status))
manifest = response.text
try:
manifest_dict = json.loads(manifest)
        # The manifest should have a data_list element and a download_list element
        if "data_list" not in manifest_dict:
            raise Exception("The manifest does not contain a data_list.")
        if "download_list" not in manifest_dict:
            raise Exception("The manifest does not contain a download_list.")
        return manifest_dict
except json.JSONDecodeError as err:
raise Exception("The manifest is not a valid JSON document.", err)
def check_data(manifest):
''' Function used to check if the data directory contains valid
copies of all of the files or directories in the download.
The manifest dictionary is passed as a parameter and is expected
    to contain a data_list containing the names of each file or
directory expected in the data directory. If a file or directory
is missing, this method returns False. If all of the files or
directories exist, it returns True.
'''
    try:
        data_dir_list = dbutils.fs.ls(DATA_DIR)
        if len(data_dir_list) == 0:
            return False
        file_list = manifest["data_list"]
        for file_name in file_list:
            found = False
            for info in data_dir_list:
                if info.name == file_name:
                    found = True
                    break
            if not found:
                return False
        # looped through all of the required files and they are there
        return True
    except Exception:
        # The directory does not exist or does not match the manifest
        return False
def get_bucket_id(manifest):
''' The manifest is expected to contain a name:value pair named id
where the value is the bucket name on S3 where the files are
    staged. If the id is missing or is a blank string, then an
exception is raised, otherwise the bucket id is returned.
'''
try:
bucket = manifest['id'].strip()
if len(bucket) == 0:
raise Exception("The id provided in the manifest was an empty string, but should be the name of the bucket being downloaded from.")
else:
            return bucket
except Exception as e:
raise Exception("An error occurred in retrieving the bucket id from the manifest", e)
def download_file(manifest_item, bucket_id):
''' Given a dictionary from the download list, download the file to the
temporary directory for downloading the file and check the
MD5 sum to make sure it matches.
    If the MD5 sum does not match, an exception is raised; otherwise it prints
that the file was successfully downloaded.
'''
file_name = manifest_item["name"]
item_md5sum = manifest_item["md5"]
request_url = "https://" + bucket_id + URL_HOST + file_name
local_name = ZIPPED_DIR + file_name
print("requesting file from:", request_url)
r = requests.get(request_url, stream=True)
status_code = r.status_code
# If the status code is 200, then we successfully retrieved the file
if status_code != 200:
raise Exception(f"The {file_name} download failed. A status code of {str(status_code)} was returned from the URL:{request_url}.")
    else: # write the file in chunks so the whole download is not held in memory
        with open(local_name, 'wb') as file:
            for chunk in r.iter_content(chunk_size=4096):
                file.write(chunk)
        # check that the hash of the downloaded file matches the MD5 sum in the manifest
        with open(local_name, 'rb') as data_file:
            md5sum = hashlib.md5(data_file.read()).hexdigest()
        if md5sum.lower() != item_md5sum.lower():
            raise Exception(f"The file {file_name} generated an MD5 sum of {md5sum} instead of the MD5 sum in the manifest ({item_md5sum}), so it may be corrupted and the processing was terminated.")
        else:
            print("successfully downloaded:", file_name)
def process_file(manifest_item):
''' The file is now downloaded. If the file is zipped,
    it first needs to be unzipped, and either way, it is then moved
to the DBFS data directory.
'''
local_name = ZIPPED_DIR + manifest_item["name"]
local_path = "file:" + local_name
    is_zipped = manifest_item["zipped"] == "true"  # the manifest stores this flag as the string "true" or "false"
if is_zipped:
with tarfile.open(name=local_name,mode='r:bz2') as tar_ref:
tar_ref.extractall(path=UNZIPPED_DIR)
untar_info = dbutils.fs.ls("file:" + UNZIPPED_DIR)
# The zip file could contain a directory, a file, or more than 1 file,
# so we loop through the file list, moving all of them to DBFS
for info in untar_info:
destination = DATA_DIR + "/" + info.name
dbutils.fs.mv(info.path, destination, recurse=True)
dbutils.fs.rm(local_path)
else: # file was not zipped (or should remain zipped), so just move it
destination = DATA_DIR + "/" + manifest_item["name"]
dbutils.fs.mv(local_path, destination)
print ("processed:", local_name)
def load_data(manifest_list, bucket_id):
''' Loops through the files in the download list from the manifest and
downloads the file, verifies the MD5 sum is correct, unzips it if needed,
and moves the file or folder that was in it to the data directory.'''
# Create the empty temporary directories
try:
dbutils.fs.rm("file:" + TEMP_DIR,recurse=True)
except Exception:
pass
# Create the temporary local directory and sub-directories
dbutils.fs.mkdirs("file:" + TEMP_DIR)
dbutils.fs.mkdirs("file:" + ZIPPED_DIR)
dbutils.fs.mkdirs("file:" + UNZIPPED_DIR)
# Loop through the files to download
for item in manifest_list:
download_file(item, bucket_id)
process_file(item)
# Remove the temp directory used to unzip the files
dbutils.fs.rm("file:" + TEMP_DIR, recurse=True)
def createTable():
spark.sql(f"DROP TABLE IF EXISTS {DB_NAME}")
spark.sql(f"""
CREATE TABLE {DB_NAME}
USING PARQUET
LOCATION '{DATA_DIR}'
""")
spark.sql(f"MSCK REPAIR TABLE {DB_NAME}")
# *******************************************
# Run the Actual Routine to Load the Data
# This code uses the above defined functions
# *******************************************
if FORCE_DOWNLOAD:
    cleanAll()
if not spark.catalog._jcatalog.tableExists(DB_NAME):
    manifest_dict = get_manifest()
    if not check_data(manifest_dict):
cleanAll()
bucket_id = get_bucket_id(manifest_dict)
download_list = manifest_dict["download_list"]
load_data(download_list, bucket_id)
else:
print("All of the required files exist in the data directory already, so the download was not processed.")
# The table definition is deleted when the cluster shuts down, but the files remain.
    # The call to createTable will create the table if we just loaded the files or if they were already there
createTable()
else:
print("The table already exists. If you want to recreate it, set the FORCE_DOWNLOAD constant to true and re-run this cell.")
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.1.bz2
successfully downloaded: file.1.bz2
processed: /ds4all/zipped/file.1.bz2
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.2.bz2
successfully downloaded: file.2.bz2
processed: /ds4all/zipped/file.2.bz2
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.3.bz2
successfully downloaded: file.3.bz2
processed: /ds4all/zipped/file.3.bz2
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.4.bz2
successfully downloaded: file.4.bz2
processed: /ds4all/zipped/file.4.bz2
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.5.bz2
successfully downloaded: file.5.bz2
processed: /ds4all/zipped/file.5.bz2
requesting file from: https://6997ffe1-3201-496e-a376-d9830f394f9f.s3-us-west-2.amazonaws.com/file.6.bz2
successfully downloaded: file.6.bz2
processed: /ds4all/zipped/file.6.bz2
df_transactions = spark.sql("""
    SELECT * FROM usa_transactions_table
""")
df_transactions.show(2,truncate=False, vertical=True)
-RECORD 0-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
award_id_piid | N6883615F0074
federal_action_obligation | 3445.66
action_date | 2015-01-31
awarding_agency_code | 097
awarding_agency_name | DEPARTMENT OF DEFENSE (DOD)
recipient_state_code | FL
recipient_state_name | FLORIDA
type_of_contract_pricing_code | J
type_of_contract_pricing | FIRM FIXED PRICE
award_description | IGF::OT::IGF FARO CAM2 MEASURE TRAINING SESSION
product_or_service_code | J019
product_or_service_code_description | MAINT/REPAIR/REBUILD OF EQUIPMENT- SHIPS, SMALL CRAFT, PONTOONS, AND FLOATING DOCKS
naics_code | 334513
naics_description | INSTRUMENTS AND RELATED PRODUCTS MANUFACTURING FOR MEASURING, DISPLAYING, AND CONTROLLING INDUSTRIAL PROCESS VARIABLES
multi_year_contract_code | N
multi_year_contract | NO
veteran_owned_business | f
woman_owned_business | f
minority_owned_business | f
subcontinent_asian_asian_indian_american_owned_business | f
asian_pacific_american_owned_business | f
black_american_owned_business | f
hispanic_american_owned_business | f
native_american_owned_business | f
other_minority_owned_business | f
data_year | 2015
action_month | 1
action_dom | 31
action_year | 2015
-RECORD 1-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
award_id_piid | 47XM
federal_action_obligation | 514.95
action_date | 2015-03-15
awarding_agency_code | 097
awarding_agency_name | DEPARTMENT OF DEFENSE (DOD)
recipient_state_code | NC
recipient_state_name | NORTH CAROLINA
type_of_contract_pricing_code | J
type_of_contract_pricing | FIRM FIXED PRICE
award_description | 8501907413!LIGHT, CHEMILUMINESCENT
product_or_service_code | 6260
product_or_service_code_description | NONELECTRICAL LIGHTING FIXTURES
naics_code | 335129
naics_description | OTHER LIGHTING EQUIPMENT MANUFACTURING
multi_year_contract_code | N
multi_year_contract | NO
veteran_owned_business | f
woman_owned_business | f
minority_owned_business | f
subcontinent_asian_asian_indian_american_owned_business | f
asian_pacific_american_owned_business | f
black_american_owned_business | f
hispanic_american_owned_business | f
native_american_owned_business | f
other_minority_owned_business | f
data_year | 2015
action_month | 3
action_dom | 15
action_year | 2015
only showing top 2 rows
df_transactions.printSchema()
root
|-- award_id_piid: string (nullable = true)
|-- federal_action_obligation: string (nullable = true)
|-- action_date: string (nullable = true)
|-- awarding_agency_code: string (nullable = true)
|-- awarding_agency_name: string (nullable = true)
|-- recipient_state_code: string (nullable = true)
|-- recipient_state_name: string (nullable = true)
|-- type_of_contract_pricing_code: string (nullable = true)
|-- type_of_contract_pricing: string (nullable = true)
|-- award_description: string (nullable = true)
|-- product_or_service_code: string (nullable = true)
|-- product_or_service_code_description: string (nullable = true)
|-- naics_code: string (nullable = true)
|-- naics_description: string (nullable = true)
|-- multi_year_contract_code: string (nullable = true)
|-- multi_year_contract: string (nullable = true)
|-- veteran_owned_business: string (nullable = true)
|-- woman_owned_business: string (nullable = true)
|-- minority_owned_business: string (nullable = true)
|-- subcontinent_asian_asian_indian_american_owned_business: string (nullable = true)
|-- asian_pacific_american_owned_business: string (nullable = true)
|-- black_american_owned_business: string (nullable = true)
|-- hispanic_american_owned_business: string (nullable = true)
|-- native_american_owned_business: string (nullable = true)
|-- other_minority_owned_business: string (nullable = true)
|-- data_year: string (nullable = true)
|-- action_month: integer (nullable = true)
|-- action_dom: integer (nullable = true)
|-- action_year: integer (nullable = true)
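Note that almost every column in the schema is a string. As a minimal sketch (assuming only the column names shown above), a numeric column such as federal_action_obligation can be cast before doing arithmetic, using the pyspark.sql.functions module imported as f at the top of this notebook:

df_amounts = df_transactions.withColumn(
    "federal_action_obligation",
    f.col("federal_action_obligation").cast("double")
)
df_amounts.select(f.sum("federal_action_obligation").alias("total_obligation")).show()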
This Notebook
This notebook was created to help introduce students to the wonderful world of data science using Spark and Jupyter notebooks. The target audience is students who have no background in data science, programming, distributed computing, Python, or Apache Spark.
This notebook is designed to run on a Databricks Community account (our thanks to Databricks for making community accounts available!).
This notebook was created as part of the Data Science for All Seminar Series funded by NSF Grant #1829622
PI: Leslie Albert, Ph.D., Co-PIs: Scott Jensen, Ph.D. and Esperanza Huerta, Ph.D.
Copyright Scott Jensen, San Jose State University
This ds4all notebook by Scott Jensen, Ph.D. is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.