Key Principle: Web users must be able to continue working even if Keycloak is unreachable.
class User(ObjData):
"""
Authentication priority:
1. Local sys_user (always works)
2. Keycloak (if available, provides SSO)
3. Cached Keycloak tokens (grace period)
"""
def authenticate(self, username: str, password: str, package: str) -> bool:
"""
Multi-tier authentication with fallback.
Priority:
1. Check local sys_user (instant, always available)
2. Validate with Keycloak (if available, for SSO benefits)
3. Use cached Keycloak session (if Keycloak down)
"""
# TIER 1: Local authentication (PRIMARY)
local_auth = self._authenticate_local(username, password, package)
if not local_auth:
return False # User doesn't exist or wrong password
# User authenticated locally - create session
session_id = self._create_session(username, package, 'password')
# TIER 2: Keycloak enhancement (OPTIONAL)
try:
if self._is_keycloak_available():
keycloak_token = self._authenticate_keycloak(username, password)
if keycloak_token:
# Store token for SSO
self._cache_keycloak_token(username, package, keycloak_token)
self._update_session_token(session_id, keycloak_token)
except Exception as e:
# Keycloak failed - continue with local auth
self.debug(f"Keycloak unavailable, using local auth: {e}")
return True # Success with local auth (Keycloak is bonus)
class KeycloakCircuitBreaker:
"""
Stop trying Keycloak after repeated failures.
Automatically retry after cooldown period.
"""
def __init__(self):
self.failure_count = 0
self.last_failure_time = None
self.state = 'CLOSED' # CLOSED, OPEN, HALF_OPEN
self.failure_threshold = 3
self.cooldown_seconds = 60
self.half_open_timeout = 10
def call_keycloak(self, func, *args, **kwargs):
"""
Execute Keycloak call with circuit breaker protection.
"""
if self.state == 'OPEN':
# Circuit is open - check if we should try again
if self._should_attempt_reset():
self.state = 'HALF_OPEN'
else:
raise KeycloakUnavailableException("Circuit breaker OPEN")
try:
result = func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise KeycloakUnavailableException(f"Keycloak call failed: {e}")
def _on_success(self):
"""Reset circuit breaker on success."""
self.failure_count = 0
self.state = 'CLOSED'
def _on_failure(self):
"""Increment failure count and possibly open circuit."""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = 'OPEN'
self.debug(f"Circuit breaker OPEN after {self.failure_count} failures")
def _should_attempt_reset(self):
"""Check if enough time has passed to try again."""
if not self.last_failure_time:
return True
elapsed = (datetime.now() - self.last_failure_time).total_seconds()
return elapsed >= self.cooldown_seconds
class KeycloakTokenCache:
"""
Cache Keycloak tokens with expiration.
Use cached tokens when Keycloak is down.
"""
def __init__(self, db_connection):
self.db = db_connection
self.grace_period_hours = 24 # Allow cached tokens for 24h
def cache_token(self, username: str, package: str, token_data: dict):
"""
Store Keycloak token in database cache.
"""
sql = f"""
INSERT INTO sys_keycloak_token_cache
(User, Package, AccessToken, RefreshToken, ExpiresAt, CachedAt)
VALUES
('{self.db.escape_sql(username)}',
'{self.db.escape_sql(package)}',
'{self.db.escape_sql(token_data['access_token'])}',
'{self.db.escape_sql(token_data.get('refresh_token', ''))}',
DATE_ADD(NOW(), INTERVAL {token_data.get('expires_in', 300)} SECOND),
NOW())
ON DUPLICATE KEY UPDATE
AccessToken = VALUES(AccessToken),
RefreshToken = VALUES(RefreshToken),
ExpiresAt = VALUES(ExpiresAt),
CachedAt = NOW()
"""
self.db.sql_execute(sql)
def get_cached_token(self, username: str, package: str) -> dict:
"""
Retrieve cached token if still valid (within grace period).
"""
sql = f"""
SELECT AccessToken, RefreshToken, ExpiresAt, CachedAt
FROM sys_keycloak_token_cache
WHERE User = '{self.db.escape_sql(username)}'
AND Package = '{self.db.escape_sql(package)}'
AND CachedAt > DATE_SUB(NOW(), INTERVAL {self.grace_period_hours} HOUR)
ORDER BY CachedAt DESC
LIMIT 1
"""
result = self.db.sql_get_array(sql)
if result:
return {
'access_token': result[0][0],
'refresh_token': result[0][1],
'expires_at': result[0][2],
'cached_at': result[0][3],
'is_cached': True
}
return None
def cleanup_expired(self):
"""
Remove old cached tokens (maintenance job).
"""
sql = f"""
DELETE FROM sys_keycloak_token_cache
WHERE CachedAt < DATE_SUB(NOW(), INTERVAL {self.grace_period_hours * 2} HOUR)
"""
self.db.sql_execute(sql)
class KeycloakHealthMonitor:
"""
Monitor Keycloak health and provide status.
"""
def __init__(self):
self.last_check_time = None
self.last_status = None
self.check_interval_seconds = 30
def is_available(self, force_check: bool = False) -> bool:
"""
Check if Keycloak is available.
Cache result for check_interval_seconds.
"""
now = datetime.now()
if not force_check and self.last_check_time:
elapsed = (now - self.last_check_time).total_seconds()
if elapsed < self.check_interval_seconds:
return self.last_status
# Perform health check
try:
response = requests.get(
f"{self.keycloak_url}/health/ready",
timeout=2, # Short timeout
verify=False
)
self.last_status = response.status_code == 200
except Exception:
self.last_status = False
self.last_check_time = now
return self.last_status
def get_status(self) -> dict:
"""
Get detailed Keycloak status.
"""
return {
'available': self.is_available(),
'last_check': self.last_check_time,
'last_status': self.last_status,
'mode': 'online' if self.last_status else 'offline'
}
description: Cache Keycloak tokens for offline operation
create:
cols:
- '`User` varchar(150) NOT NULL'
- '`Package` varchar(100) NOT NULL'
- '`AccessToken` text NOT NULL'
- '`RefreshToken` text NULL'
- '`ExpiresAt` datetime NOT NULL'
- '`CachedAt` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP'
keys:
PRIMARY:
- '`User`'
- '`Package`'
INDEXES:
'`idx_cached`':
- '`CachedAt`'
'`idx_expires`':
- '`ExpiresAt`'
name: sys_keycloak_token_cache
description: Track Keycloak synchronization status
create:
cols:
- '`SyncId` char(36) NOT NULL'
- '`Package` varchar(100) NOT NULL'
- '`SyncType` varchar(50) NOT NULL' # user_provision, token_refresh, etc.
- '`Status` varchar(20) NOT NULL' # success, failed, pending
- '`StartedAt` datetime NOT NULL'
- '`CompletedAt` datetime NULL'
- '`ErrorMessage` text NULL'
- '`ItemsProcessed` int NOT NULL DEFAULT 0'
- '`ItemsFailed` int NOT NULL DEFAULT 0'
keys:
PRIMARY:
- '`SyncId`'
INDEXES:
'`idx_package_status`':
- '`Package`'
- '`Status`'
'`idx_started`':
- '`StartedAt`'
name: sys_keycloak_sync_log
keycloak:
# Connection settings
server: https://auth.technocore.co.za
realm: technocore
clientid: 3d7427c8-887b-42d7-ae18-6104f649082e
# Resilience settings
resilience:
# Enable fallback to local auth
enable_fallback: true
# Circuit breaker settings
failure_threshold: 3
cooldown_seconds: 60
# Token caching
cache_tokens: true
token_grace_period_hours: 24
# Health checks
health_check_interval: 30
health_check_timeout: 2
# Offline mode
allow_offline_mode: true
offline_mode_max_duration_hours: 72
# Sync settings
auto_sync_enabled: true
sync_interval_minutes: 15
# Degraded mode features
degraded_mode:
allow_login: true
allow_password_change: false # Requires Keycloak
allow_user_creation: false # Requires Keycloak
allow_group_changes: false # Requires Keycloak
class User(ObjData):
"""Enhanced User class with Keycloak resilience."""
def __init__(self, DB=0):
super().__init__(DB)
# Keycloak components (optional)
self.keycloak_health = KeycloakHealthMonitor()
self.keycloak_circuit = KeycloakCircuitBreaker()
self.keycloak_cache = KeycloakTokenCache(self.DB)
# Configuration
self.keycloak_enabled = self.ini.Get("keycloak", "resilience", "enable_fallback", True)
def Login(self, username: str, password: str, package: str = None) -> dict:
"""
Login with automatic fallback.
Returns:
{
'success': bool,
'session_id': str,
'auth_mode': 'local' | 'keycloak' | 'cached',
'user': User object,
'keycloak_status': 'online' | 'offline' | 'degraded'
}
"""
if not package:
package = self.get_package()
result = {
'success': False,
'session_id': None,
'auth_mode': None,
'user': None,
'keycloak_status': 'unknown'
}
# Step 1: Local authentication (ALWAYS)
if not self._authenticate_local(username, password, package):
return result # Invalid credentials
# Load user object
self.Read(username, package=package)
result['user'] = self
# Step 2: Keycloak enhancement (OPTIONAL)
keycloak_available = self.keycloak_health.is_available()
result['keycloak_status'] = 'online' if keycloak_available else 'offline'
if keycloak_available and self.keycloak_enabled:
# Try Keycloak authentication
try:
token = self.keycloak_circuit.call_keycloak(
self._authenticate_keycloak,
username, password, package
)
if token:
# Cache token for offline use
self.keycloak_cache.cache_token(username, package, token)
result['auth_mode'] = 'keycloak'
else:
# Keycloak auth failed - use local
result['auth_mode'] = 'local'
except KeycloakUnavailableException:
# Circuit breaker open or Keycloak down
result['auth_mode'] = 'local'
result['keycloak_status'] = 'degraded'
else:
# Keycloak disabled or unavailable - use local
result['auth_mode'] = 'local'
# Step 3: Create session
session_id = self._create_session(username, package, result['auth_mode'])
result['session_id'] = session_id
result['success'] = True
# Step 4: Log authentication
self._log_authentication(username, package, result)
return result
def ValidateSession(self, session_id: str) -> dict:
"""
Validate session with graceful degradation.
"""
# Check local session first (fast)
session = self._get_session(session_id)
if not session or not session['is_active']:
return {'valid': False, 'reason': 'session_not_found'}
# Check session expiration
if session['expires_at'] < datetime.now():
return {'valid': False, 'reason': 'session_expired'}
# If session has Keycloak token, try to validate (best effort)
if session.get('session_token') and self.keycloak_health.is_available():
try:
keycloak_valid = self._validate_keycloak_token(session['session_token'])
if not keycloak_valid:
# Token invalid - check cache
cached_token = self.keycloak_cache.get_cached_token(
session['user'],
session['package']
)
if cached_token:
return {
'valid': True,
'mode': 'cached',
'warning': 'Using cached token'
}
else:
# No cached token - invalidate session
return {'valid': False, 'reason': 'token_invalid'}
except Exception as e:
# Keycloak unavailable - allow session to continue
self.debug(f"Keycloak unavailable during validation: {e}")
# Session valid
return {'valid': True, 'mode': session.get('auth_mode', 'local')}
def ChangePassword(self, username: str, old_password: str, new_password: str, package: str = None) -> dict:
"""
Change password with Keycloak sync.
"""
if not package:
package = self.get_package()
result = {
'success': False,
'local_updated': False,
'keycloak_updated': False,
'mode': 'unknown'
}
# Verify old password locally
if not self._authenticate_local(username, old_password, package):
return result
# Update local password (ALWAYS)
self.Read(username, package=package)
self._Password = self.encrypt(new_password)
self._PasswordAge = datetime.now()
self.Update()
result['local_updated'] = True
# Try to update Keycloak (BEST EFFORT)
if self.keycloak_health.is_available() and self.keycloak_enabled:
try:
keycloak_success = self.keycloak_circuit.call_keycloak(
self._update_keycloak_password,
username, new_password, package
)
result['keycloak_updated'] = keycloak_success
result['mode'] = 'synchronized'
except KeycloakUnavailableException:
# Keycloak down - queue for later sync
self._queue_password_sync(username, package, new_password)
result['mode'] = 'local_only_queued'
else:
result['mode'] = 'local_only'
result['success'] = True
return result
def _queue_password_sync(self, username: str, package: str, new_password: str):
"""
Queue password change for later Keycloak sync.
"""
sql = f"""
INSERT INTO sys_keycloak_sync_queue
(User, Package, SyncType, SyncData, QueuedAt, Status)
VALUES
('{self.escape_sql(username)}',
'{self.escape_sql(package)}',
'password_change',
'{self.escape_sql(self.encrypt(new_password))}',
NOW(),
'pending')
"""
self.sql_execute(sql)
class KeycloakSyncService:
"""
Background service to sync with Keycloak when it comes back online.
"""
def __init__(self, db_connection):
self.db = db_connection
self.health = KeycloakHealthMonitor()
def run_sync_cycle(self):
"""
Run one sync cycle - process queued items.
"""
if not self.health.is_available():
self.debug("Keycloak unavailable - skipping sync")
return
# Get pending sync items
sql = """
SELECT SyncId, User, Package, SyncType, SyncData
FROM sys_keycloak_sync_queue
WHERE Status = 'pending'
ORDER BY QueuedAt ASC
LIMIT 100
"""
items = self.db.sql_get_array(sql)
for item in items:
sync_id, user, package, sync_type, sync_data = item
try:
if sync_type == 'password_change':
self._sync_password(user, package, sync_data)
elif sync_type == 'user_provision':
self._sync_user(user, package, sync_data)
elif sync_type == 'group_change':
self._sync_groups(user, package, sync_data)
# Mark as completed
self._mark_sync_complete(sync_id)
except Exception as e:
self._mark_sync_failed(sync_id, str(e))
def _sync_password(self, user: str, package: str, encrypted_password: str):
"""Sync password change to Keycloak."""
# Decrypt password
password = self.db.decrypt(encrypted_password)
# Update in Keycloak
keycloak = ObjKeycloak(self.db)
keycloak.update_user_password(package, user, password)
def _mark_sync_complete(self, sync_id: str):
"""Mark sync item as completed."""
sql = f"""
UPDATE sys_keycloak_sync_queue
SET Status = 'completed', CompletedAt = NOW()
WHERE SyncId = '{self.escape_sql(sync_id)}'
"""
self.db.sql_execute(sql)
def _mark_sync_failed(self, sync_id: str, error: str):
"""Mark sync item as failed."""
sql = f"""
UPDATE sys_keycloak_sync_queue
SET Status = 'failed',
ErrorMessage = '{self.escape_sql(error)}',
AttemptCount = AttemptCount + 1
WHERE SyncId = '{self.escape_sql(sync_id)}'
"""
self.db.sql_execute(sql)
# User attempts login
user = User()
result = user.Login("john.doe", "password123", "homechoice")
if result['success']:
print(f"Login successful!")
print(f"Auth mode: {result['auth_mode']}") # local, keycloak, or cached
print(f"Keycloak status: {result['keycloak_status']}") # online, offline, degraded
if result['auth_mode'] == 'local':
print("⚠ Operating in local-only mode (Keycloak unavailable)")
elif result['auth_mode'] == 'cached':
print("⚠ Using cached Keycloak token (offline mode)")
else:
print("Login failed - invalid credentials")
# Validate session on each request
user = User()
validation = user.ValidateSession(session_id)
if validation['valid']:
# Session is valid - proceed
if validation.get('mode') == 'cached':
# Show warning banner
print("⚠ Operating with cached authentication")
else:
# Session invalid - redirect to login
print(f"Session invalid: {validation.get('reason')}")
# User changes password
user = User()
result = user.ChangePassword("john.doe", "old_pass", "new_pass", "homechoice")
if result['success']:
print("Password changed successfully!")
if result['keycloak_updated']:
print("✓ Keycloak synchronized")
elif result['mode'] == 'local_only_queued':
print("⚠ Keycloak unavailable - change queued for sync")
else:
print("⚠ Local password only (Keycloak disabled)")
def get_keycloak_health_status() -> dict:
"""
Get comprehensive Keycloak health status.
"""
health = KeycloakHealthMonitor()
circuit = KeycloakCircuitBreaker()
return {
'available': health.is_available(),
'last_check': health.last_check_time,
'circuit_state': circuit.state,
'failure_count': circuit.failure_count,
'pending_syncs': get_pending_sync_count(),
'failed_syncs': get_failed_sync_count(),
'cached_sessions': get_cached_session_count(),
'mode': 'normal' if health.is_available() else 'degraded'
}
# Alert if Keycloak down for > 5 minutes
if keycloak_downtime > 300:
send_alert("Keycloak unavailable", severity="warning")
# Alert if sync queue growing
if pending_sync_count > 100:
send_alert("Keycloak sync queue growing", severity="warning")
# Alert if circuit breaker open
if circuit.state == 'OPEN':
send_alert("Keycloak circuit breaker open", severity="error")
# Alert if cached sessions expiring soon
if expiring_cached_sessions > 50:
send_alert("Cached sessions expiring", severity="warning")
# Simulate Keycloak outage
disable_keycloak()
# Users can still login
result = user.Login("john.doe", "password", "homechoice")
assert result['success'] == True
assert result['auth_mode'] == 'local'
assert result['keycloak_status'] == 'offline'
# User logged in with Keycloak
result = user.Login("john.doe", "password", "homechoice")
session_id = result['session_id']
# Keycloak goes down
disable_keycloak()
# Session still valid (uses cache)
validation = user.ValidateSession(session_id)
assert validation['valid'] == True
assert validation['mode'] == 'cached'
# Keycloak comes back online
enable_keycloak()
# Pending syncs processed
sync_service = KeycloakSyncService(db)
sync_service.run_sync_cycle()
# Verify syncs completed
assert get_pending_sync_count() == 0
keycloak:
resilience:
failure_threshold: 1 # Fast fallback
cooldown_seconds: 30 # Quick recovery attempts
token_grace_period_hours: 48 # Long grace period
allow_offline_mode: true
keycloak:
resilience:
failure_threshold: 3
cooldown_seconds: 60
token_grace_period_hours: 24
allow_offline_mode: true
keycloak:
resilience:
failure_threshold: 5 # Try harder
cooldown_seconds: 120 # Longer cooldown
token_grace_period_hours: 12 # Shorter grace
allow_offline_mode: true # But still allow fallback
This design ensures your web application is resilient and can continue operating even when Keycloak is unavailable, while still leveraging Keycloak's benefits when it's available.