
I'm trying to use Celery to handle the heavy work of creating a new Qdrant collection every time a new model instance is created: I need to extract the content of the file, create embeddings, and store them in the Qdrant DB as a collection. The problem is that I get the following error when I call embeddings.embed_query with HuggingFaceEmbeddings inside the Celery task.

celery-dev-1  | [2024-03-27 10:18:27,451: INFO/ForkPoolWorker-19] Load pretrained SentenceTransformer: sentence-transformers/all-mpnet-base-v2
celery-dev-1  | [2024-03-27 10:18:35,856: ERROR/MainProcess] Process 'ForkPoolWorker-19' pid:115 exited with 'signal 11 (SIGSEGV)'
celery-dev-1  | [2024-03-27 10:18:35,868: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 11 (SIGSEGV) Job: 3.')
celery-dev-1  | Traceback (most recent call last):
celery-dev-1  |   File "/usr/local/lib/python3.10/site-packages/billiard/pool.py", line 1264, in mark_as_worker_lost
celery-dev-1  |     raise WorkerLostError(
celery-dev-1  | billiard.einfo.ExceptionWithTraceback:
celery-dev-1  | """
celery-dev-1  | Traceback (most recent call last):
celery-dev-1  |   File "/usr/local/lib/python3.10/site-packages/billiard/pool.py", line 1264, in mark_as_worker_lost
celery-dev-1  |     raise WorkerLostError(
celery-dev-1  | billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV) Job: 3.
celery-dev-1  | """

Here is the Knowledge model from which the task is called:

class Knowledge(Common):
    name = models.CharField(max_length=255, blank=True, null=True)
    file = models.FileField(upload_to=knowledge_path, storage=PublicMediaStorage())
    qd_knowledge_id = models.CharField(max_length=255, blank=True, null=True)
    is_public = models.BooleanField(default=False)

    def save(self, *args, **kwargs):
        if self.pk is None:
            collection_name = f"{self.name}-{datetime.now().strftime('%Y_%m_%d_%H_%M_%S')}"
            process_files_and_upload_to_qdrant.delay(self.file.name, collection_name)
            self.qd_knowledge_id = collection_name
        super().save(*args, **kwargs)

Here is the task and the functions it calls:

@shared_task
def process_files_and_upload_to_qdrant(file_name, collection_name):
    file_path = default_storage.open(file_name)
    result = process_file(file_path, collection_name)
    return result

def process_file(file: InMemoryUploadedFile, collection_name):
    text = read_data_from_pdf(file)
    chunks = get_text_chunks(text)
    embeddings = get_embeddings(chunks)
    client.create_collection(
            collection_name=collection_name,
            vectors_config=qdrant_models.VectorParams(
                size=768, distance=qdrant_models.Distance.COSINE
            ),
        )
    client.upsert(collection_name=collection_name, wait=True, points=embeddings)


def read_data_from_pdf(file: InMemoryUploadedFile):
    text = ""

    pdf_reader = PdfReader(file)

    for page in pdf_reader.pages:
        text += page.extract_text()

    return text


def get_text_chunks(texts: str):
    text_splitter = CharacterTextSplitter(
        separator="\n", chunk_size=1000, chunk_overlap=200, length_function=len
    )
    chunks = text_splitter.split_text(texts)
    return chunks
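For reference, the chunk_size/chunk_overlap behaviour above can be approximated in plain Python (a rough sketch, not LangChain's exact splitting logic, which also honours the separator):

```python
def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    # Naive sliding-window splitter: each chunk starts chunk_size - chunk_overlap
    # characters after the previous one, so consecutive chunks share
    # chunk_overlap characters of context.
    step = chunk_size - chunk_overlap
    return [text[i:i + chunk_size]
            for i in range(0, max(len(text) - chunk_overlap, 1), step)]

text = "".join(str(i % 10) for i in range(2500))
chunks = split_text(text)

# 2500 chars with step 800 -> chunks starting at 0, 800, 1600.
assert len(chunks) == 3
# Consecutive chunks overlap by exactly chunk_overlap characters.
assert chunks[0][-200:] == chunks[1][:200]
```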


def get_embeddings(text_chunks):
    from langchain_community.embeddings import HuggingFaceEmbeddings
    from qdrant_client.http.models import PointStruct
    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-mpnet-base-v2"
    )
    points = []
   
    for chunk in text_chunks:
        embedding = embeddings.embed_query(chunk)  # <-- the error occurs here
        point_id = str(uuid.uuid4())  
        points.append(
            PointStruct(id=point_id, vector=embedding, payload={"text": chunk})
        )

    return points

How do I approach this? The model is created through a many-to-many relationship, so the response takes a long time, which is why I'm trying to move the work into a Celery task. (Some delay when storing in Qdrant is acceptable; it just shouldn't affect the API response time.) The API works fine when I do it without Celery, but it's super slow.

I've tried splitting this into multiple small Celery tasks, but I can't pass the embeddings or other non-JSON-serializable data into a task. I don't know how to approach this.
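For reference, passing only JSON-native types between the stages would sidestep the serializer limit (a sketch assuming Celery's default "json" task serializer; chunk_stage and embed_stage are hypothetical names, and the embedding is a stand-in):

```python
import json
import uuid

def chunk_stage(text, chunk_size=1000):
    # Returns plain strings -- safe to pass on to the next task.
    return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

def embed_stage(chunks):
    # Stand-in embedding: the real task would call embeddings.embed_query(chunk)
    # and convert any numpy result with .tolist() into a plain list of floats.
    return [
        {"id": str(uuid.uuid4()),
         "vector": [float(len(chunk))],
         "payload": {"text": chunk}}
        for chunk in chunks
    ]

points = embed_stage(chunk_stage("some extracted pdf text " * 100))

# Everything survives a JSON round trip, so it can cross task boundaries;
# PointStruct objects are rebuilt only inside the final upsert task.
assert json.loads(json.dumps(points)) == points
```

In Celery the two stages could then be wired together with a chain, and the qdrant_client PointStruct objects reconstructed from the plain dicts just before calling client.upsert.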
