
The Goal
I am trying to run speech-to-text transcription in the background in Python/Flask. I am using the Celery package to execute tasks asynchronously, with a Redis instance (on Docker) as the broker. I installed the Flower package to monitor the Celery tasks.

The Problem
According to the Flower dashboard, tasks are received but never executed. On the broker tab, I see that no message was ever sent to the Redis broker (I don't know if that is relevant). The only time I could get it to work was when I set the --pool flag to solo, but I don't want that, since it is bad practice for a production system. And even then, only one task gets executed and the worker stops receiving afterwards.

The Code

In the Flask project, I have an app.py, a transcription.py, and a transcribejob.py.

Starting with transcribejob.py:

from transcription import Transcriber, WhisperStrategy
from celery import Celery
import os
import json
import sys

transcriber = Transcriber(WhisperStrategy())
# to start the celery worker, enter in the terminal:
# celery -A transcribejob:celery worker --loglevel=info --max-memory-per-child=1000000 --concurrency=4

# to monitor the process: celery -A transcribejob flower
redis_url = 'redis://127.0.0.1:6379/0'
celery = Celery('transcribejob', broker=redis_url, backend=redis_url)
cwd = os.getcwd()
print(f"transcribejob.py, cwd: {cwd}", file=sys.stdout)

@celery.task
def transcribe_export(temp_audio_path, audio_file_name):
    result = transcriber.transcribe(os.path.join(temp_audio_path, audio_file_name))
    txt_fn = audio_file_name.replace(".wav", "") + '.txt'
    with open(os.path.join(cwd, txt_fn), 'w') as f:
        json.dump(result, f)
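One thing worth noting about transcribejob.py: the repeated "cwd" prints in the worker output below show that, with Windows spawn-based pools, the task module is re-imported in every child process, so `whisper.load_model("small")` (which runs at class-definition time in WhisperStrategy) is executed eagerly on every import. A minimal sketch of deferring that cost to first use, with a hypothetical `load_model` standing in for `whisper.load_model`:

```python
def load_model(name):
    # hypothetical stand-in for an expensive load such as whisper.load_model(name)
    return f"model:{name}"


class LazyWhisperStrategy:
    _model = None  # shared across instances, loaded on first use

    @classmethod
    def _get_model(cls):
        # load the model the first time a task actually needs it,
        # not at import time in every spawned child
        if cls._model is None:
            cls._model = load_model("small")
        return cls._model

    def do_transcribe(self, audio_file_path):
        model = self._get_model()
        return f"{model} transcribed {audio_file_path}"
```

With this pattern, importing the module stays cheap; the model is materialized only inside the worker process that actually runs a task.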

On to transcription.py:

import whisper


class TranscribeStrategy:
    def __init__(self, vendor):
        self.vendor = vendor

    def do_transcribe(self, audio_file_path):
        pass


class WhisperStrategy(TranscribeStrategy):
    model = whisper.load_model("small")

    def __init__(self):
        super().__init__("openai-whisper-small")

    def do_transcribe(self, audio_file_path):
        return self.model.transcribe(audio_file_path, language="nl")


class Transcriber:
    def __init__(self, transcribe_strategy):
        self.strategy = transcribe_strategy

    def transcribe(self, audio_file_path):
        return {"vendor": self.strategy.vendor, "result": self.strategy.do_transcribe(audio_file_path)}
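For completeness, the strategy classes above can be exercised in isolation with a stub strategy (no Whisper install needed; `EchoStrategy` is made up here purely for illustration):

```python
class TranscribeStrategy:
    def __init__(self, vendor):
        self.vendor = vendor

    def do_transcribe(self, audio_file_path):
        pass


class EchoStrategy(TranscribeStrategy):
    # stub strategy: returns a fixed string instead of running a real model
    def __init__(self):
        super().__init__("echo")

    def do_transcribe(self, audio_file_path):
        return f"transcript of {audio_file_path}"


class Transcriber:
    def __init__(self, transcribe_strategy):
        self.strategy = transcribe_strategy

    def transcribe(self, audio_file_path):
        return {"vendor": self.strategy.vendor,
                "result": self.strategy.do_transcribe(audio_file_path)}


print(Transcriber(EchoStrategy()).transcribe("clip.wav"))
# → {'vendor': 'echo', 'result': 'transcript of clip.wav'}
```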

and this is my app.py:

from flask import Flask, request
from io import BytesIO
from transcribejob import transcribe_export
import os
import uuid
from pydub import AudioSegment

app = Flask(__name__)
temp_audio_path = os.path.join(os.getcwd(), 'tempAudioStorage')

@app.route('/transcribe', methods=['POST'])
def transcribe_audio_file():
    if 'file' not in request.files:
        return "No file uploaded", 400

    file = request.files['file']

    if file.content_type not in {'audio/wav', 'audio/mpeg'}:
        return "Invalid file format. Only WAV and MP3 files are allowed. Received Format: "+file.content_type, 400

    file_data = BytesIO(file.read())
    audio_format = file.content_type.split('/')[-1]
    if audio_format == 'mpeg':
        audio_format = 'mp3'
    audio = AudioSegment.from_file(file_data, format=audio_format)
    audio_file_name = str(uuid.uuid4())+".wav"
    audio.export(os.path.join(temp_audio_path, audio_file_name), format='wav')

    transcribe_export.delay(temp_audio_path, audio_file_name)

    return 'Transcription request received, transcription in process', 200


if __name__ == '__main__':
    app.run()

The Output

When running Flask and executing the Celery command, I get the following output:

(venv) PS C:\Users\SeanS\PycharmProjects\isampTranscribe> celery -A transcribejob:celery worker --loglevel=info --max-memory-per-child=1000000 --concurrency=4
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe

 -------------- celery@SeanS01 v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.22621-SP0 2023-04-13 14:26:11
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         transcribejob:0x22c05f52c10
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . transcribejob.transcribe_export

[2023-04-13 14:26:11,887: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2023-04-13 14:26:11,891: INFO/MainProcess] mingle: searching for neighbors
[2023-04-13 14:26:12,429: INFO/SpawnPoolWorker-1] child process 18524 calling self.run()
[2023-04-13 14:26:12,450: INFO/SpawnPoolWorker-3] child process 29760 calling self.run()
[2023-04-13 14:26:12,453: INFO/SpawnPoolWorker-2] child process 30472 calling self.run()
[2023-04-13 14:26:12,460: INFO/SpawnPoolWorker-4] child process 30600 calling self.run()
[2023-04-13 14:26:12,921: WARNING/MainProcess] C:\Users\SeanS\PycharmProjects\isampTranscribe\venv\lib\site-packages\celery\app\control.py:56: DuplicateNodenameWarning: Received multiple replies from node name: celery@SeanS01.
Please make sure you give each node a unique nodename using
the celery worker `-n` option.
  warnings.warn(DuplicateNodenameWarning(

[2023-04-13 14:26:12,922: INFO/MainProcess] mingle: all alone
[2023-04-13 14:26:12,944: INFO/MainProcess] celery@SeanS01 ready.
[2023-04-13 14:26:15,346: INFO/MainProcess] Events of group {task} enabled by remote.
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
[2023-04-13 14:27:09,247: INFO/MainProcess] Task transcribejob.transcribe_export[653178f2-c67d-4f96-afa8-f1cbcd470d91] received
[2023-04-13 14:27:10,041: INFO/SpawnPoolWorker-5] child process 25092 calling self.run()
[2023-04-13 14:27:10,051: INFO/SpawnPoolWorker-6] child process 26020 calling self.run()
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
[2023-04-13 14:27:15,740: INFO/SpawnPoolWorker-7] child process 30576 calling self.run()
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe

And this is my Flower dashboard (screenshots: messages to the broker from Celery, and task status from Celery).

The Redis instance runs on Docker and the ports are correctly mapped (6379 -> 6379).

Why won't my Celery tasks execute?

1 Answer


I found a suitable solution.

The default value for Celery's --pool flag is prefork. Unfortunately, prefork is not supported on Windows; the gevent pool does work, though.

Just install gevent with pip install gevent and then add --pool=gevent to your celery worker command.
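Concretely, the command from the question would become something like the following (a sketch; note that, per the Celery docs, --max-memory-per-child only applies to the prefork pool, so it is dropped here):

```shell
pip install gevent
celery -A transcribejob:celery worker --loglevel=info --pool=gevent --concurrency=4
```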
