Suppose I have a DataFrame of requests made by different users over different protocols, each with a recorded metric value:

+---+-----+--------+------------+
| ts| user|protocol|metric_value|
+---+-----+--------+------------+
|  0|user1|     tcp|         197|
|  1|user1|     udp|         155|
|  2|user1|     tcp|         347|
|  3|user1|     tcp|         117|
|  4|user1|     tcp|         230|
|  5|user2|     udp|         225|
|  6|user1|     udp|         297|
|  7|user1|     tcp|         790|
|  8|user1|     udp|         216|
|  9|user1|     udp|         200|
+---+-----+--------+------------+

I need to add a column holding the average metric_value of the latest records for the current user, taking one record per protocol (earlier than the current timestamp and no older than current_ts - 4). The algorithm is roughly:

  • for each line X:
    • find all rows where row.user == X.user and row.ts < X.ts
    • from these rows, extract the most recent metric_value for each protocol (if the corresponding record is older than X.ts - 4, then throw it out)
    • calculate avg of these metric_values
    • append calculated avg to the row in a new column
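
To pin down the intended semantics, here is a minimal brute-force sketch in plain Python (no Spark) that follows the steps above literally. The `Row` tuple, the function name and the `max_age` parameter are illustrative assumptions, not part of any API; the sample rows are the ones from the desired result, where ts 5 belongs to user2.

```python
from collections import namedtuple

# Hypothetical row type mirroring the DataFrame columns.
Row = namedtuple("Row", ["ts", "user", "protocol", "metric_value"])


def avg_of_latest_per_protocol(rows, max_age=4):
    """For each row X, average the most recent metric_value per protocol among
    earlier rows of the same user, ignoring values older than X.ts - max_age."""
    result = []
    for x in rows:
        latest = {}  # protocol -> (ts, metric_value) of the newest earlier row
        for r in rows:
            if r.user == x.user and r.ts < x.ts:
                if r.protocol not in latest or r.ts > latest[r.protocol][0]:
                    latest[r.protocol] = (r.ts, r.metric_value)
        # Drop protocols whose newest record is older than X.ts - max_age.
        fresh = [v for ts, v in latest.values() if ts >= x.ts - max_age]
        result.append(sum(fresh) / len(fresh) if fresh else None)
    return result


rows = [
    Row(0, "user1", "tcp", 197), Row(1, "user1", "udp", 155),
    Row(2, "user1", "tcp", 347), Row(3, "user1", "tcp", 117),
    Row(4, "user1", "tcp", 230), Row(5, "user2", "udp", 225),
    Row(6, "user1", "udp", 297), Row(7, "user1", "tcp", 790),
    Row(8, "user1", "udp", 216), Row(9, "user1", "udp", 200),
]
avg_vals = avg_of_latest_per_protocol(rows)
print(avg_vals)
# [None, 197.0, 176.0, 251.0, 136.0, None, 230.0, 263.5, 543.5, 503.0]
```

This is quadratic in the number of rows, so it is only meant as a reference to check a Spark implementation against on small samples.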

The desired result looks like this:

+---+-----+--------+------------+-------+
| ts| user|protocol|metric_value|avg_val|
+---+-----+--------+------------+-------+
|  0|user1|     tcp|         197|   null| // no data for user1
|  1|user1|     udp|         155|    197| // only tcp value available
|  2|user1|     tcp|         347|    176| // (197 + 155) / 2
|  3|user1|     tcp|         117|    251| // (347 + 155) / 2
|  4|user1|     tcp|         230|    136| // (117 + 155) / 2
|  5|user2|     udp|         225|   null| // because no data for user2
|  6|user1|     udp|         297|    230| // because record with ts==1 is too old now
|  7|user1|     tcp|         790|  263.5| // (297 + 230) / 2
|  8|user1|     udp|         216|  543.5| // (297 + 790) / 2
|  9|user1|     udp|         200|    503| // (216 + 790) / 2
+---+-----+--------+------------+-------+

Note that there can be any number of protocols and users in the table.

How can this be achieved?

I've tried window functions with lag(1) and partitioning by protocol, but aggregate functions only compute the average within a single partition, not across the results of different partitions. The closest I got was a SQL query using row_number over partition by protocol, but I couldn't apply the row.ts < X.ts condition there.

1 Answer

Here is a Scala-based solution; you can convert the logic to Python/PySpark.

Sample data:

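import spark.implicits._ // for toDF and $"..."; assumes a SparkSession named spark, as in spark-shell

val df = Seq(
  (0, "user1", "tcp", 197), (1, "user1", "udp", 155), (2, "user1", "tcp", 347),
  (3, "user1", "tcp", 117), (4, "user1", "tcp", 230), (5, "user2", "udp", 225),
  (6, "user1", "udp", 297), (7, "user1", "tcp", 790), (8, "user1", "udp", 216),
  (9, "user1", "udp", 200)
).toDF("ts", "user", "protocol", "metric_value")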

For each row, collect the (protocol, metric_value) pairs of the same user's rows with ts between current_row.ts - 4 and current_row.ts - 1 into a list.

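import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{collect_list, struct}
val winspec = Window.partitionBy("user").orderBy("ts").rangeBetween(-4, -1) // ts in [current - 4, current - 1]
val df2 = df.withColumn("recent_list", collect_list(struct($"protocol", $"metric_value")).over(winspec))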

df2.orderBy("ts").show(false)
/*

+---+-----+--------+------------+------------------------------------------------+
|ts |user |protocol|metric_value|recent_list                                     |
+---+-----+--------+------------+------------------------------------------------+
|0  |user1|tcp     |197         |[]                                              |
|1  |user1|udp     |155         |[[tcp, 197]]                                    |
|2  |user1|tcp     |347         |[[tcp, 197], [udp, 155]]                        |
|3  |user1|tcp     |117         |[[tcp, 197], [udp, 155], [tcp, 347]]            |
|4  |user1|tcp     |230         |[[tcp, 197], [udp, 155], [tcp, 347], [tcp, 117]]|
|5  |user2|udp     |225         |[]                                              |
|6  |user1|udp     |297         |[[tcp, 347], [tcp, 117], [tcp, 230]]            |
|7  |user1|tcp     |790         |[[tcp, 117], [tcp, 230], [udp, 297]]            |
|8  |user1|udp     |216         |[[tcp, 230], [udp, 297], [tcp, 790]]            |
|9  |user1|udp     |200         |[[udp, 297], [tcp, 790], [udp, 216]]            |
+---+-----+--------+------------+------------------------------------------------+
*/
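
To make the frame concrete, the following plain-Python sketch emulates what the range window selects for each row: rows of the same user whose ts lies within [ts - 4, ts - 1]. It does not use Spark; the function name and the tuple layout are assumptions made for illustration only.

```python
def range_frame_lists(rows, lower=-4, upper=-1):
    """rows: (ts, user, protocol, metric_value) tuples.

    Returns one list per row (in ts order) holding the (protocol, metric_value)
    pairs of same-user rows whose ts falls in [ts + lower, ts + upper]."""
    ordered = sorted(rows, key=lambda r: r[0])
    frames = []
    for ts, user, _protocol, _value in ordered:
        frames.append([(p, v) for t, u, p, v in ordered
                       if u == user and ts + lower <= t <= ts + upper])
    return frames


rows = [
    (0, "user1", "tcp", 197), (1, "user1", "udp", 155), (2, "user1", "tcp", 347),
    (3, "user1", "tcp", 117), (4, "user1", "tcp", 230), (5, "user2", "udp", 225),
    (6, "user1", "udp", 297), (7, "user1", "tcp", 790), (8, "user1", "udp", 216),
    (9, "user1", "udp", 200),
]
recent = range_frame_lists(rows)
print(recent[6])  # [('tcp', 347), ('tcp', 117), ('tcp', 230)]
```

The row at ts 6 no longer sees the udp value from ts 1, which is why only tcp values remain in its list.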

Now you have all the required information in a single row. You can write a UDF that picks the latest metric_value for each protocol and averages them.


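import org.apache.spark.sql.Row

// Spark hands an array<struct> column to a Scala UDF as Seq[Row]; Option gives null for an empty list.
def getAverageValueForUniqRecents(list: Seq[Row]): Option[Double] = {
  // Walk the list in REVERSE ORDER (newest first, assuming collect_list kept the window's ts order)
  // and keep only the first metric_value seen for each protocol.
  val latest = list.reverse.foldLeft(Map.empty[String, Int]) { (seen, r) =>
    if (seen.contains(r.getString(0))) seen else seen + (r.getString(0) -> r.getInt(1))
  }
  // Finally average; None becomes null in the resulting column.
  if (latest.isEmpty) None else Some(latest.values.sum.toDouble / latest.size)
}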
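
For PySpark, the same two steps might look roughly like the sketch below. The names `average_of_latest_per_protocol` and `add_avg_val` and the `max_age` parameter are hypothetical. The averaging logic is plain Python, so it can be tried without Spark, and the Spark imports are deferred into the wrapper for that reason. It assumes, like the UDF above, that `collect_list` returns items in the window's ts order.

```python
def average_of_latest_per_protocol(recent_list):
    """Average the newest metric_value of each protocol.

    `recent_list` holds struct-like items ordered oldest to newest. Items only
    need to support item["protocol"] and item["metric_value"], which both
    pyspark.sql.Row and plain dicts do.
    """
    latest = {}
    for item in reversed(recent_list or []):  # newest first
        # setdefault keeps the first (i.e. newest) value seen per protocol
        latest.setdefault(item["protocol"], item["metric_value"])
    if not latest:
        return None  # becomes null in the Spark column
    return sum(latest.values()) / len(latest)


def add_avg_val(df, max_age=4):
    """Hypothetical wrapper: collect the recent rows per user, then apply the UDF."""
    # Imported lazily so the function above can be used without Spark installed.
    from pyspark.sql import Window, functions as F
    from pyspark.sql.types import DoubleType

    w = Window.partitionBy("user").orderBy("ts").rangeBetween(-max_age, -1)
    avg_udf = F.udf(average_of_latest_per_protocol, DoubleType())
    with_list = df.withColumn(
        "recent_list",
        F.collect_list(F.struct("protocol", "metric_value")).over(w),
    )
    return (with_list
            .withColumn("avg_val", avg_udf(F.col("recent_list")))
            .drop("recent_list"))


# Quick check of the pure-Python part, using the list shown for ts == 7:
sample = [
    {"protocol": "tcp", "metric_value": 117},
    {"protocol": "tcp", "metric_value": 230},
    {"protocol": "udp", "metric_value": 297},
]
print(average_of_latest_per_protocol(sample))  # 263.5
```

Calling `add_avg_val(df)` on a DataFrame with the columns above should add an `avg_val` column. A Python UDF is generally slower than built-in functions, so this is a starting point, not a tuned solution.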