# Time Series Analysis in Spark SQL

"""

Time Series Analysis in Spark SQL

Written by JP Vijaykumar

Date: Mar 8 2021

This script is provided for educational purposes only.

Please modify the script as required to suit your environment.

I presented a script that processes data with a Time Series Analysis algorithm in SQL and PL/SQL earlier.

In this article, about 90% of the code is the same SQL from my previous article and about 10% is PySpark code.

If you know SQL, coding in PySpark is not that difficult.

I like Spark SQL for the following reasons:

01) It is open source.

02) It combines the rich functionality of Python and SQL.

03) It has data mining libraries.

04) It can be installed on my desktop, so I can play around with it.

Besides, I love scripting and complex algorithms.

I used the following URLs to install and set up Spark on my desktop:

https://www.youtube.com/watch?v=IQfG0faDrzE

https://www.youtube.com/watch?v=WQErwxRTiW0

http://media.sundog-soft.com/spark-python-install.pdf

The way Time Series Analysis is performed varies slightly from presentation to presentation.

For the SQL and PL/SQL programs, and for the Spark SQL program presented in this article, I mostly followed the video presentation on Time Series Analysis below.

https://www.youtube.com/watch?v=HIWXdHlDSFs --TIME SERIES ANALYSIS

Questions to be answered:

01) Using the ratio-to-moving-average method, calculate the seasonally adjusted indices for each quarter.

02) Obtain a regression trend line representing the above data.

03) Obtain a seasonally adjusted trend estimate for the 4th quarter of 2011.
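
The three questions above can be cross-checked in plain Python, without Spark. The block below is only a minimal sketch under stated assumptions, not the method used by the script: the variable names are illustrative, the values are the sample data of this article typed in by hand, and every step keeps full precision whereas the SQL rounds intermediate results, so the last digits may differ slightly from the script's report.

```python
# Plain-Python sketch of the three steps (assumption: no Spark, no rounding).
# Quarterly values of the sample data, in time order (2008 Q1 .. 2010 Q4).
values = [20, 30, 39, 60, 40, 51, 62, 81, 50, 64, 74, 95]
n = len(values)

# Step 1a: four-quarter moving averages (9 of them for 12 quarters).
ma4 = [sum(values[i:i + 4]) / 4 for i in range(n - 3)]

# Step 1b: centered moving averages; centered[i] lines up with period i + 3.
centered = [(ma4[i] + ma4[i + 1]) / 2 for i in range(len(ma4) - 1)]

# Step 1c: each actual value as a percentage of its centered average,
# keyed by the 1-based period number (3 .. 10).
pct = {i + 3: values[i + 2] * 100 / centered[i] for i in range(len(centered))}

# Step 1d: average the percentages per quarter, then rescale to a sum of 400.
by_quarter = {q: [] for q in (1, 2, 3, 4)}
for period, p in pct.items():
    by_quarter[(period - 1) % 4 + 1].append(p)
means = {q: sum(ps) / len(ps) for q, ps in by_quarter.items()}
adj_factor = 400 / sum(means.values())
seasonal_idx = {q: m * adj_factor for q, m in means.items()}

# Step 2: least-squares trend line y = a + b*x with x = 1 .. 12.
xs = range(1, n + 1)
sum_x, sum_y = sum(xs), sum(values)
sum_xy = sum(x * y for x, y in zip(xs, values))
sum_x2 = sum(x * x for x in xs)
b = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x ** 2)
a = sum_y / n - b * sum_x / n

# Step 3: trend value for period 16 (4th quarter of 2011),
# scaled by the seasonal index of the 4th quarter.
forecast_q4_2011 = (a + b * 16) * seasonal_idx[4] / 100
print(round(forecast_q4_2011, 2))
```

The period numbers 13 to 16 stand for the four quarters of 2011, which is the same convention the SQL uses in its final estimates.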

I created the "e:/data/TimeSeries.csv" file with the following data:

year ,q1 ,q2 ,q3 ,q4

2008,20,30,39,60

2009,40,51,62,81

2010,50,64,74,95

Please change the location of the csv file in the code to match your machine.
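
For readers who prefer not to type the file by hand, the sample file could be generated with a few lines of Python. This is a hedged sketch: the temporary folder is a hypothetical stand-in for "e:/data", and the names csv_path, rows and series are illustrative only. The header keeps the trailing spaces shown above, since the script deals with them later.

```python
import csv
import os
import tempfile

# Hypothetical location: the system temp folder stands in for e:/data.
csv_path = os.path.join(tempfile.gettempdir(), "TimeSeries.csv")

# Same content as the sample file, including the spaces after the
# first four column names.
rows = [
    ["year ", "q1 ", "q2 ", "q3 ", "q4"],
    [2008, 20, 30, 39, 60],
    [2009, 40, 51, 62, 81],
    [2010, 50, 64, 74, 95],
]

with open(csv_path, "w", newline="") as f:
    csv.writer(f).writerows(rows)

# Read the file back and flatten the quarter columns row by row.
# The result is the 12 quarterly values in time order, the same shape
# that the unpivot step of the script is meant to produce.
with open(csv_path, newline="") as f:
    reader = csv.reader(f)
    header = next(reader)
    series = [int(v) for row in reader for v in row[1:]]

print(header)
print(series)
```

The printed path-independent series can be compared with the "val" column that the script shows after the explode step.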

"""

#spark-submit.cmd python/pysparkTimeSeriesAnalysis.py

from pyspark.sql import SparkSession

from pyspark import SparkContext,SparkConf

from pyspark.sql.functions import *

from pyspark.sql.window import Window

spark = SparkSession.builder.appName("TimeSeriesAnalysis") \
    .master("local[*]").getOrCreate()

spark.conf.set("spark.sql.debug.maxToStringFields",100)

spark.conf.set("spark.sql.crossJoin.enabled", "true") #To enable cartesian product in sql

df = spark.read.csv("e:/data/TimeSeries.csv",inferSchema=True,header=True)

df.printSchema() #printSchema() prints by itself; wrapping it in print() also prints "None"

print(df.columns)

df.show()

#alternative to trimming: when the DataFrame's column names contain spaces,

#select a specific column by its position through df.columns

df.select(df.columns[3]).show()

#unpivot the table: turn the four quarter columns of each row into four rows

#this is done by collecting the columns into an array and applying "explode"

df = df.select(array(col(df.columns[1]),col(df.columns[2]),col(df.columns[3]),col(df.columns[4])).alias("val"))

df = df.withColumn("val",explode(col("val")))

df.show()

w = Window().orderBy("val")

df.select("*",row_number().over(w).alias("id")).show() #add a row number starting from "1"; rows are numbered in order of "val", not in time order

df2 = df.withColumn("rowid",row_number().over(Window.orderBy(monotonically_increasing_id()))) #row number starting from "1", following the generated ids

df2.show()

df = df.repartition(1).withColumn("rnum",monotonically_increasing_id() + 1) #add rownum to output data

#monotonically_increasing_id starts with "0", so "+ 1" makes rnum start with "1"

#repartition(1) keeps all rows in one partition, so the generated ids are consecutive

df.select("*").show() #rnum starts with "1"

df.createOrReplaceTempView("DF") #replaces registerTempTable, which is deprecated since Spark 2.0

spark.catalog.cacheTable("DF")

spark.sql("select rnum + 1 as id,val from DF").show() #arithmetic such as "+ 1" can also be done while selecting; rnum already starts with "1", so this id starts with "2"

#######################################

spark.sql("""

with

frqma01 as (select round(avg(val),2) frqma_val from DF where rnum>=1 and rnum<=4 ),

frqma02 as (select round(avg(val),2) frqma_val from DF where rnum>=2 and rnum<=5 ),

frqma03 as (select round(avg(val),2) frqma_val from DF where rnum>=3 and rnum<=6 ),

frqma04 as (select round(avg(val),2) frqma_val from DF where rnum>=4 and rnum<=7 ),

frqma05 as (select round(avg(val),2) frqma_val from DF where rnum>=5 and rnum<=8 ),

frqma06 as (select round(avg(val),2) frqma_val from DF where rnum>=6 and rnum<=9 ),

frqma07 as (select round(avg(val),2) frqma_val from DF where rnum>=7 and rnum<=10 ),

frqma08 as (select round(avg(val),2) frqma_val from DF where rnum>=8 and rnum<=11 ),

frqma09 as (select round(avg(val),2) frqma_val from DF where rnum>=9 and rnum<=12 ),

frqma_rpt as (select cast('Four Quarter Moving Average: ' as char(60))||frqma_val description from(

select frqma_val from frqma01

union all

select frqma_val from frqma02

union all

select frqma_val from frqma03

union all

select frqma_val from frqma04

union all

select frqma_val from frqma05

union all

select frqma_val from frqma06

union all

select frqma_val from frqma07

union all

select frqma_val from frqma08

union all

select frqma_val from frqma09

)),

ctdma1 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma01 union all select frqma_val from frqma02)),

ctdma2 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma02 union all select frqma_val from frqma03)),

ctdma3 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma03 union all select frqma_val from frqma04)),

ctdma4 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma04 union all select frqma_val from frqma05)),

ctdma5 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma05 union all select frqma_val from frqma06)),

ctdma6 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma06 union all select frqma_val from frqma07)),

ctdma7 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma07 union all select frqma_val from frqma08)),

ctdma8 as (select round(avg(frqma_val),2) ctdma_val from (select frqma_val from frqma08 union all select frqma_val from frqma09)),

ctdma_rpt as (select cast('Centered Average: ' as char(60))||ctdma_val from (

select ctdma_val from ctdma1

union all

select ctdma_val from ctdma2

union all

select ctdma_val from ctdma3

union all

select ctdma_val from ctdma4

union all

select ctdma_val from ctdma5

union all

select ctdma_val from ctdma6

union all

select ctdma_val from ctdma7

union all

select ctdma_val from ctdma8

)),

pctavg_rpt as (select cast('PCT of Average: ' as char(60))||pct_avg from (

select round(val*100/ctdma_val,2) pct_avg from DF,ctdma1 where rnum=3

union all

select round(val*100/ctdma_val,2) pct_avg from DF,ctdma2 where rnum=4

union all

select round(val*100/ctdma_val,2) pct_avg from DF,ctdma3 where rnum=5

union all

select round(val*100/ctdma_val,2) pct_avg from DF,ctdma4 where rnum=6

union all

select round(val*100/ctdma_val,2) pct_avg from DF,ctdma5 where rnum=7

union all

select round(val*100/ctdma_val,2) pct_avg from DF,ctdma6 where rnum=8

union all

select round(val*100/ctdma_val,2) pct_avg from DF,ctdma7 where rnum=9

union all

select round(val*100/ctdma_val,2) pct_avg from DF,ctdma8 where rnum=10

)),

q3_1 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma1 where rnum=3 ),

q4_1 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma2 where rnum=4 ),

q1_2 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma3 where rnum=5 ),

q2_2 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma4 where rnum=6 ),

q3_2 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma5 where rnum=7 ),

q4_2 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma6 where rnum=8 ),

q1_3 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma7 where rnum=9 ),

q2_3 as (select round(val*100/ctdma_val,2) pct_avg from DF,ctdma8 where rnum=10),

mean_rpt as (select cast('Mean: ' as char(60))||mean from (

select round(avg(pct_avg),2) mean from (select * from q1_2 union all select * from q1_3)

union all

select round(avg(pct_avg),2) mean from (select * from q2_2 union all select * from q2_3)

union all

select round(avg(pct_avg),2) mean from (select * from q3_1 union all select * from q3_2)

union all

select round(avg(pct_avg),2) mean from (select * from q4_1 union all select * from q4_2)

)),

m1 as (select round(avg(pct_avg),2) mean from (select * from q1_2 union all select * from q1_3)),

m2 as (select round(avg(pct_avg),2) mean from (select * from q2_2 union all select * from q2_3)),

m3 as (select round(avg(pct_avg),2) mean from (select * from q3_1 union all select * from q3_2)),

m4 as (select round(avg(pct_avg),2) mean from (select * from q4_1 union all select * from q4_2)),

adj_factor as (select round(400/sum(mean),4) adj_factor from (select mean from m1 union all select mean from m2 union all select mean from m3 union all select mean from m4 )),

s1 as (select round(adj_factor*mean,2) seasonal_idx from adj_factor,m1),

s2 as (select round(adj_factor*mean,2) seasonal_idx from adj_factor,m2),

s3 as (select round(adj_factor*mean,2) seasonal_idx from adj_factor,m3),

s4 as (select round(adj_factor*mean,2) seasonal_idx from adj_factor,m4),

sum_ssidx as (select cast('Sum Seasonal Index: ' as char(60))||sum(seasonal_idx) from (select seasonal_idx from s1 union all select seasonal_idx from s2 union all select seasonal_idx from s3 union all select seasonal_idx from s4)),

tc as (select count(*) num_recs,sum(rnum) sum_x,sum(rnum)/count(*) mean_x,sum(val) sum_y,sum(val)/count(*) mean_y,sum(rnum*val) sum_xy,sum(power(rnum,2)) sum_x_sqr from DF),

b as (select round((num_recs*sum_xy - sum_x*sum_y)/(num_recs*sum_x_sqr - power(sum_x,2)),2) b_val from tc),

a as (select round( mean_y - b_val*mean_x,2) a_val from tc,b)

select cast('X code and Y code values: ' as char(60))||rnum||' '||val description from DF

union all

select * from frqma_rpt

union all

select * from ctdma_rpt

union all

select * from pctavg_rpt

union all

select * from mean_rpt

union all

select cast('Seasonal Index: ' as char(60))||seasonal_idx from s1

union all

select cast('Seasonal Index: ' as char(60))||seasonal_idx from s2

union all

select cast('Seasonal Index: ' as char(60))||seasonal_idx from s3

union all

select cast('Seasonal Index: ' as char(60))||seasonal_idx from s4

union all

select * from sum_ssidx

union all

select cast('X Adjustment Factor: ' as char(60))||adj_factor from adj_factor

union all

select cast('b value : ' as char(60))||b_val from b

union all

select cast('a value : ' as char(60))||a_val from a

union all

select cast('Seasonally Adjusted Trend Estimate for 1st Quarter of 2011: ' as char(60))||round((a_val + b_val*13)*seasonal_idx/100,2) seasonal_index from a,b,s1

union all

select cast('Seasonally Adjusted Trend Estimate for 2nd Quarter of 2011: ' as char(60))||round((a_val + b_val*14)*seasonal_idx/100,2) seasonal_index from a,b,s2

union all

select cast('Seasonally Adjusted Trend Estimate for 3rd Quarter of 2011: ' as char(60))||round((a_val + b_val*15)*seasonal_idx/100,2) seasonal_index from a,b,s3

union all

select cast('Seasonally Adjusted Trend Estimate for 4th Quarter of 2011: ' as char(60))||round((a_val + b_val*16)*seasonal_idx/100,2) seasonal_index from a,b,s4

"""

).show(60,False) #show up to 60 rows; "False" stops "show" from truncating long column values

spark.stop()

#References:

#http://www.orafaq.com/node/3187 "TIME SERIES ANALYSIS IN SQL AND PL/SQL"

#http://www.orafaq.com/node/3204

#https://stackoverflow.com/questions/33742895/how-to-show-full-column-content-in-a-spark-dataframe
