
RAG Deployment Part 2: Transition from Chroma DB to PostgreSQL

December 20, 2025

Tags: LLM, RAG, VectorDB

1. Transition from Chroma to PostgreSQL

1.1. Step 1. Create Remote Database with Target Schema

Set up a PostgreSQL database with the cloud provider of your choice; this walkthrough uses Neon, which is why the connection below sets sslmode="require".
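
All the scripts in this post read their credentials from a .env file via python-dotenv. A minimal example for the database part, with made-up placeholder values (the Azure OpenAI variables used in later steps follow the same pattern):

POSTGRES_HOST=your-project.region.aws.neon.tech
POSTGRES_DATABASE=neondb
POSTGRES_USER=your_user
POSTGRES_PASSWORD=your_password

With the credentials in place, connect and create the target schema: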

import psycopg2
from pgvector.psycopg2 import register_vector
import json
from dotenv import load_dotenv
import os

load_dotenv(override=True)

conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    database=os.getenv("POSTGRES_DATABASE"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
    sslmode="require"  # Neon requires SSL
)

cur = conn.cursor()

try:
    cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
    conn.commit()
    print("Vector extension created successfully")
except Exception as e:
    print(f"Error creating extension: {e}")
    conn.rollback()
    # Check if extension exists
    cur.execute("SELECT * FROM pg_available_extensions WHERE name = 'vector';")
    result = cur.fetchone()
    if result:
        print("Vector extension is available but not installed. Contact Neon support.")
    else:
        print("Vector extension is not available in this database.")
    exit(1)

register_vector(conn)

cur.execute("""
    DROP TABLE IF EXISTS embeddings;
    
    CREATE TABLE embeddings (
        id TEXT PRIMARY KEY,
        content TEXT,
        metadata JSONB,
        embedding vector(1536)
    );
    
    CREATE INDEX ON embeddings USING ivfflat (embedding vector_cosine_ops)
    WITH (lists = 100);
""")
conn.commit()
print("Table created successfully")

1.2. Step 2. Migrate from Chroma DB to PostgreSQL DB

import psycopg2
from pgvector.psycopg2 import register_vector
import json
from chromadb import PersistentClient
from dotenv import load_dotenv
import os

load_dotenv(override=True)

CHROMA_DB_NAME = "preprocessed_db"
COLLECTION_NAME = "docs"

conn = psycopg2.connect(
    host=os.getenv("POSTGRES_HOST"),
    database=os.getenv("POSTGRES_DATABASE"),
    user=os.getenv("POSTGRES_USER"),
    password=os.getenv("POSTGRES_PASSWORD"),
    sslmode="require"
)
register_vector(conn)


def clean_text(text):
    """Remove null bytes and other problematic characters"""
    if text is None:
        return ""
    return text.replace('\x00', '')


def migrate_chroma_to_postgres():
    # Load from ChromaDB
    chroma = PersistentClient(path=CHROMA_DB_NAME)
    collection = chroma.get_collection(COLLECTION_NAME)
    result = collection.get(include=['embeddings', 'documents', 'metadatas'])

    # Insert into PostgreSQL
    cur = conn.cursor()

    for i, (doc_id, doc, embedding, metadata) in enumerate(
        zip(result['ids'], result['documents'],
            result['embeddings'], result['metadatas'])
    ):
        # Clean the document text
        cleaned_doc = clean_text(doc)

        # Clean metadata values if they're strings
        cleaned_metadata = {}
        for key, value in metadata.items():
            if isinstance(value, str):
                cleaned_metadata[key] = clean_text(value)
            else:
                cleaned_metadata[key] = value

        try:
            cur.execute(
                """
                INSERT INTO embeddings (id, content, metadata, embedding)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (id) DO UPDATE
                SET content = EXCLUDED.content,
                    metadata = EXCLUDED.metadata,
                    embedding = EXCLUDED.embedding
                """,
                (doc_id, cleaned_doc, json.dumps(cleaned_metadata), embedding)
            )
        except Exception as e:
            print(f"Error at document {i} (id: {doc_id}): {e}")
            conn.rollback()
            continue

        # Commit periodically so progress is preserved if a later row fails
        if (i + 1) % 100 == 0:
            conn.commit()
            print(f"Migrated {i + 1} documents...")

    conn.commit()
    print(f"Migration complete! Total documents: {len(result['ids'])}")


# Run migration
migrate_chroma_to_postgres()
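
A quick count comparison confirms nothing was silently dropped (a sketch reusing the connection and constants defined above; rows skipped due to insert errors would show up here as a mismatch):

cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM embeddings")
pg_count = cur.fetchone()[0]

chroma = PersistentClient(path=CHROMA_DB_NAME)
chroma_count = chroma.get_collection(COLLECTION_NAME).count()

print(f"Chroma: {chroma_count}, PostgreSQL: {pg_count}")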

1.3. Step 3. Inject New Article into PostgreSQL Vector DB

Once everything works properly, we want to inject new articles directly into the vector DB without going through Chroma again.

Most of the implementation below is copied directly from the chunking pipeline in Part 1 of this series:

import psycopg2
from pgvector.psycopg2 import register_vector
from dotenv import load_dotenv
import os
from openai import AzureOpenAI
from litellm import completion
from pydantic import BaseModel, Field
import frontmatter
import re
import time
import json
from typing import TypedDict
from tqdm import tqdm

load_dotenv(override=True)

# Set environment variables for litellm
os.environ["AZURE_API_KEY"] = os.getenv("AZURE_OPENAI_API_KEY")
os.environ["AZURE_API_BASE"] = os.getenv("AZURE_OPENAI_ENDPOINT")
os.environ["AZURE_API_VERSION"] = os.getenv("AZURE_API_VERSION")


class CustomDocument(TypedDict):
    tags: str
    title: str
    text: str


class Result(BaseModel):
    page_content: str
    metadata: dict


class Chunk(BaseModel):
    headline: str = Field(
        description="A brief heading for this chunk, typically a few words, that is most likely to be surfaced in a query. This headline must be in English")
    summary: str = Field(
        description="A few sentences summarizing the content of this chunk to answer common questions, this summary must be in English")
    original_text: str = Field(
        description="The original text of this chunk from the provided document, exactly as is, not changed in any way")

    def as_result(self, document):
        metadata = {"title": document["title"], "tags": document["tags"]}
        return Result(page_content=self.headline + "\n\n" + self.summary + "\n\n" + self.original_text, metadata=metadata)


class Chunks(BaseModel):
    chunks: list[Chunk]


class ArticleInjector:
    """Inject new articles into PostgreSQL vector database"""

    def __init__(self, average_chunk_size: int = 2500):
        # PostgreSQL connection
        self.conn = psycopg2.connect(
            host=os.getenv("POSTGRES_HOST"),
            database=os.getenv("POSTGRES_DATABASE"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD"),
            sslmode="require"
        )
        register_vector(self.conn)

        # Azure OpenAI setup
        self.client = AzureOpenAI(
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_API_VERSION"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
        )

        self.embedding_model = "text-embedding-ada-002"
        self.model = f"azure/{os.getenv('AZURE_OPENAI_MODEL')}"
        self.average_chunk_size = average_chunk_size

    def get_tags_and_title_from_blogpost(self, filepath: str) -> tuple[str, str]:
        try:
            blog_post = frontmatter.load(filepath)
        except Exception as e:
            print(f"Error loading {filepath}: {e}")
            raise

        tags = blog_post.get("tag", "")
        title = blog_post.get("title", "")

        if isinstance(tags, list):
            tags = ",".join(sorted(tags))
        elif isinstance(tags, str) and "," in tags:
            tags = ",".join(sorted([t.strip() for t in tags.split(",")]))
        return tags, title

    def load_document(self, filepath: str) -> CustomDocument:
        """Load a single markdown file and return as CustomDocument"""
        blog_post = frontmatter.load(filepath)
        tags, title = self.get_tags_and_title_from_blogpost(filepath)

        # Get content without frontmatter
        text = blog_post.content

        # Remove <style>...</style> blocks (including multiline)
        text = re.sub(r'<style[^>]*>.*?</style>', '',
                      text, flags=re.DOTALL | re.IGNORECASE)

        # Clean up extra whitespace
        text = text.strip()

        return CustomDocument(tags=tags, title=title, text=text)

    def make_user_prompt(self, document: CustomDocument):
        how_many = (len(document["text"]) // self.average_chunk_size) + 1
        return f"""
            You take a document and you split the document into overlapping chunks for a KnowledgeBase.

            The document is from the articles from Blog of James Lee.
            The document is of tags: {document["tags"]}
            The document has title: {document["title"]}

            A chatbot will use these chunks to answer questions about the articles and retrieve a related list of articles for the reader.
            You should divide up the document as you see fit, being sure that the entire document is returned in the chunks - don't leave anything out.
            This document should probably be split into {how_many} chunks, but you can have more or less as appropriate.
            There should be overlap between the chunks as appropriate; typically about 25% overlap or about 50 words, so you have the same text in multiple chunks for best retrieval results.

            For each chunk, you should provide a headline, a summary, and the original text of the chunk.
            Together your chunks should represent the entire document with overlap.

            Here is the document:

            {document["text"]}

            Respond with the chunks.
        """

    def process_document(self, document: CustomDocument) -> list[Result]:
        """Process document into chunks using LLM"""
        messages = [
            {"role": "user", "content": self.make_user_prompt(document)}]
        response = completion(
            model=self.model, messages=messages, response_format=Chunks)
        reply = response.choices[0].message.content
        doc_as_chunks = Chunks.model_validate_json(reply).chunks
        return [chunk.as_result(document) for chunk in doc_as_chunks]

    def create_embeddings_batched(self, texts: list[str], batch_size: int = 50) -> list[list[float]]:
        """Create embeddings in batches to avoid rate limits"""
        all_embeddings = []

        for i in tqdm(range(0, len(texts), batch_size), desc="Creating embeddings"):
            batch = texts[i:i + batch_size]

            try:
                response = self.client.embeddings.create(
                    model=self.embedding_model,
                    input=batch
                )
                all_embeddings.extend([e.embedding for e in response.data])
            except Exception as e:
                if "rate limit" in str(e).lower():
                    print(f"Rate limit hit, waiting 60 seconds...")
                    time.sleep(60)
                    # Retry the same batch
                    response = self.client.embeddings.create(
                        model=self.embedding_model,
                        input=batch
                    )
                    all_embeddings.extend([e.embedding for e in response.data])
                else:
                    raise

            # Add delay between batches to avoid rate limits
            if i + batch_size < len(texts):
                time.sleep(2)

        return all_embeddings

    def clean_text(self, text):
        """Remove null bytes and other problematic characters"""
        if text is None:
            return ""
        return text.replace('\x00', '')

    def get_next_id(self) -> int:
        """Get the next available ID from PostgreSQL"""
        cur = self.conn.cursor()
        cur.execute(
            "SELECT COALESCE(MAX(CAST(id AS INTEGER)), 0) + 1 FROM embeddings WHERE id ~ '^[0-9]+$'")
        next_id = cur.fetchone()[0]
        return next_id

    def inject_article(self, filepath: str):
        """
        Load a markdown article, chunk it, create embeddings, and insert into PostgreSQL

        Args:
            filepath: Absolute path to the markdown file
        """
        print(f"Loading article from: {filepath}")

        # Load document
        document = self.load_document(filepath)
        print(f"Title: {document['title']}")
        print(f"Tags: {document['tags']}")

        # Process into chunks
        print("Processing document into chunks...")
        chunks = self.process_document(document)
        print(f"Created {len(chunks)} chunks")

        # Create embeddings
        texts = [chunk.page_content for chunk in chunks]
        vectors = self.create_embeddings_batched(texts, batch_size=50)

        # Get starting ID
        start_id = self.get_next_id()
        print(f"Starting ID: {start_id}")

        # Insert into PostgreSQL
        cur = self.conn.cursor()

        for i, (chunk, embedding) in enumerate(zip(chunks, vectors)):
            doc_id = str(start_id + i)
            cleaned_doc = self.clean_text(chunk.page_content)

            # Clean metadata values if they're strings
            cleaned_metadata = {}
            for key, value in chunk.metadata.items():
                if isinstance(value, str):
                    cleaned_metadata[key] = self.clean_text(value)
                else:
                    cleaned_metadata[key] = value

            try:
                cur.execute(
                    """
                    INSERT INTO embeddings (id, content, metadata, embedding)
                    VALUES (%s, %s, %s, %s)
                    ON CONFLICT (id) DO UPDATE
                    SET content = EXCLUDED.content,
                        metadata = EXCLUDED.metadata,
                        embedding = EXCLUDED.embedding
                    """,
                    (doc_id, cleaned_doc, json.dumps(cleaned_metadata), embedding)
                )
            except Exception as e:
                print(f"Error inserting chunk {i} (id: {doc_id}): {e}")
                self.conn.rollback()
                continue

        self.conn.commit()
        print(f"Successfully injected {len(chunks)} chunks into PostgreSQL")

        # Verify
        cur.execute("SELECT COUNT(*) FROM embeddings")
        total = cur.fetchone()[0]
        print(f"Total documents in database: {total}")

    def close(self):
        """Close database connection"""
        self.conn.close()


if __name__ == "__main__":
    import sys

    if len(sys.argv) < 2:
        print("Usage: uv run step3_inject_new_article.py <path_to_markdown_file>")
        sys.exit(1)

    filepath = sys.argv[1]

    if not os.path.exists(filepath):
        print(f"Error: File not found: {filepath}")
        sys.exit(1)

    injector = ArticleInjector(average_chunk_size=2500)

    try:
        injector.inject_article(filepath)
    finally:
        injector.close()

1.4. Step 4. Wrap our Agentic Solution into a Class

import psycopg2
from pgvector.psycopg2 import register_vector
from dotenv import load_dotenv
import os
from openai import AzureOpenAI
from pydantic import BaseModel, Field

load_dotenv(override=True)


class Result(BaseModel):
    page_content: str
    metadata: dict


class RankOrder(BaseModel):
    order: list[int] = Field(
        description="The order of relevance of chunks, from most relevant to least relevant, by chunk id number"
    )


class RAGQuestionAnswerer:
    """RAG system for answering questions using PostgreSQL vector store"""

    def __init__(self, retrieval_k: int = 10):
        # PostgreSQL connection
        self.conn = psycopg2.connect(
            host=os.getenv("POSTGRES_HOST"),
            database=os.getenv("POSTGRES_DATABASE"),
            user=os.getenv("POSTGRES_USER"),
            password=os.getenv("POSTGRES_PASSWORD"),
            sslmode="require"
        )
        register_vector(self.conn)

        # Azure OpenAI setup
        self.client = AzureOpenAI(
            api_key=os.getenv("AZURE_OPENAI_API_KEY"),
            api_version=os.getenv("AZURE_API_VERSION"),
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT")
        )

        self.embedding_model = "text-embedding-ada-002"
        self.model = os.getenv('AZURE_OPENAI_MODEL')
        self.retrieval_k = retrieval_k

    def create_embeddings(self, batch_of_texts: list[str]) -> list[list[float]]:
        response = self.client.embeddings.create(
            model=self.embedding_model,
            input=batch_of_texts
        )
        return [e.embedding for e in response.data]

    def fetch_context_unranked(self, question: str) -> list[Result]:
        """Query PostgreSQL for relevant chunks"""
        # Get query embedding
        query_embedding = self.create_embeddings([question])[0]

        # Query PostgreSQL
        cur = self.conn.cursor()
        cur.execute(
            """
            SELECT id, content, metadata,
                   embedding <=> %s::vector AS distance
            FROM embeddings
            ORDER BY distance
            LIMIT %s
            """,
            (query_embedding, self.retrieval_k)
        )

        results = cur.fetchall()
        chunks = []
        for row in results:
            chunks.append(Result(
                page_content=row[1],
                metadata=row[2]
            ))

        return chunks

    def rerank(self, question: str, chunks: list[Result]) -> list[Result]:
        """Rerank chunks using LLM"""
        system_prompt = """
You are a document re-ranker.
You are provided with a question and a list of relevant chunks of text from a query of a knowledge base.
The chunks are provided in the order they were retrieved; this should be approximately ordered by relevance, but you may be able to improve on that.
You must rank order the provided chunks by relevance to the question, with the most relevant chunk first.
Reply only with the list of ranked chunk ids, nothing else. Include all the chunk ids you are provided with, reranked.
"""
        user_prompt = f"The user has asked the following question:\n\n{question}\n\nOrder all the chunks of text by relevance to the question, from most relevant to least relevant. Include all the chunk ids you are provided with, reranked.\n\n"
        user_prompt += "Here are the chunks:\n\n"
        for index, chunk in enumerate(chunks):
            user_prompt += f"# CHUNK ID: {index + 1}:\n\n{chunk.page_content}\n\n"
        user_prompt += "Reply only with the list of ranked chunk ids, nothing else."

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        response = self.client.beta.chat.completions.parse(
            model=self.model,
            messages=messages,
            response_format=RankOrder,
            max_tokens=100  # Short JSON array output
        )
        reply = response.choices[0].message.parsed
        order = reply.order
        print(f"Reranked order: {order}")
        return [chunks[i - 1] for i in order]

    def fetch_reranked_context(self, question: str) -> list[Result]:
        """Fetch and rerank context"""
        chunks = self.fetch_context_unranked(question)
        return self.rerank(question, chunks)

    def rewrite_query(self, question: str, history: list = []) -> str:
        """Rewrite the user's question to be more specific"""
        sys_message = f"""
You are in a conversation with a user, answering questions about the articles from the blog of James Lee.
You are about to look up information in a Knowledge Base to answer the user's question.

This is the history of your conversation so far with the user:
{history}

And this is the user's current question:
{question}

Respond only with a single, refined question that you will use to search the Knowledge Base.
It should be a VERY short specific question most likely to surface content. Focus on the question details.
IMPORTANT: Respond ONLY with the knowledgebase query, nothing else.

Don't mention the name James Lee.
"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "system", "content": sys_message}],
            max_tokens=100  # Short rewritten query
        )
        return response.choices[0].message.content

    def make_rag_messages(self, question: str, history: list, chunks: list[Result]) -> list[dict]:
        """Create messages for RAG"""
        SYSTEM_PROMPT = """
You are a knowledgeable, friendly assistant to search for articles in the blog of James Lee.
You are chatting with a user about finding related articles.
Your answer will be evaluated for accuracy, relevance and completeness, so make sure it only answers the question and fully answers it.
If you don't know the answer, say so.
For context, here are specific extracts from the Knowledge Base that might be directly relevant to the user's question:
{context}

With this context, please answer the user's question. Be accurate, relevant and complete.
"""
        context = "\n\n".join(
            f"Extract from article titled '{chunk.metadata['title']}':\n{chunk.page_content}"
            for chunk in chunks
        )
        system_prompt = SYSTEM_PROMPT.format(context=context)
        return [{"role": "system", "content": system_prompt}] + history + [{"role": "user", "content": question}]

    def answer_question(self, question: str, history: list[dict] = []) -> tuple[str, list]:
        """
        Answer a question using RAG and PostgreSQL vector store

        Args:
            question: The user's question
            history: Conversation history

        Returns:
            tuple: (answer, retrieved_chunks)
        """
        # Rewrite query for better retrieval
        query = self.rewrite_query(question, history)
        print(f"Rewritten query: {query}")

        # Fetch and rerank context
        chunks = self.fetch_reranked_context(query)

        # Generate answer
        messages = self.make_rag_messages(question, history, chunks)
        response = self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            max_tokens=800  # Limit answer length for faster responses
        )

        return response.choices[0].message.content, chunks

    def close(self):
        """Close database connection"""
        self.conn.close()


if __name__ == "__main__":
    # Example usage
    rag = RAGQuestionAnswerer(retrieval_k=10)

    try:
        question = "restore database "
        answer, chunks = rag.answer_question(question)

        print("\n" + "="*80)
        print("QUESTION:", question)
        print("="*80)
        print("\nANSWER:", answer)
        # print("\n" + "="*80)
        titles = [chunk.metadata['title'] for chunk in chunks]
        # print(titles)
        # print("="*80)
    finally:
        rag.close()
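
answer_question also accepts a history list in the standard OpenAI message format, so multi-turn use works out of the box. A hypothetical follow-up (both messages below are invented for illustration):

rag = RAGQuestionAnswerer(retrieval_k=10)
try:
    history = [
        {"role": "user", "content": "How do I restore a database?"},
        {"role": "assistant", "content": "You can restore it from a pg_dump backup..."},
    ]
    # The query rewriter sees the history, so a vague follow-up
    # still yields a specific knowledge-base query.
    answer, chunks = rag.answer_question("Which article covers that?", history)
    print(answer)
finally:
    rag.close()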

2. Expose the Service via a FastAPI Endpoint

Finally, wrap the question answerer in a small FastAPI app (assuming the class above is saved as src/RAGQuestionAnswerer.py, as the import suggests):

from fastapi import FastAPI

app = FastAPI()


@app.get("/articles")
async def answer(question: str):
    from src.RAGQuestionAnswerer import RAGQuestionAnswerer

    rag = RAGQuestionAnswerer(retrieval_k=10)
    try:
        answer, chunks = rag.answer_question(question)
        titles = [chunk.metadata['title'] for chunk in chunks]
        return {"answer": answer, "titles": titles}
    finally:
        rag.close()  # don't leak a connection per request

3. Quick Fix for Data in a JSONB Column

Our agentic solution makes use of metadata to route users to different answers.

Assuming such metadata are simply the "title" or "id" of the related documents, we can update them directly in PostgreSQL as follows:

3.1. Find Articles for Direct Adjustment

-- find articles with the target title
SELECT id, metadata->>'title' as current_title
FROM embeddings
WHERE metadata->>'title' ILIKE '%Without Langchain%';

3.2. Update Target Articles

-- update those articles
UPDATE embeddings
SET metadata = jsonb_set(
    metadata,
    '{title}',
    '"RAG Deployment Part 1: Semantic Chunking, Agentic Rephase and Reranking; Chroma Database"'
)
WHERE metadata->>'title' ILIKE '%Without Langchain%';
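
To confirm the update took effect, count the rows that now carry the new title (a quick sketch, reusing a psycopg2 connection opened as in the earlier steps):

cur = conn.cursor()
cur.execute(
    "SELECT COUNT(*) FROM embeddings WHERE metadata->>'title' ILIKE %s",
    ('%Semantic Chunking%',)
)
print(f"Chunks carrying the new title: {cur.fetchone()[0]}")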