
I have the following DataFrame schema:

scala> hotelsDF.printSchema()
root
 |-- id: long (nullable = true)
 |-- version: integer (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- changeset: long (nullable = true)
 |-- uid: integer (nullable = true)
 |-- user_sid: binary (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: binary (nullable = true)
 |    |    |-- value: binary (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)

I need to filter records that have a tag with key equal to tourism and value equal to hotel. I do that with the following SQL query:

sqlContext.sql("select * from nodes where array_contains(tags.key, binary('tourism')) and array_contains(tags.value, binary('hotel'))").show()

So far, so good.

Now, my question is: how can I select the value for a given tag key? A pseudo-query would look something like:

sqlContext.sql("select tags.tourism from nodes where array_contains(tags.key, binary('tourism')) and array_contains(tags.value, binary('hotel'))").show()

and it would return hotel for all entries.


2 Answers


You could explode the array and then filter:

import org.apache.spark.sql.functions.{col, explode}

hotelsDF.withColumn(
    "tags1",
    explode(col("tags"))
).drop(
    "tags"
).filter(
    // key and value are binary, so cast them to string before comparing
    col("tags1.key").cast("string") === "tourism" &&
      col("tags1.value").cast("string") === "hotel"
).show()
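
Building on the same idea, you can then project the value for a given key, which is what the question asks for. A minimal sketch, assuming the binary columns hold UTF-8 text:

// reusing the imports from the previous snippet
// select the value of the "tourism" tag for each matching node
hotelsDF.withColumn(
    "tags1",
    explode(col("tags"))
).filter(
    col("tags1.key").cast("string") === "tourism"
).select(
    col("id"),
    col("tags1.value").cast("string").as("tourism")  // "hotel" for these entries
).show()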

2 Comments

Seems cool. As far as I understand, if I have multiple tags (elements in the array) per row I will get duplicates and will need to group them later on?
I've managed to do it in a type-safe way, but +1 for being #soreadytohelp

I solved it with a different approach. I added the following case classes:

case class Entry(
                  id: Long,
                  version: Int,
                  timestamp: Long,
                  changeset: Long,
                  uid: Int,
                  user_sid: Array[Byte],
                  tags: Array[Tag],
                  latitude: Double,
                  longitude: Double
                )

case class Tag(key: Array[Byte], value: Array[Byte])

case class Hotel(
                  id: Long,
                  stars: Option[String],
                  latitude: Double,
                  longitude: Double,
                  name: String,
                  rooms: Option[String]
                )

What's interesting (and caused me some problems) is that the equivalent of Spark's binary type is simply Array[Byte].
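
For instance, decoding a tag's binary key or value back to text is just a String construction over the raw bytes (assuming UTF-8 content):

val raw: Array[Byte] = "hotel".getBytes("UTF-8")  // what Spark hands you for a binary column
val decoded = new String(raw, "UTF-8")            // back to "hotel"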

I then processed the DataFrame in the following way:

def process(country: String) = {
    import spark.implicits._ // needed for the .as[Entry] conversion below

    val dir = "/whatever/dir"
    val df = spark.read.parquet(s"$dir/$country/latest.node.parquet")

    df
      .as[Entry]
      .filter(e => e.tags != null && e.tags.nonEmpty)
      .filter(e =>
        e.tags.exists(t => new String(t.key).equalsIgnoreCase("tourism") && new String(t.value).equalsIgnoreCase("hotel"))
      )
      .map(e => Hotel(
        e.id,
        e.tags.find(findTag("stars")).map(t => new String(t.value)),
        e.latitude,
        e.longitude,
        e.tags.find(findTag("name")).map(t => new String(t.value)).orNull,
        e.tags.find(findTag("rooms")).map(t => new String(t.value))
      ))
      .repartition(1)
      .write
      .format("csv")
      .option("nullValue", null)
      .option("header", value = true)
      .option("delimiter", ",")
      .save(s"$dir/$country/csv")
  }
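
The findTag helper isn't shown above; presumably it is a curried predicate that decodes the binary key and compares it case-insensitively, along these lines (a sketch, not the original code):

// matches a Tag whose binary key decodes to the given name, ignoring case
def findTag(name: String)(t: Tag): Boolean =
  new String(t.key).equalsIgnoreCase(name)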

