RAG-Based Product Discovery


TL;DR: From Clunky Search to Conversational Commerce

  • Challenge

    • For a mid-sized European e-commerce marketplace, the traditional keyword-based search was a significant source of user frustration and lost revenue. It failed to understand user intent, could not handle synonyms or multimodal queries (images), and led to high rates of “no results found,” causing customers to abandon their purchasing journey.

  • My Role & Solution

    • As the lead ML/GenAI Engineer, I designed and implemented an end-to-end Multimodal Retrieval-Augmented Generation (RAG) system to transform the search bar into an intelligent, conversational shopping assistant. My contributions spanned the entire MLOps lifecycle:

      1. Strategy & Architecture: I architected a scalable, cost-effective solution on AWS, leveraging a modular, microservices-based design.
      2. Data Ingestion & Indexing: I built the automated pipelines using AWS Glue, Lambda, and Step Functions to process and index the multimodal product catalog (text, images, PDFs) into Amazon OpenSearch.
      3. RAG Development & Iteration: I drove the experimentation process, systematically progressing from a baseline RAG to implementing advanced strategies like Hybrid Search, Contextual Re-ranking, and Multimodal Retrieval using Amazon Bedrock (Claude 3, Titan Embeddings) and LangChain.
      4. Continual Learning: I designed and built the “Data Flywheel” pipeline using Amazon SageMaker to periodically fine-tune the embedding model on real user interaction data, ensuring the system continuously improves.
      5. Productionization & Monitoring: I engineered the low-latency Inference Pipeline on AWS Fargate, established a comprehensive observability stack using LangSmith and CloudWatch, and implemented a rigorous A/B testing framework to validate business impact in production.

  • Impact

    • The project successfully transitioned the search experience from a liability to a strategic asset. By deploying this system, we achieved measurable improvements in key business metrics over six months of A/B testing:
      - +12% Increase in Search-to-Purchase Conversion Rate: Users found relevant products more quickly and confidently.
      - +7% Increase in Average Order Value (AOV): The conversational assistant effectively cross-sold and recommended relevant complementary products.
      - -85% Reduction in “No Results Found” Rate: The system’s semantic and multimodal understanding dramatically improved query success.
      - Maintained <500ms p99 Latency: Ensured a fast and responsive user experience even at peak traffic.

System Architecture

The final architecture is a robust, event-driven system on AWS that balances real-time performance with offline continuous improvement. My core contributions are highlighted in green.



2. Problem Framing: From Business Need to a Measurable ML Vision

Building a successful Generative AI application does not begin with algorithms or infrastructure; it begins with a rigorous process of Problem Framing. This foundational stage translates a high-level business objective into a well-defined, feasible, and measurable machine learning task. A flawed framing can lead a project to deliver a technically impressive system that provides no real business value. This section outlines the structured process used to define the vision for the RAG-based search system.

A. Setting the Business Objectives

The project originated from a core business need: to transform the underperforming, keyword-based search bar into a revenue-driving, customer-centric product discovery engine. After aligning with key stakeholders—including Product Management, Sales, and Engineering—the primary business objectives were defined:

  • Increase Revenue: Directly tie the improved search experience to top-line growth by increasing conversion rates and average order value.

  • Enhance Customer Satisfaction: Reduce the friction and frustration inherent in traditional search, leading to higher user engagement, longer session durations, and improved brand loyalty.

  • Reduce Operational Overhead: Automate the process of understanding product relationships and user intent, eliminating the costly, unscalable manual effort required to maintain keyword lists and synonym dictionaries.

B. Is RAG the Right Approach? (GenAI Use Case Evaluation)

Before committing to a complex RAG architecture, a critical evaluation was performed to determine if Generative AI was the appropriate solution.

When to Use RAG for E-commerce Search:

  • Complex Patterns: User queries are expressed in natural language, full of nuance, context, and ambiguity that simple rule-based systems cannot handle. RAG is designed to interpret these complex linguistic patterns.

  • Existing Knowledge Base: A rich, diverse knowledge base already exists in the form of product catalogs, detailed descriptions, customer reviews, and images. RAG excels at activating this latent knowledge.

  • Need for Grounded Generation: The system must provide factually accurate information about products. RAG’s ability to ground LLM responses in retrieved, authoritative data is critical for preventing hallucinations and building user trust.

  • Scalability and Evolving Data: The product catalog is constantly changing. A RAG system, supported by automated ingestion pipelines, can adapt to this evolving knowledge base far more effectively than a manually curated system.

When a Simpler Solution Might Suffice:

  • If the product catalog were small, static, and had a very simple query space, a simpler solution involving keyword search with a well-maintained synonym list might be adequate. However, given the scale, diversity, and dynamic nature of a modern e-commerce marketplace, this approach was deemed insufficient and unsustainable.

The Value Proposition of RAG-Powered Search

The analysis concluded that RAG was the optimal approach. It enables a paradigm shift from a simple search box to a conversational shopping assistant that understands intent, answers specific questions, and guides users through their discovery journey. This creates a powerful data flywheel: as more users engage with the improved system, they generate richer interaction data (clicks, conversions, feedback), which can then be used to continuously fine-tune the retrieval and ranking models, creating a virtuous cycle of improvement.

C. Defining the ML Problem

The broad business objective was translated into a specific, technical machine learning problem.

  • Ideal Outcome: A user finds the product that best meets their functional needs and stylistic preferences, resulting in a confident purchase.

  • System’s Goal: Given a multimodal user query (text and/or image), the system must:

    1. Retrieve a ranked list of the most relevant product information (descriptions, specifications, images, reviews) from the knowledge base.

    2. Generate a helpful, concise, and factually accurate natural language response that answers the user’s question and presents the most relevant products, citing its sources.

  • ML Task Formulation: This is a composite system that relies on several ML tasks:

    • Information Retrieval: At its core, this is a retrieval problem. The primary task is to rank documents based on their semantic relevance to a query.

    • Text Generation: The final step is conditional text generation by an LLM, grounded in the retrieved context.

  • The “Relevance” Proxy: The ideal outcome (“user satisfaction”) is not directly measurable as a label. Therefore, relevance is used as a proxy. This relevance is further approximated by measurable user behaviors, which serve as the implicit labels for training and evaluation:

    • Good Proxies: Purchase, Add-to-Cart. These are strong signals of relevance but are relatively sparse.

    • Weaker Proxies: Click, Dwell Time. These are more abundant but can be noisy (e.g., a user might click on an irrelevant but eye-catching product).
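
To make this concrete, here is a minimal sketch of how such implicit signals could be collapsed into graded relevance labels. The event names and weights are illustrative assumptions, not values from the production system.

```python
# Minimal sketch: turning raw interaction events into graded relevance labels.
# Event names and weights are illustrative assumptions, not production values.
from collections import defaultdict

EVENT_WEIGHTS = {
    "purchase": 3.0,      # strong but sparse signal
    "add_to_cart": 2.0,   # strong signal
    "click": 1.0,         # abundant but noisy
}

def build_relevance_labels(events):
    """events: iterable of dicts like {"query": ..., "product_id": ..., "event": ...}"""
    labels = defaultdict(float)
    for e in events:
        weight = EVENT_WEIGHTS.get(e["event"], 0.0)
        # Keep the strongest signal observed for each (query, product) pair.
        key = (e["query"], e["product_id"])
        labels[key] = max(labels[key], weight)
    return labels  # (query, product_id) -> graded relevance label

labels = build_relevance_labels([
    {"query": "waterproof hiking boots", "product_id": "prod_123", "event": "click"},
    {"query": "waterproof hiking boots", "product_id": "prod_123", "event": "purchase"},
])
```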

D. Assessing Feasibility & Risks

A thorough feasibility assessment was conducted to identify potential risks and ensure the project was achievable within the given constraints.

| Category | Assessment | Notes & Mitigation Strategy |
| --- | --- | --- |
| Data | Medium Risk | **Risk:** Inconsistent quality and structure across product descriptions and reviews. **Mitigation:** Invest heavily in a robust data cleaning and pre-processing pipeline. Start with a single, clean product category for the PoC. |
| Problem Difficulty | High Risk | **Risk:** Multimodal and multilingual RAG are technically complex. Achieving low latency at scale is a significant engineering challenge. **Mitigation:** Adopt a phased approach. Begin with text-only RAG, then incrementally add multimodal and multilingual capabilities after the core system is validated. |
| Prediction Quality | High Risk | **Risk:** The cost of errors is high. An irrelevant result is a lost sale. A hallucinated product feature erodes trust and can lead to returns. **Mitigation:** Prioritize groundedness and factual accuracy in model evaluation. Implement rigorous prompt engineering and output guardrails. |
| Technical Requirements | High Risk | **Risk:** Sub-second p99 latency is required for a good user experience. LLM API and vector database costs can scale unpredictably. **Mitigation:** Architect for performance from day one with aggressive caching. Implement strict cost monitoring and alerting. Use smaller, faster models where possible. |
| Ethics & Fairness | Medium Risk | **Risk:** Potential for the model to amplify biases present in historical data (e.g., in product reviews or sales trends). **Mitigation:** Implement bias detection tools during evaluation. Ensure fairness is a key metric in re-ranker training. Provide transparency to users. |

E. Defining Success Metrics

A multi-layered set of metrics was defined to provide a holistic view of the system’s performance, from its operational health to its direct impact on business goals.

| Metric Type | Metric Name | How It's Measured | Why It Matters |
| --- | --- | --- | --- |
| Business (Primary) | Search-to-Purchase Conversion Rate | A/B testing: (Purchases from Search) / (Total Search Sessions). | The ultimate measure of the system's ability to drive revenue. |
| Business (Primary) | Average Order Value (AOV) | A/B testing: (Total Revenue from Search) / (Number of Orders from Search). | Measures the effectiveness of cross-selling and up-selling within the conversational experience. |
| Model Evaluation (Retrieval) | Mean Reciprocal Rank (MRR) | Offline evaluation against a golden dataset of (query, relevant_product) pairs. | Measures the quality of the ranking, ensuring the best result appears near the top. |
| Model Evaluation (Generation) | Groundedness / Faithfulness | Offline evaluation using an LLM-as-a-judge to score whether the generated answer is factually consistent with the retrieved context. | The most critical AI quality metric. Directly measures the system's resistance to hallucination and builds user trust. |
| Operational (Online) | p99 End-to-End Latency | Real-time monitoring of the API endpoint. | Ensures a fast and responsive user experience, which is critical for maintaining engagement and conversions. |
| Operational (Online) | Cost-per-Query | Aggregating costs from all services (vector DB, LLM API, compute) involved in a single request. | Provides financial visibility and ensures the system remains economically viable at scale. |
| User Engagement (Secondary) | User Feedback Ratio | Tracking the ratio of "thumbs up" to "thumbs down" clicks on generated responses. | A direct, real-time signal of user-perceived quality and satisfaction. |


3. The End-to-End Project and Operational Blueprint

A. The LLMOps Tech Stack: An Architectural Blueprint

Building a production-grade RAG system requires a deliberate and holistic approach to technology selection. The MLOps Stack Canvas provides a structured framework for architecting the end-to-end system, ensuring that every component—from data ingestion to production monitoring—is chosen to meet the specific demands of a Generative AI application.

The following stack was selected to prioritize managed services for scalability and reduced operational overhead, while leveraging specialized frameworks like LangChain for rapid development and LangSmith for essential LLM-specific observability.

| Canvas Block | Chosen Stack & Rationale | Key GenAI/RAG Considerations |
| --- | --- | --- |
| 1. Value Proposition | Project Charter & Design Docs | Goal Alignment: This block is the strategic "why." It ensures all technical decisions directly align with the business objective: creating a conversational, multimodal product discovery engine to drive revenue and customer satisfaction. |
| 2. Knowledge Base & Data Versioning | Vector DB: Amazon OpenSearch (k-NN); Data Lake: AWS S3; Versioning: DVC | The Heart of RAG: The vector DB is the indexed, searchable knowledge base. OpenSearch is chosen for its scalability and hybrid search capabilities. S3 serves as the raw data lake for documents and images before they are processed and indexed. DVC versions the curated datasets used for model fine-tuning. |
| 3. Experiment Management | Amazon SageMaker Studio (Notebooks) & LangSmith (Tracing/Evaluation) | Shift from Model to Pipeline: For RAG, the core "experiment" is less about model architecture and more about prompt engineering and pipeline configuration (e.g., chunking, retrieval). LangSmith is purpose-built for tracing and evaluating these complex LLM chains, making it superior to traditional tools for this use case. |
| 4. Feature Store for Re-ranking | Amazon SageMaker Feature Store & AWS Step Functions (Orchestration) | Enrichment for Precision: The Feature Store serves structured, real-time features (e.g., popularity, inventory_level) for the re-ranking model. This is a classic ML feature serving pattern used to inject business logic into the final ranking, distinct from the unstructured retrieval of the vector DB. |
| 5. Foundations (DevOps & Code) | Git (GitHub), Terraform (IaC), & GitHub Actions (CI/CD) | Prompts as Code: A standard, best-practice DevOps foundation. A key adaptation for GenAI is treating prompt templates as critical, version-controlled code artifacts. Terraform ensures the entire cloud infrastructure is reproducible and auditable. |
| 6. Continuous Training & Indexing | GitHub Actions triggering AWS Step Functions | Focus on Re-Indexing & Fine-Tuning: The "training" pipeline here is twofold: 1) the frequent re-indexing of the vector DB as the product catalog changes, and 2) the periodic fine-tuning of the embedding or re-ranking models. These are orchestrated, automated workflows. |
| 7. Model Registry & Versioning | Amazon SageMaker Model Registry | Managing Custom Models: The registry is used to version and store the custom-trained models that we own—specifically, the fine-tuned embedding model and the contextual re-ranker. The foundational LLM is a managed API endpoint and is not stored here. |
| 8. Deployment & Serving | Amazon Bedrock (LLM), SageMaker Endpoints (Custom Models), API Gateway + AWS Fargate/Lambda (Inference Service) | A Distributed System: The RAG serving layer is a distributed application. The inference service acts as the orchestrator, making real-time calls to SageMaker (re-ranking), OpenSearch (retrieval), and Amazon Bedrock (generation). This modularity allows independent scaling. |
| 9. System & Model Monitoring | LangSmith & Amazon CloudWatch | LLM-Specific Observability: This is a critical adaptation from traditional MLOps. While CloudWatch monitors infrastructure health (latency, errors), LangSmith provides essential, LLM-specific observability by tracing queries, monitoring for hallucinations, tracking token usage, and collecting user feedback. |
| 10. Metadata & Artifacts | LangSmith, OpenSearch Metadata, DynamoDB | Rich Metadata is Key: Metadata is mission-critical. It links vector chunks back to their source product pages for citation. It enables filtering in OpenSearch. It stores traces and feedback in LangSmith. A dedicated table in DynamoDB can track the lineage of datasets and model versions. |
| 11. Build vs. Buy | Hybrid: Buy Core, Build Differentiators | Buy: Leverage managed services for foundational components to accelerate development (Amazon Bedrock for LLMs, Amazon OpenSearch for vector search). Build: Focus engineering effort on the components that create a competitive advantage—the custom re-ranking model and the domain-specific ingestion logic. |
| 12. Platform & Skills | Integrated AWS Platform | Unified Ecosystem: Choosing an integrated platform like AWS simplifies security, networking, and billing. It requires a cross-functional team with skills in data engineering, ML engineering, and data science, augmented by prompt engineering expertise. |


B. The Four Core Pipelines: An Operational Blueprint

The end-to-end RAG system is powered by four distinct but interconnected pipelines. Two are offline processes focused on building and improving the system’s intelligence, while two are online, real-time processes that deliver the user experience and monitor its health.


**1. Data Ingestion & Indexing Pipeline** (Batch/Streaming)

*Trigger:*
- Scheduled: Nightly full-catalog refresh (via Amazon EventBridge).
- Event-Driven: Real-time updates for single product changes (e.g., price, inventory) triggered by database events.

*Inputs:*
- Raw product data (from PIM/DB).
- Unstructured text (descriptions, reviews).
- Product images.

*Key Steps:*
1. Extract: Pull data from all source systems.
2. Transform & Clean: Sanitize text, handle missing values, and structure data into a unified JSON format per product.
3. Chunk: Break down long text documents into semantically coherent chunks.
4. Generate Embeddings: Create text embeddings (e.g., with Amazon Titan) for text chunks and image embeddings for product images.
5. Index Data: Load the embeddings and rich metadata (product_id, category, brand) into the Amazon OpenSearch vector index.

*Outputs:*
- A fully populated and up-to-date vector index in Amazon OpenSearch.
- Execution logs and data quality reports.

**2. Inference Pipeline** (Real-time)

*Trigger:*
- User query via the application's API Gateway.

*Inputs:*
- User query (text and/or image).
- User context (e.g., user ID for personalization signals).

*Key Steps:*
1. Query Transformation: Apply techniques like HyDE to enhance the query for better retrieval.
2. Hybrid Retrieval: Perform parallel semantic (vector) and keyword search to get the top-K candidate documents (see the retrieval query sketch after this pipeline overview).
3. Contextual Re-ranking: Re-rank the candidates using a model that incorporates business logic (e.g., popularity, stock levels) and personalization.
4. Prompt Construction: Build a final, optimized prompt with the top-ranked context and system instructions.
5. LLM Generation: Call Amazon Bedrock to generate a response, streaming the result token-by-token.
6. Post-processing & Guardrails: Apply safety filters and format the response with source citations.

*Outputs:*
- A streamed, factually grounded, and safe response delivered to the user.
- A detailed trace of the entire request (latency, token counts, retrieved docs) sent to the observability platform (LangSmith/CloudWatch).

**3. Monitoring & Observability Pipeline** (Continuous)

*Trigger:*
- Continuous ingestion of data streams from all production services.

*Inputs:*
- System logs, metrics (latency, error rates), and traces from all services.
- LLM interaction data from LangSmith (prompts, responses, token usage, feedback).
- User feedback events ("thumbs up/down").

*Key Steps:*
1. Collect & Aggregate: Gather data from CloudWatch, LangSmith, and application logs.
2. Process & Analyze: Calculate key metrics (p99 latency, cost-per-query, groundedness scores).
3. Drift & Anomaly Detection: Monitor for data drift in inputs and concept drift in model outputs.
4. Alerting: Trigger alerts (e.g., via PagerDuty/Slack) when KPIs or operational metrics breach pre-defined thresholds.

*Outputs:*
- Real-time dashboards (in CloudWatch/Grafana) showing system health, cost, and AI quality.
- Actionable alerts for the on-call engineering team.
- A curated dataset of logs and feedback to fuel the Continual Learning Pipeline.

**4. Continual Learning / Data Flywheel Pipeline** (Periodic)

*Trigger:*
- Scheduled: Weekly or bi-weekly execution.
- Manual: Triggered by an ML engineer after a significant product catalog update or observed performance degradation.

*Inputs:*
- Aggregated user interaction logs (queries, clicks, conversions) from the observability system.
- User-flagged poor responses and explicit feedback data.

*Key Steps:*
1. Dataset Creation: Process interaction logs to create new, high-quality training datasets of (query, relevant_product) pairs.
2. Model Retraining/Fine-tuning: Use the new dataset to fine-tune the embedding model or retrain the contextual re-ranking model.
3. Offline Evaluation: Rigorously evaluate the new candidate model against the current production model on offline metrics (MRR, NDCG).
4. Staging & A/B Test Setup: If the candidate model is superior, register it and prepare a new A/B testing configuration for production deployment.

*Outputs:*
- A new, versioned, and evaluated candidate model in the Amazon SageMaker Model Registry.
- Evaluation reports comparing the candidate model to the production "champion."
- A deployment configuration ready for a live A/B test.
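
To make the Hybrid Retrieval step of the Inference Pipeline concrete, here is a minimal sketch of what the underlying OpenSearch query could look like, assuming an opensearch-py client, a `products` index, and a `text_embedding` k-NN field; authentication is omitted and the exact `knn` clause syntax varies by OpenSearch version.

```python
# Sketch of the Hybrid Retrieval step from the Inference Pipeline above.
# Index name, field names, and the knn query syntax are assumptions to verify
# against the deployed OpenSearch version.
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "my-opensearch-domain", "port": 443}], use_ssl=True)

def hybrid_search(query_text: str, query_vector: list[float], k: int = 50) -> list[dict]:
    body = {
        "size": k,
        "query": {
            "bool": {
                "should": [
                    # Lexical leg: classic BM25 keyword match on the description field.
                    {"match": {"description": {"query": query_text}}},
                    # Semantic leg: approximate k-NN over the text embedding field.
                    {"knn": {"text_embedding": {"vector": query_vector, "k": k}}},
                ]
            }
        },
        "_source": ["product_id", "title", "category", "brand"],
    }
    response = client.search(index="products", body=body)
    return response["hits"]["hits"]
```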


C. Project Management and Operational Strategy

A successful GenAI project requires more than just a strong technical architecture; it demands a well-defined plan, a clear team structure, and a robust governance strategy. This blueprint outlines the phased approach to development, the roles and responsibilities of the cross-functional team, and the versioning strategy that ensures reproducibility and quality from experimentation to production.

1. Project Stages: An Iterative Path to Production

We adopted an agile, iterative methodology designed to deliver value quickly, manage risk, and incorporate learnings at each stage. The project is broken down into five distinct, overlapping phases.


**1. Ideation & Planning** (Weeks 1-4)
- Finalize Business Objectives: Solidify the KPIs and secure stakeholder alignment on the project's goals.
- Define MVP Scope: Narrow the initial focus to a text-only, single-language RAG for one product category to ensure rapid validation.
- Architectural Blueprint: Finalize the core tech stack choices and create a high-level design for the MVP.

**2. Experiment Management** (Weeks 5-12)
- Establish Baseline: Build and rigorously evaluate a "naive" text-only RAG pipeline to establish performance benchmarks.
- Test Advanced RAG: Systematically experiment with and quantify the impact of Hybrid Search, Contextual Re-ranking, and Query Transformations (HyDE).
- Explore Multimodality: Develop a PoC for image-based retrieval using multimodal embeddings, updating ingestion and inference pipelines to handle visual queries.
- Validate Multilingual Support: Test multilingual embedding models and language-specific text processing to assess the feasibility and complexity of international expansion.

**3. Pipelines Development** (Weeks 9-16)
- Build Data Ingestion Pipeline: Engineer the robust, automated ETL pipeline in AWS, orchestrating it with AWS Step Functions.
- Engineer Inference Pipeline: Develop the production-grade, low-latency inference service behind API Gateway, running on AWS Fargate.
- Implement MLOps Pipelines: Build the CI/CD workflows in GitHub Actions for automated testing and deployment, and script the periodic Continual Learning pipeline.

**4. Deployment & Serving** (Weeks 17-20)
- Provision Production Infrastructure: Use Terraform to deploy the full, scalable production environment.
- Conduct Pre-flight Checks: Perform end-to-end integration tests and rigorous load testing against the production environment.
- MVP Rollout (A/B Test): Deploy the MVP to a small segment of live traffic (e.g., 5%) using an A/B testing framework to measure its real-world impact on primary business KPIs.

**5. Monitoring & Continual Learning** (Ongoing)
- Activate Observability: Continuously monitor system health, cost, and AI quality dashboards (CloudWatch & LangSmith).
- Establish On-Call Rotation: Respond to automated alerts for latency spikes, error rate increases, or significant model drift.
- Turn the Data Flywheel: Execute the Continual Learning pipeline on a regular schedule (e.g., bi-weekly) to retrain models with new user interaction data.
- Iterate and Enhance: Analyze user feedback and monitoring data to inform the roadmap for the next set of features and improvements.

2. Cross-Functional Team & Roles

This project’s success hinges on the tight collaboration of a dedicated, cross-functional team.

| Role | Key Responsibilities |
| --- | --- |
| Product Manager | Owns the product vision, defines the roadmap, prioritizes features, and is ultimately responsible for the business KPIs. |
| Data Engineer | Designs, builds, and maintains the scalable Data Ingestion and Indexing pipeline. Ensures data quality, freshness, and reliability. |
| ML/GenAI Engineer (My Role) | Leads the end-to-end technical implementation. Designs and builds the core RAG inference pipeline, implements advanced retrieval and re-ranking strategies, and engineers the MLOps workflows for CI/CD, monitoring, and continual learning. |
| Data Scientist (My Role) | Drives the experimentation and evaluation process. Selects, fine-tunes, and analyzes the performance of embedding and re-ranking models. Designs and interprets A/B tests. |
| Backend Engineer | Develops the public-facing API endpoints, handles user authentication, and integrates the RAG service with the broader e-commerce platform. |
| Frontend Engineer | Builds, tests, and iterates on the conversational user interface, focusing on creating an intuitive and responsive experience. |

3. Versioning and Governance Strategy

A strict versioning and governance strategy is essential for ensuring reproducibility, quality, and maintainability.

| Artifact | Versioning Tool | Governance Policy |
| --- | --- | --- |
| Code (Application, Pipelines, Tests) | Git (GitHub) | All code changes must be submitted via a pull request (PR) and require at least one peer review and passing CI checks before being merged. We follow a GitFlow branching model to manage features, releases, and hotfixes. |
| Infrastructure | Terraform & Git | The state of all cloud infrastructure is defined declaratively in Terraform. All infrastructure changes must be reviewed and applied through the CI/CD pipeline. No manual changes to the production environment are permitted. |
| Data (for Model Training) | DVC (Data Version Control) | The specific datasets used to fine-tune the embedding model or retrain the re-ranker are versioned with DVC. The DVC hash is logged with the model version, ensuring perfect reproducibility. |
| Prompts | Git | All system prompt templates are treated as mission-critical code. They are stored in a dedicated repository, versioned in Git, and deployed through the CI/CD pipeline. |
| Models (Custom-trained) | Amazon SageMaker Model Registry | Every custom-trained model (embedding, re-ranker) is versioned and registered. The registry entry links the model artifact to its source code, training data version, and offline evaluation metrics, providing a complete audit trail. A model cannot be deployed unless its evaluation metrics meet a pre-defined quality bar. |


D. Comprehensive Evaluation Strategy: The Quality Gauntlet

A production-grade RAG system requires a multi-layered evaluation strategy that goes far beyond simple accuracy. Our approach is to treat testing not as a single stage, but as a continuous process embedded throughout the entire lifecycle. We will evaluate individual components (retrieval, generation), the integrated pipelines, and the live system’s impact on business KPIs.

1. Offline Evaluation: Component-Wise and Pipeline Testing

This is the “dress rehearsal” stage, conducted in development and staging environments before any code or model is exposed to users. Its purpose is to ensure each component works correctly and the integrated system is robust.


**1. Data & Knowledge Base Validation**

*What we're testing:* Quality of "ingredients": ensuring the product data fed into the system is clean, complete, and correctly structured.

*Tools & Methods:*
- Great Expectations: to define and validate schemas and data quality rules for product catalogs.
- Custom pytest checks: for validating the output of the PDF/URL parsers and text chunking logic.

*Key Metrics & Success Criteria:*
- Schema Adherence: 100% compliance with the expected product data schema.
- Data Integrity: <1% null values in critical fields like product_title and description.

**2. Retrieval Evaluation**

*What we're testing:* Finding the right information: does the system retrieve the most relevant product chunks for a given query?

*Tools & Methods:*
- "Golden Dataset": a manually curated set of 100-200 (query, relevant_product_id) pairs representing common and difficult search scenarios.
- Offline metric calculation: using the golden dataset to test different retrieval strategies (Vector vs. Hybrid vs. HyDE).

*Key Metrics & Success Criteria:*
- Hit Rate @ K=5 > 95%: at least one correct product is found in the top 5 results.
- MRR > 0.90: the first correct result is, on average, very close to the top position.
- NDCG@10 > 0.85: the overall ranking of the top 10 results is highly relevant.

**3. Generation Evaluation**

*What we're testing:* Generating a trustworthy answer: is the LLM's final response factually correct, relevant, and helpful?

*Tools & Methods:*
- LLM-as-a-Judge: using a powerful model (e.g., GPT-4o, Claude 3 Opus) to score the generated answers from our application's LLM (e.g., Claude 3 Sonnet). A sketch of this pattern follows the offline test list.

*Key Metrics & Success Criteria:*
- Groundedness/Faithfulness > 99%: the answer must be factually consistent with the retrieved context. This is non-negotiable.
- Answer Relevance > 95%: the answer directly addresses the user's query.

**4. End-to-End Pipeline Tests**

*What we're testing:* Integration and robustness: does the entire pipeline—from query ingestion to response generation—function correctly on a staging environment?

*Tools & Methods:*
- pytest integration tests: simulate API calls to a staging endpoint and assert the correctness of the response structure.
- Locust load tests: simulate realistic traffic patterns against the staging environment.

*Key Metrics & Success Criteria:*
- 100% pass rate on the integration test suite.
- p99 latency < 1500ms under simulated peak load.
- Error rate < 0.1% during the load test.

**5. Guardrail & Safety Tests**

*What we're testing:* System resilience and responsibility: does the system handle inappropriate inputs and avoid generating harmful outputs?

*Tools & Methods:*
- Adversarial test suite: a curated set of prompts designed to test for prompt injections, topical refusals, and toxic content generation.

*Key Metrics & Success Criteria:*
- Refusal rate > 99% for out-of-scope topics (e.g., medical advice).
- 0 harmful outputs: the system must not generate toxic, biased, or unsafe content.
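
To illustrate the LLM-as-a-judge pattern referenced in the Generation Evaluation tests, here is a minimal sketch using the Bedrock Converse API; the judge model ID, rubric wording, and JSON parsing are illustrative assumptions rather than the production implementation.

```python
# Minimal LLM-as-a-judge sketch for the Groundedness/Faithfulness check above.
# Model ID, rubric wording, and score parsing are illustrative assumptions.
import json
import boto3

bedrock = boto3.client("bedrock-runtime", region_name="eu-central-1")

JUDGE_PROMPT = """You are grading a search assistant.
Context:
{context}

Answer:
{answer}

Does every factual claim in the answer appear in the context?
Respond with JSON only: {{"grounded": true, "unsupported_claims": []}}"""

def judge_groundedness(context: str, answer: str) -> dict:
    response = bedrock.converse(
        modelId="anthropic.claude-3-opus-20240229-v1:0",  # assumed judge model
        messages=[{"role": "user",
                   "content": [{"text": JUDGE_PROMPT.format(context=context, answer=answer)}]}],
        inferenceConfig={"temperature": 0.0, "maxTokens": 512},
    )
    raw = response["output"]["message"]["content"][0]["text"]
    return json.loads(raw)  # e.g. {"grounded": false, "unsupported_claims": ["..."]}
```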

2. Online Evaluation: Testing in Production

The ultimate test of the system’s value is its performance with real users and its impact on the business. This is measured through carefully controlled online experiments.


**1. Business Impact (A/B Testing)**

*What we're testing:* Driving real value: does the new RAG system outperform the legacy keyword search on core business metrics?

*Tools & Methods:*
- A/B testing framework: randomly assigning users to either the legacy search (Control) or the new RAG search (Variant) and comparing their behavior over time. (A significance-test sketch follows this list.)

*Key Metrics & Success Criteria:*
- Primary KPI: statistically significant increase in Search-to-Purchase Conversion Rate for the Variant group.
- Secondary KPIs: statistically significant increases in AOV and Revenue Per Search User.

**2. User Satisfaction & Engagement**

*What we're testing:* Is the experience better? Are users engaging more with the new system, and do they perceive it as higher quality?

*Tools & Methods:*
- A/B testing framework.
- Implicit signals: tracking user actions like query reformulation, clicks on product links, and session duration.
- Explicit feedback: analyzing the "thumbs up/down" ratio for generated responses.

*Key Metrics & Success Criteria:*
- Reduced query reformulation rate: users find what they need on the first try.
- Higher click-through rate (CTR) on recommended products.
- Positive feedback ratio > 85%.

**3. Model Performance Monitoring (Shadow Testing)**

*What we're testing:* Safe model rollouts: how does a new candidate model (e.g., a fine-tuned embedding model) perform on live traffic without impacting users?

*Tools & Methods:*
- Shadow deployment: route a copy of live production traffic to the new candidate model in the background. The user sees the response from the current production model, but the candidate model's response is logged for analysis.

*Key Metrics & Success Criteria:*
- Offline metric comparison: compare the candidate's retrieval metrics (Hit Rate, MRR) against the production model on the same set of live queries.
- Latency check: ensure the candidate model meets latency SLOs before considering a promotion to an A/B test.
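
As a sketch of how the "statistically significant increase" criterion for the primary conversion KPI can be checked, a two-proportion z-test over the A/B groups could look like this; the counts are illustrative, not real experiment data.

```python
# Sketch of the significance check behind the Search-to-Purchase Conversion Rate
# criterion. The counts below are illustrative placeholders.
from statsmodels.stats.proportion import proportions_ztest

control_purchases, control_sessions = 4_100, 100_000   # legacy keyword search
variant_purchases, variant_sessions = 4_600, 100_000   # RAG search

stat, p_value = proportions_ztest(
    count=[variant_purchases, control_purchases],
    nobs=[variant_sessions, control_sessions],
    alternative="larger",  # H1: variant conversion rate > control conversion rate
)
print(f"z={stat:.2f}, p={p_value:.4f}")  # promote only if p < 0.05 and the lift is practically meaningful
```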

This comprehensive, two-pronged evaluation strategy ensures that we are not only building a system that is technically correct and robust (Offline Evaluation) but also one that is demonstrably effective and valuable in the real world (Online Evaluation). It provides a clear, data-driven framework for making decisions about which models to promote, which features to prioritize, and how to continuously improve the system over time.


4. Data Ingestion and Indexing Pipeline: Building the Knowledge Base

The intelligence of our RAG system is entirely dependent on the quality, freshness, and structure of its knowledge base. The Data Ingestion and Indexing pipeline is the factory that constructs this knowledge base. It is an automated, multi-stage workflow designed to transform raw, heterogeneous product data into a highly searchable, semantically rich index.

The following breakdown details each stage of this foundational pipeline, outlining the specific operations, the chosen tools, and the rationale behind each decision, with a special focus on the requirements of a Generative AI system.

**1. Source Extraction**

*Operation:* Extract heterogeneous data: pull data from all source systems. This includes batch extraction for nightly full-catalog refreshes and event-driven extraction for real-time updates (e.g., price changes, new reviews).

*Tools:* AWS Glue (for batch); AWS Lambda + Amazon SQS (for real-time events).

*Rationale:* Glue is optimized for large-scale, scheduled ETL jobs. The Lambda/SQS pattern provides a robust, serverless architecture for low-latency, incremental updates, ensuring the knowledge base remains fresh.
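
A minimal sketch of the event-driven leg: an AWS Lambda handler that applies incremental price/inventory updates arriving via SQS. The index name, per-product document layout, and authentication setup are assumptions; in practice an update may need to fan out to every chunk of the affected product.

```python
# Sketch of an SQS-triggered Lambda applying partial product updates to OpenSearch.
# Index name, document layout, and auth are assumptions for illustration only.
import json
from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "my-opensearch-domain", "port": 443}], use_ssl=True)

def handler(event, context):
    for record in event["Records"]:          # one SQS message per product change
        change = json.loads(record["body"])  # e.g. {"product_id": "prod_123", "price": 79.9, "inventory": 12}
        client.update(
            index="products",
            id=change["product_id"],
            body={"doc": {k: v for k, v in change.items() if k != "product_id"}},
        )
    return {"updated": len(event["Records"])}
```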

**2. Multimodal Pre-processing**

*Operation:* Clean and standardize each modality:
- Text: Sanitize by removing HTML tags and special characters. Normalize text (lowercase, handle unicode).
- Tables (from PDFs): Extract structured data from tables within product manuals and spec sheets.
- Images: Standardize format (JPEG), resize for consistency, and perform initial quality checks.

*Tools:* Unstructured.io / PyMuPDF; BeautifulSoup; Python libraries (re, Pillow).

*LLM-Specific Focus:* The quality of the final generated answer is directly capped by the quality of the input text. This "Garbage In, Garbage Out" principle is paramount. Clean, well-parsed text leads to more meaningful and accurate embeddings.

**3. Content Structuring & Chunking**

*Operation:* Prepare data for embedding:
- Unify: Consolidate all data for a single product into a unified JSON structure.
- Chunk Text: Split long descriptions and reviews into smaller, semantically coherent chunks with overlap.
- Serialize Tables: Convert extracted tables into a Markdown format that is both human-readable and easily understood by an LLM.
- Generate Image Captions: Use a VLM to create descriptive text captions for each product image to enable text-based retrieval of visual information.

*Tools:* LangChain (RecursiveCharacterTextSplitter); Amazon Bedrock (Claude 3 for image captioning).

*LLM-Specific Focus:* Chunking is a critical step to fit content within the LLM's context window and improve retrieval precision. Image captioning is a core strategy for enabling multimodal search by "flattening" visual information into a text-based index.
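
A minimal sketch of the text-chunking step with LangChain's RecursiveCharacterTextSplitter; the chunk size, overlap, and import path (which depends on the LangChain version) are illustrative assumptions rather than tuned production values.

```python
# Sketch of chunking a product description with LangChain's text splitter.
# Chunk size and overlap are illustrative starting points, not tuned values.
from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=800,      # characters per chunk
    chunk_overlap=100,   # overlap preserves context across chunk boundaries
    separators=["\n\n", "\n", ". ", " "],
)

product = {"product_id": "prod_123", "description": "Long product description text..."}
chunks = [
    {"product_id": product["product_id"], "chunk_id": f"{product['product_id']}-{i}", "text": text}
    for i, text in enumerate(splitter.split_text(product["description"]))
]
```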

**4. Embedding Generation**

*Operation:* Convert content to vectors:
- Text Embeddings: Generate dense vector representations for all text chunks and serialized table descriptions.
- Image Embeddings: Generate dense vector representations for all product images to enable visual similarity search.

*Tools:* Amazon Bedrock (Titan Text & Titan Multimodal embedding models).

*Rationale:* This is the core step that enables semantic search. Using managed models like Amazon Titan simplifies operations and provides state-of-the-art performance. The use of both text and multimodal models is key to our hybrid, multimodal strategy.
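
A minimal sketch of calling a Titan text embedding model through the Bedrock runtime API; the model ID and region are assumptions to verify against the Bedrock model catalog.

```python
# Sketch of embedding generation with Amazon Titan via the Bedrock runtime API.
# The model ID and region are assumptions; the response carries an "embedding" field.
import json
import boto3

bedrock = boto3.client("bedrock-runtime", region_name="eu-central-1")

def embed_text(text: str) -> list[float]:
    response = bedrock.invoke_model(
        modelId="amazon.titan-embed-text-v2:0",   # assumed Titan text embedding model
        body=json.dumps({"inputText": text}),
    )
    payload = json.loads(response["body"].read())
    return payload["embedding"]                   # dense vector for the chunk

vector = embed_text("Waterproof hiking boot with reinforced ankle support.")
```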

**5. Validation & Indexing**

*Operation:* Quality gate and loading:
- Validate Data: Before indexing, run automated quality checks on the structured, chunked, and embedded data to ensure integrity.
- Index Embeddings & Metadata: Load the vector embeddings and their rich metadata (product_id, category, brand, image_url) into the vector database.

*Tools:* Great Expectations; Amazon OpenSearch Service (with k-NN index).

*Rationale:* The validation step acts as a critical quality gate, preventing corrupt data from polluting our production knowledge base. OpenSearch is chosen for its scalability, managed nature, and robust support for hybrid (vector + keyword) search.
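
A minimal sketch of creating the k-NN index and bulk-loading embedded chunks with opensearch-py; the field names, vector dimension, and HNSW method parameters are assumptions tied to the chosen embedding model.

```python
# Sketch of the k-NN index mapping and bulk load into Amazon OpenSearch.
# Dimension, engine, and field names are illustrative assumptions.
from opensearchpy import OpenSearch, helpers

client = OpenSearch(hosts=[{"host": "my-opensearch-domain", "port": 443}], use_ssl=True)

client.indices.create(
    index="products",
    body={
        "settings": {"index": {"knn": True}},
        "mappings": {"properties": {
            "text_embedding": {
                "type": "knn_vector",
                "dimension": 1024,
                "method": {"name": "hnsw", "space_type": "cosinesimil", "engine": "nmslib"},
            },
            "product_id": {"type": "keyword"},
            "category": {"type": "keyword"},
            "brand": {"type": "keyword"},
            "description": {"type": "text"},
        }},
    },
)

def index_chunks(chunks):
    # chunks: iterable of dicts with chunk_id, product_id, text, embedding
    actions = (
        {"_index": "products", "_id": c["chunk_id"],
         "_source": {"product_id": c["product_id"], "description": c["text"],
                     "text_embedding": c["embedding"]}}
        for c in chunks
    )
    helpers.bulk(client, actions)
```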

**6. Orchestration & Governance**

*Operation:* Manage the end-to-end flow:
- Orchestrate: Manage dependencies, retries, and error handling for the entire multi-step pipeline.
- Version Artifacts: Version the source data (via DVC) and the pipeline definition code (via Git) to ensure reproducibility and enable rollbacks.

*Tools:* AWS Step Functions; Git & DVC.

*LLM-Specific Focus:* A complex, multi-step pipeline like this requires a robust orchestrator to be reliable. Step Functions provides excellent visibility and error handling. Versioning is critical for debugging issues like performance degradation that may arise from subtle changes in the data or processing logic.

Architecture Diagram: Data Ingestion and Indexing Pipeline

The following diagram illustrates the AWS architecture for the orchestrated batch ingestion pipeline.


5. Experiment Management & Iteration: The Path to Precision

Building a production-grade RAG system is an iterative science. The journey from a basic keyword search to a sophisticated, multimodal, and multilingual conversational engine was guided by a systematic series of experiments. Each stage was designed to test a specific hypothesis, measure its impact against a consistent set of evaluation metrics, and build upon the learnings of the previous stage.

A. The Evaluation Framework: Our North Star

Before the first experiment, we established a rigorous, multi-faceted evaluation framework to ensure we were measuring what truly matters.

  • Retrieval Quality (Automated): Measured on our internal “golden dataset” of query-product pairs.

    • Tools: The open-source Ragas framework, integrated into our evaluation pipeline.

    • Primary Metrics: Context Precision (is retrieved information relevant?) and Context Recall (is all relevant information retrieved?). Mean Reciprocal Rank (MRR) was our single primary metric for ranking quality.

  • Generation Quality (Automated + Human):

    • Tools: Ragas and an LLM-as-a-Judge pattern using Claude 3 Opus for its superior reasoning.

    • Primary Metrics: Faithfulness (is the answer grounded in the context?) was a non-negotiable, automated check. Qualitative metrics like Conciseness and Helpfulness were scored by the LLM judge.

  • Business Impact (Live):

    • Tool: The company’s internal A/B testing framework.

    • Primary Metric: Search-to-Purchase Conversion Rate. No change was promoted to production unless it demonstrated a neutral or statistically significant positive impact on this metric.
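
As a concrete illustration of the automated checks above, a minimal Ragas evaluation over a toy batch might look like the sketch below. Column names and metric imports depend on the Ragas version in use, and the judge LLM configuration that Ragas requires is omitted here.

```python
# Minimal sketch of automated retrieval/generation scoring with Ragas.
# Column names and metric imports depend on the Ragas version; the toy row is illustrative.
from datasets import Dataset
from ragas import evaluate
from ragas.metrics import faithfulness, context_precision, context_recall

eval_rows = {
    "question":     ["is the iphone 15 compatible with a qi charger?"],
    "answer":       ["Yes, the iPhone 15 supports Qi wireless charging."],
    "contexts":     [["The iPhone 15 supports Qi and MagSafe wireless charging up to 15W."]],
    "ground_truth": ["The iPhone 15 supports Qi wireless charging."],
}

result = evaluate(
    Dataset.from_dict(eval_rows),
    metrics=[faithfulness, context_precision, context_recall],
)
print(result)  # e.g. {'faithfulness': 1.0, 'context_precision': 1.0, 'context_recall': 1.0}
```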

B. Building the “Golden Dataset”: Synthetic Data Generation for RAG Evaluation

To rigorously benchmark our RAG experiments and ensure our improvements were statistically meaningful, we needed a large, high-quality evaluation dataset. Manually creating tens of thousands of (query, relevant product) pairs was infeasible due to the time and cost involved. Instead, we engineered a scalable pipeline to generate a synthetic “golden dataset” using a powerful LLM, a strategy that has become a cornerstone of modern RAG evaluation.

This approach allowed us to create a comprehensive testbed that accurately reflected real-world user behavior while covering the breadth of our product catalog.

The Challenge: The Evaluation Bottleneck

A small, manually curated set of 100-200 queries is sufficient for initial spot-checking, but it’s not large enough to reliably detect small-to-medium performance regressions or validate the impact of advanced RAG strategies across a diverse product catalog. We needed a dataset with tens of thousands of examples to have high statistical confidence in our results.

Our Four-Step Synthetic Generation Pipeline

We designed and implemented an automated pipeline to generate this dataset, grounding the synthetic data in the reality of our actual users and products.

Step 1: Curation of the Seed Query Set

The quality of synthetic data is highly dependent on the quality of the initial examples. To ensure our generated queries were realistic, we did not start from scratch.

  • Action: We mined our production logs for a month’s worth of anonymized user search queries.

  • Process: From this raw log, we curated a “seed set” of ~2,000 unique, high-quality queries. This curation involved cleaning, deduplicating, and, most importantly, categorizing the queries into distinct user intents:

    • Attribute-based: “waterproof hiking boots with ankle support”

    • Comparative: “compare sony wh-1000xm5 vs bose qc ultra”

    • Conceptual/Use-Case: “best camera for travel vlogging”

    • Question-based: “is the iphone 15 compatible with a qi charger?”

  • Purpose: This categorized seed set served as a powerful “style guide” for the LLM, ensuring it generated a diverse and realistic mix of query types.

Step 2: Document Chunking for Focused Context

To prompt the LLM effectively, we needed to provide it with focused, contextually rich pieces of product information.

  • Action: We processed each product description document from our catalog through the same SemanticChunker used in our production ingestion pipeline.

  • Purpose: This ensured that the LLM was generating questions based on the exact same chunks of text that our retriever would be searching over. This alignment between the generation context and the retrieval target is critical for creating a fair and accurate evaluation set.

Step 3: The LLM-Powered Generation Engine

This is the core of the pipeline where the synthetic queries are created. We used a powerful reasoning model (Claude 3 Opus) and a carefully engineered prompt.

  • Action: For each text chunk from every product, we invoked the LLM with a prompt designed to generate 5-10 realistic queries that could be answered by that specific chunk.

  • The Prompt:

```text
You are an expert e-commerce data scientist tasked with creating a high-quality evaluation dataset for a new semantic search engine. Your goal is to generate realistic user search queries that can be answered by the provided text snippet from a product description.

**CONTEXT (Product Information Snippet):**
---
{chunk_of_product_text}
---

**INSTRUCTIONS:**
1.  Read the context carefully.
2.  Generate a JSON list of 5 to 10 diverse, realistic user queries that a real shopper might type.
3.  The answer to each query you generate MUST be present in the provided context. Do NOT generate questions that require outside knowledge.
4.  Mimic the style and intent of the following examples. Include a mix of different query types (e.g., questions, comparisons, feature requests).

**EXAMPLES of QUERY STYLES:**
- "compare product A vs product B"
- "does this laptop have a backlit keyboard?"
- "trail running shoes for rocky terrain"
- "lightweight tent for backpacking in summer"

**OUTPUT (JSON format only):**
```
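
A minimal sketch of the generation loop around this prompt, using the Bedrock Converse API; the model ID, inference parameters, and the lenient JSON handling are assumptions rather than the exact production code.

```python
# Sketch of invoking the query-generation prompt per chunk and parsing the output.
# Model ID and inference parameters are illustrative assumptions.
import json
import boto3

bedrock = boto3.client("bedrock-runtime", region_name="eu-central-1")

def generate_queries(chunk_of_product_text: str, prompt_template: str) -> list[str]:
    response = bedrock.converse(
        modelId="anthropic.claude-3-opus-20240229-v1:0",  # assumed generator model
        messages=[{"role": "user",
                   "content": [{"text": prompt_template.format(chunk_of_product_text=chunk_of_product_text)}]}],
        inferenceConfig={"temperature": 0.7, "maxTokens": 1024},
    )
    raw = response["output"]["message"]["content"][0]["text"]
    try:
        queries = json.loads(raw)
    except json.JSONDecodeError:
        return []          # malformed output is dropped and logged in the real pipeline
    return [q for q in queries if isinstance(q, str)]
```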

Step 4: Automated Validation and Final Assembly

We did not blindly trust the LLM’s output. A final, lightweight validation step was crucial to ensure the quality and utility of the dataset.

  • Action:

    1. We programmatically checked that the output was valid JSON.

    2. We implemented a simple heuristic check to ensure the generated query had a reasonable term overlap with the source chunk, filtering out any queries that seemed to have drifted off-topic.

    3. Each validated query was stored, creating a final tuple: (generated_query, source_product_id, source_chunk_id).

  • Outcome: This pipeline ran over our entire catalog and produced an evaluation dataset of over 50,000 high-quality (query, relevant_document_id) pairs.
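
A minimal sketch of the term-overlap heuristic used in the validation step; the stopword list and overlap threshold are illustrative assumptions.

```python
# Sketch of the lightweight validation: keep only generated queries that share enough
# terms with their source chunk. Stopwords and threshold are illustrative.
import re

def term_overlap(query: str, chunk: str) -> float:
    stopwords = {"the", "a", "an", "for", "with", "is", "of", "and", "to", "in", "does"}
    q_terms = {t for t in re.findall(r"[a-z0-9]+", query.lower()) if t not in stopwords}
    c_terms = set(re.findall(r"[a-z0-9]+", chunk.lower()))
    return len(q_terms & c_terms) / max(len(q_terms), 1)

def validate(queries, chunk, product_id, chunk_id, min_overlap=0.3):
    return [
        {"generated_query": q, "source_product_id": product_id, "source_chunk_id": chunk_id}
        for q in queries
        if term_overlap(q, chunk) >= min_overlap
    ]
```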

Impact on the Project

This investment in a high-quality synthetic dataset was a strategic force multiplier. It provided us with a scalable and reliable benchmark that underpinned all subsequent experimentation. It allowed us to:

  • Quantify Progress: We could confidently measure the impact of each RAG strategy on our primary retrieval metric (MRR).

  • Automate Regression Testing: This dataset became a core part of our CI/CD pipeline, allowing us to automatically detect any performance regressions in our retrieval system before they reached production.

  • Accelerate Iteration: It enabled rapid, data-driven decisions about which strategies to pursue and which to discard, dramatically speeding up our development cycle.

How the Golden Dataset is Used to Calculate MRR

MRR requires a ranking of products for each query. Here is the step-by-step process of how we generate that ranking and calculate the metric, which should clarify the entire workflow:

The Evaluation Process:

  1. Take One Query from the Golden Dataset:

    • Let’s take a query q1 from our Golden Dataset.

    • From the dataset, we know the ground truth “correct” answer is product_id = 'prod_123'.

  2. Run the Query Through the System Under Test:

    • We take the text of q1 and feed it into our entire retrieval system (e.g., the “Hybrid Search + Re-ranker” system we are evaluating).

    • The system does its job: it generates an embedding for the query, searches the OpenSearch index, gets a list of candidates, re-ranks them, and returns a final, ranked list of product IDs.

  3. Get the Model-Generated Ranking:

    • The output from our system might look like this ranked list of the top 10 results:

      1. prod_789

      2. prod_456

      3. prod_123 ← Our correct answer is here!

      4. prod_999

      5. prod_555 … and so on.

  4. Calculate the Reciprocal Rank for This Query:

    • We look for our ground truth answer (prod_123) in the model-generated ranking.

    • We find it at rank = 3.

    • The Reciprocal Rank (RR) for query q1 is therefore 1 / 3 = 0.33.

    • (If the correct answer had been at rank 1, the RR would be 1/1 = 1. If it was not found in the top 10, the RR would be 0 for this query).

  5. Repeat for All Queries:

    • We repeat steps 1-4 for every single query in our 50,000+ entry Golden Dataset. Each query will get its own RR score.

  6. Calculate the Mean Reciprocal Rank (MRR):

    • The final MRR score is simply the average of all the individual RR scores calculated in step 5.

This process gives us a single, powerful number that represents how well our system performs at placing the most relevant document near the top of the search results across thousands of realistic queries. This is how the Golden Dataset, which only contains the “correct” answers, is used to evaluate a system that produces a full ranking.
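
The same logic in code: a minimal sketch of the MRR computation, where `retrieve` stands in for whichever retrieval configuration is under test and is assumed to return a ranked list of product IDs.

```python
# Sketch of the MRR computation described above.
def reciprocal_rank(ranked_product_ids: list[str], relevant_product_id: str) -> float:
    for rank, product_id in enumerate(ranked_product_ids, start=1):
        if product_id == relevant_product_id:
            return 1.0 / rank
    return 0.0   # correct product not found in the returned list

def mean_reciprocal_rank(golden_dataset, retrieve, k: int = 10) -> float:
    # golden_dataset: iterable of (query, relevant_product_id) pairs
    scores = [
        reciprocal_rank(retrieve(query, k=k), relevant_product_id)
        for query, relevant_product_id in golden_dataset
    ]
    return sum(scores) / len(scores)
```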


6. Continual Learning: The Embedding Model Fine-tuning Pipeline

To ensure our RAG system continuously adapts and improves, we will build a dedicated MLOps pipeline for fine-tuning our embedding model. This pipeline operationalizes the “Data Flywheel” concept by systematically learning from real user behavior to enhance retrieval relevance over time.

This is an offline, periodic pipeline, designed to be run on a schedule (e.g., monthly) or triggered manually when a drop in retrieval quality is detected. Its goal is not to train a model from scratch but to take a high-performing base model and adapt it to the specific nuances of our product catalog and customer query patterns.

A. Artifacts to Be Implemented

The following is a blueprint for the key components that will constitute the fine-tuning pipeline.

**Python Scripts (Components)**

1. data_preparation.py:
- Loads raw user interaction data (from the Monitoring Pipeline’s S3 bucket).
- Filters for successful user journeys (e.g., queries that led to an add_to_cart or purchase).
- Constructs Training Triplets (anchor, positive, negative):
    - anchor: The user’s search query.
    - positive: The text chunk from the product the user purchased.
    - negative (Hard Negative Mining): For the same query, retrieve the top 5 results from the current production embedding model. From these results, select a chunk that the user did not click on or purchase. This “hard negative” is crucial because it teaches the new model to better distinguish between semantically similar but ultimately incorrect items.
- Validates & Splits the data into training and validation sets.
- Outputs versioned, final training files to a dedicated S3 bucket.

2. model_training.py:
- Loads the prepared triplet dataset.
- Pulls the latest version of our chosen base embedding model (e.g., multilingual-e5-large) from Hugging Face.
- Fine-tunes the model using the sentence-transformers library and a TripletLoss objective function. This process is executed as a self-contained Amazon SageMaker Training Job for scalability and reproducibility.
- Saves the fine-tuned model artifact back to S3.

3. model_evaluation.py:
- Loads the new candidate model and the current production model.
- Runs both models against our “Golden Dataset” (the held-out test set).
- Compares key retrieval metrics (MRR, NDCG@10, Hit Rate@5).
- Outputs a JSON file with a comparison report and a pass or fail status based on a pre-defined threshold (e.g., “candidate MRR must be > 1.02 * production MRR”).

4. model_registration.py:
- Reads the evaluation result.
- If “pass”: Registers the new model artifact in the Amazon SageMaker Model Registry, tagging it with its evaluation metrics and the training data version. This officially promotes it to a “candidate” status, ready for A/B testing.

**Unit Tests (pytest)**

- test_data_preparation.py: Tests the hard negative mining logic on a sample DataFrame to ensure it correctly identifies and selects hard negatives.
- test_model_evaluation.py: Mocks the models and tests the metric calculation logic to ensure MRR and NDCG are computed correctly.

**Pipeline Orchestration (Airflow DAG)**

- embedding_finetuning_dag.py:
1. Trigger: Can be scheduled (e.g., 0 0 1 * * for monthly) or manually triggered.
2. prepare_data_task: Executes the data_preparation.py script (e.g., using PythonOperator or KubernetesPodOperator).
3. train_model_task: Submits a SageMaker Training Job using the SageMakerTrainingOperator.
4. evaluate_model_task: Submits a SageMaker Processing Job to run the model_evaluation.py script.
5. check_evaluation_task (BranchPythonOperator): A conditional gate that checks the evaluation output. Routes to register_model_task on pass or notify_failure_task on fail.
6. register_model_task: Executes the model_registration.py script.
7. notify_failure_task: Sends a Slack/email notification if the model did not meet the quality bar.

**Infrastructure as Code (Terraform)**

- s3.tf: Defines new S3 buckets for storing the training datasets and the fine-tuned model artifacts.
- iam.tf: Defines a new, specific IAM Role for the SageMaker Training Jobs, granting least-privilege access to the required S3 buckets.
- sagemaker.tf: Defines a SageMaker Model Group to logically group all versions of our fine-tuned embedding model.
- airflow.tf: Manages the deployment of the embedding_finetuning_dag.py file to our Airflow environment.

**Integration Test**

- An end-to-end test script (test_finetuning_pipeline_integration.py).
- This test triggers the entire Airflow DAG using a small, self-contained dataset.
- It does not check the model’s accuracy but asserts that the DAG runs to completion and that a new model version successfully appears in the staging SageMaker Model Registry. This validates that all components, permissions, and integrations are working correctly.

**CI/CD Workflow (GitHub Actions)**

- deploy_finetuning_pipeline.yml: This workflow does not run the fine-tuning process. It deploys and manages the pipeline’s definition.
- Trigger: On push to main affecting the /pipelines/finetuning/ directory.
- Jobs:
    1. Lint & Test: Runs static analysis and unit tests on the pipeline’s Python scripts.
    2. Deploy: Executes terraform apply to deploy changes to the Airflow DAG and related AWS infrastructure.
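
To ground the model_training.py plan, here is a compact sketch of the TripletLoss fine-tuning step with sentence-transformers. The hyperparameters and file layout are illustrative, and inside a SageMaker Training Job the triplet file would come from the job's input channel rather than a local path.

```python
# Compact sketch of the TripletLoss fine-tuning inside model_training.py.
# Hyperparameters and the triplets.jsonl layout are illustrative assumptions.
import json
from torch.utils.data import DataLoader
from sentence_transformers import SentenceTransformer, InputExample, losses

model = SentenceTransformer("intfloat/multilingual-e5-large")  # base model named above

# Each line of triplets.jsonl: {"anchor": query, "positive": chunk, "negative": hard_negative}
with open("triplets.jsonl") as f:
    examples = [
        InputExample(texts=[t["anchor"], t["positive"], t["negative"]])
        for t in map(json.loads, f)
    ]

train_loader = DataLoader(examples, shuffle=True, batch_size=32)
train_loss = losses.TripletLoss(model)   # pulls positives closer, pushes negatives away

model.fit(train_objectives=[(train_loader, train_loss)], epochs=1, warmup_steps=100)
model.save("/opt/ml/model")              # SageMaker collects artifacts from this path
```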

B. Architecture Diagram: Embedding Model Fine-tuning Pipeline

This diagram illustrates the orchestrated workflow, from data collection to model registration.


Note: Training Triplets Dataset vs Golden Evaluation Dataset

The Training Triplets Dataset and the Golden Evaluation Dataset are two distinct and separate datasets, each engineered for a specific, different purpose. They are never the same. Here is a detailed breakdown:

Dataset 1: The Training Triplets Dataset
  • Purpose: To TEACH (fine-tune) the embedding model.

  • Structure: (anchor, positive, negative)

  • What it does: This dataset is the “textbook” for the model. It uses a technique called contrastive learning. For every anchor (a user’s query), it explicitly tells the model:

    • “This positive example (the product they bought) is what you should consider very similar. Pull its vector representation closer to the anchor’s vector.”

    • “This negative example (a product they saw but didn’t buy) is what you should consider dissimilar. Push its vector representation further away from the anchor’s vector.”

  • Why it needs negatives: The model cannot learn to distinguish between good and bad results without being shown both. Hard negatives are especially powerful because they teach the model to differentiate between very similar but ultimately incorrect items, which is a much harder and more valuable task than differentiating between a shoe and a television.

  • Source of Data: Real, messy, high-volume user interaction data (clicks, purchases). It reflects what users actually do, not what we think they should do.

Dataset 2: The “Golden” Evaluation Dataset
  • Purpose: To GRADE (evaluate or benchmark) the performance of the entire retrieval system.

  • Structure: (query, relevant_product_id, relevant_chunk_id)

  • What it does: This dataset is the “final exam.” It represents the ground truth. It does not teach the model anything. Instead, it acts as an answer key against which we measure the performance of any model or retrieval strategy we build.

  • Why it does NOT have negatives: The “negatives” for an evaluation are simply all the other millions of documents in our knowledge base that are not the correct answer for that specific query. The dataset’s only job is to tell us, “For this query, this is the correct document.”

  • Source of Data: Synthetically generated using a powerful LLM and curated for high quality, realism, and broad coverage. It represents the “ideal” testbed.

Crucial Point: Using your evaluation data for training is a cardinal sin in machine learning, known as data leakage. If we trained the model on our Golden Dataset, it would simply memorize the “exam answers,” and our evaluation scores would be artificially inflated and completely useless for predicting real-world performance.


7. The Real-Time Engine: The Inference Pipeline

The Inference Pipeline is the synchronous, low-latency workflow that executes every time a user interacts with the search assistant. It is a distributed system of microservices, orchestrated to perform the complex RAG process—from query understanding to final generation—within a strict sub-second latency budget. Its design prioritizes performance, scalability, and observability.

A. Artifacts to Be Implemented

The following is a blueprint for the key components that will constitute the Inference Pipeline.

**Python Scripts (Core Logic)**

The entire pipeline will be implemented as a single, modular Python application using FastAPI, deployed as a containerized service. This approach is chosen over an orchestrator like Airflow, which is designed for asynchronous, long-running batch jobs, not real-time inference.

main.py (FastAPI Application):
- Defines the main API endpoint (e.g., /search).
- Handles request validation (using Pydantic), authentication, and orchestrates the RAG workflow.

orchestrator.py (The Core Workflow):
- Implements the full RAG sequence as an async function to maximize I/O concurrency.
- 1. Query Pre-processing: Uses asyncio.gather to concurrently apply input guardrails and call the Query Transformation module.
- 2. Retrieval & Re-ranking: Calls the Hybrid Search module, then passes the retrieved candidates to the Re-ranking module; these steps run sequentially because re-ranking depends on the retrieval results.
- 3. Prompt Construction: Builds the final prompt for the LLM.
- 4. Generation & Streaming: Makes a streaming call to the LLM Generation module.
- 5. Post-processing: Applies output guardrails as the response is streamed.
- 6. Tracing: Integrates LangSmith tracing decorators (@traceable) on all key functions to automatically capture latency, inputs, and outputs for each step.

Modules for Each RAG Stage:
- query_transformer.py: A module to perform HyDE. Caches results in Redis (Amazon ElastiCache) for common queries.
- retriever.py: Contains logic to perform hybrid search against Amazon OpenSearch.
- reranker.py: Calls the SageMaker Endpoint for the custom re-ranking model and the SageMaker Feature Store for real-time business signals.
- generator.py: A client for Amazon Bedrock that handles streaming responses.
- guardrails.py: Modules for input (e.g., PII redaction) and output (e.g., toxicity filtering) safety checks.
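As a concrete illustration of what retriever.py assembles, here is a minimal sketch of a hybrid request body using the OpenSearch 2.x hybrid query clause (this assumes a search pipeline with a normalization processor is configured on the domain; index and field names are illustrative):

from typing import List

def build_hybrid_query(query_text: str, query_vector: List[float], k: int = 50) -> dict:
    """Combine a BM25 keyword clause and a k-NN vector clause in one request body."""
    return {
        "size": k,
        "query": {
            "hybrid": {
                "queries": [
                    # Lexical leg: classic BM25 over the chunk text
                    {"match": {"chunk_text": {"query": query_text}}},
                    # Semantic leg: approximate k-NN over the embedding field
                    {"knn": {"embedding": {"vector": query_vector, "k": k}}},
                ]
            }
        },
    }

# Example usage (assumed index and pipeline names):
# body = build_hybrid_query("waterproof hiking boots", query_embedding, k=50)
# results = client.search(index="products-v1", body=body, params={"search_pipeline": "hybrid-norm-pipeline"})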

Unit Tests (pytest)

- test_orchestrator.py: Uses pytest-asyncio and mocker to test the orchestration logic. Mocks out all external calls (OpenSearch, SageMaker, Bedrock) to test the flow, error handling, and asyncio.gather integration.
- test_retriever.py: Tests the construction of the complex OpenSearch hybrid search query JSON.
- test_reranker.py: Tests the logic for combining semantic scores with business features before sending them to the re-ranking model.
- test_guardrails.py: Tests the input and output safety filters with example malicious/harmful text.

Pipeline Orchestration

Not Applicable (FastAPI Application instead of Airflow/Step Functions). The orchestration is handled in-process by the asyncio event loop within the FastAPI application, which is the standard and correct pattern for real-time, low-latency services.

Infrastructure as Code (Terraform)

- api_gateway.tf: Defines the Amazon API Gateway endpoint, sets up throttling, and configures routing to the inference service.
- ecs.tf / lambda.tf: Defines the compute layer. AWS Fargate (on ECS) is the primary choice for its balance of performance (no cold starts) and manageability. A Lambda-based deployment is a secondary option for lower-traffic environments.
- sagemaker.tf: Defines the real-time SageMaker Endpoints for the re-ranking model.
- elasticache.tf: Defines the Redis cluster for caching.
- iam.tf: Defines fine-grained IAM roles for the inference service, granting it least-privilege access to OpenSearch, SageMaker, Bedrock, and ElastiCache.

Integration & Load Tests

- test_inference_integration.py: A pytest script that makes live HTTP requests to a deployed staging endpoint. It validates the end-to-end flow and asserts the API contract (response schema).
- locustfile.py: A Locust script to perform load testing. It simulates realistic user query patterns and measures key performance metrics (RPS, p99 latency, error rate) under load. This is critical for tuning auto-scaling policies.

CI/CD Workflow (GitHub Actions)

- deploy_inference_service.yml:
- Trigger: On push to main affecting the /inference_service/ directory.
- Jobs:
    1. Lint & Unit Test: Runs static analysis and pytest unit tests.
    2. Build & Push Docker Image: Builds the FastAPI application container and pushes it to Amazon ECR.
    3. Deploy to Staging: Runs terraform apply to deploy the new container version to the staging environment.
    4. Integration & Load Test: Automatically runs the pytest integration tests and the Locust load test against the newly deployed staging endpoint.
    5. Manual Approval Gate: Requires a manual approval click in the GitHub Actions UI before proceeding to production.
    6. Deploy to Production (A/B Test): Runs terraform apply to deploy the new version to a small fraction of production traffic using a canary or blue/green deployment strategy managed by the API Gateway.

B. Architecture Diagram: Real-Time Inference Pipeline

This diagram illustrates the flow of a user request through the distributed, microservices-based inference pipeline on AWS.

Note: ECS vs Lambda

  1. Execution Model: In a standard Lambda deployment, each incoming request from the API Gateway triggers a separate, independent Lambda invocation. That single invocation would be responsible for running the entire orchestration logic (query transformation, retrieval, re-ranking, generation, etc.) for that one request.

  2. The Cold Start Problem: This is the critical drawback of using standard Lambda for low-latency applications. A “cold start” occurs when a request comes in and there is no “warm” execution environment ready to handle it. The Lambda service has to:

    • Provision a new micro-container.

    • Download your code/container image.

    • Start the runtime (e.g., the Python interpreter).

    • Initialize your application code (e.g., import libraries, establish initial database connections).

This entire process can add anywhere from hundreds of milliseconds to several seconds of latency before your actual inference code even begins to run. For a conversational search assistant, this is often an unacceptable hit to the user experience.

The Solution for Lambda: Provisioned Concurrency

So why is it an option at all? Because AWS provides a feature specifically to solve this: Provisioned Concurrency.

  • What it is: You tell AWS, “I want you to pre-initialize and keep N execution environments warm for this function at all times.”

  • How it Works: When a request arrives, it is immediately routed to one of these pre-warmed environments, completely bypassing the cold start latency. The experience becomes as fast as a continuously running server.

  • The Trade-off: Cost. You pay an hourly fee for keeping that concurrency provisioned, whether it’s handling requests or sitting idle.

This is why a Lambda-based deployment is a viable secondary option for lower-traffic environments. If your traffic is very low and sporadic, the cost of keeping a few Lambda environments warm 24/7 might be less than the cost of running even the smallest Fargate task 24/7. However, as traffic becomes high and sustained, the cost model of Fargate becomes more economical.
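For completeness, a minimal sketch of enabling Provisioned Concurrency on a published alias via boto3; the function name, alias, and instance count are illustrative assumptions:

import boto3

lambda_client = boto3.client("lambda")

# Keep five execution environments pre-initialized for the "live" alias,
# so requests routed to it never pay the cold-start penalty.
lambda_client.put_provisioned_concurrency_config(
    FunctionName="rag-inference-handler",   # assumed function name
    Qualifier="live",                       # must be an alias or version, not $LATEST
    ProvisionedConcurrentExecutions=5,
)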


Note: The Fargate (ECS) + FastAPI Model: Batch processing vs concurrent processing

  1. No Batching of User Requests: In a real-time, user-facing API, you cannot “batch” requests from different users. User A sends a request and expects a response immediately. They cannot be made to wait for User B’s request to arrive so you can process them together. Batching is a strategy for offline data processing (like our Ingestion Pipeline), not for online inference.

  2. Throughput via Concurrency: High throughput in this context is achieved by handling many requests at the same time. This happens at two levels:

    • Application Level (FastAPI & asyncio): This is where the magic of async comes in. A single FastAPI process running in one Fargate container can handle hundreds of concurrent connections. When it processes Request A and makes a network call to Amazon Bedrock (which is an I/O-bound operation), it doesn’t just sit and wait for the response. The asyncio event loop immediately switches context and starts working on Request B, Request C, and so on. When the response for Request A comes back, the event loop picks it up and continues processing. This is how a single container can efficiently juggle many requests at once.

    • Infrastructure Level (Fargate & Load Balancer): When the number of concurrent requests exceeds what a single container can handle, you simply run more containers (Fargate tasks). An Application Load Balancer (ALB) sits in front of your Fargate service and distributes the incoming requests across all the available containers. If you need more throughput, you just tell ECS to run more tasks (horizontal scaling).

In summary: We don’t increase throughput by batching requests. We increase it by handling many requests concurrently within each container (thanks to FastAPI’s async nature) and by running multiple containers to handle the total traffic load. Each user’s request remains an independent, asynchronous task that is processed as quickly as possible.
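A minimal, self-contained sketch of why async I/O raises per-container throughput: while one request waits on a simulated downstream call, the event loop serves the others.

import asyncio
import time

async def handle_request(request_id: int) -> str:
    # Stands in for an I/O-bound call (OpenSearch, SageMaker, Bedrock).
    await asyncio.sleep(0.3)
    return f"response-{request_id}"

async def main() -> None:
    start = time.perf_counter()
    # 100 concurrent requests finish in roughly 0.3s total, not 100 * 0.3s,
    # because the event loop interleaves them while each one waits on I/O.
    responses = await asyncio.gather(*(handle_request(i) for i in range(100)))
    print(f"{len(responses)} responses in {time.perf_counter() - start:.2f}s")

if __name__ == "__main__":
    asyncio.run(main())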


8. The Monitoring and Observability Pipeline

Operating a production RAG system effectively requires moving beyond simple uptime checks to a sophisticated, multi-layered observability strategy. This pipeline is an always-on, distributed system responsible for collecting, processing, and visualizing data from every component of our application. Its purpose is to provide actionable insights for engineers, product managers, and the continual learning process.

A. Artifacts to Be Implemented

Artifact Type

Description & Plan

Python Scripts (Instrumentation & Processing)

1. instrumentation_lib.py:
- A shared Python library integrated into our Inference Pipeline (FastAPI) and our Data Ingestion Pipeline (Glue/Lambda).
- Structured Logging: Configures the standard Python logging module to emit all logs in a structured JSON format. Each log entry will automatically include a unique trace_id to correlate events across services.
- Custom Metrics Emitter: Provides simple functions (e.g., emit_metric()) that use the boto3 library to send custom metrics directly to Amazon CloudWatch (e.g., retrieval_latency, tokens_generated, user_feedback_received).
- LangSmith Integration: Centralizes the initialization of the LangSmith SDK, ensuring all services are correctly configured to send traces.

2. log_processing_lambda.py:
- A Lambda function triggered by Amazon Kinesis Data Firehose.
- Parses the structured JSON logs as they arrive.
- Aggregates logs to calculate advanced metrics that are difficult to compute in real-time (e.g., estimating a “groundedness score” by sending a sample of responses to an LLM-as-a-judge).
- Enriches logs with additional context (e.g., user metadata from a DynamoDB lookup).
- Sinks the processed, enriched logs into a dedicated S3 bucket for long-term storage and analysis (the fuel for the Data Flywheel).
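A minimal sketch of the instrumentation_lib.py helpers described in item 1 above; the CloudWatch namespace, metric unit, and the trace_id convention are illustrative assumptions.

import json
import logging

import boto3

_cloudwatch = boto3.client("cloudwatch")

class JsonFormatter(logging.Formatter):
    """Emit every log record as a single structured JSON line."""
    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            # Populated via logger.info(..., extra={"trace_id": ...})
            "trace_id": getattr(record, "trace_id", None),
        })

def configure_logging(level: int = logging.INFO) -> None:
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logging.basicConfig(level=level, handlers=[handler], force=True)

def emit_metric(name: str, value: float, unit: str = "Milliseconds") -> None:
    """Send one custom metric data point to CloudWatch."""
    _cloudwatch.put_metric_data(
        Namespace="RAGSearch",  # assumed namespace
        MetricData=[{"MetricName": name, "Value": value, "Unit": unit}],
    )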

Unit Tests (pytest)

- test_instrumentation_lib.py: Tests that the logging formatter correctly structures messages and that the CloudWatch metric emitter constructs the correct API payload.
- test_log_processing_lambda.py: Tests the Lambda handler function with sample JSON log events to ensure it correctly parses, enriches, and transforms the data.

Pipeline Orchestration (Event-Driven)

Not a DAG-based pipeline. This is a continuous, event-driven streaming architecture. The “pipeline” is the real-time flow of data through a series of interconnected AWS services, not a sequence of batch jobs.

Infrastructure as Code (Terraform)

- cloudwatch.tf:
    - Defines CloudWatch Log Groups for all services.
    - Defines Custom Metrics and Metric Filters to extract data from logs (e.g., parse latency from a JSON log).
    - Creates CloudWatch Dashboards to visualize key operational and business metrics.
    - Sets up CloudWatch Alarms that trigger on critical thresholds (e.g., p99 latency > 2s, Error Rate > 1%, a sudden drop in Groundedness Score).
- sns.tf: Defines an Amazon SNS Topic that alarms will publish to.
- lambda.tf: Deploys the log_processing_lambda.py function and its IAM role.
- kinesis.tf: Defines the Kinesis Data Firehose stream that collects logs from CloudWatch and delivers them to the processing Lambda and finally to S3.
- s3.tf: Defines the S3 bucket for long-term, queryable storage of processed logs.

Integration Test

- test_monitoring_integration.py:
- An integration test that simulates the end-to-end flow.
- 1. Action: The test makes a call to the staging inference API endpoint.
- 2. Wait & Poll: The test then waits for a few seconds and polls the target S3 log bucket.
- 3. Assert: It asserts that a corresponding log file has been created and contains the correct trace_id and data from the initial API call. This validates that the entire logging stream (CloudWatch -> Kinesis -> Lambda -> S3) is functioning correctly.

CI/CD Workflow (GitHub Actions)

- deploy_monitoring_infra.yml:
- Trigger: On push to main affecting the /monitoring/ directory.
- Jobs:
    1. Lint & Unit Test: Runs static analysis and pytest unit tests on the log_processing_lambda.py script.
    2. Deploy Infrastructure: Runs terraform apply to deploy changes to the CloudWatch dashboards, alarms, Kinesis stream, and the processing Lambda.
    3. Run Integration Test: Executes the test_monitoring_integration.py script to validate the end-to-end logging flow in the staging environment.

B. Architecture Diagram: Monitoring and Observability Pipeline

This diagram illustrates the continuous, real-time flow of observability data from production services to our analysis and storage layers.


9. Testing in Production: Validating Business Impact

Once a new model or feature has passed all offline and staging evaluations, it is ready for the ultimate test: exposure to live user traffic. Testing in production is not about finding bugs; it is about measuring business impact. The goal is to answer the question: “Does this change actually improve the key performance indicators we care about?” We will employ a combination of A/B testing and Shadow Testing to make data-driven decisions about rollouts.

A. Artifacts to Be Implemented

Artifact Type

Description & Plan

Python Scripts (Analysis)

ab_test_analysis.py:
- A Python script (or a Jupyter Notebook) that automates the statistical analysis of A/B test results.
- Loads experiment data from the data warehouse (e.g., Amazon Redshift). This data will contain user events tagged with the experiment variant_id (control or challenger).
- Calculates primary and secondary KPIs for each variant (e.g., Conversion Rate, AOV, CTR).
- Performs Statistical Significance Testing (e.g., using a Chi-squared test for conversion rates or a T-test for AOV) to determine if the observed difference between variants is statistically significant or due to random chance.
- Generates a Report summarizing the results, including p-values and confidence intervals, and provides a clear recommendation: Promote Challenger, Rollback Challenger, or Continue Experiment.
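The core of the significance test is small enough to sketch directly; the counts, the SciPy chi-squared test, and the 0.05 threshold are illustrative choices.

from scipy.stats import chi2_contingency

def compare_conversion(control_conversions: int, control_total: int,
                       challenger_conversions: int, challenger_total: int,
                       alpha: float = 0.05) -> dict:
    """Chi-squared test on a 2x2 contingency table of converted vs. not converted."""
    table = [
        [control_conversions, control_total - control_conversions],
        [challenger_conversions, challenger_total - challenger_conversions],
    ]
    chi2, p_value, _, _ = chi2_contingency(table)
    return {
        "control_rate": control_conversions / control_total,
        "challenger_rate": challenger_conversions / challenger_total,
        "p_value": p_value,
        "significant": p_value < alpha,
    }

# Example with made-up counts: 2.0% vs. 2.24% conversion over 120k users per arm.
print(compare_conversion(2400, 120000, 2688, 120000))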

Unit Tests (pytest)

- test_ab_test_analysis.py: Tests the statistical calculation logic within the analysis script. It uses a sample DataFrame with known statistical properties to assert that the script correctly calculates metrics like conversion rate and p-values. This ensures the analysis itself is bug-free.

Pipeline Orchestration

Not an automated pipeline. The A/B testing process is a business workflow orchestrated by the Product Manager and the ML team, not an automated DAG. The key “pipeline” is the flow of experiment data from the production application into the data warehouse, which is part of the existing data engineering infrastructure.

Infrastructure as Code (Terraform)

- api_gateway.tf (Update):
    - Canary Deployments: Modify the API Gateway configuration to support weighted routing. This allows us to direct a small percentage of traffic (e.g., 5%) to a new “challenger” version of our inference service while the majority continues to go to the “control” (current production) version. The traffic split will be managed via Terraform variables.
- ecs.tf / lambda.tf (Update):
    - Separate Deployments: The Terraform configuration will be structured to allow for the deployment of two distinct versions of the inference service (Control and Challenger) side-by-side. Each will be a separate ECS Service or Lambda Alias.
- feature_flags.tf (Optional but Recommended):
    - Defines configurations for a feature flagging service (e.g., AWS AppConfig, LaunchDarkly). This provides a more dynamic and safer way to manage traffic allocation and user segmentation for experiments without requiring a full infrastructure redeployment for every change.

Integration Test

- test_ab_routing.py:
- An integration test that validates the traffic splitting mechanism itself.
- 1. Action: The script makes 1,000 requests to the production API endpoint.
- 2. Collect: It inspects a custom header in the response that identifies which variant (control or challenger) served the request.
- 3. Assert: It asserts that the distribution of responses is within an expected tolerance of the configured traffic split (e.g., if traffic is split 90/10, the test asserts that the challenger served between 8% and 12% of the requests). This confirms the deployment and routing infrastructure is working as intended.
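A minimal sketch of that routing check; the endpoint variable, the variant response header name, and the tolerance band are illustrative assumptions.

import os
from collections import Counter

import pytest
import requests

@pytest.mark.integration
def test_challenger_traffic_share_within_tolerance():
    endpoint = os.environ["PROD_API_ENDPOINT"]  # assumed environment variable
    counts = Counter()
    for _ in range(1000):
        response = requests.post(f"{endpoint}/search", json={"query": "hiking boots"})
        # Assumes the service echoes the serving variant in a custom header.
        counts[response.headers.get("x-variant-id", "unknown")] += 1
    challenger_share = counts["challenger"] / 1000
    # With a 90/10 split configured, accept anything between 8% and 12%.
    assert 0.08 <= challenger_share <= 0.12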

CI/CD Workflow (GitHub Actions)

- deploy_inference_service.yml (Update to the “Deploy to Production” job):
- The final step of the CD pipeline is modified. Instead of a “big bang” rollout, it now performs a Canary Release.
- 1. Deploy Challenger: The new container version is deployed as a “challenger” service.
- 2. Shift Traffic: The pipeline automatically runs a Terraform plan/apply to update the API Gateway’s weighted routing, shifting a small percentage of traffic (e.g., 5%) to the new challenger.
- 3. Monitor: The pipeline enters a monitoring phase. It queries CloudWatch for the challenger’s key operational metrics (error rate, latency).
- 4. Automated Rollback: If the challenger’s error rate or latency exceeds a critical threshold during this initial phase, the pipeline automatically rolls back by shifting 100% of traffic back to the control and triggers an alert.
- 5. Manual Promotion: If the canary deployment is stable, the A/B test begins. The full promotion of the challenger to 100% traffic is a manual step, performed by the team after the A/B test has concluded and the results have been analyzed.

B. The A/B Testing Workflow: From Candidate to Champion

This workflow describes the end-to-end process for safely deploying and validating a new model.

  1. Candidate Promotion: A new model (e.g., a fine-tuned embedding model) passes all offline evaluations and is registered in the SageMaker Model Registry. This makes it a “candidate” for production.

  2. Canary Release (Automated via CI/CD):

    • The CD pipeline deploys the candidate model to production infrastructure, creating a “challenger” version of the inference service.

    • It shifts a small amount of live traffic (e.g., 5%) to this challenger.

    • It performs an immediate health check. If the challenger is unhealthy (high errors/latency), it automatically rolls back.

  3. Experiment Execution (Manual Start/Stop):

    • If the canary is stable, the A/B test officially begins.

    • The team monitors the experiment’s health via CloudWatch and LangSmith dashboards.

    • The experiment runs for a pre-determined duration (e.g., one to two weeks) to collect enough data for statistical significance.

  4. Analysis & Decision:

    • At the end of the experiment, the ab_test_analysis.py script is run.

    • The team reviews the results and makes a data-driven decision:

      • Promote: If the challenger shows a statistically significant win on the primary business KPI, it is promoted. The CD pipeline is run again to shift 100% of traffic to the new version, which now becomes the new “control.”

      • Rollback: If the challenger is neutral or performs worse, it is rolled back, and 100% of traffic is returned to the original control. The learnings from the failed experiment are documented and used to inform the next iteration.

C. Architecture Diagram: A/B Testing in Production

This diagram illustrates how traffic is split to enable live experimentation.


10. The Foundation of Trust - Governance, Ethics, and Human-Centric Design

A technically proficient RAG system is only half the battle. To build a truly production-grade, trustworthy application, we must embed principles of governance, ethics, and human-centric design into every stage of the MLOps lifecycle. This is not a final checklist but a continuous commitment to ensuring our system is transparent, fair, secure, and aligned with user expectations.

A. Comprehensive Model Governance

Model governance provides the framework for control, auditability, and compliance, ensuring our RAG system operates responsibly within both internal policies and external regulations like GDPR.

Governance Component

Implementation Strategy for RAG Search

Reproducibility & Auditability

- Infrastructure: All AWS infrastructure is defined declaratively in Terraform and versioned in Git, providing a complete, auditable history of the environment.
- Data: Training datasets for the re-ranker and embedding models are versioned with DVC, ensuring any model can be precisely reproduced.
- Models: Every custom model is versioned and logged in the Amazon SageMaker Model Registry.
- Tracing: LangSmith provides an immutable, end-to-end trace for every single query, linking the user input, retrieved context, and final LLM output for full auditability.

Documentation & Transparency

- Model Cards: For our custom re-ranking model, we maintain a Model Card detailing its intended use, training data, evaluation metrics (including fairness checks), and known limitations.
- Prompt Versioning: All system prompts are treated as code, versioned in Git, and deployed through CI/CD, providing a clear history of how the LLM’s behavior has been guided over time.

Security & Access Control

- Secrets Management: All API keys (e.g., for LLM providers) are stored securely in AWS Secrets Manager.
- Least Privilege: Each service (Lambda, Fargate, SageMaker) runs with a fine-grained IAM Role granting only the permissions necessary for its specific task (e.g., the inference Lambda can read from OpenSearch but cannot write to it).
- Endpoint Security: The public-facing API Gateway is configured with authentication, rate limiting, and throttling to prevent abuse.

B. Responsible AI (RAI) Principles in Practice

RAI is about proactively identifying and mitigating potential harms, ensuring our system is fair, explainable, and respects user privacy.

RAI Principle

Implementation Strategy for RAG Search

Fairness & Bias Mitigation

- Data Analysis: We continuously analyze user interaction data to ensure our re-ranking model is not developing biases (e.g., unfairly penalizing products from smaller vendors). We monitor performance across different product categories to identify and address performance gaps.
- Bias in Generation: We use output guardrails to scan for and block responses that contain biased or stereotypical language.

Explainability & Interpretability (XAI)

- Attribution is Key: This is the most critical form of XAI for RAG. Every generated statement is linked directly back to the source product page or review, allowing users to verify the information. This is implemented via citations in the UI.
- Re-ranker Explainability: For our custom re-ranking model, we use SHAP (SHapley Additive exPlanations) during offline evaluation to understand which features (e.g., semantic score, popularity, stock level) are driving its ranking decisions.

Privacy Preservation

- PII Redaction: We implement an input guardrail using services like Amazon Comprehend to automatically detect and redact any Personally Identifiable Information from user queries before they are logged or processed.
- Right to be Forgotten: Our data architecture is designed to support GDPR requirements, with clear processes for deleting user data from all associated systems upon request.
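A minimal sketch of the PII-redaction guardrail using Amazon Comprehend's PII detection; replacing each span with its entity type label is an illustrative choice.

import boto3

comprehend = boto3.client("comprehend")

def redact_pii(text: str, language_code: str = "en") -> str:
    """Replace detected PII spans (emails, names, addresses, ...) with their entity type."""
    entities = comprehend.detect_pii_entities(Text=text, LanguageCode=language_code)["Entities"]
    # Work from the end of the string so earlier offsets stay valid after each replacement.
    for entity in sorted(entities, key=lambda e: e["BeginOffset"], reverse=True):
        text = text[:entity["BeginOffset"]] + f"[{entity['Type']}]" + text[entity["EndOffset"]:]
    return text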

Security (Adversarial Robustness)

- Prompt Injection Defense: Our primary defense is a strong system prompt that clearly instructs the LLM on its role and constraints. We also maintain a suite of adversarial prompts in our evaluation dataset to continuously test for vulnerabilities.
- Data Poisoning Defense: The Great Expectations validation step in our ingestion pipeline acts as a defense against data poisoning by flagging anomalous or malformed product data before it can be indexed.

C. The Human Element: Team Structure & User-Centric Design

Technology alone does not create a successful product. The human elements—how the team is structured and how the user experience is designed—are equally critical.

Human-Centric Aspect

Implementation Strategy

Team Structure & Collaboration

We operate as a cross-functional product pod, bringing together the Product Manager, ML/GenAI Engineer, Data Scientist, and Backend/Frontend Engineers. This structure eliminates silos and ensures tight alignment between business goals, AI development, and user experience. The ML/GenAI Engineer (my role) acts as the technical anchor, translating product requirements into a robust, end-to-end MLOps system.

User Experience (Defensive UX)

- Transparency: The UI clearly labels the conversational search as an “AI Assistant” to set user expectations.
- Graceful Failure: When the system has low confidence or cannot find a relevant answer, it is designed to respond transparently (e.g., “I couldn’t find a specific answer for that, but here are some products you might be interested in…”) rather than hallucinating.
- User Control & “Escape Hatches”: The user is always in control. A clear link to the traditional keyword search is always visible, and users can easily bypass the conversational experience if they choose.

User Feedback Loops

- Explicit Feedback: Simple “thumbs up/thumbs down” icons are present on every generated response, providing a direct, low-friction signal of quality.
- Implicit Feedback: Our monitoring pipeline tracks user actions post-response (e.g., clicks, add-to-carts, query reformulations) as powerful implicit signals of relevance and helpfulness. This feedback is the primary fuel for our Data Flywheel.


11. System Architecture, Performance, and Economics

The RAG-based search system is architected as a modern, cloud-native application on AWS. The design philosophy is centered around four interconnected loops, each representing a core operational pipeline:

  1. The Real-Time Inference Pipeline: The user-facing, low-latency engine that serves live queries.

  2. The Offline Data Ingestion Pipeline: The batch/streaming factory that builds and maintains the system’s knowledge base.

  3. The Continual Learning (Data Flywheel) Pipeline: The periodic, offline process that improves the system’s intelligence by learning from user behavior.

  4. The Continuous Monitoring & Observability Pipeline: The always-on “nervous system” that provides visibility into the health, cost, and quality of the entire application.

This modular, microservices-based architecture ensures scalability, resilience, and maintainability.

A. AWS System Architecture Diagram

The following diagram illustrates the complete, end-to-end system, showing the flow of data and the interaction between all major AWS components and services.


B. Sequence Diagram: The Anatomy of a Real-Time RAG Query

The following diagram details the synchronous, end-to-end flow of the inference pipeline, from the moment a user submits a query to when they start receiving a streamed response. The latency estimates represent a target for a well-optimized system, operating under normal load.

Latency Budget Breakdown

The total latency experienced by the user is the sum of the latencies of each step in the sequence. For a conversational application, the most critical metric is Time to First Byte (TTFB)—the time it takes for the user to start seeing the first words of the response.

| Step | Operation | Estimated Latency (p99) | Notes |
| --- | --- | --- | --- |
| 1-2 | API Gateway & Network | 10 - 50 ms | Initial network overhead and API Gateway processing. |
| 3-4 | Retrieval (OpenSearch) | 50 - 150 ms | A complex hybrid (vector + keyword) query on a distributed database. Highly dependent on cluster size and index optimization. |
| 5-6 | Feature Fetch (Feature Store) | 10 - 30 ms | A low-latency, key-value lookup for business features. |
| 7-8 | Re-ranking (SageMaker) | 50 - 100 ms | Network hop plus inference time for the re-ranking model. Faster than the generator LLM but still a significant step. |
| 9-10 | Generation (Amazon Bedrock) | 150 - 400 ms (TTFB) | The “thinking time” for the LLM before it generates the first token. This is often the largest single component of perceived latency. |
| 11-12 | Response Streaming & Network | 10 - 50 ms | Network latency for the response stream to travel back to the user. |
| TOTAL (Time to First Byte) | - | ~280 - 780 ms | The total perceived latency before the user starts reading the answer. Our primary optimization target is to keep this under 500 ms. |
| TOTAL (Full Response) | - | 1.5 - 3.0 seconds | Depends on answer length. A 200-token response would add approximately 1-2 seconds of total streaming time after the first token. |


C. Inference Pipeline: Bottlenecks & Performance Optimizations

Achieving a sub-500ms Time to First Byte (TTFB) in a distributed RAG system requires a relentless focus on performance. The entire inference pipeline is a series of network I/O and computation steps, and every millisecond counts. Based on the sequence diagram, we can identify four primary potential bottlenecks and a corresponding set of optimization strategies.

Bottleneck

Description & Impact on Performance

Optimization Strategies

1. The Retrieval Bottleneck

Description: The hybrid search query against Amazon OpenSearch is the most complex database operation in the pipeline. It involves both a k-NN vector search and a keyword (BM25) search, followed by the fusion of results. As the index size and query volume grow, this step can easily become the largest contributor to latency.

Impact: High retrieval latency directly delays the entire downstream process, pushing TTFB higher and degrading the user experience.

1. Index Tuning (HNSW Parameters): For the vector index, meticulously tune the HNSW algorithm parameters (M and ef_construction during indexing; ef_search at query time). This is a direct trade-off: higher values increase accuracy (recall) but also increase latency and memory usage. Find the optimal balance through offline evaluation.

2. Sharding & Replication: Horizontally scale the OpenSearch cluster by adding more nodes and distributing the index across them (sharding). This parallelizes the search, reducing latency. Add replica shards to increase query throughput.

3. Pre-filtering with Metadata: For queries that contain specific filters (e.g., “in the ‘Electronics’ category”), use OpenSearch’s powerful metadata filtering before the vector search. This dramatically reduces the search space, leading to a significant speedup.
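To make the index-tuning trade-off in optimization 1 concrete, a minimal sketch of a k-NN index mapping for a managed OpenSearch domain; the dimension, engine, and parameter values are illustrative assumptions to be tuned via offline evaluation.

# Higher m / ef_construction / ef_search improve recall at the cost of
# memory, indexing time, and query latency respectively.
index_body = {
    "settings": {
        "index": {
            "knn": True,
            "knn.algo_param.ef_search": 100,   # query-time accuracy/latency knob
        }
    },
    "mappings": {
        "properties": {
            "embedding": {
                "type": "knn_vector",
                "dimension": 1024,             # must match the embedding model's output size
                "method": {
                    "name": "hnsw",
                    "space_type": "cosinesimil",
                    "engine": "nmslib",
                    "parameters": {"m": 16, "ef_construction": 256},
                },
            }
        }
    },
}
# client.indices.create(index="products-v1", body=index_body)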

2. The “Cold Start” & Concurrency Bottleneck

Description: This applies if we use AWS Lambda without proper configuration. A cold start can add seconds of latency. Even with a warm container (Lambda with Provisioned Concurrency or Fargate), if the application is not designed for concurrency, it can only handle one request at a time, leading to long wait queues under load.

Impact: Unpredictable, spiky latency (due to cold starts) and a low ceiling on throughput (queries per second) for the entire service.

1. Choose Fargate for Sustained Traffic: For any significant, consistent traffic, AWS Fargate is the superior choice as it eliminates cold starts entirely.

2. Use Provisioned Concurrency for Lambda: If using Lambda, always enable Provisioned Concurrency for the production environment to keep a pool of warm instances ready.

3. Implement Asynchronous I/O (asyncio): This is the single most important application-level optimization. The FastAPI application must be written using async/await for all network calls (to OpenSearch, SageMaker, Bedrock). This allows a single Fargate container to handle hundreds of concurrent requests by efficiently switching between them while waiting for I/O, dramatically increasing throughput.

3. The LLM Generation Bottleneck

Description: The LLM’s “thinking time” before it produces the first token (TTFB) is a significant and often irreducible part of the latency budget. Larger, more complex models have higher TTFB.

Impact: Directly adds to the perceived latency for the user. A slow TTFB makes the application feel sluggish, even if the retrieval was fast.

1. Model Selection: Choose the smallest, fastest model that meets your quality bar. For many queries, a model like Claude 3 Haiku may be sufficient and significantly faster than Opus. Consider a “router” pattern where simple queries go to a fast model and complex ones go to a slower, more powerful model.

2. Prompt Optimization: Keep the context provided to the LLM as concise as possible without losing critical information. Shorter prompts lead to faster processing. Techniques like Contextual Compression can help here.

3. Response Streaming: Always stream the response. This is a UX optimization that has a massive impact on perceived performance. The user starts reading the beginning of the answer while the rest is still being generated, masking the total generation time.

4. The “Death by a Thousand Hops” Bottleneck

Description: The total latency is the sum of multiple network calls: API Gateway -> Fargate -> OpenSearch -> Feature Store -> SageMaker -> Bedrock. While each individual hop might be fast, they add up. Network latency between services can become a significant factor.

Impact: A slow, creeping increase in overall latency that is hard to attribute to any single component.

1. Co-location & VPC Endpoints: Ensure all AWS services are deployed in the same AWS Region and Availability Zone. Use VPC Endpoints for all AWS service calls (S3, SageMaker, Bedrock). This keeps traffic on the private AWS backbone network, reducing latency and improving security compared to going over the public internet.

2. Aggressive Caching (Amazon ElastiCache): Implement caching for repeatable, high-cost operations.
     - Retrieval Cache: Cache the results of common search queries for a short TTL (e.g., 5 minutes).
     - HyDE Cache: The output of the Hypothetical Document Embeddings (HyDE) step is a great candidate for caching: identical queries can reuse the same cached hypothetical document instead of re-invoking the LLM.

3. Connection Pooling: For services like databases, maintain a warm pool of connections within the Fargate service to avoid the overhead of establishing a new connection for every request.
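A minimal sketch of the HyDE cache from optimization 2, using redis-py against the ElastiCache endpoint; the endpoint, TTL, and hash-based key scheme are illustrative assumptions.

import hashlib

import redis

r = redis.Redis(host="my-elasticache-endpoint", port=6379, decode_responses=True)  # assumed endpoint

def cached_hyde(query: str, generate_hypothetical_doc, ttl_seconds: int = 300) -> str:
    """Return a cached hypothetical document for the query, generating it only on a miss."""
    key = "hyde:" + hashlib.sha256(query.strip().lower().encode()).hexdigest()
    cached = r.get(key)
    if cached is not None:
        return cached                           # cache hit: no LLM call
    doc = generate_hypothetical_doc(query)      # expensive LLM call on a miss
    r.setex(key, ttl_seconds, doc)
    return doc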


D. Estimated Monthly Costs for the RAG System

To ensure the economic viability of the project, a detailed cost estimation was performed. The following model breaks down the monthly operational costs across the three core pipelines: Ingestion, Fine-tuning, and the real-time Inference service.

Core Assumptions:

  • Product Catalog: 100,000 Products (requiring ~1.5 GB of vector index storage).

  • User Traffic: 5 Million search queries per month (remains high, indicating an active user base).

  • Data Ingestion: One-time batch ingestion, with small weekly incremental updates (~1,000 products/week).

  • Model Fine-tuning: Two times per year.

Pipeline Component

AWS Service(s)

Detailed Cost Calculation & Rationale

Estimated Cost (USD)

Data Ingestion Pipeline (Periodic)

AWS Lambda
Amazon Bedrock

This is now a small, recurring cost for weekly updates. We’ll assume ~4,000 products are updated per month.
- Lambda Compute: Low-volume ETL jobs can run on Lambda instead of Glue. ~4,000 products/month * ~10 s/product at 1 GB memory * (~$0.0000167/GB-second) ≈ $0.70 of raw compute, budgeted at ~$25 to cover request charges, retries, and headroom.
- Bedrock (Embeddings): Token cost for embedding updated products.
- 4k products * 2k tokens/product * ($0.0001 / 1k tokens) = ~$1.
The cost is now minimal.

$30 - $50
(Per month)

Continual Learning Pipeline (Periodic)

Amazon MWAA (Airflow)

The cost of the Airflow infrastructure to be ready for the bi-annual fine-tuning runs and potentially other orchestration tasks. The expensive SageMaker training job cost is no longer a recurring monthly expense.
- Small MWAA environment = ~$130/month

$130 - $180
(Per month)

Real-Time Inference Pipeline

Amazon OpenSearch
Amazon SageMaker Endpoint
Amazon Bedrock
API Gateway + Fargate

OpenSearch Service: Significant cost reduction. A 1.5 GB index can be served with high availability and throughput by a much smaller cluster.
- 2 x r6g.large.search instances for HA = ~$475

SageMaker Endpoint (Re-ranker): No change. Still need a 2-node HA endpoint for the re-ranking model.
- 2 x ml.c5.large instances = ~$250

Bedrock (Generation): This is now the dominant cost, driven by query volume, which remains high.
- 5M queries * 2.2k avg tokens/query * (a blended input/output rate of ~$0.34/1M tokens) = ~$3,740

API Gateway + Fargate: No change. Driven by 5M requests.
- API Gateway + Fargate Compute = ~$95

$4,500 - $5,500
(Per month)

Shared Costs

S3 Storage
CloudWatch

S3: Storage cost is drastically reduced due to smaller catalog size.
- ~20 GB total catalog storage * $0.023/GB-month ≈ $0.50; budget ~$20 per month once processed observability logs are included.

CloudWatch: Log volume is driven by high query traffic, so this cost remains similar.
- ~500 GB log ingestion + metrics/alarms = ~$300

$320 - $400
(Per month)

Total Estimated Monthly Cost

-

The recurring monthly cost is now driven primarily by the LLM API usage and the 24/7 inference compute.

~$5,000 - $6,130

Non-Recurring Costs

It is critical to budget for the following periodic and one-time costs separately from the monthly operational expenses:

  • One-Time Full Ingestion: The initial, one-time cost to ingest the entire 100,000 product catalog would be approximately $120 - $150.

  • Bi-Annual Re-indexing & Fine-tuning: Twice a year, the full ingestion pipeline will need to be re-run after the embedding model is fine-tuned. This should be treated as a project-based cost of ~$12,000 - $15,000, incurred twice per year.

Key Financial Learnings

This revised, more realistic model provides critical insights for a mid-sized business:

  1. The Cost Center Shifts to the LLM: With a smaller knowledge base, the dominant operational cost is no longer the vector database compute. It is now the per-query token cost of the generator LLM (Amazon Bedrock). This places a high premium on prompt optimization and using the most cost-effective model (e.g., Claude 3 Haiku) that meets the quality bar.

  2. Smart Ingestion is Key: An event-driven, incremental update strategy for the knowledge base is dramatically cheaper than frequent full refreshes.

  3. A/B Testing is Financially Justified: With a monthly cost of ~$5,000, even a small 1-2% improvement in conversion rate resulting from an A/B test can provide a clear and immediate positive ROI. This justifies the engineering effort required for experimentation.

  4. The Total Cost is Now Viable: A total recurring cost in the range of $5,000 - $6,000 per month is a much more justifiable expense for a mid-sized business, especially given the potential for significant conversion and AOV lift.


Implementation: Data Ingestion and Indexing Pipeline

Python Scripts (Core Logic)

The core logic is broken down into modular scripts, each responsible for a specific stage of the pipeline. These scripts would be packaged and deployed as AWS Lambda functions or run within an AWS Glue job.

ingestion_pipeline/src/data_loader.py

import boto3
import json
import logging

logger = logging.getLogger(__name__)

def load_product_data(bucket: str, key: str) -> dict:
    """Loads a single product's JSON data from S3."""
    try:
        s3 = boto3.client('s3')
        response = s3.get_object(Bucket=bucket, Key=key)
        content = response['Body'].read().decode('utf-8')
        return json.loads(content)
    except Exception as e:
        logger.error(f"Error loading data from s3://{bucket}/{key}: {e}")
        raise

# In a real scenario, this would list all new/updated product files.
# For the Step Function, the input `key` will be provided in the event payload.

ingestion_pipeline/src/text_processor.py

import logging
import re
from typing import List

import boto3
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.chat_models import BedrockChat
from langchain_core.messages import HumanMessage

# Assume Bedrock client is initialized globally or passed in
bedrock_client = boto3.client('bedrock-runtime')
logger = logging.getLogger(__name__)

def clean_text(text: str) -> str:
    """Basic text cleaning (e.g., remove HTML tags)."""
    # In a real implementation, use BeautifulSoup or a similar library.
    # For brevity, we use a simple regex placeholder.
    return re.sub(r'<[^>]+>', '', text)

def chunk_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Splits text into semantically coherent chunks."""
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
    )
    return text_splitter.split_text(text)

def get_image_caption(image_bytes: bytes, bedrock_model_id: str = "anthropic.claude-3-sonnet-20240229-v1:0") -> str:
    """Generates a descriptive caption for an image using a VLM."""
    # This would be a more complex implementation involving base64 encoding and
    # constructing the correct payload for the multimodal Bedrock model.
    # For brevity, this is a conceptual placeholder.
    # llm = BedrockChat(model_id=bedrock_model_id, client=bedrock_client)
    # message = HumanMessage(content=[...]) # construct multimodal message
    # response = llm.invoke([message])
    # return response.content
    logger.info(f"Generating caption for image of size {len(image_bytes)} bytes.")
    return "A high-quality photo of a product, showing its key features." # Placeholder

ingestion_pipeline/src/embedding_generator.py

import logging
from typing import List
from langchain_community.embeddings import BedrockEmbeddings

logger = logging.getLogger(__name__)

def generate_text_embeddings(chunks: List[str]) -> List[List[float]]:
    """Generates embeddings for a list of text chunks."""
    embeddings = BedrockEmbeddings(
        model_id="amazon.titan-embed-text-v2:0",
    )
    logger.info(f"Generating text embeddings for {len(chunks)} chunks.")
    return embeddings.embed_documents(chunks)

def generate_image_embedding(image_bytes: bytes) -> List[float]:
    """Generates an embedding for a single image."""
    embeddings = BedrockEmbeddings(
        model_id="amazon.titan-embed-image-v1",
    )
    logger.info(f"Generating image embedding for image of size {len(image_bytes)} bytes.")
    # The actual implementation would involve base64 encoding the image
    return embeddings.embed_query("placeholder for image bytes") # Placeholder

ingestion_pipeline/src/opensearch_indexer.py

import logging
from typing import List

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
from opensearchpy.helpers import bulk

logger = logging.getLogger(__name__)

def get_opensearch_client(host: str, region: str):
    """Initializes and returns an OpenSearch client."""
    credentials = boto3.Session().get_credentials()
    auth = AWSV4SignerAuth(credentials, region, 'aoss')
    client = OpenSearch(
        hosts=[{'host': host, 'port': 443}],
        http_auth=auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        pool_maxsize=20
    )
    return client

def index_documents(client: OpenSearch, index_name: str, documents: List[dict]):
    """Bulk indexes documents into OpenSearch."""
    logger.info(f"Indexing {len(documents)} documents into index '{index_name}'.")
    success, failed = bulk(client, documents, index=index_name)
    if failed:
        logger.error(f"Failed to index {len(failed)} documents.")
    return success, failed

Unit Tests

ingestion_pipeline/tests/unit/test_text_processor.py

import pytest
from src import text_processor

def test_chunk_text_splits_correctly():
    """Ensures text is split into multiple chunks."""
    long_text = "This is a sentence. " * 200
    chunks = text_processor.chunk_text(long_text, chunk_size=100, chunk_overlap=20)
    assert len(chunks) > 1
    # Every chunk must respect the configured maximum size
    assert all(len(chunk) <= 100 for chunk in chunks)
    # The configured overlap means the start of the second chunk repeats text from the first
    assert chunks[1][:20].strip() in chunks[0]

def test_get_image_caption_mocked(mocker):
    """Tests the image captioning function with a mocked Bedrock client."""
    # We mock the Bedrock client to avoid making a real API call
    mock_bedrock = mocker.patch('boto3.client')
    # conceptually, you would mock the invoke method's return value
    
    caption = text_processor.get_image_caption(b"fake_image_bytes")
    assert isinstance(caption, str)
    assert len(caption) > 0

Pipeline Orchestration (AWS Step Functions)

ingestion_pipeline/statemachine/ingestion_statemachine.asl.json

{
  "Comment": "RAG Data Ingestion Pipeline for a single product.",
  "StartAt": "LoadProductData",
  "States": {
    "LoadProductData": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "FunctionName": "${LoadDataLambdaArn}",
        "Payload.$": "$"
      },
      "Next": "ProcessAndChunkText",
      "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "NotifyFailure"
      }]
    },
    "ProcessAndChunkText": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "InputPath": "$.Payload",
      "ResultPath": "$.ProcessedText",
      "Next": "GenerateEmbeddings"
    },
    "GenerateEmbeddings": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "InputPath": "$.ProcessedText",
      "ResultPath": "$.Embeddings",
      "Next": "IndexInOpenSearch"
    },
    "IndexInOpenSearch": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "InputPath": "$.Embeddings",
      "End": true
    },
    "NotifyFailure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::sns:publish",
      "Parameters": {
        "TopicArn": "${SnsTopicArn}",
        "Message": {
          "Input.$": "$",
          "Message": "RAG Ingestion Pipeline Failed!"
        }
      },
      "End": true
    }
  }
}

Infrastructure as Code (Terraform)

ingestion_pipeline/infra/main.tf

# main.tf - Defines the core infrastructure for the ingestion pipeline

# Variable for environment (e.g., "staging", "prod")
variable "env" {
  type    = string
  default = "staging"
}

# --- S3 Buckets ---
resource "aws_s3_bucket" "raw_data" {
  bucket = "rag-product-data-raw-${var.env}"
}

resource "aws_s3_bucket" "processed_data" {
  bucket = "rag-product-data-processed-${var.env}"
}

# --- IAM Roles ---
# A comprehensive IAM role for the Lambda functions and Step Function
# In a real setup, you would create separate, least-privilege roles for each component.
resource "aws_iam_role" "ingestion_pipeline_role" {
  name = "IngestionPipelineRole-${var.env}"
  # Assume role policy allows lambda and states services
  assume_role_policy = "..." # Placeholder for IAM trust policy JSON
}
# ... Attach policies for S3, Bedrock, OpenSearch, CloudWatch ...

# --- Lambda Functions ---
# Placeholder for one of the Lambda functions
resource "aws_lambda_function" "load_data_lambda" {
  function_name = "LoadDataLambda-${var.env}"
  role          = aws_iam_role.ingestion_pipeline_role.arn
  handler       = "data_loader.handler"
  runtime       = "python3.11"
  # ... code packaging configuration ...
}

# ... Define other Lambda functions for each step ...

# --- Step Functions State Machine ---
resource "aws_sfn_state_machine" "ingestion_sfn" {
  name     = "RAG-Ingestion-Pipeline-${var.env}"
  role_arn = aws_iam_role.ingestion_pipeline_role.arn

  definition = templatefile("${path.module}/../statemachine/ingestion_statemachine.asl.json", {
    LoadDataLambdaArn = aws_lambda_function.load_data_lambda.arn,
    # ... pass other Lambda ARNs and SNS Topic ARN ...
  })
}

# --- OpenSearch Serverless ---
resource "aws_opensearchserverless_collection" "vector_db" {
  name = "rag-vector-db-${var.env}"
  type = "VECTORSEARCH"
}

# --- EventBridge Scheduler for Batch ---
resource "aws_cloudwatch_event_rule" "nightly_trigger" {
  name                = "NightlyIngestionTrigger-${var.env}"
  schedule_expression = "cron(0 2 * * ? *)" # 2 AM UTC every night
}

resource "aws_cloudwatch_event_target" "step_function_target" {
  rule      = aws_cloudwatch_event_rule.nightly_trigger.name
  arn       = aws_sfn_state_machine.ingestion_sfn.arn
  role_arn  = aws_iam_role.ingestion_pipeline_role.arn # A specific role for EventBridge is better
}

Integration Test

ingestion_pipeline/tests/integration/test_ingestion_pipeline.py

import boto3
import json
import time
import pytest
import os

# Assume env variables are set for staging resources (e.g., STATE_MACHINE_ARN)
STATE_MACHINE_ARN = os.environ["STATE_MACHINE_ARN"]
RAW_BUCKET = os.environ["RAW_BUCKET"]
OS_HOST = os.environ["OS_HOST"]

@pytest.mark.integration
def test_full_pipeline_run():
    s3_client = boto3.client("s3")
    sfn_client = boto3.client("stepfunctions")
    
    # 1. ARRANGE: Upload a sample product file to the raw S3 bucket
    product_id = "test-product-123"
    s3_key = f"products/{product_id}.json"
    sample_data = {"product_id": product_id, "description": "This is a test."}
    s3_client.put_object(Bucket=RAW_BUCKET, Key=s3_key, Body=json.dumps(sample_data))

    # 2. ACT: Trigger the Step Function and wait for completion
    response = sfn_client.start_execution(
        stateMachineArn=STATE_MACHINE_ARN,
        input=json.dumps({"bucket": RAW_BUCKET, "key": s3_key})
    )
    execution_arn = response['executionArn']
    
    while True:
        status_response = sfn_client.describe_execution(executionArn=execution_arn)
        status = status_response['status']
        if status in ['SUCCEEDED', 'FAILED', 'TIMED_OUT', 'ABORTED']:
            break
        time.sleep(5)
    
    # 3. ASSERT
    assert status == 'SUCCEEDED'
    
    # Assert that the data was indexed correctly in OpenSearch
    # In a real test, you'd use the opensearch_indexer client to query the index
    # os_client = opensearch_indexer.get_opensearch_client(...)
    # indexed_doc = os_client.get(index=..., id=...)
    # assert indexed_doc['found'] == True
    
    # 4. CLEANUP (Optional but recommended)
    s3_client.delete_object(Bucket=RAW_BUCKET, Key=s3_key)
    # os_client.delete(index=..., id=...)

CI/CD GitHub Actions Workflow

.github/workflows/deploy_ingestion_pipeline.yml

name: Deploy Data Ingestion Pipeline

on:
  push:
    branches:
      - main
    paths:
      - 'ingestion_pipeline/**'

jobs:
  lint-and-test:
    name: Lint & Unit Test
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'
      - name: Install dependencies
        run: pip install -r ingestion_pipeline/requirements.txt
      - name: Run unit tests
        run: pytest ingestion_pipeline/tests/unit/

  deploy-to-staging:
    name: Deploy to Staging
    runs-on: ubuntu-latest
    needs: lint-and-test
    environment: staging
    permissions:
      id-token: write
      contents: read
    steps:
      - name: Checkout code
        uses: actions/checkout@v4
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.STAGING_AWS_ROLE_ARN }}
          aws-region: ${{ secrets.AWS_REGION }}
      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v2
      - name: Terraform Apply
        run: |
          cd ingestion_pipeline/infra
          terraform init
          terraform apply -auto-approve -var="env=staging"

  run-integration-test:
    name: Run Integration Tests on Staging
    runs-on: ubuntu-latest
    needs: deploy-to-staging
    environment: staging
    steps:
      # ... (similar setup to deploy job) ...
      - name: Run integration tests
        env:
          STATE_MACHINE_ARN: ${{ secrets.STAGING_SFN_ARN }}
          RAW_BUCKET: ${{ secrets.STAGING_RAW_BUCKET }}
          OS_HOST: ${{ secrets.STAGING_OS_HOST }}
        run: pytest ingestion_pipeline/tests/integration/

Implementation: Inference Pipeline

Here are the complete artifacts required to build, test, deploy, and automate our low-latency, real-time RAG inference service.

Python Scripts (Core Logic)

The application is structured into modular components orchestrated by main.py.

inference_service/src/main.py

import os
import logging
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from typing import Optional, List

from . import orchestrator
from .config import settings
from .instrumentation import configure_logging

# Configure logging and LangSmith tracing on startup
configure_logging()
# NOTE: LangSmith tracing is configured via environment variables like
# LANGCHAIN_TRACING_V2, LANGCHAIN_API_KEY, etc.

logger = logging.getLogger(__name__)
app = FastAPI()

class SearchRequest(BaseModel):
    query: str
    user_id: Optional[str] = None
    # Add other potential fields like image_url for multimodal search

@app.on_event("startup")
async def startup_event():
    """Initialize resources on startup."""
    # This is a good place to initialize clients that can be reused,
    # like the OpenSearch and Bedrock clients, to leverage connection pooling.
    app.state.orchestrator = await orchestrator.RAGOrchestrator.create(settings)
    logger.info("Application startup complete. RAG Orchestrator initialized.")

@app.post("/search")
async def search(request: SearchRequest, http_request: Request):
    """
    Main endpoint for RAG-based search.
    Streams a response back to the client.
    """
    try:
        if not request.query:
            raise HTTPException(status_code=400, detail="Query cannot be empty.")
        
        rag_orchestrator = http_request.app.state.orchestrator
        
        async def stream_generator():
            async for chunk in rag_orchestrator.stream_rag_response(request.query, request.user_id):
                yield chunk
        
        return StreamingResponse(stream_generator(), media_type="text/plain")

    except HTTPException:
        # Re-raise explicit HTTP errors (e.g., the 400 for an empty query) untouched.
        raise
    except Exception:
        logger.exception(f"An error occurred during search for query: '{request.query}'")
        raise HTTPException(status_code=500, detail="An internal error occurred.")

@app.get("/health")
def health_check():
    """Simple health check endpoint."""
    return {"status": "ok"}

inference_service/src/orchestrator.py

import asyncio
import logging
from typing import AsyncGenerator
from langsmith import traceable

from . import retriever, reranker, generator, guardrails, query_transformer
from .config import Settings

logger = logging.getLogger(__name__)

class RAGOrchestrator:
    """Orchestrates the end-to-end RAG pipeline asynchronously."""

    def __init__(self, settings: Settings, retriever_client, reranker_client, generator_client, transformer_client):
        self.settings = settings
        self.retriever = retriever_client
        self.reranker = reranker_client
        self.generator = generator_client
        self.transformer = transformer_client

    @classmethod
    async def create(cls, settings: Settings):
        """Asynchronously create an instance of the orchestrator."""
        # Initialize clients for dependencies
        retriever_client = retriever.HybridRetriever(settings.opensearch_host)
        reranker_client = reranker.SageMakerReranker(settings.reranker_endpoint_name)
        generator_client = generator.BedrockGenerator(settings.generator_model_id)
        transformer_client = query_transformer.QueryTransformer(settings.hyde_model_id, settings.redis_host)
        return cls(settings, retriever_client, reranker_client, generator_client, transformer_client)

    @traceable(name="stream_rag_response")
    async def stream_rag_response(self, query: str, user_id: str) -> AsyncGenerator[str, None]:
        """Full asynchronous RAG pipeline with streaming."""
        
        # 1. Input Guardrails & Transformation (can be run concurrently)
        guarded_query_task = guardrails.apply_input_guardrails(query)
        transformed_query_task = self.transformer.transform_query(query)
        
        guarded_query, transformed_query = await asyncio.gather(
            guarded_query_task, transformed_query_task
        )
        
        # 2. Hybrid Retrieval
        retrieved_docs = await self.retriever.retrieve(transformed_query, top_k=50)
        
        # 3. Contextual Re-ranking
        reranked_docs = await self.reranker.rerank(guarded_query, retrieved_docs, user_id, top_k=5)
        
        # 4. Prompt Construction and Generation
        final_prompt = self.generator.construct_prompt(guarded_query, reranked_docs)
        
        # 5. Streaming Generation and Output Guardrails
        token_stream = self.generator.stream_response(final_prompt)
        async for token in guardrails.apply_output_guardrails(token_stream):
            yield token

Unit Tests

inference_service/tests/unit/test_orchestrator.py

import pytest
from unittest.mock import AsyncMock, MagicMock

from src.orchestrator import RAGOrchestrator

async def _fake_token_stream(prompt):
    """Async generator that mimics the streaming LLM response."""
    for token in ["This", " is", " a", " test."]:
        yield token

@pytest.mark.asyncio
async def test_orchestrator_full_flow(mocker):
    """Tests the full orchestration flow with mocked dependencies."""
    # ARRANGE: Mock all external clients and their async methods
    mock_retriever = AsyncMock()
    mock_reranker = AsyncMock()
    mock_generator = AsyncMock()
    mock_transformer = AsyncMock()

    mock_retriever.retrieve.return_value = [{"page_content": "doc1"}]
    mock_reranker.rerank.return_value = [{"page_content": "reranked_doc1"}]
    mock_transformer.transform_query.return_value = "transformed query"

    # construct_prompt is synchronous, and stream_response must return an
    # async generator (not a coroutine), so both are plain MagicMocks.
    mock_generator.construct_prompt = MagicMock(return_value="final prompt")
    mock_generator.stream_response = MagicMock(side_effect=_fake_token_stream)

    mocker.patch('src.guardrails.apply_input_guardrails', new=AsyncMock(return_value="safe query"))
    mocker.patch('src.guardrails.apply_output_guardrails', new=MagicMock(side_effect=lambda stream: stream))  # Pass-through mock

    orchestrator_instance = RAGOrchestrator(
        settings=mocker.Mock(),
        retriever_client=mock_retriever,
        reranker_client=mock_reranker,
        generator_client=mock_generator,
        transformer_client=mock_transformer
    )

    # ACT: Run the orchestrator
    query = "test query"
    result_stream = orchestrator_instance.stream_rag_response(query, "user123")
    result = "".join([token async for token in result_stream])

    # ASSERT: Verify that all components were called correctly
    mock_transformer.transform_query.assert_awaited_once_with(query)
    mock_retriever.retrieve.assert_awaited_once_with("transformed query", top_k=50)
    mock_reranker.rerank.assert_awaited_once()
    mock_generator.construct_prompt.assert_called_once()
    mock_generator.stream_response.assert_called_once()
    assert result == "This is a test."

Infrastructure as Code (Terraform)

inference_service/infra/ecs.tf

# Defines the AWS Fargate service for our FastAPI application

resource "aws_ecs_cluster" "main" {
  name = "rag-inference-cluster-${var.env}"
}

resource "aws_ecs_task_definition" "api" {
  family                   = "rag-api-task"
  network_mode             = "awsvpc"
  requires_compatibilities = ["FARGATE"]
  cpu                      = 1024 # 1 vCPU
  memory                   = 2048 # 2 GB

  # ... (container definition pointing to the ECR image, port mappings, etc.) ...
  
  # IAM role for the task to access Bedrock, SageMaker, OpenSearch
  task_role_arn            = aws_iam_role.inference_task_role.arn
  execution_role_arn       = aws_iam_role.ecs_execution_role.arn
}

resource "aws_ecs_service" "main" {
  name            = "rag-inference-service-${var.env}"
  cluster         = aws_ecs_cluster.main.id
  task_definition = aws_ecs_task_definition.api.arn
  desired_count   = 2 # Start with 2 tasks for high availability
  launch_type     = "FARGATE"
  
  # ... (network configuration, load balancer attachment) ...
}

# ... (Autoscaling configuration based on CPU or request count) ...

Integration Test

inference_service/tests/integration/test_api.py

import pytest
import requests
import os

# API_ENDPOINT is the URL of the deployed staging service
API_ENDPOINT = os.environ["STAGING_API_ENDPOINT"]

@pytest.mark.integration
def test_search_endpoint_returns_success():
    """Tests that the deployed API returns a successful response."""
    response = requests.post(
        f"{API_ENDPOINT}/search",
        json={"query": "what are the best hiking boots?"},
        stream=True
    )
    assert response.status_code == 200
    
    # Assert that we get some streamed content back
    content = ""
    for chunk in response.iter_content(chunk_size=None):
        content += chunk.decode('utf-8')
    
    assert len(content) > 0

inference_service/tests/load/locustfile.py

from locust import HttpUser, task, between

class RAGUser(HttpUser):
    wait_time = between(1, 5)  # Simulate user think time

    @task
    def search_query(self):
        headers = {"Content-Type": "application/json"}
        payload = {
            "query": "waterproof trail running shoes for wide feet",
            "user_id": f"locust_user_{self.environment.runner.user_count}"
        }
        self.client.post("/search", json=payload, headers=headers, name="/search")

CI/CD GitHub Actions Workflow

.github/workflows/deploy_inference_service.yml

name: Deploy RAG Inference Service

on:
  push:
    branches:
      - main
    paths:
      - 'inference_service/**'

jobs:
  # ... (lint-and-test job as before) ...
  
  build-and-push-image:
    name: Build & Push Docker Image
    runs-on: ubuntu-latest
    needs: lint-and-test
    steps:
      # ... (checkout code) ...
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        # ...
      - name: Login to Amazon ECR
        uses: aws-actions/amazon-ecr-login@v2
      - name: Build, tag, and push image to Amazon ECR
        run: |
          docker build -t ${{ secrets.ECR_REPOSITORY_URI }}:${{ github.sha }} ./inference_service
          docker push ${{ secrets.ECR_REPOSITORY_URI }}:${{ github.sha }}

  deploy-to-staging:
    name: Deploy to Staging
    runs-on: ubuntu-latest
    needs: build-and-push-image
    environment: staging
    steps:
      # ... (checkout code, configure AWS creds, setup Terraform) ...
      - name: Terraform Apply Staging
        run: |
          cd inference_service/infra
          terraform init
          # Pass the new image tag to the deployment
          terraform apply -auto-approve -var="env=staging" -var="image_tag=${{ github.sha }}"

  run-staging-tests:
    name: Run Integration & Load Tests
    runs-on: ubuntu-latest
    needs: deploy-to-staging
    environment: staging
    steps:
      # ... (setup, install dependencies) ...
      - name: Run integration tests
        env:
          STAGING_API_ENDPOINT: ${{ secrets.STAGING_API_ENDPOINT }}
        run: pytest inference_service/tests/integration/
      - name: Run load tests
        run: locust -f inference_service/tests/load/locustfile.py --host ${{ secrets.STAGING_API_ENDPOINT }} --headless -u 10 -r 2 --run-time 1m

  # ... (Manual approval gate and deploy-to-production jobs as planned) ...

Implementation: The Monitoring and Observability Pipeline

This is not a traditional batch pipeline but a continuous, event-driven streaming architecture. The artifacts below define the instrumentation, infrastructure, and automation required to collect, process, and act on observability data in real-time.

Python Scripts (Core Logic)

The core logic is broken down into modular scripts, each responsible for a specific stage of the pipeline. These scripts would be packaged and deployed as AWS Lambda functions or run within an AWS Glue job.

monitoring/src/instrumentation_lib.py

(This library would be packaged and included as a dependency in the Inference Service)

import logging
import json
import os
import boto3
from uuid import uuid4

# Use a custom JSON formatter for structured logging
class JsonFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            "timestamp": self.formatTime(record, self.datefmt),
            "level": record.levelname,
            "message": record.getMessage(),
            "trace_id": getattr(record, "trace_id", "N/A"),
            "service": "RAGInferenceService"
        }
        # Add exception info if it exists
        if record.exc_info:
            log_record['exception'] = self.formatException(record.exc_info)
        return json.dumps(log_record)

def configure_logging():
    """Configures root logger for structured JSON logging."""
    # Remove any existing handlers
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
        
    handler = logging.StreamHandler()
    handler.setFormatter(JsonFormatter())
    logging.basicConfig(level=logging.INFO, handlers=[handler])

def get_trace_id() -> str:
    """Generates a unique trace ID."""
    return str(uuid4())

def emit_cloudwatch_metric(metric_name: str, value: float, unit: str = 'Milliseconds'):
    """Emits a custom metric to CloudWatch."""
    try:
        cloudwatch = boto3.client('cloudwatch')
        cloudwatch.put_metric_data(
            Namespace='RAGApplication',
            MetricData=[
                {
                    'MetricName': metric_name,
                    'Value': value,
                    'Unit': unit,
                    'Dimensions': [
                        {
                            'Name': 'Service',
                            'Value': 'RAGInferenceService'
                        }
                    ]
                },
            ]
        )
    except Exception as e:
        # Log error but don't fail the main application path
        logging.error(f"Failed to emit CloudWatch metric '{metric_name}': {e}")
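
The library's intended usage inside the Inference Service is straightforward; the sketch below is illustrative (the handle_search function and the import path are assumptions, since the packaged module name may differ):

# Illustrative usage of instrumentation_lib inside the Inference Service
import time
import logging

from instrumentation_lib import configure_logging, get_trace_id, emit_cloudwatch_metric

configure_logging()
logger = logging.getLogger(__name__)

def handle_search(query: str) -> str:
    trace_id = get_trace_id()
    start = time.perf_counter()

    # The trace_id is attached via `extra`, which JsonFormatter reads with
    # getattr(record, "trace_id", "N/A") and emits in every structured log line.
    logger.info(f"Received search query: {query}", extra={"trace_id": trace_id})

    response = "..."  # call the RAG orchestrator here

    latency_ms = (time.perf_counter() - start) * 1000
    # Metric name is illustrative; dashboards and alarms aggregate these values.
    emit_cloudwatch_metric("request_latency", latency_ms, unit="Milliseconds")
    return response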

monitoring/src/log_processing_lambda.py

import json
import base64
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def handler(event, context):
    """
    Processes logs from Kinesis Firehose, enriches them,
    and returns them for storage in S3.
    """
    output_records = []

    for record in event['records']:
        try:
            # Decode the payload from Firehose
            payload_decoded = base64.b64decode(record['data']).decode('utf-8')
            log_data = json.loads(payload_decoded)

            # 1. PARSE: The data is already structured JSON.
            # 2. ENRICH: Add additional metadata.
            #    (e.g., lookup user details from a DB using log_data['user_id'])
            log_data['processed_by'] = context.function_name
            
            # 3. AGGREGATE/ANALYZE (Example):
            # In a more complex scenario, you could send a sample of responses
            # to another service (e.g., an LLM-as-a-judge) here to calculate a
            # groundedness score before archiving.
            
            processed_payload = json.dumps(log_data).encode('utf-8')
            
            output_records.append({
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(processed_payload).decode('utf-8')
            })

        except Exception as e:
            logger.error(f"Failed to process record {record['recordId']}: {e}")
            output_records.append({
                'recordId': record['recordId'],
                'result': 'ProcessingFailed',
                'data': record['data'] # Return original data on failure
            })

    return {'records': output_records}

Unit Tests

monitoring/tests/unit/test_log_processing_lambda.py

import base64
import json
from types import SimpleNamespace

from src import log_processing_lambda

def test_lambda_handler_processes_records():
    """Tests that the Lambda handler correctly processes a Kinesis Firehose event."""
    # ARRANGE: Create a sample Kinesis Firehose event and a fake Lambda context
    log_event = {
        "timestamp": "2025-08-08T10:00:00Z",
        "level": "INFO",
        "message": "Search successful",
        "trace_id": "123-abc"
    }
    event_data = base64.b64encode(json.dumps(log_event).encode('utf-8')).decode('utf-8')

    kinesis_event = {
        'records': [{
            'recordId': '4964251234',
            'data': event_data
        }]
    }

    # The handler reads context.function_name, so a plain dict would make every
    # record fail processing; a lightweight stand-in for the Lambda context is enough.
    fake_context = SimpleNamespace(function_name="LogProcessorLambda-test")

    # ACT: Call the handler
    result = log_processing_lambda.handler(kinesis_event, fake_context)

    # ASSERT: Check the result structure and content
    assert len(result['records']) == 1
    record = result['records'][0]
    assert record['result'] == 'Ok'
    
    # Decode the processed data to verify enrichment
    processed_data_decoded = base64.b64decode(record['data']).decode('utf-8')
    processed_data_json = json.loads(processed_data_decoded)
    
    assert processed_data_json['trace_id'] == "123-abc"
    assert 'processed_by' in processed_data_json

Infrastructure as Code (Terraform)

monitoring/infra/main.tf

# main.tf - Defines the infrastructure for the observability pipeline

# --- CloudWatch Resources ---
resource "aws_cloudwatch_log_group" "inference_service_logs" {
  name              = "/aws/ecs/rag-inference-service-${var.env}"
  retention_in_days = 30
}

resource "aws_cloudwatch_dashboard" "rag_dashboard" {
  dashboard_name = "RAG-Observability-Dashboard-${var.env}"
  dashboard_body = jsonencode({
    # ... (Dashboard widget definitions for Latency, Cost, RAG Quality metrics) ...
  })
}

resource "aws_cloudwatch_metric_alarm" "p99_latency_alarm" {
  alarm_name          = "High-P99-Latency-Alarm-${var.env}"
  comparison_operator = "GreaterThanOrEqualToThreshold"
  evaluation_periods  = "1"
  metric_name         = "p99_latency" # Custom metric emitted by the app
  namespace           = "RAGApplication"
  period              = "60"
  statistic           = "Average"
  threshold           = "1500" # 1.5 seconds
  alarm_description   = "P99 latency for the RAG inference service is too high."
  alarm_actions       = [aws_sns_topic.alarms.arn]
}

# --- Kinesis Firehose for Log Streaming ---
resource "aws_kinesis_firehose_delivery_stream" "log_stream" {
  name        = "rag-log-stream-${var.env}"
  destination = "extended_s3"

  extended_s3_configuration {
    bucket_arn        = aws_s3_bucket.log_archive.arn
    role_arn          = aws_iam_role.firehose_role.arn
    
    # Route logs through our Lambda for processing
    processing_configuration {
      enabled = "true"
      processors {
        type = "Lambda"
        parameters {
          parameter_name  = "LambdaArn"
          parameter_value = aws_lambda_function.log_processor.arn
        }
      }
    }
  }
}

# Subscription to send logs from CloudWatch to Kinesis
resource "aws_cloudwatch_log_subscription_filter" "log_subscription" {
  name            = "KinesisSubscriptionFilter-${var.env}"
  log_group_name  = aws_cloudwatch_log_group.inference_service_logs.name
  filter_pattern  = "" # Send all logs
  destination_arn = aws_kinesis_firehose_delivery_stream.log_stream.arn
  role_arn        = aws_iam_role.cloudwatch_to_firehose_role.arn
}

# --- Supporting Resources (S3, Lambda, IAM, SNS) ---
resource "aws_s3_bucket" "log_archive" {
  bucket = "rag-log-archive-${var.env}"
}

resource "aws_lambda_function" "log_processor" {
  function_name = "LogProcessorLambda-${var.env}"
  # ... (configuration for the log processing lambda) ...
}

resource "aws_sns_topic" "alarms" {
  name = "RAG-Alarms-Topic-${var.env}"
}

Integration Test

monitoring/tests/integration/test_monitoring_pipeline.py

import boto3
import requests
import time
import pytest
import os
import gzip

API_ENDPOINT = os.environ["STAGING_API_ENDPOINT"]
LOG_ARCHIVE_BUCKET = os.environ["STAGING_LOG_BUCKET"]

@pytest.mark.integration
def test_end_to_end_logging_flow():
    s3_client = boto3.client("s3")
    
    # 1. ARRANGE: Generate a unique ID to find in the logs
    unique_id = f"integration-test-{int(time.time())}"
    
    # 2. ACT: Make a request to the service that will generate a log
    requests.post(f"{API_ENDPOINT}/search", json={"query": unique_id})
    
    # 3. ASSERT: Poll the S3 bucket until the log file appears
    found_log = False
    for _ in range(12): # Poll for up to 1 minute
        time.sleep(5)
        # Construct prefix based on Firehose date partitioning (which uses UTC by default)
        prefix = f"{time.strftime('%Y/%m/%d/%H', time.gmtime())}/"
        response = s3_client.list_objects_v2(Bucket=LOG_ARCHIVE_BUCKET, Prefix=prefix)
        
        if 'Contents' in response:
            for obj in response['Contents']:
                log_obj = s3_client.get_object(Bucket=LOG_ARCHIVE_BUCKET, Key=obj['Key'])
                log_content = gzip.decompress(log_obj['Body'].read()).decode('utf-8')
                
                if unique_id in log_content:
                    found_log = True
                    # Check if the log was enriched by our Lambda
                    assert 'processed_by' in log_content
                    break
        if found_log:
            break
            
    assert found_log, "Log file with unique ID was not found in S3 within 60 seconds."

CI/CD GitHub Actions Workflow

.github/workflows/deploy_monitoring_infra.yml

name: Deploy Monitoring Infrastructure

on:
  push:
    branches:
      - main
    paths:
      - 'monitoring/**'

jobs:
  # ... (lint-and-test job for monitoring/src/*.py as before) ...

  deploy-to-staging:
    name: Deploy Monitoring Infra to Staging
    runs-on: ubuntu-latest
    needs: lint-and-test
    environment: staging
    permissions:
      id-token: write
      contents: read
    steps:
      # ... (checkout, configure AWS creds, setup Terraform) ...
      - name: Terraform Apply Monitoring
        run: |
          cd monitoring/infra
          terraform init
          terraform apply -auto-approve -var="env=staging"
  
  run-integration-test:
    name: Run Monitoring Integration Test
    runs-on: ubuntu-latest
    needs: deploy-to-staging
    environment: staging
    steps:
      # ... (setup, configure AWS creds, install dependencies) ...
      - name: Run E2E Monitoring Test
        env:
          STAGING_API_ENDPOINT: ${{ secrets.STAGING_API_ENDPOINT }}
          STAGING_LOG_BUCKET: "rag-log-archive-staging" # This could also be a secret or TF output
        run: pytest monitoring/tests/integration/

Implementation: Testing in Production

Python Scripts (Analysis)

production_testing/analysis/ab_test_analysis.py

import pandas as pd
import numpy as np
from scipy.stats import chi2_contingency, ttest_ind
import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def analyze_conversion_rate(df: pd.DataFrame, control_name='control', challenger_name='challenger'):
    """Performs a Chi-squared test for conversion rates."""
    contingency_table = pd.crosstab(df['variant_id'], df['converted'])
    
    if control_name not in contingency_table.index or challenger_name not in contingency_table.index:
        logging.warning("Control or challenger variant not found in the data.")
        return None, None, None

    chi2, p_value, _, _ = chi2_contingency(contingency_table)
    
    control_conv_rate = contingency_table.loc[control_name, True] / contingency_table.loc[control_name].sum()
    challenger_conv_rate = contingency_table.loc[challenger_name, True] / contingency_table.loc[challenger_name].sum()
    
    logging.info(f"Control Conversion Rate: {control_conv_rate:.4f}")
    logging.info(f"Challenger Conversion Rate: {challenger_conv_rate:.4f}")
    logging.info(f"Chi-squared p-value: {p_value:.4f}")
    
    return control_conv_rate, challenger_conv_rate, p_value

def analyze_aov(df: pd.DataFrame, control_name='control', challenger_name='challenger'):
    """Performs an independent T-test for Average Order Value."""
    control_aov = df[df['variant_id'] == control_name]['order_value'].dropna()
    challenger_aov = df[df['variant_id'] == challenger_name]['order_value'].dropna()

    if control_aov.empty or challenger_aov.empty:
        logging.warning("No order data for one or both variants.")
        return None, None, None

    _, p_value = ttest_ind(control_aov, challenger_aov, equal_var=False) # Welch's T-test
    
    logging.info(f"Control AOV: ${control_aov.mean():.2f}")
    logging.info(f"Challenger AOV: ${challenger_aov.mean():.2f}")
    logging.info(f"T-test p-value: {p_value:.4f}")
    
    return control_aov.mean(), challenger_aov.mean(), p_value

def generate_report(data_path: str, alpha: float = 0.05):
    """Loads data and generates a full A/B test report."""
    logging.info(f"Loading experiment data from {data_path}...")
    # In a real scenario, this would connect to a data warehouse like Redshift or BigQuery.
    # For this example, we'll use a CSV.
    try:
        df = pd.read_csv(data_path)
    except FileNotFoundError:
        logging.error(f"Data file not found at {data_path}")
        return

    logging.info("\n--- Conversion Rate Analysis ---")
    _, _, conv_p_value = analyze_conversion_rate(df)
    
    if conv_p_value is not None:
        if conv_p_value < alpha:
            logging.info("Result: Statistically significant difference in conversion rates found.")
        else:
            logging.info("Result: No statistically significant difference in conversion rates.")

    logging.info("\n--- Average Order Value (AOV) Analysis ---")
    _, _, aov_p_value = analyze_aov(df)
    if aov_p_value is not None:
        if aov_p_value < alpha:
            logging.info("Result: Statistically significant difference in AOV found.")
        else:
            logging.info("Result: No statistically significant difference in AOV.")

    # --- Final Recommendation Logic ---
    # (This would be more sophisticated in a real business context)
    logging.info("\n--- Recommendation ---")
    if conv_p_value is not None and conv_p_value < alpha:
        logging.info("Promote Challenger: Strong evidence of impact on conversion.")
    else:
        logging.info("Decision: Continue experiment or conclude with no significant finding.")

if __name__ == "__main__":
    # Example usage:
    # python ab_test_analysis.py data/experiment_results.csv
    import sys
    if len(sys.argv) > 1:
        generate_report(sys.argv[1])
    else:
        logging.error("Please provide the path to the experiment data CSV.")

Unit Tests

production_testing/tests/unit/test_ab_test_analysis.py

import pandas as pd
from analysis import ab_test_analysis

def create_sample_data(control_users, control_conversions, challenger_users, challenger_conversions):
    """Helper function to create a sample DataFrame for testing."""
    data = {
        'variant_id': ['control'] * control_users + ['challenger'] * challenger_users,
        'converted': [True] * control_conversions + [False] * (control_users - control_conversions) + \
                     [True] * challenger_conversions + [False] * (challenger_users - challenger_conversions)
    }
    return pd.DataFrame(data)

def test_conversion_rate_significant_difference():
    """Test case where there is a clear, statistically significant difference."""
    df = create_sample_data(10000, 500, 10000, 600) # 5% vs 6% conversion
    _, _, p_value = ab_test_analysis.analyze_conversion_rate(df)
    assert p_value is not None
    assert p_value < 0.05

def test_conversion_rate_no_difference():
    """Test case where there is no significant difference."""
    df = create_sample_data(10000, 500, 10000, 505) # 5.0% vs 5.05%
    _, _, p_value = ab_test_analysis.analyze_conversion_rate(df)
    assert p_value is not None
    assert p_value > 0.05

Integration Test

production_testing/tests/integration/test_ab_routing.py

import requests
import os
import pytest
from collections import Counter

# PRODUCTION_API_ENDPOINT is the URL of the live, production service
API_ENDPOINT = os.environ["PRODUCTION_API_ENDPOINT"]

@pytest.mark.integration
@pytest.mark.production
def test_traffic_splitting_distribution():
    """Validates that the production traffic split is working as configured."""
    responses = []
    num_requests = 1000
    
    # 1. ACTION: Make 1000 requests to the production endpoint
    for _ in range(num_requests):
        try:
            # The inference service should be configured to return a header
            # indicating which version served the request.
            response = requests.post(f"{API_ENDPOINT}/search", json={"query": "test"}, timeout=5)
            if response.status_code == 200 and 'X-Variant-Version' in response.headers:
                responses.append(response.headers['X-Variant-Version'])
        except requests.RequestException:
            # Ignore timeouts or errors for this specific test
            pass

    # 2. COLLECT: Count the responses from each variant
    counts = Counter(responses)
    control_count = counts.get('control', 0)
    challenger_count = counts.get('challenger', 0)
    
    total_responses = control_count + challenger_count
    assert total_responses > 0, "Did not receive any valid responses from the API."

    challenger_percentage = (challenger_count / total_responses) * 100
    
    # 3. ASSERT: Check if the distribution is within tolerance (e.g., 5% +/- 2%)
    expected_challenger_weight = 5.0
    tolerance = 2.0
    
    print(f"Observed challenger traffic: {challenger_percentage:.2f}%")
    assert abs(challenger_percentage - expected_challenger_weight) < tolerance

CI/CD GitHub Actions Workflow

.github/workflows/deploy_inference_service.yml (Deploy to Production)

  deploy-to-production:
    name: Canary Deploy to Production
    runs-on: ubuntu-latest
    needs: run-staging-tests
    environment: production
    # ... (permissions, secrets, etc.) ...
    steps:
      # ... (checkout, configure AWS creds, setup Terraform) ...
      - name: 1. Deploy Challenger Service
        run: |
          cd inference_service/infra
          terraform init
          # Deploy the new container as the "challenger" service
          terraform apply -auto-approve -var="env=production" -var="image_tag=${{ github.sha }}" -target=aws_ecs_service.challenger

      - name: 2. Shift 5% Traffic to Challenger
        run: |
          cd inference_service/infra
          # Update the ALB / API Gateway to send 5% of traffic
          terraform apply -auto-approve -var="env=production" -var="challenger_weight=5"
      
      - name: 3. Monitor Canary Health
        run: |
          # This script would query CloudWatch for challenger's error rate and latency
          # If metrics exceed thresholds, it exits with a non-zero code.
          ./scripts/monitor_canary.sh challenger-service-name
      
      - name: 4. Manual Approval for Full Rollout
        if: success()
        uses: trstringer/manual-approval@v1
        with:
          secret: ${{ github.TOKEN }}
          approvers: 'engineering-lead,product-manager'
          minimum-approvals: 1
          issue-title: "Promote RAG Challenger to 100% Traffic?"
      
      - name: 5. Promote Challenger (if approved)
        if: success()
        run: |
          cd inference_service/infra
          # Shift 100% traffic to challenger and scale down control
          terraform apply -auto-approve -var="env=production" -var="challenger_weight=100"

      - name: 6. Automated Rollback (if canary monitor fails)
        if: failure()
        run: |
          echo "Canary deployment failed! Rolling back..."
          cd inference_service/infra
          terraform apply -auto-approve -var="env=production" -var="challenger_weight=0"
          # This step would also trigger a PagerDuty/Slack alert
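
The monitor_canary.sh script in step 3 is only referenced above. A minimal sketch of the health check it performs is shown below in Python with boto3; the error_rate metric name, the thresholds, and the script path are assumptions rather than the exact production values:

# scripts/monitor_canary.py - hypothetical sketch of the canary health check
import sys
from datetime import datetime, timedelta, timezone

import boto3

ERROR_RATE_THRESHOLD = 0.02        # assumed budget: 2% errors
P99_LATENCY_THRESHOLD_MS = 1500    # mirrors the CloudWatch alarm threshold

def get_metric_average(cloudwatch, metric_name: str, service_name: str) -> float:
    """Average of a custom RAGApplication metric over the last 10 minutes."""
    now = datetime.now(timezone.utc)
    response = cloudwatch.get_metric_statistics(
        Namespace="RAGApplication",
        MetricName=metric_name,
        Dimensions=[{"Name": "Service", "Value": service_name}],
        StartTime=now - timedelta(minutes=10),
        EndTime=now,
        Period=600,
        Statistics=["Average"],
    )
    datapoints = response.get("Datapoints", [])
    return datapoints[0]["Average"] if datapoints else 0.0

def main(service_name: str) -> int:
    cloudwatch = boto3.client("cloudwatch")
    error_rate = get_metric_average(cloudwatch, "error_rate", service_name)
    p99_latency = get_metric_average(cloudwatch, "p99_latency", service_name)

    print(f"Canary health: error_rate={error_rate:.4f}, p99_latency={p99_latency:.0f}ms")
    if error_rate > ERROR_RATE_THRESHOLD or p99_latency > P99_LATENCY_THRESHOLD_MS:
        print("Canary thresholds breached; failing the deployment step.")
        return 1
    return 0

if __name__ == "__main__":
    sys.exit(main(sys.argv[1] if len(sys.argv) > 1 else "challenger"))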

Implementation: Embedding Model Fine-tuning Pipeline

Python Scripts (Pipeline Components)

finetuning_pipeline/src/data_preparation.py

import pandas as pd
import logging
from sentence_transformers import SentenceTransformer
from opensearchpy import OpenSearch # and other clients
from sklearn.model_selection import train_test_split
from typing import List, Tuple

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# This would be initialized with proper credentials
opensearch_client = OpenSearch(...) 
production_embedding_model = SentenceTransformer('intfloat/multilingual-e5-large')

def load_interaction_data(log_bucket: str, date_prefix: str) -> pd.DataFrame:
    """Loads and merges user interaction logs from S3."""
    # In a real scenario, this would read multiple Parquet/JSON files from S3,
    # potentially using AWS Data Wrangler (awswrangler).
    logger.info(f"Loading interaction data from s3://{log_bucket}/{date_prefix}")
    # Placeholder for data loading logic
    sample_data = {
        'session_id': ['s1', 's1', 's2', 's2', 's2'],
        'query': ['queryA', 'queryA', 'queryB', 'queryB', 'queryB'],
        'retrieved_product_id': ['prod1', 'prod2', 'prod3', 'prod4', 'prod5'],
        'clicked': [False, True, False, False, True],
        'purchased': [False, True, False, False, False]
    }
    return pd.DataFrame(sample_data)

def get_product_text(product_id: str) -> str:
    """Fetches the text content for a given product ID."""
    # This would query a database or another S3 location
    return f"Full text description for {product_id}." # Placeholder

def perform_hard_negative_mining(query: str, positive_id: str) -> str:
    """Finds a hard negative for a given query and positive example."""
    query_embedding = production_embedding_model.encode(query)
    
    # Retrieve top 5 results from the current production index
    # This simulates what the user would have seen
    response = opensearch_client.search(...) # Search with query_embedding
    
    retrieved_ids = [hit['_source']['product_id'] for hit in response['hits']['hits']]
    
    # Find the first retrieved ID that is NOT the one the user purchased
    for an_id in retrieved_ids:
        if an_id != positive_id:
            logger.info(f"Found hard negative '{an_id}' for query '{query}'")
            return get_product_text(an_id)
            
    return None # Could happen if user buys the top result

def create_triplets(df: pd.DataFrame) -> List[Tuple[str, str, str]]:
    """Constructs (anchor, positive, negative) triplets from interaction data."""
    triplets = []
    successful_interactions = df[df['purchased'] == True]
    
    for _, row in successful_interactions.iterrows():
        anchor = row['query']
        positive_text = get_product_text(row['retrieved_product_id'])
        negative_text = perform_hard_negative_mining(anchor, row['retrieved_product_id'])
        
        if anchor and positive_text and negative_text:
            triplets.append((anchor, positive_text, negative_text))
            
    logger.info(f"Successfully created {len(triplets)} training triplets.")
    return triplets

def main():
    # This script would be run by the Airflow task
    interaction_df = load_interaction_data("rag-log-archive-prod", "2025/08/")
    triplets = create_triplets(interaction_df)
    
    # Convert to a DataFrame and save to S3
    triplets_df = pd.DataFrame(triplets, columns=['anchor', 'positive', 'negative'])
    train_df, val_df = train_test_split(triplets_df, test_size=0.1)
    
    # Save to S3 in a versioned folder (e.g., using the run date)
    # train_df.to_csv("s3://rag-finetuning-data/train_YYYY-MM-DD.csv")
    # val_df.to_csv("s3://rag-finetuning-data/val_YYYY-MM-DD.csv")
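
The actual training script launched by the SageMaker job (see the DAG below) is not reproduced here. A condensed, illustrative sketch of its core loop, fine-tuning the embedding model with the sentence-transformers TripletLoss objective on the triplets produced above, could look like this (the file name, paths, and hyperparameters are assumptions):

# finetuning_pipeline/src/train_embedding_model.py - condensed, illustrative sketch
import pandas as pd
from torch.utils.data import DataLoader
from sentence_transformers import SentenceTransformer, InputExample, losses

def train(triplets_csv: str, output_dir: str, base_model: str = "intfloat/multilingual-e5-large"):
    model = SentenceTransformer(base_model)

    # Each row is (anchor, positive, negative), as written out by data_preparation.py
    df = pd.read_csv(triplets_csv)
    train_examples = [
        InputExample(texts=[row.anchor, row.positive, row.negative])
        for row in df.itertuples(index=False)
    ]
    train_dataloader = DataLoader(train_examples, shuffle=True, batch_size=16)

    # TripletLoss pulls (anchor, positive) together and pushes (anchor, negative) apart
    train_loss = losses.TripletLoss(model=model)

    model.fit(
        train_objectives=[(train_dataloader, train_loss)],
        epochs=1,
        warmup_steps=100,
        output_path=output_dir,  # SageMaker packages this directory as the model artifact
    )

if __name__ == "__main__":
    # In a SageMaker Training Job these paths come from /opt/ml/input/data and /opt/ml/model
    train("train.csv", "model_output/")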

Unit Tests (pytest)

finetuning_pipeline/tests/unit/test_data_preparation.py

import pandas as pd
from unittest.mock import patch
from src import data_preparation

@patch('src.data_preparation.production_embedding_model')
@patch('src.data_preparation.opensearch_client')
def test_hard_negative_mining_logic(mock_os_client, mock_model):
    """Tests that the hard negative mining correctly selects a non-positive document."""
    # ARRANGE
    test_query = "test query"
    positive_product_id = "prod-positive"
    
    # Mock the OpenSearch response
    mock_os_client.search.return_value = {
        'hits': {'hits': [
            {'_source': {'product_id': 'prod-hard-negative'}},
            {'_source': {'product_id': 'prod-positive'}},
            {'_source': {'product_id': 'prod-other'}}
        ]}
    }
    
    # ACT
    hard_negative = data_preparation.perform_hard_negative_mining(test_query, positive_product_id)
    
    # ASSERT
    # It should have picked the first result that was not the positive one.
    assert "prod-hard-negative" in hard_negative

Pipeline Orchestration (Airflow DAG)

dags/embedding_finetuning_dag.py

from __future__ import annotations
import pendulum
from airflow.models.dag import DAG
from airflow.operators.python import BranchPythonOperator
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTrainingOperator # and ProcessingOperator
from airflow.operators.dummy import DummyOperator
from airflow.operators.email import EmailOperator

# Define IAM roles, S3 paths, etc. as variables

def check_evaluation_result(**kwargs):
    ti = kwargs['ti']
    # This would pull the evaluation result from XComs, which was pushed by the evaluate_model_task
    eval_result = ti.xcom_pull(task_ids='evaluate_model_task', key='evaluation_summary')
    if eval_result['status'] == 'pass':
        return 'register_model_task'
    else:
        return 'notify_failure_task'

with DAG(
    dag_id="embedding_model_finetuning",
    start_date=pendulum.datetime(2025, 1, 1, tz="UTC"),
    schedule="0 0 1 * *", # At 00:00 on day-of-month 1.
    catchup=False,
    tags=["rag", "finetuning"],
) as dag:
    
    prepare_data_task = # ... PythonOperator to run data_preparation.py ...
    
    train_model_task = SageMakerTrainingOperator(
        task_id="train_model_task",
        # ... (config pointing to training script, instance types, IAM role) ...
    )

    evaluate_model_task = # ... SageMakerProcessingOperator to run model_evaluation.py ...
    
    check_evaluation_gate = BranchPythonOperator(
        task_id="check_evaluation_gate",
        python_callable=check_evaluation_result,
    )
    
    register_model_task = # ... PythonOperator to run model_registration.py ...
    
    notify_failure_task = EmailOperator(
        task_id="notify_failure_task",
        to="ml-team@example.com",
        subject="Embedding Model Fine-tuning Failed Quality Gate",
        html_content="The candidate model did not outperform the production model. Check the logs.",
    )

    success_task = DummyOperator(task_id="success")

    prepare_data_task >> train_model_task >> evaluate_model_task >> check_evaluation_gate
    check_evaluation_gate >> [register_model_task, notify_failure_task]
    register_model_task >> success_task

Infrastructure as Code (Terraform)

finetuning_pipeline/infra/sagemaker.tf

# Defines the SageMaker Model Group for versioning our fine-tuned models

resource "aws_sagemaker_model_package_group" "embedding_models" {
  model_package_group_name        = "rag-embedding-models-${var.env}"
  model_package_group_description = "All versions of the fine-tuned RAG embedding model."
}

# IAM role for the SageMaker training jobs
resource "aws_iam_role" "sagemaker_training_role" {
  name = "SageMakerTrainingRole-${var.env}"
  assume_role_policy = # ... (trust policy for sagemaker.amazonaws.com) ...
}

# Attach policies granting access to S3 data buckets and ECR
resource "aws_iam_role_policy_attachment" "s3_access" {
  role       = aws_iam_role.sagemaker_training_role.name
  policy_arn = # ... (ARN of a policy allowing read from data bucket, write to artifact bucket) ...
}

Integration Test

finetuning_pipeline/tests/integration/test_finetuning_dag.py

import pytest
from airflow.models.dagbag import DagBag

@pytest.mark.integration
def test_dag_loads_with_no_errors():
    """Checks if the DAG file is syntactically correct and can be parsed by Airflow."""
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    assert not dagbag.import_errors
    assert 'embedding_model_finetuning' in dagbag.dags

# A more advanced integration test would use the Airflow API to trigger the DAG
# in a test environment and check its final state.
# from airflow.api.client.local_client import Client
#
# def test_dag_run_completes():
#     client = Client(None, None)
#     result = client.trigger_dag(dag_id='embedding_model_finetuning')
#     # ... poll for completion status ...
#     assert result['state'] == 'success'

CI/CD GitHub Actions Workflow

.github/workflows/deploy_finetuning_pipeline.yml

name: Deploy Fine-tuning Pipeline

on:
  push:
    branches:
      - main
    paths:
      - 'finetuning_pipeline/**'
      - 'dags/embedding_finetuning_dag.py'

jobs:
  lint-and-test:
    name: Lint & Unit Test
    runs-on: ubuntu-latest
    steps:
      # ... (checkout, setup python, install dependencies) ...
      - name: Run unit tests
        run: pytest finetuning_pipeline/tests/unit/

  deploy-to-staging:
    name: Deploy to Staging Airflow
    runs-on: ubuntu-latest
    needs: lint-and-test
    environment: staging
    steps:
      # ... (checkout, configure AWS creds, setup Terraform) ...
      - name: Terraform Apply
        run: |
          cd finetuning_pipeline/infra
          terraform init
          terraform apply -auto-approve -var="env=staging"
      - name: Sync DAG to Staging Airflow
        # This step would use aws s3 sync or a custom script to upload the
        # dags/embedding_finetuning_dag.py file to the Airflow S3 bucket.
        run: aws s3 sync ./dags s3://${{ secrets.STAGING_AIRFLOW_DAGS_BUCKET }}/dags

Guide to Fine-tuning Re-ranker Model

The “Why”: The Two-Stage Retrieval Process

Think of the RAG system’s retrieval as two stages, like a fishing operation:

  1. The Retriever (Embedding Model / Bi-Encoder): The Wide Net. Its job is to be fast and achieve high recall. It quickly scans the entire ocean of documents and pulls in a large net of 50-100 potentially relevant candidates. It’s good at finding things that are semantically in the ballpark, but it might also pull in a lot of irrelevant “bycatch.” This is a bi-encoder architecture, where the query and documents are embedded independently.

  2. The Re-ranker (Cross-Encoder): The Expert Judge. Its job is to be accurate and achieve high precision. It takes the smaller catch from the retriever’s net (the 50-100 candidates) and individually inspects each one against the query. It’s much slower but far more discerning. This is a cross-encoder architecture, which processes the query and a document together, allowing for much deeper contextual understanding.

The goal of fine-tuning the re-ranker is to make this “expert judge” exceptionally good at understanding the specific nuances of relevance for your products and your customers’ queries.
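
The architectural difference is easiest to see in code. The sketch below scores the same query/passage pair both ways; the checkpoints are the public models already mentioned in this document, and the example texts are made up:

# Bi-encoder vs. cross-encoder scoring - illustrative sketch
from sentence_transformers import SentenceTransformer, CrossEncoder, util

query = "waterproof hiking boots with ankle support"
passage = "These leather hiking boots feature a waterproof membrane and a high cut for ankle support."

# 1. Bi-encoder: query and passage are embedded INDEPENDENTLY, then compared.
#    Fast, and document embeddings can be pre-computed and indexed, but each
#    text is encoded in isolation.
bi_encoder = SentenceTransformer("intfloat/multilingual-e5-large")
query_emb = bi_encoder.encode(query, convert_to_tensor=True)
passage_emb = bi_encoder.encode(passage, convert_to_tensor=True)
print("Bi-encoder cosine similarity:", util.cos_sim(query_emb, passage_emb).item())

# 2. Cross-encoder: query and passage are fed through the model TOGETHER,
#    so attention runs across both texts. Slower (one forward pass per pair),
#    but far more precise for ranking a short candidate list.
cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")
print("Cross-encoder relevance score:", cross_encoder.predict([(query, passage)])[0])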

The Dataset: The Crucial Difference

The dataset used for fine-tuning a re-ranker is fundamentally different from the triplet dataset used for the embedding model. They are structured differently to serve different training objectives.

Here is a clear comparison:

| Aspect | Embedding Model Fine-tuning Dataset | Re-ranker Fine-tuning Dataset |
| --- | --- | --- |
| Purpose | To learn a global semantic representation. Teaches the model to place similar items close together in the vector space so the “wide net” retrieval works better. | To learn relative relevance. Teaches the model to look at a query and a specific document and say “Yes, this is a highly relevant answer” (label=1) or “No, this is not a good answer” (label=0). |
| Structure | Triplets: (anchor, positive, negative) | Scored Pairs: (query, passage, label) where the label is typically 1 (relevant) or 0 (irrelevant). |
| Example | ("running shoes", "Text about Nike Pegasus...", "Text about dress shoes...") | 1. ("running shoes", "Text about Nike Pegasus...", 1)<br>2. ("running shoes", "Text about dress shoes...", 0) |
| Training Objective | Distance-based Loss (e.g., TripletLoss). The goal is to minimize the distance between anchor and positive while maximizing the distance between anchor and negative. | Classification or Ranking Loss (e.g., Binary Cross-Entropy). The model predicts a single score (a logit) representing relevance, and the loss is calculated against the binary 0/1 label. |

Data Sourcing and Creation for the Re-ranker

The process for creating the re-ranker dataset mirrors the one used for the embedding model, but it produces a different final structure.

  1. Load User Interaction Data: Start with the same raw logs from the Monitoring Pipeline (queries, clicks, purchases).

  2. Create (query, positive_passage) Pairs: For every user journey that resulted in a purchase, create a pair: (user_query, text_of_purchased_product). These will be our positive examples with a label of 1.

  3. Mine Hard Negatives: This is the most critical step for a high-performing re-ranker. For each (query, positive_passage) pair:

    • Use the current production retriever (the bi-encoder) to fetch the top 10-20 results for that query.

    • From this list, select the documents that the user did not click on or purchase. These are your hard negatives. They are powerful training examples because they are semantically close enough to the query to be retrieved, but were ultimately deemed irrelevant by the user. This is exactly the ambiguity the re-ranker needs to learn to resolve.

    • For each hard negative, create a pair: (user_query, text_of_hard_negative_product). These will be our negative examples with a label of 0.

  4. Assemble and Balance the Dataset:

    • Combine all positive and negative pairs into a single dataset.

    • Ensure the dataset is balanced (roughly equal numbers of positive and negative examples) to prevent the model from becoming biased. You might need to oversample positives or undersample negatives.

    • The final dataset is a list of examples, each with a query, a passage, and a label (0 or 1); a condensed sketch of this assembly step follows this list.
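
The sketch below mirrors the embedding-model data preparation script. It reuses get_product_text from data_preparation.py, while retrieve_candidate_ids is a hypothetical wrapper around the production bi-encoder retriever; both are injected as callables so the snippet stays self-contained:

# Illustrative construction of the (query, passage, label) re-ranker dataset
from typing import Callable, Dict, List
import pandas as pd

def create_reranker_pairs(
    interactions: pd.DataFrame,
    get_text: Callable[[str], str],                           # e.g. data_preparation.get_product_text
    retrieve_candidate_ids: Callable[[str, int], List[str]],  # hypothetical bi-encoder wrapper
    negatives_per_query: int = 3,
) -> pd.DataFrame:
    """Builds labeled (query, passage) pairs from purchase interactions."""
    rows: List[Dict] = []
    purchases = interactions[interactions["purchased"] == True]

    for _, row in purchases.iterrows():
        query = row["query"]
        positive_id = row["retrieved_product_id"]

        # Positive example: the product the user actually bought (label = 1)
        rows.append({"query": query, "passage": get_text(positive_id), "label": 1})

        # Hard negatives: retrieved-but-ignored products for the same query (label = 0)
        candidate_ids = retrieve_candidate_ids(query, 20)
        hard_negative_ids = [pid for pid in candidate_ids if pid != positive_id][:negatives_per_query]
        for neg_id in hard_negative_ids:
            rows.append({"query": query, "passage": get_text(neg_id), "label": 0})

    pairs_df = pd.DataFrame(rows)

    # Balance the classes so the cross-encoder does not simply learn the label prior
    n_per_class = pairs_df["label"].value_counts().min()
    return (
        pairs_df.groupby("label", group_keys=False)
        .apply(lambda g: g.sample(n=n_per_class, random_state=42))
        .reset_index(drop=True)
    )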

The Model and Training Process

  1. Model Architecture (Cross-Encoder):

    • We use a transformer model architecture (like BERT or RoBERTa) configured as a cross-encoder.

    • Instead of embedding the query and passage separately, we feed them into the model together in a single sequence, separated by a special [SEP] token: [CLS] query [SEP] passage [SEP].

    • The model’s output for the [CLS] token is then passed through a linear layer to produce a single logit (a raw score). This score represents the relevance of the passage to the query.

  2. Training Objective:

    • The problem is framed as a simple binary classification. The model’s single logit output is passed through a sigmoid function to get a probability between 0 and 1.

    • We use Binary Cross-Entropy Loss to train the model, comparing its predicted probability against the true 0/1 label from our dataset.

  3. The Training Loop (using Hugging Face transformers):

    • Load a pre-trained cross-encoder model (e.g., cross-encoder/ms-marco-MiniLM-L-6-v2) as a starting point.

    • Use the Hugging Face Trainer API, providing it with our prepared dataset of (query, passage, label) pairs.

    • The Trainer handles the fine-tuning loop: batching the data, feeding it to the model, calculating the loss, and updating the model’s weights.

    • This is executed as a SageMaker Training Job; a condensed sketch of the training script follows this list.
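
The sketch below is an illustrative version of that training script. It uses the CrossEncoder wrapper from sentence-transformers (which applies a binary cross-entropy objective when num_labels=1) instead of a hand-written Trainer loop; file names, batch sizes, and epoch counts are assumptions:

# finetuning_pipeline/src/train_reranker.py - condensed, illustrative sketch
import pandas as pd
from torch.utils.data import DataLoader
from sentence_transformers import InputExample
from sentence_transformers.cross_encoder import CrossEncoder
from sentence_transformers.cross_encoder.evaluation import CEBinaryClassificationEvaluator

def train(pairs_csv: str, val_csv: str, output_dir: str):
    # Start from a public MS MARCO cross-encoder; num_labels=1 means a single relevance logit
    model = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2", num_labels=1)

    train_df = pd.read_csv(pairs_csv)
    train_samples = [
        InputExample(texts=[row.query, row.passage], label=float(row.label))
        for row in train_df.itertuples(index=False)
    ]
    train_dataloader = DataLoader(train_samples, shuffle=True, batch_size=32)

    val_df = pd.read_csv(val_csv)
    evaluator = CEBinaryClassificationEvaluator(
        sentence_pairs=[[row.query, row.passage] for row in val_df.itertuples(index=False)],
        labels=val_df["label"].tolist(),
    )

    model.fit(
        train_dataloader=train_dataloader,
        evaluator=evaluator,
        epochs=2,
        warmup_steps=500,
        output_path=output_dir,  # packaged by SageMaker as the model artifact
    )

if __name__ == "__main__":
    train("reranker_train.csv", "reranker_val.csv", "reranker_model_output/")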

The MLOps Pipeline for Re-ranker Fine-tuning

The pipeline is nearly identical to the embedding model pipeline, demonstrating the power of reusable MLOps patterns.

  1. Airflow DAG (reranker_finetuning_dag.py):

    • prepare_reranker_data_task: Executes a script to perform the data loading and hard negative mining described above, creating the final (query, passage, label) dataset.

    • train_reranker_task: Submits a SageMaker Training Job that runs the Hugging Face training script on a GPU instance.

    • evaluate_reranker_task: This step differs from the embedding pipeline. It evaluates the new re-ranker by first running the production retriever on the Golden Dataset to get the top-50 candidates, then using the new re-ranker to sort them, and finally calculating MRR/NDCG on the re-ranked list (a sketch of this metric computation follows this list).

    • check_evaluation_gate & register_model_task: Same logic as before.
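
At its core, that evaluation step reduces to computing rank-based metrics over the re-ranked candidate lists. A minimal MRR sketch is shown below; the retriever and re-ranker are passed in as stand-in callables rather than the real clients:

# Illustrative MRR computation for the evaluate_reranker_task
from typing import Callable, Dict, List

def mean_reciprocal_rank(
    golden_dataset: List[Dict],                              # [{"query": ..., "relevant_product_id": ...}, ...]
    retrieve_candidates: Callable[[str, int], List[str]],    # stand-in for the production bi-encoder retriever
    rerank: Callable[[str, List[str]], List[str]],           # stand-in for the candidate re-ranker
    top_k: int = 50,
) -> float:
    reciprocal_ranks = []
    for example in golden_dataset:
        query = example["query"]
        relevant_id = example["relevant_product_id"]

        candidates = retrieve_candidates(query, top_k)    # "wide net" retrieval
        reranked = rerank(query, candidates)               # cross-encoder ordering

        # Reciprocal rank is 1/position of the first relevant hit, 0 if it never appears
        rr = 0.0
        for rank, product_id in enumerate(reranked, start=1):
            if product_id == relevant_id:
                rr = 1.0 / rank
                break
        reciprocal_ranks.append(rr)

    return sum(reciprocal_ranks) / len(reciprocal_ranks) if reciprocal_ranks else 0.0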

Architecture Diagram


Synthetic Dataset Generation to Evaluate Retrieval and Ranking

This script operationalizes the plan to create a large-scale, high-quality evaluation dataset. It uses a powerful LLM (Claude 3 Opus) to generate realistic user queries grounded in specific chunks of our product descriptions, using a curated seed list to guide the generation style.

A. Python Script

golden_dataset_pipeline/src/generate_golden_dataset.py

import asyncio
import json
import logging
import os
import random
from typing import List, Dict, Any

import pandas as pd
from botocore.exceptions import ClientError  # Bedrock calls go through boto3/botocore
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_core.output_parsers import JsonOutputParser
from langchain_core.prompts import ChatPromptTemplate
from langchain_community.chat_models import BedrockChat

# --- Configuration ---
# In a real project, this would come from a config file or environment variables.
CONFIG = {
    "PRODUCT_CATALOG_PATH": "data/raw/product_catalog.csv",
    "SEED_QUERIES_PATH": "data/raw/seed_queries.txt",
    "OUTPUT_PATH": "data/processed/golden_evaluation_dataset.jsonl",
    "LLM_MODEL_ID": "anthropic.claude-3-opus-20240229-v1:0",
    "MAX_QUERIES_PER_CHUNK": 7,
    "NUM_SEED_EXAMPLES": 5,
    "MAX_CONCURRENT_REQUESTS": 10,  # Important to avoid rate limiting
}

# --- Logging Setup ---
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# --- Core Functions ---

def load_seed_queries(filepath: str) -> List[str]:
    """Loads the seed query list from a text file."""
    try:
        with open(filepath, 'r') as f:
            return [line.strip() for line in f if line.strip()]
    except FileNotFoundError:
        logger.error(f"Seed queries file not found at: {filepath}")
        return []

def load_product_documents(filepath: str) -> pd.DataFrame:
    """Loads product catalog data from a CSV."""
    try:
        df = pd.read_csv(filepath)
        # Ensure required columns exist
        if 'product_id' not in df.columns or 'description' not in df.columns:
            raise ValueError("CSV must contain 'product_id' and 'description' columns.")
        return df.dropna(subset=['product_id', 'description'])
    except FileNotFoundError:
        logger.error(f"Product catalog file not found at: {filepath}")
        return pd.DataFrame()

def chunk_document(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> List[str]:
    """Splits a document's text into smaller chunks."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len
    )
    return splitter.split_text(text)

async def generate_queries_for_chunk(
    llm: BedrockChat,
    chunk_text: str,
    seed_queries: List[str]
) -> List[str]:
    """Uses an LLM to generate a list of queries for a given text chunk."""
    
    # Randomly sample seed queries to provide varied examples in the prompt
    examples = "\n".join(f"- \"{q}\"" for q in random.sample(seed_queries, CONFIG["NUM_SEED_EXAMPLES"]))
    
    prompt_template = ChatPromptTemplate.from_messages([
        ("system", 
         "You are a data scientist creating a high-quality evaluation dataset for an e-commerce semantic search engine. "
         "Your task is to generate realistic user search queries that can be answered by the provided text snippet from a product description. "
         "The queries must be diverse, reflecting different user intents (e.g., questions, feature requests, comparisons, use-cases). "
         "The answer to each query you generate MUST be present in the provided context. Do NOT generate questions requiring outside knowledge. "
         "Output a JSON object with a single key 'queries' containing a list of strings."),
        ("human", 
         "**CONTEXT (Product Information Snippet):**\n---\n{context}\n---\n\n"
         "**EXAMPLES of QUERY STYLES:**\n{examples}\n\n"
         f"Please generate {CONFIG['MAX_QUERIES_PER_CHUNK']} realistic and diverse user queries based on the context above.")
    ])
    
    parser = JsonOutputParser()
    chain = prompt_template | llm | parser

    try:
        response = await chain.ainvoke({"context": chunk_text, "examples": examples})
        if "queries" in response and isinstance(response["queries"], list):
            return response["queries"]
        else:
            logger.warning(f"LLM returned malformed JSON, missing 'queries' list. Response: {response}")
            return []
    except (ClientError, json.JSONDecodeError, Exception) as e:
        logger.error(f"Error generating queries for chunk: {e}")
        return []

async def process_document(
    semaphore: asyncio.Semaphore,
    llm: BedrockChat,
    product: pd.Series,
    seed_queries: List[str],
    output_file
):
    """Chunks a document, generates queries for each chunk, and writes to the output file."""
    async with semaphore:
        try:
            product_id = product['product_id']
            description = product['description']
            
            chunks = chunk_document(description)
            logger.info(f"Processing product {product_id}: split into {len(chunks)} chunks.")
            
            tasks = []
            for i, chunk in enumerate(chunks):
                tasks.append(generate_queries_for_chunk(llm, chunk, seed_queries))
            
            generated_queries_per_chunk = await asyncio.gather(*tasks)

            for i, queries in enumerate(generated_queries_per_chunk):
                for query in queries:
                    record = {
                        "query": query,
                        "relevant_product_id": str(product_id),
                        "source_chunk_id": i
                    }
                    output_file.write(json.dumps(record) + "\n")
        
        except Exception as e:
            logger.error(f"Failed to process product {product.get('product_id', 'N/A')}: {e}")

async def main():
    """Main orchestration function."""
    logger.info("Starting synthetic golden dataset generation.")
    
    seed_queries = load_seed_queries(CONFIG["SEED_QUERIES_PATH"])
    product_df = load_product_documents(CONFIG["PRODUCT_CATALOG_PATH"])

    if not seed_queries or product_df.empty:
        logger.error("Cannot proceed without seed queries and product data. Exiting.")
        return

    # Initialize the LLM
    llm = BedrockChat(
        model_id=CONFIG["LLM_MODEL_ID"],
        model_kwargs={"temperature": 0.7, "max_tokens": 2048}
    )

    # Use a semaphore to limit concurrent API calls
    semaphore = asyncio.Semaphore(CONFIG["MAX_CONCURRENT_REQUESTS"])
    
    # Ensure output directory exists
    os.makedirs(os.path.dirname(CONFIG["OUTPUT_PATH"]), exist_ok=True)

    with open(CONFIG["OUTPUT_PATH"], 'w') as output_file:
        tasks = []
        for _, product in product_df.iterrows():
            tasks.append(process_document(semaphore, llm, product, seed_queries, output_file))
        
        # Process all documents concurrently
        await asyncio.gather(*tasks)

    logger.info(f"Dataset generation complete. Output saved to {CONFIG['OUTPUT_PATH']}")

if __name__ == "__main__":
    # Ensure AWS credentials and LangSmith env variables are set before running
    # e.g., export LANGCHAIN_TRACING_V2=true; export LANGCHAIN_API_KEY=...
    asyncio.run(main())

B. Prerequisites & How to Run

  1. File Structure:

    golden_dataset_pipeline/
    ├── src/
    │   └── generate_golden_dataset.py
    └── data/
        ├── raw/
        │   ├── product_catalog.csv
        │   └── seed_queries.txt
        └── processed/
            └── (output will be generated here)
    
  2. product_catalog.csv: A CSV file with at least two columns: product_id and description.

  3. seed_queries.txt: A plain text file where each line is a real, high-quality user query.

    waterproof hiking boots with ankle support
    compare sony wh-1000xm5 vs bose qc ultra
    best camera for travel vlogging
    is the iphone 15 compatible with a qi charger?
    ...
    
  4. Dependencies:

    pip install pandas langchain langchain-community boto3
    
  5. Execution:

    • Set up your AWS credentials (~/.aws/credentials).

    • (Optional but recommended) Set up LangSmith environment variables for tracing.

    • Run the script from the root of the golden_dataset_pipeline directory:

      python src/generate_golden_dataset.py
      

C. Output (golden_evaluation_dataset.jsonl)

The script will produce a JSON Lines file, where each line is a single, self-contained evaluation record. This format is efficient and easy to parse.

{"query": "what kind of warranty comes with the anker powerbank?", "relevant_product_id": "B07Y2P1L6F", "source_chunk_id": 2}
{"query": "is this powerbank airline approved for carry on?", "relevant_product_id": "B07Y2P1L6F", "source_chunk_id": 2}
{"query": "compare Anker 313 vs PowerCore Slim 10000", "relevant_product_id": "B07Y2P1L6F", "source_chunk_id": 0}