#!/usr/bin/env python
#
# Copyright 2016 rikonaka
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
#
# dataset: http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html
from __future__ import print_function

import sys
import os
# Path to the local Spark installation
os.environ['SPARK_HOME'] = "/path/to/spark"
# Append pyspark to the Python path
sys.path.append("/path/to/spark/python")
try:
    from pyspark import SparkContext, SparkConf
    from pyspark.mllib.clustering import KMeans
    from pyspark.mllib.feature import StandardScaler
    print("Successfully imported Spark modules")
except ImportError as e:
    print("Cannot import Spark modules:", e)
    sys.exit(1)
from numpy import array
from math import sqrt
def parse_interaction(line):
    """
    Parses a network data interaction into a (label, features) pair.
    The three categorical columns (protocol_type, service, flag) and the
    trailing label are dropped so that only numeric features remain.
    """
    line_split = line.split(",")
    clean_line_split = line_split[0:1] + line_split[4:-1]
    return (line_split[-1], array([float(x) for x in clean_line_split]))
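# For illustration (values abbreviated): a KDD Cup 99 record such as
#   "0,tcp,http,SF,181,5450,...,normal."
# parses to ("normal.", array([0.0, 181.0, 5450.0, ...])) once the
# categorical fields are removed.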
def distance(a, b):
    """
    Calculates the euclidean distance between two numeric RDDs.
    """
    return sqrt(a.zip(b).map(lambda x: (x[0] - x[1])).map(lambda x: x * x).reduce(lambda a, b: a + b))
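# Note: distance() expects two RDDs of scalars with identical length and
# partitioning (zip() requires matching partition structure). It is kept
# here as a utility and is not used in the pipeline below.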
def dist_to_centroid(datum, clusters):
    """
    Determines the distance of a point to its cluster centroid.
    """
    cluster = clusters.predict(datum)
    centroid = clusters.centers[cluster]
    return sqrt(sum([x ** 2 for x in (centroid - datum)]))
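# Equivalent local check (hypothetical values): for centroid array([0.0, 0.0])
# and datum array([3.0, 4.0]), sqrt(sum((centroid - datum) ** 2)) gives 5.0.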
def clustering_score(data, k):
    """
    Trains a k-means model for the given k and returns
    (k, model, mean distance of each point to its assigned centroid).
    """
    clusters = KMeans.train(data, k, maxIterations=10, runs=5, initializationMode="random")
    result = (k, clusters, data.map(lambda datum: dist_to_centroid(datum, clusters)).mean())
    print("Clustering score for k=%(k)d is %(score)f" % {"k": k, "score": result[2]})
    return result
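# Caveat: the mean distance to the nearest centroid tends to decrease
# monotonically as k grows, so the minimum-score selection below will
# usually favour the largest k tried; plotting these scores and looking
# for an elbow is the more common way to choose k.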
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: /home/spark-1.6.1-bin-hadoop2.4/bin/spark-submit --driver-memory 2g " +
              "KDD_pyspark.py max_k corrected")
        sys.exit(1)
    # Set up environment (the stray line-continuation backslash after
    # setAppName() was a syntax error and has been removed)
    max_k = int(sys.argv[1])
    data_file = sys.argv[2]
    conf = SparkConf().setAppName("kdd-cup-99")
    sc = SparkContext(conf=conf)
    # Load raw data
    print("Loading RAW data...")
    raw_data = sc.textFile(data_file)
    labels = raw_data.map(lambda line: line.strip().split(",")[-1])
    # Prepare data for clustering input.
    # The data contains non-numeric features, which we must exclude because
    # k-means works on numeric features only: the three categorical columns
    # (protocol_type, service, flag) and the label in the last column of
    # each row, all stripped by parse_interaction().
    print("Parsing dataset...")
    parsed_data = raw_data.map(parse_interaction)
    parsed_data_values = parsed_data.values().cache()
    # Standardize data
    print("Standardizing data...")
    standardizer = StandardScaler(True, True)
    standardizer_model = standardizer.fit(parsed_data_values)
    standardized_data_values = standardizer_model.transform(parsed_data_values)
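    # StandardScaler(True, True) corresponds to withMean=True, withStd=True:
    # each feature is centred to zero mean and scaled to unit standard
    # deviation, so no single large-magnitude feature dominates the
    # euclidean distances used by k-means.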
    # Evaluate values of k from 10 to max_k in steps of 10
    print("Calculating total within-cluster distance for different k values (10 to %(max_k)d):" % {"max_k": max_k})
    scores = [clustering_score(standardized_data_values, k) for k in range(10, max_k + 1, 10)]
    # Obtain the k with the minimum score
    min_k = min(scores, key=lambda x: x[2])[0]
    print("Best k value is %(best_k)d" % {"best_k": min_k})
    # Use the best model to assign a cluster to each datum.
    # We use the standardized data here - it is more appropriate for
    # exploratory purposes.
    print("Obtaining clustering result sample for k=%(min_k)d..." % {"min_k": min_k})
    best_model = min(scores, key=lambda x: x[2])[1]
    cluster_assignments_sample = standardized_data_values.map(
        lambda datum: str(best_model.predict(datum)) + "," + ",".join(map(str, datum)))
    # Save assignment sample and labels to file
    print("Saving sample to file...")
    cluster_assignments_sample.saveAsTextFile("sample_standardized")
    labels.saveAsTextFile("labels")
    print("DONE!")