This guide provides practical instructions for handling personal data in compliance with GDPR (General Data Protection Regulation) and POPIA (Protection of Personal Information Act). It covers both local data storage and email communications.
GDPR and POPIA require organizations to:
data.config
from ObjData import ObjData
def store_customer_data(customer_id, name, email, phone, id_number):
    """Store one customer record, encrypting the phone and ID number.

    The phone and ID number are passed through ``encryption_rsa_encode``
    before being written; name and email are stored as given. The row gets
    ``consent_date`` and ``created_at`` set to ``NOW()`` and
    ``data_retention_until`` set seven years ahead. A "STORE" event is then
    recorded via ``log_data_access``.

    Args:
        customer_id: Identifier written to ``customer_data.customer_id``.
        name: Customer name.
        email: Customer email address.
        phone: Phone number (stored encrypted).
        id_number: National ID number (stored encrypted).

    Returns:
        True once the insert and the audit log call have completed.
    """
    def quote(value):
        # Escape backslashes and single quotes so text such as "O'Brien"
        # cannot terminate the literal early. Assumes MySQL's default
        # backslash-escape mode - TODO confirm; prefer bound parameters if
        # ObjData.sql_execute supports them.
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{text}'"

    obj = ObjData(0)

    # Encrypt sensitive personal information
    encrypted_id = obj.encryption_rsa_encode(id_number)
    encrypted_phone = obj.encryption_rsa_encode(phone)

    # Store with consent timestamp
    obj.sql_execute(f"""
        INSERT INTO customer_data (
            customer_id,
            name,
            email,
            phone_encrypted,
            id_number_encrypted,
            consent_date,
            data_retention_until,
            created_at
        ) VALUES (
            {quote(customer_id)},
            {quote(name)},
            {quote(email)},
            {quote(encrypted_phone)},
            {quote(encrypted_id)},
            NOW(),
            DATE_ADD(NOW(), INTERVAL 7 YEAR),
            NOW()
        )
    """)

    # Log the data storage event
    log_data_access(customer_id, "STORE", "Customer data stored with consent")
    return True
from ObjNotify import ObjNotify
def send_gdpr_compliant_email(customer_email, customer_name, message):
    """Send a customer email that carries only the minimum personal data.

    The greeting uses the first word of ``customer_name`` only; if the name
    is empty or blank the greeting falls back to "Customer" instead of
    raising. The message is dispatched through ``ObjNotify.Run`` with the
    ``CUSTOMER_EMAIL`` notify code, and the send is recorded via
    ``log_data_access``.

    NOTE(review): ``customer_email`` is only used for the audit log entry; it
    is not passed to ``notify.Run``. Confirm how the recipient is resolved.

    Args:
        customer_email: Recipient address (logged as the entity id).
        customer_name: Full customer name; only the first word is used.
        message: Body text inserted after the greeting.
    """
    # Data minimisation: greet by first name only. This used to be an inline
    # "# ..." remark inside the f-string, which was emailed to the customer.
    name_parts = (customer_name or "").split()
    first_name = name_parts[0] if name_parts else "Customer"

    email_body = f"""
Dear {first_name},
{message}
This email contains personal information. Please delete after reading.
Your privacy rights:
- Request data access: privacy@company.com
- Request deletion: privacy@company.com
- Unsubscribe: privacy@company.com
This communication is encrypted in transit via TLS.
"""
    notify = ObjNotify(0)
    notify.Run(
        notify_code="CUSTOMER_EMAIL",
        message_text=email_body
    )

    # Log the email transmission
    log_data_access(
        customer_email,
        "EMAIL_SENT",
        f"Email sent to {customer_email}"
    )
def handle_data_access_request(customer_id):
    """Process a GDPR/POPIA data subject access request.

    Reads the customer's row from ``customer_data``, replaces the encrypted
    phone and ID-number columns with decrypted ``phone`` / ``id_number``
    entries, records a "DSAR_ACCESS" audit event and returns the row.

    Encrypted columns that are NULL/empty (for example on rows already
    scrubbed by the retention clean-up) are returned as ``None`` rather than
    being passed to the decoder.

    NOTE(review): ``customer_id`` is interpolated into the SQL text
    unescaped.

    Args:
        customer_id: Identifier of the customer making the request.

    Returns:
        The customer row as a dict with decrypted fields, or
        ``{"error": "No data found"}`` when no row matches.
    """
    obj = ObjData(0)

    # Retrieve all personal data
    customer_data = obj.query_identity(f"""
        SELECT
            customer_id,
            name,
            email,
            phone_encrypted,
            id_number_encrypted,
            consent_date,
            created_at,
            updated_at
        FROM customer_data
        WHERE customer_id = '{customer_id}'
    """)
    if not customer_data:
        return {"error": "No data found"}

    # Swap each encrypted column for its decrypted counterpart.
    for plain_key, encrypted_key in (
        ("phone", "phone_encrypted"),
        ("id_number", "id_number_encrypted"),
    ):
        ciphertext = customer_data.pop(encrypted_key, None)
        customer_data[plain_key] = (
            obj.encryption_rsa_decode(ciphertext) if ciphertext else None
        )

    # Log the access request
    log_data_access(
        customer_id,
        "DSAR_ACCESS",
        "Data subject access request fulfilled"
    )
    return customer_data
# Configure what constitutes personal data in your system
# Sensitivity classification of personal-data field names.
# Keys are category labels; values list the field names in that category.
# Any field name not listed here is treated as "NON_PERSONAL" by classify_field.
PERSONAL_DATA_FIELDS = {
    "HIGHLY_SENSITIVE": [
        "id_number",  # National ID, SSN
        "passport_number",
        "credit_card_number",
        "bank_account",
        "medical_records",
        "biometric_data",
    ],
    "SENSITIVE": [
        "phone_number",
        "email_address",
        "physical_address",
        "date_of_birth",
        "financial_data",
    ],
    "BASIC": [
        "name",
        "surname",
        "gender",
        "title",
    ],
}
def classify_field(field_name):
    """Return the sensitivity category that lists ``field_name``.

    Categories are searched in the order they appear in
    ``PERSONAL_DATA_FIELDS``; a name found in none of them is reported as
    "NON_PERSONAL".
    """
    matching_categories = (
        category
        for category, members in PERSONAL_DATA_FIELDS.items()
        if field_name in members
    )
    return next(matching_categories, "NON_PERSONAL")
