Date: 2026-02-07
Status: Implementation Guide
Completed: Admin CLI, Mock Client, Rate Limiting
Remaining: HA Support, Advanced Alerting, Password Reset, Environment Config, Doctor Tool
Completed components and their files:
- Admin CLI: factory.web/ObjKeycloakAdmin.py, .md
- Mock Client: factory.test/ObjKeycloakMock.py, .md, resource.test/pytests/factory.test/test_ObjKeycloakMock.py
- Rate Limiting: factory.core/ObjRateLimit.py, .md, local.processing/schema/.../sys_user_login_attempts.yaml, config.yaml (security section)

HA Support: Add support for multiple Keycloak servers with automatic failover.
File: config.yaml (lines 125-143)
keycloak:
  ha:
    enabled: false
    strategy: priority  # priority, round_robin, random
    servers:
      - server: https://auth.technocore.co.za
        priority: 1
        weight: 100
      - server: https://auth-backup.technocore.co.za
        priority: 2
        weight: 50
File: factory.core/ObjKeycloakResilient.py
Add to __init__() method:
# Load HA configuration.
#
# Builds self._servers, a list of per-server state records ordered by
# ascending priority. Each record tracks availability and failure counts and
# is mutated by _mark_server_failed() / _mark_server_success().
self._ha_enabled = ini.Get("keycloak.ha", "enabled", False)
self._ha_strategy = ini.Get("keycloak.ha", "strategy", "priority")

# Collect (url, priority, weight) tuples first so that the record layout is
# defined in exactly one place below.
server_specs = []
if self._ha_enabled:
    # "or []" guards against the key being present but empty in config.
    for server_config in ini.Get("keycloak.ha", "servers", []) or []:
        url = server_config.get('server')
        if not url:
            # An entry without a URL can never be contacted; skip it rather
            # than letting it be selected as a failover target.
            continue
        server_specs.append((
            url,
            server_config.get('priority', 99),
            server_config.get('weight', 1),
        ))

if not server_specs:
    # Single server mode (existing behavior). Also used as a fallback when HA
    # is enabled but no usable servers are configured, so the pool is never
    # empty.
    server_specs = [(self._server, 1, 100)]

# sorted() is stable, so servers sharing a priority keep their config order.
self._servers = [
    {
        'url': url,
        'priority': priority,
        'weight': weight,
        'available': True,
        'failure_count': 0,
        'last_attempt': None,
    }
    for url, priority, weight in sorted(server_specs, key=lambda spec: spec[1])
]
self._current_server_index = 0
Add new methods to ObjKeycloakResilient:
def _get_next_server(self) -> Optional[Dict[str, Any]]:
    """
    Get next available server based on strategy.

    Strategies (``self._ha_strategy``):
        - ``"priority"``: first available server in ``self._servers`` order.
        - ``"round_robin"``: weighted random pick among available servers,
          proportional to each server's ``weight``. (Despite the name this
          is a random selection, not a strict rotation.)
        - ``"random"``: uniform random pick among available servers.
        - anything else: same as ``"priority"``.

    If every server is currently marked unavailable, all servers are reset
    to available (failure counts cleared) and the first one is returned.

    Returns:
        Server dict or None if no servers are configured
    """
    import random

    if not self._servers:
        return None
    if not self._ha_enabled:
        return self._servers[0]

    available_servers = [s for s in self._servers if s['available']]
    if not available_servers:
        # All servers down, reset and try primary
        for server in self._servers:
            server['available'] = True
            server['failure_count'] = 0
        return self._servers[0]

    if self._ha_strategy == "round_robin":
        # Negative weights are treated as zero so they can never be picked.
        weights = [max(0, s['weight']) for s in available_servers]
        total_weight = sum(weights)
        if total_weight <= 0:
            # No server carries any weight: a weighted draw is undefined, so
            # fall back to the first available server instead of raising.
            return available_servers[0]
        # random() is in [0, 1), so pick is in [0, total_weight); this also
        # works for non-integer weights.
        pick = random.random() * total_weight
        cumulative = 0
        for server, weight in zip(available_servers, weights):
            cumulative += weight
            if pick < cumulative:
                return server
        return available_servers[0]

    if self._ha_strategy == "random":
        return random.choice(available_servers)

    # "priority" and any unrecognised strategy: highest priority available.
    return available_servers[0]
