BerkeleyX CS105x on EdX
https://courses.edx.org/courses/course-v1:BerkeleyX+CS105x+1T2016/courseware
Scalable, efficient analysis of Big Data.
Week 1
Sources of big data
- Online actions – clicks, ads, pauses, transactions
- User-generated content (web and mobile)
- Health and scientific computing
- Graph data (networks)
- Log files
- Machine syslog files
- Internet of Things: e.g. measurements, RFID tags
Public datasets
https://github.com/caesar0301/awesome-public-datasets
https://data.sfgov.org/
https://sf311.org/
Key Data Management Concepts
A data model is a collection of concepts for describing data
A schema is a description of a particular collection of data, using a given data model.
Structured: Relational databases, formatted messages
Semi-structured: Documents, XML, tagged text, media
Unstructured: Plain text, media
Structured Data
The relational data model is the most widely used data model – a relation is a table with rows and columns.
Every relation has a schema defining each column’s type.
The programmer must statically specify the schema.
Semi-structured Tabular data
One of the most common data formats
- A table is a collection of rows and columns
- Each column has a name
- Each cell may or may not have a value
- Each column has a type (string, integer): Together, the column types are the schema for the data
- Two choices for how the schema is determined:
— Spark dynamically infers the schema while reading each row
— The programmer statically specifies the schema
Unstructured Data
Only one column, with a string or binary type.
E.g. Facebook posts, Instagram images, news articles…
To transform unstructured data into semi-structured or structured data, we perform an Extract-Transform-Load (ETL) process, in which we impose structure on the unstructured data.
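A minimal sketch of this ETL idea in pySpark (assuming an existing sqlContext; the file name, line format, and field names are hypothetical):
from pyspark.sql.functions import split

rawDF = sqlContext.read.text('logs.txt')                # Extract: one string column named 'value'
parts = split(rawDF.value, ',')                         # split hypothetical 'date,level,message' lines
structuredDF = rawDF.select(parts.getItem(0).alias('date'),
                            parts.getItem(1).alias('level'),
                            parts.getItem(2).alias('message'))  # Transform: named columns
structuredDF.write.parquet('logs.parquet')              # Load: save the structured result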
Analysis
Traditional analysis tools are:
- Unix shell commands grep, awk, sed
- pandas
- R
Key limitation: they all run on a single machine.
Big data examples
Facebook’s daily logs: 60 TB
Google web index: 10+ PB
Cost of 1 TB of disk: ~$35
Time to read 1 TB from disk: ~3 hours at 100 MB/s (1 TB ≈ 1,000,000 MB, so 1,000,000 MB ÷ 100 MB/s = 10,000 s ≈ 2.8 hours)
The big data problem
One machine cannot process or even store all the data.
The solution is to distribute the data over a cluster of machines:
lots of hard drives, CPUs and memory.
We take the data and partition it across the cluster of machines.
In Spark, this partitioned data is a DataFrame.
The Spark computing framework provides a programming abstraction and parallel runtime to hide the complexities of fault tolerance and slow machines.
Apache Spark Components
- Spark SQL
- Spark Streaming
- MLlib & ML (machine learning)
- GraphX (graph)
- Apache Spark (core)
References
https://spark.apache.org/docs/latest/sql-programming-guide.html
https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
Python Spark (pySpark)
pySpark provides an easy-to-use programming abstraction and parallel runtime
DataFrames are the key concept
Spark Driver and Workers
A Spark program is two programs – a driver program and a workers program.
Worker programs run on cluster nodes or in local threads
DataFrames are distributed across workers.
Spark and SQL Contexts
A Spark program first creates a SparkContext object (it tells Spark how and where to access a cluster)
The program next creates a sqlContext object
Use sqlContext to create DataFrames
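A minimal sketch of this setup (in the Databricks notebooks used in the course, sc and sqlContext are already created for you):
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext('local[*]', 'MyApp')   # tells Spark how and where to access a cluster
sqlContext = SQLContext(sc)              # used to create DataFrames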
Spark Essentials: Master
The master parameter for a SparkContext determines the type and size of cluster to use:
local – run Spark locally with one worker thread (no parallelism)
local[K] – run Spark locally with K worker threads (ideally set to the number of cores)
spark://HOST:PORT – connect to a Spark standalone cluster; PORT depends on config (7077 by default)
mesos://HOST:PORT – connect to a Mesos cluster; PORT depends on config (5050 by default)
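A sketch of how the master parameter is typically set (HOST values are placeholders):
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName('MyApp').setMaster('local[4]')   # 4 local worker threads
# conf.setMaster('spark://HOST:7077')    # Spark standalone cluster (placeholder HOST)
# conf.setMaster('mesos://HOST:5050')    # Mesos cluster (placeholder HOST)
sc = SparkContext(conf=conf)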
DataFrames
The primary abstraction in Spark
– Immutable once constructed
– Track lineage information to efficiently recompute lost data
– Enable operations on collections of elements in parallel
You construct DataFrames
– by parallelizing existing Python collections (lists)
– by transforming an existing Spark or pandas DataFrame
– from files in HDFS (Hadoop) or any other storage system
Each row of a DataFrame is a Row object
The fields in a Row can be accessed like attributes:
>>> from pyspark.sql import Row
>>> row = Row(name='Alice', age=11)
>>> row
Row(age=11, name='Alice')
>>> row['name'], row['age']
('Alice', 11)
>>> row.name, row.age
('Alice', 11)
DataFrames
Two types of operations: transformations and actions
Transformations are lazy (not computed immediately)
Transformed DF is executed when action runs on it
Persist (cache) DFs in memory or disk
Working with DataFrames
Create a DataFrame from a data source (list)
Apply transformations to a DataFrame (select, filter)
Apply actions to a DataFrame (show, count)
Lifecycle
list → createDataFrame → DataFrame → filter → filtered DataFrame → select → transformed DataFrame → show (action) → result
The show action causes the createDataFrame, filter and select transformations to be executed.
Create DataFrame from Python list
data = [('alice',1), ('bob', 2)]
df = sqlContext.createDataFrame(data, ['name', 'age'])
No computation occurs with sqlContext.createDataFrame() – Spark only records how to create the DataFrame.
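Continuing the lifecycle above, a minimal sketch of chaining transformations and triggering them with an action (using the df just created):
filteredDF = df.filter(df.age > 1)      # transformation: nothing executes yet
resultDF = filteredDF.select('name')    # transformation: still nothing executes
resultDF.show()                         # action: createDataFrame, filter and select run now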
Pandas: Python Data Analysis Library
Open source data analysis and modeling library
pandas DataFrame is a table with named columns
– The most commonly used pandas object
– Can be thought of as a dictionary of Series objects
– Each pandas Series object represents a column
Creating DataFrames
Easy to create pySpark DataFrames from pandas (and R)
spark_df = sqlContext.createDataFrame(pandas_df)
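For example, a minimal sketch (assuming pandas is available on the driver):
import pandas as pd

pandas_df = pd.DataFrame({'name': ['Alice', 'Bob'], 'age': [1, 2]})
spark_df = sqlContext.createDataFrame(pandas_df)   # column names and types come from pandas
spark_df.show()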
Creating DataFrames
From HDFS, text files, JSON files, Apache Parquet, Hypertable, Amazon S3, Apache HBase, SequenceFiles, any other Hadoop InputFormat, and a directory or glob wildcard: /data/201404*
df = sqlContext.read.text("readme.txt")
df.collect()
[Row(value=u'hello'), Row(value=u'this')]
It loads a text file and returns a DataFrame with a single string column named ‘value’.
Each line in the text file is a row.
Lazy evaluation means no execution happens now.
Spark Transformations
Create a new DataFrame from existing one.
The apply method selects a single column from a DataFrame (returning a Column object):
ageCol = people.age
You can select one or more columns from a DataFrame
df.select('*') # selects all columns
df.select('name', 'age') # selects name and age columns
df.select(df.name, (df.age + 10).alias('age'))
Remove columns
The drop method returns a new DataFrame that drops the specified column
df.drop(df.age)
Review: Python lambda Functions
Small anonymous functions (not bound to a name)
lambda a, b: a + b # returns the sum of its two arguments
Can use lambda functions wherever function objects are required
Restricted to a single expression.
User Defined Function Transformation
Transform DataFrame using User Defined Function
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
slen = udf(lambda s: len(s), IntegerType())
df.select(slen(df.name).alias('slen'))
# creates a DataFrame of [Row(slen=5), Row(slen=3)]
udf takes a named or lambda function and the return type of the function.
Other useful transformations
filter(func) # returns a new DF formed by selecting those rows of the source on which func returns true
where(func) # alias for filter
distinct() # returns a new DF that contains the distinct rows of the source DataFrame
orderBy(*cols, **kw) # returns a new DF sorted by the specified columns and in the sort order specified by kw
sort(*cols, **kw) # like orderBy
explode(col) # returns a new row for each element in the given array or map
func is a Python named function or lambda function
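Note that for DataFrames the condition passed to filter/where is usually a column expression; a small sketch using the df defined earlier:
df.filter(df.age > 1).show()                  # keep rows where the boolean column is true
df.where(df.age > 1).show()                   # where() is an alias for filter()
df.orderBy(df.age, ascending=False).show()    # sort by age, descending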
Using Transformations
df = sqlContext.createDataFrame(data, ['name', 'age'])
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf
doubled = udf(lambda s: s * 2, IntegerType())
df2 = df.select(df.name, doubled(df.age).alias('age'))
df3 = df2.filter(df2.age > 3)
df4 = df2.distinct()
df5 = df2.sort("age", ascending=False) # descending order
from pyspark.sql import Row
from pyspark.sql.functions import explode
data3 = [Row(a=1, intlist=[1, 2])]
df6 = sqlContext.createDataFrame(data3)
df6.select(explode(df6.intlist).alias("anInt"))
[Row(anInt=1), Row(anInt=2)]
GroupedData Transformations
groupBy(*cols) groups the DataFrame using the specified columns, so we can run aggregation on them
agg(*exprs) # compute aggregates (avg, min, max, sum or count) and return the result as a DataFrame
count() # counts the number of records for each group
avg(*args) # computes average values for numeric columns for each group
GroupedData Examples
data = [('Alice', 1, 6), ('Bob', 2, 8), ('Alice', 3, 9), ('Bob', 4, 7)]
df = sqlContext.createDataFrame(data, ['name', 'age', 'grade'])
df1 = df.groupBy(df.name)
df.agg({"*": "count"}).collect()
[Row(name=u'Alice', count(1)=2), Row(name=u'Bob', count(1)=2)]
# or use
df.groupBy(df.name).count()
# average example
df.groupBy().avg().collect()
[Row(avg(age)=2.5, avg(grade)=7.5)]
Transforming a DataFrame
linesDF = sqlContext.read.text('...')
commentsDF = linesDF.filter(isComment)
Lazy evaluation means nothing executes yet – Spark only saves a recipe for transforming the source.
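isComment above stands for a filter condition; one possible way to express it (a sketch, assuming comment lines start with '#' and using a placeholder file name):
linesDF = sqlContext.read.text('readme.txt')                 # placeholder file
commentsDF = linesDF.filter(linesDF.value.startswith('#'))   # still lazy: only the recipe is recorded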
Apache Spark Actions
Spark actions cause Spark to execute the recipe to transform the source.
Actions are the mechanism for getting results out of Spark.
Some useful actions
show(n, truncate) # prints the first n rows of the DataFrame
take(n) # returns the first n rows as a list of Row
collect() # return all the records as a list of Row
count() # returns the number of rows in this DataFrame
describe(*cols) # Exploratory Data Analysis function that computes statistics (count, mean, stddev, min, max) for numeric columns - if no columns are given, this function computes statistics for all numerical columns
count for DataFrames is an action, while for GroupedData it is a transformation.
df.collect()
df.show()
df.count()
df.take(1)
df.describe()
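To illustrate the note about count above, a sketch using the df from the GroupedData example:
df.count()                              # action: returns 4 immediately
grouped = df.groupBy(df.name).count()   # transformation: returns a new DataFrame, nothing runs yet
grouped.show()                          # action: the per-name counts are computed now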
Caching data
linesDF.cache()
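A typical usage sketch (placeholder file name): cache a DataFrame that several actions will reuse, so it is only materialized once:
linesDF = sqlContext.read.text('readme.txt')   # placeholder file
linesDF.cache()    # mark for caching; nothing is read yet
linesDF.count()    # first action reads the file and fills the cache
linesDF.count()    # second action is served from memory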
Spark Program Lifecycle
- Create DataFrames from external data, or use createDataFrame on a collection in the driver program
- Lazily transform them into new DataFrames
- cache() some DataFrames for reuse
- Perform actions to execute parallel computation and produce results
Where Code Runs
Most Python code runs in the driver – except the code passed to transformations.
Transformations run at executors.
Actions run at executors and the driver.
Combining DataFrames
cDF = aDF.unionAll(bDF)
Use the DataFrame reference API
unionAll() – returns a new DataFrame containing the union of rows in this frame and another frame
It runs completely at executors – very scalable and efficient.
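For example, a minimal sketch with two small DataFrames:
aDF = sqlContext.createDataFrame([('Alice', 1)], ['name', 'age'])
bDF = sqlContext.createDataFrame([('Bob', 2)], ['name', 'age'])
cDF = aDF.unionAll(bDF)
cDF.show()   # two rows: Alice and Bob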
Best Practices
Use Spark Transformations and Actions wherever possible
Never use collect() in production, instead use take(n)
cache() DataFrames that you reuse a lot.
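A sketch of the collect() vs take(n) point (df is any large DataFrame):
preview = df.take(5)   # brings only 5 rows back to the driver
rows = df.collect()    # brings every row to the driver; can exhaust driver memory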
Databricks Community Edition
https://accounts.cloud.databricks.com/registration.html#signup/community
Create new Notebook, Library Folder
Home > Users > Right-click on the username and select Create
How to prepare a new cluster
- Select Cluster in main nav
- Click on + Create new Cluster
- A new cluster with 6GB of memory will appear
Cluster will be terminated after one hour of inactivity
Create library
Go to username and right click
Create > New Library
PyPI Name: spark_mooc_meta
Create a notebook
Python notebook
Shift + Enter to submit
import spark_mooc_meta
Week 2
https://courses.edx.org/courses/course-v1:BerkeleyX+CS105x+1T2016/courseware/9d251397874d4f0b947b606c81ccf83c/3cf61a8718fe4ad5afcd8fb35ceabb6e/