%md # A simple word count application

The volume of unstructured text in existence is growing dramatically, and Spark is an excellent tool for analyzing this type of data. We continue from the [word counting example](https://mashimo.wordpress.com/2014/05/11/the-hello-world-of-text-processing/) and in this notebook we will write code that calculates the most common words in the [Complete Works of William Shakespeare](http://www.gutenberg.org/ebooks/100) retrieved from [Project Gutenberg](http://www.gutenberg.org/wiki/Main_Page). This could also be scaled to larger applications, such as finding the most common words in Wikipedia.

**During this example we will cover:**
* *Part 0:* What is Apache Spark
* *Part 1:* Creating a base DataFrame and performing operations
* *Part 2:* Counting with Spark SQL and DataFrames
* *Part 3:* Finding unique words and a mean value
* *Part 4:* Apply word count to a file

Note that for reference, you can look up the details of the relevant methods in [Spark's Python API](https://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.sql).
%md ## Part 0: Spark

An introduction to using Apache Spark with the PySpark SQL API running in a notebook.
%md ### What is Spark

[Apache Spark](https://spark.apache.org/) is an open-source cluster-computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. Spark is a fast and general engine for large-scale data processing.

[Databricks](https://databricks.com/) is a company founded by the creators of Apache Spark that helps clients with cloud-based big data processing using Spark.
%md Traditional analysis tools like R and Python pandas run on a single machine, but data are growing faster than computation speed.

The opportunity: **cloud computing** is a game-changer. It provides access to low-cost computing and storage. Distributing data over a cluster of machines means lots of hard drives and lots of CPUs, but also lots of memory! Storage is getting cheaper, but stalling CPU speeds are the bottleneck. A newer opportunity: keep more data **in-memory**! In-memory processing can make a big difference, up to 100x faster. Spark is a distributed execution engine that leverages this in-memory paradigm.

The challenge with cloud computing has always been programming the resources. Spark is developed in Scala and, besides Scala itself, supports other languages such as Java and Python. In this example we use the Python programming interface to Spark (pySpark). pySpark provides an easy-to-use programming abstraction and parallel runtime: "Here's an operation, run it on all of the data."
%md #### Spark Context

In Spark, communication occurs between a driver and executors. The driver has Spark jobs that it needs to run, and these jobs are split into tasks that are submitted to the executors for completion. Executor programs run on **cluster nodes** or in local threads. The results from these tasks are delivered back to the driver.

Where does code run?
- Locally, in the driver
- Distributed at the executors (executors run in parallel and have much more memory)
- Both at the driver and the executors

Cheap hardware brings problems: failures, network speeds versus shared memory, much more latency, networks slower than storage, and uneven performance. How do we deal with machine failures? We launch another task. How do we deal with slow tasks? We launch another task.

When running Spark, you start a new Spark application by creating a [SparkContext](http://spark.apache.org/docs/latest/api/python/pyspark.html#pyspark.SparkContext). The SparkContext tells Spark how and where to access a cluster. The program next creates a [SQLContext](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext) object, which is used to create and manage the DataFrames. When the `SparkContext` is created, it asks the master for some cores to use to do work. The master sets these cores aside just for you; they won't be used for other applications.

When using Databricks, both a `SparkContext` and a `SQLContext` are created for you automatically: `sc` is your `SparkContext`, and `sqlContext` is your `SQLContext`.
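%md Outside Databricks you would create these objects yourself. Below is a minimal sketch, assuming a local Spark 1.x installation with pyspark available; the master URL and application name are just illustrative choices.
# Not needed on Databricks, where `sc` and `sqlContext` already exist.
# Create a SparkContext that runs locally on all cores, then wrap it in a SQLContext.
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(master='local[*]', appName='WordCount')  # illustrative settings
sqlContext = SQLContext(sc)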
# Display the type of the Spark sqlContext
type(sqlContext)
%md Note that the type is `HiveContext`. This means we're working with a version of Spark that has [Hive](https://hive.apache.org/) support. Compiling Spark with Hive support is a good idea, even if you don't have a Hive metastore. As the [Spark Programming Guide](http://spark.apache.org/docs/latest/sql-programming-guide.html#starting-point-sqlcontext) states, a `HiveContext` "provides a superset of the functionality provided by the basic `SQLContext`. Additional features include the ability to write queries using the more complete HiveQL parser, access to Hive UDFs [user-defined functions], and the ability to read data from Hive tables."
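%md For example, a Hive-enabled context lets you register a DataFrame as a temporary table and query it with SQL. A small sketch, assuming the `sqlContext` provided on Databricks (the DataFrame and table name are just illustrations):
# Register a tiny DataFrame as a temporary table and query it with SQL
demoDF = sqlContext.createDataFrame([('spark',), ('hive',)], ['word'])
demoDF.registerTempTable('demo_words')  # illustrative table name
sqlContext.sql('SELECT word, length(word) AS len FROM demo_words').show()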
%md ## Part 1: Creating a base DataFrame and performing operations
%md DataFrames (DFs) are the key concept, the primary abstraction in Spark. Similar to a Python pandas DataFrame, they are immutable once constructed and enable operations on collections of elements in parallel. You construct DataFrames by parallelizing existing Python collections (lists), by transforming an existing Spark or pandas DF, or from files in HDFS or any other storage system.

DataFrames support two types of operations: transformations and actions. Transformations are lazy (not computed immediately); a transformed DF is executed only when an action runs on it. An example of a transformation on a DataFrame is *select*; examples of *actions* are *show* and *count*.
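%md As a tiny illustration of lazy evaluation (a sketch, assuming the `sqlContext` provided on Databricks; the data are toy values):
# The select below is a transformation: it only builds a plan.
# Nothing is computed until the show() action runs.
df = sqlContext.createDataFrame([(1,), (2,), (3,)], ['n'])
doubled = df.select((df.n * 2).alias('doubled'))  # transformation (lazy)
doubled.show()                                    # action (triggers execution)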
%md ### Spark Program Lifecycle

1. Create DataFrames from external data, or from a collection in the driver program
2. Lazily transform them into new DataFrames
3. cache() some DataFrames for reuse (optional)
4. Perform actions to execute parallel computation and produce results

Most Python code runs in the driver, except for code passed to transformations. Transformations run at the executors. Actions can run both at the executors and the driver. A minimal sketch of this lifecycle follows below.
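%md A minimal sketch of the four steps, assuming `sqlContext` (the data are just toy values):
# 1. Create a DataFrame from a collection in the driver program
lettersDF = sqlContext.createDataFrame([('a',), ('b',), ('a',)], ['letter'])
# 2. Lazily transform it into a new DataFrame
letterCountsDF = lettersDF.groupBy('letter').count()
# 3. Optionally cache it for reuse
letterCountsDF.cache()
# 4. Perform actions to execute the computation and produce results
letterCountsDF.show()
print(letterCountsDF.count())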
%md In this part, we will explore creating a base DataFrame with `sqlContext.createDataFrame` and using DataFrame operations to count words.
%md #### Create a DataFrame

We'll start by generating a base DataFrame using a Python list of tuples and the `sqlContext.createDataFrame` method. Then we'll print out the type and schema of the DataFrame. The Python API has several examples for using the [`createDataFrame` method](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.SQLContext.createDataFrame).
# create a silly test DataFrame from a Python collection (list of tuples)
wordsDF = sqlContext.createDataFrame([('look',), ('spark',), ('tutorial',), ('spark',), ('look',), ('python',)], ['word'])
wordsDF.show()
print(type(wordsDF))
wordsDF.printSchema()
%md As you can see, `DataFrame` is a class of the `pyspark.sql` module.
%md #### Create a new DataFrame from an existing one

This uses lazy evaluation: results are not computed right away. Spark remembers the set of transformations applied to the base DataFrame; think of this as a recipe for creating a result. Spark actions like show(), collect() or count() then cause Spark to execute the recipe to transform the source. They are the mechanism for getting results out of Spark.
%md ##### Length of each word

You can create a new DataFrame from our base DF `wordsDF` by calling the [`select` DataFrame function](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.select) and passing in the appropriate recipe: we can use the SQL `length` function to find the number of characters in each word. The [`length` function](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.length) is found in the `pyspark.sql.functions` module.
from pyspark.sql.functions import length

wordsLengthsDF = wordsDF.select(length('word').alias('lengths'))  # transformation
wordsLengthsDF.show()  # action
%md ## Part 2: Counting with Spark SQL and DataFrames
%md Now, let's count the number of times a particular word appears in the 'word' column. There are multiple ways to perform the counting, but some are much less efficient than others. A naive approach would be to call `collect` on all of the elements and count them in the driver program. While this approach could work for small datasets, we want an approach that will work for any size dataset including terabyte- or petabyte-sized datasets. In addition, performing all of the work in the driver program is slower than performing it in parallel in the workers. For these reasons, we will use **data parallel operations**.
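%md For contrast, the naive driver-side approach described above would look roughly like this (a sketch; it works on toy data, but it ships every row to the driver, so avoid it for large datasets):
# Anti-pattern for large data: collect all rows to the driver and count them there
from collections import Counter

localWords = [row.word for row in wordsDF.collect()]  # every row is moved to the driver
print(Counter(localWords))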
%md ### Using `groupBy` and `count`

Using DataFrames, we can perform aggregations by grouping the data using the [`groupBy` function](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame.groupBy) on the DataFrame. Using `groupBy` returns a [`GroupedData` object](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData), and we can use the functions available for `GroupedData` to aggregate the groups. For example, we can call `avg` or `count` on a `GroupedData` object to obtain the average of the values in the groups or the number of occurrences in the groups, respectively.

To find the counts of words, we group by the words and then use the [`count` function](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.GroupedData.count) to find the number of times that words occur.
wordCountsDF = wordsDF.groupBy('word').count()
wordCountsDF.show()
%md You can see that, without using alias(), the column is simply named after the aggregation function ("count" in this case). The sketch below shows one way to rename it. After that, let's add some unit testing.
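%md A small sketch of renaming the generated column (using the `wordCountsDF` built above; the new name is just an illustration):
# Rename the 'count' column produced by the aggregation
wordCountsDF.withColumnRenamed('count', 'occurrences').show()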
# Load in the testing code
# If incorrect it will report back '1 test failed' for each failed test
from databricks_test_helper import Test
# TEST groupBy and count
Test.assertEquals(wordCountsDF.collect(),
                  [('tutorial', 1), ('spark', 2), ('look', 2), ('python', 1)],
                  'incorrect counts for wordCountsDF')