Real-Time Purchase Intent Scoring


TLDR: A Production-Grade MLOps System for Real-Time Purchase Intent Scoring

  • Challenge: The core business challenge was to move beyond a static e-commerce catalog and increase online sales by understanding user intent in real-time. The goal was to build a system that could dynamically predict a user’s likelihood to make a purchase during their session, enabling timely and personalized interventions (like targeted offers) to reduce cart abandonment and improve conversion.

  • My Role & Solution: As the lead ML Engineer and Data Scientist on a lean, three-person team, I designed and implemented the end-to-end MLOps platform from the ground up. My solution involved a hybrid batch and streaming architecture to deliver both rich historical context and real-time responsiveness.

    My specific contributions included:

    • Feature Engineering Pipelines: Architected and built both daily batch (Spark) and real-time streaming (Spark Streaming) feature pipelines. I implemented a Feast Feature Store to solve training-serving skew and provide a single source of truth for features.

    • ML Model Development: Led the iterative modeling process, starting with simple baselines and progressing to a high-performance LightGBM model. I focused on not just offline accuracy (AUC) but also production-critical factors like model calibration.

    • ML Training & Deployment Pipelines: Built a fully automated retraining pipeline using Apache Airflow and Amazon SageMaker. I created a CI/CD workflow with GitHub Actions that automatically deploys new models as canary releases to a SageMaker endpoint, ensuring safe, zero-downtime updates.

    • Monitoring & Continual Learning: Implemented a comprehensive monitoring suite to track operational metrics, data drift, and model performance degradation. This system automatically triggers retraining runs when needed, creating a closed-loop system that adapts to new data.

    Tech Stack: AWS (SageMaker, Kinesis, EMR, Lambda, ElastiCache), Apache Airflow, MLflow, Feast, DVC, Great Expectations, Spark, LightGBM, GitHub Actions, Terraform.

  • Impact: The system delivered a clear and measurable business impact, accomplished by a small, agile team.

    • Increased the overall user conversion rate by 5.2% as measured in a two-week A/B test.

    • Increased the Average Order Value (AOV) by 3.5% for users who interacted with a personalized offer.

    • Automated the model lifecycle, reducing the time to deploy a new model from over a week of manual effort to a 2-hour automated pipeline run.

    • Reduced p99 inference latency by 40% through model quantization and a Redis-based feature cache, ensuring a smooth user experience even during peak traffic.

  • System Architecture: The diagram below illustrates the complete MLOps architecture. The components highlighted in blue represent the core systems I was directly responsible for building and managing.


1. Business Challenge: From Anonymous Clicks to Intent-Driven Conversions

The paradigm of e-commerce has irrevocably shifted. A static digital catalog is no longer sufficient; today’s consumers, shaped by the hyper-personalized experiences of platforms like Netflix and Amazon, expect a similar level of contextual awareness and tailored guidance from every online interaction. The central challenge for any modern retailer is to transform their digital storefront from a passive repository of products into a dynamic, intelligent system that anticipates user needs and proactively guides them toward a satisfying purchase.

This mandate for personalization is not merely about addressing customers by name. It is about understanding a visitor’s underlying intent at each stage of their journey and adapting the experience in real-time. Whether a user is a first-time anonymous visitor idly browsing or a known, loyal customer executing a specific purchase, the platform must intelligently adjust its content, messaging, and recommendations.

However, this creates a delicate and critical balance. While a vast majority of shoppers are more likely to purchase from a company that offers personalized experiences, they are quickly alienated by tactics perceived as intrusive or overly algorithmic. The most effective personalization is often invisible; the customer simply feels the journey is effortless and intuitive. This elevates the importance of not just prediction, but also interpretability, ensuring every action taken is genuinely helpful rather than jarring.

The business case for mastering this challenge is compelling and quantifiable. Industry analyses consistently demonstrate a significant return on investment. Brands that excel at personalization report a 10-15% lift in revenue and an increase in marketing spend efficiency of 10-30%. By analyzing a user’s behavior as it happens—their “clickstream”—an e-commerce platform can make timely interventions that directly influence outcomes. Proactive, data-driven actions can reduce cart abandonment, increase average order value through targeted upsells, and ultimately boost the overall conversion rate.

To measure success, the system’s performance must be tied directly to core business outcomes:

  • Primary Business KPIs: Conversion Rate, Average Order Value (AOV), and overall Revenue Lift.

  • Secondary Engagement KPIs: Click-Through Rate (CTR) on personalized offers, increased Session Duration, and reduced Bounce Rates, which serve as leading indicators of user satisfaction.

Therefore, the fundamental business challenge is to translate the high-velocity, high-volume stream of raw customer clicks into a nuanced understanding of purchase intent, and to use that understanding to power a real-time personalization engine that measurably improves the customer experience and drives tangible business growth.


2. ML Problem Framing

Before a single line of code is written, a successful machine learning project begins with a rigorous process of problem framing. This is the crucial stage of translating a business vision into a well-defined, feasible, and measurable machine learning task. A flawed framing can lead a project astray, wasting resources on a model that, no matter how accurate, fails to deliver real-world value.

This section outlines the blueprint for the purchase propensity model, ensuring that the technical solution is precisely aligned with the business objectives.

2.1 Setting the Business Objectives

Every ML project must be anchored to a genuine business need. For this project, the primary objective is to increase online sales by delivering timely, personalized interventions to users during their browsing session.

This high-level goal was defined in collaboration with key stakeholders, each with a unique perspective:

  • Product & Marketing Teams: Focused on increasing Conversion Rate and Average Order Value (AOV). They are interested in identifying “on the fence” customers who could be nudged towards a purchase with a targeted offer, and “high-intent” customers who could be receptive to up-sell or cross-sell recommendations.

  • Sales & Finance Teams: Concerned with the return on investment (ROI) of promotions. They want to ensure that discounts are not offered to users who would have purchased anyway, maximizing margin.

  • MLOps & Engineering Teams: Focused on system stability, maintainability, and performance. Their primary concerns are the prediction service’s latency and throughput, ensuring it does not degrade the overall user experience.

A critical requirement established early on was the need for model interpretability. To gain trust and enable debugging, the marketing team needs to understand why a user was assigned a high or low propensity score. This allows them to analyze the effectiveness of different personalization strategies and diagnose underperforming campaigns.

2.2 Is Machine Learning the Right Approach?

While a simple, rules-based system could be implemented (e.g., “if a user has an item in their cart for > 1 hour, show a banner”), this approach is brittle and fails to capture the nuanced complexity of user behavior. Machine learning is the correct approach for this problem for several key reasons:

  1. Complex Patterns: User purchase intent is influenced by a non-linear combination of dozens of signals (browsing history, click patterns, time of day, product characteristics). ML models are designed to learn these intricate patterns from data, something a human-defined rules engine cannot do effectively.

  2. Scale: The system must evaluate millions of user events per day across thousands of products and users. ML provides a scalable way to make millions of cheap, consistent predictions.

  3. Adaptability: Customer behavior changes with market trends, seasonality, and marketing campaigns. A static rules engine would quickly become outdated. An ML system, through retraining, can adapt to these evolving patterns.

The following decision flow confirms that ML is a suitable and necessary tool for this challenge.

2.3 Defining the ML Problem

With the need for ML established, the business problem is translated into a precise technical formulation.

  • Ideal Outcome: Deliver the optimal intervention (e.g., a specific recommendation, a tailored discount, or no action at all) to each user at the right moment to maximize the probability of a valuable conversion.

  • Model’s Goal: To empower the “ideal outcome,” the model’s direct goal is to predict the probability (a score from 0.0 to 1.0) that a user will complete a purchase within their current session.

  • ML Task Type: This is framed as a binary classification problem whose output is a calibrated probability score rather than a hard 0/1 label. Producing a continuous score is more flexible than emitting a binary decision directly: it allows the business to define multiple thresholds for different actions, tailoring the aggressiveness of the intervention to the model's confidence.

  • Proxy Label: The ground truth for training the model will be the purchase event within a session. If a user makes a purchase, their session is labeled with a target value of 1; otherwise, it is labeled 0. This is a strong proxy for purchase intent, though it’s acknowledged that it doesn’t account for post-purchase events like returns. It is, however, far superior to weaker proxies like add_to_cart (which suffers from high abandonment rates) or product_view (which can optimize for “clickbait” products rather than genuine interest).
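To make the proxy label concrete, here is a small sketch of how sessions could be labeled from raw events; the column names are assumptions, and the real pipeline performs this step at scale in Spark during the labeling stage.

```python
import pandas as pd

# Toy clickstream; column names (session_id, event_type) are illustrative.
events = pd.DataFrame(
    {
        "session_id": ["s1", "s1", "s1", "s2", "s2"],
        "event_type": ["view", "add_to_cart", "purchase", "view", "view"],
    }
)

# Proxy label: 1 if the session contains at least one purchase event, else 0.
labels = (
    events.assign(is_purchase=events["event_type"].eq("purchase"))
    .groupby("session_id")["is_purchase"]
    .max()
    .astype(int)
    .rename("label")
    .reset_index()
)
print(labels)  # s1 -> 1, s2 -> 0
```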

2.4 Assessing Feasibility & Risks

A critical analysis was performed to ensure the project was achievable and to identify potential roadblocks early.

  • Data
    • Sufficient labeled data? Green: Large volumes of historical clickstream and purchase data are available.
    • Features available at serving time? Yellow (Risk): Historical features (e.g., lifetime_value) are not available for anonymous, first-time users. Mitigation: Two distinct models or feature sets will be required: one for known users and a simpler, session-based model for anonymous users.
    • Privacy/regulatory compliant? Yellow (Risk): Use of user data must comply with GDPR/CCPA. Mitigation: The project will only use first-party data. All features will be aggregated and anonymized where possible. A legal review is a required step before production.

  • Problem & Model
    • High reliability required? Medium: A wrong prediction (e.g., offering a needless discount) has a direct financial cost, but is not catastrophic.
    • Latency target achievable? Red (High Risk): The requirement for <100ms p99 inference latency is a significant technical challenge. Mitigation: This requires careful model selection (favoring efficiency), infrastructure optimization (e.g., GPUs, Triton server), and an online feature store.
    • Adversarial attacks likely? Low: Not a primary concern for this use case, unlike fraud detection.

  • Ethics & Fairness
    • Potential for bias? Yellow (Risk): The model could learn to offer better discounts to users from high-income zip codes or unfairly penalize new users. Mitigation: Implement fairness monitoring and regularly audit model predictions across key user segments.

  • Cost & ROI
    • Positive ROI projected? Green: A preliminary cost-benefit analysis, based on a conservative estimate of a 2-3% conversion lift, shows a strong positive ROI against the projected infrastructure and personnel costs.

2.5 Defining Success Metrics

To measure progress and ultimate success, a clear distinction is made between business, model, and operational metrics. These are not static goals but will be tracked continuously throughout the project lifecycle.

  • Business Success
    • Conversion Rate Lift (the primary measure of impact): Measured via rigorous A/B testing, comparing the model-driven personalization strategy against a non-personalized control group. A statistically significant lift of >2% is the target.
    • Increase in Average Order Value (AOV): Measured in the same A/B test. An increase indicates successful up-selling and cross-selling.

  • Model Evaluation
    • Area Under the ROC Curve (AUC), the primary offline metric for optimizing the model's ability to rank users by their likelihood to purchase (target AUC > 0.85): Evaluated on a held-out test set during each training run and tracked in MLflow.
    • Precision/Recall at specific thresholds (satisficing metrics): E.g., at the 0.9 score threshold for offering discounts, Precision must be >= 75% to minimize wasted marketing spend.

  • Operational Health
    • Inference Latency (a critical non-functional requirement): Monitored in real-time. The service must maintain p99 latency < 100ms.
    • Serving Cost: Measured as cost per 1,000 inferences. Must be tracked to ensure the solution remains within the projected budget.

By meticulously framing the problem through these five lenses, we establish a robust foundation. We have a clear business mandate, a well-defined technical problem, an understanding of the risks, and a precise definition of what success looks like. This blueprint will guide all subsequent engineering and data science decisions.


3. MLOps Project Planning and Operational Strategy

Building a real-time personalization system is not about creating a single, static model; it is about engineering a robust, evolving ecosystem that can adapt to changing data and business needs. Adopting a Machine Learning Operations (MLOps) mindset from the outset is paramount to success. This section outlines the strategic plan, architectural principles, and technology choices that will govern the project’s entire lifecycle.

3.1 The MLOps-First Mindset: Building a System, Not Just a Model

MLOps is a set of practices that combines Machine Learning, DevOps, and Data Engineering to automate and streamline the end-to-end machine learning lifecycle. The fundamental goal is to move away from artisanal, one-off model development and toward an industrialized process that enables continuous integration, continuous delivery, and continuous training (CI/CD/CT).

This is especially critical for a purchase intent model, where real-world user behavior is constantly changing, rendering static models obsolete and prone to performance degradation—a phenomenon known as model drift. By embracing MLOps, we commit to building a system that is reproducible, testable, and maintainable from day one.

3.2 The Architectural Triad: Balancing Offline, Nearline, and Online Computation

A core strategic decision in designing any large-scale personalization system is how to partition computational tasks. The architecture will be a hybrid, leveraging the strengths of three distinct modes to balance computational complexity, data freshness, and response latency:

  • Offline Computation: Reserved for tasks that are computationally intensive and not subject to real-time constraints.

    • Use Cases: Large-scale model training on years of historical data; batch feature computation (e.g., calculating customer lifetime value).

  • Online (Real-Time) Computation: For tasks that must respond to user interactions immediately, operating under strict latency SLAs (sub-100 milliseconds).

    • Use Cases: Model inference (scoring); real-time feature retrieval from a low-latency online feature store.

  • Nearline Computation: An intelligent compromise for tasks triggered by real-time events but executed asynchronously.

    • Use Cases: Session-based feature updates after a user’s session ends, ensuring the next visit benefits from the context of the last one without performing complex aggregations in the real-time path.

3.3 The MLOps Stack Canvas: Technology Stack Selection

Using the MLOps Stack Canvas framework, we can systematically select the tools and technologies for each component of our system. The chosen stack is outlined below, grouped by canvas block, combining managed AWS services for infrastructure with best-of-breed open-source tools for core ML capabilities to balance speed of development with flexibility and control.

  • Value Proposition
    • Business Goal Alignment: Propensity Scoring Model. The core ML asset designed to directly influence conversion rate and AOV by enabling targeted, real-time personalization.

  • Data Sources & Versioning
    • Data Ingestion: AWS Kinesis Data Streams. Chosen for its seamless integration with the AWS ecosystem (API Gateway, Lambda) and its ability to handle high-throughput, real-time data ingestion with managed scaling.
    • Data Storage: Amazon S3 Data Lake. The definitive source of truth for all historical data. Its cost-effectiveness, durability, and integration with the entire AWS analytics stack make it the ideal choice for an offline store.
    • Data Versioning: DVC (Data Version Control). Tracks large data files and models alongside code in Git without bloating the repository. Provides crucial reproducibility for datasets used in training.

  • Experiment Management
    • Experiment Tracking: MLflow Tracking. An open-source standard for logging experiment parameters, metrics, and artifacts. Provides a clear, auditable history of all model development efforts, fostering collaboration and reproducibility (see the sketch at the end of this subsection).
    • ML Frameworks: Scikit-learn, LightGBM/XGBoost, TensorFlow/PyTorch. A pragmatic selection. Start with simpler, highly performant models like LightGBM/XGBoost for a strong, interpretable baseline. Use deep learning frameworks for more complex sequential modeling if justified by performance needs.

  • Feature Store & Workflows
    • Feature Store: Feast (open source). Chosen to avoid vendor lock-in and for its strong integration with multiple online (Redis, DynamoDB) and offline (S3, Redshift) stores. Provides the core abstraction needed to solve training-serving skew.
    • Workflow Orchestration: Apache Airflow. The industry standard for orchestrating complex, scheduled batch workflows. Ideal for managing the daily/weekly feature engineering and model retraining pipelines.

  • DevOps & Code Management
    • Source Control: Git (GitHub). The universal standard for source code management and collaborative development.
    • CI/CD: GitHub Actions. Tightly integrated with the source code repository. Used for automating code builds, static analysis, and unit/integration testing for all system components.

  • CI/CT/CD
    • ML Pipeline Orchestration: Airflow + SageMaker SDK. Airflow will act as the master orchestrator, triggering and managing parameterized AWS SageMaker Training and Processing Jobs. This provides a balance of open-source control and managed, scalable execution.
    • Automated ML Testing: Pytest, Great Expectations. Pytest for unit testing Python code. Great Expectations for data validation and quality checks within the feature and training pipelines to detect data issues early.

  • Model Registry
    • Model Registry: MLflow Model Registry. Provides a central hub for managing the lifecycle of ML models. Manages model versions, stages (Staging, Production), and annotations, and integrates seamlessly with the CI/CD pipeline for automated deployments.

  • Model Deployment & Serving
    • Model Serving: Amazon SageMaker Endpoints. A fully managed service that simplifies deploying models as scalable, secure API endpoints. It handles infrastructure provisioning, autoscaling, and monitoring, accelerating the path to production.
    • Release Strategy: Canary Deployment. A safe, gradual rollout strategy. Traffic is incrementally shifted to the new model version while key business and operational metrics are monitored, minimizing the "blast radius" of a potential failure.

  • Monitoring & Observability
    • Infrastructure & Model: Amazon CloudWatch. The native AWS monitoring solution. Used for tracking operational metrics (latency, error rate, CPU/memory), setting alerts on performance thresholds, and collecting logs from all services.
    • Data & Concept Drift: Evidently AI / WhyLabs. Specialized open-source/SaaS tools for ML monitoring. Will be used to compare production data distributions against a baseline (training data) to detect data drift and concept drift, triggering alerts or retraining pipelines.

  • Metadata Store
    • ML Metadata: MLflow Tracking Server Backend. Acts as the central "logbook." MLflow automatically captures and stores metadata from experiments, data versions, and model artifacts, creating a comprehensive, searchable lineage for the entire ML system.

This strategically chosen stack provides a powerful and balanced MLOps foundation. It leverages managed services to reduce operational overhead where appropriate (ingestion, serving) while using flexible, open-source tools for core ML lifecycle management (experimentation, model registry) to maintain control and avoid vendor lock-in.
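To make the experiment-tracking choice above concrete, here is a minimal logging sketch with MLflow and LightGBM. The experiment name, parameters, and the synthetic stand-in data are illustrative assumptions; the real pipeline reads features from the Feast offline store and points at the team's tracking server.

```python
import mlflow
import mlflow.lightgbm
import lightgbm as lgb
import numpy as np
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split

# Synthetic stand-in data so the sketch runs end to end.
rng = np.random.default_rng(42)
X = rng.normal(size=(5000, 10))
y = (X[:, 0] + 0.5 * X[:, 1] + rng.normal(scale=0.5, size=5000) > 0).astype(int)
X_train, X_valid, y_train, y_valid = train_test_split(X, y, test_size=0.2, random_state=42)

mlflow.set_experiment("purchase-propensity")  # hypothetical experiment name

params = {"objective": "binary", "learning_rate": 0.05, "num_leaves": 63, "verbose": -1}

with mlflow.start_run(run_name="lightgbm-baseline"):
    mlflow.log_params(params)
    model = lgb.train(params, lgb.Dataset(X_train, label=y_train), num_boost_round=200)
    auc = roc_auc_score(y_valid, model.predict(X_valid))
    mlflow.log_metric("valid_auc", auc)
    # Log the model artifact so it can later be promoted via the MLflow Model Registry.
    mlflow.lightgbm.log_model(model, artifact_path="model")
```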

3.4 Core MLOps Pipelines and Workflows

To operationalize the MLOps strategy, the entire system is decomposed into a series of interconnected, automated pipelines. These pipelines represent the key workflows that govern the lifecycle of our data and models, transforming the ML process from a manual, research-oriented task into a reliable, repeatable engineering discipline. The primary pipelines to be developed are outlined below, each designed as a distinct, automated workflow.

Real-Time Clickstream Ingestion

  • Trigger: User action on the client (web/mobile app).
  • Inputs: JSON event payload from the client tracker.
  • Key Steps:
    1. Collection: API Gateway receives the event.
    2. Buffering: The event is published to an AWS Kinesis Data Stream.
    3. Initial Processing: An AWS Lambda function consumes the event, performs initial validation, and enriches it with server-side info (e.g., geo-location from IP).
  • Outputs: Enriched event message pushed to a "processed" Kinesis stream and simultaneously archived to an S3 "raw" data lake bucket.

Feature Engineering Pipeline

  • Trigger: Streaming: new message on the "processed" Kinesis stream. Batch: daily/hourly schedule via Airflow.
  • Inputs: Streaming: enriched clickstream events. Batch: historical clickstream data from the S3 Data Lake.
  • Key Steps (Streaming, real-time):
    1. Consume enriched events.
    2. Perform stateless transformations.
    3. Update/compute session-level features (e.g., pages_viewed_in_session).
    4. Write latest feature values to the Feast Online Store (DynamoDB/Redis).
  • Key Steps (Batch, offline):
    1. Run a scheduled Spark job on historical data.
    2. Compute complex, long-term aggregations (e.g., 90_day_purchase_count, customer_lifetime_value).
    3. Backfill the Feast Offline Store (S3 Parquet) with historical values.
  • Outputs: Online Store: up-to-the-millisecond feature values for active users. Offline Store: a complete, point-in-time correct history of all feature values.

ML Training Pipeline (CI/CT)

  • Trigger: (1) Scheduled: weekly/bi-weekly via Airflow. (2) On-demand: manual trigger by an ML Engineer. (3) Automated: alert from the Monitoring system (e.g., significant data drift detected).
  • Inputs: (1) model_training_config.yaml (hyperparameters, etc.). (2) feature_list.py (defines features to use). (3) Historical feature data from the Feast Offline Store.
  • Key Steps:
    1. Data Fetching: Generate a point-in-time correct training dataset using Feast.
    2. Data Validation: Use Great Expectations to validate the training data against predefined quality checks.
    3. Model Training: Execute the training script on AWS SageMaker.
    4. Model Evaluation: Evaluate model performance (AUC, Precision) on a held-out test set.
    5. Model Registration: If performance exceeds the production model's baseline, version and register the new model artifact and its metrics to the MLflow Model Registry.
  • Outputs: A versioned, validated, and registered model artifact in the MLflow Model Registry, ready for deployment.

Model Deployment Pipeline (CD)

  • Trigger: A model is promoted to the "Production" stage in the MLflow Model Registry.
  • Inputs: (1) A registered model artifact from MLflow. (2) deployment_config.yaml (instance types, scaling policies).
  • Key Steps:
    1. Trigger: A webhook from MLflow triggers a GitHub Actions workflow.
    2. Build: Build a new Docker container with the model artifact.
    3. Push: Push the container to Amazon ECR.
    4. Deploy: Initiate a canary release on the Amazon SageMaker Endpoint, shifting a small percentage (e.g., 5%) of traffic to the new model version.
  • Outputs: A new version of the model is safely deployed and serving a portion of live production traffic.

Real-Time Inference (Logical Flow)

  • Trigger: API call from the e-commerce backend application.
  • Inputs: user_id and any real-time context (e.g., product_id being viewed).
  • Key Steps (a code sketch of this flow follows this list):
    1. Feature Retrieval: Fetch the latest feature vector for the user_id from the Feast Online Store.
    2. Prediction: Pass the feature vector to the deployed model on the SageMaker Endpoint.
    3. Response: Return the propensity score (e.g., {"propensity": 0.85}) to the backend application.
  • Outputs: A real-time propensity score, enabling a personalized action within the user's session.

Monitoring & Retraining Loop

  • Trigger: Continuous (for monitoring); alert-based (for retraining).
  • Inputs: Live prediction requests and model responses.
  • Key Steps:
    1. Log: Log all predictions and features to a dedicated monitoring stream.
    2. Monitor: Continuously compare the distribution of live features and predictions against the training set baseline to detect drift.
    3. Alert: If drift exceeds a predefined threshold, send an alert (e.g., to Slack, PagerDuty).
    4. Trigger: The alert automatically triggers the ML Training Pipeline to start a new retraining run with the latest data.
  • Outputs: Actionable alerts for on-call engineers and an automated, closed-loop system that can adapt to changing data without manual intervention.
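A minimal sketch of the Real-Time Inference flow described above, assuming hypothetical Feast feature views (session_stats, user_history), a SageMaker endpoint named propensity-endpoint, and a JSON contract between the two; all of these names are illustrative, not the project's actual identifiers.

```python
import json
import boto3
from feast import FeatureStore

# Hypothetical names; the real Feast repo path, feature views, and endpoint differ.
store = FeatureStore(repo_path=".")
runtime = boto3.client("sagemaker-runtime")

def score_user(user_id: str) -> float:
    """Fetch the latest online features for a user and return a propensity score."""
    features = store.get_online_features(
        features=[
            "session_stats:pages_viewed_in_session",
            "session_stats:add_to_cart_count",
            "user_history:purchase_count_90d",
        ],
        entity_rows=[{"user_id": user_id}],
    ).to_dict()

    payload = {k: v[0] for k, v in features.items() if k != "user_id"}
    response = runtime.invoke_endpoint(
        EndpointName="propensity-endpoint",
        ContentType="application/json",
        Body=json.dumps(payload),
    )
    return float(json.loads(response["Body"].read())["propensity"])

# Example call from the e-commerce backend:
# score = score_user("user_123")  # e.g. returns 0.85
```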

3.5.1 Project Stages and Timeline

A project of this complexity requires a structured, iterative approach to manage risk and deliver value incrementally. The project will be executed in distinct phases, moving from initial planning and experimentation to a full production rollout. This phased approach allows the team to learn and adapt, ensuring the final system is robust, scalable, and aligned with business needs.

The project is broken down into five major stages, each with specific goals and deliverables.

Stage 1: Ideation & Planning (Weeks 1-2)
  - Define Business Case: Solidify KPIs (Conversion Rate, AOV), success criteria, and projected ROI.
  - Problem Framing: Finalize the ML problem as propensity scoring; define model inputs/outputs.
  - Feasibility Study: Complete data discovery, risk assessment, and initial cost estimation.
  - Tech Stack Selection: Finalize the MLOps stack (as defined in Sec 3.3).
  - Project Plan: Create a detailed project plan and roadmap.
  - Deliverable: A signed-off Project Charter document.

Stage 2: Offline Model Experimentation (Weeks 3-6)
  - Data Preparation: Ingest and clean a historical dataset for initial modeling.
  - Feature Engineering (Offline): Develop and test an initial set of features using Python/Spark.
  - Baseline Model: Train a simple baseline model (e.g., Logistic Regression) to establish a performance floor.
  - Advanced Model Training: Experiment with more complex models (LightGBM, XGBoost) and log all experiments in MLflow.
  - Model Selection: Select the best-performing "champion" model based on offline evaluation metrics (AUC, Precision).
  - Deliverable: A trained and registered v1 model artifact with a comprehensive evaluation report.

Stage 3: MLOps Pipeline Development (Weeks 7-14)
  - Infrastructure as Code (IaC): Develop Terraform scripts for all AWS resources.
  - Pipeline Implementation: Build, test, and document the core automated pipelines:
    - Real-time Ingestion Pipeline (Kinesis/Lambda).
    - Batch & Streaming Feature Engineering Pipelines (Feast/Spark).
    - ML Training Pipeline (Airflow/SageMaker).
    - CI/CD workflows for all components (GitHub Actions).
  - Deliverable: Fully automated, tested, and production-ready MLOps pipelines.

Stage 4: Deployment & Initial Serving (Weeks 15-16)
  - Staging Deployment: Deploy the entire system to a pre-production (staging) environment for end-to-end testing.
  - Shadow Deployment: Deploy the champion model into production in "shadow mode." It will receive live traffic and make predictions, but its outputs will only be logged, not acted upon. This validates the live feature pipeline and model performance on real data without user impact.
  - Canary Release: Begin a canary release, routing 1% of live traffic to the model for active personalization. Closely monitor system and business metrics.
  - Deliverable: The model is live in production, serving a small fraction of users.

Stage 5: Monitoring & Iteration (Ongoing)
  - Ramp-Up: Gradually increase traffic to the new model (e.g., 10% -> 50% -> 100%) based on monitoring results.
  - A/B Testing: Conduct formal A/B tests to rigorously measure the business impact (lift in Conversion Rate, AOV) against the control group.
  - Monitoring & Alerting: Continuously monitor for operational issues, data drift, and concept drift.
  - Continual Learning: Execute scheduled and trigger-based retraining pipelines to keep the model fresh.
  - Iterate: Use insights from monitoring and A/B tests to inform the development of the next generation of models (v2).
  - Deliverable: A stable, monitored production system and a data-driven roadmap for future improvements.

3.5.2 Cross-Functional Team & Roles

Success requires a tight-knit, cross-functional team where responsibilities are clear but collaboration is constant. For a project of this scale, the core team would consist of:

  • Product Manager: Owns the business vision and roadmap. Defines requirements, prioritizes features, and acts as the bridge between technical teams and business stakeholders. Responsible for measuring the final business impact (ROI).

  • Data Engineer: Designs, builds, and maintains the real-time data ingestion and processing pipelines. Ensures data quality, reliability, and scalability. Owns the "Bronze" and "Silver" layers of the data lake.

  • ML Engineer: Leads the MLOps strategy. Designs and builds the core ML pipelines: feature engineering (in collaboration with Data Engineering), automated training, and model deployment. Owns the production model's operational health, including monitoring, alerting, and retraining strategies.

  • Data Scientist: Leads the modeling strategy. Explores data, develops features, and experiments with different ML algorithms. Responsible for model evaluation, interpretation, and selecting the "champion" model for deployment.

  • Platform/DevOps Engineer: Manages the underlying cloud infrastructure (AWS), Kubernetes, and CI/CD tooling (GitHub Actions). Ensures the platform is secure, scalable, and reliable.

3.5.3 Versioning and Governance Strategy

To ensure reproducibility, auditability, and controlled changes, a strict versioning and governance strategy will be applied to all assets.

  • Code (versioned with Git/GitHub): All code (pipelines, model logic, IaC) is managed in Git. All changes are made via Pull Requests (PRs), which require at least one peer review and must pass all automated CI checks (linting, unit tests) before being merged into the main branch.

  • Data (versioned with DVC): The specific version of the dataset used to train a production model is "snapshotted" using DVC. The DVC metadata file is checked into Git, allowing for perfect recreation of the training data for any model version.

  • Models (versioned with the MLflow Model Registry): Every trained model candidate is registered in MLflow. Models are promoted through distinct stages: Staging -> Production. Only users with specific permissions (e.g., Lead ML Engineer) can approve the promotion of a model to the Production stage, which serves as the trigger for the deployment pipeline.

  • Features (versioned with the Feast Registry & Git): Feature definitions are declarative Python code and are versioned in the same Git repository as the pipeline code (see the sketch after this list). The Feast registry stores the state of the deployed features. Changes to feature definitions follow the same PR and review process as any other code change.

  • Infrastructure (versioned with Terraform & Git): All cloud infrastructure is defined as code using Terraform and versioned in Git. Changes to production infrastructure also require a reviewed PR, ensuring a controlled and auditable process.
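To illustrate the "Features" entry above, here is a minimal sketch of a declarative Feast feature definition. The entity, view name, fields, and S3 path are hypothetical placeholders, and the exact constructor arguments vary across Feast versions.

```python
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Hypothetical entity and source; real definitions live in the project's Feast repo.
user = Entity(name="user", join_keys=["user_id"])

user_history_source = FileSource(
    name="user_history_source",
    path="s3://example-bucket/gold/user_history.parquet",  # placeholder path
    timestamp_field="event_timestamp",
)

user_history = FeatureView(
    name="user_history",
    entities=[user],
    ttl=timedelta(days=90),
    schema=[
        Field(name="purchase_count_90d", dtype=Int64),
        Field(name="avg_order_value", dtype=Float32),
    ],
    online=True,
    source=user_history_source,
)
```

Because these definitions are plain Python, they flow through the same PR review and CI checks as the rest of the codebase before `feast apply` updates the registry.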

3.6 A Comprehensive ML Testing Strategy: The MLOps Crucible

In MLOps, testing is not a single stage but a continuous, multi-layered process that ensures quality and reliability from the first line of code to the model serving live traffic. A failure in data, code, or the model itself can lead to silent, costly degradation of the user experience. Therefore, we adopt a holistic testing strategy that scrutinizes every component of the system.

This strategy is modeled after the MLOps Test Pyramid, an adaptation of the classic test pyramid (see The Practical Test Pyramid on martinfowler.com), which advocates for a tiered approach. We push tests as low in the pyramid as possible for fast feedback, with a broad base of fast, isolated unit tests and progressively fewer, more integrated tests as we move up the stack.

The specific tests to be implemented are detailed below across the four primary layers of our system: Data & Features, Code & Pipelines, Model, and Infrastructure & Serving.

Data & Features

  • Schema & Quality Validation: Ensure all incoming data conforms to expected formats and quality standards before it corrupts downstream processes.
    Implementation: Great Expectations suites will be defined and versioned in Git. Checks include:
    - user_id and event_timestamp must not be null.
    - event_type must be in the set ['view', 'add_to_cart', 'purchase'].
    - price and quantity columns must be positive numbers.
    These suites will be integrated directly into our Airflow data pipelines.
    Lifecycle stage: CI/CD & Operational. Executed as a distinct task within the Airflow data ingestion and feature engineering pipelines. A failure on a critical expectation will halt the pipeline and trigger an alert.

  • Feature Distribution Validation: Detect data and concept drift by monitoring shifts in the statistical properties of our features.
    Implementation: Evidently AI will be used to (1) offline, generate a reference profile from the training dataset, and (2) online, periodically run a job that profiles live inference data from the monitoring S3 bucket and compares it to the reference profile. Statistical tests (e.g., Kolmogorov-Smirnov) will detect significant drift.
    Lifecycle stage: Offline (CI/CD): a reference profile is generated and stored as an artifact during the training pipeline. Production (Monitoring): a scheduled Airflow DAG runs daily to generate a new profile and perform the drift comparison, sending alerts on significant drift.

Code & Pipelines

  • Unit Tests: Verify the correctness of individual functions and logical components in isolation.
    Implementation: Pytest will be used to test all Python code, including:
    - Feature transformation logic (e.g., testing that a sessionize function correctly groups events).
    - Utility functions and data processing classes.
    - API logic in the model serving application (using FastAPI's TestClient).
    All external dependencies (e.g., database connections) will be mocked.
    Lifecycle stage: Dev & CI. Executed locally by developers and automatically on every git push in the GitHub Actions CI pipeline. A failing test will block the Pull Request from being merged.

  • Pipeline Integration Tests: Ensure that different components of a pipeline (e.g., Airflow tasks) work together correctly.
    Implementation: Pytest scripts will orchestrate tests against a staging environment:
    - Feature Pipeline: a test will trigger the feature engineering DAG with a small, fixed input file on S3 and assert that the expected features are written correctly to a staging Feast feature store.
    - Training Pipeline: a test will run the entire training DAG on a small dataset to ensure it can successfully produce and register a model in the staging MLflow registry.
    Lifecycle stage: Staging (CD). Executed automatically by GitHub Actions after a successful deployment of pipeline code to the staging environment.

Model

  • Behavioral & Robustness Tests: Validate that the model has learned correct, logical behaviors beyond just its aggregate accuracy.
    Implementation: Custom Pytest checks will be created to validate model logic (see the sketch after this list):
    - Invariance Test: a user's propensity score should not change if only their user_id is changed while all behavioral features remain identical.
    - Directional Expectation Test: adding a high-intent event like add_to_cart to a session's event stream should increase the resulting propensity score.
    - Robustness Test: introduce minor noise (e.g., slightly alter a product price) and assert that the prediction does not change dramatically.
    Lifecycle stage: CI/CD. Executed as a mandatory step in the model training pipeline after the model is trained but before it is registered. A failure here prevents the model from being considered for deployment.

  • Sliced Evaluation & Fairness: Ensure the model performs reliably and fairly across critical business segments, preventing hidden biases.
    Implementation: The model evaluation script in the training pipeline will calculate and log key metrics (AUC, Precision) for predefined user segments, including:
    - Anonymous vs. Logged-in Users
    - New vs. Returning Customers
    - Mobile vs. Desktop Users
    The results will be stored as artifacts in MLflow.
    Lifecycle stage: CI/CD. Executed as part of the model training pipeline. A significant performance drop (e.g., >10%) on any critical slice relative to the overall performance will flag the model for manual review.

Infrastructure & Serving

  • API Contract & Smoke Test: Confirm that a newly deployed model endpoint is live, responsive, and adheres to its API schema.
    Implementation: A Pytest script will be run post-deployment to (1) ping the /health endpoint and expect a 200 OK response, and (2) send a valid sample request and assert that the response is a 200 OK and that the JSON body contains a propensity key with a float value between 0.0 and 1.0.
    Lifecycle stage: Staging & Production (CD). Executed automatically by the GitHub Actions CD workflow immediately after an endpoint deployment. A failure will trigger an automatic rollback to the previous stable version.

  • Performance & Load Test: Ensure the serving infrastructure meets the stringent latency and throughput requirements under realistic load.
    Implementation: Locust, an open-source load testing tool, will be used. A locustfile.py will define user behavior (sending prediction requests), and the test will simulate peak traffic against the staging endpoint to validate that p99 latency remains < 100ms and to identify the maximum sustainable QPS (Queries Per Second).
    Lifecycle stage: Staging (CD). Executed as a final automated step in the CD pipeline before a model can be promoted to production. Performance results are published to the PR.
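As an illustration of the behavioral tests above, here is a hedged Pytest sketch of the directional-expectation check. The feature names, the stand-in model trained on synthetic data, and the monotone constraint (which makes the toy assertion deterministic) are all assumptions; the real test would load the candidate model and feature list from MLflow instead.

```python
import numpy as np
import pytest
import lightgbm as lgb

# Assumed feature order for the booster; the real test loads the production feature list.
FEATURES = ["pages_viewed_in_session", "add_to_cart_count", "session_duration_s"]

@pytest.fixture
def model() -> lgb.Booster:
    # Stand-in model trained on synthetic data so the test is self-contained.
    rng = np.random.default_rng(0)
    X = rng.uniform(0, 10, size=(2000, len(FEATURES)))
    y = (0.2 * X[:, 0] + 1.5 * X[:, 1] + 0.01 * X[:, 2] + rng.normal(size=2000) > 9).astype(int)
    params = {
        "objective": "binary",
        "verbose": -1,
        # Enforce that the score is non-decreasing in add_to_cart_count.
        "monotone_constraints": [0, 1, 0],
    }
    return lgb.train(params, lgb.Dataset(X, label=y), num_boost_round=100)

def score(booster: lgb.Booster, pages: float, carts: float, duration: float) -> float:
    return float(booster.predict(np.array([[pages, carts, duration]]))[0])

def test_add_to_cart_increases_propensity(model):
    """Directional expectation: adding an add_to_cart event should not lower the score."""
    assert score(model, pages=6, carts=1, duration=300) >= score(model, pages=6, carts=0, duration=300)
```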


4. Data Sourcing, Discovery, and Characteristics

Every robust machine learning system is built upon a foundation of high-quality, relevant data. This phase is where we identify, acquire, and understand the raw ingredients that will fuel our propensity models. The process is systematic, moving from high-level requirements to a granular understanding of the data’s profile and potential challenges.

4.1 Data Sourcing & Discovery Plan

The following plan maps the standard framework for data sourcing to the specific needs of the e-commerce propensity scoring project.

  • Identifying Data Requirements
    Application: To predict purchase intent, we require four core data domains:
    1. Behavioral Data: Real-time user interactions (views, clicks, searches, add-to-carts).
    2. Transactional Data: Historical purchases, returns, and order values.
    3. Customer Data: User demographics and segment information (for known users).
    4. Product Catalog Data: Product attributes, categories, and pricing.
    Rationale: These domains provide the necessary signals for short-term intent (session behavior) and long-term context (user history). This rich combination is essential for building an accurate, personalized model.

  • Exploring Data Sources
    Application: All required data is first-party, sourced from internal systems:
    - Clickstream Events: A custom tracking system or a platform like Snowplow sending JSON events from the website and mobile apps.
    - Transactional Database: A production OLTP database (e.g., PostgreSQL, MySQL) holding order and payment information.
    - CRM System: The central repository for customer profile data.
    - Product Information Management (PIM): The system holding the product catalog.
    Rationale: Relying on first-party data simplifies governance and ensures compliance with privacy regulations like GDPR. These sources represent the ground truth for business operations.

  • Data Collection & Ingestion Strategy
    Application: A hybrid ingestion strategy is required:
    - Streaming (for Behavioral Data): High-velocity clickstream events will be ingested in real-time to power session-based features.
    - Batch (for other data): Transactional, Customer, and Product data, which change less frequently, will be ingested via daily batch jobs.
    Rationale & Tools: This dual approach optimizes for both freshness and cost. Real-time ingestion is critical for session-based personalization, while batch processing is more efficient for slower-moving, structured data sources. Tools: AWS Kinesis for streaming; Apache Airflow for batch orchestration.

  • Initial Storage & Versioning
    Application: Data will be landed in an Amazon S3 Data Lake structured by layers (Bronze, Silver, Gold):
    - Bronze: Raw, immutable event data.
    - Silver: Cleaned, sessionized, and structured data (Parquet format).
    - DVC (Data Version Control) will be used to snapshot and version key datasets (e.g., the specific data cut used for a production model training run), with the pointers stored in Git.
    Rationale: The layered data lake provides a clear, auditable data flow. Parquet is optimal for analytical performance. DVC ensures that every model training run is reproducible by linking the model version to the exact data version it was trained on.

  • Exploratory Data Analysis (EDA)
    Application: A dedicated phase of analysis will be conducted to:
    - Profile feature distributions (e.g., session length, purchase frequency).
    - Identify data quality issues (missing values, outliers).
    - Validate initial hypotheses (e.g., "longer sessions correlate with higher purchase intent").
    - Understand differences between anonymous and identified user behavior.
    Rationale & Tools: EDA is critical for informing feature engineering and model selection. It de-risks the modeling phase by uncovering potential issues and biases in the data early. Tools: Jupyter Notebooks, Pandas, Matplotlib, Seaborn.

  • Data Documentation & Discovery
    Application: Initial documentation will be lean but effective:
    - A version-controlled data_sources.md file will document the schema, origin, and ownership of each key dataset.
    - AWS Glue Data Catalog will be used to make Silver and Gold layer tables in S3 discoverable and queryable via Athena.
    - EDA findings will be documented in shareable notebooks.
    Rationale: For the project's scale, a full-featured data discovery platform (like Amundsen) is overkill. A pragmatic combination of code-based documentation and a managed data catalog provides the necessary discovery and governance without excessive overhead.

  • Early Governance & Security
    Application:
    - Access Control: Strict AWS IAM roles will be defined for each pipeline and user type, adhering to the principle of least privilege.
    - Privacy: A PII scan will be part of the ingestion pipeline, with automated masking/hashing of sensitive customer data before it lands in the analytical layers (see the sketch after this list). All processes must be GDPR compliant.
    Rationale: Security and privacy are not afterthoughts. Integrating governance from day one is essential for building a trustworthy and compliant production system, especially when operating in Europe.
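A minimal sketch of the kind of PII masking step mentioned above, assuming it runs inside the ingestion Lambda or a Spark job. The field names and the choice of a salted SHA-256 hash are illustrative, not the project's actual implementation.

```python
import hashlib
import os

# Fields treated as PII in this example; the real pipeline would drive this from config.
PII_FIELDS = {"email", "phone_number", "full_name"}
# A secret salt (e.g., from AWS Secrets Manager) prevents trivial rainbow-table reversal.
SALT = os.environ.get("PII_HASH_SALT", "local-dev-salt")

def mask_pii(event: dict) -> dict:
    """Return a copy of the event with PII fields replaced by salted SHA-256 digests."""
    masked = dict(event)
    for field in PII_FIELDS & masked.keys():
        value = str(masked[field]).strip().lower()
        masked[field] = hashlib.sha256((SALT + value).encode("utf-8")).hexdigest()
    return masked

# Example:
# mask_pii({"user_id": "u1", "email": "jane@example.com", "event_type": "view"})
```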

4.3 Key Technical Considerations for Implementation

The data discovery and profiling phase has surfaced several critical technical challenges that must be addressed in the implementation of our data and feature pipelines.

  1. Solving for Long-Lived Session State: User shopping sessions can span days. Computing session-based features in real-time requires a stateful stream processing solution that can manage state for millions of concurrent sessions without excessive memory consumption. This will be a key design challenge in the Feature Engineering section.

  2. Point-in-Time Correctness for Training Data: To avoid data leakage, training datasets must be assembled with point-in-time accuracy (i.e., joining features as they were at the exact moment of each historical event). Our use of Feast is intended to solve this directly, but the implementation of the data loading into the Feast offline store must be done carefully.

  3. Online/Offline Feature Store Synchronization: The system must guarantee that the features used for real-time inference in the online store are consistent with the features used for batch training in the offline store. The design of our feature ingestion pipelines will be critical to maintaining this consistency and preventing training-serving skew.

  4. Handling Extreme Class Imbalance: The “purchase” event is rare (our conversion rate is ~2%). This extreme imbalance must be addressed during the model training phase using techniques like over/under-sampling (e.g., SMOTE) or by using appropriate class weights to prevent the model from simply predicting “no purchase” for every session.
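For the ~2% positive rate just described, class weighting is often preferable to resampling because it avoids distorting the data pipeline. A hedged LightGBM sketch follows; the synthetic data and hyperparameters are assumptions, and the weight's effect on calibration would be re-checked during experimentation.

```python
import numpy as np
import lightgbm as lgb
from sklearn.model_selection import train_test_split

# Synthetic stand-in for the session-level training data (~2% positive class).
rng = np.random.default_rng(7)
X = rng.normal(size=(50_000, 10))
y = (rng.uniform(size=50_000) < 0.02).astype(int)
X_tr, X_va, y_tr, y_va = train_test_split(X, y, test_size=0.2, stratify=y, random_state=7)

# Weight positives by the negative/positive ratio instead of resampling.
scale_pos_weight = (y_tr == 0).sum() / max((y_tr == 1).sum(), 1)

params = {
    "objective": "binary",
    "metric": "auc",
    "scale_pos_weight": scale_pos_weight,  # alternative: "is_unbalance": True
    "learning_rate": 0.05,
    "verbose": -1,
}
model = lgb.train(
    params,
    lgb.Dataset(X_tr, label=y_tr),
    valid_sets=[lgb.Dataset(X_va, label=y_va)],
    num_boost_round=300,
)
# Note: reweighting distorts raw probabilities, so calibration (e.g., isotonic or Platt
# scaling) should be re-checked before the scores are used for business thresholds.
```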


5. Data Engineering and Pipelines

5.1 The Data Engineering Lifecycle: From Raw Data to ML-Ready Features

The following plan maps the standard data engineering lifecycle to our e-commerce propensity project, outlining the specific steps and tools for our batch-oriented pipelines, which are primarily responsible for creating our historical training data and populating our feature store’s offline tables.

  • Generation & Sourcing
    Application:
    - Clickstream Events: Ingested from internal trackers.
    - Transactional & CRM Data: Pulled from production databases (PostgreSQL).
    - Product Catalog: Pulled from the PIM system.
    Rationale: The data required is entirely first-party. Our pipelines must interact with both event streams and relational databases.

  • Storage
    Application:
    - Raw (Bronze Layer): All source data landed as-is in Amazon S3.
    - Cleaned (Silver Layer): Data cleaned, sessionized, and stored in S3 as Parquet.
    - Features (Gold Layer): Final, aggregated features stored in the Feast Offline Store (S3 Parquet).
    Rationale: S3 provides a cost-effective, scalable, and durable foundation for our data lake. Parquet is chosen for its analytical efficiency. Feast manages the Gold layer for ML consumption.

  • Ingestion & Transformation (ETL)
    Application: The core batch pipeline, orchestrated by Apache Airflow, will perform the following steps on a daily schedule:
    1. Data Cleaning & Wrangling:
    - Handle missing values (e.g., impute missing product_category with 'Unknown').
    - Correct data types (e.g., ensure event_timestamp is a valid timestamp).
    - Standardize text (lowercase search queries, remove special characters).
    - Filter out bot traffic based on heuristics (e.g., sessions with abnormally high event counts).
    2. Data Transformation:
    - Sessionization: Group individual clickstream events into user sessions based on user_id/session_id and a 30-minute inactivity window. This is a critical transformation for creating session-based features (see the sketch after this list).
    - Feature Engineering: Calculate historical aggregates (e.g., user_purchase_count_90d, avg_order_value). This will be a complex Spark job for performance at scale.
    3. Data Labeling & Splitting:
    - Labeling: For each session, determine the target label: 1 if a purchase event exists, 0 otherwise.
    - Splitting (for Training Pipeline): Implement a time-based split on the historical data (e.g., train on weeks 1-3, validate on week 4). To prevent user-level leakage, also implement a group-based split so that all events from a single user remain in the same set.
    4. Data Validation:
    - Great Expectations will be integrated as a dedicated task in the Airflow DAG. An "Expectation Suite" will run against the final, ML-ready dataset with checks such as:
    - expect_column_to_exist: ['user_id', 'propensity_score_target', ...]
    - expect_column_values_to_not_be_null: 'user_id'
    - expect_column_mean_to_be_between: 'session_duration', min_value=0, max_value=3600
    5. Data Versioning:
    - DVC will be used to version the final training dataset. An Airflow task will run dvc add and dvc push to create a new version of the data, linked to the Git commit of the pipeline code that produced it.
    Rationale: Airflow is the chosen orchestrator for its robustness and extensive ecosystem of operators. The pipeline is designed to be modular and idempotent.

  • Serving (Offline)
    Application: The final, versioned dataset in the Feast offline store is made available to the ML Training Pipeline.
    Rationale: The Data Engineering pipeline's output is the direct input for the ML Training pipeline, creating a seamless, automated workflow.
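A sketch of the sessionization step above, assuming user_id and event_timestamp columns and the 30-minute inactivity window: a new session starts whenever the gap to the user's previous event exceeds the threshold. Paths and column names are placeholders.

```python
from pyspark.sql import SparkSession, functions as F, Window

spark = SparkSession.builder.appName("sessionize-sketch").getOrCreate()

# Assumes a Silver-layer events table with user_id and event_timestamp columns.
events = spark.read.parquet("s3://example-bucket/silver/clickstream/")  # placeholder path

SESSION_GAP_SECONDS = 30 * 60  # 30-minute inactivity window

w = Window.partitionBy("user_id").orderBy("event_timestamp")

sessions = (
    events
    .withColumn("prev_ts", F.lag("event_timestamp").over(w))
    .withColumn(
        "is_new_session",
        F.col("prev_ts").isNull()
        | (F.unix_timestamp("event_timestamp") - F.unix_timestamp("prev_ts") > SESSION_GAP_SECONDS),
    )
    # Running count of session starts per user yields a per-user session index.
    .withColumn("session_index", F.sum(F.col("is_new_session").cast("int")).over(w))
    .withColumn("session_id", F.concat_ws("-", F.col("user_id"), F.col("session_index")))
    .drop("prev_ts", "is_new_session")
)

sessions.write.mode("overwrite").parquet("s3://example-bucket/silver/sessionized/")
```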

5.2 Real-Time Streaming Pipeline: Design & Architecture

While batch pipelines prepare our historical data, the real-time pipeline is the system’s central nervous system, responsible for processing live clickstream data to enable in-session personalization.

5.2.1 Core Architecture

The pipeline will follow a standard streaming pattern, leveraging managed AWS services for scalability and reliability.
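To make the streaming pattern concrete, a sketch of the Spark Structured Streaming stage is shown below. The Kinesis source options are connector-dependent (the option names here follow the Databricks-style connector and are an assumption), and the event schema, aggregation window, and sink are illustrative; the production job uses stateful session logic rather than a fixed window.

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

spark = SparkSession.builder.appName("clickstream-streaming-sketch").getOrCreate()

event_schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("event_timestamp", TimestampType()),
])

# Source options vary by Kinesis connector (EMR vs. Databricks); these are assumptions.
raw = (
    spark.readStream.format("kinesis")
    .option("streamName", "processed-clickstream")
    .option("region", "eu-west-1")
    .option("initialPosition", "latest")
    .load()
)

events = raw.select(F.from_json(F.col("data").cast("string"), event_schema).alias("e")).select("e.*")

# Simple per-user counts over 30-minute windows; the full implementation replaces this
# with stateful session handling (see 5.2.2).
session_counts = (
    events.withWatermark("event_timestamp", "2 hours")
    .groupBy("user_id", F.window("event_timestamp", "30 minutes"))
    .agg(F.count("*").alias("pages_viewed_in_session"))
)

def write_to_online_store(batch_df, batch_id):
    # Placeholder sink: the real job pushes rows to the Feast online store (Redis).
    batch_df.write.mode("append").parquet("s3://example-bucket/online-staging/")

query = (
    session_counts.writeStream.outputMode("update")
    .foreachBatch(write_to_online_store)
    .option("checkpointLocation", "s3://example-bucket/checkpoints/clickstream/")
    .start()
)
```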

5.2.2 Key Challenges and Solutions for Real-Time Feature Engineering

  • State Management for Long-Lived Sessions: User sessions can last for days, making it infeasible to hold all state in memory. Our approach: We will leverage Spark Structured Streaming's state management capabilities, using flatMapGroupsWithState (or its PySpark counterpart, applyInPandasWithState) to manage the state for each active user session. For robustness at scale, Spark's state will be checkpointed to a reliable file system (Amazon S3). We will configure a state timeout (e.g., 48 hours of inactivity) to automatically clean up the state for abandoned sessions, preventing unbounded state growth. This is a pragmatic balance, avoiding the complexity of an alternative state store backend like RocksDB for our initial implementation while still handling long-lived sessions reliably.

  • Ensuring Training-Serving Consistency: This is the most critical MLOps challenge. Our solution: The Feast Feature Store is the core of our strategy. The exact same feature transformation logic (written in Python/Spark) will be used by (1) the batch pipeline to compute historical features and populate the Feast offline store, and (2) the streaming pipeline to compute real-time features and populate the Feast online store. By defining the feature logic once and applying it to both paths, we drastically reduce the risk of skew.

  • Creating Point-in-Time Correct Training Data: Generating historical training data must avoid using future information. Our solution: This is another primary benefit of Feast. When the ML Training Pipeline requests data, it will provide an "entity DataFrame" containing user_id, session_id, and event_timestamp. Feast's get_historical_features() method will automatically perform the complex point-in-time join against the offline feature store, ensuring that for each event, only feature values that were available at that exact timestamp are joined. This eliminates manual, error-prone temporal join logic (see the sketch after this list).

  • Online/Offline Store Synchronization: The online and offline stores must be consistent. Our approach: The data flow is unidirectional and designed for consistency. (1) The streaming pipeline is the source of truth for real-time features; it writes directly to the Online Store (Amazon ElastiCache for Redis) for low latency. (2) In parallel, the same streaming job writes the computed features to a table in our S3 Data Lake. (3) A separate, periodic batch job (part of the feature pipeline DAG) picks up these files and appends them to the main Feast Offline Store tables. This ensures the online store is always freshest, while the offline store is kept reliably in sync with a slight, acceptable delay. Note: ElastiCache for Redis is chosen over DynamoDB based on benchmark data suggesting superior performance for this type of key-value lookup workload.

  • Handling High Ingestion Throughput & Spikes: The system must handle traffic spikes during sales events. Our solution: We use managed, auto-scaling AWS services. API Gateway and Kinesis Data Streams are designed to handle massive, spiky throughput, and our Spark Structured Streaming job will run on a cluster (e.g., EMR or Databricks) with autoscaling enabled to add or remove worker nodes based on the volume of data in the Kinesis stream.
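A minimal sketch of the point-in-time join described above. The feature view and column names are hypothetical; get_historical_features() joins each (user_id, event_timestamp) row only with feature values that were valid at that timestamp.

```python
import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")  # assumes a local Feast repo

# Entity DataFrame: one row per labeled session, with the timestamp of the event.
entity_df = pd.DataFrame(
    {
        "user_id": ["u1", "u2", "u3"],
        "event_timestamp": pd.to_datetime(
            ["2024-03-01 10:00:00", "2024-03-01 10:05:00", "2024-03-02 09:30:00"], utc=True
        ),
        "label": [1, 0, 0],  # purchase within session
    }
)

# Feature references are illustrative; real names come from the project's feature repo.
training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_history:purchase_count_90d",
        "user_history:avg_order_value",
        "session_stats:pages_viewed_in_session",
    ],
).to_df()

print(training_df.head())
```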

5.3 How do we choose the optimal Trigger Interval for our Spark Structured Streaming job?

Factors Influencing the Trigger Interval Choice
  1. Business Requirements & Feature Freshness (The “Why”)

    • What it is: This is the most important factor. How up-to-date do the features need to be to provide business value?

    • For Our Project: We are predicting purchase intent for in-session personalization. If a user adds an item to their cart, we want that action to be reflected in their feature vector relatively quickly so we can personalize their next click. A feature that is 5 minutes old is still useful, but one that is 30 minutes old might miss the opportunity. There is no business value in sub-second freshness for this use case.

    • Consideration: A shorter interval provides fresher features but comes at a higher cost.

  2. Cost & Efficiency (The “How Much”)

    • What it is: Each time the trigger fires, Spark spins up tasks, plans the execution, and writes output. Very frequent triggers lead to high operational overhead.

    • CPU Churn: A very short trigger (e.g., 5 seconds) means the Spark cluster is constantly busy planning and executing very small batches of work, which is inefficient and leads to high CPU costs for the amount of data processed.

    • The “Small File Problem”: This is a classic data engineering challenge. If our trigger is 10 seconds and we are writing the output to our S3 Data Lake (for the offline store), we will generate 6 files per minute, or 8,640 tiny files per day. Data lakes are optimized for reading a small number of large files, not thousands of small ones. This severely degrades the performance of any downstream analytical queries or model training jobs.

    • Consideration: A longer interval is more cost-effective and creates fewer, larger, and more optimized files in the data lake.

  3. End-to-End Latency (The “How Fast”)

    • What it is: The total time from a user’s click to that event being reflected in a feature available for inference.

    • Components: Latency = Kinesis Ingestion Lag + Spark Scheduling Delay + Trigger Interval + Micro-Batch Processing Time + Feature Store Write Time.

    • The Trigger’s Role: The trigger interval is often the largest and most controllable component of this latency. A 1-minute trigger guarantees a minimum of 1 minute of data staleness for some events.

    • Consideration: You must ensure your Micro-Batch Processing Time < Trigger Interval. If it takes 45 seconds to process a micro-batch, setting a 30-second trigger is impossible; the pipeline will continuously fall behind, and its lag will grow indefinitely.

  4. Operational Stability & State Management (The “How Reliable”)

    • What it is: The reliability of the streaming job itself.

    • Stateful Operations: Our pipeline is stateful because we are calculating session-based features (e.g., pages_viewed_in_session). Spark needs to hold the state for every active user session in memory (spilling to checkpoint storage on S3).

    • Impact of Interval: A longer trigger interval means each micro-batch contains more data and potentially updates the state for more users at once. This increases the memory pressure on the Spark executors during that batch. A very long interval could increase the risk of Out-Of-Memory (OOM) errors if a single batch becomes too large to process.

    • Recovery Time: If a job fails, Spark will recover from the last successful checkpoint. A shorter trigger interval means checkpoints happen more frequently, so the amount of data to reprocess upon failure is smaller, leading to faster recovery.

Summary of Trade-offs

| Trigger Interval | Feature Freshness | Cost | Data Lake Friendliness | Latency | Stability Risk |
| --- | --- | --- | --- | --- | --- |
| Very Short (e.g., < 10s) | Excellent | High | Poor (many small files) | Low | Higher (CPU churn, risk of falling behind) |
| Medium (e.g., 30s - 2 min) | Good | Medium | Good | Medium | Balanced |
| Long (e.g., > 5 min) | Fair / Poor | Low | Excellent (fewer, larger files) | High | Lower (but larger recovery on failure) |

Recommendation for Our Project

Given the factors above, a fixed ProcessingTime trigger is the most appropriate choice.

The recommended trigger interval for this project is between 30 seconds and 2 minutes.

A good starting point would be 1 minute.

Justification:

  1. Business Value: A 1-minute feature freshness is more than sufficient for effective in-session personalization. A user’s overall intent doesn’t change drastically on a second-by-second basis. This interval allows us to capture significant actions (like adding to cart) and use them to influence the experience within the next few page loads.

  2. Cost-Effectiveness: A 1-minute interval avoids the severe “small file problem” and prevents excessive CPU churn, keeping operational costs reasonable. It produces 1,440 files per day, which is manageable for downstream systems.

  3. Latency: This adds a predictable latency component that fits well within the overall system design. It is not trying to be a sub-second, real-time trading system.

  4. Stability: It creates micro-batches of a manageable size for our stateful session calculations, providing a good balance between memory pressure and recovery time.

This interval is an initial, reasoned choice. In a true production rollout, this parameter would be fine-tuned based on load testing and by monitoring the pipeline’s processing time and end-to-end latency metrics in the staging environment.
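
As a minimal illustration of how this choice is expressed in code, the sketch below configures a 1-minute ProcessingTime trigger on the streaming write (PySpark, assuming a Kinesis source connector is available on the cluster; the stream name and S3 paths are hypothetical):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("session-feature-stream").getOrCreate()

# Read enriched events; the "kinesis" source requires a connector on the classpath.
events = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "processed-clickstream")   # hypothetical stream name
    .option("region", "eu-west-1")                   # hypothetical region
    .load()
)

# ... stateful session-feature computation goes here ...

query = (
    events.writeStream
    .trigger(processingTime="1 minute")              # the recommended starting interval
    .option("checkpointLocation", "s3://mlops-checkpoints/session-features/")  # hypothetical path
    .format("parquet")
    .option("path", "s3://data-lake/gold/session_features/")                   # hypothetical Gold path
    .outputMode("append")
    .start()
)

During load testing, the micro-batch processing time reported in the Spark UI should stay comfortably below the 1-minute interval; if it does not, the interval or the cluster size is adjusted.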


6. Feature Engineering and Pipelines: Crafting the Predictive Signals

Feature engineering is the alchemical process of transmuting raw data into the valuable, predictive signals that fuel machine learning models. It is widely acknowledged that the quality and relevance of features often have a more significant impact on model performance than the choice of the algorithm itself.

This section details the feature engineering strategy for our propensity scoring model. We will cover the specific features to be created, the design of the hybrid batch and streaming pipelines responsible for their computation, and the role of the Feature Store as the central nervous system for managing and serving these features consistently across training and inference.

6.1 Feature Engineering Lifecycle and Strategy

Our feature engineering process follows a structured lifecycle, ensuring that every feature is well-designed, validated, and managed.

| Lifecycle Stage | Application to the Propensity Scoring Project |
| --- | --- |
| Feature Ideation | Based on domain knowledge and EDA, we hypothesize that a user's purchase intent is a function of their long-term habits, their current session's intensity, and their interactions with specific products. This leads to four categories of features: User, Product, Session, and User-Product interaction features. |
| Data Sourcing | All features will be derived from the "Silver" layer of our S3 data lake, which contains cleaned, structured clickstream, transactional, and customer data. |
| Feature Transformation & Generation | A hybrid approach is adopted: a daily batch pipeline computes long-term historical features (User and Product level), while a real-time streaming pipeline computes volatile, short-term session features (Session and Session-Product level). |
| Feature Validation | Data quality checks (using Great Expectations) will be embedded within the generation pipelines to validate features before they are saved to the Feature Store. Checks will include null-value constraints, range checks, and distribution comparisons. |
| Feature Storage & Management | We will use Feast as our centralized Feature Store. All engineered features will be registered and stored in Feast's offline (S3) and online (ElastiCache for Redis) stores. |
| Feature Serving & Monitoring | Features will be served by Feast to the training and inference pipelines. Production features will be monitored for drift using Evidently AI. |
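
To make the Feature Validation stage concrete, here is a minimal sketch using Great Expectations' classic pandas API (the exact API differs across Great Expectations versions, and the feature columns and S3 path are illustrative):

import great_expectations as ge
import pandas as pd

# Load the freshly computed batch features (hypothetical location).
features = pd.read_parquet("s3://data-lake/tmp/batch_features/")

# Wrap the DataFrame so expectation methods become available (classic GE pandas API).
features_ge = ge.from_pandas(features)

features_ge.expect_column_values_to_not_be_null("lifetime_purchase_count")
features_ge.expect_column_values_to_be_between("view_to_purchase_rate_30d", min_value=0.0, max_value=1.0)
features_ge.expect_column_values_to_be_between("days_since_last_purchase", min_value=0)

results = features_ge.validate()
if not results.success:
    raise ValueError("Feature validation failed; aborting load to the Feature Store.")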

6.2 A Lexicon of Features for Purchase Intent

The following table details the specific features that will be engineered. They are categorized by entity and computation frequency, aligning with our hybrid pipeline strategy.

| Feature Category | Computation | Feature Name | Description & Rationale |
| --- | --- | --- | --- |
| User Features | Batch (Daily) | lifetime_purchase_count | Total number of purchases made by a user. Strong indicator of loyalty. |
| User Features | Batch (Daily) | avg_order_value_90d | Average monetary value of a user's orders over the last 90 days. Indicates spending habits. |
| User Features | Batch (Daily) | days_since_last_purchase | Time elapsed since the user's last purchase. Recency is a powerful predictor of re-engagement. |
| User Features | Batch (Daily) | preferred_product_category | The product category most frequently purchased by the user. Captures long-term affinity. |
| Product Features | Batch (Daily) | purchase_count_30d | Total number of times a product has been purchased in the last 30 days. A measure of product popularity. |
| Product Features | Batch (Daily) | view_to_purchase_rate_30d | The ratio of purchases to views for a product in the last 30 days. A proxy for the product's conversion efficiency. |
| Product Features | Batch (Daily) | avg_price_7d | The average selling price of the product over the last 7 days. Captures price volatility. |
| Session Features | Streaming (Real-Time) | session_duration_seconds | The elapsed time since the start of the current session. Measures user engagement. |
| Session Features | Streaming (Real-Time) | distinct_products_viewed | The count of unique products viewed in the current session. Indicates browsing breadth. |
| Session Features | Streaming (Real-Time) | add_to_cart_count | The number of "add to cart" events in the session. A very strong intent signal. |
| Session Features | Streaming (Real-Time) | is_weekend, hour_of_day | Temporal features that capture contextual user behavior patterns. |
| Session Features | Streaming (Real-Time) | device_type, channel_type | Contextual features describing how the user is accessing the platform. |
| Session-Product Features | Streaming (Real-Time) | product_views_in_session | The number of times the current user has viewed a specific product in the current session. Indicates specific interest. |
| Session-Product Features | Streaming (Real-Time) | time_since_last_view_of_product | The time elapsed since the user last viewed the specific product. Captures recency of interest within the session. |

6.3 Architecting the Feature Engineering Pipelines

We will implement two parallel pipelines, each tailored to the specific freshness requirements of our features.

6.3.1 The Daily Batch Feature Pipeline

This pipeline is responsible for computing features that are based on long-term historical data and do not need to be updated in real-time.

  • Orchestration: An Apache Airflow DAG, scheduled to run once daily after midnight.

  • Compute Engine: An AWS EMR cluster running Apache Spark. Spark is chosen for its ability to efficiently process large volumes of historical data from the S3 data lake.

  • Workflow Steps (within the Airflow DAG):

    1. Launch EMR Cluster: The DAG begins by provisioning a transient EMR cluster.

    2. Run Spark Job: Submits a Spark job that reads from the “Silver” layer tables, computes all “User Features” and “Product Features”, and saves the output to a temporary S3 location.

    3. Validate Features: A Python task runs a Great Expectations suite on the newly computed features to ensure quality.

    4. Load to Feast: A Python task calls feast materialize to load the validated features from the temporary location into the Feast offline store (S3). A subset (e.g., features for recently active users) is also materialized to the Feast online store (Redis) to refresh the long-term context for active users. A minimal sketch of this step follows the list.

    5. Terminate EMR Cluster: The DAG terminates the EMR cluster to save costs.
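
Step 4 above can be driven either by the feast materialize CLI command or by the Python SDK. A minimal sketch of the SDK variant, assuming a hypothetical feature_repo/ path, loads the last day of validated features:

from datetime import datetime, timedelta
from feast import FeatureStore

# Point at the Feast repository configuration (hypothetical path).
store = FeatureStore(repo_path="feature_repo/")

# Backfill the last day of batch features into the offline/online stores.
# materialize() covers an explicit time range; materialize_incremental()
# picks up everything since the previous run.
store.materialize(
    start_date=datetime.utcnow() - timedelta(days=1),
    end_date=datetime.utcnow(),
)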

6.3.2 The Real-Time Streaming Feature Pipeline

This pipeline processes live clickstream events to compute and serve volatile, session-based features with low latency.

  • Core Challenge: As identified in our planning phase, the primary challenge is managing state for potentially millions of long-lived user sessions.

  • Compute Engine: A continuously running Spark Structured Streaming application on a persistent AWS EMR cluster (or a similar service like Databricks).

  • State Management Solution: We will use Spark’s built-in stateful streaming capabilities (flatMapGroupsWithState). To handle the large state and ensure fault tolerance, the state will be checkpointed to Amazon S3. A session state timeout (e.g., 24 hours of inactivity) will be implemented within the stateful function to automatically purge expired session data, preventing infinite state growth.

  • Workflow Steps (Continuous Application):

    1. Consume from Kinesis: The Spark job reads enriched events from the “processed” Kinesis Data Stream.

    2. Stateful Transformation: The flatMapGroupsWithState operation is applied, grouped by user_id and session_id. Within the stateful function, all “Session Features” and “Session-Product Features” are updated based on the incoming events.

    3. Write to Online Store: The output stream of updated features is written directly to the Feast online store (Amazon ElastiCache for Redis). Redis is chosen over DynamoDB for its superior in-memory performance, which is critical for meeting our sub-100ms inference latency SLA.

    4. Write to Offline Archive: In parallel, the output stream is also written to a “Gold” table in the S3 data lake. This provides a historical archive of real-time features, which is essential for ensuring point-in-time correctness during training. The Feast framework will later use this offline archive to create consistent training datasets.

This hybrid pipeline architecture provides a robust and efficient solution. The batch pipeline cost-effectively handles large-scale historical computations, while the streaming pipeline delivers the ultra-fresh, session-level features required for impactful, real-time personalization. The Feast Feature Store acts as the unifying layer, ensuring consistency between these two worlds.
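
Because the exact flatMapGroupsWithState (or, in PySpark 3.4+, applyInPandasWithState) plumbing is version-dependent, the sketch below shows only the per-session update logic in plain Python: a state dictionary keyed by (user_id, session_id) is updated for each incoming event, mirroring the feature names in section 6.2. Event timestamps are assumed to be epoch seconds.

def update_session_state(state: dict, event: dict) -> dict:
    """Update session-level features for one incoming clickstream event."""
    # Session duration, measured from the first event seen for this session.
    state.setdefault("session_start_ts", event["event_ts"])
    state["session_duration_seconds"] = event["event_ts"] - state["session_start_ts"]

    # Browsing breadth and per-product interest within the session.
    viewed = state.setdefault("viewed_product_ids", set())
    if event["event_type"] == "product_view":
        viewed.add(event["product_id"])
        views = state.setdefault("product_views_in_session", {})
        views[event["product_id"]] = views.get(event["product_id"], 0) + 1
    state["distinct_products_viewed"] = len(viewed)

    # Strong intent signal.
    if event["event_type"] == "add_to_cart":
        state["add_to_cart_count"] = state.get("add_to_cart_count", 0) + 1

    return state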


7. Model Development & Iteration

The goal here is not to build a single, perfect model in one attempt, but to follow a rigorous, iterative process of experimentation. We will start with the simplest possible baselines, measure performance meticulously, and justify every increase in complexity with tangible improvements in our ability to predict purchase intent.

7.1 Foundations for Success: The Modeling Blueprint

Before the first model is trained, we establish the strategic foundations that will guide our entire development process.

| Factor to Consider | Decision / Choice Made | Rationale & Trade-offs |
| --- | --- | --- |
| 1. Success Metrics | Optimizing: Area Under the ROC Curve (AUC). Satisficing: p99 inference latency < 100ms; precision at a 0.9 score threshold > 75%. Business KPI: increase in conversion rate. | AUC is the primary metric because it measures the model's ability to correctly rank sessions by their likelihood to convert, which is vital for targeting different user segments. Latency is a hard constraint for the real-time user experience. Precision at the high-score threshold is a key business constraint to minimize giving discounts to users who would have purchased anyway. |
| 2. Data Splitting | Strict temporal split: training data uses events up to a specific cutoff date (e.g., end of Week 3), and the model is evaluated on events from the subsequent period (Week 4). | This is the only valid approach for a time-dependent problem like predicting user behavior. A random split would leak future information into the training set, resulting in an overly optimistic and misleading evaluation of the model's true performance. |
| 3. Baseline Models | (1) Heuristic baseline: a simple rule (e.g., "predict 'purchase' if add_to_cart_count > 0"). (2) ML baseline: a logistic regression model using only basic session-level features. | The heuristic baseline establishes the absolute minimum performance bar and confirms the value of using ML. The simple logistic regression model validates the end-to-end training pipeline and provides a robust, interpretable initial performance floor to beat. |
| 4. Primary Model Family | Gradient Boosted Trees (LightGBM/XGBoost) | This model family is the industry standard for tabular data. It offers state-of-the-art performance, is computationally efficient, and provides good interpretability through feature importance plots. It strikes the best balance of accuracy and operational feasibility for this problem. |
| 5. Experiment Tracking | MLflow | Aligned with our tech stack, MLflow will automatically log the parameters, metrics, code versions, and artifacts for every training run. This ensures complete reproducibility and simplifies the comparison of model iterations. |
| 6. Debugging & Diagnostics | SHAP (SHapley Additive exPlanations) for feature importance and for explaining individual predictions; learning curves to diagnose bias vs. variance issues. | SHAP is critical for gaining business trust by explaining why a particular user session was given a high or low score. This is invaluable for debugging and refining personalization strategies. |
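
As an illustration of the temporal split and the two baselines, a minimal pandas/scikit-learn sketch is shown below; the dataset path, cutoff date, and column names (session_start_ts, purchased, add_to_cart_count, and so on) are assumptions standing in for the real training table.

import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score

sessions = pd.read_parquet("s3://data-lake/gold/training_sessions/")  # hypothetical path

# Strict temporal split: train on weeks 1-3, evaluate on week 4.
cutoff = pd.Timestamp("2024-01-22")                                   # hypothetical cutoff date
train = sessions[sessions["session_start_ts"] < cutoff]
test = sessions[sessions["session_start_ts"] >= cutoff]

# Heuristic baseline: "purchase if anything was added to the cart".
heuristic_pred = (test["add_to_cart_count"] > 0).astype(int)
print("Heuristic AUC:", roc_auc_score(test["purchased"], heuristic_pred))

# ML baseline: logistic regression on basic session features only.
basic_features = ["session_duration_seconds", "distinct_products_viewed", "add_to_cart_count"]
lr = LogisticRegression(max_iter=1000).fit(train[basic_features], train["purchased"])
print("LogReg AUC:", roc_auc_score(test["purchased"], lr.predict_proba(test[basic_features])[:, 1]))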


8. ML Training Pipelines

The experimental model from the previous phase must be transformed into a standardized, automated, and reliable production workflow. The ML Training Pipeline is the MLOps “assembly line” responsible for this. It codifies every step, from data ingestion to model validation, ensuring that every production model is built and evaluated with the same rigor and consistency.

8.1 Training Pipeline Design and Architecture

Our training pipeline will be orchestrated by Apache Airflow. It will be designed as a Directed Acyclic Graph (DAG) that can be triggered on a schedule (e.g., weekly) or by an alert from our monitoring system, indicating model or data drift. This pipeline will leverage managed AWS services for scalable computation to avoid managing a dedicated training cluster.

Architecture Diagram
graph TD
    subgraph Airflow Orchestration
        A[Start] --> B(Fetch Training Data from Feast);
        B --> C(Validate Input Data);
        C --> D(Train Model on SageMaker);
        D --> E(Evaluate Model Performance);
        E --> F{Performance > Production?};
        F -- Yes --> G(Behavioral & Fairness Tests);
        G --> H{All Tests Pass?};
        H -- Yes --> I(Register Model in MLflow);
        I --> J[End];
        F -- No --> K(End & Alert);
        H -- No --> L(End & Alert);
    end

    subgraph AWS & MLOps Tooling
        B --> Feast(Feast Offline Store - S3);
        C --> GE(Great Expectations);
        D --> SM(SageMaker Training Job);
        E --> MLflow(MLflow Tracking Server);
        G --> Pytest(Pytest Checklists);
        I --> MLflowRegistry(MLflow Model Registry);
    end

8.2 Pipeline Components and Implementation Plan

The Airflow DAG will be composed of the following modular, containerized tasks.

| Stage | Pipeline Task (Component) | Implementation Details & Tools |
| --- | --- | --- |
| 1. Data Preparation | fetch_and_validate_data | Script (data_prep.py): fetches a point-in-time correct training dataset from the Feast offline store using get_historical_features() with a fresh cutoff date, then runs a Great Expectations validation suite against the fetched data (schema compliance, null values, feature ranges). Output: a validated training dataset saved to a versioned S3 path for the current pipeline run. |
| 2. Model Training | train_model | Script (train.py): packaged into a Docker container and submitted as an Amazon SageMaker Training Job via Airflow's SageMakerTrainingOperator. The script is heavily instrumented with MLflow to log all parameters, metrics (such as training loss), and the final trained model artifact to the MLflow server. Output: a trained model artifact logged in MLflow and associated with the current pipeline run. |
| 3. Offline Evaluation | evaluate_model | Script (evaluate.py): loads the trained model from the MLflow run along with the held-out test set; computes core metrics (AUC, Precision, Recall, F1-score); fetches the metrics of the current "Production" model from the MLflow Model Registry, compares the new model's AUC, and passes the result (e.g., is_better: True/False) to the next task via XComs. Output: a detailed evaluation report (evaluation.json) and performance plots saved as MLflow artifacts. |
| 4. Comprehensive Testing & Validation | run_advanced_tests | Script (advanced_tests.py): executes only if the new model is better than the production model. Sliced evaluation computes AUC on critical user segments (Anonymous vs. Known, Mobile vs. Desktop) and asserts that performance does not drop by more than a set threshold (e.g., 5%) on any slice. Behavioral tests (pytest) include an invariance test (prediction unchanged when a non-predictive feature such as session_id is altered) and a directional test (adding an add_to_cart event to a session's data increases its propensity score). Fairness and calibration checks are planned for this task but not implemented in V1. Output: a boolean flag indicating whether all advanced tests passed. |
| 5. Model Registration | register_model | Script (register.py): runs only if all previous validation and testing steps pass. It uses the MLflow client to register the model artifact from the current run as a new version in the MLflow Model Registry. The new version is initially placed in the "Staging" stage, ready for the CD pipeline to pick it up for deployment. |
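
The core of train.py, stripped of the SageMaker plumbing, is a short LightGBM-plus-MLflow script. The sketch below assumes parquet dataset paths passed in by the pipeline, a hypothetical purchased label column, and an MLflow tracking URI configured via the environment; recent MLflow versions accept scikit-learn-API LightGBM models in mlflow.lightgbm.

import lightgbm as lgb
import mlflow
import mlflow.lightgbm
import pandas as pd
from sklearn.metrics import roc_auc_score

def train(train_path: str, valid_path: str, params: dict) -> None:
    train_df = pd.read_parquet(train_path)
    valid_df = pd.read_parquet(valid_path)
    target = "purchased"                                   # hypothetical label column
    features = [c for c in train_df.columns if c != target]

    with mlflow.start_run():
        mlflow.log_params(params)

        model = lgb.LGBMClassifier(**params)
        model.fit(
            train_df[features], train_df[target],
            eval_set=[(valid_df[features], valid_df[target])],
        )

        # Log the headline offline metric and the model artifact.
        auc = roc_auc_score(valid_df[target], model.predict_proba(valid_df[features])[:, 1])
        mlflow.log_metric("valid_auc", auc)
        mlflow.lightgbm.log_model(model, artifact_path="model")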

8.3 Artifacts to be Implemented

This plan requires the development and versioning of the following key artifacts:

  1. Python Scripts (/src):

    • data_prep.py: Logic for fetching data from Feast and running validation.

    • train.py: The core model training logic for our LightGBM model.

    • evaluate.py: Script for calculating and comparing model performance metrics.

    • advanced_tests.py: The script containing the sliced evaluation and behavioral tests.

    • register.py: A utility script to interact with the MLflow Model Registry.

  2. Unit Tests (/tests):

    • test_data_prep.py: Unit tests for the data preparation logic with mocked Feast responses.

    • test_train.py: Unit tests for the training script’s utility functions.

    • test_evaluate.py: Unit tests for the evaluation metric calculation logic.

  3. Pipeline Definition (/pipelines/dags):

    • model_retraining_dag.py: The Airflow DAG file that defines the tasks, dependencies, schedule, and alerting for the entire training pipeline (a skeleton sketch follows this list).

  4. Infrastructure as Code (/infrastructure):

    • sagemaker.tf: Terraform configuration for the IAM roles required for SageMaker Training Jobs to access S3 and Feast.

    • airflow_connections.tf: Terraform to configure the AWS connections required by Airflow.

  5. Integration & E2E Tests (/pipelines/tests):

    • test_training_pipeline_integration.py: A pytest script designed to be run in a staging environment. It triggers the Airflow DAG with a small dataset and asserts that a model is successfully trained and registered in a staging MLflow instance. This validates the entire pipeline flow and component integrations.

  6. CI/CD Workflows (/.github/workflows):

    • ci_training_pipeline.yml: A GitHub Actions workflow that runs on every PR. It will execute:

      • Linting and static analysis on all Python code.

      • Unit tests for all src components.

      • Airflow DAG syntax validation (airflow dags test).

    • cd_training_pipeline.yml: A GitHub Actions workflow triggered on merge to main. It will deploy the updated Airflow DAG and any associated containers to the production Airflow environment.
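
For orientation, a minimal skeleton of model_retraining_dag.py might look like the following. This is a sketch rather than the full DAG: Airflow 2.x syntax is assumed (older versions use schedule_interval), the branching on evaluation results and the alerting callbacks are omitted, and the imported task callables (including the hypothetical submit_sagemaker_training_job wrapper) are assumed to live in our /src package.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

from src.data_prep import fetch_and_validate_data          # assumed module layout
from src.train import submit_sagemaker_training_job        # hypothetical wrapper around the SageMaker operator
from src.evaluate import evaluate_model
from src.advanced_tests import run_advanced_tests
from src.register import register_model

with DAG(
    dag_id="model_retraining",
    start_date=datetime(2024, 1, 1),
    schedule="@weekly",        # also triggered ad hoc by monitoring alerts
    catchup=False,
) as dag:
    prep = PythonOperator(task_id="fetch_and_validate_data", python_callable=fetch_and_validate_data)
    train = PythonOperator(task_id="train_model", python_callable=submit_sagemaker_training_job)
    evaluate = PythonOperator(task_id="evaluate_model", python_callable=evaluate_model)
    advanced = PythonOperator(task_id="run_advanced_tests", python_callable=run_advanced_tests)
    register = PythonOperator(task_id="register_model", python_callable=register_model)

    prep >> train >> evaluate >> advanced >> register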


9. Deployment, Serving, and Inference

With a validated and versioned model artifact in our MLflow Registry, we now shift focus to the critical “last mile” of MLOps: deploying the model as a scalable, low-latency service and building the real-time inference pipeline to consume it. This is where the model begins to deliver tangible business value.

9.1 Overarching Deployment and Serving Strategy

Our strategy is guided by the project’s core requirements: low latency for a real-time user experience and high availability to support a production e-commerce site.

| Strategic Factor | Decision / Choice | Rationale & Trade-offs |
| --- | --- | --- |
| 1. Deployment Pattern | Online Prediction (Model-as-a-Service) | The business need for in-session personalization requires immediate, synchronous predictions. Batch prediction is too slow and would result in stale, irrelevant user experiences. |
| 2. Serving Architecture | Microservice on a Managed Platform (Amazon SageMaker) | We will deploy the model as a standalone microservice to decouple it from the main e-commerce application, allowing independent scaling and updates. Amazon SageMaker Endpoints abstract away the underlying Kubernetes complexity, reduce operational overhead, and provide built-in autoscaling and monitoring. |
| 3. API Style | REST API | A RESTful API using JSON payloads is chosen for its simplicity and broad compatibility. While gRPC offers lower latency, the overhead of managing Protobufs and gRPC clients was deemed unnecessary for this initial version. Performance can be optimized via other means. |
| 4. Release Strategy | Canary Release followed by A/B Testing | New model versions will be deployed using a Canary Release strategy (e.g., starting with 5% of traffic) to safely validate operational performance. Once stable, a formal A/B test will measure the new model's impact on the primary business KPI (Conversion Rate) before a full rollout. |
| 5. Model & Infrastructure Governance | CI/CD for Serving & Automated Rollbacks | All infrastructure will be managed via Terraform, and deployments automated through GitHub Actions. Critical monitoring alerts (e.g., a spike in p99 latency or error rate on the canary) will trigger an automated rollback to the previous stable model version. |

9.2 Pre-Deployment Preparations: Packaging the Model for Serving

Before deployment, the model artifact must be packaged into a self-contained, reproducible unit.

  1. Model Serialization: The trained LightGBM model will be serialized using joblib or pickle.

  2. Containerization (Docker): We will create a Dockerfile that:

    • Starts from a standard Python base image.

    • Copies over the model artifact and any necessary source code (e.g., pre-processing logic).

    • Installs all dependencies from a requirements.txt file.

    • Uses a production-grade web server like Gunicorn to run a simple FastAPI application that wraps the model.

  3. API Definition (FastAPI): The FastAPI app will define:

    • A /health endpoint for health checks.

    • A /predict endpoint that accepts a JSON payload matching the feature vector schema, runs the prediction, and returns the propensity score. Pydantic will be used for request/response validation.

  4. Container Registry: The final Docker image will be version-tagged and pushed to Amazon Elastic Container Registry (ECR).

9.3 The Real-Time Inference Pipeline

This is not a scheduled pipeline like our training DAG but the logical flow of a single, live prediction request. Optimizing this path for latency is critical.

Architecture Diagram
Latency Budget (p99 < 100ms)

The total time is the sum of each step. Our optimization targets are:

  • Network Overhead (App <-> SME): ~5-15ms

  • Feature Retrieval (SME -> Redis): < 10ms. This is why ElastiCache for Redis was chosen.

  • Model Inference (on SageMaker): < 30ms. LightGBM is extremely fast.

  • Pre/Post-processing: < 5ms

  • Safety Buffer: ~40ms

9.4 Implementation Plan for the Inference System Artifacts

| Artifact Category | File(s) / Component | Description & Key Logic |
| --- | --- | --- |
| Python Scripts | /src/serving/app.py | A FastAPI application with /predict and /health endpoints. The /predict endpoint: (1) receives a request with user_id and session_id; (2) initializes a Feast FeatureStore client; (3) calls fs.get_online_features() to retrieve the feature vector from Redis; (4) calls model.predict_proba() on the feature vector; (5) returns the JSON response. |
| Python Scripts | /src/serving/preprocessor.py | Contains any pre-processing logic (e.g., one-hot encoding for device_type) that must be applied to the feature vector before inference, ensuring consistency with the training pipeline. |
| Containerization | Dockerfile | Defines the steps to build the serving container image, including installing dependencies and setting the CMD to run the FastAPI app with Gunicorn. |
| Containerization | .dockerignore | Prevents unnecessary files (like local test data) from being copied into the container, keeping it lean. |
| Unit Tests | /tests/unit/test_serving_app.py | Pytest unit tests for the FastAPI application using TestClient: test the /health endpoint; test /predict with a valid payload, mocking the Feast client to return a sample feature vector and asserting the response format; test an invalid payload and assert a 422 Unprocessable Entity error. |
| Infrastructure as Code | /infrastructure/sagemaker_endpoint.tf | Terraform configuration defining the aws_sagemaker_model, aws_sagemaker_endpoint_configuration, and aws_sagemaker_endpoint resources. References the ECR image URI and defines instance types, scaling policies, and the canary traffic split. |
| Integration Tests | /tests/integration/test_deployed_endpoint.py | A Pytest script run after a deployment. It sends a real HTTP request to the deployed SageMaker endpoint's URL and validates the response; this is the final smoke test in the CD pipeline. |
| CI/CD Workflow | /.github/workflows/cd_serving.yml | A GitHub Actions workflow responsible for Continuous Delivery of the model: (1) triggers on promotion of a model to "Production" in the MLflow Registry; (2) builds the Docker image for the new model and pushes it to ECR; (3) runs terraform apply to update the SageMaker endpoint configuration with the new container, implementing the canary traffic shift; (4) executes the integration test (test_deployed_endpoint.py); (5) on integration test failure, automatically runs terraform apply with the previous stable configuration to roll back. |

This comprehensive plan ensures that our validated model is deployed into a performant, scalable, and reliable serving environment, with automation and safety checks built into every step of the process.
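
To ground the table above, here is a deliberately simplified sketch of /src/serving/app.py. The Feast repository path, feature references, model artifact location, and FEATURE_ORDER list are illustrative assumptions; the real service would also apply the preprocessor and log predictions for monitoring.

import joblib
from fastapi import FastAPI
from pydantic import BaseModel
from feast import FeatureStore

app = FastAPI()
store = FeatureStore(repo_path="feature_repo/")          # hypothetical repo path
model = joblib.load("/opt/ml/model/model.joblib")        # hypothetical artifact location

# The order in which the model expects its inputs (assumed; must match training).
FEATURE_ORDER = ["add_to_cart_count", "distinct_products_viewed", "days_since_last_purchase"]

class PredictRequest(BaseModel):
    user_id: str
    session_id: str

class PredictResponse(BaseModel):
    propensity_score: float

@app.get("/health")
def health() -> dict:
    return {"status": "ok"}

@app.post("/predict", response_model=PredictResponse)
def predict(request: PredictRequest) -> PredictResponse:
    # Retrieve the latest feature vector from the online store (Redis).
    features = store.get_online_features(
        features=[
            "session_features:add_to_cart_count",            # illustrative feature references
            "session_features:distinct_products_viewed",
            "user_features:days_since_last_purchase",
        ],
        entity_rows=[{"user_id": request.user_id, "session_id": request.session_id}],
    ).to_df()

    score = float(model.predict_proba(features[FEATURE_ORDER])[:, 1][0])
    return PredictResponse(propensity_score=score)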


10. Monitoring, Observability, and Model Evolution

Deploying a model into production is not the finish line; it is the starting line for its operational life. A model’s performance inevitably degrades over time as it encounters data patterns and user behaviors that differ from its training set. A comprehensive Monitoring and Observability strategy is therefore non-negotiable for maintaining model accuracy, ensuring system reliability, and delivering sustained business value.

Our strategy is built on a proactive, multi-layered approach that covers the health of our infrastructure, the quality of our data, and the performance of the model itself.

10.1 Monitoring and Observability Plan

| Pillar | Capability | Implementation Strategy & Tooling | Alerting & Action Plan |
| --- | --- | --- | --- |
| 1. Service Health & Reliability | Operational Monitoring: track the health and performance of the SageMaker serving endpoint. | Amazon CloudWatch monitors key endpoint metrics: latency (p90, p99), invocation errors (5xx rate), throttles (whether the service can handle the request volume), and CPU/memory/GPU utilization to inform scaling policies. | CloudWatch Alarms: High Latency Alert (P0) if p99 latency > 100ms for 5 consecutive minutes, paging the on-call MLOps engineer via PagerDuty; High Error Rate Alert (P1) if the 5xx error rate exceeds 1% over a 15-minute period, posted to the team's Slack channel. |
| 2. Data Quality & Integrity | Input Data Validation: ensure the data sent to the live model for inference is valid and well-formed. | Pydantic models within the FastAPI serving application automatically validate the schema and data types of incoming prediction requests. Malformed requests are rejected with a 422 error and logged for analysis. | The 422 error rate is monitored in CloudWatch; a sudden spike indicates a potential issue with an upstream client application and triggers a P2 alert to the development team's Slack channel. |
| 2. Data Quality & Integrity | Feature Drift Detection: detect shifts in the distribution of live features compared to the training data. | Evidently AI runs in a daily scheduled Airflow DAG: (1) read a sample of the last 24 hours of inference requests (features) logged in S3; (2) compare each feature's distribution against a reference profile generated from the training data; (3) calculate the Population Stability Index (PSI) for key numerical features and the Chi-Squared test for categorical features. | Drift Alert (P1): if the PSI for a critical feature (e.g., session_duration) exceeds 0.25, an alert with the Evidently AI report is sent to the MLOps and Data Science team's Slack channel for investigation. |
| 3. Model Performance | Prediction Drift Detection: early warning of performance degradation when ground truth labels are not yet available. | The same daily Evidently AI DAG monitors the distribution of the model's output propensity scores, calculating the PSI or Kolmogorov-Smirnov (K-S) statistic against the score distribution on the validation set. | Prediction Drift Alert (P1): a significant shift in the prediction distribution (e.g., PSI > 0.2) triggers an alert. This is often the first sign of concept drift or a data quality issue and prompts an immediate investigation. |
| 3. Model Performance | Ground Truth Performance Monitoring: track the model's actual predictive accuracy on live traffic. | A daily Airflow DAG joins the logged predictions with the ground truth purchase events (available after a session ends) and logs overall AUC, precision, and recall to MLflow. | Performance Degradation Alert (P1): if the overall AUC drops by more than 5% from the validation set performance for two consecutive days, an alert is triggered, suggesting the need for retraining or model analysis. |
| 3. Model Performance | Fairness & Bias Monitoring: ensure the model performs equitably across key user segments. | The performance monitoring DAG also computes sliced evaluation metrics, comparing AUC across key segments (e.g., Anonymous vs. Known users, Mobile vs. Desktop). | Fairness Alert (P2): if the performance disparity between any two critical segments becomes too large (e.g., a difference in AUC > 10%), an alert is sent for manual review. This could indicate the model has developed a bias and needs retraining with more balanced data or different features. |
| 4. Explainability (XAI) | Local & Global Explanations: understand why the model makes certain predictions, for debugging and to build business trust. | SHAP (SHapley Additive exPlanations) is integrated: for a sample of live predictions, the model service also computes and logs per-feature SHAP values, which are used to generate global feature importance plots and local explanations for high-stakes or anomalous predictions (e.g., "Why was this high-value cart user given a low propensity score?"). | Primarily for offline analysis and debugging, not real-time alerting. The logged SHAP values are a critical input for any root cause analysis following a performance alert. |
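
Independent of the drift tooling, the PSI calculation itself is small enough to sketch directly. The function below is a minimal numpy version with bin edges taken from the reference (training) sample; in practice we would rely on Evidently's implementation, but this shows what the 0.25 threshold is measuring.

import numpy as np

def population_stability_index(reference: np.ndarray, current: np.ndarray, bins: int = 10) -> float:
    """PSI between a reference (training) sample and a current (production) sample."""
    # Bin edges come from the reference distribution so both samples are bucketed identically.
    edges = np.quantile(reference, np.linspace(0, 1, bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf
    edges = np.unique(edges)                       # guard against duplicate quantiles

    ref_frac = np.histogram(reference, bins=edges)[0] / len(reference)
    cur_frac = np.histogram(current, bins=edges)[0] / len(current)

    # Avoid log(0) / division by zero for empty buckets.
    ref_frac = np.clip(ref_frac, 1e-6, None)
    cur_frac = np.clip(cur_frac, 1e-6, None)

    return float(np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac)))

# Example: a PSI above 0.25 on session_duration would trigger the P1 drift alert.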


11. Continual Learning & Production Testing: Evolving the Model

A deployed model is not the end of the journey; it is the beginning of a continuous cycle of monitoring, learning, and adaptation. To ensure our propensity scoring model remains accurate and delivers sustained business value, we must implement a robust strategy for continual learning and production testing. This strategy moves us from a static deployment to a dynamic, self-improving system.

11.1 The Imperative to Evolve: Triggers for Model Updates

Our models will not “age like fine wine.” Their performance will degrade due to data and concept drift. Our continual learning strategy is designed to counteract this by updating the model in response to specific, monitored triggers.

| Trigger Type | Primary Monitoring Metric | Threshold / Condition | Retraining Action |
| --- | --- | --- | --- |
| Performance-based | Overall AUC (on live data) | Daily AUC drops by >5% from the initial validation AUC for 2 consecutive days. | High Priority: immediately triggers the automated retraining pipeline (model_retraining_dag.py). |
| Drift-based | Population Stability Index (PSI) | PSI on a critical feature (e.g., session_duration) or on the prediction distribution exceeds 0.25. | Medium Priority: triggers an alert for the MLOps/DS team to investigate. If the drift is confirmed to be significant and not an anomaly, a manual retraining run is initiated. |
| Scheduled | Time | Every 2 weeks, regardless of other triggers. | Standard Cadence: triggers the automated retraining pipeline to capture gradual concept drift and incorporate the latest data patterns. |

11.2 Retraining and Data Curation Strategy

Our primary approach will be Automated Stateless Retraining (Stage 2 maturity), which provides a balance of simplicity and robustness.

  • Retraining Mechanism: When triggered, the Airflow model_retraining_dag.py will train a new LightGBM model from scratch.

  • Data Curation: The pipeline will use a sliding window of the most recent 4 weeks of data from the Feast offline store. This ensures the model learns from the most relevant, up-to-date user behavior while discarding older, potentially stale patterns.

  • Stateful Training (Future Consideration): While stateless retraining is our initial strategy, we recognize the significant cost and efficiency benefits of stateful fine-tuning. A future iteration of the MLOps platform could explore this, but it would require more complex checkpoint management and lineage tracking.

11.3 Production Testing: The A/B Testing Framework

No new model, even one that shows superior offline metrics, will be fully deployed without proving its value on live traffic. We will use a rigorous A/B testing framework to measure the causal impact of new models on our primary business KPI.

1. Hypothesis:

  • “Deploying the challenger model (e.g., version v2.1), retrained on the latest data, will cause a statistically significant increase of at least 1% in the overall user conversion rate compared to the champion model (v2.0) over a 2-week period.”

2. Experiment Design:

| Design Factor | Specification | Rationale |
| --- | --- | --- |
| Primary Metric | User Conversion Rate | This directly measures the ultimate business objective of increasing sales. |
| Guardrail Metrics | p99 inference latency (< 100ms); endpoint error rate (< 0.1%); user bounce rate (should not increase); unsubscribe rate (for email campaigns using scores). | To ensure the new model does not degrade system performance or negatively impact the broader user experience. A breach of these guardrails will halt the experiment. |
| Unit of Randomization | user_id | Ensures a consistent experience for each user across multiple sessions and provides a stable unit for analyzing behavior. |
| Traffic Allocation | 50% Control (Champion), 50% Treatment (Challenger) | A standard 50/50 split provides the maximum statistical power for a given total sample size. |
| Power & Duration | Designed for 80% statistical power to detect a 1% relative lift in conversion rate at a 95% significance level (α=0.05); given our traffic estimates, this requires a run time of approximately 14 days. | Ensures we can confidently detect a meaningful business impact while accounting for weekly user behavior cycles. |

3. Rollout and Decision Workflow:

The deployment and testing of a new model will follow a safe, phased process orchestrated by our CI/CD pipelines and experimentation platform.

11.4 Addressing Advanced Challenges

Our experimentation strategy also accounts for common real-world complexities:

  • Variance Reduction: To increase the sensitivity of our tests and potentially shorten experiment duration, we will implement CUPED (Controlled-experiment Using Pre-Experiment Data). We will use a user's conversion rate from the pre-experiment period as a covariate to reduce variance in the conversion rate metric measured during the test. A minimal sketch of the adjustment follows this list.

  • Interference: As a multi-category retailer, interference between experiments is low but possible (e.g., a pricing test on one category affecting overall user budget). Our experimentation platform will use a layering system, ensuring that this propensity model test runs in a separate “ML Model” layer, orthogonal to UI or pricing experiments.

  • Novelty Effects: We will monitor the daily treatment effect throughout the A/B test. If we observe a significant novelty effect (e.g., a large initial lift that quickly fades), the final decision will be based on the metrics from the period after the effect has stabilized (e.g., the second week of the test).
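
The CUPED adjustment referenced above amounts to subtracting the covariate-explained component from each user's metric. A minimal numpy sketch, assuming per-user arrays y (in-experiment conversion indicator) and x (the same user's pre-experiment conversion metric):

import numpy as np

def cuped_adjust(y: np.ndarray, x: np.ndarray) -> np.ndarray:
    """Return the CUPED-adjusted metric: y_adj = y - theta * (x - mean(x)).

    theta = cov(x, y) / var(x) minimizes the variance of the adjusted metric.
    """
    theta = np.cov(x, y, ddof=1)[0, 1] / np.var(x, ddof=1)
    return y - theta * (x - x.mean())

# The A/B comparison is then run on the adjusted metric for treatment vs. control;
# in practice theta is estimated once on the pooled (treatment + control) data.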

This comprehensive strategy for continual learning and production testing creates a closed-loop system. It ensures that our models do not become stale, that updates are rigorously validated against real business metrics, and that we can continuously and safely evolve our personalization capabilities in a data-driven manner.


12. Governance, Ethics & The Human Element

Building a functional machine learning system is a significant technical achievement. However, building a trustworthy, compliant, and responsible system that earns the confidence of users, stakeholders, and regulators is the hallmark of a mature MLOps practice. This section outlines the principles and practices we will adopt to ensure our propensity scoring model is not only accurate but also governed, fair, and secure.

12.1 Comprehensive Model Governance Plan

Governance is the framework of rules, practices, and tools that ensures our ML assets are managed, versioned, and deployed in a controlled and auditable manner.

| Governance Component | Application to the Propensity Scoring Project |
| --- | --- |
| Reproducibility & Auditability | Every model in our MLflow Model Registry has a complete, traceable lineage. An auditor can select any production model version and trace it back to the Git commit of the training pipeline code, the exact DVC version of the data it was trained on, the specific hyperparameters used, and a full record of its evaluation metrics and artifacts. |
| Documentation | Each registered model version is accompanied by a Model Card (a versioned model_card.md file) detailing: model details (algorithm: LightGBM, and version); intended use (personalizing the user experience by scoring purchase intent); evaluation data (the date range of the validation set); metrics (key performance metrics such as AUC and precision on both the overall test set and critical user slices); and ethical considerations (a summary of fairness evaluations and known limitations). |
| Security & Access Control | Infrastructure: all AWS resources (S3 buckets, SageMaker endpoints, Kinesis streams) are provisioned via Terraform with strict IAM policies based on the principle of least privilege. Model Registry: the MLflow Model Registry has role-based access control; only lead ML Engineers can approve the transition of a model from "Staging" to "Production." Endpoint Security: the SageMaker inference endpoint is secured within our VPC and only accessible via the API Gateway, which requires API key authentication. |
| Regulatory Compliance (EU AI Act) | Our propensity model would likely be classified as "Limited Risk" under the EU AI Act, as it influences commercial decisions. This classification mandates transparency: the user-facing application must include a clear disclosure (e.g., in the privacy policy) stating that AI is used to personalize product recommendations and offers. If the model's use were to expand into "High Risk" areas (like determining credit or insurance eligibility), it would trigger far more stringent requirements, including formal conformity assessments. |

12.2 Responsible AI (RAI) by Design

Responsible AI is not a checklist to be completed at the end but a set of principles to be embedded throughout the project lifecycle.

| RAI Principle | Practice within the Propensity Scoring Project |
| --- | --- |
| Fairness & Bias Mitigation | Problem: the model could inadvertently create unfair outcomes, such as only offering discounts to users from historically high-spending demographics, effectively penalizing new users or users from different regions. Mitigation: (1) during the automated evaluation stage, we measure and compare the model's AUC and precision across key user segments (e.g., Anonymous vs. Known, by country, by device type); (2) if a significant performance disparity is detected (e.g., AUC for mobile users is 10% lower than for desktop users), a P2 alert prompts a manual review, which could lead to re-weighting samples in the training data or adding features that help the model better understand the underperforming segment. |
| Explainability (XAI) | Problem: business stakeholders (e.g., the marketing team) need to understand why the model is making its predictions to trust it and build effective campaigns. Implementation: global explanations via SHAP feature importance plots, saved as a standard artifact for every trained model, show which factors drive predictions on average; local explanations are enabled by logging SHAP values for a sample of predictions, allowing "spot checks" such as "Why was this specific user, who added three items to their cart, given a low propensity score?" |
| Privacy-Preserving Techniques | Problem: our data contains sensitive user information that must be protected under GDPR. Implementation: data minimization (the training and inference pipelines only access the specific features they need, never raw PII like names or email addresses); anonymization and pseudonymization (the data engineering pipeline hashes or masks any PII, and user_id is a pseudonymized identifier, not a database primary key); and the right to be forgotten (a process to remove a user's data from training sets and feature stores upon a GDPR deletion request). |
| Security against ML Attacks | Problem: while less of a target than fraud or spam models, our system is still vulnerable. Mitigation: input sanitization (the FastAPI serving app validates all incoming feature data against a strict schema and rejects anomalous or malformed requests) and robust data validation (our Great Expectations suites in the training pipeline protect against large-scale data poisoning by validating the statistical properties of the training data before the model is trained). |
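
For reference, producing the global SHAP view for a tree model takes only a few lines. The sketch below assumes a trained LightGBM classifier named model and a pandas feature frame X; the exact shape of the returned SHAP values varies slightly across SHAP versions and binary vs. multiclass settings.

import shap

# TreeExplainer is the fast, exact explainer for tree ensembles such as LightGBM.
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X)      # per-feature contributions for each row

# Global view: which features drive predictions on average (saved as a model artifact).
shap.summary_plot(shap_values, X)

# Local view: re-run shap_values on a single row to inspect one anomalous prediction.
local_values = explainer.shap_values(X.iloc[[0]])
print(local_values)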

12.3 Holistic Testing: A Production Readiness Assessment

We use the spirit of the “ML Test Score” rubric to self-assess the production readiness of our system.

| Test Category | Our Project's Scorecard |
| --- | --- |
| Features & Data | High. All features are defined in code (Feast), versioned in Git, and their costs/benefits are understood. The data pipelines have explicit validation steps (Great Expectations). |
| Model Development | High. Model specs are code. We have established baselines. We perform sliced evaluation and behavioral tests. Hyperparameters are tuned. |
| ML Infrastructure | High. Training is reproducible via Airflow/MLflow/DVC. The full pipeline is integration tested. We use a canary release process and have automated rollback capabilities via our CD workflow. |
| Monitoring | High. We have implemented monitoring for service health, data drift, prediction drift, and ground truth performance, with automated alerting on each. |

12.4 The Human Element: Team & User Experience

  • Team Structure & Roles: This project exemplifies the Platform-Enabled Model. The Platform/DevOps team provides the core infrastructure (AWS, Airflow, GitHub Actions), while the ML Engineer/Data Scientist owns the end-to-end ML workflow on top of that platform, from data analysis and model development to building the automated training and deployment pipelines. This structure maximizes ownership and velocity.

  • Designing for Trust: The outputs of this model directly impact the user experience. To maintain trust:

    • Feedback Loops: The system should be able to measure the outcome of its predictions. Did a user who was shown a discount actually convert? This feedback is essential for A/B testing and model improvement.

    • Graceful Failure: What happens if the inference service fails? The e-commerce application must have a fallback mechanism to serve a default, non-personalized experience, ensuring the user journey is never broken due to an ML system failure.


13. Overall System Architecture

This section provides the unified architectural blueprint of the end-to-end propensity scoring system. It ties together the individual data, training, and inference pipelines into a cohesive, production-grade MLOps platform, illustrating the flow of data and logic from the initial user click to the final, personalized action.

13.1 A Unified Architectural Blueprint

The architecture is a hybrid system that leverages managed AWS services for scalability and reliability while using best-of-breed open-source tools for core ML lifecycle management.

13.2 Real-Time Inference Sequence Diagram

This diagram details the critical path for a single prediction request, highlighting the latency budget for each step.

13.3 Potential Bottlenecks and Performance Optimizations

To meet our strict latency and throughput requirements, we must be proactive about identifying and mitigating potential bottlenecks in the inference path.

| Bottleneck | Risk Level | Mitigation & Optimization Strategy |
| --- | --- | --- |
| Online Feature Store Latency | High | The get_online_features() call is the most critical I/O operation. Our choice of Amazon ElastiCache for Redis is the primary mitigation, and the network path between SageMaker and Redis must be optimized (e.g., by placing them in the same VPC and Availability Zones). At extreme scale, a local in-memory cache within the inference service (e.g., functools.lru_cache or a small TTL cache) could hold features for very active users for short periods (e.g., 5 seconds) to further reduce Redis lookups. |
| Model Execution Time | Medium | Our LightGBM model is very fast, but adding features or complexity can increase this. We will implement model quantization (converting model weights from float32 to int8), which can provide a 2-3x speedup on CPU-based instances with minimal accuracy loss, and we will use the latest, most optimized version of the LightGBM library. |
| Cold Starts | Medium | If the SageMaker endpoint scales down to zero instances during periods of no traffic, the first request incurs a "cold start" latency penalty of several seconds while the container is provisioned. To prevent this, the autoscaling policy will always keep at least one instance warm; for spiky traffic, scheduled scaling can proactively increase instance counts before anticipated peak shopping times. |
| Inefficient Request Handling | Low | The Python web server (Gunicorn) must be configured with an appropriate number of worker processes to fully utilize the CPU cores on the SageMaker instance; otherwise requests queue up and latency increases under load. |
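
To illustrate the local-cache mitigation in the first row above, here is a minimal sketch of a short-lived in-process cache placed in front of the online-store lookup. The TTLCache class is hand-rolled for clarity, and fetch_features is a hypothetical wrapper around Feast's get_online_features().

import time

class TTLCache:
    """A tiny in-process cache with per-entry expiry, used in front of Redis lookups."""

    def __init__(self, ttl_seconds: float = 5.0, max_entries: int = 10_000):
        self.ttl = ttl_seconds
        self.max_entries = max_entries
        self._store: dict = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() > expires_at:
            del self._store[key]
            return None
        return value

    def put(self, key, value):
        if len(self._store) >= self.max_entries:
            self._store.pop(next(iter(self._store)))   # crude eviction: drop the oldest insert
        self._store[key] = (value, time.monotonic() + self.ttl)

feature_cache = TTLCache(ttl_seconds=5.0)

def get_features_cached(user_id: str, session_id: str):
    key = (user_id, session_id)
    cached = feature_cache.get(key)
    if cached is not None:
        return cached
    features = fetch_features(user_id, session_id)      # hypothetical call into Feast/Redis
    feature_cache.put(key, features)
    return features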

13.4 Estimated Monthly Costs

This is a high-level estimate assuming a mid-sized European e-commerce business. Actual costs will vary based on traffic, data volume, and specific AWS instance choices.

Assumptions:

  • Daily Sessions: ~250,000

  • Avg. Events per Session: 10

  • Total Monthly Ingestion Events: 250k sessions/day * 10 events/session * 30 days = 75 Million

  • Total Monthly Inference Requests: Same as ingestion = 75 Million

  • Peak Traffic Factor: 5x average traffic during peak hours.

| Pipeline Component | AWS Service(s) | Detailed Cost Calculation & Rationale | Estimated Cost (USD) |
| --- | --- | --- | --- |
| Data Ingestion | API Gateway, Kinesis Data Streams | API Gateway is priced per million requests: 75 million requests * ~$1.00/million = $75. Kinesis Data Streams is priced per shard hour and per 1M PUT payload units; we need two streams (raw, processed) with at least 2 shards per stream for redundancy and peak load handling. Shard hours: 2 streams * 2 shards/stream * 720 hours/month * ~$0.015/hour = ~$43. PUT payloads: 75M requests * 2 (both streams) * ~$0.014/million units = ~$2.10. The primary cost is the shard infrastructure provisioned to absorb the traffic. | $120 - $180 |
| Feature Engineering | EMR (Spark), ElastiCache (Redis) | EMR is priced per instance-hour; the continuous streaming job requires a persistent cluster, assumed to be a small 3-node cluster (1 master, 2 workers) of m5.xlarge instances: 3 instances * 720 hours/month * (~$0.192/hr EC2 + ~$0.048/hr EMR) = ~$518, plus ~$75 for daily batch jobs (e.g., 2 hours/day on 5 nodes). ElastiCache for Redis is priced per instance-hour; holding features for active users (~2-3 GB) with high availability requires 2 nodes (primary + replica) of cache.m6g.large: 2 nodes * 720 hours/month * ~$0.266/hour = ~$383. | $1,300 - $1,800 |
| Model Training | SageMaker Training, Airflow Infra | SageMaker Training Jobs are priced per instance-second; retraining 4 times per month for 2 hours each on an ml.m5.4xlarge instance costs 4 runs * 2 hours * ~$1.038/hour = ~$8.30, so the training itself is very cheap. The main Airflow cost is the 24/7 instance for the scheduler/webserver (a managed service like MWAA or a t3.medium EC2 instance): 1 instance * 720 hours/month * ~$0.05/hour = ~$36. | $80 - $150 |
| Model Serving / Inference | SageMaker Endpoints | Priced per instance-hour. To handle ~150 req/sec at peak with p99 < 100ms latency, we need a baseline of 3 ml.c5.xlarge instances with autoscaling: 3 instances * 720 hours/month * ~$0.238/hour = ~$514. The estimate includes a buffer for autoscaling up to 4-5 instances during peak shopping hours or marketing campaigns. | $600 - $900 |
| Storage & Logging | S3, CloudWatch | S3 is priced per GB-month; assuming data grows to 1 TB total (raw events, feature store, logs): 1,024 GB * ~$0.023/GB-month = ~$23.50, plus ~$10 for requests (PUT, GET). CloudWatch is priced per GB of logs ingested/stored and can be significant; assuming ~150 GB of logs per month: 150 GB * ~$0.50/GB ingestion = ~$75, plus ~$25 for custom metrics and alarms. | $140 - $250 |
| Total Estimated Monthly Cost | - | - | $2,200 - $3,300 |

This detailed breakdown reveals that the vast majority of the monthly operational cost (~80%) is concentrated in the 24/7 services required for real-time feature engineering and model serving (EMR, ElastiCache, and SageMaker Endpoints). This highlights the critical importance of optimizing these components for cost-efficiency, for instance by right-sizing clusters and implementing intelligent autoscaling policies. The cost of actually training the model, by contrast, is almost negligible.

13.5 Deep Dive: Calculating Inference Instance Requirements

Estimating the number of instances is a bottom-up process. We start by understanding the performance of a single server and then scale that out to meet our target throughput. The fundamental goal is to answer: “How many requests per second (RPS) can a single instance handle?”

13.5.1 The Core Factors & Performance Equation

The throughput of a single inference server instance is governed by this relationship:

RPS_per_instance = (Concurrency) / (Latency_per_request)

Let’s break down each component:

  1. Latency per Request: The total time to process one request. This is the sum of:

    • I/O Wait Time: Time spent waiting for network operations. The most significant is the call to the online feature store (~10ms for Redis).

    • Model Execution Time: The actual model.predict() time. For a well-optimized LightGBM model on a single data row, this is extremely fast, typically ~20-30ms on a modern CPU.

    • Pre/Post-processing: CPU time for data validation, feature transformations, and JSON serialization. Let’s estimate ~10ms.

    • Total Latency per Request ≈ 40-50ms

  2. Concurrency: This is the most critical factor for CPU utilization. It’s the number of requests an instance can process simultaneously.

    • Naive Approach (Synchronous): A simple server handles one request at a time. Concurrency = 1. This is incredibly inefficient, as the CPU sits idle during the ~10ms of I/O wait time for the feature store.

    • Optimized Approach (Asynchronous): By using an async framework like FastAPI with Uvicorn workers, the server can handle many concurrent requests. While one request is waiting for the feature store (I/O-bound), the CPU is free to work on another request’s model execution (CPU-bound). This is the key to maximizing CPU utilization. The concurrency is limited not by waiting, but by the number of parallel workers you can run. A standard rule of thumb for Gunicorn (which manages the Uvicorn workers) is to have (2 * number_of_vCPUs) + 1 workers.

13.5.2 Performance Optimization Strategies

Before calculating, we must apply optimizations to get the best performance from each instance.

| Optimization | Technique & Rationale | Impact |
| --- | --- | --- |
| 1. Asynchronous Request Handling | Use FastAPI + Uvicorn + Gunicorn. This is the single most important optimization: it allows the server to handle new requests while others are waiting for I/O (like feature store lookups), preventing the CPU from sitting idle. | Dramatically increases throughput (5x-10x) by improving concurrency and CPU utilization. |
| 2. Model Quantization | Convert the model's weights from 32-bit floating-point numbers to 8-bit integers (INT8). | Reduces model size and speeds up CPU execution by 2-3x with a negligible impact on the accuracy of a tree-based model like LightGBM. |
| 3. Right-Sizing the Instance | Choose a CPU-optimized instance type. For SageMaker, the ml.c5 family ("c" for compute) is designed for this type of workload, offering a better price-to-performance ratio than general-purpose (ml.m5) instances. | Lowers cost by providing more CPU power for the same price as a general-purpose instance. |
| 4. Connection Pooling | Use a persistent client (like redis-py's connection pool) to communicate with the feature store. | Reduces latency by avoiding the overhead of establishing a new TCP connection to Redis for every single prediction request. |

13.5.3 Bottom-Up Calculation: Throughput of a Single Instance

Let’s apply these principles to calculate the realistic throughput of one ml.c5.xlarge instance.

  • Instance Type: ml.c5.xlarge

  • vCPUs: 4

  • Gunicorn Workers: (2 * 4) + 1 = 9 workers. This means our server can handle 9 requests in parallel.

Step 1: Calculate Optimized Latency per Request

  • Original Model Latency: 25ms

  • Latency after Quantization (2.5x speedup): 25ms / 2.5 = 10ms

  • Total Latency = I/O Wait (10ms) + Optimized Model Execution (10ms) + Processing (10ms) = 30ms

Step 2: Calculate Max Theoretical RPS per Worker

  • A single worker, if always busy, can process 1 request / 0.030 seconds = ~33 RPS.

Step 3: Calculate Max Theoretical RPS per Instance

  • 9 workers * 33 RPS/worker = ~300 RPS

Step 4: Apply a Realistic Utilization Target

  • No system can run at 100% theoretical capacity without latency increasing dramatically due to queuing effects. A safe and standard target for production systems is 70-80% utilization.

  • Realistic Sustainable Throughput = 300 RPS * 0.7 = 210 RPS per instance.

This number is our key metric for scaling.
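
The same arithmetic, written out as a short Python script so each assumption (latency budget, worker formula, utilization target, per-instance price) is explicit and easy to re-run with different numbers:

import math

# Assumptions taken from the analysis above
IO_WAIT_MS, MODEL_MS, PROCESSING_MS = 10, 10, 10    # optimized per-request latency budget
VCPUS = 4                                           # ml.c5.xlarge
WORKERS = 2 * VCPUS + 1                             # Gunicorn rule of thumb -> 9
UTILIZATION = 0.70                                  # sustainable utilization target
COST_PER_INSTANCE_MONTH = 174                       # ~$0.238/hour * 730 hours

latency_s = (IO_WAIT_MS + MODEL_MS + PROCESSING_MS) / 1000        # 0.030 s
rps_per_worker = 1 / latency_s                                    # ~33 RPS
rps_per_instance = WORKERS * rps_per_worker * UTILIZATION         # ~210 RPS
print(f"Sustainable throughput per instance: ~{rps_per_instance:.0f} RPS")

for target_rps in (10, 100, 1_000, 10_000):
    minimum = max(2, math.ceil(target_rps / rps_per_instance))    # floor of 2 for HA
    # The scaling table below adds one or two extra instances as a peak-traffic buffer
    # on top of this minimum, which is what its cost column prices.
    print(f"{target_rps:>6} RPS -> {minimum:>2} instances minimum, ~${minimum * COST_PER_INSTANCE_MONTH:,}/month")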

13.5.4 Scaling Analysis: Instances and Costs by Request Volume

Now we can create the scaling table. We will assume a baseline of 2 instances for High Availability (HA) – you never want a single point of failure in production.

  • Cost of one ml.c5.xlarge instance: ~$0.238/hour * 730 hours/month = ~$174 per month

| Target Throughput | Calculation | Minimum Instances Needed | Recommended Instances (for HA & Peak Buffer) | Estimated Monthly Cost |
| --- | --- | --- | --- | --- |
| 10 RPS | 10 / 210 = 0.05 | 1 | 2 | 2 * $174 = $348 |
| 100 RPS | 100 / 210 = 0.48 | 1 | 2 | 2 * $174 = $348 |
| 1,000 RPS | 1,000 / 210 = 4.76 | 5 | 6 | 6 * $174 = $1,044 |
| 10,000 RPS | 10,000 / 210 = 47.6 | 48 | 50 | 50 * $174 = $8,700 |

Key Insights from the Analysis:

  • Flat Cost at Low Volume: The cost is flat for low throughputs because the primary driver is the business requirement for High Availability (a minimum of two instances), not the load itself.

  • Linear Scaling: Once the load surpasses the capacity of the HA baseline, the cost scales linearly with the number of requests.

  • Optimization is Key: Without the async server and quantization optimizations, our RPS per instance would be much lower (perhaps 20-30 RPS), and the costs would be 7-10 times higher for the same throughput. This demonstrates that performance optimization is a direct and powerful cost-reduction strategy.

  • Validation is Mandatory: This entire calculation is a robust estimate. It must be validated with a real-world load test (using a tool like Locust) against a staging endpoint to confirm the actual sustainable RPS before deploying to production.
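
For reference, a load test of the kind mentioned above could start from a locustfile as small as the one below; the endpoint path and payload mirror the serving API shown later, but treat them as assumptions to adapt to the staging endpoint.

# locustfile.py -- run with: locust -f locustfile.py --host https://<staging-endpoint>
import random
from locust import HttpUser, task, between

class PropensityUser(HttpUser):
    # Small think-time so a modest number of simulated users generates steady RPS
    wait_time = between(0.1, 0.5)

    @task
    def score_session(self):
        payload = {
            "user_id": f"load-test-user-{random.randint(1, 10_000)}",
            "session_id": f"load-test-session-{random.randint(1, 10_000)}",
        }
        self.client.post("/predict", json=payload)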



Code Implementation

Data Ingestion Pipeline

Architecture


Feature Engineering: Batch

Architecture

IaC (Terraform)

# ------------------------------------------------------------------
# ROLES AND PROFILES FOR ALL EMR CLUSTERS (Batch & Streaming)
# These foundational components are defined once and used by both provisioning methods.
# ------------------------------------------------------------------

resource "aws_iam_role" "emr_service_role" {
  name = "emr_service_role"
  assume_role_policy = jsonencode({
    Version   = "2012-10-17",
    Statement = [{ Action = "sts:AssumeRole", Effect = "Allow", Principal = { Service = "elasticmapreduce.amazonaws.com" } }],
  })
}
# Attach AWS managed policy for EMR service role
resource "aws_iam_role_policy_attachment" "emr_service_policy_attach" {
  role       = aws_iam_role.emr_service_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceRole"
}

resource "aws_iam_role" "emr_ec2_role" {
  name = "emr_ec2_instance_role"
  assume_role_policy = jsonencode({
    Version   = "2012-10-17",
    Statement = [{ Action = "sts:AssumeRole", Effect = "Allow", Principal = { Service = "ec2.amazonaws.com" } }],
  })
}
# Attach AWS managed policy for EMR EC2 role
resource "aws_iam_role_policy_attachment" "emr_ec2_policy_attach" {
  role       = aws_iam_role.emr_ec2_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role"
}

# This instance profile is used by the EC2 instances in BOTH clusters
resource "aws_iam_instance_profile" "emr_instance_profile" {
  name = "emr_ec2_instance_profile"
  role = aws_iam_role.emr_ec2_role.name
}

# ------------------------------------------------------------------
# PERSISTENT STREAMING CLUSTER (Managed by Terraform)
# This is the long-running cluster for our 24/7 real-time feature pipeline.
# ------------------------------------------------------------------

resource "aws_emr_cluster" "streaming_cluster" {
  name          = "ecom-propensity-streaming-cluster"
  release_label = "emr-6.9.0"
  applications  = ["Spark"]

  # This cluster is kept alive
  keep_job_flow_alive_when_no_steps = true
  termination_protection            = true

  ec2_attributes {
    instance_profile = aws_iam_instance_profile.emr_instance_profile.arn
    # Additional networking configurations (subnet_id, security_groups) go here
  }

  master_instance_group {
    instance_type  = "m5.xlarge"
    instance_count = 1
  }

  core_instance_group {
    instance_type  = "m5.xlarge"
    instance_count = 2 # Start with 2 and autoscale
    # Autoscaling configurations would be defined here
  }

  # The service and job flow roles defined above are used here
  service_role = aws_iam_role.emr_service_role.arn

  tags = {
    Project = "EcomPropensity"
    Type    = "PersistentStreaming"
  }
}

# Note: The Airflow DAG does NOT reference this resource directly. 
# It creates its own separate, transient cluster but uses the same IAM roles and profiles.

Python Scripts

# /feature_repo/user_features.py
from google.protobuf.duration_pb2 import Duration
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String

# Define the user entity
user = Entity(name="user_id", join_keys=["user_id"])

# Define the source of our batch features
# This points to the S3 "Gold" bucket where our Spark job will write its output
batch_feature_source = FileSource(
    path="s3://ecom-propensity-gold-features/user_features/",
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created_timestamp",
)

# Define the Feature View for user-level historical features
user_features_view = FeatureView(
    name="user_historical_features",
    entities=[user],
    ttl=Duration(seconds=86400 * 90),  # 90 days
    schema=[
        Field(name="lifetime_purchase_count", dtype=Int64),
        Field(name="avg_order_value_90d", dtype=Float32),
        Field(name="days_since_last_purchase", dtype=Int64),
        Field(name="preferred_product_category", dtype=String),
    ],
    online=True,
    source=batch_feature_source,
    tags={"owner": "ml_team", "pipeline": "batch"},
)
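
For context on how this Feature View is consumed downstream, a point-in-time-correct training set could be assembled from it roughly as sketched below. The entity rows are placeholders; in the real pipeline they would be built from labelled session outcomes inside the (elided) training data step.

# Illustrative only: building training data from the batch features with Feast.
import pandas as pd
from feast import FeatureStore

fs = FeatureStore(repo_path="feature_repo/")

# One row per labelled observation: the user and the moment the label applies.
entity_df = pd.DataFrame({
    "user_id": ["user1", "user2"],
    "event_timestamp": pd.to_datetime(["2023-06-01", "2023-06-02"]),
})

training_df = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_historical_features:lifetime_purchase_count",
        "user_historical_features:avg_order_value_90d",
        "user_historical_features:days_since_last_purchase",
    ],
).to_df()
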
# /src/feature_engineering/batch_features.py
import argparse
import logging
from datetime import datetime, timedelta
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, max, avg, count, datediff, lit, window
from pyspark.sql.types import StructType

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def compute_user_features(spark: SparkSession, silver_df: DataFrame) -> DataFrame:
    """Computes historical features for each user."""
    
    logging.info("Computing historical user features...")
    
    # Ensure all timestamps are handled correctly
    purchase_events = silver_df.filter(col("event_type") == "purchase")
    
    # Lifetime purchase count
    lifetime_purchases = purchase_events.groupBy("user_id").agg(
        count("event_id").alias("lifetime_purchase_count")
    )

    # Average order value over the last 90 days
    ninety_days_ago = datetime.utcnow() - timedelta(days=90)
    aov_90d = purchase_events.filter(col("event_timestamp") >= lit(ninety_days_ago))\
        .groupBy("user_id")\
        .agg(avg("price").alias("avg_order_value_90d"))

    # Days since last purchase
    days_since_purchase = purchase_events.groupBy("user_id").agg(
        max("event_timestamp").alias("last_purchase_ts")
    ).withColumn(
        "days_since_last_purchase",
        datediff(lit(datetime.utcnow()), col("last_purchase_ts"))
    )

    # Join all features together
    features_df = lifetime_purchases\
        .join(aov_90d, "user_id", "left")\
        .join(days_since_purchase.select("user_id", "days_since_last_purchase"), "user_id", "left")

    # Add other features like preferred_product_category
    # ... (logic elided for brevity) ...

    # Add timestamp columns required by Feast
    final_df = features_df.withColumn("event_timestamp", lit(datetime.utcnow()))\
                           .withColumn("created_timestamp", lit(datetime.utcnow()))
    
    logging.info("Finished computing user features.")
    return final_df

if __name__ == '__main__':
    # This block runs when the script is submitted to Spark.
    # S3 paths are passed as arguments by Airflow, e.g.:
    #   --silver-path s3://... --output-path s3://...
    parser = argparse.ArgumentParser()
    parser.add_argument("--silver-path", required=True)
    parser.add_argument("--output-path", required=True)
    args = parser.parse_args()

    spark = SparkSession.builder.appName("BatchFeatureEngineering").getOrCreate()

    silver_df = spark.read.parquet(args.silver_path)
    features_df = compute_user_features(spark, silver_df)
    features_df.write.mode("overwrite").parquet(args.output_path)

    spark.stop()

Unit Tests

import pytest
from pyspark.sql import SparkSession
from datetime import datetime, timedelta

# Add src to path to allow imports
import sys
sys.path.append('src/feature_engineering')

from batch_features import compute_user_features

@pytest.fixture(scope="session")
def spark_session():
    """Creates a Spark session for testing."""
    return SparkSession.builder.master("local[2]").appName("PytestSpark").getOrCreate()

def test_compute_user_features(spark_session):
    """Test the feature computation logic with sample data."""
    # Create sample data representing events
    utc_now = datetime.utcnow()
    yesterday_utc = utc_now - timedelta(days=1)
    
    # User 1: One purchase yesterday
    # User 2: Two purchases, one today, one 100 days ago
    mock_data = [
        ("evt1", "purchase", "user1", "prodA", 10.0, yesterday_utc),
        ("evt2", "purchase", "user2", "prodB", 20.0, utc_now),
        ("evt3", "purchase", "user2", "prodC", 30.0, utc_now - timedelta(days=100)),
        ("evt4", "page_view", "user1", "prodB", None, utc_now),
    ]
    schema = ["event_id", "event_type", "user_id", "product_id", "price", "event_timestamp"]
    silver_df = spark_session.createDataFrame(mock_data, schema)
    
    # Compute features
    features_df = compute_user_features(spark_session, silver_df)
    features_map = {row['user_id']: row for row in features_df.collect()}
    
    # Assertions for User 1
    assert features_map['user1']['lifetime_purchase_count'] == 1
    assert features_map['user1']['days_since_last_purchase'] == 1
    
    # Assertions for User 2
    assert features_map['user2']['lifetime_purchase_count'] == 2
    # Only the recent purchase is in the 90-day window
    assert features_map['user2']['avg_order_value_90d'] == 20.0
    assert features_map['user2']['days_since_last_purchase'] == 0

Pipeline (Airflow DAG)

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import EmrCreateJobFlowOperator, EmrAddStepsOperator, EmrTerminateJobFlowOperator
from airflow.providers.great_expectations.operators.great_expectations import GreatExpectationsOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

# Define the Spark step for EMR
spark_steps = [
    {
        "Name": "ComputeBatchFeatures",
        "ActionOnFailure": "TERMINATE_CLUSTER",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                "s3://ecom-propensity-airflow-artifacts/scripts/batch_features.py",
                "--silver-path", "s3://ecom-propensity-silver/...",
                "--output-path", "s3://ecom-propensity-gold-features/user_features_temp/"
            ],
        },
    }
]

# Define the EMR cluster configuration
job_flow_overrides = {
    "Name": "ecom-propensity-batch-feature-emr",
    "ReleaseLabel": "emr-6.9.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {"Name": "Master nodes", "Market": "ON_DEMAND", "InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"Name": "Core nodes", "Market": "ON_DEMAND", "InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
        ],
        "KeepJobFlowAliveWhenNoSteps": False,
        "TerminationProtected": False,
    },
    "VisibleToAllUsers": True,
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

with DAG(
    dag_id='batch_feature_engineering',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
    tags=['feature-engineering'],
) as dag:

    # 1. Create a transient EMR cluster
    cluster_creator = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=job_flow_overrides,
        aws_conn_id="aws_default",
        emr_conn_id="emr_default",
    )

    # 2. Add the Spark job step
    step_adder = EmrAddStepsOperator(
        task_id="run_spark_job",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        steps=spark_steps,
    )

    # 3. Validate the output data
    data_validator = GreatExpectationsOperator(
        task_id="validate_features",
        expectation_suite_name="user_features.warning", # Example suite
        data_context_root_dir="/usr/local/airflow/great_expectations",
        data_asset_name="s3://ecom-propensity-gold-features/user_features_temp/"
    )
    
    # 4. Materialize features to the online store
    feast_materialize = BashOperator(
        task_id="feast_materialize",
        bash_command="cd /usr/local/airflow/feature_repo && feast materialize-incremental $(date -u +'%Y-%m-%dT%H:%M:%SZ')",
    )

    # 5. Terminate the EMR cluster
    cluster_remover = EmrTerminateJobFlowOperator(
        task_id="terminate_emr_cluster",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        trigger_rule="all_done", # Run whether previous steps succeed or fail
    )
    
    # Define dependencies
    cluster_creator >> step_adder >> data_validator >> feast_materialize >> cluster_remover
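
One gap worth calling out: EmrAddStepsOperator only submits the step, it does not wait for it to finish, so validate_features could start before the Spark job has written its output. A hardened version of this DAG would place an EmrStepSensor between the two tasks, roughly as sketched here (the XCom lookups assume the operators above keep their task IDs).

# Sketch of the step sensor that would sit between run_spark_job and validate_features.
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

wait_for_spark_job = EmrStepSensor(
    task_id="wait_for_spark_job",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    # EmrAddStepsOperator returns a list of step IDs; a single step was submitted above.
    step_id="{{ task_instance.xcom_pull(task_ids='run_spark_job', key='return_value')[0] }}",
    aws_conn_id="aws_default",
)

# Dependencies would then read:
# cluster_creator >> step_adder >> wait_for_spark_job >> data_validator >> feast_materialize >> cluster_remover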

Integration Tests

The purpose of this test is not to verify the Spark logic (that’s the unit test’s job), but to verify that the entire orchestrated workflow runs correctly in a deployed environment. It tests the interactions between Airflow, EMR, S3, Great Expectations, and Feast.

1. How and When the Test is Run

This test is fundamentally different from a unit test and cannot be run in a simple CI runner.

  • Environment: It is designed to run against a fully deployed staging environment that mirrors production.

  • Trigger: It is executed as a job in our Continuous Deployment (CD) pipeline in GitHub Actions, immediately after the Airflow DAG and its related scripts have been deployed to the staging environment.

  • Principle: It acts as a final “smoke test” or “health check” to confirm that the deployment was successful and the pipeline is operational before it’s ever run on real production data.

2. Required Setup (Prerequisites)

  1. A small, static, and version-controlled sample of input data (sample_silver_data.parquet) must be present in a known S3 bucket in the staging account.

  2. The integration test runner (e.g., a GitHub Actions runner) must have AWS credentials for the staging environment.

  3. The runner must have API access to the staging Airflow instance.

import os
import time
import logging
import pytest
from feast import FeatureStore
from airflow_client.client import ApiClient, Configuration
from airflow_client.client.api import dag_run_api
from airflow_client.client.model.dag_run import DAGRun

# Configure logging
logging.basicConfig(level=logging.INFO)

# --- Test Configuration ---
# These would be fetched from environment variables in a CI/CD runner
AIRFLOW_HOST = os.environ.get("STAGING_AIRFLOW_HOST", "http://localhost:8080/api/v1")
AIRFLOW_USERNAME = os.environ.get("STAGING_AIRFLOW_USERNAME", "airflow")
AIRFLOW_PASSWORD = os.environ.get("STAGING_AIRFLOW_PASSWORD", "airflow")

DAG_ID = "batch_feature_engineering"
STAGING_FEAST_REPO_PATH = "feature_repo/"

# The specific user we will check for in the feature store after the pipeline runs
TEST_USER_ID = "user_for_integration_test" 
# The expected value for this user based on our sample data
EXPECTED_PURCHASE_COUNT = 5 

# --- Pytest Marker ---
@pytest.mark.integration
def test_batch_feature_pipeline_end_to_end():
    """
    Triggers the batch feature engineering DAG in a staging environment and
    validates that the final feature values are correctly written to the
    online feature store.
    """
    # --- 1. SETUP: Initialize API clients ---
    configuration = Configuration(host=AIRFLOW_HOST, username=AIRFLOW_USERNAME, password=AIRFLOW_PASSWORD)
    api_client = ApiClient(configuration)
    
    # --- 2. TRIGGER: Start a new DAG Run ---
    logging.info(f"Triggering DAG: {DAG_ID}")
    dag_run_api_instance = dag_run_api.DAGRunApi(api_client)
    
    try:
        # Trigger the DAG with a specific conf to override the default paths
        # This points the job to our small, static test dataset
        api_response = dag_run_api_instance.post_dag_run(
            dag_id=DAG_ID,
            dag_run=DAGRun(
                conf={
                    "input_path": "s3://ecom-propensity-staging-data/sample_silver_data.parquet",
                    "output_path": f"s3://ecom-propensity-staging-gold/test_run_{int(time.time())}/"
                }
            )
        )
        dag_run_id = api_response.dag_run_id
        logging.info(f"Successfully triggered DAG run with ID: {dag_run_id}")
    except Exception as e:
        pytest.fail(f"Failed to trigger DAG run: {e}")

    # --- 3. POLL: Wait for the DAG Run to complete ---
    wait_for_dag_run_completion(api_client, dag_run_id)

    # --- 4. VALIDATE: Check the final state in the Feature Store ---
    logging.info("DAG run complete. Validating results in Feast online store...")
    fs = FeatureStore(repo_path=STAGING_FEAST_REPO_PATH)
    
    feature_vector = fs.get_online_features(
        features=[
            "user_historical_features:lifetime_purchase_count",
        ],
        entity_rows=[{"user_id": TEST_USER_ID}],
    ).to_dict()

    logging.info(f"Retrieved feature vector: {feature_vector}")

    # Assert that the feature was updated with the correct value
    assert feature_vector["lifetime_purchase_count"][0] == EXPECTED_PURCHASE_COUNT, \
        f"Feature validation failed for user {TEST_USER_ID}!"
    
    logging.info("Integration test passed successfully!")


def wait_for_dag_run_completion(api_client: ApiClient, dag_run_id: str, timeout_seconds: int = 600):
    """Polls the Airflow API until the DAG run is complete or timeout is reached."""
    dag_run_api_instance = dag_run_api.DAGRunApi(api_client)
    start_time = time.time()
    
    while time.time() - start_time < timeout_seconds:
        try:
            api_response = dag_run_api_instance.get_dag_run(dag_id=DAG_ID, dag_run_id=dag_run_id)
            state = api_response.state
            logging.info(f"Polling DAG run {dag_run_id}. Current state: {state}")
            
            if state == "success":
                return
            elif state == "failed":
                pytest.fail(f"DAG run {dag_run_id} failed.")
            
            time.sleep(30) # Wait 30 seconds between polls
        except Exception as e:
            pytest.fail(f"API error while polling for DAG run status: {e}")
            
    pytest.fail(f"Timeout: DAG run {dag_run_id} did not complete within {timeout_seconds} seconds.")

CI/CD Workflow

name: CI/CD for Batch Feature Pipeline

on:
  push:
    branches: [ staging ] # This workflow runs when code is pushed to the 'staging' branch

jobs:
  deploy_to_staging:
    name: Deploy All Artifacts to Staging
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read
    steps:
      - uses: actions/checkout@v3
      - uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/github-actions-deploy-role
          aws-region: eu-west-1
      
      - name: Deploy Airflow DAGs and Scripts to S3
        run: |
          aws s3 sync pipelines/dags/ s3://ecom-propensity-staging-airflow/dags/
          aws s3 sync src/ s3://ecom-propensity-staging-airflow/src/
      
      - name: Apply Feast Feature Definitions
        run: |
          pip install feast
          cd feature_repo
          feast apply # This would be configured to point to the staging registry

  run-integration-test:
    name: Run End-to-End Integration Test
    runs-on: ubuntu-latest
    needs: deploy_to_staging # This job only runs after the deployment job succeeds
    
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with: { python-version: '3.9' }

      - name: Install Test Dependencies
        run: |
          pip install pytest feast apache-airflow-client
      
      - name: Execute Integration Test
        env:
          STAGING_AIRFLOW_HOST: ${{ secrets.STAGING_AIRFLOW_HOST }}
          STAGING_AIRFLOW_USERNAME: ${{ secrets.STAGING_AIRFLOW_USERNAME }}
          STAGING_AIRFLOW_PASSWORD: ${{ secrets.STAGING_AIRFLOW_PASSWORD }}
        run: |
          pytest tests/integration/test_batch_feature_pipeline.py

The Two EMR Cluster Patterns

Our project uses two types of EMR clusters for two very different jobs, and we use the best tool to manage each one:

1. The Transient Batch Cluster (Handled by Airflow)

  • Purpose: To run our daily batch feature engineering job. This job is resource-intensive but only needs to run for a few hours each day.

  • Provisioning Method: The Airflow DAG uses the EmrCreateJobFlowOperator.

  • Lifecycle: Transient/Ephemeral. The cluster is created on-demand at the start of the DAG run, performs its task, and is immediately terminated by the EmrTerminateJobFlowOperator.

  • Why use Airflow for this? Cost Optimization. We only pay for the EMR cluster during the 1-2 hours it is actually running the Spark job; a persistent cluster would sit idle, and still be billed, for the other 22-23 hours of the day.

2. The Persistent Streaming Cluster (Defined in Terraform)

  • Purpose: To run our 24/7 Spark Structured Streaming job for real-time feature engineering. This is a long-running, continuous application that must always be on to process events from Kinesis as they arrive.

  • Provisioning Method: It is a core piece of our infrastructure, just like a database, so it is defined in Terraform.

  • Lifecycle: Persistent/Long-Lived. Terraform creates it once, and it stays running. We manage its state and configuration through our IaC.

  • Why use Terraform for this? Infrastructure as Code (IaC) Best Practices. Long-running, foundational infrastructure should be explicitly declared, version-controlled, and managed via an IaC tool like Terraform. This ensures its state is predictable and auditable. It would be an anti-pattern to have an Airflow DAG launch a “permanent” cluster.

What is the emr.tf file really for?

The emr.tf file has two critical responsibilities:

  1. Defining Foundational Components (Roles & Security): It creates the IAM roles, instance profiles, and security configurations that are required by ANY EMR cluster, regardless of whether it’s the transient batch cluster or the persistent streaming one. Airflow needs to assume these roles to get permission to create its transient cluster.

  2. Defining the Persistent Streaming Cluster: The emr.tf file is the correct place to define the long-running EMR cluster dedicated to our real-time pipeline.

Summary Table

| | Transient Batch Cluster | Persistent Streaming Cluster |
| --- | --- | --- |
| Purpose | Daily historical feature calculation | 24/7 real-time feature processing |
| Provisioning | Airflow (EmrCreateJobFlowOperator) | Terraform (aws_emr_cluster) |
| Lifecycle | Ephemeral (created & destroyed daily) | Long-lived (stateful infrastructure) |
| Cost Model | Pay-per-use (very low cost) | Always-on (a primary cost driver) |
| Primary Use Case | The Batch Feature Engineering Pipeline | The Real-Time Feature Pipeline |

In summary, we are deliberately using two different, purpose-built strategies for our compute infrastructure, a common and effective pattern in production MLOps.


Feature Engineering: Streaming Pipeline

Architecture

Python Scripts

from google.protobuf.duration_pb2 import Duration
from feast import Entity, FeatureView, Field, FileSource, ValueType
from feast.types import Int64

# Define session as an entity (can be joined with user_id)
session = Entity(name="session_id", value_type=ValueType.STRING)

# The source for these features is the PARQUET data being archived to S3
# by the streaming job. This is what Feast uses for creating training data.
stream_feature_source = FileSource(
    path="s3://ecom-propensity-gold-features/session_features/",
    event_timestamp_column="event_timestamp",
)

# Define the Feature View for real-time session features
session_features_view = FeatureView(
    name="session_streaming_features",
    entities=[session],
    ttl=Duration(seconds=86400 * 2),  # 2 days
    schema=[
        Field(name="session_duration_seconds", dtype=Int64),
        Field(name="distinct_products_viewed_count", dtype=Int64),
        Field(name="add_to_cart_count", dtype=Int64),
    ],
    online=True,
    source=stream_feature_source,
    tags={"owner": "ml_team", "pipeline": "streaming"},
)
import logging
import json
from datetime import datetime
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType

import redis
from feast import FeatureStore

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Schema for the incoming JSON data from Kinesis
EVENT_SCHEMA = StructType([
    StructField("event_type", StringType()),
    Field("product_id", StringType()),
    Field("session_id", StringType(), False), # session_id is required
    Field("event_timestamp", TimestampType()),
    # ... other fields
])

def update_session_state(session_id, new_events, state):
    """The core stateful logic for updating session features."""
    
    # Get current state or initialize a new one
    if state.exists:
        current_state = state.get
    else:
        current_state = {
            "session_start_time": datetime.utcnow(),
            "distinct_products_viewed": set(),
            "add_to_cart_count": 0
        }

    # Iterate through new events and update state
    for event in new_events:
        if event.product_id:
            current_state["distinct_products_viewed"].add(event.product_id)
        if event.event_type == 'add_to_cart':
            current_state["add_to_cart_count"] += 1

    # Update state object and set a timeout to prevent infinite state growth
    state.update(current_state)
    state.setTimeoutDuration("24 hours") # Evict state if no events for 24 hours

    # Yield the updated features
    yield (session_id, 
           (datetime.utcnow() - current_state["session_start_time"]).seconds, 
           len(current_state["distinct_products_viewed"]), 
           current_state["add_to_cart_count"],
           datetime.utcnow())

def write_to_feast_online_store(df: DataFrame, epoch_id: int):
    """Writes a micro-batch of features to the Feast Redis store."""
    logging.info(f"Writing batch {epoch_id} to online store...")
    
    # Using feast's push mechanism is one way, another is direct to Redis
    # For performance, direct Redis client is often better.
    # Note: Connection pooling should be used in a real production job.
    r = redis.Redis(host='your-redis-endpoint', port=6379, db=0)
    
    for row in df.rdd.collect():
        entity_key = f"session_id:{row.session_id}"
        feature_payload = {
            "session_duration_seconds": row.session_duration_seconds,
            "distinct_products_viewed_count": row.distinct_products_viewed_count,
            "add_to_cart_count": row.add_to_cart_count
        }
        # In Redis, features are often stored in a Hash
        r.hset(entity_key, mapping=feature_payload)
    logging.info(f"Finished writing batch {epoch_id}.")


if __name__ == '__main__':
    spark = SparkSession.builder.appName("StreamingFeatureEngineering").getOrCreate()

    # Read from Kinesis
    kinesis_df = spark.readStream \
        .format("kinesis") \
        .option("streamName", "processed-events-stream") \
        .option("startingPosition", "latest") \
        .load()

    # Parse JSON data and apply schema
    json_df = kinesis_df.selectExpr("CAST(data AS STRING) as json") \
        .select(from_json(col("json"), EVENT_SCHEMA).alias("data")) \
        .select("data.*")

    # Apply the stateful transformation
    features_df = json_df.groupBy("session_id").flatMapGroupsWithState(
        outputMode="update",
        stateFormatVersion="2",
        timeoutConf="eventTimeTimeout",
        func=update_session_state
    ).toDF(["session_id", "session_duration_seconds", "distinct_products_viewed_count", "add_to_cart_count", "event_timestamp"])

    # --- Write to Sinks ---
    # Sink 1: Write to Feast Online Store (Redis)
    query_online = features_df.writeStream \
        .foreachBatch(write_to_feast_online_store) \
        .option("checkpointLocation", "s3://ecom-propensity-checkpoints/online_sink") \
        .start()

    # Sink 2: Write to S3 for Feast Offline Store (Archival)
    query_offline = features_df.writeStream \
        .format("parquet") \
        .outputMode("append") \
        .option("path", "s3://ecom-propensity-gold-features/session_features/") \
        .option("checkpointLocation", "s3://ecom-propensity-checkpoints/offline_sink") \
        .start()

    spark.streams.awaitAnyTermination()

Unit Tests

from unittest.mock import MagicMock
import pytest
from datetime import datetime

import sys
sys.path.append('src/feature_engineering')
from streaming_features import update_session_state

@pytest.fixture
def mock_spark_state():
    """A minimal fake of Spark's GroupState object used by update_session_state."""

    class FakeState:
        def __init__(self):
            self._state = None

        @property
        def exists(self):
            return self._state is not None

        @property
        def get(self):
            return self._state

        def update(self, new_state):
            self._state = new_state

        def setTimeoutDuration(self, duration):
            # No-op: timeouts are not exercised in unit tests
            pass

    return FakeState()

def test_update_session_state_new_session(mock_spark_state):
    """Test the initialization and update of a new session."""
    
    # Mock an incoming event
    MockEvent = type("MockEvent", (), {"product_id": "prodA", "event_type": "page_view"})
    new_events = [MockEvent()]
    
    # Run the function
    results = list(update_session_state("sess1", iter(new_events), mock_spark_state))
    
    # Assert state was updated
    assert mock_spark_state.get['add_to_cart_count'] == 0
    assert "prodA" in mock_spark_state.get['distinct_products_viewed']
    
    # Assert correct output was yielded
    assert len(results) == 1
    assert results[0][2] == 1 # distinct_products_viewed_count

def test_update_session_state_existing_session(mock_spark_state):
    """Test updating an already existing session state."""
    
    # Pre-populate the state
    initial_state = {
        "session_start_time": datetime.utcnow(),
        "distinct_products_viewed": {"prodA"},
        "add_to_cart_count": 0
    }
    mock_spark_state.update(initial_state)

    # Mock a new event
    MockEvent = type("MockEvent", (), {"product_id": "prodB", "event_type": "add_to_cart"})
    new_events = [MockEvent()]

    # Run the function
    results = list(update_session_state("sess1", iter(new_events), mock_spark_state))

    # Assert state was updated correctly
    assert mock_spark_state.get['add_to_cart_count'] == 1
    assert len(mock_spark_state.get['distinct_products_viewed']) == 2

Pipeline (Airflow DAG)

from airflow import DAG
from airflow.providers.amazon.aws.sensors.emr import EmrJobFlowSensor
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from datetime import datetime

STREAMING_CLUSTER_ID = "j-STREAMINGCLUSTERID" # This should come from a Variable or SSM

def slack_alert_on_failure(context):
    """Send a Slack alert if the task fails."""
    alert = SlackWebhookOperator(
        task_id='slack_alert',
        http_conn_id='slack_connection', # Airflow connection to Slack webhook
        message=f"""
            :red_circle: High-Priority Alert: EMR Streaming Cluster is DOWN!
            *DAG*: {context.get('task_instance').dag_id}
            *Task*: {context.get('task_instance').task_id}
            *Cluster ID*: {STREAMING_CLUSTER_ID}
        """,
        channel='#mlops-alerts'
    )
    return alert.execute(context=context)

with DAG(
    dag_id='streaming_job_monitor',
    start_date=datetime(2023, 1, 1),
    schedule_interval='*/15 * * * *', # Run every 15 minutes
    catchup=False,
    on_failure_callback=slack_alert_on_failure,
    tags=['monitoring', 'streaming'],
) as dag:
    
    check_emr_cluster_health = EmrJobFlowSensor(
        task_id='check_emr_cluster_health',
        job_flow_id=STREAMING_CLUSTER_ID,
        target_states=['WAITING'], # 'WAITING' means the cluster is idle and ready for steps
        aws_conn_id='aws_default',
    )

Integration Tests

import os
import time
import json
import logging
from datetime import datetime

import boto3
import pytest
from feast import FeatureStore

logging.basicConfig(level=logging.INFO)

TEST_SESSION_ID = f"test-session-{int(time.time())}"
TEST_PRODUCT_ID = "prod-for-stream-test"

@pytest.mark.integration
def test_streaming_pipeline_end_to_end():
    """
    Pushes a synthetic event to the raw Kinesis stream and validates
    that the feature eventually appears in the Feast online store.
    """
    # --- 1. SETUP ---
    kinesis_client = boto3.client("kinesis", region_name="eu-west-1")
    raw_stream_name = "raw-events-stream"
    
    # --- 2. SEND EVENT ---
    event_payload = {
        "event_id": "evt-stream-test",
        "event_type": "add_to_cart",
        "product_id": TEST_PRODUCT_ID,
        "user_id": "user-stream-test",
        "session_id": TEST_SESSION_ID,
        "client_timestamp": datetime.utcnow().isoformat() + "Z"
    }
    
    kinesis_client.put_record(
        StreamName=raw_stream_name,
        Data=json.dumps(event_payload).encode('utf-8'),
        PartitionKey=TEST_SESSION_ID
    )
    logging.info(f"Sent test event for session {TEST_SESSION_ID} to Kinesis.")

    # --- 3. WAIT & POLL ---
    # Wait for the event to propagate through ingestion lambda and Spark streaming
    time.sleep(90) # This wait time depends on the streaming trigger interval

    # --- 4. VALIDATE in FEAST ---
    logging.info("Polling Feast online store for updated feature...")
    fs = FeatureStore(repo_path="feature_repo/")
    
    feature_vector = fs.get_online_features(
        features=["session_streaming_features:add_to_cart_count"],
        entity_rows=[{"session_id": TEST_SESSION_ID}],
    ).to_dict()

    logging.info(f"Retrieved feature vector: {feature_vector}")
    
    assert feature_vector["add_to_cart_count"][0] >= 1, \
        "Feature was not updated correctly in the online store!"

CI/CD Workflow

name: Deploy Streaming Feature Engineering Job

on:
  push:
    branches: [ main ]
    paths:
      - 'src/feature_engineering/streaming_features.py'

jobs:
  deploy_streaming_job:
    name: Deploy and Submit Spark Streaming Job
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read
    steps:
      - uses: actions/checkout@v3
      - uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/github-actions-deploy-role
          aws-region: eu-west-1

      - name: Upload Spark script to S3
        run: |
          aws s3 cp src/feature_engineering/streaming_features.py s3://ecom-propensity-airflow-artifacts/scripts/
      
      - name: Find and Terminate Existing Streaming Step (if any)
        run: |
          # In a production system, you need logic to gracefully update the job.
          # A common strategy is to find the old step ID and cancel it before submitting the new one.
          # This is complex and depends on naming conventions.
          echo "Finding and terminating existing job steps..."

      - name: Submit New Spark Job to EMR Cluster
        id: submit_job
        run: |
          CLUSTER_ID=$(aws emr list-clusters --active --query "Clusters[?Name=='ecom-propensity-streaming-cluster'].Id" --output text)
          if [ -z "$CLUSTER_ID" ]; then
            echo "::error::Persistent EMR cluster not found!"
            exit 1
          fi
          
          aws emr add-steps --cluster-id $CLUSTER_ID --steps Type=spark,Name="Realtime Feature Engineering",ActionOnFailure=CONTINUE,Args=[--deploy-mode,client,s3://ecom-propensity-airflow-artifacts/scripts/streaming_features.py]

      - name: Run Integration Test
        if: success()
        run: |
          pip install pytest boto3 feast
          pytest tests/integration/test_streaming_feature_pipeline.py
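
The "find and terminate existing step" placeholder above could eventually be filled with something along the lines of the boto3 sketch below; the cluster ID, step-name match, and cancellation policy are assumptions to adapt to the project's naming conventions.

# Illustrative sketch: cancel any still-running copy of the streaming step before resubmitting.
import boto3

emr = boto3.client("emr", region_name="eu-west-1")
cluster_id = "j-XXXXXXXXXXXXX"  # resolved from the list-clusters call in the workflow

running = emr.list_steps(ClusterId=cluster_id, StepStates=["PENDING", "RUNNING"])
old_step_ids = [
    s["Id"] for s in running["Steps"]
    if s["Name"] == "Realtime Feature Engineering"   # matches the step name used above
]

if old_step_ids:
    emr.cancel_steps(ClusterId=cluster_id, StepIds=old_step_ids)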

Model Training Pipeline

Architecture

IaC (Terraform)

# IAM Role that SageMaker Training Jobs will assume
resource "aws_iam_role" "sagemaker_training_role" {
  name = "sagemaker-training-execution-role"
  assume_role_policy = jsonencode({
    Version   = "2012-10-17",
    Statement = [{
      Action    = "sts:AssumeRole",
      Effect    = "Allow",
      Principal = { Service = "sagemaker.amazonaws.com" },
    }],
  })
}

# Policy allowing access to S3, ECR, and CloudWatch Logs
resource "aws_iam_policy" "sagemaker_training_policy" {
  name = "sagemaker-training-policy"
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect   = "Allow",
        Action   = ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
        Resource = ["arn:aws:s3:::ecom-propensity-*", "arn:aws:s3:::ecom-propensity-*/*"],
      },
      {
        Effect   = "Allow",
        Action   = ["ecr:GetAuthorizationToken", "ecr:BatchGetImage", "ecr:GetDownloadUrlForLayer"],
        Resource = "*",
      },
      {
        Effect   = "Allow",
        Action   = ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
        Resource = "arn:aws:logs:*:*:*",
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "sagemaker_training_attach" {
  role       = aws_iam_role.sagemaker_training_role.name
  policy_arn = aws_iam_policy.sagemaker_training_policy.arn
}

# Define an ECR repository to store our training container
resource "aws_ecr_repository" "training_repo" {
  name = "ecom-propensity/training"
}

Python Scripts

import os
import argparse
import logging
import pandas as pd
import lightgbm as lgb
import mlflow
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score

logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(message)s")
logger = logging.getLogger()

def main():
    # --- MLflow setup ---
    # The MLFLOW_TRACKING_URI is set by the SageMaker operator in Airflow
    mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])
    mlflow.set_experiment("purchase-intent-training")

    # --- Parse arguments ---
    # SageMaker passes hyperparameters and data paths as command-line arguments.
    parser = argparse.ArgumentParser()
    parser.add_argument("--n_estimators", type=int, default=200)
    parser.add_argument("--learning_rate", type=float, default=0.1)
    # SageMaker environment variables for data channels
    parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))
    args = parser.parse_args()

    # --- Data Loading ---
    logger.info("Loading training data...")
    df = pd.read_parquet(args.train)
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

    # --- MLflow Tracking ---
    with mlflow.start_run() as run:
        run_id = run.info.run_id
        logger.info(f"Started MLflow Run: {run_id}")
        mlflow.log_params(vars(args))

        # --- Model Training ---
        logger.info("Training LightGBM model...")
        model = lgb.LGBMClassifier(
            n_estimators=args.n_estimators,
            learning_rate=args.learning_rate,
            objective='binary'
        )
        model.fit(
            X_train, y_train,
            eval_set=[(X_val, y_val)],
            callbacks=[lgb.early_stopping(10, verbose=False), lgb.log_evaluation(period=0)]
        )

        # --- Log artifacts & metrics ---
        val_auc = roc_auc_score(y_val, model.predict_proba(X_val)[:, 1])
        mlflow.log_metric("validation_auc", val_auc)
        mlflow.lightgbm.log_model(model, "model")
        logger.info(f"Logged model with Validation AUC: {val_auc}")

if __name__ == "__main__":
    main()

This script is the second validation gate. It runs more complex, business-logic-oriented tests to ensure the model behaves as expected in edge cases. It exits with a non-zero status code if any test fails, which will cause the Airflow task to fail.

# /src/model_training/test.py
import os
import sys
import argparse
import logging
import pandas as pd
import numpy as np
import mlflow
from sklearn.metrics import roc_auc_score

logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(message)s")
logger = logging.getLogger()

def run_sliced_evaluation(model, X_test: pd.DataFrame, y_test: pd.Series, overall_auc: float) -> bool:
    """Evaluates model performance on critical data slices."""
    logger.info("--- Running Sliced Evaluation ---")
    
    slices = {
        "mobile_users": "device_type_mobile == 1",
        "desktop_users": "device_type_desktop == 1",
    }
    all_slices_passed = True

    for slice_name, query in slices.items():
        slice_idx = X_test.eval(query).to_numpy().nonzero()[0]
        if len(slice_idx) == 0:
            logger.warning(f"Slice '{slice_name}' is empty. Skipping.")
            continue
            
        slice_auc = roc_auc_score(y_test.iloc[slice_idx], model.predict_proba(X_test.iloc[slice_idx])[:, 1])
        logger.info(f"AUC for slice '{slice_name}': {slice_auc:.4f}")
        
        # Check if performance drops significantly on the slice
        if slice_auc < overall_auc * 0.95: # Allow for a 5% drop
            logger.error(f"FAIL: Performance on slice '{slice_name}' is significantly lower than overall performance.")
            all_slices_passed = False
    
    if all_slices_passed:
        logger.info("SUCCESS: All sliced evaluations passed.")
    return all_slices_passed

def run_behavioral_tests(model, X_test: pd.DataFrame) -> bool:
    """Runs behavioral tests like invariance and directional expectations."""
    logger.info("--- Running Behavioral Tests ---")
    all_tests_passed = True
    
    # --- Invariance Test ---
    # Prediction should not change if we alter a non-predictive ID
    test_record = X_test.iloc[[0]].copy()
    original_pred = model.predict_proba(test_record)[:, 1][0]
    
    test_record_modified = test_record.copy()
    # Assuming a feature like 'session_uuid' that shouldn't affect the outcome
    # If not present, we can skip or use another irrelevant feature
    if 'session_uuid' in test_record_modified.columns:
        test_record_modified['session_uuid'] = 12345 
        modified_pred = model.predict_proba(test_record_modified)[:, 1][0]
        if not np.isclose(original_pred, modified_pred):
            logger.error("FAIL: Invariance test failed. Prediction changed with session_uuid.")
            all_tests_passed = False
        else:
            logger.info("SUCCESS: Invariance test passed.")

    # --- Directional Expectation Test ---
    # Adding an 'add_to_cart' event should increase the propensity score
    # Find a record with a moderate number of add_to_cart events
    try:
        record_to_test = X_test[X_test['add_to_cart_count'] > 0].iloc[[0]].copy()
        base_pred = model.predict_proba(record_to_test)[:, 1][0]
        
        record_to_test['add_to_cart_count'] += 1
        higher_pred = model.predict_proba(record_to_test)[:, 1][0]
        
        if higher_pred <= base_pred:
            logger.error("FAIL: Directional test failed. Increasing add_to_cart_count did not increase score.")
            all_tests_passed = False
        else:
            logger.info("SUCCESS: Directional test passed.")
    except IndexError:
        logger.warning("Skipping directional test: no suitable test records found.")

    return all_tests_passed

def advanced_tests(run_id: str, test_data_path: str):
    mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])

    logger.info(f"Running advanced tests for model run: {run_id}")
    model = mlflow.lightgbm.load_model(f"runs:/{run_id}/model")
    
    df_test = pd.read_parquet(test_data_path)
    X_test = df_test.drop("target", axis=1)
    y_test = df_test["target"]

    # Get overall AUC from the MLflow run to use as a baseline
    client = mlflow.tracking.MlflowClient()
    overall_auc = client.get_run(run_id).data.metrics["test_auc"]

    sliced_passed = run_sliced_evaluation(model, X_test, y_test, overall_auc)
    behavioral_passed = run_behavioral_tests(model, X_test)

    if not (sliced_passed and behavioral_passed):
        logger.error("One or more advanced tests failed. Halting promotion.")
        sys.exit(1) # Exit with a non-zero code to fail the Airflow task
    
    logger.info("All advanced tests passed successfully.")
    sys.exit(0)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--run-id", type=str, required=True)
    parser.add_argument("--test-data-path", type=str, required=True)
    args = parser.parse_args()
    advanced_tests(args.run_id, args.test_data_path)

This is the final step. If all previous gates have passed, this script formally registers the model and promotes it to the “Staging” environment, making it available for the CI/CD deployment pipeline.

# /src/model_training/register.py
import os
import argparse
import datetime
import logging
import mlflow

logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(message)s")
logger = logging.getLogger()

def register_model(run_id: str, model_name: str):
    """
    Registers a new model version from an MLflow run and transitions it to Staging.
    
    Args:
        run_id (str): The MLflow run ID of the model to register.
        model_name (str): The name of the model in the MLflow Model Registry.
    """
    mlflow.set_tracking_uri(os.environ["MLFLOW_TRACKING_URI"])
    client = mlflow.tracking.MlflowClient()

    model_uri = f"runs:/{run_id}/model"
    logger.info(f"Registering model '{model_name}' from URI: {model_uri}")

    # Register the model, which creates a new version
    model_version_info = mlflow.register_model(model_uri, model_name)
    version = model_version_info.version
    logger.info(f"Successfully registered model version: {version}")

    # Add a description to the model version
    description = (
        f"This model (version {version}) was automatically promoted by the "
        f"training pipeline on {datetime.date.today()}. "
        f"Source run ID: {run_id}"
    )
    client.update_model_version(
        name=model_name,
        version=version,
        description=description
    )

    # Transition the new model version to the "Staging" stage
    logger.info(f"Transitioning model version {version} to 'Staging'...")
    client.transition_model_version_stage(
        name=model_name,
        version=version,
        stage="Staging",
        archive_existing_versions=True # Crucial for production!
    )
    logger.info("Transition to Staging complete.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--run-id", type=str, required=True)
    parser.add_argument("--model-name", type=str, required=True)
    args = parser.parse_args()
    register_model(args.run_id, args.model_name)

Unit Tests

import os
from unittest.mock import patch, MagicMock
import numpy as np
import pytest
import pandas as pd

import sys
sys.path.append('src/model_training')
import train

@pytest.fixture
def mock_train_data():
    """Create a mock DataFrame for testing."""
    return pd.DataFrame({
        'feature1': range(100),
        'feature2': [i * 0.1 for i in range(100)],
        'target': [0] * 50 + [1] * 50
    })

@patch('train.mlflow')
@patch('train.lgb.LGBMClassifier')
@patch('pandas.read_parquet')
def test_main_training_logic(mock_read_parquet, mock_lgbm, mock_mlflow, mock_train_data):
    """Test the main training function's flow."""
    # Setup mocks
    mock_read_parquet.return_value = mock_train_data
    mock_lgbm_instance = MagicMock()
    # predict_proba returns [P(y=0), P(y=1)]; score on 'feature2' so roc_auc_score gets real numbers
    mock_lgbm_instance.predict_proba.side_effect = lambda X: np.column_stack(
        [1 - X["feature2"].to_numpy(), X["feature2"].to_numpy()]
    )
    mock_lgbm.return_value = mock_lgbm_instance
    os.environ['MLFLOW_TRACKING_URI'] = 'http://dummy-uri'
    os.environ['SM_CHANNEL_TRAIN'] = '/data'

    # Run the main function with a clean argv so argparse does not choke on pytest's own arguments
    with patch.object(sys, 'argv', ['train.py']):
        train.main()

    # Assertions
    mock_mlflow.set_experiment.assert_called_with("purchase-intent-training")
    mock_mlflow.start_run.assert_called_once()
    mock_lgbm_instance.fit.assert_called_once()
    mock_mlflow.log_params.assert_called()
    logged_metric_names = [c.args[0] for c in mock_mlflow.log_metric.call_args_list]
    assert "validation_auc" in logged_metric_names
    mock_mlflow.lightgbm.log_model.assert_called_once()

Pipeline (Airflow DAG)

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTrainingOperator
from airflow.utils.dates import days_ago
from sagemaker.estimator import Estimator

# --- DAG Definition ---
with DAG(
    dag_id='model_training_pipeline',
    start_date=days_ago(1),
    schedule_interval='@weekly',
    catchup=False,
    tags=['training'],
) as dag:

    # Task 1: Get data from Feast
    def get_data_from_feast(**kwargs):
        # ... logic from src/model_training/data.py
        # Saves data to S3 and pushes path via XCom
        pass

    # Task 2: Validate data with Great Expectations
    def validate_data(**kwargs):
        # ... logic from src/model_training/validate.py
        pass

    # Task 3: Launch SageMaker Training Job
    # Define the estimator here for clarity
    sagemaker_estimator = Estimator(
        image_uri="<aws_account_id>.dkr.ecr.eu-west-1.amazonaws.com/ecom-propensity/training:latest",
        role="arn:aws:iam::<aws_account_id>:role/sagemaker-training-execution-role",
        instance_count=1,
        instance_type='ml.m5.large',
        # Pass hyperparameters
        hyperparameters={'n_estimators': 250, 'learning_rate': 0.05},
        # Pass environment variables
        environment={'MLFLOW_TRACKING_URI': 'http://your-mlflow-server:5000'}
    )
    
    train_model = SageMakerTrainingOperator(
        task_id='train_model',
        config={
            "TrainingJobName": "propensity-model-{{ ds_nodash }}",
            "AlgorithmSpecification": {
                "TrainingImage": sagemaker_estimator.image_uri,
                "TrainingInputMode": "File",
            },
            "RoleArn": sagemaker_estimator.role,
            # ... other SageMaker configs
        },
        inputs={'train': '{{ ti.xcom_pull(task_ids="get_data_task")["s3_path"] }}'}
    )

    # Task 4: Evaluate model and decide whether to proceed
    def evaluate_and_decide(**kwargs):
        # ... logic from src/model_training/evaluate.py
        # Compares new model run to prod model in MLflow Registry
        # if is_better:
        #    return 'run_advanced_tests'
        # else:
        #    return 'end_pipeline'
        pass
    
    branch_on_evaluation = BranchPythonOperator(
        task_id='check_evaluation',
        python_callable=evaluate_and_decide,
    )

    # Task 5: Run advanced behavioral and fairness tests
    def run_advanced_tests(**kwargs):
        # ... logic from src/model_training/test.py
        pass

    # Task 6: Register model in MLflow Staging
    def register_model(**kwargs):
        # ... logic from src/model_training/register.py
        pass

    # Define dependencies
    # get_data_task >> validate_data >> train_model >> branch_on_evaluation
    # branch_on_evaluation >> [run_advanced_tests_task, end_pipeline_task]
    # run_advanced_tests_task >> register_model_task

Integration Tests

import os
import time
import pytest
from airflow_client.client import ApiClient, Configuration
import mlflow

# --- Test Configuration ---
DAG_ID = "model_training_pipeline"
MLFLOW_TRACKING_URI = os.environ.get("STAGING_MLFLOW_URI")

@pytest.mark.integration
def test_training_pipeline_end_to_end():
    """Triggers the training DAG and validates that a new model version is created in Staging."""
    
    # --- 1. SETUP: Get current latest version ---
    mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)
    client = mlflow.tracking.MlflowClient()
    model_name = "PurchasePropensityModel"
    try:
        initial_versions = client.get_latest_versions(model_name, stages=["Staging"])
        initial_version_count = len(initial_versions)
    except mlflow.exceptions.RestException:
        initial_version_count = 0 # Model doesn't exist yet

    # --- 2. TRIGGER: Start a new DAG Run ---
    # (Code to trigger DAG via Airflow API, similar to feature pipeline test)
    # ...
    
    # --- 3. POLL: Wait for the DAG Run to complete ---
    # (Code to poll for DAG run completion)
    # ...
    
    # --- 4. VALIDATE: Check MLflow Model Registry ---
    final_versions = client.get_latest_versions(model_name, stages=["Staging"])
    final_version_count = len(final_versions)

    assert final_version_count > initial_version_count, \
        "Integration test failed: No new model version was moved to Staging."

CI/CD Workflow

name: CI/CD for Model Training Pipeline

on:
  push:
    branches: [ main ]
    paths:
      - 'src/model_training/**'
      - 'pipelines/dags/model_training_dag.py'
  pull_request:
    paths:
      - 'src/model_training/**'
      - 'pipelines/dags/model_training_dag.py'

jobs:
  build-and-test:
    name: Build Container & Run Tests
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
      - uses: actions/setup-python@v4
        with: { python-version: '3.9' }

      - name: Install Test Dependencies
        run: pip install pytest pandas numpy scikit-learn lightgbm mlflow

      - name: Run Unit Tests
        run: pytest tests/unit/test_training.py

      - name: Configure AWS credentials
        if: github.ref == 'refs/heads/main'
        uses: aws-actions/configure-aws-credentials@v2
        # ... credentials config ...

      - name: Login to Amazon ECR
        if: github.ref == 'refs/heads/main'
        id: login-ecr
        uses: aws-actions/amazon-ecr-login@v1

      - name: Build, tag, and push image to Amazon ECR
        if: github.ref == 'refs/heads/main'
        env:
          ECR_REGISTRY: ${{ steps.login-ecr.outputs.registry }}
          ECR_REPOSITORY: ecom-propensity/training
          IMAGE_TAG: latest
        run: |
          docker build -t $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG -f src/model_training/Dockerfile .
          docker push $ECR_REGISTRY/$ECR_REPOSITORY:$IMAGE_TAG

  deploy-dag:
    name: Deploy Airflow DAG
    runs-on: ubuntu-latest
    needs: build-and-test
    if: github.ref == 'refs/heads/main'
    steps:
      - uses: actions/checkout@v3
      - uses: aws-actions/configure-aws-credentials@v2
        # ... credentials config ...

      - name: Deploy DAG to S3
        run: |
          aws s3 cp pipelines/dags/model_training_dag.py s3://ecom-propensity-airflow-artifacts/dags/

Model Serving

Architecture

Python Scripts

import os
import logging
import joblib
import pandas as pd
import lightgbm as lgb
from fastapi import FastAPI, Request, HTTPException
from pydantic import BaseModel
from feast import FeatureStore

# --- Logging Setup ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- FastAPI App Initialization ---
app = FastAPI()

# --- Model & Feature Store Loading ---
# This happens once at container startup
try:
    MODEL_PATH = "/opt/ml/model/model.joblib" # SageMaker's default model path
    model = joblib.load(MODEL_PATH)
    logger.info("Successfully loaded model from %s", MODEL_PATH)
    
    # Initialize Feast feature store. Assumes feature_repo is packaged with the app.
    # The registry.db path needs to be accessible. For production, a shared path is better.
    fs = FeatureStore(repo_path=".") 
    logger.info("Successfully initialized Feast Feature Store.")
except Exception as e:
    logger.critical("Failed to load model or feature store: %s", e)
    model = None
    fs = None

# --- Pydantic Schemas for Request & Response ---
class PredictionRequest(BaseModel):
    user_id: str
    session_id: str

class PredictionResponse(BaseModel):
    propensity: float
    model_version: str = os.environ.get("MODEL_VERSION", "v0.0.0")

# --- API Endpoints ---
@app.get("/health")
async def health_check():
    """Health check endpoint for SageMaker to ping."""
    if model is None or fs is None:
        raise HTTPException(status_code=503, detail="Model or Feature Store not loaded")
    return {"status": "ok"}

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Main prediction endpoint."""
    if model is None or fs is None:
        raise HTTPException(status_code=503, detail="Model is not ready")

    try:
        # 1. Fetch online features from Feast (Redis)
        feature_vector = fs.get_online_features(
            features=[
                "user_historical_features:lifetime_purchase_count",
                "user_historical_features:avg_order_value_90d",
                "session_streaming_features:add_to_cart_count",
                # ... add all other features here
            ],
            entity_rows=[{"user_id": request.user_id, "session_id": request.session_id}],
        ).to_df()
        
        # 2. Prepare features for the model (drop entity/metadata columns; errors="ignore"
        #    keeps this robust if the online store does not return event_timestamp)
        feature_df = feature_vector.drop(
            columns=["user_id", "session_id", "event_timestamp"], errors="ignore"
        )

        # 3. Get prediction
        prediction = model.predict_proba(feature_df)[:, 1][0]
        
        logger.info("Prediction successful for session %s", request.session_id)
        
        # 4. Return response
        return PredictionResponse(propensity=prediction)

    except Exception as e:
        logger.error("Error during prediction: %s", e)
        raise HTTPException(status_code=500, detail="Internal server error during prediction.")
Dockerfile

# Use an official lightweight Python image.
FROM python:3.9-slim

# Set the working directory in the container
WORKDIR /app

# Set environment variables for SageMaker
ENV SAGEMAKER_PROGRAM app.py

# Copy the requirements file and install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the feature repository and the application code
COPY ./feature_repo ./feature_repo
COPY . .

# SageMaker mounts the model artifacts to /opt/ml/model at startup.
# A fully custom container is launched by SageMaker as `docker run <image> serve` and must run its
# own web server on port 8080, answering GET /ping and POST /invocations. The `sh -c` form below
# swallows the extra "serve" argument, so the same image also works for local runs.
ENTRYPOINT ["sh", "-c", "exec gunicorn -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8080 app:app"]

Unit Tests

from unittest.mock import patch, MagicMock
from fastapi.testclient import TestClient
import numpy as np
import pandas as pd
import pytest

# Add src to path to allow imports
import sys
sys.path.append('src/serving')
from app import app

client = TestClient(app)

@patch('app.fs') # Mock the Feast FeatureStore object
def test_predict_success(mock_fs):
    """Test the happy path for the /predict endpoint."""
    # Mock the return value of get_online_features
    mock_feature_df = pd.DataFrame({
        "user_id": ["user123"],
        "session_id": ["sess456"],
        "lifetime_purchase_count": [5],
        "avg_order_value_90d": [99.5],
        "add_to_cart_count": [2],
        "event_timestamp": [pd.Timestamp.now()]
    })
    mock_fs.get_online_features.return_value.to_df.return_value = mock_feature_df

    # Mock the model object
    with patch('app.model') as mock_model:
        # predict_proba must return an array-like that supports [:, 1] indexing
        mock_model.predict_proba.return_value = np.array([[0.2, 0.8]])
        
        response = client.post(
            "/predict",
            json={"user_id": "user123", "session_id": "sess456"}
        )
        
        assert response.status_code == 200
        data = response.json()
        assert "propensity" in data
        assert 0.0 <= data["propensity"] <= 1.0
        assert data["propensity"] == 0.8

@patch('app.fs', MagicMock())
@patch('app.model', MagicMock())
def test_health_check():
    """Test the /health endpoint when the model and feature store are loaded."""
    response = client.get("/health")
    assert response.status_code == 200
    assert response.json() == {"status": "ok"}

IaC (Terraform)

resource "aws_ecr_repository" "serving_repo" {
  name = "ecom-propensity/serving"
}

resource "aws_sagemaker_model" "propensity_model" {
  name               = "propensity-model-${var.model_version}" # Versioned model
  execution_role_arn = aws_iam_role.sagemaker_training_role.arn # Reuse role for simplicity
  
  primary_container {
    image = "${aws_ecr_repository.serving_repo.repository_url}:${var.image_tag}"
    environment = {
      "MODEL_VERSION" = var.model_version
    }
  }
}

resource "aws_sagemaker_endpoint_configuration" "propensity_endpoint_config" {
  name = "propensity-endpoint-config-${var.model_version}"

  production_variants {
    variant_name           = "v1" # This would be the old variant in a canary
    model_name             = aws_sagemaker_model.propensity_model.name
    instance_type          = "ml.c5.xlarge"
    initial_instance_count = 2
    initial_variant_weight = 1.0 # In a canary, this would be updated
  }
  
  # When doing a canary, you would add a second production_variant block
  # for the new model and adjust the initial_variant_weight.
}

resource "aws_sagemaker_endpoint" "propensity_endpoint" {
  name                 = "propensity-endpoint"
  endpoint_config_name = aws_sagemaker_endpoint_configuration.propensity_endpoint_config.name
}

Integration Tests

import os
import json
import boto3
import pytest

SAGEMAKER_ENDPOINT_NAME = os.environ.get("STAGING_SAGEMAKER_ENDPOINT")

@pytest.mark.integration
def test_sagemaker_endpoint_invocation():
    """Sends a real request to the deployed SageMaker endpoint."""
    assert SAGEMAKER_ENDPOINT_NAME is not None, "STAGING_SAGEMAKER_ENDPOINT env var not set"

    sagemaker_runtime = boto3.client("sagemaker-runtime", region_name="eu-west-1")
    
    payload = {
        "user_id": "integration-test-user",
        "session_id": "integration-test-session"
    }
    
    response = sagemaker_runtime.invoke_endpoint(
        EndpointName=SAGEMAKER_ENDPOINT_NAME,
        ContentType='application/json',
        Body=json.dumps(payload)
    )
    
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    
    result = json.loads(response['Body'].read().decode())
    assert 'propensity' in result
    assert isinstance(result['propensity'], float)

API Contract & Smoke Test

import os
import json
import time
import boto3
import pytest
from pydantic import BaseModel, ValidationError

# --- Test Configuration ---
SAGEMAKER_ENDPOINT_NAME = os.environ.get("STAGING_SAGEMAKER_ENDPOINT")
AWS_REGION = "eu-west-1"

# --- Pydantic Model for Response Validation ---
# This defines the "API Contract" we expect.
class ExpectedResponse(BaseModel):
    propensity: float
    model_version: str

# --- Pytest Fixtures ---
@pytest.fixture(scope="module")
def sagemaker_client():
    """Boto3 client for SageMaker service."""
    return boto3.client("sagemaker", region_name=AWS_REGION)

@pytest.fixture(scope="module")
def sagemaker_runtime_client():
    """Boto3 client for SageMaker runtime (invocations)."""
    return boto3.client("sagemaker-runtime", region_name=AWS_REGION)

# --- Test Cases ---
@pytest.mark.smoke
@pytest.mark.integration
def test_endpoint_is_in_service(sagemaker_client):
    """
    Smoke Test 1: Checks that the endpoint is deployed and healthy.
    This is our proxy for the /health check.
    """
    assert SAGEMAKER_ENDPOINT_NAME is not None, "STAGING_SAGEMAKER_ENDPOINT env var not set"
    
    try:
        response = sagemaker_client.describe_endpoint(EndpointName=SAGEMAKER_ENDPOINT_NAME)
        status = response["EndpointStatus"]
        assert status == "InService", f"Endpoint is not InService. Current status: {status}"
        
        # We can add a wait loop here if needed
        # ...
        
    except sagemaker_client.exceptions.ClientError as e:
        pytest.fail(f"Could not describe SageMaker endpoint. Error: {e}")

@pytest.mark.smoke
@pytest.mark.integration
def test_api_contract_and_schema(sagemaker_runtime_client):
    """
    Smoke Test 2: Invokes the endpoint and validates the response schema (API Contract).
    """
    payload = {
        "user_id": "contract-test-user",
        "session_id": "contract-test-session"
    }
    
    response = sagemaker_runtime_client.invoke_endpoint(
        EndpointName=SAGEMAKER_ENDPOINT_NAME,
        ContentType='application/json',
        Body=json.dumps(payload)
    )
    
    assert response['ResponseMetadata']['HTTPStatusCode'] == 200
    
    # Validate the response body against our Pydantic schema
    response_body = json.loads(response['Body'].read().decode())
    try:
        validated_response = ExpectedResponse(**response_body)
        assert 0.0 <= validated_response.propensity <= 1.0
    except ValidationError as e:
        pytest.fail(f"API contract validation failed. Response did not match schema: {e}")

CI/CD Workflow

name: Deploy Model to SageMaker

on:
  workflow_dispatch:
    inputs:
      model_version:
        description: 'MLflow Model Version (e.g., 5)'
        required: true
      image_tag:
        description: 'Docker image tag (e.g., v1.2.0)'
        required: true

jobs:
  deploy-to-staging:
    name: Build & Deploy to Staging
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/github-actions-deploy-role
          aws-region: eu-west-1
      
      - name: Download model artifact from MLflow
        # This step requires a script that uses the MLflow client to download the
        # model artifact for the specified version (an illustrative sketch of this
        # helper is shown after this workflow).
        run: |
          mkdir -p /tmp/model
          python scripts/download_model.py --model-name PurchasePropensity --version ${{ github.event.inputs.model_version }} --output-path /tmp/model/model.joblib

      - name: Build and push Docker image to ECR
        run: |
          # ... (logic to build /src/serving/Dockerfile, including the downloaded model, and push to ECR with the image_tag) ...

      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v2
      
      - name: Terraform Apply to Staging
        run: |
          cd infrastructure
          terraform init
          terraform apply -auto-approve \
            -var="model_version=${{ github.event.inputs.model_version }}" \
            -var="image_tag=${{ github.event.inputs.image_tag }}" \
            -var-file="staging.tfvars" # Use a workspace for staging

      - name: Run Integration Test on Staging Endpoint
        env:
          STAGING_SAGEMAKER_ENDPOINT: "propensity-endpoint" # This should be an output from Terraform
        run: |
          pip install -r tests/requirements.txt
          pytest tests/integration/test_inference_pipeline.py
  
  smoke-test-staging:
    name: Run API Contract & Smoke Test
    runs-on: ubuntu-latest
    needs: deploy-to-staging
    
    steps:
      - uses: actions/checkout@v3
      - uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/github-actions-deploy-role
          aws-region: eu-west-1
      
      - name: Install Test Dependencies
        run: pip install pytest boto3 pydantic
      
      - name: Execute Smoke & Contract Tests
        env:
          # This endpoint name should be an output from the 'deploy-to-staging' job
          STAGING_SAGEMAKER_ENDPOINT: ${{ needs.deploy-to-staging.outputs.endpoint_name }}
        run: pytest tests/integration/test_api_contract.py
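
The deploy-to-staging job calls a scripts/download_model.py helper that is not shown above. A minimal sketch, assuming the standard MLflow Model Registry client and that the registered model version contains a single model.joblib artifact (both assumptions, not verified against the original repo):

# scripts/download_model.py -- illustrative sketch; assumes MLFLOW_TRACKING_URI is set
import argparse
import shutil
from pathlib import Path

import mlflow

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-name", required=True)
    parser.add_argument("--version", required=True)
    parser.add_argument("--output-path", required=True)
    args = parser.parse_args()

    # Download the registered model version's artifacts to a local scratch directory.
    local_dir = mlflow.artifacts.download_artifacts(
        artifact_uri=f"models:/{args.model_name}/{args.version}"
    )

    # The serving image expects a single joblib file; copy it to the requested location.
    Path(args.output_path).parent.mkdir(parents=True, exist_ok=True)
    shutil.copy(Path(local_dir) / "model.joblib", args.output_path)

if __name__ == "__main__":
    main()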

Performance & Load Testing

The load test is a separate, longer-running workflow, often triggered manually or on a nightly schedule, not on every deployment.

import os
import json
import time
import boto3
from locust import User, task, between, events
from botocore.config import Config
import random

# --- Locust Configuration ---
AWS_REGION = "eu-west-1"
SAGEMAKER_ENDPOINT_NAME = os.environ.get("STAGING_SAGEMAKER_ENDPOINT")

class Boto3Client:
    """A wrapper for the Boto3 SageMaker client to be used in Locust."""
    def __init__(self, host=""):
        # Increase retries and timeouts for a production load test
        config = Config(
            retries={'max_attempts': 5, 'mode': 'standard'},
            connect_timeout=10,
            read_timeout=10
        )
        self.sagemaker_runtime = boto3.client(
            "sagemaker-runtime",
            region_name=AWS_REGION,
            config=config
        )

    def invoke_endpoint(self, payload):
        """Invoke endpoint and record the result in Locust."""
        request_meta = {
            "request_type": "sagemaker",
            "name": "invoke_endpoint",
            "start_time": time.time(),
            "response_length": 0,
            "response": None,
            "context": {},
            "exception": None,
        }
        start_perf_counter = time.perf_counter()
        
        try:
            response = self.sagemaker_runtime.invoke_endpoint(
                EndpointName=SAGEMAKER_ENDPOINT_NAME,
                ContentType='application/json',
                Body=payload
            )
            response_body = response['Body'].read()
            request_meta["response_length"] = len(response_body)
        except Exception as e:
            request_meta["exception"] = e
        
        request_meta["response_time"] = (time.perf_counter() - start_perf_counter) * 1000
        # This is where we fire the event for Locust to record
        events.request.fire(**request_meta)

class SageMakerUser(User):
    """
    Simulates a user making prediction requests to the SageMaker endpoint.
    """
    wait_time = between(0.1, 0.5) # Wait 100-500ms between requests
    
    def __init__(self, environment):
        super().__init__(environment)
        if SAGEMAKER_ENDPOINT_NAME is None:
            raise ValueError("STAGING_SAGEMAKER_ENDPOINT env var must be set for Locust test.")
        self.client = Boto3Client()

    @task
    def make_prediction(self):
        # Generate a random payload to simulate different users
        user_id = f"load-test-user-{random.randint(1, 10000)}"
        session_id = f"load-test-session-{random.randint(1, 50000)}"
        
        payload = json.dumps({
            "user_id": user_id,
            "session_id": session_id
        })
        
        self.client.invoke_endpoint(payload)


How to run this test: This command simulates 100 concurrent users (-u 100), spawning 20 new users per second until that target is reached (-r 20), runs for a total duration of 5 minutes (--run-time 5m), and saves an HTML report.

# Install Locust
pip install locust

# Set the environment variable
export STAGING_SAGEMAKER_ENDPOINT="propensity-endpoint"

# Run Locust from the command line
locust -f tests/performance/locustfile.py --headless -u 100 -r 20 --run-time 5m --html report.html

CI/CD Workflow

name: Run Performance Load Test on Staging

on:
  workflow_dispatch: # Allows manual triggering
  schedule:
    - cron: '0 2 * * *' # Runs every night at 2 AM UTC

jobs:
  run-load-test:
    name: Execute Locust Load Test
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read

    steps:
      - uses: actions/checkout@v3
      - uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/github-actions-deploy-role
          aws-region: eu-west-1
      
      - name: Install Dependencies
        run: pip install locust boto3
      
      - name: Run Locust Test
        env:
          STAGING_SAGEMAKER_ENDPOINT: ${{ secrets.STAGING_ENDPOINT_NAME }}
        run: |
          locust -f tests/performance/locustfile.py --headless \
            -u 200 -r 50 --run-time 10m \
            --csv report \
            --exit-code-on-error 1
      
      - name: Upload Locust Report
        uses: actions/upload-artifact@v3
        with:
          name: locust-report
          path: report*

A/B Testing

Architecture

IaC

variable "champion_model_name" {
  description = "The name of the currently deployed champion model."
  type        = string
}

variable "challenger_model_name" {
  description = "The name of the new challenger model for the A/B test. Can be empty."
  type        = string
  default     = ""
}

variable "challenger_weight" {
  description = "The percentage of traffic (0-100) to route to the challenger model."
  type        = number
  default     = 0
}

resource "aws_sagemaker_endpoint_configuration" "propensity_endpoint_config" {
  name = "propensity-endpoint-config-ab-test"
  # This lifecycle block prevents Terraform from destroying the old config before the new one is active
  lifecycle {
    create_before_destroy = true
  }

  # --- Champion Model Variant ---
  production_variants {
    variant_name           = "champion"
    model_name             = var.champion_model_name
    instance_type          = "ml.c5.xlarge"
    initial_instance_count = 2
    initial_variant_weight = 100 - var.challenger_weight
  }

  # --- Challenger Model Variant (Created Conditionally) ---
  dynamic "production_variants" {
    # This block is only created if a challenger_model_name is provided
    for_each = var.challenger_model_name != "" ? [1] : []
    content {
      variant_name           = "challenger"
      model_name             = var.challenger_model_name
      instance_type          = "ml.c5.xlarge"
      initial_instance_count = 2 # Start with same capacity for fair performance test
      initial_variant_weight = var.challenger_weight
    }
  }
}

resource "aws_sagemaker_endpoint" "propensity_endpoint" {
  name                 = "propensity-endpoint"
  endpoint_config_name = aws_sagemaker_endpoint_configuration.propensity_endpoint_config.name
}

Analysis Script

This script performs the statistical analysis of the experiment results. It would be run by an Airflow DAG after the experiment concludes.

import argparse
import pandas as pd
from scipy import stats

def analyze_experiment_results(data_path: str):
    """
    Loads experiment data and performs a T-test to determine the winner.
    
    Args:
        data_path (str): Path to the Parquet file containing experiment results.
                         Expected columns: `user_id`, `variant_name`, `converted` (0 or 1).
    """
    df = pd.read_parquet(data_path)
    
    control_group = df[df['variant_name'] == 'champion']
    treatment_group = df[df['variant_name'] == 'challenger']
    
    # --- Calculate metrics ---
    control_conversion_rate = control_group['converted'].mean()
    treatment_conversion_rate = treatment_group['converted'].mean()
    lift = (treatment_conversion_rate - control_conversion_rate) / control_conversion_rate
    
    # --- Perform Welch's T-test ---
    # More robust than standard T-test as it doesn't assume equal variance
    t_stat, p_value = stats.ttest_ind(
        treatment_group['converted'], 
        control_group['converted'], 
        equal_var=False
    )
    
    # --- Print Results ---
    print("--- A/B Test Analysis Report ---")
    print(f"Control (Champion) Conversion Rate: {control_conversion_rate:.4f}")
    print(f"Treatment (Challenger) Conversion Rate: {treatment_conversion_rate:.4f}")
    print(f"Relative Lift: {lift:+.2%}")
    print(f"\nP-value: {p_value:.5f}")
    
    if p_value < 0.05 and lift > 0:  # alpha = 0.05, and the challenger must actually lift conversion
        print("\nResult: Statistically Significant. The challenger model is the winner.")
    elif p_value < 0.05:
        print("\nResult: Statistically Significant, but the challenger performs worse. Keep the champion.")
    else:
        print("\nResult: Not Statistically Significant. We cannot conclude the challenger is better.")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--data-path", required=True)
    args = parser.parse_args()
    analyze_experiment_results(args.data_path)

Pipeline: Airflow DAG

This DAG is for analyzing the results after the A/B test has run for its designated period.

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id='ab_test_analysis',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None, # Triggered manually
    catchup=False,
    tags=['analysis', 'ab-testing'],
) as dag:
    
    run_analysis = BashOperator(
        task_id='run_statistical_analysis',
        bash_command=(
            "python /opt/airflow/src/analysis/run_ab_test_analysis.py "
            "--data-path s3://ecom-propensity-analytics/ab-test-results/experiment_XYZ.parquet"
        )
    )

CI/CD GitHub Actions Workflow

name: Start A/B Test on SageMaker

on:
  workflow_dispatch:
    inputs:
      champion_model_name:
        description: 'Name of the production (champion) SageMaker model'
        required: true
      challenger_model_name:
        description: 'Name of the new (challenger) SageMaker model'
        required: true
      challenger_traffic_split:
        description: 'Traffic % for challenger (e.g., 50 for a 50/50 split)'
        required: true
        default: '50'

jobs:
  deploy-ab-test:
    name: Deploy Challenger Variant for A/B Test
    runs-on: ubuntu-latest
    permissions:
      id-token: write
      contents: read

    steps:
      - name: Checkout code
        uses: actions/checkout@v3

      - name: Configure AWS credentials
        uses: aws-actions/configure-aws-credentials@v2
        with:
          role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/github-actions-deploy-role
          aws-region: eu-west-1
      
      # NOTE: Assumes the challenger model's container is already built and pushed to ECR
      # This workflow only handles the infrastructure update to start the test.

      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v2
      
      - name: Terraform Apply to Start A/B Test
        run: |
          cd infrastructure
          terraform init
          terraform apply -auto-approve \
            -var="champion_model_name=${{ github.event.inputs.champion_model_name }}" \
            -var="challenger_model_name=${{ github.event.inputs.challenger_model_name }}" \
            -var="challenger_weight=${{ github.event.inputs.challenger_traffic_split }}"

      - name: Run Smoke Test on BOTH Variants
        # This is a more advanced integration test that would need to be written.
        # It would invoke the endpoint multiple times, checking the InvokedProductionVariant
        # field returned by SageMaker to confirm that both the 'champion' and 'challenger'
        # variants are serving traffic (a sketch of this test follows the workflow below).
        run: echo "SMOKE TEST: Verifying both variants are healthy..."

The Continual Learning & Monitoring Loop

Architecture

This architecture illustrates the closed-loop system. A daily monitoring job checks for drift. If drift is detected, it triggers an automated retraining workflow. This workflow produces a new model candidate, which is then safely deployed via a multi-stage canary release process, with automated analysis and rollback capabilities.

Daily Monitoring and Drift Detection Artifacts

import sys
import argparse
import logging
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, TargetDriftPreset

logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(message)s")
logger = logging.getLogger()

def run_drift_analysis(ref_data_path: str, prod_data_path: str, report_path: str):
    """
    Compares production data to reference data to detect drift.
    Exits with a non-zero status code if drift is detected.
    """
    logger.info("Loading reference data from %s", ref_data_path)
    ref_df = pd.read_parquet(ref_data_path)
    
    logger.info("Loading production data from %s", prod_data_path)
    prod_df = pd.read_parquet(prod_data_path)

    # Column conventions: the model's prediction column is 'propensity' and the label is 'converted'.
    # The reference data stores the label as 'target', so rename it to match the production logs.
    ref_df.rename(columns={'target': 'converted'}, inplace=True)

    logger.info("Generating data drift report...")
    report = Report(metrics=[
        DataDriftPreset(),
        TargetDriftPreset(),
    ])
    report.run(reference_data=ref_df, current_data=prod_df)
    
    logger.info("Saving drift report to %s", report_path)
    report.save_html(report_path)

    drift_report = report.as_dict()
    is_drift_detected = drift_report['metrics'][0]['result']['dataset_drift']
    
    if is_drift_detected:
        logger.error("Data drift detected! The production data distribution has shifted significantly.")
        sys.exit(1) # Fail the task if drift is found
    else:
        logger.info("No significant data drift detected.")
        sys.exit(0)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--ref-data", required=True)
    parser.add_argument("--prod-data", required=True)
    parser.add_argument("--report-path", required=True)
    args = parser.parse_args()
    run_drift_analysis(args.ref_data, args.prod_data, args.report_path)

Monitoring DAG

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

def on_drift_detection_failure(context):
    """
    This function is called when the drift check fails.
    It triggers the GitHub Actions workflow for retraining.
    """
    # In a real system, you'd use a more robust method, such as calling the GitHub API
    # from a PythonOperator (a sketch of that approach follows this DAG). For clarity,
    # a curl command is shown; the GITHUB_TOKEN would be stored as an Airflow secret.
    trigger_workflow_command = """
    curl -X POST \
      -H "Accept: application/vnd.github.v3+json" \
      -H "Authorization: token {{ conn.github_pat.password }}" \
      https://api.github.com/repos/your-org/ecom-propensity/actions/workflows/retrain_and_deploy.yml/dispatches \
      -d '{"ref":"main", "inputs":{"trigger_reason":"data_drift"}}'
    """
    return BashOperator(
        task_id='trigger_retraining_workflow',
        bash_command=trigger_workflow_command,
    ).execute(context=context)

with DAG(
    dag_id='daily_monitoring_and_drift_check',
    start_date=days_ago(1),
    schedule_interval='@daily',
    catchup=False,
    tags=['monitoring'],
) as dag:
    
    run_drift_check = BashOperator(
        task_id='run_drift_check',
        bash_command=(
            "python /opt/airflow/src/monitoring/run_drift_check.py "
            "--ref-data s3://ecom-propensity-gold-features/training_data_profile.parquet "
            "--prod-data s3://ecom-propensity-monitoring-logs/latest/ "
            "--report-path /tmp/drift_report_{{ ds }}.html"
        ),
        on_failure_callback=on_drift_detection_failure,
    )
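
As the callback's comment notes, a more robust approach calls the GitHub API directly from Python rather than shelling out to curl. A sketch using the same github_pat Airflow connection and the repository path from the example above (module path and function names are illustrative):

# src/monitoring/trigger_retraining.py -- illustrative sketch
import requests
from airflow.hooks.base import BaseHook

def trigger_retraining_workflow(reason: str = "data_drift") -> None:
    """Dispatch the retrain_and_deploy GitHub Actions workflow via the REST API."""
    token = BaseHook.get_connection("github_pat").password
    response = requests.post(
        "https://api.github.com/repos/your-org/ecom-propensity/actions/workflows/retrain_and_deploy.yml/dispatches",
        headers={
            "Accept": "application/vnd.github.v3+json",
            "Authorization": f"token {token}",
        },
        json={"ref": "main", "inputs": {"trigger_reason": reason}},
        timeout=30,
    )
    response.raise_for_status()  # GitHub returns 204 No Content on success

def on_drift_detection_failure(context):
    """Drop-in replacement for the on_failure_callback used by the DAG above."""
    trigger_retraining_workflow(reason="data_drift")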

Automated Retraining and Canary Release Artifacts

import sys
import argparse
import logging
from datetime import datetime, timedelta
import boto3

logging.basicConfig(level=logging.INFO, format="%(asctime)-15s %(message)s")
logger = logging.getLogger()

# Define acceptable performance thresholds for the challenger
MAX_LATENCY_INCREASE_FACTOR = 1.10  # Allow up to 10% higher latency than the champion
MAX_ERROR_RATE_ABSOLUTE = 0.01      # Max average 5XX error count per metric period

def analyze_canary_metrics(endpoint_name: str, bake_time_mins: int) -> bool:
    """
    Queries CloudWatch metrics for champion and challenger variants and compares them.
    Returns True if the challenger is healthy, False otherwise.
    """
    client = boto3.client("cloudwatch", region_name="eu-west-1")
    end_time = datetime.utcnow()
    start_time = end_time - timedelta(minutes=bake_time_mins)
    
    def get_metric(variant_name, metric_name):
        response = client.get_metric_data(
            MetricDataQueries=[{
                'Id': 'm1',
                'MetricStat': {
                    'Metric': {'Namespace': 'AWS/SageMaker', 'MetricName': metric_name,
                               'Dimensions': [{'Name': 'EndpointName', 'Value': endpoint_name},
                                              {'Name': 'VariantName', 'Value': variant_name}]},
                    'Period': 60, 'Stat': 'Average'
                }, 'ReturnData': True
            }],
            StartTime=start_time, EndTime=end_time
        )
        values = response['MetricDataResults'][0]['Values']
        return values[0] if values else 0.0

    # --- Fetch metrics for both variants ---
    # Note: SageMaker reports ModelLatency in microseconds.
    champion_latency = get_metric("champion", "ModelLatency")
    challenger_latency = get_metric("challenger", "ModelLatency")
    challenger_errors = get_metric("challenger", "Invocation5XXErrors")
    
    logger.info(f"Champion ModelLatency: {champion_latency:.2f} µs")
    logger.info(f"Challenger ModelLatency: {challenger_latency:.2f} µs")
    logger.info(f"Challenger 5XX Errors (avg per period): {challenger_errors}")

    # --- Compare against thresholds ---
    healthy = True
    if challenger_latency > champion_latency * MAX_LATENCY_INCREASE_FACTOR:
        logger.error("FAIL: Challenger latency is unacceptably high.")
        healthy = False
    if challenger_errors > MAX_ERROR_RATE_ABSOLUTE:
        logger.error("FAIL: Challenger error rate is unacceptably high.")
        healthy = False

    if healthy:
        logger.info("SUCCESS: Challenger performance is within acceptable limits.")
    
    return healthy

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--endpoint-name", required=True)
    parser.add_argument("--bake-time-mins", type=int, default=15)
    args = parser.parse_args()
    
    if not analyze_canary_metrics(args.endpoint_name, args.bake_time_mins):
        sys.exit(1)
    sys.exit(0)

Retraining and Canary Deployment Workflow

name: Triggered Retraining and Canary Deployment

on:
  workflow_dispatch:
    inputs:
      trigger_reason:
        description: 'Reason for triggering the run (e.g., data_drift, scheduled)'
        required: true
        default: 'manual'

jobs:
  start-training-pipeline:
    name: Start Model Retraining
    runs-on: ubuntu-latest
    outputs:
      new_model_version: ${{ steps.check_mlflow.outputs.version }}

    steps:
      - name: Trigger Airflow Training DAG
        id: trigger_dag
        # ... (logic to call Airflow API to start 'model_training_pipeline' DAG) ...
        run: echo "DAG run started..."

      - name: Wait for Training DAG to Complete
        id: wait_dag
        # ... (logic to poll Airflow API until DAG run is successful) ...
        run: echo "DAG run finished successfully."

      - name: Check MLflow for New Staging Model
        id: check_mlflow
        # ... (Python script to check MLflow Registry for a new model in 'Staging';
        #      an illustrative sketch appears at the end of this section) ...
        run: |
          # This script would output the new version number
          echo "version=6" >> $GITHUB_OUTPUT

  canary-release:
    name: Canary Release to Production
    runs-on: ubuntu-latest
    needs: start-training-pipeline
    if: needs.start-training-pipeline.outputs.new_model_version != ''

    steps:
      - uses: actions/checkout@v3
      - uses: aws-actions/configure-aws-credentials@v2
        # ... (credentials config) ...
      - uses: hashicorp/setup-terraform@v2

      - name: 'Step 1: Canary Deploy (10% Traffic)'
        id: canary_deploy
        run: |
          cd infrastructure
          terraform init
          # This assumes a script that gets champion/challenger names from MLflow
          terraform apply -auto-approve -var="challenger_weight=10" ...
      
      - name: 'Step 2: Bake Period'
        run: sleep 900 # Wait 15 minutes

      - name: 'Step 3: Automated Canary Analysis'
        id: analysis
        # continue-on-error lets the workflow reach the rollback step when the analysis fails
        continue-on-error: true
        run: |
          pip install boto3
          python src/deployment/canary_analysis.py --endpoint-name propensity-endpoint

      - name: 'Step 4: Promote (if analysis passed)'
        if: steps.analysis.outcome == 'success'
        run: |
          echo "Canary analysis passed. Promoting to 100% traffic."
          cd infrastructure
          terraform apply -auto-approve -var="challenger_weight=100" ...

      - name: 'Step 5: Rollback (if analysis failed)'
        if: steps.analysis.outcome != 'success'
        run: |
          echo "::error::Canary analysis failed! Rolling back."
          cd infrastructure
          terraform apply -auto-approve -var="challenger_weight=0" ...
          exit 1
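
Finally, the "Check MLflow for New Staging Model" step above echoes a hardcoded version=6 as a placeholder. A minimal sketch of the real check, assuming the MLflow Model Registry and the PurchasePropensity model name used in the deployment workflow:

# scripts/check_staging_model.py -- illustrative sketch
import os

from mlflow.tracking import MlflowClient

def latest_staging_version(model_name: str = "PurchasePropensity") -> str:
    """Return the newest model version currently in the 'Staging' stage, or '' if none exists."""
    client = MlflowClient()
    versions = client.get_latest_versions(model_name, stages=["Staging"])
    return versions[0].version if versions else ""

if __name__ == "__main__":
    version = latest_staging_version()
    # Expose the result as a GitHub Actions step output.
    with open(os.environ["GITHUB_OUTPUT"], "a") as fh:
        fh.write(f"version={version}\n")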