I am trying to decode and process protobuf-encoded MQTT messages (from an Eclipse Mosquitto broker) using Apache Beam. In addition to the encoded fields, I also want to process the full topic of each message for grouping and aggregations, as well as the timestamp.

What I have tried so far

I can connect to Mosquitto via

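import org.apache.beam.sdk.Pipeline
import org.apache.beam.sdk.io.mqtt.MqttIO
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.values.PCollection

val options = PipelineOptionsFactory.create()
val pipeline = Pipeline.create(options)

// Subscribe to every topic one level below my/topic ('+' is the single-level wildcard).
val mqttReader: MqttIO.Read = MqttIO
    .read()
    .withConnectionConfiguration(
        MqttIO.ConnectionConfiguration.create(
            "tcp://localhost:1884",
            "my/topic/+"
        )
    )

// MqttIO.read() emits only the raw payload of each message.
val readMessages = pipeline.apply<PCollection<ByteArray>>(mqttReader)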

In order to decode the messages, I have compiled the .proto schema (in my case quote.proto, containing the Quote message) via Gradle, which allows me to transform a ByteArray into a Quote object via Quote.parseFrom():

val quotes = readMessages
    .apply(
        ParDo.of(object : DoFn<ByteArray, QuoteOuterClass.Quote>() {
            @ProcessElement
            fun processElement(context: ProcessContext) {
                val protoRow = context.element()
                context.output(QuoteOuterClass.Quote.parseFrom(protoRow))
            }
        })
    )

Using this, in the next apply, I can then access individual fields with a ProcessFunction and a lambda, e.g. { quote -> "${quote.volume}" }. However, there are two problems:

  1. With this pipeline I do not have access to the topic or timestamp of each message.
  2. After sending the decoded messages back to the broker with plain UTF8 encoding, I believe that they do not get decoded correctly.
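
One possible cause of the second problem, assuming the republished payload is built by turning serialized protobuf bytes into a String, is that arbitrary binary data does not survive a UTF-8 round trip. The sketch below uses only the Kotlin standard library; roundTripThroughUtf8, wireBytes and toHex are illustrative names, and the three bytes are the textbook varint encoding of "field 1 = 150", not the actual Quote message.

```kotlin
// Hypothetical helper: force binary bytes through a UTF-8 String and back.
fun roundTripThroughUtf8(bytes: ByteArray): ByteArray =
    bytes.toString(Charsets.UTF_8).toByteArray(Charsets.UTF_8)

// Illustrative protobuf wire bytes: tag 0x08 (field 1, varint), then 150 as 0x96 0x01.
val wireBytes: ByteArray = byteArrayOf(0x08, 0x96.toByte(), 0x01)

// Render bytes as lowercase hex for inspection.
fun toHex(bytes: ByteArray): String =
    bytes.joinToString(" ") { "%02x".format(it.toInt() and 0xff) }

// 0x96 is not valid UTF-8 on its own, so decoding replaces it with U+FFFD,
// which re-encodes as the three bytes ef bf bd.
val damaged: ByteArray = roundTripThroughUtf8(wireBytes)
```

Here toHex(wireBytes) gives "08 96 01" while toHex(damaged) gives "08 ef bf bd 01", so a subscriber calling parseFrom() on the republished bytes would no longer see the original message. If this is what happens in the pipeline, publishing the result of toByteArray() on the protobuf object directly would avoid the String step.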

Additional considerations

  1. Apache Beam provides a ProtoCoder class, but I cannot figure out how to use it in conjunction with MqttIO. I suspect that the implementation has to look similar to

    val coder = ProtoCoder
        .of(QuoteOuterClass.Quote::class.java)
        .withExtensionsFrom(QuoteOuterClass::class.java)

  2. Instead of a PCollection<ByteArray>, the Kafka IO reader provides a PCollection<KafkaRecord<Long, String>>, which has all the relevant fields (including topic). I am wondering if something similar can be achieved with Mqtt + ProtoBuf.

  3. A similar implementation to what I want to achieve can be done in Spark Structured Streaming + Apache Bahir as follows:

val df_mqttStream = spark.readStream
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
    .option("topic", topic)
    .load(brokerUrl)

val parsePayload = ProtoSQL.udf { bytes: Array[Byte] => Quote.parseFrom(bytes) }

val quotesDS = df_mqttStream.select("id", "topic", "payload")
    .withColumn("quote", parsePayload($"payload"))
    .select("id", "topic", "quote.*")

However, with Spark 2.4 (the latest supported version), accessing the message topic is broken (related issue, my ticket in Apache Jira).

1 Answer

As far as I can tell, the latest version of Apache Beam (2.27.0) simply does not offer a way to extract the topic of individual MQTT messages.

I have extended the MqttIO to return MqttMessage objects that include a topic (and a timestamp) in addition to the byte array payload. The changes currently exist as a pull request draft.

With these changes, the topic can simply be accessed as message.topic.

// MqttMessage is the type introduced by the draft pull request, not part of the released MqttIO.
val mqttMessages = pipeline.apply<PCollection<MqttMessage>>(mqttReader)

val topicOfMessages: PCollection<String> = mqttMessages
    .apply(
        ParDo.of(object : DoFn<MqttMessage, String>() {
            @ProcessElement
            fun processElement(
                @Element message: MqttMessage,
                out: OutputReceiver<String>
            ) { out.output(message.topic) }
        })
    )
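
Once the topic travels with each message, the grouping and aggregation mentioned in the question becomes a matter of choosing a key. The following is a minimal plain-Kotlin sketch of that idea, outside of Beam: TopicRecord is a hypothetical stand-in loosely modelled on the MqttMessage above (topic, timestamp, payload), and lastTopicLevel, countByLastLevel, latestTimestampByTopic and sampleRecords are illustrative names, not part of any library.

```kotlin
// Hypothetical envelope carrying the topic and timestamp next to the raw payload.
class TopicRecord(val topic: String, val timestampMillis: Long, val payload: ByteArray)

// For a subscription such as "my/topic/+", the last level is the part matched by '+'.
fun lastTopicLevel(topic: String): String = topic.substringAfterLast('/')

// Count records per last topic level.
fun countByLastLevel(records: List<TopicRecord>): Map<String, Int> =
    records.groupingBy { lastTopicLevel(it.topic) }.eachCount()

// Keep the most recent timestamp seen for each full topic.
fun latestTimestampByTopic(records: List<TopicRecord>): Map<String, Long> =
    records
        .groupBy({ it.topic }, { it.timestampMillis })
        .mapValues { (_, stamps) -> stamps.fold(Long.MIN_VALUE) { a, b -> maxOf(a, b) } }

// Made-up sample data for illustration only.
fun sampleRecords(): List<TopicRecord> = listOf(
    TopicRecord("my/topic/a", 1_000L, byteArrayOf(1)),
    TopicRecord("my/topic/b", 2_000L, byteArrayOf(2)),
    TopicRecord("my/topic/a", 3_000L, byteArrayOf(3))
)
```

With the sample data, countByLastLevel yields 2 for "a" and 1 for "b". In an actual pipeline the same keying function would be applied inside a keyed transform rather than on an in-memory list.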