1

I'm working on a Nuxt project that uses NuxtHub and Cloudflare Queues.

✅ I can successfully publish messages to the queue like this:

const { cloudflare } = event.context;

console.log("Publishing to queue:", log);

await cloudflare.env.COMPARISON_QUEUE.send(log);

📦 My current nuxt.config.ts looks like this:

export default defineNuxtConfig({
  compatibilityDate: "2024-11-01",
  devtools: { enabled: true },
  modules: [
    "@nuxt/eslint",
    "@nuxt/ui-pro",
    "@nuxt/content",
    "@nuxt/icon",
    "@nuxthub/core"
  ],
  hub: {
    workers: true,
    database: true,
    bindings: {
      queue: {
        COMPARISON_QUEUE: {
          queue_name: "comparison-queue",
        }
      },
      observability: {
        logs: true
      }
    }
  }
});

🧩 I have a queue consumer in server/plugin/queue-consumer.ts:

import { defineNitroPlugin } from 'nitropack/runtime';
import { processMessages } from '../queue/comparison.consumer';

export default defineNitroPlugin((nitroApp) => {
  nitroApp.hooks.hook('cloudflare:queue', async (batchWrapper, env, ctx) => {
    console.log('[Queue] Batch wrapper received:', JSON.stringify(batchWrapper, null, 2));

    const batch = batchWrapper?.event || batchWrapper?.batch || batchWrapper;
    const messages = batch?.messages || [];
    console.log('[Queue] Extracted messages:', messages.length);

    if (ctx && typeof ctx.waitUntil === 'function') {
      try {
        ctx.waitUntil(processMessages(batch, env, batchWrapper));
      } catch (error) {
        console.error('[Queue] Error in processMessages:', error);
      }
    } else {
      try {
        await processMessages(batch, env, batchWrapper);
      } catch (error) {
        console.error('[Queue] Error in processMessages (fallback):', error);
      }
    }

    for (const message of messages) {
      try {
        console.log('[Queue] Processing message ID:', message.id);
        if (message && typeof message.ack === 'function') {
          message.ack();
        }
      } catch (err) {
        console.error('[Queue] Error ACK:', err);
        if (message && typeof message.retry === 'function') {
          message.retry({ delaySeconds: 30 });
        }
      }
    }
  });
});

🔍 The plugin works perfectly when used in a regular Nuxt app with Cloudflare Workers without NuxtHub.

🤔 Now that I'm using NuxtHub (@nuxthub/core version 0.8.25), I'm not seeing any logs or evidence that the queue consumer is invoked at all. I suspect I might be missing a configuration step to register the consumer, but I can't find any documentation or examples.

Question:
Do I need to declare the queue consumer explicitly somewhere in NuxtHub’s configuration? Or is there something else I need to do to make the queue consumer work?

1 Answer 1

1

Ran into the same issue and what I had to do was basically add my worker project as a consumer in the cloudflare dashboard:

Cloudflare dashboard -> storage & databases -> queues -> select your queue -> settings and add your project (worker) as a consumer.

Now I see the message being consumed and acknowledged.

Hope it helps!

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.