I'm trying to run a RAG pipeline using Haystack (and Milvus) on my cluster instance using python (3.10.12).
The Imports and relevant packages I have in this env are shown at the end of this question.
my code is:
- model embedding & generator functions:
@component
def model_embedder(self, documents,cache_dir=cache_dir):
tokenizer = AutoTokenizer.from_pretrained(mymodel, cache_dir=cache_dir)
model = AutoModelForSeq2SeqLM.from_pretrained(mymodel, cache_dir=cache_dir)
embeddings = []
for doc in documents:
inputs = tokenizer(doc.content, padding="max_length", truncation=True, return_tensors="pt")
with torch.no_grad():
output = model(**inputs)
embedding = output.pooler_output.squeeze(0).cpu().numpy()
embeddings.append(embedding)
for doc, embedding in zip(documents, embeddings):
doc.embedding = embedding
return documents
@component
def model_generator(self, query, context=None, generation_kwargs={}, cache_dir=cache_dir):
tokenizer = AutoTokenizer.from_pretrained(mymodel, cache_dir=cache_dir)
model = AutoModelForSeq2SeqLM.from_pretrained(mymodel, cache_dir=cache_dir)
inputs = tokenizer(query, context=context, padding="max_length", truncation=True, return_tensors="pt")
with torch.no_grad():
output = model.generate(**inputs, **generation_kwargs)
return tokenizer.decode(output[0], skip_special_tokens=True)
- RAG Pipeline:
rag_pipeline = Pipeline()
rag_pipeline.add_component("converter", MarkdownToDocument())
rag_pipeline.add_component(
"splitter", DocumentSplitter(split_by="sentence", split_length=2)
)
rag_pipeline.add_component("embedder", model_embedder)
rag_pipeline.add_component(document_store)
rag_pipeline.add_component(
"retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3)
)
rag_pipeline.add_component("writer", DocumentWriter(document_store))
rag_pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
rag_pipeline.add_component(
"generator",
model_generator,
)
rag_pipeline.connect("converter.documents", "splitter.documents")
rag_pipeline.connect("splitter.documents", "embedder.documents")
rag_pipeline.connect("embedder", "writer")
rag_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "generator")
rag_pipeline.draw('./rag_pipeline.png')
If I decorate with @component, I get the following error:
---------------------------------------------------------------------------
ComponentError Traceback (most recent call last)
Cell In[11], line 2
1 @component
----> 2 def model_embedder(self, documents,cache_dir=cache_dir):
4 tokenizer = AutoTokenizer.from_pretrained(mymodel, cache_dir=cache_dir)
File /.../rag_env/lib/python3.10/site-packages/haystack/core/component/component.py:517, in _Component.__call__(self, cls, is_greedy)
513 return self._component(cls, is_greedy=is_greedy)
515 if cls:
516 # Decorator is called without parens
--> 517 return wrap(cls)
519 # Decorator is called with parens
520 return wrap
File /.../rag_env/lib/python3.10/site-packages/haystack/core/component/component.py:513, in _Component.__call__.<locals>.wrap(cls)
512 def wrap(cls):
--> 513 return self._component(cls, is_greedy=is_greedy)
File /.../rag_env/lib/python3.10/site-packages/haystack/core/component/component.py:464, in _Component._component(self, cls, is_greedy)
462 # Check for required methods and fail as soon as possible
463 if not hasattr(cls, "run"):
--> 464 raise ComponentError(f"{cls.__name__} must have a 'run()' method. See the docs for more information.")
466 def copy_class_namespace(namespace):
...
469
470 Simply copy the whole namespace from the decorated class.
471 """
ComponentError: model_embedder must have a 'run()' method. See the docs for more information.
And if I dont use the @component decorator (and remove the self from both functions), these funcitons compile, but then when i run the rag_pipeline code, I get the following error:
---------------------------------------------------------------------------
PipelineValidationError Traceback (most recent call last)
Cell In[13], line 6
2 rag_pipeline.add_component("converter", MarkdownToDocument())
3 rag_pipeline.add_component(
4 "splitter", DocumentSplitter(split_by="sentence", split_length=2)
5 )
----> 6 rag_pipeline.add_component("embedder", model_embedder)
7 rag_pipeline.add_component(document_store)
8 rag_pipeline.add_component(
9 "retriever", MilvusEmbeddingRetriever(document_store=document_store, top_k=3)
10 )
File /.../rag_env/lib/python3.10/site-packages/haystack/core/pipeline/base.py:313, in PipelineBase.add_component(self, name, instance)
311 # Component instances must be components
312 if not isinstance(instance, Component):
--> 313 raise PipelineValidationError(
314 f"'{type(instance)}' doesn't seem to be a component. Is this class decorated with @component?"
315 )
317 if getattr(instance, "__haystack_added_to_pipeline__", None):
318 msg = (
319 "Component has already been added in another Pipeline. Components can't be shared between Pipelines. "
320 "Create a new instance instead."
321 )
PipelineValidationError: '<class 'function'>' doesn't seem to be a component. Is this class decorated with @component?
The imports I'm using are:
import os
import urllib.request
from haystack import Pipeline
from haystack.components.converters import MarkdownToDocument
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusEmbeddingRetriever
from haystack.components.builders import PromptBuilder
import mdit_plain
from haystack import component
import huggingface_hub
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
import torch
Since I suspect there might be some issue (/incompatability) with the haystack libraries I use, I show here all the "haystack" related libraries and versions i have in the current environment:
Package Version
---------------------------- --------------
farm-haystack 1.26.3
haystack 0.42
haystack-ai 2.5.1
haystack-experimental 0.2.0
milvus-haystack 0.0.10
I'd be happy to get any suggestions / help how to resolve the issue and run the RAG pipeline.