from functools import reduce

from pyspark import StorageLevel
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import rand

def merge_noaa_dataframe(spark, years):
    """
    Merge NOAA datasets for given years
    """
    print("Merging NOAA datasets for years {years}")
    years_append_list=[]

    for year in years:
        path = "s3://noaa-global-hourly-pds/"+year+"/"
        year_df = spark.read.csv(path)
        if year in ['2006','2014','2015','2017','2018','2022','2023']:
            year_df = year_df.withColumn("_c58", rand())
        year_df = year_df.union(year_df)
        years_append_list.append(year_df)

    # Fold the per-year DataFrames into a single DataFrame (unionAll is a
    # deprecated alias for union).
    merge_df = reduce(DataFrame.union, years_append_list)
    return merge_df
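
# A hedged sketch (not in the original script): with header-less CSV reads the
# columns are positional (_c0, _c1, ...), so a quick guard that every yearly
# DataFrame exposes the same column list can catch schema drift before the
# union. The helper name assert_same_columns is an assumption for illustration.
def assert_same_columns(dfs):
    """Raise if the DataFrames do not all share an identical column list."""
    schemas = {tuple(df.columns) for df in dfs}
    if len(schemas) > 1:
        raise ValueError(f"Column mismatch across yearly DataFrames: {schemas}")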

def simulate_out_of_disk_error(spark, years):
    """
    Simulate an out-of-disk error by generating a large DataFrame and forcing
    the Spark executors to spill the data to temporary files on local disk.
    """
    df = merge_noaa_dataframe(spark, years)

    # Add another random column and persist to disk only. persist() is lazy, so
    # the spill (and any out-of-disk error) occurs when an action runs.
    df = df.withColumn("random_column", rand())
    df.persist(storageLevel=StorageLevel.DISK_ONLY)
    return df
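
# persist() is lazy: the DISK_ONLY spill above happens only once an action runs.
# A minimal sketch of forcing every partition to materialize (the helper name
# force_spill is an assumption, not part of the original script):
def force_spill(df):
    """Run a full-scan action so all partitions are computed and spilled to disk."""
    return df.count()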


if __name__ == "__main__":
    # Create a Spark session
    spark = (
        SparkSession.builder
        .appName("Simulate Out-of-Disk Error")
        .getOrCreate()
    )
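
    # Spill files land under spark.local.dir (on YARN, yarn.nodemanager.local-dirs
    # takes precedence). A hedged example of pointing spills at a small volume to
    # hit the error sooner; the path is hypothetical:
    #   .config("spark.local.dir", "/mnt/small-volume/tmp")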

    # Years to include in the merged dataset
    years = ['2004', '2006', '2009', '2010', '2011', '2012', '2013', '2014',
             '2015', '2017', '2018', '2022', '2023']
    print(f"Generating NOAA DataFrame for years: {years}")

    # Simulate the out-of-disk error
    df = simulate_out_of_disk_error(spark, years)
    df.show()  # Note: show() only materializes enough partitions for 20 rows.
    # df.write.mode("overwrite").parquet("s3://vasveena-test-demo/sparkresults/")
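
# Example run (the script filename is hypothetical; any recent Spark install
# with S3 access works):
#   spark-submit simulate_out_of_disk_error.py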
