GraphFrames PySpark Example : Learn Data Science

In this post, a GraphFrames PySpark example is discussed using a shortest path problem. GraphFrames is a Spark package that provides DataFrame-based graphs in Spark. Spark version 1.6.2 is assumed for all examples. To include the package when starting the PySpark shell:

pyspark --packages graphframes:graphframes:0.1.0-spark1.6

Code:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from graphframes import GraphFrame

sc = SparkContext()
sqlContext = SQLContext(sc)

# create vertex DataFrame for users with id and name attributes
v = sqlContext.createDataFrame([
    ("a", "Alice"),
    ("b", "Bob"),
    ("c", "Charlie"),
], ["id", "name"])

# create edge DataFrame with "src" and "dst" attributes
e = sqlContext.createDataFrame([
    ("a", "b", "friends"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
], ["src", "dst", "relationship"])

# create a GraphFrame with v, e
g = GraphFrame(v, e)

# example: getting in-degrees of each vertex
g.inDegrees.show()

Output:

id inDegree
b 2
c 1
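
As a cross-check, the same in-degrees can be counted in plain Python from the edge list. This is only a sketch of what the inDegree column means, not how GraphFrames computes it, and the variable names are made up for illustration. It also shows why vertex "a" is absent from the output: it has no incoming edges.

```python
from collections import Counter

# same edges as the GraphFrame example: (src, dst, relationship)
edges = [
    ("a", "b", "friends"),
    ("b", "c", "follow"),
    ("c", "b", "follow"),
]

# in-degree of a vertex = number of edges whose dst is that vertex
in_degree = Counter(dst for _, dst, _ in edges)

for vertex_id, degree in sorted(in_degree.items()):
    print(vertex_id, degree)
```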

Example: counting “follow” relationships in the graph

g.edges.filter("relationship = 'follow'").count()

Output:

2

getting shortest paths to “a” from each vertex

results = g.shortestPaths(landmarks=["a"])
results.select("id", "distances").show()
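
For every vertex, the call returns a map from each landmark to the number of hops needed to reach it. The post does not show the result, so the following plain-Python sketch works it out by hand for the three-vertex graph, assuming (as in GraphX's shortest-paths algorithm) that edges are followed in their src-to-dst direction. The function name `hops_to_landmarks` is hypothetical and not part of GraphFrames.

```python
from collections import deque

vertices = ["a", "b", "c"]
edges = [("a", "b"), ("b", "c"), ("c", "b")]  # (src, dst) pairs from the example


def hops_to_landmarks(vertices, edges, landmarks):
    """For each vertex, map every reachable landmark to its hop count."""
    # Walk edges backwards from each landmark: if v -> w exists and w can
    # reach the landmark in d hops, then v can reach it in d + 1 hops.
    incoming = {v: [] for v in vertices}
    for src, dst in edges:
        incoming[dst].append(src)

    distances = {v: {} for v in vertices}
    for landmark in landmarks:
        distances[landmark][landmark] = 0
        queue = deque([landmark])
        while queue:
            current = queue.popleft()
            for previous in incoming[current]:
                if landmark not in distances[previous]:
                    distances[previous][landmark] = distances[current][landmark] + 1
                    queue.append(previous)
    return distances


distances = hops_to_landmarks(vertices, edges, ["a"])
for vertex_id in vertices:
    print(vertex_id, distances[vertex_id])
```

Under that assumption only "a" itself reaches the landmark, because no edge points into "a". Choosing "b" as the landmark instead gives a distance of one hop from both "a" and "c".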


Logistic Regression with Spark : Learn Data Science

Logistic regression with Spark is available through MLlib. A logistic regression classifier returns a binary class label, either “0” or “1”. In this example, we consider a data set with only one variable, “study hours”; the class label indicates whether the student passed (1) or did not pass (0).

from pyspark import SparkContext
import numpy as np
from numpy import array
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import LogisticRegressionWithLBFGS

sc = SparkContext()

# wrap a (label, features) pair in an MLlib LabeledPoint
def createLabeledPoints(label, points):
    return LabeledPoint(label, points)

studyHours = [
[ 0, [0.5]],
[ 0, [0.75]],
[ 0, [1.0]],
[ 0, [1.25]],
[ 0, [1.5]],
[ 0, [1.75]],
[ 1, [1.75]],
[ 0, [2.0]],
[ 1, [2.25]],
[ 0, [2.5]],
[ 1, [2.75]],
[ 0, [3.0]],
[ 1, [3.25]],
[ 0, [3.5]],
[ 1, [4.0]],
[ 1, [4.25]],
[ 1, [4.5]],
[ 1, [4.75]],
[ 1, [5.0]],
[ 1, [5.5]]
]

data = []

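# each entry is [label, [study hours]]
for label, features in studyHours:
    data.append(createLabeledPoints(label, features))

model = LogisticRegressionWithLBFGS.train(sc.parallelize(data))

print(model)

# predict the class label for 1 hour of study
print(model.predict([1]))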

Output:

spark-submit regression-mllib.py
(weights=[0.215546777333], intercept=0.0)
1
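
The printed model explains the prediction. With the weight and intercept shown above, the predicted probability of passing is the logistic function of weight × hours + intercept, and the label is 1 when that probability exceeds a threshold. The 0.5 cut-off used here is stated as an assumption about MLlib's default, and the helper names are hypothetical. A minimal plain-Python sketch:

```python
import math

# values copied from the printed model above
weight = 0.215546777333
intercept = 0.0


def pass_probability(hours):
    """Logistic function applied to the linear score."""
    score = weight * hours + intercept
    return 1.0 / (1.0 + math.exp(-score))


def predict(hours, threshold=0.5):
    # threshold=0.5 is an assumed default cut-off
    return 1 if pass_probability(hours) > threshold else 0


print(round(pass_probability(1), 3))  # about 0.554
print(predict(1))
```

Because the intercept is 0.0 and the weight is positive, every positive number of study hours scores above 0.5, so this particular model labels all such inputs as 1. `LogisticRegressionWithLBFGS.train` accepts an `intercept=True` argument that also fits an intercept, which would let the decision boundary move away from zero hours.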


k-Means Clustering Spark Tutorial : Learn Data Science

k-Means clustering with Spark is easy to understand. MLlib comes bundled with a k-Means implementation (KMeans), which can be imported from the pyspark.mllib.clustering package. Here is a very simple example of clustering data with height and weight attributes.

Arguments to KMeans.train:

  1. k is the number of desired clusters
  2. maxIterations is the maximum number of iterations to run.
  3. runs is the number of times to run the k-means algorithm
  4. initializationMode can be either ‘random’ or ‘k-means||’
    from pyspark import SparkContext
    from pyspark.mllib.clustering import KMeans
    from numpy import array

    sc = SparkContext()
    sc.setLogLevel ("ERROR")

    #12 records with height, weight data
    data = array([185,72, 170,56, 168,60, 179,68, 182,72, 188,77, 180,71, 180,70, 183,84, 180,88, 180,67, 177,76]).reshape(12,2)

    #Generate Kmeans
    model = KMeans.train(sc.parallelize(data), 2, runs=50, initializationMode="random")

    #Print out the cluster of each data point
    print (model.predict(array([185, 71])))
    print (model.predict(array([170, 56])))
    print (model.predict(array([168, 60])))
    print (model.predict(array([179, 68])))
    print (model.predict(array([182, 72])))
    print (model.predict(array([188, 77])))
    print (model.predict(array([180, 71])))
    print (model.predict(array([180, 70])))
    print (model.predict(array([183, 84])))
    print (model.predict(array([180, 88])))
    print (model.predict(array([180, 67])))
    print (model.predict(array([177, 76])))

Output
0
1
1
0
0
0
0
0
0
0
0
0
(10 items go to cluster 0, whereas 2 items go to cluster 1)
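
To make the result less of a black box, here is a minimal plain-Python sketch of the standard k-means (Lloyd) iteration on the same 12 records. It is not MLlib's implementation: the starting centers are fixed to the first two records (an assumption made so the run is repeatable, whereas the script above uses random initialization), and the helper names are hypothetical.

```python
# the same 12 (height, weight) records as above
points = [(185, 72), (170, 56), (168, 60), (179, 68), (182, 72), (188, 77),
          (180, 71), (180, 70), (183, 84), (180, 88), (180, 67), (177, 76)]


def nearest(point, centers):
    """Index of the center closest to point (squared Euclidean distance)."""
    def squared_distance(center):
        return sum((p - c) ** 2 for p, c in zip(point, center))
    return min(range(len(centers)), key=lambda i: squared_distance(centers[i]))


def kmeans(points, centers, max_iterations=100):
    labels = []
    for _ in range(max_iterations):
        # assignment step: attach every point to its nearest center
        labels = [nearest(p, centers) for p in points]
        # update step: move each center to the mean of its members
        new_centers = []
        for i in range(len(centers)):
            members = [p for p, label in zip(points, labels) if label == i]
            if not members:
                new_centers.append(centers[i])  # leave an empty cluster's center in place
                continue
            new_centers.append(tuple(sum(axis) / len(members) for axis in zip(*members)))
        if new_centers == centers:
            break  # centers stopped moving, so the assignment is stable
        centers = new_centers
    return centers, labels


# assumed starting centers: the first two records
centers, labels = kmeans(points, [(185, 72), (170, 56)])
print(labels)   # [0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0]
print(centers)
```

With these starting centers the sketch ends with the same 10/2 split as the output above. Cluster numbers are arbitrary labels, so with random initialization 0 and 1 may come out swapped from run to run.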

The above is a very naive example in which the training dataset is also used as the input data. In the real world, we would train a model, save it, and later use it to predict the clusters of new input data. Here is how you can save a trained model and later load it for prediction.

Training and Storing the Model

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans
from numpy import array

sc = SparkContext()

#12 records with height, weight data
data = array([185,72, 170,56, 168,60, 179,68, 182,72, 188,77, 180,71, 180,70, 183,84, 180,88, 180,67, 177,76]).reshape(12,2)

#Generate Kmeans
model = KMeans.train(sc.parallelize(data), 2, runs=50, initializationMode="random")

model.save(sc, "savedModelDir")

This will create a directory, _savedModelDir_, with two subdirectories, _data_ and _metadata_, where the model is stored.

**Using Already Trained Model for Predicting Clusters**

Now, let's use the trained model by loading it. We need to import KMeansModel in order to load the model from disk.

from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans, KMeansModel
from numpy import array

sc = SparkContext()

#Load the previously saved model
model = KMeansModel.load(sc, "savedModelDir")

#Print out the cluster of each data point
print (model.predict(array([185, 71])))
print (model.predict(array([170, 56])))
print (model.predict(array([168, 60])))
print (model.predict(array([179, 68])))
print (model.predict(array([182, 72])))
print (model.predict(array([188, 77])))
print (model.predict(array([180, 71])))
print (model.predict(array([180, 70])))
print (model.predict(array([183, 84])))
print (model.predict(array([180, 88])))
print (model.predict(array([180, 67])))
print (model.predict(array([177, 76])))
