
My Flink app is designed to process IoT data from sensors. Sensors send data through gateways. This is what the sample data looks like:

case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)

Data from the same sensor can come from different gateways.

If a gateway is disconnected from the network, I receive a special event about it:

case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)

I consume these events as a broadcast stream, which is connected to the main data stream from the sensors.

A sensor may stop sending data in two cases:

  • it is broken
  • the gateway is disconnected from the network (I will receive a GatewayEvents("gwId","disconnected",1617979694) message in the broadcast stream)

If I receive a message that some gateway was disconnected from the network, and the sensors that sent data through it then stop sending data (for example, for 1 minute), I need to create a special event.

My partial implementation looks like this:

case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)
case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)

val sensorData: DataStream[Data] ...
val gwData: DataStream[GatewayEvents] ...

val gatewayBroadcastStateDescriptor = new MapStateDescriptor[String, GatewayEvents]("gatewayEvents", classOf[String], classOf[GatewayEvents])
val broadcastGatewayEventsStream = gwData.broadcast(gatewayBroadcastStateDescriptor)

val events = sensorData
  .keyBy(_.sensorId)
  .connect(broadcastGatewayEventsStream)
  .process(...)

I can't work out how to implement this process function. Any ideas? I think session windows could help, but I can't figure out how best to use them.

  • So, if I understand correctly, the use case is: when the disconnected event arrives, wait for 1 minute (or any other timeout), and if no data arrives in that time, emit some event? Commented Apr 9, 2021 at 21:54
  • Yes @DominikWosiński. Commented Apr 11, 2021 at 9:18

1 Answer


I think the simplest approach here is to use timers. You could implement a KeyedCoProcessFunction so that, when it receives a gateway "disconnected" message, it registers a processing-time timer to fire after the desired timeout. If any message arrives for the sensor before then, you simply delete the registered timer so that it won't fire. Inside onTimer you can emit the desired event: if the timer fires, it means no value arrived within the timeout.

One thing to note here: if you keyBy(_.sensorId), the event will be generated for every sensor whose data was received through this gateway. If you want to emit only one event for the gateway, you can simply change the partitioning to keyBy(_.gatewayId).
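A minimal sketch of the timer idea, under some assumptions: it targets the Flink 1.x Scala API, keys both inputs by gatewayId (GatewayEvents carries no sensorId), and treats any reading from the gateway as proof that it is alive. GatewaySilent, SilenceDetector, SilenceJob and timeoutMs are hypothetical names made up for illustration, not something from the question.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)
case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)
// Hypothetical output type: the gateway stayed silent for the whole timeout.
case class GatewaySilent(gatewayId: String, detectedAt: Long)

class SilenceDetector(timeoutMs: Long)
    extends KeyedCoProcessFunction[String, Data, GatewayEvents, GatewaySilent] {

  // Firing time of the pending timer for the current key, or null if none.
  @transient private var pendingTimer: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    pendingTimer = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long]("pendingTimer", classOf[java.lang.Long]))
  }

  // Sensor data: the gateway is delivering readings, so cancel any pending timer.
  override def processElement1(
      data: Data,
      ctx: KeyedCoProcessFunction[String, Data, GatewayEvents, GatewaySilent]#Context,
      out: Collector[GatewaySilent]): Unit = {
    val timer = pendingTimer.value()
    if (timer != null) {
      ctx.timerService().deleteProcessingTimeTimer(timer)
      pendingTimer.clear()
    }
  }

  // Gateway event: on "disconnected", start the countdown unless one is already running.
  override def processElement2(
      event: GatewayEvents,
      ctx: KeyedCoProcessFunction[String, Data, GatewayEvents, GatewaySilent]#Context,
      out: Collector[GatewaySilent]): Unit = {
    if (event.event == "disconnected" && pendingTimer.value() == null) {
      val fireAt = ctx.timerService().currentProcessingTime() + timeoutMs
      ctx.timerService().registerProcessingTimeTimer(fireAt)
      pendingTimer.update(fireAt)
    }
  }

  // The timer was not cancelled, so no data arrived within the timeout.
  override def onTimer(
      timestamp: Long,
      ctx: KeyedCoProcessFunction[String, Data, GatewayEvents, GatewaySilent]#OnTimerContext,
      out: Collector[GatewaySilent]): Unit = {
    pendingTimer.clear()
    out.collect(GatewaySilent(ctx.getCurrentKey, timestamp))
  }
}

object SilenceJob {
  // Both inputs are keyed by gatewayId so that matching records meet in the same instance.
  def silentGateways(
      sensorData: DataStream[Data],
      gwData: DataStream[GatewayEvents],
      timeoutMs: Long): DataStream[GatewaySilent] =
    sensorData
      .keyBy(_.gatewayId)
      .connect(gwData.keyBy(_.gatewayId))
      .process(new SilenceDetector(timeoutMs))
}
```

For the 1 minute from the question you would call SilenceJob.silentGateways(sensorData, gwData, 60000L). If you need one event per sensor instead, the function would also have to remember which sensors were last seen on which gateway, which this sketch does not attempt.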


4 Comments

Thanks. I think KeyedCoProcessFunction should solve my use case without using broadcast state. I connected my sensorData and gwData, keyed them with .keyBy(_.sensorId, _.gatewayId), and applied a KeyedCoProcessFunction. I can't understand how .keyBy works when two streams are connected with parallelism > 1. Can I connect keyed and non-keyed streams?
KeyBy basically means that elements with the same key will be processed by the same parallel instance of the operator. So, events that have sensorId=1 would always be processed by the same instance if you keyBy(_.sensorId).
Thanks, that's clear. How does it work when there are two connected streams and .keyBy(_.sensorId, _.gatewayId)?
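The same, really :) On connected streams, keyBy takes one key selector per input: the first is applied to the first stream and the second to the second, and records whose keys are equal end up in the same instance.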
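To make the routing question concrete, here is a toy model in plain Scala. It is deliberately simplified: instanceFor is a hypothetical stand-in that takes the key's hashCode modulo the parallelism, whereas Flink actually hashes keys into key groups and assigns ranges of key groups to parallel instances. The point it illustrates holds either way: the target instance depends only on the key value, so records from two connected streams meet only when both key selectors return equal keys. As far as I know, the two-argument keyBy on connected streams applies the first selector to the first input and the second to the second.

```scala
object KeyRoutingModel {
  case class Data(sensorId: String, value: Float, gatewayId: String, timestamp: Long)
  case class GatewayEvents(gatewayId: String, event: String, timestamp: Long)

  // Simplified stand-in for key-to-instance routing, NOT Flink's real algorithm.
  def instanceFor(key: String, parallelism: Int): Int =
    Math.floorMod(key.hashCode, parallelism)

  def main(args: Array[String]): Unit = {
    val parallelism = 4
    val reading = Data("sensor-7", 21.5f, "gw-1", 1617979690L)
    val event = GatewayEvents("gw-1", "disconnected", 1617979694L)

    // keyBy(_.gatewayId, _.gatewayId): equal keys, so always the same instance.
    val sameKey = (
      instanceFor(reading.gatewayId, parallelism),
      instanceFor(event.gatewayId, parallelism))
    println(s"both inputs keyed by gatewayId -> instances $sameKey")

    // keyBy(_.sensorId, _.gatewayId): different keys, so nothing guarantees
    // that the reading and the gateway event reach the same instance or state.
    val mixedKeys = (
      instanceFor(reading.sensorId, parallelism),
      instanceFor(event.gatewayId, parallelism))
    println(s"sensorId vs gatewayId -> instances $mixedKeys")
  }
}
```

This is why keying both inputs by gatewayId is the safer choice when the gateway events need to see state built from the sensor data.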
