Mastering Large Datasets with Python: Parallelize and Distribute Your Python Code
- Ch1. Introduction
- Procedural programming
- Parallel programming
- The map function for transforming data
- The reduce function for advanced transformations
- Distributed computing for speed and scale
- Hadoop: A distributed framework for map and reduce
- Spark for high-powered map, reduce, and more
- AWS Elastic MapReduce (EMR) - Large datasets in the cloud
- Ch2. Accelerating large dataset work: Map and parallel computing
- Ch3. Function pipelines for mapping complex transformations
- Ch4. Processing large datasets with lazy workflows
- Ch5. Accumulation operations with reduce
- Ch6. Speeding up map and reduce with advanced parallelization
- Ch7. Processing truly big datasets with Hadoop and Spark
- Ch8. Best practices for large data with Apache Streaming and mrjob
- Ch9. PageRank with map and reduce in PySpark
- Ch10. Faster decision-making with machine learning and PySpark
- Ch11. Large datasets in the cloud with Amazon Web Services and S3
- Ch12. MapReduce in the cloud with Amazon’s Elastic MapReduce
My notes and highlights on the book.
Author: John T. Wolohan
Ch1. Introduction
Map and reduce style of programming:
- easily write parallel programs
- organize the code around two functions: `map` and `reduce`

`MapReduce` = framework for parallel and distributed computing; `map` and `reduce` = style of programming that allows running work in parallel with minimal rewriting and extending it to distributed workflows

Dask -> another tool for managing large data without `map` and `reduce`
Procedural programming
Program Workflow
- Starts to run
- issues an instruction
- instruction is executed
- repeat 2 and 3
- finishes running
Parallel programming
Program workflow
- Starts to run
- divides up the work into chunks of instructions and data
- each chunk of work is executed independently
- chunks of work are reassembled
- finishes running
The `map` and `reduce` style is applicable everywhere, but its specific strengths are in areas where you may need to scale
The map function for transforming data
- `map`: function to transform sequences of data from one type to another
- always retains the same number of objects in the output as were provided in the input
- performs one-to-one transformations -> a great way to transform data so it is more suitable for use
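A minimal sketch of an N-to-N `map` transformation (the dates and helper below are invented for illustration):

```python
# Transform a sequence of ISO date strings into years: N inputs in, N outputs out
dates = ["2020-06-01", "2020-06-02", "2020-06-03"]  # hypothetical input data

def get_year(date_string):
    """Helper that pulls the year out of an ISO-formatted date string."""
    return int(date_string.split("-")[0])

years = map(get_year, dates)  # lazy map object; nothing is evaluated yet
print(list(years))            # [2020, 2020, 2020] -- same number of items as the input
```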
Declarative programming: focuses on explaining the logic of the code and not on specifying low-level details -> scaling is natural, the logic stays the same
The reduce function for advanced transformations
- `reduce`: transforms a sequence of data into a data structure of any shape or size
- the MapReduce programming pattern relies on the `map` function to transform some data into another type of data and then uses the `reduce` function to combine that data
- performs one-to-any transformations -> a great way to assemble data into a final result
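A small sketch of the map-then-reduce pattern described above (word lengths are an invented example):

```python
from functools import reduce

words = ["parallel", "distributed", "computing"]        # hypothetical input

lengths = map(len, words)                               # N-to-N: each word becomes its length
total = reduce(lambda acc, nxt: acc + nxt, lengths, 0)  # N-to-1: combine into a single result
print(total)                                            # 28
```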
Distributed computing for speed and scale
Extension of parallel computing in which the computer resource we are dedicating to work on each chunk of a given task is its own machine
Hadoop: A distributed framework for map and reduce
- Designed as an open source implementation of Google’s original MapReduce framework
- Evolved into distributed computing software used widely by companies processing large amounts of data
Spark for high-powered map, reduce, and more
- Something of a successor to the Apache Hadoop framework that does more of its work in memory instead of by writing to file
- Can run more than 100x faster than Hadoop
AWS Elastic MapReduce (EMR) - Large datasets in the cloud
- Popular way to implement Hadoop and Spark
- tackle small problems with parallel programming because it's cost-effective
- tackle large problems with parallel programming because we can procure as many resources as we need
Ch2. Accelerating large dataset work: Map and parallel computing
`map`'s primary capabilities:
- Replace `for` loops
- Transform data
- `map` evaluates only when necessary, not when called -> generic `map` object as output

`map` makes it easy to parallelize code -> break the work into pieces
Pattern
- Take a sequence of data
- Transform it with a function
- Get the outputs
Generators instead of normal loops prevent storing all objects in memory in advance
Lazy functions for large datasets
- `map` = lazy function = it doesn't evaluate when we call `map`
- Python stores the instructions for evaluating the function and runs them at the exact moment we ask for the value
- Common lazy object in Python = the `range` function
- Lazy `map` allows us to transform a lot of data without an unnecessarily large amount of memory or spending the time to generate it
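A quick illustration of that laziness (the numbers are arbitrary):

```python
# map returns instantly, even over a billion-element range, because nothing is computed yet
squares = map(lambda x: x * x, range(10**9))
print(squares)        # <map object at 0x...>
print(next(squares))  # 0 -- values are produced only at the moment we ask for them
print(next(squares))  # 1
```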
Parallel processing
Problems
Inability to pickle data or functions
- Pickling: Python's version of object serialization or marshalling
- Storing objects from our code in an efficient binary format on disk that can be read back by our program at a later time (`pickle` module)
- allows us to share data across processors or even machines, saving the instructions and data and then executing them elsewhere
- Objects we can't pickle: lambda functions, nested functions, nested classes
- the `pathos` and `dill` modules allow us to pickle almost anything
Order-sensitive operations
- Work in parallel: not guaranteed that tasks will be finished in the same order they’re input
- If work needs to be processed in a linear order -> probably shouldn’t do it in parallel
- Even though Python may not complete the problems in order, it still remembers the order in which it was supposed to do them -> `map` returns in the exact order we would expect, even if it doesn't process in that order
State-dependent operations
- Common solution for the state problem: take the internal state and make it an external variable
Other observations
- Best way to flatten a list of lists into one big list -> Python's `itertools.chain` function: takes an iterable of iterables and chains them together so they can all be accessed one after another -> lazy by default
- Best way to visualize graphs is to take them out of Python and import them into Gephi: a dedicated piece of graph visualization software

Anytime we're converting a sequence of some type into a sequence of another type, what we're doing can be expressed as a map -> N-to-N transformation: we're converting N data elements into N data elements but in a different format
- Making this type of problem parallel only adds a few lines of code (see the sketch below):
  - one import
  - wrangling our processors with `Pool()`
  - modifying our `map` statements to use the `Pool.map` method
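A minimal sketch of that change, using `multiprocessing.Pool` (the `square` helper is invented):

```python
from multiprocessing import Pool  # the one extra import

def square(x):
    """Top-level function so it can be pickled and shipped to worker processes."""
    return x * x

if __name__ == "__main__":
    with Pool() as pool:                       # wrangle our processors
        results = pool.map(square, range(10))  # swap map(...) for Pool().map(...)
    print(results)                             # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```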
Ch3. Function pipelines for mapping complex transformations
Helper functions and function chains
Helper functions: small, simple functions that we rely on to do complex things -> break down large problems into small pieces that we can code quickly
Function chains or pipelines: the way we put helper functions to work
Creating a pipeline
- Chaining helper functions together
- Ways to do this:
  - Using a sequence of maps
  - Chaining functions together with `compose`
  - Creating a function pipeline with `pipe`
- `compose` and `pipe` are functions in the `toolz` package
Compose
from toolz.functoolz import compose
- Pass `compose` all the functions we want to include in our pipeline
- Pass them in reverse order because `compose` is going to apply them backwards
- Store the output of our `compose` function, which is itself a function, in a variable
- Call that variable or pass it along to `map`
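A small sketch of those steps (the helper functions are invented):

```python
from toolz.functoolz import compose

def clean(text):
    return text.strip().lower()

def exclaim(text):
    return text + "!"

# compose applies functions right-to-left, so list the steps in reverse order
pipeline = compose(exclaim, clean)

print(pipeline("  Hello World  "))             # hello world!
print(list(map(pipeline, ["  Foo ", "BAR"])))  # ['foo!', 'bar!']
```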
Pipe
from toolz.functoolz import pipe
- the `pipe` function will pass a value through a pipeline
- `pipe` expects the functions to be in the order we want to apply them
- `pipe` evaluates each of the functions and returns a result
- If we want to pass a pipeline to `map`, we have to wrap it in a function definition
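The same invented pipeline with `pipe`, including the wrapper needed for `map` (a sketch):

```python
from toolz.functoolz import pipe

def clean(text):
    return text.strip().lower()

def exclaim(text):
    return text + "!"

# pipe evaluates immediately, applying the functions in the order given
print(pipe("  Hello World  ", clean, exclaim))  # hello world!

# to reuse the pipeline with map, wrap pipe in a function definition
def shout(text):
    return pipe(text, clean, exclaim)

print(list(map(shout, ["  Foo ", "BAR"])))      # ['foo!', 'bar!']
```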
Summary
Major advantages of creating pipelines of helper functions are that the code becomes: readable and clear; modular and easy to edit
- Modular code plays very nicely with `map` and can readily move into parallel workflows, such as by using `Pool()`
- We can simplify working with nested data structures by using nested function pipelines, which we can apply with `map`
Ch4. Processing large datasets with lazy workflows
Laziness
- Lazy evaluation: strategy when deciding when to perform computations
- Under lazy evaluation, the Python interpreter executes lazy Python code only when the program needs the results of that code
- Opposite of eager evaluation, where everything is evaluated when it’s called
Shrinking sequences with the filter function
- `filter`: function for pruning sequences
- Takes a sequence and restricts it to only the elements that meet a given condition
- Related functions to know:
  - `itertools.filterfalse`: get all the results that make a qualifier function return `False`
  - `toolz.dicttoolz.keyfilter`: filter on the keys of a `dict`
  - `toolz.dicttoolz.valfilter`: filter on the values of a `dict`
  - `toolz.dicttoolz.itemfilter`: filter on both the keys and the values of a `dict`
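A quick sketch of `filter` and `itertools.filterfalse` on invented data:

```python
from itertools import filterfalse

is_even = lambda x: x % 2 == 0

evens = filter(is_even, range(10))      # lazy filter object: keeps elements that pass the test
odds = filterfalse(is_even, range(10))  # keeps elements that make the qualifier return False

print(list(evens))  # [0, 2, 4, 6, 8]
print(list(odds))   # [1, 3, 5, 7, 9]
```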
Combining sequences with zip
- `zip`: function for merging sequences
- Takes two sequences and returns a single sequence of `tuples`, each of which contains an element from each of the original sequences
- Behaves like a zipper: it interlocks the values of Python iterables
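A tiny sketch of `zip` interlocking two invented sequences:

```python
names = ["alice", "bob", "carol"]
scores = [88, 92, 79]

pairs = zip(names, scores)  # lazy zip object
print(list(pairs))          # [('alice', 88), ('bob', 92), ('carol', 79)]
```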
Lazy file searching with iglob
- `iglob`: function for lazily reading from the filesystem
- Lazy way of querying our filesystem
- Finds a sequence of files on our filesystem that match a given pattern
from glob import iglob
posts = iglob("path/to/posts/2020/06/*.md")
Understanding iterators: the magic behind lazy Python
- Replace data with instructions about where to find data and replace transformations with instructions for how to execute those transformations.
- The computer only has to concern itself with the data it is processing right now, as opposed to the data it just processed or has to process in the future
- Iterators are the base class of all the Python data types that can be iterated over
The iteration process is defined by a special method called `.__iter__()`. If a class has this method and it returns an object with a `.__next__()` method, then we can iterate over it.
- One-way streets: once we call `next`, the item returned is removed from the sequence. We can never back up or retrieve that item again
- Not meant for by-hand inspection -> meant for processing big data
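A minimal sketch of the iterator protocol (the `Countdown` class is invented):

```python
class Countdown:
    """Iterator: .__iter__() returns an object that has a .__next__() method."""

    def __init__(self, start):
        self.current = start

    def __iter__(self):
        return self

    def __next__(self):
        if self.current <= 0:
            raise StopIteration  # signals that the sequence is exhausted
        self.current -= 1
        return self.current + 1

it = Countdown(3)
print(next(it))  # 3 -- this item is now gone; iterators are one-way streets
print(list(it))  # [2, 1]
```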
Generators: functions for creating data
- Class of functions in Python that lazily produce values in a sequence
- We can create generators with functions using `yield` statements or through concise and powerful list comprehension-like generator expressions
- They're a simple way of implementing an iterator
- Primary advantage of generators and lazy functions: avoiding storing more in memory than we need to
- `itertools.islice`: take chunks from a sequence
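A short sketch of a `yield`-based generator plus `itertools.islice` (invented example):

```python
from itertools import islice

def squares():
    """Generator: lazily yields an infinite sequence of square numbers."""
    n = 0
    while True:
        yield n * n
        n += 1

first_five = islice(squares(), 5)  # take a chunk from an infinite sequence
print(list(first_five))            # [0, 1, 4, 9, 16]
```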
Lazy functions are great at processing data, but hardware still limits how quickly we can work through it
- `toolz.frequencies`: takes in a sequence and returns a `dict` with the items that occurred in the sequence as keys and values equal to the number of times they occurred -> provides the frequencies of items in our sequence
Simulations
- For simulations -> writing classes allow us to consolidate the data about each piece of the simulation
- `itertools.count()`: returns a generator that produces an infinite sequence of increasing numbers
- Unzipping = the opposite of zipping -> takes a single sequence and returns two -> unzip = `zip(*my_sequence)`
- `operator.methodcaller`: takes a string and returns a function that calls the method with that name on any object passed to it -> calling class methods through functions is helpful because it allows us to use functions like `map` and `filter` on them
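A quick sketch of unzipping and `operator.methodcaller` on invented data:

```python
from operator import methodcaller

# unzipping: one sequence of pairs back into two sequences
pairs = [("a", 1), ("b", 2), ("c", 3)]
letters, numbers = zip(*pairs)
print(letters, numbers)           # ('a', 'b', 'c') (1, 2, 3)

# methodcaller turns a method name into a plain function we can map over
shout = methodcaller("upper")
print(list(map(shout, letters)))  # ['A', 'B', 'C']
```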
Ch5. Accumulation operations with reduce
- `reduce`: function for N-to-X transformations
- We have a sequence and want to transform it into something that we can't use `map` for
- `map` can take care of the transformations in a very concise manner, whereas `reduce` can take care of the very final transformation
Three parts of reduce
- Accumulator function
- Sequence: object that we can iterate through, such as lists, strings, and generators
- Initializer: initial value to be passed to our accumulator (may be optional) -> use an initializer not when we want to change the value of our data, but when we want to change the type of the data
from functools import reduce
reduce(acc_fn, sequence, initializer)
Accumulator functions
- Does the heavy lifting for `reduce`
- Special type of helper function
- Common prototype:
  - take an accumulated value and the next element in the sequence
  - return another object, typically of the same type as the accumulated value
- Accumulator functions always need to return a value
- Accumulator functions take two variables: one for the accumulated data (often designated as `acc`, `left`, or `a`), and one for the next element in the sequence (designated `nxt`, `right`, or `b`)
def my_add(acc, nxt):
return acc + nxt
# or, using lambda functions
lambda acc, nxt: acc + nxt
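A quick usage sketch (the accumulator is redefined here so the snippet stands alone):

```python
from functools import reduce

def my_add(acc, nxt):
    return acc + nxt

print(reduce(my_add, [1, 2, 3, 4]))  # 10

# an initializer is handy when the accumulated type differs from the element type
print(reduce(lambda acc, nxt: acc + [nxt * nxt], [1, 2, 3], []))  # [1, 4, 9]
```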
Reductions
- `filter`
- `frequencies`
Using map and reduce together
If you can decompose a problem into an N-to-X transformation, all that stands between you and a reduction that solves that problem is a well-crafted accumulation function
- Using the `map` and `reduce` pattern to decouple the transformation logic from the actual transformation itself:
  - leads to highly reusable code
  - with large datasets -> simple functions become paramount -> we may have to wait a long time to discover we made a small error
Speeding up map and reduce
Using a parallel map can counterintuitively be slower than using a lazy map in map and reduce scenarios
- We can always use parallelization at the `reduce` level instead of at the `map` level
Ch6. Speeding up map and reduce with advanced parallelization
- Parallel `reduce`: use parallelization in the accumulation process instead of the transformation process
Getting the most out of parallel map
Parallel `map` will be slower than lazy `map` when:
- we're going to iterate through the sequence a second time later in our workflow
- the size of the work done in each parallel instance is small compared to the overhead that parallelization imposes -> chunksize: the size of the different pieces into which we break our tasks for parallel processing
- Python makes chunksize available as an option -> vary it according to the task at hand
`.imap` and `starmap`
More parallel maps: `.imap`
- `.imap`: for lazy parallel mapping
- use the `.imap` method to work in parallel on very large sequences efficiently
- Lazy and parallel? Use the `.imap` and `.imap_unordered` methods of `Pool()` -> both methods return iterators instead of lists
- `.imap_unordered`: behaves the same, except it doesn't necessarily put the sequence in the right order for our iterator
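A minimal `.imap` sketch (the `square` helper and chunksize are arbitrary):

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool() as pool:
        # .imap returns a lazy iterator, so results stream back as they are computed
        for result in pool.imap(square, range(5), chunksize=2):
            print(result)  # 0, 1, 4, 9, 16 -- in order, unlike .imap_unordered
```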
starmap
- use `starmap` to work with complex iterables, especially those we're likely to create using the `zip` function -> handles more than one parameter (`map`'s limitation)
- `starmap` unpacks `tuples` as positional parameters to the function with which we're mapping
- `itertools.starmap`: lazy function
- `Pool().starmap`: parallel function
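A small sketch of both `starmap` variants on an invented zipped sequence:

```python
from itertools import starmap
from multiprocessing import Pool

def power(base, exponent):
    return base ** exponent

pairs = list(zip([2, 3, 4], [5, 2, 1]))  # [(2, 5), (3, 2), (4, 1)]

print(list(starmap(power, pairs)))       # lazy version: [32, 9, 4]

if __name__ == "__main__":
    with Pool() as pool:
        print(pool.starmap(power, pairs))  # parallel version: [32, 9, 4]
```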
Parallel reduce for faster reductions
Parallel `reduce` will:
- break a problem into chunks
- make no guarantees about order
- need to pickle data
- be finicky about stateful objects
- run slower than its linear counterpart on small datasets
- run faster than its linear counterpart on big datasets
- require an accumulator function, some data, and an initial value
- perform N-to-X transformations
Parallel reduce has six parameters: an accumulation function, a sequence, an initializer value, a map, a chunksize, and a combination function - three more than the standard reduce function
Parallel `reduce` workflow:
- break our problem into pieces
- do some work
- combine the work
- return a result
With parallel `reduce`, we trade the simplicity of always having the same combination function for the flexibility of more possible transformations
Implementing parallel `reduce`:
- Importing the proper classes and functions
- Rounding up some processors
- Passing our `reduce` function the right helper functions and variables
- Python doesn't natively support parallel `reduce` -> `pathos` library
- `toolz.fold` -> parallel `reduce` implementation (see the sketch below)

`toolz` library: functional utility library that Python never came with. High-performance version of the library = `CyToolz`
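A sketch of a parallel reduce with `fold` (in current `toolz` releases it lives in `toolz.sandbox.parallel`; the accumulator must be a top-level function so it pickles cleanly):

```python
from multiprocessing import Pool
from toolz.sandbox.parallel import fold

def my_add(acc, nxt):
    return acc + nxt

if __name__ == "__main__":
    with Pool() as pool:
        total = fold(my_add, range(100000), 0,
                     map=pool.imap,   # which map to use over the chunks
                     chunksize=1000,  # how big each chunk is
                     combine=my_add)  # how to merge the per-chunk results
    print(total)  # 4999950000
```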
Ch7. Processing truly big datasets with Hadoop and Spark
- Hadoop: set of tools that support distributed map and reduce style of programming through Hadoop MapReduce
- Spark: analytics toolkit designed to modernize Hadoop
Distributed computing
- share tasks and data long-term across a network of computers
- offers large benefits in speed when we can parallelize our work
- challenges:
- keeping track of all our data
- coordinating our work
If we distribute our work prematurely, we'll end up losing performance by spending too much time talking between computers and processors. A lot of performance improvements at the high-performance limits of distributed computing revolve around optimizing communication between machines
Hadoop's five modules
- MapReduce: way of dividing work into parallelizable chunks
- YARN: scheduler and resource manager
- HDFS: file system for Hadoop
- Ozone: Hadoop extension for object storage and semantic computing
- Common: set of utilities that are shared across the previous four modules
YARN for job scheduling
- Scheduling
- Oversees all of the work that is being done
- Acts as a final decision maker in terms of how resources should be allocated across the cluster
- Application management (node managers): work at the node (single-machine) level to determine how resources should be allocated within that machine
- federation: tie together resource managers in extremely high demand use cases where thousands of nodes are not sufficient
The data storage backbone of Hadoop: HDFS
Hadoop Distributed File System (HDFS) -> reliable, performant foundation for high-performance distributed computing (but with that comes complexity). Use cases:
- process big datasets
- be flexible in hardware choice
- be protected against hardware failure
Moving code is faster than moving data
MapReduce jobs using Python and Hadoop Streaming
Hadoop MapReduce with Python -> Hadoop Streaming = utility for using Hadoop MapReduce with programming languages besides Java
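A sketch of what a Hadoop Streaming step looks like in Python: a hypothetical `mapper.py` that reads raw text from `stdin` and emits tab-separated key-value pairs (a matching reducer would read the sorted pairs back from `stdin`):

```python
#!/usr/bin/env python
# mapper.py -- word-count mapper for Hadoop Streaming
import sys

for line in sys.stdin:
    for word in line.strip().split():
        print(f"{word}\t1")  # key<TAB>value on stdout
```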
Hadoop natively supports compressed data: .gz, .bz2, and .snappy
Spark for interactive workflows
Analytics-oriented data processing framework designed to take advantage of higher-RAM compute clusters. Advantages for Python programmers:
- direct Python interface - `PySpark`: allows us to interactively explore big data through a PySpark shell REPL
- can query SQL databases directly (Java Database Connectivity - JDBC)
- has a DataFrame API: a rows-and-columns data structure familiar to `pandas` users -> provides a convenience layer on top of the core Spark data object: the RDD (Resilient Distributed Dataset)
- Spark has two high-performance data structures: RDDs, which are excellent for any type of data, and DataFrames, which are optimized for tabular data
Favor Spark over Hadoop when:
- processing streaming data
- need to get the task completed nearly instantaneously
- willing to pay for high-RAM compute clusters
PySpark for mixing Python and Spark
PySpark: we can call Spark’s Scala methods through Python just like we would a normal Python library
Ch8. Best practices for large data with Apache Streaming and mrjob
Use Hadoop to process
- lots of data fast: distributed parallelization
- data that’s important: low data loss
- enormous amounts of data: petabyte scale
Drawbacks
- To use Hadoop with Python -> Hadoop Streaming utility
- Have to repeatedly read in strings from `stdin`
- Error messages from Java are not helpful
Unstructured data: Logs and documents
- Hadoop creators designed Hadoop to work on unstructured data -> data in the form of documents
- Unstructured data is notoriously unwieldy, unlike tabular data
- But, is one of the most common forms of data around
JSON for passing data between mapper and reducer
- JavaScript Object Notation (JSON)
- Data format used for moving data in plain text between one place and another
- `json.dumps()` and `json.loads()` functions from Python's `json` library achieve the transfer
- Advantages:
  - easy for humans and machines to read
  - provides a number of useful basic data types (string, numeric, array)
  - emphasis on key-value pairs that aids the loose coupling of systems
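A tiny sketch of the round trip with invented data:

```python
import json

record = {"user": "wolohan", "views": 42}

line = json.dumps(record)         # dict -> JSON string the mapper can print to stdout
print(line)                       # {"user": "wolohan", "views": 42}
print(json.loads(line)["views"])  # 42 -- the reducer parses the string back into a dict
```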
mrjob for pythonic Hadoop streaming
- `mrjob`: Python library for Hadoop Streaming that focuses on cloud compatibility for truly scalable analysis
- keeps the mapper and reducer steps but wraps them up in a single worker class (`MRJob`)
- `mrjob` versions of `map` and `reduce` share the same type signature, taking in keys and values and outputting keys and values
- `mrjob` enforces JSON data exchange between the mapper and reducer phases, so we need to ensure that our output data is JSON serializable
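A minimal sketch of an `mrjob` worker class (a word-count job; the class name is invented):

```python
from mrjob.job import MRJob

class MRWordCount(MRJob):
    """Mapper and reducer share the same (key, value) -> (key, value) signature;
    everything yielded must be JSON serializable."""

    def mapper(self, _, line):
        for word in line.split():
            yield word, 1

    def reducer(self, word, counts):
        yield word, sum(counts)

if __name__ == "__main__":
    MRWordCount.run()
```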
Ch9. PageRank with map and reduce in PySpark
PySpark’s RDD class methods:
- `map`-like methods: replicate the function of `map`
- `reduce`-like methods: replicate the function of `reduce`
- Convenience methods: solve common problems
Partitions are the abstraction that RDDs use to implement parallelization. The data in an RDD is split up across different partitions, and each partition is handled in memory. It is common in large data tasks to partition an RDD by a key
Map-like methods in PySpark
- `.map`
- `.flatMap`
- `.mapValues`
- `.flatMapValues`
- `.mapPartitions`
- `.mapPartitionsWithIndex`
Reduce-like methods in PySpark
- `.reduce`
- `.fold`
- `.aggregate` -> provides all the functionality of a parallel reduce. We can provide an initializer value, an aggregation function, and a combination function
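A small sketch of `.aggregate` computing a mean in one pass (the data is invented):

```python
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
rdd = sc.parallelize([1, 2, 3, 4, 5])

# .aggregate takes an initializer, a per-partition function, and a combination function
total, count = rdd.aggregate(
    (0, 0),
    lambda acc, x: (acc[0] + x, acc[1] + 1),  # fold each value into the partition's accumulator
    lambda a, b: (a[0] + b[0], a[1] + b[1]),  # combine the per-partition accumulators
)
print(total / count)  # 3.0
```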
Convenience methods in PySpark
Many of these mirror functions in `functools`, `itertools`, and `toolz`. Some examples:
- `.countByKey()`
- `.countByValue()`
- `.distinct()`
- `.countApproxDistinct()`
- `.filter()`
- `.first()`
- `.groupBy()`
- `.groupByKey()`
- `.saveAsTextFile()`
- `.take()`
Saving RDDs to text files
Excellent for a few reasons:
- The data is in a human-readable, persistent format.
- We can easily read this data back into Spark with the `.textFile` method of `SparkContext`.
- The data is well structured for other parallel tools, such as Hadoop's MapReduce.
- We can specify a compression format for efficient data storage or transfer.
The RDD `.aggregate` method returns a dict. We need an RDD so that we can take advantage of Spark's parallelization. To get an RDD, we'll need to explicitly convert the items of that dict into an RDD using the `.parallelize` method from our SparkContext: `sc`.
- Spark programs often use \ characters in their method chaining to increase their readability
- Using the `byKey` variations of methods in PySpark often results in significant speed-ups because like data is worked on by the same distributed compute worker
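A quick sketch of both points, with invented key-value data:

```python
from pyspark import SparkContext

sc = SparkContext.getOrCreate()
pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])

# backslashes keep the chain readable; reduceByKey keeps like keys on the same worker
totals = pairs \
    .mapValues(lambda v: v * 10) \
    .reduceByKey(lambda x, y: x + y)

print(totals.collect())  # [('a', 40), ('b', 20)] (order may vary)
```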
Ch10. Faster decision-making with machine learning and PySpark
One of the reasons why Spark is so popular = built-in machine learning capabilities
PySpark's machine learning capabilities live in a package called `ml`. This package contains a few different modules categorizing some of the core machine learning capabilities, including:
- `pyspark.ml.feature` — For feature transformation and creation
- `pyspark.ml.classification` — Algorithms for judging the category in which a data point belongs
- `pyspark.ml.tuning` — Algorithms for improving our machine learners
- `pyspark.ml.evaluation` — Algorithms for evaluating machine learners
- `pyspark.ml.util` — Methods for saving and loading machine learners
PySpark's machine learning features expect us to have our data in a PySpark `DataFrame` object - not an `RDD`. The `RDD` is an abstract parallelizable data structure at the core of Spark, whereas the `DataFrame` is a layer on top of the `RDD` that provides a notion of rows and columns
Organizing the data for learning
Spark's ml classifiers look for two columns in a `DataFrame`:
- A `label` column: indicates the correct classification of the data
- A `features` column: contains the features we're going to use to predict that label
Auxiliary classes
- PySpark's `StringIndexer`: transforms categorical data stored as category names (strings) and indexes the names as numerical variables. `StringIndexer` indexes categories in order of frequency, from most common to least common. The most common category will be 0, the second most common category 1, and so on
- Most data structures in Spark are immutable -> a property of Scala (in which Spark is written)
- Spark's ml classes only want one column named `features` -> PySpark's `VectorAssembler`: a `Transformer` like `StringIndexer` -> takes some input column names and an output column name and has methods to return a new `DataFrame` that has all the columns of the original, plus the new column we want to add
- The feature creation classes are `Transformer`-class objects, and their methods return new `DataFrames` rather than transforming them in place
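A sketch of preparing a tiny invented `DataFrame` with `StringIndexer` and `VectorAssembler` (column names are hypothetical):

```python
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [("cat", 1.0, 0), ("dog", 0.5, 1), ("cat", 0.7, 0)],
    ["animal", "weight", "label"],
)

# index the string category as a numeric column; returns a new DataFrame
indexer = StringIndexer(inputCol="animal", outputCol="animal_ix")
df = indexer.fit(df).transform(df)

# assemble the predictor columns into the single 'features' column ml expects
assembler = VectorAssembler(inputCols=["animal_ix", "weight"], outputCol="features")
df = assembler.transform(df)

df.select("label", "features").show()
```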
Evaluation
PySpark's `ml.evaluation` module:
- `BinaryClassificationEvaluator`
- `RegressionEvaluator`
- `MulticlassClassificationEvaluator`
Cross-validation in PySpark
`CrossValidator` class: k-fold cross-validation, needs to be initialized with:
- An estimator
- A parameter grid - a `ParamGridBuilder` object
- An evaluator
Ch11. Large datasets in the cloud with Amazon Web Services and S3
S3 is the go-to service for large datasets:
- effectively unlimited storage capacity. We never have to worry about our dataset becoming too large
- cloud-based. We can scale up and down quickly as necessary.
- offers object storage. We can focus on organizing our data with metadata and store many different types of data.
- managed service. Amazon Web Services takes care of a lot of the details for us, such as ensuring data availability and durability. They also take care of security patches and software updates.
- supports versioning and life cycle policies. We can use them to update or archive our data as it ages
Objects for convenient heterogeneous storage
- Object storage: storage pattern that focuses on the what of the data instead of the where
- With object storage we recognize objects by a unique identifier (instead of the name and directory)
- Supports arbitrary metadata -> we can tag our objects flexibly based on our needs (helps us find those objects later when we need to use them)
- Querying tools are available for S3 that allow SQL-like querying on these metadata tags for metadata analysis
- Unique identifiers -> we can store heterogeneous data in the same way
Parquet: A concise tabular data store
- CSV is a simple, tabular data store, and JSON is a human-readable document store. Both are common in data interchange and are often used in the storage of distributed large datasets. Parquet is a Hadoop-native tabular data format.
- Parquet uses clever metadata to improve the performance of map and reduce operations. Running a job on Parquet can take as little as 1/100th the time a comparable job on a CSV or JSON file would take. Additionally, Parquet supports efficient compression. As a result, it can be stored at a fraction of the cost of CSV or JSON.
- These benefits make Parquet an excellent option for data that primarily needs to be read by a machine, such as for batch analytics operations. JSON and CSV remain good options for smaller data or data that’s likely to need some human inspection.
Boto is a library that provides Pythonic access to many of the AWS APIs. We need the access key and secret key to programmatically access AWS through boto
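A sketch of programmatic S3 access using `boto3` (the current generation of the boto library); the bucket, keys, and credentials below are placeholders:

```python
import boto3

s3 = boto3.client(
    "s3",
    aws_access_key_id="YOUR_ACCESS_KEY",      # placeholder credentials
    aws_secret_access_key="YOUR_SECRET_KEY",
)

# upload a local file as an object identified by its key
s3.upload_file("data/posts.json", "my-dataset-bucket", "2020/06/posts.json")

# list the objects stored under a prefix
response = s3.list_objects_v2(Bucket="my-dataset-bucket", Prefix="2020/06/")
for obj in response.get("Contents", []):
    print(obj["Key"])
```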
Ch12. MapReduce in the cloud with Amazon’s Elastic MapReduce
Convenient cloud clusters with EMR
Ways to get access to a compute cluster that supports both Hadoop and Spark:
- AWS: Amazon’s Elastic MapReduce
- Microsoft’s Azure HDInsight
- Google’s Cloud Dataproc
AWS EMR
- AWS EMR is a managed data cluster service
- We specify general properties of the cluster, and AWS runs software that creates the cluster for us
- When we’re done using the cluster, AWS absorbs the compute resources back into its network
- Pricing model is a per-compute-unit per-second charge
- There are no cost savings to doing things slowly. AWS encourages us to parallelize our problems away
Starting EMR clusters with mrjob
- We can run Hadoop jobs on EMR with the `mrjob` library, which allows us to write distributed MapReduce and procure cluster computing in Python.
- We can use `mrjob`'s configuration files to describe what we want our clusters to look like, including which instances we'd like to use, where we'd like those instances to be located, and any tags we may want to add.
Hadoop on EMR is excellent for large data processing workloads, such as batch analytics or extract-transform-load (ETL)
Machine learning in the cloud with Spark on EMR
- Hadoop is great for low-memory workloads and massive data.
- Spark is great for jobs that are harder to break down into map and reduce steps, and situations where we can afford higher memory machines
Running machine learning algorithms on a truly large dataset
- Get a sample of the full dataset.
- Train and evaluate a few models on that dataset.
- Select some models to evaluate on the full dataset.
- Train several models on the full dataset in the cloud.
Run your Spark code with the `spark-submit` utility instead of Python. The `spark-submit` utility queues up a Spark job, which will run in parallel locally and simulate what would happen if you ran the program on an active cluster
EC2 instance types and clusters
- `M-series`: use for Hadoop and for testing Spark jobs
- `C-series`: compute-heavy workloads such as Spark analytics and batch Spark jobs
- `R-series`: high-memory, use for streaming analytics
Software available on EMR
- JupyterHub: cluster-ready version of Jupyter Notebook -> run interactive Spark and Hadoop jobs from a notebook environment
- Hive: compiles SQL code to Hadoop MapReduce jobs
- Pig: compiles Pig Latin (SQL-like) commands to Hadoop MapReduce jobs