def _mark_server_failed(self, server_url: str):
    """Record one failure for the server with the given URL.

    Increments the server's failure count and stamps ``last_attempt`` with
    the current time. Once the count reaches ``self._Failurethreshold`` the
    server is flagged unavailable and a debug message is emitted. Only the
    first record matching ``server_url`` is touched; an unknown URL is a
    no-op.
    """
    entry = next((s for s in self._servers if s['url'] == server_url), None)
    if entry is None:
        return

    entry['failure_count'] += 1
    entry['last_attempt'] = datetime.now()

    # Below the threshold the server stays in rotation.
    if entry['failure_count'] < self._Failurethreshold:
        return

    entry['available'] = False
    self.debug(f"Server {server_url} marked unavailable")
def _mark_server_success(self, server_url: str):
    """Clear the failure state of the server with the given URL.

    Resets the failure count to zero and flags the server available again.
    Only the first record matching ``server_url`` is touched; an unknown URL
    is a no-op.
    """
    entry = next((s for s in self._servers if s['url'] == server_url), None)
    if entry is not None:
        entry.update(failure_count=0, available=True)
Modify to use server selection:
def _get_keycloak_client(self):
    """Get Keycloak client with HA support.

    Asks ``_get_next_server()`` for a server, builds an ``ObjKeycloak``
    client and points it at that server's URL. If building the client
    raises, the server is marked failed and another selection is made.

    The number of attempts is bounded by
    ``len(self._servers) * self._Failurethreshold``, which is enough to push
    every server past the failure threshold once. Unbounded retry would never
    stop, because ``_get_next_server()`` resets the pool as soon as every
    server is unavailable.

    Returns:
        A configured client, or None if no server is configured or every
        attempt failed.
    """
    threshold = max(1, int(getattr(self, '_Failurethreshold', 1) or 1))
    max_attempts = max(1, len(self._servers)) * threshold

    for _ in range(max_attempts):
        server = self._get_next_server()
        if not server:
            self.debug("No Keycloak servers available")
            return None
        try:
            from ObjKeycloak import ObjKeycloak
            client = ObjKeycloak(self.db_connection)
            # Override server URL
            client._server = server['url']
            self.debug(f"Using Keycloak server: {server['url']}")
            return client
        except Exception as e:
            self._mark_server_failed(server['url'])
            self.debug(f"Failed to connect to {server['url']}: {e}")
            # Loop round to try the next selection.

    self.debug("All Keycloak servers failed")
    return None
Wrap operations with failure tracking:
def authenticate(self, username: str, password: str, realm: str):
    """Authenticate with HA support.

    Obtains a client from ``_get_keycloak_client()`` and asks it to
    authenticate. Success and failure are recorded against the server the
    client was actually pointed at (``client._server``), so random strategies
    cannot mark the wrong server.

    When a call raises, the server is marked failed and another client is
    requested, up to ``len(self._servers) * self._Failurethreshold``
    attempts. If no client can be obtained, or every attempt raised, the
    cached token for the user is returned instead.

    NOTE(review): the cached-token fallback does not verify ``password``.
    That matches the pre-existing "no server available" path; confirm it is
    acceptable for the all-servers-failed case too.

    Args:
        username: User to authenticate
        password: User's password
        realm: Keycloak realm

    Returns:
        The token on success, None if the server answered without a token,
        or whatever ``get_cached_token()`` returns when no server could
        serve the request.
    """
    threshold = max(1, int(getattr(self, '_Failurethreshold', 1) or 1))
    max_attempts = max(1, len(self._servers)) * threshold

    for _ in range(max_attempts):
        keycloak = self._get_keycloak_client()
        if keycloak is None:
            break
        server_url = getattr(keycloak, '_server', None)
        try:
            token = keycloak.authenticate(username, password, realm)
        except Exception:
            self._mark_server_failed(server_url)
            self._record_failure()
            continue  # Try next server

        if token:
            self._mark_server_success(server_url)
            self._cache_token(username, realm, token)
            self._record_success()
            return token
        # The server responded but issued no token: retrying elsewhere would
        # not change the outcome.
        return None

    return self.get_cached_token(username, realm)
