Files
ai/backend.old/src/secrets_manager/store.py
2026-03-11 18:47:11 -04:00

407 lines
13 KiB
Python

"""
Encrypted secrets store with master password protection.
The secrets are stored in an encrypted file, with the encryption key derived
from a master password using Argon2id. The master password can be changed
without re-encrypting all secrets.
"""
import json
import os
import stat
from pathlib import Path
from typing import Dict, Optional, Any
from cryptography.fernet import InvalidToken
from .crypto import (
generate_salt,
derive_key_from_password,
encrypt_data,
decrypt_data,
create_verification_hash,
verify_password,
)
class SecretsStoreError(Exception):
"""Base exception for secrets store errors."""
pass
class SecretsStoreLocked(SecretsStoreError):
"""Raised when trying to access secrets while store is locked."""
pass
class InvalidMasterPassword(SecretsStoreError):
"""Raised when master password is incorrect."""
pass
class SecretsStore:
"""
Encrypted secrets store with master password protection.
Usage:
# Initialize (first time)
store = SecretsStore()
store.initialize("my-secure-password")
# Unlock
store = SecretsStore()
store.unlock("my-secure-password")
# Access secrets
api_key = store.get("ANTHROPIC_API_KEY")
store.set("NEW_SECRET", "secret-value")
# Change master password
store.change_master_password("my-secure-password", "new-password")
# Lock when done
store.lock()
"""
def __init__(self, data_dir: Optional[Path] = None):
"""
Initialize secrets store.
Args:
data_dir: Directory for secrets files (defaults to backend/data)
"""
if data_dir is None:
# Default to backend/data
backend_root = Path(__file__).parent.parent.parent
data_dir = backend_root / "data"
self.data_dir = Path(data_dir)
self.data_dir.mkdir(parents=True, exist_ok=True)
self.master_key_file = self.data_dir / ".master.key"
self.secrets_file = self.data_dir / "secrets.enc"
# Runtime state
self._encryption_key: Optional[bytes] = None
self._secrets: Optional[Dict[str, Any]] = None
@property
def is_initialized(self) -> bool:
"""Check if the secrets store has been initialized."""
return self.master_key_file.exists()
@property
def is_unlocked(self) -> bool:
"""Check if the secrets store is currently unlocked."""
return self._encryption_key is not None
def initialize(self, master_password: str) -> None:
"""
Initialize the secrets store with a master password.
This should only be called once when setting up the store.
Args:
master_password: The master password to protect the secrets
Raises:
SecretsStoreError: If store is already initialized
"""
if self.is_initialized:
raise SecretsStoreError(
"Secrets store is already initialized. "
"Use unlock() to access it or change_master_password() to change the password."
)
# Generate a new random salt
salt = generate_salt()
# Derive encryption key
encryption_key = derive_key_from_password(master_password, salt)
# Create verification hash
verification_hash = create_verification_hash(master_password, salt)
# Store salt and verification hash
master_key_data = {
"salt": salt.hex(),
"verification": verification_hash,
}
self.master_key_file.write_text(json.dumps(master_key_data, indent=2))
# Set restrictive permissions (owner read/write only)
os.chmod(self.master_key_file, stat.S_IRUSR | stat.S_IWUSR)
# Initialize empty secrets
self._encryption_key = encryption_key
self._secrets = {}
self._save_secrets()
print(f"✓ Secrets store initialized at {self.secrets_file}")
def unlock(self, master_password: str) -> None:
"""
Unlock the secrets store with the master password.
Args:
master_password: The master password
Raises:
SecretsStoreError: If store is not initialized
InvalidMasterPassword: If password is incorrect
"""
if not self.is_initialized:
raise SecretsStoreError(
"Secrets store is not initialized. Call initialize() first."
)
# Load salt and verification hash
master_key_data = json.loads(self.master_key_file.read_text())
salt = bytes.fromhex(master_key_data["salt"])
verification_hash = master_key_data["verification"]
# Verify password
if not verify_password(master_password, salt, verification_hash):
raise InvalidMasterPassword("Invalid master password")
# Derive encryption key
encryption_key = derive_key_from_password(master_password, salt)
# Load and decrypt secrets
if self.secrets_file.exists():
try:
encrypted_data = self.secrets_file.read_bytes()
decrypted_data = decrypt_data(encrypted_data, encryption_key)
self._secrets = json.loads(decrypted_data.decode('utf-8'))
except InvalidToken:
raise InvalidMasterPassword("Failed to decrypt secrets (invalid password)")
except json.JSONDecodeError as e:
raise SecretsStoreError(f"Corrupted secrets file: {e}")
else:
# No secrets file yet (fresh initialization)
self._secrets = {}
self._encryption_key = encryption_key
print(f"✓ Secrets store unlocked ({len(self._secrets)} secrets)")
def lock(self) -> None:
"""Lock the secrets store (clear decrypted data from memory)."""
self._encryption_key = None
self._secrets = None
def get(self, key: str, default: Any = None) -> Any:
"""
Get a secret value.
Args:
key: Secret key name
default: Default value if key doesn't exist
Returns:
Secret value or default
Raises:
SecretsStoreLocked: If store is locked
"""
if not self.is_unlocked:
raise SecretsStoreLocked("Secrets store is locked. Call unlock() first.")
return self._secrets.get(key, default)
def set(self, key: str, value: Any) -> None:
"""
Set a secret value.
Args:
key: Secret key name
value: Secret value (must be JSON-serializable)
Raises:
SecretsStoreLocked: If store is locked
"""
if not self.is_unlocked:
raise SecretsStoreLocked("Secrets store is locked. Call unlock() first.")
self._secrets[key] = value
self._save_secrets()
def delete(self, key: str) -> bool:
"""
Delete a secret.
Args:
key: Secret key name
Returns:
True if secret existed and was deleted, False otherwise
Raises:
SecretsStoreLocked: If store is locked
"""
if not self.is_unlocked:
raise SecretsStoreLocked("Secrets store is locked. Call unlock() first.")
if key in self._secrets:
del self._secrets[key]
self._save_secrets()
return True
return False
def list_keys(self) -> list[str]:
"""
List all secret keys.
Returns:
List of secret keys
Raises:
SecretsStoreLocked: If store is locked
"""
if not self.is_unlocked:
raise SecretsStoreLocked("Secrets store is locked. Call unlock() first.")
return list(self._secrets.keys())
def change_master_password(self, current_password: str, new_password: str) -> None:
"""
Change the master password.
This re-encrypts the secrets with a new key derived from the new password.
Args:
current_password: Current master password
new_password: New master password
Raises:
InvalidMasterPassword: If current password is incorrect
"""
# ALWAYS verify current password before changing
# Load salt and verification hash
if not self.is_initialized:
raise SecretsStoreError(
"Secrets store is not initialized. Call initialize() first."
)
master_key_data = json.loads(self.master_key_file.read_text())
salt = bytes.fromhex(master_key_data["salt"])
verification_hash = master_key_data["verification"]
# Verify current password is correct
if not verify_password(current_password, salt, verification_hash):
raise InvalidMasterPassword("Invalid current password")
# Unlock if needed to access secrets
was_unlocked = self.is_unlocked
if not was_unlocked:
# Store is locked, so unlock with current password
# (we already verified it above, so this will succeed)
encryption_key = derive_key_from_password(current_password, salt)
# Load and decrypt secrets
if self.secrets_file.exists():
encrypted_data = self.secrets_file.read_bytes()
decrypted_data = decrypt_data(encrypted_data, encryption_key)
self._secrets = json.loads(decrypted_data.decode('utf-8'))
else:
self._secrets = {}
self._encryption_key = encryption_key
# Generate new salt
new_salt = generate_salt()
# Derive new encryption key
new_encryption_key = derive_key_from_password(new_password, new_salt)
# Create new verification hash
new_verification_hash = create_verification_hash(new_password, new_salt)
# Update master key file
master_key_data = {
"salt": new_salt.hex(),
"verification": new_verification_hash,
}
self.master_key_file.write_text(json.dumps(master_key_data, indent=2))
os.chmod(self.master_key_file, stat.S_IRUSR | stat.S_IWUSR)
# Re-encrypt secrets with new key
old_key = self._encryption_key
self._encryption_key = new_encryption_key
self._save_secrets()
print("✓ Master password changed successfully")
# Lock if it wasn't unlocked before
if not was_unlocked:
self.lock()
def _save_secrets(self) -> None:
"""Save secrets to encrypted file."""
if not self.is_unlocked:
raise SecretsStoreLocked("Cannot save while locked")
# Serialize secrets to JSON
secrets_json = json.dumps(self._secrets, indent=2)
secrets_bytes = secrets_json.encode('utf-8')
# Encrypt
encrypted_data = encrypt_data(secrets_bytes, self._encryption_key)
# Write to file
self.secrets_file.write_bytes(encrypted_data)
# Set restrictive permissions
os.chmod(self.secrets_file, stat.S_IRUSR | stat.S_IWUSR)
def export_encrypted(self, output_path: Path) -> None:
"""
Export encrypted secrets to a file (for backup).
Args:
output_path: Path to export file
Raises:
SecretsStoreError: If secrets file doesn't exist
"""
if not self.secrets_file.exists():
raise SecretsStoreError("No secrets to export")
import shutil
shutil.copy2(self.secrets_file, output_path)
print(f"✓ Encrypted secrets exported to {output_path}")
def import_encrypted(self, input_path: Path, master_password: str) -> None:
"""
Import encrypted secrets from a file.
This will verify the password can decrypt the import before replacing
the current secrets.
Args:
input_path: Path to import file
master_password: Master password for the current store
Raises:
InvalidMasterPassword: If password doesn't work with import
"""
if not self.is_unlocked:
self.unlock(master_password)
# Try to decrypt the imported file with current key
try:
encrypted_data = Path(input_path).read_bytes()
decrypted_data = decrypt_data(encrypted_data, self._encryption_key)
imported_secrets = json.loads(decrypted_data.decode('utf-8'))
except InvalidToken:
raise InvalidMasterPassword(
"Cannot decrypt imported secrets with current master password"
)
except json.JSONDecodeError as e:
raise SecretsStoreError(f"Corrupted import file: {e}")
# Replace secrets
self._secrets = imported_secrets
self._save_secrets()
print(f"✓ Imported {len(self._secrets)} secrets from {input_path}")