
I am trying to use Spark-SQL to read and select data from a JSON string.

Here is what I did:

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("aaa");
sparkConf.setMaster("local[*]");

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
SparkSession sc = SparkSession.builder().sparkContext(javaSparkContext.sc()).getOrCreate();

String data = "{\"temp\":25, \"hum01\":50, \"env\":{\"lux\":1000, \"geo\":[32.5, 43.8]}}";
String querySql = "select env.lux as abc from testData";

System.out.println("start 01, time is"+System.currentTimeMillis());
List<String> dataList = Arrays.asList(data);
Dataset<String> dataset = sc.createDataset(dataList, Encoders.STRING());
dataset.printSchema();
System.out.println("start 02, time is"+System.currentTimeMillis());
Dataset<Row> df = sc.read().json(dataset);
System.out.println("start 03, time is"+System.currentTimeMillis());
List<String> queryResultJson = null;
try{
  df.createOrReplaceTempView("testData");
  System.out.println("start 04, time is"+System.currentTimeMillis());
  Dataset<Row> queryData = sc.sql(querySql);
  System.out.println("start 05, time is"+System.currentTimeMillis());
  queryResultJson = queryData.toJSON().collectAsList();
  System.out.println("start 06, time is"+System.currentTimeMillis());
}catch (Exception e) {
  e.printStackTrace();
} finally {
  sc.catalog().dropTempView("testData");
}

The result looks like this:

start 01, time is1543457455652
start 02, time is1543457458766
start 03, time is1543457459993
start 04, time is1543457460190
start 05, time is1543457460334
start 06, time is1543457460818

It seems that creating the Dataset takes most of the time. I want to use this in a streaming data-processing flow, but the performance is too poor for that.

Is there any way to make Dataset creation faster? Or is there another way to query JSON data with a SQL-like language?

2 Comments
  • Using Spark Structured Streaming you won't create your dataset like that; you will read it directly from the stream. It's also recommended to use a predefined schema instead of relying on Spark's inference. Commented Nov 29, 2018 at 9:50
  • Thank you very much! Big help. So in streaming mode, I can send all the data to a socket and then build a stream that processes everything received from the socket. Is that right? Also, are there any examples of how to use a predefined schema? Commented Nov 29, 2018 at 10:53

1 Answer


You won't create your dataset the same way when using Spark Structured Streaming. For example, if your source is a socket and you have a schema describing your data:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;

SparkSession spark = SparkSession.builder()
    .appName("Simple Application")
    .master("local[*]")
    .getOrCreate();

StructType sensorSchema = new StructType()
        .add("temp", DataTypes.IntegerType)
        .add("hum01", DataTypes.IntegerType)
        .add("env", new StructType()
                .add("lux", DataTypes.IntegerType)
                .add("geo", DataTypes.createArrayType(DataTypes.FloatType, false)));

// The socket source always delivers a single string column named "value" and
// does not accept a user-specified schema, so parse each line with from_json
// using the predefined schema instead of letting Spark infer it.
Dataset<Row> socketDF = spark
    .readStream()
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
    .select(from_json(col("value"), sensorSchema).as("data"))
    .selectExpr("data.temp", "data.hum01", "data.env");

Then you can start to benchmark your algorithm.
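A predefined schema also helps the original batch code from the question: passing it to read().json() skips the separate schema-inference job, which appears to be where most of the measured time goes. A minimal sketch, reusing the sample JSON from the question (the class name is illustrative):

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class SchemaDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("SchemaDemo")
                .master("local[*]")
                .getOrCreate();

        // Explicit schema matching the question's sample JSON.
        StructType sensorSchema = new StructType()
                .add("temp", DataTypes.IntegerType)
                .add("hum01", DataTypes.IntegerType)
                .add("env", new StructType()
                        .add("lux", DataTypes.IntegerType)
                        .add("geo", DataTypes.createArrayType(DataTypes.FloatType, false)));

        String data = "{\"temp\":25, \"hum01\":50, \"env\":{\"lux\":1000, \"geo\":[32.5, 43.8]}}";
        Dataset<String> jsonLines = spark.createDataset(Arrays.asList(data), Encoders.STRING());

        // With an explicit schema, read().json() no longer runs an extra
        // pass over the data to infer field names and types.
        Dataset<Row> df = spark.read().schema(sensorSchema).json(jsonLines);
        df.createOrReplaceTempView("testData");
        spark.sql("select env.lux as abc from testData").show();

        spark.stop();
    }
}
```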


6 Comments

Hi, do I always need a schema if I want to use structured streaming to process JSON data?
No, you don't. It's just recommended.
From stackoverflow.com/questions/43297973/…, it is stated that there are two ways to parse JSON data with structured streaming. But the first way needs a schema, and the second way only parses a few items. What I want is to parse the data with a SQL-like language (where `where` clauses could be used), and I don't know the schema. There may be different types of JSON data to process. Could this be possible?
If you have heterogeneous data in your stream it's going to be quite complex. You will need a way to tell, within the stream, what kind of data you have so you know how to handle it. But there is nothing built into Spark for such a use case (for JSON at least). Maybe a third-party lib could help you with that, but I have no idea.
Thanks a lot. So maybe the sample code is the only way I can use, although it is quite slow...
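As a side note on the schema-free case discussed in these comments: one option the thread doesn't mention is Spark's get_json_object function, which extracts individual fields from a raw JSON string column by JSONPath, with no schema definition and no inference. A sketch under that assumption (the class name is illustrative; extracted values come back as strings):

```java
import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.get_json_object;

public class JsonPathDemo {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("JsonPathDemo")
                .master("local[*]")
                .getOrCreate();

        String data = "{\"temp\":25, \"hum01\":50, \"env\":{\"lux\":1000, \"geo\":[32.5, 43.8]}}";
        // A Dataset<String> exposes its content as a single column named "value".
        Dataset<String> jsonLines = spark.createDataset(Arrays.asList(data), Encoders.STRING());

        // Pull out env.lux by JSONPath; no schema needed. The result is a string.
        Dataset<Row> lux = jsonLines.select(
                get_json_object(col("value"), "$.env.lux").as("abc"));
        lux.show();

        spark.stop();
    }
}
```

This doesn't give full SQL over unknown structures, but it does let you filter and project on known paths even when the rest of each document varies.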