def should_encrypt_field(field_name):
    """Tell whether ``field_name`` must be stored encrypted.

    A field needs encryption when ``classify_field`` places it in either the
    "HIGHLY_SENSITIVE" or the "SENSITIVE" category.
    """
    encrypted_categories = ["HIGHLY_SENSITIVE", "SENSITIVE"]
    return classify_field(field_name) in encrypted_categories
def store_classified_data(table_name, data_dict):
    """Insert ``data_dict`` into ``table_name``, encrypting sensitive fields.

    Fields for which ``should_encrypt_field`` is true are RSA-encoded and
    written to a ``<field>_encrypted`` column; all other fields are written
    to a column named after the field. ``None`` values are written as SQL
    ``NULL`` (and are not encrypted) instead of the text ``'None'``.

    Args:
        table_name: Target table; must be a plain SQL identifier.
        data_dict: Mapping of field name to value; must not be empty and
            every key must be a plain SQL identifier.

    Raises:
        ValueError: If ``table_name`` or a field name is not a plain
            identifier, or if ``data_dict`` is empty.
    """
    import re

    # Identifiers cannot be quoted like values, so only allow plain names.
    identifier = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")
    if not identifier.fullmatch(str(table_name)):
        raise ValueError(f"Invalid table name: {table_name!r}")
    if not data_dict:
        raise ValueError("data_dict must contain at least one field")

    def quote(value):
        # Escape backslashes and single quotes (assumes MySQL's default
        # backslash-escape mode - TODO confirm; prefer bound parameters if
        # ObjData.sql_execute supports them).
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{text}'"

    obj = ObjData(0)
    column_names = []
    value_literals = []
    for field, value in data_dict.items():
        if not identifier.fullmatch(str(field)):
            raise ValueError(f"Invalid field name: {field!r}")
        needs_encryption = should_encrypt_field(field)
        column_names.append(f"{field}_encrypted" if needs_encryption else field)
        if value is None:
            value_literals.append("NULL")
        elif needs_encryption:
            value_literals.append(quote(obj.encryption_rsa_encode(value)))
        else:
            value_literals.append(quote(value))

    # Build INSERT statement
    columns = ", ".join(column_names)
    values = ", ".join(value_literals)
    obj.sql_execute(f"""
        INSERT INTO {table_name} ({columns})
        VALUES ({values})
    """)
# config.yaml
data_retention:
  customer_data:
    retention_period_years: 7
    auto_delete: true
  transaction_logs:
    retention_period_years: 10
    auto_delete: false  # Keep for legal compliance
  marketing_consent:
    retention_period_years: 2
    auto_delete: true
from datetime import datetime, timedelta
def delete_expired_personal_data():
    """Scrub customer records whose retention period has expired.

    Selects every ``customer_data`` row with ``data_retention_until`` in the
    past and ``deleted_at`` unset. Each such row is soft-deleted: name and
    email are overwritten with 'DELETED', the encrypted phone/ID columns are
    set to NULL and ``deleted_at`` is stamped. An "AUTO_DELETE" audit event
    is logged per record.

    Records for which ``has_legal_obligation_to_retain`` is true are left
    untouched, matching the check ``delete_customer_data`` performs.

    Returns:
        Dict with ``deleted_count`` (int), ``deleted_records`` (ids that were
        scrubbed) and ``skipped_records`` (ids kept due to a legal hold).
    """
    obj = ObjData(0)

    # Find records past retention date. Treat a None result as "no rows".
    expired_records = obj.sql_execute("""
        SELECT customer_id, name, data_retention_until
        FROM customer_data
        WHERE data_retention_until < NOW()
        AND deleted_at IS NULL
    """) or []

    # Ids are collected during the single pass so the result is correct even
    # if the query result can only be iterated once.
    deleted_ids = []
    skipped_ids = []
    for record in expired_records:
        customer_id = record['customer_id']

        if has_legal_obligation_to_retain(customer_id):
            skipped_ids.append(customer_id)
            continue

        # Soft delete - mark as deleted
        obj.sql_execute(f"""
            UPDATE customer_data
            SET deleted_at = NOW(),
                name = 'DELETED',
                email = 'DELETED',
                phone_encrypted = NULL,
                id_number_encrypted = NULL
            WHERE customer_id = '{customer_id}'
        """)

        # Log the deletion
        log_data_access(
            customer_id,
            "AUTO_DELETE",
            f"Data deleted after retention period: {record['data_retention_until']}"
        )
        deleted_ids.append(customer_id)

    return {
        "deleted_count": len(deleted_ids),
        "deleted_records": deleted_ids,
        "skipped_records": skipped_ids,
    }
def delete_customer_data(customer_id, reason="Customer request"):
    """Handle a GDPR Right to Erasure / POPIA deletion request.

    Steps, in order:
      1. Look up the live (not yet deleted) customer row.
      2. Refuse if ``has_legal_obligation_to_retain`` reports a hold.
      3. Stamp ``deleted_at`` on every table listed in
         ``tables_with_personal_data``.
      4. Overwrite the personal columns of ``customer_data`` with the same
         placeholder values the retention clean-up uses, so the erased
         record no longer holds the name, email or encrypted identifiers.
      5. Anonymize the customer's rows in ``transactions``.
      6. Log a "MANUAL_DELETE" audit event and send a confirmation to the
         address read in step 1.

    NOTE(review): ``customer_id`` is interpolated into the SQL text
    unescaped.

    Args:
        customer_id: Identifier of the customer to erase.
        reason: Free-text reason recorded in the audit log.

    Returns:
        ``{"success": True, "customer_id": ..., "deleted_from": [...]}`` on
        success, otherwise ``{"success": False, "error": ...}``.
    """
    obj = ObjData(0)

    # Verify customer exists
    customer = obj.query_identity(f"""
        SELECT customer_id, name, email
        FROM customer_data
        WHERE customer_id = '{customer_id}'
        AND deleted_at IS NULL
    """)
    if not customer:
        return {"success": False, "error": "Customer not found or already deleted"}

    # Check if deletion is legally allowed
    if has_legal_obligation_to_retain(customer_id):
        return {
            "success": False,
            "error": "Cannot delete - legal retention requirement applies"
        }

    # Delete from all tables containing personal data
    tables_with_personal_data = [
        "customer_data",
        "customer_addresses",
        "customer_preferences",
        "marketing_consent"
    ]
    for table in tables_with_personal_data:
        obj.sql_execute(f"""
            UPDATE {table}
            SET deleted_at = NOW()
            WHERE customer_id = '{customer_id}'
        """)

    # A deleted_at stamp alone leaves the personal values readable; scrub
    # them the same way delete_expired_personal_data does.
    obj.sql_execute(f"""
        UPDATE customer_data
        SET name = 'DELETED',
            email = 'DELETED',
            phone_encrypted = NULL,
            id_number_encrypted = NULL
        WHERE customer_id = '{customer_id}'
    """)

    # Anonymize transaction history (keep for analytics)
    obj.sql_execute(f"""
        UPDATE transactions
        SET customer_id = 'ANONYMIZED',
            customer_name = 'DELETED USER'
        WHERE customer_id = '{customer_id}'
    """)

    # Log the deletion
    log_data_access(
        customer_id,
        "MANUAL_DELETE",
        f"Data deletion: {reason}"
    )

    # Send confirmation email using the contact details read before scrubbing
    send_deletion_confirmation(customer['email'], customer['name'])
    return {
        "success": True,
        "customer_id": customer_id,
        "deleted_from": tables_with_personal_data
    }
