Priority 1: Model Persistence
Why First: Models can't be deployed to production without a way to save and load them.
Implementation Time: 2-3 days
Code to Add:
import io
import json
import sys
import joblib
import numpy as np
import pandas as pd
import sklearn
from datetime import datetime
from typing import Any, Dict, List, Tuple
def save_model_to_db(
self,
model: Any,
model_name: str,
version: str,
feature_names: List[str],
training_metrics: Dict,
encoder: Any = None
) -> str:
"""
Save model with metadata to database.
"""
import uuid
model_id = str(uuid.uuid4())
    # Serialize model: joblib has no dumps(), so write to an in-memory buffer
    buffer = io.BytesIO()
    joblib.dump(model, buffer)
    model_binary = buffer.getvalue()
    if encoder is not None:
        buffer = io.BytesIO()
        joblib.dump(encoder, buffer)
        encoder_binary = buffer.getvalue()
    else:
        encoder_binary = None
# Prepare metadata
metadata = {
"training_date": datetime.now().isoformat(),
"python_version": sys.version,
"sklearn_version": sklearn.__version__
}
# Insert into database
insert_query = """
INSERT INTO def_ml_models
(model_id, model_name, version, model_binary,
encoder_binary, metadata_json, feature_names,
training_metrics, created_at, is_active)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW(), TRUE)
"""
cursor = self.DB.cursor()
cursor.execute(insert_query, (
model_id,
model_name,
version,
model_binary,
encoder_binary,
json.dumps(metadata),
json.dumps(feature_names),
json.dumps(training_metrics)
))
self.DB.commit()
cursor.close()
self.note(f"✅ Model saved: {model_name} v{version}")
return model_id
def load_model_from_db(
self,
model_name: str,
version: str = "latest"
) -> Tuple[Any, Dict]:
"""
Load model and metadata from database.
"""
if version == "latest":
query = """
SELECT model_binary, encoder_binary,
metadata_json, feature_names
FROM def_ml_models
WHERE model_name = %s AND is_active = TRUE
ORDER BY created_at DESC
LIMIT 1
"""
params = (model_name,)
else:
query = """
SELECT model_binary, encoder_binary,
metadata_json, feature_names
FROM def_ml_models
WHERE model_name = %s AND version = %s
LIMIT 1
"""
params = (model_name, version)
cursor = self.DB.cursor()
cursor.execute(query, params)
result = cursor.fetchone()
cursor.close()
if not result:
raise ValueError(f"Model not found: {model_name} v{version}")
    model = joblib.load(io.BytesIO(result[0]))
    encoder = joblib.load(io.BytesIO(result[1])) if result[1] else None
metadata = json.loads(result[2])
feature_names = json.loads(result[3])
return model, {
"encoder": encoder,
"metadata": metadata,
"feature_names": feature_names
}
Database Migration:
CREATE TABLE def_ml_models (
model_id VARCHAR(36) PRIMARY KEY,
model_name VARCHAR(255) NOT NULL,
version VARCHAR(50) NOT NULL,
model_type VARCHAR(100),
model_binary LONGBLOB NOT NULL,
encoder_binary LONGBLOB,
metadata_json JSON,
feature_names JSON,
training_metrics JSON,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
created_by VARCHAR(255),
is_active BOOLEAN DEFAULT FALSE,
UNIQUE KEY unique_name_version (model_name, version),
INDEX idx_model_active (model_name, is_active)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
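Note that the table defaults is_active to FALSE while save_model_to_db inserts with TRUE, so several versions can be active at once. If you prefer an explicit promotion step, a minimal sketch (hypothetical activate_model_version method, reusing the same self.DB connection) could look like:
def activate_model_version(self, model_name: str, version: str) -> None:
    """
    Promote one version to active and deactivate all others.
    """
    cursor = self.DB.cursor()
    # Deactivate every version of this model first
    cursor.execute(
        "UPDATE def_ml_models SET is_active = FALSE WHERE model_name = %s",
        (model_name,)
    )
    # Then flag the chosen version
    cursor.execute(
        "UPDATE def_ml_models SET is_active = TRUE "
        "WHERE model_name = %s AND version = %s",
        (model_name, version)
    )
    self.DB.commit()
    cursor.close()
At scoring time the loader would be used roughly like this (sketch; X_new stands for a new feature frame):
model, artifacts = obj.load_model_from_db("german_credit_model")
X_new = X_new[artifacts["feature_names"]]  # enforce training-time column order
proba = model.predict_proba(X_new)[:, 1]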
Priority 2: Industry-Standard Metrics (Gini & KS)
Why: These are the industry-standard discrimination metrics for credit scoring.
Implementation Time: 1 day
Code to Add:
def calculate_gini_coefficient(
self,
y_true: np.ndarray,
y_pred_proba: np.ndarray
) -> float:
"""
Calculate Gini coefficient (2*AUC - 1).
Interpretation:
- 0.0-0.2: Poor model
- 0.2-0.4: Acceptable
- 0.4-0.6: Good
- 0.6-0.8: Very Good
- 0.8-1.0: Excellent (check for overfitting)
"""
from sklearn.metrics import roc_auc_score
if len(np.unique(y_true)) < 2:
raise ValueError("y_true must have at least 2 classes")
auc = roc_auc_score(y_true, y_pred_proba)
gini = 2 * auc - 1
self.note(f"📊 Gini Coefficient: {gini:.4f}")
self.note(f"📊 AUC: {auc:.4f}")
return gini
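A quick sanity check (sketch, assuming obj = ObjML() as in the usage example below): a perfectly separating score yields Gini 1.0, while an uninformative one yields roughly 0:
y = np.array([0, 0, 1, 1])
obj.calculate_gini_coefficient(y, np.array([0.1, 0.2, 0.8, 0.9]))  # -> 1.0
obj.calculate_gini_coefficient(y, np.array([0.9, 0.1, 0.8, 0.2]))  # -> 0.0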
def calculate_ks_statistic(
self,
y_true: np.ndarray,
y_pred_proba: np.ndarray
) -> Tuple[float, float]:
"""
Calculate Kolmogorov-Smirnov statistic.
Returns KS value and optimal threshold.
Interpretation:
- 0.0-0.2: Poor separation
- 0.2-0.4: Fair separation
- 0.4-0.6: Good separation
- 0.6+: Excellent separation
"""
# Separate scores by class
pos_scores = y_pred_proba[y_true == 1]
neg_scores = y_pred_proba[y_true == 0]
# Calculate KS statistic
from scipy.stats import ks_2samp
    ks_stat, _ = ks_2samp(pos_scores, neg_scores)  # p-value not needed here
# Find optimal threshold (max KS point)
thresholds = np.sort(np.unique(y_pred_proba))
ks_values = []
for threshold in thresholds:
tpr = np.sum((y_pred_proba >= threshold) & (y_true == 1)) / \
np.sum(y_true == 1)
fpr = np.sum((y_pred_proba >= threshold) & (y_true == 0)) / \
np.sum(y_true == 0)
ks_values.append(abs(tpr - fpr))
max_ks_idx = np.argmax(ks_values)
optimal_threshold = thresholds[max_ks_idx]
self.note(f"📊 KS Statistic: {ks_stat:.4f}")
self.note(f"📊 Optimal Threshold: {optimal_threshold:.4f}")
return ks_stat, optimal_threshold
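The threshold loop above is quadratic in the number of unique scores; for large samples, an equivalent one-pass scan over the sorted scores computes the same KS curve. This is a standalone sketch, not part of the ObjML API:
def ks_curve_vectorized(y_true: np.ndarray, y_pred_proba: np.ndarray) -> Tuple[float, float]:
    # Sort by descending score; cumulative counts give TPR/FPR at each cut-off
    order = np.argsort(-y_pred_proba)
    y_sorted = y_true[order]
    tpr = np.cumsum(y_sorted == 1) / max(np.sum(y_true == 1), 1)
    fpr = np.cumsum(y_sorted == 0) / max(np.sum(y_true == 0), 1)
    ks_values = np.abs(tpr - fpr)
    best = int(np.argmax(ks_values))
    # With heavily tied scores this can differ slightly from the exact loop
    return float(ks_values[best]), float(y_pred_proba[order][best])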
def generate_performance_summary(
self,
y_true: np.ndarray,
y_pred: np.ndarray,
y_pred_proba: np.ndarray
) -> Dict:
"""
Comprehensive performance summary.
"""
from sklearn.metrics import (
accuracy_score, precision_score, recall_score,
f1_score, roc_auc_score, confusion_matrix
)
    # Cast to built-in floats so the dict round-trips through json.dumps
    # (e.g. when passed to save_model_to_db as training_metrics)
    metrics = {
        "accuracy": float(accuracy_score(y_true, y_pred)),
        "precision": float(precision_score(y_true, y_pred, zero_division=0)),
        "recall": float(recall_score(y_true, y_pred, zero_division=0)),
        "f1": float(f1_score(y_true, y_pred, zero_division=0)),
        "auc": float(roc_auc_score(y_true, y_pred_proba)),
        "gini": float(self.calculate_gini_coefficient(y_true, y_pred_proba)),
        "ks_statistic": float(self.calculate_ks_statistic(y_true, y_pred_proba)[0]),
        "confusion_matrix": confusion_matrix(y_true, y_pred).tolist()
    }
self.note("=" * 50)
self.note("MODEL PERFORMANCE SUMMARY")
self.note("=" * 50)
self.note(f"Accuracy: {metrics['accuracy']:.4f}")
self.note(f"Precision: {metrics['precision']:.4f}")
self.note(f"Recall: {metrics['recall']:.4f}")
self.note(f"F1 Score: {metrics['f1']:.4f}")
self.note(f"AUC: {metrics['auc']:.4f}")
self.note(f"Gini: {metrics['gini']:.4f}")
self.note(f"KS Stat: {metrics['ks_statistic']:.4f}")
self.note("=" * 50)
return metrics
Usage Example:
obj = ObjML()
model, metrics, y_pred, X_test, y_test = obj.train_cost_sensitive_classifier(
X, y, class_weights={0: 1, 1: 5}
)
# Get predictions
y_pred_proba = model.predict_proba(X_test)[:, 1]
# Calculate metrics
gini = obj.calculate_gini_coefficient(y_test, y_pred_proba)
ks, threshold = obj.calculate_ks_statistic(y_test, y_pred_proba)
# Full summary
summary = obj.generate_performance_summary(y_test, y_pred, y_pred_proba)
Priority 3: Population Stability Index (PSI)
Why: Essential for monitoring data drift in production.
Implementation Time: 1 day
Code to Add:
def calculate_psi(
self,
expected: pd.Series,
actual: pd.Series,
bins: int = 10,
feature_name: str = "Feature"
) -> float:
"""
Calculate Population Stability Index.
Interpretation:
- PSI < 0.1: No significant change
- 0.1 <= PSI < 0.2: Moderate change, investigate
- PSI >= 0.2: Significant change, retrain model
Args:
expected: Baseline distribution (training data)
actual: Current distribution (production data)
bins: Number of bins for discretization
feature_name: Name of feature (for logging)
Returns:
PSI value
"""
# Create bins based on expected distribution
breakpoints = np.percentile(
expected.dropna(),
np.linspace(0, 100, bins + 1)
)
breakpoints = np.unique(breakpoints) # Remove duplicates
# Bin both distributions
expected_binned = pd.cut(
expected,
bins=breakpoints,
include_lowest=True,
duplicates='drop'
)
actual_binned = pd.cut(
actual,
bins=breakpoints,
include_lowest=True,
duplicates='drop'
)
# Calculate percentages
expected_pct = expected_binned.value_counts(normalize=True, sort=False)
actual_pct = actual_binned.value_counts(normalize=True, sort=False)
# Align indices
expected_pct, actual_pct = expected_pct.align(actual_pct, fill_value=0)
# Avoid log(0) by adding small epsilon
epsilon = 1e-10
expected_pct = expected_pct + epsilon
actual_pct = actual_pct + epsilon
# Calculate PSI
psi = ((actual_pct - expected_pct) *
np.log(actual_pct / expected_pct)).sum()
# Log result with interpretation
if psi < 0.1:
status = "✅ STABLE"
elif psi < 0.2:
status = "⚠️ INVESTIGATE"
else:
status = "🚨 RETRAIN NEEDED"
self.note(f"📊 PSI for {feature_name}: {psi:.4f} - {status}")
return psi
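As a sanity check (sketch, assuming the module-level numpy/pandas imports above and that ObjML is importable as in the usage examples): a resample of the baseline should read stable, while a mean-shifted sample should trip the alert bands:
rng = np.random.default_rng(42)
baseline = pd.Series(rng.normal(0.0, 1.0, 10_000))
shifted = pd.Series(rng.normal(0.5, 1.0, 10_000))
obj = ObjML()
obj.calculate_psi(baseline, baseline.sample(frac=0.5, random_state=1),
                  feature_name="no_drift")    # PSI near 0 -> STABLE
obj.calculate_psi(baseline, shifted,
                  feature_name="mean_shift")  # PSI near/above 0.2 -> alert band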
def monitor_feature_drift(
self,
baseline_df: pd.DataFrame,
current_df: pd.DataFrame,
features: List[str],
threshold: float = 0.2
) -> pd.DataFrame:
"""
Monitor drift across multiple features.
Returns DataFrame with PSI values and alerts.
"""
results = []
for feature in features:
if feature not in baseline_df.columns:
self.debug(f"❌ Feature not found: {feature}")
continue
if feature not in current_df.columns:
self.debug(f"❌ Feature not found in current data: {feature}")
continue
psi = self.calculate_psi(
baseline_df[feature],
current_df[feature],
feature_name=feature
)
results.append({
"feature": feature,
"psi": psi,
"status": "STABLE" if psi < 0.1
else "INVESTIGATE" if psi < threshold
else "ALERT",
"requires_action": psi >= threshold
})
    if not results:
        self.note("⚠️ No matching features to monitor")
        return pd.DataFrame(columns=["feature", "psi", "status", "requires_action"])
    results_df = pd.DataFrame(results).sort_values("psi", ascending=False)
    # Print summary
    alert_count = int(results_df["requires_action"].sum())
if alert_count > 0:
self.note(f"🚨 {alert_count} features require attention!")
else:
self.note("✅ All features are stable")
return results_df
Priority 4: Optimal Feature Binning
Why: Improves WOE/IV calculation and model interpretability.
Implementation Time: 1-2 days
Code to Add:
def create_optimal_bins(
self,
df: pd.DataFrame,
feature: str,
target: str,
max_bins: int = 10,
min_samples_bin: int = 50
) -> Tuple[List[float], pd.DataFrame]:
"""
Create optimal bins for numeric feature using decision tree.
Returns bin edges and binning summary.
"""
from sklearn.tree import DecisionTreeClassifier
# Prepare data
X = df[[feature]].values
y = df[target].values
# Fit decision tree to find optimal splits
tree = DecisionTreeClassifier(
max_leaf_nodes=max_bins,
min_samples_leaf=min_samples_bin,
random_state=42
)
tree.fit(X, y)
    # Extract split thresholds (unused/leaf nodes hold the sentinel value -2)
    thresholds = sorted(tree.tree_.threshold[tree.tree_.threshold != -2])
# Create bins
bins = [-np.inf] + thresholds + [np.inf]
# Create binned column
df_binned = df.copy()
df_binned[f"{feature}_binned"] = pd.cut(
df_binned[feature],
bins=bins,
duplicates='drop'
)
# Calculate summary statistics per bin
summary = df_binned.groupby(f"{feature}_binned").agg({
target: ['count', 'sum', 'mean']
}).reset_index()
summary.columns = ['bin', 'count', 'bad_count', 'bad_rate']
summary['good_count'] = summary['count'] - summary['bad_count']
self.note(f"📊 Created {len(bins)-1} bins for {feature}")
return bins, summary
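For example (sketch, using the german_credit columns from the end-to-end example at the end of this section):
bins, summary = obj.create_optimal_bins(
    df, feature="credit_amount", target="class", max_bins=6
)
print(summary)  # per-bin count, bad_count, and bad_rate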
def bin_numeric_features(
self,
df: pd.DataFrame,
numeric_features: List[str],
target: str,
strategy: str = "quantile",
n_bins: int = 10
) -> Tuple[pd.DataFrame, Dict]:
"""
Bin multiple numeric features.
Strategies:
- quantile: Equal frequency bins
- uniform: Equal width bins
- kmeans: K-means clustering
- optimal: Decision tree-based
"""
binned_df = df.copy()
bin_edges = {}
for feature in numeric_features:
if strategy == "optimal":
bins, summary = self.create_optimal_bins(
df, feature, target
)
elif strategy == "quantile":
bins = df[feature].quantile(
np.linspace(0, 1, n_bins + 1)
).unique()
elif strategy == "uniform":
bins = np.linspace(
df[feature].min(),
df[feature].max(),
n_bins + 1
)
elif strategy == "kmeans":
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=n_bins, random_state=42)
kmeans.fit(df[[feature]])
bins = sorted(kmeans.cluster_centers_.flatten())
bins = [-np.inf] + bins.tolist() + [np.inf]
else:
raise ValueError(f"Unknown strategy: {strategy}")
# Apply binning
binned_df[f"{feature}_binned"] = pd.cut(
binned_df[feature],
bins=bins,
duplicates='drop'
)
bin_edges[feature] = bins
return binned_df, bin_edges
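The returned bin_edges dict is what you persist next to the model so production data is cut with the training-time edges. A minimal scoring-time helper (hypothetical apply_bins, not part of ObjML) might look like:
def apply_bins(df_new: pd.DataFrame, bin_edges: Dict[str, list]) -> pd.DataFrame:
    """Cut new data using the edges learned at training time."""
    out = df_new.copy()
    for feature, bins in bin_edges.items():
        out[f"{feature}_binned"] = pd.cut(out[feature], bins=bins, duplicates='drop')
    return out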
Priority 5: Systematic Model Comparison
Why: Choose the best model systematically rather than by intuition.
Implementation Time: 1 day
Code to Add:
def compare_models(
    self,
    X: pd.DataFrame,
    y: pd.Series,
    model_types: List[str],
    class_weights: Dict = None
) -> pd.DataFrame:
    """
    Compare multiple models on a common holdout split
    (as produced by train_cost_sensitive_classifier).
    Returns a DataFrame with metrics for each model.
    """
results = []
for model_type in model_types:
self.note(f"🔄 Training {model_type}...")
# Train model
model, metrics, y_pred, X_test, y_test = \
self.train_cost_sensitive_classifier(
X, y,
class_weights=class_weights or {0: 1, 1: 1},
model_type=model_type
)
# Get probabilities
y_pred_proba = model.predict_proba(X_test)[:, 1]
# Calculate metrics
gini = self.calculate_gini_coefficient(y_test, y_pred_proba)
ks, _ = self.calculate_ks_statistic(y_test, y_pred_proba)
results.append({
"model_type": model_type,
"accuracy": metrics["classification_report"]["accuracy"],
"precision": metrics["classification_report"]["1"]["precision"],
"recall": metrics["classification_report"]["1"]["recall"],
"f1": metrics["classification_report"]["1"]["f1-score"],
"gini": gini,
"ks": ks
})
# Create comparison table
comparison_df = pd.DataFrame(results)
comparison_df = comparison_df.sort_values("gini", ascending=False)
# Print results
self.note("\n" + "=" * 70)
self.note("MODEL COMPARISON RESULTS")
self.note("=" * 70)
self.note(comparison_df.to_string(index=False))
self.note("=" * 70)
return comparison_df
Complete End-to-End Example:
from ObjML import ObjML
from ObjMLDatasets import ObjMLDatasets
# 1. Load data
datasets = ObjMLDatasets()
df = datasets.load_dataset("german_credit")
# 2. Split features
obj = ObjML()
numeric_features = ["duration_in_month", "credit_amount", "age_in_years"]
categorical_features = ["existing_checking_account", "credit_history"]
# 3. Bin numeric features
df_binned, bin_edges = obj.bin_numeric_features(
df,
numeric_features,
target="class",
strategy="optimal"
)
# 4. Calculate WOE/IV for binned features
for feature in numeric_features:
woe_map, iv = obj.calculate_woe_iv(
df_binned,
f"{feature}_binned",
"class"
)
print(f"{feature}: IV = {iv:.4f}")
# 5. Compare models
comparison = obj.compare_models(
X=df.drop("class", axis=1),
y=df["class"],
model_types=[
"LogisticRegression",
"RandomForestClassifier",
"GradientBoostingClassifier"
],
class_weights={0: 1, 1: 5}
)
# 6. Train best model
best_model_type = comparison.iloc[0]["model_type"]
model, metrics, y_pred, X_test, y_test = obj.train_cost_sensitive_classifier(
df.drop("class", axis=1),
df["class"],
class_weights={0: 1, 1: 5},
model_type=best_model_type
)
# 7. Evaluate with industry metrics
y_pred_proba = model.predict_proba(X_test)[:, 1]
summary = obj.generate_performance_summary(y_test, y_pred, y_pred_proba)
# 8. Save model
model_id = obj.save_model_to_db(
model=model,
model_name="german_credit_model",
version="v1.0",
feature_names=list(df.drop("class", axis=1).columns),
training_metrics=summary
)
# 9. Monitor for drift (later in production)
baseline_df = df.sample(frac=0.5, random_state=42)
current_df = df.sample(frac=0.5, random_state=123)
drift_report = obj.monitor_feature_drift(
baseline_df=baseline_df,
current_df=current_df,
features=numeric_features,
threshold=0.2
)
print("\nDrift Report:")
print(drift_report)
Total: 6-8 days for all 5 priorities
Start with Priority 1 (Model Persistence) as it's foundational for everything else!