Suppose I have a DataFrame of requests made by different users over different protocols, each with a recorded metric value:
+---+-----+--------+------------+
| ts| user|protocol|metric_value|
+---+-----+--------+------------+
|  0|user1|     tcp|         197|
|  1|user1|     udp|         155|
|  2|user1|     tcp|         347|
|  3|user1|     tcp|         117|
|  4|user1|     tcp|         230|
|  5|user2|     udp|         225|
|  6|user1|     udp|         297|
|  7|user1|     tcp|         790|
|  8|user1|     udp|         216|
|  9|user1|     udp|         200|
+---+-----+--------+------------+
I need to add another column holding, for the current user, the average of the most recent metric_value of each protocol, using only records that are before the current timestamp and no older than current_ts - 4. The algorithm is something like:
- for each row X:
  - find all rows where row.user == X.user and row.ts < X.ts
  - from these rows, extract the most recent metric_value for each protocol (if the corresponding record is older than X.ts - 4, then throw it out)
  - calculate the avg of these metric_values
  - append the calculated avg to the row in a new column
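To pin down the intended semantics, here is a minimal plain-Python sketch of the steps above. It is not Spark code, and it is quadratic in the number of rows, so it is only meant as a reference for what the new column should contain. The names Row, SAMPLE, WINDOW and add_avg_val are hypothetical, and the sample rows are hard-coded with ts 5 belonging to user2, as in the desired result.

```python
from collections import namedtuple

# Hypothetical row type mirroring the DataFrame columns.
Row = namedtuple("Row", ["ts", "user", "protocol", "metric_value"])

# Sample rows; ts 5 is assumed to belong to user2.
SAMPLE = [
    Row(0, "user1", "tcp", 197),
    Row(1, "user1", "udp", 155),
    Row(2, "user1", "tcp", 347),
    Row(3, "user1", "tcp", 117),
    Row(4, "user1", "tcp", 230),
    Row(5, "user2", "udp", 225),
    Row(6, "user1", "udp", 297),
    Row(7, "user1", "tcp", 790),
    Row(8, "user1", "udp", 216),
    Row(9, "user1", "udp", 200),
]

WINDOW = 4  # records older than X.ts - WINDOW are ignored


def add_avg_val(rows, window=WINDOW):
    """Return (row, avg_val) pairs; avg_val is None when no record qualifies."""
    result = []
    for x in rows:
        # Most recent earlier record per protocol for the same user.
        latest = {}
        for r in rows:
            if r.user == x.user and r.ts < x.ts:
                if r.protocol not in latest or r.ts > latest[r.protocol].ts:
                    latest[r.protocol] = r
        # Throw out protocols whose latest record is older than x.ts - window.
        values = [r.metric_value for r in latest.values() if r.ts >= x.ts - window]
        avg = sum(values) / len(values) if values else None
        result.append((x, avg))
    return result


if __name__ == "__main__":
    for row, avg in add_avg_val(SAMPLE):
        print(row.ts, row.user, row.protocol, row.metric_value, avg)
```

On the sample rows this gives None for ts 0 and ts 5, and 230.0 for ts 6, since the udp record at ts 1 falls outside the window by then.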
The desired result should look like this:
+---+-----+--------+------------+-------+
| ts| user|protocol|metric_value|avg_val|
+---+-----+--------+------------+-------+
|  0|user1|     tcp|         197|   null| // no data for user1
|  1|user1|     udp|         155|    197| // only tcp value available
|  2|user1|     tcp|         347|    176| // (197 + 155) / 2
|  3|user1|     tcp|         117|    251| // (347 + 155) / 2
|  4|user1|     tcp|         230|    136| // (117 + 155) / 2
|  5|user2|     udp|         225|   null| // because no data for user2
|  6|user1|     udp|         297|    230| // because record with ts==1 is too old now
|  7|user1|     tcp|         790|  263.5| // (297 + 230) / 2
|  8|user1|     udp|         216|  543.5| // (297 + 790) / 2
|  9|user1|     udp|         200|    503| // (216 + 790) / 2
+---+-----+--------+------------+-------+
Note that there could be any number of protocols and users in the table.
How could this be achieved?
I've tried window functions with lag(1) and partitioning by protocol, but the aggregate functions only compute the average within a single partition, not across the results of different partitions. The closest I got was an SQL query using row_number over partition by protocol, but I couldn't work the row.ts < X.ts condition into it.