def has_legal_obligation_to_retain(customer_id):
    """Report whether the customer's data is under a legal hold.

    Two counts are read: active rows in ``legal_cases`` and rows in
    ``audit_flags`` whose audit is not completed. Either count being above
    zero means the data must be kept.

    NOTE(review): ``customer_id`` is interpolated into the SQL text
    unescaped.
    """
    db = ObjData(0)

    # Ongoing legal cases
    active_case_sql = f"""
SELECT COUNT(*) FROM legal_cases
WHERE customer_id = '{customer_id}'
AND status = 'ACTIVE'
"""
    legal_cases = db.query_get_value(active_case_sql)

    # Financial audits still open
    open_audit_sql = f"""
SELECT COUNT(*) FROM audit_flags
WHERE customer_id = '{customer_id}'
AND audit_completed = 'N'
"""
    pending_audits = db.query_get_value(open_audit_sql)

    if legal_cases > 0:
        return True
    return pending_audits > 0
def generate_data_export(customer_id, export_format="JSON"):
    """Generate a complete personal-data export for a customer.

    Collects the profile, transactions, communications, consent history and
    access logs, decrypts every non-empty ``*_encrypted`` field (replacing it
    with a key without the suffix), logs a "DATA_EXPORT" audit event and
    returns the data in the requested format.

    Decryption is applied to categories that are a single dict and to
    categories that are a list/tuple of dicts.

    Args:
        customer_id: Identifier of the customer to export.
        export_format: "JSON" (default) returns a JSON string, "CSV" returns
            whatever ``convert_to_csv`` produces; any other value returns the
            raw dict.

    Returns:
        str for "JSON", the ``convert_to_csv`` result for "CSV", else dict.
    """
    obj = ObjData(0)
    suffix = '_encrypted'

    def decrypt_record(record):
        """Replace non-empty ``*_encrypted`` entries of ``record`` in place."""
        # Snapshot the keys first: adding/removing entries while iterating
        # the dict itself raises RuntimeError.
        for key in [k for k in record if isinstance(k, str) and k.endswith(suffix)]:
            value = record[key]
            if value:
                # Strip only the trailing suffix, not every occurrence.
                record[key[:-len(suffix)]] = obj.encryption_rsa_decode(value)
                del record[key]

    # Collect all personal data
    personal_data = {
        "customer_profile": get_customer_profile(customer_id),
        "transactions": get_customer_transactions(customer_id),
        "communications": get_customer_communications(customer_id),
        "consent_history": get_consent_history(customer_id),
        "access_logs": get_access_logs(customer_id)
    }

    # Decrypt all encrypted fields
    for data in personal_data.values():
        if isinstance(data, dict):
            decrypt_record(data)
        elif isinstance(data, (list, tuple)):
            for row in data:
                if isinstance(row, dict):
                    decrypt_record(row)

    # Log the export
    log_data_access(
        customer_id,
        "DATA_EXPORT",
        f"Full data export generated in {export_format} format"
    )

    if export_format == "JSON":
        import json
        # default=str stringifies values json cannot encode (e.g. datetimes).
        return json.dumps(personal_data, indent=2, default=str)
    elif export_format == "CSV":
        return convert_to_csv(personal_data)
    else:
        return personal_data
def update_customer_data(customer_id, field_updates):
    """Handle a data correction (rectification) request.

    Only fields in the allow-list are applied; anything else in
    ``field_updates`` is ignored. Fields that require encryption are
    RSA-encoded and written to ``<field>_encrypted``. ``phone`` is always
    treated as encrypted because ``customer_data`` stores it in
    ``phone_encrypted`` and the encryption audit reports a populated plain
    ``phone`` column as non-compliant.

    Args:
        customer_id: Identifier of the customer whose row is updated.
        field_updates: Mapping of field name to new value.

    Returns:
        ``{"success": True, "updated_fields": [...]}`` listing only the
        fields that were actually written, or
        ``{"success": False, "error": "No valid fields to update"}``.
    """
    def quote(value):
        # Escape backslashes and single quotes so values such as "O'Brien"
        # do not break the statement (assumes MySQL's default
        # backslash-escape mode - TODO confirm; prefer bound parameters if
        # ObjData.sql_execute supports them).
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{text}'"

    obj = ObjData(0)

    # Validate that customer can update these fields
    allowed_fields = [
        "name", "email", "phone", "address",
        "date_of_birth", "gender"
    ]
    update_statements = []
    applied_fields = []
    for field, new_value in field_updates.items():
        if field not in allowed_fields:
            continue

        # Encrypt sensitive fields
        if field == "phone" or should_encrypt_field(field):
            encrypted_value = obj.encryption_rsa_encode(new_value)
            update_statements.append(f"{field}_encrypted = {quote(encrypted_value)}")
        else:
            update_statements.append(f"{field} = {quote(new_value)}")
        applied_fields.append(field)

    if not update_statements:
        return {"success": False, "error": "No valid fields to update"}

    obj.sql_execute(f"""
        UPDATE customer_data
        SET {', '.join(update_statements)},
            updated_at = NOW()
        WHERE customer_id = {quote(customer_id)}
    """)

    # Log the update, naming only the fields that were really changed
    log_data_access(
        customer_id,
        "DATA_CORRECTION",
        f"Fields updated: {', '.join(applied_fields)}"
    )
    return {"success": True, "updated_fields": applied_fields}
