from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
import pyspark.sql.functions as func
conf = SparkConf().setMaster("local").setAppName("OccupancyProject")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
sc
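# A hedged alternative sketch, assuming Spark >= 2.0: SparkSession subsumes
# SQLContext, so the same setup could be written as
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.config(conf=conf).getOrCreate()
#   sc = spark.sparkContext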
# Two held-out sensor logs (CSV text with a header row) and a JSON training set.
OccupancyData_2 = sc.textFile("C:/Backup/BAIS/Big Data/Project/OccupancyProject-master/OccupancyData_2.csv")
OccupancyData_1 = sc.textFile("C:/Users/engga/Downloads/OccupancyData_1.txt")
OccupancyTraining = sqlContext.read.json("C:/Backup/BAIS/Big Data/Project/OccupancyProject-master/OccupancyData_Train.json", multiLine=True)
OccupancyData_1.take(5)
OccupancyData_2.take(5)
# Drop the header line from each raw RDD before parsing.
header1 = OccupancyData_1.first()
OccupancyData_1 = OccupancyData_1.filter(lambda line: line != header1)
header2 = OccupancyData_2.first()
OccupancyData_2 = OccupancyData_2.filter(lambda line: line != header2)
# Parse each CSV line: field 0 is the quoted row ID, field 1 (the date string)
# is skipped, and fields 2-7 are Temperature, Humidity, Light, CO2,
# HumidityRatio, and Occupancy.
Occupancy_DF1 = OccupancyData_1.map(lambda x: x.split(",")).map(lambda x: (float(x[0].replace('"', '')), float(x[2]), float(x[3]), float(x[4]), float(x[5]), float(x[6]), float(x[7]))).toDF()
Occupancy_DF2 = OccupancyData_2.map(lambda x: x.split(",")).map(lambda x: (float(x[0].replace('"', '')), float(x[2]), float(x[3]), float(x[4]), float(x[5]), float(x[6]), float(x[7]))).toDF()
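# A hedged alternative sketch, assuming Spark >= 2.0 and that the header names
# every column: DataFrameReader.csv can parse the header and infer types, e.g.
#   Occupancy_DF1 = sqlContext.read.csv(path, header=True, inferSchema=True)
# The manual split()/float() route above is kept because it also strips the
# quoted ID and drops the date column in one pass.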
Occupancy_DF1.printSchema()
Occupancy_DF2.printSchema()
# Rename the positional columns produced by toDF().
Occupancy_DF1 = Occupancy_DF1.selectExpr("_1 as ID", "_2 as Temperature", "_3 as Humidity", "_4 as Light", "_5 as CO2", "_6 as HumidityRatio", "_7 as Occupancy")
Occupancy_DF2 = Occupancy_DF2.selectExpr("_1 as ID", "_2 as Temperature", "_3 as Humidity", "_4 as Light", "_5 as CO2", "_6 as HumidityRatio", "_7 as Occupancy")
Occupancy_DF1.printSchema()
Occupancy_DF2.printSchema()
Occupancy_DF1.show(3)
Occupancy_DF2.show(3)
OccupancyTraining.show(3)
# Drop rows with missing values; the date column is not used as a feature.
OccupancyTraining = OccupancyTraining.dropna()
Occupancy_DF1 = Occupancy_DF1.dropna()
Occupancy_DF2 = Occupancy_DF2.dropna()
OccupancyTraining = OccupancyTraining.drop(OccupancyTraining.date)
# Round the Light and CO2 readings to three decimal places in all three sets.
OccupancyTraining = OccupancyTraining.withColumn("Light", func.round(OccupancyTraining["Light"], 3))
Occupancy_DF1 = Occupancy_DF1.withColumn("Light", func.round(Occupancy_DF1["Light"], 3))
Occupancy_DF2 = Occupancy_DF2.withColumn("Light", func.round(Occupancy_DF2["Light"], 3))
OccupancyTraining = OccupancyTraining.withColumn("CO2", func.round(OccupancyTraining["CO2"], 3))
Occupancy_DF1 = Occupancy_DF1.withColumn("CO2", func.round(Occupancy_DF1["CO2"], 3))
Occupancy_DF2 = Occupancy_DF2.withColumn("CO2", func.round(Occupancy_DF2["CO2"], 3))
OccupancyTraining.describe().toPandas().transpose()
Occupancy_DF1.describe().toPandas().transpose()
Occupancy_DF2.describe().toPandas().transpose()
import pandas as pd
# Scatter matrix of the numeric training features ('df' was undefined here;
# JSON-inferred integer columns show up as 'bigint', so include them too).
numeric_features = [t[0] for t in OccupancyTraining.dtypes if t[1] in ('int', 'bigint', 'double')]
sampled_data = OccupancyTraining.select(numeric_features).sample(False, 0.8).toPandas()
axs = pd.plotting.scatter_matrix(sampled_data, figsize=(10, 10))
n = len(sampled_data.columns)
# Tidy the axis labels: rotate them and hide the tick marks.
for i in range(n):
    v = axs[i, 0]
    v.yaxis.label.set_rotation(0)
    v.yaxis.label.set_ha('right')
    v.set_yticks(())
    h = axs[n - 1, i]
    h.xaxis.label.set_rotation(90)
    h.set_xticks(())
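# When run as a plain script (outside a notebook), the scatter matrix needs an
# explicit matplotlib show() call to render:
import matplotlib.pyplot as plt
plt.show()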
##Reference from https://towardsdatascience.com/building-a-linear-regression-with-pyspark-and-mllib-d065c3ba246a
import six
# Print the correlation of each numeric column with Occupancy.
for i in OccupancyTraining.columns:
    if not isinstance(OccupancyTraining.select(i).take(1)[0][0], six.string_types):
        print("Correlation to Occupancy for ", i, OccupancyTraining.stat.corr('Occupancy', i))
from pyspark.ml.feature import VectorAssembler
# Assemble the five sensor readings into a single 'features' vector column.
vectorAssembler = VectorAssembler(inputCols=['Temperature', 'Humidity', 'Light', 'CO2', 'HumidityRatio'], outputCol='features')
OT = vectorAssembler.transform(OccupancyTraining)
OT = OT.select(['features', 'Occupancy'])
OT.show(3)
# Reuse the same assembler for both test sets.
OT_T1 = vectorAssembler.transform(Occupancy_DF1)
OT_T1 = OT_T1.select(['features', 'Occupancy'])
OT_T1.show(3)
OT_T2 = vectorAssembler.transform(Occupancy_DF2)
OT_T2 = OT_T2.select(['features', 'Occupancy'])
OT_T2.show(3)
from pyspark.ml.regression import LinearRegression
# Occupancy is a 0/1 label; following the referenced tutorial, it is fit here
# with an elastic-net-regularized linear regression.
lr = LinearRegression(featuresCol='features', labelCol='Occupancy', maxIter=10, regParam=0.3, elasticNetParam=0.8)
lr_model = lr.fit(OT)
print("Coefficients: " + str(lr_model.coefficients))
print("Intercept: " + str(lr_model.intercept))
trainingSummary = lr_model.summary
print("RMSE: %f" % trainingSummary.rootMeanSquaredError)
print("r2: %f" % trainingSummary.r2)
OT.describe().show()
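# Hedged extra diagnostics from the standard LinearRegressionSummary API:
# residuals and mean absolute error on the training data.
trainingSummary.residuals.show(5)
print("MAE: %f" % trainingSummary.meanAbsoluteError)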
# Score the first test set (the duplicate transform/show pair is collapsed).
lr_predictions = lr_model.transform(OT_T1)
lr_predictions.select("prediction", "Occupancy", "features").show(5)
from pyspark.ml.evaluation import RegressionEvaluator
lr_evaluator = RegressionEvaluator(predictionCol="prediction",
                                   labelCol="Occupancy", metricName="r2")
print("R Squared (R2) on test set 1 = %g" % lr_evaluator.evaluate(lr_predictions))
# Score the second test set and reuse the R2 evaluator defined above.
lr_predictions = lr_model.transform(OT_T2)
lr_predictions.select("prediction", "Occupancy", "features").show(5)
print("R Squared (R2) on test set 2 = %g" % lr_evaluator.evaluate(lr_predictions))