Customer Lifetime Value

How I Built a Customer Lifetime Value Model for an E-commerce Business


TLDR: Building a Production-Grade CLV Prediction System

This project details the end-to-end design and implementation of an automated MLOps system to predict Customer Lifetime Value (CLV) for a mid-sized e-commerce business.

Challenge

Our e-commerce business struggled to move beyond reactive marketing. We lacked a forward-looking way to identify high-value customers, leading to inefficient ad spend, generic campaigns, and missed retention opportunities. The core challenge was to build a system that could accurately predict future customer value, enabling a fundamental shift to data-driven, proactive personalization and budget allocation.

My Role & Solution

As the ML Engineer and Data Scientist on a lean, three-person team (alongside a Product Manager and Data Engineer), I owned the design and implementation of the complete end-to-end MLOps system on AWS.

My solution involved architecting a series of automated, event-driven pipelines:

  • Data & Feature Engineering: I built robust data ingestion and validation pipelines using AWS Glue and Great Expectations, and a scalable feature engineering pipeline with Spark on EMR to generate rich, time-aware customer features for the model.

  • Model Development & Training: I developed and iterated on the core XGBoost prediction model, established experiment tracking with MLflow, and designed the automated training and evaluation pipeline using SageMaker Pipelines. This included conditional model registration based on performance, fairness, and robustness checks.

  • Deployment & Operations: I then built the weekly batch inference pipeline using SageMaker Batch Transform and established a full-circle monitoring and observability stack with CloudWatch and SHAP for explainability to track data drift and model performance.

  • Continual Learning & Testing: Finally, I designed the continual learning framework with automated retraining triggers and a phased production testing strategy (Shadow Deployment and Canary Releases) to ensure the model evolves safely and effectively over time.

Tech Stack: AWS (SageMaker, S3, Glue, EMR, Kinesis, MWAA), Airflow, MLflow, DVC, Great Expectations, Spark, XGBoost, Terraform.

Impact

The system provided a significant, measurable lift by enabling proactive, data-informed marketing strategies. It moved our ML operations from a quarterly manual effort to a fully automated weekly cycle.

  • +18% Marketing ROI by focusing ad spend and promotions on predicted high-value customer segments.

  • -12% Churn Rate in top-tier customer cohorts due to targeted, proactive retention campaigns.

  • +7% Conversion Rate on marketing campaigns that used the CLV-based segments.

  • 95% Reduction in Manual Effort by fully automating the model lifecycle from data ingestion to retraining and deployment.

System Architecture

The diagram below illustrates the complete MLOps system. The components within the highlighted area represent the core systems I personally designed and built.

The Business Challenge: Moving from Hindsight to Foresight

The core challenge was to move beyond simple historical metrics and accurately predict the future Customer Lifetime Value (CLV). We needed to transition from merely looking at past revenue to forecasting future profitability, enabling us to make smarter, data-driven decisions.

This initiative was driven by the need to answer critical business questions:

  • Optimized Customer Acquisition: How much should we strategically invest to acquire a new customer? Where can we find more prospects who resemble our current high-value customers?

  • Personalized Retention: Which of our high-value customers are at risk of churning? How can we proactively and efficiently tailor retention efforts to keep them engaged?

  • Smarter Budget Allocation: How should we allocate marketing spend, discounts, and promotions across different customer segments to maximize long-term return on investment (ROI)?

The financial stakes were clear and compelling. Industry benchmarks show that:

  • A 5% increase in customer retention can boost profitability by 25-95%.

  • It costs 5 times more to acquire a new customer than to retain an existing one.

  • The probability of selling to an existing customer is 60-70%, compared to just 5-20% for a new prospect.

The objective was to build a system that could reliably identify our most valuable customers—both existing and future—and empower the business to focus resources where they would generate the greatest impact.


Problem Framing: Translating Business Needs into a Technical Blueprint

With a clear business objective, the next critical step is to translate that goal into a precise, solvable machine learning problem. This framing process dictates our data requirements, model choice, and ultimately, how the final output can be actioned by the business.

Is Machine Learning the Right Approach?

Before committing to a complex ML system, we first validated if it was necessary. While a simple heuristic like (Average Order Value) x (Purchase Frequency) x (Average Customer Lifespan) provides a basic CLV estimate, it suffers from critical flaws:

  • It treats all customers as a single, homogenous group, failing to capture individual behavior.

  • It cannot identify high-value customers at risk or differentiate between promising new users and those likely to make only a single purchase.

ML is the ideal approach here because:

  • Complex Patterns: Customer purchasing and churn behavior is driven by intricate, non-linear patterns that are difficult to define with simple rules.

  • Predictive Nature: The core task is forecasting future behavior, which is a strength of ML.

  • Scale and Adaptation: ML can process data for millions of customers and can be retrained to adapt to evolving market trends and customer habits.

Defining the Core ML Task: From Business Goals to a Predictive Model

We framed this challenge primarily as a regression problem. This was a strategic choice to provide the most direct and actionable output for our business stakeholders.

  • Model Input: A rich set of features for each customer, calculated up to a specific “cutoff date” (e.g., today). This includes:

    • RFM Features: Recency, Frequency, and Monetary value.

    • Purchase Pattern Features: Average time between orders, product diversity, return rates.

    • Temporal Features: Spend and activity aggregated over rolling time windows (e.g., last 30, 90, 365 days).

    • Engagement Features: Non-transactional data like website visits, session duration, and email click-through rates.

  • Model Output: A continuous numerical value representing the predicted total revenue a customer will generate in the next 12 months.

While regression was our primary task, we recognized its sensitivity to outliers (a few “whale” customers). Therefore, we also designed the system to support a secondary classification output, bucketing customers into ‘Low’, ‘Medium’, and ‘High’ value tiers. This provides an intuitive and robust output for marketing segmentation, complementing the precise financial forecast from the regression model.
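
To make this framing concrete, here is a minimal sketch of both outputs, assuming a prepared training snapshot with illustrative column names (clv_12m as the observed 12-month revenue label); this is not the production training code.

import pandas as pd
import xgboost as xgb

# Hypothetical training snapshot: features computed up to a cutoff date,
# clv_12m = revenue observed in the 12 months after that cutoff.
df = pd.read_parquet("customer_features_train.parquet")
features = ["recency", "frequency", "monetary", "total_spend_90d"]

# Primary task: regression on 12-month revenue
model = xgb.XGBRegressor(n_estimators=300, max_depth=6, learning_rate=0.05)
model.fit(df[features], df["clv_12m"])

# Secondary output: bucket predictions into Low/Medium/High tiers for marketing segmentation
df["clv_pred"] = model.predict(df[features])
df["clv_tier"] = pd.qcut(df["clv_pred"], q=[0, 0.5, 0.8, 1.0], labels=["Low", "Medium", "High"])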

Assessing Feasibility & Risks (Can We Execute This Vision?)

Before committing to development, we conducted a rigorous feasibility assessment to identify potential risks and ensure the project was grounded in reality.

  • Data Feasibility:

    • Availability: While core transactional data was abundant, integrating it with often sparse or inconsistent CRM and web behavioral data posed a significant data engineering challenge.

    • Quality & Privacy: We identified a need for robust data cleaning and validation pipelines. Handling Personally Identifiable Information (PII) under regulations like GDPR was a top priority, requiring careful data anonymization and governance.

  • Modeling & Technical Feasibility:

    • Problem Complexity: The core challenge was not the algorithm itself, but modeling the dynamic, non-stationary nature of customer behavior (i.e., concept drift). Customer tastes and spending habits change over time.

    • Latency Requirements: Our primary use case (batch scoring for marketing campaigns) had relaxed latency needs. However, we acknowledged that a future real-time personalization use case would require building a more complex low-latency serving infrastructure and an online feature store.

    • Interpretability: Gaining business trust was paramount. “Black box” predictions were unacceptable, making model explainability (e.g., using SHAP) a mandatory requirement to understand why a customer was flagged as high-value (a short SHAP sketch follows this list).

  • Business & Operational Risks:

    • Cost of Errors: The model’s predictions have a direct financial impact. Over-predicting CLV leads to wasted marketing spend, while under-predicting results in missed revenue opportunities from high-potential customers.

    • Ethical Considerations: We identified a risk of creating negative feedback loops. If the model were biased against a certain customer segment, we might under-invest in them, reinforcing the initial bias. This required a commitment to continuous fairness monitoring.

    • ROI Justification: The project required a significant upfront investment in data engineering and MLOps. However, the potential ROI—driven by more efficient marketing, improved retention, and higher long-term profitability—was substantial and clearly justified the investment.
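
As referenced above, a minimal SHAP sketch for the explainability requirement, assuming a trained XGBoost model (`model`) and a pandas feature frame (`X`); both names are illustrative.

import shap

# Assumes `model` is the trained XGBoost regressor and X is a pandas DataFrame of features
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X)

# Global view: which features drive predicted CLV across the customer base
shap.summary_plot(shap_values, X)

# Local view: why one specific customer was flagged as high-value
customer_idx = 0  # illustrative index
shap.force_plot(explainer.expected_value, shap_values[customer_idx], X.iloc[customer_idx])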

Defining Success: From Technical Metrics to Business Impact

A model can be technically “correct” but fail to deliver business value. Therefore, we defined two distinct sets of success criteria:

  1. Model Evaluation Metrics (Offline): These are technical metrics used to assess the model’s performance on a held-out test dataset.

    • Primary Metrics: For our regression task, we used standard metrics like Root Mean Squared Error (RMSE) and Mean Absolute Error (MAE).

    • Business-Oriented Metric: Critically, we also used the Gini Coefficient and plotted the Lorenz Curve. This measures the model’s ability to accurately rank customers from least to most valuable, which is essential for targeting the top percentile of customers (a short computation sketch follows below).

    • Fairness Check: We evaluated performance across key customer segments (e.g., by acquisition channel, geography) to ensure the model wasn’t unfairly penalizing or misjudging a specific group.

  2. Business Success Metrics (Online): These are the real-world KPIs we aimed to influence after deploying the model.

    • Increase ROI on marketing spend by targeting high-CLV prospects.

    • Reduce churn rate among customers predicted to be high-value.

    • Increase average order value and purchase frequency through personalized upselling campaigns aimed at specific CLV tiers.

This dual-metric approach ensures our technical work remains directly tethered to tangible business outcomes.
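
As a concrete illustration of the ranking metric mentioned above, here is a minimal sketch of the Gini coefficient computed from actual values sorted by predicted ranking; it is a sketch, not the exact evaluation code used in the pipeline.

import numpy as np

def gini(actual, predicted):
    """Gini coefficient of the ranking induced by `predicted` over the `actual` values."""
    actual = np.asarray(actual, dtype=float)
    order = np.argsort(predicted)[::-1]                    # rank customers by predicted CLV, best first
    cum_actual = np.cumsum(actual[order]) / actual.sum()   # cumulative share of realised value (Lorenz-style)
    n = len(actual)
    # Area between the model's cumulative-gain curve and the diagonal (random ranking)
    return (cum_actual.sum() / n) - (n + 1) / (2 * n)

def normalized_gini(actual, predicted):
    """Scale by the Gini of a perfect ranking, so 1.0 means the ordering is perfect."""
    return gini(actual, predicted) / gini(actual, actual)

# A normalized Gini close to 1.0 means the model ranks customers almost perfectly by realised value.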


MLOps End-to-End Project Planning and Operational Strategy

A successful machine learning project is built on a foundation of solid engineering and operational planning. It’s not enough to build an accurate model; we must build a reliable system that can consistently deliver value. This section outlines the technical architecture, core workflows, and project management strategy for bringing the CLV prediction model to production.

Tech Stack

The technology stack was chosen to balance the power of best-in-class open-source tools with the scalability and manageability of a leading cloud provider (AWS). The primary goal was to create a robust, automated, and repeatable system.

  • Cloud Platform: Amazon Web Services (AWS). A mature and comprehensive ecosystem of managed services for data and ML, allowing the team to focus on business logic rather than undifferentiated infrastructure management.

  • Data Lake & Storage: Amazon S3 & Parquet. S3 provides virtually limitless, cost-effective, and durable object storage. Storing data in the open-source Parquet format ensures high performance and interoperability.

  • Data Versioning: DVC (Data Version Control). While Git is excellent for code, it is not designed for large data files. DVC integrates with Git to provide versioning for datasets, making our data pipelines fully reproducible.

  • Workflow Orchestration: Apache Airflow (on AWS MWAA). The industry standard for orchestrating complex, dependency-aware workflows. While it has a learning curve, its power and flexibility are unmatched for managing our multi-step data and ML pipelines; AWS MWAA provides it as a managed service.

  • Data Processing: Apache Spark (on AWS EMR). The de facto standard for large-scale, distributed data transformation. Essential for efficiently computing customer-level RFM features and other aggregations across the entire dataset.

  • Feature Store: Amazon SageMaker Feature Store. Chosen to solve the critical challenge of training-serving skew. It provides a central, governed repository for features, ensuring consistency between our training and inference pipelines and promoting feature reusability across future projects.

  • Experiment Tracking: MLflow Tracking. A powerful open-source tool for logging and comparing all aspects of ML experiments (parameters, metrics, artifacts). It fosters a rigorous, scientific approach to model development.

  • Model Registry: MLflow Model Registry. Provides a central, version-controlled repository for our trained model artifacts. It is the critical hand-off point between model training and deployment, enabling robust governance and auditability.

  • CI/CD Automation: GitHub Actions. A modern, flexible CI/CD tool that integrates seamlessly with our source code repository on GitHub, allowing us to automate testing and deployment workflows.

  • Infrastructure as Code: Terraform. The cloud-agnostic standard for defining and managing infrastructure programmatically. This ensures our environments are reproducible, version-controlled, and can be easily provisioned or torn down.

  • Model Deployment: Amazon SageMaker Batch Transform. For our primary use case (weekly scoring of the customer base), batch inference is the most cost-effective and operationally simple deployment pattern. SageMaker provides a fully managed solution.

  • Monitoring: Amazon SageMaker Model Monitor & CloudWatch. SageMaker Model Monitor provides built-in capabilities to detect data and model quality drift. CloudWatch is used for monitoring the health and performance of all underlying AWS infrastructure.

List of Core Pipelines/Workflows

The entire MLOps system is composed of several distinct, automated pipelines, orchestrated by Airflow.

  1. Data Ingestion & Validation Pipeline

    • Trigger: Daily scheduled run.

    • Inputs: Connection details for source transactional databases and CRM systems.

    • Key Steps:

      1. Extract raw transactional and customer data from source systems.

      2. Validate incoming data against a predefined schema and quality rules (using libraries like Great Expectations).

      3. Land the validated, raw data in the S3 Data Lake.

      4. Update the data catalog and trigger the Feature Engineering Pipeline.

    • Outputs: Versioned, validated raw data in S3.

  2. Feature Engineering Pipeline

    • Trigger: Successful completion of the Data Ingestion Pipeline.

    • Inputs: Raw customer and transaction data from the S3 Data Lake.

    • Key Steps:

      1. Launch a Spark job (on EMR) to perform customer-level aggregations.

      2. Calculate RFM features, rolling time-window features, and other behavioral signals.

      3. Populate (or update) the SageMaker Feature Store with the newly computed features.

    • Outputs: Updated and versioned features in the offline and online feature stores.

  3. Model Training, Evaluation & Registration Pipeline

    • Trigger: Weekly scheduled run or on-demand by an ML engineer.

    • Inputs: Features from the SageMaker Feature Store, model training configuration.

    • Key Steps:

      1. Create a temporally correct training/test data split.

      2. Train the CLV model (XGBoost) using the training data.

      3. Evaluate the model on the test set using both technical (RMSE, MAE) and business-aligned (Gini Coefficient) metrics.

      4. Compare the new model’s performance to the currently deployed production model.

      5. If performance exceeds the threshold, register the new model version in the MLflow Model Registry with its associated metrics and artifacts.

    • Outputs: A newly registered model in MLflow, ready for deployment.

  4. Batch Inference Pipeline

    • Trigger: Weekly scheduled run, following the successful completion of the training pipeline.

    • Inputs: The latest “approved for production” model from the MLflow Registry and a list of all active customers.

    • Key Steps:

      1. Initiate a SageMaker Batch Transform job.

      2. For each customer, retrieve the latest features from the SageMaker Feature Store.

      3. Generate a 12-month CLV prediction.

      4. Load the predictions into the downstream data warehouse and CRM system for use by business teams.

    • Outputs: Updated CLV scores for every customer in the business intelligence and marketing platforms.
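
To make the batch inference step concrete, here is a minimal sketch of launching a SageMaker Batch Transform job with the SageMaker Python SDK. The model name and S3 paths are illustrative assumptions, and in the real pipeline the model would come from the approved MLflow registry version.

import sagemaker
from sagemaker.transformer import Transformer

session = sagemaker.Session()

# Assumes a SageMaker Model has already been created from the approved model artifact
transformer = Transformer(
    model_name="clv-xgboost-prod",                      # illustrative model name
    instance_count=2,
    instance_type="ml.m5.xlarge",
    output_path="s3://clv-predictions/weekly-scores/",  # illustrative output bucket
    sagemaker_session=session,
)

# Score the weekly snapshot of customer features exported from the Feature Store
transformer.transform(
    data="s3://clv-batch-input/customer-features/",     # illustrative input prefix
    content_type="text/csv",
    split_type="Line",
)
transformer.wait()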

Project Management and Stages

We adopted an iterative, phased approach to manage the project, ensuring we built a solid foundation before adding complexity.

  1. Ideation & Planning (Weeks 1-2)

    • Align with business stakeholders on the primary goals and define success metrics.

    • Conduct the Problem Framing and Feasibility assessment.

    • Define the MLOps tech stack and high-level architecture.

    • Establish the cross-functional team and define roles.

  2. Model Experimentation & Baseline (Weeks 3-6)

    • Perform deep Exploratory Data Analysis (EDA) on the integrated dataset.

    • Engineer an initial set of robust RFM features.

    • Develop a simple baseline model (e.g., Linear Regression) to establish a performance floor.

    • Experiment with more advanced models (XGBoost) and track all runs in MLflow.

    • Conduct rigorous offline evaluation to select the champion model architecture.

  3. End-to-End Pipeline Development (Weeks 7-12)

    • Develop the four core automated pipelines (Ingestion, Feature Engineering, Training, Batch Inference) as code.

    • Write Infrastructure as Code (Terraform) to provision the required AWS resources.

    • Implement robust unit and integration tests for all pipeline components.

    • Set up CI/CD workflows in GitHub Actions.

  4. Deployment & Serving (Week 13)

    • Deploy the production environment using Terraform.

    • Execute the first full run of the Batch Inference Pipeline in a staging environment.

    • Validate the output and, upon approval, promote to production.

    • Integrate the output CLV scores with downstream CRM and BI dashboards.

  5. Monitoring & Iteration (Ongoing)

    • Establish monitoring dashboards for infrastructure health and data/model drift.

    • Set up automated alerts for any detected anomalies.

    • Establish a formal A/B testing framework to evaluate the business impact of future model versions.

    • Schedule periodic reviews of model performance and plan for retraining and future iterations.
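
Alongside SageMaker Model Monitor, a lightweight drift check such as the population stability index (PSI) can back the automated alerts described above. This is a minimal sketch under that assumption; the inputs and the 0.2 threshold are illustrative, not values from the production system.

import numpy as np

def population_stability_index(baseline, current, bins=10):
    """PSI between the training-time distribution of a feature and the latest scoring batch."""
    edges = np.histogram_bin_edges(baseline, bins=bins)
    # Clip the current batch into the baseline range so out-of-range values land in the edge bins
    current = np.clip(current, edges[0], edges[-1])
    base_pct = np.histogram(baseline, bins=edges)[0] / len(baseline)
    curr_pct = np.histogram(current, bins=edges)[0] / len(current)
    base_pct = np.clip(base_pct, 1e-6, None)   # avoid log(0)
    curr_pct = np.clip(curr_pct, 1e-6, None)
    return float(np.sum((curr_pct - base_pct) * np.log(curr_pct / base_pct)))

# Assumed inputs: numpy arrays of the 'monetary' feature at training time vs. this week's scoring batch
psi = population_stability_index(train_monetary, current_monetary)
if psi > 0.2:  # 0.2 is a commonly used rule-of-thumb threshold for significant drift
    print(f"Drift detected in monetary feature (PSI={psi:.3f}); consider triggering retraining")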

Cross-Functional Team & Roles

  • ML Engineer: Led the overall technical design, developed the core ML pipelines, and implemented the CI/CD and monitoring systems.

  • Data Engineer: Focused on building the robust data ingestion and validation pipelines from source systems and owned the data model in the lake.

  • Business Analyst/Product Manager: Acted as the crucial link to business stakeholders, defining requirements, interpreting model outputs, and designing how the CLV scores would be used in marketing campaigns.

Versioning and Governance Strategy

  • Code: All code (pipeline definitions, feature logic, model training scripts, IaC) is versioned in Git.

  • Data: Large datasets and feature sets are versioned using DVC, with pointers checked into Git.

  • Models: Every trained model is versioned and governed through the MLflow Model Registry, creating an auditable lineage from a prediction back to the exact code and data that produced it.
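
A minimal sketch of how a training run can record this lineage and register the model, assuming a trained XGBoost model object (`model`); the metric values, DVC hash, and registered model name are placeholders.

import subprocess
import mlflow
import mlflow.xgboost

with mlflow.start_run() as run:
    # Tie the run to the exact code and data versions (lineage)
    git_sha = subprocess.check_output(["git", "rev-parse", "HEAD"]).decode().strip()
    mlflow.set_tag("git_commit", git_sha)
    mlflow.set_tag("dvc_data_version", "a1b2c3d")  # placeholder: hash recorded from dvc.lock

    mlflow.log_metric("rmse", 142.7)               # illustrative evaluation results
    mlflow.log_metric("gini", 0.61)
    mlflow.xgboost.log_model(model, artifact_path="model")  # `model` is the trained XGBoost regressor

    # In the real pipeline, registration is conditional on beating the current production model
    mlflow.register_model(model_uri=f"runs:/{run.info.run_id}/model", name="clv-xgboost")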


Data Sourcing and Discovery

The most sophisticated model is only as good as the data it’s trained on. For our CLV project, the initial and most critical phase was to source, understand, and validate the diverse datasets that capture the complete customer journey. This foundational work is the “mise en place” of our entire MLOps system—everything must be meticulously prepared before we begin modeling.

Following a structured framework, here is how we approached the data lifecycle for this project:

  1. Identifying Data Requirements

    • Application: To predict future value, we needed a holistic customer view: transactional data (order history, product details, prices, discounts), customer/CRM data (demographics, acquisition channel, loyalty status), and behavioral data (web/app session logs, clicks, page views, and cart activity).

    • Rationale & tools: Transactional data is essential for baseline RFM features. CRM and behavioral data provide the rich, nuanced signals needed for a high-performance machine learning model to outperform simple heuristics.

  2. Exploring Data Sources

    • Application: The production OLTP database (PostgreSQL/MySQL) as the source for transactional data, the CRM system (e.g., Salesforce) for customer demographic and marketing data, and an event streaming platform for high-volume behavioral data.

    • Rationale & tools: We tapped directly into the systems of record, ensuring data authenticity. The challenge was not finding data but integrating these disparate, siloed sources into a unified view.

  3. Data Collection & Ingestion

    • Application: We established a daily batch ingestion strategy. An Airflow DAG extracts data from the production DB replica and the CRM, data is landed in its raw format in our AWS S3 Data Lake, and raw datasets are versioned using DVC to ensure every pipeline run is reproducible.

    • Rationale & tools: For CLV, daily batch processing provides sufficient data freshness while being more cost-effective and operationally simpler than a real-time streaming architecture.

  4. Exploratory Data Analysis (EDA)

    • Application: Before any feature engineering, we performed a thorough EDA in Jupyter notebooks, focusing on profiling RFM distributions to understand purchasing rhythms, visualizing sales trends to identify seasonality, and quantifying data quality issues such as missing CustomerIDs and negative Quantity or UnitPrice values (returns and adjustments).

    • Rationale & tools: This crucial step prevented a “garbage in, garbage out” scenario. It allowed us to identify data quality rules for our validation pipeline and informed our feature engineering strategy by uncovering key patterns and outliers.

  5. Data Documentation & Discovery

    • Application: We used the AWS Glue Data Catalog to make our S3 data lake’s schema discoverable and queryable via Athena, and created documentation (“Data Cards”) for key datasets defining their schema, ownership, and update frequency.

    • Rationale & tools: A central catalog is critical for governance, trust, and enabling other teams to discover and reuse these valuable, curated data assets.

  6. Early Governance & Security

    • Application: Given the sensitivity of customer data, governance was a day-one priority. We defined strict AWS IAM policies to enforce least-privilege access to S3 buckets, created a PII (Personally Identifiable Information) transformation step in our data pipeline to hash or mask sensitive fields, and established clear data retention policies.

    • Rationale & tools: Security and privacy are non-negotiable. Building these governance controls directly into our data pipelines ensures compliance and builds trust with both our customers and internal stakeholders.
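
To make the PII step in item 6 concrete, here is a minimal pandas sketch of salted hashing. The DataFrame, column names, and salt handling are illustrative assumptions; in the real pipeline the salt would come from a secrets manager and the step would run inside the ingestion job.

import hashlib
import pandas as pd

def hash_pii(df: pd.DataFrame, columns, salt: str) -> pd.DataFrame:
    """Replace raw PII columns with salted SHA-256 digests so records stay joinable but anonymized."""
    out = df.copy()
    for column in columns:
        out[column] = out[column].astype(str).apply(
            lambda value: hashlib.sha256((salt + value).encode("utf-8")).hexdigest()
        )
    return out

# Illustrative usage: `raw_customers` is an assumed DataFrame; the salt would never live in source code
masked = hash_pii(raw_customers, columns=["email", "phone_number"], salt="retrieved-from-secrets-manager")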


Data Engineering and Pipelines: Building the Foundation for Accurate Predictions

The reliability of our entire CLV prediction system rests on the quality of its data foundation. This “mise en place” stage involves transforming raw, disparate data into clean, validated, and feature-rich inputs ready for model training. This is not a one-off task but a system of automated, repeatable, and robust pipelines.

Here’s how we applied core data engineering principles to build the data backbone for our CLV project:

Framework Step

Application to the CLV Project

Key Rationale & Chosen Tools

1. Designing Data Processing Workflows

The primary workflow is an ELT (Extract, Load, Transform) process. Raw data is extracted daily and loaded into the S3 data lake. Transformations and feature calculations are then performed as a separate, downstream step using Spark. The workflow is designed to be modular (Ingestion -> Validation -> Feature Engineering) and idempotent, ensuring that rerunning a pipeline on the same raw data produces the identical feature set.

An ELT approach leverages the power of the cloud data lake and a distributed engine like Spark, offering more flexibility than a rigid ETL process. Modularity and idempotency are core principles for building resilient, testable, and maintainable data pipelines that can recover from transient failures.

2. Data Cleaning & Wrangling

Our automated pipeline systematically addresses data quality issues identified during EDA:
Handling Nulls: Rows with a missing CustomerID are discarded as they are unusable for CLV.
Handling Returns: Transactions with negative Quantity or UnitPrice are processed to correctly adjust customer spending totals, preventing data contamination.
Outlier Treatment: Extreme monetary values (e.g., transactions > €50,000), which can skew model training, are automatically capped at a predefined threshold based on the 99.9th percentile of the training data.

This automated cleaning is the first line of defense for data quality. Explicitly handling outliers is crucial for regression models, which are highly sensitive to extreme values. These steps ensure that the data entering our feature engineering process is clean and consistent.

  3. Data Transformation & Standardization

    • Application: To prepare data for the model, our pipeline performs two key transformations using a saved scikit-learn Pipeline object: numerical features like Monetary Value and Frequency are scaled using StandardScaler, and categorical features like Acquisition Channel are converted into numerical format using OneHotEncoder. Crucially, the scaler and encoder are fit only on the training dataset and then saved as versioned artifacts.

    • Rationale & tools: Most machine learning algorithms perform better when numerical input features are on a standard scale. The “fit on train, transform on all” approach is a fundamental best practice to prevent data leakage from the test set into the training process, which would lead to overly optimistic and unrealistic performance metrics.

  4. Programmatic Data Labeling

    • Application: For our regression task, the target “label” is not manually created but programmatically generated. The pipeline calculates the ground truth for each customer by summing their total revenue in the 12 months following the specified cutoff date, ensuring a consistent and objective definition of the value we aim to predict (a short sketch follows this list).

    • Rationale & tools: There is no ambiguity in our target variable, removing the need for costly and slow human-in-the-loop labeling. This programmatic approach makes the entire label generation process fast, scalable, and perfectly reproducible.

  5. Data Splitting & Sampling

    • Application: The pipeline enforces a strict temporal data split. To evaluate model performance, we train the model on one period (e.g., Months 1-9) and test it on a subsequent, unseen period (e.g., predictions for Months 10-12). We do not perform any over- or under-sampling on the test set, so it reflects the true distribution of customers in the real world.

    • Rationale & tools: This is the only correct way to validate a time-dependent forecasting model. Using a random split would leak information from the future into the training process, yielding a model that performs deceptively well in tests but fails in production.

  6. Data Validation as a Pipeline Stage

    • Application: We integrated Great Expectations as a dedicated validation task within our Airflow DAG, running after data ingestion and before feature engineering. Key checks include expect_column_values_to_not_be_null for CustomerID and expect_column_mean_to_be_between for Monetary Value to detect significant data drift. A failure in this stage halts the entire pipeline and triggers an alert.

    • Rationale & tools: This automated quality gate is critical for building trust and reliability. It prevents “bad data” from silently propagating downstream, which is a common and costly source of production ML failures. It acts as a data contract, ensuring the data adheres to our expectations before we invest compute resources in processing it.

  7. Data Versioning & Lineage

    • Application: Every dataset is versioned. DVC tracks the versions of our S3-based datasets, the Git commit hash versions the code, and MLflow logs the specific DVC hash of the data used for each training run. This creates an immutable link between a model artifact, the code that generated it, and the precise version of the data it was trained on.

    • Rationale & tools: This practice provides complete, end-to-end lineage. If a model behaves unexpectedly in production, we can trace its exact origins, making debugging and auditing straightforward and reliable. It turns reproducibility from a manual best practice into a guaranteed property of the system.

  8. Orchestration & Execution

    • Application: The entire multi-step data workflow is defined as a Directed Acyclic Graph (DAG) in Apache Airflow. Airflow manages the scheduling, dependencies (e.g., validation must complete before feature engineering begins), error handling, and retries for the entire data engineering process. Each complex step is containerized to ensure a consistent execution environment.

    • Rationale & tools: Using a dedicated orchestrator like Airflow transforms a collection of separate scripts into a single, robust, and observable system. It provides the operational control and visibility necessary to manage a production-grade data pipeline, which is far superior to relying on simple cron jobs.
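
As referenced in step 4, here is a minimal PySpark sketch of the label definition, assuming the same raw transactional schema (CustomerID, InvoiceDate, SalePrice) used elsewhere in this post; the cutoff dates and S3 path are illustrative.

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as spark_sum

spark = SparkSession.builder.appName("clv-label-generation").getOrCreate()
transactions = spark.read.parquet("s3://clv-raw-data-bucket/transactional/")  # illustrative path

# Features are built from everything up to the cutoff; the label is revenue in the following 12 months
cutoff = datetime(2024, 1, 1)      # illustrative cutoff date
label_end = datetime(2025, 1, 1)

labels = (
    transactions
    .filter((col("InvoiceDate") > cutoff) & (col("InvoiceDate") <= label_end))
    .groupBy("CustomerID")
    .agg(spark_sum("SalePrice").alias("clv_12m"))
)

# Customers with no purchases in the label window have no row here;
# the training join fills their clv_12m with 0.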

Planning the Data Ingestion Pipeline

  • Python Scripts (src/): For the core business logic.

  • Unit Tests (tests/): To ensure the correctness of the Python scripts.

  • Pipeline Code (pipelines/): The Airflow DAGs that define the workflow.

  • Infrastructure as Code (terraform/): To provision the necessary AWS resources.

  • Integration Tests (tests/): To verify that the different components and services work together as expected.

  • Architecture Diagram: Essential for documentation and team alignment.

With these artifacts planned, the next decision is the choice of processing tools and compute for each ingestion pipeline.

Tool & Compute Choice: Spark/EMR vs. Other Frameworks

  1. For Transactional Data (Pipeline 1): The daily volume (50-200 MB) is relatively small. Spinning up an EMR cluster (which can take several minutes and incurs costs for the duration it’s running) just to process this amount of data is inefficient and not cost-effective.

    • Better Choice: AWS Glue. Glue is a serverless ETL service on AWS. You only pay for the resources consumed during the job run, with no cluster management overhead. It is perfectly suited for this scale of data. Alternatively, a simple Python script using the Pandas library, running directly on the Airflow worker, would also be sufficient.

  2. For Behavioral Data (Pipeline 2): This data arrives as a continuous stream. The most efficient AWS-native pattern here is to use Amazon Kinesis Data Firehose.

    • How it Works: Your website/app sends events to a Kinesis Data Stream. A Kinesis Firehose delivery stream subscribes to this data stream. Firehose automatically buffers the incoming events (e.g., for 5 minutes or until 128 MB of data is collected), batches them into files, and writes them directly to your S3 data lake.

    • Benefit: This process is serverless, fully managed, and requires no processing engine like Spark. It’s the most cost-effective and operationally simple way to get high-volume streaming data into S3. Airflow’s role here would be to monitor this process or to trigger downstream pipelines after the data has landed in S3.

When to use Spark on EMR?

We will use Spark and EMR in the next major pipeline: Feature Engineering. After all the raw data (both transactional and behavioral) has been landed in S3, we will need a powerful, distributed engine to process the entire historical dataset at once to compute complex customer-level features (e.g., aggregations, rolling window calculations). That is where Spark’s capabilities are essential and cost-effective.

  • Data Ingestion 1 (Batch)

    • Source: Production DB (e.g., PostgreSQL)

    • Processing/Tools: AWS Glue or Python/Pandas for extraction and transformation; Great Expectations for validation

    • Destination: S3 Data Lake (Raw Zone)

    • Orchestration: Apache Airflow (MWAA)

  • Data Ingestion 2 (Stream)

    • Source: Kinesis Data Streams

    • Processing/Tools: Kinesis Data Firehose for automated buffering and delivery

    • Destination: S3 Data Lake (Raw Zone)

    • Orchestration: Kinesis Firehose (managed)

Data Ingestion Pipeline: Implementation

Architecture Diagram

Infrastructure as Code (Terraform)

This defines the AWS Glue resources needed for the batch ingestion pipeline.

# terraform/aws_glue.tf

# IAM Role for the Glue job to access S3 and the DB connection
resource "aws_iam_role" "glue_job_role" {
  name = "clv-glue-job-role-${var.environment}"
  assume_role_policy = jsonencode({
    Version   = "2012-10-17",
    Statement = [{
      Effect    = "Allow",
      Principal = { Service = "glue.amazonaws.com" },
      Action    = "sts:AssumeRole"
    }]
  })
}

resource "aws_iam_role_policy_attachment" "glue_s3_access" {
  role       = aws_iam_role.glue_job_role.name
  policy_arn = "arn:aws:iam::aws:policy/AmazonS3FullAccess" # Scope down in production
}

resource "aws_iam_role_policy_attachment" "glue_basic_execution" {
  role       = aws_iam_role.glue_job_role.name
  policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
}

# The Glue job definition
resource "aws_glue_job" "ingest_transactional_job" {
  name     = "clv-ingest-transactional-job-${var.environment}"
  role_arn = aws_iam_role.glue_job_role.arn
  command {
    script_location = "s3://${var.artifacts_bucket_name}/scripts/ingest_transactional_data.py"
    python_version  = "3"
  }
  glue_version = "3.0"
  number_of_workers = 2
  worker_type       = "G.1X"
}

# The Glue connection to store DB credentials securely
resource "aws_glue_connection" "source_db_connection" {
  name = "clv-source-db-connection-${var.environment}"
  connection_type = "JDBC"

  connection_properties = {
    JDBC_CONNECTION_URL = "jdbc:postgresql://your-db-endpoint.rds.amazonaws.com:5432/ecommerce"
    USERNAME            = var.db_username
    PASSWORD            = var.db_password
  }
}

This defines the Kinesis Stream and Firehose for handling behavioral events.

# terraform/aws_kinesis.tf

resource "aws_kinesis_stream" "behavioral_events_stream" {
  name        = "clv-behavioral-events-stream-${var.environment}"
  shard_count = 1
}

resource "aws_kinesis_firehose_delivery_stream" "behavioral_stream_to_s3" {
  name        = "clv-behavioral-stream-to-s3-${var.environment}"
  destination = "extended_s3"

  extended_s3_configuration {
    role_arn   = aws_iam_role.firehose_role.arn
    bucket_arn = aws_s3_bucket.raw_data_bucket.arn # Assuming raw_data_bucket is defined elsewhere
    
    # Buffer hints: deliver every 5 minutes or when 64 MB is reached
    buffering_interval = 300
    buffering_size     = 64

    # Convert incoming JSON to Parquet for efficiency
    data_format_conversion_configuration {
      enabled = true
      input_format_configuration {
        deserializer {
          hive_json_ser_de {}
        }
      }
      output_format_configuration {
        serializer {
          parquet_ser_de {}
        }
      }
      # Parquet conversion requires a Glue table describing the event schema
      # (assumes a Glue catalog database/table for behavioral events defined elsewhere)
      schema_configuration {
        database_name = aws_glue_catalog_database.events.name
        table_name    = aws_glue_catalog_table.behavioral_events.name
        role_arn      = aws_iam_role.firehose_role.arn
      }
    }
  }

  kinesis_source_configuration {
    kinesis_stream_arn = aws_kinesis_stream.behavioral_events_stream.arn
    role_arn           = aws_iam_role.firehose_role.arn
  }
}

# Role for Firehose to read from Kinesis and write to S3
resource "aws_iam_role" "firehose_role" {
  name = "clv-firehose-role-${var.environment}"
  # Assume role policy allows firehose.amazonaws.com
}

# ... Attach necessary policies to firehose_role ...

Great Expectations Suite

This JSON file defines our data quality rules. It would be generated by the GE CLI and stored in your Git repo.

{
  "expectation_suite_name": "transactional_data.warning",
  "ge_cloud_id": null,
  "meta": { "great_expectations_version": "0.15.0" },
  "expectations": [
    {
      "expectation_type": "expect_table_columns_to_match_ordered_list",
      "kwargs": { "column_list": ["CustomerID", "InvoiceNo", "StockCode", "Description", "Quantity", "InvoiceDate", "UnitPrice", "Country"] }
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": { "column": "CustomerID" }
    },
    {
      "expectation_type": "expect_column_values_to_be_of_type",
      "kwargs": { "column": "Quantity", "type_": "int" }
    },
    {
      "expectation_type": "expect_column_values_to_be_between",
      "kwargs": { "column": "UnitPrice", "min_value": 0 }
    }
  ]
}

Python Scripts

# src/ingest_transactional_data.py
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql.functions import lit
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime

# Get job arguments
args = getResolvedOptions(sys.argv, ["JOB_NAME", "output_path"])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read from the Glue Data Catalog using the specified connection
source_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="ecommerce_db",
    table_name="transactions",
    # The JDBC connection (clv-source-db-connection-staging) is attached to the catalog table,
    # so it does not need to be passed here.
)

# Convert to Spark DataFrame for processing
source_df = source_dyf.toDF()

# Add a processing timestamp
source_df = source_df.withColumn("processing_timestamp", lit(datetime.now()))

# Write data to S3 in Parquet format
glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(source_df, glueContext, "source_df"),
    connection_type="s3",
    connection_options={"path": args["output_path"]},
    format="parquet"
)

job.commit()

A simple script to simulate sending events to the Kinesis stream (for testing)

# src/produce_behavioral_events.py
import boto3
import json
import random
import time
from datetime import datetime

STREAM_NAME = "clv-behavioral-events-stream-staging"
client = boto3.client("kinesis", region_name="eu-west-1")

def send_event(event_data):
    try:
        response = client.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(event_data),
            PartitionKey=str(event_data['CustomerID'])
        )
        print(f"Sent event for Customer {event_data['CustomerID']}. SequenceNumber: {response['SequenceNumber']}")
    except Exception as e:
        print(f"Error sending event: {e}")

if __name__ == "__main__":
    for _ in range(10):
        event_type = random.choice(['page_view', 'add_to_cart', 'search'])
        customer_id = random.randint(1000, 2000)
        
        event = {
            "event_type": event_type,
            "CustomerID": customer_id,
            "timestamp": datetime.now().isoformat(),
            "session_id": f"sess_{customer_id}_{int(time.time())}"
        }
        send_event(event)
        time.sleep(0.1)

Airflow DAG

This DAG orchestrates the transactional ingestion and validation.

# pipelines/dag_ingest_transactional.py
from airflow.decorators import dag
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator
from datetime import datetime

GE_PROJECT_ROOT_DIR = "/path/to/your/great_expectations"
OUTPUT_S3_PATH = "s3://clv-raw-data-bucket-staging/transactional/"

@dag(
    dag_id="clv_ingest_transactional_data_with_validation",
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
)
def ingest_dag():
    
    ingest_job = GlueJobOperator(
        task_id="run_glue_ingestion_job",
        job_name="clv-ingest-transactional-job-staging",
        script_args={"--output_path": OUTPUT_S3_PATH},
        aws_conn_id="aws_default",
    )
    
    validation_task = GreatExpectationsOperator(
        task_id="validate_raw_transactional_data",
        data_context_root_dir=GE_PROJECT_ROOT_DIR,
        checkpoint_name="s3_raw_data_checkpoint", # Assumes a checkpoint is configured
        fail_task_on_validation_failure=True,
    )

    ingest_job >> validation_task

ingest_dag()

Unit Test

# tests/unit/test_event_producer.py
import pytest
from unittest.mock import patch, MagicMock
from src.produce_behavioral_events import send_event

@patch('boto3.client')
def test_send_event_success(mock_boto_client):
    """Tests the happy path of sending a Kinesis event."""
    mock_kinesis = MagicMock()
    mock_boto_client.return_value = mock_kinesis
    
    event_data = {"CustomerID": 123, "event_type": "page_view"}
    
    with patch('src.produce_behavioral_events.client', mock_kinesis):
        send_event(event_data)

    mock_kinesis.put_record.assert_called_once()
    # You can add more specific assertions on the call arguments

Integration Test

# tests/integration/test_ingestion_pipelines.py
import pytest
import boto3
import time
from datetime import datetime
from airflow.api.client.local_client import Client
from airflow.models import DagRun
from src.produce_behavioral_events import send_event

@pytest.mark.integration
def test_transactional_ingestion_dag():
    """Triggers the transactional data DAG and checks for completion."""
    c = Client(None, None)
    run_id = f"test_transactional_ingestion_{int(time.time())}"
    
    # Trigger the DAG run, then wait for it to complete
    c.trigger_dag(dag_id="clv_ingest_transactional_data_with_validation", run_id=run_id)
    # ... (polling logic similar to previous examples) ...

    # Assert that the final state is 'success'
    dag_runs = DagRun.find(dag_id="clv_ingest_transactional_data_with_validation", run_id=run_id)
    assert dag_runs and dag_runs[0].state == 'success'

@pytest.mark.integration
def test_behavioral_ingestion_pipeline():
    """Sends a test event and checks if it lands in S3 via Firehose."""
    s3 = boto3.client("s3")
    bucket = "clv-raw-data-bucket-staging"
    prefix_before = f"behavioral/{datetime.now().strftime('%Y/%m/%d')}"

    # Send a test event
    test_event = {"CustomerID": 9999, "event_type": "integration_test"}
    send_event(test_event)

    # Wait for Firehose buffer to flush (e.g., 60 seconds)
    print("Waiting 65 seconds for Firehose to deliver...")
    time.sleep(65)
    
    # Check S3 for a new object
    response = s3.list_objects_v2(Bucket=bucket, Prefix=prefix_before)
    assert 'Contents' in response and len(response['Contents']) > 0, "No file was delivered to S3"

CI/CD Workflow

This workflow validates and deploys both ingestion pipelines.

# .github/workflows/cicd_data_ingestion.yml
name: "CI/CD for Data Ingestion Pipelines"

on:
  push:
    branches: [ main ]
    paths:
      - 'src/ingest_transactional_data.py'
      - 'src/produce_behavioral_events.py'
      - 'pipelines/dag_ingest_transactional.py'
      - 'terraform/aws_glue.tf'
      - 'terraform/aws_kinesis.tf'
      - 'great_expectations/**'
      - 'tests/**'

jobs:
  ci-checks:
    name: "Static Checks and Unit Tests"
    # ... (linting, unit tests, terraform validate) ...
  
  cd-staging-and-test:
    name: "Deploy to Staging & Run Integration Tests"
    needs: ci-checks
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment: staging

    steps:
      - name: Checkout Repository
        # ...
      - name: Configure Staging AWS Credentials
        # ...
      - name: Deploy Infrastructure (Terraform Apply)
        run: |
          cd terraform
          terraform apply -auto-approve -var-file=staging.tfvars
      
      - name: Deploy DAG to Staging Airflow
        # ...
        
      - name: Run Integration Tests
        run: pytest tests/integration/test_ingestion_pipelines.py

Feature Engineering Pipeline

Planning

  1. Objective: Create a single, automated pipeline that takes the raw transactional and behavioral data from our S3 data lake and produces a comprehensive feature set for each customer. This feature set will be stored in the Amazon SageMaker Feature Store to ensure consistency between training and serving.

  2. Core Tooling: We will use Apache Spark on AWS EMR for this task. Given that we need to process the entire historical dataset and perform complex aggregations (like rolling time windows), Spark’s distributed processing capability is the right choice here. It’s both scalable and cost-effective for this type of heavy-lifting workload.

  3. Pipeline Implementation:

    • Orchestration: The pipeline will be defined as an Airflow DAG. This DAG will be triggered upon the successful completion of the batch transactional data ingestion pipeline.

    • Compute: The Airflow DAG will have a task that programmatically launches a temporary EMR cluster, submits the Spark job, waits for its completion, and then terminates the cluster to save costs.

    • Logic: The core feature engineering logic will be encapsulated in a single, well-structured PySpark script.

  4. Types of Features to Engineer: We will create a rich set of features that go beyond simple RFM to capture nuanced customer behavior:

    • Static RFM Features:

      • recency: Days between a customer’s first and last purchase (the BTYD-style convention used by our feature job).

      • frequency: Total number of distinct purchase days.

      • monetary: Average spend per purchase day.

      • T: Tenure of the customer (days since first purchase).

    • Time-Windowed Features: To capture trends, we’ll calculate key metrics over multiple rolling windows (e.g., last 30, 90, 365 days):

      • purchase_count_30d, purchase_count_90d, purchase_count_365d

      • total_spend_30d, total_spend_90d, total_spend_365d

    • Behavioral Features: Aggregations from the clickstream data:

      • total_sessions_90d: Number of website/app sessions.

      • avg_session_duration_90d: Average time spent per session.

      • add_to_cart_count_90d: Number of times items were added to the cart.

  5. Artifacts to Generate:

    • Python/PySpark Script (src/): The main feature engineering script.

    • Unit Tests (tests/): Pytest unit tests for the Spark transformation functions using a local Spark session.

    • Pipeline Code (pipelines/): The Airflow DAG that orchestrates the EMR cluster and Spark job.

    • Infrastructure as Code (terraform/): Additions to our Terraform code to define IAM roles and permissions for EMR.

    • Architecture Diagram (Mermaid Code):

Implementation

Airflow DAG Script to Orchestrate Feature Generation

from airflow import DAG
from airflow.providers.amazon.aws.operators.emr import (
    EmrCreateJobFlowOperator,
    EmrAddStepsOperator,
    EmrTerminateJobFlowOperator,
)
from airflow.models.baseoperator import chain
from datetime import datetime

# --- Constants ---
S3_BUCKET = "airflow-bucket-name" # Bucket for Airflow logs and scripts
EMR_EC2_ROLE = "emr-ec2-instance-role"
EMR_SERVICE_ROLE = "emr-service-role"
FEATURE_GROUP_NAME = "clv-feature-group-v1"
AWS_REGION = "eu-west-1"

# EMR cluster configuration
JOB_FLOW_OVERRIDES = {
    "Name": "clv-feature-engineering-cluster",
    "ReleaseLabel": "emr-6.9.0",
    "Applications": [{"Name": "Spark"}],
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Master node",
                "Market": "ON_DEMAND",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            },
            {
                "Name": "Core nodes",
                "Market": "ON_DEMAND",
                "InstanceRole": "CORE",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 2,
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": False,
        "TerminationProtected": False,
    },
    "JobFlowRole": EMR_EC2_ROLE,
    "ServiceRole": EMR_SERVICE_ROLE,
    "VisibleToAllUsers": True,
}

# Spark job steps
SPARK_STEPS = [
    {
        "Name": "Generate CLV Features",
        "ActionOnFailure": "TERMINATE_JOB_FLOW",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                f"s3://{S3_BUCKET}/scripts/generate_features.py",
                f"s3://clv-raw-data-bucket/transactional/",
                f"s3://clv-raw-data-bucket/behavioral/",
                FEATURE_GROUP_NAME,
                AWS_REGION,
            ],
        },
    }
]

with DAG(
    dag_id="clv_feature_generation_pipeline",
    start_date=datetime(2025, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    tags=["clv", "feature-engineering"],
) as dag:
    # Task to create a transient EMR cluster
    create_emr_cluster = EmrCreateJobFlowOperator(
        task_id="create_emr_cluster",
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id="aws_default",
        emr_conn_id="emr_default",
    )

    # Task to add the Spark job step
    add_spark_step = EmrAddStepsOperator(
        task_id="add_spark_step",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        steps=SPARK_STEPS,
        aws_conn_id="aws_default",
    )

    # Task to terminate the EMR cluster
    terminate_emr_cluster = EmrTerminateJobFlowOperator(
        task_id="terminate_emr_cluster",
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
        aws_conn_id="aws_default",
        trigger_rule="all_done",  # Ensures cluster is terminated even if the Spark job fails
    )

    # Define task dependencies
    chain(create_emr_cluster, add_spark_step, terminate_emr_cluster)
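
One practical note on this DAG: EmrAddStepsOperator only submits the step, so the terminate task could tear the cluster down before the Spark job finishes. A common pattern, sketched below under that assumption, is to place an EmrStepSensor between add_spark_step and terminate_emr_cluster so Airflow explicitly waits for (and reports on) the Spark step.

from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

# Inside the `with DAG(...)` block, between add_spark_step and terminate_emr_cluster:
watch_spark_step = EmrStepSensor(
    task_id="watch_spark_step",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_emr_cluster', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull(task_ids='add_spark_step', key='return_value')[0] }}",
    aws_conn_id="aws_default",
)

# The dependency chain then becomes:
chain(create_emr_cluster, add_spark_step, watch_spark_step, terminate_emr_cluster)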

Python Script for Feature Generation

# src/generate_features.py

import sys
from datetime import datetime, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, max, min, countDistinct, sum, avg, datediff, expr, when
from pyspark.sql.types import TimestampType
import boto3

def get_sagemaker_feature_store_client(region):
    """Initializes the SageMaker Feature Store runtime client."""
    return boto3.client('sagemaker-featurestore-runtime', region_name=region)

def generate_features(spark, transactional_data_path, behavioral_data_path):
    """
    Reads raw data and generates a customer feature set.
    """
    # Load raw data
    trans_df = spark.read.parquet(transactional_data_path)
    behav_df = spark.read.parquet(behavioral_data_path)

    # --- Data Preparation ---
    trans_df = trans_df.withColumn("InvoiceDate", col("InvoiceDate").cast(TimestampType()))
    
    # --- Feature Engineering ---
    current_timestamp = datetime.utcnow()

    # 1. RFM-T Features
    customer_summary = trans_df.groupBy("CustomerID").agg(
        max("InvoiceDate").alias("last_purchase_date"),
        min("InvoiceDate").alias("first_purchase_date"),
        countDistinct("InvoiceNo").alias("frequency"),
        sum("SalePrice").alias("total_spend")
    )

    rfm_t_features = customer_summary.withColumn("recency", datediff(col("last_purchase_date"), col("first_purchase_date"))) \
        .withColumn("T", datediff(lit(current_timestamp), col("first_purchase_date"))) \
        .withColumn("monetary", col("total_spend") / col("frequency")) \
        .select("CustomerID", "recency", "frequency", "monetary", "T")

    # 2. Time-Windowed Features (Transactional)
    # Conditional aggregation: only rows whose InvoiceDate falls inside the window are counted/summed.
    thirty_days_ago = current_timestamp - timedelta(days=30)
    ninety_days_ago = current_timestamp - timedelta(days=90)

    time_window_features = trans_df.groupBy("CustomerID").agg(
        countDistinct(when(col("InvoiceDate") >= lit(thirty_days_ago), col("InvoiceNo"))).alias("purchase_count_30d"),
        sum(when(col("InvoiceDate") >= lit(thirty_days_ago), col("SalePrice")).otherwise(0)).alias("total_spend_30d"),
        countDistinct(when(col("InvoiceDate") >= lit(ninety_days_ago), col("InvoiceNo"))).alias("purchase_count_90d"),
        sum(when(col("InvoiceDate") >= lit(ninety_days_ago), col("SalePrice")).otherwise(0)).alias("total_spend_90d"),
    )
    
    # 3. Behavioral Features (Aggregating last 90 days of behavioral data)
    behavioral_summary = behav_df.filter(col("event_timestamp") >= lit(ninety_days_ago)) \
        .groupBy("CustomerID").agg(
            countDistinct("session_id").alias("total_sessions_90d"),
            avg("session_duration_seconds").alias("avg_session_duration_90d"),
            sum(expr("case when event_type = 'add_to_cart' then 1 else 0 end")).alias("add_to_cart_count_90d")
        )

    # --- Join all features ---
    final_features = rfm_t_features \
        .join(time_window_features, "CustomerID", "left") \
        .join(behavioral_summary, "CustomerID", "left") \
        .na.fill(0) # Fill nulls from joins with 0
    
    # Add EventTime feature required by Feature Store
    final_features = final_features.withColumn("EventTime", lit(current_timestamp).cast(TimestampType()))

    return final_features

def write_to_feature_store(df, feature_group_name, region):
    """
    Ingests a Spark DataFrame into a SageMaker Feature Store.
    This function would typically use the SageMaker Feature Store SDK.
    For a Spark job, a common pattern is to convert to Pandas and use boto3.
    """
    # NOTE: In a high-scale scenario, you might use a more direct Spark-to-FeatureStore connector.
    # For simplicity, this demonstrates the boto3 approach within a Spark context.
    def put_records(partition):
        # boto3 clients are not serializable, so create one per partition on the executors
        featurestore_client = get_sagemaker_feature_store_client(region)
        for row in partition:
            record = [
                {'FeatureName': name, 'ValueAsString': str(value)}
                for name, value in row.asDict().items()
            ]
            featurestore_client.put_record(
                FeatureGroupName=feature_group_name,
                Record=record
            )

    df.foreachPartition(put_records)


if __name__ == "__main__":
    if len(sys.argv) != 5:
        print("Usage: spark-submit generate_features.py <transactional_data_path> <behavioral_data_path> <feature_group_name> <aws_region>")
        sys.exit(-1)

    spark = SparkSession.builder.appName("CLVFeatureGeneration").getOrCreate()

    transactional_data_path = sys.argv[1]
    behavioral_data_path = sys.argv[2]
    feature_group_name = sys.argv[3]
    aws_region = sys.argv[4]

    print(f"Starting feature generation...")
    features_df = generate_features(spark, transactional_data_path, behavioral_data_path)
    
    print(f"Writing {features_df.count()} records to Feature Group: {feature_group_name}")
    write_to_feature_store(features_df, feature_group_name, aws_region)
    
    print("Feature generation and ingestion complete.")
    spark.stop()

Unit Tests for Feature Generation script

import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from src.generate_features import generate_features
import pandas as pd
from datetime import datetime, timedelta

@pytest.fixture(scope="session")
def spark_session():
    """Creates a Spark session for testing."""
    return SparkSession.builder \
        .master("local[2]") \
        .appName("CLVFeatureTests") \
        .getOrCreate()

def test_feature_generation_logic(spark_session):
    """Tests the end-to-end feature generation function."""
    # Create mock transactional data
    trans_pd = pd.DataFrame({
        'CustomerID': [1, 1, 2, 1],
        'InvoiceNo': ['A1', 'A2', 'B1', 'A3'],
        'InvoiceDate': [
            datetime.now() - timedelta(days=100),
            datetime.now() - timedelta(days=50),
            datetime.now() - timedelta(days=20),
            datetime.now() - timedelta(days=10)
        ],
        'SalePrice': [10.0, 20.0, 50.0, 30.0]
    })
    # Create mock behavioral data
    behav_pd = pd.DataFrame({
        'CustomerID': [1, 2],
        'session_id': ['s1', 's2'],
        'event_timestamp': [datetime.now() - timedelta(days=5), datetime.now() - timedelta(days=15)],
        'session_duration_seconds': [120, 300],
        'event_type': ['add_to_cart', 'page_view']
    })

    trans_df = spark_session.createDataFrame(trans_pd)
    behav_df = spark_session.createDataFrame(behav_pd)
    
    # Create temporary paths for the test data
    trans_path = "file:///tmp/test_trans_data"
    behav_path = "file:///tmp/test_behav_data"
    trans_df.write.mode("overwrite").parquet(trans_path)
    behav_df.write.mode("overwrite").parquet(behav_path)

    # Run the feature generation function
    features_df = generate_features(spark_session, trans_path, behav_path)
    
    # Collect results for validation
    result = features_df.filter(col("CustomerID") == 1).first()

    assert result is not None
    assert result['CustomerID'] == 1
    assert result['recency'] == 90  # 100 days ago to 10 days ago
    assert result['T'] >= 100
    assert result['frequency'] == 3
    assert abs(result['monetary'] - 20.0) < 0.01 # (10+20+30)/3
    assert result['purchase_count_30d'] == 1
    assert result['add_to_cart_count_90d'] == 1

Integration Test for Feature Generation pipeline

import pytest
import boto3
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.feature_store.feature_definition import FeatureDefinition, FeatureTypeEnum
import pandas as pd
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
import time
import random

# Import the functions from our source script
from src.generate_features import generate_features, write_to_feature_store

# --- Fixtures for AWS Resource Management ---

@pytest.fixture(scope="session")
def spark_session():
    """Provides a Spark session for the test."""
    return SparkSession.builder.master("local[*]").appName("IntegrationTest").getOrCreate()

@pytest.fixture(scope="module")
def aws_region():
    return "eu-west-1"

@pytest.fixture(scope="module")
def sagemaker_session(aws_region):
    """Provides a SageMaker session."""
    boto_session = boto3.Session(region_name=aws_region)
    return sagemaker.Session(boto_session=boto_session)

@pytest.fixture(scope="module")
def test_s3_bucket(sagemaker_session):
    """Creates a temporary S3 bucket for the test run and cleans it up afterward."""
    bucket_name = f"clv-test-bucket-integration-{int(time.time())}-{random.randint(1000, 9999)}"
    s3 = sagemaker_session.boto_session.resource("s3")
    try:
        s3.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': sagemaker_session.boto_region_name})
        print(f"Created test bucket: {bucket_name}")
        yield bucket_name
    finally:
        print(f"Cleaning up bucket: {bucket_name}")
        bucket = s3.Bucket(bucket_name)
        bucket.objects.all().delete()
        bucket.delete()

@pytest.fixture(scope="module")
def feature_group_name():
    """Generates a unique feature group name for the test run."""
    return f"clv-test-fg-{int(time.time())}"

@pytest.fixture(scope="module")
def sagemaker_feature_group(sagemaker_session, feature_group_name, aws_region):
    """Creates a SageMaker Feature Group for the test and cleans it up afterward."""
    feature_group = FeatureGroup(name=feature_group_name, sagemaker_session=sagemaker_session)
    
    # Define the schema based on the output of our Spark job
    feature_group.feature_definitions = [
        FeatureDefinition(feature_name="CustomerID", feature_type=FeatureTypeEnum.STRING),
        FeatureDefinition(feature_name="recency", feature_type=FeatureTypeEnum.INTEGRAL),
        FeatureDefinition(feature_name="frequency", feature_type=FeatureTypeEnum.INTEGRAL),
        FeatureDefinition(feature_name="monetary", feature_type=FeatureTypeEnum.FRACTIONAL),
        FeatureDefinition(feature_name="T", feature_type=FeatureTypeEnum.INTEGRAL),
        FeatureDefinition(feature_name="purchase_count_30d", feature_type=FeatureTypeEnum.INTEGRAL),
        FeatureDefinition(feature_name="total_spend_30d", feature_type=FeatureTypeEnum.FRACTIONAL),
        FeatureDefinition(feature_name="purchase_count_90d", feature_type=FeatureTypeEnum.INTEGRAL),
        FeatureDefinition(feature_name="total_spend_90d", feature_type=FeatureTypeEnum.FRACTIONAL),
        FeatureDefinition(feature_name="total_sessions_90d", feature_type=FeatureTypeEnum.INTEGRAL),
        FeatureDefinition(feature_name="avg_session_duration_90d", feature_type=FeatureTypeEnum.FRACTIONAL),
        FeatureDefinition(feature_name="add_to_cart_count_90d", feature_type=FeatureTypeEnum.INTEGRAL),
        FeatureDefinition(feature_name="EventTime", feature_type=FeatureTypeEnum.STRING)
    ]

    try:
        print(f"Creating Feature Group: {feature_group_name}")
        feature_group.create(
            s3_uri=f"s3://{sagemaker_session.default_bucket()}/{feature_group_name}",
            record_identifier_name="CustomerID",
            event_time_feature_name="EventTime",
            enable_online_store=True,
            role_arn="arn:aws:iam::${YOUR_ACCOUNT_ID}:role/service-role/AmazonSageMaker-ExecutionRole-..."  # Replace with your actual SageMaker role ARN
        )
        # Wait for the feature group to be created
        while feature_group.describe().get("FeatureGroupStatus") == "Creating":
            print("Waiting for feature group creation...")
            time.sleep(10)
        yield feature_group_name
    finally:
        print(f"Deleting Feature Group: {feature_group_name}")
        try:
            feature_group.delete()
        except Exception as e:
            print(f"Error deleting feature group: {e}")


@pytest.fixture(scope="module")
def test_data_on_s3(spark_session, test_s3_bucket):
    """Creates mock raw data and uploads it to the test S3 bucket."""
    # Create mock transactional data
    trans_pd = pd.DataFrame({
        'CustomerID': ['101', '101', '102'],
        'InvoiceNo': ['A1', 'A2', 'B1'],
        'InvoiceDate': [datetime.now() - timedelta(days=60), datetime.now() - timedelta(days=10), datetime.now() - timedelta(days=30)],
        'SalePrice': [15.50, 25.00, 100.00]
    })
    
    # Create mock behavioral data
    behav_pd = pd.DataFrame({
        'CustomerID': ['101'],
        'session_id': ['s1'],
        'event_timestamp': [datetime.now() - timedelta(days=5)],
        'session_duration_seconds': [180],
        'event_type': ['add_to_cart']
    })
    
    trans_df = spark_session.createDataFrame(trans_pd)
    behav_df = spark_session.createDataFrame(behav_pd)

    trans_path = f"s3a://{test_s3_bucket}/transactional/"
    behav_path = f"s3a://{test_s3_bucket}/behavioral/"
    
    trans_df.write.mode("overwrite").parquet(trans_path)
    behav_df.write.mode("overwrite").parquet(behav_path)

    return trans_path, behav_path

# --- The Integration Test ---

def test_spark_job_s3_to_feature_store(spark_session, test_data_on_s3, sagemaker_feature_group, aws_region):
    """
    This test orchestrates the full feature engineering pipeline:
    1. Reads test data from a temporary S3 bucket.
    2. Runs the Spark feature generation logic.
    3. Writes the results to a temporary SageMaker Feature Group.
    4. Validates the data in the Feature Group.
    """
    # ARRANGE: Fixtures have already set up S3 data and the Feature Group.
    transactional_path, behavioral_path = test_data_on_s3
    feature_group_name = sagemaker_feature_group

    # ACT: Run the feature generation and ingestion logic.
    features_df = generate_features(spark_session, transactional_path, behavioral_path)
    write_to_feature_store(features_df, feature_group_name, aws_region)
    
    # Give a few seconds for the records to be available in the online store
    time.sleep(15)

    # ASSERT: Query the Feature Store to verify the results.
    featurestore_client = boto3.client('sagemaker-featurestore-runtime', region_name=aws_region)
    
    response = featurestore_client.get_record(
        FeatureGroupName=feature_group_name,
        RecordIdentifierValueAsString='101'
    )

    assert 'Record' in response, "Record for CustomerID 101 not found in Feature Store"
    
    # Convert the returned record to a more usable dictionary
    result_dict = {item['FeatureName']: item['ValueAsString'] for item in response['Record']}
    
    assert result_dict['CustomerID'] == '101'
    assert int(result_dict['recency']) == 50  # 60 days ago to 10 days ago
    assert int(result_dict['frequency']) == 2
    assert abs(float(result_dict['monetary']) - 20.25) < 0.01  # (15.50 + 25.00) / 2
    assert int(result_dict['purchase_count_30d']) == 1 # one purchase 10 days ago
    assert int(result_dict['add_to_cart_count_90d']) == 1

Model Development & Iteration

This is where raw data potential is transformed into a predictive model. The core philosophy is iterative and evidence-based: start simple, measure rigorously, and justify every increase in complexity with a tangible improvement against clear, predefined metrics.

This framework is organized into four pillars:

  1. Foundations for Success: The strategic setup required before experimentation begins.

  2. The Core Iterative Loop: The primary cycle of building, debugging, and improving the model.

  3. Advanced Optimization: Techniques to apply when initial iterations plateau.

  4. Validation and Governance: Ensuring the model is not just accurate, but also robust, fair, and ready for production.

I. Foundations for Success

  • Define Success Metrics:

    • Optimizing Metric: The single technical metric the model will be tuned to improve (e.g., RMSE, F1-Score).

    • Satisficing Metrics: A set of operational or business constraints the model must meet to be considered viable (e.g., inference latency <100ms, fairness across key segments).

    • Business KPI: The high-level business metric the model is intended to influence (e.g., customer retention rate, marketing ROI).

  • Establish Data Strategy:

    • Data Splitting: For time-series problems like CLV, a strict temporal split (e.g., train on months 1-9, validate on months 10-12) is non-negotiable to prevent data leakage.

    • Data Quality: Ensure the dev/test sets reflect the expected production data distribution. Use a schema to detect anomalies.

  • Establish Baselines:

    • Non-ML Baseline: A simple heuristic (e.g., Avg. Spend x Avg. Lifespan) to quantify the value of using ML.

    • Simple ML Baseline: A simple, interpretable model (e.g., Linear Regression) to validate the end-to-end pipeline and set an initial performance floor.

    • Human-Level Performance (HLP): If applicable, estimate the performance of a human expert to understand the “avoidable bias.”
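
To make the baseline comparison concrete, here is a minimal sketch of the two baselines described above. The column names (avg_monthly_spend, avg_lifespan_months, recency, frequency, monetary, clv_label) are illustrative assumptions, not the exact project schema.

# baselines_sketch.py (illustrative)
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

def heuristic_clv(df: pd.DataFrame) -> pd.Series:
    # Non-ML baseline: a single company-wide number applied to every customer.
    company_avg = (df["avg_monthly_spend"] * df["avg_lifespan_months"]).mean()
    return pd.Series(company_avg, index=df.index)

def linear_baseline_clv(train: pd.DataFrame, test: pd.DataFrame) -> np.ndarray:
    # Simple ML baseline: Linear Regression on RFM features only.
    rfm_cols = ["recency", "frequency", "monetary"]
    model = LinearRegression().fit(train[rfm_cols], train["clv_label"])
    return model.predict(test[rfm_cols])

def rmse(y_true, y_pred) -> float:
    return float(np.sqrt(mean_squared_error(y_true, y_pred)))

Comparing rmse(test["clv_label"], heuristic_clv(test)) against rmse(test["clv_label"], linear_baseline_clv(train, test)) quantifies how much value even the simplest ML model adds over the heuristic.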

II. The Core Iterative Loop

  • Experiment & Model Selection:

    • Start with the simplest model that can do the job (e.g., Gradient Boosting for tabular data).

    • Compare model families (Probabilistic vs. Ensemble vs. Deep Learning), weighing trade-offs between accuracy, interpretability, and operational cost.

    • Formulate clear, testable hypotheses for each experiment (e.g., “Adding behavioral features will reduce RMSE by 5%”).

  • Track Everything:

    • Use an experiment tracking tool (like MLflow) to log every run’s:

      • Code Version (Git Hash)

      • Data Version (DVC Hash)

      • Hyperparameters

      • Evaluation Metrics

      • Model Artifacts

  • Debug & Diagnose Systematically:

    • Bias vs. Variance: Use learning curves to determine if the model is underfitting (high bias) or overfitting (high variance). This guides the next steps (e.g., “get more data” vs. “use a bigger model”).

    • Error Analysis: Manually inspect the examples where the model fails most significantly. This often reveals data quality issues or opportunities for new features.

    • Feature Importance: Use techniques like SHAP to understand which features are driving predictions. This aids debugging and builds business trust.
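
For the feature importance step, a minimal SHAP sketch for a tree-based model looks like the following; model and X_sample are assumed to be a trained XGBoost regressor and a held-out feature DataFrame.

import shap

# TreeExplainer is efficient for tree ensembles such as XGBoost.
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(X_sample)

# Global view: which features drive predictions across the sample.
shap.summary_plot(shap_values, X_sample)

# Local view: why one specific customer received its predicted CLV.
shap.force_plot(explainer.expected_value, shap_values[0, :], X_sample.iloc[0, :])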

III. Advanced Optimization

  • Iterative Feature Engineering:

    • Continuously add, remove, or transform features based on error analysis and feature importance results. Use a Feature Store to manage this process scalably.

  • Automated Hyperparameter Optimization (HPO):

    • Use model-based methods like Bayesian Optimization (available in SageMaker, Vertex AI, or via libraries like Optuna) to efficiently search for the best hyperparameters; see the sketch after this list.

  • Ensemble Methods:

    • If performance plateaus, consider simple ensembles (e.g., averaging predictions from an XGBoost and a linear model) to improve robustness and accuracy.
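
Here is the hyperparameter search sketch referenced above, using SageMaker Automatic Model Tuning. The estimator, channel inputs, and metric name are assumptions based on the XGBoost training setup described later in this article.

from sagemaker.tuner import HyperparameterTuner, ContinuousParameter, IntegerParameter

# Bayesian search over the ranges that matter most for gradient boosting.
tuner = HyperparameterTuner(
    estimator=xgb_estimator,                  # assumed: the project's XGBoost estimator
    objective_metric_name="validation:rmse",  # emitted by the SageMaker XGBoost container
    objective_type="Minimize",
    hyperparameter_ranges={
        "max_depth": IntegerParameter(3, 10),
        "eta": ContinuousParameter(0.01, 0.3),
        "subsample": ContinuousParameter(0.5, 1.0),
        "min_child_weight": IntegerParameter(1, 10),
    },
    strategy="Bayesian",
    max_jobs=30,
    max_parallel_jobs=3,
)

tuner.fit({"train": train_input, "validation": validation_input})
print(tuner.best_training_job())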

IV. Validation and Governance

  • Slice-Based Evaluation: Do not trust a single, global performance metric. Evaluate model performance across critical business segments (e.g., by country, acquisition channel) to uncover hidden biases or performance gaps, as sketched at the end of this section.

  • Model Calibration: Check if the model’s predicted outputs align with real-world averages. A model that predicts an average CLV of $500 for a segment should be checked against the actual average CLV of that segment.

  • Model Registry & Versioning:

    • Promote the final, validated model artifact to a Model Registry (like MLflow’s).

    • Tag the model with its version, performance metrics, and a link back to the training experiment, creating an auditable and governable asset ready for deployment.
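
The slice-based evaluation mentioned above can be as simple as a groupby over the scored test set. A minimal sketch, assuming a DataFrame with y_true, y_pred, and a segment column such as acquisition_channel:

import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error

def rmse_by_slice(scored: pd.DataFrame, slice_col: str) -> pd.DataFrame:
    """Report RMSE per business segment to surface hidden performance gaps."""
    return (
        scored.groupby(slice_col)
        .apply(lambda g: np.sqrt(mean_squared_error(g["y_true"], g["y_pred"])))
        .rename("rmse")
        .reset_index()
        .sort_values("rmse", ascending=False)
    )

# Example: rmse_by_slice(test_predictions, "acquisition_channel")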

Applying the Framework to the CLV Project

Here is how we applied this framework to our e-commerce Customer Lifetime Value prediction project:

  1. Success Metrics
    • Decision: Optimizing metric: Root Mean Squared Error (RMSE). Satisficing metrics: Gini Coefficient > 0.3, batch inference time < 2 hours. Business KPI: increase in 12-month revenue from targeted retention campaigns.
    • Rationale: RMSE was chosen to penalize large prediction errors heavily. The Gini coefficient ensures the model can effectively rank customers, which is vital for targeting campaigns. The time constraint ensures the pipeline fits within our nightly batch window.

  2. Data Splitting
    • Decision: Strict temporal split. We trained the model on data up to a fixed cutoff date and evaluated it on the subsequent 3-month period.
    • Rationale: This is the only correct way to validate a time-series forecasting model. A random split would leak future information into the training set, leading to misleadingly optimistic performance metrics.

  3. Baseline Models
    • Decision: (1) Heuristic: simple average spend per customer. (2) ML baseline: a Linear Regression model using only RFM features.
    • Rationale: The heuristic model established the absolute minimum performance bar. The linear model validated our end-to-end pipeline and proved that even basic ML could outperform simple averages.

  4. Primary Model Choice
    • Decision: XGBoost (Gradient Boosting Machine).
    • Rationale: We chose XGBoost over simpler models for its high performance, and over deep learning for its lower data requirements and better training efficiency on tabular data. It represents the industry standard for this type of problem.

  5. Experiment Tracking
    • Decision: MLflow.
    • Rationale: Aligned with our tech stack, MLflow provides a robust open-source solution for tracking experiments, versioning data/code, and managing the model lifecycle through its Model Registry.

  6. Debugging & Diagnostics
    • Decision: Used SHAP (SHapley Additive exPlanations) for feature importance and to explain individual predictions. Analyzed learning curves to balance bias and variance.
    • Rationale: SHAP was critical for gaining business trust by explaining why a customer was predicted to be high-value. Learning curves guided our decision to invest more in feature engineering rather than just a bigger model.

  7. Hyperparameter Optimization
    • Decision: Amazon SageMaker Automatic Model Tuning (using Bayesian Optimization).
    • Rationale: We leveraged a managed cloud service to automate this computationally intensive task. This freed up engineering time and ran parallel trials more efficiently than a manual or grid search approach.

  8. Validation & Governance
    • Decision: Evaluated model RMSE on slices based on acquisition channel and country. Promoted the final model to the MLflow Model Registry with a “production-ready” tag.
    • Rationale: Slice-based evaluation ensured our model was not just accurate overall, but also fair and effective for key business segments. The Model Registry provides the formal, auditable hand-off from development to production deployment.

A Step-by-Step Experimental Journey

Our approach to model development was methodical and iterative. We started with the simplest possible baselines to establish a performance floor and justify every increase in complexity with measurable improvements in our key metrics: Root Mean Squared Error (RMSE) for predictive accuracy and the Gini Coefficient for ranking ability.

Here is a summary of the experiments conducted:

  1. Heuristic Baseline
    • Technique: Calculated CLV as (Avg. Monthly Spend per Customer) x (Avg. Customer Lifespan in Months).
    • Features: Basic historical transaction data.
    • Result: Provided a single, company-wide CLV number.
    • Learning: This approach was too generic. It couldn’t differentiate between new and old customers or identify high-value segments, making it unusable for personalized marketing. It served as our “minimum-bar” baseline to beat.

  2. Simple ML Baseline
    • Technique: Linear Regression.
    • Features: Foundational RFM features (Recency, Frequency, Monetary).
    • Result: 35% reduction in RMSE compared to the error of the heuristic baseline’s single prediction.
    • Learning: This validated that even the simplest ML model, by accounting for individual customer behavior, provided a significant accuracy boost. It also proved our end-to-end data pipeline was working correctly.

  3. Probabilistic Model
    • Technique: BG/NBD + Gamma-Gamma model.
    • Features: Foundational RFM features.
    • Result: Performed similarly to Linear Regression on RMSE but provided a high degree of interpretability.
    • Learning: This model was excellent for understanding churn probability and purchase frequency drivers. We decided to keep it as a secondary, explainable model for the business, but its inability to use other features limited its predictive power. (A minimal sketch of this baseline follows this table.)

  4. Gradient Boosting Model
    • Technique: XGBoost.
    • Features: Foundational RFM features.
    • Result: 20% further reduction in RMSE over the Linear Regression baseline.
    • Learning: XGBoost’s ability to capture non-linear relationships in the RFM features provided a substantial performance lift. This became our new champion model.

  5. Adding Temporal Features
    • Technique: XGBoost.
    • Features: RFM + time-windowed features (e.g., spend and frequency over the last 30, 90, and 365 days).
    • Result: 15% reduction in RMSE and a significant 10% increase in the Gini Coefficient.
    • Learning: Capturing trends in customer behavior was highly predictive. A customer whose spending is recent and increasing is far more valuable than one with the same total spend spread out over years.

  6. Adding Behavioral Features
    • Technique: XGBoost.
    • Features: All previous features + behavioral data (e.g., session counts, add-to-cart events).
    • Result: 10% reduction in RMSE. The improvement was most pronounced for newer customers with limited transaction history.
    • Learning: Clickstream data provides powerful early signals of customer intent and engagement, helping the model make better predictions before a purchase history is established.

  7. Hyperparameter Optimization
    • Technique: XGBoost with Amazon SageMaker Automatic Model Tuning (Bayesian Optimization).
    • Features: All available features.
    • Result: 5% final reduction in RMSE and a 5% increase in the Gini Coefficient.
    • Learning: Automated HPO squeezed out the final percentage points of performance by fine-tuning the model’s complexity and regularization, leading to a more robust and generalized final model.
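
For reference, the probabilistic baseline from experiment 3 can be sketched with the open-source lifetimes library. This is a minimal sketch: the per-customer RFM column names and the 12-month horizon are assumptions.

from lifetimes import BetaGeoFitter, GammaGammaFitter

# rfm is a per-customer frame with frequency, recency, T, and monetary columns.
bgf = BetaGeoFitter(penalizer_coef=0.001)
bgf.fit(rfm["frequency"], rfm["recency"], rfm["T"])

# Gamma-Gamma models spend and assumes monetary value is independent of frequency,
# so it is fit only on customers with at least one repeat purchase.
returning = rfm[rfm["frequency"] > 0]
ggf = GammaGammaFitter(penalizer_coef=0.001)
ggf.fit(returning["frequency"], returning["monetary"])

# Expected CLV over a 12-month horizon, lightly discounted.
rfm["clv_bg_nbd"] = ggf.customer_lifetime_value(
    bgf, rfm["frequency"], rfm["recency"], rfm["T"], rfm["monetary"],
    time=12, freq="D", discount_rate=0.01,
)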

Final Outcome: Through this structured, iterative process, we developed a final XGBoost model that was over 60% more accurate (in terms of RMSE) than our initial simple baseline and demonstrated a strong ability to accurately rank our most valuable customers. Each step provided valuable insights and justified the subsequent increase in complexity.

ML Training Pipelines

Plan

We will use SageMaker Pipelines as the core execution engine, orchestrated by Airflow.

  • Python Scripts (src/): We will create modular Python scripts for each step of the SageMaker Pipeline:
    - preprocess.py: Loads data from the Feature Store, performs final transformations (e.g., scaling), and splits data. Saves the fitted scaler object.
    - train.py: Takes preprocessed data, trains the XGBoost model, and saves the model artifact.
    - evaluate.py: Takes the trained model and test data, calculates metrics (RMSE, Gini), and generates an evaluation report.

  • Unit Tests (tests/): We will write pytest unit tests for the business logic within preprocess.py, train.py, and evaluate.py to ensure their correctness in isolation.

  • Pipeline Code:
    1. SageMaker Pipeline (pipelines/build_pipeline.py): A Python script using the SageMaker SDK to define the DAG of ML steps (Preprocess -> Tune -> Train -> Evaluate -> Register).
    2. Airflow DAG (pipelines/dag_trigger_training.py): A simple DAG that triggers the execution of the SageMaker Pipeline on a weekly schedule.

  • Infrastructure as Code (terraform/): We will add Terraform code to provision the necessary IAM roles for SageMaker Pipelines to execute and access other AWS services (S3, Feature Store, etc.).

  • Integration Test (tests/): A pytest integration test that will:
    1. Programmatically trigger the Airflow DAG.
    2. Poll for the completion status of the SageMaker Pipeline execution.
    3. Check the MLflow Model Registry to assert that a new model version was (or was not) created based on the test outcome.

  • Architecture Diagram (Mermaid): A single, clear diagram illustrating how Airflow triggers the SageMaker Pipeline and how the pipeline steps interact with the Feature Store, S3, and Model Registry.

Why Amazon SageMaker Pipelines for the core ML steps?

Instead of having Airflow directly manage each individual ML step (processing, training, evaluation), a more modern and robust pattern is to use Amazon SageMaker Pipelines.

  • How it works: We will define the ML workflow (preprocess, train, evaluate, register) as a single, powerful SageMaker Pipeline. The Airflow DAG’s role then becomes much simpler: its only job is to trigger this SageMaker Pipeline on a schedule.

Why is this a better approach?

  1. Tightly Integrated: SageMaker Pipelines are deeply integrated with the entire SageMaker ecosystem (Feature Store, Training Jobs, Model Registry). This reduces boilerplate code and simplifies passing data between steps.

  2. ML-Specific Features: It provides features that Airflow doesn’t have out-of-the-box, such as experiment tracking integration, caching of pipeline steps (to avoid re-running steps with unchanged inputs), and more granular ML-focused monitoring.

  3. Decoupling: It decouples your general-purpose orchestrator (Airflow) from your ML-specific logic. Your Airflow DAGs remain clean and simple, while the complexity of the ML workflow is encapsulated within SageMaker. This makes the system easier to maintain.

This leverages the best of both worlds: Airflow for enterprise-wide scheduling and dependency management, and SageMaker Pipelines for its powerful, purpose-built features for orchestrating ML workflows.

Implementation

Python Component Scripts

# src/preprocess.py
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

def clean_and_split_data(df: pd.DataFrame, target_col: str):
    """
    Cleans the input dataframe and splits it into training and testing sets.
    
    Args:
        df (pd.DataFrame): The input data.
        target_col (str): The name of the target variable column.
        
    Returns:
        tuple: A tuple containing X_train, X_test, y_train, y_test.
    """
    df_cleaned = df.dropna(subset=[target_col])
    
    features = [col for col in df_cleaned.columns if col not in ['CustomerID', target_col, 'EventTime']]
    X = df_cleaned[features]
    y = df_cleaned[target_col]

    return train_test_split(X, y, test_size=0.2, random_state=42)

def scale_features(X_train: pd.DataFrame, X_test: pd.DataFrame):
    """
    Fits a StandardScaler on the training data and transforms both training and testing data.
    
    Args:
        X_train (pd.DataFrame): Training feature set.
        X_test (pd.DataFrame): Testing feature set.
        
    Returns:
        tuple: A tuple containing the scaled training data, scaled testing data, and the fitted scaler object.
    """
    scaler = StandardScaler()
    X_train_scaled = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns)
    X_test_scaled = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns)
    
    return X_train_scaled, X_test_scaled, scaler
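
The plan also calls for preprocess.py to run as the SageMaker Processing entrypoint that saves the split datasets and the fitted scaler. A minimal sketch of that glue code follows; the container paths mirror the ProcessingInput/ProcessingOutput mapping in the pipeline definition below, while the target column name and CSV layout (label first, no header) are assumptions.

if __name__ == "__main__":
    import os
    import joblib

    input_path = "/opt/ml/processing/input/features.csv"
    train_dir = "/opt/ml/processing/output/train"
    test_dir = "/opt/ml/processing/output/test"
    scaler_dir = "/opt/ml/processing/output/scaler"
    for d in (train_dir, test_dir, scaler_dir):
        os.makedirs(d, exist_ok=True)

    df = pd.read_csv(input_path)
    X_train, X_test, y_train, y_test = clean_and_split_data(df, target_col="clv_label")
    X_train_scaled, X_test_scaled, scaler = scale_features(X_train, X_test)

    # Label in the first column, no header (a common convention for XGBoost training data).
    pd.concat([y_train.reset_index(drop=True), X_train_scaled], axis=1).to_csv(
        os.path.join(train_dir, "train.csv"), index=False, header=False)
    pd.concat([y_test.reset_index(drop=True), X_test_scaled], axis=1).to_csv(
        os.path.join(test_dir, "test.csv"), index=False, header=False)
    joblib.dump(scaler, os.path.join(scaler_dir, "scaler.joblib"))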

# src/train.py
import pandas as pd
import xgboost as xgb

def train_model(x_train: pd.DataFrame, y_train: pd.DataFrame, hyperparameters: dict):
    """
    Trains an XGBoost regressor model.

    Args:
        x_train (pd.DataFrame): Training feature data.
        y_train (pd.DataFrame): Training target data.
        hyperparameters (dict): Dictionary of hyperparameters for the model.

    Returns:
        xgb.XGBRegressor: The trained model object.
    """
    model = xgb.XGBRegressor(
        objective='reg:squarederror',
        random_state=42,
        **hyperparameters
    )
    model.fit(x_train, y_train)
    return model

# src/evaluate.py
import numpy as np
import pandas as pd
from sklearn.metrics import mean_squared_error

def gini_coefficient(y_true, y_pred):
    """Calculates the Gini coefficient to measure model ranking ability."""
    if len(y_true) == 0: return 0.0
    df = pd.DataFrame({'true': y_true, 'pred': y_pred}).sort_values('pred', ascending=False)
    df['cumulative_true'] = df['true'].cumsum()
    total_true = df['true'].sum()
    if total_true == 0: return 0.0
    df['cumulative_true_percent'] = df['cumulative_true'] / total_true
    
    area_under_curve = df['cumulative_true_percent'].sum() / len(df)
    return (area_under_curve - 0.5) / 0.5

def evaluate_model(model, x_test: pd.DataFrame, y_test: pd.DataFrame):
    """
    Evaluates the model and returns a dictionary of metrics.

    Args:
        model: The trained model object.
        x_test (pd.DataFrame): Testing feature data.
        y_test (pd.DataFrame): Testing target data.

    Returns:
        dict: A dictionary containing evaluation metrics.
    """
    predictions = model.predict(x_test)
    rmse = np.sqrt(mean_squared_error(y_test, predictions))
    gini = gini_coefficient(y_test.values.flatten(), predictions)
    
    metrics = {
        "regression_metrics": {
            "rmse": {"value": rmse},
            "gini": {"value": gini}
        }
    }
    return metrics
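
The plan above calls for pytest unit tests of this logic. A minimal sketch, using synthetic data and the module layout assumed throughout this article:

# tests/unit/test_training_components.py
import numpy as np
import pandas as pd

from src.train import train_model
from src.evaluate import evaluate_model, gini_coefficient

def test_train_and_evaluate_roundtrip():
    # Synthetic, nearly-linear data: the model should fit it easily.
    rng = np.random.default_rng(42)
    X = pd.DataFrame({"f1": rng.normal(size=200), "f2": rng.normal(size=200)})
    y = pd.DataFrame({"target": 3 * X["f1"] - 2 * X["f2"] + rng.normal(scale=0.1, size=200)})

    model = train_model(X, y, {"n_estimators": 50, "max_depth": 3})
    metrics = evaluate_model(model, X, y)

    assert metrics["regression_metrics"]["rmse"]["value"] < 1.0

def test_gini_is_positive_for_perfect_ranking():
    y_true = np.array([1.0, 2.0, 3.0, 4.0])
    # Predicting the true values exactly should produce a clearly positive Gini.
    assert gini_coefficient(y_true, y_true) > 0.0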

Infrastructure as Code

# terraform/aws_sagemaker_roles.tf

resource "aws_iam_role" "sagemaker_pipeline_execution_role" {
  name = "sagemaker-pipeline-execution-role"

  assume_role_policy = jsonencode({
    Version   = "2012-10-17",
    Statement = [
      {
        Effect    = "Allow",
        Principal = {
          Service = "sagemaker.amazonaws.com"
        },
        Action    = "sts:AssumeRole"
      }
    ]
  })
}

# Granting broad SageMaker permissions for simplicity in this example.
# In a real production environment, you would scope this down significantly.
resource "aws_iam_role_policy_attachment" "sagemaker_full_access" {
  role       = aws_iam_role.sagemaker_pipeline_execution_role.name
  policy_arn = "arn:aws:iam::aws:policy/AmazonSageMakerFullAccess"
}

# Policy allowing access to the S3 bucket for artifacts and the Feature Store
resource "aws_iam_policy" "sagemaker_pipeline_custom_policy" {
  name        = "sagemaker-pipeline-s3-featurestore-policy"
  description = "Allows SageMaker pipelines to access project resources"

  policy = jsonencode({
    Version   = "2012-10-17",
    Statement = [
      {
        Effect   = "Allow",
        Action   = [
          "s3:GetObject",
          "s3:PutObject",
          "s3:ListBucket"
        ],
        Resource = [
          "arn:aws:s3:::clv-artifacts-bucket",
          "arn:aws:s3:::clv-artifacts-bucket/*"
        ]
      },
      {
        Effect   = "Allow",
        Action   = [
          "sagemaker:DescribeFeatureGroup",
          # Permissions for Athena query to get data from offline store
          "athena:StartQueryExecution",
          "athena:GetQueryExecution",
          "athena:GetQueryResults",
          "glue:GetTable"
        ],
        Resource = "*" # Scope down in production
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "sagemaker_pipeline_custom_policy_attach" {
  role       = aws_iam_role.sagemaker_pipeline_execution_role.name
  policy_arn = aws_iam_policy.sagemaker_pipeline_custom_policy.arn
}

SageMaker Pipeline Definition

# pipelines/build_pipeline.py
import argparse

import sagemaker
from sagemaker.model_metrics import ModelMetrics, MetricsSource
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.functions import JsonGet
from sagemaker.workflow.model_step import ModelStep
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.properties import PropertyFile
from sagemaker.workflow.steps import ProcessingStep, TrainingStep
from sagemaker.xgboost.estimator import XGBoost

def get_sagemaker_pipeline(role, s3_bucket):
    # A PipelineSession defers SageMaker SDK calls so they can be composed into pipeline steps.
    pipeline_session = PipelineSession()

    # Define Processors
    sklearn_processor = ScriptProcessor(
        image_uri='your-account-id.dkr.ecr.eu-west-1.amazonaws.com/sagemaker-scikit-learn:1.0-1', # Replace with your ECR image URI for sklearn
        command=['python3'],
        instance_type='ml.m5.large',
        instance_count=1,
        base_job_name='clv-preprocess',
        role=role
    )

    # 1. Preprocessing Step
    step_preprocess = ProcessingStep(
        name="PreprocessData",
        processor=sklearn_processor,
        inputs=[ProcessingInput(source=f"s3://{s3_bucket}/feature-data/features.csv", destination="/opt/ml/processing/input")],
        outputs=[
            ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train"),
            ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test"),
            ProcessingOutput(output_name="scaler", source="/opt/ml/processing/output/scaler")
        ],
        code="src/preprocess.py"
    )

    # 2. Training Step
    xgb_estimator = XGBoost(
        entry_point="src/train.py",
        role=role,
        instance_count=1,
        instance_type='ml.m5.xlarge',
        framework_version='1.5-1',
        hyperparameters={'max_depth': 5, 'n_estimators': 150}
    )
    
    step_train = TrainingStep(
        name="TrainModel",
        estimator=xgb_estimator,
        inputs={
            "train": sagemaker.inputs.TrainingInput(
                s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri
            )
        }
    )

    # 3. Evaluation Step
    # The evaluation report is exposed as a PropertyFile so the condition step can read metrics from it.
    evaluation_report = PropertyFile(
        name="EvaluationReport",
        output_name="evaluation",
        path="evaluation.json"
    )

    step_evaluate = ProcessingStep(
        name="EvaluateModel",
        processor=sklearn_processor,
        inputs=[
            ProcessingInput(source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model"),
            ProcessingInput(source=step_preprocess.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, destination="/opt/ml/processing/test")
        ],
        outputs=[ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation")],
        property_files=[evaluation_report],
        code="src/evaluate.py"
    )

    # 4. Conditional Model Registration Step
    model_metrics = ModelMetrics(
        model_statistics=MetricsSource(
            s3_uri=f"{step_evaluate.properties.ProcessingOutputConfig.Outputs['evaluation'].S3Output.S3Uri}/evaluation.json",
            content_type="application/json"
        )
    )

    # Define a condition to check if RMSE is below a threshold
    cond_lte = ConditionLessThanOrEqualTo(
        left=JsonGet(
            step_name=step_evaluate.name,
            property_file=evaluation_report,
            json_path="regression_metrics.rmse.value"
        ),
        right=250.0  # Your RMSE threshold
    )

    # Define the registration step: register the trained model as a new version
    # in the model package group (the group must be created beforehand).
    model = sagemaker.Model(
        image_uri=xgb_estimator.training_image_uri(),
        model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
        sagemaker_session=pipeline_session,
        role=role
    )

    step_register_model = ModelStep(
        name="RegisterModelPackage",
        step_args=model.register(
            content_types=["text/csv"],           # illustrative serving contract
            response_types=["text/csv"],
            inference_instances=["ml.m5.large"],
            transform_instances=["ml.m5.large"],
            model_package_group_name="CLVModelPackageGroup",  # Must be created beforehand
            model_metrics=model_metrics
        )
    )

    step_conditional_register = ConditionStep(
        name="CheckEvaluationAndRegister",
        conditions=[cond_lte],
        if_steps=[step_register_model],
        else_steps=[],
    )

    # Create and return the pipeline
    pipeline = Pipeline(
        name="CLV-Training-Pipeline",
        parameters=[],
        steps=[step_preprocess, step_train, step_evaluate, step_conditional_register],
        sagemaker_session=pipeline_session
    )
    return pipeline

if __name__ == "__main__":
    # Creates or updates the pipeline definition in SageMaker.
    # The --dry-run flag (used by CI) compiles the definition without deploying it.
    parser = argparse.ArgumentParser()
    parser.add_argument("--role-arn", required=True, help="SageMaker pipeline execution role ARN")
    parser.add_argument("--bucket-name", required=True, help="S3 bucket for pipeline artifacts")
    parser.add_argument("--dry-run", action="store_true", help="Build and validate the definition only")
    args = parser.parse_args()

    pipeline = get_sagemaker_pipeline(role=args.role_arn, s3_bucket=args.bucket_name)
    if args.dry_run:
        print(pipeline.definition())  # Compiles the pipeline JSON without creating it in SageMaker
    else:
        pipeline.upsert(role_arn=args.role_arn)

Airflow DAG

# pipelines/dag_trigger_training.py
from airflow import DAG
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerPipelineOperator
from datetime import datetime

with DAG(
    dag_id="clv_trigger_sagemaker_training",
    start_date=datetime(2025, 1, 1),
    schedule_interval="@weekly",
    catchup=False,
    tags=['clv', 'training', 'sagemaker'],
) as dag:
    trigger_sagemaker_pipeline = SageMakerPipelineOperator(
        task_id="trigger_training_pipeline",
        pipeline_name="CLV-Training-Pipeline",
        aws_conn_id="aws_default",
    )

Integration Test

# tests/integration/test_training_pipeline_integration.py
import pytest
import boto3
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun
from airflow.utils.state import State
import time

@pytest.mark.integration
def test_training_pipeline_dag_runs_successfully():
    """
    Tests the end-to-end execution by triggering the Airflow DAG
    and monitoring the SageMaker Pipeline.
    """
    # Load the Airflow DAG
    dagbag = DagBag(dag_folder='pipelines/', include_examples=False)
    dag = dagbag.get_dag('clv_trigger_sagemaker_training')
    assert dag is not None, "DAG not found"

    # Manually trigger the DAG
    dag.clear()
    dag_run = dag.create_dagrun(
        state=State.RUNNING,
        run_id=f"test_run_{int(time.time())}",
        conf={},
    )
    
    print(f"Triggered DAG run: {dag_run.run_id}")

    # Monitor the DAG run and the underlying SageMaker Pipeline
    # This is a simplified check. A real-world test would need more robust polling.
    sagemaker_client = boto3.client("sagemaker")
    
    time.sleep(60) # Give Airflow time to start the SageMaker Pipeline

    # Find the latest pipeline execution
    executions = sagemaker_client.list_pipeline_executions(
        PipelineName="CLV-Training-Pipeline",
        SortBy="CreationTime",
        SortOrder="Descending"
    )['PipelineExecutionSummaries']
    
    assert len(executions) > 0, "No SageMaker pipeline execution found"
    
    latest_execution_arn = executions[0]['PipelineExecutionArn']
    print(f"Monitoring SageMaker Pipeline execution: {latest_execution_arn}")

    # Poll until the pipeline execution is complete
    timeout = time.time() + 60*30 # 30 minutes timeout
    while time.time() < timeout:
        response = sagemaker_client.describe_pipeline_execution(
            PipelineExecutionArn=latest_execution_arn
        )
        status = response['PipelineExecutionStatus']
        if status in ['Succeeded', 'Failed', 'Stopped']:
            break
        print(f"Pipeline status: {status}. Waiting...")
        time.sleep(30)

    assert status == 'Succeeded', f"SageMaker pipeline did not succeed. Final status: {status}"

    # ASSERTION: Check if a new model was registered in the MLflow Model Registry
    # (This would require MLflow client setup and querying logic)
    # mlflow_client = mlflow.tracking.MlflowClient()
    # versions = mlflow_client.get_latest_versions("clv-model")
    # assert len(versions) > 0, "No model was registered in MLflow"

ML Training Pipeline CI Workflow

Here is the plan and the corresponding GitHub Actions workflow file.

  1. Trigger: The workflow will be automatically triggered on every pull_request to the main branch. This ensures that no code can be merged without passing all quality checks.

  2. Environment Setup: It will check out the code and set up a clean Python environment with all necessary dependencies. It will also securely configure AWS credentials to allow for interaction with AWS services during testing.

  3. Static Analysis: It will run fast, static checks first, including code linting (flake8) and formatting checks (black), to catch basic errors without needing to execute any code.

  4. Unit Testing: It will execute the pytest suite located in the tests/unit/ directory. This validates that the core logic within our Python scripts (preprocess.py, train.py, evaluate.py) works as expected in isolation.

  5. Pipeline Integrity & Integration Testing: This is the most crucial part. It validates that all the pieces of our training pipeline work together.

    • Terraform Validation: It will run terraform validate and tflint on our infrastructure code to catch syntax errors or non-best-practice configurations.

    • SageMaker Pipeline Definition Test: It will execute our pipelines/build_pipeline.py script. The script will be designed to build the SageMaker Pipeline object in memory without actually deploying it. This acts as a powerful integration test, confirming that all the SDK calls and parameter integrations are correct and that the pipeline definition can be successfully compiled.

If all these steps pass, the pull request will show a green checkmark, giving the repository maintainer confidence that the proposed changes are safe to merge.

# .github/workflows/ci_training_pipeline.yml

name: "CI for ML Training Pipeline"

on:
  pull_request:
    branches:
      - main
    paths:
      - 'src/**'
      - 'pipelines/**'
      - 'tests/**'
      - 'terraform/**'
      - '.github/workflows/ci_training_pipeline.yml'

jobs:
  validate-training-pipeline:
    runs-on: ubuntu-latest
    
    permissions:
      id-token: write  # Required for OIDC AWS authentication
      contents: read
      
    steps:
      - name: Checkout Repository
        uses: actions/checkout@v4

      - name: Set up Python 3.9
        uses: actions/setup-python@v4
        with:
          python-version: 3.9

      - name: Install Python Dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt # Assuming a requirements.txt in the root
          pip install -r tests/requirements.txt # For test-specific libraries like pytest, moto

      - name: Configure AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/github-actions-ci-role # Replace with your IAM role for GitHub Actions
          aws-region: eu-west-1

      - name: Run Linting and Formatting Checks
        run: |
          pip install flake8 black
          flake8 src/ pipelines/ tests/
          black --check src/ pipelines/ tests/

      - name: Run Unit Tests
        run: |
          pytest tests/unit/

      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v2

      - name: Setup TFLint
        uses: terraform-linters/setup-tflint@v4

      - name: Validate Infrastructure as Code (Terraform)
        run: |
          cd terraform
          terraform init -backend=false
          terraform validate
          tflint --recursive
        
      - name: Validate SageMaker Pipeline Definition
        run: |
          # This step runs the pipeline build script.
          # The script should be designed to build the pipeline object
          # and perform validations without deploying.
          # We can add a --dry-run flag to our script for this purpose.
          python pipelines/build_pipeline.py --role-arn ${{ secrets.SAGEMAKER_EXECUTION_ROLE_ARN }} --bucket-name ${{ secrets.ARTIFACT_BUCKET }} --dry-run

ML Training Pipeline CD Workflow Plan

  1. Trigger: The workflow is triggered on every push to the main branch.

  2. Deploy Pipeline to Staging: It runs the pipelines/build_pipeline.py script without the --dry-run flag, pointing to the staging AWS environment. This upsert action creates or updates the SageMaker Pipeline definition.

  3. Run Pipeline in Staging: It then makes an AWS API call to start an execution of this newly deployed SageMaker Pipeline.

  4. Monitor Run: The final step is to wait for the pipeline execution to complete and check its status. If the pipeline run fails in Staging, the entire CD workflow fails.

# .github/workflows/cd_training_pipeline.yml

name: "CD for ML Training Pipeline"

on:
  push:
    branches:
      - main
    paths:
      - 'src/**'
      - 'pipelines/**'
      - 'terraform/**'
      
jobs:
  deploy-and-run-in-staging:
    runs-on: ubuntu-latest
    environment: staging # This links to GitHub Environments for managing secrets
    
    permissions:
      id-token: write
      contents: read
      
    steps:
      - name: Checkout Repository
        uses: actions/checkout@v4

      - name: Set up Python 3.9
        uses: actions/setup-python@v4
        with:
          python-version: 3.9

      - name: Install Python Dependencies
        run: |
          pip install -r requirements.txt
          pip install boto3

      - name: Configure Staging AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.STAGING_AWS_ROLE_ARN }} # Secret stored in GitHub Environment
          aws-region: eu-west-1

      - name: Deploy SageMaker Pipeline to Staging
        id: deploy_pipeline
        run: |
          echo "Deploying SageMaker Pipeline definition to Staging..."
          python pipelines/build_pipeline.py \
            --role-arn ${{ secrets.STAGING_SAGEMAKER_ROLE_ARN }} \
            --bucket-name ${{ secrets.STAGING_ARTIFACT_BUCKET }}
      
      - name: Start SageMaker Pipeline Execution in Staging
        id: start_execution
        run: |
          echo "Starting pipeline execution..."
          EXECUTION_ARN=$(aws sagemaker start-pipeline-execution \
            --pipeline-name CLV-Training-Pipeline \
            --query 'PipelineExecutionArn' \
            --output text)
          echo "Pipeline execution started: $EXECUTION_ARN"
          echo "execution_arn=$EXECUTION_ARN" >> $GITHUB_OUTPUT

      - name: Wait for Pipeline Execution to Complete
        run: |
          echo "Waiting for pipeline execution to complete..."
          # Poll the execution until it reaches a terminal state.
          while true; do
            STATUS=$(aws sagemaker describe-pipeline-execution \
              --pipeline-execution-arn ${{ steps.start_execution.outputs.execution_arn }} \
              --query 'PipelineExecutionStatus' --output text)
            echo "Current status: $STATUS"
            if [ "$STATUS" = "Succeeded" ]; then
              echo "Pipeline execution succeeded!"
              break
            elif [ "$STATUS" = "Failed" ] || [ "$STATUS" = "Stopped" ]; then
              echo "Pipeline execution failed with status: $STATUS"
              exit 1
            fi
            sleep 30
          done

Inference Pipeline

1. High-Level Strategy: Choosing the Deployment Pattern

  • Business Need: The primary goal is to provide updated CLV scores for the marketing and CRM teams to design weekly or monthly campaigns.

  • Latency Requirement: Predictions are not needed in real-time. A delay of several hours is perfectly acceptable.

  • Data Freshness: Features are updated daily, so predictions based on daily-refreshed data are sufficient.

Decision: Based on these requirements, we will implement a Batch Prediction (Asynchronous Inference) strategy. This pattern is ideal for our use case because it is “simpler to implement, cost-effective for large volumes, and allows for high throughput.”

2. Architectural Plan: Components and Tooling

  • Workflow Orchestrator
    • Decision: Apache Airflow (on AWS MWAA).
    • Rationale: Aligns with our existing tech stack for the training pipeline. Airflow is the industry standard for scheduling and managing complex, multi-step batch jobs, as highlighted in the guide.

  • Batch Prediction Job/Processor
    • Decision: Amazon SageMaker Batch Transform.
    • Rationale: This is a fully managed AWS service purpose-built for this task. It eliminates the need to manage our own compute cluster for inference, which is more cost-effective and operationally simpler than using a persistent EMR cluster for this job. It directly aligns with the “Cloud Services” tooling mentioned in the guide.

  • Model Source
    • Decision: MLflow Model Registry.
    • Rationale: The pipeline will be configured to automatically fetch the model version currently tagged as “Production” from our MLflow Model Registry. This ensures a governed, auditable link between a production run and a specific, approved model artifact.

  • Feature Source
    • Decision: Amazon SageMaker Feature Store.
    • Rationale: To prevent training-serving skew, the Batch Transform job will not re-calculate features. Instead, it will retrieve the latest pre-calculated features for each customer directly from the SageMaker Feature Store. This guarantees consistency between the features used for training and inference.

  • Input Data Source
    • Decision: S3 bucket.
    • Rationale: A simple text file containing the list of all active CustomerIDs to be scored will be the input for the Batch Transform job.

  • Prediction Store
    • Decision: (1) S3 bucket (primary output). (2) Data warehouse (e.g., Redshift).
    • Rationale: The Batch Transform job will write its output (a CSV/Parquet file of CustomerID, CLV_Prediction, ModelVersion, PredictionTimestamp) to a designated S3 bucket. A final step in the Airflow DAG will then load this data into the main data warehouse, making it accessible to BI tools and marketing platforms.

3. Core Pipeline Artifacts to Be Implemented

  1. Inference Script (src/batch_inference.py): A Python script containing the core transformation logic. This script will define how SageMaker Batch Transform should handle incoming data, load the model, fetch features, and generate a prediction.

  2. Airflow DAG (pipelines/dag_batch_inference.py): The orchestration logic that defines the end-to-end pipeline:

    • Trigger: Runs on a weekly schedule.

    • Steps:

      1. Fetch the S3 URI of the “Production” model from the MLflow Model Registry.

      2. Initiate the SageMaker Batch Transform job, passing the model URI and the location of the active customer list.

      3. Monitor the job until completion.

      4. Upon success, trigger a subsequent task to load the predictions from the output S3 bucket into the data warehouse.

  3. Infrastructure as Code (terraform/aws_sagemaker_inference.tf): Terraform code to provision the necessary IAM roles and permissions for the SageMaker Batch Transform job to access S3, the Feature Store, and the Model Registry.

  4. Architecture Diagram (Mermaid Code): A clear diagram illustrating the entire batch inference flow.

4. Testing the Inference Pipeline in a Staging Environment

Following the principles in the guide, we must test the inference pipeline’s code and infrastructure, not just the model.

  • Objective: To validate that the entire pipeline mechanism works correctly before deploying it to production. This includes testing the Airflow DAG logic, the inference script, and all IAM permissions.

  • Process:

    1. A staging version of the Airflow DAG will be deployed to our staging Airflow instance.

    2. This DAG will be configured to use a “Staging” tagged model from the MLflow registry.

    3. The pipeline will be run against a small, representative sample of staging customer data.

  • Success Criteria:

    • The DAG completes successfully without any operational errors.

    • The SageMaker Batch Transform job runs without permission or code errors.

    • The output predictions are written to the staging S3 bucket in the correct format and schema.

5. CI/CD for the Inference Pipeline

As discussed, this pipeline requires its own CI/CD workflow, separate from the training pipeline.

  • Continuous Integration (CI): Triggered on a pull request with changes to the inference pipeline code.

    • Runs static analysis and unit tests on src/batch_inference.py.

    • Validates the Airflow DAG syntax.

    • Validates the Terraform configuration for inference resources.

  • Continuous Delivery (CD): Triggered on a merge to the main branch.

    • Deploys the updated Airflow DAG to the Staging Airflow environment.

    • Automatically runs the Staging Test described in the section above to verify the full pipeline execution.

    • Requires manual approval to deploy the DAG to the Production Airflow environment.

This plan provides a robust, secure, and cost-effective solution for our batch inference needs, directly applying the best practices for model serving.

Implementation

Batch Inference Pipeline Architecture Diagram

Infrastructure as Code

This Terraform code provisions the IAM Role that the SageMaker jobs (both the Transform job and the model object itself) will use.

# terraform/aws_sagemaker_inference.tf

resource "aws_iam_role" "sagemaker_inference_role" {
  name = "sagemaker-inference-execution-role"

  assume_role_policy = jsonencode({
    Version   = "2012-10-17",
    Statement = [
      {
        Effect    = "Allow",
        Principal = {
          Service = "sagemaker.amazonaws.com"
        },
        Action    = "sts:AssumeRole"
      }
    ]
  })
}

# This policy grants the necessary permissions for the batch transform job
resource "aws_iam_policy" "sagemaker_inference_policy" {
  name        = "sagemaker-inference-policy"
  description = "Allows SageMaker to run batch inference jobs for the CLV project"

  policy = jsonencode({
    Version   = "2012-10-17",
    Statement = [
      {
        Effect = "Allow",
        Action = [
            "s3:GetObject",
            "s3:PutObject",
            "s3:ListBucket"
        ],
        Resource = [
            "arn:aws:s3:::clv-artifacts-bucket/*", # Access to model artifacts
            "arn:aws:s3:::clv-inference-data-bucket/*" # Access to input/output data
        ]
      },
      {
        Effect = "Allow",
        Action = [
            "sagemaker:GetRecord"
        ],
        Resource = [
            "arn:aws:sagemaker:eu-west-1:${data.aws_caller_identity.current.account_id}:feature-group/*"
        ]
      },
      {
        Effect = "Allow",
        Action = [
            "logs:CreateLogStream",
            "logs:PutLogEvents",
            "logs:CreateLogGroup",
            "logs:DescribeLogStreams"
        ],
        Resource = "arn:aws:logs:*:*:*"
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "sagemaker_inference_policy_attach" {
  role       = aws_iam_role.sagemaker_inference_role.name
  policy_arn = aws_iam_policy.sagemaker_inference_policy.arn
}

Inference Script

This is the core logic that runs inside the SageMaker Batch Transform job. It handles loading the model, fetching features from the Feature Store, and making predictions.

# src/batch_inference.py
import os
import joblib
import boto3
import json
import pandas as pd

def model_fn(model_dir):
    """
    Loads the model and the scaler from the model directory.
    SageMaker will place the contents of the model.tar.gz here.
    """
    print("Loading model and scaler...")
    model = joblib.load(os.path.join(model_dir, "model.joblib"))
    scaler = joblib.load(os.path.join(model_dir, "scaler.joblib")) # Assumes scaler is packaged with the model
    
    # Initialize boto3 client in the global scope for reuse
    # The region should be dynamically available in the SageMaker environment
    region = os.environ.get("AWS_REGION")
    sagemaker_fs_client = boto3.client('sagemaker-featurestore-runtime', region_name=region)
    
    # Define feature group name from environment variable or hardcode
    feature_group_name = os.environ.get("FEATURE_GROUP_NAME", "clv-feature-group-v1")

    return {
        "model": model,
        "scaler": scaler,
        "fs_client": sagemaker_fs_client,
        "feature_group_name": feature_group_name
    }

def input_fn(request_body, request_content_type):
    """
    Parses the input data. The input is expected to be a file where each line is a JSON object
    containing a 'CustomerID'.
    """
    if request_content_type == 'application/json':
        customer_ids = [json.loads(line)['CustomerID'] for line in request_body.strip().split('\n')]
        return customer_ids
    else:
        raise ValueError(f"Unsupported content type: {request_content_type}")

def predict_fn(customer_ids, model_artifacts):
    """
    For each customer, fetches features from the Feature Store, scales them,
    and then makes a prediction.
    """
    model = model_artifacts["model"]
    scaler = model_artifacts["scaler"]
    fs_client = model_artifacts["fs_client"]
    feature_group_name = model_artifacts["feature_group_name"]
    
    # Get the feature names from the scaler object
    feature_names = scaler.feature_names_in_

    all_features = []
    processed_customer_ids = []

    for customer_id in customer_ids:
        try:
            response = fs_client.get_record(
                FeatureGroupName=feature_group_name,
                RecordIdentifierValueAsString=str(customer_id)
            )
            if 'Record' not in response:
                print(f"No record found for customer {customer_id}. Skipping.")
                continue

            # Convert feature store record to a dictionary
            record = {item['FeatureName']: item['ValueAsString'] for item in response['Record']}
            
            # Ensure all required features are present
            features_for_model = [float(record.get(name, 0)) for name in feature_names]
            all_features.append(features_for_model)
            processed_customer_ids.append(customer_id)

        except Exception as e:
            print(f"Error fetching features for customer {customer_id}: {e}. Skipping.")

    if not all_features:
        return {"predictions": []}

    # Create a DataFrame and scale the features
    features_df = pd.DataFrame(all_features, columns=feature_names)
    scaled_features = scaler.transform(features_df)
    
    # Make predictions
    predictions = model.predict(scaled_features)
    
    # Combine customer IDs with their predictions
    output = [
        {"CustomerID": cid, "CLV_Prediction": float(pred)}
        for cid, pred in zip(processed_customer_ids, predictions)
    ]
    
    return {"predictions": output}


def output_fn(prediction_output, accept):
    """
    Formats the predictions into a JSON Lines format.
    """
    if accept == "application/json":
        return '\n'.join(json.dumps(rec) for rec in prediction_output['predictions']), accept
    
    raise ValueError(f"Unsupported accept type: {accept}")

Unit Tests

This test validates the batch_inference.py script using mocks.

# tests/unit/test_batch_inference.py
import pytest
from unittest.mock import MagicMock, patch
from src.batch_inference import predict_fn, input_fn

@pytest.fixture
def mock_model_artifacts():
    """Mocks the model artifacts dictionary."""
    # Mock scaler
    mock_scaler = MagicMock()
    mock_scaler.feature_names_in_ = ['feature1', 'feature2']
    mock_scaler.transform.return_value = [[0.5, 0.5], [-0.5, -0.5]] # Dummy scaled data
    
    # Mock model
    mock_model = MagicMock()
    mock_model.predict.return_value = [500.50, 150.25]
    
    # Mock feature store client
    mock_fs_client = MagicMock()
    # Simulate response for two customers
    mock_fs_client.get_record.side_effect = [
        {'Record': [{'FeatureName': 'feature1', 'ValueAsString': '10'}, {'FeatureName': 'feature2', 'ValueAsString': '20'}]},
        {'Record': [{'FeatureName': 'feature1', 'ValueAsString': '1'}, {'FeatureName': 'feature2', 'ValueAsString': '2'}]}
    ]

    return {
        "model": mock_model,
        "scaler": mock_scaler,
        "fs_client": mock_fs_client,
        "feature_group_name": "test-fg"
    }

def test_input_fn():
    """Tests if the input parsing works correctly."""
    request_body = '{"CustomerID": 101}\n{"CustomerID": 102}'
    customer_ids = input_fn(request_body, 'application/json')
    assert customer_ids == [101, 102]

def test_predict_fn(mock_model_artifacts):
    """Tests the core prediction logic."""
    customer_ids = [101, 102]
    predictions_output = predict_fn(customer_ids, mock_model_artifacts)
    
    # Assert feature store was called twice
    assert mock_model_artifacts["fs_client"].get_record.call_count == 2
    
    # Assert model predict was called once with the scaled data
    mock_model_artifacts["model"].predict.assert_called_once()
    
    # Assert the output is correctly formatted
    expected_output = {
        "predictions": [
            {"CustomerID": 101, "CLV_Prediction": 500.50},
            {"CustomerID": 102, "CLV_Prediction": 150.25}
        ]
    }
    assert predictions_output == expected_output

unittest.mock: MagicMock return_value vs side_effect

Use return_value when your mock needs to provide a single, consistent return value.

Use side_effect for more complex and dynamic behaviors, including:

  • Providing a sequence of return values.

  • Raising exceptions to test error handling.

  • Executing custom logic to determine the return value or perform other actions.

Choosing between return_value and side_effect depends on the specific scenario you are trying to test and how much control you need over the mocked object’s behavior.
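
A quick illustration of the difference:

# tests/unit/mock_examples.py -- illustrative only
from unittest.mock import MagicMock

fs_client = MagicMock()

# return_value: every call yields the same object
fs_client.get_record.return_value = {"Record": []}
assert fs_client.get_record("any-customer") == {"Record": []}

# side_effect with a list: consecutive calls yield consecutive items (as in the fixture above)
fs_client.get_record.side_effect = [{"Record": ["first"]}, {"Record": ["second"]}]
assert fs_client.get_record("c1") == {"Record": ["first"]}
assert fs_client.get_record("c2") == {"Record": ["second"]}

# side_effect with an exception: the call raises, which is how we test the error-handling path
fs_client.get_record.side_effect = ConnectionError("feature store unavailable")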

Airflow DAG

This DAG orchestrates the entire weekly batch prediction job.

# pipelines/dag_batch_inference.py
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTransformOperator
from airflow.models.param import Param
from datetime import datetime
import mlflow

# --- Constants ---
MLFLOW_TRACKING_URI = "http://your-mlflow-server:5000" # Should be an Airflow Variable/Connection
MODEL_NAME = "clv-prediction-model"
SAGEMAKER_ROLE = "arn:aws:iam::ACCOUNT_ID:role/sagemaker-inference-execution-role"
INPUT_S3_URI = "s3://clv-inference-data-bucket/input/active_customers.jsonl"
OUTPUT_S3_URI = "s3://clv-inference-data-bucket/output/"

@dag(
    dag_id="clv_batch_inference_pipeline",
    start_date=datetime(2025, 1, 1),
    schedule_interval="@weekly",
    catchup=False,
    doc_md="DAG to run weekly batch inference for CLV prediction.",
    tags=["clv", "inference"],
)
def batch_inference_dag():

    @task
    def get_production_model_uri() -> str:
        """Fetches the S3 URI of the latest model in the 'Production' stage."""
        client = mlflow.tracking.MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)
        latest_versions = client.get_latest_versions(MODEL_NAME, stages=["Production"])
        if not latest_versions:
            raise ValueError(f"No model in Production stage found for '{MODEL_NAME}'")
        
        prod_model = latest_versions[0]
        print(f"Found production model: Version {prod_model.version}, URI: {prod_model.source}")
        return prod_model.source

    model_uri = get_production_model_uri()
    
    # In a real pipeline, the model object in SageMaker should be created separately.
    # For this operator, we assume a SageMaker model object exists.
    # The operator just creates the transform job.
    run_batch_transform = SageMakerTransformOperator(
        task_id="run_sagemaker_batch_transform",
        config={
            "TransformJobName": f"clv-batch-inference-{{{{ ds_nodash }}}}",
            "ModelName": "clv-sagemaker-model-name", # Name of the model in SageMaker
            "TransformInput": {
                "DataSource": {"S3DataSource": {"S3DataType": "S3Prefix", "S3Uri": INPUT_S3_URI}},
                "ContentType": "application/json",
                "SplitType": "Line",
            },
            "TransformOutput": {
                "S3OutputPath": OUTPUT_S3_URI,
                "Accept": "application/json",
                "AssembleWith": "Line",
            },
            "TransformResources": {"InstanceType": "ml.m5.large", "InstanceCount": 1},
        },
        aws_conn_id="aws_default",
    )
    
    # This task would load the S3 output into the data warehouse
    @task
    def load_predictions_to_dwh(s3_output_path: str):
        print(f"Simulating load of data from {s3_output_path} into Data Warehouse.")
        # Add pandas/boto3 logic here to read from S3 and write to Redshift/Snowflake
        return "Load complete."

    model_uri >> run_batch_transform >> load_predictions_to_dwh(OUTPUT_S3_URI)

batch_inference_dag()

Integration Test

This test triggers the DAG and validates the output in a live staging environment.

# tests/integration/test_inference_pipeline_integration.py
import pytest
import boto3
from airflow.api.client.local_client import Client
from airflow.models import DagRun
import time

DAG_ID = "clv_batch_inference_pipeline"
OUTPUT_BUCKET = "clv-inference-data-bucket-staging" # Use a staging bucket
OUTPUT_PREFIX = "output/"

@pytest.mark.integration
def test_inference_dag_end_to_end():
    # Setup: Ensure the output location is clean
    s3 = boto3.resource('s3')
    bucket = s3.Bucket(OUTPUT_BUCKET)
    bucket.objects.filter(Prefix=OUTPUT_PREFIX).delete()
    
    # Act: Trigger the DAG
    c = Client(None, None)
    run_id = f"integration_test_{int(time.time())}"
    c.trigger_dag(dag_id=DAG_ID, run_id=run_id)

    # Poll for completion
    timeout = time.time() + 60 * 15 # 15 minute timeout
    final_state = None
    while time.time() < timeout:
        # The local API client has no run-status lookup, so query the metadata DB directly
        dag_runs = DagRun.find(dag_id=DAG_ID, run_id=run_id)
        state = dag_runs[0].state if dag_runs else None
        if state in ['success', 'failed']:
            final_state = state
            break
        print(f"DAG state is {state}. Waiting...")
        time.sleep(30)
        
    # Assert DAG success
    assert final_state == 'success', f"DAG run failed with state: {final_state}"

    # Assert: Check if the output file was created in S3
    objects = list(bucket.objects.filter(Prefix=OUTPUT_PREFIX))
    assert len(objects) > 0, "No output file was found in the S3 bucket."
    
    print(f"Integration test passed. Output file found: {objects[0].key}")

GitHub Actions CI/CD Workflow

This workflow validates and deploys the inference pipeline code.

# .github/workflows/cicd_inference_pipeline.yml

name: "CI/CD for Batch Inference Pipeline"

on:
  push:
    branches:
      - main
    paths:
      - 'src/batch_inference.py'
      - 'pipelines/dag_batch_inference.py'
      - 'terraform/**' # Now triggers on infrastructure changes as well
      - 'tests/**'
      - '.github/workflows/cicd_inference_pipeline.yml'
  pull_request:
    branches:
      - main

jobs:
  ci-checks:
    name: "Continuous Integration"
    runs-on: ubuntu-latest
    
    steps:
      - name: Checkout Repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: 3.9

      - name: Install Dependencies
        run: |
          pip install -r requirements.txt
          pip install -r tests/requirements.txt
      
      - name: Run Linting and Formatting Checks
        run: |
          pip install flake8 black
          flake8 src/ pipelines/ tests/
          black --check src/ pipelines/ tests/

      - name: Run Unit Tests
        run: pytest tests/unit/
  
  cd-staging:
    name: "Continuous Delivery to Staging"
    needs: ci-checks
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    runs-on: ubuntu-latest
    environment: staging # Links to GitHub secrets for the staging environment

    permissions:
      id-token: write
      contents: read
      
    steps:
      - name: Checkout Repository
        uses: actions/checkout@v4
      
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: 3.9

      - name: Setup Terraform
        uses: hashicorp/setup-terraform@v2
        
      - name: Configure Staging AWS Credentials
        uses: aws-actions/configure-aws-credentials@v4
        with:
          role-to-assume: ${{ secrets.STAGING_AWS_ROLE_ARN }}
          aws-region: eu-west-1

      - name: Terraform Init (Staging)
        id: init
        run: |
          cd terraform
          # The -backend-config flag points to a file specifying the S3 bucket/key for staging's state
          terraform init -backend-config=staging.backend.hcl

      - name: Terraform Apply (Staging)
        id: apply
        run: |
          cd terraform
          # The -var-file flag points to variables specific to the staging env (e.g., bucket names)
          terraform apply -auto-approve -var-file=staging.tfvars

      - name: Install Python Test Dependencies
        run: |
          pip install -r requirements.txt
          pip install -r tests/integration/requirements.txt
          
      - name: Deploy DAG to Staging Airflow
        run: |
          # This script would sync your DAGs folder with Airflow's DAGs folder in S3
          echo "Syncing pipelines/dag_batch_inference.py to Staging Airflow..."
          # Example: aws s3 sync pipelines/ s3://your-staging-airflow-dags-bucket/ --exclude "*" --include "dag_batch_inference.py"
          
      - name: Run Integration Test in Staging
        run: pytest tests/integration/test_inference_pipeline_integration.py

Monitoring & Observability

1. Guiding Philosophy and Approach

Our approach will be twofold:

  1. Monitoring (Answering “What”): We will implement automated systems to track a predefined set of metrics covering data quality, system health, and model performance proxies. This is our early warning system, designed to be the first to know when a known problem occurs.

  2. Observability (Answering “Why”): We will collect the necessary data (logs, feature values, SHAP values) to enable deep-dive analysis when an alert is triggered. This allows us to move beyond knowing that something is wrong to understanding why it’s wrong, which is crucial for effective debugging and resolution.

Our goal is to create a system that minimizes Time to Detection (TTD) and Time to Resolution (TTR) for any issues that arise in our production ML system.

2. Tech Stack for Monitoring & Observability

Each component below lists the chosen tool or framework and the rationale (based on the guide and our stack):

  • Data Quality Validation (Great Expectations): Integrated directly into our Airflow data pipelines. Allows us to define “unit tests for data” and stop bad data from ever reaching the feature store.

  • Drift & Performance Monitoring (Amazon SageMaker Model Monitor): The most tightly integrated solution for our SageMaker-based pipelines. It can automatically compare production data against a baseline (our training data) to detect both data and prediction drift.

  • Infrastructure & System Health (Amazon CloudWatch): The native AWS solution for tracking operational metrics like job duration (latency), error rates, and resource utilization for our SageMaker jobs and Airflow DAGs.

  • Dashboards & Visualization (Amazon CloudWatch Dashboards plus a BI tool such as Tableau or QuickSight): CloudWatch will be used for real-time operational health dashboards (for the MLOps team). A BI tool will be used for higher-level model performance and business KPI dashboards (for data scientists and product managers).

  • Alerting & Notifications (Amazon CloudWatch Alarms): Integrated with CloudWatch metrics, these will trigger notifications based on predefined thresholds.

  • Notification Channels (Slack & PagerDuty): Slack for medium-priority notifications (e.g., moderate drift) and PagerDuty for high-priority, critical alerts that require immediate attention (e.g., pipeline failure).

  • Explainability & Debugging (SHAP & S3/Data Warehouse): We will generate SHAP values during model evaluation and log them alongside predictions. This allows us to query and analyze the drivers of specific predictions, enabling deep observability.

3. Detailed Monitoring Plan

a) Data Quality Monitoring (The Foundation)
  • Where: Integrated into our Airflow Data Ingestion and Feature Engineering DAGs.

  • Tool: Great Expectations.

  • Checks to Implement:

    • Schema Validation: The number of columns, column names, and column types must match the expected schema.

    • Null Rates: The percentage of missing values for critical features (e.g., total_spend, recency) must not exceed a threshold (e.g., 5%).

    • Range Checks: Numerical features must fall within valid ranges (e.g., SalePrice must be > 0).

    • Type Matching: Ensure features expected to be numerical are not strings.

    • Cardinality Checks: Alert if the number of unique categories for features like country or acquisition_channel changes unexpectedly.
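
A minimal sketch of how a few of these checks could be expressed with Great Expectations’ classic pandas API (the column list, input path, and channel value set are illustrative; the real suite lives in the ingestion DAG):

# src/ge_checks_sketch.py -- illustrative expectations; the production suite is versioned with the DAG
import great_expectations as ge
import pandas as pd

features_df = pd.read_parquet("feature_batch.parquet")  # the batch being validated (illustrative path)
batch = ge.from_pandas(features_df)

# Schema validation: exact column set and order
batch.expect_table_columns_to_match_ordered_list(
    column_list=["CustomerID", "recency", "frequency", "monetary", "total_spend_90d"]
)

# Null rates: critical features may be missing in at most 5% of rows
batch.expect_column_values_to_not_be_null("total_spend_90d", mostly=0.95)

# Range checks: monetary values must be strictly positive
batch.expect_column_values_to_be_between("monetary", min_value=0, strict_min=True)

# Cardinality: acquisition channels should stay within the known set (illustrative values)
batch.expect_column_distinct_values_to_be_in_set(
    "acquisition_channel", value_set=["organic", "paid_search", "email", "referral"]
)

results = batch.validate()
if not results.success:
    raise ValueError("Data quality validation failed; stopping the pipeline before the feature store.")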

b) Data & Prediction Drift Monitoring (Proxy for Performance)
  • Challenge: Our ground truth (actual 12-month spend) is severely delayed. Therefore, monitoring input and output distributions is our most critical leading indicator of performance degradation.

  • Tool: Amazon SageMaker Model Monitor.

  • Baseline: The training dataset used to build the current production model.

  • Input Drift (Covariate Shift): Track the distribution of key input features (recency, frequency, monetary, total_spend_90d) using the Population Stability Index (PSI). This alerts us if the fundamental characteristics of our customer base are changing, and is a direct application of the techniques in Section V-C of the guide.

  • Prediction Drift: Track the distribution of the model’s output CLV scores (P(model_output)) using the Population Stability Index (PSI). This is a powerful signal that the model’s behavior is changing, which could be due to input drift or concept drift. If the model starts predicting much higher or lower on average, we need to investigate.

c) Model Performance Monitoring
  • Challenge: True performance (RMSE against 12-month spend) is a lagging indicator. We must use proxy metrics.

  • Metrics to Track:

    • Primary Proxy Metric: Correlation between predicted 12-month CLV and actual 30-day customer spend, calculated monthly. While not perfect, a drop in this correlation is a strong signal that the model’s predictive power is weakening (a computation sketch follows this list).

    • Business KPI Slices: Track key business metrics for cohorts defined by the model’s predictions. For example:

      • Average Order Value (AOV) for customers in the “Top 10% Predicted CLV” vs. “Bottom 50%”.

      • 30-day churn rate for the “Top 10% Predicted CLV” cohort.

    • Long-Term Ground Truth: On a quarterly basis, calculate the true RMSE and Gini Coefficient for the cohort of customers whose 12-month window has just completed. This validates our model’s long-term accuracy.
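
A minimal sketch of the monthly proxy-metric job, assuming a table that joins each customer’s predicted CLV with their realised 30-day spend (the path and column names are illustrative; Spearman rank correlation is shown here, though Pearson would also work):

# analysis/proxy_metric_check.py -- illustrative monthly proxy-metric computation
import pandas as pd
from scipy.stats import spearmanr

# Illustrative join of predictions with realised 30-day spend:
# columns: CustomerID, clv_prediction, actual_spend_30d
joined_df = pd.read_parquet("s3://clv-analytics-bucket/proxy_metric/latest.parquet")

# Primary proxy metric: correlation between predicted 12-month CLV and actual 30-day spend
corr, _ = spearmanr(joined_df["clv_prediction"], joined_df["actual_spend_30d"])
print(f"Predicted CLV vs. actual 30-day spend (Spearman): {corr:.3f}")

# Business KPI slices: compare the top decile of predicted CLV against the bottom half
top_decile = joined_df["clv_prediction"] >= joined_df["clv_prediction"].quantile(0.90)
bottom_half = joined_df["clv_prediction"] <= joined_df["clv_prediction"].quantile(0.50)
print("Avg 30-day spend, top 10% predicted CLV:", joined_df.loc[top_decile, "actual_spend_30d"].mean())
print("Avg 30-day spend, bottom 50% predicted CLV:", joined_df.loc[bottom_half, "actual_spend_30d"].mean())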

d) System Health Monitoring (Operational)
  • Tool: Amazon CloudWatch.

  • Metrics to Track:

    • Airflow DAGs: Success/failure status of all pipeline runs.

    • SageMaker Jobs (Training & Inference): Job duration (latency), error rates, CPU/Memory utilization.

4. Observability & Debugging Plan

  • Scenario: A “High Prediction Drift” alert is triggered.

    1. Check Input Drift Dashboards: Is the prediction drift caused by a significant shift in one or more input features? (e.g., a marketing campaign brought in a new demographic, causing recency to drop).
    2. Analyze Slice Performance: Did the drift disproportionately affect a specific segment (e.g., customers from a new country)?

  • Scenario: The business team asks “Why was this specific customer predicted to have a high CLV?”

    1. Query Prediction Logs: Retrieve the saved prediction and associated SHAP values for that CustomerID.
    2. Provide Local Explanation: Use the SHAP values to explain which features (e.g., high frequency, large recent purchases) contributed most to that individual’s score. This builds trust and provides actionable insights.

  • Scenario: The “Top 10% Predicted CLV” cohort has a high churn rate.

    1. Root Cause with SHAP: Analyze the aggregated SHAP values for this cohort. Is the model over-reliant on a feature that isn’t a true indicator of loyalty for this segment?
    2. Identify Data Quality Issues: Check if this cohort has data quality problems that are misleading the model.
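
To support the second scenario, the batch job can optionally log SHAP values next to each prediction. A minimal sketch using the shap library; the model artifact path and file names are illustrative:

# src/log_shap_values.py -- illustrative optional step run alongside batch inference
import joblib
import pandas as pd
import shap

# Illustrative inputs: the trained XGBoost model artifact and the scored feature batch
model = joblib.load("/opt/ml/model/model.joblib")
features_df = pd.read_parquet("scored_batch_features.parquet")
customer_ids = features_df.pop("CustomerID")

# TreeExplainer gives exact, fast SHAP values for tree ensembles such as XGBoost
explainer = shap.TreeExplainer(model)
shap_values = explainer.shap_values(features_df)

# One row per prediction, one column per feature contribution, keyed by CustomerID
shap_df = pd.DataFrame(shap_values, columns=features_df.columns)
shap_df.insert(0, "CustomerID", customer_ids.values)
shap_df["base_value"] = explainer.expected_value

# Persist next to the prediction output so individual scores can be explained on demand
shap_df.to_parquet("shap_values.parquet", index=False)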

5. Alerting Strategy

  • High (PagerDuty): Any ML pipeline (training or inference) fails completely; a critical data quality validation fails (e.g., schema mismatch).

  • Medium (Slack): PSI for prediction drift > 0.25; PSI for a key input feature > 0.25; the 30-day proxy metric (correlation) drops by > 20%.

  • Low (Email / Dashboard): PSI for any drift metric is between 0.1 and 0.25; a non-critical data quality check fails (e.g., slight increase in nulls).

6. Dashboard Design

  1. MLOps Team Dashboard (on CloudWatch):

    • Focus: Real-time system and data health.

    • Widgets:

      • Pipeline Status (Green/Red indicators for Airflow DAGs).

      • SageMaker Job Durations (Line graphs).

      • Input Feature Drift Scores (PSI over time).

      • Prediction Drift Score (PSI over time).

      • Data Quality Alerts (List of recent validation failures).

  2. Data Science & Product Dashboard (on Tableau/QuickSight):

    • Focus: Model performance and business impact.

    • Widgets:

      • Distribution of CLV predictions over time (histogram/density plot).

      • Proxy Metric: Correlation of Predicted CLV vs. Actual 30-Day Spend (Line graph).

      • Business KPIs by Prediction Cohort (Bar charts for AOV, Churn Rate).

      • Deep Dive: Slicing performance by country, acquisition channel, etc.

      • Global Feature Importance (Aggregated SHAP values).

Drift Monitoring (Custom Batch Solution)

This script will be run by a SageMaker Processing Job. It calculates drift metrics and produces a JSON report.

# src/run_drift_check.py
import argparse
import pandas as pd
import numpy as np
import json
import os
from scipy.stats import ks_2samp

def calculate_psi(baseline: pd.Series, current: pd.Series, bins=10) -> float:
    """Calculates the Population Stability Index (PSI) for a numerical series."""
    # Define bins based on the baseline distribution
    baseline_bins = pd.cut(baseline, bins=bins, retbins=True)[1]
    
    baseline_dist = pd.cut(baseline, bins=baseline_bins).value_counts(normalize=True)
    current_dist = pd.cut(current, bins=baseline_bins).value_counts(normalize=True)

    # Align distributions; fill missing bins and zero counts with a small value to avoid log(0) / division by zero
    psi_df = pd.DataFrame({'baseline': baseline_dist, 'current': current_dist}).fillna(1e-6).replace(0, 1e-6)
    
    psi_df['psi'] = (psi_df['current'] - psi_df['baseline']) * np.log(psi_df['current'] / psi_df['baseline'])
    
    return psi_df['psi'].sum()

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--baseline-path", type=str, required=True)
    parser.add_argument("--current-path", type=str, required=True)
    parser.add_argument("--output-path", type=str, required=True)
    args = parser.parse_args()

    # Load data
    baseline_df = pd.read_csv(os.path.join(args.baseline_path, "training_data.csv"))
    current_df = pd.read_csv(os.path.join(args.current_path, "inference_input.csv")) # Assuming inference inputs are logged

    drift_report = {
        "input_drift": {},
        "prediction_drift": {}
    }
    
    features_to_check = ['recency', 'frequency', 'monetary', 'total_spend_90d']
    
    print("--- Calculating Input Drift ---")
    for feature in features_to_check:
        psi = calculate_psi(baseline_df[feature], current_df[feature])
        ks_stat, ks_pvalue = ks_2samp(baseline_df[feature], current_df[feature])
        drift_report["input_drift"][feature] = {
            "psi": psi,
            "ks_statistic": ks_stat,
            "ks_pvalue": ks_pvalue
        }
        print(f"Feature '{feature}': PSI = {psi:.4f}, KS p-value = {ks_pvalue:.4f}")

    print("\n--- Calculating Prediction Drift ---")
    # Assuming predictions are part of the 'current_df' and a baseline prediction set exists
    if 'prediction' in current_df.columns and 'prediction' in baseline_df.columns:
        psi = calculate_psi(baseline_df['prediction'], current_df['prediction'])
        ks_stat, ks_pvalue = ks_2samp(baseline_df['prediction'], current_df['prediction'])
        drift_report["prediction_drift"] = {
            "psi": psi,
            "ks_statistic": ks_stat,
            "ks_pvalue": ks_pvalue
        }
        print(f"Prediction Drift: PSI = {psi:.4f}, KS p-value = {ks_pvalue:.4f}")

    # Save the report
    os.makedirs(args.output_path, exist_ok=True)
    with open(os.path.join(args.output_path, "drift_report.json"), "w") as f:
        json.dump(drift_report, f, indent=4)
        
    print("\nDrift check complete.")
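
To make these numbers available to CloudWatch alarms and the dashboard below, the same job can publish them as custom metrics under a CLV/Drift namespace (a sketch using boto3; the region and local report path are illustrative):

# src/publish_drift_metrics.py -- illustrative follow-up step after run_drift_check.py
import json
import boto3

# Read the report written by the drift check (local path inside the processing container)
with open("drift_report.json") as f:
    report = json.load(f)

cloudwatch = boto3.client("cloudwatch", region_name="eu-west-1")

metric_data = []
for feature, stats in report["input_drift"].items():
    metric_data.append({
        "MetricName": "InputDriftPSI",
        "Dimensions": [{"Name": "Feature", "Value": feature}],
        "Value": stats["psi"],
        "Unit": "None",
    })

if report["prediction_drift"]:
    metric_data.append({
        "MetricName": "PredictionDriftPSI",
        "Value": report["prediction_drift"]["psi"],
        "Unit": "None",
    })

# Custom namespace referenced by the alarms and the MLOps dashboard
cloudwatch.put_metric_data(Namespace="CLV/Drift", MetricData=metric_data)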

Infrastructure for Alerting

This Terraform code sets up the notification and alerting infrastructure.

# terraform/monitoring.tf

# --- SNS Topics for Notifications ---
resource "aws_sns_topic" "critical_alerts_topic" {
  name = "clv-critical-alerts"
  tags = { Environment = var.environment }
}

resource "aws_sns_topic" "medium_alerts_topic" {
  name = "clv-medium-alerts"
  tags = { Environment = var.environment }
}

# Assume subscriptions (e.g., to a PagerDuty endpoint or email) are configured separately

# --- CloudWatch Alarms for System Health ---
resource "aws_cloudwatch_metric_alarm" "training_pipeline_failures" {
  alarm_name          = "clv-${var.environment}-training-pipeline-failures"
  comparison_operator = "GreaterThanOrEqualToThreshold"
  evaluation_periods  = "1"
  # "FailedJobs" is assumed to be a custom metric published by our pipeline failure hooks
  # (e.g., an Airflow on_failure_callback); adjust if a managed metric is used instead.
  metric_name         = "FailedJobs"
  namespace           = "CLV/Pipelines"
  period              = "3600" # Check hourly
  statistic           = "Sum"
  threshold           = "1"
  alarm_description   = "Alerts when a CLV training pipeline run fails."
  alarm_actions       = [aws_sns_topic.critical_alerts_topic.arn]

  # CloudWatch dimensions do not support wildcards, so we key on a fixed pipeline name
  dimensions = {
    PipelineName = "clv-training-pipeline"
  }
}

Dashboard Definition

This JSON represents the structure for a CloudWatch dashboard that our MLOps team would use.

// dashboards/mlops_health_dashboard.json
{
    "widgets": [
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["AWS/SageMaker", "CPUUtilization", "Host", "algo-1", { "stat": "Average" }]
                ],
                "title": "Training Job CPU Utilization (Avg)"
            }
        },
        {
            "type": "metric",
            "properties": {
                "metrics": [
                    ["CLV/Drift", "PredictionDriftPSI", { "stat": "Average" }]
                ],
                "title": "Production Prediction Drift (PSI)"
            }
        },
        {
            "type": "log",
            "properties": {
                "query": "SOURCE '/aws/sagemaker/ProcessingJobs' | fields @timestamp, @message | sort @timestamp desc | limit 20",
                "title": "Latest Processing Job Logs"
            }
        },
        {
            "type": "alarm",
            "properties": {
                "title": "Pipeline Alarms",
                "alarms": [
                    "arn:aws:cloudwatch:eu-west-1:ACCOUNT_ID:alarm:clv-production-training-pipeline-failures"
                ]
            }
        }
    ]
}

Continual Learning & Production Testing Plan

1. Guiding Philosophy: From Static Predictions to a Dynamic, Learning System

Our core philosophy is that deploying our CLV model is the beginning, not the end. To maintain its value, the model must evolve. The e-commerce landscape is not static; customer preferences change, marketing strategies shift, and new products are introduced. This guarantees that our model will suffer from data and concept drift over time, degrading its accuracy and business utility.

Therefore, we will implement a robust Continual Learning strategy. Our goal is to create an automated, closed-loop system where production performance insights directly fuel model improvements, which are then safely validated and redeployed. This transforms our CLV model from a static artifact into an adaptive, self-improving system, ensuring it consistently drives business value.

2. Continual Learning & Model Retraining Strategy

This section defines how and when we will update our CLV model.

For each strategy component, the decision and its rationale (based on the provided guides):

  • Retraining Approach: Stateless retraining (from scratch) for now. For our XGBoost model, stateless retraining is simpler to implement robustly and avoids the risk of catastrophic forgetting. This is a common and practical starting point (Stage 2 of adoption). We can plan to evolve to stateful fine-tuning in the future to reduce compute costs, but we must first establish a solid, reliable baseline.

  • Data Curation for Retraining: Sliding window approach; train on the last 3 months of data. This strategy balances freshness with stability. It ensures the model is trained on recent customer behavior while retaining enough data to be robust. The data selection logic will be a parameterized step in our training pipeline.

  • Triggers for Retraining: We will implement a multi-trigger system, moving towards an event-driven approach. A simple schedule is a good start, but a mature system reacts to performance signals.

    • Trigger 1 (Schedule): Weekly automated retraining job. A weekly cadence aligns with the marketing team’s weekly campaign planning cycle. This is our primary, proactive trigger.

    • Trigger 2 (Performance): Automated trigger if the 30-day proxy metric degrades. The ground truth (12-month spend) is severely delayed. Therefore, we will monitor the correlation between predicted CLV and actual 30-day spend as a proxy. If this correlation drops by >20% month-over-month, an automated retraining run will be triggered. This is a practical application of using proxy metrics when ground truth is latent.

    • Trigger 3 (Drift): Automated trigger if Prediction Drift PSI > 0.25. This is our most sensitive leading indicator. A significant change in the distribution of model outputs, as detected by our monitoring system, is a strong signal that the model’s behavior is changing, warranting a proactive retraining run.

3. Production Testing & Rollout Strategy: A Phased Approach

We will adopt a multi-stage, progressive delivery strategy to de-risk the deployment of any new challenger model produced by our retraining pipeline.

Stage 1: Shadow Deployment

  • Purpose: To validate the new model’s operational health and prediction sanity in a live environment with zero user impact.

  • Execution: The existing “Production” batch inference pipeline will run as usual. A parallel, “Shadow” version of the inference pipeline will be triggered, using the new challenger model. It will run on the same input list of active customers.

  • Success Criteria / Metrics:

    • Operational Health: The shadow pipeline must complete successfully with latency and resource consumption within 10% of the champion pipeline.

    • Prediction Sanity: The distribution of the challenger’s predictions will be compared to the champion’s. We will check that the mean prediction has not shifted by an unexpected amount (e.g., > 15%) and there are no catastrophic errors (e.g., all predictions are zero).

  • Outcome: If the shadow deployment passes, the model is automatically promoted to the Canary stage. If it fails, an alert is sent to the MLOps team for investigation.

Stage 2: Canary Release (Segment-based)

  • Purpose: To measure the real-world business impact of the new model’s scores on a small, controlled segment of our marketing efforts before a full rollout.

  • Execution:

    1. Both the champion and challenger models will score the entire active customer base.

    2. For our next marketing campaign (e.g., a promotional email), the target audience will be split.

    3. 95% of the audience (Control) will be selected based on the scores from the champion model.

    4. 5% of the audience (Canary) will be selected based on the scores from the challenger model.

    5. The campaign will be executed, and the results from the two segments will be tracked independently.

  • Success Criteria / Metrics:

    • Primary Metric: The conversion rate of the Canary segment must be statistically equal to or greater than that of the Control segment (see the test sketch after this stage).

    • Guardrail Metric: The unsubscribe rate for the Canary segment must not be statistically higher than the Control.

  • Outcome: If the canary test is successful after a one-week run, the model can be approved for full production promotion.
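
One way to operationalise the “statistically equal to or greater” check on conversion rate is a one-sided two-proportion z-test that looks for evidence the canary is worse; a sketch with statsmodels (the counts are illustrative):

# analysis/canary_check.py -- illustrative conversion-rate comparison after the one-week canary run
from statsmodels.stats.proportion import proportions_ztest

# Illustrative campaign results
canary_conversions, canary_sent = 102, 2_500        # challenger-selected 5% audience
control_conversions, control_sent = 1_840, 47_500   # champion-selected 95% audience

# One-sided test: H1 is "canary conversion rate < control conversion rate",
# so a small p-value is evidence the challenger is doing worse.
stat, p_value = proportions_ztest(
    count=[canary_conversions, control_conversions],
    nobs=[canary_sent, control_sent],
    alternative="smaller",
)

print(f"Control: {control_conversions / control_sent:.2%}, Canary: {canary_conversions / canary_sent:.2%}")
if p_value < 0.05:
    print("Canary converts significantly worse than control -> do not promote.")
else:
    print("No evidence the canary is worse -> eligible for promotion, pending guardrail checks.")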

Stage 3: Full Production Rollout

  • Execution: After passing the Canary stage and receiving manual sign-off, the challenger model is promoted to the “Production” tag in the MLflow Model Registry. The next scheduled batch inference pipeline will automatically pick up and use this new model for all customers.

4. A/B Testing Framework for Major Model Changes

For more significant changes (e.g., introducing a new model architecture or fundamentally new features), a simple canary release may be insufficient. We will use a formal A/B testing framework.

  • Hypothesis: A new CLV model (Challenger) that incorporates behavioral clickstream features will identify high-intent, high-value customers more accurately than the current RFM-based model (Champion), leading to a higher return on investment (ROI) for targeted marketing campaigns.

  • Randomization Unit: CustomerID.

  • Traffic Split: 50% Control (Champion Model), 50% Treatment (Challenger Model).

  • Metrics:

    • Primary Metric: Incremental Revenue Per User from the targeted campaign. We will compare the average revenue generated from users in the Treatment group vs. the Control group.

    • Secondary Metrics: Campaign Conversion Rate, Average Order Value (AOV).

    • Guardrail Metrics: Marketing email unsubscribe rate, inference pipeline compute cost (the new model should not be prohibitively expensive).

  • Duration: The test will run for 4 weeks to capture multiple campaign cycles and mitigate short-term novelty effects.

  • Decision: The Challenger will be adopted if it shows a statistically significant lift in the primary metric with a p-value < 0.05, without any significant negative impact on guardrail metrics.
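
Before committing to the 4-week window and 50/50 split, a quick power calculation sanity-checks whether the expected effect is detectable at all (a sketch; the standardised effect size is an assumption about our revenue distribution):

# analysis/ab_test_power.py -- illustrative sample-size check for the revenue-per-user metric
from statsmodels.stats.power import TTestIndPower

# Assumption: the smallest lift we care about corresponds to a standardised effect size
# (Cohen's d) of roughly 0.02, given the high variance of per-user revenue.
effect_size = 0.02

required_n_per_group = TTestIndPower().solve_power(
    effect_size=effect_size, alpha=0.05, power=0.8, alternative="larger"
)
print(f"Required users per group: {required_n_per_group:,.0f}")
# If the active customer base split 50/50 cannot supply this many users per group,
# either lengthen the test, accept a larger minimum detectable effect, or apply variance reduction.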

5. Automating the Continual Learning & Testing Cycle

We will use Airflow to orchestrate this entire end-to-end workflow.

  1. Monitoring Service (e.g., a custom Python script, SageMaker Model Monitor output analysis) runs daily/weekly.

  2. If a retraining trigger (performance drop or drift alert) is detected, it makes an API call to Airflow to trigger the clv_retraining_dag (a sketch of this call follows the list).

  3. The clv_retraining_dag runs, producing a new challenger model and registering it in MLflow with a “staging” tag.

  4. Upon successful completion, the retraining DAG triggers the clv_shadow_test_dag.

  5. The clv_shadow_test_dag runs. If it succeeds, it promotes the model to “ready-for-canary” in MLflow and sends a Slack notification for awareness.

  6. The clv_canary_test_dag is triggered manually by the marketing team for the next relevant campaign. It fetches the “ready-for-canary” model and runs the test.

  7. After the canary test period, a final report is generated. A manual approval gate (e.g., using an Airflow EmailOperator or a custom UI) requires a Product Manager or MLOps Lead to sign off.

  8. Upon approval, the model is promoted to the “Production” stage in MLflow, completing the cycle. The next run of the main clv_batch_inference_pipeline will automatically use this new champion.
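
Step 2 of this cycle can be a small call from the monitoring job to the Airflow REST API (a sketch, assuming the stable REST API is enabled; the host and credential handling are illustrative):

# src/trigger_retraining.py -- illustrative hook from the monitoring job into Airflow
import requests

AIRFLOW_API = "https://your-airflow-webserver/api/v1"  # illustrative host
DAG_ID = "clv_automated_retraining"

def trigger_retraining(reason: str) -> None:
    """Creates a new DAG run for the retraining pipeline, recording why it was triggered."""
    response = requests.post(
        f"{AIRFLOW_API}/dags/{DAG_ID}/dagRuns",
        json={"conf": {"trigger_reason": reason}},
        auth=("monitoring-bot", "********"),  # in practice, pulled from AWS Secrets Manager
        timeout=30,
    )
    response.raise_for_status()
    print(f"Triggered {DAG_ID}: {response.json()['dag_run_id']}")

# Example: called when the prediction-drift PSI crosses 0.25
# trigger_retraining("prediction_drift_psi_above_0.25")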

Automated Retraining Pipeline

This Airflow DAG is triggered when our monitoring system detects a problem. It runs the training pipeline and, on success, promotes the new model to “Staging” and triggers the shadow test.

# pipelines/dag_automated_retraining.py
from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerStartPipelineExecutionOperator
from datetime import datetime
import mlflow

MLFLOW_TRACKING_URI = "http://your-mlflow-server:5000"
MODEL_NAME = "clv-prediction-model"

@dag(
    dag_id="clv_automated_retraining",
    start_date=datetime(2025, 1, 1),
    schedule=None,  # This DAG is externally triggered
    catchup=False,
    doc_md="""
    Triggered by monitoring alerts (drift, performance degradation).
    Runs the SageMaker training pipeline and registers a new challenger model.
    On success, triggers the shadow deployment validation pipeline.
    """,
    tags=['clv', 'retraining', 'lifecycle'],
)
def automated_retraining_dag():

    # This operator starts the SageMaker Pipeline we defined in the previous section.
    trigger_sagemaker_training = SageMakerStartPipelineExecutionOperator(
        task_id="trigger_sagemaker_training_pipeline",
        pipeline_name="CLV-Training-Pipeline",
        aws_conn_id="aws_default",
        wait_for_completion=True, # Ensure we wait until it's done
    )
    
    @task
    def get_latest_model_version_and_promote_to_staging() -> str:
        """
        After training, the SageMaker pipeline registers a model. This task finds
        the latest version, assumes it's the one we just trained, and promotes
        it to the 'Staging' stage in MLflow.
        """
        client = mlflow.tracking.MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)
        # Find the most recently registered version of our model
        latest_version = client.get_latest_versions(MODEL_NAME, stages=["None"])[0]
        
        print(f"Found new model version: {latest_version.version}. Transitioning to 'Staging'.")
        
        client.transition_model_version_stage(
            name=MODEL_NAME,
            version=latest_version.version,
            stage="Staging",
            archive_existing_versions=True # Demotes any existing model in 'Staging'
        )
        return latest_version.version

    # Trigger the shadow test pipeline, passing the new model version for validation.
    trigger_shadow_test = TriggerDagRunOperator(
        task_id="trigger_shadow_deployment",
        trigger_dag_id="clv_shadow_deployment",
        conf={"model_version": "{{ task_instance.xcom_pull(task_ids='get_latest_model_version_and_promote_to_staging') }}"}
    )

    new_model_version = get_latest_model_version_and_promote_to_staging()
    
    trigger_sagemaker_training >> new_model_version >> trigger_shadow_test

automated_retraining_dag()

Shadow Deployment Pipeline

This DAG compares the new “Staging” model against the “Production” champion on live data without affecting users.

# pipelines/dag_shadow_deployment.py
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTransformOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.models.param import Param
from datetime import datetime
import mlflow
import json
import pandas as pd

MLFLOW_TRACKING_URI = "http://your-mlflow-server:5000"
MODEL_NAME = "clv-prediction-model"
SAGEMAKER_ROLE = "arn:aws:iam::ACCOUNT_ID:role/sagemaker-inference-execution-role"
INPUT_S3_URI = "s3://clv-inference-data-bucket/input/active_customers.jsonl"

@dag(
    dag_id="clv_shadow_deployment",
    start_date=datetime(2025, 1, 1),
    schedule=None, # Triggered by the retraining DAG
    catchup=False,
    params={"model_version": Param(None, type=["null", "string"])},
    doc_md="Compares a 'Staging' challenger model against the 'Production' champion.",
    tags=['clv', 'testing', 'lifecycle'],
)
def shadow_deployment_dag():

    @task
    def get_model_uris(**kwargs) -> dict:
        """Fetches S3 URIs for both the Production and new Staging models."""
        client = mlflow.tracking.MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)
        
        # Get Production model
        prod_model = client.get_latest_versions(MODEL_NAME, stages=["Production"])[0]
        
        # Get Challenger (Staging) model version from the trigger payload
        challenger_version = kwargs["params"]["model_version"]
        if not challenger_version:
            raise ValueError("No model_version passed in the trigger config.")
            
        challenger_model = client.get_model_version(MODEL_NAME, challenger_version)

        print(f"Champion Model: v{prod_model.version}")
        print(f"Challenger Model: v{challenger_model.version}")
        
        return {
            "champion_uri": prod_model.source,
            "challenger_uri": challenger_model.source,
            "challenger_version": challenger_model.version
        }
    
    model_uris = get_model_uris()

    # Assume SageMaker model objects are created/updated based on these URIs
    # These tasks run the inference jobs
    run_champion_inference = SageMakerTransformOperator(
        task_id="run_champion_inference",
        # Config would point to the champion model object
    )
    
    run_challenger_inference = SageMakerTransformOperator(
        task_id="run_challenger_inference",
        # Config would point to the challenger model object
    )

    @task
    def compare_shadow_results(champion_output: str, challenger_output: str, challenger_version: str):
        """Compares prediction distributions from both models."""
        s3_hook = S3Hook(aws_conn_id="aws_default")
        
        # This is simplified. In reality, you'd download the files from S3.
        champion_preds = pd.read_json(champion_output, lines=True)
        challenger_preds = pd.read_json(challenger_output, lines=True)

        champion_mean = champion_preds['CLV_Prediction'].mean()
        challenger_mean = challenger_preds['CLV_Prediction'].mean()
        
        percent_diff = abs(challenger_mean - champion_mean) / champion_mean
        
        print(f"Champion Mean Prediction: {champion_mean:.2f}")
        print(f"Challenger Mean Prediction: {challenger_mean:.2f}")
        print(f"Mean Percentage Difference: {percent_diff:.2%}")
        
        # Validation Gate: Pass if the mean prediction is within 15%
        if percent_diff < 0.15:
            print("Shadow test PASSED. Promoting model to 'Ready-for-Canary'.")
            client = mlflow.tracking.MlflowClient(tracking_uri=MLFLOW_TRACKING_URI)
            client.transition_model_version_stage(
                name=MODEL_NAME, version=challenger_version, stage="Ready-for-Canary"
            )
        else:
            print("Shadow test FAILED. Mean prediction shifted too much.")
            # Trigger failure alert here (e.g., via SnsPublishOperator)
            raise ValueError("Shadow test failed.")

    # Define dependencies
    [run_champion_inference, run_challenger_inference] >> compare_shadow_results(
        champion_output=run_champion_inference.output_path,
        challenger_output=run_challenger_inference.output_path,
        challenger_version=model_uris["challenger_version"],
    )

shadow_deployment_dag()

Canary Release Pipeline

This DAG prepares data segments for the marketing team to run a live canary test.

# pipelines/dag_canary_release.py
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTransformOperator
from datetime import datetime
import mlflow
import pandas as pd

@dag(
    dag_id="clv_canary_release_prep",
    start_date=datetime(2025, 1, 1),
    schedule=None, # Manually triggered for a campaign
    catchup=False,
    doc_md="Prepares customer segments for a live canary test.",
    tags=['clv', 'testing', 'lifecycle'],
)
def canary_release_prep_dag():

    @task
    def get_canary_and_champion_models() -> dict:
        # Fetches URIs for 'Production' and 'Ready-for-Canary' models from MLflow
        # (Similar logic to the shadow DAG)
        pass

    model_uris = get_canary_and_champion_models()
    
    # Run two batch transform jobs to score all customers with both models
    score_with_champion = SageMakerTransformOperator(...)
    score_with_challenger = SageMakerTransformOperator(...)
    
    @task
    def generate_campaign_segments(champion_scores_path: str, challenger_scores_path: str):
        """Splits customers into control and canary groups for the campaign."""
        # This task would:
        # 1. Load both sets of scores from S3.
        # 2. Sort customers by predicted CLV for each model.
        # 3. Select the top N customers based on champion scores for the control group.
        # 4. Select the top N*0.05 customers based on challenger scores for the canary group.
        # 5. Save two separate CSV files (control_group.csv, canary_group.csv) to an S3 bucket
        #    for the marketing team to use.
        print("Generated control and canary group files.")
        return "Campaign segments are ready in S3."

    @task
    def notify_marketing_team(status: str):
        # This task would send an email or Slack message.
        print(f"Notifying marketing team: {status}")

    # Define dependencies
    [score_with_champion, score_with_challenger] >> generate_campaign_segments(
        #... pass paths ...
    ) >> notify_marketing_team()

canary_release_prep_dag()

Manual Model Promotion Pipeline

This is the final, human-driven step to make a new model the official champion.

# pipelines/dag_promote_to_production.py
from airflow.decorators import dag, task
from airflow.models.param import Param
from datetime import datetime
import mlflow

@dag(
    dag_id="clv_promote_model_to_production",
    start_date=datetime(2025, 1, 1),
    schedule=None, # Manually triggered only
    catchup=False,
    params={"model_version_to_promote": Param(description="The model version to promote to Production", type="string")},
    doc_md="Manual gate to promote a validated model version to 'Production'.",
    tags=['clv', 'promotion', 'lifecycle'],
)
def promote_to_production_dag():

    @task
    def promote_model(**kwargs):
        version = kwargs["params"]["model_version_to_promote"]
        print(f"Promoting model version {version} to 'Production'...")
        client = mlflow.tracking.MlflowClient(tracking_uri="http://your-mlflow-server:5000")
        client.transition_model_version_stage(
            name="clv-prediction-model",
            version=version,
            stage="Production",
            archive_existing_versions=True
        )
        print("Promotion successful.")

    promote_model()

promote_to_production_dag()

CI/CD Workflow for All Pipelines

This workflow validates all the new lifecycle DAGs.

# .github/workflows/cicd_lifecycle_pipelines.yml

name: "CI/CD for Model Lifecycle DAGs"

on:
  push:
    branches:
      - main
    paths:
      # Add paths to all the new DAGs and any new src files
      - 'pipelines/dag_automated_retraining.py'
      - 'pipelines/dag_shadow_deployment.py'
      - 'pipelines/dag_canary_release.py'
      - 'pipelines/dag_promote_to_production.py'
      - '.github/workflows/cicd_lifecycle_pipelines.yml'
  pull_request:
    branches:
      - main

jobs:
  validate-dags:
    name: "Validate Lifecycle DAGs"
    runs-on: ubuntu-latest
    
    steps:
      - name: Checkout Repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: 3.9

      - name: Install Dependencies
        run: |
          pip install -r requirements.txt
          pip install -r tests/requirements.txt
      
      - name: Run Linting Checks
        run: |
          pip install flake8
          # Run flake8 on the new DAG files
          flake8 pipelines/dag_automated_retraining.py pipelines/dag_shadow_deployment.py ...

      - name: Validate Airflow DAG Integrity
        run: |
          # Use the Airflow CLI to confirm the DAG files parse without import errors
          airflow dags list --subdir pipelines/dag_automated_retraining.py
          airflow dags list --subdir pipelines/dag_shadow_deployment.py
          # ... and so on for the other new DAGs

A/B Testing

Architecture Diagram

This diagram shows the two main phases: the Setup Phase (orchestrated by Airflow) and the Analysis Phase (performed by a data scientist after the experiment period).

Infrastructure as Code

We need a dedicated SNS topic to notify stakeholders when the A/B test segments are ready.

# terraform/ab_testing.tf

resource "aws_sns_topic" "ab_test_notifications_topic" {
  name = "clv-ab-test-notifications"
  display_name = "Notifications for CLV A/B Test Readiness"
  
  tags = {
    Environment = var.environment
    Purpose     = "AB-Testing"
  }
}

# We will reuse the sagemaker_inference_role created previously.
# No new roles are needed if permissions are broad enough.

A/B Test Setup Airflow DAG

This DAG prepares all the necessary data for the marketing and analytics teams to run the A/B test.

# pipelines/dag_ab_test_setup.py
from airflow.decorators import dag, task
from airflow.providers.amazon.aws.operators.sagemaker import SageMakerTransformOperator
from airflow.providers.amazon.aws.operators.sns import SnsPublishOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.models.param import Param
from datetime import datetime
import mlflow
import pandas as pd
import numpy as np

# --- Constants ---
MLFLOW_TRACKING_URI = "http://your-mlflow-server:5000"
MODEL_NAME = "clv-prediction-model"
BASE_S3_PATH = "s3://clv-inference-data-bucket/ab-tests"
CUSTOMER_LIST_PATH = "s3://clv-inference-data-bucket/input/active_customers.jsonl"
SNS_TOPIC_ARN = "arn:aws:sns:eu-west-1:ACCOUNT_ID:clv-ab-test-notifications"

@dag(
    dag_id="clv_ab_test_setup",
    start_date=datetime(2025, 1, 1),
    schedule=None,  # Manually triggered for major tests
    catchup=False,
    params={"challenger_version": Param(type="string", description="The challenger model version from MLflow to test.")},
    doc_md="Sets up a formal A/B test by scoring all users with Champion and Challenger models and assigning them to groups.",
    tags=['clv', 'testing', 'ab-test'],
)
def ab_test_setup_dag():

    @task
    def get_model_uris(**kwargs) -> dict:
        """Fetches S3 URIs for the Production (Champion) and specified Challenger models."""
        # Logic to fetch champion and challenger URIs from MLflow
        # ... (similar to shadow DAG) ...
        pass

    @task
    def assign_users_to_groups(run_id: str) -> str:
        """Randomly assigns all active customers to a 'control' or 'treatment' group."""
        print("Assigning users to A/B test groups...")
        s3_hook = S3Hook()
        customer_file = s3_hook.download_file(key=CUSTOMER_LIST_PATH)
        customers_df = pd.read_json(customer_file, lines=True)

        # Assign each user to a group
        np.random.seed(42) # for reproducibility
        customers_df['group'] = np.random.choice(['control', 'treatment'], size=len(customers_df), p=[0.5, 0.5])
        
        output_path = f"ab-tests/{run_id}/assignments/user_assignments.csv"
        s3_hook.load_string(
            string_data=customers_df[['CustomerID', 'group']].to_csv(index=False),
            key=output_path,
            bucket_name="clv-inference-data-bucket",
            replace=True
        )
        print(f"User assignments saved to {output_path}")
        return output_path

    model_uris = get_model_uris()
    user_assignments_path = assign_users_to_groups(run_id="{{ run_id }}")
    
    # Run two parallel batch transform jobs
    run_champion_job = SageMakerTransformOperator(...)
    run_challenger_job = SageMakerTransformOperator(...)
    
    @task
    def notify_stakeholders(run_id: str):
        """Sends a notification that the A/B test setup is complete."""
        message = f"""
        A/B Test Setup Complete for run_id: {run_id}

        The following artifacts are ready for the Marketing and Analytics teams:
        - User Group Assignments: s3://clv-inference-data-bucket/ab-tests/{run_id}/assignments/
        - Champion Model Scores: {run_champion_job.output_path}
        - Challenger Model Scores: {run_challenger_job.output_path}
        
        The experiment can now begin.
        """
        return message

    notification = notify_stakeholders(run_id="{{ run_id }}")

    publish_notification = SnsPublishOperator(
        task_id="publish_setup_complete_notification",
        target_arn=SNS_TOPIC_ARN,
        message=notification,
    )
    
    # Dependencies
    user_assignments_path >> [run_champion_job, run_challenger_job]
    model_uris >> [run_champion_job, run_challenger_job]
    [run_champion_job, run_challenger_job] >> notification >> publish_notification

ab_test_setup_dag()

Analysis Notebook

After the experiment period (e.g., 4 weeks), a data scientist would use this notebook to analyze the results.

# analysis/analyze_ab_test_results.ipynb

# --- 1. Setup and Load Data ---
import pandas as pd
import numpy as np
from scipy import stats
import seaborn as sns
import matplotlib.pyplot as plt

# In a real scenario, you would load the results from your data warehouse.
# Here, we simulate the final dataset.
print("Simulating experimental results...")
np.random.seed(42)
num_users = 100000

# Load the user assignments from the DAG run
# user_assignments = pd.read_csv("s3://clv-inference-data-bucket/ab-tests/RUN_ID/assignments/user_assignments.csv")
user_assignments = pd.DataFrame({
    'CustomerID': range(num_users),
    'group': np.random.choice(['control', 'treatment'], size=num_users, p=[0.5, 0.5])
})

# Simulate revenue generated during the 4-week test period
# Let's assume the treatment group had a small positive effect
control_revenue = np.random.gamma(1.5, 50, size=len(user_assignments[user_assignments['group'] == 'control']))
treatment_revenue = np.random.gamma(1.6, 50, size=len(user_assignments[user_assignments['group'] == 'treatment']))

user_assignments.loc[user_assignments['group'] == 'control', 'revenue'] = control_revenue
user_assignments.loc[user_assignments['group'] == 'treatment', 'revenue'] = treatment_revenue

results_df = user_assignments
print(results_df.head())
print("\n--- Data Loaded ---")

# --- 2. Analyze Primary Metric: Incremental Revenue Per User ---
control_group = results_df[results_df['group'] == 'control']['revenue']
treatment_group = results_df[results_df['group'] == 'treatment']['revenue']

avg_control_revenue = control_group.mean()
avg_treatment_revenue = treatment_group.mean()
lift = (avg_treatment_revenue - avg_control_revenue) / avg_control_revenue

print(f"\n--- A/B Test Results ---")
print(f"Control Group Mean Revenue: ${avg_control_revenue:.2f}")
print(f"Treatment Group Mean Revenue: ${avg_treatment_revenue:.2f}")
print(f"Observed Lift: {lift:.2%}")

# --- 3. Statistical Significance Testing (t-test) ---
# Perform an independent t-test to check if the difference is statistically significant
t_stat, p_value = stats.ttest_ind(treatment_group, control_group, equal_var=False) # Welch's t-test

print(f"\nT-statistic: {t_stat:.4f}")
print(f"P-value: {p_value:.4f}")

alpha = 0.05
if p_value < alpha:
    print(f"\nResult is STATISTICALLY SIGNIFICANT at alpha={alpha}. We reject the null hypothesis.")
else:
    print(f"\nResult is NOT statistically significant at alpha={alpha}. We fail to reject the null hypothesis.")

# --- 4. Visualize the Results ---
plt.figure(figsize=(10, 6))
sns.histplot(control_group, color="blue", label="Control Group", kde=True, stat="density", element="step")
sns.histplot(treatment_group, color="red", label="Treatment Group", kde=True, stat="density", element="step")
plt.title("Distribution of Revenue per User")
plt.xlabel("Revenue")
plt.legend()
plt.show()

# --- 5. Conclusion ---
print("\n--- Conclusion ---")
if p_value < alpha:
    print("The challenger model led to a statistically significant increase in revenue per user.")
    print("Recommendation: Promote the challenger model to production.")
else:
    print("The challenger model did not show a statistically significant improvement over the champion.")
    print("Recommendation: Do not ship. Re-evaluate the model or iterate further.")

CI Workflow for A/B Test DAG

This ensures the setup DAG is always valid before being merged.

# .github/workflows/ci_ab_test_dag.yml

name: "CI for A/B Test Setup DAG"

on:
  pull_request:
    branches:
      - main
    paths:
      - 'pipelines/dag_ab_test_setup.py'
      - '.github/workflows/ci_ab_test_dag.yml'

jobs:
  validate-ab-test-dag:
    name: "Validate A/B Test DAG"
    runs-on: ubuntu-latest
    
    steps:
      - name: Checkout Repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: 3.9

      - name: Install Dependencies
        run: |
          pip install -r requirements.txt
      
      - name: Validate Airflow DAG Integrity
        run: |
          # Use the Airflow CLI to confirm the DAG file parses without import errors
          airflow dags list --subdir pipelines/dag_ab_test_setup.py

Governance, Ethics & The Human Element

1. Guiding Philosophy: Building a Trustworthy and Compliant System

Our core philosophy is that effective Model Governance and Responsible AI (RAI) practices are not optional add-ons but are integral to the long-term success and viability of the CLV prediction system. Given that this system directly influences marketing decisions, customer segmentation, and potentially financial outcomes (promotions, offers), it falls into a category requiring a high degree of diligence. We will integrate governance and ethical checks throughout the MLOps lifecycle.

2. Comprehensive Model Governance Plan

We will implement a governance framework tailored to our MLOps lifecycle, focusing on reproducibility, validation, and control.

For each ML lifecycle stage, the governance components and the concrete actions and artifacts for the CLV project:

  • Development (Reproducibility & Validation):

    1. Model Registry (MLflow): Every production and challenger model will have its metadata logged, including the Git commit hash of the training code, the DVC hash of the training data, key hyperparameters, and final offline evaluation metrics (RMSE, Gini, fairness metrics). A sketch of attaching this metadata appears after this list.
    2. Model Card: We will generate and attach a ModelCard.md to every registered “Production” model version. This will document its intended use, limitations, training data overview, evaluation results on key slices, and fairness assessments.
    3. Data Sheets: The schema and source of our core datasets (transactional, behavioral) will be documented using Great Expectations, serving as our “Data Sheets”.

  • Deployment & Operations (Observation, Control & Auditability):

    1. Versioned Deployments: All deployments of inference pipelines (our weekly batch job) will be tied to a specific version-controlled Airflow DAG and a versioned model from the registry.
    2. Access Control (IAM): We will use specific, least-privilege IAM roles for each component (Airflow, SageMaker Training, SageMaker Inference) to control access to data and resources.
    3. Logging & Audit Trail: All pipeline runs (Airflow), model training jobs (SageMaker), and inference jobs (SageMaker Batch Transform) will generate detailed logs stored in CloudWatch. Prediction outputs will be logged to S3 with metadata linking them to the exact model version used. This creates a complete, auditable trail from prediction back to the source model, code, and data.

  • Cross-Cutting (Model Service Catalog): The MLflow Model Registry will serve as our internal catalog, allowing stakeholders to discover available CLV models, view their performance, and understand their current deployment stage (Staging, Production, Archived).
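
Much of the Development-stage metadata can be attached automatically right after the training pipeline registers a new version. A minimal sketch with the MLflow client, assuming the freshly registered (stage “None”) version is the one just trained and that the DVC data hash is available to the job:

# src/attach_governance_metadata.py -- illustrative post-registration step in the training pipeline
import subprocess
import mlflow

client = mlflow.tracking.MlflowClient(tracking_uri="http://your-mlflow-server:5000")
MODEL_NAME = "clv-prediction-model"

# Assume the freshly registered (stage 'None') version is the one we just trained
version = client.get_latest_versions(MODEL_NAME, stages=["None"])[0]

# Reproducibility metadata: code version and data version (DVC hash retrieval is illustrative)
git_sha = subprocess.check_output(["git", "rev-parse", "HEAD"], text=True).strip()
client.set_model_version_tag(MODEL_NAME, version.version, "git_commit", git_sha)
client.set_model_version_tag(MODEL_NAME, version.version, "training_data_dvc_hash", "<dvc md5>")

# Attach the Model Card to the underlying training run so it is browsable from the registry
client.log_artifact(version.run_id, "ModelCard.md")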

3. Responsible AI (RAI) Practices

We will proactively address the key pillars of Responsible AI to ensure our CLV model is fair, transparent, and secure.

RAI Pillar

Plan for the CLV Project

Fairness

1. Identify Potential Biases: We recognize that our historical sales data may contain biases. For example, marketing efforts might have historically targeted specific demographics, leading to representation bias.
2. Fairness as a Guardrail Metric: During the offline evaluation step of our training pipeline, we will calculate the Disparate Impact Ratio. We will measure the average predicted CLV for different customer segments (e.g., based on country or acquisition_channel) and calculate the ratio between the lowest-scoring and highest-scoring groups.
3. Mitigation Strategy: If this ratio falls below a threshold (e.g., 0.8), the model will be flagged, and it will not be automatically promoted. This will trigger a manual review by the data science team, who may need to apply pre-processing techniques like re-weighting the training data for the under-predicted segment.

Explainability (XAI)

1. Global Explainability: For each production model, we will generate and store a global feature importance plot using aggregated SHAP values. This will be included in the Model Card to help business stakeholders understand the primary drivers of CLV across the entire customer base.
2. Local Explainability: Our batch inference pipeline will be configured to optionally generate and log SHAP values for each individual prediction. While not enabled by default to save costs, this capability can be turned on for debugging specific customer predictions or for providing explanations to customer service teams.

Transparency

1. Model Cards: The Model Card is our primary tool for transparency. It will be our “nutrition label” for the model.
2. Internal Communication: We will establish a clear process for communicating model updates and their expected impact (from A/B test results) to the marketing and business intelligence teams.

Privacy

1. PII Handling: Our data ingestion pipelines will include a explicit step to hash or anonymize any direct Personally Identifiable Information (PII) like names or full addresses before storing it in our analytical data lake. The CustomerID will be a pseudonymized key.
2. Data Minimization: We will only use the data necessary for the CLV task and will not ingest unrelated sensitive customer data.

Security

1. Endpoint Security: N/A as we are using a batch pipeline, not a real-time API.
2. Access Control: All access to data (S3), code (Git), and infrastructure (AWS) is managed via strict IAM roles and policies.
3. Secret Management: Database credentials and other secrets are stored securely in AWS Secrets Manager, not in code.
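For example, a pipeline task could fetch credentials at runtime roughly like this; the secret name and region are placeholders.

```python
import json

import boto3


def get_db_credentials(secret_name: str = "clv/warehouse-credentials") -> dict:
    """Fetch database credentials from AWS Secrets Manager at runtime,
    so no secret is ever committed to the repository."""
    client = boto3.client("secretsmanager", region_name="eu-west-1")
    response = client.get_secret_value(SecretId=secret_name)
    return json.loads(response["SecretString"])
```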

4. Holistic System Testing & Production Readiness

We will use the ML Test Score framework as a guiding checklist to assess our production readiness.

  • Data Tests: We have already implemented several of these with Great Expectations (schema checks, feature expectations). We will add a manual review step to ensure no features inadvertently contain PII.

  • Model Development Tests:

    • Our model specs are versioned in Git and peer-reviewed.

    • We have a plan to correlate offline metrics with online A/B test results.

    • Our training pipeline includes hyperparameter tuning.

    • We have a baseline model (Linear Regression) to compare against.

    • We have slice-based performance checks for fairness (evaluate_on_slices).

  • ML Infrastructure Tests:

    • Our training is reproducible via versioned code, data, and config.

    • Our pipeline is integration tested via the CD workflow (cd_training_pipeline.yml).

    • Model quality is validated before promotion (automated checks and manual gates).

    • We have a rollback mechanism (promoting the previous “Production” model from the MLflow archive).

  • Monitoring Tests:

    • We have monitoring for data invariants (Great Expectations), training/serving feature skew (mitigated by computing features once in the Feature Store), and prediction distribution drift (PSI monitoring).

Our self-assessed ML Test Score for this plan would be high, indicating a strong degree of production readiness.
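For reference, the PSI check mentioned in the monitoring tests can be computed with a few lines of NumPy; this is a simplified sketch, and the 0.2/0.25 thresholds follow the common rule of thumb we use for alerting.

```python
import numpy as np


def population_stability_index(expected: np.ndarray, actual: np.ndarray, bins: int = 10) -> float:
    """PSI between a baseline prediction distribution (e.g., scores at training
    time) and the latest batch of predictions. Rule of thumb: > 0.2 warrants
    investigation, > 0.25 trips our retraining alarm."""
    # Quantile-based bin edges from the baseline keep every bin populated.
    edges = np.unique(np.quantile(expected, np.linspace(0, 1, bins + 1)))
    edges[0], edges[-1] = -np.inf, np.inf

    expected_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    actual_pct = np.histogram(actual, bins=edges)[0] / len(actual)

    # A small epsilon avoids division by zero and log(0) for empty bins.
    eps = 1e-6
    expected_pct = np.clip(expected_pct, eps, None)
    actual_pct = np.clip(actual_pct, eps, None)

    return float(np.sum((actual_pct - expected_pct) * np.log(actual_pct / expected_pct)))
```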

5. Human Element: Team Structure & User-Centric Design

  • Team Structure: We will operate on a Platform-Enabled Model.

    • The ML Platform Team (MLOps Engineers) is responsible for building and maintaining the automated MLOps infrastructure (the Airflow DAGs, CI/CD workflows, Terraform modules, monitoring stack).

    • The Data Science Team (ML Engineers/Data Scientists) is responsible for the “Task” of building the CLV model. They own the model development code (src/), the evaluation logic, and are the primary consumers of the platform. They are responsible for analyzing model performance and proposing improvements.

  • User-Centric Design: The “users” of our CLV model are the marketing and business intelligence teams.

    • Managing Expectations: We will communicate clearly that the CLV score is a prediction, not a guarantee. The Model Card will document the model’s RMSE to provide a clear indication of its average error range.

    • Building Trust: By providing explanations for model behavior (via feature importance) and being transparent about its performance (via dashboards), we will build the marketing team’s trust in the scores, encouraging them to use the output effectively in their campaigns.


System Architecture, Cost, Performance Optimizations

Overall System Architecture Diagram

Sequence Diagram: Batch Inference

Latency, Potential Bottlenecks, and Optimizations

  • Total Pipeline Latency: The end-to-end latency is dominated by the SageMaker Batch Transform job runtime. A realistic estimate for scoring 1 million customers on moderately sized instances would be 1 to 3 hours.

  • Potential Bottlenecks & Optimizations:

    1. Bottleneck: Feature Retrieval. If our inference script naively calls GetRecord from the Online Feature Store for every single customer, this will be the biggest bottleneck. A single GetRecord call might take ~10-20ms. For 1 million customers, that is roughly 1,000,000 * 0.015s ≈ 4.2 hours spent just waiting for feature data, dwarfing every other part of the process.

      • ✅ Performance Optimization: This is the most critical optimization. We must leverage the SageMaker Feature Store’s Offline Store. The inference script should execute a single, efficient Athena SQL query that joins the input customer list with the feature data in the offline store. This transforms feature retrieval from millions of slow, individual lookups into one fast, parallelized join, likely reducing retrieval time to just a few minutes (a sketch of this approach follows the list below).

    2. Bottleneck: Compute Instance Sizing. The runtime of the Batch Transform job is directly proportional to the compute power allocated.

      • ✅ Performance Optimization: We can horizontally scale by increasing the InstanceCount in the Batch Transform job configuration. SageMaker will automatically partition the input data across the instances, running them in parallel. We can also vertically scale by choosing a more powerful InstanceType (e.g., from ml.m5.large to ml.m5.4xlarge).

    3. Bottleneck: Model Complexity. A very large XGBoost model (many trees, deep trees) will take longer to load and will have higher per-prediction latency.

      • ✅ Performance Optimization: Use model quantization or compile the model using SageMaker Neo. This can reduce the model’s on-disk size and improve inference speed, though it requires an extra step in the training pipeline.
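To ground the offline-store optimization from point 1 above, the inference script’s feature retrieval could be a single Athena query, sketched here with the AWS SDK for pandas (awswrangler). The Glue database and table names are placeholders, and for brevity the query skips the deduplication to the latest record per customer that a real offline-store query would need.

```python
import awswrangler as wr  # AWS SDK for pandas

# Placeholder names for the Glue database/table backing the offline Feature Store.
FEATURE_DB = "sagemaker_featurestore"
FEATURE_TABLE = "clv_customer_features"


def load_features_for_scoring(customer_list_table: str = "inference_customer_list"):
    """Retrieve features for the entire scoring population in one parallel
    Athena join, instead of millions of per-customer GetRecord calls."""
    sql = f"""
        SELECT f.*
        FROM {FEATURE_DB}.{FEATURE_TABLE} AS f
        JOIN {FEATURE_DB}.{customer_list_table} AS c
          ON f.customer_id = c.customer_id
    """
    return wr.athena.read_sql_query(sql=sql, database=FEATURE_DB)
```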

Estimated Monthly Cost

Assumptions:

  • Region: eu-west-1 (Ireland)

  • Active Customers: 1 million

  • Daily Transactions: ~150,000 records (~1GB/month)

  • Daily Behavioral Events: ~250k sessions, generating ~5GB/day (~150GB/month)

  • Feature Engineering: Runs daily on 1 month of data (~150GB).

  • Model Training: Runs weekly.

  • Batch Inference: Runs weekly on 1 million customers.

| Component | Assumptions / Usage | Instance / Pricing Unit | Estimated Monthly Cost ($) |
| --- | --- | --- | --- |
| S3 Storage | Raw data (~150 GB) + features (~50 GB) + models/artifacts (~10 GB) = ~210 GB | $0.023 per GB-month | ~$5 |
| AWS Glue | Daily job, ~15 mins on 2 DPUs (0.25h * 2 DPUs * 30 days * $0.44/DPU-hr) | DPU-Hours | ~$7 |
| Kinesis Data Streams | 1 shard, continuously running (1 shard * 24h * 30d * $0.015/hr) | Shard-Hours | ~$11 |
| Kinesis Data Firehose | Ingests 150 GB/month ($0.029/GB) | GB Ingested | ~$5 |
| EMR (Feature Engineering) | Daily transient cluster: 1 master + 4 core m5.xlarge instances for 1 hour (5 instances * 1h * 30d * $0.192/hr) | Instance-Hours | ~$29 |
| SageMaker Feature Store | Offline: 50 GB storage. Online: low read/write capacity for batch. (50 * $0.023) + (2 RCU * 730h * $0.057) + (2 WCU * 730h * $0.285) | Storage + RCU/WCU-Hours | ~$500 |
| SageMaker Training | Weekly job on one ml.m5.4xlarge for 2 hours (1 instance * 2h * 4 weeks * $0.922/hr) | Instance-Hours | ~$8 |
| SageMaker Batch Inference | Weekly job on two ml.m5.4xlarge for 2 hours (2 instances * 2h * 4 weeks * $0.922/hr) | Instance-Hours | ~$15 |
| MWAA (Airflow) | Smallest environment (mw1.small), continuously running ($0.49/hr * 24h * 30d) | Environment-Hours | ~$353 |
| CloudWatch | Logs (~10 GB/month) + custom metrics + alarms | GB Ingested & Metrics | ~$10 |
| Total Estimated Monthly Cost | | | ~$943 |

Conclusion on Cost: The primary cost drivers are the continuously running managed services: the SageMaker Feature Store (Online Store) and Airflow (MWAA). The batch compute jobs (EMR, SageMaker) are significant but cheaper than the persistent services. At under $1,000 per month, this is a reasonable cost structure for a production-grade ML capability at a mid-sized business.

Throughput Estimates & Performance Optimizations

a) Throughput Estimates

  • Feature Engineering (EMR): With a 5-node m5.xlarge cluster, processing the full monthly dataset of ~150GB to generate features for ~1M customers would likely take ~1-2 hours. This equates to a throughput of ~500,000 to 1,000,000 customers per hour.

  • Batch Inference (SageMaker): With two ml.m5.4xlarge instances, scoring 1M customers (with the optimized feature retrieval) would likely take ~1 hour. This equates to a throughput of ~1,000,000 customers per hour, or ~280 predictions per second.

Further Performance Optimizations

While we’ve discussed key optimizations, here’s a consolidated list:

  1. Optimize Feature Engineering:

    • Right-size the EMR Cluster: Profile the Spark jobs to find the optimal number and type of instances. Too small a cluster is slow; too large a cluster wastes money.

    • Optimize Spark Code: Use techniques like data partitioning, caching intermediate DataFrames, and avoiding expensive operations like collect().

  2. Optimize Batch Inference:

    • Use the Offline Feature Store: As detailed above, this is the most critical optimization.

    • Increase Parallelism: Increase the MaxConcurrentTransforms and InstanceCount in the Batch Transform job configuration to process the data faster.

    • Batching Strategy: Ensure the inference script processes records in mini-batches so it can take advantage of vectorized prediction in libraries like XGBoost, rather than predicting one record at a time (see the sketch after this list).

  3. General Optimizations:

    • Use AWS Savings Plans or Reserved Instances: For the continuously running components like MWAA and the Online Feature Store, committing to a 1- or 3-year term can reduce costs by roughly 40-60%.

    • Automate Shutdowns: Ensure all development and staging environments (e.g., test Airflow instances) are automatically shut down when not in use.
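The batching strategy from point 2 is a small change in the inference script; a rough sketch with XGBoost:

```python
import numpy as np
import xgboost as xgb


def predict_in_batches(booster: xgb.Booster, features: np.ndarray, batch_size: int = 10_000) -> np.ndarray:
    """Score customers in mini-batches so each predict() call is vectorized,
    instead of paying per-call overhead for one record at a time."""
    outputs = []
    for start in range(0, len(features), batch_size):
        batch = features[start:start + batch_size]
        outputs.append(booster.predict(xgb.DMatrix(batch)))
    return np.concatenate(outputs)
```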

Rationale behind Design Choices

Why is the Airflow (MWAA) running continuously?

  • Its Function is Continuous Orchestration: Think of Airflow as the “control tower” or the “central nervous system” for all of our data and ML pipelines. It’s a server, not a job. Its primary responsibilities are continuous and time-sensitive:

    1. Scheduling: It needs to be “awake” 24/7 to check its schedules. When a DAG is defined with schedule_interval="@daily" or "@weekly", Airflow’s internal scheduler component is constantly checking the clock to see if it’s time to trigger a new run (a minimal DAG sketch appears at the end of this subsection).

    2. State Management: It maintains the status of all past and current pipeline runs. If a job fails, Airflow holds that state and knows not to run downstream tasks. This historical state is crucial for debugging and operational reliability.

    3. API and UI: It serves a user interface and an API, allowing developers to inspect logs, trigger manual runs, and see the status of all pipelines at any time.

    4. Listening for Triggers: As we designed in the Continual Learning section, our retraining pipeline can be triggered by an external API call from our monitoring system. Airflow must be running continuously to listen for and respond to these event-driven triggers.

  • Conclusion: Because its core function is to schedule, monitor, and respond in real-time, MWAA is a persistent service by design. It cannot be spun up on-demand just to run a single pipeline, as that would defeat its purpose as an orchestrator.
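As a minimal illustration of the scheduling point above, a weekly DAG definition looks roughly like this; the DAG id and task are placeholders, and EmptyOperator assumes Airflow 2.3+ as provided by recent MWAA versions.

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

# The always-on scheduler evaluates this schedule continuously and triggers a
# run once per week; the actual training tasks are omitted here.
with DAG(
    dag_id="clv_weekly_training",      # placeholder DAG id
    schedule_interval="@weekly",
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    start = EmptyOperator(task_id="start")
```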

Why is the Feature Store (Online) running continuously?

  • Its Function is Low-Latency Access: The Online Feature Store is specifically designed and optimized for real-time, low-millisecond data retrieval. It’s essentially a managed, high-performance key-value database. This low-latency guarantee is only possible because the service is persistent, with data indexed and ready in memory. If you had to “spin it up” each time you needed it, the lookup time would be minutes, not milliseconds, and it would just be a slow, expensive version of the Offline Store.

  • Architectural Refinement for Our Use Case:

    • Initial Design: The Online Store was included as a best practice for future-proofing. A common next step for a CLV project is to expose the scores via a real-time API for an application to use (e.g., showing a special offer to a high-value customer while they are browsing). That real-time API would absolutely require the Online Feature Store. By building it into the initial architecture, the system is ready for that evolution.

    • A More Cost-Effective, Batch-Only Approach: If we are 100% certain that we will not have a real-time requirement in the near future, we could optimize for cost. We can configure the SageMaker Feature Group to have only the Offline Store enabled. This means data is still cataloged and organized, but it is written only to S3.

    • Impact on the Pipeline: In this optimized scenario, our Batch Inference script would be modified to query the Offline Store (using an Athena query via the SDK) instead of the Online Store. The cost would drop dramatically (from ~$500/month to just a few dollars for S3 storage and Athena query costs), as we would no longer be paying for the continuously running read/write capacity units of the Online Store.

  • Conclusion: The initial estimate included the Online Store for strategic, forward-looking reasons. However, for the specific batch-only pipeline we’ve built, disabling it is a valid and significant cost optimization. The final decision is a classic MLOps trade-off: current cost savings vs. future agility.

Why is feature engineering run daily instead of just before weekly training?

a) The Need for Fresh Features

The predictive power of our CLV model relies heavily on features that change daily. If we only update them weekly, we lose a significant amount of signal.

  • Recency is Critical: The recency feature (days since last purchase) is one of the strongest predictors of churn and repeat purchases. This value changes every single day for every customer who doesn’t make a purchase. If we only calculate this weekly, a customer who bought yesterday and a customer who bought 6 days ago look identical to the model, but their short-term behavior is vastly different.

  • Rolling Windows Capture Trends: Features like total_spend_30d or purchase_count_90d are designed to capture recent momentum. A daily update ensures this window accurately reflects the last 30/90 days. A weekly update means that for most of the week, the window is stale and includes data that should have already “aged out.”

By calculating features daily, we ensure that when the weekly training job runs, it uses the most accurate, up-to-date representation of each customer’s behavior.
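A condensed PySpark sketch of how the daily job might compute these time-aware features; the table and column names are hypothetical, and the real pipeline computes many more features and writes them to the Feature Store.

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("clv-daily-features").getOrCreate()

# Hypothetical transactions table with customer_id, order_ts, amount columns.
tx = spark.table("analytics.transactions")
as_of = F.current_date()

daily_features = tx.groupBy("customer_id").agg(
    # Days since the most recent purchase; increases every day without a purchase.
    F.datediff(as_of, F.max(F.to_date("order_ts"))).alias("recency_days"),
    # Rolling 30-day spend; zero when there were no purchases in the window.
    F.coalesce(
        F.sum(F.when(F.to_date("order_ts") >= F.date_sub(as_of, 30), F.col("amount"))),
        F.lit(0.0),
    ).alias("total_spend_30d"),
    # Rolling 90-day purchase count.
    F.count(F.when(F.to_date("order_ts") >= F.date_sub(as_of, 90), True)).alias("purchase_count_90d"),
)
```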

b) Architectural Decoupling: The Feature Store as a Central Asset

This is the key MLOps principle at play. Tightly coupling the feature engineering pipeline to the training pipeline (i.e., only running it when you need to train) creates a brittle, monolithic system. By decoupling them, we create a more robust and scalable architecture.

  • The Feature Store as a “Source of Truth”: The purpose of the daily feature engineering pipeline is to produce a clean, validated, and up-to-date table of customer features in the Feature Store. This table becomes a central data asset for the entire organization, not just for one model.

  • Enabling Other Use Cases: Once this daily-refreshed feature set exists, it can be consumed by numerous other services without needing to re-run the complex engineering logic:

    • Other ML Models: A different team might want to build a real-time churn prediction model. They don’t need to build their own feature pipeline; they can simply consume the fresh features we are already producing.

    • Business Intelligence (BI): The marketing team might want a daily dashboard showing the number of customers in different RFM segments. They can query the Feature Store directly instead of asking data engineering for a custom pipeline.

    • Ad-Hoc Analysis: A data scientist wanting to explore customer behavior can immediately query the Feature Store for clean, ready-to-use data.

By running feature engineering daily, we are not just preparing data for our model; we are creating a reliable, daily-refreshed data product that serves the entire business. This is a core tenet of building a scalable and efficient data culture.