
I am trying to get counts of individual columns to publish metrics. I have a df [customerId : string, totalRent : bigint, totalPurchase : bigint, itemTypeCounts : map<string, int>]

Right now I am doing :

val totalCustomers = df.count

val totalPurchaseCount = df.filter("totalPurchase > 0").count

val totalRentCount = df.filter("totalRent > 0").count


publishMetrics("Total Customer",  totalCustomers )
publishMetrics("Total Purchase",  totalPurchaseCount )
publishMetrics("Total Rent",  totalRentCount )

publishMetrics("Percentage of Rent",  percentage(totalRentCount, totalCustomers) )
publishMetrics("Percentage of Purchase",  percentage(totalPurchaseCount, totalCustomers) )

private def percentage(num: Long, denom: Long): Double = {
  // convert to Double so the division is not truncated by integer arithmetic
  val numD: Double = num.toDouble
  val denomD: Double = denom.toDouble
  if (denomD == 0.0) 0.0 else (numD / denomD) * 100
}

But I am not sure how to do this for itemTypeCounts, which is a map. I want the count and percentage for each key entry. The issue is that the keys are dynamic; there is no way I know the key values beforehand. Can someone tell me how to get the count for each key value? I am new to Scala/Spark, so any other efficient approaches to getting the counts of each column are much appreciated.

Sample data :

customerId : 1
totalPurchase : 17
totalRent : 0
itemTypeCounts : {"TV" : 4, "Blender" : 2}

customerId : 2
totalPurchase : 1
totalRent : 1
itemTypeCounts : {"Cloths" : 4}

customerId : 3
totalPurchase : 0
totalRent : 10
itemTypeCounts : {"TV" : 4}

So the output is :

totalCustomer : 3
totalPurchaseCount : 2 (2 customers with totalPurchase > 0)
totalRent : 2 (2 customers with totalRent > 0)
itemTypeCounts_TV : 2
itemTypeCounts_Cloths  : 1
itemTypeCounts_Blender  : 1
  • Could you provide sample input data and desired output? Commented Oct 18, 2016 at 7:47
  • @LiMuBei Updated with sample data. Commented Oct 18, 2016 at 15:56

3 Answers


You can accomplish this in Spark SQL; I show two examples below (one where the keys are known and can be enumerated in code, one where the keys are unknown). Note that by using Spark SQL you take advantage of the Catalyst optimizer, so this will run very efficiently:

import org.apache.spark.sql.functions._
import spark.implicits._   // assumes an in-scope SparkSession named `spark`

val data = List((1,17,0,Map("TV" -> 4, "Blender" -> 2)),(2,1,1,Map("Cloths" -> 4)),(3,0,10,Map("TV" -> 4)))
val df = data.toDF("customerId","totalPurchase","totalRent","itemTypeCounts")

//Only good if you can enumerate the keys
def countMapKey(name:String) = {
    count(when($"itemTypeCounts".getItem(name).isNotNull,lit(1))).as(s"itemTypeCounts_$name")
}
val keysToCount = List("TV","Blender","Cloths").map(key => countMapKey(key))
df.select(keysToCount :_*).show
+-----------------+----------------------+---------------------+
|itemTypeCounts_TV|itemTypeCounts_Blender|itemTypeCounts_Cloths|
+-----------------+----------------------+---------------------+
|                2|                     1|                    1|
+-----------------+----------------------+---------------------+

//More generic
val pivotData = df.select(explode(col("itemTypeCounts"))).groupBy(lit(1).as("tmp")).pivot("key").count.drop("tmp")
val renameStatement = pivotData.columns.map(name => col(name).as(s"itemTypeCounts_$name"))

pivotData.select(renameStatement :_*).show
+----------------------+---------------------+-----------------+
|itemTypeCounts_Blender|itemTypeCounts_Cloths|itemTypeCounts_TV|
+----------------------+---------------------+-----------------+
|                     1|                    1|                2|
+----------------------+---------------------+-----------------+
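If you also need the percentage for each key (as asked in the question), one option is to divide each counted column by the total number of customers. A minimal sketch, assuming the pivotData and renameStatement values from above (the _pct column suffix is purely illustrative):

// Sketch only: turn the per-key counts into percentages of all customers
val totalCustomers = df.count
val counted = pivotData.select(renameStatement :_*)
val withPercentages = counted.columns.foldLeft(counted) { (acc, name) =>
  // add a companion percentage column for every counted key
  acc.withColumn(s"${name}_pct", col(name) / lit(totalCustomers.toDouble) * 100)
}
withPercentages.show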



I'm a Spark newbie myself, so there is probably a better way to do this. But one thing you could try is transforming itemTypeCounts into a Scala data structure that you can work with. I converted each row to a List of (Name, Count) pairs, e.g. List((Blender,2), (TV,4)).

With this you have a List of such lists of pairs, one list of pairs per row. In your example, this is a List of 3 elements:

List(
  List((Blender,2), (TV,4)), 
  List((Cloths,4)), 
  List((TV,4))
) 

Once you have this structure, transforming it into the desired output is standard Scala.
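For instance, a minimal plain-Scala sketch of that last step (counting how many customers mention each key), written against the literal list above; the value names are illustrative only:

val rows = List(
  List(("Blender", 2), ("TV", 4)),
  List(("Cloths", 4)),
  List(("TV", 4))
)
// one key occurrence per customer, grouped and counted
val summary: Map[String, Int] =
  rows.flatMap(_.map(_._1))
      .groupBy(identity)
      .map { case (key, occurrences) => (s"itemTypeCounts_$key", occurrences.size) }
// summary: Map(itemTypeCounts_Blender -> 1, itemTypeCounts_TV -> 2, itemTypeCounts_Cloths -> 1)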

The full worked example, starting from the DataFrame, is below:

val itemTypeCounts = df.select("itemTypeCounts")

//Build List of List of Pairs as suggested above
val itemsList = itemTypeCounts.collect().map {
  row =>
    val values = row.getStruct(0).mkString("",",","").split(",")
    val fields = row.schema.head.dataType.asInstanceOf[StructType].map(s => s.name).toList
    fields.zip(values).filter(p => p._2 != "null")
}.toList

// Build a summary map for the list constructed above
def itemTypeCountsSummary(frames: List[List[(String, String)]], summary: Map[String, Int]) : Map[String, Int] = frames match {
  case Nil => summary
  case _ => itemTypeCountsSummary(frames.tail, merge(frames.head, summary))
}

//helper method for the summary map.
def merge(head: List[(String, String)], summary: Map[String, Int]): Map[String, Int] = {
  val headMap = head.toMap.map(e => ("itemTypeCounts_" + e._1, 1))
  val updatedSummary = summary.map{e => if(headMap.contains(e._1)) (e._1, e._2 + 1) else e}
  updatedSummary ++ headMap.filter(e => !updatedSummary.contains(e._1))
}

val summaryMap = itemTypeCountsSummary(itemsList, Map())

summaryMap.foreach(e => println(e._1 + ": " + e._2 ))

Output:

itemTypeCounts_Blender: 1
itemTypeCounts_TV: 2
itemTypeCounts_Cloths: 1



Borrowing the input from Nick and using a Spark SQL pivot solution:

val data = List((1,17,0,Map("TV" -> 4, "Blender" -> 2)),(2,1,1,Map("Cloths" -> 4)),(3,0,10,Map("TV" -> 4)))
val df = data.toDF("customerId","totalPurchase","totalRent","itemTypeCounts")
df.show(false)
df.createOrReplaceTempView("df")

+----------+-------------+---------+-----------------------+
|customerId|totalPurchase|totalRent|itemTypeCounts         |
+----------+-------------+---------+-----------------------+
|1         |17           |0        |[TV -> 4, Blender -> 2]|
|2         |1            |1        |[Cloths -> 4]          |
|3         |0            |10       |[TV -> 4]              |
+----------+-------------+---------+-----------------------+

Assuming that we know the distinct itemType values beforehand, we can use:

val dfr = spark.sql("""
select * from (
select explode(itemTypeCounts) itemTypeCounts from (
select flatten(collect_list(map_keys(itemTypeCounts))) itemTypeCounts from df
) ) t
pivot ( count(itemTypeCounts) as c3 
for itemTypeCounts in ('TV' ,'Blender' ,'Cloths') ) 
""")
dfr.show(false)

+---+-------+------+
|TV |Blender|Cloths|
+---+-------+------+
|2  |1      |1     |
+---+-------+------+

For renaming the columns:

dfr.select(dfr.columns.map( x => col(x).alias("itemTypeCounts_" + x )):_* ).show(false)

+-----------------+----------------------+---------------------+
|itemTypeCounts_TV|itemTypeCounts_Blender|itemTypeCounts_Cloths|
+-----------------+----------------------+---------------------+
|2                |1                     |1                    |
+-----------------+----------------------+---------------------+

To get the distinct itemType values dynamically and pass them to the pivot:

val item_count_arr = spark.sql(""" select array_distinct(flatten(collect_list(map_keys(itemTypeCounts)))) itemTypeCounts from df """).as[Array[String]].first

item_count_arr: Array[String] = Array(TV, Blender, Cloths)

spark.sql(s"""
select * from (
select explode(itemTypeCounts) itemTypeCounts from (
select flatten(collect_list(map_keys(itemTypeCounts))) itemTypeCounts from df
) ) t
pivot ( count(itemTypeCounts) as c3 
for itemTypeCounts in (${item_count_arr.map(c => "'"+c+"'").mkString(",")}) ) 
""").show(false)

+---+-------+------+
|TV |Blender|Cloths|
+---+-------+------+
|2  |1      |1     |
+---+-------+------+
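To feed these into the metric calls from the question, one hedged option is to collect the single pivoted row and loop over its columns; this assumes the dfr DataFrame from above and the publishMetrics(String, Long) signature shown in the question:

// Sketch only: publish one metric per pivoted key column
val metricsRow = dfr.select(dfr.columns.map(x => col(x).alias("itemTypeCounts_" + x)) :_*).first
metricsRow.schema.fieldNames.foreach { name =>
  // each pivoted cell is the count of customers with that key
  publishMetrics(name, metricsRow.getAs[Long](name))
}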

