Spark
Spark
搭建环境
Note:Step12使用pyspark
指令前记得先进入anaconda中的环境。
Introduction
Overview
TBC
Resilient Distributed Data Set(RDD)弹性分布式数据集
RDD是数据集,我们通常对一个RDD做一些操作去获得另外一个RDD。我们需要实例化一个对象SparkContext
来执行这些操作。
Basic Operation
Transforming RDD
- map
- flatmap
- filter: removing information potentially that you don't care about
- distinct
- sample
- union, intersection, subtract, cartesian
RDD Action
- collect
- count: 统计RDD中value出现的次数
- countByValue
- take
- top
- reduce
- ... and more ...
Example:统计user对movie的评分
1 | from pyspark import SparkConf, SparkContext |
Key-Value RDD
Create Key-Value RDD
1 | totalsByAge = rdd.map(lambda x:(x,1)) |
键值对的value不一定非得是一个值,也可以是列表
Special Action
reduceByKey(): combine values with the same key using some function.
exp: use
rdd.reduceByKey(lambda x,y:x+y)
to adds values upgroupByKey(): Group values with the same key
sortByKey(): Sort RDD by key values
keys(), values(): create an RDD of just the keys, or just the values
Example: 统计一定年龄段的朋友有多少
1 | # datasets |
Filtering RDD
map:对RDD处理,input和output始终是一对一关系
flatmap: 可以从一个value生成多个values
e.g. (The quick red fox ...) ——》 lines.flatmap(lambda x:x.split()) ——》 (The) (quick) (red) (fox)...
Example: 找出气候站一年的最低温低
1 | # dataset |
Flatmap
Example: 统计文本中词汇出现次数
1 | from pyspark import SparkConf, SparkContext |
上面代码只通过空格分解,会出现spark,
这种情况,我们下面用正则表达式改进一下
1 | import re |
加上排序
1 | import re |
Spark SQL
一种dataframe,可以用sql语句查询,可以与rdd互相转换
Example:将RDD转为SparkSQL
1 | from pyspark.sql import SparkSession |
Example:直接打开DataFrame+用执行代码处理数据
1 | from pyspark.sql import SparkSession |
Example:计算某个年龄平均有几个朋友
1 | from pyspark.sql import SparkSession |
func
Passing columns as parameters
- func.explode(): similar to flatmap
- func.split()
- func.lower()
Example: WordCounting处理非结构数据
1 | from pyspark.sql import SparkSession |
Example: 找最大最小温度
1 | from pyspark.sql import SparkSession |
Exercise:customer_order
1 | from pyspark.sql import SparkSession |
Advanced Example
找出最受欢迎的电影-最多评分数的电影
1 | from pyspark.sql import SparkSession |
Broadcast
将一个变量分发到cluster上每个点
example:给电影ID找对于的电影名字
1 | from pyspark.sql import SparkSession |
Example: find the most popular superhero
数据集:
Marvel-graph.txt
4395 7483 9475 7483
4802 3939 ...
每行第一个ID是superhero ID,后面跟着的是在漫画中和这个superhero同时出现过的superhero ID
一个超级英雄可能多次在每一行的第一个出现
Marvel-names.txt
superhero ID与名字的映射
Mission1:找出最受欢迎的superhero
1 | from pyspark.sql import SparkSession |
Mission2:找出最不起眼的superhero
1 | from pyspark.sql import SparkSession |
Mission 3 找到两个超级英雄的分离度
分离度的意思就是再图中的距离
1 | #Boilerplate stuff: |
Item-Based CF
1 | from pyspark.sql import SparkSession |