- Implement encrypted secrets storage with AES-128 encryption - Add secret rotation capabilities with scheduling - Implement comprehensive audit logging for all secret operations - Create centralized configuration management system - Add CLI tool for interactive secret management - Integrate secrets with Flask configuration - Support environment-specific configurations - Add integrity verification for stored secrets - Implement secure key derivation with PBKDF2 Features: - Encrypted storage in .secrets.json - Master key protection with file permissions - Automatic secret rotation scheduling - Audit trail for compliance - Migration from environment variables - Flask CLI integration - Validation and sanitization Security improvements: - No more hardcoded secrets in configuration - Encrypted storage at rest - Secure key management - Access control via authentication - Comprehensive audit logging - Integrity verification CLI commands: - manage_secrets.py init - Initialize secrets - manage_secrets.py set/get/delete - Manage secrets - manage_secrets.py rotate - Rotate secrets - manage_secrets.py audit - View audit logs - manage_secrets.py verify - Check integrity 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
271 lines
8.4 KiB
Python
Executable File
271 lines
8.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Secret management CLI tool for Talk2Me
|
|
|
|
Usage:
|
|
python manage_secrets.py list
|
|
python manage_secrets.py get <key>
|
|
python manage_secrets.py set <key> <value>
|
|
python manage_secrets.py rotate <key>
|
|
python manage_secrets.py delete <key>
|
|
python manage_secrets.py check-rotation
|
|
python manage_secrets.py verify
|
|
python manage_secrets.py migrate
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import click
|
|
import getpass
|
|
from datetime import datetime
|
|
from secrets_manager import get_secrets_manager, SecretsManager
|
|
import json
|
|
|
|
# Initialize secrets manager
|
|
manager = get_secrets_manager()
|
|
|
|
@click.group()
|
|
def cli():
|
|
"""Talk2Me Secrets Management Tool"""
|
|
pass
|
|
|
|
@cli.command()
|
|
def list():
|
|
"""List all secrets (without values)"""
|
|
secrets = manager.list_secrets()
|
|
|
|
if not secrets:
|
|
click.echo("No secrets found.")
|
|
return
|
|
|
|
click.echo(f"\nFound {len(secrets)} secrets:\n")
|
|
|
|
# Format as table
|
|
click.echo(f"{'Key':<30} {'Created':<20} {'Last Rotated':<20} {'Has Value'}")
|
|
click.echo("-" * 90)
|
|
|
|
for secret in secrets:
|
|
created = secret['created'][:10] if secret['created'] else 'Unknown'
|
|
rotated = secret['rotated'][:10] if secret['rotated'] else 'Never'
|
|
has_value = '✓' if secret['has_value'] else '✗'
|
|
|
|
click.echo(f"{secret['key']:<30} {created:<20} {rotated:<20} {has_value}")
|
|
|
|
@cli.command()
|
|
@click.argument('key')
|
|
def get(key):
|
|
"""Get a secret value (requires confirmation)"""
|
|
if not click.confirm(f"Are you sure you want to display the value of '{key}'?"):
|
|
return
|
|
|
|
value = manager.get(key)
|
|
|
|
if value is None:
|
|
click.echo(f"Secret '{key}' not found.")
|
|
else:
|
|
click.echo(f"\nSecret '{key}':")
|
|
click.echo(f"Value: {value}")
|
|
|
|
# Show metadata
|
|
secrets = manager.list_secrets()
|
|
for secret in secrets:
|
|
if secret['key'] == key:
|
|
if secret.get('metadata'):
|
|
click.echo(f"Metadata: {json.dumps(secret['metadata'], indent=2)}")
|
|
break
|
|
|
|
@cli.command()
|
|
@click.argument('key')
|
|
@click.option('--value', help='Secret value (will prompt if not provided)')
|
|
@click.option('--metadata', help='JSON metadata')
|
|
def set(key, value, metadata):
|
|
"""Set a secret value"""
|
|
if not value:
|
|
value = getpass.getpass(f"Enter value for '{key}': ")
|
|
confirm = getpass.getpass(f"Confirm value for '{key}': ")
|
|
|
|
if value != confirm:
|
|
click.echo("Values do not match. Aborted.")
|
|
return
|
|
|
|
# Parse metadata if provided
|
|
metadata_dict = None
|
|
if metadata:
|
|
try:
|
|
metadata_dict = json.loads(metadata)
|
|
except json.JSONDecodeError:
|
|
click.echo("Invalid JSON metadata")
|
|
return
|
|
|
|
# Validate the secret if validator exists
|
|
if not manager.validate(key, value):
|
|
click.echo(f"Validation failed for '{key}'")
|
|
return
|
|
|
|
manager.set(key, value, metadata_dict, user='cli')
|
|
click.echo(f"Secret '{key}' set successfully.")
|
|
|
|
@cli.command()
|
|
@click.argument('key')
|
|
@click.option('--new-value', help='New secret value (will auto-generate if not provided)')
|
|
def rotate(key):
|
|
"""Rotate a secret"""
|
|
try:
|
|
if not click.confirm(f"Are you sure you want to rotate '{key}'?"):
|
|
return
|
|
|
|
old_value, new_value = manager.rotate(key, new_value, user='cli')
|
|
|
|
click.echo(f"\nSecret '{key}' rotated successfully.")
|
|
click.echo(f"New value: {new_value}")
|
|
|
|
if click.confirm("Do you want to see the old value?"):
|
|
click.echo(f"Old value: {old_value}")
|
|
|
|
except KeyError:
|
|
click.echo(f"Secret '{key}' not found.")
|
|
except ValueError as e:
|
|
click.echo(f"Error: {e}")
|
|
|
|
@cli.command()
|
|
@click.argument('key')
|
|
def delete(key):
|
|
"""Delete a secret"""
|
|
if not click.confirm(f"Are you sure you want to delete '{key}'? This cannot be undone."):
|
|
return
|
|
|
|
if manager.delete(key, user='cli'):
|
|
click.echo(f"Secret '{key}' deleted successfully.")
|
|
else:
|
|
click.echo(f"Secret '{key}' not found.")
|
|
|
|
@cli.command()
|
|
def check_rotation():
|
|
"""Check which secrets need rotation"""
|
|
needs_rotation = manager.check_rotation_needed()
|
|
|
|
if not needs_rotation:
|
|
click.echo("No secrets need rotation.")
|
|
return
|
|
|
|
click.echo(f"\n{len(needs_rotation)} secrets need rotation:")
|
|
for key in needs_rotation:
|
|
click.echo(f" - {key}")
|
|
|
|
if click.confirm("\nDo you want to rotate all of them now?"):
|
|
for key in needs_rotation:
|
|
try:
|
|
old_value, new_value = manager.rotate(key, user='cli')
|
|
click.echo(f"✓ Rotated {key}")
|
|
except Exception as e:
|
|
click.echo(f"✗ Failed to rotate {key}: {e}")
|
|
|
|
@cli.command()
|
|
def verify():
|
|
"""Verify integrity of all secrets"""
|
|
click.echo("Verifying secrets integrity...")
|
|
|
|
if manager.verify_integrity():
|
|
click.echo("✓ All secrets passed integrity check")
|
|
else:
|
|
click.echo("✗ Integrity check failed!")
|
|
click.echo("Some secrets may be corrupted. Check logs for details.")
|
|
|
|
@cli.command()
|
|
def migrate():
|
|
"""Migrate secrets from environment variables"""
|
|
click.echo("Migrating secrets from environment variables...")
|
|
|
|
# List of known secrets to migrate
|
|
secrets_to_migrate = [
|
|
('TTS_API_KEY', 'TTS API Key'),
|
|
('SECRET_KEY', 'Flask Secret Key'),
|
|
('ADMIN_TOKEN', 'Admin Token'),
|
|
('DATABASE_URL', 'Database URL'),
|
|
('REDIS_URL', 'Redis URL'),
|
|
]
|
|
|
|
migrated = 0
|
|
|
|
for env_key, description in secrets_to_migrate:
|
|
value = os.environ.get(env_key)
|
|
if value and value != manager.get(env_key):
|
|
if click.confirm(f"Migrate {description} from environment?"):
|
|
manager.set(env_key, value, {'migrated_from': 'environment'}, user='migration')
|
|
click.echo(f"✓ Migrated {env_key}")
|
|
migrated += 1
|
|
|
|
click.echo(f"\nMigrated {migrated} secrets.")
|
|
|
|
@cli.command()
|
|
@click.argument('key')
|
|
@click.argument('days', type=int)
|
|
def schedule_rotation(key, days):
|
|
"""Schedule automatic rotation for a secret"""
|
|
manager.schedule_rotation(key, days)
|
|
click.echo(f"Scheduled rotation for '{key}' every {days} days.")
|
|
|
|
@cli.command()
|
|
@click.argument('key', required=False)
|
|
@click.option('--limit', default=20, help='Number of entries to show')
|
|
def audit(key, limit):
|
|
"""Show audit log"""
|
|
logs = manager.get_audit_log(key, limit)
|
|
|
|
if not logs:
|
|
click.echo("No audit log entries found.")
|
|
return
|
|
|
|
click.echo(f"\nShowing last {len(logs)} audit log entries:\n")
|
|
|
|
for entry in logs:
|
|
timestamp = entry['timestamp'][:19] # Trim microseconds
|
|
action = entry['action'].ljust(15)
|
|
key_str = entry['key'].ljust(20)
|
|
user = entry['user']
|
|
|
|
click.echo(f"{timestamp} | {action} | {key_str} | {user}")
|
|
|
|
if entry.get('details'):
|
|
click.echo(f"{'':>20} Details: {json.dumps(entry['details'])}")
|
|
|
|
@cli.command()
|
|
def init():
|
|
"""Initialize secrets configuration"""
|
|
click.echo("Initializing Talk2Me secrets configuration...")
|
|
|
|
# Check if already initialized
|
|
if os.path.exists('.secrets.json'):
|
|
if not click.confirm(".secrets.json already exists. Overwrite?"):
|
|
return
|
|
|
|
# Generate initial secrets
|
|
import secrets as py_secrets
|
|
|
|
initial_secrets = {
|
|
'FLASK_SECRET_KEY': py_secrets.token_hex(32),
|
|
'ADMIN_TOKEN': py_secrets.token_urlsafe(32),
|
|
}
|
|
|
|
click.echo("\nGenerating initial secrets...")
|
|
|
|
for key, value in initial_secrets.items():
|
|
manager.set(key, value, {'generated': True}, user='init')
|
|
click.echo(f"✓ Generated {key}")
|
|
|
|
# Prompt for required secrets
|
|
click.echo("\nPlease provide the following secrets:")
|
|
|
|
tts_api_key = getpass.getpass("TTS API Key (press Enter to skip): ")
|
|
if tts_api_key:
|
|
manager.set('TTS_API_KEY', tts_api_key, user='init')
|
|
click.echo("✓ Set TTS_API_KEY")
|
|
|
|
click.echo("\nSecrets initialized successfully!")
|
|
click.echo("\nIMPORTANT:")
|
|
click.echo("1. Keep .secrets.json secure and never commit it to version control")
|
|
click.echo("2. Back up your master key from .master_key")
|
|
click.echo("3. Set appropriate file permissions (owner read/write only)")
|
|
|
|
if __name__ == '__main__':
|
|
cli() |