def export_portable_data(customer_id):
    """Build a machine-readable export for a data portability request.

    The result has a ``metadata`` section (export time, customer id, format
    version "1.0") and a ``personal_data`` section holding the profile,
    preferences and history. The structure is handed to
    ``decrypt_export_data`` before a "PORTABILITY_EXPORT" audit event is
    logged and the structure is returned.
    """
    obj = ObjData(0)

    # Assemble the two sections separately, metadata first.
    export_metadata = {
        "export_date": str(datetime.now()),
        "customer_id": customer_id,
        "format_version": "1.0"
    }
    export_payload = {
        "profile": get_customer_profile(customer_id),
        "preferences": get_customer_preferences(customer_id),
        "history": get_customer_history(customer_id)
    }
    portable_data = {"metadata": export_metadata, "personal_data": export_payload}

    # Decrypt sensitive data
    decrypt_export_data(portable_data)

    log_data_access(customer_id, "PORTABILITY_EXPORT", "Portable data export generated")
    return portable_data
def restrict_data_processing(customer_id, restriction_type):
    """Flag a customer's record so a kind of processing is restricted.

    ``restriction_type`` selects which ``restrict_*`` column of
    ``customer_data`` is set to 'Y'; a type that is not recognised sets
    ``restrict_all_processing``. The restriction date is stamped and a
    "RESTRICT_PROCESSING" audit event is logged.

    NOTE(review): ``customer_id`` is interpolated into the SQL text
    unescaped.

    Returns:
        ``{"success": True, "restriction": restriction_type}``.
    """
    db = ObjData(0)

    restriction_flags = {
        "MARKETING": "restrict_marketing",
        "PROFILING": "restrict_profiling",
        "ANALYTICS": "restrict_analytics",
        "ALL": "restrict_all_processing"
    }
    if restriction_type in restriction_flags:
        flag = restriction_flags[restriction_type]
    else:
        # Unknown types fall back to the blanket restriction.
        flag = "restrict_all_processing"

    restrict_sql = f"""
UPDATE customer_data
SET {flag} = 'Y',
restriction_date = NOW()
WHERE customer_id = '{customer_id}'
"""
    db.sql_execute(restrict_sql)

    log_data_access(
        customer_id,
        "RESTRICT_PROCESSING",
        f"Processing restriction applied: {restriction_type}"
    )
    return {"success": True, "restriction": restriction_type}
def check_processing_allowed(customer_id, processing_type):
    """Check whether a kind of processing is allowed for a customer.

    Reads the customer's ``restrict_*`` flags. Processing is refused when no
    customer row is found, when ``restrict_all_processing`` is 'Y', or when
    the flag matching ``processing_type`` is 'Y'.

    NOTE(review): a ``processing_type`` outside MARKETING / PROFILING /
    ANALYTICS is allowed unless the blanket restriction is set - confirm
    that failing open is intended. ``customer_id`` is interpolated into the
    SQL text unescaped.

    Args:
        customer_id: Identifier of the customer.
        processing_type: "MARKETING", "PROFILING" or "ANALYTICS".

    Returns:
        True if processing may proceed, otherwise False.
    """
    obj = ObjData(0)
    restrictions = obj.query_identity(f"""
        SELECT
            restrict_marketing,
            restrict_profiling,
            restrict_analytics,
            restrict_all_processing
        FROM customer_data
        WHERE customer_id = '{customer_id}'
    """)

    # No row means there is no record to base processing on; refuse rather
    # than fail with AttributeError on the lookups below.
    if not restrictions:
        return False

    if restrictions.get('restrict_all_processing') == 'Y':
        return False

    restriction_map = {
        "MARKETING": "restrict_marketing",
        "PROFILING": "restrict_profiling",
        "ANALYTICS": "restrict_analytics"
    }
    field = restriction_map.get(processing_type)
    if field is None:
        return True
    return restrictions.get(field) != 'Y'
def send_compliant_notification(customer_id, notification_type, data):
    """Send a templated notification containing minimal personal data.

    The customer's first name and email are read only if ``email_consent``
    is 'Y'. The message is built from the template for
    ``notification_type`` and sent through ``ObjNotify.Run`` with the
    ``CUSTOMER_NOTIFICATION`` notify code, then an "EMAIL_NOTIFICATION"
    audit event is logged.

    NOTE(review): the MARKETING restriction is checked for every
    notification type, including PASSWORD_RESET - confirm this is intended.
    The recipient address is not passed to ``notify.Run``; confirm how it is
    resolved. ``customer_id`` is interpolated into the SQL text unescaped.

    Args:
        customer_id: Identifier of the customer to notify.
        notification_type: "ORDER_CONFIRMATION" or "PASSWORD_RESET".
        data: Mapping supplying the template placeholders
            (``order_ref``, ``secure_link``, ``unsubscribe_link`` or
            ``reset_link``).

    Returns:
        ``{"success": True, "email_sent": <address>}`` on success, otherwise
        ``{"error": <message>}``.
    """
    obj = ObjData(0)

    # Get only necessary customer information. SUBSTRING_INDEX returns the
    # text before the first space, or the whole name when it has no space
    # (SUBSTRING/LOCATE gave '' in that case and kept a trailing space
    # otherwise).
    customer = obj.query_identity(f"""
        SELECT
            customer_id,
            SUBSTRING_INDEX(name, ' ', 1) as first_name,
            email
        FROM customer_data
        WHERE customer_id = '{customer_id}'
        AND email_consent = 'Y'
    """)
    if not customer:
        return {"error": "No consent for email communication"}

    # Check processing restrictions
    if not check_processing_allowed(customer_id, "MARKETING"):
        return {"error": "Marketing communications restricted"}

    # Build minimal email (data minimization principle)
    email_templates = {
        "ORDER_CONFIRMATION": """
Dear {first_name},
Your order has been confirmed.
Order reference: {order_ref}
View details: {secure_link}
Unsubscribe: {unsubscribe_link}
""",
        "PASSWORD_RESET": """
Dear {first_name},
Reset your password: {reset_link}
This link expires in 1 hour.
If you didn't request this, please ignore.
"""
    }
    template = email_templates.get(notification_type)
    if template is None:
        # Never send an empty message for an unrecognised type.
        return {"error": f"Unknown notification type: {notification_type}"}

    # The stored first name always wins over a 'first_name' key in data,
    # which would otherwise raise a duplicate-keyword TypeError.
    placeholders = dict(data or {})
    placeholders["first_name"] = customer['first_name']
    try:
        message = template.format(**placeholders)
    except KeyError as missing:
        return {"error": f"Missing template value: {missing.args[0]}"}

    # Send via encrypted channel (TLS)
    notify = ObjNotify(0)
    notify.Run(
        notify_code="CUSTOMER_NOTIFICATION",
        message_text=message
    )

    # Log email sent
    log_data_access(
        customer_id,
        "EMAIL_NOTIFICATION",
        f"Email sent: {notification_type}"
    )
    return {"success": True, "email_sent": customer['email']}
