
I used rdd.map to extract and decode a JSON document from a column, like so:

import base64
import json

def process_data(row):
    # "value" holds a JSON string whose "payload" field is base64-encoded JSON.
    encoded_data = json.loads(row["value"])
    base64_bytes = encoded_data["payload"].encode('ascii')
    decoded_data_bytes = base64.b64decode(base64_bytes)
    data = json.loads(decoded_data_bytes.decode('ascii'), strict=False)
    return data, row["file_name"], row["load_time"]

df = df.rdd.map(process_data).toDF()
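For reference, this is what the decode inside process_data does, as a plain-Python round trip (no Spark; the sample payload below is made up for illustration):

```python
import base64
import json

# Build a sample "value" cell the way the producer presumably does:
# inner JSON -> base64 -> outer JSON.
inner = [{"key_1": {"key_2": "val_2", "key_3": "val_3"}}]
payload_b64 = base64.b64encode(json.dumps(inner).encode("ascii")).decode("ascii")
value_cell = json.dumps({"payload": payload_b64})

# Decode it the same way process_data does.
encoded_data = json.loads(value_cell)
decoded = json.loads(base64.b64decode(encoded_data["payload"]).decode("ascii"))
# decoded is equal to inner again: the round trip is lossless.
```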

The data column came back as a map type, but I want it as a struct. Can I do that?

A row of the data I'm working with looks like this:

{"value": "<encoded data>", "file_name": "a", "load_time": "1/1/1"}

The encoded data (what's in "value") looks like this:

{"payload": [
  {
    "key_1": {
      "key_2": "val_2",
      "key_3": "val_3"
    }
  },
  {
    "key_1": {
      "key_2": "val_2",
      "key_3": "val_3"
    }
  }
]}

To avoid this problem I also tried using 'withColumn' to decode and load the JSON, but when I loaded it with this command:

df.withColumn("payload", from_json(col("payload"), json_schema))

Every cell in "payload" came back null (even when I limited myself to a single row).

Why doesn't this kind of load work? Is there a better way?


1 Answer


For the map-to-struct cast: after asking a colleague, I understood that such a cast can't work, simply because a map type is a schema-less key-value type, unlike a struct type. Since the cast would need schema information that a map doesn't carry, Spark can't cast a map to a struct.
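Instead of casting, a struct column can be produced directly by giving from_json an explicit schema. A minimal sketch (the DDL schema string below is an assumption based on the sample payload shape; adjust the field names and types to the real data):

```python
# Spark sketch -- the from_json call needs an active SparkSession and is not run here.
# from pyspark.sql.functions import from_json, col

# A DDL-form schema matching the sample payload: an array of structs,
# each holding a "key_1" struct with string fields.
payload_schema = "array<struct<key_1: struct<key_2: string, key_3: string>>>"

# from_json with an explicit schema yields a typed (array/struct) column, not a map:
# df = df.withColumn("payload", from_json(col("payload"), payload_schema))
```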

For the JSON-loading part: I managed to get to the bottom of the issue after removing my own json.loads step and loading the JSON in "FAILFAST" mode:

json_schema = spark.read.json(df.rdd.map(lambda row: row["payload"])).schema
df = df.withColumn("payload", from_json(col("payload"), json_schema, options={"mode": "FAILFAST"}))

I got an exception: BadRecordException: java.lang.RuntimeException: Parsing JSON arrays as structs is forbidden.

So I wrapped the payload in another JSON object, like so:

import json
from pyspark.sql.functions import udf

def wrap_data(payload):
    try:
        payload = json.loads(payload, strict=False)
        # Wrap the top-level array in an object so Spark is not
        # asked to parse a JSON array as a struct.
        return json.dumps({"payload": payload})
    except (TypeError, ValueError):
        return None

wrap_data_udf = udf(wrap_data)

And then:

df = df.withColumn("payload", wrap_data_udf("payload"))

After that I was able to load the json and work with it.
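A possible alternative to the wrapping step (an assumption on my part, not something I tested on this data): Spark 2.4+ lets from_json parse a top-level JSON array directly when the supplied schema is array-rooted, which would avoid the UDF entirely. Field names below are assumed from the sample payload:

```python
# Spark sketch -- the from_json call needs an active SparkSession and is not run here.
# from pyspark.sql.functions import from_json, col

# An array-rooted DDL schema for the top-level JSON array:
array_schema = "array<struct<key_1: struct<key_2: string, key_3: string>>>"

# With an array schema, Spark 2.4+ parses the top-level array directly,
# so no wrapping object is needed:
# df = df.withColumn("payload", from_json(col("payload"), array_schema))
```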
