
How do I write a new column in JSON format through a DataFrame? I have tried several approaches, but it always writes the data as a JSON-escaped string field. Currently it is written as {"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}

Instead, I want it to be written as {"test":{"id":1,"name":"name","problem_field": {"x":100,"y":200}}}

problem_field is a new column, created from values read from other fields:

val dataFrame = oldDF.withColumn("problem_field", s)
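
For context, a minimal sketch that reproduces the escaping behavior. The actual definition of s is not shown above; this assumes it is a string-typed column, such as the output of to_json, which is enough to trigger the escaping on write:

import spark.implicits._
import org.apache.spark.sql.functions.{struct, to_json}

// Hypothetical stand-in for s: to_json produces a StringType column,
// so the JSON writer escapes it instead of nesting it as an object.
val oldDF = Seq((1, "name", 100, 200)).toDF("id", "name", "x", "y")
val s = to_json(struct($"x", $"y"))   // "{\"x\":100,\"y\":200}" as a plain string
val dataFrame = oldDF.withColumn("problem_field", s)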

I have tried the following approaches:

  1. dataFrame.write.json(<<outputPath>>)
  2. dataFrame.toJSON.map(value => value.replace("\\", "").replace("{\"value\":\"", "").replace("}\"}", "}")).write.json(<<outputPath>>)

I tried converting to a Dataset as well, but no luck. Any pointers are greatly appreciated.

I have already tried the logic mentioned here: How to let Spark parse a JSON-escaped String field as a JSON Object to infer the proper structure in DataFrames?


1 Answer


For starters, your example data has an extraneous comma after "y\":200, which will prevent it from being parsed, as it is not valid JSON.

From there, you can use from_json to parse the field, assuming you know the schema. In this example, I'm parsing the field separately to first get the schema:

scala> val json = spark.read.json(Seq("""{"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}""").toDS)
json: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> json.printSchema
root
 |-- test: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: string (nullable = true)


scala> val problem_field = spark.read.json(json.select($"test.problem_field").map{
case org.apache.spark.sql.Row(x : String) => x
})
problem_field: org.apache.spark.sql.DataFrame = [x: bigint, y: bigint]          

scala> problem_field.printSchema
root
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)

scala> val fixed = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", problem_field.schema).as("problem_field")))
fixed: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> fixed.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: struct (nullable = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)
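
If you already know the schema of problem_field up front, you can skip the inference step and hand from_json a schema directly. A minimal sketch, assuming the two long fields from the example above (and a Spark version with from_json, i.e. 2.1+):

import spark.implicits._
import org.apache.spark.sql.functions.{from_json, struct}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Hand-written schema for the embedded JSON string.
val problemSchema = StructType(Seq(
  StructField("x", LongType),
  StructField("y", LongType)
))

// Same result as `fixed` above, without reading the data twice.
val fixedExplicit = json.withColumn("test", struct(
  $"test.id",
  $"test.name",
  from_json($"test.problem_field", problemSchema).as("problem_field")
))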

If the schema of problem_field's contents is inconsistent between rows, this solution will still work, but it may not be an optimal way of handling things, as it will produce a sparse DataFrame where each row contains every field encountered anywhere in problem_field. For example:

scala> val json = spark.read.json(Seq("""{"test":{"id":1,"name":"name","problem_field": "{\"x\":100,\"y\":200}"}}""", """{"test":{"id":1,"name":"name","problem_field": "{\"a\":10,\"b\":20}"}}""").toDS)
json: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> val problem_field = spark.read.json(json.select($"test.problem_field").map{case org.apache.spark.sql.Row(x : String) => x})
problem_field: org.apache.spark.sql.DataFrame = [a: bigint, b: bigint ... 2 more fields]

scala> problem_field.printSchema
root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)
 |-- x: long (nullable = true)
 |-- y: long (nullable = true)

scala> val fixed = json.withColumn("test", struct($"test.id", $"test.name", from_json($"test.problem_field", problem_field.schema).as("problem_field")))
fixed: org.apache.spark.sql.DataFrame = [test: struct<id: bigint, name: string ... 1 more field>]

scala> fixed.printSchema
root
 |-- test: struct (nullable = false)
 |    |-- id: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- problem_field: struct (nullable = true)
 |    |    |-- a: long (nullable = true)
 |    |    |-- b: long (nullable = true)
 |    |    |-- x: long (nullable = true)
 |    |    |-- y: long (nullable = true)

scala> fixed.select($"test.problem_field.*").show
+----+----+----+----+
|   a|   b|   x|   y|
+----+----+----+----+
|null|null| 100| 200|
|  10|  20|null|null|
+----+----+----+----+

Over the course of hundreds, thousands, or millions of rows, you can see how this would present a problem.
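
One hedged alternative, not covered in the original answer: if the values in problem_field all share a single type, you can parse the string into a map instead of a struct, so each row keeps only its own keys rather than every key seen anywhere in the data. A sketch, assuming Spark 2.2+ (where from_json accepts a MapType) and all-integral values:

import spark.implicits._
import org.apache.spark.sql.functions.{from_json, struct}
import org.apache.spark.sql.types.{LongType, MapType, StringType}

// problem_field becomes map<string,bigint>; each row holds only its own keys.
val asMap = json.withColumn("test", struct(
  $"test.id",
  $"test.name",
  from_json($"test.problem_field", MapType(StringType, LongType)).as("problem_field")
))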


4 Comments

Thanks for the comment, Charlie; the extra comma after "y\":200 was a typo and can be ignored. As for the solution: in my case the number of fields in problem_field is not fixed. I am creating this field dynamically, and each record will have a different number of fields. How can that be handled? Thanks.
If that's the case, there's no good solution: if the JSON in problem_field is arbitrary and inconsistent, then your DataFrame would be too. You're better off keeping it as a string and extracting individual fields on an ad-hoc basis using get_json_object (see the sketch after these comments), or addressing the data-modeling problems that got you to this point.
I've edited my solution to illustrate that it will work for the case that you've described, but may not be the best way of dealing with your issue.
OK, will check that. Thanks.
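
For reference, a sketch of the ad-hoc extraction suggested in the comments: keep problem_field as a string and pull out individual values with get_json_object. The paths here use the field names from the question's example; get_json_object returns strings (or null when a path is absent), so cast as needed:

import spark.implicits._
import org.apache.spark.sql.functions.get_json_object

val extracted = json.select(
  $"test.id",
  get_json_object($"test.problem_field", "$.x").cast("long").as("x"),
  get_json_object($"test.problem_field", "$.y").cast("long").as("y")
)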
