Building a Movie Recommendation Service with Apache Spark

In this tutorial I'll show you how to build a movie recommendation service with Apache Spark. Two users are alike if they rate products similarly. For example, if Alice rated a book 3/5 and Bob rated the same book 3.3/5, they are very much alike. Now if Bob buys another book and rates it 4/5, we should suggest that book to Alice; that is what a recommender system does. See the references if you want to know more about how recommender systems work. We are going to use the Alternating Least Squares (ALS) method from MLlib and the MovieLens 100K dataset, which is only 5 MB in size. Download the dataset from https://grouplens.org/datasets/movielens/. Code:

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from pyspark import SparkContext

sc = SparkContext()

#Replace filepath with appropriate data
movielens = sc.textFile("filepath/u.data")

movielens.first() #u'196\t242\t3\t881250949'
movielens.count() #100000

#Clean up the data by splitting it;
#the MovieLens readme says the data is tab-separated as
#user, item, rating, timestamp
clean_data = movielens.map(lambda x: x.split('\t'))

#We'll need to map the MovieLens data to a Rating object
#A Rating object is made up of (user, product, rating)
ratings = clean_data.map(lambda x: Rating(int(x[0]),\
    int(x[1]), float(x[2])))

#Setting up the parameters for ALS
rank = 5 # Latent Factors to be made
numIterations = 10 # Times to repeat process

#Split the ratings into a training and a test set (the test set is used in the evaluation sketch below)
train, test = ratings.randomSplit([0.7,0.3],7856)

#Create the model on the training data
model = ALS.train(train, rank, numIterations)

#For Product X, Find N Users to Sell To
model.recommendUsers(242, 100)

#For User Y, Find N Products to Promote
model.recommendProducts(196, 10)

#Predict Single Product for Single User
model.predict(196, 242)
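
The test split created earlier is not scored in the example above. Here is a minimal sketch of how it could be evaluated, reusing the model and test RDD from the code above and following the usual MLlib pattern (variable names are illustrative):

#Evaluate the model on the held-out test set
testdata = test.map(lambda r: (r.user, r.product))
predictions = model.predictAll(testdata).map(lambda r: ((r.user, r.product), r.rating))
ratesAndPreds = test.map(lambda r: ((r.user, r.product), r.rating)).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print ("Mean Squared Error on test set = " + str(MSE))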

References:

  1. Building a Recommender System in Spark with ALS, LearnByMarketing.com
  2. MovieLens
  3. Video : Collaborative Filtering, Stanford University
  4. Matrix Factorisation and Dimensionality Reduction, Thierry Silbermann
  5. Building a Recommendation Engine with Spark, Nick Pentreath, Packt

GraphFrames PySpark Example : Learn Data Science

In this post, a GraphFrames PySpark example is discussed using a shortest paths problem. GraphFrames is a Spark package that provides DataFrame-based graphs in Spark. Spark version 1.6.2 is used for all examples. Include the package when launching the PySpark shell:

pyspark --packages graphframes:graphframes:0.1.0-spark1.6
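
If you run a script with spark-submit instead of the interactive shell, the same --packages flag should work there too (the script name below is only a placeholder):

spark-submit --packages graphframes:graphframes:0.1.0-spark1.6 graphframes_example.py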

Code:

from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()
sqlContext = SQLContext(sc)

# create vertex DataFrame for users with id and name attributes
v = sqlContext.createDataFrame([
    ("a", "Alice"),
    ("b", "Bob"),
    ("c", "Charlie"),
], ["id", "name"])

# create edge DataFrame with "src" and "dst" attributes
e = sqlContext.createDataFrame([
    ("a", "b", "friends"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
], ["src", "dst", "relationship"])

# create a GraphFrame with v, e
from graphframes import *
g = GraphFrame(v, e)

# example : getting in-degrees of each vertex
g.inDegrees.show()

Output:

id inDegree
b 2
c 1

Example: getting "follow" relationships in the graph

g.edges.filter("relationship = 'follow'").count()

Output:

2

Example: getting shortest paths to "a" from each vertex

results = g.shortestPaths(landmarks=["a"])
results.select("id", "distances").show()
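
Beyond shortest paths, GraphFrames also supports motif finding on the same graph. A small sketch (not part of the original post) that looks for pairs of vertices with edges in both directions between them:

# find pairs of vertices that point at each other, e.g. mutual "follow" edges
mutual = g.find("(x)-[e1]->(y); (y)-[e2]->(x)")
mutual.show()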

Feel free to ask your questions in the comments section!

Logistic Regression with Spark : Learn Data Science

Logistic regression with Spark is achieved using MLlib. Logistic regression returns binary class labels, that is, "0" or "1". In this example we consider a data set consisting of a single feature, "study hours", and a class label indicating whether the student passed (1) or did not pass (0).

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

sc = SparkContext()

def createLabeledPoints(label, points):
    return LabeledPoint(label, points)

studyHours = [
[ 0, [0.5]],
[ 0, [0.75]],
[ 0, [1.0]],
[ 0, [1.25]],
[ 0, [1.5]],
[ 0, [1.75]],
[ 1, [1.75]],
[ 0, [2.0]],
[ 1, [2.25]],
[ 0, [2.5]],
[ 1, [2.75]],
[ 0, [3.0]],
[ 1, [3.25]],
[ 0, [3.5]],
[ 1, [4.0]],
[ 1, [4.25]],
[ 1, [4.5]],
[ 1, [4.75]],
[ 1, [5.0]],
[ 1, [5.5]]
]

data = []

for x, y in studyHours:
    data.append(createLabeledPoints(x, y))

model = LogisticRegressionWithLBFGS.train(sc.parallelize(data))

print (model)

print (model.predict([1]))

Output:

spark-submit regression-mllib.py
(weights=[0.215546777333], intercept=0.0)
1
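
As a quick sanity check (not part of the original post), you can measure accuracy on the training data itself, reusing the data list and model from above:

#Compare predictions with the actual labels on the training data
points = sc.parallelize(data)
labelsAndPreds = points.map(lambda p: (p.label, model.predict(p.features)))
accuracy = labelsAndPreds.filter(lambda lp: lp[0] == lp[1]).count() / float(points.count())
print ("Training accuracy = " + str(accuracy))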

References:

  1. Logistic Regression - Wikipedia.org
  2. See other posts in Learn Data Science

k-Means Clustering Spark Tutorial : Learn Data Science

k-Means clustering with Spark is easy to understand. MLlib comes bundled with a k-Means implementation (KMeans), which can be imported from the pyspark.mllib.clustering package. Here is a very simple example of clustering data with height and weight attributes.

Arguments to KMeans.train:

  1. k is the number of desired clusters.
  2. maxIterations is the maximum number of iterations to run.
  3. runs is the number of times to run the k-means algorithm.
  4. initializationMode can be either 'random' or 'k-means||'.

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans
from numpy import array

sc = SparkContext()
sc.setLogLevel("ERROR")

#12 records with height, weight data
data = array([185,72, 170,56, 168,60, 179,68, 182,72, 188,77, 180,71, 180,70, 183,84, 180,88, 180,67, 177,76]).reshape(12,2)

#Generate Kmeans
model = KMeans.train(sc.parallelize(data), 2, runs=50, initializationMode="random")

#Print out the cluster of each data point
print (model.predict(array([185, 71])))
print (model.predict(array([170, 56])))
print (model.predict(array([168, 60])))
print (model.predict(array([179, 68])))
print (model.predict(array([182, 72])))
print (model.predict(array([188, 77])))
print (model.predict(array([180, 71])))
print (model.predict(array([180, 70])))
print (model.predict(array([183, 84])))
print (model.predict(array([180, 88])))
print (model.predict(array([180, 67])))
print (model.predict(array([177, 76])))

Output:
0
1
1
0
0
0
0
0
0
0
0
0
(10 items go to cluster 0, whereas 2 items go to cluster 1)
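
One simple way to judge cluster quality (not shown in the original) is the within-set sum of squared errors, which KMeansModel exposes as computeCost. A brief sketch reusing the model and data from above:

#Within Set Sum of Squared Errors for the trained model
wssse = model.computeCost(sc.parallelize(data))
print ("Within Set Sum of Squared Error = " + str(wssse))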

The above is a rather naive example in which we use the training dataset as the input data too. In the real world we would train a model, save it, and later use it to predict clusters for new input data. So here is how you can save a trained model and later load it for prediction.

Training and Storing the Model

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans
from numpy import array

sc = SparkContext()

#12 records with height, weight data
data = array([185,72, 170,56, 168,60, 179,68, 182,72, 188,77, 180,71, 180,70, 183,84, 180,88, 180,67, 177,76]).reshape(12,2)

#Generate Kmeans
model = KMeans.train(sc.parallelize(data), 2, runs=50, initializationMode="random")

model.save(sc, "savedModelDir")

This will create a directory, savedModelDir, with two subdirectories, data and metadata, where the model is stored.

Using the Already Trained Model for Predicting Clusters

Now, let's use the trained model by loading it. We need to import KMeansModel in order to load the model from the saved files.

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array

sc = SparkContext()

#Generate Kmeans
model = KMeansModel.load(sc, "savedModelDir")

#Print out the cluster of each data point
print (model.predict(array([185, 71])))
print (model.predict(array([170, 56])))
print (model.predict(array([168, 60])))
print (model.predict(array([179, 68])))
print (model.predict(array([182, 72])))
print (model.predict(array([188, 77])))
print (model.predict(array([180, 71])))
print (model.predict(array([180, 70])))
print (model.predict(array([183, 84])))
print (model.predict(array([180, 88])))
print (model.predict(array([180, 67])))
print (model.predict(array([177, 76])))
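
To verify what the loaded model contains, you can also print its centroids via the clusterCenters attribute (a small addition, not in the original post):

#Print the centroids learned during training
for center in model.clusterCenters:
    print (center)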

References:

  1. Clustering and Feature Extraction in MLlib, UCLA
  2. k-Means Clustering Algorithm Explained, DnI Institute
  3. k-Means Clustering with Python, iDevji