Synthetic Data Generation¶
RAG applications are often tricky to evaluate, especially when you haven't obtained any user queries to begin with. In this notebook, we'll see how we can use instructor
to quickly generate synthetic questions from a dataset and benchmark your retrieval system with some simple metrics.
Data Ingestion¶
Let's start by installing the required packages and ingesting the first 200 rows of the ms-marco
dataset into our local database.
!uv pip install instructor openai datasets lancedb tantivy tenacity tqdm
Audited 7 packages in 301ms
We're using lancedb
here because it makes it easy to ingest large amounts of data. We can define our table schema using a Pydantic
model and have LanceDB generate the embeddings automatically via the embedding function we obtain from its get_registry()
method, which we wire into the schema's fields.
from lancedb import connect
DB_PATH = "./db"
DB_TABLE = "ms_marco"
# Create a db at the path `./db`
db = connect(DB_PATH)
from lancedb.pydantic import LanceModel, Vector
from lancedb.embeddings import get_registry
func = get_registry().get("openai").create(name="text-embedding-3-small")
class Chunk(LanceModel):
    passage: str = func.SourceField()
    chunk_id: str
    embedding: Vector(func.ndims()) = func.VectorField()
table = db.create_table(DB_TABLE, schema=Chunk, exist_ok=True, mode="overwrite")
from datasets import load_dataset
N_ROWS = 200
dataset = load_dataset("ms_marco", "v1.1", split="train", streaming=True).take(N_ROWS)
first_item = next(iter(dataset))
first_item.keys()
dict_keys(['answers', 'passages', 'query', 'query_id', 'query_type', 'wellFormedAnswers'])
first_item['passages']['passage_text'][:3]
["Since 2007, the RBA's outstanding reputation has been affected by the 'Securency' or NPA scandal. These RBA subsidiaries were involved in bribing overseas officials so that Australia might win lucrative note-printing contracts. The assets of the bank include the gold and foreign exchange reserves of Australia, which is estimated to have a net worth of A$101 billion. Nearly 94% of the RBA's employees work at its headquarters in Sydney, New South Wales and at the Business Resumption Site.", "The Reserve Bank of Australia (RBA) came into being on 14 January 1960 as Australia 's central bank and banknote issuing authority, when the Reserve Bank Act 1959 removed the central banking functions from the Commonwealth Bank. The assets of the bank include the gold and foreign exchange reserves of Australia, which is estimated to have a net worth of A$101 billion. Nearly 94% of the RBA's employees work at its headquarters in Sydney, New South Wales and at the Business Resumption Site.", 'RBA Recognized with the 2014 Microsoft US Regional Partner of the ... by PR Newswire. Contract Awarded for supply and support the. Securitisations System used for risk management and analysis. ']
import hashlib
from itertools import batched  # requires Python 3.12+


def get_passages(dataset):
    # Flatten every passage in the dataset and give it a stable id derived
    # from its content, so we can match retrieved chunks back to their source.
    for row in dataset:
        for passage in row['passages']['passage_text']:
            yield {
                "passage": passage,
                "chunk_id": hashlib.md5(passage.encode()).hexdigest(),
            }


passages = batched(get_passages(dataset), 10)

for passage_batch in passages:
    table.add(list(passage_batch))
Synthetic Questions¶
Now that we have the first ~2,000 passages from the MS MARCO dataset ingested into our database, let's start generating some synthetic questions using the chunks we've ingested.
Let's see how we might do so using instructor
by defining a data model that supports this use case.
from pydantic import BaseModel, Field


class QuestionAnswerPair(BaseModel):
    """
    This model represents a pair of a question generated from a text chunk, its corresponding answer,
    and the chain of thought leading to the answer. The chain of thought provides insight into how the answer
    was derived from the question.
    """

    chain_of_thought: str = Field(
        description="The reasoning process leading to the answer."
    )
    question: str = Field(
        description="The generated question from the text chunk."
    )
    answer: str = Field(description="The answer to the generated question.")
Once we've defined this data model, we can use it in an instructor call to generate a synthetic question.
from openai import OpenAI
from instructor import from_openai
client = from_openai(OpenAI())
def generate_question(chunk: str) -> QuestionAnswerPair:
    return client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": "You are a world class AI that excels at generating hypothetical search queries. You're about to be given a text snippet and asked to generate a search query which is specific to the text chunk that you'll be given. Make sure to use information from the text chunk.",
            },
            {
                "role": "user",
                "content": f"Here is the text chunk: {chunk}",
            },
        ],
        response_model=QuestionAnswerPair,
    )
text_chunk = """
The Reserve Bank of Australia (RBA) came into being on 14 January 1960 as Australia 's central bank and banknote issuing authority, when the Reserve Bank Act 1959 removed the central banking functions from the Commonwealth Bank. The assets of the bank include the gold and foreign exchange reserves of Australia, which is estimated to have a net worth of A$101 billion. Nearly 94% of the RBA's employees work at its headquarters in Sydney, New South Wales and at the Business Resumption Site.
"""
print(generate_question(text_chunk).model_dump_json(indent=2))
{ "chain_of_thought": "To form a specific question from the given text chunk, I should focus on the unique details provided about the Reserve Bank of Australia, such as its creation, functions, and assets.", "question": "When was the Reserve Bank of Australia established as Australia's central bank and banknote issuing authority?", "answer": "The Reserve Bank of Australia was established as Australia's central bank and banknote issuing authority on 14 January 1960." }
Now that we've seen how to generate a single question, let's see how we might scale this up. We can do so by taking advantage of the asyncio
library for concurrency and tenacity
to handle retries.
chunks = table.to_pandas()
chunks = [item for item in chunks['passage']]
chunks[:2]
["Since 2007, the RBA's outstanding reputation has been affected by the 'Securency' or NPA scandal. These RBA subsidiaries were involved in bribing overseas officials so that Australia might win lucrative note-printing contracts. The assets of the bank include the gold and foreign exchange reserves of Australia, which is estimated to have a net worth of A$101 billion. Nearly 94% of the RBA's employees work at its headquarters in Sydney, New South Wales and at the Business Resumption Site.", "The Reserve Bank of Australia (RBA) came into being on 14 January 1960 as Australia 's central bank and banknote issuing authority, when the Reserve Bank Act 1959 removed the central banking functions from the Commonwealth Bank. The assets of the bank include the gold and foreign exchange reserves of Australia, which is estimated to have a net worth of A$101 billion. Nearly 94% of the RBA's employees work at its headquarters in Sydney, New South Wales and at the Business Resumption Site."]
from asyncio import Semaphore
from tenacity import retry, stop_after_attempt, wait_exponential
from openai import AsyncOpenAI
import asyncio
client = from_openai(AsyncOpenAI())
async def generate_questions(chunks: list[str], max_queries: int):
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=4, max=10))
    async def generate_question(chunk: str, sem: Semaphore) -> tuple[QuestionAnswerPair, str]:
        async with sem:
            return (
                await client.chat.completions.create(
                    model="gpt-3.5-turbo",
                    messages=[
                        {
                            "role": "system",
                            "content": "You are a world class AI that excels at generating hypothetical search queries. You're about to be given a text snippet and asked to generate a search query which is specific to the text chunk that you'll be given. Make sure to use information from the text chunk.",
                        },
                        {
                            "role": "user",
                            "content": f"Here is the text chunk: {chunk}",
                        },
                    ],
                    response_model=QuestionAnswerPair,
                ),
                chunk,
            )

    # Cap the number of concurrent requests with a semaphore
    sem = Semaphore(max_queries)
    coros = [generate_question(chunk, sem) for chunk in chunks]
    return await asyncio.gather(*coros)
questions = await generate_questions(chunks[:300],10)
Benchmarking Retrieval¶
Now that we've generated a list of questions to query our database with, let's run a quick benchmark to see how well full text search performs; the same harness can later be reused to compare it against semantic or hybrid search. We'll use two simple metrics here: Mean Reciprocal Rank (MRR) and Recall.
Let's start by making sure we have an inverted index created on the table above so that we can perform full text search on it.
table.create_fts_index("passage",replace=True)
This allows us to use the .search
function on the table to query it using full text search. Let's see an example below.
for entry in table.search("RBA",query_type="fts").limit(2).to_list():
print(entry['passage'])
A rebuildable atomizer (RBA), often referred to as simply a “rebuildable,” is just a special type of atomizer used in the Vape Pen and Mod Industry that connects to a personal vaporizer. 1 The bottom feed RBA is, perhaps, the easiest of all RBA types to build, maintain, and use. 2 It is filled from below, much like bottom coil clearomizer. 3 Bottom feed RBAs can utilize cotton instead of silica for the wick. 4 The Genesis, or genny, is a top feed RBA that utilizes a short woven mesh wire.
Results-Based Accountability® (also known as RBA) is a disciplined way of thinking and taking action that communities can use to improve the lives of children, youth, families, adults and the community as a whole. RBA is also used by organizations to improve the performance of their programs. RBA improves the lives of children, families, and communities and the performance of programs because RBA: 1 Gets from talk to action quickly; 2 Is a simple, common sense process that everyone can understand; 3 Helps groups to surface and challenge assumptions that can be barriers to innovation;
Metrics¶
Now that we've figured out how to query our table using full text search, let's take a step back and see how we can implement some metrics to quantitatively evaluate the retrieved items. It's important to note that when we evaluate the quality of our retrieved results, we always do so at some cutoff k.
This matters because k is often constrained by a business outcome and helps us determine how well our solution works.
For example, here are some hypothetical scenarios:
- k=5 : We'd like to display some recommended items based on a user query (e.g. "Help me plan out a dinner with Jonathan next week" -> display 5 possible actions)
- k=10 : We have a small carousel with recommended items for a user to buy
- k=25 : We're using a re-ranker; is it separating the relevant chunks from the irrelevant ones well?
- k=50 : We have a pipeline that fetches information for a model to respond with; are we fetching all the relevant bits of information?
Reciprocal Rank¶
Imagine we're Spotify and we want to suggest a couple of songs to the user. Which is the better result among the two lists of retrieved songs below? (Note that 2 is the song we want.)
- [0,1,2,3,4]
- [0,1,3,4,2]
Obviously, if we're suggesting songs to the user, we want the first relevant song to be listed as early as possible! Therefore we'd prefer the first list over the second, because the relevant song (2) appears earlier in it. A metric that captures this is the Reciprocal Rank (RR).
def rr(results, labels):
    # Reciprocal rank of the highest-ranked relevant label (0 if none were retrieved)
    return max(
        [
            round(1 / (results.index(label) + 1), 2) if label in results else 0
            for label in labels
        ]
    )
This is an aggressive metric: once the first relevant item appears at a position greater than 10, the value barely changes anymore. Most of the big changes happen at positions below 10.
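To make this concrete, here are the two Spotify lists from above run through rr, plus a couple of deeper positions to show how quickly the score decays:
print(rr([0, 1, 2, 3, 4], [2]))   # 0.33 -> first relevant song at position 3
print(rr([0, 1, 3, 4, 2], [2]))   # 0.2  -> first relevant song at position 5
print(rr(list(range(30)), [10]))  # 0.09 -> position 11
print(rr(list(range(30)), [25]))  # 0.04 -> position 26, barely any difference left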
Recall¶
Another metric that we can track is recall, which measures how many of the relevant items were retrieved.
def recall(results, relevant_chunks):
    # Fraction of the relevant chunks that appear in the retrieved results
    return sum([1 if chunk in results else 0 for chunk in relevant_chunks]) / len(relevant_chunks)
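As a quick example with made-up chunk ids: if two chunks are relevant to a query but only one of them shows up in the retrieved results, recall is 0.5.
print(recall(["a", "b", "c"], ["a", "z"]))  # 0.5 -> one of the two relevant chunks retrieved
print(recall(["a", "b", "c"], ["a", "b"]))  # 1.0 -> both relevant chunks retrieved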
Using Our Questions¶
Now that we've seen two metrics we can use and how we might generate some synthetic questions, let's try it out on an actual question.
To do so, we'll first compute a unique chunk id for the original passage that we generated the question from.
We'll then compare the chunk_ids of the retrieved chunks and compute the rr
and the recall
of the retrieved results.
import hashlib
sample_question, chunk = questions[0]
chunk_id = hashlib.md5(chunk.encode()).hexdigest()
chunk_id, sample_question.question, chunk
('b6d9bf888fd53590ee69a913bd9bf8a4', "What factors influence the average salary for people with a bachelor's degree?", "However, the average salary for people with a bachelor's degree varies widely based upon several factors, including their major, job position, location and years of experience. The National Association of Colleges and Employers conducted a salary survey that determined the average starting salary for graduates of various bachelor's degree programs.")
retrieved_results = table.search(sample_question.question,query_type='fts').limit(25).to_list()
retrieved_chunk_ids = [item['chunk_id'] for item in retrieved_results]
retrieved_chunk_ids[:3]
['b6d9bf888fd53590ee69a913bd9bf8a4', '7a0254c9dc709220367857dcb67f2c8d', '04e7e6f91463033aa87b4104ea16b477']
We can now compute these metrics for the items we retrieved using full text search, relative to the ground truth label that we have: the original chunk that the question was generated from.
recall(retrieved_chunk_ids,[chunk_id]), rr(retrieved_chunk_ids,[chunk_id])
(1.0, 1.0)
Scaling this up for different values of k
, so that we can see how these metrics change for different subsets of the retrieved items, is relatively simple.
We can generate this mapping automatically using itertools.product
from itertools import product

SIZES = [3, 5, 10, 15, 25]
METRICS = [
    ["mrr", rr],
    ["recall", recall],
]

score_fns = {}

for metric, size in product(METRICS, SIZES):
    metric_name, score_fn = metric
    # Bind fn and k as default arguments so each lambda keeps its own loop values
    score_fns[f"{metric_name}@{size}"] = lambda predictions, labels, fn=score_fn, k=size: fn(predictions[:k], labels)  # type: ignore
Running an Evaluation¶
We can now use the code above to run a test to see how our full text search performs for our synthetic questions.
import hashlib
from tqdm import tqdm

fts_results = []
for sample_qn, chunk in tqdm(questions):
    chunk_id = hashlib.md5(chunk.encode()).hexdigest()
    # Strip punctuation so special characters aren't parsed as full text search operators
    cleaned_question = ''.join(
        char for char in sample_qn.question if char.isalnum() or char.isspace()
    )
    retrieved_results = table.search(cleaned_question, query_type='fts').limit(25).to_list()
    retrieved_chunk_ids = [item['chunk_id'] for item in retrieved_results]
    fts_results.append(
        {
            metric: score_fn(retrieved_chunk_ids, [chunk_id])
            for metric, score_fn in score_fns.items()
        }
    )
100%|██████████| 300/300 [00:07<00:00, 41.64it/s]
import pandas as pd
df = pd.DataFrame(fts_results)
df.mean()
mrr@3        0.784267
mrr@5        0.791267
mrr@10       0.797633
mrr@15       0.798133
mrr@25       0.798433
recall@3     0.896667
recall@5     0.926667
recall@10    0.973333
recall@15    0.980000
recall@25    0.986667
dtype: float64
We can see that, on average, full text search surfaces the relevant item 97-98% of the time if we take k=10
, and that the relevant item typically lands in the first or second position.
Now, because these are synthetic questions, there's likely a large amount of overlap between the phrasing of the questions and the original source text, which inflates these scores.
For production applications on your own domain-specific dataset, it's worth running these experiments to see what works best for your needs.
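As a next step, the same harness can be reused to benchmark other retrieval modes. Here's a minimal sketch of a semantic (vector) search run, assuming your LanceDB version supports embedding string queries automatically through the embedding function we registered on the table:
vector_results = []
for sample_qn, chunk in tqdm(questions):
    chunk_id = hashlib.md5(chunk.encode()).hexdigest()
    # Passing a raw string should trigger query-time embedding via the registered
    # OpenAI embedding function, turning this into a vector search.
    retrieved = table.search(sample_qn.question, query_type="vector").limit(25).to_list()
    retrieved_chunk_ids = [item["chunk_id"] for item in retrieved]
    vector_results.append(
        {
            metric: score_fn(retrieved_chunk_ids, [chunk_id])
            for metric, score_fn in score_fns.items()
        }
    )

pd.DataFrame(vector_results).mean()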