""" Encrypted secrets store with master password protection. The secrets are stored in an encrypted file, with the encryption key derived from a master password using Argon2id. The master password can be changed without re-encrypting all secrets. """ import json import os import stat from pathlib import Path from typing import Dict, Optional, Any from cryptography.fernet import InvalidToken from .crypto import ( generate_salt, derive_key_from_password, encrypt_data, decrypt_data, create_verification_hash, verify_password, ) class SecretsStoreError(Exception): """Base exception for secrets store errors.""" pass class SecretsStoreLocked(SecretsStoreError): """Raised when trying to access secrets while store is locked.""" pass class InvalidMasterPassword(SecretsStoreError): """Raised when master password is incorrect.""" pass class SecretsStore: """ Encrypted secrets store with master password protection. Usage: # Initialize (first time) store = SecretsStore() store.initialize("my-secure-password") # Unlock store = SecretsStore() store.unlock("my-secure-password") # Access secrets api_key = store.get("ANTHROPIC_API_KEY") store.set("NEW_SECRET", "secret-value") # Change master password store.change_master_password("my-secure-password", "new-password") # Lock when done store.lock() """ def __init__(self, data_dir: Optional[Path] = None): """ Initialize secrets store. Args: data_dir: Directory for secrets files (defaults to backend/data) """ if data_dir is None: # Default to backend/data backend_root = Path(__file__).parent.parent.parent data_dir = backend_root / "data" self.data_dir = Path(data_dir) self.data_dir.mkdir(parents=True, exist_ok=True) self.master_key_file = self.data_dir / ".master.key" self.secrets_file = self.data_dir / "secrets.enc" # Runtime state self._encryption_key: Optional[bytes] = None self._secrets: Optional[Dict[str, Any]] = None @property def is_initialized(self) -> bool: """Check if the secrets store has been initialized.""" return self.master_key_file.exists() @property def is_unlocked(self) -> bool: """Check if the secrets store is currently unlocked.""" return self._encryption_key is not None def initialize(self, master_password: str) -> None: """ Initialize the secrets store with a master password. This should only be called once when setting up the store. Args: master_password: The master password to protect the secrets Raises: SecretsStoreError: If store is already initialized """ if self.is_initialized: raise SecretsStoreError( "Secrets store is already initialized. " "Use unlock() to access it or change_master_password() to change the password." ) # Generate a new random salt salt = generate_salt() # Derive encryption key encryption_key = derive_key_from_password(master_password, salt) # Create verification hash verification_hash = create_verification_hash(master_password, salt) # Store salt and verification hash master_key_data = { "salt": salt.hex(), "verification": verification_hash, } self.master_key_file.write_text(json.dumps(master_key_data, indent=2)) # Set restrictive permissions (owner read/write only) os.chmod(self.master_key_file, stat.S_IRUSR | stat.S_IWUSR) # Initialize empty secrets self._encryption_key = encryption_key self._secrets = {} self._save_secrets() print(f"✓ Secrets store initialized at {self.secrets_file}") def unlock(self, master_password: str) -> None: """ Unlock the secrets store with the master password. Args: master_password: The master password Raises: SecretsStoreError: If store is not initialized InvalidMasterPassword: If password is incorrect """ if not self.is_initialized: raise SecretsStoreError( "Secrets store is not initialized. Call initialize() first." ) # Load salt and verification hash master_key_data = json.loads(self.master_key_file.read_text()) salt = bytes.fromhex(master_key_data["salt"]) verification_hash = master_key_data["verification"] # Verify password if not verify_password(master_password, salt, verification_hash): raise InvalidMasterPassword("Invalid master password") # Derive encryption key encryption_key = derive_key_from_password(master_password, salt) # Load and decrypt secrets if self.secrets_file.exists(): try: encrypted_data = self.secrets_file.read_bytes() decrypted_data = decrypt_data(encrypted_data, encryption_key) self._secrets = json.loads(decrypted_data.decode('utf-8')) except InvalidToken: raise InvalidMasterPassword("Failed to decrypt secrets (invalid password)") except json.JSONDecodeError as e: raise SecretsStoreError(f"Corrupted secrets file: {e}") else: # No secrets file yet (fresh initialization) self._secrets = {} self._encryption_key = encryption_key print(f"✓ Secrets store unlocked ({len(self._secrets)} secrets)") def lock(self) -> None: """Lock the secrets store (clear decrypted data from memory).""" self._encryption_key = None self._secrets = None def get(self, key: str, default: Any = None) -> Any: """ Get a secret value. Args: key: Secret key name default: Default value if key doesn't exist Returns: Secret value or default Raises: SecretsStoreLocked: If store is locked """ if not self.is_unlocked: raise SecretsStoreLocked("Secrets store is locked. Call unlock() first.") return self._secrets.get(key, default) def set(self, key: str, value: Any) -> None: """ Set a secret value. Args: key: Secret key name value: Secret value (must be JSON-serializable) Raises: SecretsStoreLocked: If store is locked """ if not self.is_unlocked: raise SecretsStoreLocked("Secrets store is locked. Call unlock() first.") self._secrets[key] = value self._save_secrets() def delete(self, key: str) -> bool: """ Delete a secret. Args: key: Secret key name Returns: True if secret existed and was deleted, False otherwise Raises: SecretsStoreLocked: If store is locked """ if not self.is_unlocked: raise SecretsStoreLocked("Secrets store is locked. Call unlock() first.") if key in self._secrets: del self._secrets[key] self._save_secrets() return True return False def list_keys(self) -> list[str]: """ List all secret keys. Returns: List of secret keys Raises: SecretsStoreLocked: If store is locked """ if not self.is_unlocked: raise SecretsStoreLocked("Secrets store is locked. Call unlock() first.") return list(self._secrets.keys()) def change_master_password(self, current_password: str, new_password: str) -> None: """ Change the master password. This re-encrypts the secrets with a new key derived from the new password. Args: current_password: Current master password new_password: New master password Raises: InvalidMasterPassword: If current password is incorrect """ # ALWAYS verify current password before changing # Load salt and verification hash if not self.is_initialized: raise SecretsStoreError( "Secrets store is not initialized. Call initialize() first." ) master_key_data = json.loads(self.master_key_file.read_text()) salt = bytes.fromhex(master_key_data["salt"]) verification_hash = master_key_data["verification"] # Verify current password is correct if not verify_password(current_password, salt, verification_hash): raise InvalidMasterPassword("Invalid current password") # Unlock if needed to access secrets was_unlocked = self.is_unlocked if not was_unlocked: # Store is locked, so unlock with current password # (we already verified it above, so this will succeed) encryption_key = derive_key_from_password(current_password, salt) # Load and decrypt secrets if self.secrets_file.exists(): encrypted_data = self.secrets_file.read_bytes() decrypted_data = decrypt_data(encrypted_data, encryption_key) self._secrets = json.loads(decrypted_data.decode('utf-8')) else: self._secrets = {} self._encryption_key = encryption_key # Generate new salt new_salt = generate_salt() # Derive new encryption key new_encryption_key = derive_key_from_password(new_password, new_salt) # Create new verification hash new_verification_hash = create_verification_hash(new_password, new_salt) # Update master key file master_key_data = { "salt": new_salt.hex(), "verification": new_verification_hash, } self.master_key_file.write_text(json.dumps(master_key_data, indent=2)) os.chmod(self.master_key_file, stat.S_IRUSR | stat.S_IWUSR) # Re-encrypt secrets with new key old_key = self._encryption_key self._encryption_key = new_encryption_key self._save_secrets() print("✓ Master password changed successfully") # Lock if it wasn't unlocked before if not was_unlocked: self.lock() def _save_secrets(self) -> None: """Save secrets to encrypted file.""" if not self.is_unlocked: raise SecretsStoreLocked("Cannot save while locked") # Serialize secrets to JSON secrets_json = json.dumps(self._secrets, indent=2) secrets_bytes = secrets_json.encode('utf-8') # Encrypt encrypted_data = encrypt_data(secrets_bytes, self._encryption_key) # Write to file self.secrets_file.write_bytes(encrypted_data) # Set restrictive permissions os.chmod(self.secrets_file, stat.S_IRUSR | stat.S_IWUSR) def export_encrypted(self, output_path: Path) -> None: """ Export encrypted secrets to a file (for backup). Args: output_path: Path to export file Raises: SecretsStoreError: If secrets file doesn't exist """ if not self.secrets_file.exists(): raise SecretsStoreError("No secrets to export") import shutil shutil.copy2(self.secrets_file, output_path) print(f"✓ Encrypted secrets exported to {output_path}") def import_encrypted(self, input_path: Path, master_password: str) -> None: """ Import encrypted secrets from a file. This will verify the password can decrypt the import before replacing the current secrets. Args: input_path: Path to import file master_password: Master password for the current store Raises: InvalidMasterPassword: If password doesn't work with import """ if not self.is_unlocked: self.unlock(master_password) # Try to decrypt the imported file with current key try: encrypted_data = Path(input_path).read_bytes() decrypted_data = decrypt_data(encrypted_data, self._encryption_key) imported_secrets = json.loads(decrypted_data.decode('utf-8')) except InvalidToken: raise InvalidMasterPassword( "Cannot decrypt imported secrets with current master password" ) except json.JSONDecodeError as e: raise SecretsStoreError(f"Corrupted import file: {e}") # Replace secrets self._secrets = imported_secrets self._save_secrets() print(f"✓ Imported {len(self._secrets)} secrets from {input_path}")