# Test failover
from factory.core.ObjKeycloakResilient import ObjKeycloakResilient
client = ObjKeycloakResilient()
# Enable HA in config first
# Simulate primary failure
# System should automatically fail over to backup
Enhanced alert system beyond basic circuit breaker notifications.
File: config.yaml - Add to keycloak.resilience:
keycloak:
  resilience:
    alerts:
      enabled: true
      channels: [email, slack]  # Alert destinations
      # Circuit open duration alert
      circuit_open_minutes_threshold: 10
      circuit_open_severity: warning
      # Queue depth alerts
      sync_queue_warning_threshold: 50
      sync_queue_critical_threshold: 100
      # Token refresh failure rate
      token_refresh_failure_rate_threshold: 0.5  # 50%
      # Failed sync count
      failed_sync_threshold: 10
      # Digest report schedule
      daily_digest_time: "08:00"  # UTC
      weekly_digest_day: "monday"
File: factory.core/ObjKeycloakAlerts.py
from datetime import datetime, timedelta
from typing import Optional

import ConfigIni
from factory.core import ObjData, ObjNotify
class KeycloakAlerts(ObjData.ObjData):
    """Advanced alerting for Keycloak resilience system.

    Reads thresholds from the ``keycloak.resilience.alerts`` section of
    ``config.yaml`` and raises notifications through ``ObjNotify`` when the
    circuit breaker stays open, the sync queue grows, or syncs keep failing.
    All ``check_*`` methods are no-ops when alerts are disabled in config.
    """

    def __init__(self, db_connection=0):
        """Create the notifier and load alert thresholds from config.yaml."""
        super().__init__(db_connection)
        self._notify = ObjNotify.ObjNotify(db_connection)

        # Load alert configuration
        ini = ConfigIni.ConfigIni("config.yaml")
        section = "keycloak.resilience.alerts"
        self._alerts_enabled = ini.Get(section, "enabled", True)
        self._circuit_open_threshold = ini.Get(
            section, "circuit_open_minutes_threshold", 10
        )
        self._circuit_open_severity = ini.Get(
            section, "circuit_open_severity", "warning"
        )
        self._queue_warning = ini.Get(
            section, "sync_queue_warning_threshold", 50
        )
        self._queue_critical = ini.Get(
            section, "sync_queue_critical_threshold", 100
        )
        self._failed_sync_threshold = ini.Get(
            section, "failed_sync_threshold", 10
        )

    def check_circuit_open_duration(
        self,
        client: 'ObjKeycloakResilient'
    ):
        """Alert if circuit open too long.

        Uses the ``circuit_state`` and ``last_failure`` entries of
        ``client.get_status()``; notifies once the circuit has been open for
        at least the configured number of minutes.
        """
        if not self._alerts_enabled:
            return

        status = client.get_status()
        if status.get('circuit_state') != 'open':
            return
        last_failure = status.get('last_failure')
        if not last_failure:
            return

        # total_seconds() covers the whole interval; timedelta.seconds drops
        # the days component and would wrap back to 0 after 24 hours.
        elapsed = datetime.now() - last_failure
        duration = int(elapsed.total_seconds() // 60)
        if duration >= self._circuit_open_threshold:
            self._notify.send_notification(
                title="⚠️ Keycloak Circuit Open",
                message=f"Circuit breaker open for {duration} minutes. "
                        f"System in degraded mode.",
                severity=self._circuit_open_severity,
                category="keycloak_resilience"
            )

    def check_queue_depth(self, client: 'ObjKeycloakResilient'):
        """Alert on high queue depth.

        Sends a critical notification at or above the critical threshold,
        otherwise a warning at or above the warning threshold.
        """
        if not self._alerts_enabled:
            return

        status = client.get_status()
        queue_depth = status.get('pending_syncs', 0)
        if queue_depth >= self._queue_critical:
            self._notify.send_notification(
                title="🚨 Critical: Keycloak Sync Queue",
                message=f"Sync queue has {queue_depth} pending items (critical)",
                severity="critical",
                category="keycloak_resilience"
            )
        elif queue_depth >= self._queue_warning:
            self._notify.send_notification(
                title="⚠️ Warning: Keycloak Sync Queue",
                message=f"Sync queue has {queue_depth} pending items",
                severity="warning",
                category="keycloak_resilience"
            )

    def check_failed_syncs(self, client: 'ObjKeycloakResilient'):
        """Alert on accumulating failed syncs.

        Counts rows in ``sys_keycloak_sync_queue`` with status ``failed``
        queued within the last hour and notifies when the count reaches the
        configured ``failed_sync_threshold``. ``client`` is accepted for
        signature parity with the other checks and is not used.
        """
        if not self._alerts_enabled:
            return

        sql = """
            SELECT COUNT(*) as failed_count
            FROM sys_keycloak_sync_queue
            WHERE Status = 'failed'
              AND QueuedAt > DATE_SUB(NOW(), INTERVAL 1 HOUR)
        """
        row = self.sql_get_row(sql)
        failed_count = row[0] if row else 0
        if failed_count >= self._failed_sync_threshold:
            self._notify.send_notification(
                title="⚠️ Keycloak Failed Syncs",
                message=f"{failed_count} syncs failed in the last hour. "
                        f"Manual intervention may be needed.",
                severity="warning",
                category="keycloak_resilience"
            )

    def send_daily_digest(self, package: Optional[str] = None):
        """Send daily status digest.

        Renders the Keycloak status report in "detailed" mode for
        ``package`` (empty string when None) and emails it.
        """
        from factory.report.package.core.ObjReportKeycloakStatus import Report

        report = Report()
        html = report.Render(package or "", "detailed")
        # NOTE(review): recipient is a placeholder address; presumably this
        # should come from configuration.
        self._notify.send_email(
            to="ops@example.com",
            subject=f"Keycloak Daily Digest - {datetime.now().strftime('%Y-%m-%d')}",
            body_html=html
        )

    def send_weekly_summary(self, package: Optional[str] = None):
        """Send weekly summary report."""
        # TODO: not implemented yet - include weekly trends, stats, issues.
        pass
File: factory.core/monitor_keycloak_alerts.py
#!/usr/bin/env python3
"""Monitor Keycloak and trigger alerts."""
from factory.core.ObjKeycloakResilient import ObjKeycloakResilient
from factory.core.ObjKeycloakAlerts import KeycloakAlerts
import time
def main():
    """Run all Keycloak alert checks once a minute, forever.

    Each check runs in isolation: an exception in one check is logged and
    does not stop the remaining checks or end the monitoring loop.
    """
    import logging

    logger = logging.getLogger("keycloak_alerts")
    client = ObjKeycloakResilient()
    alerts = KeycloakAlerts()
    checks = (
        alerts.check_circuit_open_duration,
        alerts.check_queue_depth,
        alerts.check_failed_syncs,
    )
    while True:
        for check in checks:
            try:
                check(client)
            except Exception:
                # A transient failure must not take the monitor down; the
                # traceback is logged and the check is retried next cycle.
                logger.exception(
                    "Keycloak alert check %s failed", check.__name__
                )
        time.sleep(60)  # Check every minute
if __name__ == "__main__":
main()
Systemd Service: resource.config/keycloak-alerts.service
Allow users to reset passwords via email verification.
File: local.processing/schema/package.sync/tables/sys_password_reset_tokens.yaml
table: sys_password_reset_tokens
columns:
  - '`TokenId` varchar(64) NOT NULL PRIMARY KEY'
  - '`User` varchar(100) NOT NULL'
  - '`Package` varchar(100) NOT NULL'
  - '`Token` varchar(64) NOT NULL UNIQUE'
  - '`ExpiresAt` datetime NOT NULL'
  - '`Used` tinyint(1) NOT NULL DEFAULT 0'
  - '`CreatedAt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP'
indexes:
  - name: idx_user_package
    columns: [User, Package]
  - name: idx_token
    columns: [Token]
  - name: idx_expires
    columns: [ExpiresAt, Used]
def request_password_reset(
    self,
    username: str,
    package: str,
    ip_address: Optional[str] = None,
    reset_base_url: str = "https://yourdomain.com/reset-password",
    expiry_hours: int = 24
) -> Dict[str, Any]:
    """
    Request password reset token via email.

    Loads the user, stores a single-use token in
    ``sys_password_reset_tokens`` and emails a reset link to the user's
    address. No token is created when the user has no email address on
    record, since the link could never be delivered.

    NOTE(review): the distinct 'user_not_found' reason lets callers tell
    whether an account exists; make sure it is not exposed verbatim to
    unauthenticated clients.

    Args:
        username: User email/username
        package: Package name
        ip_address: IP of request (for security); currently not used
        reset_base_url: URL of the reset page; the token is appended as the
            ``token`` query parameter
        expiry_hours: Token lifetime in hours

    Returns:
        Result dictionary: ``{'success': True, 'message': ...}`` or
        ``{'success': False, 'reason': ...}`` with reason
        'user_not_found' or 'no_email'
    """
    import html
    import secrets

    # Load user
    self.Read(username=username, package=package)
    if not self._User:
        return {'success': False, 'reason': 'user_not_found'}

    recipient = getattr(self, '_Email', None)
    if not recipient:
        return {'success': False, 'reason': 'no_email'}

    # Generate reset token. token_urlsafe(32) yields 43 URL-safe characters,
    # which fits the varchar(64) Token column and needs no SQL escaping.
    token = secrets.token_urlsafe(32)
    expires_at = datetime.now() + timedelta(hours=expiry_hours)
    expires_str = expires_at.strftime('%Y-%m-%d %H:%M:%S')
    sql = f"""
        INSERT INTO sys_password_reset_tokens
            (TokenId, User, Package, Token, ExpiresAt)
        VALUES (
            '{self.get_uuid()}',
            '{self.escape_sql(username)}',
            '{self.escape_sql(package)}',
            '{token}',
            '{expires_str}'
        )
    """
    self.sql_execute(sql)

    # Send reset email. The URL is HTML-escaped because reset_base_url is
    # caller-supplied and is embedded in markup.
    reset_url = f"{reset_base_url}?token={token}"
    safe_url = html.escape(reset_url, quote=True)
    from ObjNotify import ObjNotify
    notify = ObjNotify(self.db_connection)
    notify.send_email(
        to=recipient,
        subject="Password Reset Request",
        body_html=f"""
        <p>You requested a password reset.</p>
        <p>Click the link below to reset your password:</p>
        <p><a href="{safe_url}">{safe_url}</a></p>
        <p>This link expires in {expiry_hours} hours.</p>
        <p>If you didn't request this, ignore this email.</p>
        """
    )
    return {'success': True, 'message': 'Reset email sent'}
def reset_password_with_token(
    self,
    token: str,
    new_password: str
) -> Dict[str, Any]:
    """
    Reset password using reset token.

    Validates the token (exists, unused, not expired), stores the new
    password hash, marks the token used, then queues a Keycloak password
    sync. The Keycloak step is best-effort: a failure there is logged and
    does not undo the local password change.

    NOTE(review): the token check and the "mark used" update are separate
    statements, so two concurrent requests with the same token could both
    pass validation.
    NOTE(review): the plaintext password is placed in the sync queue
    payload; confirm the queue stores it securely.

    Args:
        token: Reset token from email
        new_password: New password

    Returns:
        Result dictionary: ``{'success': True, 'message': ...}`` or
        ``{'success': False, 'reason': ...}`` with reason
        'invalid_password', 'invalid_token', 'token_already_used',
        'token_expired' or 'user_not_found'
    """
    if not new_password:
        return {'success': False, 'reason': 'invalid_password'}

    safe_token = self.escape_sql(token)

    # Verify token
    sql = f"""
        SELECT User, Package, ExpiresAt, Used
        FROM sys_password_reset_tokens
        WHERE Token = '{safe_token}'
    """
    row = self.sql_get_row(sql)
    if not row:
        return {'success': False, 'reason': 'invalid_token'}

    username, package, expires_at, used = row
    if used:
        return {'success': False, 'reason': 'token_already_used'}
    if datetime.now() > expires_at:
        return {'success': False, 'reason': 'token_expired'}

    # Load user; the account may have been removed since the token was issued.
    self.Read(username=username, package=package)
    if not self._User:
        return {'success': False, 'reason': 'user_not_found'}

    # Set new password. The hash is escaped like every other interpolated
    # value so a quote character in it cannot break the statement.
    hashed = self.encrypt(new_password, package)
    sql = f"""
        UPDATE sys_user
        SET Password = '{self.escape_sql(hashed)}',
            UpdatedAt = NOW()
        WHERE User = '{self.escape_sql(username)}'
          AND Package = '{self.escape_sql(package)}'
    """
    self.sql_execute(sql)

    # Mark token as used
    sql = f"""
        UPDATE sys_password_reset_tokens
        SET Used = 1
        WHERE Token = '{safe_token}'
    """
    self.sql_execute(sql)

    # Queue Keycloak password sync (best-effort).
    try:
        from ObjKeycloakResilient import ObjKeycloakResilient
        keycloak = ObjKeycloakResilient(self.db_connection)
        keycloak.queue_sync(
            username,
            package,
            'password_change',
            {'password': new_password},
            priority=8
        )
    except Exception as exc:
        # Do not fail the reset, but leave a trace so the missing sync can
        # be investigated. The password itself is never logged.
        import logging
        logging.getLogger(__name__).warning(
            "Could not queue Keycloak password sync for %s/%s: %s",
            package, username, exc
        )

    return {'success': True, 'message': 'Password reset successfully'}
Create password reset pages in factory.pages/.
Support feature flags and environment-specific settings.
File: config.yaml - Add base section:
base:
  environment: production  # production, staging, development
  feature_flags:
    keycloak_enabled: true
    rate_limiting_enabled: true
    ha_mode_enabled: false
    password_reset_enabled: true
staging:
  environment: staging
  feature_flags:
    keycloak_enabled: true
    rate_limiting_enabled: false  # Looser for testing
    ha_mode_enabled: false
development:
  environment: development
  feature_flags:
    keycloak_enabled: false  # Use mock
    rate_limiting_enabled: false
    ha_mode_enabled: false
File: factory.core/ObjFeatureFlags.py
import ConfigIni
class FeatureFlags:
    """Feature flag management.

    Flags are read from ``config.yaml``. The active environment comes from
    ``base.environment`` (default "production").
    """

    def __init__(self):
        """Load config.yaml and determine the active environment."""
        self.ini = ConfigIni.ConfigIni("config.yaml")
        self.environment = self.ini.Get("base", "environment", "production")

    def is_enabled(self, flag_name: str) -> bool:
        """Check if feature is enabled.

        Looks the flag up in, in order:
            1. ``<environment>.feature_flags`` (environment-specific override)
            2. ``base.feature_flags`` (shared defaults)
            3. ``feature_flags`` (legacy top-level section)
        The first section that defines the flag wins; an undefined flag is
        disabled.

        NOTE(review): assumes the environment sections (``staging``,
        ``development``) are top-level siblings of ``base`` in config.yaml
        and that ``Get`` returns the supplied default for a missing section
        or key - confirm against ConfigIni.
        """
        sections = (
            f"{self.environment}.feature_flags",
            "base.feature_flags",
            "feature_flags",
        )
        for section in sections:
            value = self.ini.Get(section, flag_name, None)
            if value is not None:
                return bool(value)
        return False

    def get_environment(self) -> str:
        """Get current environment."""
        return self.environment
# Usage
flags = FeatureFlags()
if flags.is_enabled("rate_limiting_enabled"):
# Apply rate limiting
pass
Interactive diagnostic and remediation tool.
File: factory.deploy/ObjKeycloakDoctor.py
import typer
from rich.console import Console
from rich.table import Table
from factory.core.ObjKeycloakResilient import ObjKeycloakResilient
from factory.core.ObjRateLimit import RateLimit
app = typer.Typer()
console = Console()
@app.command(name="diagnose")
def diagnose_system():
    """Run comprehensive system diagnostics."""
    # Prints a numbered checklist. Only checks 3 (Keycloak connectivity) and
    # 4 (sync queue) are implemented; the others print their heading only.
    # A failure to build the client or read its status is reported as a
    # finding rather than aborting the run.
    console.print("\n[bold]Keycloak Doctor - System Diagnostics[/bold]\n")

    # Check 1: Configuration
    console.print("[cyan]1. Checking configuration...[/cyan]")
    # TODO: validate config.yaml and check required fields

    # Check 2: Database tables
    console.print("[cyan]2. Checking database tables...[/cyan]")
    # TODO: verify all required tables exist

    # Check 3: Keycloak connectivity
    console.print("[cyan]3. Testing Keycloak connection...[/cyan]")
    status = None
    try:
        client = ObjKeycloakResilient()
        status = client.get_status()
    except Exception as exc:
        # markup=False: the exception text may contain square brackets that
        # rich would otherwise try to interpret as markup.
        console.print(f" ✗ Could not query Keycloak status: {exc}", markup=False)

    if status is not None:
        if status.get('available'):
            console.print(" ✓ Keycloak is reachable")
        else:
            console.print(" ✗ Keycloak is unreachable")
            console.print(f" Circuit state: {status.get('circuit_state', 'unknown')}")

    # Check 4: Sync queue health
    console.print("[cyan]4. Checking sync queue...[/cyan]")
    if status is None:
        # Without a status there is no queue figure; do not report "healthy".
        console.print(" ⚠ Sync queue status unavailable")
    else:
        pending = status.get('pending_syncs', 0)
        if pending == 0:
            console.print(" ✓ No pending syncs")
        elif pending < 50:
            console.print(f" ⚠ {pending} pending syncs (acceptable)")
        else:
            console.print(f" ✗ {pending} pending syncs (high)")

    # Check 5: Token cache
    console.print("[cyan]5. Checking token cache...[/cyan]")
    # Check 6: Rate limiting
    console.print("[cyan]6. Checking rate limiting...[/cyan]")
    # Check 7: Recent errors
    console.print("[cyan]7. Analyzing recent errors...[/cyan]")
@app.command(name="fix")
def auto_fix():
    """Attempt automatic remediation."""
    # Only the banner is printed for now; the remediation steps are planned:
    #   1. Create missing tables
    #   2. Clear stuck syncs
    #   3. Reset circuit breaker
    #   4. Cleanup expired tokens
    banner = "\n[bold]Attempting automatic fixes...[/bold]\n"
    console.print(banner)
@app.command(name="test-auth")
def test_authentication(
    username: str = typer.Argument(...),
    package: str = typer.Argument(...)
):
    """Test authentication flow for user."""
    # Only the banner is printed for now; the planned steps are:
    #   1. Check user exists
    #   2. Check rate limiting
    #   3. Check Keycloak
    #   4. Check cached tokens
    banner = f"\n[bold]Testing authentication for {username}@{package}[/bold]\n"
    console.print(banner)
if __name__ == "__main__":
app()
Implementation Status: 3/8 Complete (Admin CLI, Mock, Rate Limiting)
Remaining: HA Support, Alerting, Password Reset, Env Config, Doctor Tool
Estimated Effort: 8-12 hours for remaining features