I have a dataset like this:

+---+-------------------+-----------------------+
|id |time               |range                  |
+---+-------------------+-----------------------+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|
+---+-------------------+-----------------------+

with this schema:

root
 |-- id: string (nullable = true)
 |-- time: string (nullable = true)
 |-- range: string (nullable = true)

I want to filter rows where the hour/minute of the time column falls within one of the hour/minute intervals in the range column:

+---+-------------------+-----------------------+-----------+
|id |time               |range                  |between    |
+---+-------------------+-----------------------+-----------+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|true       |
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|false      |
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|false      |
+---+-------------------+-----------------------+-----------+

I know that in Scala I would transform the range column into something like

array(named_struct("start", "00h00", "end", "03h00"), named_struct("start", "15h30", "end", "17h30"), named_struct("start", "21h00", "end", "23h59"))

But I haven't found a way to do it in Java. How can I do this, or is there a better solution?

Thanks.

1 Answer

One way to do that is to:

  1. Normalize the time into seconds since midnight using Spark's static functions.
  2. Check whether that value falls within the range using a UDF (user-defined function).

Using static functions:

df = df
    .withColumn(
        "date",
        date_format(col("time"), "yyyy-MM-dd HH:mm:ss.SSSS"))
    .withColumn("h", hour(col("date")))
    .withColumn("m", minute(col("date")))
    .withColumn("s", second(col("date")))
    // Number of seconds elapsed since midnight
    .withColumn("event", expr("h*3600 + m*60 + s"))
    .drop("date")
    .drop("h")
    .drop("m")
    .drop("s");

If your dataframe looked like this before:

+---+-------------------+-----------------------+
|id |time               |range                  |
+---+-------------------+-----------------------+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|
+---+-------------------+-----------------------+

After this step, it should look like:

+---+-------------------+-----------------------+-----+
|id |time               |range                  |event|
+---+-------------------+-----------------------+-----+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|18000|
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|32400|
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|37800|
+---+-------------------+-----------------------+-----+
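
For example, id1's 05:00:00 becomes 5 * 3600 = 18000 seconds since midnight.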

Using a UDF:

df = df.withColumn("between",
    callUDF("inRange", col("range"), col("event")));
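
This call assumes the inRange UDF has already been registered with the session, as done in the driver code below.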

The result will be:

+---+-------------------+-----------------------+-----+-------+
|id |time               |range                  |event|between|
+---+-------------------+-----------------------+-----+-------+
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|18000|true   |
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|32400|false  |
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|37800|false  |
+---+-------------------+-----------------------+-----+-------+

The InRangeUdf

Your UDF would look like:

package net.jgp.books.sparkInAction.ch14.lab900_in_range;

import org.apache.spark.sql.api.java.UDF2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InRangeUdf implements UDF2<String, Integer, Boolean> {
  private static Logger log = LoggerFactory
      .getLogger(InRangeUdf.class);

  private static final long serialVersionUID = -21621751L;

  @Override
  public Boolean call(String range, Integer event) throws Exception {
    log.debug("-> call({}, {})", range, event);
    String[] ranges = range.split(";");
    for (int i = 0; i < ranges.length; i++) {
      log.debug("Processing range #{}: {}", i, ranges[i]);
      String[] hours = ranges[i].split("-");
      // Each bound is in HHhMM format, e.g. "07h30" -> 7 * 3600 + 30 * 60 s
      int start =
          Integer.valueOf(hours[0].substring(0, 2)) * 3600 +
              Integer.valueOf(hours[0].substring(3)) * 60;
      int end =
          Integer.valueOf(hours[1].substring(0, 2)) * 3600 +
              Integer.valueOf(hours[1].substring(3)) * 60;
      log.debug("Checking between {} and {}", start, end);
      if (event >= start && event <= end) {
        return true;
      }
    }
    return false;
  }

}
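
Note that both bounds are inclusive and the check has minute granularity: an event at 23:59:30 falls outside 23h30-23h59, because the end bound converts to 23 * 3600 + 59 * 60 = 86340 seconds. If the last minute should be fully covered, add 59 to the computed end.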

Driver code

Your driver code will look like:

package net.jgp.books.sparkInAction.ch14.lab900_in_range;

import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Custom UDF to check if in range.
 * 
 * @author jgp
 */
public class InCustomRangeApp {

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    InCustomRangeApp app = new InCustomRangeApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Custom UDF to check if in range")
        .master("local[*]")
        .getOrCreate();
    spark.udf().register(
        "inRange",
        new InRangeUdf(),
        DataTypes.BooleanType);

    Dataset<Row> df = createDataframe(spark);
    df.show(false);

    df = df
        .withColumn(
            "date",
            date_format(col("time"), "yyyy-MM-dd HH:mm:ss.SSSS"))
        .withColumn("h", hour(col("date")))
        .withColumn("m", minute(col("date")))
        .withColumn("s", second(col("date")))
        // Number of seconds elapsed since midnight
        .withColumn("event", expr("h*3600 + m*60 + s"))
        .drop("date")
        .drop("h")
        .drop("m")
        .drop("s");
    df.show(false);

    df = df.withColumn("between",
        callUDF("inRange", col("range"), col("event")));
    df.show(false);
  }

  private static Dataset<Row> createDataframe(SparkSession spark) {
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "id",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "time",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "range",
            DataTypes.StringType,
            false) });

    List<Row> rows = new ArrayList<>();
    rows.add(RowFactory.create("id1", "2019-03-11 05:00:00",
        "00h00-07h30;23h30-23h59"));
    rows.add(RowFactory.create("id2", "2019-03-11 09:00:00",
        "00h00-07h30;23h30-23h59"));
    rows.add(RowFactory.create("id3", "2019-03-11 10:30:00",
        "00h00-07h30;23h30-23h59"));

    return spark.createDataFrame(rows, schema);
  }
}
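
As an aside, to build the array of named_structs from the question directly in Java, you can wrap the same SQL expression in expr(). A minimal sketch, assuming a hard-coded list of intervals:

df = df.withColumn("ranges", expr(
    "array(named_struct('start', '00h00', 'end', '03h00'), "
        + "named_struct('start', '15h30', 'end', '17h30'), "
        + "named_struct('start', '21h00', 'end', '23h59'))"));

And if you are on Spark 2.4 or later, you can avoid the UDF entirely with the exists higher-order function in Spark SQL. A sketch, assuming the event column computed above and the fixed HHhMM-HHhMM;... layout of the range column (Spark SQL's substring() is 1-based):

df = df.withColumn("between", expr(
    "exists(split(range, ';'), r -> "
        + "event >= cast(substring(r, 1, 2) as int) * 3600 "
        + "+ cast(substring(r, 4, 2) as int) * 60 "
        + "AND event <= cast(substring(r, 7, 2) as int) * 3600 "
        + "+ cast(substring(r, 10, 2) as int) * 60)"));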