My Flink app is designed to process IoT data from sensors.
Sensors send data through gateways. This is what the sample data looks like:
case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)
Data from the same sensor can come from different gateways.
If a gateway is disconnected from the network, I receive a special event about it:
case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
These events arrive on a broadcast stream that is connected to the main data stream from the sensors.
A sensor may stop sending data in two cases:
- it is broken
- the gateway is disconnected from the network (I will receive a GatewayEvents("gwId","disconnected",1617979694) message in the broadcast stream)
If I receive a message that a gateway was disconnected from the network, and the sensors that were sending data through it then send nothing for some period (for example, 1 minute), I need to create a special event.
My partial implementation looks like this:
case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)
case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
val sensorData: DataStream[Data] ...
val gwData: DataStream[GatewayEvents] ...
val gatewayBroadcastStateDescriptor = new MapStateDescriptor[String, GatewayEvents]("gatewayEvents", classOf[String], classOf[GatewayEvents])
val broadcastGatewayEventsStream = gwData.broadcast(gatewayBroadcastStateDescriptor)
val events = sensorData
  .keyBy(_.sensorId)
  .connect(broadcastGatewayEventsStream)
  .process(...)
I can't work out how to implement this process function. Any ideas? I think SessionWindows might help, but I can't figure out how best to use them.
So: if a disconnected event arrives, wait for 1 minute (or any period, really), and if no data arrives in that time, emit some event type?
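
To make the wait-then-emit rule concrete, here is a minimal plain-Scala model of it, with no Flink dependency. It is only a sketch under assumptions: SilenceDetector, SensorSilent and advanceTo are hypothetical names, the timeout is taken to be in the same unit as the timestamp field, and the maps merely stand in for broadcast state, keyed state and per-key timers. The method names mirror the processElement and processBroadcastElement callbacks of Flink's KeyedBroadcastProcessFunction, and advanceTo plays the role of onTimer.

```scala
import scala.collection.mutable

// Event shapes copied from the question.
case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)
case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)

// Hypothetical output event; not part of the question.
case class SensorSilent(sensorId: String, gatewayId: String, lastSeen: Long)

// Plain-Scala model of the rule. `timeout` uses the same unit as `timestamp`.
class SilenceDetector(timeout: Long) {
  // Stand-in for broadcast state: latest event string per gateway.
  private val gatewayStatus = mutable.Map.empty[String, String]
  // Stand-in for keyed state: last reading per sensor.
  private val lastReading = mutable.Map.empty[String, Data]
  // Stand-in for one timer per key: sensorId -> time at which to check.
  private val timers = mutable.Map.empty[String, Long]

  // Gateway events only update the shared status map.
  def processBroadcastElement(e: GatewayEvents): Unit =
    gatewayStatus(e.gatewayId) = e.event

  // Every reading remembers its gateway and pushes the sensor's timer forward.
  def processElement(d: Data): Unit = {
    lastReading(d.sensorId) = d
    timers(d.sensorId) = d.timestamp + timeout
  }

  // Fires all timers due at or before `now` (the onTimer role). A sensor is
  // reported only if the gateway of its last reading is marked "disconnected".
  def advanceTo(now: Long): List[SensorSilent] = {
    val due = timers.collect { case (id, t) if t <= now => id }.toList.sorted
    due.flatMap { id =>
      timers.remove(id)
      val last = lastReading(id)
      if (gatewayStatus.get(last.gatewayId).contains("disconnected"))
        Some(SensorSilent(id, last.gatewayId, last.timestamp))
      else None
    }
  }
}
```

With a timeout of 60, a reading from s1 via gw1 at time 1000 followed by a "disconnected" event for gw1 yields nothing from advanceTo(1059) and one SensorSilent for s1 from advanceTo(1060). If s1 reports again through another gateway before then, its timer moves and nothing is emitted. One limitation of this sketch: a sensor whose timer has already fired before the disconnect message arrives is not reported. In Flink, the timer would be registered from processElement, because processBroadcastElement has no current key and therefore no timer service.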