
I am getting this error when running an EMR job from a notebook, passing some dates:

    An error occurred: An error occurred while calling o236.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 87.0 failed 4 times, most recent failure: Lost task 5.3 in stage 87.0 (TID 4695) (ip-10-19-32-195.eu-west-2.compute.internal executor 5): org.apache.spark.SparkUpgradeException: You may get a different result due to the upgrading to Spark >= 3.0: Fail to parse '2024-4-01' in the new parser. You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0, or set to CORRECTED and treat it as an invalid datetime string.
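If I read the message right, the immediate cause is the date value itself: '2024-4-01' has a month without zero padding, which the Spark 3 parser rejects for the yyyy-MM-dd pattern even though the legacy parser accepted it. A zero-padded value parses fine, for example:

from datetime import date

# strftime zero-pads month and day, so the Spark 3 parser accepts the result
print(date(2024, 4, 1).strftime("%Y-%m-%d"))  # prints 2024-04-01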

I have read that passing one of these solves the issue:

spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

or

spark.sql("set spark.sql.legacy.timeParserPolicy=LEGACY")

and that does work when running it in the notebook, but when I move it into a PySpark script and run it as an EMR step, it keeps failing.
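From what I have read, in a standalone script the property can also be set on the SparkSession builder, before the session is created (a minimal sketch; the app name is just a placeholder):

from pyspark.sql import SparkSession

# Set the parser policy at session creation time, before any date
# parsing runs ("my-step" is only a placeholder app name).
spark = (
    SparkSession.builder
    .appName("my-step")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .getOrCreate()
)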

I have tried to pass it as part of the configuration when creating the cluster, in the optional_node_config block, like this:

from datetime import datetime

def create_emr_sedona_cluster(json_cluster_config, json_build_config, session, use_spot_instances=False,
                              step_concurrency_level=1):
    current_date = datetime.now()
    formatted_date = current_date.strftime("%Y%m%d")
    cluster_name = json_cluster_config[0]['clustername']
    build = json_build_config[0]['build']
    loguri = json_cluster_config[0]['loguri'] + "/" + build
    build_cluster_name = build + cluster_name + formatted_date
    emr_client = session.client('emr')

    print("json_config: ", json_cluster_config)

    instance_market = 'SPOT' if use_spot_instances else 'ON_DEMAND'
    bootstrap_path = json_cluster_config[0]['bootstrapfilelocation']

    bootstrap_actions = []

    ebs_configuration = {}

    optional_node_config = [
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.yarn.dist.jars": "/usr/lib/spark/jars/sedona-spark-shaded-3.3_2.12-1.7.1.jar,/usr/lib/spark/jars/geotools-wrapper-1.7.1-28.5.jar",
                "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                "spark.kryo.registrator": "org.apache.sedona.core.serde.SedonaKryoRegistrator",
                "spark.sql.extensions": "org.apache.sedona.viz.sql.SedonaVizExtensions,org.apache.sedona.sql.SedonaSqlExtensions",
                "spark.dynamicAllocation.enabled": "true",
                "spark.dynamicAllocation.minExecutors": "2",
                "spark.dynamicAllocation.maxExecutors": "10",
                "spark.executor.memory": "8g",
                "spark.executor.cores": "4",
                "spark.executor.memoryOverhead": "1024",
                "spark.driver.memory": "8g",
                "spark.sql.shuffle.partitions": "200",
                "spark.default.parallelism": "200",
                "spark.kryoserializer.buffer.max": "512m",
                "spark.sql.autoBroadcastJoinThreshold": "-1",
                "spark.network.timeout": "300s",
                "spark.executor.heartbeatInterval": "60s",
                "spark.sql.legacy.timeParserPolicy": "LEGACY"
            }
        }
    ]

    core_instance_count = json_cluster_config[0].get('coreinstancecount', 2)

    # run_job_flow arguments were trimmed for this question; the relevant part
    # is that the configuration list is passed through via Configurations.
    response = emr_client.run_job_flow(
        Name=build_cluster_name,
        LogUri=loguri,
        Configurations=optional_node_config,
        BootstrapActions=bootstrap_actions,
        StepConcurrencyLevel=step_concurrency_level,
        # ... Instances, Applications, roles, etc. elided ...
    )

    # ✅ Return the Cluster ID
    cluster_id = response['JobFlowId']
    print(f"EMR cluster started with ID: {cluster_id}")
    return cluster_id

but it is still not being picked up when the step runs. Is there any way to deal with this?
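Would passing the property directly on the step's spark-submit be an option instead? Something like this when adding the step (a sketch; the step name, cluster_id, and S3 path are just placeholders):

# Sketch: pass the property via --conf on the step's spark-submit.
emr_client.add_job_flow_steps(
    JobFlowId=cluster_id,
    Steps=[{
        'Name': 'my-pyspark-step',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                'spark-submit',
                '--conf', 'spark.sql.legacy.timeParserPolicy=LEGACY',
                's3://my-bucket/scripts/my_script.py',
            ],
        },
    }],
)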

thanks
