ML repo

The ML Platform team's repository (repo-ml/) demonstrates advanced machine learning workflows that build upon the Analytics team's foundational data. This repository showcases how to implement feature engineering, model training, and prediction pipelines while maintaining dependencies on assets from another code location.

Repository Structure

The ML repository is organized around machine learning workflows and model lifecycle management:

repo-ml/
├── dagster_cloud.yaml      # Dagster+ deployment config
├── pyproject.toml          # Python dependencies (includes ML libraries)
└── src/ml_platform/
    ├── definitions.py      # Main definitions entry point
    └── defs/
        ├── io_managers.py  # Resource configurations
        └── models/
            ├── features.py # Feature engineering assets
            └── training.py # Model training and prediction assets

The repository uses a models/ subdirectory to organize ML-specific assets, clearly separating feature engineering from model training and prediction workflows.

Cross-Repository Feature Engineering

The ML team creates features by consuming and transforming assets from the Analytics repository:

features.py - External Asset Loading
def load_external_asset(asset_name: str) -> pd.DataFrame:
    """Simple utility to load external assets from shared storage."""
    file_path = SHARED_DATA_PATH / f"{asset_name}.parquet"
    if not file_path.exists():
        raise FileNotFoundError(f"External asset {asset_name} not found at {file_path}")
    return pd.read_parquet(file_path)

The load_external_asset utility function provides a simple way to access assets materialized by other code locations. This pattern gives the ML team explicit control over external dependencies while maintaining loose coupling between repositories.
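
A minimal sketch of the round trip this pattern implies. To keep the sketch dependency-light it uses CSV instead of Parquet, and `SHARED_DATA_PATH` here is a throwaway temp directory standing in for the durable shared storage (object store, network filesystem) both code locations would point at:

```python
import tempfile
from pathlib import Path

import pandas as pd

# Stand-in for the shared storage root both repos agree on.
SHARED_DATA_PATH = Path(tempfile.mkdtemp())

def load_external_asset(asset_name: str) -> pd.DataFrame:
    """Load an asset materialized by another code location (CSV variant)."""
    file_path = SHARED_DATA_PATH / f"{asset_name}.csv"
    if not file_path.exists():
        raise FileNotFoundError(f"External asset {asset_name} not found at {file_path}")
    return pd.read_csv(file_path)

# Simulate the Analytics repo materializing an asset to shared storage...
upstream = pd.DataFrame({"customer_id": [1, 2], "total_orders": [5, 3]})
upstream.to_csv(SHARED_DATA_PATH / "customer_order_summary.csv", index=False)

# ...then the ML repo loading it by name.
df = load_external_asset("customer_order_summary")
```

The key design point is that the only contract between the repositories is a storage path and a file naming convention, so neither repo needs to import the other's code.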

Customer Feature Engineering

The ML repository transforms analytics data into machine learning features:

features.py - Customer Features Asset
@asset(
    deps=[AssetKey("customer_order_summary")],  # Just declares dependency
    group_name="ml_features",
)
def customer_features() -> pd.DataFrame:
    """Features for customer behavior prediction models."""
    # Load external asset manually - simple and explicit
    customer_order_summary = load_external_asset("customer_order_summary")

    features = customer_order_summary.copy()

    # Calculate days since last order (simulated current date)
    current_date = pd.Timestamp("2023-07-15")
    features["days_since_last_order"] = (
        current_date - pd.to_datetime(features["last_order"])
    ).dt.days
    features["days_since_first_order"] = (
        current_date - pd.to_datetime(features["first_order"])
    ).dt.days

    # Create feature ratios
    features["order_frequency"] = (
        features["total_orders"] / (features["days_since_first_order"] + 1) * 30
    )  # orders per month

    # One-hot encode tier
    features["is_premium"] = (features["tier"] == "premium").astype(int)

    # Create RFM-style features (Recency, Frequency, Monetary)
    features["recency_score"] = pd.cut(features["days_since_last_order"], bins=3, labels=[3, 2, 1])
    features["frequency_score"] = pd.cut(features["total_orders"], bins=3, labels=[1, 2, 3])
    features["monetary_score"] = pd.cut(features["total_spent"], bins=3, labels=[1, 2, 3])

    # Convert scores to numeric
    features["recency_score"] = features["recency_score"].astype(int)
    features["frequency_score"] = features["frequency_score"].astype(int)
    features["monetary_score"] = features["monetary_score"].astype(int)

    # Select final feature columns
    feature_columns = [
        "customer_id",
        "total_orders",
        "total_spent",
        "avg_order_value",
        "days_since_last_order",
        "days_since_first_order",
        "order_frequency",
        "is_premium",
        "recency_score",
        "frequency_score",
        "monetary_score",
    ]

    return features[feature_columns]

The customer_features asset demonstrates sophisticated feature engineering patterns:

  • Temporal Features: Calculates days since last order and customer lifecycle metrics
  • Behavioral Features: Derives order frequency and purchase patterns
  • RFM Analysis: Implements Recency, Frequency, and Monetary scoring for customer segmentation
  • Categorical Encoding: Transforms customer tier information into model-ready features
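
The RFM scores above come from pd.cut, which splits a numeric column into equal-width bins and attaches the given labels. A toy example with made-up spend values shows the mechanics:

```python
import pandas as pd

# Made-up spend values, scored into three equal-width monetary bins.
total_spent = pd.Series([50.0, 120.0, 400.0, 980.0, 1500.0])

# bins=3 splits the observed range [50, 1500] into three equal-width
# intervals; labels=[1, 2, 3] names them low-to-high spend.
monetary_score = pd.cut(total_spent, bins=3, labels=[1, 2, 3]).astype(int)
```

Note that equal-width bins can be lopsided on skewed data (here three of five customers land in bin 1); `pd.qcut` would instead produce equal-size groups, which is often what RFM scoring wants in practice.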

The asset explicitly declares its dependency on customer_order_summary from the Analytics repository using deps=[AssetKey("customer_order_summary")], ensuring proper lineage tracking across code locations.

Model Training and Validation

The ML repository implements complete model training workflows with proper validation and metrics tracking:

training.py - Customer Churn Model
@asset(ins={"customer_features": AssetIn()}, group_name="ml_models")
def customer_churn_model(customer_features: pd.DataFrame) -> dict:
    """Train a model to predict customer churn based on behavior."""
    # Create synthetic churn labels for demonstration
    # In reality, this would come from historical data
    np.random.seed(42)

    # Create churn probability based on recency and monetary scores
    churn_prob = (
        (5 - customer_features["recency_score"]) * 0.3  # Low recency score = more likely to churn
        + (4 - customer_features["monetary_score"]) * 0.2  # Lower spending = more likely to churn
        + (customer_features["days_since_last_order"] > 60) * 0.4  # Long time since last order
        + np.random.random(len(customer_features)) * 0.1  # Add some randomness
    )

    # Convert to binary labels
    y = (churn_prob > 0.5).astype(int)

    # Ensure we have both classes in the data (fix for small datasets)
    unique_classes = np.unique(y)
    if len(unique_classes) < 2:
        # If all samples are the same class, artificially create some diversity
        if unique_classes[0] == 0:
            # All no-churn, make some churn
            y[np.random.choice(len(y), size=max(1, len(y) // 4), replace=False)] = 1
        else:
            # All churn, make some no-churn
            y[np.random.choice(len(y), size=max(1, len(y) // 4), replace=False)] = 0

    # Select features for model
    feature_cols = [
        "total_orders",
        "total_spent",
        "avg_order_value",
        "days_since_last_order",
        "days_since_first_order",
        "order_frequency",
        "is_premium",
        "recency_score",
        "frequency_score",
        "monetary_score",
    ]

    X = customer_features[feature_cols]

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    # Train model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Make predictions
    y_pred = model.predict(X_test)

    # Calculate metrics - with safe error handling
    report = classification_report(y_test, y_pred, output_dict=True, zero_division=0)

    # Feature importance
    feature_importance = dict(zip(feature_cols, model.feature_importances_))

    # Safe metric extraction
    model_results = {
        "model": model,
        "accuracy": report.get("accuracy", 0.0),
        "precision": report.get("1", {}).get("precision", 0.0),
        "recall": report.get("1", {}).get("recall", 0.0),
        "f1_score": report.get("1", {}).get("f1-score", 0.0),
        "feature_importance": feature_importance,
        "training_samples": len(X_train),
        "test_samples": len(X_test),
        "classes_learned": list(model.classes_),
        "class_distribution": dict(zip(*np.unique(y, return_counts=True))),
    }

    return model_results

The customer_churn_model asset showcases core model training practices:

  • Synthetic Label Generation: Creates plausible churn labels from customer behavior patterns, standing in for real historical labels
  • Feature Selection: Uses specific feature columns optimized for the prediction task
  • Model Training: Implements Random Forest classification with proper train/test splits
  • Comprehensive Evaluation: Tracks accuracy, precision, recall, and F1 scores
  • Feature Importance: Captures and returns feature importance for model interpretability

The asset returns a dictionary containing both the trained model and comprehensive metadata, enabling downstream assets to access both predictions and model performance insights.
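
The "safe metric extraction" in the dict relies on classification_report's output_dict=True form, where per-class metrics are keyed by the class label as a string. A tiny demo with made-up labels shows why the `.get("1", {})` guard and zero_division=0 matter when a class is never predicted:

```python
from sklearn.metrics import classification_report

# Made-up labels: the model never predicts class 1.
y_test = [0, 1, 0, 1]
y_pred = [0, 0, 0, 0]

# zero_division=0 yields 0.0 for the undefined class-1 precision
# instead of emitting a warning and NaN-like behavior.
report = classification_report(y_test, y_pred, output_dict=True, zero_division=0)

accuracy = report.get("accuracy", 0.0)
precision = report.get("1", {}).get("precision", 0.0)
```

The outer `.get("1", {})` additionally protects against the case where class 1 is absent from the evaluation entirely, in which case the key does not appear in the dict at all.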

Demand Forecasting Pipeline

The ML repository also includes regression models for business forecasting:

training.py - Demand Forecast Model
@asset(ins={"product_features": AssetIn()}, group_name="ml_models")
def demand_forecast_model(product_features: pd.DataFrame) -> dict:
    """Train a model to forecast product demand."""
    # Create synthetic demand data for demonstration
    np.random.seed(42)

    # Base demand influenced by various factors
    base_demand = (
        product_features["units_sold"] * 0.7  # Historical sales
        + product_features["profit_percentile"] * 20  # Profitability
        + (product_features["performance_tier"] == "high_performer") * 15  # Performance tier
        + np.random.normal(0, 5, len(product_features))  # Random noise
    ).clip(0, None)  # No negative demand

    # Select features for model
    feature_cols = [
        "orders",
        "units_sold",
        "revenue",
        "profit_margin",
        "revenue_per_order",
        "units_per_order",
        "revenue_percentile",
        "profit_percentile",
        "units_percentile",
    ]

    # Add category features (one-hot encoded)
    category_cols = [col for col in product_features.columns if col.startswith("category_")]
    feature_cols.extend(category_cols)

    X = product_features[feature_cols]
    y = base_demand

    # Split data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)

    # Train model
    model = RandomForestRegressor(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Make predictions
    y_pred = model.predict(X_test)

    # Calculate metrics
    mse = mean_squared_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    # Feature importance
    feature_importance = dict(zip(feature_cols, model.feature_importances_))

    model_results = {
        "model": model,
        "mse": mse,
        "rmse": np.sqrt(mse),
        "r2_score": r2,
        "feature_importance": feature_importance,
        "training_samples": len(X_train),
        "test_samples": len(X_test),
        "mean_actual_demand": float(y_test.mean()),
        "mean_predicted_demand": float(y_pred.mean()),
    }

    return model_results

The demand_forecast_model demonstrates regression modeling patterns:

  • Multi-Feature Input: Uses product performance metrics, profitability data, and categorical features
  • Target Variable Engineering: Creates synthetic demand based on realistic business drivers
  • Regression Metrics: Tracks MSE, RMSE, and R² scores for model performance
  • Cross-Category Features: Handles categorical variables through one-hot encoding
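
The model assumes the `category_*` columns already exist in product_features. One way such columns are typically produced upstream is `pd.get_dummies`; a small sketch with made-up product rows (the real category values would come from the Analytics repo):

```python
import pandas as pd

# Hypothetical product rows for illustration.
products = pd.DataFrame({
    "product_id": [1, 2, 3],
    "category": ["widgets", "gadgets", "widgets"],
})

# One-hot encode into category_* columns the forecast model can consume.
encoded = pd.get_dummies(products, columns=["category"], prefix="category")
category_cols = [c for c in encoded.columns if c.startswith("category_")]
```

Because the dummy columns are discovered by prefix at training time (the `startswith("category_")` filter above mirrors the asset code), new categories upstream flow into the model without editing the feature list.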

Production Prediction Assets

The ML repository includes assets that generate business-ready predictions:

training.py - Customer Risk Scoring
@asset(
    ins={"customer_churn_model": AssetIn(), "customer_features": AssetIn()},
    group_name="ml_predictions",
)
def customer_risk_scores(
    customer_churn_model: dict, customer_features: pd.DataFrame
) -> pd.DataFrame:
    """Generate churn risk scores for all customers."""
    model = customer_churn_model["model"]

    # Select same features used in training
    feature_cols = [
        "total_orders",
        "total_spent",
        "avg_order_value",
        "days_since_last_order",
        "days_since_first_order",
        "order_frequency",
        "is_premium",
        "recency_score",
        "frequency_score",
        "monetary_score",
    ]

    X = customer_features[feature_cols]

    # Get churn probabilities - safe prediction handling
    proba = model.predict_proba(X)
    if proba.shape[1] == 2:
        churn_probabilities = proba[:, 1]  # Get positive class probability
    else:
        # Only one class learned - use appropriate probability
        churn_probabilities = proba[:, 0] if model.classes_[0] == 1 else 1 - proba[:, 0]

    # Create risk scores dataframe
    risk_scores = customer_features[["customer_id"]].copy()
    risk_scores["churn_probability"] = churn_probabilities
    risk_scores["risk_tier"] = pd.cut(
        churn_probabilities,
        bins=[0, 0.3, 0.7, 1.0],
        labels=["Low Risk", "Medium Risk", "High Risk"],
        include_lowest=True,  # a probability of exactly 0 still lands in "Low Risk"
    )

    # Add recommendations
    def get_recommendation(row):
        if row["risk_tier"] == "High Risk":
            return "Immediate intervention needed - contact customer"
        elif row["risk_tier"] == "Medium Risk":
            return "Send targeted retention offer"
        else:
            return "Continue normal engagement"

    risk_scores["recommendation"] = risk_scores.apply(get_recommendation, axis=1)

    return risk_scores

The customer_risk_scores asset demonstrates how to deploy trained models for business use:

  • Probabilistic Predictions: Generates churn probabilities for all customers
  • Risk Tiering: Segments customers into Low, Medium, and High risk categories
  • Actionable Recommendations: Provides specific business actions for each risk tier

This pattern ensures that ML models produce business-ready outputs that non-technical stakeholders can immediately understand and act upon.
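
The proba.shape check in the asset exists because the columns of predict_proba are ordered by model.classes_, not by any fixed convention. A tiny demo on made-up data shows the relationship, along with an equivalent way to find the positive-class column by position:

```python
import numpy as np
from sklearn.ensemble import RandomForestClassifier

# Tiny made-up dataset containing both classes.
rng = np.random.default_rng(0)
X = rng.normal(size=(20, 3))
y = np.array([0, 1] * 10)

model = RandomForestClassifier(n_estimators=10, random_state=0).fit(X, y)

# predict_proba columns line up with model.classes_, so the positive-class
# column can be located by index rather than assumed to be column 1.
proba = model.predict_proba(X)
positive_col = list(model.classes_).index(1)
churn_probabilities = proba[:, positive_col]
```

Looking the column up via `model.classes_.index(1)` generalizes the shape check in the asset: it works whether the model learned one class or two, as long as class 1 was present in training.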

Asset Organization and ML Groups

The ML repository uses specialized asset groups for machine learning workflows:

  • ml_features group: Feature engineering and data preparation assets
  • ml_models group: Model training and validation assets
  • ml_predictions group: Production prediction and scoring assets

This organization pattern helps ML practitioners understand the model development lifecycle and ensures clear separation between data preparation, model development, and production deployment phases.

Next steps

Now that we understand how both repositories work individually, we can explore the technical details of how assets depend on each other across repository boundaries and how data flows between teams.