from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType,StringType
from pyspark.sql.functions import *
from pyspark import SparkContext
import sys

# Echo the raw command-line arguments to stdout for troubleshooting.
print(sys.argv)

# First positional argument: the base location that holds the "noaa/" input
# tree and receives the "noaaparquet/" output (an S3 prefix, judging by the
# variable names).
s3srcpath = sys.argv[1]

# Every path in this script is built by plain string concatenation, so make
# sure the base ends with a slash; otherwise "s3://bucket" would turn into
# "s3://bucketnoaa/..." and "s3://bucketnoaaparquet/".
if not s3srcpath.endswith("/"):
    s3srcpath += "/"

# Destination directory for the Parquet output.
s3tgtpath = s3srcpath + "noaaparquet/"

def main():
    """Convert the 2011 US precipitation readings to Parquet.

    Steps:
      1. Read ``noaa/csv.gz/2011.csv.gz`` under ``s3srcpath`` with an
         explicit all-string schema.
      2. Read ``noaa/ghcnd-stations.txt`` under ``s3srcpath`` and slice the
         fixed-width station fields out of the first column.
      3. Join readings to stations on ``id``, keeping rows whose
         ``year_date`` starts with ``2011``, whose ``element`` is ``PRCP``
         and whose station country code is ``US``.
      4. Write the result as Parquet to ``s3tgtpath``, overwriting any
         existing output.

    The Spark session is always stopped on exit, whether the job succeeds
    or raises.
    """
    sc = SparkContext()
    sc.setLocalProperty("callSite.short", "AWS EMR on EKS Sample Job")
    spark = SparkSession(sc)

    try:
        # Every reading column is loaded as a nullable string; no numeric
        # casting is done by this job.
        reading_columns = (
            "id",
            "year_date",
            "element",
            "data_value",
            "m_flag",
            "q_flag",
            "s_flag",
            "obs_time",
        )
        schema = StructType(
            [StructField(name, StringType(), True) for name in reading_columns]
        )
        dfreadings = spark.read.csv(
            s3srcpath + "noaa/csv.gz/2011.csv.gz",
            sep=",",
            schema=schema,
        )

        # The station file is fixed-width text. It is read with the CSV
        # reader and the needed fields are sliced from the first column
        # (_c0). Column.substr(start, length) is 1-based.
        dfstations = spark.read.csv(s3srcpath + "noaa/ghcnd-stations.txt")
        raw_line = dfstations._c0
        dfstationssplit = dfstations.select(
            raw_line.substr(1, 11).alias("id"),
            # The country code is the first two characters of the station id.
            raw_line.substr(1, 2).alias("countrycode"),
            # Latitude occupies columns 13-20 in the GHCN-Daily station
            # layout. The previous slice (14, 9) dropped the sign of
            # negative latitudes and ran into the longitude field.
            raw_line.substr(13, 8).alias("lat"),
            # Longitude occupies columns 22-30.
            raw_line.substr(22, 9).alias("long"),
            raw_line.substr(39, 2).alias("state"),
        )

        dfstationssplit.createOrReplaceTempView("stations_sparkvw")
        dfreadings.createOrReplaceTempView("readings_sparkvw")

        # Explicit join with qualified column names; same result set as the
        # former comma-join with the predicate in the WHERE clause.
        query = (
            "select a.*, b.countrycode, b.state "
            "from readings_sparkvw a "
            "join stations_sparkvw b on a.id = b.id "
            "where substring(a.year_date, 1, 4) = '2011' "
            "and a.element = 'PRCP' "
            "and b.countrycode = 'US'"
        )
        dfprocessed = spark.sql(query)

        dfprocessed.write.mode("overwrite").format("parquet").save(s3tgtpath)
    finally:
        # Release the Spark session and its underlying context even when
        # the job fails part-way through.
        spark.stop()



# Run the job only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()
