# Reviews Summarisation

***

### **TLDR: End-to-End LLM-Powered Review Summarization**

* **Challenge:** A mid-sized European e-commerce marketplace faced a dual problem: customers were overwhelmed by thousands of unstructured, multilingual reviews, leading to decision fatigue, while the business lacked an automated way to extract actionable insights from this valuable customer feedback.

* **My Role & Solution:** As the lead **Data Scientist and MLOps/ML Engineer**, I designed, built, and deployed the end-to-end system to solve this challenge. My contributions spanned the entire ML lifecycle, including **Feature Engineering, Model Development, Training & Inference Pipelines, Deployment, Monitoring, and Continual Learning**. My solution was a cost-effective, production-grade system architected around a **batch-processing, Retrieval-Augmented Generation (RAG)** strategy to ensure summaries were factually grounded and trustworthy. Key strategic decisions included:
    * **Model:** Fine-tuning a `Mistral-7B` model using PEFT/LoRA on a multilingual dataset to achieve near-GPT-4 quality at a fraction of the cost.
    * **Quality Assurance:** Implementing a robust, multi-layered evaluation strategy using **Ragas** for factual consistency and **LLM-as-a-judge** for coherence.
    * **Cost-Effective Architecture:** Building the system on **fully managed** services (**AWS Step Functions, OpenSearch**) and using **EKS with scale-to-zero** for the inference endpoint to minimize idle costs, making the solution financially viable.

* **Tech Stack:** AWS (Step Functions, EKS, SageMaker, Bedrock, OpenSearch), vLLM, Ragas, MLflow, DVC, LangChain, Terraform, and GitHub Actions.

* **Impact:** The system successfully transformed raw feedback into a valuable asset for both customers and the business. The primary impacts measured via A/B testing were:
    * Achieved up to a **2% increase in conversion rate** for products with AI-generated summaries.
    * Led to a **3% reduction in product returns** in the pilot category (Electronics) by setting more accurate customer expectations pre-purchase.
    * **Automated 100%** of the manual review analysis process, saving an estimated **20-30 hours per week** of analyst time.

* **System Architecture:** The diagram below illustrates the serverless, event-driven architecture, with the components I owned and delivered highlighted.

***

A Note on This Series

In my seven years as a Machine Learning Engineer, I've noticed a significant gap between academic tutorials and the realities of production MLOps. Many guides stop at deploying a model in a FastAPI container, leaving aspiring engineers without the strategic frameworks and practical insights needed for building robust, end-to-end systems.

This series is a sincere attempt to provide a practitioner's blueprint for production machine learning, moving beyond the code to explore the critical decision-making, trade-offs, and challenges involved. My goal is to eventually expand this work into a comprehensive, project-based MLOps course.

Important Disclaimers:

- **On Authenticity**: The methodologies and frameworks shared here are drawn directly from my professional experience. However, to ensure client confidentiality, all specific project details and data have been anonymized and are for illustrative purposes only.
- **On Collaboration**: These posts were created with the assistance of AI for code and prose generation. The strategic framing, project context, and real-world insights that guide the content are entirely my own.
- **On the Code**: The code provided is a conceptual blueprint, not a production-ready application. It is designed to illustrate the structure and logic of a real-world system. Please use it as a learning tool and a starting point for your own projects, but do not expect it to run out-of-the-box without further development and testing.

***

### 1. The Business Imperative: From Information Overload to Actionable Intelligence

In the modern e-commerce landscape, the sheer volume of customer-generated reviews presents a dual-sided challenge. While intended to empower shoppers, the deluge of unstructured, often repetitive feedback leads to information overload and decision fatigue. This phenomenon hinders the purchasing journey, contributing directly to higher rates of cart abandonment and a diminished user experience.

For the business, this vast repository of customer sentiment is a potential goldmine of insights. However, manually processing thousands of reviews to identify recurring themes, product shortcomings, and emerging trends is a significant operational bottleneck. The process is labor-intensive, slow, and prone to human error, causing critical customer pain points to go unnoticed and opportunities for product improvement to be missed.

The fundamental business challenge is to effectively distill this massive volume of unstructured text into a format that is both concise for the customer and insightful for the business. The goal is to transform raw feedback from a liability of noise into a strategic asset that enhances customer confidence and drives internal innovation.

#### **Project Objectives and Goals**

The primary objective is to develop and deploy an automated, scalable system that summarizes customer reviews to enhance the user shopping experience and provide actionable intelligence to internal teams. This objective is broken down into two core goals:

1. **Enhance the Customer Decision-Making Process:** By providing clear, balanced, and digestible summaries of peer reviews, the system aims to reduce customer friction and build trust. This allows shoppers to make faster, better-informed purchasing decisions, moving from a state of uncertainty to one of confidence.
2. **Unlock Actionable Business Intelligence:** By automating the analysis of review data, the system will identify and surface key product strengths, weaknesses, and recurring customer issues. This data-driven feedback loop empowers product, marketing, and support teams to improve product quality, refine marketing messages, and enhance brand value.

#### **Measuring Success: Key Performance Indicators (KPIs)**

The success of this initiative will be measured through a rigorous A/B testing framework, focusing on both direct business impact and user engagement.

**Primary Business KPIs:**

* **Increase in Conversion Rate:** A measurable lift in the percentage of users who purchase a product after interacting with the summary feature.
* **Reduction in Average Time-to-Purchase:** A decrease in the time it takes for a user to make a purchase, indicating accelerated decision-making.
* **Decrease in Product Return Rates:** A measurable reduction in returns for products where summaries are displayed, suggesting that the feature sets more accurate customer expectations.

**Secondary Engagement KPIs:**

* **User Interaction with Summaries:** Tracking metrics such as time spent on the page and expansion of summary sections to gauge feature utility.
* **"Helpful" Feedback Rate:** An increase in users clicking the "This summary was helpful" button, serving as a direct signal of user satisfaction.

***

### 2. ML Problem Framing: From Business Need to Technical Blueprint

Translating a business objective into a well-defined machine learning task is the most critical step in the MLOps lifecycle. A precise problem frame acts as the foundational blueprint, guiding data strategy, model selection, and success measurement. A flaw at this stage will inevitably propagate through the entire system, regardless of subsequent engineering excellence.

#### 2.1 Setting the Business Objectives

The project originates from a clear business need to improve customer experience and operational efficiency.
Before any technical solution is considered, the objectives must be aligned with all relevant stakeholders (including Product Management, Marketing, Engineering, and Business Leadership) to ensure a shared vision of success. The primary business objectives are:

* **Increase purchase conversion and customer trust** by providing concise, unbiased summaries of user reviews.
* **Improve product quality** by creating an automated feedback loop that delivers actionable insights from customer sentiment to product teams.
* **Enhance operational efficiency** by reducing the manual effort required to analyze and understand large volumes of customer feedback.

#### 2.2 Is Machine Learning the Right Approach?

While a simple rule-based system (e.g., extracting sentences with "love" or "hate") could be a baseline, it would fail to capture the nuance, slang, and context inherent in user-generated reviews. Machine Learning is the appropriate approach for this problem due to several key factors:

* **Complex Patterns:** Summarizing requires understanding semantic meaning, identifying key themes, and paraphrasing, tasks too complex for static rules.
* **Need for Scale:** The solution must operate across thousands of products, each with hundreds or thousands of reviews, making manual summarization impossible.
* **Dynamic Environment:** Customer language, product features, and feedback trends evolve. An ML model can be retrained to adapt, whereas a rule-based system would become brittle.
* **Tolerance for Error:** While factual accuracy is paramount, minor stylistic imperfections in the generated summary are acceptable if the core meaning is preserved and the feature provides value.

This project also presents a classic opportunity to create a **Data Flywheel**, a virtuous cycle where the product improves as more data is generated.

#### 2.3 Defining the ML Problem

The business goal must be translated into a precise technical task for the model.
* **Ideal Outcome:** A customer effortlessly understands the collective opinion on a product, leading to a confident purchase decision.
* **Model's Goal:** Ingest all unstructured review texts for a single product and generate a single, concise, factually grounded, and stylistically neutral summary paragraph.

This formally classifies the task as **many-to-one, multi-document abstractive summarization**.

* **Many-to-one:** It synthesizes multiple source documents (reviews) into a single output.
* **Multi-document:** It must handle and reconcile information from numerous distinct text sources.
* **Abstractive:** The goal is to generate novel, fluent sentences that capture the essence of the input, rather than simply extracting and concatenating existing sentences.

The choice of an abstractive paradigm over an extractive one is deliberate. While an extractive summary (copying key sentences) offers high factual grounding, the informal grammar, slang, and typos common in reviews would result in a disjointed and poor user experience. A purely abstractive approach, however, introduces an unacceptable risk of "hallucination": generating statements not supported by the source reviews.

Therefore, the optimal solution is a **hybrid approach**, specifically one implemented via a **Retrieval-Augmented Generation (RAG)** architecture. This strategy grounds the abstractive model in retrieved facts, instructing it to generate a summary *based only on* a curated selection of the most relevant review snippets. This balances the need for readability with the non-negotiable requirement for user trust.

#### 2.4 Assessing Feasibility & Risks

A candid assessment of feasibility and potential risks is necessary before committing significant resources.

| Category | Checkpoint | Assessment | Notes & Mitigation Strategy |
| :--- | :--- | :--- | :--- |
| **Data** | Sufficient quantity & quality? | **Green** | Abundant review data exists. Quality is variable, requiring robust preprocessing to handle spam, short reviews, and noise. |
| | Labeling for fine-tuning feasible? | **Yellow** | Creating high-quality, human-written "golden" summaries for fine-tuning and evaluation is expensive and time-consuming. This will be a primary cost driver. |
| **Problem Difficulty** | High reliability required? | **Red** | **Factual consistency is paramount.** A single prominent hallucination could destroy user trust. The RAG architecture is the primary mitigation strategy. |
| **Adversarial attacks** | Adversarial attacks likely? | **Yellow** | Potential for review bombing or spam. Data ingestion pipelines must include anomaly and spam detection. |
| **Technical Reqs** | Latency target achievable? | **Yellow** | Low latency is critical for user experience. This necessitates using optimized inference engines (e.g., vLLM, TGI) and may constrain the size of the production model. |
| **ROI** | Compute cost manageable? | **Yellow** | GPU resources for fine-tuning and serving are costly. The project requires a clear budget and cost-optimization strategies like quantization and efficient model selection. |
| **Ethics** | Potential for bias? | **Red** | **High risk of bias.** The model could learn a "positivity bias" from imbalanced data or fail to represent minority opinions. Mitigation requires data-centric re-tuning on balanced datasets and specific prompt constraints. |

#### 2.5 Defining Success Metrics

To measure progress and validate the final impact, metrics are defined across three distinct domains: Business, Model Quality, and Operations.

| Metric Type | Success Metric | How to Measure & Target |
| :--- | :--- | :--- |
| **Business** | **Conversion Rate** | A/B Testing. Target: Statistically significant lift in purchases for users shown the summary vs. a control group. |
| | **Product Return Rate** | A/B Testing. Target: Statistically significant decrease in returns for products with summaries. |
| | **Average Time-to-Purchase** | A/B Testing. Target: Measurable reduction in the session duration leading to a purchase. |
| **Model Quality** | **Faithfulness / Factual Consistency** | **(Primary)** LLM-as-a-judge or automated metrics (e.g., SummaC) on a golden dataset. Target: Aim for >95% factual consistency. |
| | **Relevance** | ROUGE-L score against reference summaries and human evaluation. Target: Optimize for capturing key pros and cons. |
| | **Coherence & Fluency** | Human evaluation or LLM-as-a-judge (Likert scale 1-5). Target: Average score > 4.0. |
| **Operational** | **P95 Inference Latency** | Monitoring dashboards (e.g., Prometheus, Grafana). Target: < 500ms for a real-time user experience. |
| | **Cost Per 1,000 Summaries** | Cloud billing analysis. Target: Continuously optimize and reduce cost based on model and hardware choices. |
| | **System Throughput** | Load testing and monitoring. Target: System must handle peak traffic loads without violating latency SLAs. |

___

### 3. GenAI Application: End-to-End Planning

#### 3.1 LLMOps Tech Stack

| Category | Canvas Block | Tool / Service Chosen | Rationale, LLM-Specific Considerations & Trade-offs |
| :--- | :--- | :--- | :--- |
| **Data & Code** | Data Sources & Versioning | **Amazon S3** (Data Lake)<br>**DVC** (Data Version Control) | **Rationale:** S3 is the scalable foundation. DVC versions datasets and human-curated evaluation sets, linking them to Git commits for full reproducibility. |
| **Experimentation** | Experiment Management | **Amazon SageMaker Studio** (Notebooks)<br>**LangChain** (App Framework)<br>**LangSmith** (Debugging/Tracing) | **Rationale:** We use **LangChain** to rapidly prototype the RAG logic. **LangSmith** is indispensable for debugging these chains by visualizing the retrieved context and prompts. This combination accelerates development. |
| **Feature Engineering** | Feature Store & Workflows | **Amazon OpenSearch with k-NN** (Vector DB)<br>**Amazon MWAA (Airflow)** (Orchestration) | **Rationale:** The **Vector Database is the new Feature Store** for LLMs. OpenSearch provides the managed retrieval backend. Airflow orchestrates the batch workflow that populates the vector DB. |
| **ML Lifecycle** | Model & Experiment Tracking | **MLflow Tracking Server** | **Rationale:** MLflow remains the central server of record. It logs experiment results, including the final evaluation scores generated by Ragas. It provides the high-level audit trail of what was tried and what worked. |
| | **LLM Quality & Evaluation** | **Ragas** (RAG Evaluation)<br>**Giskard** (Behavioral Testing) | **Rationale:** This is the critical, missing piece. **Ragas** is used to score our summaries on RAG-specific metrics (faithfulness, context precision). **Giskard** is used to create a suite of "unit tests" for the LLM's behavior (e.g., robustness to typos, bias checks). |
| | Continuous Training (CT) | **Amazon SageMaker Training Jobs** | **Rationale:** SageMaker provides the managed, scalable GPU infrastructure needed for LoRA fine-tuning without the overhead of managing a cluster manually. |
| **Production** | Model Registry & Versioning | **MLflow Model Registry** | **Rationale:** **This is the core governance component.** A "model version" in MLflow includes the LoRA weights, the specific LangChain prompt template, and the base model ID. Promoting a model from Staging to Production in this registry is the official, auditable act that triggers deployment. |
| | Model Deployment & Serving | **vLLM/TGI on Amazon EKS**<br>**Amazon API Gateway** | **Rationale:** Specialized LLM serving engines are non-negotiable for performance. EKS provides the necessary control. The inference service itself will use the **LangChain** library to execute the RAG logic at runtime. |
| | Monitoring & Observability | **Amazon CloudWatch, Prometheus** (System)<br>**Custom Evaluation Pipeline** (Model Quality) | **Rationale:** Standard tools for system health. For model quality, a custom pipeline is triggered periodically. It samples production requests/responses, runs them through the **Ragas/Giskard** evaluation suite, and logs the quality scores. Alerts are triggered on significant quality degradation (e.g., a drop in faithfulness). |
| **Foundation** | DevOps & Foundations | **Git (GitHub)**<br>**Docker**<br>**AWS CDK / Terraform** (IaC)<br>**GitHub Actions** (CI/CD) | **Rationale:** Foundational tools for software and infrastructure best practices. GitHub Actions automates testing and the deployment process, which is kicked off by a model promotion event in the MLflow Registry. |

#### 3.2 Key Pipelines and Workflows

The system's automation is realized through a set of interconnected pipelines. Each pipeline is a directed acyclic graph (DAG) of tasks responsible for a specific stage of the data and model lifecycle.

| Pipeline / Workflow | Trigger | Inputs | Key Steps | Outputs |
| :--- | :--- | :--- | :--- | :--- |
| **1. Data Ingestion & Validation** | Real-time events from user devices and backend services. | - Raw review JSON objects.<br>- Clickstream events (e.g., "helpful" clicks). | 1. **Receive:** Data streams into a message queue (e.g., Kinesis/Kafka).<br>2. **Validate Structure:** Check for schema conformance (correct fields and types).<br>3. **Validate Semantics:** Pass text through a lightweight model to check for intelligibility and filter out obvious spam/garbled content.<br>4. **Persist:** Load validated raw data into the S3 Data Lake. | - Clean, validated review data in the S3 Data Lake.<br>- Dead-letter queue for failed records. |
| **2. Embedding Generation (Feature Engineering)** | A new validated review is added to the S3 Data Lake. | - A single, validated customer review text.<br>- `product_id`, `review_id`. | 1. **Chunk:** Split the review text into semantically meaningful segments (e.g., sentences or small paragraphs).<br>2. **Embed:** Call a pre-trained embedding model (e.g., `amazon.titan-embed-text-v1`) for each chunk.<br>3. **Index:** Store the resulting embedding vectors and associated metadata (`product_id`, `review_id`, original text chunk) in the Vector Database (Amazon OpenSearch). | - Indexed, searchable review embeddings in the Vector Database. |
| **3. Continuous Training (CT)** | - **Scheduled:** e.g., monthly.<br>- **On-demand:** Triggered by a significant model quality degradation alert. | - A high-quality, curated dataset of reviews and human-annotated summaries from the S3 Data Lake.<br>- Base model ID (e.g., `mistral.mistral-7b-instruct-v0:2`). | 1. **Provision:** Spin up a GPU cluster using a SageMaker Training Job.<br>2. **Fine-tune:** Execute a Parameter-Efficient Fine-Tuning (PEFT) script using the LoRA technique.<br>3. **Artifact Creation:** Package the resulting LoRA adapter weights and the associated prompt template. | - A new candidate model artifact (LoRA adapter + config) stored in S3. |
| **4. Model Evaluation & Registration** | Successful completion of a Continuous Training job. | - Candidate model artifact.<br>- "Golden" evaluation dataset (versioned with DVC). | 1. **Load:** Load the candidate model for inference.<br>2. **Generate:** Create summaries for the entire evaluation dataset.<br>3. **Score:** Evaluate the generated summaries using a specialized framework (e.g., Ragas) for faithfulness, relevance, etc.<br>4. **Compare:** Compare the new model's scores against the currently deployed production model.<br>5. **Register:** If the new model shows a statistically significant improvement, version and register it in the MLflow Model Registry. | - A go/no-go decision for deployment.<br>- (If successful) A new, versioned model in the MLflow Model Registry, promoted to "Staging". |
| **5. CI/CD for Deployment** | A model is promoted to the "Production" stage in the MLflow Model Registry. | - The approved model artifact from the MLflow Registry. | 1. **Package:** GitHub Actions builds a new Docker container including the inference server (vLLM/TGI) and the model artifacts.<br>2. **Test:** Run integration and smoke tests.<br>3. **Deploy:** Orchestrate a safe, phased rollout (e.g., Canary) to the Amazon EKS serving environment. | - A new version of the inference service running in production, serving a small percentage of live traffic. |
| **6. Batch Inference** | **Scheduled:** Runs every hour. | - List of `product_ids` that have received new reviews in the last hour. | 1. **Retrieve:** For each product, query the Vector Database to get the most relevant review snippets.<br>2. **Invoke:** Call the production LLM serving endpoint to generate a summary.<br>3. **Cache:** Store the generated summary in the low-latency database (DynamoDB) with a TTL. | - Freshly computed summaries available in the low-latency cache for fast retrieval. |

#### 3.3 Why RAG for Reviews Summarization?

Let me clarify why, even without a direct user query and in a purely batch context, the RAG pattern (using embeddings and a Vector DB) is still the superior production architecture for this specific application. The reason is not about enabling real-time responses, but about **quality, control, and cost-efficiency at scale.** Let's compare the two approaches:

##### Approach 1: Recursive Summarization

1. For a product with 500 reviews, group them into chunks that fit the LLM context window.
2. Send each chunk to the LLM to get an intermediate summary.
3. Take all the intermediate summaries, and if there are still too many, repeat the process.
4. Finally, send the last set of summaries to the LLM for a final summary.

This seems straightforward, but it suffers from several critical production-level flaws:

* **1. The "Lost in the Middle" Problem:** LLMs have a known weakness with very long contexts. Information presented at the beginning and end of a prompt is recalled much more effectively than information buried in the middle. In a recursive process, critical details from an early review (e.g., a specific safety concern or a major product flaw) are highly likely to be averaged out and lost in the initial summarization pass. The final summary will then be based on "washed out" intermediate texts, making it generic and potentially misleading.
* **2. Lack of Control and Bias Amplification:** This approach is a black box. You cannot control *what* the LLM focuses on. If a product has 450 generic positive reviews ("Great product!") and 50 highly detailed, recent negative reviews ("The battery dies in 2 hours"), the recursive approach will almost certainly produce a generic, positive summary. The negative signal will be drowned out.
This leads directly to the "helpful hallucination" crisis we aim to avoid, where the summary does not reflect the most critical user feedback.
* **3. Inability to Prioritize High-Signal Reviews:** Not all reviews are created equal. A "Verified Purchase" review marked as "helpful" by 100 users is far more valuable than a one-word, anonymous review. The recursive approach treats all text as equal, giving the same weight to low-quality and high-quality feedback.
* **4. High Computational Cost:** This approach can be deceptively expensive. For a product with reviews requiring 10 chunks, this strategy involves **11 separate LLM calls** (10 intermediate summaries plus 1 final pass). At scale, across thousands of products, this becomes computationally inefficient and costly.

##### Approach 2: RAG for Batch Summarization (The Recommended Architecture)

Here, we reframe the purpose of RAG. The "query" is not from an end-user; it is an **automated, internal query that represents business logic or a summarization strategy.**

1. **One-time (cheap) cost:** As new reviews come in, they are chunked, embedded, and stored in the Vector DB. This is a fast, inexpensive background process.
2. **Batch Inference (with RAG):** When the scheduled batch job runs for a product, it does not fetch all reviews. Instead, it executes an *automated query* against the Vector DB. This query is our point of control. For example:
    * *"For product 123, retrieve: a) the 5 most helpful positive review snippets, b) the 5 most helpful negative review snippets, and c) the 5 most recent review snippets."*
3. **Controlled Context:** The retrieved snippets (a small, curated, and highly relevant set of texts) are then assembled into a compact context.
4. **Single, High-Quality LLM Call:** This balanced and signal-rich context is passed to the LLM in a **single call** with a prompt like, "Based *only* on these reviews, summarize the pros, cons, and most recent feedback."

**Why this is Superior for a Production System:**

* **1.
Full Control over Content:** This architecture allows us to *programmatically enforce a balanced summary*. We can guarantee that the most critical negative feedback is included, preventing bias and building user trust.
* **2. Prioritization of High-Signal Data:** We can use the metadata in our Vector DB (helpfulness scores, verified purchase status) to ensure the LLM sees the *most important* reviews, not just a random assortment.
* **3. Mitigates the "Lost in the Middle" Problem:** By feeding the LLM a much smaller, more salient context, we ensure it can effectively process all the provided information.
* **4. Cost-Effective and Scalable:** This approach involves **only one LLM call per product** during the batch job, making it significantly cheaper and faster at scale than the 11 calls required by the recursive method. The upfront cost of embedding is minimal in comparison.
* **5. Future-Proofing:** This architecture (a Vector DB of all review content) becomes a strategic asset. It can later be used for other features like semantic search ("find me laptops with a great keyboard") with little extra work.

The RAG pattern (Embeddings + Vector DB) should be retained as the core of the **batch inference workflow**. Its primary benefit here is not real-time speed, but **unmatched control over summary quality, bias, and cost-efficiency**. It transforms the summarization task from a black-box text-stuffing exercise into a deliberate, controllable, and production-ready engineering process.

___

#### 3.3 Project Management and Stages

This project will follow an iterative, stage-based approach. Each stage has a distinct focus and set of outcomes, allowing for structured progress and regular checkpoints to validate assumptions and de-risk the project.

| Stage | Key Activities | Primary Outcome |
| :--- | :--- | :--- |
| **1. Ideation & Planning** | - Finalize business objectives and KPIs.<br>- Conduct feasibility study and risk assessment.<br>- Define the ML problem (Batch Abstractive Summarization via RAG).<br>- Select and document the core tech stack (as detailed in 3.1 & 3.2).<br>- Develop an initial project plan and timeline. | A clear, documented project charter that aligns all stakeholders on the goals, scope, and technical approach. |
| **2. Model Experimentation** | - Establish baseline performance using a simple, non-LLM heuristic.<br>- **Prompt Engineering:** Meticulously craft and test various prompt templates.<br>- **RAG Strategy Evaluation:** Experiment with different retrieval strategies (e.g., balancing positive/negative/recent reviews).<br>- **Model Selection:** Compare performance of different open-source base models (e.g., Llama 3, Mistral, Gemma).<br>- **Fine-Tuning (PEFT):** Conduct LoRA fine-tuning on a curated dataset to improve domain-specific quality.<br>- **Evaluation:** Use LangSmith for debugging and Ragas for quantitative scoring of all experiments. | The selection of the optimal combination of base model, LoRA adapter, prompt template, and RAG strategy that meets the predefined quality metrics, ready for productionizing. |
| **3. Pipeline Development** | - **Data Pipelines:** Build the Airflow DAGs for data ingestion, validation, and batch embedding generation.<br>- **Continuous Training (CT) Pipeline:** Automate the fine-tuning and evaluation process.<br>- **Batch Inference Pipeline:** Build the core workflow that orchestrates the RAG-based summary generation and caching. | A suite of automated, version-controlled pipelines that manage the entire data and model lifecycle from ingestion to inference. |
| **4. Deployment & Serving** | - **Infrastructure as Code (IaC):** Define the serving infrastructure (EKS cluster, node groups) using Terraform.<br>- **CI/CD Pipeline:** Create the GitHub Actions workflow to containerize and deploy the inference service.<br>- **Serving Endpoint:** Deploy the optimized inference engine (vLLM/TGI) with the selected model. | A scalable, production-ready inference endpoint that can serve summaries based on the batch pipeline's logic. |
| **5. Monitoring & In-Production Testing** | - **Set up Dashboards:** Configure Grafana/CloudWatch for system and operational metrics.<br>- **Implement Quality Monitoring:** Deploy the custom pipeline to sample and evaluate production summaries against quality metrics (e.g., faithfulness).<br>- **Set up Alerting:** Configure alerts for system failures, latency spikes, and model quality degradation.<br>- **A/B Testing:** Once stable, design and run an A/B test to measure the impact on the primary business KPIs. | A fully observable system with automated monitoring and alerting, providing the data needed to ensure reliability and validate business impact. |

___

#### 3.4 Cross-Functional Team & Roles

For this project, we operated as a small, agile, cross-functional team. Clear role definition was crucial for efficiency and accountability.

| Role | Primary Responsibilities | Key Artifacts & Deliverables |
| :--- | :--- | :--- |
| **Product Manager** | - Defines the business vision, requirements, and success KPIs.<br>- Manages the project roadmap and prioritizes work.<br>- Acts as the voice of the customer and internal business stakeholders. | - Product Requirements Document (PRD).<br>- A/B Test plan and success criteria.<br>- Go-to-market and stakeholder communication plan. |
| **Data Engineer** | - Owns the data sourcing, ingestion, and validation pipelines.<br>- Manages the Data Lake (S3) and the Vector Database (OpenSearch).<br>- Builds and maintains the Airflow DAGs for populating the Vector DB. | - Deployed and monitored data ingestion pipelines.<br>- A populated, up-to-date Vector Database.<br>- Data quality dashboards and alerts. |
| **ML/MLOps Engineer<br>(My Role)** | - Leads all model experimentation and evaluation.<br>- Owns the Continuous Training (CT) and Batch Inference pipelines.<br>- Develops the inference service application (using LangChain).<br>- Manages the CI/CD process for model deployment.<br>- Owns the model monitoring and alerting strategy. | - MLflow experiments and model artifacts in the Registry.<br>- Deployed CT and Batch Inference pipelines.<br>- Production inference service on EKS.<br>- Model quality monitoring dashboards. |

___

#### 3.5 Versioning and Governance Strategy

To ensure reproducibility, auditability, and stability, a "version everything" philosophy is adopted. This creates a complete lineage from the raw data to the final summary served to a user.

| Artifact | Tool for Versioning | Rationale & LLM-Specific Notes |
| :--- | :--- | :--- |
| **Code** | **Git (GitHub)** | The single source of truth for all application code, pipeline definitions (DAGs), and infrastructure-as-code scripts. |
| **Data** | **DVC** | Used specifically for versioning the critical, human-curated "golden" datasets used for evaluation. This ensures that when we compare model v1.1 to v1.2, we know they were tested against the exact same data. |
| **Prompts** | **Git (in a config file)** | **Prompts are treated as code.** A change to the prompt template is a critical change to the model's behavior. Prompts are versioned in Git and loaded by the application at runtime. A new prompt version can trigger a new model registration. |
| **Model** | **MLflow Model Registry** | The central governance hub. A "model" version in the registry is a composite artifact that bundles:<br>1. The LoRA adapter weights.<br>2. A pointer to the base model (e.g., `mistral-7b-v0.2`).<br>3. The version of the prompt template to be used.<br>4. The inference configuration. |
| **Infrastructure** | **Terraform / AWS CDK** | The entire cloud infrastructure (EKS clusters, S3 buckets, IAM roles) is defined as code and versioned in Git, enabling disaster recovery and preventing manual configuration drift. |

___

#### 3.6 Comprehensive Evaluation Strategy

Testing an LLM-powered application requires a multi-layered approach that moves far beyond traditional software QA. The strategy must validate not only the code and infrastructure but also the quality of the data, the nuances of the model's learned behavior, and the integrity of the end-to-end generation process. This "crucible" ensures that what we build is reliable, fair, and trustworthy.

The following table outlines the four core pillars of our testing strategy, detailing *what* we test, *why* it's critical for an LLM system, *how* we'll implement it, and *when* in the lifecycle it occurs.

| Test Category | Specific Test | Purpose & LLM-Specific Focus | Tools / Method | Stage / Trigger |
| :--- | :--- | :--- | :--- | :--- |
| **1. Data & Prompt Quality Testing** (The Foundation) | **Input Schema & Value Validation** | **Purpose:** Ensure the structural integrity of incoming review data.
**LLM Focus:** Prevent "Garbage In, Garbage Out." A malformed review text can lead to nonsensical embeddings and poor summaries. | Great Expectations | During the data ingestion pipeline (CI/CD) | | | **PII & Toxicity Screening (Inputs)** | **Purpose:** Identify and redact Personally Identifiable Information (PII) and filter out toxic content *before* it ever reaches the LLM.
**LLM Focus:** A critical data privacy and safety guardrail. | Custom regex, open-source libraries (e.g., `presidio`), or a lightweight classification model. | During the data ingestion pipeline (CI/CD) | | | **Prompt Template Unit Tests** | **Purpose:** Validate that the prompt formatting logic is correct.
**LLM Focus:** Treating the prompt as code. A bug in the prompt templating can break the entire system. | `pytest` with mocked review snippets. | On code commit (CI) | | **2. Offline Model & Application Logic Evaluation** (The Quality Gate) | **Behavioral "Unit" Tests** | **Purpose:** Test for basic, expected model behaviors on simple inputs.
**LLM Focus:** Invariance (paraphrasing a review shouldn't drastically change the summary); Directionality (adding a strong negative fact should make the summary sentiment more negative). | `Giskard` or custom `pytest` scripts. | On code commit, and within the model evaluation pipeline (CI/CD). | | | **RAG-Specific Evaluation** | **Purpose:** Quantitatively measure the quality of the Retrieval-Augmented Generation process.
**LLM Focus:** This is the core of our quality assessment. We measure:
• **Faithfulness:** Does the summary contain "hallucinations" or is it factually grounded in the retrieved reviews?
• **Context Precision:** Are the retrieved review snippets relevant to the final summary? | `Ragas` framework. | Within the automated model evaluation pipeline after every training run (CD). | | | **Holistic Quality Evaluation** | **Purpose:** Assess the overall readability and usefulness of the summary.
**LLM Focus:** Measure human-like qualities: **Coherence** (is it logical?), **Conciseness** (is it to the point?), and **Fluency** (is it well-written?). | **LLM-as-a-judge:** Using a powerful model (e.g., GPT-4o) with a detailed rubric to score the summaries on a Likert scale. | Within the model evaluation pipeline (CD). | | | **Safety & Bias Evaluation** | **Purpose:** Ensure the generated summaries are not toxic, biased, or unfair.
**LLM Focus:** We run slice-based evaluation, specifically testing performance on products with highly polarized reviews to ensure both positive and negative viewpoints are represented fairly. | Custom evaluation scripts and toxicity classifiers (e.g., Detoxify). | Within the model evaluation pipeline (CD). | | **3. Pipeline & Infrastructure Testing** | **Component Integration Tests** | **Purpose:** Verify that the different components of the application logic work together.
**LLM Focus:** Test the full RAG flow: mock the Vector DB -> test the retrieval logic -> verify correct prompt construction -> mock the LLM call. | `pytest` with mocked services (`unittest.mock`). | On code commit (CI). | | | **Batch Inference Pipeline E2E Test** | **Purpose:** Run the entire batch inference pipeline on a small, representative sample of data in a staging environment.
**LLM Focus:** Ensures the orchestration, RAG process, and caching mechanism work end-to-end. | Airflow DAG testing triggered via GitHub Actions. | Before deploying changes to production (CD). | | | **Serving Endpoint Load Test** | **Purpose:** Ensure the deployed model serving endpoint can handle the expected load for the batch pipeline.
**LLM Focus:** Measure key performance metrics like **tokens/second throughput** and latency under load to validate cost-effectiveness and scaling. | `Locust`. | Before deploying new model versions to production (CD). | | **4. In-Production Evaluation** (The Final Verdict) | **Shadow Deployment Comparison** | **Purpose:** Safely compare a new "challenger" model against the current "champion" model using live data without impacting users.
**LLM Focus:** The nightly batch job runs for *both* models. The generated summaries are stored separately and compared offline using our Ragas/LLM-as-a-judge pipeline to provide the highest-fidelity signal of real-world performance. | Custom comparison script within the batch inference pipeline. | Ongoing for any new model candidate. | | | **Automated Quality Monitoring** | **Purpose:** Continuously monitor the quality of summaries being produced by the live model.
**LLM Focus:** A scheduled pipeline periodically samples the latest generated summaries and runs them through the Ragas/LLM-as-a-judge evaluation suite. Alerts are triggered if key metrics like **Faithfulness** drop below a predefined threshold. | Custom Airflow DAG. | Scheduled (e.g., daily). | | | **A/B Testing** | **Purpose:** To definitively measure the business impact of a new model or major system change.
**LLM Focus:** Even with a batch system, we can run A/B tests by showing a subset of users summaries generated by the challenger model. We then measure the impact on our primary business KPIs (Conversion Rate, Return Rate). | A/B Testing Platform (e.g., Optimizely, or in-house). | After a new model has passed all other tests and is deemed stable and safe for production traffic. | ___ *** ### 5. Data Engineering & Pipelines: The Foundation for Summarization The quality of our automated summaries is directly proportional to the quality of the data we feed our model. The following pipelines are designed to be automated, reliable, and auditable, forming the data backbone of the entire system. We need to transform raw, multilingual reviews into a clean, indexed, and searchable knowledge base (the Vector DB) that our RAG-based summarization model can query efficiently during the batch inference job.To achieve this in a modular and robust way, we will design **two distinct but interconnected data pipelines**, orchestrated by Airflow. This separation of concerns is a critical design choice for maintainability and scalability. 1. **Pipeline 1: Data Ingestion & Cleaning:** Its sole responsibility is to process raw reviews and produce a clean, validated, and versioned dataset in our S3 Data Lake. 2. **Pipeline 2: Embedding Generation:** This downstream pipeline listens for new, clean data from Pipeline 1 and is responsible for creating and indexing the vector embeddings required for RAG. Here is the detailed plan for each pipeline. #### **Pipeline 1: Daily Data Ingestion & Cleaning** * **Objective:** To ingest all new reviews from the past 24 hours, clean them, validate their quality, and store them as a versioned dataset in the S3 Data Lake. * **Orchestrator:** Amazon MWAA (Airflow) * **Schedule:** Runs once every 24 hours. | Stage | Operation Details | Tools | Rationale & LLM-Specific Focus | | :--- | :--- | :--- | :--- | | **1. 
Ingestion** | **Extract New Reviews:** Fetch new customer reviews from the source database (e.g., based on a `created_at` timestamp). | `psycopg2`, custom Python scripts. | Initial step to gather the daily delta of reviews to be processed. | | **2. Preprocessing & Cleaning** | **HTML & Special Character Removal:** Strip out any HTML tags, URLs, and non-standard characters from the review text.
**Text Normalization:** Lowercase text, normalize whitespace. | `BeautifulSoup`, `regex`, `pandas`. | This creates a uniform text format, which is crucial for the consistency of both embedding models and the summarization LLM. | | **3. Safety & Privacy** | **PII Redaction:** Scan review text for Personally Identifiable Information (email addresses, phone numbers) and redact it.
**Toxicity Filtering:** Score reviews for toxicity. Flag or filter out highly toxic content to prevent it from being used in summaries. | `presidio` (for PII), `detoxify` or similar library (for toxicity). | **Crucial LLM Guardrail:** We must ensure that sensitive customer data or toxic language is never passed to the LLM or inadvertently included in a generated summary. | | **4. Filtering & Sampling** | **Filter "Noise" Reviews:** Remove reviews that are too short to be useful (e.g., < 5 words) or identified as spam. | `pandas`. | Improves the signal-to-noise ratio of our dataset. There is no value in summarizing or embedding reviews like "ok" or "good." | | **5. Schema & Metadata Extraction** | **Structure Data:** Format the cleaned data into a defined schema (`review_id`, `product_id`, `star_rating`, `cleaned_text`, `language`, `toxicity_score`).
**Language Detection:** Identify the language of each review. | `langdetect`, `pandas`. | The detected language is critical metadata. It allows for potential language-specific RAG strategies and helps in debugging model performance on different languages. | | **6. Data Validation** | **Validate Cleaned Data:** Run a Great Expectations suite on the processed data frame to check:
• `review_id` is unique and not null.
• `star_rating` is between 1 and 5.
• `cleaned_text` is a string.
• `language` is in the set of expected languages. | Great Expectations | This is an automated quality gate. If the daily batch of cleaned data fails validation, the pipeline stops, and an alert is sent, preventing corrupted data from entering the S3 Lake and the Vector DB. | | **7. Versioning & Storage** | **Store & Version Data:** Save the final, validated dataset as a Parquet file in the S3 Data Lake. Use DVC to version this dataset and commit the `.dvc` file to Git. | `pyarrow`, `DVC`, `Git`. | This creates an immutable, auditable history of our training data. We can always trace a model's behavior back to the exact version of the data it was trained or evaluated on. | ##### Architecture Diagram #### **Pipeline 2: Embedding Generation** * **Objective:** To take newly cleaned reviews, generate vector embeddings for them, and index them in the Vector Database for retrieval during the batch summarization job. * **Orchestrator:** Amazon MWAA (Airflow) * **Trigger:** Runs upon the successful completion of "Pipeline 1: Data Ingestion & Cleaning." | Stage | Operation Details | Tools | Rationale & LLM-Specific Focus | | :--- | :--- | :--- | :--- | | **1. Data Retrieval** | **Load Cleaned Data:** Retrieve the latest versioned dataset of cleaned reviews from the S3 Data Lake. | `DVC API`, `pandas`. | Ensures this pipeline always works on the most recent, validated data. | | **2. Text Chunking** | **Split Reviews into Segments:** For each review, split the `cleaned_text` into smaller, semantically coherent chunks (e.g., sentences or groups of sentences, max 256 tokens). | LangChain (`RecursiveCharacterTextSplitter`). | **Core RAG Requirement:** We embed chunks, not entire reviews. This provides more granular retrieval, allowing the RAG system to find the *exact sentences* that are most relevant, rather than an entire, potentially long review. | | **3. 
Embedding** | **Generate Vector Embeddings:** For each text chunk, call a pre-trained, multilingual embedding model to generate a high-dimensional vector representation. | Amazon Bedrock (`amazon.titan-embed-text-v2`), `boto3`. | This is the heart of the "feature engineering" for our LLM. The embeddings capture the semantic meaning of the text, enabling the similarity search that powers our RAG strategy. | | **4. Indexing** | **Store in Vector DB:** Ingest the embeddings into the Vector Database, along with their associated metadata (the original text chunk, `review_id`, `product_id`, `star_rating`, `language`). | `opensearch-py`. | This makes the review data searchable based on semantic meaning. The rich metadata allows our batch inference job to execute sophisticated retrieval logic (e.g., "find helpful negative reviews for product X"). | ##### Architecture Diagram ___ ### 6. Feature Engineering: From Hand-Crafted Features to Semantic Vectors In traditional machine learning, feature engineering is often a laborious process of manually creating signals from raw data (e.g., TF-IDF scores, n-gram counts, sentiment analysis). In the modern LLM era, this paradigm has fundamentally shifted. The primary and most powerful feature is the **semantic meaning of the text itself**, captured through high-dimensional vector embeddings. For this project, our feature engineering strategy is not about creating dozens of columns in a table. Instead, it's about creating a rich, searchable knowledge base. #### 6.1 The "Features" for our RAG System Our system relies on two types of features, which are generated by our data pipelines and stored together in our Vector Database. | Feature Type | Description | Generation Process | Role in the System | | :--- | :--- | :--- | :--- | | **Primary Feature:
Semantic Embeddings** | High-dimensional vectors (e.g., 1024 dimensions) that represent the semantic meaning of a chunk of review text. Words and sentences with similar meanings will have vectors that are close to each other in the vector space. | Generated by passing text chunks through a pre-trained, multilingual embedding model (e.g., Amazon Titan) during the **Embedding Generation Pipeline**. | **The Engine of Retrieval.** These embeddings allow us to perform a similarity search to find the most relevant review snippets for the summarization task. | | **Secondary Features:
Retrieval Metadata** | Standard data attributes that are stored alongside each embedding vector. This includes `star_rating`, `language`, `review_date`, a calculated `helpfulness_score` (from clickstream data), and a `is_verified_purchase` flag. | Extracted or calculated during the **Data Ingestion & Cleaning Pipeline**. | **The Brain of Retrieval.** This metadata is not passed to the LLM directly but is used by our *retrieval logic* to filter and rank the results of the semantic search. It allows us to ask sophisticated internal questions like, *"Find me the most helpful, 5-star, verified purchase reviews written in German in the last 90 days."* | #### 6.2 The Vector Database: The New Feature Store In this LLM-powered, RAG-based architecture, the **Vector Database (Amazon OpenSearch) serves as our de facto Feature Store**. * **Traditional Feature Store:** Stores versioned, tabular features (e.g., `user_7_day_purchase_count`) for training and low-latency serving. * **Our "Feature Store":** Stores versioned vector embeddings (the primary feature) and their associated metadata (the secondary features). It is optimized for the core operation we need: efficient, low-latency similarity search and metadata filtering. By completing the Data Ingestion and Embedding Generation pipelines, we have effectively built our feature engineering process and populated our feature store. --- *** ### 8. ML Training Pipeline: Planning the Continuous Fine-Tuning Workflow To ensure our summarization model adapts to new product types, evolving customer language, and emerging review patterns, a "train-once" approach is insufficient. We need an automated **Continuous Training (CT) pipeline** that can periodically fine-tune our model on fresh data, evaluate its performance, and register it for production deployment if it proves superior. This section outlines the comprehensive plan for the artifacts required to build this critical pipeline. 
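
The "register it for production deployment if it proves superior" step is easiest to reason about as an explicit promotion gate. The following is a minimal sketch rather than the project's actual registration logic: the `EvalScores` container, the `should_register` function, the choice of two metrics, and both threshold values are hypothetical and chosen only for illustration.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EvalScores:
    """Aggregate offline scores for one model on the golden evaluation set.

    Both fields are assumed to be normalised to the range [0, 1].
    """
    faithfulness: float  # e.g. a Ragas-style faithfulness score
    coherence: float     # e.g. a rescaled LLM-as-a-judge score


def should_register(
    candidate: EvalScores,
    production: EvalScores,
    min_faithfulness: float = 0.90,  # hypothetical absolute quality floor
    min_gain: float = 0.01,          # hypothetical minimum improvement
) -> bool:
    """Return True if the candidate model should be promoted."""
    # 1. Absolute floor: never promote a model that is not grounded enough.
    if candidate.faithfulness < min_faithfulness:
        return False

    # 2. No regressions: the candidate must not be worse on any metric.
    if (candidate.faithfulness < production.faithfulness
            or candidate.coherence < production.coherence):
        return False

    # 3. Meaningful gain: require a real improvement on at least one metric,
    #    so that marginal noise does not trigger a new deployment.
    best_gain = max(
        candidate.faithfulness - production.faithfulness,
        candidate.coherence - production.coherence,
    )
    return best_gain >= min_gain


if __name__ == "__main__":
    current = EvalScores(faithfulness=0.92, coherence=0.80)
    challenger = EvalScores(faithfulness=0.95, coherence=0.84)
    print("Promote challenger:", should_register(challenger, current))
```

Keeping the decision in a pure function like this means it can be exercised with mock scores (better, worse, marginally better) without touching a model registry.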
#### **8.1 Python Scripts (Pipeline Components)**

The pipeline's logic will be encapsulated in a series of modular Python scripts, each designed to be executed as a containerized task orchestrated by Airflow.

| Component Script | Description | Key Libraries / Frameworks |
| :--- | :--- | :--- |
| **`1_data_selection.py`** | Selects the data for the fine-tuning job. It combines a fresh, curated sample of the public multilingual dataset with a sample of high-quality, recently collected internal reviews (from the S3 Data Lake). | `pandas`, `DVC API`, `pyarrow`. |
| **`2_data_validation.py`** | Validates the selected training dataset against a predefined set of rules (e.g., schema checks, text length validation, no nulls) to prevent training on corrupted data. | Great Expectations. |
| **`3_model_training.py`** | The core fine-tuning component. It loads the base model (`Mistral-7B`), applies the PEFT/LoRA configuration, and runs the training job on the validated dataset using a managed GPU cluster. | Hugging Face `transformers`, `peft`, `accelerate`. |
| **`4_model_evaluation.py`** | Evaluates the newly fine-tuned model artifact against the "golden" evaluation dataset. It generates summaries and scores them using a multi-faceted approach. | `Ragas` (for RAG metrics), `Giskard` (for behavioral tests), `openai` (for LLM-as-a-judge). |
| **`5_model_registration.py`**| The final governance gate. It compares the new model's evaluation scores against the currently deployed production model. If the new model meets the promotion criteria, it is versioned and registered in the MLflow Model Registry. | `mlflow`. |

#### **8.2 Unit Tests (`pytest`)**

To ensure the reliability of each component, we will implement a suite of unit tests.

* **Test Data Selection:** Verify that `1_data_selection.py` correctly loads data from mock S3 paths and handles empty or missing data gracefully.
* **Test Training Script Logic:** Test the setup functions within `3_model_training.py` to ensure it correctly parses configurations and loads the base model and tokenizer, without running a full training loop.
* **Test Evaluation Logic:** For `4_model_evaluation.py`, provide a small, fixed set of pre-generated summaries and verify that the metric calculation functions (e.g., parsing Ragas output) work as expected.
* **Test Registration Logic:** For `5_model_registration.py`, test the decision-making function by passing it various mock metric scores (e.g., "new model better," "new model worse," "new model marginally better but below threshold") and asserting the correct outcome (register or skip).

#### **8.3 Pipeline Code (Airflow DAG)**

The components will be orchestrated into a single, cohesive pipeline using an Airflow DAG.

* **File:** `llm_finetuning_dag.py`
* **Trigger:** Scheduled to run monthly and can be triggered manually for on-demand retraining.
* **Tasks:**
    1. `data_selection_task`: Executes the data selection script.
    2. `data_validation_task`: Executes the validation script. Fails the pipeline if data quality checks do not pass.
    3. `model_training_task`: Executes the fine-tuning script as an **Amazon SageMaker Training Job**. This offloads the heavy GPU computation to a managed, ephemeral cluster.
    4. `model_evaluation_task`: Executes the evaluation script on a CPU instance once training is complete.
    5. `model_registration_task`: Executes the registration script, making the final go/no-go decision.

#### **8.4 Infrastructure as Code (Terraform)**

The necessary AWS infrastructure for this pipeline will be defined declaratively.

* **IAM Role for SageMaker:** A dedicated IAM role for SageMaker Training Jobs, granting it least-privilege access to read data from S3 and write artifacts back to S3 (pulling the base model from Hugging Face depends on the job's outbound network access rather than on IAM permissions).
* **IAM Role for Airflow:** An execution role for the MWAA environment, allowing it to trigger SageMaker jobs and log to CloudWatch.
* **ECR Repository:** A repository to store the Docker images for our custom pipeline components.

#### **8.5 Integration Test**

A CI/CD workflow will run an automated integration test on the entire pipeline.

* **Process:** The test will trigger the Airflow DAG in a staging environment. It will use a tiny, dedicated test dataset. The `model_training_task` will be configured to run for only a few steps to produce a dummy model artifact quickly.
* **Goal:** The test passes if the DAG runs to completion without errors and a new (dummy) model version is successfully registered in a staging MLflow instance. This validates that all components, permissions, and configurations work together correctly.

#### **8.6 Architecture Diagram**

#### **8.7 CI/CD Workflow (GitHub Actions)**

The deployment and testing of the pipeline code itself will be automated.

* **File:** `.github/workflows/deploy_finetuning_pipeline.yml`
* **Trigger:** On push to the `main` branch.
* **Jobs:**
    1. **`lint-and-unit-test`:** Runs static code analysis (`flake8`) and `pytest` for all unit tests.
    2. **`build-and-push-images`:** Builds Docker images for the pipeline components and pushes them to our Amazon ECR repository.
    3. **`run-integration-test`:** (If previous job succeeds) Triggers the integration test on the staging Airflow environment.
    4. **`deploy-to-production`:** (If integration test succeeds) Syncs the updated Airflow DAG file to the production S3 bucket, allowing MWAA to deploy the new pipeline version automatically.

***

### 9. Batch Inference Pipeline: Planning the Production Summarization Workflow

This pipeline is the workhorse of the system. It runs frequently to ensure summaries are kept up-to-date with the latest customer feedback, using the RAG-based strategy we've established for quality and control.
This pipeline's objective is to efficiently generate and cache high-quality summaries for all products that have received new reviews, ensuring that the information presented to users is fresh and relevant.

#### **9.1 Python Scripts (Pipeline Components)**

The batch inference process is broken down into a sequence of focused, testable Python scripts.

| Component Script | Description | Key Libraries / Frameworks |
| :--- | :--- | :--- |
| **`1_get_products_to_update.py`**| Queries the application's production database to get a list of `product_id`s that have received new reviews since the last successful pipeline run. | `psycopg2` (or other DB driver), `pandas`. |
| **`2_retrieve_rag_context.py`** | For each product ID, executes the defined RAG strategy: queries the Vector DB (OpenSearch) to retrieve the most relevant review snippets (e.g., top positive, top negative, most recent). | `opensearch-py`, LangChain (for prompt templating). |
| **`3_generate_summaries.py`** | Takes the retrieved contexts, formats them into prompts, and sends them in batches to the production LLM serving endpoint. Handles API responses and error conditions. | `requests`, `httpx`. |
| **`4_validate_and_cache.py`**| Receives the raw JSON summaries from the LLM. Performs a basic validation (e.g., checks for required keys). Writes the valid summaries to the low-latency cache (DynamoDB) with a defined TTL. | `boto3`. |

#### **9.2 Unit Tests (`pytest`)**

Each script will be accompanied by unit tests to guarantee its logic and error handling.

* **Test Product Retrieval:** For `1_get_products_to_update.py`, mock the database connection and test that the correct product IDs are returned based on mock timestamps.
* **Test RAG Logic:** For `2_retrieve_rag_context.py`, mock the OpenSearch client. Verify that the script constructs the correct, complex query based on our RAG strategy and correctly assembles the final prompt.
* **Test API Invocation:** For `3_generate_summaries.py`, mock the inference service endpoint. Test its ability to handle successful (200) responses, rate limiting (429), and server errors (500).
* **Test Caching Logic:** For `4_validate_and_cache.py`, mock the DynamoDB client. Test that valid JSON summaries are written correctly and that malformed summaries are logged as errors.

#### **9.3 Pipeline Code (Airflow DAG)**

An Airflow DAG will orchestrate the execution of these components in a reliable, scheduled manner.

* **File:** `batch_inference_dag.py`
* **Trigger:** Scheduled to run hourly.
* **Tasks:**
    1. `get_products_to_update_task`: Executes the product retrieval script.
    2. `check_if_products_exist_task`: A `BranchPythonOperator` that checks if the previous task returned any products. If not, the DAG skips to a final "success" state to avoid running expensive tasks unnecessarily.
    3. `retrieve_rag_context_task`: Executes the RAG context retrieval script.
    4. `generate_summaries_task`: Executes the summary generation script.
    5. `validate_and_cache_task`: Executes the final caching script.

#### **9.4 Infrastructure as Code (Terraform)**

The AWS resources supporting this pipeline will be defined as code.

* **Amazon DynamoDB Table:** Definition of the low-latency cache table, including its partition key (`product_id`), attributes (`summary_json`, `last_updated`), and TTL configuration.
* **IAM Role for Airflow:** An execution role for MWAA granting permissions to:
    * Read from the production application database.
    * Query the Amazon OpenSearch cluster.
    * Invoke the production EKS inference endpoint (requires VPC networking and security group configuration).
    * Write items to the DynamoDB cache table.

#### **9.5 Integration Test**

A CI/CD workflow will validate the entire pipeline end-to-end in a staging environment.

* **Process:** The test will pre-populate a staging database and staging Vector DB with a few sample reviews for a test product.
It will then trigger the `batch_inference_dag` in the staging Airflow environment.
* **Goal:** The test passes if, after the DAG completes, the test script can successfully query the staging DynamoDB table and find a newly generated summary for the test product. This verifies that all permissions, connections, and logic work in concert.

#### **9.6 Architecture Diagram**

#### **9.7 CI/CD Workflow (GitHub Actions)**

This workflow automates the testing and deployment of the *pipeline code itself*.

* **File:** `.github/workflows/deploy_batch_inference_pipeline.yml`
* **Trigger:** On push to the `main` branch.
* **Jobs:**
    1. **`lint-and-unit-test`:** Runs static code analysis and `pytest` for all batch pipeline scripts.
    2. **`deploy-dag-to-staging`:** Deploys the updated Airflow DAG to the staging environment.
    3. **`run-integration-test`:** Triggers the end-to-end integration test on the staging Airflow environment.
    4. **`trigger-load-test`:**
        * **Condition:** This job **only runs if** specific files that affect the load pattern have changed (e.g., `generate_summaries.py`, `retrieve_rag_context.py`). We can use path filters in GitHub Actions for this.
        * **Action:** Calls the `reusable_load_test.yml` workflow, targeting the **main production serving endpoint**. This validates that our *new client code* works correctly with the *current production server*.
    5. **`deploy-dag-to-production`:** (If all previous jobs succeed) Syncs the updated Airflow DAG to the production MWAA environment.

***

### Model (LLM Serving Endpoint) Deployment Pipeline

#### Why a shared, reusable Load Test?

There are two distinct deployment processes:

1. **Deploying the Batch Inference Pipeline Logic (Airflow DAG):** This happens when we change the *orchestration code*. Its CI/CD checks that the DAG runs correctly.
2. **Deploying the Model Endpoint (EKS Service):** This happens when a *new model is promoted*.
This is where we must validate the performance and stability of the model server itself before it handles production requests from our batch pipeline.

This is the critical workflow that is triggered whenever a new model is approved and needs to be rolled out to production. It ensures that the new model not only works correctly but also performs efficiently under load.

The key insight is that the **Batch Inference Pipeline *is the primary client*** for our LLM Serving Endpoint. The performance of the endpoint is not just a property of the model itself; it's a function of the interaction between the server and the client that calls it.

Therefore:

1. **Changing the Server (Model Endpoint):** Deploying a new model (e.g., a larger one, or one with a different quantization level) will obviously change performance characteristics. This **must** be load tested.
2. **Changing the Client (Batch Pipeline):** A seemingly innocuous change in the batch pipeline's code can dramatically alter the load pattern. For example:
    * Changing the RAG retrieval logic in `retrieve_rag_context.py` could create much larger prompt contexts, increasing the processing time per request.
    * Changing the parallelism or batching logic in `generate_summaries.py` could alter the number of concurrent requests sent to the endpoint.

A change in either component can break the system's performance. Therefore, any significant change to either the client (the pipeline) or the server (the endpoint) must trigger a load test to validate their interaction.

The optimal solution is not to have two separate load tests, but to have a single, **reusable load testing workflow** that can be called by *both* CI/CD pipelines.

#### CI/CD for the LLM Serving Endpoint (with the shared Load Test)

* **File:** `.github/workflows/deploy_llm_serving_endpoint.yml`
* **Trigger:** A model is manually promoted from "Staging" to "Production" in the MLflow Model Registry.
* **Jobs:**

| Job | Description | Key Artifacts / Outputs |
| :--- | :--- | :--- |
| **1. Build Container** | - Fetches the newly promoted model artifacts (LoRA weights, config) from MLflow.<br>- Builds a new Docker container with the inference server (vLLM/TGI) and the model artifacts baked in.<br>- Pushes the versioned container image to Amazon ECR. | - A new, immutable Docker image in ECR tagged with the model version. |
| **2. Deploy to Staging** | - Uses Terraform/CDK to deploy the new container to a *separate staging endpoint* in the EKS cluster. This endpoint is identical to production but does not receive live traffic. | - A live, but isolated, staging/canary model endpoint. |
| **3. Smoke & Integration Test** | - Runs a small suite of tests against the staging endpoint:<br>• **Health Check:** Is the endpoint responsive?<br>• **API Contract Test:** Does the request/response schema match?<br>• **Consistency Check:** Does the endpoint give the exact same prediction for a sample input as it did during offline evaluation? | - A go/no-go signal for further testing. |
| **4. Load Test** | - If smoke tests pass, a `Locust` load test is automatically triggered against the **staging endpoint**.<br>- The test simulates the expected load from our hourly batch inference job (e.g., thousands of requests in a short burst).<br>- It measures and asserts that P95 latency and tokens/second throughput are within our defined SLOs. | - Performance metrics (latency, throughput, error rate).<br>- A go/no-go signal based on whether the endpoint meets its performance targets. |
| **5. Phased Rollout to Production** | - If the load test passes, the workflow begins a safe, automated rollout to the main production endpoint.<br>- **Canary Strategy:** It updates the EKS service to direct a small percentage of traffic (e.g., 10%) from the live batch pipeline to the new model version.<br>- The workflow pauses and monitors key metrics (latency, error rates from CloudWatch) for a predefined period. | - A new model version serving a small fraction of production traffic. |
| **6. Promote to 100%** | - If the canary phase shows no issues, the workflow automatically proceeds to shift 100% of the production traffic to the new model version.<br>- The old model version is kept running for a short period to allow for a fast rollback if needed. | - The new model is now the fully live "champion" model in production. |

***

### 10. Monitoring and Observability: Ensuring Production Health and Quality

Once deployed, our system enters its most critical phase. Monitoring is not a passive activity but a proactive, multi-layered strategy designed to provide a holistic view of the system's health, from the underlying infrastructure to the data flowing through it, and, most importantly, the quality of the LLM's generated summaries.

Our strategy is built on three distinct pillars:

1. **System & Operational Health:** Is the infrastructure running correctly?
2. **Data Quality & Drift:** Are the inputs to our model trustworthy?
3. **Model Quality & Performance:** Are the outputs of our model accurate, safe, and helpful?

#### **Monitoring and Alerting Plan**

| Category | Specific Metric | Tool / Method | Alerting Threshold & Recipient (Example) |
| :--- | :--- | :--- | :--- |
| **1. System & Operational Health** | **Batch Inference DAG Success Rate** | **Amazon CloudWatch** (from MWAA logs) | **Alert:** On any DAG run failure.<br>**Recipient:** On-call Data Engineer, On-call MLOps Engineer. |
| | **EKS Pod Health (Serving Endpoint)** | **Prometheus / Grafana** | **Alert:** When the number of ready pods is less than the desired replica count for > 5 mins.<br>**Recipient:** On-call MLOps Engineer. |
| | **GPU Utilization (Serving Endpoint)** | **Prometheus** (via DCGM exporter) | **Alert:** If average GPU utilization during a batch run is < 30% (indicates inefficiency) or > 95% for an extended period (indicates system is overloaded).<br>**Recipient:** On-call MLOps Engineer. |
| | **API Error Rate (Serving Endpoint)** | **Prometheus / Grafana** | **Alert:** If the rate of HTTP 5xx errors exceeds 2% over a 10-minute window.<br>**Recipient:** On-call MLOps Engineer. |
| | **Cache Performance (DynamoDB)** | **Amazon CloudWatch** | **Alert:** On sustained increases in read/write latency or throttled requests.<br>**Recipient:** On-call Data Engineer. |
| **2. Data Quality & Drift** | **Input Data Drift (Semantic)** | **Custom Airflow DAG** using Python (`scipy`, `evidently.ai`). | **Alert:** If Population Stability Index (PSI) between daily review embeddings and the training set baseline exceeds 0.25.<br>**Recipient:** Data Engineering & MLOps teams (as a non-urgent ticket). |
| | **Input Data Properties Drift** | **Custom Airflow DAG** | **Alert:** If the distribution of `star_rating` or `language` changes by > 20% week-over-week.<br>**Recipient:** Data Engineering team. |
| | **PII Leakage Rate (Input)** | **Great Expectations** step in the ingestion pipeline. | **Alert:** On *any* detection of PII. This is a critical failure.<br>**Recipient:** High-priority alert to Legal, Security, and Data Engineering teams. |
| **3. Model Quality & Performance** | **Summary Faithfulness (Hallucinations)** | **Automated Quality Monitoring DAG** using `Ragas`. | **CRITICAL ALERT:** If the average Faithfulness score of the daily sample drops below 0.95. This is the most important quality metric.<br>**Recipient:** Immediate page to On-call MLOps Engineer. May trigger an automated pause of the batch pipeline. |
| | **Summary Coherence & Fluency** | **Automated Quality Monitoring DAG** using LLM-as-a-judge (GPT-4o). | **Warning:** If the average coherence score drops below 4.0/5.<br>**Recipient:** A ticket is automatically created for the MLOps team to investigate in the next sprint. |
| | **Toxicity & Safety (Output)** | **Automated Quality Monitoring DAG** using a toxicity classifier. | **CRITICAL ALERT:** If the rate of generated summaries flagged as toxic exceeds 0.1%.<br>**Recipient:** Immediate page to MLOps and Product teams. |
| | **Format Adherence** | **Automated Quality Monitoring DAG** (simple JSON schema validation). | **Alert:** If > 1% of sampled summaries fail to parse or do not contain the required "pros" and "cons" keys.<br>**Recipient:** MLOps team. |
| | **RAG Context Relevance** | **Automated Quality Monitoring DAG** using `Ragas` (Context Precision). | **Warning:** If the context precision score drops, indicating the retrieval logic may be degrading.<br>**Recipient:** MLOps team ticket. |

This comprehensive monitoring plan transforms our system from a "fire-and-forget" deployment into a closely observed, self-diagnosing service. By setting clear SLOs and automated alerts for system health, data integrity, and model quality, we create the necessary feedback loops to maintain a high-quality user experience and build long-term trust in our GenAI application.

***

### 11. Closing the Loop: Continual Learning & Production Testing

A deployed model is the beginning of the journey, not the end. In the dynamic environment of e-commerce, customer language evolves, new products introduce new vocabularies, and the meaning of a "good" review can shift. Our system must adapt to these changes to remain effective. This phase focuses on the strategies for model evolution and the rigorous methods for validating its real-world business impact.

#### 11.1 Continual Learning & Retraining Strategy

The goal is to keep our summarization model accurate and relevant without incurring the prohibitive cost of retraining from scratch.

* **Triggers for Retraining:** Our Continuous Training (CT) pipeline is not run arbitrarily. It is triggered by two specific conditions:
    1. **Reactive (Alert-Driven):** An alert from our automated quality monitoring system signals a significant degradation in production model performance (e.g., the Faithfulness score drops below our 0.95 SLO). This is an emergency trigger to remediate a live issue.
    2. **Proactive (Scheduled):** A scheduled trigger (e.g., quarterly) ensures the model is periodically refreshed with the latest data, capturing gradual concept drift and new language patterns before they become a major issue.
* **The Retraining Process:** When triggered, the planned `llm_finetuning_dag` executes the PEFT/LoRA fine-tuning process on a newly curated dataset. This dataset includes a fresh sample of high-quality reviews collected since the last run, combined with a representative sample of older data to mitigate catastrophic forgetting.

#### 11.2 Mitigating Catastrophic Forgetting: A Core LLM Challenge

**The Problem:** The primary risk of continual learning is **catastrophic forgetting**. If we fine-tune our model solely on a new wave of "Home & Kitchen" reviews, it might become an expert in that domain but forget the specific nuances and vocabulary required to effectively summarize "Fashion" or "Electronics" reviews. This would silently degrade the user experience for a large portion of our product catalog.

**Our Mitigation Strategy: A Modular, Multi-Adapter Architecture**

Instead of fine-tuning a single, monolithic model, we will adopt a more robust, **federated adapter strategy**.

1. **Domain-Specific Adapters:** We will train separate, lightweight LoRA adapters for distinct, high-level product categories (e.g., `electronics_adapter`, `fashion_adapter`, `home_adapter`).
2. **Isolated Training:** When the "Electronics" domain requires an update, we *only* retrain the `electronics_adapter` on new electronics reviews. The weights of the base model and all other adapters remain untouched.
3. **Dynamic Serving:** Our inference service on EKS will be architected to support **Multi-LoRA serving (e.g., using LoRAX)**. When a request comes for a product, the service identifies its category and dynamically loads the appropriate LoRA adapter on top of the shared base model to generate the summary.

This architecture is the ultimate defense against catastrophic forgetting. It contains the impact of retraining to a specific domain, ensuring that improvements in one area do not cause regressions in another.

#### 11.3 Phased Production Testing: From Safety to Business Impact

Before a newly trained model (or adapter) can become the new production "champion," it must pass a rigorous, phased testing process in the live environment.
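
As a rough illustration of what an offline quality gate in this phased process could look like, the sketch below compares per-summary Faithfulness scores for the champion and the challenger. The function name `challenger_passes_gate`, the `max_regression` tolerance, and the sample scores are hypothetical; only the 0.95 Faithfulness floor is taken from the monitoring plan in Section 10.

```python
from statistics import mean


def challenger_passes_gate(
    champion_scores: list[float],
    challenger_scores: list[float],
    min_faithfulness: float = 0.95,  # Faithfulness SLO from the monitoring plan
    max_regression: float = 0.01,    # assumed tolerance, not a value from the project
) -> bool:
    """Return True if the challenger clears the offline quality gate.

    The challenger must meet the absolute Faithfulness floor and must not
    fall more than `max_regression` below the champion's average score.
    """
    if not champion_scores or not challenger_scores:
        raise ValueError("Both models need at least one scored summary.")

    champion_avg = mean(champion_scores)
    challenger_avg = mean(challenger_scores)

    meets_slo = challenger_avg >= min_faithfulness
    no_regression = challenger_avg >= champion_avg - max_regression
    return meets_slo and no_regression


if __name__ == "__main__":
    # Illustrative scores only; real values would come from the Ragas evaluation run.
    champion = [0.97, 0.96, 0.98, 0.95]
    challenger = [0.98, 0.97, 0.96, 0.97]
    print(challenger_passes_gate(champion, challenger))
```

A gate like this only decides whether a challenger may proceed to user-facing tests; it says nothing about business impact, which still has to be measured on live traffic.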

| Stage | Test Method | Purpose & LLM-Specific Focus |
| :--- | :--- | :--- |
| **1. Shadow Deployment** | The new "challenger" model/adapter runs in parallel with the current "champion." | **Purpose:** Safely validate the challenger's performance on live, messy production data with **zero user impact**.<br>**Process:** Our batch inference pipeline calls both models. The challenger's summaries are logged but not served. We then compare them offline to the champion's outputs using our Ragas/LLM-as-a-judge pipeline. This is our final quality gate before exposing the model to users. |
| **2. A/B Testing** | A subset of users is shown summaries generated by the challenger model, while the control group sees summaries from the champion model. | **Purpose:** To definitively and quantitatively measure the **business impact** of the new model. This moves beyond offline metrics to answer the ultimate question: "Does this new model help our users and our business?"<br>**Primary KPIs Measured:** Conversion Rate, Product Return Rate, User Engagement ("Helpful" clicks). |

#### 11.4 A/B Testing Framework for a Batch System

Executing an A/B test in our batch-oriented system requires a specific workflow:

This framework allows us to rigorously test the real-world impact of our model updates. A new model is only fully rolled out and becomes the new "champion" if it demonstrates a statistically significant improvement in our primary business KPIs, completing the virtuous cycle of our MLOps process.

***

### 12. Governance, Ethics & The Human Element

A production AI system's success is ultimately measured not just by its accuracy but by its reliability, fairness, and the trust it earns from users and stakeholders. This section outlines the governance framework and ethical guardrails that ensure our summarization feature is developed and operated responsibly.

#### 12.1 Comprehensive Model Governance

Governance is integrated throughout our MLOps lifecycle to ensure compliance, reproducibility, and control.

| Lifecycle Stage | Governance Component | Key Tasks & Artifacts for this Project |
| :--- | :--- | :--- |
| **Development** | **Reproducibility** | **Versioning Everything:** All components are versioned to enable full reproducibility.<br>• **Code:** Git commits.<br>• **Data:** DVC for evaluation datasets.<br>• **Prompts:** Versioned as config files in Git.<br>• **Models:** Registered and versioned in the MLflow Model Registry. |
| | **Validation** | **Model Cards:** For each production-promoted model, a Model Card is created, documenting its intended use, evaluation metrics (including on key data slices), limitations, and any known biases. |
| **Deployment & Operations** | **Control & Security** | **Access Control:** IAM roles enforce least-privilege access. EKS service accounts and SageMaker execution roles ensure services can only access the resources they need.<br>**Secret Management:** The production endpoint API key for LLM-as-a-judge is stored securely in AWS Secrets Manager. |
| | **Monitoring & Alerting** | **Automated Alerts:** The monitoring system (as defined in Section 10) automatically sends alerts for performance degradation, data drift, or quality issues, creating an immediate feedback loop. |
| | **Auditability** | **MLflow as Audit Trail:** The MLflow Tracking Server provides a complete, traceable log linking a specific model version back to the data it was trained on, the code that trained it, and its evaluation results. |

#### 12.2 Responsible AI (RAI) Principles in Practice

We proactively address ethical considerations, focusing on fairness, explainability, and privacy.

| RAI Principle | Risk in this Project | Mitigation Strategy |
| :--- | :--- | :--- |
| **Fairness & Bias** | **Positivity Bias:** The model, trained on a majority of positive reviews, might learn to over-emphasize positive statements and downplay or ignore valid negative feedback, creating misleadingly positive summaries. | **Data-Centric Mitigation:** Our RAG retrieval strategy is explicitly designed to be **bias-aware**. It programmatically retrieves a *balanced* set of review snippets (e.g., top 3 helpful positive, top 3 helpful negative) to ensure the LLM is always presented with a fair and representative context. |
| **Explainability (XAI)** | **"Black Box" Summaries:** Users may not understand *why* the summary makes a particular claim (e.g., "battery life is poor"), which can reduce trust. | **RAG-based Explainability ("Grounding"):** This is a key feature of our system. The user interface can be designed to allow users to click on a sentence in the summary, which would then highlight the original review snippets from which that statement was derived. This directly connects the generated output to its source evidence. |
| **Transparency** | **Undisclosed AI Generation:** Users may not be aware that the summary is AI-generated, leading to misplaced expectations. | **Clear Labeling:** The user interface will clearly label the summary section with a disclaimer such as "AI-generated summary based on customer reviews" to manage user expectations appropriately. The Model Card serves as internal transparency. |
| **Privacy** | **PII Leakage:** Customer reviews might contain Personally Identifiable Information (e.g., "the seller John Doe at john.doe@email.com was very helpful") which could be repeated in the summary. | **Proactive PII Redaction:** The **Data Ingestion & Cleaning Pipeline** includes a dedicated, automated step using libraries like `presidio` to detect and redact PII from all review texts *before* they are stored, embedded, or ever seen by the LLM. |

#### 12.3 Holistic Testing & Production Readiness (ML Test Score)

We use the principles from Google's "ML Test Score" rubric to self-assess the production readiness of our system.

| Test Category | Key Checks for Our Project |
| :--- | :--- |
| **Features & Data** | ✓ Feature expectations are captured (Great Expectations).<br>✓ Data pipeline has PII controls.<br>✓ All input feature code (`embedding_generation_pipeline`) is unit tested. |
| **Model Development** | ✓ Model specs (LoRA config, prompt) are versioned in Git.<br>✓ Offline metrics (Faithfulness, Coherence) correlate with desired business outcomes.<br>✓ A simpler model (heuristic baseline) was proven to be worse.<br>✓ Model quality is checked on key slices (multilingual data, polarized reviews). |
| **ML Infrastructure** | ✓ Training pipeline is integration tested.<br>✓ Model quality is validated before being registered.<br>✓ Models are tested via a canary/shadow process before full production rollout.<br>✓ A safe rollback mechanism to a previous model version is in place. |
| **Monitoring** | ✓ Data invariants (schema, distributions) are monitored.<br>✓ Training-serving skew is addressed by design (same embedding model used everywhere).<br>✓ Model quality (Faithfulness, etc.) is continuously monitored.<br>✓ System performance (latency, throughput) is monitored. |

#### 12.4 The Human Element: Team & User Experience

* **Team Collaboration:** For a small, cross-functional team (Product, Data Engineering, MLOps), clear communication and shared ownership were paramount. Blameless post-mortems for any production issues are standard practice to encourage a culture of continuous improvement.
* **User Feedback Loops:** The user interface will include "Was this summary helpful?" (👍/👎) buttons. This direct, explicit feedback is a crucial source of data. A sudden increase in "👎" clicks for a specific product category is a powerful signal that can trigger an investigation and potentially a targeted retraining of the relevant LoRA adapter.

***

### 13. Overall System Architecture

The end-to-end system for customer review summarization is a sophisticated interplay of data engineering pipelines, MLOps automation, and a high-performance model serving infrastructure. The following diagram provides a unified view of how these components interact to deliver the final product feature.

#### 13.1 AWS System Architecture Diagram

The system is logically divided into four distinct planes:

1. **CI/CD & Governance Plane (Purple):** This is the developer-facing control loop where all changes originate.
    * **GitHub** is the single source of truth for all artifacts: application code, pipeline definitions (DAGs), Infrastructure as Code (Terraform), and data pointers (DVC).
    * **GitHub Actions** automates the entire CI/CD process. It runs tests, builds Docker containers, and deploys the updated pipeline and serving code to the appropriate AWS services.
    * The **MLflow Model Registry** acts as the central governance gate for models. A manual promotion of a model in the registry is the explicit, auditable action that triggers the workflow to deploy a new model version to production.
2. **Control Plane - MLOps Automation (Orange):** This is the automated "brain" of the MLOps system.
    * **Amazon MWAA (Managed Workflows for Apache Airflow)** orchestrates all our data and ML pipelines as DAGs. It is responsible for scheduling the batch inference jobs, running the data ingestion pipelines, and triggering the continuous training workflows.
3. **Data Plane (Blue):** This plane represents the flow and storage of all data.
    * It begins with the **Application DB**, the source of new reviews.
    * The **S3 Data Lake** serves as the central, cost-effective storage for raw and cleaned review data.
    * **Amazon Bedrock** provides the Titan embedding model used to convert text into semantic vectors.
    * **Amazon OpenSearch** acts as our Vector Database, indexing the embeddings to enable efficient similarity search for our RAG strategy.
4. **Serving & Caching Plane (Light Orange):** This is where the model is hosted and the final summaries are made available to the application.
    * The **LLM Serving Endpoint**, running on **Amazon EKS** with an optimized engine like vLLM, is a high-performance microservice responsible for generating summaries. It is deployed as a container from **Amazon ECR**.
    * The **Amazon DynamoDB** table is the low-latency Summary Cache. Our batch inference pipeline writes its results here.
    * The main **E-commerce Application** reads from this DynamoDB cache to display summaries to users, completely decoupling it from the complexity and latency of the live model inference.

#### 13.2 Sequence Diagram: Batch Inference Workflow

This sequence diagram illustrates a highly efficient and parallelized workflow.

* **Total Estimated Pipeline Runtime:** For a typical hourly run involving **500 products**, the entire end-to-end process is expected to complete in well under a minute (**~30-50 seconds**).
* **Dominant Latency Step:** The most time-consuming part of the process is the actual LLM inference step (`generate_summaries_task`). This highlights the critical importance of using an optimized serving engine like vLLM to maximize throughput and keep the batch processing time low.
* **Scalability:** The architecture is designed to scale.
    * **OpenSearch and DynamoDB** can handle massive throughput with consistent low latency.
    * The **LLM Serving Endpoint on EKS** can be scaled horizontally by adding more pods/nodes if the number of products to be updated per hour grows significantly, although this would increase cost.
    * The primary bottleneck for a much larger workload would likely become the cost and time associated with the EKS inference step.

#### 13.3 Potential Bottlenecks and Performance Optimizations

While the architecture is designed for efficiency, several potential bottlenecks could arise as the system scales. Proactively identifying and planning for these is key to maintaining a performant and cost-effective service.

| Bottleneck | Description | Performance Optimization Strategies |
| :--- | :--- | :--- |
| **1. LLM Inference Throughput** | This is the **primary and most critical bottleneck**. The number of summaries we can generate per second is limited by the GPU's computational power. If the number of products needing updates per hour exceeds the endpoint's capacity, the batch job's runtime will extend, potentially violating our "hourly" freshness guarantee and increasing compute costs. | **Primary Optimizations (Already Planned):**<br>• **Optimized Serving Engine:** Using **vLLM** is non-negotiable. Its implementation of continuous batching and PagedAttention can increase throughput by **5-10x** compared to a naive Hugging Face implementation.<br>• **Quantization:** Serving a quantized version of the model (e.g., INT8 or AWQ) can significantly increase token generation speed and reduce the GPU memory footprint, allowing for larger batch sizes. This requires careful evaluation to ensure no unacceptable drop in summary quality.<br><br>**Secondary Optimizations (If Needed):**<br>• **Horizontal Scaling:** Add more pods to the EKS deployment. This provides a linear increase in throughput but also a linear increase in cost.<br>• **Vertical Scaling:** Upgrade to a more powerful GPU instance (e.g., from an NVIDIA A10G to an H100). This is more expensive but can provide a step-change in performance.<br>• **Speculative Decoding:** An advanced technique where a smaller, faster "draft" model generates candidate tokens that the main model validates in chunks, speeding up generation. |
| **2. RAG Context Retrieval Latency** | Before we can even call the LLM, we must query the Vector DB (OpenSearch) to retrieve the review snippets. If these queries are slow or inefficient, they add directly to the overall pipeline runtime, especially when processing thousands of products. | **Primary Optimizations (Already Planned):**<br>• **Batching & Asynchronous Queries:** Instead of querying one product at a time, our `retrieve_rag_context.py` script will use **asynchronous I/O (`asyncio`, `aiohttp`)** to send many queries to OpenSearch concurrently, maximizing throughput.<br>• **Optimized OpenSearch Index:** Ensure the OpenSearch index is correctly sharded and has the appropriate instance type to handle the query load.<br><br>**Secondary Optimizations (If Needed):**<br>• **Add a Read Replica:** If the OpenSearch cluster is under heavy load from other applications, add a dedicated read replica for the summarization pipeline to query. |
| **3. Airflow Worker Capacity** | The Airflow workers orchestrating the pipeline have finite resources. If we try to parallelize the processing of too many products simultaneously, we could overwhelm the worker's CPU and memory, causing tasks to fail or the entire DAG to slow down. | **Primary Optimizations (Already Planned):**<br>• **Resource Management:** Configure the Airflow DAG with a sensible `max_active_runs` and set appropriate concurrency limits for each task.<br>• **Offload Heavy Lifting:** The current design correctly offloads the most intensive work (LLM inference) to a dedicated EKS cluster, keeping the Airflow workers lightweight.<br><br>**Secondary Optimizations (If Needed):**<br>• **Scale MWAA Environment:** Increase the size or number of workers in the Amazon MWAA environment. |
| **4. Cold Starts (Scaling from Zero)** | Our cost-optimization strategy involves scaling the EKS deployment to zero pods when idle. The first batch job after a period of inactivity will experience a "cold start" latency as the Kubernetes scheduler needs to provision a new pod on a GPU node and download the model container. This could add several minutes to the first run. | **Primary Optimizations (Already Planned):**<br>• **Overprovisioning with Paused Pods (if supported):** Some advanced schedulers allow for "paused" pods that keep the container image resident on the node, dramatically reducing startup time.<br>• **Acceptance:** For our hourly batch job, a one-time startup latency of a few minutes is generally an acceptable trade-off for the significant cost savings of scaling to zero.<br><br>**Secondary Optimizations (If Needed):**<br>• **Keep a Single Warm Pod:** As a compromise, configure the deployment to always keep a minimum of one pod running. This eliminates cold starts but incurs a higher baseline cost. |
| **5. Database Write Throughput** | After generating summaries, the pipeline needs to write them to the DynamoDB cache. While DynamoDB is highly scalable, a massive burst of writes could potentially exceed the table's provisioned write capacity units (WCUs), leading to throttled requests and task retries. | **Primary Optimizations (Already Planned):**<br>• **Use BatchWriteItem:** The `validate_and_cache.py` script will use DynamoDB's `BatchWriteItem` API, which is far more efficient than writing items one by one.<br>• **On-Demand Capacity:** Configure the DynamoDB table to use "On-Demand" capacity mode instead of provisioned. This automatically scales to handle the workload's peak write throughput and is more cost-effective for spiky, infrequent traffic patterns like our batch job. |

___

### Implementation: Data Ingestion Pipeline

#### Architecture Diagram

#### Python Scripts

```python
import logging
from datetime import datetime, timedelta

import pandas as pd
from sqlalchemy import create_engine, text

logging.basicConfig(level=logging.INFO)


def get_new_reviews(db_connection_string: str, execution_date: str) -> pd.DataFrame:
    """
    Extracts new reviews from the source database created in the last 24 hours.

    Args:
        db_connection_string: The database connection string.
        execution_date: The date of the DAG run (for reproducibility).

    Returns:
        A pandas DataFrame with new reviews.
    """
    try:
        logging.info("Connecting to the source database...")
        engine = create_engine(db_connection_string)

        # Calculate the time window for the query
        end_date = datetime.fromisoformat(execution_date)
        start_date = end_date - timedelta(days=1)

        # Use bound parameters rather than interpolating values into the SQL string
        query = text(
            """
            SELECT review_id, product_id, user_id, star_rating, review_text, created_at
            FROM public.reviews
            WHERE created_at >= :start_date AND created_at < :end_date
            """
        )

        logging.info(f"Executing query for reviews between {start_date} and {end_date}.")
        with engine.connect() as connection:
            df = pd.read_sql(
                query,
                connection,
                params={"start_date": start_date, "end_date": end_date},
            )

        logging.info(f"Successfully extracted {len(df)} new reviews.")
        return df
    except Exception as e:
        logging.error(f"Failed to extract reviews: {e}")
        raise
```

```python
import logging
import re

import pandas as pd
from bs4 import BeautifulSoup
from langdetect import detect

# Assume presidio and detoxify are installed
# from presidio_analyzer import AnalyzerEngine
# from detoxify import Detoxify

logging.basicConfig(level=logging.INFO)

# For demonstration, we'll mock the PII/Toxicity models to
avoid heavy dependencies # In a real scenario, these would be initialized properly. # analyzer = AnalyzerEngine() # toxicity_classifier = Detoxify('original') def _clean_html(text: str) -> str: """Removes HTML tags from text.""" return BeautifulSoup(text, "html.parser").get_text() def _normalize_text(text: str) -> str: """Lowercases, removes special chars, and normalizes whitespace.""" text = text.lower() text = re.sub(r'\[.*?\]', '', text) text = re.sub(r'https?://\S+|www\.\S+', '', text) text = re.sub(r'<.*?>+', '', text) text = re.sub(r'\n', ' ', text) text = re.sub(r'\w*\d\w*', '', text) text = re.sub(r'[^a-z\s]', '', text) return " ".join(text.split()) def _redact_pii(text: str) -> str: """Mocks PII redaction.""" # In production, this would use Presidio: # results = analyzer.analyze(text=text, language='en') # for result in results: # text = text.replace(text[result.start:result.end], f'[{result.entity_type}]') mock_redacted_text = re.sub(r'\S+@\S+', '[EMAIL]', text) return mock_redacted_text def _get_toxicity_score(text: str) -> float: """Mocks toxicity scoring.""" # In production, this would use Detoxify: # score = toxicity_classifier.predict(text)['toxicity'] if "hate" in text or "stupid" in text: return 0.9 return 0.1 def transform_reviews(df: pd.DataFrame) -> pd.DataFrame: """ Applies a series of transformations to the raw reviews DataFrame. 
""" logging.info(f"Starting transformation of {len(df)} reviews.") # Clean and normalize text df['cleaned_text'] = df['review_text'].apply(_clean_html).apply(_normalize_text) # Filter out "noise" reviews df = df[df['cleaned_text'].str.split().str.len() >= 5].copy() logging.info(f"{len(df)} reviews remaining after noise filtering.") # Safety and Privacy df['cleaned_text'] = df['cleaned_text'].apply(_redact_pii) df['toxicity_score'] = df['cleaned_text'].apply(_get_toxicity_score) df = df[df['toxicity_score'] < 0.8].copy() logging.info(f"{len(df)} reviews remaining after toxicity filtering.") # Language Detection df['language'] = df['cleaned_text'].apply(lambda x: detect(x) if x.strip() else 'unknown') final_df = df[['review_id', 'product_id', 'user_id', 'star_rating', 'cleaned_text', 'language', 'toxicity_score', 'created_at']] logging.info("Transformation complete.") return final_df ``` ```python import logging import pandas as pd import great_expectations as ge logging.basicConfig(level=logging.INFO) def validate_cleaned_data(df: pd.DataFrame) -> bool: """ Validates the cleaned data using a Great Expectations suite. 
""" logging.info("Validating cleaned data...") ge_df = ge.from_pandas(df) # Define expectations ge_df.expect_column_to_exist("review_id") ge_df.expect_column_values_to_not_be_null("review_id") ge_df.expect_column_values_to_be_unique("review_id") ge_df.expect_column_values_to_be_in_set("star_rating", [1, 2, 3, 4, 5]) ge_df.expect_column_values_to_not_be_null("cleaned_text") ge_df.expect_column_values_to_be_in_set("language", ["en", "de", "fr", "es", "it", "nl"]) # Example languages validation_result = ge_df.validate() if not validation_result["success"]: logging.error("Data validation failed!") logging.error(validation_result) return False logging.info("Data validation successful.") return True ``` ```python import logging import pandas as pd import subprocess logging.basicConfig(level=logging.INFO) def save_and_version_data(df: pd.DataFrame, local_path: str, s3_bucket: str, execution_date: str) -> None: """ Saves the DataFrame to a local Parquet file and uses DVC to version and push to S3. 
""" try: # Save to local filesystem (accessible by Airflow worker) file_path = f"{local_path}/cleaned_reviews_{execution_date}.parquet" logging.info(f"Saving cleaned data to {file_path}") df.to_parquet(file_path, index=False) # DVC commands to version and push the data # Assumes DVC is initialized and remote is configured logging.info("Versioning data with DVC...") subprocess.run(["dvc", "add", file_path], check=True) logging.info("Pushing data to S3 remote with DVC...") subprocess.run(["dvc", "push", f"{file_path}.dvc"], check=True) logging.info("Data successfully saved and versioned.") except Exception as e: logging.error(f"Failed to save and version data: {e}") raise ``` #### Unit Tests ```python import pandas as pd from src.pipelines.ingestion.transform import transform_reviews def test_transform_reviews(): # Arrange raw_data = { 'review_id': [1, 2, 3, 4, 5], 'product_id': ['A', 'A', 'B', 'B', 'C'], 'user_id': [101, 102, 103, 104, 105], 'star_rating': [5, 1, 3, 4, 5], 'review_text': [ '

This is GREAT!',
            'I hate this product. It is stupid.',  # Should be filtered by toxicity
            'Too short.',  # Should be filtered by length
            'My email is test@example.com',  # Should be redacted
            'Un produit fantastique en français.'
        ],
        'created_at': pd.to_datetime(['2024-01-01', '2024-01-01', '2024-01-01', '2024-01-01', '2024-01-01'])
    }
    raw_df = pd.DataFrame(raw_data)

    # Act
    transformed_df = transform_reviews(raw_df)

    # Assert
    assert len(transformed_df) == 3  # Should filter 2 rows
    assert transformed_df.iloc[1]['cleaned_text'] == 'my email is [EMAIL]'
    assert transformed_df.iloc[0]['language'] == 'en'
    assert transformed_df.iloc[2]['language'] == 'fr'
    assert 'toxicity_score' in transformed_df.columns
```

#### Pipeline (Airflow DAG)

```python
from __future__ import annotations

import pandas as pd  # Needed by the tasks below to rebuild DataFrames from XComs
import pendulum
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Assuming custom Python modules are in a package installed in the Airflow environment
from src.pipelines.ingestion import extract, transform, validate, load

S3_BUCKET = "my-ecommerce-mlops-bucket"
LOCAL_DATA_PATH = "/tmp/data"  # Path on the Airflow worker


def extract_task(ti):
    hook = PostgresHook(postgres_conn_id="source_db_conn")
    conn_string = hook.get_uri()
    reviews_df = extract.get_new_reviews(conn_string, ti.execution_date.to_iso8601_string())
    # Push to XComs for the next task
    ti.xcom_push(key="raw_reviews_df", value=reviews_df.to_json())


def transform_task(ti):
    raw_reviews_json = ti.xcom_pull(task_ids="extract_new_reviews", key="raw_reviews_df")
    raw_df = pd.read_json(raw_reviews_json)
    transformed_df = transform.transform_reviews(raw_df)
    ti.xcom_push(key="transformed_reviews_df", value=transformed_df.to_json())


def validate_task(ti):
    transformed_reviews_json = ti.xcom_pull(task_ids="transform_raw_reviews", key="transformed_reviews_df")
    transformed_df = pd.read_json(transformed_reviews_json)
    if not validate.validate_cleaned_data(transformed_df):
        raise ValueError("Data validation failed, stopping pipeline.")
    # If validation succeeds, the original df is passed through
    ti.xcom_push(key="validated_reviews_df", value=transformed_df.to_json())


def load_task(ti):
    validated_reviews_json = ti.xcom_pull(task_ids="validate_transformed_reviews", key="validated_reviews_df")
    validated_df = pd.read_json(validated_reviews_json)
    load.save_and_version_data(
        df=validated_df,
        local_path=LOCAL_DATA_PATH,
        s3_bucket=S3_BUCKET,
        execution_date=ti.execution_date.to_iso8601_string()
    )


with DAG(
    dag_id="data_ingestion_and_cleaning",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="0 1 * * *",  # Run daily at 1 AM UTC
    catchup=False,
    tags=["data-eng", "ingestion"],
) as dag:
    extract_new_reviews = PythonOperator(
        task_id="extract_new_reviews",
        python_callable=extract_task,
    )
    transform_raw_reviews = PythonOperator(
        task_id="transform_raw_reviews",
        python_callable=transform_task,
    )
    validate_transformed_reviews = PythonOperator(
        task_id="validate_transformed_reviews",
        python_callable=validate_task,
    )
    load_and_version_data = PythonOperator(
        task_id="load_and_version_data",
        python_callable=load_task,
    )

    extract_new_reviews >> transform_raw_reviews >> validate_transformed_reviews >> load_and_version_data
```

#### Integration Test

```python
import pytest
from airflow.models.dagbag import DagBag


# This test checks the structural integrity of the DAG
def test_dag_loaded():
    dagbag = DagBag(dag_folder='dags/', include_examples=False)
    assert dagbag.get_dag(dag_id='data_ingestion_and_cleaning') is not None
    assert 'data_ingestion_and_cleaning' in dagbag.dags

# A more complex integration test would use the Airflow API
# to trigger a run in a staging environment and check the output in S3.
# This requires a running Airflow and is often done in a separate CI/CD stage.
# # Example using pytest-airflow: # from pytest_airflow import clirunner # # def test_dag_run_successfully(clirunner): # result = clirunner("dags", "test", "data_ingestion_and_cleaning", "2024-01-01") # assert result.return_code == 0, "DAG run failed" # # # Add assertions here to check for output artifacts in a mock S3 bucket ``` #### CI/CD Workflow (Github Actions) ```yaml name: Deploy Data Ingestion Pipeline on: push: branches: - main paths: - 'src/pipelines/ingestion/**' - 'dags/data_ingestion_dag.py' - 'tests/pipelines/ingestion/**' jobs: lint-and-unit-test: runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: '3.9' - name: Install dependencies run: | pip install -r requirements.txt pip install -r requirements-dev.txt - name: Run linter run: flake8 src/pipelines/ingestion/ dags/data_ingestion_dag.py - name: Run unit tests run: pytest tests/pipelines/ingestion/ deploy-to-production: needs: lint-and-unit-test runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v3 - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v2 with: aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }} aws-region: eu-west-1 - name: Sync DAG to Production MWAA Bucket run: | aws s3 sync ./dags s3://${{ secrets.MWAA_PROD_DAGS_BUCKET }}/dags --delete # In a real project, you would also sync your custom Python package # aws s3 sync ./src s3://${{ secrets.MWAA_PROD_PLUGINS_BUCKET }}/src ``` ___ ### Implementation: Embeddings Generation Pipeline #### Architecture Diagram #### Python Scripts ```python import logging import pandas as pd import subprocess logging.basicConfig(level=logging.INFO) def get_latest_cleaned_data(local_path: str, execution_date: str) -> pd.DataFrame: """ Uses DVC to pull the latest version of the cleaned data corresponding to the execution date. 
""" file_path = f"{local_path}/cleaned_reviews_{execution_date}.parquet" dvc_file_path = f"{file_path}.dvc" try: logging.info(f"Using DVC to pull data for {dvc_file_path}...") # Ensure the .dvc file itself is present before pulling # In a real Airflow setup, the repo would be synced. subprocess.run(["dvc", "pull", dvc_file_path], check=True, capture_output=True) logging.info(f"Loading data from {file_path} into pandas DataFrame.") df = pd.read_parquet(file_path) logging.info(f"Successfully loaded {len(df)} records.") return df except FileNotFoundError: logging.error(f"DVC file not found: {dvc_file_path}. Did the ingestion pipeline run successfully?") raise except Exception as e: logging.error(f"Failed to retrieve data with DVC: {e}") logging.error(f"DVC output: {e.stdout.decode() if hasattr(e, 'stdout') else ''}") raise ``` ```python import logging import pandas as pd import boto3 import json from langchain.text_splitter import RecursiveCharacterTextSplitter logging.basicConfig(level=logging.INFO) def generate_embeddings(reviews_df: pd.DataFrame, bedrock_client) -> list: """ Chunks review text and generates embeddings using Amazon Bedrock. 
""" text_splitter = RecursiveCharacterTextSplitter( chunk_size=256, chunk_overlap=32, length_function=len, ) all_embeddings_data = [] logging.info(f"Starting embedding generation for {len(reviews_df)} reviews.") for index, row in reviews_df.iterrows(): chunks = text_splitter.split_text(row['cleaned_text']) for chunk in chunks: body = json.dumps({"inputText": chunk}) response = bedrock_client.invoke_model( body=body, modelId="amazon.titan-embed-text-v2:0", accept="application/json", contentType="application/json", ) response_body = json.loads(response.get("body").read()) embedding = response_body.get("embedding") all_embeddings_data.append({ "review_id": row['review_id'], "product_id": row['product_id'], "star_rating": row['star_rating'], "language": row['language'], "chunk_text": chunk, "embedding": embedding, }) logging.info(f"Successfully generated {len(all_embeddings_data)} embeddings.") return all_embeddings_data ``` ```python import logging import psycopg2 import psycopg2.extras from pgvector.psycopg2 import register_vector logging.basicConfig(level=logging.INFO) def index_embeddings_in_db(embedding_data: list, db_params: dict) -> None: """ Indexes the generated embeddings and metadata into the Aurora PostgreSQL DB with pgvector. 
""" try: logging.info(f"Connecting to the vector database...") with psycopg2.connect(**db_params) as conn: with conn.cursor() as cur: register_vector(cur) insert_query = """ INSERT INTO review_embeddings (review_id, product_id, star_rating, language, chunk_text, embedding) VALUES (%s, %s, %s, %s, %s, %s) ON CONFLICT (review_id, chunk_text) DO NOTHING; """ # Using a simple ON CONFLICT to ensure idempotency # Prepare data for batch insert data_to_insert = [ ( item["review_id"], item["product_id"], item["star_rating"], item["language"], item["chunk_text"], item["embedding"], ) for item in embedding_data ] logging.info(f"Indexing {len(data_to_insert)} embeddings in batches...") psycopg2.extras.execute_batch(cur, insert_query, data_to_insert) conn.commit() logging.info("Indexing complete.") except Exception as e: logging.error(f"Failed to index embeddings: {e}") raise ``` #### Unit Test ```python import pandas as pd from unittest.mock import MagicMock from src.pipelines.embedding.embed import generate_embeddings def test_generate_embeddings_batching(mocker): # Arrange mock_bedrock_client = MagicMock() # Mock the return value of invoke_model mock_response_body = json.dumps({"embedding": [0.1] * 1024}) mock_stream = MagicMock() mock_stream.read.return_value = mock_response_body.encode('utf-8') mock_bedrock_client.invoke_model.return_value = {"body": mock_stream} mocker.patch('boto3.client', return_value=mock_bedrock_client) test_data = { 'review_id': [1], 'product_id': ['A'], 'star_rating': [5], 'language': ['en'], 'cleaned_text': ['This is the first sentence. 
This is the second sentence.']
    }
    test_df = pd.DataFrame(test_data)

    # Act
    embedding_data = generate_embeddings(test_df, mock_bedrock_client)

    # Assert
    # The test text is well under chunk_size=256, so the splitter keeps it as a single chunk
    assert len(embedding_data) == 1
    assert mock_bedrock_client.invoke_model.call_count == 1
    assert embedding_data[0]['review_id'] == 1
    assert "embedding" in embedding_data[0]
    assert len(embedding_data[0]['embedding']) == 1024
```

#### Pipeline Code (Airflow DAG)

```python
from __future__ import annotations

import json

import pandas as pd
import pendulum
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskSensor
# invoke_model lives on the bedrock-runtime client, hence the runtime hook
from airflow.providers.amazon.aws.hooks.bedrock import BedrockRuntimeHook
from airflow.providers.amazon.aws.hooks.secrets_manager import SecretsManagerHook

# Assuming custom Python modules are installed
from src.pipelines.embedding import retrieve, embed, load

LOCAL_DATA_PATH = "/tmp/data"


def retrieve_data_task(ti):
    # This task gets the output from the ingestion DAG
    # For simplicity, we assume the execution date matches.
    execution_date = ti.execution_date.to_iso8601_string()
    reviews_df = retrieve.get_latest_cleaned_data(LOCAL_DATA_PATH, execution_date)
    ti.xcom_push(key="reviews_df_json", value=reviews_df.to_json())


def embed_task(ti):
    reviews_json = ti.xcom_pull(task_ids="retrieve_cleaned_data", key="reviews_df_json")
    reviews_df = pd.read_json(reviews_json)

    bedrock_hook = BedrockRuntimeHook(aws_conn_id='aws_default')
    bedrock_client = bedrock_hook.get_conn()

    embedding_data = embed.generate_embeddings(reviews_df, bedrock_client)
    ti.xcom_push(key="embedding_data", value=embedding_data)


def load_task(ti):
    embedding_data = ti.xcom_pull(task_ids="generate_review_embeddings", key="embedding_data")

    secrets_hook = SecretsManagerHook(aws_conn_id='aws_default')
    # get_secret returns the raw secret string, which is expected to hold JSON connection parameters
    db_secret = secrets_hook.get_secret("aurora/vector_db/credentials")
    db_params = json.loads(db_secret)

    load.index_embeddings_in_db(embedding_data, db_params)


with DAG(
    dag_id="embedding_generation",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule=None,  # Triggered by the ingestion DAG
    catchup=False,
    tags=["data-eng", "embedding", "rag"],
) as dag:
    wait_for_ingestion = ExternalTaskSensor(
        task_id="wait_for_ingestion_dag",
        external_dag_id="data_ingestion_and_cleaning",
        external_task_id="load_and_version_data",
        allowed_states=["success"],
        execution_delta=pendulum.duration(hours=0),
    )
    retrieve_cleaned_data = PythonOperator(task_id="retrieve_cleaned_data", python_callable=retrieve_data_task)
    generate_review_embeddings = PythonOperator(task_id="generate_review_embeddings", python_callable=embed_task)
    index_embeddings = PythonOperator(task_id="index_embeddings", python_callable=load_task)

    wait_for_ingestion >> retrieve_cleaned_data >> generate_review_embeddings >> index_embeddings
```

#### Integration Test

This is a critical test: it validates the entire workflow, ensuring that all components (Airflow, Python scripts, IAM permissions, and AWS services) work together correctly in a production-like environment.
The artifacts are structured into three parts: 1. **Setup Scripts:** To prepare the staging environment for a clean, repeatable test run. 2. **Verification Script:** The `pytest` script that runs after the pipeline execution to assert the correctness of the results. 3. **CI/CD Workflow:** The GitHub Actions workflow that orchestrates the entire process: setup, execution, and verification. ##### **1. Setup Scripts & Data** This script is responsible for creating the necessary preconditions for the test. **`tests/integration/setup_embedding_test.py`** ```python import logging import pandas as pd import boto3 import subprocess import os import psycopg2 # --- Test Configuration --- TEST_EXECUTION_DATE = "2025-01-01T00:00:00+00:00" TEST_REVIEW_ID = "test_review_001" STAGING_BUCKET = os.environ["STAGING_S3_BUCKET"] LOCAL_DATA_PATH = "/tmp/staging_data" # Database connection params from environment variables DB_PARAMS = { "host": os.environ["STAGING_DB_HOST"], "port": os.environ["STAGING_DB_PORT"], "dbname": os.environ["STAGING_DB_NAME"], "user": os.environ["STAGING_DB_USER"], "password": os.environ["STAGING_DB_PASSWORD"], } logging.basicConfig(level=logging.INFO) def create_test_data(): """Creates a sample DataFrame for the test.""" # This text is designed to be split into two chunks by our splitter configuration long_text = ( "This is the first sentence of a moderately long review. " "It provides some initial positive feedback on the product's build quality. " "The reviewer seems generally happy with their purchase so far. " "Now we move on to the second part of the review which discusses the battery life. " "Unfortunately, the battery does not last as long as advertised, which is a significant drawback." 
    )
    data = {
        'review_id': [TEST_REVIEW_ID],
        'product_id': ['product_abc'],
        'user_id': [999],
        'star_rating': [3],
        'cleaned_text': [long_text],
        'language': ['en'],
        'toxicity_score': [0.1],
        'created_at': [pd.to_datetime(TEST_EXECUTION_DATE)]
    }
    return pd.DataFrame(data)


def upload_and_version_data(df: pd.DataFrame):
    """Saves data locally and uploads it to S3 (no DVC file is created; see the note below)."""
    os.makedirs(LOCAL_DATA_PATH, exist_ok=True)

    # Path names must match what the Airflow DAG expects
    execution_date_str = pd.to_datetime(TEST_EXECUTION_DATE).strftime('%Y-%m-%dT%H:%M:%S%z')
    file_name = f"cleaned_reviews_{execution_date_str}.parquet"
    local_file_path = os.path.join(LOCAL_DATA_PATH, file_name)

    logging.info(f"Saving test data to {local_file_path}")
    df.to_parquet(local_file_path, index=False)

    # Upload to S3 (simulating DVC remote)
    s3_client = boto3.client("s3")
    s3_key = f"data/{file_name}"  # DVC would use a hash, but this is simpler for a test
    s3_client.upload_file(local_file_path, STAGING_BUCKET, s3_key)
    logging.info(f"Uploaded test data to s3://{STAGING_BUCKET}/{s3_key}")

    # For a real DVC setup, we would run `dvc add` and `dvc push` here.
    # For this test, placing the file is sufficient.


def clean_staging_db():
    """Ensures the staging DB holds no rows for the test review before the test run."""
    logging.info("Cleaning staging database for a fresh test run.")
    with psycopg2.connect(**DB_PARAMS) as conn:
        with conn.cursor() as cur:
            # Delete only the test review's rows left over from previous runs
            # (parameterized rather than interpolated into the SQL string)
            cur.execute("DELETE FROM review_embeddings WHERE review_id = %s;", (TEST_REVIEW_ID,))
        conn.commit()
    logging.info("Staging database cleaned.")


if __name__ == "__main__":
    clean_staging_db()
    test_df = create_test_data()
    upload_and_version_data(test_df)
    logging.info("Setup for embedding pipeline integration test is complete.")
```

##### **2. Verification Script (`pytest`)**

This script runs *after* the Airflow DAG has been triggered and has completed successfully.
**`tests/integration/test_embedding_pipeline.py`** ```python import pytest import os import psycopg2 from pgvector.psycopg2 import register_vector # --- Test Configuration --- TEST_REVIEW_ID = "test_review_001" EXPECTED_CHUNKS = 2 EXPECTED_EMBEDDING_DIM = 1024 # Database connection params from environment variables DB_PARAMS = { "host": os.environ["STAGING_DB_HOST"], "port": os.environ["STAGING_DB_PORT"], "dbname": os.environ["STAGING_DB_NAME"], "user": os.environ["STAGING_DB_USER"], "password": os.environ["STAGING_DB_PASSWORD"], } @pytest.fixture(scope="module") def db_connection(): """Provides a reusable database connection for the test module.""" conn = psycopg2.connect(**DB_PARAMS) register_vector(conn) yield conn conn.close() def test_embedding_generation_end_to_end(db_connection): """ Verifies that the embedding generation pipeline correctly processed and indexed the test data into the staging database. """ # Arrange query = f"SELECT chunk_text, embedding FROM review_embeddings WHERE review_id = '{TEST_REVIEW_ID}';" # Act with db_connection.cursor() as cur: cur.execute(query) results = cur.fetchall() # Assert assert results is not None, "No results found for the test review ID." # 1. Verify the number of chunks assert len(results) == EXPECTED_CHUNKS, \ f"Expected {EXPECTED_CHUNKS} chunks, but found {len(results)}." # 2. Verify the embedding vectors for i, (chunk_text, embedding) in enumerate(results): assert isinstance(embedding, list) or hasattr(embedding, 'shape'), \ f"Embedding for chunk {i} is not a list or array." assert len(embedding) == EXPECTED_EMBEDDING_DIM, \ f"Embedding for chunk {i} has dimension {len(embedding)}, expected {EXPECTED_EMBEDDING_DIM}." print(f"\nIntegration test passed: Found {len(results)} chunks with correct embedding dimensions.") ``` #### CI/CD Workflow (GitHub Actions) This workflow automates the entire test: setup, DAG execution, and verification. 
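The workflow calls a helper, `scripts/trigger_airflow_dag.py`, that this post does not show. As a rough idea of what its waiting logic could look like, here is a minimal sketch. The function names, the timeout defaults, and the injected `get_state` callable are all assumptions made for illustration; the URL layout follows the Airflow 2 stable REST API, and a real script would fetch the run's `state` field from that URL with authenticated HTTP calls.

```python
import time
from typing import Callable

# DAG-run states that mean the run has finished (Airflow 2.x).
TERMINAL_STATES = {"success", "failed"}


def dag_run_url(host: str, dag_id: str, dag_run_id: str) -> str:
    """Build the stable REST API URL for a single DAG run."""
    return f"{host.rstrip('/')}/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}"


def wait_for_dag_run(
    get_state: Callable[[], str],
    timeout_s: float = 3600,
    poll_interval_s: float = 30,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until the run reaches a terminal state or the timeout expires."""
    waited = 0.0
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        if waited >= timeout_s:
            raise TimeoutError(f"DAG run still '{state}' after {waited:.0f}s")
        sleep(poll_interval_s)
        waited += poll_interval_s


if __name__ == "__main__":
    # Simulated run; a real get_state would query dag_run_url(...) over HTTP.
    states = iter(["queued", "running", "running", "success"])
    final_state = wait_for_dag_run(lambda: next(states), poll_interval_s=0, sleep=lambda _: None)
    print(final_state)
```

In a CI job, the script would then exit with a non-zero status when the final state is `failed` or when the timeout is hit, so that the verification job never runs against a half-finished pipeline.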
**`.github/workflows/run_embedding_integration_test.yml`** ```yaml name: Embedding Pipeline Integration Test on: workflow_dispatch: # Allows manual trigger push: branches: - main paths: - 'src/pipelines/embedding/**' - 'dags/embedding_generation_dag.py' jobs: setup: name: 1. Setup Staging Environment runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: '3.9' - name: Install dependencies run: pip install -r requirements.txt - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v2 with: aws-access-key-id: ${{ secrets.STAGING_AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.STAGING_AWS_SECRET_ACCESS_KEY }} aws-region: eu-west-1 - name: Run setup script env: STAGING_S3_BUCKET: ${{ secrets.STAGING_S3_BUCKET }} STAGING_DB_HOST: ${{ secrets.STAGING_DB_HOST }} # ... other DB secrets run: python tests/integration/setup_embedding_test.py trigger-and-monitor-dag: name: 2. Trigger and Monitor Airflow DAG needs: setup runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: '3.9' - name: Install requests run: pip install requests - name: Trigger and wait for DAG run env: AIRFLOW_HOST: ${{ secrets.STAGING_AIRFLOW_HOST }} AIRFLOW_USERNAME: ${{ secrets.STAGING_AIRFLOW_USERNAME }} AIRFLOW_PASSWORD: ${{ secrets.STAGING_AIRFLOW_PASSWORD }} # Assume a helper script to trigger and poll the Airflow API run: python scripts/trigger_airflow_dag.py --dag-id embedding_generation --execution-date "2025-01-01T00:00:00+00:00" verify: name: 3. 
Verify Results in Database needs: trigger-and-monitor-dag runs-on: ubuntu-latest steps: - name: Checkout code uses: actions/checkout@v3 - name: Set up Python uses: actions/setup-python@v4 with: python-version: '3.9' - name: Install dependencies run: pip install -r requirements-dev.txt - name: Configure AWS Credentials uses: aws-actions/configure-aws-credentials@v2 with: aws-access-key-id: ${{ secrets.STAGING_AWS_ACCESS_KEY_ID }} aws-secret-access-key: ${{ secrets.STAGING_AWS_SECRET_ACCESS_KEY }} aws-region: eu-west-1 - name: Run verification script env: STAGING_DB_HOST: ${{ secrets.STAGING_DB_HOST }} # ... other DB secrets run: pytest tests/integration/test_embedding_pipeline.py ``` ___ ### Implementation: LLM Fine-tuning Pipeline #### Architecture Diagram #### Python Scripts **src/pipelines/training/data_selection.py** ```python import logging import pandas as pd # Assume a helper module for S3 interactions # from common.s3_utils import list_recent_files logging.basicConfig(level=logging.INFO) def select_finetuning_data(s3_bucket: str, s3_prefix: str, sample_size: int = 5000) -> pd.DataFrame: """ Selects a sample of the most recent, high-quality reviews for fine-tuning. In a real scenario, this would also blend in a curated multilingual dataset. """ logging.info(f"Selecting data from s3://{s3_bucket}/{s3_prefix}") # This is a simplified version. A real implementation would be more robust. # recent_files = list_recent_files(s3_bucket, s3_prefix, days=30) # dfs = [pd.read_parquet(f"s3://{s3_bucket}/{f}") for f in recent_files] # combined_df = pd.concat(dfs) # For now, we create a dummy dataframe. # Let's assume we load a dataset that needs formatting for the trainer. # The format should be a text column like: # "###Instruction: Summarize these reviews. ###Input: [all review texts] ###Response: [human-written summary]" dummy_data = { "text": [ f"###Instruction: Summarize these reviews. ###Input: review text {i}. ###Response: ideal summary {i}." 
            for i in range(sample_size)
        ]
    }
    sample_df = pd.DataFrame(dummy_data)

    logging.info(f"Selected a sample of {len(sample_df)} records for fine-tuning.")
    return sample_df
```

**src/pipelines/training/train.py** (This script is executed on SageMaker)

```python
import argparse
import logging
import os

import pandas as pd
from datasets import Dataset
from transformers import AutoModelForCausalLM, AutoTokenizer, TrainingArguments
from peft import LoraConfig
from trl import SFTTrainer

logging.basicConfig(level=logging.INFO)


def main():
    parser = argparse.ArgumentParser()

    # SageMaker environments
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))
    parser.add_argument("--train_data_dir", type=str, default=os.environ.get("SM_CHANNEL_TRAINING"))

    # Hyperparameters
    parser.add_argument("--base_model_id", type=str, default="mistralai/Mistral-7B-Instruct-v0.1")
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--per_device_train_batch_size", type=int, default=4)

    args, _ = parser.parse_known_args()

    # 1. Load data
    train_file = os.path.join(args.train_data_dir, "train.parquet")
    # SFTTrainer expects a datasets.Dataset, so convert the DataFrame after reading
    train_dataset = Dataset.from_pandas(pd.read_parquet(train_file))
    logging.info(f"Loaded {len(train_dataset)} training records.")

    # 2. Load model and tokenizer
    tokenizer = AutoTokenizer.from_pretrained(args.base_model_id)
    tokenizer.pad_token = tokenizer.eos_token
    model = AutoModelForCausalLM.from_pretrained(args.base_model_id, device_map="auto")

    # 3. Configure PEFT/LoRA
    peft_config = LoraConfig(
        r=16,
        lora_alpha=32,
        lora_dropout=0.05,
        bias="none",
        task_type="CAUSAL_LM",
    )

    # 4. Configure Training Arguments
    training_args = TrainingArguments(
        output_dir=os.path.join(args.model_dir, "checkpoints"),
        per_device_train_batch_size=args.per_device_train_batch_size,
        num_train_epochs=args.epochs,
        logging_steps=10,
        save_strategy="epoch",
        report_to="none",
    )

    # 5. Initialize Trainer
    trainer = SFTTrainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        peft_config=peft_config,
        dataset_text_field="text",
        max_seq_length=1024,
        tokenizer=tokenizer,
    )

    # 6. Start Training
    logging.info("Starting model fine-tuning...")
    trainer.train()
    logging.info("Training complete.")

    # 7. Save the LoRA adapter
    final_adapter_path = os.path.join(args.model_dir, "adapter")
    trainer.save_model(final_adapter_path)
    logging.info(f"LoRA adapter saved to {final_adapter_path}")


if __name__ == "__main__":
    main()
```

**`src/pipelines/training/evaluate_and_register.py`**

```python
import logging
import os

import pandas as pd
import mlflow
# Assume other necessary imports for evaluation (Ragas, OpenAI)

logging.basicConfig(level=logging.INFO)

MLFLOW_TRACKING_URI = os.environ["MLFLOW_TRACKING_URI"]
PROD_MODEL_NAME = "review-summarizer"
EVALUATION_THRESHOLD = 1.05  # New model must be 5% better


def evaluate_model(adapter_path: str, eval_df: pd.DataFrame) -> dict:
    """Mocks the evaluation process."""
    logging.info(f"Evaluating model adapter from {adapter_path}...")
    # In a real scenario, this would:
    # 1. Load the base model + LoRA adapter.
    # 2. Generate summaries for the evaluation dataframe.
    # 3. Run Ragas and LLM-as-a-judge.
    # We'll return mock scores for this implementation.
    mock_scores = {"faithfulness": 0.98, "coherence": 4.6}
    logging.info(f"Evaluation complete. Scores: {mock_scores}")
    return mock_scores


def register_model(model_artifact_path: str, metrics: dict):
    """Compares metrics and registers the model in MLflow if it's better."""
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    client = mlflow.tracking.MlflowClient()

    try:
        # Get the latest production model's metrics
        latest_prod_version = client.get_latest_versions(PROD_MODEL_NAME, stages=["Production"])[0]
        prod_run = client.get_run(latest_prod_version.run_id)
        prod_faithfulness = prod_run.data.metrics.get("faithfulness", 0)
    except IndexError:
        # No production model exists yet
        prod_faithfulness = 0

    candidate_faithfulness = metrics.get("faithfulness", 0)
    logging.info(f"Candidate faithfulness: {candidate_faithfulness}, Production faithfulness: {prod_faithfulness}")

    if candidate_faithfulness > prod_faithfulness * EVALUATION_THRESHOLD:
        logging.info("Candidate model is better. Registering new version.")
        mlflow.register_model(
            model_uri=model_artifact_path,  # Expected to be a full S3 URI already
            name=PROD_MODEL_NAME,
            # Link to the run, log metrics, etc.
        )
        logging.info("Model registration successful.")
    else:
        logging.info("Candidate model is not better than production. Skipping registration.")

# ... main execution block to run these functions
```

#### Unit Tests

**tests/pipelines/training/test_registration.py**

```python
import os
from unittest.mock import MagicMock

import pytest

# The module under test reads MLFLOW_TRACKING_URI at import time, so set a placeholder first
os.environ.setdefault("MLFLOW_TRACKING_URI", "http://localhost:5000")

from src.pipelines.training.evaluate_and_register import register_model  # noqa: E402


@pytest.fixture
def mock_register_model(mocker):
    """Mocks the MLflow client and returns the patched mlflow.register_model."""
    mock_client = MagicMock()
    # Simulate an existing production model
    mock_version = MagicMock()
    mock_version.run_id = "prod_run_id"
    mock_client.get_latest_versions.return_value = [mock_version]

    mock_run = MagicMock()
    # With EVALUATION_THRESHOLD = 1.05, a candidate must score above 0.90 * 1.05 = 0.945
    mock_run.data.metrics = {"faithfulness": 0.90}
    mock_client.get_run.return_value = mock_run

    mocker.patch("mlflow.tracking.MlflowClient", return_value=mock_client)
    mocker.patch("mlflow.set_tracking_uri")
    # register_model is called on the mlflow module, not on the client, so assert on this patch
    return mocker.patch("mlflow.register_model")


def test_register_model_if_better(mock_register_model):
    # Arrange
    better_metrics = {"faithfulness": 0.99}
    # Act
    register_model("s3://path/to/new/model", better_metrics)
    # Assert
    mock_register_model.assert_called_once()


def test_do_not_register_if_worse(mock_register_model):
    # Arrange
    worse_metrics = {"faithfulness": 0.90}
    # Act
    register_model("s3://path/to/new/model", worse_metrics)
    # Assert
    mock_register_model.assert_not_called()
```

#### Pipeline (Airflow DAG)

**dags/llm_finetuning_dag.py**

```python
from __future__ import annotations

import pendulum
from airflow.models.dag import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTrainingOperator
# ...
other imports # Simplified SageMaker Training Config sagemaker_training_config = { "AlgorithmSpecification": { "TrainingImage": "123456789012.dkr.ecr.eu-west-1.amazonaws.com/llm-finetuning-image:latest", "TrainingInputMode": "File", }, "RoleArn": "arn:aws:iam::123456789012:role/SageMakerExecutionRole", "InputDataConfig": [ { "ChannelName": "training", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://my-ecommerce-mlops-bucket/data/training/{{ ds }}/", } }, } ], "OutputDataConfig": {"S3OutputPath": "s3://my-ecommerce-mlops-bucket/models/training-output/"}, "ResourceConfig": {"InstanceCount": 1, "InstanceType": "ml.g5.2xlarge", "VolumeSizeInGB": 50}, "StoppingCondition": {"MaxRuntimeInSeconds": 14400}, "HyperParameters": {"base_model_id": "mistralai/Mistral-7B-Instruct-v0.1", "epochs": "1"}, } with DAG( dag_id="llm_continuous_training", start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), schedule="0 0 1 * *", # Run on the 1st of every month catchup=False, tags=["ml-training", "llm"], ) as dag: # PythonOperator for data selection and validation select_data_task = PythonOperator(...) trigger_sagemaker_training = SageMakerTrainingOperator( task_id="trigger_sagemaker_training", config=sagemaker_training_config, wait_for_completion=True, ) # PythonOperator to run evaluate_and_register.py # It will get the model artifact path from the SageMaker job's output (via XComs) evaluate_and_register_task = PythonOperator(...) 
select_data_task >> trigger_sagemaker_training >> evaluate_and_register_task ``` #### Infrastructure as Code (Terraform) ```hcl resource "aws_iam_role" "sagemaker_execution_role" { name = "SageMakerExecutionRole" # Assume role policy for SageMaker service } resource "aws_iam_policy" "sagemaker_policy" { name = "SageMakerPolicy" policy = jsonencode({ Version = "2012-10-17" Statement = [ { Effect = "Allow", Action = ["s3:GetObject", "s3:PutObject", "s3:ListBucket"], Resource = ["arn:aws:s3:::my-ecommerce-mlops-bucket/*"] }, { Effect = "Allow", Action = ["ecr:GetDownloadUrlForLayer", "ecr:BatchGetImage", "ecr:BatchCheckLayerAvailability"], Resource = aws_ecr_repository.training_repo.arn } # Plus CloudWatch logs permissions, etc. ] }) } resource "aws_iam_role_policy_attachment" "sagemaker_attach" { role = aws_iam_role.sagemaker_execution_role.name policy_arn = aws_iam_policy.sagemaker_policy.arn } resource "aws_ecr_repository" "training_repo" { name = "llm-finetuning-image" } ``` #### Integration Test The artifacts are structured into three parts: 1. **Setup Scripts & Data:** To prepare the staging environment with the necessary test data. 2. **Verification Script:** The `pytest` script that runs after the pipeline execution to assert the outcome. 3. **CI/CD Workflow:** The GitHub Actions workflow that orchestrates the entire process. **tests/integration/setup_training_test.py** ```python import logging import pandas as pd import boto3 import os # --- Test Configuration --- STAGING_BUCKET = os.environ["STAGING_S3_BUCKET"] EXECUTION_DATE = "2025-01-01" # Matches the test execution date SAMPLE_SIZE = 50 # Small sample for a quick test run logging.basicConfig(level=logging.INFO) def create_finetuning_test_data(): """Creates a sample DataFrame for the fine-tuning test.""" # This format matches what the SFTTrainer expects in our train.py script data = { "text": [ f"###Instruction: Summarize these reviews. ###Input: Test review text {i}. ###Response: Ideal test summary {i}." 
for i in range(SAMPLE_SIZE) ] } return pd.DataFrame(data) def upload_data_to_s3(df: pd.DataFrame): """Saves data locally and uploads it to the correct S3 path for the DAG.""" s3_key = f"data/training/{EXECUTION_DATE}/train.parquet" local_path = "/tmp/train.parquet" df.to_parquet(local_path, index=False) logging.info(f"Uploading test training data to s3://{STAGING_BUCKET}/{s3_key}") s3_client = boto3.client("s3") s3_client.upload_file(local_path, STAGING_BUCKET, s3_key) logging.info("Upload complete.") def create_mock_evaluation_data(): """ Creates a mock evaluation dataset. Our evaluation script needs this to run, even though the results are mocked for the integration test. """ eval_data = { "review_text": ["This is a test review for evaluation."] } df = pd.DataFrame(eval_data) s3_key = "data/evaluation/golden_dataset.parquet" local_path = "/tmp/golden_dataset.parquet" df.to_parquet(local_path, index=False) logging.info(f"Uploading mock evaluation data to s3://{STAGING_BUCKET}/{s3_key}") s3_client = boto3.client("s3") s3_client.upload_file(local_path, STAGING_BUCKET, s3_key) logging.info("Upload complete.") if __name__ == "__main__": training_df = create_finetuning_test_data() upload_data_to_s3(training_df) create_mock_evaluation_data() logging.info("Setup for training pipeline integration test is complete.") ``` **Verification Script (pytest)** **tests/integration/test_training_pipeline.py** This script runs after the llm_continuous_training DAG has completed successfully. It connects to the staging MLflow server to verify that a new model was registered. 
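The verification can only find its run if the training pipeline tags the MLflow run in a way the test knows how to query. The sketch below illustrates that contract with two small helpers; both function names are hypothetical, and the tag keys (`dag_id`, `airflow_run_id`) simply mirror the filter used by the verification script. One assumption to check in your own setup: scheduled runs carry a `scheduled__` run-ID prefix, whereas runs triggered through the API typically start with `manual__` unless the trigger sets the run ID explicitly.

```python
def build_run_tags(dag_id: str, airflow_run_id: str) -> dict:
    """Tags the pipeline would attach to its MLflow run, e.g. via mlflow.set_tags(...)."""
    return {"dag_id": dag_id, "airflow_run_id": airflow_run_id}


def build_run_filter(dag_id: str, execution_date: str, run_prefix: str = "scheduled__") -> str:
    """MLflow search filter that selects runs tagged by build_run_tags for a given date."""
    return (
        f"tags.airflow_run_id LIKE '{run_prefix}{execution_date}%' "
        f"AND tags.dag_id = '{dag_id}'"
    )


if __name__ == "__main__":
    tags = build_run_tags("llm_continuous_training", "scheduled__2025-01-01T00:00:00+00:00")
    print(tags)
    print(build_run_filter("llm_continuous_training", "2025-01-01"))
```

Keeping both helpers in one shared module means the tagging side and the querying side cannot drift apart silently when a tag key is renamed.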
```python
import pytest
import os
import mlflow
from mlflow.tracking import MlflowClient

# --- Test Configuration ---
STAGING_MLFLOW_TRACKING_URI = os.environ["STAGING_MLFLOW_TRACKING_URI"]
MODEL_NAME = "review-summarizer"
TEST_RUN_TAG = "integration_test"
EXECUTION_DATE = "2025-01-01"


@pytest.fixture(scope="module")
def mlflow_client():
    """Provides a reusable MLflow client for the test module."""
    mlflow.set_tracking_uri(STAGING_MLFLOW_TRACKING_URI)
    return MlflowClient()


def test_finetuning_pipeline_registers_new_model_version(mlflow_client):
    """
    Verifies that a new version of the summarizer model was registered
    by the pipeline run, tagged appropriately for this integration test.
    """
    # Arrange: Find the experiment and the specific run for our test
    # In the real DAG, we would add a tag to the MLflow run to identify it.
    experiment = mlflow_client.get_experiment_by_name("llm-finetuning")
    assert experiment is not None, "MLflow experiment 'llm-finetuning' not found."

    # Filter runs by tag to find our specific integration test run
    runs = mlflow_client.search_runs(
        experiment_ids=[experiment.experiment_id],
        filter_string=f"tags.airflow_run_id LIKE 'scheduled__{EXECUTION_DATE}%' AND tags.dag_id = 'llm_continuous_training'"
    )

    assert len(runs) > 0, f"No MLflow run found for DAG 'llm_continuous_training' on {EXECUTION_DATE}"

    test_run = runs[0]
    test_run_id = test_run.info.run_id

    # Act: Get all registered versions for our model.
    # search_model_versions returns every version, so the match does not depend on
    # the test run's version still being the latest one in a given stage.
    registered_versions = mlflow_client.search_model_versions(f"name='{MODEL_NAME}'")

    # Assert: Check if any of the registered versions came from our test run
    found_match = any(version.run_id == test_run_id for version in registered_versions)

    assert found_match, (
        "Integration test failed: No model version was registered in MLflow "
        f"for the test run ID {test_run_id}."
    )
    print(f"\nIntegration test passed: Found a newly registered model version from run ID {test_run_id}.")
```

**CI/CD Workflow (GitHub Actions)**

This workflow automates the entire test process.

**.github/workflows/run_training_integration_test.yml**

```yaml
name: Training Pipeline Integration Test

on:
  workflow_dispatch:
  push:
    branches:
      - main
    paths:
      - 'src/pipelines/training/**'
      - 'dags/llm_finetuning_dag.py'

jobs:
  setup:
    name: 1. Setup Staging Test Data
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: pip install -r requirements.txt
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.STAGING_AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.STAGING_AWS_SECRET_ACCESS_KEY }}
          aws-region: eu-west-1
      - name: Run setup script to upload test data
        env:
          STAGING_S3_BUCKET: ${{ secrets.STAGING_S3_BUCKET }}
        run: python tests/integration/setup_training_test.py

  trigger-and-monitor-dag:
    name: 2. Trigger and Monitor Airflow DAG
    needs: setup
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install requests
        run: pip install requests
      - name: Trigger and wait for DAG run
        env:
          AIRFLOW_HOST: ${{ secrets.STAGING_AIRFLOW_HOST }}
          AIRFLOW_USERNAME: ${{ secrets.STAGING_AIRFLOW_USERNAME }}
          AIRFLOW_PASSWORD: ${{ secrets.STAGING_AIRFLOW_PASSWORD }}
        # This script needs to be robust, polling the Airflow API until the DAG run completes (succeeds or fails)
        # We also pass a special config to the DAG to set max_steps for the training job.
        run: >
          python scripts/trigger_airflow_dag.py
          --dag-id llm_continuous_training
          --execution-date "2025-01-01"
          --conf '{"max_steps": 2}'

  verify:
    name: 3. Verify Model Registration in MLflow
    needs: trigger-and-monitor-dag
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with:
          python-version: '3.9'
      - name: Install dependencies
        run: pip install -r requirements-dev.txt
      - name: Run verification script
        env:
          STAGING_MLFLOW_TRACKING_URI: ${{ secrets.STAGING_MLFLOW_TRACKING_URI }}
        run: pytest tests/integration/test_training_pipeline.py
```

#### CI/CD Workflow (GitHub Actions)

**.github/workflows/deploy_training_pipeline.yml**

```yaml
name: Deploy LLM Fine-tuning Pipeline

on:
  push:
    branches:
      - main
    paths:
      - 'src/pipelines/training/**'
      - 'dags/llm_finetuning_dag.py'

jobs:
  test-and-build:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Set up Python
        uses: actions/setup-python@v4
      - name: Install dependencies & Run unit tests
        run: |
          pip install -r requirements-dev.txt
          pytest tests/pipelines/training/
      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
          aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
          aws-region: eu-west-1
      - name: Login to Amazon ECR
        uses: aws-actions/amazon-ecr-login@v1
      - name: Build and push training container to ECR
        run: |
          docker build -t ${{ secrets.ECR_REGISTRY }}/llm-finetuning-image:latest -f src/pipelines/training/Dockerfile .
          docker push ${{ secrets.ECR_REGISTRY }}/llm-finetuning-image:latest

  deploy:
    needs: test-and-build
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v3
      - name: Configure AWS Credentials
        # ... credentials setup
      - name: Run Integration Test (placeholder)
        run: echo "Triggering and monitoring integration test..."
        # This would call the test script
      - name: Sync DAG to Production MWAA Bucket
        if: success()
        run: aws s3 sync ./dags s3://${{ secrets.MWAA_PROD_DAGS_BUCKET }}/dags --delete
```

___

### Implementation: Batch Inference Pipeline

#### Architecture Diagram

#### Python Scripts

**src/pipelines/inference/get_products.py**

```python
import logging
import pandas as pd
from sqlalchemy import create_engine, text
from datetime import datetime, timedelta

logging.basicConfig(level=logging.INFO)


def get_products_to_update(db_connection_string: str, interval_hours: int = 1) -> list[str]:
    """
    Gets a list of product_ids that have received new reviews in the last interval.
    """
    try:
        logging.info("Connecting to the application database to find products with new reviews...")
        engine = create_engine(db_connection_string)

        end_date = datetime.utcnow()
        start_date = end_date - timedelta(hours=interval_hours)

        # Bind the timestamp as a parameter instead of formatting it into the SQL string
        query = text("""
            SELECT DISTINCT product_id
            FROM public.reviews
            WHERE created_at >= :start_date
        """)

        with engine.connect() as connection:
            df = pd.read_sql(query, connection, params={"start_date": start_date})

        product_ids = df['product_id'].tolist()
        logging.info(f"Found {len(product_ids)} products to update.")
        return product_ids
    except Exception as e:
        logging.error(f"Failed to get products to update: {e}")
        raise
```

**src/pipelines/inference/retrieve_context.py**

```python
import logging
import psycopg2
from pgvector.psycopg2 import register_vector
from langchain.prompts import PromptTemplate

logging.basicConfig(level=logging.INFO)

PROMPT_TEMPLATE = """
###Instruction:
Based ONLY on the following customer reviews, provide a balanced summary of the product's pros and cons. Do not invent information.

###Reviews:
{reviews_context}

###Response:
"""


def retrieve_rag_context(product_ids: list[str], db_params: dict) -> list[dict]:
    """
    For each product, retrieves the RAG context from the Vector DB and constructs a prompt.
""" prompts = [] try: with psycopg2.connect(**db_params) as conn: register_vector(conn) with conn.cursor() as cur: for product_id in product_ids: # This query implements our advanced RAG strategy # Note: This is a simplified example. A production query might be more complex. query = """ (SELECT chunk_text FROM review_embeddings WHERE product_id = %s AND star_rating >= 4 ORDER BY review_id DESC LIMIT 5) UNION ALL (SELECT chunk_text FROM review_embeddings WHERE product_id = %s AND star_rating <= 2 ORDER BY review_id DESC LIMIT 5) """ cur.execute(query, (product_id, product_id)) results = cur.fetchall() if not results: continue context_str = "\n".join([f"- {res[0]}" for res in results]) prompt_formatter = PromptTemplate.from_template(PROMPT_TEMPLATE) formatted_prompt = prompt_formatter.format(reviews_context=context_str) prompts.append({"product_id": product_id, "prompt": formatted_prompt}) logging.info(f"Successfully constructed {len(prompts)} prompts.") return prompts except Exception as e: logging.error(f"Failed to retrieve RAG context: {e}") raise ``` **src/pipelines/inference/generate_summaries.py** ```python import logging import requests import json from concurrent.futures import ThreadPoolExecutor, as_completed logging.basicConfig(level=logging.INFO) def invoke_llm_endpoint(prompts_data: list[dict], endpoint_url: str, api_key: str) -> list[dict]: """ Invokes the LLM serving endpoint in parallel to generate summaries. 
""" summaries = [] headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"} def post_request(prompt_data): try: payload = {"prompt": prompt_data["prompt"]} # Varies based on serving API response = requests.post(endpoint_url, headers=headers, json=payload, timeout=60) response.raise_for_status() return {"product_id": prompt_data["product_id"], "summary": response.json()["summary"]} except requests.exceptions.RequestException as e: logging.error(f"Failed to get summary for product {prompt_data['product_id']}: {e}") return None with ThreadPoolExecutor(max_workers=10) as executor: future_to_prompt = {executor.submit(post_request, p): p for p in prompts_data} for future in as_completed(future_to_prompt): result = future.result() if result: summaries.append(result) logging.info(f"Successfully generated {len(summaries)} summaries.") return summaries ``` **src/pipelines/inference/cache_results.py** ```python import logging import boto3 from datetime import datetime from decimal import Decimal logging.basicConfig(level=logging.INFO) def cache_summaries_in_dynamodb(summaries: list[dict], table_name: str, ttl_days: int = 30): """ Writes the generated summaries to the DynamoDB cache table in a batch. 
""" dynamodb = boto3.resource('dynamodb') table = dynamodb.Table(table_name) ttl_timestamp = int((datetime.utcnow() + timedelta(days=ttl_days)).timestamp()) try: with table.batch_writer() as batch: for item in summaries: batch.put_item( Item={ 'product_id': item['product_id'], 'summary_json': json.dumps(item['summary']), # Store as JSON string 'last_updated': datetime.utcnow().isoformat(), 'ttl': ttl_timestamp } ) logging.info(f"Successfully cached {len(summaries)} summaries in DynamoDB.") except Exception as e: logging.error(f"Failed to cache summaries: {e}") raise ``` #### Unit Tests **tests/pipelines/inference/test_generate_summaries.py** ```python from unittest.mock import patch from src.pipelines.inference.generate_summaries import invoke_llm_endpoint @patch('requests.post') def test_invoke_llm_endpoint_success(mock_post): # Arrange mock_post.return_value.status_code = 200 mock_post.return_value.json.return_value = {"summary": {"pros": "Good", "cons": "Bad"}} test_prompts = [{"product_id": "A", "prompt": "Test prompt"}] # Act summaries = invoke_llm_endpoint(test_prompts, "http://fake-url", "fake-key") # Assert assert len(summaries) == 1 assert summaries[0]["product_id"] == "A" assert summaries[0]["summary"]["pros"] == "Good" @patch('requests.post') def test_invoke_llm_endpoint_handles_error(mock_post): # Arrange mock_post.side_effect = requests.exceptions.RequestException("API Error") test_prompts = [{"product_id": "A", "prompt": "Test prompt"}] # Act summaries = invoke_llm_endpoint(test_prompts, "http://fake-url", "fake-key") # Assert assert len(summaries) == 0 # The failed request should be skipped ``` #### Pipeline (Airflow DAG) ```python # dags/batch_inference_dag.py (Conceptual - showing the structure) # This assumes PythonOperators calling the above functions with DAG(dag_id="batch_inference", schedule="0 * * * *", ...) 
as dag: get_products_task = PythonOperator( task_id="get_products_to_update", python_callable=get_products.get_products_to_update, ) check_if_products_exist = BranchPythonOperator( task_id="check_if_products_exist", python_callable=lambda ti: "retrieve_rag_context_task" if ti.xcom_pull(...) else "end_pipeline", ) retrieve_context_task = PythonOperator(...) generate_summaries_task = PythonOperator(...) cache_results_task = PythonOperator(...) end_pipeline = EmptyOperator(task_id="end_pipeline") get_products_task >> check_if_products_exist check_if_products_exist >> [retrieve_context_task, end_pipeline] retrieve_context_task >> generate_summaries_task >> cache_results_task ``` #### Infrastructure as Code (Terraform) **infra/dynamodb.tf** ```hcl resource "aws_dynamodb_table" "summary_cache" { name = "ProductSummaryCache" billing_mode = "PAY_PER_REQUEST" # Best for spiky, infrequent workloads hash_key = "product_id" attribute { name = "product_id" type = "S" } ttl { attribute_name = "ttl" enabled = true } tags = { Project = "ReviewSummarization" } } ``` #### Integration Test **tests/integration/test_inference_pipeline.py** ```python import pytest import os import boto3 import time # --- Test Configuration --- STAGING_DYNAMODB_TABLE = "StagingProductSummaryCache" TEST_PRODUCT_ID = "product_integration_test_001" @pytest.fixture(scope="module") def dynamodb_client(): return boto3.client("dynamodb") def test_inference_pipeline_caches_summary(dynamodb_client): """ Verifies that after the batch inference DAG runs, a summary for the test product exists in the staging DynamoDB cache. 
""" # Arrange (Setup would have populated source DBs and run the DAG) time.sleep(10) # Give a moment for potential eventual consistency # Act try: response = dynamodb_client.get_item( TableName=STAGING_DYNAMODB_TABLE, Key={'product_id': {'S': TEST_PRODUCT_ID}} ) except ClientError as e: pytest.fail(f"Failed to query DynamoDB: {e}") # Assert assert "Item" in response, f"No summary found in cache for product {TEST_PRODUCT_ID}" item = response["Item"] assert "summary_json" in item, "Cached item is missing the 'summary_json' attribute." # Check if the summary is valid JSON summary = json.loads(item["summary_json"]["S"]) assert "pros" in summary assert "cons" in summary print(f"\nIntegration test passed: Found a valid cached summary for product {TEST_PRODUCT_ID}.") ``` #### CI/CD Workflow (Github Actions) ```yaml name: Deploy Batch Inference Pipeline on: push: branches: - main paths: - 'src/pipelines/inference/**' - 'dags/batch_inference_dag.py' jobs: test-and-deploy: runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 - uses: actions/setup-python@v4 - name: Install dependencies run: pip install -r requirements-dev.txt - name: Run unit tests run: pytest tests/pipelines/inference/ # Integration Test Steps - name: Configure AWS Staging Credentials # ... - name: Run Integration Test Setup # ... (calls setup_inference_test.py) - name: Trigger Staging DAG Run # ... (calls scripts/trigger_airflow_dag.py) - name: Run Integration Test Verification # ... (calls pytest tests/integration/test_inference_pipeline.py) # Deploy to Production - name: Configure AWS Production Credentials if: success() # ... 
      - name: Sync DAG to Production MWAA Bucket
        if: success()
        run: aws s3 sync ./dags s3://${{ secrets.MWAA_PROD_DAGS_BUCKET }}/dags --delete
```

___

### Implementation: Monitoring and Alerting

#### Architecture Diagram

#### Monitoring Quality

**src/monitoring/quality_monitor.py**

```python
import logging
import pandas as pd
import boto3
import json
import os
from datetime import datetime, timedelta

# Assume Ragas and OpenAI are installed and configured
# from ragas import evaluate
# from ragas.metrics import faithfulness, context_precision
# from openai import OpenAI

logging.basicConfig(level=logging.INFO)

# --- Configuration ---
DYNAMODB_TABLE = os.environ["SUMMARY_CACHE_TABLE"]
CLOUDWATCH_NAMESPACE = "LLMReviewSummarizer"


def get_recent_summaries(table_name: str, hours: int = 24) -> pd.DataFrame:
    """Fetches recently generated summaries from the DynamoDB cache."""
    logging.info(f"Fetching summaries from the last {hours} hours from table {table_name}.")
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(table_name)

    # In a real system, you'd scan with a filter. For simplicity, we'll assume a GSI.
    # For now, we return a mock DataFrame.
    mock_data = {
        "product_id": ["prod_123", "prod_456"],
        "summary_json": [
            '{"pros": "Very fast.", "cons": "Gets hot."}',
            '{"pros": "Great design.", "cons": "Battery is weak."}'
        ],
        # In a real system, we'd also fetch the review context used for generation.
        "review_context": [
            "The laptop is incredibly fast for all my tasks.",
            "The battery life is a major issue, lasts only 2 hours."
        ]
    }
    logging.info("Returning mock summaries for demonstration.")
    return pd.DataFrame(mock_data)


def evaluate_summaries(df: pd.DataFrame) -> pd.DataFrame:
    """
    Evaluates summaries using Ragas and LLM-as-a-judge (mocked).
    """
    logging.info(f"Evaluating {len(df)} summaries.")

    # In a real implementation:
    # 1. Format data for Ragas (question, answer, contexts, ground_truth)
    # 2. Call `evaluate(dataset, metrics=[faithfulness, ...])`
    # 3. Call OpenAI API for LLM-as-a-judge coherence score

    # Mocked results
    df['faithfulness_score'] = [0.98, 0.93]
    df['coherence_score'] = [4.5, 4.1]
    df['toxicity_score'] = [0.05, 0.02]

    logging.info("Evaluation complete.")
    return df


def publish_metrics_to_cloudwatch(df: pd.DataFrame):
    """Calculates aggregate scores and publishes them as CloudWatch Custom Metrics."""
    cloudwatch = boto3.client('cloudwatch')

    # Convert to plain Python floats before handing the values to boto3
    avg_faithfulness = float(df['faithfulness_score'].mean())
    avg_coherence = float(df['coherence_score'].mean())

    logging.info(f"Publishing metrics to CloudWatch: Faithfulness={avg_faithfulness}, Coherence={avg_coherence}")

    metric_data = [
        {
            'MetricName': 'AverageFaithfulness',
            'Value': avg_faithfulness,
            'Unit': 'None'
        },
        {
            'MetricName': 'AverageCoherence',
            'Value': avg_coherence,
            'Unit': 'None'
        }
    ]

    cloudwatch.put_metric_data(
        Namespace=CLOUDWATCH_NAMESPACE,
        MetricData=metric_data
    )
    logging.info("Metrics successfully published.")


def main():
    """Main function to run the monitoring pipeline."""
    summaries_df = get_recent_summaries(DYNAMODB_TABLE)
    if not summaries_df.empty:
        evaluated_df = evaluate_summaries(summaries_df)
        publish_metrics_to_cloudwatch(evaluated_df)
    else:
        logging.info("No new summaries found to monitor.")


if __name__ == "__main__":
    main()
```

#### Unit Test

**tests/monitoring/test_quality_monitor.py**

```python
import os
from unittest.mock import patch, MagicMock

import pandas as pd
import pytest

# quality_monitor reads SUMMARY_CACHE_TABLE at import time, so set it before importing
os.environ.setdefault("SUMMARY_CACHE_TABLE", "TestSummaryCache")

from src.monitoring import quality_monitor


@patch('boto3.client')
def test_publish_metrics_to_cloudwatch(mock_boto_client):
    # Arrange
    mock_cloudwatch = MagicMock()
    mock_boto_client.return_value = mock_cloudwatch

    test_data = {
        'faithfulness_score': [1.0, 0.9],  # Avg = 0.95
        'coherence_score': [5.0, 4.0]  # Avg = 4.5
    }
    test_df = pd.DataFrame(test_data)

    # Act
    quality_monitor.publish_metrics_to_cloudwatch(test_df)

    # Assert
    mock_cloudwatch.put_metric_data.assert_called_once()

    # Get the arguments passed to the mock
    call_args = mock_cloudwatch.put_metric_data.call_args[1]

    assert call_args['Namespace'] == "LLMReviewSummarizer"

    metric_data = call_args['MetricData']
    faithfulness_metric = next(m for m in metric_data if m['MetricName'] == 'AverageFaithfulness')
    coherence_metric = next(m for m in metric_data if m['MetricName'] == 'AverageCoherence')

    # Compare floats with a tolerance rather than exact equality
    assert faithfulness_metric['Value'] == pytest.approx(0.95)
    assert coherence_metric['Value'] == pytest.approx(4.5)
```

#### Pipeline Code (Airflow DAG)

**dags/model_quality_monitoring_dag.py**

```python
from __future__ import annotations

import pendulum
from airflow.models.dag import DAG
# The legacy airflow.operators.docker_operator path is deprecated; use the Docker provider package
from airflow.providers.docker.operators.docker import DockerOperator

# Assumes the monitoring script is containerized in an image in ECR
ECR_IMAGE = "123456789012.dkr.ecr.eu-west-1.amazonaws.com/quality-monitor:latest"

with DAG(
    dag_id="model_quality_monitoring",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="0 3 * * *",  # Run daily at 3 AM UTC
    catchup=False,
    tags=["monitoring", "quality", "llm"],
) as dag:
    run_quality_monitor = DockerOperator(
        task_id="run_quality_monitor",
        image=ECR_IMAGE,
        api_version="auto",
        auto_remove=True,
        # Pass environment variables needed by the script
        environment={
            "SUMMARY_CACHE_TABLE": "ProductionProductSummaryCache",
            "AWS_ACCESS_KEY_ID": "{{ conn.aws_default.login }}",
            "AWS_SECRET_ACCESS_KEY": "{{ conn.aws_default.password }}",
            "AWS_SESSION_TOKEN": "{{ conn.aws_default.extra_dejson.aws_session_token }}",
            "AWS_REGION": "eu-west-1",
        },
        command="/usr/bin/python3 quality_monitor.py",
        docker_url="unix://var/run/docker.sock",
        network_mode="bridge",
    )
```

#### Infrastructure as Code (Terraform)

**infra/monitoring.tf**

```hcl
variable "faithfulness_threshold" {
  description = "The minimum acceptable faithfulness score before triggering an alert."
  type        = number
  default     = 0.95
}

resource "aws_sns_topic" "alerts_topic" {
  name = "LLM-Summarizer-Alerts-Topic"
}

resource "aws_sns_topic_subscription" "email_subscription" {
  topic_arn = aws_sns_topic.alerts_topic.arn
  protocol  = "email"
  endpoint  = "oncall-ml-team@example.com"
}

resource "aws_cloudwatch_metric_alarm" "faithfulness_alarm" {
  alarm_name          = "High-Hallucination-Rate-Alarm"
  comparison_operator = "LessThanThreshold"
  evaluation_periods  = 1
  metric_name         = "AverageFaithfulness"
  namespace           = "LLMReviewSummarizer"
  period              = 86400 # 24 hours, matching the DAG schedule
  statistic           = "Average"
  threshold           = var.faithfulness_threshold
  alarm_description   = "This alarm triggers if the average summary faithfulness score drops below the acceptable threshold."
  alarm_actions       = [aws_sns_topic.alerts_topic.arn]
  ok_actions          = [aws_sns_topic.alerts_topic.arn]
}

resource "aws_cloudwatch_dashboard" "summarizer_dashboard" {
  dashboard_name = "LLM-Review-Summarizer-Dashboard"
  dashboard_body = jsonencode({
    widgets = [
      {
        type   = "metric",
        x      = 0,
        y      = 0,
        width  = 12,
        height = 6,
        properties = {
          metrics = [
            ["LLMReviewSummarizer", "AverageFaithfulness"]
          ],
          # The metric is published once a day, so use a daily period to match the widget title
          period = 86400,
          stat   = "Average",
          region = "eu-west-1",
          title  = "Summary Faithfulness (Daily Average)"
          # Add horizontal annotation for the alarm threshold
        }
      },
      # ... other widgets for coherence, EKS GPU utilization, etc.
    ]
  })
}
```

#### CI/CD GitHub Actions Workflow

**.github/workflows/deploy_monitoring.yml**

```yaml
name: Deploy Monitoring System

on:
  push:
    branches:
      - main
    paths:
      - 'src/monitoring/**'
      - 'dags/model_quality_monitoring_dag.py'
      - 'infra/monitoring.tf'

jobs:
  test-and-build:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
      - name: Install dependencies and run unit tests
        run: |
          pip install -r requirements-dev.txt
          pytest tests/monitoring/
      # ... steps to build and push the quality-monitor docker image to ECR

  deploy-infra:
    needs: test-and-build
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: hashicorp/setup-terraform@v2
      - name: Configure AWS Credentials
        # ...
      - name: Terraform Apply for Monitoring
        run: |
          terraform -chdir=infra init
          terraform -chdir=infra apply -auto-approve \
            -target=aws_sns_topic.alerts_topic \
            -target=aws_cloudwatch_metric_alarm.faithfulness_alarm \
            -target=aws_cloudwatch_dashboard.summarizer_dashboard

  deploy-dag:
    needs: deploy-infra
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - name: Configure AWS Credentials
        # ...
      - name: Sync Monitoring DAG to Production
        run: aws s3 sync ./dags s3://${{ secrets.MWAA_PROD_DAGS_BUCKET }}/dags --delete
```

___
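As a companion to the quality monitor and the `faithfulness_alarm` in this section, it can help to reason about the alarm locally before anything reaches CloudWatch. The sketch below is illustrative only: `build_metric_data` and `breaches_threshold` are hypothetical helpers, not part of the pipeline code, and they use only the standard library. The first averages per-summary scores into `MetricData`-shaped entries; the second mirrors the alarm's `LessThanThreshold` comparison.

```python
from statistics import mean


def build_metric_data(scores: dict[str, list[float]]) -> list[dict]:
    """Averages each list of per-summary scores into one CloudWatch-style metric datum."""
    return [
        {"MetricName": name, "Value": mean(values), "Unit": "None"}
        for name, values in scores.items()
        if values  # skip metrics with no samples instead of publishing an undefined average
    ]


def breaches_threshold(metric_data: list[dict], metric_name: str, threshold: float) -> bool:
    """Mirrors a LessThanThreshold alarm: True when the named metric is below the threshold."""
    for datum in metric_data:
        if datum["MetricName"] == metric_name:
            return datum["Value"] < threshold
    return False  # assumption for this sketch: a missing metric is not treated as a breach


if __name__ == "__main__":
    # The mocked scores from evaluate_summaries above
    data = build_metric_data({
        "AverageFaithfulness": [0.98, 0.93],
        "AverageCoherence": [4.5, 4.1],
    })
    print(breaches_threshold(data, "AverageFaithfulness", 0.95))
```

With the mocked faithfulness scores of 0.98 and 0.93, the daily average sits just above the default `faithfulness_threshold` of 0.95, so one low-scoring summary is not enough on its own to trip the alarm. How missing data should be handled is a design choice; the sketch simply reports no breach.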