Example Step 1: Convert PySpark Script to teradatamlspk Script - Teradata Package for Python

Teradata® pyspark2teradataml User Guide

Deployment
VantageCloud
VantageCore
Edition
Enterprise
IntelliFlex
VMware
Product
Teradata Package for Python
Release Number
20.00
Published
March 2024
Language
English (United States)
Last Update
2024-04-11
dita:mapPath
oeg1710443196055.ditamap
dita:ditavalPath
ayr1485454803741.ditaval
dita:id
oeg1710443196055
Product Category
Teradata Vantage

This step converts the PySpark script to a teradatamlspk script.

Assume the PySpark script is saved in the /tmp folder with the name Predicting_House_Prices_Pyspark.py.

Run the following commands to produce a Python script and an HTML report.

  • >>> from teradatamlspk import pyspark2teradataml
  • >>> pyspark2teradataml('/tmp/Predicting_House_Prices_Pyspark.py')
    Python script '/tmp/Predicting_House_Prices_Pyspark.py' converted to '/tmp/Predicting_House_Prices_Pyspark_tdmlspk.py' successfully.
    Script conversion report '/tmp/Predicting_House_Prices_Pyspark_tdmlspk.html' published successfully.

Generated Python script

1    #!/usr/bin/env python
2    # coding: utf-8
3    
4    # In[1]:
5    
6    
7    import getpass; import os
8    import pandas as pd
9    import numpy as np
10   
11   from teradatamlspk import TeradataConf, TeradataContext
12   from teradatamlspk.sql import TeradataSession, SQLContext
13   
14   from teradatamlspk.sql.types import *
15   import teradatamlspk.sql.functions as F
16   
17   
18   from teradatamlspk.ml.regression import LinearRegression
19   
20   
21   from teradatamlspk.ml.feature import VectorAssembler, StandardScaler
22   from teradatamlspk.ml.evaluation import RegressionEvaluator
23   
24   
25   # In[3]:
26   
27   
28   import seaborn as sns
29   import matplotlib.pyplot as plt
30   
31   
32   # In[4]:
33   
34   
35   # Visualization
36   from IPython.core.interactiveshell import InteractiveShell
37   InteractiveShell.ast_node_interactivity = "all"
38   
39   pd.set_option('display.max_columns', 200)
40   pd.set_option('display.max_colwidth', 400)
41   
42   from matplotlib import rcParams
43   sns.set(context='notebook', style='whitegrid', rc={'figure.figsize': (18,4)})
44   rcParams['figure.figsize'] = 18,4
45   
46   
47   # In[5]:
48   
49   
50   rnd_seed=23
51   np.random.seed=rnd_seed
52   np.random.set_state=rnd_seed
53   
54   
55   # In[6]:
56   
57   
58   spark = TeradataSession.builder.master("local[2]").appName("Linear-Regression-California-Housing").getOrCreate(host=getpass.getpass('Enter host: '), user=getpass.getpass('Enter user: '), password=getpass.getpass('Enter password: '))
59   
60   
61   # In[7]:
62   
63   
64   spark
65   
66   
67   # In[8]:
68   
69   
70   sc = spark.teradataContext
71   sc
72   
73   
74   # In[9]:
75   
76   
77   sqlContext = SQLContext(spark.teradataContext)
78   sqlContext
79   
80   
81   # In[10]:
82   
83   
84   HOUSING_DATA = 'cal_housing.csv'
85   
86   
87   # In[11]:
88   
89   
90   # define the schema, corresponding to a line in the csv data file.
91   schema = StructType([
92       StructField("long", FloatType(), nullable=True),
93       StructField("lat", FloatType(), nullable=True),
94       StructField("medage", FloatType(), nullable=True),
95       StructField("totrooms", FloatType(), nullable=True),
96       StructField("totbdrms", FloatType(), nullable=True),
97       StructField("pop", FloatType(), nullable=True),
98       StructField("houshlds", FloatType(), nullable=True),
99       StructField("medinc", FloatType(), nullable=True),
100      StructField("medhv", FloatType(), nullable=True)]
101  )
102  
103  
104  # In[12]:
105  
106  
107  # Load housing data
108  housing_df = spark.read.csv(path=HOUSING_DATA, schema=schema, header=True).cache()
109  
110  
111  # In[13]:
112  
113  
114  housing_df.take(5)
115  
116  
117  # In[14]:
118  
119  
120  # Show first five rows
121  housing_df.show(5)
122  
123  
124  # In[15]:
125  
126  
127  # show the dataframe columns
128  housing_df.columns
129  
130  
131  # In[16]:
132  
133  
134  # show the schema of the dataframe
135  housing_df.printSchema()
136  
137  
138  # In[17]:
139  
140  
141  # run a sample selection
142  housing_df.select('pop','totbdrms').show(10)
143  
144  
145  # In[18]:
146  
147  
148  # group by housingmedianage and see the distribution
149  result_df = housing_df.groupBy("medage").count().sort("medage", ascending=False)
150  
151  
152  # In[19]:
153  
154  
155  result_df.show(10)
156  
157  
158  # In[20]:
159  
160  
161  result_df.toPandas().plot.bar(x='medage',figsize=(14, 6))
162  
163  
164  # In[21]:
165  
166  
167  (housing_df.describe().select(
168                      "summary",
169                      F.round("medage", 4).alias("medage"),
170                      F.round("totrooms", 4).alias("totrooms"),
171                      F.round("totbdrms", 4).alias("totbdrms"),
172                      F.round("pop", 4).alias("pop"),
173                      F.round("houshlds", 4).alias("houshlds"),
174                      F.round("medinc", 4).alias("medinc"),
175                      F.round("medhv", 4).alias("medhv"))
176                      .show())
177  
178  
179  # In[24]:
180  
181  
182  from teradatamlspk.sql.functions import col
183  # Adjust the values of `medianHouseValue`
184  housing_df = housing_df.withColumn("medhv", col("medhv")/100000)
185  
186  
187  # In[25]:
188  
189  
190  housing_df.show(2)
191  
192  
193  # In[26]:
194  
195  
196  housing_df.columns
197  
198  
199  # In[27]:
200  
201  
202  housing_df = (housing_df.withColumn("rmsperhh", F.round(col("totrooms")/col("houshlds"), 2))
203                         .withColumn("popperhh", F.round(col("pop")/col("houshlds"), 2))
204                         .withColumn("bdrmsperrm", F.round(col("totbdrms")/col("totrooms"), 2)))
205  
206  
207  # In[28]:
208  
209  
210  housing_df.show(5)
211  
212  
213  # In[29]:
214  
215  
216  # Re-order and select columns
217  housing_df = housing_df.select("medhv", 
218                                "totbdrms", 
219                                "pop", 
220                                "houshlds", 
221                                "medinc", 
222                                "rmsperhh", 
223                                "popperhh", 
224                                "bdrmsperrm")
225  
226  
227  # In[30]:
228  
229  
230  featureCols = ["totbdrms", "pop", "houshlds", "medinc", "rmsperhh", "popperhh", "bdrmsperrm"]
231  
232  
233  # In[31]:
234  
235  
236  # put features into a feature vector column
237  assembler = VectorAssembler(inputCols=featureCols, outputCol="features") 
238  
239  
240  # In[32]:
241  
242  
243  assembled_df = assembler.transform(housing_df)
244  
245  
246  # In[33]:
247  
248  
249  assembled_df.show(10, truncate=False)
250  
251  
252  # In[34]:
253  
254  
255  # Initialize the `standardScaler`
256  standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")
257  
258  
259  # In[35]:
260  
261  
262  # Fit the DataFrame to the scaler
263  scaled_df = standardScaler.fit(assembled_df).transform(assembled_df)
264  
265  
266  # In[36]:
267  
268  
269  # Inspect the result
270  scaled_df.select("features", "features_scaled").show(10, truncate=False)
271  
272  
273  # In[37]:
274  
275  
276  # Split the data into train and test sets
277  train_data, test_data = scaled_df.randomSplit([.8,.2], seed=rnd_seed)
278  
279  
280  # In[38]:
281  
282  
283  train_data.columns
284  
285  
286  # In[39]:
287  
288  
289  # Initialize `lr`
290  lr = (LinearRegression(featuresCol='features_scaled', labelCol="medhv", predictionCol='predmedhv', 
291                                 maxIter=10, regParam=0.3, elasticNetParam=0.8, standardization=False))
292  
293  
294  # In[40]:
295  
296  
297  # Fit the data to the model
298  linearModel = lr.fit(train_data)
299  
300  
301  # In[41]:
302  
303  
304  # Coefficients for the model
305  linearModel.coefficients
306  
307  
308  # In[42]:
309  
310  
311  featureCols
312  
313  
314  # In[43]:
315  
316  
317  # Intercept for the model
318  linearModel.intercept
319  
320  
321  # In[44]:
322  
323  
324  # Generate predictions
325  predictions = linearModel.transform(test_data)
326  
327  
328  # In[45]:
329  
330  
331  # Extract the predictions and the "known" correct labels
332  predandlabels = predictions.select("predmedhv", "medhv")
333  
334  
335  # In[46]:
336  
337  
338  predandlabels.show()
339  
340  
341  # In[47]:
342  
343  
344  # Get the RMSE
345  print("RMSE: {0}".format(linearModel.summary.rootMeanSquaredError))
346  
347  
348  # In[48]:
349  
350  
351  print("MAE: {0}".format(linearModel.summary.meanAbsoluteError))
352  
353  
354  # In[49]:
355  
356  
357  # Get the R2
358  print("R2: {0}".format(linearModel.summary.r2))
359  
360  
361  # In[50]:
362  
363  
364  evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='rmse')
365  print("RMSE: {0}".format(evaluator.evaluate(predandlabels)))
366  
367  
368  # In[51]:
369  
370  
371  evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='mae')
372  print("MAE: {0}".format(evaluator.evaluate(predandlabels)))
373  
374  
375  # In[52]:
376  
377  
378  evaluator = RegressionEvaluator(predictionCol="predmedhv", labelCol='medhv', metricName='r2')
379  print("R2: {0}".format(evaluator.evaluate(predandlabels)))
380  
381  
382  # In[53]:
383  
384  
385  # mllib is old so the methods are available in rdd
386  metrics = RegressionMetrics(predandlabels.rdd)
387  
388  
389  # In[54]:
390  
391  
392  print("RMSE: {0}".format(metrics.rootMeanSquaredError))
393  
394  
395  # In[55]:
396  
397  
398  print("MAE: {0}".format(metrics.meanAbsoluteError))
399  
400  
401  # In[56]:
402  
403  
404  print("R2: {0}".format(metrics.r2))
405  
406  
407  # In[57]:
408  
409  
410  spark.stop()
411  
412  

Generated HTML report

GeneratedReport-pyspark2teradataml