I'm trying to run a RAG pipeline with Haystack and Milvus on my cluster instance, using Python 3.10.12.
The imports and the relevant packages installed in this environment are listed at the end of this question.

My code is:

  1. Model embedding and generator functions:
@component
def model_embedder(self, documents,cache_dir=cache_dir):
    tokenizer = AutoTokenizer.from_pretrained(mymodel, cache_dir=cache_dir)
    model = AutoModelForSeq2SeqLM.from_pretrained(mymodel, cache_dir=cache_dir)

    embeddings = []
    for doc in documents:
        inputs = tokenizer(doc.content, padding="max_length", truncation=True, return_tensors="pt")
        with torch.no_grad():
            output = model(**inputs)
            embedding = output.pooler_output.squeeze(0).cpu().numpy()
        embeddings.append(embedding)

    for doc, embedding in zip(documents, embeddings):
        doc.embedding = embedding
    return documents

@component
def model_generator(self, query, context=None, generation_kwargs={}, cache_dir=cache_dir):
    tokenizer = AutoTokenizer.from_pretrained(mymodel, cache_dir=cache_dir)
    model = AutoModelForSeq2SeqLM.from_pretrained(mymodel, cache_dir=cache_dir)

    inputs = tokenizer(query, context=context, padding="max_length", truncation=True, return_tensors="pt")
    with torch.no_grad():
        output = model.generate(**inputs, **generation_kwargs)
    return tokenizer.decode(output[0], skip_special_tokens=True)
  2. RAG pipeline:
rag_pipeline = Pipeline()
rag_pipeline.add_component("converter", MarkdownToDocument())
rag_pipeline.add_component(
    "splitter", DocumentSplitter(split_by="sentence", split_length=2)
)
rag_pipeline.add_component("embedder", model_embedder)  
rag_pipeline.add_component(document_store)
rag_pipeline.add_component(
    "retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3)
)
rag_pipeline.add_component("writer", DocumentWriter(document_store))
rag_pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
rag_pipeline.add_component(
    "generator",
    model_generator,
)

rag_pipeline.connect("converter.documents", "splitter.documents")  
rag_pipeline.connect("splitter.documents", "embedder.documents")   
rag_pipeline.connect("embedder", "writer")
rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "generator")

rag_pipeline.draw('./rag_pipeline.png')

If I decorate with @component, I get the following error:

---------------------------------------------------------------------------
ComponentError                            Traceback (most recent call last)
Cell In[11], line 2
      1 @component
----> 2 def model_embedder(self, documents,cache_dir=cache_dir):
      4     tokenizer = AutoTokenizer.from_pretrained(mymodel, cache_dir=cache_dir)

File /.../rag_env/lib/python3.10/site-packages/haystack/core/component/component.py:517, in _Component.__call__(self, cls, is_greedy)
    513     return self._component(cls, is_greedy=is_greedy)
    515 if cls:
    516     # Decorator is called without parens
--> 517     return wrap(cls)
    519 # Decorator is called with parens
    520 return wrap

File /.../rag_env/lib/python3.10/site-packages/haystack/core/component/component.py:513, in _Component.__call__.<locals>.wrap(cls)
    512 def wrap(cls):
--> 513     return self._component(cls, is_greedy=is_greedy)

File /.../rag_env/lib/python3.10/site-packages/haystack/core/component/component.py:464, in _Component._component(self, cls, is_greedy)
    462 # Check for required methods and fail as soon as possible
    463 if not hasattr(cls, "run"):
--> 464     raise ComponentError(f"{cls.__name__} must have a 'run()' method. See the docs for more information.")
    466 def copy_class_namespace(namespace):
...
    469 
    470     Simply copy the whole namespace from the decorated class.
    471     """

ComponentError: model_embedder must have a 'run()' method. See the docs for more information.

If I don't use the @component decorator (and remove self from both functions), the functions are defined without error, but when I run the rag_pipeline code I get the following error:

---------------------------------------------------------------------------
PipelineValidationError                   Traceback (most recent call last)
Cell In[13], line 6
      2 rag_pipeline.add_component("converter", MarkdownToDocument())
      3 rag_pipeline.add_component(
      4     "splitter", DocumentSplitter(split_by="sentence", split_length=2)
      5 )
----> 6 rag_pipeline.add_component("embedder", model_embedder)  
      7 rag_pipeline.add_component(document_store)
      8 rag_pipeline.add_component(
      9     "retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3)
     10 )

File /.../rag_env/lib/python3.10/site-packages/haystack/core/pipeline/base.py:313, in PipelineBase.add_component(self, name, instance)
    311 # Component instances must be components
    312 if not isinstance(instance, Component):
--> 313     raise PipelineValidationError(
    314         f"'{type(instance)}' doesn't seem to be a component. Is this class decorated with @component?"
    315     )
    317 if getattr(instance, "__haystack_added_to_pipeline__", None):
    318     msg = (
    319         "Component has already been added in another Pipeline. Components can't be shared between Pipelines. "
    320         "Create a new instance instead."
    321     )

PipelineValidationError: '<class 'function'>' doesn't seem to be a component. Is this class decorated with @component?

The imports I'm using are:

import os
import urllib.request

from haystack import Pipeline
from haystack.components.converters import MarkdownToDocument

from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter

from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusEmbeddingRetriever

from haystack.components.builders import PromptBuilder

import mdit_plain
from haystack import component

import huggingface_hub
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
import torch

Since I suspect there might be an incompatibility between the Haystack libraries I use, here are all the Haystack-related packages and versions in the current environment:

Package                      Version
---------------------------- --------------
farm-haystack                1.26.3
haystack                     0.42
haystack-ai                  2.5.1
haystack-experimental        0.2.0
milvus-haystack              0.0.10

I'd appreciate any suggestions on how to resolve the issue and run the RAG pipeline.

1 Answer

  1. The only package you need is haystack-ai (pip install haystack-ai). Installing farm-haystack and haystack-ai in the same Python environment causes problems. See the Get started page.

  2. Here you can find some usage examples for Haystack + Milvus.

  3. Looking at your code, you are trying to implement a custom Embedder and a custom Generator. I suggest checking the docs first to see whether the existing Haystack components are suitable for your use case. For example, take a look at SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder, and HuggingFaceLocalGenerator.

  4. If the existing components do not fit your use case, take a look at the Creating Custom Components guide. In short, a Component is an object (an instance of a decorated class, not a function), and the execution logic must live inside its run method.

  5. Your Pipeline tries to do indexing and RAG at the same time. It would be better to have a separate pipeline for each. See the docs for RAG Pipeline, the tutorial on RAG, and the tutorial on indexing.
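
To illustrate point 4, here is a minimal plain-Python sketch of why both attempts in the question fail and what shape a component needs. It is a toy stand-in that only mirrors the two checks visible in the tracebacks (the `hasattr(cls, "run")` check in the decorator and the instance check in `add_component`). `ToyPipeline`, the `__is_component__` marker, and the length-based "embedding" are hypothetical names invented for illustration, not Haystack's actual internals. With the real library you would use `from haystack import component` instead of the toy decorator.

```python
class ComponentError(Exception):
    """Toy stand-in for the error raised in the first traceback."""


def component(cls):
    """Toy decorator: like the check in the traceback, it requires a run() method."""
    if not hasattr(cls, "run"):
        raise ComponentError(f"{cls.__name__} must have a 'run()' method.")
    cls.__is_component__ = True  # hypothetical marker, not Haystack's real attribute
    return cls


class ToyPipeline:
    """Toy pipeline that accepts only instances of decorated classes."""

    def __init__(self):
        self.components = {}

    def add_component(self, name, instance):
        # Reject bare functions and undecorated or uninstantiated classes.
        if isinstance(instance, type) or not getattr(instance, "__is_component__", False):
            raise TypeError(f"'{type(instance)}' doesn't seem to be a component.")
        self.components[name] = instance


@component
class ModelEmbedder:
    """A class, not a function: configuration in __init__, logic in run()."""

    def __init__(self, model_name):
        self.model_name = model_name  # a real component would load the model once here

    def run(self, documents):
        # Placeholder embedding (text length) standing in for a real model call.
        embedded = [{"content": d, "embedding": [float(len(d))]} for d in documents]
        return {"documents": embedded}  # outputs are returned as a dict


def model_embedder(documents):
    """A bare function, as in the question."""
    return documents


# Decorating a function fails, because a function has no run attribute.
try:
    component(model_embedder)
except ComponentError as err:
    print(err)

# An instance of the decorated class is accepted.
pipeline = ToyPipeline()
pipeline.add_component("embedder", ModelEmbedder("my-model"))
result = pipeline.components["embedder"].run(["hello", "hi"])
```

The points to carry over are: decorate a class rather than a function, put the logic in `run`, return a dict of outputs, and pass an instance (not the class or a function) to `add_component`. Loading the tokenizer and model once in `__init__` rather than inside every call also avoids reloading the weights on each run.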


Feel free to join the Haystack Discord channel for further guidance and to interact with the community.
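
As a rough illustration of point 5, the sketch below separates the two phases in plain Python. It is not Haystack code: the bag-of-words `embed` function, the list used as a document store, and the function names `run_indexing` and `run_query` are all assumptions made up for this example. In Haystack, each phase would be its own `Pipeline` object sharing the same document store.

```python
import math
import re
from collections import Counter


def embed(text):
    """Toy embedding: a bag-of-words count vector (stand-in for a real model)."""
    return Counter(re.findall(r"\w+", text.lower()))


def cosine(a, b):
    """Cosine similarity between two sparse count vectors."""
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def run_indexing(raw_text, store):
    """Indexing phase: split into sentences, embed each one, write to the store."""
    sentences = [s.strip() for s in re.split(r"(?<=[.!?])\s+", raw_text) if s.strip()]
    for sentence in sentences:
        store.append({"content": sentence, "embedding": embed(sentence)})
    return len(sentences)


def run_query(question, store, top_k=1):
    """Query phase: embed the question, retrieve the closest documents, build a prompt."""
    query_embedding = embed(question)
    ranked = sorted(store, key=lambda d: cosine(query_embedding, d["embedding"]), reverse=True)
    context = "\n".join(d["content"] for d in ranked[:top_k])
    return f"Context:\n{context}\n\nQuestion: {question}"


# Indexing runs once per batch of documents; querying runs once per question.
store = []
run_indexing("Milvus is a vector database. Haystack builds pipelines.", store)
prompt = run_query("What is Milvus?", store)
```

Keeping the phases apart also makes it clearer that documents are embedded at indexing time and the question is embedded at query time, which is the distinction the `text_embedder` connection in the question's pipeline was missing a component for.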