def send_email_with_encrypted_attachment(customer_id, document_path):
    """Email a document as an encrypted attachment.

    The document is read, encrypted with ``ObjEncrypt.encrypt_file_content``
    and written to ``<document_path>.encrypted``. That file is emailed to the
    customer, a password is sent by SMS, and a "SECURE_EMAIL_SENT" audit
    event is logged. The temporary encrypted file is removed even when
    sending fails.

    NOTE(review): the SMS password is generated after encryption and is
    never passed to ``encrypt_file_content``, so as written it cannot be the
    key for the attachment. Confirm how ``ObjEncrypt`` derives its key.

    Args:
        customer_id: Identifier of the recipient customer.
        document_path: Path of the plaintext document to send.
    """
    import os
    import secrets
    from ObjEncryption import ObjEncrypt

    obj = ObjData(0)
    encrypt = ObjEncrypt(0)

    # Encrypt the document
    with open(document_path, 'rb') as f:
        document_content = f.read()
    encrypted_content = encrypt.encrypt_file_content(document_content)

    # Create temporary encrypted file
    encrypted_path = f"{document_path}.encrypted"
    try:
        with open(encrypted_path, 'wb') as f:
            f.write(encrypted_content)

        # Generate decryption password
        decryption_password = secrets.token_urlsafe(16)

        # Send email with encrypted attachment
        notify = ObjNotify(0)
        notify.send_email_with_attachment(
            recipient=get_customer_email(customer_id),
            subject="Secure Document",
            body=f"""
Your document is attached in encrypted format.
Decryption password (sent separately via SMS): ****
This email is encrypted in transit via TLS.
Delete this email after downloading the document.
""",
            attachment_path=encrypted_path
        )

        # Send password via SMS (separate channel)
        send_decryption_password_sms(customer_id, decryption_password)
    finally:
        # Clean up on success and on failure so no copy lingers on disk.
        if os.path.exists(encrypted_path):
            os.remove(encrypted_path)

    log_data_access(
        customer_id,
        "SECURE_EMAIL_SENT",
        "Encrypted document sent via email"
    )
def handle_email_unsubscribe(customer_id, unsubscribe_token):
    """Withdraw a customer's email consent after validating their token.

    When ``verify_unsubscribe_token`` rejects the token an error dict is
    returned and nothing is changed. Otherwise ``email_consent`` is set to
    'N', the withdrawal time is stamped, a "CONSENT_WITHDRAWN" audit event
    is logged and a final confirmation is sent.

    NOTE(review): ``customer_id`` is interpolated into the SQL text
    unescaped.
    """
    db = ObjData(0)

    token_is_valid = verify_unsubscribe_token(customer_id, unsubscribe_token)
    if not token_is_valid:
        return {"error": "Invalid unsubscribe token"}

    # Record the withdrawal on the customer row.
    withdraw_sql = f"""
UPDATE customer_data
SET email_consent = 'N',
email_consent_withdrawn = NOW()
WHERE customer_id = '{customer_id}'
"""
    db.sql_execute(withdraw_sql)

    log_data_access(customer_id, "CONSENT_WITHDRAWN", "Email marketing consent withdrawn")

    # One last email confirming the unsubscribe.
    send_unsubscribe_confirmation(customer_id)
    return {"success": True, "message": "Unsubscribed successfully"}
def get_customer_consent_status(customer_id):
    """Fetch the consent columns stored on the customer's row.

    Returns whatever ``query_identity`` yields for the email, SMS, marketing
    and profiling consent flags together with the consent date and the email
    withdrawal timestamp.

    NOTE(review): ``customer_id`` is interpolated into the SQL text
    unescaped.
    """
    consent_sql = f"""
SELECT
email_consent,
sms_consent,
marketing_consent,
profiling_consent,
consent_date,
email_consent_withdrawn
FROM customer_data
WHERE customer_id = '{customer_id}'
"""
    return ObjData(0).query_identity(consent_sql)
def log_data_access(entity_id, action_type, description, user_code=None):
    """Write one row to ``data_access_log`` for GDPR/POPIA auditing.

    Besides the arguments, the row records the name and file of the function
    that called ``log_data_access`` and the value returned by
    ``get_current_ip_address``.

    Every value is escaped before being placed in the statement: callers
    pass free text (for example the repr of query results) that contains
    quote characters.

    Args:
        entity_id: Identifier the event relates to.
        action_type: Short event code such as "STORE" or "DSAR_ACCESS".
        description: Free-text description of the event.
        user_code: Acting user; recorded as "SYSTEM" when not given.
    """
    import inspect

    def quote(value):
        # Escape backslashes and single quotes (assumes MySQL's default
        # backslash-escape mode - TODO confirm; prefer bound parameters if
        # ObjData.sql_execute supports them).
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{text}'"

    obj = ObjData(0)

    # Get calling function and file (index 1 is the caller of this function)
    caller_frame = inspect.stack()[1]
    caller_function = caller_frame.function
    caller_file = caller_frame.filename

    # Get IP address if available
    ip_address = get_current_ip_address()
    actor = user_code or "SYSTEM"

    obj.sql_execute(f"""
        INSERT INTO data_access_log (
            entity_id,
            action_type,
            description,
            user_code,
            ip_address,
            caller_function,
            caller_file,
            created_at
        ) VALUES (
            {quote(entity_id)},
            {quote(action_type)},
            {quote(description)},
            {quote(actor)},
            {quote(ip_address)},
            {quote(caller_function)},
            {quote(caller_file)},
            NOW()
        )
    """)
