
I created an RDD[String] in which each String element contains multiple JSON strings, but all of these JSON strings have the same schema across the whole RDD.

For example:

An RDD[String] called rdd contains the following entries.

String 1:

{"data":"abc", "field1":"def"}
{"data":"123", "field1":"degf"}
{"data":"87j", "field1":"hzc"}
{"data":"efs", "field1":"ssaf"}

String 2:

{"data":"fsg", "field1":"agas"}
{"data":"sgs", "field1":"agg"}
{"data":"sdg", "field1":"agads"}

My goal is to convert this RDD[String] into a DataFrame. If I just do it this way:

val df = rdd.toDF()

..., then it does not work correctly: df.count() gives me 2 instead of 7 for the example above, because each element of the RDD holds a whole batch of JSON strings, so they are not recognized individually.

How can I create a DataFrame so that each row corresponds to a single JSON string?

  • You can use flatMap over your first RDD[String] so that each JSON string becomes its own row in a new RDD[String]. Commented May 12, 2017 at 15:21
  • @RameshMaharjan: Could you show it in the answer? Commented May 12, 2017 at 16:32
  • @RameshMaharjan: I get Char if I do flatMap. Commented May 12, 2017 at 16:34
  • If you have valid JSON you can read it directly, as in val data = spark.read.json(input) Commented May 12, 2017 at 17:30

2 Answers


I can't check it right now, but I think this should work:

// split each string by newline character
val splitted: RDD[Array[String]] = rdd.map(_.split("\n"))

// flatten
val jsonRdd: RDD[String] = splitted.flatMap(identity)
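
From there, toDF() alone would still give you a single string column; to get one column per JSON field, you can let Spark parse the JSON itself. This is a sketch of the approach the asker ultimately used (see the comments on the next answer), assuming a Spark 1.x sqlContext as in the question:

// parse each JSON string into a row with one column per field;
// the schema ("data", "field1") is inferred from the JSON keys
val df = sqlContext.read.json(jsonRdd)
df.printSchema() // root |-- data: string |-- field1: string
df.count()       // 7 for the example above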

4 Comments

What is identity?
en.wikipedia.org/wiki/Identity_function. Basically, flatMap(identity) means flatMap(x => x).
Hmm, jsonRdd.toDF().printSchema() gives me root |-- _1: string (nullable = true)
In fact, when I run jsonRdd.toDF().foreach(f => println(f)), I get [{"data":"abc", "field1":"def"}] [{"data":"123", "field1":"degf"}] .... It looks like each whole string became a single-column row; I now need each JSON field to be converted into its own DataFrame column.

Following the information you've provided in your question, the following can be a solution:

import sqlContext.implicits._

// two input strings, each containing several newline-separated JSON objects
val str1 = "{\"data\":\"abc\", \"field1\":\"def\"}\n{\"data\":\"123\", \"field1\":\"degf\"}\n{\"data\":\"87j\", \"field1\":\"hzc\"}\n{\"data\":\"efs\", \"field1\":\"ssaf\"}"
val str2 = "{\"data\":\"fsg\", \"field1\":\"agas\"}\n{\"data\":\"sgs\", \"field1\":\"agg\"}\n{\"data\":\"sdg\", \"field1\":\"agads\"}"
val input = Seq(str1, str2)

// split each element into individual JSON lines, then extract the two values
// by splitting on "," and ":" and stripping the remaining non-word characters
val rddData = sc.parallelize(input).flatMap(_.split("\n"))
  .map(line => line.split(","))
  .map(array => (array(0).split(":")(1).trim.replaceAll("\\W", ""), array(1).split(":")(1).trim.replaceAll("\\W", "")))
rddData.toDF("data", "field1").show
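
For the two example strings above, this should print something like:

+----+------+
|data|field1|
+----+------+
| abc|   def|
| 123|  degf|
| 87j|   hzc|
| efs|  ssaf|
| fsg|  agas|
| sgs|   agg|
| sdg| agads|
+----+------+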


Edited
You can omit the field names and just use .toDF(), but that would give default column names (like _1, _2 for a tuple).
Instead, you can create a schema and build the DataFrame from Rows as below (you can add more fields):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType}

// same parsing as above, but build generic Rows instead of tuples
val rddData = sc.parallelize(input).flatMap(_.split("\n"))
  .map(line => line.split(","))
  .map(array => Row.fromSeq(Seq(array(0).split(":")(1).trim.replaceAll("\\W", ""), array(1).split(":")(1).trim.replaceAll("\\W", ""))))

// explicit schema: one StructField per column
val schema = StructType(Array(StructField("data", StringType, true),
  StructField("field1", StringType, true)))

sqlContext.createDataFrame(rddData, schema).show
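
Because the schema is an ordinary Scala value, it can also be generated programmatically when there are many fields, which is relevant to the 100-column case raised in the comments below. A minimal sketch, where fieldNames is a hypothetical list you would supply yourself:

// hypothetical: generate one StructField per known field name
val fieldNames = Seq("data", "field1") // extend to all of your column names
val wideSchema = StructType(fieldNames.map(name => StructField(name, StringType, true)))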

Or
You can create a Dataset directly, but you will need a case class (you can add more fields), as below:

// parse each line into an instance of the case class, then convert to a Dataset
val dataSet = sc.parallelize(input).flatMap(_.split("\n"))
  .map(line => line.split(","))
  .map(array => Dinasaurius(array(0).split(":")(1).trim.replaceAll("\\W", ""),
    array(1).split(":")(1).trim.replaceAll("\\W", ""))).toDS

dataSet.show

The case class for the above Dataset is below; define it at the top level (outside any method or block) so that Spark can derive an encoder for it:

case class Dinasaurius(data: String,
                       field1: String)

I hope I answered all your questions.

5 Comments

Is it mandatory to pass column names "data", "field1"? I have 100 columns in my real data set.
While it seems to work, it is tailored to 2 columns (fields). In my real data I have 100 columns (fields). How can I adapt your solution?
@Dinosaurius, I have answered all of your questions and points of confusion.
Indeed I managed to solve the problem as follows, based on your ideas, but without a case class, and it works: val jsonStrings: RDD[String] = sc.parallelize(input).map(_.split("\n")).flatMap(x => x); val result = sqlContext.read.json(jsonStrings); var df = result.toDF()
@Dinosaurius, I am happy to see that my post helped you :)
