Note that the example code is available on GitHub.

Here is how I would do it:

The Idea

Simple deployment

This approach has several advantages:

  1. The Sources and the Consumers (Sinks) are decoupled.
  2. Instead of an in-memory cache, you have a message queue: you will not lose any ping metrics, and it scales much better.
  3. This scales: you can add an arbitrary number of consumers to the message queue with durable subscriptions under the same name, which means the messages are distributed across those consumers. On Kubernetes, you could even tie that to a Horizontal Pod Autoscaler and would not have to worry about scaling at all. You can also scale the message queue itself, basically ad infinitum.
  4. Each component becomes dead simple. I have implemented this with Telegraf as the ping source, NATS as the message queue, a simple Go program as the consumer and CloudEvents as the data transfer format.

Ping source

As mentioned, I have implemented this via Telegraf and a dead simple config:

[[outputs.nats]]
  servers = ["nats://nats:4222"]
  name = "telegraf"
  subject = "telegraf"
  data_format = "cloudevents"
  cloudevents_version = "1.0"
  cloudevents_source = "telegraf"
  cloudevents_event_type = "com.github.mwmahlberg.se-monitoringevents-454044.telegraf"
  cloudevents_event_time = "creation"
  [outputs.nats.jetstream]
    name = "ping-results"
[[inputs.ping]]
  urls = [${PING_TARGETS}]
  method = "native"

All you have to do is add the URLs you want to ping, and you can even do that via an environment variable, as shown: with PING_TARGETS="telegraf","nats" (the value used in the docker-compose.yaml below), the line expands to urls = ["telegraf","nats"].

Message Queue

The message queue serves two purposes here: it replaces the in-memory cache, which can easily lose data, and it decouples the Sources and Sinks for the pings, making either or both easily replaceable. In addition, by choosing a pub/sub pattern, you can have multiple consumer types subscribing to ping events.

I have chosen NATS here for several reasons:

  1. It is comparatively lightweight for a message queue.
  2. It scales well.
  3. The client is dead simple to use.
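
If you have never touched NATS JetStream: the little sketch below shows everything the demo relies on, namely a stream and a CloudEvent published to a subject captured by it. The connection URL, stream name and subject mirror the demo's config; the event ID, source, type and payload are made up for illustration, since in the demo Telegraf both creates the stream (via its jetstream settings) and emits the events.

package main

import (
    "context"
    "log"
    "time"

    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

func main() {
    // Connect to the demo's NATS server (port 4222 is published by the
    // docker-compose.yaml below).
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    js, err := jetstream.New(nc)
    if err != nil {
        log.Fatal(err)
    }

    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // Create the stream idempotently. In the demo, Telegraf's
    // [outputs.nats.jetstream] section takes care of this.
    if _, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
        Name:     "ping-results",
        Subjects: []string{"telegraf"},
    }); err != nil {
        log.Fatal(err)
    }

    // Publish a hand-rolled CloudEvent. ID, source, type and payload are
    // made up here; Telegraf fills in its own values.
    evt := cloudevents.NewEvent()
    evt.SetID("manual-test-1")
    evt.SetSource("manual-test")
    evt.SetType("com.example.manual-test")
    if err := evt.SetData(cloudevents.ApplicationJSON, map[string]any{"hello": "world"}); err != nil {
        log.Fatal(err)
    }

    payload, err := evt.MarshalJSON()
    if err != nil {
        log.Fatal(err)
    }
    if _, err := js.Publish(ctx, "telegraf", payload); err != nil {
        log.Fatal(err)
    }
}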

Consumers

Now all your consumers have to do is subscribe to the NATS "cluster" (in the demo, it is a single server), all under the same durable subscriber name, and process the messages. If you make the consumers durable, they will also be able to process pings that were executed while they were offline.

If you have multiple processing steps, you can repeat the pub/sub pattern: simply send the processed event to a different subject on the message queue, have 1..n subscribers on that subject, and process those messages in turn; see the sketch below.
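
As a minimal sketch of such a second stage, under stated assumptions: the subject ping-results.processed and a stream covering it are hypothetical and not part of the demo, the pingresult type is the one from the processor shown further below, and the imports ("context", "encoding/json", and the jetstream package) are the same as in that program.

// forward republishes a reduced, processed result to a downstream subject,
// where the next stage's consumers pick it up. For the JetStream publish to
// succeed, a stream covering "ping-results.processed" must exist first.
func forward(ctx context.Context, js jetstream.JetStream, r *pingresult) error {
    payload, err := json.Marshal(map[string]any{
        "url":  r.Tags["url"],
        "loss": r.Fields.PercentPacketLoss,
    })
    if err != nil {
        return err
    }
    _, err = js.Publish(ctx, "ping-results.processed", payload)
    return err
}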

The implementation

I have set up a little demo.

docker-compose.yaml

configs:
  telegraf:
    content: |
      [global_tags]
        env = "demo"
      [agent]
        interval = "10s"
        round_interval = true
        metric_batch_size = 1000
        metric_buffer_limit = 10000
        collection_jitter = "0s"
        flush_interval = "10s"
        flush_jitter = "0s"
        precision = "0s"
      [[outputs.health]]        
      [[outputs.nats]]
        servers = ["nats://nats:4222"]
        name = "telegraf"
        subject = "telegraf"
        data_format = "cloudevents"
        cloudevents_version = "1.0"
        cloudevents_source = "telegraf"
        cloudevents_event_type = "com.github.mwmahlberg.se-monitoringevents-454044.telegraf"
        cloudevents_event_time = "creation"
        [outputs.nats.jetstream]
          name = "ping-results"
      [[inputs.ping]]
        # The double dollar sign is required to escape the variable in docker-compose
        # and to pass it to the telegraf container verbatim.
        urls = [$${PING_TARGETS}]
        method = "native"

volumes:
  nats:
    driver: local
services:
  nats:
    image: nats:2-alpine
    command: "-m 8222 -n mq --js -sd /data"
    volumes:
      - type: volume
        source: nats
        target: /data
    ports:
      - 4222:4222
      - 8222:8222
    healthcheck:
      test: [ "CMD", "wget", "http://localhost:8222/healthz", "-q", "-S", "-O", "-" ]
      interval: 10s
      timeout: 1s
      retries: 5
      start_period: 30s
  telegraf:
    image: telegraf:1.34-alpine
    environment:
      - HOSTNAME=telegraf
      - PING_TARGETS="telegraf","nats"
    restart: always
    healthcheck:
      test: [ "CMD", "wget", "http://localhost:8080/healthz", "-q", "-S", "-O", "-" ]
      interval: 10s
      timeout: 1s
      retries: 5
      start_period: 20s
    depends_on:
      nats:
        condition: service_healthy
    cap_add:
      # Required in podman
      - NET_RAW
    configs:
      - source: telegraf
        target: /etc/telegraf/telegraf.conf

  processor:
    image: mwmahlberg/se-monitoringevents-454044-processor:latest
    depends_on:
      nats:
        condition: service_healthy
      telegraf:
        condition: service_healthy
    deploy:
      mode: replicated
      replicas: 2
    environment:
      - PROCESSOR_NATS_HOST=nats:4222
      - PROCESSOR_NATS_STREAM_NAME=ping-results
      - PROCESSOR_LOG_LEVEL=debug

Absolutely nothing spectacular here: Telegraf is set up as the ping source, NATS as the message queue, and a little Go application as the consumer.

Note that this docker-compose.yaml can actually be run; the image for the processor is available on Docker Hub.

However, and this is part of the "trick": there are two instances of the consumer, and they can be scaled up to many (for example with docker compose up --scale processor=4).

The processor

It is a simple, one-file Go program:

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log/slog"
    "os"
    "os/signal"
    "syscall"
    "time"

    cloudevents "github.com/cloudevents/sdk-go/v2"
    "github.com/kelseyhightower/envconfig"
    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

// loglevel is just a wrapper around slog.Level to implement the UnmarshalText method
// for parsing log levels from environment variables.
type loglevel slog.Level

// UnmarshalText implements the encoding.TextUnmarshaler interface for loglevel.
func (l *loglevel) UnmarshalText(text []byte) error {
    switch string(text) {
    case "debug":
        *l = loglevel(slog.LevelDebug)
    case "info":
        *l = loglevel(slog.LevelInfo)
    case "warn":
        *l = loglevel(slog.LevelWarn)
    case "error":
        *l = loglevel(slog.LevelError)
    default:
        return fmt.Errorf("invalid log level: %s", text)
    }
    return nil
}

// timestamp is a wrapper around time.Time to implement the UnmarshalJSON method
// for parsing timestamps from JSON data.
// It converts the timestamp from nanoseconds since epoch to time.Time.
type timestamp time.Time

// UnmarshalJSON implements the json.Unmarshaler interface for timestamp.
func (t *timestamp) UnmarshalJSON(data []byte) error {
    var ts int64
    if err := json.Unmarshal(data, &ts); err != nil {
        return err
    }
    *t = timestamp(time.Unix(0, ts))
    return nil
}

// config holds the configuration for the processor service.
// It uses the envconfig package to load configuration from environment variables.
type config struct {
    LogLevel loglevel `split_words:"true" default:"info" desc:"Log level (debug, info, warn, error)"`
    Nats     struct {
        Host   string `split_words:"true" required:"true" default:"localhost:4222" desc:"NATS server host and port"`
        Client struct {
            Name string `split_words:"true" default:"processor" desc:"NATS client name"`
        }
        Stream struct {
            ClientName string `split_words:"true" default:"processor" desc:"NATS JetStream client name"`
            Name       string `split_words:"true" default:"ping-results" desc:"NATS JetStream stream name"`
        }
    }
}

// pingresult represents the structure of the ping result message.
// It is the Go representation of the data sent by the Telegraf ping plugin via CloudEvents.
type pingresult struct {
    Fields struct {
        AverageResponseMS   float64 `json:"average_response_ms"`
        MaximumResponseMS   float64 `json:"maximum_response_ms"`
        MinimumResponseMS   float64 `json:"minimum_response_ms"`
        PacketsReceived     int     `json:"packets_received"`
        PacketsTransmitted  int     `json:"packets_transmitted"`
        PercentPacketLoss   float64 `json:"percent_packet_loss"`
        ResultCode          int     `json:"result_code"`
        StandardDeviationMS float64 `json:"standard_deviation_ms"`
        TTL                 int     `json:"ttl"`
    }
    Name      string            `json:"name"`
    Tags      map[string]string `json:"tags"`
    Timestamp timestamp         `json:"timestamp"`
}

// processMsg is a function that processes incoming messages from the NATS JetStream.
// It unmarshals the CloudEvent data into a pingresult struct and sends it to the downstream channel.
// If the message cannot be processed, it sends a negative acknowledgment (Nak) to the NATS server.
// This way, the message will be retried later and is not lost.
func processMsg(downstream chan *pingresult) func(msg jetstream.Msg) {
    return func(msg jetstream.Msg) {
        evt := cloudevents.NewEvent()
        if err := evt.UnmarshalJSON(msg.Data()); err != nil {
            msg.Nak()
            slog.Error("Failed to unmarshal CloudEvent", "error", err)
            return
        }
        r := new(pingresult)
        if err := evt.DataAs(r); err != nil {
            msg.Nak()
            slog.Error("Failed to unmarshal CloudEvent data", "error", err)
            return
        }
        slog.Debug("Received CloudEvent", "event", evt.Type(),
            "source", evt.Source(),
            "url", r.Tags["url"],
            "average", r.Fields.AverageResponseMS,
            "packageloss", r.Fields.PercentPacketLoss,
            "subject", msg.Subject(),
        )
        msg.Ack()
        downstream <- r
    }
}

func main() {
    var cfg config

    // Load configuration from environment variables and print usage
    // information if that fails.
    if err := envconfig.Process("processor", &cfg); err != nil {
        envconfig.Usage("processor", &cfg)
        slog.Error("Failed to process environment variables", "error", err)
        os.Exit(1)
    }

    // Make sure we have a decent logger
    slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
        Level: slog.Level(cfg.LogLevel),
    })))

    slog.Info("Starting processor service", "cfg", cfg)

    // We need to handle OS signals to gracefully shut down the service
    // This is important for long-running services to avoid data loss
    // and to clean up resources properly.
    ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
    defer cancel()

    // Connect to the NATS server
    nc, err := nats.Connect(fmt.Sprintf("nats://%s", cfg.Nats.Host), nats.Name(cfg.Nats.Client.Name))
    if err != nil {
        slog.Error("Failed to connect to NATS server", "error", err)
        os.Exit(1)
    }
    defer nc.Close()

    // Create a JetStream context...
    js, err := jetstream.New(nc)
    if err != nil {
        slog.Error("Failed to create JetStream context", "error", err)
        os.Exit(1)
    }

    // ... and connect to the stream of incoming CloudEvents
    stream, err := js.Stream(ctx, cfg.Nats.Stream.Name)
    if err != nil {
        if !errors.Is(err, jetstream.ErrStreamNotFound) {
            slog.Error("Failed to get stream", "error", err)
            os.Exit(1)
        }
        slog.Error("Stream not found!", "error", err)
        os.Exit(1)
    }

    // Create a consumer for the stream...
    cons, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Durable: "telegraf"})
    if err != nil {
        slog.Error("Failed to create consumer", "error", err)
        os.Exit(1)
    }

    // ... and consume messages from the stream.
    procChan := make(chan *pingresult, 64)
    // Note that we hand over a callback function to the consumer (the result of processMsg).
    // This function will be called for each message received from the stream.
    cc, err := cons.Consume(processMsg(procChan))
    if err != nil {
        slog.Error("Failed to start consuming messages", "error", err)
        os.Exit(1)
    }
    defer cc.Stop()

    // All we have to do now is to process the messages received from the stream.
    // Of course, we could also do this in the callback function, but this way we can
    // separate the processing logic from the message receiving logic.
    go func() {
        for {
            select {
            case msg := <-procChan:
                // Here you can add your processing logic
                // For example, you can send the result to another stream or store it in a database
                slog.Info("Processing ping result", "result", msg)

            case <-ctx.Done():
                // This will be called when the service receives SIGINT or SIGTERM.
                slog.Info("Shutting down processor service")
                return
            }
        }
    }()

    // Wait here until a shutdown signal arrives. Without this, main would
    // return immediately and the program would exit before processing anything.
    <-ctx.Done()
}

You can surely expand on it, but here it only serves as a stand-in.

Some random additional thoughts

  • With the setup shown above, you can scale both horizontally and vertically.
  • Depending on your use case, it might also be a good idea to use Kafka, Kafka Streams and maybe ksqlDB, especially if your primary use case is aggregations. Telegraf could even be adapted to that: use the kafka output plugin while keeping the output format, and you should be good to go (sketched below).
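
That adaptation could look roughly like the following. This is a hedged sketch only: the broker address and topic are assumptions, and it presumes the kafka output plugin accepts the same CloudEvents serializer settings as the nats output used above.

[[outputs.kafka]]
  ## Hypothetical broker address and topic; adjust to your environment.
  brokers = ["kafka:9092"]
  topic = "telegraf"
  data_format = "cloudevents"
  cloudevents_version = "1.0"
  cloudevents_source = "telegraf"
  cloudevents_event_type = "com.github.mwmahlberg.se-monitoringevents-454044.telegraf"
  cloudevents_event_time = "creation"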