forked from jlyang1990/Spark_Python_Do_Big_Data_Analytics
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathDEMO Spark ML DecisionTrees.py
executable file
·134 lines (105 loc) · 4.66 KB
/
DEMO Spark ML DecisionTrees.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# -*- coding: utf-8 -*-
"""
Spark with Python
Copyright : V2 Maestros @2016
Code Demo : Spark Machine Learning - Decision Trees
Problem Statement
*****************
The input data is the iris dataset. It contains recordings of
information about flower samples. For each sample, the petal and
sepal length and width are recorded along with the type of the
flower. We need to use this dataset to build a decision tree
model that can predict the type of flower based on the petal
and sepal information.
## Techniques Used
1. Decision Trees
2. Training and Testing
3. Confusion Matrix
-----------------------------------------------------------------------------
"""
#import os
#os.chdir("C:/Personal/V2Maestros/Courses/Big Data Analytics with Spark/Python")
#os.curdir
"""--------------------------------------------------------------------------
Load Data
-------------------------------------------------------------------------"""
# Read the raw CSV rows into an RDD, pin it in memory, and force evaluation.
irisData = SpContext.textFile("iris.csv")
irisData.cache()
irisData.count()

# Drop the header row — it is the only line containing the word "Sepal".
dataLines = irisData.filter(lambda line: "Sepal" not in line)
dataLines.count()
"""--------------------------------------------------------------------------
Cleanup Data
-------------------------------------------------------------------------"""
from pyspark.sql import Row

# Split each CSV line into its fields and wrap them in Row objects,
# converting the four measurements to floats; the species stays a string.
fields = dataLines.map(lambda line: line.split(","))
rowRdd = fields.map(lambda f: Row(
    SEPAL_LENGTH=float(f[0]),
    SEPAL_WIDTH=float(f[1]),
    PETAL_LENGTH=float(f[2]),
    PETAL_WIDTH=float(f[3]),
    SPECIES=f[4]))

# Build a DataFrame (schema inferred from the Rows) and cache it.
irisDf = SpSession.createDataFrame(rowRdd)
irisDf.cache()

# Encode the string label into a numeric IND_SPECIES column, since the
# ML estimators below require a numeric label.
from pyspark.ml.feature import StringIndexer
speciesIndexer = StringIndexer(inputCol="SPECIES", outputCol="IND_SPECIES")
irisNormDf = speciesIndexer.fit(irisDf).transform(irisDf)
irisNormDf.select("SPECIES", "IND_SPECIES").distinct().show()
irisNormDf.cache()
"""--------------------------------------------------------------------------
Perform Data Analytics
-------------------------------------------------------------------------"""
# See summary statistics (count/mean/stddev/min/max) for every column.
irisNormDf.describe().show()

# Find the correlation of each numeric predictor with the target index.
# String columns are skipped because DataFrame.stat.corr only accepts
# numeric columns.
# BUG FIX: the original tested isinstance(..., unicode); `unicode` exists
# only on Python 2 and raises NameError on Python 3, which this script
# otherwise targets (print() calls, Spark 2 DataFrame ML API).
for i in irisNormDf.columns:
    if not isinstance(irisNormDf.select(i).take(1)[0][0], str):
        print("Correlation to Species for ", i,
              irisNormDf.stat.corr('IND_SPECIES', i))
"""--------------------------------------------------------------------------
Prepare data for ML
-------------------------------------------------------------------------"""
#Transform to a Data Frame for input to Machine Learning
#Drop columns that are not required (low correlation)
from pyspark.ml.linalg import Vectors
def transformToLabeledPoint(row):
    """Map a Row to a (species, label, features) triple.

    The four measurements are packed into a dense vector for the
    `features` column; the numeric species index becomes the label.
    """
    features = Vectors.dense([row["SEPAL_LENGTH"],
                              row["SEPAL_WIDTH"],
                              row["PETAL_LENGTH"],
                              row["PETAL_WIDTH"]])
    return (row["SPECIES"], row["IND_SPECIES"], features)
# Convert every row into a (species, label, features) record and wrap
# the result in a DataFrame with explicit column names.
irisLpDf = SpSession.createDataFrame(
    irisNormDf.rdd.map(transformToLabeledPoint),
    ["species", "label", "features"])
irisLpDf.select("species", "label", "features").show(10)
irisLpDf.cache()
"""--------------------------------------------------------------------------
Perform Machine Learning
-------------------------------------------------------------------------"""
#Split into training and testing data
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Hold out roughly 10% of the rows for testing; the split is random.
trainingData, testData = irisLpDf.randomSplit([0.9, 0.1])
trainingData.count()
testData.count()
testData.show()

# Train a shallow decision tree (depth 2 is sufficient for iris).
classifier = DecisionTreeClassifier(maxDepth=2, labelCol="label",
                                    featuresCol="features")
model = classifier.fit(trainingData)
model.numNodes
model.depth

# Score the held-out rows and inspect the predictions.
predictions = model.transform(testData)
predictions.select("prediction", "species", "label").show()

# Overall accuracy on the test set.
evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                              labelCol="label",
                                              metricName="accuracy")
evaluator.evaluate(predictions)

# Confusion matrix: row counts per (actual label, predicted label) pair.
predictions.groupBy("label", "prediction").count().show()