Structured Logging System
Priority: P3 (Enhancement)
Source Reference: todos.md line 54
Status: Ready for Implementation
Problem Statement
The current logging system lacks structure, correlation IDs, and centralized aggregation capabilities. This makes it difficult to trace operations across the distributed system, debug issues, and perform effective log analysis for monitoring and troubleshooting.
Feature Description
Implement a comprehensive structured logging system with JSON format logs, correlation IDs for request tracing, centralized log aggregation, and performance metrics collection to improve observability and debugging capabilities.
Acceptance Criteria
- JSON-formatted structured logs throughout the application
- Correlation IDs for tracing requests across agent-server communication
- Centralized log aggregation and storage
- Performance metrics collection and reporting
- Log levels (debug, info, warn, error, fatal) with appropriate filtering
- Structured logging for both server and agent components
- Log retention policies and rotation
- Integration with external logging systems (optional)
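For orientation, a single log line satisfying these criteria might look like the following. The exact field names are assumptions that mirror the LogFields schema in the technical approach:

```json
{
  "timestamp": "2024-06-15T12:34:56Z",
  "level": "info",
  "component": "aggregator-server",
  "operation": "http_request",
  "correlation_id": "6f1c2b3a-9d4e-4f5a-8b6c-7d8e9f0a1b2c",
  "duration": 42,
  "metadata": { "path": "/api/v1/agents", "status": 200 }
}
```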
Technical Approach
1. Structured Logging Library
Custom Logger Implementation (aggregator-common/internal/logging/):
package logging
import (
	"context"
	"encoding/json"
	"time"

	"github.com/sirupsen/logrus"
)

type CorrelationID string

// CorrelationIDKey is the context key under which the correlation ID travels.
const CorrelationIDKey CorrelationID = "correlation_id"

type StructuredLogger struct {
	logger    *logrus.Logger
	component string
}

type LogFields struct {
	CorrelationID string                 `json:"correlation_id,omitempty"`
	Component     string                 `json:"component,omitempty"`
	Operation     string                 `json:"operation,omitempty"`
	UserID        string                 `json:"user_id,omitempty"`
	AgentID       string                 `json:"agent_id,omitempty"`
	Duration      time.Duration          `json:"duration,omitempty"`
	Error         string                 `json:"error,omitempty"`
	RequestID     string                 `json:"request_id,omitempty"`
	Method        string                 `json:"method,omitempty"`
	Endpoint      string                 `json:"endpoint,omitempty"`
	Path          string                 `json:"path,omitempty"`
	Status        int                    `json:"status,omitempty"`
	ClientIP      string                 `json:"client_ip,omitempty"`
	UserAgent     string                 `json:"user_agent,omitempty"`
	Query         string                 `json:"query,omitempty"`
	Args          []interface{}          `json:"args,omitempty"`
	Metadata      map[string]interface{} `json:"metadata,omitempty"`
}

func NewStructuredLogger(component string) *StructuredLogger {
	logger := logrus.New()
	logger.SetFormatter(&logrus.JSONFormatter{
		TimestampFormat: time.RFC3339,
	})
	return &StructuredLogger{logger: logger, component: component}
}

func (l *StructuredLogger) WithContext(ctx context.Context) *logrus.Entry {
	fields := logrus.Fields{
		"component": l.component,
	}
	if correlationID, ok := ctx.Value(CorrelationIDKey).(string); ok {
		fields["correlation_id"] = correlationID
	}
	return l.logger.WithFields(fields)
}

// convertFields marshals through JSON so omitempty tags drop zero-valued fields.
func (l *StructuredLogger) convertFields(fields LogFields) logrus.Fields {
	out := logrus.Fields{}
	if data, err := json.Marshal(fields); err == nil {
		_ = json.Unmarshal(data, &out)
	}
	return out
}

func (l *StructuredLogger) Info(ctx context.Context, operation string, fields LogFields) {
	l.WithContext(ctx).
		WithField("operation", operation).
		WithFields(l.convertFields(fields)).
		Info(operation)
}
2. Correlation ID Management
Middleware for HTTP Requests (aggregator-server/internal/middleware/):
func CorrelationIDMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
correlationID := c.GetHeader("X-Correlation-ID")
if correlationID == "" {
correlationID = generateCorrelationID()
}
c.Header("X-Correlation-ID", correlationID)
c.Set("correlation_id", correlationID)
ctx := context.WithValue(c.Request.Context(), logging.CorrelationIDKey, correlationID)
c.Request = c.Request.WithContext(ctx)
c.Next()
}
}
func generateCorrelationID() string {
return uuid.New().String()
}
Agent-Side Correlation (aggregator-agent/internal/communication/):
func (c *Client) makeRequest(method, endpoint string, body interface{}) (*http.Response, error) {
	correlationID := c.generateCorrelationID()
	ctx := context.WithValue(context.Background(), logging.CorrelationIDKey, correlationID)
	// Log request start
	c.logger.Info(ctx, "api_request_start", logging.LogFields{
		Method:   method,
		Endpoint: endpoint,
		AgentID:  c.agentID,
	})
	// Marshal the body, if any, instead of silently dropping it.
	var reader io.Reader
	if body != nil {
		data, err := json.Marshal(body)
		if err != nil {
			return nil, err
		}
		reader = bytes.NewReader(data)
	}
	req, err := http.NewRequestWithContext(ctx, method, endpoint, reader)
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-Correlation-ID", correlationID)
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Authorization", "Bearer "+c.token)
	// ... rest of request handling
}
3. Centralized Log Storage
Log Aggregation Service (aggregator-server/internal/services/log_aggregation.go):
type LogEntry struct {
ID uuid.UUID `json:"id" db:"id"`
Timestamp time.Time `json:"timestamp" db:"timestamp"`
Level string `json:"level" db:"level"`
Component string `json:"component" db:"component"`
CorrelationID string `json:"correlation_id" db:"correlation_id"`
Message string `json:"message" db:"message"`
AgentID *uuid.UUID `json:"agent_id,omitempty" db:"agent_id"`
UserID *uuid.UUID `json:"user_id,omitempty" db:"user_id"`
Operation string `json:"operation" db:"operation"`
Duration *int `json:"duration,omitempty" db:"duration"`
Error *string `json:"error,omitempty" db:"error"`
Metadata map[string]interface{} `json:"metadata" db:"metadata"`
}
type LogAggregationService struct {
db *sql.DB
logBuffer chan LogEntry
batchSize int
}
func (s *LogAggregationService) ProcessLogs(ctx context.Context) {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	batch := make([]LogEntry, 0, s.batchSize)
for {
select {
case entry := <-s.logBuffer:
batch = append(batch, entry)
if len(batch) >= s.batchSize {
s.flushBatch(ctx, batch)
batch = batch[:0]
}
case <-ticker.C:
if len(batch) > 0 {
s.flushBatch(ctx, batch)
batch = batch[:0]
}
case <-ctx.Done():
	// Flush remaining logs before exit; ctx is already cancelled,
	// so use a fresh context for the final write.
	if len(batch) > 0 {
		s.flushBatch(context.Background(), batch)
	}
	return
}
}
}
4. Performance Metrics Collection
Metrics Service (aggregator-server/internal/services/metrics.go):
type PerformanceMetrics struct {
RequestCount int64 `json:"request_count"`
ErrorCount int64 `json:"error_count"`
AverageLatency time.Duration `json:"average_latency"`
P95Latency time.Duration `json:"p95_latency"`
P99Latency time.Duration `json:"p99_latency"`
ActiveConnections int64 `json:"active_connections"`
DatabaseQueries int64 `json:"database_queries"`
}
type MetricsService struct {
requestLatencies []time.Duration
startTime time.Time
mutex sync.Mutex
}
func (m *MetricsService) RecordRequest(duration time.Duration) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.requestLatencies = append(m.requestLatencies, duration)
// Keep only last 10000 measurements
if len(m.requestLatencies) > 10000 {
m.requestLatencies = m.requestLatencies[1:]
}
}
func (m *MetricsService) GetMetrics() PerformanceMetrics {
	m.mutex.Lock()
	defer m.mutex.Unlock()
	metrics := PerformanceMetrics{
		RequestCount: int64(len(m.requestLatencies)),
	}
	if n := len(m.requestLatencies); n > 0 {
		// Sort a copy so recording order is preserved for future trimming.
		sorted := make([]time.Duration, n)
		copy(sorted, m.requestLatencies)
		sort.Slice(sorted, func(i, j int) bool {
			return sorted[i] < sorted[j]
		})
		var total time.Duration
		for _, d := range sorted {
			total += d
		}
		metrics.AverageLatency = total / time.Duration(n)
		// Clamp indices so small samples cannot index past the end.
		metrics.P95Latency = sorted[min(int(float64(n)*0.95), n-1)]
		metrics.P99Latency = sorted[min(int(float64(n)*0.99), n-1)]
	}
	return metrics
}
5. Logging Integration Points
HTTP Request Logging (aggregator-server/internal/middleware/):
func RequestLoggingMiddleware(logger *logging.StructuredLogger) gin.HandlerFunc {
return func(c *gin.Context) {
start := time.Now()
c.Next()
duration := time.Since(start)
correlationID := c.GetString("correlation_id")
fields := logging.LogFields{
Duration: duration,
Method: c.Request.Method,
Path: c.Request.URL.Path,
Status: c.Writer.Status(),
ClientIP: c.ClientIP(),
UserAgent: c.Request.UserAgent(),
}
if c.Writer.Status() >= 400 {
	fields.Error = c.Errors.String()
	logger.Error(c.Request.Context(), "http_request_error", fields)
} else {
	logger.Info(c.Request.Context(), "http_request", fields)
}
}
}
Database Query Logging:
func (q *Queries) LogQuery(query string, args []interface{}, duration time.Duration, err error) {
fields := logging.LogFields{
Operation: "database_query",
Query: query,
Duration: duration,
Args: args,
}
if err != nil {
fields.Error = err.Error()
q.logger.Error(context.Background(), "database_query_error", fields)
} else {
q.logger.Debug(context.Background(), "database_query", fields)
}
}
6. Log Retention and Rotation
Log Management Service:
type LogRetentionConfig struct {
RetentionDays int `json:"retention_days"`
MaxLogSize int64 `json:"max_log_size_bytes"`
Compression bool `json:"compression_enabled"`
ArchiveLocation string `json:"archive_location"`
}
func (s *LogService) CleanupOldLogs() error {
cutoff := time.Now().AddDate(0, 0, -s.config.RetentionDays)
_, err := s.db.Exec(`
DELETE FROM system_logs
WHERE timestamp < $1
`, cutoff)
return err
}
Definition of Done
- ✅ Structured JSON logging implemented across all components
- ✅ Correlation ID propagation for end-to-end request tracing
- ✅ Centralized log storage with efficient buffering
- ✅ Performance metrics collection and reporting
- ✅ Log level filtering and configuration
- ✅ Log retention and rotation policies
- ✅ Integration with existing HTTP, database, and agent communication layers
- ✅ Dashboard or interface for log viewing and searching
Test Plan
Unit Tests
- Structured log format validation
- Correlation ID propagation accuracy
- Log filtering and routing
Integration Tests
- End-to-end request tracing
- Agent-server communication logging
- Database query logging accuracy
Performance Tests
- Logging overhead under load
- Log aggregation throughput
- Buffer management efficiency
Retention Tests
- Log rotation functionality
- Archive creation and compression
- Cleanup policy enforcement
Files to Modify
- aggregator-common/internal/logging/ - New logging package
- aggregator-server/internal/middleware/ - Add correlation and request logging
- aggregator-server/internal/services/ - Add metrics and log aggregation
- aggregator-agent/internal/ - Add structured logging to agent
- Database schema - Add system_logs table
- Configuration - Add logging settings
Log Schema
CREATE TABLE system_logs (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
    level VARCHAR(20) NOT NULL,
    component VARCHAR(100) NOT NULL,
    correlation_id VARCHAR(100),
    message TEXT NOT NULL,
    agent_id UUID REFERENCES agents(id),
    user_id UUID REFERENCES users(id),
    operation VARCHAR(100),
    duration INTEGER, -- milliseconds
    error TEXT,
    metadata JSONB
);

-- PostgreSQL does not support inline INDEX clauses in CREATE TABLE;
-- indexes are created separately.
CREATE INDEX idx_system_logs_timestamp ON system_logs (timestamp);
CREATE INDEX idx_system_logs_correlation_id ON system_logs (correlation_id);
CREATE INDEX idx_system_logs_component ON system_logs (component);
CREATE INDEX idx_system_logs_level ON system_logs (level);
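With this schema, reconstructing one request end-to-end is a single indexed query (the correlation ID value is illustrative):

```sql
SELECT timestamp, component, level, operation, message, duration
FROM system_logs
WHERE correlation_id = '6f1c2b3a-9d4e-4f5a-8b6c-7d8e9f0a1b2c'
ORDER BY timestamp;
```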
Estimated Effort
- Development: 24-30 hours
- Testing: 16-20 hours
- Review: 8-10 hours
- Infrastructure Setup: 4-6 hours
Dependencies
- Logrus or similar structured logging library
- Database storage for log aggregation
- Configuration management for log settings
Risk Assessment
Low-Medium Risk - Enhancement that adds comprehensive logging. Main considerations are performance impact and log volume management. Proper buffering and async processing will mitigate risks.
Implementation Phases
Phase 1: Core Logging Infrastructure
- Implement structured logger
- Add correlation ID middleware
- Integrate with HTTP layer
Phase 2: Agent Logging
- Add structured logging to agent
- Implement correlation ID propagation
- Add communication layer logging
Phase 3: Log Aggregation
- Implement log buffering and storage
- Add performance metrics collection
- Create log retention system
Phase 4: Dashboard and Monitoring
- Log viewing interface
- Search and filtering capabilities
- Metrics dashboard integration
Future Enhancements
- External Log Integration: Elasticsearch, Splunk, etc.
- Real-time Log Streaming: WebSocket-based live log viewing
- Log Analysis: Automated log analysis and anomaly detection
- Compliance Reporting: SOX, GDPR compliance reporting
- Distributed Tracing: Integration with OpenTelemetry