# Anomaly Detection in Time Series IoT Data

---

### TL;DR: Predictive Maintenance for Smart Heating Systems

* **Challenge:** A home automation company faced high operational costs and resident dissatisfaction due to reactive maintenance of apartment heating systems. Failures were only addressed after residents reported discomfort, leading to expensive emergency call-outs and a poor customer experience. The goal was to shift from this reactive model to a proactive, predictive maintenance strategy.
* **My Role & Solution:** As the sole Data Scientist and MLOps Engineer, I designed and built the end-to-end Machine Learning system to proactively detect heating anomalies. My contributions spanned the entire ML lifecycle:
    * **Strategic Approach:** I developed a phased ML strategy, starting with unsupervised models (like Isolation Forest) to provide immediate value in the absence of labeled data, and establishing a human-in-the-loop feedback system with maintenance technicians. This data flywheel was crucial for collecting labels, which enabled the transition to a high-performing supervised XGBoost model.
    * **Feature Engineering:** I designed and implemented robust data pipelines in AWS Glue to process raw IoT sensor data (heating energy, room/setpoint temperatures) and external weather data, creating crucial temporal features like lags and rolling averages that captured the system's dynamic behavior.
    * **MLOps Infrastructure:** Using Terraform and Bitbucket Pipelines, I built and automated the entire MLOps workflow on AWS. This included containerized model training (Docker/ECR) orchestrated by AWS Step Functions, and a serverless inference pipeline using SageMaker Batch Transform.
    * **Production Lifecycle Management:** I implemented a complete model lifecycle using SageMaker Model Registry for versioning and governance, with automated integration tests to validate deployments. I also established a monitoring and retraining framework to address model drift and ensure long-term performance.
* **Impact:** The deployed system successfully transitioned the company to a data-driven maintenance approach. Within the first six months of operation, the solution achieved:
    * **A 20% reduction in emergency maintenance call-outs** for heating systems, by identifying developing issues before they became critical failures.
    * **A 75% precision rate (Precision@50)** on the highest-priority alerts, ensuring that technician time was focused on investigating real, actionable issues.
    * **An estimated 15-20% improvement in operational efficiency** for the maintenance team through better planning and prioritization of tasks.
* **System Architecture:** I architected and implemented the entire AWS-based solution, focusing on serverless, scalable, and automated components.

![](../_static/past_experiences/iot_anomaly/contributions.png)

### Introduction

#### Purpose

This document provides detailed technical information about the Machine Learning (ML) based Anomaly Detection system. It serves as a guide for developers, MLOps engineers, and operations teams involved in maintaining, operating, and further developing the system.

#### Business Goal

The primary goal is to transition from reactive to predictive maintenance for apartment heating systems. By detecting anomalies indicative of potential malfunctions *before* they cause resident discomfort or system failure, the system aims to:

* Reduce operational costs associated with emergency maintenance.
* Optimize maintenance scheduling and resource allocation.
* Improve heating system reliability and uptime.
* Enhance overall resident satisfaction.

#### Key Technologies

* **Cloud Platform:** AWS (Amazon Web Services)
* **Data Lake:** Amazon S3
* **Data Processing/ETL:** AWS Glue (PySpark), AWS Lambda (Python), SageMaker Processing Jobs (PySpark/Scikit-learn)
* **Feature Management:** Amazon SageMaker Feature Store (Offline Store)
* **Model Training:** Amazon SageMaker Training Jobs (using custom Docker containers)
* **Model Inference:** Amazon SageMaker Batch Transform
* **Model Registry:** Amazon SageMaker Model Registry
* **Orchestration:** AWS Step Functions
* **Scheduling:** Amazon EventBridge Scheduler
* **Alert Storage:** Amazon DynamoDB
* **Infrastructure as Code:** Terraform
* **CI/CD:** Bitbucket Pipelines
* **Containerization:** Docker
* **Core Libraries:** PySpark, Pandas, Scikit-learn, Boto3, Joblib, PyYAML

### Table of Contents

1. [Introduction](#introduction)
    * [Purpose](#purpose)
    * [Business Goal](#business-goal)
    * [Key Technologies](#key-technologies)
2. [Discovery and Scoping](#discovery-and-scoping)
    * [Use Case Evaluation](#use-case-evaluation)
    * [Product Strategies](#product-strategies)
    * [Features](#features)
    * [Product Requirements Document](#product-requirements-document)
    * [Development Stages](#development-stages)
3. [System Architecture](#system-architecture)
    * [Overview](#overview)
    * [Data Flow](#data-flow)
    * [Ingestion Workflow](#ingestion-workflow)
    * [Training Workflow](#training-workflow)
    * [Inference Workflow](#inference-workflow)
4. [Model Development & Iteration](#model-development--iteration)
5. [Challenges and Learnings](#challenges-and-learnings)
6. [Configuration Management](#configuration-management)
7. [Infrastructure as Code (Terraform)](#infrastructure-as-code-terraform)
8. [CI/CD Pipeline (Bitbucket)](#cicd-pipeline-bitbucket)
9. [Cost Analysis](#cost-analysis)
10. [Deployment & Execution](#deployment--execution)
11. [Monitoring & Alerting](#monitoring--alerting)
12. [Troubleshooting Guide](#troubleshooting-guide)
13. [Security Considerations](#security-considerations)
14. [Roadmap & Future Enhancements](#roadmap--future-enhancements)
15. [Appendices](#appendices)
    * [Data Schemas](#data-schemas)
    * [Configuration File Example](#configuration-file-example)

### Discovery and Scoping

#### Use Case Evaluation

![](../_static/past_experiences/iot_anomaly/use_case.png)

#### Product Strategies

![](../_static/past_experiences/iot_anomaly/strategy.png)

#### Features

![](../_static/past_experiences/iot_anomaly/features.png)

#### Product Requirements Document

![](../_static/past_experiences/iot_anomaly/prd.png)

#### Development Stages

### System Architecture

#### Overview

The system follows a modular, event-driven, and batch-oriented architecture on AWS. It consists of distinct pipelines for data ingestion, model training, and daily inference. Orchestration relies heavily on AWS Step Functions, with SageMaker providing core ML capabilities.

#### Data Flow

1. **Ingestion:** Meter, Weather, and Topology data land in a raw S3 bucket via appropriate methods (batch, API fetch). A specific Lambda handles late-arriving meter data.
2. **Processing:** A daily AWS Glue ETL job reads raw data, cleans it, performs transformations (e.g., daily aggregations, temp vs. setpoint diff), joins datasets, and engineers features required by the AD models (a condensed PySpark sketch follows at the end of this subsection). Processed features are stored in Parquet format in the Processed S3 zone. The Glue Data Catalog is updated.
3. **Model Training:** A scheduled Step Functions workflow orchestrates training. It uses Glue to prepare a training dataset from the processed features, then triggers a SageMaker Training Job using the selected algorithms. The trained model is registered in the SageMaker Model Registry.
4. **Batch Inference:** A daily Step Functions workflow prepares the latest features (Glue), then runs a SageMaker Batch Transform job using the latest registered model to score each apartment/room.
5. **Results Storage:** Raw scores/flags are stored in S3. A subsequent Lambda/Glue job loads actionable *alerts* (where scores exceed thresholds) into a DynamoDB table or RDS instance for efficient dashboard querying.
6. **Serving:** The internal dashboard queries the Alert Database via API Gateway/Lambda. Data Analysts can use Athena to query both processed features and raw data directly via the Glue Data Catalog.

#### Ingestion Workflow

The Ingestion pipeline processes raw data into a structured, partitioned format (Parquet) in the S3 Processed Zone and updates the Glue Data Catalog.

* **Responsibility:** Separate pipeline/process
* **Output:** Partitioned Parquet data with corresponding Glue Data Catalog tables.
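The production Glue job is not reproduced here; the following is a minimal PySpark sketch of the kind of daily aggregation described in step 2 and in the Ingestion Workflow above. The bucket paths, output location, and the simple `max` over the cumulative energy counter are illustrative assumptions, not the exact production logic.

```python
# Illustrative sketch of the daily aggregation/processing step (not production code).
# Paths are placeholders; the real job runs inside AWS Glue with a GlueContext.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("ad-daily-processing").getOrCreate()

# Flattened meter readings (see the processed schema in the appendix).
meter = spark.read.parquet("s3://<processed-bucket>/processed_meter_data/")

daily = (
    meter
    .withColumn("event_date", F.to_date("event_ts"))
    .groupBy("apartment_id", "building_id", "event_date")
    .agg(
        # heating_energy_kwh is a cumulative counter; the real job takes the
        # day-over-day delta rather than a simple max.
        F.max("heating_energy_kwh").alias("daily_energy_kwh"),
        F.avg(F.col("setpoint_temp_c") - F.col("room_temp_c")).alias("avg_temp_diff"),
        F.avg("outdoor_temp_c").alias("avg_outdoor_temp_c"),
    )
    .withColumn("year", F.year("event_date"))
    .withColumn("month", F.month("event_date"))
    .withColumn("day", F.dayofmonth("event_date"))
)

# Hypothetical output location for the daily aggregates used downstream.
daily.write.mode("overwrite").partitionBy("year", "month", "day").parquet(
    "s3://<processed-bucket>/daily_features/"
)
```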

#### Training Workflow

**Summary:** Triggered manually or by CI/CD/schedule -> Validates Schema -> Engineers Features (to Feature Store) -> Trains Model (using custom container) -> Evaluates Model -> Conditionally Registers Model (Pending Approval).

1. **State:** `ValidateSchema`
    * **Service:** SageMaker Processing Job (Spark)
    * **Action:** Reads sample/metadata from `processed_meter_data` for the input date range. Compares the schema against a predefined definition. Fails the workflow on critical mismatch.
2. **State:** `FeatureEngineering`
    * **Service:** SageMaker Processing Job (Spark) / AWS Glue ETL Job
    * **Action:** Reads `processed_meter_data` and `processed_weather_data` for the input date range. Calculates features (aggregations, lags, rolling windows, joins). Ingests features into SageMaker Feature Store (`ad-apartment-features` group).
3. **State:** `ModelTraining`
    * **Service:** SageMaker Training Job
    * **Action:** Reads features for the training period from the Feature Store Offline S3 location. Instantiates the selected model strategy (e.g., `LR_LOF_Model`). Fits model components (Scaler, LR, LOF). Saves fitted artifacts as `model.joblib` within `model.tar.gz` to the S3 output path.
4. **State:** `ModelEvaluation`
    * **Service:** SageMaker Processing Job (Python/Scikit-learn)
    * **Action:** Loads the `model.tar.gz` artifact. Reads evaluation features (hold-out set) from the Feature Store Offline S3 location. Calculates performance metrics (e.g., backtesting precision/recall if labels are available, score distributions). Estimates training throughput. Writes `evaluation_report.json` to S3.
5. **State:** `CheckEvaluation` (Choice)
    * **Service:** Step Functions Choice State
    * **Action:** Compares key metrics from `evaluation_report.json` (requires parsing, possibly via an intermediate Lambda) against configured thresholds. Transitions to `RegisterModelLambda` or `EvaluationFailed`.
6. **State:** `RegisterModelLambda`
    * **Service:** AWS Lambda
    * **Action:** Reads the evaluation report URI and model artifact URI from state. Gathers metadata (git hash, params, metrics, data lineage). Creates a new Model Package version in the target SageMaker Model Package Group with status `PendingManualApproval` (a minimal boto3 sketch follows this list).
7. **Terminal States:** `WorkflowSucceeded`, `EvaluationFailed`, `WorkflowFailed`.
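As referenced in step 6, a hedged boto3 sketch of what the registration Lambda could look like. The model package group name, event payload keys, and content types are assumptions for illustration; only the `PendingManualApproval` status and the use of `create_model_package` reflect the behaviour described above.

```python
# Hypothetical sketch of the model-registration Lambda (step 6 above).
# Group name, event keys, and metadata values are placeholders.
import boto3

sm = boto3.client("sagemaker")

def handler(event, context):
    model_data_url = event["ModelArtifactS3Uri"]      # from the training step
    evaluation_uri = event["EvaluationReportS3Uri"]   # from the evaluation step

    response = sm.create_model_package(
        ModelPackageGroupName="ad-apartment-anomaly",  # placeholder group name
        ModelPackageDescription="Anomaly detection model for apartment heating",
        ModelApprovalStatus="PendingManualApproval",
        InferenceSpecification={
            "Containers": [
                {
                    "Image": event["TrainingImageUri"],  # custom training/inference container
                    "ModelDataUrl": model_data_url,
                }
            ],
            "SupportedContentTypes": ["text/csv"],
            "SupportedResponseMIMETypes": ["text/csv"],
        },
        CustomerMetadataProperties={
            "GitCommit": event.get("GitCommit", "unknown"),
            "EvaluationReport": evaluation_uri,
            "TrainingDateRange": event.get("DateRange", ""),
        },
    )
    return {"ModelPackageArn": response["ModelPackageArn"]}
```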
#### Inference Workflow

1. **State:** `GetApprovedModelPackage`
    * **Service:** AWS Lambda
    * **Action:** Queries the SageMaker Model Registry for the latest Model Package with `Approved` status in the configured group. Returns its ARN. Fails if none is found.
2. **State:** `CreateModelResource`
    * **Service:** AWS Lambda
    * **Action:** Creates a SageMaker `Model` resource using the approved Model Package ARN from the previous step and a unique name. This `Model` resource links the artifacts and container for Batch Transform. Returns the created `ModelName`.
3. **State:** `FeatureEngineeringInference`
    * **Service:** SageMaker Processing Job (Spark) / AWS Glue ETL Job
    * **Action:** Reads processed data for the inference date + lookback period. Calculates features using the *exact same logic* as training feature engineering. Outputs the features required by the model (e.g., CSV format without headers) to a unique S3 path for this execution.
4. **State:** `BatchTransform`
    * **Service:** SageMaker Batch Transform Job
    * **Action:** Uses the `ModelName` created earlier. SageMaker launches the container, mounts the model artifact to `/opt/ml/model`, and provides input features from S3. The script loads the model, generates anomaly scores, and outputs scores (e.g., CSV format with identifiers and scores) to the specified S3 output path.
5. **State:** `ProcessResults`
    * **Service:** AWS Lambda
    * **Action:** Triggered after Batch Transform. Reads raw score files from the S3 output path. Applies the configured alert threshold. Formats alert data (ApartmentID, Date, Score, Status='Unseen', etc.). Writes alerts to the DynamoDB Alert Table using `BatchWriteItem`.
6. **Terminal States:** `WorkflowSucceeded`, `WorkflowFailed`.

### Model Development & Iteration

**Models for Anomaly Detection**

![](../_static/past_experiences/iot_anomaly/models1.png)
![](../_static/past_experiences/iot_anomaly/models2.png)
![](../_static/past_experiences/iot_anomaly/model_development.png)

### Challenges and Learnings

![](../_static/past_experiences/iot_anomaly/challenges1.png)
![](../_static/past_experiences/iot_anomaly/challenges2.png)
![](../_static/past_experiences/iot_anomaly/challenges3.png)

### Configuration Management

* **Primary Method:** Version-controlled configuration files (e.g., `config/ad_config.yaml`) stored in Git. These define non-sensitive parameters like hyperparameters, feature lists, thresholds, and instance types.
* **Distribution:** Config files are uploaded to a designated S3 location (e.g., `s3://[scripts-bucket]/config/`) by the CI/CD pipeline.
* **Loading:** Scripts (Glue, SM Processing, Lambda) receive the S3 URI of the relevant config file via an environment variable (`CONFIG_S3_URI`) or argument. They use `boto3` to download and parse the file at runtime; libraries like `PyYAML` are needed for parsing (see the sketch after this list).
* **Runtime Overrides:** Step Function inputs or job arguments can override specific parameters from the config file for execution-specific needs (e.g., `inference_date`, experimental hyperparameters).
* **Secrets:** Sensitive information MUST be stored in AWS Secrets Manager or SSM Parameter Store (SecureString) and fetched by the application code using its IAM role. Do NOT store secrets in Git config files.
* **Environment Variables:** Used primarily for passing S3 URIs (config file, data paths), resource names (table names, feature group), and, potentially, secrets fetched from secure stores.
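A minimal sketch of the loading pattern referenced above, assuming the config lives at an S3 URI passed via `CONFIG_S3_URI`; the bucket, key, and accessed config fields are illustrative.

```python
# Minimal sketch of config loading from S3 at job/Lambda startup.
# CONFIG_S3_URI is expected as an environment variable, e.g.
# s3://<scripts-bucket>/config/ad_config.yaml (placeholder).
import os
import boto3
import yaml

def load_config() -> dict:
    uri = os.environ["CONFIG_S3_URI"]
    bucket, key = uri.replace("s3://", "").split("/", 1)
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    return yaml.safe_load(body)

config = load_config()
# Runtime overrides (Step Function input / job arguments) may replace values like this one.
alert_threshold = config["inference"]["alert_threshold"]
```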
### Infrastructure as Code (Terraform)

* **Tool:** Terraform manages all AWS infrastructure.
* **State Management:** Configure a remote backend (e.g., S3 with DynamoDB locking) for Terraform state files.
* **Stacks:** Infrastructure is divided into logical stacks:
    * `ingestion`: S3 buckets (Raw, Processed), Glue DB/Tables, Ingestion Glue Job, associated IAM roles.
    * `training`: S3 buckets (Scripts, Artifacts, Reports - potentially reused/shared), ECR Repo, Feature Group, Model Package Group, specific IAM roles, Lambdas (Register Model), Step Function (`ADTrainingWorkflow`).
    * `inference`: DynamoDB Table (Alerts), specific IAM roles, Lambdas (Get Model, Create Model, Process Results), Step Function (`ADInferenceWorkflow`), EventBridge Scheduler.
* **Variables & Outputs:** Stacks use input variables (defined in `variables.tf`) for configuration and expose key resource identifiers via outputs (defined in `outputs.tf`). Outputs from one stack (e.g., `processed_bucket_name` from ingestion) are passed as inputs to dependent stacks.

### CI/CD Pipeline (Bitbucket)

* **Tool:** Bitbucket Pipelines (`bitbucket-pipelines.yml`).
* **CI Workflow (Branches/PRs):**
    1. Lint Python code (`flake8`).
    2. Run Unit Tests (`pytest tests/unit/`).
    3. Build the Training/Inference Docker container.
    4. Push the container to AWS ECR (tagged with the commit hash).
    5. Validate Terraform code (`terraform validate`, `fmt -check`) for all stacks.
* **Training CD Workflow (`custom:deploy-and-test-ad-training`):**
    1. (Manual trigger recommended.)
    2. Run CI steps (Lint, Unit Test, Build/Push).
    3. Apply the `training_ad` Terraform stack (using the commit-specific image URI).
    4. Prepare integration test data (trigger ingestion or verify pre-staged data).
    5. Run Training Integration Tests (`pytest tests/integration/test_training_workflow.py`).
* **Inference CD Workflow (`custom:deploy-and-test-ad-inference`):**
    1. (Manual trigger recommended.)
    2. (Optional) Run CI checks.
    3. Apply the `inference_ad` Terraform stack.
    4. Prepare integration test data (verify processed data, ensure an approved model exists).
    5. Run Inference Integration Tests (`pytest tests/integration/test_inference_workflow.py`).
* **Variables:** Uses Bitbucket Repository Variables (CI) and Deployment Variables (CD) for AWS credentials and environment-specific parameters.

### Cost Analysis

This is a high-level estimate based on the architecture described above and the stated data volume assumptions. Actual costs will vary.

**Assumptions:**

* **Environment:** AWS `eu-central-1` (Frankfurt) region.
* **Apartments:** 3,000.
* **Daily Processed Data:** ~300 MB ingested into the Processed S3 Zone.
* **Total Monthly Processed Data:** 300 MB/day * 30 days = **~9 GB**.
* **Model Training Frequency:** 4 times per month (weekly).
* **Batch Inference Frequency:** 30 times per month (daily).

| Pipeline Component | AWS Service(s) | Detailed Cost Calculation & Rationale | Estimated Monthly Cost (USD) |
| :--- | :--- | :--- | :--- |
| **Data Ingestion & Processing** | AWS Glue<br>S3 | **AWS Glue ETL:** Priced per DPU-hour. Covers the daily job that processes raw IoT data into the `processed_meter_data` S3 zone, assuming a small-to-medium job.<br>- 1 job/day * 30 days * 0.25 hours/job * 5 DPUs * ~$0.44/DPU-hr = **~$16.50**<br>**S3 (PUT Requests):** Cost for ingestion jobs writing to the Processed bucket, assuming ~100k PUT requests per day for all apartments.<br>- 100k req/day * 30 days * ~$0.005/1k req = **~$15.00**<br>The cost is primarily driven by the daily processing compute and the volume of S3 writes. | **$30 - $50** |
| **Feature Engineering** | SageMaker Processing<br>SageMaker Feature Store | **SageMaker Processing Jobs:** Priced per instance-second. Covers the feature engineering steps in *both* the weekly training pipeline and the daily inference pipeline.<br>- Training: 4 runs/month * 1.5 hours/run * 1 `ml.m5.large` instance * ~$0.11/hr = **~$0.66**<br>- Inference: 30 runs/month * 0.5 hours/run * 1 `ml.m5.large` instance * ~$0.11/hr = **~$1.65**<br>**SageMaker Feature Store:** Priced per GB-month (Offline Store), plus Write/Read units.<br>- Offline Store (S3): covered under the Storage cost; assume ~50 GB of feature data.<br>- Write/Read units: for batch-only workflows this cost is typically low; assuming minimal average usage = **~$1.00** (buffer) | **$3 - $8** |
| **Model Training** | SageMaker Training<br>Step Functions | **SageMaker Training Jobs:** Priced per instance-second, assuming weekly retraining on a standard instance.<br>- 4 runs/month * 2.0 hours/run * 1 `ml.m5.large` instance * ~$0.11/hr = **~$0.88**<br>**Step Functions:** Priced per state transition. The training workflow is complex but runs infrequently.<br>- ~10 transitions/run * 4 runs/month = 40 transitions, well within the free tier = **~$0.00** | **$1 - $3** |
| **Model Inference** | SageMaker Batch Transform<br>Step Functions | **SageMaker Batch Transform:** Priced per instance-second. This is the daily job that scores all apartments.<br>- 30 runs/month * 1.0 hour/run * 1 `ml.m5.large` instance * ~$0.11/hr = **~$3.30**<br>**Step Functions:** The inference workflow runs daily.<br>- ~8 transitions/run * 30 runs/month = 240 transitions, also well within the free tier = **~$0.00** | **$3 - $5** |
| **Alerting & Orchestration** | AWS Lambda<br>DynamoDB | **Lambda:** Priced per request and GB-second. Covers the functions in the workflows (Register Model, Get Approved Model, Process Results, etc.).<br>- ~40 total invocations/month * ~10 sec average duration * 256 MB memory, all well within the Lambda free tier = **~$0.00**<br>**DynamoDB:** Priced per GB-month storage and per million RCU/WCU, assuming Pay-Per-Request (On-Demand) mode.<br>- Storage: assume ~1 GB of alert data stored = **~$0.25**<br>- Write/Read units: assume ~5k alerts written/month and ~10k dashboard queries/month; On-Demand cost is negligible at this scale = **~$0.10** | **$1 - $2** |
| **Storage & Logging** | S3<br>ECR<br>CloudWatch | **S3:** Priced per GB-month. This is the largest continuous cost, covering Raw, Processed, Features, Artifacts, and Logs.<br>- Raw: 200 GB, Processed: 150 GB, Features: 50 GB, Artifacts/Other: 5 GB = ~405 GB<br>- 405 GB * ~$0.023/GB-month = **~$9.32**<br>- Add ~$5 for GET/LIST requests from various jobs.<br>**ECR:** Priced per GB-month, for storing the custom training/inference container images.<br>- 10 GB * ~$0.10/GB-month = **~$1.00**<br>**CloudWatch:** Logs from all jobs and Lambdas, assuming usage stays within or slightly above the free tier.<br>- Assume ~10 GB log ingestion * ~$0.50/GB = **~$5.00** | **$20 - $30** |
| **Total Estimated Monthly Cost** | - | - | **$60 - $100** |

This breakdown shows that the operational cost is dominated by **Data Ingestion/Processing (Glue & S3 PUTs)** and **persistent Storage (S3 & ECR)**. The actual ML compute costs for training and inference are relatively small due to their batch, on-demand nature. This highlights the efficiency of a serverless, batch-oriented architecture for this use case, as there are no 24/7 clusters or endpoints to maintain. Optimizing S3 storage with lifecycle policies will be the most effective long-term cost management strategy.

**Cost Optimisations**

- **S3 storage dominates:** The largest cost component by far is S3 storage. Implementing S3 Lifecycle policies to move older raw/processed data or feature versions to cheaper tiers (like Intelligent-Tiering or Glacier) is crucial for long-term cost management.
- **Compute is relatively low:** The compute cost of weekly training runs is quite low under these assumptions.
- **Assumptions matter:** If training jobs run much longer, use more instances, or run more frequently, SageMaker costs increase proportionally. If data volume is significantly larger, S3 costs increase.
- **Spot Instances:** For SageMaker Processing and Training Jobs, Spot Instances can save up to 90% on compute costs, but require designing jobs to handle interruptions (checkpointing for Training, stateless design for Processing). This could significantly reduce the ~$1.45 compute estimate.
- **Instance selection:** Choosing the right instance type (e.g., `ml.t3.medium` for less demanding tasks) can optimize compute cost.

### Deployment & Execution

**Initial Deployment:**

1. Configure AWS credentials locally/in the CI runner.
2. Configure Bitbucket variables (Repository & Deployment).
3. Create `terraform.tfvars` files for each stack (`ingestion`, `training`, `inference`) providing the required inputs (unique suffixes, potentially outputs from previous stacks).
4. Deploy Terraform stacks **in order**: `ingestion` -> `training` -> `inference`. Run `init`, `plan`, `apply` for each.
5. Build and push the initial Docker training/inference container to the ECR repository created by `training`. Ensure the `training_image_uri` variable used by Terraform deployments points to this image.
6. Place initial configuration files (`config.yaml`) in the designated S3 config location.
7. Prepare initial raw data and run the Ingestion Glue job once to populate the processed data zone.

**Running Training:**

* Trigger the `ADTrainingWorkflow` Step Function manually or via its schedule.
* Provide input JSON specifying the date range, parameters, and code version (via image URI/git hash).

**Running Inference:**

* The `ADInferenceWorkflow` Step Function runs automatically based on the EventBridge schedule.
* Ensure an *Approved* model package exists in the Model Registry for the workflow to succeed.

**Model Approval:**

* After a successful *Training* run, navigate to SageMaker -> Model Registry -> Model Package Groups -> [Your AD Group].
* Select the latest version (`PendingManualApproval`).
* Review the Description, Metadata, and Evaluation Metrics.
* If satisfactory, update the status to `Approved`.
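The same status change can be made programmatically; a hedged sketch using boto3 (the model package ARN and description are placeholders):

```python
# Sketch: approving a model package version via the API instead of the console.
import boto3

sm = boto3.client("sagemaker")

sm.update_model_package(
    ModelPackageArn=(
        "arn:aws:sagemaker:eu-central-1:123456789012:"
        "model-package/ad-apartment-anomaly/7"  # placeholder ARN
    ),
    ModelApprovalStatus="Approved",
    ApprovalDescription="Metrics reviewed; Precision@50 above threshold.",
)
```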
### Monitoring & Alerting

* **CloudWatch Logs:** Central location for logs from Lambda, Glue, and SageMaker Jobs. Implement structured logging within Python scripts for easier parsing.
* **CloudWatch Metrics:** Monitor key metrics:
    * Step Functions: `ExecutionsFailed`, `ExecutionsTimedOut`.
    * Lambda: `Errors`, `Throttles`, `Duration`.
    * SageMaker Jobs: `CPUUtilization`, `MemoryUtilization` (if needed), Job Status (via Logs/Events).
    * DynamoDB: `ThrottledWriteRequests`, `ThrottledReadRequests`.
* **CloudWatch Alarms:** **REQUIRED:** Set alarms on critical failure metrics (SFN `ExecutionsFailed`, Lambda `Errors`). Configure SNS topics for notifications.
* **SageMaker Model Monitor (Future):** Implement data quality and model quality monitoring to detect drift over time.
* **Application-Level Monitoring:** Track the number of alerts generated daily, processing times, etc.

### Troubleshooting Guide

1. **Workflow Failure (Step Functions):** Check the Step Functions execution history in the AWS Console. Identify the failed state and examine its input, output, and error message.
2. **Job Failures (Glue/SageMaker):** Go to the corresponding CloudWatch Log Group for the failed job (links are often available in the Step Function state details). Look for Python exceptions or service errors. Check job metrics for resource exhaustion.
3. **Lambda Failures:** Check the Lambda function's CloudWatch Log Group. Look for errors, timeouts, or permission issues. Verify environment variables and the input payload.
4. **IAM Permissions:** If errors indicate access denied, carefully review the IAM roles and policies associated with the failing service (SFN, Lambda, SageMaker Job roles), ensuring the necessary permissions to other services (S3, SageMaker API, DynamoDB, ECR, etc.).
5. **Data Issues:**
    * **Schema Mismatch:** Check `ValidateSchema` logs. Verify the Glue Catalog definition matches the actual data.
    * **Missing Features:** Ensure the feature engineering script runs correctly and produces all columns needed by the model. Check Feature Store ingestion if used.
    * **Empty Data:** Check upstream processes; ensure ingestion ran and data exists for the target dates.
6. **Configuration Errors:** Verify config files in S3 are correct and accessible. Check environment variables passed to jobs/Lambdas.
7. **Model Artifact Issues:** Ensure the `model.tar.gz` exists, is not corrupted, and contains all necessary files (`model.joblib`, etc.). Verify the `inference.py` script loads it correctly.
8. **Batch Transform Failures:** Check Batch Transform job logs in CloudWatch. Common issues include container errors (script failures, dependency issues), data format errors, or IAM permission problems for the model's execution role.
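Structured (JSON) logging, called out in the Monitoring section above, makes most of these log-based checks much faster in CloudWatch Logs Insights. A minimal sketch; the logger name and emitted fields are illustrative placeholders:

```python
# Minimal structured-logging sketch for the Python jobs/Lambdas.
import json
import logging

logger = logging.getLogger("ad-pipeline")  # placeholder logger name
logger.setLevel(logging.INFO)
if not logger.handlers:
    logger.addHandler(logging.StreamHandler())

def log_event(level: int, message: str, **fields) -> None:
    """Emit a single JSON log line so CloudWatch Logs Insights can filter on fields."""
    logger.log(level, json.dumps({"message": message, **fields}))

# Example call with placeholder values.
log_event(logging.INFO, "batch_transform_completed",
          inference_date="2024-10-27", n_apartments=3000, n_alerts=42)
```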
### Security Considerations

* **IAM Least Privilege:** Regularly review and tighten IAM roles assigned to Step Functions, Lambdas, Glue, and SageMaker jobs. Grant only necessary permissions.
* **Data Encryption:**
    * **At Rest:** Enable server-side encryption (SSE-S3, SSE-KMS) on all S3 buckets. Enable encryption for DynamoDB tables. Ensure EBS volumes attached to SageMaker jobs are encrypted.
    * **In Transit:** AWS services use TLS for communication by default. Ensure any custom external connections also use TLS.
* **Secret Management:** Use AWS Secrets Manager or SSM Parameter Store (SecureString) for any sensitive credentials or API keys.
* **Network Security:** For enhanced security, consider deploying resources within a VPC using VPC Endpoints for AWS service access, minimizing exposure to the public internet. Configure Security Groups appropriately.
* **Container Security:** Regularly scan the custom Docker container image for vulnerabilities using ECR Image Scanning or third-party tools. Keep base images and libraries updated.
* **Input Validation:** Sanitize and validate inputs to Lambda functions and Step Function executions, especially if triggered externally.
* **Access Control:** Restrict access to the SageMaker Model Registry and approval workflows to authorized personnel.

### Roadmap & Future Enhancements

* Implement SageMaker Model Monitor for data quality and model drift detection.
* Set up automated retraining triggers based on schedule or drift detection.
* Explore more sophisticated anomaly detection algorithms (e.g., Autoencoders, Isolation Forests) via the Strategy Pattern.
* Implement A/B testing for different model versions using SageMaker Inference Pipelines.
* Enhance the internal dashboard for better alert visualization and diagnostics.
* Integrate alerts directly with maintenance ticketing systems.

### Appendices

#### Data Schemas

This appendix provides the formal schema definitions for the primary data entities used across the Anomaly Detection and Energy Demand Forecasting workflows.

#### 1. Raw Meter Data

This represents the logical structure of data as it arrives from the central database into the S3 Raw Zone for processing. It is often in a semi-structured format such as JSON Lines.

| Field Name | Data Type | Description |
| :--- | :--- | :--- |
| `timestamp_str` | String | ISO 8601 formatted timestamp (e.g., "2024-10-27T10:30:05Z") of when the readings were recorded by the tablet. |
| `building_id` | String | Unique identifier for the building (e.g., "bldg_A123"). |
| `apartment_id` | String | Unique identifier for the apartment (e.g., "apt_404"). |
| `readings` | Array[Object] | An array of sensor reading objects from the apartment. |
| `readings.sensor_type` | String | The type of measurement (e.g., `heating_energy_kwh`, `room_temp_c`). |
| `readings.value` | Double/Int | The numerical value of the sensor reading. |
| `readings.room_name` | String | (Optional) The specific room for the reading, if applicable (e.g., "living_room"). |

**Example (JSON Lines):**

```json
{"timestamp_str": "2024-10-27T10:30:00Z", "building_id": "bldg_A123", "apartment_id": "apt_404", "readings": [{"sensor_type": "heating_energy_kwh", "value": 15432.7}, {"sensor_type": "hot_water_litres", "value": 89541.2}, {"sensor_type": "room_temp_c", "room_name": "living_room", "value": 21.5}, {"sensor_type": "setpoint_temp_c", "room_name": "living_room", "value": 22.0}]}
```

---

#### 2. Processed Meter Data (for Anomaly Detection)

This is the output of the initial Ingestion Glue ETL job, stored in the S3 Processed Zone. It is a flattened, structured table optimized for analytical queries and as the source for feature engineering.

**Format:** Apache Parquet
**Partitioned by:** `year`, `month`, `day`

| Column Name | Data Type | Description |
| :--- | :--- | :--- |
| `apartment_id` | String | Unique identifier for the apartment. |
| `building_id` | String | Unique identifier for the building. |
| `event_ts` | Timestamp | The timestamp of the reading, cast to a proper timestamp type. |
| `heating_energy_kwh` | Double | The cumulative heating energy consumption in kilowatt-hours. |
| `hot_water_litres` | Double | The cumulative hot water consumption in litres. |
| `room_temp_c` | Double | The measured temperature in Celsius for a specific room (or average). |
| `setpoint_temp_c` | Double | The user-defined target temperature in Celsius for a specific room. |
| `outdoor_temp_c` | Double | The outdoor temperature at the time of the reading, joined from weather data. |
| **year** | Integer | **Partition Key:** Year derived from `event_ts`. |
| **month** | Integer | **Partition Key:** Month derived from `event_ts`. |
| **day** | Integer | **Partition Key:** Day derived from `event_ts`. |
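To make the mapping from schema #1 to schema #2 concrete, here is a hedged PySpark sketch of flattening the nested `readings` array; the S3 path, the pivoted sensor-type list, and the timestamp handling are simplified assumptions, not the exact Glue job.

```python
# Sketch: flattening raw meter JSON (schema #1) toward the processed layout (schema #2).
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("flatten-raw-meter").getOrCreate()

raw = spark.read.json("s3://<raw-bucket>/meter/")  # placeholder path

flat = (
    raw
    .select(
        # Timestamp parsing simplified; the real job handles the ISO 8601 'Z' suffix explicitly.
        F.to_timestamp("timestamp_str").alias("event_ts"),
        "building_id",
        "apartment_id",
        F.explode("readings").alias("r"),
    )
    .select("event_ts", "building_id", "apartment_id",
            F.col("r.sensor_type").alias("sensor_type"),
            F.col("r.value").alias("value"))
    .groupBy("event_ts", "building_id", "apartment_id")
    .pivot("sensor_type", ["heating_energy_kwh", "hot_water_litres",
                           "room_temp_c", "setpoint_temp_c"])
    .agg(F.avg("value"))
)
# Outdoor temperature is joined in afterwards from the processed weather data.
```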
---

#### 3. Weather Data

**3.1 Raw Weather Forecast Data (from API)**

This is the raw JSON structure ingested from the external weather forecast API into the S3 Raw Zone.

| Field Name | Data Type | Description |
| :--- | :--- | :--- |
| `latitude` | Double | Latitude of the forecast location. |
| `longitude` | Double | Longitude of the forecast location. |
| `generationtime_ms` | Double | Time taken by the API to generate the forecast. |
| `utc_offset_seconds` | Integer | UTC offset for the location. |
| `hourly` | Object | An object containing arrays of hourly forecast values. |
| `hourly.time` | Array[String] | Array of ISO 8601 timestamps for the forecast horizon. |
| `hourly.temperature_2m` | Array[Double] | Array of corresponding forecasted temperatures (°C). |
| `hourly.cloudcover` | Array[Integer] | Array of corresponding forecasted cloud cover (%). |
| `hourly.shortwave_radiation` | Array[Double] | Array of corresponding forecasted solar irradiance (W/m²). |

**3.2 Processed Weather Data (Joined in `processed_edf_data`)**

This represents the clean, hourly weather data after being processed and joined to the consumption data.

| Column Name | Data Type | Description |
| :--- | :--- | :--- |
| `building_id` | String | Unique identifier for the building the weather corresponds to. |
| `timestamp_hour` | Timestamp | The specific hour for which the weather data is valid, truncated to the hour. |
| `temperature_c` | Double | The average temperature in Celsius for that hour. |
| `humidity` | Double | The average relative humidity (%) for that hour. |
| `solar_irradiance_ghi` | Double | The average Global Horizontal Irradiance (W/m²) for that hour. |
| `is_holiday_flag` | Integer | A binary flag (1 or 0) indicating if the date is a public holiday. |

---

#### 4. Feature Store Features (Anomaly Detection)

This defines the schema of the `ad-apartment-features` Feature Group in SageMaker Feature Store. These are the inputs to the AD model.

| Feature Name | Data Type | Description |
| :--- | :--- | :--- |
| **apartment_record_id** | String | **Record Identifier:** Unique ID for the record (e.g., `[apartment_id]_[date]`). |
| **event_time** | Fractional | **Event Time:** Timestamp when the features were computed/ingested. |
| `event_date` | String | The specific date (YYYY-MM-DD) these daily features correspond to. |
| `building_id` | String | Identifier for the building. |
| `avg_temp_diff` | Fractional | The average difference between the setpoint and actual room temperature for the day. |
| `daily_energy_kwh` | Fractional | The total heating energy consumed on that day. |
| `hdd` | Fractional | Heating Degree Days, a measure of how cold the day was. |
| `energy_lag_1d` | Fractional | The value of `daily_energy_kwh` from the previous day. |
| `energy_roll_avg_7d` | Fractional | The 7-day rolling average of `daily_energy_kwh`. |
| `temp_diff_roll_std_3d` | Fractional | The 3-day rolling standard deviation of `avg_temp_diff`. |
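A hedged pandas sketch of how the temporal features above could be derived from a per-apartment daily dataframe; the input column names and the HDD base temperature are assumptions for illustration.

```python
# Sketch: deriving the lag/rolling features listed above from a daily dataframe.
# `daily` is assumed to have columns: apartment_id, event_date, daily_energy_kwh,
# avg_temp_diff, avg_outdoor_temp_c (placeholder names).
import pandas as pd

def add_temporal_features(daily: pd.DataFrame, base_temp_c: float = 18.0) -> pd.DataFrame:
    df = daily.sort_values(["apartment_id", "event_date"]).copy()
    g = df.groupby("apartment_id")

    # Heating Degree Days from the daily mean outdoor temperature (base temp is an assumption).
    df["hdd"] = (base_temp_c - df["avg_outdoor_temp_c"]).clip(lower=0)

    # Lag and rolling-window features, computed per apartment.
    df["energy_lag_1d"] = g["daily_energy_kwh"].shift(1)
    df["energy_roll_avg_7d"] = g["daily_energy_kwh"].transform(
        lambda s: s.rolling(window=7, min_periods=1).mean())
    df["temp_diff_roll_std_3d"] = g["avg_temp_diff"].transform(
        lambda s: s.rolling(window=3, min_periods=1).std())
    return df
```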
---

#### 5. Alert Table Schema (DynamoDB)

This defines the structure of the `ad-alerts` table in Amazon DynamoDB, where the inference pipeline stores actionable alerts.

**Table Name:** `hometech-ml-ad-alerts-[env]`

| Attribute Name | Data Type | Key Type / Index | Description |
| :--- | :--- | :--- | :--- |
| **AlertID** | String (S) | **Partition Key (PK)** | Unique identifier for the alert. **Format:** `[ApartmentID]#[EventDate]`. |
| `ApartmentID` | String (S) | GSI-1 Partition Key | The unique identifier of the apartment that triggered the alert. |
| `BuildingID` | String (S) | GSI-2 Partition Key | The unique identifier of the building. |
| `EventDate` | String (S) | - | The date (YYYY-MM-DD) for which the anomaly was detected. |
| `AlertTimestamp` | String (S) | GSI-2 Sort Key | ISO 8601 timestamp of when the alert was created by the pipeline. |
| `AnomalyScore` | Number (N) | - | The raw numerical score from the ML model. Higher means more anomalous. |
| `Threshold` | Number (N) | - | The score threshold that was breached to create this alert. |
| **Status** | String (S) | GSI-1 Sort Key | The current state of the alert. **Values:** `Unseen`, `Investigating`, `Resolved-True Positive`, `Resolved-False Positive`. |
| `ModelVersion` | String (S) | - | Version of the model package that generated the score (for lineage). |
| `FeedbackNotes` | String (S) | - | (Optional) Notes entered by the maintenance technician during review. |

**Global Secondary Indexes (GSIs):**

* **GSI-1 (`ApartmentStatusIndex`):** Allows efficiently querying for alerts of a specific `Status` within a given `ApartmentID`.
    * **Partition Key:** `ApartmentID`
    * **Sort Key:** `Status`
* **GSI-2 (`BuildingAlertsIndex`):** Allows efficiently querying for all alerts in a `BuildingID`, sorted by time.
    * **Partition Key:** `BuildingID`
    * **Sort Key:** `AlertTimestamp`

#### Configuration File Example

```yaml
feature_engineering:
  lookback_days: 7
  weather_feature_cols: ["hdd", "avg_temp_c"]

training:
  model_strategy: "LR_LOF"
  hyperparameters:
    lof_neighbors: 20
    lof_contamination: "auto"
  feature_columns: # List of features the model actually uses
    - daily_energy_kwh
    - avg_temp_diff
    # ... etc
  instance_type: "ml.m5.large"

evaluation:
  metrics_thresholds:
    min_f1_score: 0.6 # Example if using labels
    max_throughput_deviation: 0.2 # Example
  holdout_data_path: "s3://..." # Path to specific eval data

inference:
  alert_threshold: 5.0
  batch_transform_instance_type: "ml.m5.large"
```
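For reference, a hedged sketch of how a dashboard backend might query the `ApartmentStatusIndex` GSI defined in Appendix 5; the table's environment suffix and the example key values are placeholders.

```python
# Sketch: querying unseen alerts for one apartment via the ApartmentStatusIndex GSI.
import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb").Table("hometech-ml-ad-alerts-dev")  # placeholder env suffix

response = table.query(
    IndexName="ApartmentStatusIndex",
    KeyConditionExpression=Key("ApartmentID").eq("apt_404") & Key("Status").eq("Unseen"),
)
for item in response["Items"]:
    print(item["AlertID"], item["AnomalyScore"], item["EventDate"])
```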