The Goal
I am trying to run speech-to-text transcription in the background in a Python/Flask app. I am using the Celery package to execute tasks asynchronously, with a Redis instance (on Docker) as the broker. I installed the flower package to monitor the Celery tasks.
The Problem
According to the flower dashboard, tasks are received, but never executed.
On the broker tab, I see that no message was ever sent to the Redis broker (I don't know if that is relevant).
The only time I could get it to work was when I set the --pool flag to solo. But I don't want that, since that is bad practice for a production system. And even then, only one task gets executed and the worker stops receiving tasks afterwards.
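For reference, this is the solo-pool invocation that partially works, alongside the threads pool, which is another pool type Celery ships with (I have not verified whether it behaves any better on Windows for this workload):

```shell
# Solo pool: the task runs in the worker's main process
# (the only configuration where a task actually executed for me).
celery -A transcribejob:celery worker --loglevel=info --pool=solo

# Threads pool: an alternative to prefork that avoids child processes;
# untested here, listed only as a possible workaround.
celery -A transcribejob:celery worker --loglevel=info --pool=threads --concurrency=4
```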
The Code
In the Flask project, I have an app.py, a transcription.py, and a transcribejob.py.
Starting with transcribejob.py:
```python
from transcription import Transcriber, WhisperStrategy
from celery import Celery
import os
import json
import sys

transcriber = Transcriber(WhisperStrategy())

# to start the celery worker, enter in the terminal:
# celery -A transcribejob:celery worker --loglevel=info --max-memory-per-child=1000000 --concurrency=4
# to monitor the process: celery -A transcribejob flower

redis_url = 'redis://127.0.0.1:6379/0'
celery = Celery('transcribejob', broker=redis_url, backend=redis_url)

cwd = os.getcwd()
print(f"transcribejob.py, cwd: {cwd}", file=sys.stdout)

@celery.task
def transcribe_export(temp_audio_path, audio_file_name):
    result = transcriber.transcribe(os.path.join(temp_audio_path, audio_file_name))
    txt_fn = audio_file_name.replace(".wav", "") + '.txt'
    with open(os.path.join(cwd, txt_fn), 'w') as f:
        json.dump(result, f)
```
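A side note on the task body: `audio_file_name.replace(".wav", "")` happens to work here because the names are generated UUIDs, but `os.path.splitext` is the more robust way to swap an extension, since it only touches the final suffix. A small sketch (the `txt_name` helper is mine, not part of the project):

```python
import os

def txt_name(audio_file_name):
    # Swap the extension rather than searching for ".wav" anywhere in
    # the string; splitext splits off only the last suffix.
    stem, _ext = os.path.splitext(audio_file_name)
    return stem + ".txt"

print(txt_name("2f1c.wav"))         # 2f1c.txt
print(txt_name("my.wav.copy.wav"))  # my.wav.copy.txt (str.replace would mangle this)
```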
On to transcription.py:
```python
import whisper

class TranscribeStrategy:
    def __init__(self, vendor):
        self.vendor = vendor

    def do_transcribe(self, audio_file_path):
        pass

class WhisperStrategy(TranscribeStrategy):
    model = whisper.load_model("small")

    def __init__(self):
        super().__init__("openai-whisper-small")

    def do_transcribe(self, audio_file_path):
        return self.model.transcribe(audio_file_path, language="nl")

class Transcriber:
    def __init__(self, transcribe_strategy):
        self.strategy = transcribe_strategy

    def transcribe(self, audio_file_path):
        return {"vendor": self.strategy.vendor, "result": self.strategy.do_transcribe(audio_file_path)}
```
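One detail worth noting about WhisperStrategy: `model = whisper.load_model("small")` sits in the class body, so it executes at class-definition time, i.e. as soon as the module is imported, once per process that imports it. A minimal stand-in (with a dummy loader in place of whisper, so it runs anywhere) shows the timing:

```python
calls = []

def fake_load_model(name):
    # Stand-in for whisper.load_model: records when it gets called.
    calls.append(name)
    return f"<model {name}>"

class WhisperStrategy:
    # Class-body statements run when the class is defined,
    # i.e. when the defining module is imported.
    model = fake_load_model("small")

print(calls)  # ['small'] -- the "model" loaded before any instance exists
print(WhisperStrategy.model)
```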
And this is my app.py:
```python
from flask import Flask, request
from io import BytesIO
from transcribejob import transcribe_export
import os
import uuid
from pydub import AudioSegment

app = Flask(__name__)
temp_audio_path = os.path.join(os.getcwd(), 'tempAudioStorage')

@app.route('/transcribe', methods=['POST'])
def transcribe_audio_file():
    if 'file' not in request.files:
        return "No file uploaded", 400
    file = request.files['file']
    if file.content_type not in {'audio/wav', 'audio/mpeg'}:
        return "Invalid file format. Only WAV and MP3 files are allowed. Received Format: " + file.content_type, 400
    file_data = BytesIO(file.read())
    audio_format = file.content_type.split('/')[-1]
    if audio_format == 'mpeg':
        audio_format = 'mp3'
    audio = AudioSegment.from_file(file_data, format=audio_format)
    audio_file_name = str(uuid.uuid4()) + ".wav"
    audio.export(os.path.join(temp_audio_path, audio_file_name), format='wav')
    transcribe_export.delay(temp_audio_path, audio_file_name)
    return 'Transcription request received, transcription in process', 200

if __name__ == '__main__':
    app.run()
```
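The content-type handling in the route above can also be expressed as a small lookup table, which is easy to test on its own (the `AUDIO_FORMATS` dict and `resolve_format` helper are mine, not part of the project; the MIME types are the ones the route accepts):

```python
# Accepted MIME types mapped to the format string pydub expects.
AUDIO_FORMATS = {"audio/wav": "wav", "audio/mpeg": "mp3"}

def resolve_format(content_type):
    # Returns None for anything the endpoint should reject with a 400.
    return AUDIO_FORMATS.get(content_type)

print(resolve_format("audio/mpeg"))  # mp3
print(resolve_format("audio/wav"))   # wav
print(resolve_format("video/mp4"))   # None
```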
The Output
When running Flask and executing the celery command, I get the following output:
```
(venv) PS C:\Users\SeanS\PycharmProjects\isampTranscribe> celery -A transcribejob:celery worker --loglevel=info --max-memory-per-child=1000000 --concurrency=4
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe

 -------------- celery@SeanS01 v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.22621-SP0 2023-04-13 14:26:11
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         transcribejob:0x22c05f52c10
- ** ---------- .> transport:   redis://127.0.0.1:6379/0
- ** ---------- .> results:     redis://127.0.0.1:6379/0
- *** --- * --- .> concurrency: 4 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . transcribejob.transcribe_export

[2023-04-13 14:26:11,887: INFO/MainProcess] Connected to redis://127.0.0.1:6379/0
[2023-04-13 14:26:11,891: INFO/MainProcess] mingle: searching for neighbors
[2023-04-13 14:26:12,429: INFO/SpawnPoolWorker-1] child process 18524 calling self.run()
[2023-04-13 14:26:12,450: INFO/SpawnPoolWorker-3] child process 29760 calling self.run()
[2023-04-13 14:26:12,453: INFO/SpawnPoolWorker-2] child process 30472 calling self.run()
[2023-04-13 14:26:12,460: INFO/SpawnPoolWorker-4] child process 30600 calling self.run()
[2023-04-13 14:26:12,921: WARNING/MainProcess] C:\Users\SeanS\PycharmProjects\isampTranscribe\venv\lib\site-packages\celery\app\control.py:56: DuplicateNodenameWarning: Received multiple replies from node name: celery@SeanS01.
Please make sure you give each node a unique nodename using
the celery worker `-n` option.
  warnings.warn(DuplicateNodenameWarning(
[2023-04-13 14:26:12,922: INFO/MainProcess] mingle: all alone
[2023-04-13 14:26:12,944: INFO/MainProcess] celery@SeanS01 ready.
[2023-04-13 14:26:15,346: INFO/MainProcess] Events of group {task} enabled by remote.
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
[2023-04-13 14:27:09,247: INFO/MainProcess] Task transcribejob.transcribe_export[653178f2-c67d-4f96-afa8-f1cbcd470d91] received
[2023-04-13 14:27:10,041: INFO/SpawnPoolWorker-5] child process 25092 calling self.run()
[2023-04-13 14:27:10,051: INFO/SpawnPoolWorker-6] child process 26020 calling self.run()
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
[2023-04-13 14:27:15,740: INFO/SpawnPoolWorker-7] child process 30576 calling self.run()
transcribejob.py, cwd: C:\Users\SeanS\PycharmProjects\isampTranscribe
```
And this is my flower dashboard:

[flower dashboard screenshot]
The Redis instance runs on Docker and the ports are correctly mapped (6379 -> 6379).
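To double-check the broker side, the queue can be inspected directly inside the container (the container name placeholder below is whatever `docker ps` shows for the Redis instance; `celery` is the default queue name, matching the [queues] section of the worker output):

```shell
# Ping the broker from inside the container -- expect: PONG
docker exec -it <redis-container> redis-cli ping

# Length of the default "celery" queue: a value > 0 means messages are
# sitting unconsumed; 0 right after a request means the worker did pick
# the message up.
docker exec -it <redis-container> redis-cli llen celery
```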
Why won't my Celery tasks execute?
