
Architecture

This document outlines the architecture and design patterns of the ML Service, including system components, data flow, and deployment strategies.

🏗️ System Overview

The ML Service follows a distributed, scalable microservice architecture that handles machine learning workloads for movie recommendations and analytics.

High-Level Architecture

graph TB
subgraph "Client Layer"
A[Web Application]
B[Mobile App]
C[Third-party APIs]
end

subgraph "API Gateway"
D[Load Balancer]
E[Rate Limiter]
F[Authentication]
end

subgraph "ML Service"
G[Recommendation Engine]
H[Model Training]
I[Feature Store]
J[Real-time Inference]
end

subgraph "Data Layer"
K[User Database]
L[Movie Database]
M[Model Registry]
N[Feature Cache]
end

subgraph "Infrastructure"
O[Message Queue]
P[Monitoring]
Q[Logging]
end

A --> D
B --> D
C --> D
D --> E
E --> F
F --> G
G --> I
G --> J
H --> M
I --> N
J --> K
J --> L
G --> O
H --> O
P --> Q

🔧 Core Components

1. API Gateway

Purpose: Entry point for all external requests with routing, authentication, and rate limiting.

Technology Stack:

  • Kong or AWS API Gateway for routing
  • Redis for rate limiting
  • JWT for authentication

Responsibilities:

  • Request routing and load balancing
  • Authentication and authorization
  • Rate limiting and throttling
  • Request/response transformation
  • API versioning
# Example API Gateway configuration
from kong_admin import KongAdmin

kong = KongAdmin('http://kong:8001')

# Configure ML service routes
kong.services.create({
    'name': 'ml-service',
    'url': 'http://ml-service:8000'
})

kong.routes.create({
    'service': 'ml-service',
    'paths': ['/api/ml'],
    'methods': ['GET', 'POST']
})

# Add rate limiting
kong.plugins.create({
    'name': 'rate-limiting',
    'service': 'ml-service',
    'config': {
        'minute': 100,
        'hour': 1000
    }
})

2. Recommendation Engine

Purpose: Core service for generating personalized movie recommendations.

Architecture Pattern: Microservice with multiple algorithm implementations

# Service structure
from typing import List

class RecommendationEngine:
    def __init__(self):
        self.collaborative_filter = CollaborativeFilter()
        self.content_filter = ContentBasedFilter()
        self.hybrid_model = HybridRecommender()
        self.model_registry = ModelRegistry()

    async def get_recommendations(
        self,
        user_id: str,
        algorithm: str = 'hybrid',
        limit: int = 10
    ) -> List[Recommendation]:
        """Get personalized recommendations for a user."""

        # Load the appropriate model
        model = await self.model_registry.get_model(algorithm)

        # Generate recommendations
        if algorithm == 'collaborative':
            return await self.collaborative_filter.recommend(
                user_id, model, limit
            )
        elif algorithm == 'content':
            return await self.content_filter.recommend(
                user_id, model, limit
            )
        else:
            return await self.hybrid_model.recommend(
                user_id, model, limit
            )

Key Features:

  • Multiple algorithm support (collaborative, content-based, hybrid)
  • Real-time inference with sub-100ms latency
  • A/B testing framework (see the bucketing sketch after this list)
  • Model versioning and rollback capabilities
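
As a rough illustration of the A/B testing hook, the sketch below deterministically buckets users into algorithm variants by hashing the user ID, so assignment is sticky across requests. The variant names, traffic shares, and experiment key are illustrative assumptions, not part of the service API.

# Hypothetical A/B assignment: bucket users into algorithm variants by stable hash
import hashlib

# Assumed experiment configuration: variant name -> traffic share (shares sum to 1.0)
VARIANTS = {'hybrid': 0.8, 'collaborative': 0.1, 'content': 0.1}

def assign_variant(user_id: str, experiment: str = 'rec_algo_v1') -> str:
    """Map a user to a variant using a stable hash, so assignment is consistent per user."""
    digest = hashlib.sha256(f"{experiment}:{user_id}".encode()).hexdigest()
    bucket = int(digest[:8], 16) / 0xFFFFFFFF  # uniform value in [0, 1]
    cumulative = 0.0
    for variant, share in VARIANTS.items():
        cumulative += share
        if bucket <= cumulative:
            return variant
    return 'hybrid'  # fallback for floating-point edge cases

# Usage (illustrative): pick the algorithm before calling the engine
# algorithm = assign_variant(user_id)
# recs = await engine.get_recommendations(user_id, algorithm=algorithm)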

3. Model Training Pipeline

Purpose: Automated model training, evaluation, and deployment.

Architecture: Event-driven pipeline with orchestration

# Training pipeline orchestration
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'ml-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'model_training_pipeline',
    default_args=default_args,
    description='ML model training and deployment pipeline',
    schedule_interval='@daily',
    catchup=False
)

# Pipeline tasks (the callables are defined in the pipeline package)
extract_data = PythonOperator(
    task_id='extract_training_data',
    python_callable=extract_training_data,
    dag=dag
)

preprocess_data = PythonOperator(
    task_id='preprocess_data',
    python_callable=preprocess_features,
    dag=dag
)

train_model = PythonOperator(
    task_id='train_collaborative_model',
    python_callable=train_collaborative_filter,
    dag=dag
)

evaluate_model = PythonOperator(
    task_id='evaluate_model_performance',
    python_callable=evaluate_model_metrics,
    dag=dag
)

deploy_model = PythonOperator(
    task_id='deploy_to_production',
    python_callable=deploy_model_to_registry,
    dag=dag
)

# Define dependencies
extract_data >> preprocess_data >> train_model >> evaluate_model >> deploy_model

Components:

  • Data Extraction: Automated data collection from multiple sources
  • Feature Engineering: Real-time and batch feature computation
  • Model Training: Distributed training with hyperparameter optimization
  • Model Evaluation: Automated A/B testing and performance metrics (a deployment-gate sketch follows this list)
  • Model Deployment: Blue-green deployment with rollback capabilities
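
As a rough sketch of the evaluation-to-deployment hand-off, the function below promotes a candidate model only when it beats the current production baseline by a configurable margin. The metric names, threshold, and `ModelRegistry` usage mirror the registry class shown later in this document; the gating logic itself is an assumption, not the team's exact policy.

# Hypothetical evaluation gate: deploy only if the candidate beats the production baseline
def should_deploy(candidate_metrics: dict, baseline_metrics: dict,
                  metric: str = 'recall', min_improvement: float = 0.01) -> bool:
    """Return True when the candidate improves the chosen metric by at least min_improvement."""
    candidate = candidate_metrics.get(metric, 0.0)
    baseline = baseline_metrics.get(metric, 0.0)
    return candidate >= baseline + min_improvement

def evaluate_and_deploy(candidate_metrics, baseline_metrics, model_name, version, registry):
    """Gate the deployment step of the pipeline on the evaluation results."""
    if should_deploy(candidate_metrics, baseline_metrics):
        # Promotion uses ModelRegistry.promote_model, defined in the Model Registry section
        registry.promote_model(model_name, version)
        return 'deployed'
    return 'rejected'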

4. Feature Store

Purpose: Centralized repository for ML features with real-time and batch access.

Technology: Feast or custom solution with Redis + PostgreSQL

# Feature store implementation (Feast)
from datetime import timedelta

from feast import FeatureStore
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.field import Field
from feast.types import Float32, Int64, String

# Define entities
user = Entity(
    name="user",
    value_type=String,
    description="User identifier"
)

movie = Entity(
    name="movie",
    value_type=String,
    description="Movie identifier"
)

# Define feature views (the batch sources are defined elsewhere in the feature repo)
user_features = FeatureView(
    name="user_features",
    entities=[user],
    schema=[
        Field(name="avg_rating", dtype=Float32),
        Field(name="total_ratings", dtype=Int64),
        Field(name="favorite_genre", dtype=String),
    ],
    online=True,
    source=user_ratings_source,
    ttl=timedelta(days=1)
)

movie_features = FeatureView(
    name="movie_features",
    entities=[movie],
    schema=[
        Field(name="imdb_rating", dtype=Float32),
        Field(name="genre_vector", dtype=String),
        Field(name="popularity_score", dtype=Float32),
    ],
    online=True,
    source=movie_metadata_source,
    ttl=timedelta(hours=12)
)

# Initialize feature store
fs = FeatureStore(repo_path=".")

# Get features for inference
feature_vector = fs.get_online_features(
    features=[
        "user_features:avg_rating",
        "user_features:favorite_genre",
        "movie_features:imdb_rating",
        "movie_features:genre_vector"
    ],
    entity_rows=[
        {"user": "user123", "movie": "movie456"}
    ]
).to_dict()

Features:

  • Real-time feature serving (sub-10ms latency)
  • Batch feature computation for training (see the retrieval sketch after this list)
  • Feature versioning and lineage tracking
  • Data quality monitoring
  • Feature sharing across teams
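
To illustrate the batch-access side, here is a minimal sketch of building a point-in-time correct training set with Feast's get_historical_features. It assumes the entities and feature views defined above; the entity DataFrame rows are made up for the example.

# Sketch: point-in-time correct training data from the offline store (assumes the views above)
import pandas as pd
from datetime import datetime
from feast import FeatureStore

fs = FeatureStore(repo_path=".")

# Hypothetical entity DataFrame: the label rows we want features joined onto
entity_df = pd.DataFrame({
    "user": ["user123", "user456"],
    "movie": ["movie456", "movie789"],
    "event_timestamp": [datetime(2024, 1, 1), datetime(2024, 1, 2)],
})

training_df = fs.get_historical_features(
    entity_df=entity_df,
    features=[
        "user_features:avg_rating",
        "user_features:total_ratings",
        "movie_features:imdb_rating",
        "movie_features:popularity_score",
    ],
).to_df()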

5. Model Registry

Purpose: Centralized storage and versioning for trained ML models.

Technology: MLflow with S3 or GCS backend

# Model registry implementation
import mlflow
from mlflow.tracking import MlflowClient

class ModelRegistry:
    def __init__(self):
        mlflow.set_tracking_uri("http://mlflow-server:5000")
        self.client = MlflowClient()

    def register_model(
        self,
        model,
        model_name: str,
        version: str,
        metadata: dict
    ):
        """Register a new model version."""

        with mlflow.start_run():
            # Log model artifacts
            mlflow.sklearn.log_model(
                model,
                "model",
                registered_model_name=model_name
            )

            # Log metadata
            mlflow.log_params(metadata)

            # Log metrics
            mlflow.log_metrics({
                "accuracy": metadata.get("accuracy", 0),
                "precision": metadata.get("precision", 0),
                "recall": metadata.get("recall", 0)
            })

    def get_model(self, model_name: str, stage: str = "Production"):
        """Load a model from the registry."""

        model_version = self.client.get_latest_versions(
            model_name,
            stages=[stage]
        )[0]

        model_uri = f"models:/{model_name}/{model_version.version}"
        return mlflow.sklearn.load_model(model_uri)

    def promote_model(self, model_name: str, version: str):
        """Promote a model version to production."""

        self.client.transition_model_version_stage(
            name=model_name,
            version=version,
            stage="Production"
        )
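
A short usage sketch tying the registry methods together follows; the model object, metric values, and version string are placeholders (MLflow assigns its own version numbers at registration time).

# Illustrative usage of the ModelRegistry wrapper above (values are placeholders)
registry = ModelRegistry()

# After training, record the model and its evaluation metrics
registry.register_model(
    model=trained_model,  # e.g. a fitted scikit-learn estimator
    model_name="collaborative_filter",
    version="3",
    metadata={"accuracy": 0.87, "precision": 0.82, "recall": 0.79}
)

# Once the evaluation gate passes, promote the chosen version
registry.promote_model("collaborative_filter", version="3")

# At serving time, always load whatever is currently in Production
production_model = registry.get_model("collaborative_filter", stage="Production")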

📊 Data Architecture

Data Flow

graph LR
A[User Interactions] --> B[Event Streaming]
B --> C[Real-time Processing]
B --> D[Batch Processing]
C --> E[Feature Store]
D --> F[Data Warehouse]
E --> G[Model Inference]
F --> H[Model Training]
G --> I[Recommendations]
H --> J[Model Registry]
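
The "Event Streaming" stage above is typically a message-queue topic fed by the application tier. The sketch below assumes a Kafka topic named user-interactions and the kafka-python client; neither is prescribed by this document.

# Sketch: publishing interaction events to Kafka (topic name and client are assumptions)
import json
from datetime import datetime, timezone
from typing import Optional

from kafka import KafkaProducer  # kafka-python

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

def publish_interaction(user_id: str, movie_id: str, interaction_type: str,
                        value: Optional[float] = None):
    """Emit one interaction event; downstream jobs update features and training data."""
    event = {
        "user_id": user_id,
        "movie_id": movie_id,
        "interaction_type": interaction_type,  # 'rating', 'view', 'like'
        "value": value,
        "timestamp": datetime.now(timezone.utc).isoformat(),
    }
    producer.send("user-interactions", value=event)

# publish_interaction("user123", "movie456", "rating", 4.5)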

Storage Systems

1. Operational Databases

-- User interactions table (PostgreSQL)
CREATE TABLE user_interactions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id VARCHAR(255) NOT NULL,
movie_id VARCHAR(255) NOT NULL,
interaction_type VARCHAR(50) NOT NULL, -- 'rating', 'view', 'like'
value DECIMAL(3,2), -- rating value or duration
timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
metadata JSONB
);

-- Indexes for performance
CREATE INDEX idx_user_interactions_user_id ON user_interactions(user_id);
CREATE INDEX idx_user_interactions_movie_id ON user_interactions(movie_id);
CREATE INDEX idx_user_interactions_timestamp ON user_interactions(timestamp);

2. Analytics Database

-- Aggregated user preferences (ClickHouse)
CREATE TABLE user_preferences_daily (
date Date,
user_id String,
genre String,
avg_rating Float32,
total_interactions UInt32,
total_watch_time UInt64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, user_id, genre);
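
For batch jobs that read these aggregates, a minimal sketch using the clickhouse-driver Python client is shown below; the host name and query window are assumptions for illustration.

# Sketch: reading daily preference aggregates for training (host name is an assumption)
from clickhouse_driver import Client

ch = Client(host="clickhouse")

rows = ch.execute(
    """
    SELECT user_id, genre, avg(avg_rating) AS avg_rating, sum(total_interactions) AS interactions
    FROM user_preferences_daily
    WHERE date >= today() - 30
    GROUP BY user_id, genre
    """
)
# rows is a list of tuples: (user_id, genre, avg_rating, interactions)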

3. Caching Layer

# Redis configuration for feature caching
import json
from typing import Optional

import redis

class FeatureCache:
    def __init__(self):
        self.redis_client = redis.Redis(
            host='redis-cluster',
            port=6379,
            decode_responses=True
        )

    def get_user_features(self, user_id: str) -> Optional[dict]:
        """Get cached user features, or None on a cache miss."""
        cache_key = f"user_features:{user_id}"
        cached_data = self.redis_client.get(cache_key)

        if cached_data:
            return json.loads(cached_data)
        return None

    def set_user_features(
        self,
        user_id: str,
        features: dict,
        ttl: int = 3600
    ):
        """Cache user features with a TTL."""
        cache_key = f"user_features:{user_id}"
        self.redis_client.setex(
            cache_key,
            ttl,
            json.dumps(features)
        )

🚀 Deployment Architecture

Container Orchestration

Kubernetes Deployment:

# ml-service-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-service
  labels:
    app: ml-service
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-service
  template:
    metadata:
      labels:
        app: ml-service
    spec:
      containers:
        - name: ml-service
          image: ml-service:latest
          ports:
            - containerPort: 8000
          env:
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: ml-service-secrets
                  key: database-url
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: ml-service-secrets
                  key: redis-url
          resources:
            limits:
              memory: "2Gi"
              cpu: "1000m"
            requests:
              memory: "1Gi"
              cpu: "500m"
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5

---
apiVersion: v1
kind: Service
metadata:
  name: ml-service
spec:
  selector:
    app: ml-service
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8000
  type: ClusterIP

---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: ml-service-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: ml-service
  minReplicas: 3
  maxReplicas: 20
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

Infrastructure as Code

Terraform Configuration:

# main.tf
provider "aws" {
  region = var.aws_region
}

# EKS Cluster
module "eks" {
  source = "terraform-aws-modules/eks/aws"

  cluster_name    = "ml-service-cluster"
  cluster_version = "1.27"

  vpc_id     = module.vpc.vpc_id
  subnet_ids = module.vpc.private_subnets

  node_groups = {
    ml_workers = {
      desired_capacity = 3
      max_capacity     = 10
      min_capacity     = 3

      instance_types = ["m5.xlarge"]

      k8s_labels = {
        Environment = var.environment
        Application = "ml-service"
      }
    }

    gpu_workers = {
      desired_capacity = 1
      max_capacity     = 5
      min_capacity     = 1

      instance_types = ["p3.2xlarge"]

      k8s_labels = {
        Environment = var.environment
        Application = "ml-training"
        NodeType    = "gpu"
      }

      taints = [
        {
          key    = "nvidia.com/gpu"
          value  = "true"
          effect = "NO_SCHEDULE"
        }
      ]
    }
  }
}

# RDS for operational data
resource "aws_db_instance" "ml_service_db" {
  identifier = "ml-service-${var.environment}"

  engine         = "postgres"
  engine_version = "14.9"
  instance_class = "db.r5.xlarge"

  allocated_storage     = 100
  max_allocated_storage = 1000
  storage_encrypted     = true

  db_name  = "mlservice"
  username = var.db_username
  password = var.db_password

  vpc_security_group_ids = [aws_security_group.rds.id]
  db_subnet_group_name   = aws_db_subnet_group.ml_service.name

  backup_retention_period = 7
  backup_window           = "03:00-04:00"
  maintenance_window      = "Sun:04:00-Sun:05:00"

  skip_final_snapshot       = false
  final_snapshot_identifier = "ml-service-final-snapshot-${formatdate("YYYY-MM-DD-hhmm", timestamp())}"

  tags = {
    Name        = "ML Service Database"
    Environment = var.environment
  }
}

# ElastiCache for Redis
resource "aws_elasticache_replication_group" "ml_service_redis" {
  replication_group_id = "ml-service-${var.environment}"
  description          = "Redis cluster for ML service caching"

  node_type            = "cache.r6g.large"
  port                 = 6379
  parameter_group_name = "default.redis7"

  num_cache_clusters = 3

  subnet_group_name  = aws_elasticache_subnet_group.ml_service.name
  security_group_ids = [aws_security_group.redis.id]

  at_rest_encryption_enabled = true
  transit_encryption_enabled = true

  tags = {
    Name        = "ML Service Redis"
    Environment = var.environment
  }
}

📈 Monitoring and Observability

Metrics and Monitoring

Prometheus Configuration:

# prometheus-config.yaml
global:
  scrape_interval: 15s

scrape_configs:
  - job_name: "ml-service"
    static_configs:
      - targets: ["ml-service:8000"]
    metrics_path: "/metrics"
    scrape_interval: 10s

  - job_name: "model-training"
    static_configs:
      - targets: ["training-service:8001"]
    scrape_interval: 30s

  - job_name: "feature-store"
    static_configs:
      - targets: ["feature-store:8002"]
    scrape_interval: 15s

rule_files:
  - "ml_service_alerts.yml"

alerting:
  alertmanagers:
    - static_configs:
        - targets:
            - alertmanager:9093

Custom Metrics:

# Metrics instrumentation
from prometheus_client import Counter, Histogram, Gauge
import time

# Request metrics
REQUEST_COUNT = Counter(
    'ml_service_requests_total',
    'Total ML service requests',
    ['method', 'endpoint', 'status']
)

REQUEST_LATENCY = Histogram(
    'ml_service_request_duration_seconds',
    'ML service request latency',
    ['method', 'endpoint']
)

# Model metrics
MODEL_INFERENCE_TIME = Histogram(
    'model_inference_duration_seconds',
    'Time spent on model inference',
    ['model_name', 'algorithm']
)

ACTIVE_MODELS = Gauge(
    'active_models_total',
    'Number of active models in production'
)

# Feature store metrics
FEATURE_CACHE_HITS = Counter(
    'feature_cache_hits_total',
    'Feature cache hits'
)

FEATURE_CACHE_MISSES = Counter(
    'feature_cache_misses_total',
    'Feature cache misses'
)

# Middleware for request tracking
async def metrics_middleware(request, call_next):
    start_time = time.time()

    response = await call_next(request)

    REQUEST_COUNT.labels(
        method=request.method,
        endpoint=request.url.path,
        status=response.status_code
    ).inc()

    REQUEST_LATENCY.labels(
        method=request.method,
        endpoint=request.url.path
    ).observe(time.time() - start_time)

    return response
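
To show how the model-level histogram above would be used, here is a minimal sketch that times a single inference call with prometheus_client's time() context manager; the model.predict call and label values are placeholders.

# Sketch: recording per-inference latency with the MODEL_INFERENCE_TIME histogram above
def predict_with_metrics(model, features, model_name: str, algorithm: str):
    # Histogram.labels(...).time() measures the duration of the with-block
    with MODEL_INFERENCE_TIME.labels(model_name=model_name, algorithm=algorithm).time():
        return model.predict(features)  # placeholder for the actual inference call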

Logging Strategy

# Structured logging configuration
import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno
        }

        # Add extra fields
        if hasattr(record, 'user_id'):
            log_entry['user_id'] = record.user_id
        if hasattr(record, 'request_id'):
            log_entry['request_id'] = record.request_id
        if hasattr(record, 'model_version'):
            log_entry['model_version'] = record.model_version

        return json.dumps(log_entry)

# Configure logger
logger = logging.getLogger('ml_service')
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Usage in application
def generate_recommendations(user_id: str, model_version: str):
    logger.info(
        "Generating recommendations",
        extra={
            'user_id': user_id,
            'model_version': model_version,
            'request_id': get_request_id()
        }
    )

🔒 Security Architecture

Authentication and Authorization

# JWT-based authentication
import os
from functools import wraps

from fastapi import HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt

security = HTTPBearer()

class AuthService:
    def __init__(self, secret_key: str):
        self.secret_key = secret_key

    def verify_token(
        self,
        credentials: HTTPAuthorizationCredentials = Depends(security)
    ) -> dict:
        """Verify the JWT token and return user claims."""
        try:
            payload = jwt.decode(
                credentials.credentials,
                self.secret_key,
                algorithms=["HS256"]
            )
            return payload
        except jwt.ExpiredSignatureError:
            raise HTTPException(401, "Token expired")
        except jwt.InvalidTokenError:
            raise HTTPException(401, "Invalid token")

    def require_permission(self, permission: str):
        """Decorator to require a specific permission."""
        def decorator(func):
            @wraps(func)
            async def wrapper(*args, **kwargs):
                user = kwargs.get('current_user')
                if not user or permission not in user.get('permissions', []):
                    raise HTTPException(403, "Insufficient permissions")
                return await func(*args, **kwargs)
            return wrapper
        return decorator

# Usage
auth_service = AuthService(os.getenv("JWT_SECRET"))

@app.post("/api/ml/train")
@auth_service.require_permission("model:train")
async def train_model(current_user: dict = Depends(auth_service.verify_token)):
    # Training logic here
    pass

Data Encryption

# Encryption service for sensitive data
import base64
import json

from cryptography.fernet import Fernet

class EncryptionService:
    def __init__(self, key: str):
        self.fernet = Fernet(key.encode())

    def encrypt_user_data(self, data: dict) -> str:
        """Encrypt sensitive user data."""
        json_data = json.dumps(data)
        encrypted_data = self.fernet.encrypt(json_data.encode())
        return base64.b64encode(encrypted_data).decode()

    def decrypt_user_data(self, encrypted_data: str) -> dict:
        """Decrypt user data."""
        decoded_data = base64.b64decode(encrypted_data.encode())
        decrypted_data = self.fernet.decrypt(decoded_data)
        return json.loads(decrypted_data.decode())
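
Fernet requires a 32-byte urlsafe base64 key. A minimal sketch of generating one and constructing the service is below; the environment variable name is an assumption for illustration, and in practice the key would come from a secrets manager.

# Sketch: generating and supplying a Fernet key (env var name is an assumption)
import os
from cryptography.fernet import Fernet

# Run once and store the key in a secrets manager, e.g.:
# print(Fernet.generate_key().decode())

encryption_service = EncryptionService(os.environ["USER_DATA_ENCRYPTION_KEY"])
token = encryption_service.encrypt_user_data({"email": "user@example.com"})
original = encryption_service.decrypt_user_data(token)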

🔄 Scalability Patterns

Horizontal Scaling

Auto-scaling Strategy:

# Custom auto-scaling based on queue depth
import asyncio
from kubernetes import client, config

class ModelInferenceScaler:
    def __init__(self):
        config.load_incluster_config()
        self.apps_v1 = client.AppsV1Api()
        self.queue_threshold = 100
        self.min_replicas = 3
        self.max_replicas = 20

    async def scale_based_on_queue_depth(self):
        """Scale replicas based on inference queue depth."""
        while True:
            # get_queue_depth / get_current_replicas are implemented elsewhere
            # (e.g. from message-queue metrics and the Deployment status, respectively)
            queue_depth = await self.get_queue_depth()
            current_replicas = await self.get_current_replicas()

            if queue_depth > self.queue_threshold:
                # Scale up
                new_replicas = min(
                    current_replicas + 2,
                    self.max_replicas
                )
                await self.scale_deployment(new_replicas)
            elif queue_depth < self.queue_threshold // 2:
                # Scale down
                new_replicas = max(
                    current_replicas - 1,
                    self.min_replicas
                )
                await self.scale_deployment(new_replicas)

            await asyncio.sleep(30)  # Check every 30 seconds

    async def scale_deployment(self, replicas: int):
        """Scale the ML service deployment."""
        body = {'spec': {'replicas': replicas}}
        self.apps_v1.patch_namespaced_deployment_scale(
            name='ml-service',
            namespace='default',
            body=body
        )

Caching Strategies

# Multi-level caching
import json
from typing import List

import redis.asyncio as redis

class CacheManager:
    def __init__(self):
        self.l1_cache = {}             # Level 1: in-process memory
        self.l2_cache = redis.Redis()  # Level 2: Redis (async client)
        # Level 3: regenerate from the database and models

    async def get_recommendations(self, user_id: str) -> List[dict]:
        """Get recommendations with multi-level caching."""

        # Level 1: In-memory cache (fastest)
        cache_key = f"recs:{user_id}"
        if cache_key in self.l1_cache:
            return self.l1_cache[cache_key]

        # Level 2: Redis cache (fast)
        cached_data = await self.l2_cache.get(cache_key)
        if cached_data:
            recommendations = json.loads(cached_data)
            self.l1_cache[cache_key] = recommendations
            return recommendations

        # Level 3: Generate new recommendations (slow)
        recommendations = await self.generate_recommendations(user_id)

        # Cache at all levels
        self.l1_cache[cache_key] = recommendations
        await self.l2_cache.setex(
            cache_key,
            3600,  # 1 hour TTL
            json.dumps(recommendations)
        )

        return recommendations

This architecture is designed for high availability, scalability, and maintainability. For specific implementation details, refer to the individual component documentation or reach out to the ML engineering team.