Spark

Environment Setup

Taming Big Data with Apache Spark and Python - Getting Started - Sundog Education with Frank Kane (sundog-education.com)

Note: In Step 12, remember to activate the Anaconda environment before running the pyspark command.

Introduction

Overview

TBC

Resilient Distributed Dataset (RDD)

An RDD is a dataset. We usually apply operations to one RDD to produce another RDD, and we need to instantiate a SparkContext object to perform these operations.

Basic Operation

Transforming RDD

  • map
  • flatMap
  • filter: remove information that you potentially don't care about
  • distinct
  • sample
  • union, intersection, subtract, cartesian
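
A minimal sketch of a few of these transformations on toy in-memory data (not the course dataset); transformations are lazy, so nothing actually runs until one of the actions in the next list is called.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("TransformSketch")
sc = SparkContext(conf = conf)

# A tiny in-memory RDD, just for illustration
nums = sc.parallelize([1, 2, 2, 3, 4])

squared = nums.map(lambda x: x * x)        # one output element per input element
evens = nums.filter(lambda x: x % 2 == 0)  # keep only the values we care about
unique = nums.distinct()                   # drop duplicate values
combined = evens.union(unique)             # merge two RDDs into one
# Nothing has executed yet; an action (see the next list) triggers the computation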

RDD Action

  • collect
  • count: count the number of elements in the RDD
  • countByValue: count how many times each value appears in the RDD
  • take
  • top
  • reduce
  • ... and more ...
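
A minimal sketch of these actions, again on toy in-memory data; each call below triggers a computation and returns a result to the driver program.

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("ActionSketch")
sc = SparkContext(conf = conf)

nums = sc.parallelize([1, 2, 2, 3, 4])

print(nums.collect())                   # [1, 2, 2, 3, 4]
print(nums.count())                     # 5: number of elements in the RDD
print(nums.countByValue())              # how many times each distinct value occurs
print(nums.take(3))                     # first 3 elements
print(nums.top(2))                      # 2 largest elements
print(nums.reduce(lambda x, y: x + y))  # combine elements pairwise; here the sum, 12

sc.stop()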

Example: build a histogram of the ratings users give to movies

from pyspark import SparkConf, SparkContext
import collections

# setMaster specifies whether to run on a single host ("local") or on a cluster; here we use a single local thread
conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
# Create the SparkContext
sc = SparkContext(conf = conf)

# sc.textFile creates an RDD in which each line of the text file becomes one value
# e.g.:
# user_id movie_id rating timestamp
# 196 242 4 991232423
# 186 302 3 984927391
# ......
lines = sc.textFile("file:///SparkCourse/ml-100k/u.data")
# Transform the RDD using map and a lambda
ratings = lines.map(lambda x: x.split()[2])
# Perform an action on the new RDD
result = ratings.countByValue()

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print("%s %i" % (key, value))

Key-Value RDD

Create Key-Value RDD

totalsByAge = rdd.map(lambda x:(x,1))

The value of a key-value pair does not have to be a single value; it can also be a list or tuple.

Special Action

  • reduceByKey(): combine values with the same key using some function.

    e.g. rdd.reduceByKey(lambda x, y: x + y) adds the values up

  • groupByKey(): Group values with the same key

  • sortByKey(): Sort RDD by key values

  • keys(), values(): create an RDD of just the keys, or just the values
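
A minimal sketch of these key-value operations on a toy (age, numFriends) RDD (not the course dataset):

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("KeyValueSketch")
sc = SparkContext(conf = conf)

# Toy (age, numFriends) pairs
pairs = sc.parallelize([(33, 385), (33, 2), (55, 221), (40, 465)])

totals = pairs.reduceByKey(lambda x, y: x + y)  # (33, 387), (55, 221), (40, 465)
grouped = pairs.groupByKey().mapValues(list)    # (33, [385, 2]), (55, [221]), (40, [465])
byAge = pairs.sortByKey()                       # sorted by the age key
ages = pairs.keys()                             # 33, 33, 55, 40
friends = pairs.values()                        # 385, 2, 221, 465

print(totals.collect())
print(grouped.collect())
print(byAge.collect())

sc.stop()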

Example: compute the average number of friends by age

# dataset
# index name age friends_num
# 0 Will 33 385
# 1 Jean 33 2
# 2 Huge 55 221
# 3 Luke 40 465
#...

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("FriendsByAge")
sc = SparkContext(conf = conf)

# transform to key-value RDD
def parseLine(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)

lines = sc.textFile("file:///SparkCourse/fakefriends.csv")
rdd = lines.map(parseLine)

# Transform (33, 385) into (33, (385, 1)), then sum the tuples element-wise:
# 385 accumulates the total number of friends, 1 counts people so we can compute the average
totalsByAge = rdd.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
averagesByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])

results = averagesByAge.collect()
for result in results:
    print(result)

Filtering RDD

map: transforms an RDD with a strict one-to-one relationship between input and output elements

flatMap: can produce multiple output values from a single input value

e.g. (The quick red fox ...) -> lines.flatMap(lambda x: x.split()) -> (The) (quick) (red) (fox) ...
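
A minimal sketch of the difference between map and flatMap, using toy in-memory lines rather than the course data:

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MapVsFlatMap")
sc = SparkContext(conf = conf)

lines = sc.parallelize(["The quick red fox", "jumped over the lazy brown dogs"])

# map: one output element per input line -> an RDD of 2 lists of words
wordLists = lines.map(lambda x: x.split())
# flatMap: each line can produce many output elements -> an RDD of 10 individual words
words = lines.flatMap(lambda x: x.split())

print(wordLists.count())   # 2
print(words.count())       # 10

sc.stop()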

Example: find each weather station's minimum temperature for the year

# dataset
# weather station code, date, type, temp, other
# ITE00100554, 18000101, TMAX, -75,,,E

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("MinTemperatures")
sc = SparkContext(conf = conf)

def parseLine(line):
    fields = line.split(',')
    stationID = fields[0]
    entryType = fields[2]
    # temperature is stored in tenths of degrees Celsius; convert to Fahrenheit
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (stationID, entryType, temperature)

lines = sc.textFile("file:///SparkCourse/1800.csv")
parsedLines = lines.map(parseLine)
minTemps = parsedLines.filter(lambda x: "TMIN" in x[1])
stationTemps = minTemps.map(lambda x: (x[0], x[2]))
minTemps = stationTemps.reduceByKey(lambda x, y: min(x, y))
results = minTemps.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

flatMap

Example: count how often each word occurs in a text file

from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("file:///sparkcourse/book.txt")
words = input.flatMap(lambda x: x.split())
wordCounts = words.countByValue()

for word, count in wordCounts.items():
    cleanWord = word.encode('ascii', 'ignore')
    if (cleanWord):
        print(cleanWord.decode() + " " + str(count))

The code above splits only on whitespace, so tokens like "spark," (a word with punctuation attached) show up. Below we improve it with a regular expression.

import re
from pyspark import SparkConf, SparkContext

def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("file:///sparkcourse/book.txt")
words = input.flatMap(normalizeWords)
wordCounts = words.countByValue()

for word, count in wordCounts.items():
    cleanWord = word.encode('ascii', 'ignore')
    if (cleanWord):
        print(cleanWord.decode() + " " + str(count))

Now add sorting:

import re
from pyspark import SparkConf, SparkContext

def normalizeWords(text):
    return re.compile(r'\W+', re.UNICODE).split(text.lower())

conf = SparkConf().setMaster("local").setAppName("WordCount")
sc = SparkContext(conf = conf)

input = sc.textFile("file:///sparkcourse/book.txt")
words = input.flatMap(normalizeWords)

# Count the words a different way: map to (word, 1) pairs and reduceByKey
wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
# Flip to (count, word) pairs, then sort with sortByKey
wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey()
results = wordCountsSorted.collect()

for result in results:
    count = str(result[0])
    word = result[1].encode('ascii', 'ignore')
    if (word):
        print(word.decode() + ":\t\t" + count)

Spark SQL

Spark SQL provides DataFrames: tabular data that can be queried with SQL statements and converted to and from RDDs.
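
A minimal sketch of converting in both directions, using toy in-memory data: spark.createDataFrame() turns an RDD of Row objects into a DataFrame, and the DataFrame's .rdd attribute gives back the underlying RDD of Rows.

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("RddDataFrameSketch").getOrCreate()

# RDD of Row objects (toy data)
rowRDD = spark.sparkContext.parallelize([Row(name="Will", age=33), Row(name="Jean", age=33)])

# RDD -> DataFrame (schema inferred from the Rows)
df = spark.createDataFrame(rowRDD)
df.show()

# DataFrame -> RDD of Row objects
backToRDD = df.rdd
print(backToRDD.collect())

spark.stop()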

Example: convert an RDD into a Spark SQL DataFrame

from pyspark.sql import SparkSession
from pyspark.sql import Row

# Create a SparkSession, which we use to work with Spark SQL
# getOrCreate() is paired with spark.stop() at the end of the program
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

def mapper(line):
    fields = line.split(',')
    return Row(ID=int(fields[0]), name=str(fields[1].encode("utf-8")), \
               age=int(fields[2]), numFriends=int(fields[3]))

# Create an RDD whose values are Row objects
lines = spark.sparkContext.textFile("fakefriends.csv")
people = lines.map(mapper)

# Create a DataFrame from the RDD
# Infer the schema, and register the DataFrame as a table.
# cache() keeps the table in memory
schemaPeople = spark.createDataFrame(people).cache()
# To run SQL against the DataFrame, create a temporary view (replacing it if it already exists)
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
# Returns a DataFrame
# "people" is the name of the view; "age" is the field name given when the Rows were created
teenagers = spark.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are DataFrames and support all the normal DataFrame operations.
for teen in teenagers.collect():
    print(teen)

# We can also use functions instead of SQL queries:
schemaPeople.groupBy("age").count().orderBy("age").show()

spark.stop()

Example: read a DataFrame directly and process the data with DataFrame operations

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkSQL").getOrCreate()

# Read the csv file
# option("header", "true") indicates that the csv file has a header row
# option("inferSchema", "true") asks Spark to infer the column types from the data
people = spark.read.option("header", "true").option("inferSchema", "true")\
    .csv("file:///SparkCourse/fakefriends-header.csv")

# Print the column names and types
print("Here is our inferred schema:")
people.printSchema()

print("Let's display the name column:")
people.select("name").show()

print("Filter out anyone over 21:")
people.filter(people.age < 21).show()

print("Group by age")
people.groupBy("age").count().show()

print("Make everyone 10 years older:")
people.select(people.name, people.age + 10).show()

spark.stop()

Example: compute the average number of friends for each age

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions as func

spark = SparkSession.builder.appName("FriendsByAge").getOrCreate()

lines = spark.read.option("header", "true").option("inferSchema", "true").csv("file:///SparkCourse/fakefriends-header.csv")

# Select only age and friends columns
friendsByAge = lines.select("age", "friends")

# From friendsByAge we group by "age" and then compute average
friendsByAge.groupBy("age").avg("friends").show()

# Sorted
friendsByAge.groupBy("age").avg("friends").sort("age").show()

# Formatted more nicely
# agg() aggregates one or more expressions; func.round() keeps the given number of decimal places
friendsByAge.groupBy("age").agg(func.round(func.avg("friends"), 2)).sort("age").show()

# With a custom column name
# alias() assigns a custom name to the resulting column
friendsByAge.groupBy("age").agg(func.round(func.avg("friends"), 2)
    .alias("friends_avg")).sort("age").show()

spark.stop()

func

Passing columns as parameters

  • func.explode(): similar to flatMap
  • func.split()
  • func.lower()

Example: word counting on unstructured data

from pyspark.sql import SparkSession
from pyspark.sql import functions as func

spark = SparkSession.builder.appName("WordCount").getOrCreate()

# Read each line of my book into a dataframe
inputDF = spark.read.text("file:///SparkCourse/book.txt")

# Split using a regular expression that extracts words
words = inputDF.select(func.explode(func.split(inputDF.value, "\\W+")).alias("word"))
wordsWithoutEmptyString = words.filter(words.word != "")

# Normalize everything to lowercase
lowercaseWords = wordsWithoutEmptyString.select(func.lower(wordsWithoutEmptyString.word).alias("word"))

# Count up the occurrences of each word
wordCounts = lowercaseWords.groupBy("word").count()

# Sort by counts
wordCountsSorted = wordCounts.sort("count")

# Show the results.
wordCountsSorted.show(wordCountsSorted.count())

Example: find each station's minimum temperature

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

spark = SparkSession.builder.appName("MinTemperatures").getOrCreate()

# Define the schema
# Fields are matched to the csv columns by position
schema = StructType([ \
    StructField("stationID", StringType(), True), \
    StructField("date", IntegerType(), True), \
    StructField("measure_type", StringType(), True), \
    StructField("temperature", FloatType(), True)])

# Read the file as a dataframe
df = spark.read.schema(schema).csv("file:///SparkCourse/1800.csv")
df.printSchema()

# Filter out all but TMIN entries
minTemps = df.filter(df.measure_type == "TMIN")

# Select only stationID and temperature
stationTemps = minTemps.select("stationID", "temperature")

# Aggregate to find minimum temperature for every station
minTempsByStation = stationTemps.groupBy("stationID").min("temperature")
# Nothing above actually executes until an action such as show() is called
minTempsByStation.show()

# Convert temperature to fahrenheit and sort the dataset
# withColumn() adds a new column named "temperature" whose value is the second argument
# After creating the new column, select the columns we need
minTempsByStationF = minTempsByStation.withColumn("temperature",
    func.round(func.col("min(temperature)") * 0.1 * (9.0 / 5.0) + 32.0, 2))\
    .select("stationID", "temperature").sort("temperature")

# Collect, format, and print the results
results = minTempsByStationF.collect()

for result in results:
    print(result[0] + "\t{:.2f}F".format(result[1]))

spark.stop()

Exercise: customer-orders

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

spark = SparkSession.builder.appName("TotalSpentByCustomer").master("local[*]").getOrCreate()

# Create schema when reading customer-orders
customerOrderSchema = StructType([ \
    StructField("cust_id", IntegerType(), True),
    StructField("item_id", IntegerType(), True),
    StructField("amount_spent", FloatType(), True)
    ])

# Load up the data into a Spark dataset
customersDF = spark.read.schema(customerOrderSchema).csv("file:///SparkCourse/customer-orders.csv")

totalByCustomer = customersDF.groupBy("cust_id").agg(func.round(func.sum("amount_spent"), 2) \
    .alias("total_spent"))

totalByCustomerSorted = totalByCustomer.sort("total_spent")

# totalByCustomerSorted.count() is the total number of rows, which makes show() print the whole table
totalByCustomerSorted.show(totalByCustomerSorted.count())

spark.stop()

Advanced Example

Find the most popular movie, i.e. the movie with the most ratings.

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType

spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# Create schema when reading u.data
schema = StructType([ \
    StructField("userID", IntegerType(), True), \
    StructField("movieID", IntegerType(), True), \
    StructField("rating", IntegerType(), True), \
    StructField("timestamp", LongType(), True)])

# Load up movie data as dataframe
# option("sep", "\t") tells Spark the fields are tab-separated
moviesDF = spark.read.option("sep", "\t").schema(schema).csv("file:///SparkCourse/ml-100k/u.data")

# Some SQL-style magic to sort all movies by popularity in one line!
topMovieIDs = moviesDF.groupBy("movieID").count().orderBy(func.desc("count"))

# Grab the top 10
topMovieIDs.show(10)

# Stop the session
spark.stop()

Broadcast

Broadcasting distributes a variable to every node in the cluster.

Example: look up the movie name for each movie ID

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, LongType
import codecs

def loadMovieNames():
    movieNames = {}
    # CHANGE THIS TO THE PATH TO YOUR u.ITEM FILE:
    with codecs.open("E:/SparkCourse/ml-100k/u.ITEM", "r", encoding='ISO-8859-1', errors='ignore') as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

spark = SparkSession.builder.appName("PopularMovies").getOrCreate()

# Broadcast the dictionary returned by loadMovieNames() to every node in the cluster
nameDict = spark.sparkContext.broadcast(loadMovieNames())

# Create schema when reading u.data
schema = StructType([ \
    StructField("userID", IntegerType(), True), \
    StructField("movieID", IntegerType(), True), \
    StructField("rating", IntegerType(), True), \
    StructField("timestamp", LongType(), True)])

# Load up movie data as dataframe
moviesDF = spark.read.option("sep", "\t").schema(schema).csv("file:///SparkCourse/ml-100k/u.data")

movieCounts = moviesDF.groupBy("movieID").count()

# Create a user-defined function to look up movie names from our broadcasted dictionary
def lookupName(movieID):
    return nameDict.value[movieID]

lookupNameUDF = func.udf(lookupName)

# Add a movieTitle column using our new udf
moviesWithNames = movieCounts.withColumn("movieTitle", lookupNameUDF(func.col("movieID")))

# Sort the results
# Another way to sort: orderBy() with func.desc()
sortedMoviesWithNames = moviesWithNames.orderBy(func.desc("count"))

# Grab the top 10
sortedMoviesWithNames.show(10, False)

# Stop the session
spark.stop()

Datasets:

  • Marvel-graph.txt

    4395 7483 9475 7483

    4802 3939 ...

    The first ID on each line is a superhero ID; the IDs that follow are the superheroes who appeared together with that superhero in the comics.

    The same superhero may appear as the first ID on more than one line.

  • Marvel-names.txt

    A mapping from superhero ID to superhero name.

Mission 1: find the most popular superhero

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("MostPopularSuperhero").getOrCreate()

schema = StructType([ \
    StructField("id", IntegerType(), True), \
    StructField("name", StringType(), True)])

names = spark.read.schema(schema).option("sep", " ").csv("file:///SparkCourse/Marvel-names.txt")

# We don't care about this dataframe's schema for now
lines = spark.read.text("file:///SparkCourse/Marvel-graph.txt")

# Small tweak vs. what's shown in the video: we trim each line of whitespace as that could
# throw off the counts.
connections = lines.withColumn("id", func.split(func.trim(func.col("value")), " ")[0]) \
    .withColumn("connections", func.size(func.split(func.trim(func.col("value")), " ")) - 1) \
    .groupBy("id").agg(func.sum("connections").alias("connections"))

mostPopular = connections.sort(func.col("connections").desc()).first()

mostPopularName = names.filter(func.col("id") == mostPopular[0]).select("name").first()

print(mostPopularName[0] + " is the most popular superhero with " + str(mostPopular[1]) + " co-appearances.")

Mission 2: find the most obscure superheroes

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

spark = SparkSession.builder.appName("MostObscureSuperheroes").getOrCreate()

schema = StructType([ \
    StructField("id", IntegerType(), True), \
    StructField("name", StringType(), True)])

names = spark.read.schema(schema).option("sep", " ").csv("file:///SparkCourse/Marvel-names.txt")

lines = spark.read.text("file:///SparkCourse/Marvel-graph.txt")

# Small tweak vs. what's shown in the video: we trim whitespace from each line as this
# could throw the counts off by one.
connections = lines.withColumn("id", func.split(func.trim(func.col("value")), " ")[0]) \
    .withColumn("connections", func.size(func.split(func.trim(func.col("value")), " ")) - 1) \
    .groupBy("id").agg(func.sum("connections").alias("connections"))

minConnectionCount = connections.agg(func.min("connections")).first()[0]

minConnections = connections.filter(func.col("connections") == minConnectionCount)

# Use join to combine the two dataframes on the shared "id" column
minConnectionsWithNames = minConnections.join(names, "id")

print("The following characters have only " + str(minConnectionCount) + " connection(s):")

minConnectionsWithNames.select("name").show()

Mission 3: find the degrees of separation between two superheroes

Degrees of separation here simply means the distance between them in the graph.

#Boilerplate stuff:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local").setAppName("DegreesOfSeparation")
sc = SparkContext(conf = conf)

# The characters we wish to find the degree of separation between:
startCharacterID = 5306 #SpiderMan
targetCharacterID = 14 #ADAM 3,031 (who?)

# Our accumulator, used to signal when we find the target character during
# our BFS traversal.
hitCounter = sc.accumulator(0)

def convertToBFS(line):
    fields = line.split()
    heroID = int(fields[0])
    connections = []
    for connection in fields[1:]:
        connections.append(int(connection))

    color = 'WHITE'
    distance = 9999

    if (heroID == startCharacterID):
        color = 'GRAY'
        distance = 0

    return (heroID, (connections, distance, color))


def createStartingRdd():
    inputFile = sc.textFile("file:///sparkcourse/marvel-graph.txt")
    return inputFile.map(convertToBFS)

# bfsMap adds new entries on top of the existing RDD data
def bfsMap(node):
    characterID = node[0]
    data = node[1]
    connections = data[0]
    distance = data[1]
    color = data[2]

    results = []

    #If this node needs to be expanded...
    if (color == 'GRAY'):
        for connection in connections:
            newCharacterID = connection
            newDistance = distance + 1
            newColor = 'GRAY'
            if (targetCharacterID == connection):
                hitCounter.add(1)

            newEntry = (newCharacterID, ([], newDistance, newColor))
            results.append(newEntry)

        #We've processed this node, so color it black
        color = 'BLACK'

    #Emit the input node so we don't lose it.
    results.append( (characterID, (connections, distance, color)) )
    return results

# bfsReduce removes redundant entries for the same key from the RDD
def bfsReduce(data1, data2):
    edges1 = data1[0]
    edges2 = data2[0]
    distance1 = data1[1]
    distance2 = data2[1]
    color1 = data1[2]
    color2 = data2[2]

    distance = 9999
    color = color1
    edges = []

    # See if one is the original node with its connections.
    # If so preserve them.
    if (len(edges1) > 0):
        edges.extend(edges1)
    if (len(edges2) > 0):
        edges.extend(edges2)

    # Preserve minimum distance
    if (distance1 < distance):
        distance = distance1

    if (distance2 < distance):
        distance = distance2

    # Preserve darkest color
    if (color1 == 'WHITE' and (color2 == 'GRAY' or color2 == 'BLACK')):
        color = color2

    if (color1 == 'GRAY' and color2 == 'BLACK'):
        color = color2

    if (color2 == 'WHITE' and (color1 == 'GRAY' or color1 == 'BLACK')):
        color = color1

    if (color2 == 'GRAY' and color1 == 'BLACK'):
        color = color1

    return (edges, distance, color)


#Main program here:
iterationRdd = createStartingRdd()

for iteration in range(0, 10):
    print("Running BFS iteration# " + str(iteration+1))

    # Create new vertices as needed to darken or reduce distances in the
    # reduce stage. If we encounter the node we're looking for as a GRAY
    # node, increment our accumulator to signal that we're done.
    mapped = iterationRdd.flatMap(bfsMap)

    # Note that mapped.count() action here forces the RDD to be evaluated, and
    # that's the only reason our accumulator is actually updated.
    print("Processing " + str(mapped.count()) + " values.")

    if (hitCounter.value > 0):
        print("Hit the target character! From " + str(hitCounter.value) \
            + " different direction(s).")
        break

    # Reducer combines data for each character ID, preserving the darkest
    # color and shortest path.
    iterationRdd = mapped.reduceByKey(bfsReduce)

Item-Based CF

from pyspark.sql import SparkSession
from pyspark.sql import functions as func
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType
import sys

def computeCosineSimilarity(spark, data):
    # Compute xx, xy and yy columns
    pairScores = data \
        .withColumn("xx", func.col("rating1") * func.col("rating1")) \
        .withColumn("yy", func.col("rating2") * func.col("rating2")) \
        .withColumn("xy", func.col("rating1") * func.col("rating2"))

    # Compute numerator, denominator and numPairs columns
    calculateSimilarity = pairScores \
        .groupBy("movie1", "movie2") \
        .agg( \
            func.sum(func.col("xy")).alias("numerator"), \
            (func.sqrt(func.sum(func.col("xx"))) * func.sqrt(func.sum(func.col("yy")))).alias("denominator"), \
            func.count(func.col("xy")).alias("numPairs")
        )

    # Calculate score and select only needed columns (movie1, movie2, score, numPairs)
    result = calculateSimilarity \
        .withColumn("score", \
            func.when(func.col("denominator") != 0, func.col("numerator") / func.col("denominator")) \
                .otherwise(0) \
        ).select("movie1", "movie2", "score", "numPairs")

    return result

# Get movie name by given movie id
def getMovieName(movieNames, movieId):
    result = movieNames.filter(func.col("movieID") == movieId) \
        .select("movieTitle").collect()[0]

    return result[0]


spark = SparkSession.builder.appName("MovieSimilarities").master("local[*]").getOrCreate()

movieNamesSchema = StructType([ \
    StructField("movieID", IntegerType(), True), \
    StructField("movieTitle", StringType(), True) \
    ])

moviesSchema = StructType([ \
    StructField("userID", IntegerType(), True), \
    StructField("movieID", IntegerType(), True), \
    StructField("rating", IntegerType(), True), \
    StructField("timestamp", LongType(), True)])


# Create a broadcast dataset of movieID and movieTitle.
# Apply ISO-8859-1 charset
movieNames = spark.read \
    .option("sep", "|") \
    .option("charset", "ISO-8859-1") \
    .schema(movieNamesSchema) \
    .csv("file:///SparkCourse/ml-100k/u.item")

# Load up movie data as dataset
movies = spark.read \
    .option("sep", "\t") \
    .schema(moviesSchema) \
    .csv("file:///SparkCourse/ml-100k/u.data")


ratings = movies.select("userId", "movieId", "rating")

# Emit every movie rated together by the same user.
# Self-join to find every combination.
# Select movie pairs and rating pairs
moviePairs = ratings.alias("ratings1") \
    .join(ratings.alias("ratings2"), (func.col("ratings1.userId") == func.col("ratings2.userId")) \
        & (func.col("ratings1.movieId") < func.col("ratings2.movieId"))) \
    .select(func.col("ratings1.movieId").alias("movie1"), \
        func.col("ratings2.movieId").alias("movie2"), \
        func.col("ratings1.rating").alias("rating1"), \
        func.col("ratings2.rating").alias("rating2"))


moviePairSimilarities = computeCosineSimilarity(spark, moviePairs).cache()

if (len(sys.argv) > 1):
    scoreThreshold = 0.97
    coOccurrenceThreshold = 50.0

    movieID = int(sys.argv[1])

    # Filter for movies with this sim that are "good" as defined by
    # our quality thresholds above
    filteredResults = moviePairSimilarities.filter( \
        ((func.col("movie1") == movieID) | (func.col("movie2") == movieID)) & \
        (func.col("score") > scoreThreshold) & (func.col("numPairs") > coOccurrenceThreshold))

    # Sort by quality score.
    results = filteredResults.sort(func.col("score").desc()).take(10)

    print("Top 10 similar movies for " + getMovieName(movieNames, movieID))

    for result in results:
        # Display the similarity result that isn't the movie we're looking at
        similarMovieID = result.movie1
        if (similarMovieID == movieID):
            similarMovieID = result.movie2

        print(getMovieName(movieNames, similarMovieID) + "\tscore: " \
            + str(result.score) + "\tstrength: " + str(result.numPairs))