migration 017 adds machine_id to agents table middleware validates X-Machine-ID header on authed routes agent client sends machine ID with requests MIN_AGENT_VERSION config defaults 0.1.22 version utils added for comparison blocks config copying attacks via hardware fingerprint old agents get 426 upgrade required breaking: <0.1.22 agents rejected
153 lines
5.5 KiB
Python
Executable File
153 lines
5.5 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Secure Discord Environment Manager
|
|
Handles loading Discord configuration from .env without exposing secrets
|
|
"""
|
|
|
|
import os
|
|
import logging
|
|
from typing import Optional, Dict, Any
|
|
from dotenv import load_dotenv
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DiscordEnvManager:
|
|
"""Secure environment manager for Discord configuration"""
|
|
|
|
def __init__(self, env_file: str = ".env"):
|
|
self.env_file = env_file
|
|
self._config = {}
|
|
self._load_config()
|
|
|
|
def _load_config(self):
|
|
"""Load configuration from .env file"""
|
|
try:
|
|
load_dotenv(self.env_file)
|
|
self._config = {
|
|
'DISCORD_BOT_TOKEN': os.getenv('DISCORD_BOT_TOKEN'),
|
|
'DISCORD_SERVER_ID': os.getenv('DISCORD_SERVER_ID'),
|
|
'DISCORD_APPLICATION_ID': os.getenv('DISCORD_APPLICATION_ID'),
|
|
'DISCORD_PUBLIC_KEY': os.getenv('DISCORD_PUBLIC_KEY'),
|
|
'SERVER_NAME': os.getenv('SERVER_NAME', 'RedFlag Security'),
|
|
'ADMIN_ROLE_ID': os.getenv('ADMIN_ROLE_ID'),
|
|
'GENERAL_CHANNEL_ID': os.getenv('GENERAL_CHANNEL_ID'),
|
|
'ANNOUNCEMENTS_CHANNEL_ID': os.getenv('ANNOUNCEMENTS_CHANNEL_ID'),
|
|
'SECURITY_ALERTS_CHANNEL_ID': os.getenv('SECURITY_ALERTS_CHANNEL_ID'),
|
|
'DEV_CHAT_CHANNEL_ID': os.getenv('DEV_CHAT_CHANNEL_ID'),
|
|
'BUG_REPORTS_CHANNEL_ID': os.getenv('BUG_REPORTS_CHANNEL_ID'),
|
|
'COMMUNITY_CATEGORY_ID': os.getenv('COMMUNITY_CATEGORY_ID'),
|
|
'DEVELOPMENT_CATEGORY_ID': os.getenv('DEVELOPMENT_CATEGORY_ID'),
|
|
'SECURITY_CATEGORY_ID': os.getenv('SECURITY_CATEGORY_ID'),
|
|
}
|
|
|
|
# Validate required fields
|
|
required_fields = ['DISCORD_BOT_TOKEN', 'DISCORD_SERVER_ID']
|
|
missing = [field for field in required_fields if not self._config.get(field)]
|
|
if missing:
|
|
logger.error(f"Missing required environment variables: {missing}")
|
|
raise ValueError(f"Missing required fields: {missing}")
|
|
|
|
logger.info("✅ Discord configuration loaded successfully")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to load Discord configuration: {e}")
|
|
raise
|
|
|
|
def get(self, key: str, default: Any = None) -> Optional[str]:
|
|
"""Get configuration value"""
|
|
return self._config.get(key, default)
|
|
|
|
def get_required(self, key: str) -> str:
|
|
"""Get required configuration value"""
|
|
value = self._config.get(key)
|
|
if not value:
|
|
raise ValueError(f"Required environment variable {key} is not set")
|
|
return value
|
|
|
|
def update_channel_ids(self, channel_name: str, channel_id: str):
|
|
"""Update channel ID in config"""
|
|
channel_key = f"{channel_name.upper()}_CHANNEL_ID"
|
|
self._config[channel_key] = channel_id
|
|
|
|
# Also update in file
|
|
self._update_env_file(channel_key, channel_id)
|
|
|
|
def update_category_ids(self, category_name: str, category_id: str):
|
|
"""Update category ID in config"""
|
|
category_key = f"{category_name.upper()}_CATEGORY_ID"
|
|
self._config[category_key] = category_id
|
|
|
|
# Also update in file
|
|
self._update_env_file(category_key, category_id)
|
|
|
|
def _update_env_file(self, key: str, value: str):
|
|
"""Update .env file with new value"""
|
|
try:
|
|
env_path = os.path.join(os.path.dirname(__file__), self.env_file)
|
|
|
|
# Read current file
|
|
if os.path.exists(env_path):
|
|
with open(env_path, 'r') as f:
|
|
lines = f.readlines()
|
|
else:
|
|
lines = []
|
|
|
|
# Update or add the line
|
|
updated = False
|
|
for i, line in enumerate(lines):
|
|
if line.startswith(f"{key}="):
|
|
lines[i] = f"{key}={value}\n"
|
|
updated = True
|
|
break
|
|
|
|
if not updated:
|
|
lines.append(f"{key}={value}\n")
|
|
|
|
# Write back to file
|
|
with open(env_path, 'w') as f:
|
|
f.writelines(lines)
|
|
|
|
logger.info(f"✅ Updated {key} in {self.env_file}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to update {key} in {self.env_file}: {e}")
|
|
|
|
def is_configured(self) -> bool:
|
|
"""Check if the Discord bot is properly configured"""
|
|
return (
|
|
self.get('DISCORD_BOT_TOKEN') and
|
|
self.get('DISCORD_SERVER_ID') and
|
|
self.get('DISCORD_APPLICATION_ID')
|
|
)
|
|
|
|
def mask_sensitive_info(self, text: str) -> str:
|
|
"""Mask sensitive information in logs"""
|
|
sensitive_words = ['TOKEN', 'KEY']
|
|
masked_text = text
|
|
|
|
for word in sensitive_words:
|
|
if f"{word}_ID" not in masked_text: # Don't mask channel IDs
|
|
# Find and mask the value
|
|
import re
|
|
pattern = rf'{word}=\w+'
|
|
replacement = f'{word}=***MASKED***'
|
|
masked_text = re.sub(pattern, replacement, masked_text)
|
|
|
|
return masked_text
|
|
|
|
# Global instance for easy access
|
|
discord_env = DiscordEnvManager()
|
|
|
|
# Convenience functions
|
|
def get_discord_config():
|
|
"""Get Discord configuration"""
|
|
return discord_env
|
|
|
|
def is_discord_ready():
|
|
"""Check if Discord is ready for use"""
|
|
return discord_env.is_configured() |