1. Transition from Chroma to PostgreSQL
1.1. Step 1. Create Remote Database with Target Schema
Set up a PostgreSQL database with the cloud service provider of your choice:
```python
import psycopg2
from pgvector.psycopg2 import register_vector
from dotenv import load_dotenv
import os

load_dotenv(override=True)

conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    database=os.getenv("POSTGRES_DATABASE"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
    sslmode="require"  # Neon requires SSL
)

cur = conn.cursor()

try:
    cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
    conn.commit()
    print("Vector extension created successfully")
except Exception as e:
    print(f"Error creating extension: {e}")
    conn.rollback()
    # Check if extension exists
    cur.execute("SELECT * FROM pg_available_extensions WHERE name = 'vector';")
    result = cur.fetchone()
    if result:
        print("Vector extension is available but not installed. Contact Neon support.")
    else:
        print("Vector extension is not available in this database.")
    exit(1)

register_vector(conn)

cur.execute("""
    DROP TABLE IF EXISTS embeddings;

    CREATE TABLE embeddings (
        id TEXT PRIMARY KEY,
        content TEXT,
        metadata JSONB,
        embedding vector(1536)
    );

    CREATE INDEX ON embeddings USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
""")
conn.commit()
print("Table created successfully")
```
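If the script runs without errors, it's worth a quick sanity check before migrating anything. Here is a minimal sketch (assuming the same `.env` variables as above) that confirms the extension is really installed and the table has the expected shape:

```python
import os
import psycopg2
from dotenv import load_dotenv

load_dotenv(override=True)

conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    database=os.getenv("POSTGRES_DATABASE"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
    sslmode="require"
)
cur = conn.cursor()

# Confirm the vector extension is installed (not just available)
cur.execute("SELECT extversion FROM pg_extension WHERE extname = 'vector';")
print("pgvector version:", cur.fetchone())

# Confirm the embeddings table exists with the expected columns
cur.execute("""
    SELECT column_name, data_type
    FROM information_schema.columns
    WHERE table_name = 'embeddings'
    ORDER BY ordinal_position;
""")
for name, dtype in cur.fetchall():
    print(name, dtype)

conn.close()
```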
1.2. Step 2. Migrate from Chroma DB to PostgreSQL DB
```python
import psycopg2
from pgvector.psycopg2 import register_vector
import json
from chromadb import PersistentClient
from dotenv import load_dotenv
import os

load_dotenv(override=True)

CHROMA_DB_NAME = "preprocessed_db"
COLLECTION_NAME = "docs"

conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    database=os.getenv("POSTGRES_DATABASE"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
    sslmode="require"
)
register_vector(conn)


def clean_text(text):
    """Remove null bytes and other problematic characters"""
    if text is None:
        return ""
    return text.replace('\x00', '')


def migrate_chroma_to_postgres():
    # Load from ChromaDB
    chroma = PersistentClient(path=CHROMA_DB_NAME)
    collection = chroma.get_collection(COLLECTION_NAME)
    result = collection.get(include=['embeddings', 'documents', 'metadatas'])

    # Insert into PostgreSQL
    cur = conn.cursor()

    for i, (doc_id, doc, embedding, metadata) in enumerate(
        zip(result['ids'], result['documents'],
            result['embeddings'], result['metadatas'])
    ):
        # Clean the document text
        cleaned_doc = clean_text(doc)

        # Clean metadata values if they're strings
        cleaned_metadata = {}
        for key, value in metadata.items():
            if isinstance(value, str):
                cleaned_metadata[key] = clean_text(value)
            else:
                cleaned_metadata[key] = value

        try:
            cur.execute(
                """
                INSERT INTO embeddings (id, content, metadata, embedding)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE
                SET content = EXCLUDED.content,
                    metadata = EXCLUDED.metadata,
                    embedding = EXCLUDED.embedding
                """,
                (doc_id, cleaned_doc, json.dumps(cleaned_metadata), embedding)
            )
        except Exception as e:
            print(f"Error at document {i} (id: {doc_id}): {e}")
            conn.rollback()
            continue

        if i % 100 == 0:
            conn.commit()
            print(f"Migrated {i} documents...")

    conn.commit()
    print(f"Migration complete! Total documents: {len(result['ids'])}")


# Run migration
migrate_chroma_to_postgres()
```
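Once the migration finishes, a quick count comparison confirms nothing was silently skipped. A small sketch, assuming the same Chroma path and `.env` variables as the migration script:

```python
import os
import psycopg2
from chromadb import PersistentClient
from dotenv import load_dotenv

load_dotenv(override=True)

# Count records in the Chroma source collection
chroma = PersistentClient(path="preprocessed_db")
collection = chroma.get_collection("docs")

# Count rows in the PostgreSQL target table
conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    database=os.getenv("POSTGRES_DATABASE"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
    sslmode="require"
)
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM embeddings;")
pg_count = cur.fetchone()[0]
conn.close()

print(f"Chroma: {collection.count()} records, PostgreSQL: {pg_count} records")
assert pg_count == collection.count(), "Counts differ; check the migration log for skipped documents"
```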
1.3. Step 3. Inject New Article into PostgreSQL Vector DB
Once everything works properly, we want to inject new articles directly into the vector DB without going through Chroma again.
Most of the implementations are directly copied from:
```python
import psycopg2
from pgvector.psycopg2 import register_vector
from dotenv import load_dotenv
import os
from openai import AzureOpenAI
from litellm import completion
from pydantic import BaseModel, Field
import frontmatter
import re
import time
import json
from typing import TypedDict
from tqdm import tqdm

load_dotenv(override=True)

# Set environment variables for litellm
os.environ["AZURE_API_KEY"] = os.getenv("AZURE_OPENAI_API_KEY")
os.environ["AZURE_API_BASE"] = os.getenv("AZURE_OPENAI_ENDPOINT")
os.environ["AZURE_API_VERSION"] = os.getenv("AZURE_API_VERSION")


class CustomDocument(TypedDict):
    tags: str
    title: str
    text: str


class Result(BaseModel):
    page_content: str
    metadata: dict


class Chunk(BaseModel):
    headline: str = Field(
        description="A brief heading for this chunk, typically a few words, that is most likely to be surfaced in a query. This headline must be in English")
    summary: str = Field(
        description="A few sentences summarizing the content of this chunk to answer common questions, this summary must be in English")
    original_text: str = Field(
        description="The original text of this chunk from the provided document, exactly as is, not changed in any way")

    def as_result(self, document):
        metadata = {"title": document["title"], "tags": document["tags"]}
        return Result(page_content=self.headline + "\n\n" + self.summary + "\n\n" + self.original_text, metadata=metadata)


class Chunks(BaseModel):
    chunks: list[Chunk]


class ArticleInjector:
    """Inject new articles into the PostgreSQL vector database"""

    def __init__(self, average_chunk_size: int = 2500):
        # PostgreSQL connection
        self.conn = psycopg2.connect(
            host=os.getenv("POSTGRES_HOST"),
            database=os.getenv("POSTGRES_DATABASE"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD"),
            sslmode="require"
        )
        register_vector(self.conn)

        # Azure OpenAI setup
        self.client = AzureOpenAI(
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_API_VERSION"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
        )

        self.embedding_model = "text-embedding-ada-002"
        self.model = f"azure/{os.getenv('AZURE_OPENAI_MODEL')}"
        self.average_chunk_size = average_chunk_size

    def get_tags_and_title_from_blogpost(self, filepath: str) -> tuple[str, str]:
        try:
            blog_post = frontmatter.load(filepath)
        except Exception as e:
            print(f"Error loading {filepath}: {e}")
            raise

        tags = blog_post.get("tag", "")
        title = blog_post.get("title", "")

        if isinstance(tags, list):
            tags = ",".join(sorted(tags))
        elif isinstance(tags, str) and "," in tags:
            tags = ",".join(sorted([t.strip() for t in tags.split(",")]))
        return tags, title

    def load_document(self, filepath: str) -> CustomDocument:
        """Load a single markdown file and return as CustomDocument"""
        blog_post = frontmatter.load(filepath)
        tags, title = self.get_tags_and_title_from_blogpost(filepath)

        # Get content without frontmatter
        text = blog_post.content

        # Remove <style>...</style> blocks (including multiline)
        text = re.sub(r'<style[^>]*>.*?</style>', '',
                      text, flags=re.DOTALL | re.IGNORECASE)

        # Clean up extra whitespace
        text = text.strip()

        return CustomDocument(tags=tags, title=title, text=text)

    def make_user_prompt(self, document: CustomDocument):
        how_many = (len(document["text"]) // self.average_chunk_size) + 1
        return f"""
You take a document and you split the document into overlapping chunks for a KnowledgeBase.

The document is from the articles from the Blog of James Lee.
The document is of tags: {document["tags"]}
The document has title: {document["title"]}

A chatbot will use these chunks to answer questions about the articles and retrieve a related list of articles for the reader.
You should divide up the document as you see fit, being sure that the entire document is returned in the chunks - don't leave anything out.
This document should probably be split into {how_many} chunks, but you can have more or less as appropriate.
There should be overlap between the chunks as appropriate; typically about 25% overlap or about 50 words, so you have the same text in multiple chunks for best retrieval results.

For each chunk, you should provide a headline, a summary, and the original text of the chunk.
Together your chunks should represent the entire document with overlap.

Here is the document:

{document["text"]}

Respond with the chunks.
"""

    def process_document(self, document: CustomDocument) -> list[Result]:
        """Process document into chunks using LLM"""
        messages = [
            {"role": "user", "content": self.make_user_prompt(document)}]
        response = completion(
            model=self.model, messages=messages, response_format=Chunks)
        reply = response.choices[0].message.content
        doc_as_chunks = Chunks.model_validate_json(reply).chunks
        return [chunk.as_result(document) for chunk in doc_as_chunks]

    def create_embeddings_batched(self, texts: list[str], batch_size: int = 50) -> list[list[float]]:
        """Create embeddings in batches to avoid rate limits"""
        all_embeddings = []

        for i in tqdm(range(0, len(texts), batch_size), desc="Creating embeddings"):
            batch = texts[i:i + batch_size]

            try:
                response = self.client.embeddings.create(
                    model=self.embedding_model,
                    input=batch
                )
                all_embeddings.extend([e.embedding for e in response.data])
            except Exception as e:
                if "rate limit" in str(e).lower():
                    print("Rate limit hit, waiting 60 seconds...")
                    time.sleep(60)
                    # Retry the same batch
                    response = self.client.embeddings.create(
                        model=self.embedding_model,
                        input=batch
                    )
                    all_embeddings.extend([e.embedding for e in response.data])
                else:
                    raise

            # Add delay between batches to avoid rate limits
            if i + batch_size < len(texts):
                time.sleep(2)

        return all_embeddings

    def clean_text(self, text):
        """Remove null bytes and other problematic characters"""
        if text is None:
            return ""
        return text.replace('\x00', '')

    def get_next_id(self) -> int:
        """Get the next available ID from PostgreSQL"""
        cur = self.conn.cursor()
        cur.execute(
            "SELECT COALESCE(MAX(CAST(id AS INTEGER)), 0) + 1 FROM embeddings WHERE id ~ '^[0-9]+$'")
        next_id = cur.fetchone()[0]
        return next_id

    def inject_article(self, filepath: str):
        """
        Load a markdown article, chunk it, create embeddings, and insert into PostgreSQL

        Args:
            filepath: Absolute path to the markdown file
        """
        print(f"Loading article from: {filepath}")

        # Load document
        document = self.load_document(filepath)
        print(f"Title: {document['title']}")
        print(f"Tags: {document['tags']}")

        # Process into chunks
        print("Processing document into chunks...")
        chunks = self.process_document(document)
        print(f"Created {len(chunks)} chunks")

        # Create embeddings
        texts = [chunk.page_content for chunk in chunks]
        vectors = self.create_embeddings_batched(texts, batch_size=50)

        # Get starting ID
        start_id = self.get_next_id()
        print(f"Starting ID: {start_id}")

        # Insert into PostgreSQL
        cur = self.conn.cursor()

        for i, (chunk, embedding) in enumerate(zip(chunks, vectors)):
            doc_id = str(start_id + i)
            cleaned_doc = self.clean_text(chunk.page_content)

            # Clean metadata values if they're strings
            cleaned_metadata = {}
            for key, value in chunk.metadata.items():
                if isinstance(value, str):
                    cleaned_metadata[key] = self.clean_text(value)
                else:
                    cleaned_metadata[key] = value

            try:
                cur.execute(
                    """
                    INSERT INTO embeddings (id, content, metadata, embedding)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE
                    SET content = EXCLUDED.content,
                        metadata = EXCLUDED.metadata,
                        embedding = EXCLUDED.embedding
                    """,
                    (doc_id, cleaned_doc, json.dumps(cleaned_metadata), embedding)
                )
            except Exception as e:
                print(f"Error inserting chunk {i} (id: {doc_id}): {e}")
                self.conn.rollback()
                continue

        self.conn.commit()
        print(f"Successfully injected {len(chunks)} chunks into PostgreSQL")

        # Verify
        cur.execute("SELECT COUNT(*) FROM embeddings")
        total = cur.fetchone()[0]
        print(f"Total documents in database: {total}")

    def close(self):
        """Close database connection"""
        self.conn.close()


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: uv run step3_inject_new_article.py <path_to_markdown_file>")
        sys.exit(1)

    filepath = sys.argv[1]

    if not os.path.exists(filepath):
        print(f"Error: File not found: {filepath}")
        sys.exit(1)

    injector = ArticleInjector(average_chunk_size=2500)

    try:
        injector.inject_article(filepath)
    finally:
        injector.close()
```
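`load_document` expects frontmatter with `title` and `tag` keys. As a smoke test, you can generate a dummy article and inject it programmatically; the sample content below is made up for illustration, and the import assumes the script is saved as `step3_inject_new_article.py` as in the usage message:

```python
from step3_inject_new_article import ArticleInjector

# Hypothetical sample article with the frontmatter keys the injector reads
SAMPLE = """---
title: Restoring a PostgreSQL Database from a Dump
tag: postgresql, backup
---

Use pg_restore for custom-format dumps and psql for plain SQL dumps...
"""

with open("sample_article.md", "w") as f:
    f.write(SAMPLE)

injector = ArticleInjector(average_chunk_size=2500)
try:
    injector.inject_article("sample_article.md")
finally:
    injector.close()
```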
1.4. Step 4. Wrap our Agentic Solution into a Class
```python
import psycopg2
from pgvector.psycopg2 import register_vector
from dotenv import load_dotenv
import os
from openai import AzureOpenAI
from pydantic import BaseModel, Field

load_dotenv(override=True)


class Result(BaseModel):
    page_content: str
    metadata: dict


class RankOrder(BaseModel):
    order: list[int] = Field(
        description="The order of relevance of chunks, from most relevant to least relevant, by chunk id number"
    )


class RAGQuestionAnswerer:
    """RAG system for answering questions using PostgreSQL vector store"""

    def __init__(self, retrieval_k: int = 10):
        # PostgreSQL connection
        self.conn = psycopg2.connect(
            host=os.getenv("POSTGRES_HOST"),
            database=os.getenv("POSTGRES_DATABASE"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD"),
            sslmode="require"
        )
        register_vector(self.conn)

        # Azure OpenAI setup
        self.client = AzureOpenAI(
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_API_VERSION"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
        )

        self.embedding_model = "text-embedding-ada-002"
        self.model = os.getenv('AZURE_OPENAI_MODEL')
        self.retrieval_k = retrieval_k

    def create_embeddings(self, batch_of_texts: list[str]) -> list[list[float]]:
        response = self.client.embeddings.create(
            model=self.embedding_model,
            input=batch_of_texts
        )
        return [e.embedding for e in response.data]

    def fetch_context_unranked(self, question: str) -> list[Result]:
        """Query PostgreSQL for relevant chunks"""
        # Get query embedding
        query_embedding = self.create_embeddings([question])[0]

        # Query PostgreSQL
        cur = self.conn.cursor()
        cur.execute(
            """
            SELECT id, content, metadata,
                   embedding <=> %s::vector AS distance
            FROM embeddings
            ORDER BY distance
            LIMIT %s
            """,
            (query_embedding, self.retrieval_k)
        )

        results = cur.fetchall()
        chunks = []
        for row in results:
            chunks.append(Result(
                page_content=row[1],
                metadata=row[2]
            ))

        return chunks

    def rerank(self, question: str, chunks: list[Result]) -> list[Result]:
        """Rerank chunks using LLM"""
        system_prompt = """
You are a document re-ranker.
You are provided with a question and a list of relevant chunks of text from a query of a knowledge base.
The chunks are provided in the order they were retrieved; this should be approximately ordered by relevance, but you may be able to improve on that.
You must rank order the provided chunks by relevance to the question, with the most relevant chunk first.
Reply only with the list of ranked chunk ids, nothing else. Include all the chunk ids you are provided with, reranked.
"""
        user_prompt = f"The user has asked the following question:\n\n{question}\n\nOrder all the chunks of text by relevance to the question, from most relevant to least relevant. Include all the chunk ids you are provided with, reranked.\n\n"
        user_prompt += "Here are the chunks:\n\n"
        for index, chunk in enumerate(chunks):
            user_prompt += f"# CHUNK ID: {index + 1}:\n\n{chunk.page_content}\n\n"
        user_prompt += "Reply only with the list of ranked chunk ids, nothing else."

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        response = self.client.beta.chat.completions.parse(
            model=self.model,
            messages=messages,
            response_format=RankOrder,
            max_tokens=100  # Short JSON array output
        )
        reply = response.choices[0].message.parsed
        order = reply.order
        print(f"Reranked order: {order}")
        return [chunks[i - 1] for i in order]

    def fetch_reranked_context(self, question: str) -> list[Result]:
        """Fetch and rerank context"""
        chunks = self.fetch_context_unranked(question)
        return self.rerank(question, chunks)

    def rewrite_query(self, question: str, history: list = []) -> str:
        """Rewrite the user's question to be more specific"""
        sys_message = f"""
You are in a conversation with a user, answering questions about the articles from the blog of James Lee.
You are about to look up information in a Knowledge Base to answer the user's question.

This is the history of your conversation so far with the user:
{history}

And this is the user's current question:
{question}

Respond only with a single, refined question that you will use to search the Knowledge Base.
It should be a VERY short specific question most likely to surface content. Focus on the question details.
IMPORTANT: Respond ONLY with the knowledgebase query, nothing else.

Don't mention the name James Lee.
"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "system", "content": sys_message}],
            max_tokens=100  # Short rewritten query
        )
        return response.choices[0].message.content

    def make_rag_messages(self, question: str, history: list, chunks: list[Result]) -> list[dict]:
        """Create messages for RAG"""
        SYSTEM_PROMPT = """
You are a knowledgeable, friendly assistant to search for articles in the blog of James Lee.
You are chatting with a user about finding related articles.
Your answer will be evaluated for accuracy, relevance and completeness, so make sure it only answers the question and fully answers it.
If you don't know the answer, say so.
For context, here are specific extracts from the Knowledge Base that might be directly relevant to the user's question:
{context}

With this context, please answer the user's question. Be accurate, relevant and complete.
"""
        context = "\n\n".join(
            f"Extract from article titled '{chunk.metadata['title']}':\n{chunk.page_content}"
            for chunk in chunks
        )
        system_prompt = SYSTEM_PROMPT.format(context=context)
        return [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": question}]

    def answer_question(self, question: str, history: list[dict] = []) -> tuple[str, list]:
        """
        Answer a question using RAG and PostgreSQL vector store

        Args:
            question: The user's question
            history: Conversation history

        Returns:
            tuple: (answer, retrieved_chunks)
        """
        # Rewrite query for better retrieval
        query = self.rewrite_query(question, history)
        print(f"Rewritten query: {query}")

        # Fetch and rerank context
        chunks = self.fetch_reranked_context(query)

        # Generate answer
        messages = self.make_rag_messages(question, history, chunks)
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=800  # Limit answer length for faster responses
        )

        return response.choices[0].message.content, chunks

    def close(self):
        """Close database connection"""
        self.conn.close()


if __name__ == "__main__":
    # Example usage
    rag = RAGQuestionAnswerer(retrieval_k=10)

    try:
        question = "restore database"
        answer, chunks = rag.answer_question(question)

        print("\n" + "="*80)
        print("QUESTION:", question)
        print("="*80)
        print("\nANSWER:", answer)
        print("\n" + "="*80)
        titles = [chunk.metadata['title'] for chunk in chunks]
        print("RETRIEVED ARTICLES:", titles)
        print("="*80)
    finally:
        rag.close()
```
2. Expose this Service via a FastAPI Endpoint
```python
@app.get("/articles")
async def answer(question: str):
    from src.RAGQuestionAnswerer import RAGQuestionAnswerer

    rag = RAGQuestionAnswerer(retrieval_k=10)
    try:
        answer, chunks = rag.answer_question(question)
    finally:
        rag.close()  # avoid leaking the PostgreSQL connection

    titles = [chunk.metadata['title'] for chunk in chunks]
    return {"answer": answer, "titles": titles}
```
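One caveat: this opens a new PostgreSQL connection on every request. A common alternative is to build the `RAGQuestionAnswerer` once at startup and reuse it; below is a sketch using FastAPI's lifespan hook. Note that a single shared psycopg2 connection effectively serializes queries, which is fine for light traffic but would need a connection pool for anything heavier:

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI

from src.RAGQuestionAnswerer import RAGQuestionAnswerer


@asynccontextmanager
async def lifespan(app: FastAPI):
    # One shared instance (and one PostgreSQL connection) per process
    app.state.rag = RAGQuestionAnswerer(retrieval_k=10)
    yield
    app.state.rag.close()


app = FastAPI(lifespan=lifespan)


@app.get("/articles")
async def answer(question: str):
    rag = app.state.rag
    answer, chunks = rag.answer_question(question)
    titles = [chunk.metadata['title'] for chunk in chunks]
    return {"answer": answer, "titles": titles}
```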
3. Quick Fix for Data in a JSONB Column
Our agentic solution makes use of metadata to route users to different answers.
Assuming this metadata is simply the "title" or "id" of the related documents, we can update it directly in PostgreSQL as follows:
3.1. Find Articles for Direct Adjustment
```sql
-- find articles with the target title
SELECT id, metadata->>'title' AS current_title
FROM embeddings
WHERE metadata->>'title' ILIKE '%Without Langchain%';
```
3.2. Update Target Articles
```sql
-- update those articles
UPDATE embeddings
SET metadata = jsonb_set(
    metadata,
    '{title}',
    '"RAG Deployment Part 1: Semantic Chunking, Agentic Rephase and Reranking; Chroma Database"'
)
WHERE metadata->>'title' ILIKE '%Without Langchain%';
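The same fix can also be scripted from Python if you would rather keep everything in one place. A sketch with psycopg2, where the search pattern and the new title are ordinary query parameters; note that `jsonb_set` expects a JSON value, hence `json.dumps`:

```python
import json
import os
import psycopg2
from dotenv import load_dotenv

load_dotenv(override=True)

conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    database=os.getenv("POSTGRES_DATABASE"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
    sslmode="require"
)
cur = conn.cursor()

new_title = "RAG Deployment Part 1: Semantic Chunking, Agentic Rephase and Reranking; Chroma Database"
cur.execute(
    """
    UPDATE embeddings
    SET metadata = jsonb_set(metadata, '{title}', %s::jsonb)
    WHERE metadata->>'title' ILIKE %s
    """,
    (json.dumps(new_title), '%Without Langchain%')  # json.dumps produces a valid jsonb string literal
)
print(f"Updated {cur.rowcount} rows")
conn.commit()
conn.close()
```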