This guide provides practical instructions for encrypting sensitive data using ObjEncryption - a robust encryption framework built on RSA public-key cryptography.
ObjEncryption provides comprehensive data encryption capabilities:
data.configdata.config directory for key storage# Generate RSA keys for a package
dev-env/bin/python factory.core/ObjEncryption.py provision factory.core
This creates:
data.config/factory.core.public.pem - Public keydata.config/factory.core.private.pem - Private key# Encrypt text
dev-env/bin/python factory.core/ObjEncryption.py encrypt "my secret password"
# Output: ^encrypted_string_here
# Decrypt text
dev-env/bin/python factory.core/ObjEncryption.py decrypt "^encrypted_string_here"
# Output: my secret password
from ObjData import ObjData
# Initialize
obj = ObjData(0)
# Encrypt
encrypted = obj.encryption_rsa_encode("sensitive data")
print(f"Encrypted: {encrypted}") # Starts with ^
# Decrypt
decrypted = obj.encryption_rsa_decode(encrypted)
print(f"Decrypted: {decrypted}")
from ObjEncryption import ObjEncrypt
# Initialize
encrypt = ObjEncrypt(0)
# Provision keys for package
encrypt.provision(package="factory.core")
# This creates:
# - data.config/factory.core.public.pem
# - data.config/factory.core.private.pem
# - Stores public key in def_package table
# Check for PEM files
ls -la data.config/*.pem
# Expected output:
# factory.core.public.pem
# factory.core.private.pem
import os
from ObjEncryption import ObjEncrypt
def rotate_encryption_keys(package):
"""Rotate encryption keys for a package."""
# Step 1: Backup existing keys
os.rename(
f"data.config/{package}.private.pem",
f"data.config/{package}.private.pem.backup"
)
os.rename(
f"data.config/{package}.public.pem",
f"data.config/{package}.public.pem.backup"
)
# Step 2: Generate new keys
encrypt = ObjEncrypt(0)
encrypt.provision(package=package)
# Step 3: Re-encrypt all encrypted data
# (Implementation depends on your data structure)
reencrypt_database_fields(package)
print(f"Keys rotated for {package}")
# Use with caution!
# rotate_encryption_keys("factory.core")
In config.yaml:
datastore:
encryption:
sys_user:
crypt_cols: ["password", "api_key", "secret_token"]
key_cols: ["user_code"]
def_remote:
crypt_cols: ["password", "connection_string"]
key_cols: ["remote_code", "package"]
customer_data:
crypt_cols: ["credit_card_number", "ssn", "bank_account"]
key_cols: ["customer_id"]
from ObjEncryption import ObjEncrypt
# Initialize
encrypt = ObjEncrypt(0)
# Encrypt specific table
encrypt.encode_table(
table="sys_user",
crypt_cols=["password", "api_key"],
key_cols=["user_code"]
)
# CLI command
dev-env/bin/python factory.core/ObjEncryption.py encode-tables
# Or programmatically
from ObjEncryption import ObjEncrypt
encrypt = ObjEncrypt(0)
encrypt.encode_tables(package="factory.core")
from ObjData import ObjData
def create_user(username, password, email):
"""Create user with encrypted password."""
obj = ObjData(0)
# Encrypt password
encrypted_password = obj.encryption_rsa_encode(password)
# Store in database
obj.sql_execute(f"""
INSERT INTO sys_user (
user_code,
login_name,
email,
password
) VALUES (
'{username}',
'{username}',
'{email}',
'{encrypted_password}'
)
""")
return True
# Use
create_user("john", "MySecretPass123!", "john@example.com")
def store_api_credentials(service_name, api_key, api_secret):
"""Store encrypted API credentials."""
obj = ObjData(0)
# Encrypt sensitive data
encrypted_key = obj.encryption_rsa_encode(api_key)
encrypted_secret = obj.encryption_rsa_encode(api_secret)
obj.sql_execute(f"""
INSERT INTO api_credentials (
service_name,
api_key,
api_secret,
created_at
) VALUES (
'{service_name}',
'{encrypted_key}',
'{encrypted_secret}',
NOW()
)
""")
# Use
store_api_credentials(
"stripe",
"pk_live_ABC123",
"sk_live_XYZ789"
)
# Retrieve and decrypt
def get_api_credentials(service_name):
"""Get and decrypt API credentials."""
obj = ObjData(0)
result = obj.query_identity(f"""
SELECT api_key, api_secret
FROM api_credentials
WHERE service_name = '{service_name}'
""")
if result:
return {
"api_key": obj.encryption_rsa_decode(result['api_key']),
"api_secret": obj.encryption_rsa_decode(result['api_secret'])
}
return None
# Use
credentials = get_api_credentials("stripe")
print(f"API Key: {credentials['api_key']}")
def store_remote_connection(remote_code, host, user, password, database):
"""Store encrypted database connection."""
obj = ObjData(0)
# Encrypt password
encrypted_password = obj.encryption_rsa_encode(password)
# Build encrypted connection string
connection_string = f"mysql://{user}:{password}@{host}/{database}"
encrypted_connection = obj.encryption_rsa_encode(connection_string)
obj.sql_execute(f"""
INSERT INTO def_remote (
remote_code,
package,
host,
username,
password,
database,
connection_string
) VALUES (
'{remote_code}',
'factory.core',
'{host}',
'{user}',
'{encrypted_password}',
'{database}',
'{encrypted_connection}'
)
""")
def store_payment_method(customer_id, card_number, cvv, expiry):
"""Store encrypted credit card information."""
obj = ObjData(0)
# Encrypt PCI data
encrypted_card = obj.encryption_rsa_encode(card_number)
encrypted_cvv = obj.encryption_rsa_encode(cvv)
# Store with timestamp
obj.sql_execute(f"""
INSERT INTO payment_methods (
customer_id,
card_number_encrypted,
cvv_encrypted,
expiry_date,
created_at
) VALUES (
'{customer_id}',
'{encrypted_card}',
'{encrypted_cvv}',
'{expiry}',
NOW()
)
""")
# IMPORTANT: Follow PCI DSS compliance requirements!
from ObjData import ObjData
# Encrypt sensitive config values
obj = ObjData(0)
sensitive_values = {
"database_password": "MyDBPass123!",
"smtp_password": "EmailPass456!",
"api_secret": "ApiSecret789!"
}
encrypted_config = {}
for key, value in sensitive_values.items():
encrypted_config[key] = obj.encryption_rsa_encode(value)
# Store encrypted values in config.yaml
# Then decrypt when needed
# config.yaml
database:
password: "^encrypted_password_here"
smtp:
password: "^encrypted_smtp_password_here"
def get_decrypted_config(config_key):
"""Get and decrypt configuration value."""
obj = ObjData(0)
# Read from config
encrypted_value = obj.get_ini_value("database", "password")
# Decrypt if starts with ^
if encrypted_value and encrypted_value.startswith("^"):
return obj.encryption_rsa_decode(encrypted_value)
return encrypted_value
# Ensure PEM files have restricted permissions
chmod 600 data.config/*.private.pem
chmod 644 data.config/*.public.pem
# Verify permissions
ls -la data.config/*.pem
# Should show: -rw------- for private keys
# -rw-r--r-- for public keys
import logging
def safe_decrypt(encrypted_value):
"""Decrypt value without logging plaintext."""
obj = ObjData(0)
try:
decrypted = obj.encryption_rsa_decode(encrypted_value)
# DO NOT: logging.info(f"Decrypted: {decrypted}")
logging.info("Value decrypted successfully")
return decrypted
except Exception as e:
logging.error(f"Decryption failed: {type(e).__name__}")
return None
def is_encrypted(value):
"""Check if value is encrypted."""
return isinstance(value, str) and value.startswith("^")
def ensure_encrypted(value):
"""Ensure value is encrypted."""
if not is_encrypted(value):
obj = ObjData(0)
return obj.encryption_rsa_encode(value)
return value
import getpass
def change_user_password(user_code):
"""Change user password securely."""
# Get password without echoing to terminal
new_password = getpass.getpass("Enter new password: ")
confirm_password = getpass.getpass("Confirm password: ")
if new_password != confirm_password:
print("Passwords don't match!")
return False
# Encrypt and store
obj = ObjData(0)
encrypted_password = obj.encryption_rsa_encode(new_password)
obj.sql_execute(f"""
UPDATE sys_user
SET password = '{encrypted_password}',
password_changed_at = NOW()
WHERE user_code = '{user_code}'
""")
# Clear variables
new_password = None
confirm_password = None
return True
# Create secure backup of private keys
mkdir -p ~/encrypted_key_backup
cp data.config/*.private.pem ~/encrypted_key_backup/
chmod 700 ~/encrypted_key_backup
chmod 600 ~/encrypted_key_backup/*.pem
# Or backup to encrypted archive
tar -czf keys_backup.tar.gz data.config/*.pem
gpg -c keys_backup.tar.gz # Encrypt with passphrase
rm keys_backup.tar.gz # Remove unencrypted archive
def encrypt_customer_data_batch():
"""Efficiently encrypt customer data in batches."""
from ObjData import ObjData
obj = ObjData(0)
# Query unencrypted records
records = obj.sql_execute("""
SELECT customer_id, credit_card, ssn
FROM customers
WHERE credit_card NOT LIKE '^%' -- Not yet encrypted
LIMIT 1000
""")
# Encrypt in batch
update_statements = []
for record in records:
encrypted_cc = obj.encryption_rsa_encode(record['credit_card'])
encrypted_ssn = obj.encryption_rsa_encode(record['ssn'])
update_statements.append(f"""
UPDATE customers
SET credit_card = '{encrypted_cc}',
ssn = '{encrypted_ssn}'
WHERE customer_id = '{record['customer_id']}'
""")
# Execute updates
for statement in update_statements:
obj.sql_execute(statement)
print(f"Encrypted {len(records)} customer records")
class EncryptionCache:
"""Cache encrypted values to reduce encryption overhead."""
def __init__(self):
self.cache = {}
self.obj = ObjData(0)
def encrypt(self, plaintext):
"""Encrypt with caching."""
if plaintext not in self.cache:
self.cache[plaintext] = self.obj.encryption_rsa_encode(plaintext)
return self.cache[plaintext]
def clear(self):
"""Clear cache."""
self.cache.clear()
# Use cache for repeated values
cache = EncryptionCache()
encrypted1 = cache.encrypt("password123")
encrypted2 = cache.encrypt("password123") # Returns cached value
Check key files exist:
ls -la data.config/*.pem
Generate keys:
dev-env/bin/python factory.core/ObjEncryption.py provision factory.core
Verify encrypted string format:
def validate_encrypted_format(encrypted_value):
"""Validate encrypted string format."""
if not isinstance(encrypted_value, str):
return False, "Not a string"
if not encrypted_value.startswith("^"):
return False, "Missing ^ prefix"
if len(encrypted_value) < 10:
return False, "Too short to be valid"
return True, "Valid format"
# Use
valid, message = validate_encrypted_format(encrypted_string)
if not valid:
print(f"Invalid encrypted format: {message}")
Fix permissions:
# Owner read/write for private keys
chmod 600 data.config/*.private.pem
# Owner read/write, others read for public keys
chmod 644 data.config/*.public.pem
# Verify
ls -la data.config/*.pem
Verify key pair matches:
def verify_key_pair(package):
"""Verify public and private keys match."""
obj = ObjData(0)
# Test encryption/decryption
test_string = "key_verification_test"
try:
encrypted = obj.encryption_rsa_encode(test_string)
decrypted = obj.encryption_rsa_decode(encrypted)
if decrypted == test_string:
print(f"✓ Keys valid for {package}")
return True
else:
print(f"✗ Keys mismatch for {package}")
return False
except Exception as e:
print(f"✗ Key verification failed: {e}")
return False
# Use
verify_key_pair("factory.core")
from ObjData import ObjData
import hashlib
import getpass
class SecureUserAuth:
def __init__(self):
self.obj = ObjData(0)
def create_user(self, username, email):
"""Create user with encrypted password."""
# Get password securely
password = getpass.getpass("Enter password: ")
confirm = getpass.getpass("Confirm password: ")
if password != confirm:
return False, "Passwords don't match"
# Validate password strength
if len(password) < 8:
return False, "Password too short"
# Hash password for comparison
password_hash = hashlib.sha256(password.encode()).hexdigest()
# Encrypt password for storage
encrypted_password = self.obj.encryption_rsa_encode(password)
# Store user
self.obj.sql_execute(f"""
INSERT INTO sys_user (
user_code,
login_name,
email,
password,
password_hash,
created_at,
status
) VALUES (
'{username}',
'{username}',
'{email}',
'{encrypted_password}',
'{password_hash}',
NOW(),
'active'
)
""")
# Clear sensitive data
password = None
confirm = None
return True, f"User {username} created"
def authenticate(self, username, password):
"""Authenticate user."""
# Hash input password
password_hash = hashlib.sha256(password.encode()).hexdigest()
# Query user
user = self.obj.query_identity(f"""
SELECT user_code, password_hash, status
FROM sys_user
WHERE login_name = '{username}'
""")
if not user:
return False, "Invalid credentials"
# Check status
if user['status'] != 'active':
return False, "Account disabled"
# Verify password hash
if user['password_hash'] == password_hash:
return True, f"Welcome {username}"
return False, "Invalid credentials"
# Use secure authentication
auth = SecureUserAuth()
# Create user
success, message = auth.create_user("john", "john@example.com")
print(message)
# Authenticate user
password = getpass.getpass("Enter password: ")
success, message = auth.authenticate("john", password)
print(message)