feat: machine binding and version enforcement
migration 017 adds machine_id to agents table middleware validates X-Machine-ID header on authed routes agent client sends machine ID with requests MIN_AGENT_VERSION config defaults 0.1.22 version utils added for comparison blocks config copying attacks via hardware fingerprint old agents get 426 upgrade required breaking: <0.1.22 agents rejected
This commit is contained in:
153
discord/env_manager.py
Executable file
153
discord/env_manager.py
Executable file
@@ -0,0 +1,153 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Secure Discord Environment Manager
|
||||
Handles loading Discord configuration from .env without exposing secrets
|
||||
"""
|
||||
|
||||
import os
|
||||
import logging
|
||||
from typing import Optional, Dict, Any
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class DiscordEnvManager:
|
||||
"""Secure environment manager for Discord configuration"""
|
||||
|
||||
def __init__(self, env_file: str = ".env"):
|
||||
self.env_file = env_file
|
||||
self._config = {}
|
||||
self._load_config()
|
||||
|
||||
def _load_config(self):
|
||||
"""Load configuration from .env file"""
|
||||
try:
|
||||
load_dotenv(self.env_file)
|
||||
self._config = {
|
||||
'DISCORD_BOT_TOKEN': os.getenv('DISCORD_BOT_TOKEN'),
|
||||
'DISCORD_SERVER_ID': os.getenv('DISCORD_SERVER_ID'),
|
||||
'DISCORD_APPLICATION_ID': os.getenv('DISCORD_APPLICATION_ID'),
|
||||
'DISCORD_PUBLIC_KEY': os.getenv('DISCORD_PUBLIC_KEY'),
|
||||
'SERVER_NAME': os.getenv('SERVER_NAME', 'RedFlag Security'),
|
||||
'ADMIN_ROLE_ID': os.getenv('ADMIN_ROLE_ID'),
|
||||
'GENERAL_CHANNEL_ID': os.getenv('GENERAL_CHANNEL_ID'),
|
||||
'ANNOUNCEMENTS_CHANNEL_ID': os.getenv('ANNOUNCEMENTS_CHANNEL_ID'),
|
||||
'SECURITY_ALERTS_CHANNEL_ID': os.getenv('SECURITY_ALERTS_CHANNEL_ID'),
|
||||
'DEV_CHAT_CHANNEL_ID': os.getenv('DEV_CHAT_CHANNEL_ID'),
|
||||
'BUG_REPORTS_CHANNEL_ID': os.getenv('BUG_REPORTS_CHANNEL_ID'),
|
||||
'COMMUNITY_CATEGORY_ID': os.getenv('COMMUNITY_CATEGORY_ID'),
|
||||
'DEVELOPMENT_CATEGORY_ID': os.getenv('DEVELOPMENT_CATEGORY_ID'),
|
||||
'SECURITY_CATEGORY_ID': os.getenv('SECURITY_CATEGORY_ID'),
|
||||
}
|
||||
|
||||
# Validate required fields
|
||||
required_fields = ['DISCORD_BOT_TOKEN', 'DISCORD_SERVER_ID']
|
||||
missing = [field for field in required_fields if not self._config.get(field)]
|
||||
if missing:
|
||||
logger.error(f"Missing required environment variables: {missing}")
|
||||
raise ValueError(f"Missing required fields: {missing}")
|
||||
|
||||
logger.info("✅ Discord configuration loaded successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load Discord configuration: {e}")
|
||||
raise
|
||||
|
||||
def get(self, key: str, default: Any = None) -> Optional[str]:
|
||||
"""Get configuration value"""
|
||||
return self._config.get(key, default)
|
||||
|
||||
def get_required(self, key: str) -> str:
|
||||
"""Get required configuration value"""
|
||||
value = self._config.get(key)
|
||||
if not value:
|
||||
raise ValueError(f"Required environment variable {key} is not set")
|
||||
return value
|
||||
|
||||
def update_channel_ids(self, channel_name: str, channel_id: str):
|
||||
"""Update channel ID in config"""
|
||||
channel_key = f"{channel_name.upper()}_CHANNEL_ID"
|
||||
self._config[channel_key] = channel_id
|
||||
|
||||
# Also update in file
|
||||
self._update_env_file(channel_key, channel_id)
|
||||
|
||||
def update_category_ids(self, category_name: str, category_id: str):
|
||||
"""Update category ID in config"""
|
||||
category_key = f"{category_name.upper()}_CATEGORY_ID"
|
||||
self._config[category_key] = category_id
|
||||
|
||||
# Also update in file
|
||||
self._update_env_file(category_key, category_id)
|
||||
|
||||
def _update_env_file(self, key: str, value: str):
|
||||
"""Update .env file with new value"""
|
||||
try:
|
||||
env_path = os.path.join(os.path.dirname(__file__), self.env_file)
|
||||
|
||||
# Read current file
|
||||
if os.path.exists(env_path):
|
||||
with open(env_path, 'r') as f:
|
||||
lines = f.readlines()
|
||||
else:
|
||||
lines = []
|
||||
|
||||
# Update or add the line
|
||||
updated = False
|
||||
for i, line in enumerate(lines):
|
||||
if line.startswith(f"{key}="):
|
||||
lines[i] = f"{key}={value}\n"
|
||||
updated = True
|
||||
break
|
||||
|
||||
if not updated:
|
||||
lines.append(f"{key}={value}\n")
|
||||
|
||||
# Write back to file
|
||||
with open(env_path, 'w') as f:
|
||||
f.writelines(lines)
|
||||
|
||||
logger.info(f"✅ Updated {key} in {self.env_file}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to update {key} in {self.env_file}: {e}")
|
||||
|
||||
def is_configured(self) -> bool:
|
||||
"""Check if the Discord bot is properly configured"""
|
||||
return (
|
||||
self.get('DISCORD_BOT_TOKEN') and
|
||||
self.get('DISCORD_SERVER_ID') and
|
||||
self.get('DISCORD_APPLICATION_ID')
|
||||
)
|
||||
|
||||
def mask_sensitive_info(self, text: str) -> str:
|
||||
"""Mask sensitive information in logs"""
|
||||
sensitive_words = ['TOKEN', 'KEY']
|
||||
masked_text = text
|
||||
|
||||
for word in sensitive_words:
|
||||
if f"{word}_ID" not in masked_text: # Don't mask channel IDs
|
||||
# Find and mask the value
|
||||
import re
|
||||
pattern = rf'{word}=\w+'
|
||||
replacement = f'{word}=***MASKED***'
|
||||
masked_text = re.sub(pattern, replacement, masked_text)
|
||||
|
||||
return masked_text
|
||||
|
||||
# Global instance for easy access
|
||||
discord_env = DiscordEnvManager()
|
||||
|
||||
# Convenience functions
|
||||
def get_discord_config():
|
||||
"""Get Discord configuration"""
|
||||
return discord_env
|
||||
|
||||
def is_discord_ready():
|
||||
"""Check if Discord is ready for use"""
|
||||
return discord_env.is_configured()
|
||||
Reference in New Issue
Block a user