def get_access_logs(customer_id, days=30):
    """Retrieve recent audit-log rows for a customer.

    NOTE(review): ``customer_id`` is interpolated into the SQL text
    unescaped.

    Args:
        customer_id: Value matched against ``data_access_log.entity_id``.
        days: Look-back window in days; must be convertible to ``int``.

    Returns:
        The result of the query, newest entries first.

    Raises:
        ValueError, TypeError: If ``days`` cannot be converted to ``int``.
    """
    # ``days`` is placed in the statement unquoted, so force it to an
    # integer to keep arbitrary text out of the SQL.
    days = int(days)

    obj = ObjData(0)
    logs = obj.sql_execute(f"""
        SELECT
            action_type,
            description,
            user_code,
            ip_address,
            created_at
        FROM data_access_log
        WHERE entity_id = '{customer_id}'
        AND created_at >= DATE_SUB(NOW(), INTERVAL {days} DAY)
        ORDER BY created_at DESC
    """)
    return logs
-- Database table for GDPR/POPIA audit logging
-- One row per event written by log_data_access().
CREATE TABLE IF NOT EXISTS data_access_log (
    log_id INT AUTO_INCREMENT PRIMARY KEY,
    -- Identifier the event relates to (callers pass customer ids, an email
    -- address, or the literal 'SYSTEM').
    entity_id VARCHAR(255) NOT NULL,
    -- Short event code, e.g. 'STORE', 'DSAR_ACCESS', 'AUTO_DELETE'.
    action_type VARCHAR(50) NOT NULL,
    description TEXT,
    -- Acting user; log_data_access() writes 'SYSTEM' when none is given.
    user_code VARCHAR(255),
    ip_address VARCHAR(45),
    -- Name and source file of the function that called log_data_access().
    -- NOTE(review): file paths longer than 255 characters will not fit.
    caller_function VARCHAR(255),
    caller_file VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- Indexes for the lookups used by get_access_logs() and
    -- detect_data_breach() (by entity, by time window).
    INDEX idx_entity_id (entity_id),
    INDEX idx_action_type (action_type),
    INDEX idx_created_at (created_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
def record_consent(customer_id, consent_type, consent_given=True, consent_source="WEB"):
    """Record a customer's consent decision for one kind of processing.

    Updates the ``<consent_type>_consent`` column of ``customer_data`` to
    'Y' or 'N' together with the consent date and source, appends a row to
    ``consent_history`` (including the current IP address and user agent)
    and logs a "CONSENT_UPDATED" audit event.

    Args:
        customer_id: Identifier of the customer.
        consent_type: Column prefix such as "email" or "sms"; must be a
            plain SQL identifier because it forms part of a column name.
        consent_given: True to record consent, False to record refusal.
        consent_source: Where the decision was captured (default "WEB").

    Raises:
        ValueError: If ``consent_type`` is not a plain identifier.
    """
    import re

    # consent_type becomes part of a column name and cannot be quoted like
    # a value, so only plain identifiers are accepted.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", str(consent_type)):
        raise ValueError(f"Invalid consent type: {consent_type!r}")

    def quote(value):
        # Escape backslashes and single quotes; user-agent strings in
        # particular are client-supplied text (assumes MySQL's default
        # backslash-escape mode - TODO confirm; prefer bound parameters if
        # ObjData.sql_execute supports them).
        text = str(value).replace("\\", "\\\\").replace("'", "''")
        return f"'{text}'"

    obj = ObjData(0)
    consent_value = 'Y' if consent_given else 'N'

    # Update current consent status
    obj.sql_execute(f"""
        UPDATE customer_data
        SET {consent_type}_consent = '{consent_value}',
            consent_date = NOW(),
            consent_source = {quote(consent_source)}
        WHERE customer_id = {quote(customer_id)}
    """)

    # Log to consent history
    obj.sql_execute(f"""
        INSERT INTO consent_history (
            customer_id,
            consent_type,
            consent_given,
            consent_source,
            ip_address,
            user_agent,
            created_at
        ) VALUES (
            {quote(customer_id)},
            {quote(consent_type)},
            '{consent_value}',
            {quote(consent_source)},
            {quote(get_current_ip_address())},
            {quote(get_user_agent())},
            NOW()
        )
    """)

    log_data_access(
        customer_id,
        "CONSENT_UPDATED",
        f"{consent_type} consent: {consent_value}"
    )
def get_consent_history(customer_id):
    """Return every ``consent_history`` row for a customer, newest first.

    NOTE(review): ``customer_id`` is interpolated into the SQL text
    unescaped.
    """
    history_sql = f"""
SELECT
consent_type,
consent_given,
consent_source,
ip_address,
created_at
FROM consent_history
WHERE customer_id = '{customer_id}'
ORDER BY created_at DESC
"""
    return ObjData(0).sql_execute(history_sql)
# Consent type names mapped to a short code for each.
# NOTE(review): the codes look like the prefixes of the "<code>_consent"
# columns used by record_consent - confirm against the schema.
CONSENT_TYPES = {
    "EMAIL_MARKETING": "email",
    "SMS_MARKETING": "sms",
    "PROFILING": "profiling",
    "ANALYTICS": "analytics",
    "THIRD_PARTY_SHARING": "third_party",
    "AUTOMATED_DECISIONS": "automated",
}
def request_customer_consent(customer_id, required_consents):
    """Build a consent request describing each consent being asked for.

    The result carries the customer id, a timestamp string and, for every
    entry of ``required_consents``, its type, user-facing description and
    whether it is mandatory.
    """
    consent_request = {
        "customer_id": customer_id,
        "consents_required": [],
        "timestamp": str(datetime.now())
    }
    # Fill the list after the timestamp has been taken.
    consent_request["consents_required"].extend(
        {
            "type": consent_type,
            "description": get_consent_description(consent_type),
            "mandatory": is_consent_mandatory(consent_type)
        }
        for consent_type in required_consents
    )
    return consent_request
def get_consent_description(consent_type):
    """Look up the user-facing wording for a consent type.

    Types without an entry are described as "Unknown consent type".
    """
    descriptions = {
        "EMAIL_MARKETING": "Receive promotional emails and newsletters",
        "SMS_MARKETING": "Receive SMS notifications and offers",
        "PROFILING": "Use your data to personalize your experience",
        "ANALYTICS": "Analyze your usage for service improvement",
        "THIRD_PARTY_SHARING": "Share your data with trusted partners",
        "AUTOMATED_DECISIONS": "Make automated decisions affecting you"
    }
    if consent_type in descriptions:
        return descriptions[consent_type]
    return "Unknown consent type"
def is_consent_mandatory(consent_type):
    """Tell whether the service treats ``consent_type`` as mandatory."""
    # Adjust based on your needs
    mandatory_consents = ("ANALYTICS",)
    return any(consent_type == mandatory for mandatory in mandatory_consents)
def detect_data_breach():
    """Scan the last hour of logs for signs of a data breach.

    Two checks run in sequence: users with more than 100 audit-log entries
    in the past hour are passed to ``trigger_breach_investigation``, and
    more than 50 'DECRYPTION_FAILED' system-log entries in the past hour
    raise a breach alert.
    """
    db = ObjData(0)

    # Unusual access volume per user
    suspicious_access = db.sql_execute("""
SELECT
user_code,
COUNT(*) as access_count,
COUNT(DISTINCT entity_id) as unique_entities
FROM data_access_log
WHERE created_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
GROUP BY user_code
HAVING access_count > 100 -- Threshold
""")
    if suspicious_access:
        trigger_breach_investigation(suspicious_access)

    # Failed decryption attempts
    failure_rows = db.sql_execute("""
SELECT COUNT(*) as failure_count
FROM system_logs
WHERE log_type = 'DECRYPTION_FAILED'
AND created_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
""")
    recent_failures = failure_rows[0]['failure_count']
    if recent_failures > 50:
        trigger_breach_alert("Multiple decryption failures detected")
def trigger_breach_investigation(suspicious_activity):
    """Start the breach-investigation protocol.

    Sends a "SECURITY_ALERT" notification, logs a "BREACH_DETECTED" audit
    event against the "SYSTEM" entity and then runs the automatic
    containment step.

    NOTE(review): ``Run`` is called with positional arguments here while
    other call sites in this file use ``notify_code=`` / ``message_text=``;
    confirm the positional order matches.
    """
    # Notify security team
    alert_text = f"Potential data breach detected: {suspicious_activity}"
    ObjNotify(0).Run("SECURITY_ALERT", alert_text)

    # Log the breach detection
    audit_text = f"Suspicious activity: {suspicious_activity}"
    log_data_access("SYSTEM", "BREACH_DETECTED", audit_text)

    # Implement automatic containment if needed
    implement_breach_containment()
def implement_breach_containment():
    """Apply the automatic containment measures.

    Sets ``data_access_restricted`` to 'Y' on the 'security' row of
    ``system_config`` and then notifies the compliance officer.
    """
    lockdown_sql = """
UPDATE system_config
SET data_access_restricted = 'Y'
WHERE config_key = 'security'
"""
    # Temporarily restrict all data access
    ObjData(0).sql_execute(lockdown_sql)

    notify_compliance_officer("Data breach containment activated")
def notify_data_breach(affected_customers, breach_description):
    """Notify each affected customer of a data breach (GDPR Article 34).

    For every id in ``affected_customers`` the contact record is fetched, a
    notice is built from the customer's name, ``breach_description`` and the
    affected data types, the notice is sent with the "BREACH_NOTIFICATION"
    code and a "BREACH_NOTIFIED" audit event is logged.

    NOTE(review): only the customer's name is used from the contact record;
    no recipient is passed to ``Run``. Confirm how the notice is addressed.
    """
    obj = ObjData(0)
    notify = ObjNotify(0)

    for customer_id in affected_customers:
        customer = get_customer_contact(customer_id)

        notice = f"""
IMPORTANT SECURITY NOTICE
Dear {customer['name']},
We are writing to inform you of a data security incident that may
have affected your personal information.
What happened:
{breach_description}
What information was affected:
{get_affected_data_types(customer_id)}
What we are doing:
- Investigating the incident
- Implementing additional security measures
- Notifying relevant authorities
What you should do:
- Change your password immediately
- Monitor your accounts for suspicious activity
- Contact us with any concerns
We sincerely apologize for this incident.
Contact: privacy@company.com
"""
        notify.Run("BREACH_NOTIFICATION", notice)

        log_data_access(customer_id, "BREACH_NOTIFIED", "Customer notified of data breach")
class PrivacyByDesignValidator:
    """Validate privacy compliance in new features."""

    def validate_new_feature(self, feature_spec):
        """Run every privacy check against ``feature_spec``.

        NOTE(review): ``check_consent_requirements``,
        ``check_encryption_needs``, ``check_retention_policy`` and
        ``check_dsar_compatibility`` are not defined in the class as shown;
        they must be supplied elsewhere for this method to run.

        Args:
            feature_spec: Mapping describing the feature under review.

        Returns:
            Tuple ``(passed, checks)`` where ``passed`` is True only if
            every check is truthy and ``checks`` maps check name to result.
        """
        checks = {
            "data_minimization": self.check_data_minimization(feature_spec),
            "consent_required": self.check_consent_requirements(feature_spec),
            "encryption_needed": self.check_encryption_needs(feature_spec),
            "retention_defined": self.check_retention_policy(feature_spec),
            "dsar_compatible": self.check_dsar_compatibility(feature_spec)
        }
        return all(checks.values()), checks

    def check_data_minimization(self, feature_spec):
        """Ensure only necessary data is collected.

        Args:
            feature_spec: Mapping with optional ``data_fields`` (fields the
                feature collects) and ``required_fields`` (fields it needs).

        Returns:
            True when every collected field is also a required field.
        """
        collected_fields = feature_spec.get('data_fields', [])
        required_fields = feature_spec.get('required_fields', [])

        # Compare contents, not counts: two lists of equal length can still
        # differ, and collecting fewer fields than allowed is not a breach.
        return all(field in required_fields for field in collected_fields)
def run_compliance_audit():
    """Run every GDPR/POPIA audit, report the outcome and return it.

    The five audits run in a fixed order (encryption, retention, consent,
    access logging, deletion). Their results are turned into a report by
    ``generate_audit_report``, which is sent on with ``send_audit_report``.

    Returns:
        Dict mapping each ``*_compliance`` key to that audit's result.
    """
    obj = ObjData(0)

    encryption_result = audit_encryption_usage()
    retention_result = audit_data_retention()
    consent_result = audit_consent_records()
    access_log_result = audit_access_logging()
    deletion_result = audit_deletion_processes()

    audit_results = {
        "encryption_compliance": encryption_result,
        "retention_compliance": retention_result,
        "consent_compliance": consent_result,
        "access_log_compliance": access_log_result,
        "deletion_compliance": deletion_result
    }

    # Generate the report and send it to the compliance officer
    send_audit_report(generate_audit_report(audit_results))
    return audit_results
def audit_encryption_usage():
    """Find customer rows holding a sensitive value without its ciphertext.

    A row is reported when ``phone`` (or ``id_number``) is populated while
    the matching ``*_encrypted`` column is NULL.

    Returns:
        Dict with ``compliant`` (True when nothing was found),
        ``unencrypted_count`` and ``details`` (the offending rows).
    """
    plaintext_sql = """
SELECT customer_id, 'phone' as field
FROM customer_data
WHERE phone_encrypted IS NULL AND phone IS NOT NULL
UNION
SELECT customer_id, 'id_number' as field
FROM customer_data
WHERE id_number_encrypted IS NULL AND id_number IS NOT NULL
"""
    offending_rows = ObjData(0).sql_execute(plaintext_sql)
    offending_count = len(offending_rows)
    return {
        "compliant": offending_count == 0,
        "unencrypted_count": offending_count,
        "details": offending_rows
    }
def track_privacy_training(user_code):
    """Record that a staff member completed GDPR/POPIA privacy training.

    Inserts a ``privacy_training_log`` row for ``user_code`` with the
    training type 'GDPR_POPIA_COMPLIANCE', the current time and
    ``certificate_issued`` set to 'Y'.

    NOTE(review): ``user_code`` is interpolated into the SQL text unescaped.
    """
    training_sql = f"""
INSERT INTO privacy_training_log (
user_code,
training_type,
completed_at,
certificate_issued
) VALUES (
'{user_code}',
'GDPR_POPIA_COMPLIANCE',
NOW(),
'Y'
)
"""
    ObjData(0).sql_execute(training_sql)
def check_training_current(user_code):
    """Verify a staff member completed privacy training in the last year.

    NOTE(review): assumes ``query_get_value`` returns a ``datetime`` for
    ``MAX(completed_at)`` - confirm against the driver. ``user_code`` is
    interpolated into the SQL text unescaped.

    Args:
        user_code: Staff identifier looked up in ``privacy_training_log``.

    Returns:
        True if the most recent completion is less than 365 days old, False
        if it is older or no training is recorded.
    """
    obj = ObjData(0)
    last_training = obj.query_get_value(f"""
        SELECT MAX(completed_at)
        FROM privacy_training_log
        WHERE user_code = '{user_code}'
    """)
    if not last_training:
        return False

    # Training must be within last 12 months. Compare against a 365-day
    # window; dividing days by 30 expired training after only 360 days.
    return datetime.now() - last_training < timedelta(days=365)
Check legal obligations:
def diagnose_deletion_block(customer_id):
    """Diagnose why a customer's data cannot be deleted.

    Counts active contracts and pending transactions, plus the two
    conditions ``has_legal_obligation_to_retain`` uses to refuse deletion
    (active legal cases and uncompleted audits), so the diagnosis covers
    the checks that actually block a deletion request.

    NOTE(review): ``customer_id`` is interpolated into the SQL text
    unescaped.

    Args:
        customer_id: Identifier of the customer.

    Returns:
        List of human-readable reasons; empty when nothing blocks deletion.
    """
    obj = ObjData(0)

    # (label, table, extra condition) for each potential blocker
    blocker_checks = [
        ("Active contracts", "contracts", "status = 'ACTIVE'"),
        ("Pending transactions", "transactions", "status = 'PENDING'"),
        ("Active legal cases", "legal_cases", "status = 'ACTIVE'"),
        ("Pending audits", "audit_flags", "audit_completed = 'N'"),
    ]

    blocks = []
    for label, table, condition in blocker_checks:
        count = obj.query_get_value(f"""
            SELECT COUNT(*) FROM {table}
            WHERE customer_id = '{customer_id}'
            AND {condition}
        """)
        # A missing result is treated as zero rather than raising.
        if count and count > 0:
            blocks.append(f"{label}: {count}")
    return blocks
Verify consent logging:
def verify_consent_recorded(customer_id, consent_type):
    """Verify that a consent decision was properly recorded.

    Reads the current ``<consent_type>_consent`` value (with its date and
    source) from ``customer_data`` and the most recent matching row from
    ``consent_history``.

    NOTE(review): ``customer_id`` is interpolated into the SQL text
    unescaped.

    Args:
        customer_id: Identifier of the customer.
        consent_type: Column prefix such as "email" or "sms"; must be a
            plain SQL identifier because it forms part of a column name.

    Returns:
        Dict with ``current_consent`` and ``consent_history``.

    Raises:
        ValueError: If ``consent_type`` is not a plain identifier.
    """
    import re

    # consent_type becomes part of a column name and cannot be quoted like
    # a value, so only plain identifiers are accepted.
    if not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", str(consent_type)):
        raise ValueError(f"Invalid consent type: {consent_type!r}")

    obj = ObjData(0)
    consent = obj.query_identity(f"""
        SELECT
            {consent_type}_consent,
            consent_date,
            consent_source
        FROM customer_data
        WHERE customer_id = '{customer_id}'
    """)
    history = obj.sql_execute(f"""
        SELECT * FROM consent_history
        WHERE customer_id = '{customer_id}'
        AND consent_type = '{consent_type}'
        ORDER BY created_at DESC
        LIMIT 1
    """)
    return {
        "current_consent": consent,
        "consent_history": history
    }