diff --git a/src/components/aggregrate_eval_results.py b/src/components/aggregrate_eval_results.py
index e78e8ef..6cf52ba 100644
--- a/src/components/aggregrate_eval_results.py
+++ b/src/components/aggregrate_eval_results.py
@@ -6,16 +6,15 @@
 @lightweight_component(
     consumes={
-        "context_precision": pa.float32(),
-        "context_relevancy": pa.float32(),
+        "context_precision": pa.float32(),
+        "context_relevancy": pa.float32(),
     },
     produces={
         "metric": pa.string(),
-        "score": pa.float32()
-    }
+        "score": pa.float32(),
+    },
 )
 class AggregateResults(DaskTransformComponent):
-
     def transform(self, dataframe: dd.DataFrame) -> dd.DataFrame:
         metrics = list(self.consumes.keys())
         agg = dataframe[metrics].mean()
diff --git a/src/components/chunking_component.py b/src/components/chunking_component.py
index e541d2a..8d5a041 100644
--- a/src/components/chunking_component.py
+++ b/src/components/chunking_component.py
@@ -1,20 +1,22 @@
-import typing as t
+import typing as t
+
 import pandas as pd
 import pyarrow as pa
 from fondant.component import PandasTransformComponent
 from fondant.pipeline import lightweight_component
 
+
 @lightweight_component(
-    consumes={"text":pa.string()},
-    produces={"text":pa.string(), "original_document_id":pa.string()},
-    extra_requires=["langchain==0.0.329"]
+    consumes={"text": pa.string()},
+    produces={"text": pa.string(), "original_document_id": pa.string()},
+    extra_requires=["langchain==0.0.329"],
 )
 class ChunkTextComponent(PandasTransformComponent):
     """Component that chunks text into smaller segments.
 
     More information about the different chunking strategies can be here:
     - https://python.langchain.com/docs/modules/data_connection/document_transformers/
     - https://www.pinecone.io/learn/chunking-strategies/.
-    """
+    """
 
     def __init__(
         self,
@@ -24,13 +26,14 @@
     ):
         """
         Args:
-            chunk_size: the chunk size
-            chunk_overlap: the overlap between chunks
+            chunk_size: the chunk size
+            chunk_overlap: the overlap between chunks.
         """
         from langchain.text_splitter import RecursiveCharacterTextSplitter
+
         self.chunker = RecursiveCharacterTextSplitter(
             chunk_size=chunk_size,
-            chunk_overlap=chunk_overlap
+            chunk_overlap=chunk_overlap,
         )
@@ -46,6 +49,7 @@ def chunk_text(self, row) -> t.List[t.Tuple]:
 
     def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
         import itertools
+
         results = dataframe.apply(
             self.chunk_text,
             axis=1,
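Editor's note: for context on what `ChunkTextComponent` does, the sketch below exercises the same splitter outside Fondant, assuming the pinned `langchain==0.0.329`. The chunk size, overlap, sample text, and the `{doc_id}_{i}` id scheme are illustrative; they mirror the component's `("original_document_id", "text")` output shape rather than quote its exact implementation.

```python
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Same splitter the component constructs in __init__; sizes are placeholders.
splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=32)

def chunk_row(doc_id: str, text: str) -> list[tuple[str, str]]:
    # Each chunk keeps a pointer back to its source document, matching the
    # component's produces schema (original_document_id, text).
    return [
        (f"{doc_id}_{i}", chunk)
        for i, chunk in enumerate(splitter.split_text(text))
    ]

print(chunk_row("doc-1", "Fondant pipelines process dataframes. " * 40))
```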
diff --git a/src/components/evaluate_ragas.py b/src/components/evaluate_ragas.py
index dca537a..92c99dd 100644
--- a/src/components/evaluate_ragas.py
+++ b/src/components/evaluate_ragas.py
@@ -1,21 +1,19 @@
-import typing as t
-import pyarrow as pa
 import pandas as pd
+import pyarrow as pa
 from fondant.component import PandasTransformComponent
 from fondant.pipeline import lightweight_component
 
-
 @lightweight_component(
     consumes={
-        "question": pa.string(),
-        "retrieved_chunks": pa.list_(pa.string())
-    },
+        "question": pa.string(),
+        "retrieved_chunks": pa.list_(pa.string()),
+    },
     produces={
-        "context_precision": pa.float32(),
-        "context_relevancy": pa.float32()
-    },
-    extra_requires=["ragas==0.0.21"]
+        "context_precision": pa.float32(),
+        "context_relevancy": pa.float32(),
+    },
+    extra_requires=["ragas==0.0.21"],
 )
 class RagasEvaluator(PandasTransformComponent):
     def __init__(
@@ -23,22 +21,23 @@
         self,
         *,
         llm_module_name: str,
         llm_class_name: str,
-        llm_kwargs: dict
+        llm_kwargs: dict,
     ) -> None:
         """
         Args:
            llm_module_name: Module from which the LLM is imported.
                Defaults to langchain.chat_models
            llm_class_name: Name of the selected llm. Defaults to ChatOpenAI
-           llm_kwargs: Arguments of the selected llm
+           llm_kwargs: Arguments of the selected llm.
         """
         self.llm = self.extract_llm(
             llm_module_name=llm_module_name,
             llm_class_name=llm_class_name,
             llm_kwargs=llm_kwargs,
         )
-
+
         from ragas.llms import LangchainLLM
+
         self.gpt_wrapper = LangchainLLM(llm=self.llm)
         self.metric_functions = self.extract_metric_functions(
             metrics=["context_precision", "context_relevancy"],
@@ -76,10 +75,12 @@ def create_hf_ds(dataframe: pd.DataFrame):
         )
 
         from datasets import Dataset
+
         return Dataset.from_pandas(dataframe)
 
     def ragas_eval(self, dataset):
         from ragas import evaluate
+
         return evaluate(dataset=dataset, metrics=self.metric_functions)
 
     def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
@@ -93,4 +94,4 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
         results_df = result.to_pandas()
         results_df = results_df.set_index(dataframe.index)
 
-        return results_df
\ No newline at end of file
+        return results_df
diff --git a/src/components/retrieve_from_weaviate.py b/src/components/retrieve_from_weaviate.py
index 4d53593..608d4c3 100644
--- a/src/components/retrieve_from_weaviate.py
+++ b/src/components/retrieve_from_weaviate.py
@@ -1,13 +1,12 @@
-import typing as t
-import dask.dataframe as dd
 import pandas as pd
 import pyarrow as pa
 from fondant.component import PandasTransformComponent
 from fondant.pipeline import lightweight_component
 
+
 @lightweight_component(
     produces={"retrieved_chunks": pa.list_(pa.string())},
-    extra_requires=["weaviate-client==3.24.1"]
+    extra_requires=["weaviate-client==3.24.1"],
 )
 class RetrieveFromWeaviateComponent(PandasTransformComponent):
     def __init__(
@@ -60,7 +59,6 @@ def retrieve_chunks_from_embeddings(self, vector_query: list):
         return [retrieved_chunk["passage"] for retrieved_chunk in result_dict]
 
     def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
-
         if "embedding" in dataframe.columns:
             dataframe["retrieved_chunks"] = dataframe["embedding"].apply(
                 self.retrieve_chunks_from_embeddings,
@@ -72,4 +70,4 @@ def transform(self, dataframe: pd.DataFrame) -> pd.DataFrame:
             msg,
         )
 
-        return dataframe
\ No newline at end of file
+        return dataframe
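Editor's note: `RagasEvaluator` resolves its LLM dynamically from `llm_module_name` and `llm_class_name`, so any langchain chat model can be swapped in via pipeline arguments. The body of `extract_llm` is not part of this diff; the sketch below is a plausible reconstruction from its call site, with `importlib` doing the work. The kwargs are placeholders, and the default `ChatOpenAI` needs `OPENAI_API_KEY` set at instantiation time.

```python
import importlib

def extract_llm(*, llm_module_name: str, llm_class_name: str, llm_kwargs: dict):
    # Import the module by its dotted name, look up the class on it, and
    # instantiate it with the user-supplied keyword arguments.
    module = importlib.import_module(llm_module_name)
    return getattr(module, llm_class_name)(**llm_kwargs)

llm = extract_llm(
    llm_module_name="langchain.chat_models",  # documented default
    llm_class_name="ChatOpenAI",              # documented default
    llm_kwargs={"temperature": 0},            # placeholder kwargs
)
```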
diff --git a/src/pipeline_eval.py b/src/pipeline_eval.py
index 0935327..2dd4056 100644
--- a/src/pipeline_eval.py
+++ b/src/pipeline_eval.py
@@ -1,10 +1,10 @@
 """Fondant pipeline to evaluate a RAG pipeline."""
 
 import pyarrow as pa
-from fondant.pipeline import Pipeline, Resources
-from components.retrieve_from_weaviate import RetrieveFromWeaviateComponent
-from components.evaluate_ragas import RagasEvaluator
 from components.aggregrate_eval_results import AggregateResults
+from components.evaluate_ragas import RagasEvaluator
+from components.retrieve_from_weaviate import RetrieveFromWeaviateComponent
+from fondant.pipeline import Pipeline, Resources
 
 
 def create_pipeline(
@@ -61,12 +61,12 @@
     )
 
     retrieve_chunks = embed_text_op.apply(
-        RetrieveFromWeaviateComponent,
-        arguments={
-            "weaviate_url": weaviate_url,
-            "class_name": weaviate_class,
-            "top_k": retrieval_top_k
-        },
+        RetrieveFromWeaviateComponent,
+        arguments={
+            "weaviate_url": weaviate_url,
+            "class_name": weaviate_class,
+            "top_k": retrieval_top_k,
+        },
     )
 
     retriever_eval = retrieve_chunks.apply(
@@ -74,16 +74,16 @@
         arguments={
             "llm_module_name": llm_module_name,
             "llm_class_name": llm_class_name,
-            "llm_kwargs": llm_kwargs
-        }
+            "llm_kwargs": llm_kwargs,
+        },
     )
 
     retriever_eval.apply(
-        AggregateResults,
+        AggregateResults,
         consumes={
             "context_precision": "context_precision",
-            "context_relevancy": "context_relevancy"
-        }
+            "context_relevancy": "context_relevancy",
+        },
     )
 
     return evaluation_pipeline
diff --git a/src/pipeline_index.py b/src/pipeline_index.py
index 72fc318..394d63a 100644
--- a/src/pipeline_index.py
+++ b/src/pipeline_index.py
@@ -1,8 +1,10 @@
 """Fondant pipeline to index a RAG system."""
-import pyarrow as pa
-from fondant.pipeline import Pipeline, Resources
 from pathlib import Path
+
+import pyarrow as pa
 from components.chunking_component import ChunkTextComponent
+from fondant.pipeline import Pipeline, Resources
+
 
 def create_pipeline(
     *,
@@ -17,15 +19,13 @@
     accelerator_name=None,
 ):
     """Create a Fondant pipeline based on the provided arguments."""
-
-
     Path(base_path).mkdir(parents=True, exist_ok=True)
 
     pipeline = Pipeline(
         name="indexing-pipeline",
         description="Pipeline to prepare and process data for building a RAG solution",
-        base_path=base_path
-    )
+        base_path=base_path,
+    )
 
     text = pipeline.read(
         "load_from_hf_hub",
@@ -35,29 +35,27 @@
         arguments={
             "dataset_name": hf_dataset_name,
             "n_rows_to_load": n_rows_to_load,
         },
         produces={
-            "text": pa.string()
-        }
+            "text": pa.string(),
+        },
     )
 
-
     chunks = text.apply(
         ChunkTextComponent,
-        arguments=chunk_args
+        arguments=chunk_args,
     )
 
-
     embeddings = chunks.apply(
         "embed_text",
         arguments={
             "model_provider": embed_model_provider,
-            "model": embed_model
+            "model": embed_model,
         },
         resources=Resources(
             accelerator_number=number_of_accelerators,
             accelerator_name=accelerator_name,
         ),
         cluster_type="local" if number_of_accelerators is not None else "default",
-        cache=False
+        cache=False,
     )
 
     embeddings.write(
@@ -68,8 +66,8 @@
         },
         consumes={
             "text": pa.string(),
-            "embedding": pa.list_(pa.float32()),
-        }
+            "embedding": pa.list_(pa.float32()),
+        },
     )
 
     return pipeline
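Editor's note: to see both entry points in use, a hypothetical driver might look like the following. The parameter names come from the `create_pipeline` signatures visible in this diff; every value, the module imports (run from `src/`), and the `DockerRunner` invocation are assumptions, not something this change prescribes.

```python
from fondant.pipeline.runner import DockerRunner  # assumed fondant runner API

import pipeline_eval
import pipeline_index

indexing = pipeline_index.create_pipeline(
    base_path="./fondant-artifacts",           # working dir, created if missing
    hf_dataset_name="org/dataset",             # placeholder HF dataset
    n_rows_to_load=100,
    chunk_args={"chunk_size": 512, "chunk_overlap": 32},
    embed_model_provider="huggingface",        # placeholder provider
    embed_model="all-MiniLM-L6-v2",            # placeholder model
    number_of_accelerators=None,               # CPU-only: default cluster type
    accelerator_name=None,
)

# pipeline_eval.create_pipeline is wired the same way, adding the
# weaviate_url / weaviate_class / retrieval_top_k and llm_* arguments
# consumed in pipeline_eval.py above.

DockerRunner().run(indexing)  # assumed invocation; the fondant CLI also works
```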