Deep Learning Pipelines on Databricks - 1.2.0 (Python)

Deep Learning Pipelines for Apache Spark - Release 1.2.0

Deep Learning Pipelines is a new library published by Databricks to provide high-level APIs for scalable deep learning model application and transfer learning via integration of popular deep learning libraries with MLlib Pipelines and Spark SQL. For an overview and the philosophy behind the library, check out the Databricks blog post. This notebook parallels the Deep Learning Pipelines README, detailing usage examples with additional tips for getting started with the library on Databricks.

Cluster set-up

Deep Learning Pipelines is available as a Spark Package. To use it on your cluster, create a new library with the Source option "Maven Coordinate", using "Search Spark Packages and Maven Central" to find "spark-deep-learning". Then attach the library to a cluster.

Note:

  • This notebook works with spark-deep-learning release 1.2.0-spark2.3-s_2.11. Please refer to the project's GitHub page for the latest examples and docs.
  • To run this notebook, also create and attach the following libraries via PyPI: tensorflow==1.10.0, keras==2.2.2, h5py.

Deep Learning Pipelines 1.2.0 is compatible with Spark versions 2.3 or higher and works with any instance type (CPU or GPU).
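If you are attaching the package outside the Databricks library UI (for example, in a plain pyspark session), a rough equivalent - sketched here, assuming the spark-packages coordinate databricks:spark-deep-learning:1.2.0-spark2.3-s_2.11 and the public spark-packages repository - is to set spark.jars.packages when building the SparkSession:

# Minimal sketch for non-Databricks environments (on Databricks, attach the
# library through the UI as described above). The coordinate and repository
# below are assumptions based on the release named in this notebook.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("deep-learning-pipelines-demo")
         .config("spark.jars.packages", "databricks:spark-deep-learning:1.2.0-spark2.3-s_2.11")
         .config("spark.jars.repositories", "https://repos.spark-packages.org")
         .getOrCreate())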

What's new in Deep Learning Pipelines 1.0.0

  • Use Spark's native image reader (similar implementation in Deep Learning Pipelines has been removed).
  • KerasImageFileEstimator can be used for model tuning with Spark MLlib (e.g. CrossValidator).
  • Full Python & Scala support for transfer learning on images (DeepImageFeaturizer and DeepImagePredictor): InceptionV3, Xception, ResNet50, VGG16, and VGG19 models.

In this notebook

Deep Learning Pipelines provides a suite of tools around deep learning. The tools can be categorized as follows:

Working with image data:

  • Loading images natively in Spark DataFrames
  • Transfer learning, a super quick way to leverage deep learning
  • [New in 1.0.0] Distributed hyperparameter tuning via Spark MLlib Pipelines
  • Applying deep learning models at scale to images, using your own or known popular models, to make predictions or transform them into features

Working with general tensors:

  • Applying deep learning models at scale to tensors of up to 2 dimensions

Deploying Models in SQL:

  • Deploying models as SQL functions to empower everyone by making deep learning available in SQL

We'll cover each one with examples below.

Working with image data

Let us first get some images to work with in this notebook. We'll use the flowers dataset from the TensorFlow retraining tutorial.

%sh 
curl -O http://download.tensorflow.org/example_images/flower_photos.tgz
tar xzf flower_photos.tgz &>/dev/null
display(dbutils.fs.ls('file:/databricks/driver/flower_photos'))
path                                                name         size
file:/databricks/driver/flower_photos/daisy/        daisy/       36864
file:/databricks/driver/flower_photos/tulips/       tulips/      40960
file:/databricks/driver/flower_photos/sunflowers/   sunflowers/  36864
file:/databricks/driver/flower_photos/dandelion/    dandelion/   53248
file:/databricks/driver/flower_photos/LICENSE.txt   LICENSE.txt  418049
file:/databricks/driver/flower_photos/roses/        roses/       36864

The 'file:/...' directory will be cleared out upon cluster termination. That doesn't matter for this example notebook, but in most cases we'd want to store the images in a more permanent place.

Let us move the files to DBFS so we can see how to work with them in the use cases below.

img_dir = '/tmp/flower_photos'
dbutils.fs.mkdirs(img_dir)

dbutils.fs.cp('file:/databricks/driver/flower_photos/tulips', img_dir + "/tulips", recurse=True)
dbutils.fs.cp('file:/databricks/driver/flower_photos/daisy', img_dir + "/daisy", recurse=True)
dbutils.fs.cp('file:/databricks/driver/flower_photos/LICENSE.txt', img_dir)

display(dbutils.fs.ls(img_dir))
path                                  name         size
dbfs:/tmp/flower_photos/LICENSE.txt   LICENSE.txt  418049
dbfs:/tmp/flower_photos/daisy/        daisy/       0
dbfs:/tmp/flower_photos/tulips/       tulips/      0

Let us create a small sample set of images for quick demonstrations.

sample_img_dir = img_dir + "/sample"
dbutils.fs.rm(sample_img_dir, recurse=True)
dbutils.fs.mkdirs(sample_img_dir)
files =  dbutils.fs.ls(img_dir + "/daisy")[0:10] + dbutils.fs.ls(img_dir + "/tulips")[0:1]
for f in files:
  dbutils.fs.cp(f.path, sample_img_dir)
display(dbutils.fs.ls(sample_img_dir))
path                                                          name                          size
dbfs:/tmp/flower_photos/sample/100080576_f52e8ee070_n.jpg     100080576_f52e8ee070_n.jpg    26797
dbfs:/tmp/flower_photos/sample/100930342_92e8746431_n.jpg     100930342_92e8746431_n.jpg    26200
dbfs:/tmp/flower_photos/sample/10140303196_b88d3d6cec.jpg     10140303196_b88d3d6cec.jpg    117247
dbfs:/tmp/flower_photos/sample/10172379554_b296050f82_n.jpg   10172379554_b296050f82_n.jpg  36410
dbfs:/tmp/flower_photos/sample/10172567486_2748826a8b.jpg     10172567486_2748826a8b.jpg    102862
dbfs:/tmp/flower_photos/sample/10172636503_21bededa75_n.jpg   10172636503_21bededa75_n.jpg  27419
dbfs:/tmp/flower_photos/sample/102841525_bd6628ae3c.jpg       102841525_bd6628ae3c.jpg      132803
dbfs:/tmp/flower_photos/sample/1031799732_e7f4008c03.jpg      1031799732_e7f4008c03.jpg     102618
dbfs:/tmp/flower_photos/sample/10391248763_1d16681106_n.jpg   10391248763_1d16681106_n.jpg  51688
dbfs:/tmp/flower_photos/sample/10437754174_22ec990b77_m.jpg   10437754174_22ec990b77_m.jpg  13946
dbfs:/tmp/flower_photos/sample/10437770546_8bb6f7bdd3_m.jpg   10437770546_8bb6f7bdd3_m.jpg  13518

Loading images

The first step in applying deep learning to images is being able to load them. Spark and Deep Learning Pipelines include utility functions that can load millions of images into a Spark DataFrame and decode them automatically in a distributed fashion, allowing manipulation at scale.

Using Spark's ImageSchema

from pyspark.ml.image import ImageSchema
image_df = ImageSchema.readImages(sample_img_dir)

or we can use the custom image library that ships with Deep Learning Pipelines

from sparkdl.image import imageIO
image_df = imageIO.readImagesWithCustomFn(sample_img_dir, decode_f=imageIO.PIL_decode)
Using TensorFlow backend.

The resulting DataFrame contains a column named "image" holding image structs whose schema matches ImageSchema.

image_df.show()
+--------------------+
|               image|
+--------------------+
|[dbfs:/tmp/flower...|
|[dbfs:/tmp/flower...|
|[dbfs:/tmp/flower...|
|[dbfs:/tmp/flower...|
|[dbfs:/tmp/flower...|
|[dbfs:/tmp/flower...|
|[dbfs:/tmp/flower...|
|[dbfs:/tmp/flower...|
|[dbfs:/tmp/flower...|
|[dbfs:/tmp/flower...|
|[dbfs:/tmp/flower...|
+--------------------+
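To see the fields inside the image struct, we can print the schema - a quick check using plain Spark, nothing specific to Deep Learning Pipelines:

# Inspect the image struct; the column holds the origin, height, width,
# nChannels, mode, and data fields defined by Spark's ImageSchema.
image_df.printSchema()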

Transfer learning

Deep Learning Pipelines provides utilities to perform transfer learning on images, which is one of the fastest ways (in terms of both code and run time) to start using deep learning. Using Deep Learning Pipelines, it can be done in just several lines of code.

First, we need to create training and test DataFrames for transfer learning - this piece of code is longer than the transfer learning itself below!

from pyspark.ml.image import ImageSchema
from pyspark.sql.functions import lit
from sparkdl.image import imageIO

tulips_df = ImageSchema.readImages(img_dir + "/tulips").withColumn("label", lit(1))
daisy_df = imageIO.readImagesWithCustomFn(img_dir + "/daisy", decode_f=imageIO.PIL_decode).withColumn("label", lit(0))
tulips_train, tulips_test, _ = tulips_df.randomSplit([0.005, 0.005, 0.99])  # use larger training sets (e.g. [0.6, 0.4] for non-community edition clusters)
daisy_train, daisy_test, _ = daisy_df.randomSplit([0.005, 0.005, 0.99])     # use larger training sets (e.g. [0.6, 0.4] for non-community edition clusters)
train_df = tulips_train.unionAll(daisy_train)
test_df = tulips_test.unionAll(daisy_test)

# Under the hood, each of the partitions is fully loaded in memory, which may be expensive.
# This ensures that each of the partitions has a small size.
train_df = train_df.repartition(100)
test_df = test_df.repartition(100)
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from sparkdl import DeepImageFeaturizer 

featurizer = DeepImageFeaturizer(inputCol="image", outputCol="features", modelName="InceptionV3")
lr = LogisticRegression(maxIter=20, regParam=0.05, elasticNetParam=0.3, labelCol="label")
p = Pipeline(stages=[featurizer, lr])

p_model = p.fit(train_df)

Note: the training step may take a while on Community Edition - try making a smaller training set in that case.

Let's see how well the model does:
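A minimal sketch of one way to do this, assuming we score test_df with the fitted pipeline and compare the predictions against the label column using MulticlassClassificationEvaluator:

# Sketch: score the held-out test set and compute accuracy.
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

tested_df = p_model.transform(test_df)
evaluator = MulticlassClassificationEvaluator(metricName="accuracy", labelCol="label", predictionCol="prediction")
print("Test set accuracy = " + str(evaluator.evaluate(tested_df.select("prediction", "label"))))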