Background Workers Development Guide
PDaaS provides a robust infrastructure for implementing background workers and scheduled jobs. This guide covers how to create, test, deploy, and monitor custom workers for async tasks, periodic jobs, and long-running processes.
Overview
The worker infrastructure (backend/workers) provides:
- Base Worker Classes: Reusable foundation for all workers
- Scheduled Jobs: Cron-like scheduling with APScheduler
- Lifecycle Management: Start, stop, graceful shutdown
- Health Checks: Built-in health monitoring
- Signal Handling: SIGTERM/SIGINT support
- Registry System: Central worker registration and discovery
- CLI Tools: Command-line worker management
- Monitoring: Prometheus metrics integration
Quick Start
Creating Your First Worker
# backend/workers/examples/my_worker.py
from backend.utils.logger import get_logger
from backend.workers.base import ScheduledWorker

logger = get_logger(__name__)


class MyWorker(ScheduledWorker):
    """Hourly example worker.

    Shows the minimum a ScheduledWorker subclass needs: a constructor that
    hands a name and a cron schedule to the base class, and an async
    ``execute()`` coroutine containing the work to perform on each run.
    """

    def __init__(self, enabled: bool = True):
        """Configure the worker to fire at minute 0 of every hour (UTC).

        Args:
            enabled: Pass False to register the worker without running it.
        """
        every_hour_on_the_hour = "0 * * * *"
        super().__init__(
            name="my_worker",
            schedule=every_hour_on_the_hour,
            enabled=enabled,
            timezone="UTC",
        )

    async def execute(self) -> None:
        """Run one scheduled iteration: log, do the work, log the outcome."""
        logger.info("MyWorker is running!")
        result = await self._do_work()  # replace with your business logic
        logger.info(f"MyWorker completed with result: {result}")

    async def _do_work(self) -> str:
        """Placeholder for the real work; returns a status string."""
        return "success"
Registering the Worker
# backend/workers/__init__.py
from backend.workers.registry import register_worker
from backend.workers.examples.my_worker import MyWorker
# Register worker on module import
worker = MyWorker()
register_worker(worker)
Running Workers
# Run all registered workers
python -m backend.workers
# Or via CLI
python -m backend.workers.cli run
Worker Types
1. ScheduledWorker (Cron Jobs)
For periodic tasks that run on a schedule:
from backend.workers.base import ScheduledWorker


class CleanupWorker(ScheduledWorker):
    """Remove expired sessions once a day at 02:00."""

    def __init__(self):
        super().__init__(
            name="cleanup_worker",
            schedule="0 2 * * *",  # minute 0 of hour 2, every day
            enabled=True,
        )

    async def execute(self) -> None:
        """Delete expired sessions and report how many were removed."""
        # `logger` and `cleanup_expired_sessions()` are assumed to come from
        # your application modules; they are not defined in this snippet.
        logger.info("Starting cleanup...")
        deleted = await cleanup_expired_sessions()
        logger.info(f"Cleanup completed: {deleted} sessions deleted")
Cron Expression Format:
* * * * *
│ │ │ │ │
│ │ │ │ └─── Day of week (0-6, Sunday=0)
│ │ │ └───── Month (1-12)
│ │ └─────── Day of month (1-31)
│ └───────── Hour (0-23)
└─────────── Minute (0-59)
Common Cron Patterns:
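- Every minute: `* * * * *`
- Every hour: `0 * * * *`
- Daily at 2 AM: `0 2 * * *`
- Weekly on Sunday at 3 AM: `0 3 * * 0`
- Monthly on the 1st at midnight: `0 0 1 * *`

Caveat: if schedules are parsed with APScheduler 3.x `CronTrigger.from_crontab` (as in the troubleshooting example below), numeric day-of-week values are numbered from Monday (0 = Monday), not Sunday. Use day names (e.g. `0 3 * * sun`) to avoid ambiguity.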
2. BaseWorker (Custom Loop)
For workers with custom execution logic:
import asyncio

from backend.workers.base import BaseWorker


class StreamProcessorWorker(BaseWorker):
    """Continuously consume and process messages from a stream.

    Unlike a ScheduledWorker, a BaseWorker owns its loop in ``run()`` and must
    poll ``_should_shutdown()`` itself to honour SIGTERM/SIGINT.
    """

    def __init__(self):
        super().__init__(name="stream_processor", enabled=True)
        self.stream_client = None

    async def run(self) -> None:
        """Read and process batches until shutdown, then release resources."""
        logger.info("Stream processor starting...")
        # create_stream_client() stands in for your application's client factory.
        self.stream_client = await create_stream_client()
        try:
            while not self._should_shutdown():
                # The read timeout bounds how long a shutdown request can go
                # unnoticed while the stream is idle.
                messages = await self.stream_client.read_batch(timeout=5.0)
                processed = 0
                for msg in messages:
                    await self._process_message(msg)
                    processed += 1
                    # Stop mid-batch so a large batch cannot delay shutdown.
                    if self._should_shutdown():
                        break
                # Count only the messages that were actually handled, not
                # ones left unprocessed by an early break.
                self.run_count += processed
        except Exception as e:
            logger.error(f"Stream processor error: {e}", exc_info=True)
            raise
        finally:
            await self.cleanup()

    async def cleanup(self) -> None:
        """Close the stream client; safe to call more than once.

        ``run()`` calls this from its ``finally`` block and the worker
        lifecycle also calls ``cleanup()`` during shutdown, so the client
        reference is cleared before closing to prevent a double close.
        """
        if self.stream_client is not None:
            client, self.stream_client = self.stream_client, None
            await client.close()
            logger.info("Stream processor stopped")

    async def _process_message(self, message: dict) -> None:
        """Handle a single message (application-specific)."""
        pass
Worker Lifecycle
Lifecycle Stages
- Initialization: the worker instance is created with its configuration
- Registration: the worker is registered in the global registry
- Start: `start()` is called and the worker begins execution
- Running: the worker executes tasks (`run()`, or `execute()` on its schedule)
- Shutdown signal: SIGTERM/SIGINT is received, or the worker is stopped manually
- Cleanup: `cleanup()` is called to release resources
- Stopped: the worker has fully stopped
Graceful Shutdown
Workers support graceful shutdown:
class GracefulWorker(BaseWorker):
    """Demonstrates a work loop that reacts promptly to shutdown requests."""

    async def run(self) -> None:
        """Process batches, pausing up to 5 seconds between them.

        The pause waits on the shutdown event instead of sleeping, so a
        shutdown signal cuts the pause short and ends the loop immediately.
        """
        while not self._should_shutdown():
            await self._process_batch()
            try:
                await asyncio.wait_for(self._shutdown_event.wait(), timeout=5.0)
            except asyncio.TimeoutError:
                # Pause elapsed with no shutdown request: keep working.
                continue
            # The shutdown event fired during the pause.
            break

    async def cleanup(self) -> None:
        """Close connections and flush any work still pending."""
        logger.info("Cleaning up resources...")
        await self._close_connections()
        await self._flush_pending_work()
        logger.info("Cleanup complete")
Signal Handling
Workers automatically handle signals:
# SIGTERM (graceful shutdown)
# - Sets shutdown event
# - Waits for workers to finish current task
# - Calls cleanup()
# - Exits gracefully
# SIGINT (Ctrl+C)
# - Same as SIGTERM
# - Allows manual interruption
# Timeout
# - After 60 seconds, force kill (SIGKILL)
Configuration
Worker Configuration Class
from backend.workers.config import WorkerConfig
# Configuration via environment variables
config = WorkerConfig()
# Or programmatic configuration
config = WorkerConfig(
enabled=True,
timezone="UTC",
health_check_port=8001,
health_check_enabled=True,
log_level="INFO"
)
Environment Variables
# General settings
WORKERS_ENABLED=true # Enable/disable all workers
WORKERS_TIMEZONE=UTC # Default timezone for schedules
# Health check settings
WORKERS_HEALTH_PORT=8001 # Health check HTTP port
WORKERS_HEALTH_ENABLED=true # Enable health check server
# Logging
WORKERS_LOG_LEVEL=INFO # Log level (DEBUG, INFO, WARNING, ERROR)
Per-Worker Configuration
class ConfigurableWorker(ScheduledWorker):
    """Hourly worker whose batch size and timeout are constructor arguments."""

    def __init__(self, batch_size: int = 100, timeout: float = 30.0):
        super().__init__(name="configurable_worker", schedule="0 * * * *")
        # Keep the per-instance settings on self so execute() can read them.
        self.batch_size, self.timeout = batch_size, timeout

    async def execute(self) -> None:
        """Process one batch using this instance's settings."""
        await self._process_batch(size=self.batch_size, timeout=self.timeout)


# Instantiate with non-default settings, then register as usual.
worker = ConfigurableWorker(batch_size=200, timeout=60.0)
register_worker(worker)
Error Handling
Automatic Error Handling
The base worker classes provide automatic error handling:
class RobustWorker(ScheduledWorker):
    """Shows that execute() may simply let exceptions propagate."""

    async def execute(self) -> None:
        """Run the risky operation with no local try/except.

        Per the base-class behaviour described in this guide, an exception
        raised here is:
        - logged with its full traceback,
        - stored in ``self.last_error``,
        - counted in ``self.error_count``,
        - reported through the health check,
        and the next scheduled run still takes place.
        """
        await self._do_risky_work()
Custom Error Handling
For custom error handling:
class RetryWorker(ScheduledWorker):
    """Retries transient failures a fixed number of times."""

    async def execute(self) -> None:
        """Attempt the work up to ``max_retries`` times.

        A TemporaryError is retried after ``retry_delay`` seconds until the
        attempts are used up, then re-raised. A PermanentError is re-raised
        immediately without retrying.
        """
        max_retries = 3
        retry_delay = 5.0
        for attempt in range(1, max_retries + 1):
            try:
                await self._do_work()
            except TemporaryError as e:
                if attempt >= max_retries:
                    logger.error(f"All {max_retries} attempts failed")
                    raise
                logger.warning(f"Attempt {attempt} failed, retrying: {e}")
                await asyncio.sleep(retry_delay)
            except PermanentError as e:
                logger.error(f"Permanent error, not retrying: {e}")
                raise
            else:
                logger.info(f"Work completed on attempt {attempt}")
                return
Circuit Breaker Pattern
For external service dependencies:
from datetime import datetime, timedelta, timezone


class CircuitBreakerWorker(ScheduledWorker):
    """Worker that stops calling a failing external service for a cool-down.

    States:
    - closed: every run calls the service; consecutive failures are counted.
    - open: runs are skipped until ``circuit_open_until``.
    - half-open: the first run after the cool-down makes one trial call;
      success closes the circuit, failure re-opens it immediately.
    """

    def __init__(self):
        super().__init__(name="circuit_breaker_worker", schedule="* * * * *")
        self.circuit_open = False
        self.circuit_open_until = None  # timezone-aware UTC datetime while open
        self.failure_threshold = 5
        self.consecutive_failures = 0
        self.reset_timeout = timedelta(minutes=5)

    async def execute(self) -> None:
        """Call the external service unless the circuit is open."""
        if self.circuit_open:
            # Aware UTC time: unaffected by local DST transitions.
            if datetime.now(timezone.utc) < self.circuit_open_until:
                logger.info("Circuit breaker open, skipping execution")
                return
            logger.info("Circuit breaker reset timeout reached, attempting")
            self.circuit_open = False
            # Half-open: consecutive_failures is deliberately NOT reset here.
            # It is still at/above the threshold, so a failed trial call
            # re-opens the circuit at once instead of allowing another
            # `failure_threshold` calls against a service that is still down.
        try:
            await self._call_external_service()
        except ExternalServiceError as e:
            # Handled here and not re-raised, so it does not propagate to the
            # base class's error handling.
            self.consecutive_failures += 1
            logger.error(f"External service error: {e}")
            if self.consecutive_failures >= self.failure_threshold:
                self.circuit_open = True
                self.circuit_open_until = datetime.now(timezone.utc) + self.reset_timeout
                logger.error(f"Circuit breaker opened for {self.reset_timeout}")
        else:
            # Success (including a successful half-open trial) fully closes
            # the circuit.
            self.consecutive_failures = 0
Testing Workers
Unit Tests
Test worker logic in isolation:
# backend/workers/tests/test_my_worker.py
import asyncio

import pytest

from backend.workers.examples.my_worker import MyWorker


@pytest.mark.asyncio
async def test_my_worker_execute():
    """Calling execute() directly completes without recording an error."""
    worker = MyWorker(enabled=True)

    await worker.execute()

    assert worker.run_count == 0  # execute() itself doesn't increment run_count
    assert worker.last_error is None


@pytest.mark.asyncio
async def test_my_worker_lifecycle():
    """A started worker stops cleanly when stop() is called."""
    worker = MyWorker(enabled=True)
    task = asyncio.create_task(worker.start())

    # Give start() a moment to get going before stopping it.
    await asyncio.sleep(0.1)
    await worker.stop()

    # Bound the wait: a worker that ignores stop() fails this test instead
    # of hanging the whole suite.
    await asyncio.wait_for(task, timeout=5.0)

    assert not worker.is_running


@pytest.mark.asyncio
async def test_my_worker_health_check():
    """health_check() reports the worker's name, enabled flag and status."""
    worker = MyWorker(enabled=True)

    health = await worker.health_check()

    assert health["name"] == "my_worker"
    assert health["enabled"] is True
    assert health["status"] in ["healthy", "unhealthy"]
Integration Tests
Test worker with real dependencies:
# backend/workers/tests/test_my_worker_integration.py
import pytest

from backend.workers.registry import WorkerRegistry
from backend.workers.examples.my_worker import MyWorker


@pytest.mark.asyncio
async def test_worker_registration():
    """A registered worker can be looked up again by its name."""
    registry = WorkerRegistry()
    registry.register(MyWorker())

    retrieved = registry.get("my_worker")

    assert retrieved is not None
    assert retrieved.name == "my_worker"


@pytest.mark.asyncio
async def test_worker_execution_with_dependencies(db_session, opensearch_client):
    """Run the worker against real backing services and check side effects.

    ``db_session`` and ``opensearch_client`` are fixtures assumed to be
    provided by the project's conftest.
    """
    await MyWorker().execute()

    # NOTE(review): the example MyWorker above does not write to
    # processed_items; adapt this query to your worker's real side effects.
    # NOTE(review): if db_session is a SQLAlchemy 2.x session, a plain SQL
    # string is rejected; wrap it with sqlalchemy.text(...).
    result = await db_session.execute("SELECT COUNT(*) FROM processed_items")
    count = result.scalar()
    assert count > 0
Testing Scheduled Execution
Mock APScheduler for testing:
import pytest
from unittest.mock import Mock, patch
from backend.workers.examples.my_worker import MyWorker
@pytest.mark.asyncio
@patch('backend.workers.base.AsyncIOScheduler')
async def test_scheduled_worker_job_added(mock_scheduler_class):
"""Test that job is added to scheduler."""
mock_scheduler = Mock()
mock_scheduler_class.return_value = mock_scheduler
worker = MyWorker()
# Start worker
import asyncio
task = asyncio.create_task(worker.start())
await asyncio.sleep(0.1)
# Verify scheduler was called
mock_scheduler.add_job.assert_called_once()
mock_scheduler.start.assert_called_once()
# Stop worker
await worker.stop()
await task
Health Checks and Monitoring
Health Check Server
Workers automatically expose health checks:
# Check overall health
curl http://localhost:8001/health
# Response
{
"status": "healthy",
"runner_status": "running",
"workers": {
"my_worker": {
"name": "my_worker",
"enabled": true,
"is_running": true,
"run_count": 42,
"error_count": 0,
"last_run_at": "2025-10-03T12:34:56.789Z",
"status": "healthy"
}
}
}
# Check specific worker
curl http://localhost:8001/health/my_worker
Prometheus Metrics
Workers expose Prometheus metrics:
from backend.workers.metrics import (
worker_starts,
worker_stops,
worker_job_duration,
worker_job_success,
worker_job_errors,
)
# Metrics are automatically recorded by ScheduledWorker
# Access at http://localhost:8001/metrics
# Example metrics:
# worker_starts_total{worker="my_worker"} 1
# worker_job_duration_seconds_bucket{worker="my_worker",le="1.0"} 42
# worker_job_success_total{worker="my_worker"} 42
# worker_job_errors_total{worker="my_worker",error_type="ValueError"} 2
Custom Metrics
Add custom metrics to your worker:
from prometheus_client import Counter, Gauge

# Define custom metrics once, at module level: registering the same metric
# name twice in prometheus_client's default registry raises an error.
items_processed = Counter(
    "my_worker_items_processed_total",
    "Total items processed by MyWorker",
    ["item_type"],
)
queue_size = Gauge("my_worker_queue_size", "Current queue size for MyWorker")


class MyWorker(ScheduledWorker):
    async def execute(self) -> None:
        """Process pending items, then publish the remaining queue depth."""
        pending = await self._get_items()
        for item in pending:
            await self._process_item(item)
            # One increment per processed item, labelled by its type.
            items_processed.labels(item_type=item.type).inc()

        current_depth = await self._get_queue_size()
        queue_size.set(current_depth)
Deployment
Docker Compose
Deploy workers with Docker Compose:
# docker-compose.yml
services:
workers:
build: .
command: python -m backend.workers
environment:
- WORKERS_ENABLED=true
- WORKERS_TIMEZONE=UTC
- WORKERS_HEALTH_PORT=8001
- DATABASE_URL=postgresql://...
- OPENSEARCH_HOST=opensearch
ports:
- "8001:8001" # Health checks
depends_on:
- postgres
- opensearch
restart: unless-stopped
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
interval: 30s
timeout: 10s
retries: 3
Systemd Service
Deploy with systemd:
# /etc/systemd/system/pdaas-workers.service
[Unit]
Description=PDaaS Background Workers
After=network.target postgresql.service opensearch.service
[Service]
Type=simple
User=pdaas
WorkingDirectory=/opt/pdaas
Environment="PYTHONPATH=/opt/pdaas"
Environment="WORKERS_ENABLED=true"
ExecStart=/opt/pdaas/.venv/bin/python -m backend.workers
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal
# Graceful shutdown
TimeoutStopSec=60
KillMode=mixed
KillSignal=SIGTERM
[Install]
WantedBy=multi-user.target
Commands:
# Enable and start
sudo systemctl enable pdaas-workers
sudo systemctl start pdaas-workers
# Check status
sudo systemctl status pdaas-workers
# View logs
sudo journalctl -u pdaas-workers -f
# Restart
sudo systemctl restart pdaas-workers
# Stop
sudo systemctl stop pdaas-workers
Kubernetes
Deploy with Kubernetes:
# k8s/workers-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: pdaas-workers
spec:
replicas: 2
selector:
matchLabels:
app: pdaas-workers
template:
metadata:
labels:
app: pdaas-workers
spec:
containers:
- name: workers
image: pdaas:latest
command: ["python", "-m", "backend.workers"]
env:
- name: WORKERS_ENABLED
value: "true"
- name: WORKERS_TIMEZONE
value: "UTC"
- name: DATABASE_URL
valueFrom:
secretKeyRef:
name: pdaas-secrets
key: database-url
ports:
- name: health
containerPort: 8001
livenessProbe:
httpGet:
path: /health
port: 8001
initialDelaySeconds: 30
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 8001
initialDelaySeconds: 10
periodSeconds: 10
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
name: pdaas-workers
spec:
selector:
app: pdaas-workers
ports:
- name: health
port: 8001
targetPort: 8001
CLI Management
Worker CLI Tool
Manage workers via CLI:
# List all registered workers
python -m backend.workers.cli list
# Output:
# ┌──────────────┬─────────┬─────────┐
# │ Name │ Status │ State │
# ├──────────────┼─────────┼─────────┤
# │ hello_worker │ Enabled │ Running │
# │ my_worker │ Enabled │ Stopped │
# └──────────────┴─────────┴─────────┘
# Total workers: 2
# Check health of all workers
python -m backend.workers.cli health
# Output:
# Overall Status: HEALTHY
# Runner Status: running
#
# ┌──────────────┬─────────┬──────┬────────┬─────────────────────┐
# │ Worker │ Status │ Runs │ Errors │ Last Run │
# ├──────────────┼─────────┼──────┼────────┼─────────────────────┤
# │ hello_worker │ healthy │ 42 │ 0 │ 2025-10-03T12:34:56 │
# │ my_worker │ healthy │ 10 │ 1 │ 2025-10-03T11:00:00 │
# └──────────────┴─────────┴──────┴────────┴─────────────────────┘
# Run all workers
python -m backend.workers.cli run
Best Practices
1. Idempotency
Make workers idempotent (safe to run multiple times):
class IdempotentWorker(ScheduledWorker):
    """Worker whose runs can be repeated without processing an item twice."""

    async def execute(self) -> None:
        """Process every pending item once, skipping items already done.

        Each item gets its own transaction, so the "already processed?"
        check, the processing and the "mark processed" write commit together.
        A failure on one item does not roll back items already committed.
        """
        # `db` and `_get_pending_item_ids()` are placeholders for your own
        # database handle and source of work items.
        for item_id in await self._get_pending_item_ids():
            async with db.transaction():
                if await self._already_processed(item_id):
                    logger.info(f"Item {item_id} already processed, skipping")
                    # Skip this item only; keep processing the rest.
                    continue

                await self._process_item(item_id)

                # Marking completion in the same transaction means a crash
                # before commit leaves the item eligible for the next run.
                # If runs can overlap, make the check lock the row (e.g.
                # SELECT ... FOR UPDATE) or rely on a unique constraint.
                await self._mark_processed(item_id)
2. Resource Management
Properly manage resources:
class ResourceAwareWorker(ScheduledWorker):
    """Shows two ways to guarantee that resources are released."""

    async def execute(self) -> None:
        """Use a connection and a manually managed resource, releasing both."""
        # Preferred: an async context manager releases the connection even
        # if processing raises.
        async with self._get_connection() as conn:
            await self._process_with_connection(conn)

        # Alternative when no context manager is available: try/finally.
        resource = None
        try:
            resource = await self._acquire_resource()
            await self._use_resource(resource)
        finally:
            # Compare with None explicitly: a resource object that happens to
            # be falsy (e.g. defines __bool__ or __len__) must still be released.
            if resource is not None:
                await self._release_resource(resource)
3. Progress Tracking
Track progress for long-running jobs:
class ProgressTrackingWorker(ScheduledWorker):
    """Worker that logs progress while working through a list of items."""

    async def execute(self) -> None:
        """Process all items, logging progress roughly every 10%."""
        items = await self._get_items_to_process()
        total = len(items)
        logger.info(f"Processing {total} items")

        # Without max(1, ...), `total // 10` is 0 for fewer than 10 items and
        # `i % 0` raises ZeroDivisionError.
        log_every = max(1, total // 10)
        for i, item in enumerate(items, 1):
            await self._process_item(item)
            if i % log_every == 0:
                progress = (i / total) * 100
                logger.info(f"Progress: {progress:.0f}% ({i}/{total})")

        logger.info(f"Completed processing {total} items")
4. Rate Limiting
Implement rate limiting for external APIs:
import asyncio
import time
from collections import deque
from datetime import timedelta


class RateLimitedWorker(ScheduledWorker):
    """Process items while staying under a sliding-window request limit."""

    def __init__(self):
        super().__init__(name="rate_limited_worker", schedule="* * * * *")
        self.rate_limit = 100  # Requests per minute
        self.rate_window = timedelta(minutes=1)
        # time.monotonic() timestamps of recent requests, oldest first. A
        # monotonic clock cannot jump backwards (NTP, DST), which with
        # wall-clock datetime.now() could cause waits of an hour or more.
        self.request_times = deque()

    async def execute(self) -> None:
        """Process each item, waiting whenever the rate limit is reached."""
        items = await self._get_items()
        for item in items:
            await self._wait_for_rate_limit()
            await self._process_item(item)
            self.request_times.append(time.monotonic())

    async def _wait_for_rate_limit(self) -> None:
        """Sleep until one more request fits inside the rate window."""
        window = self.rate_window.total_seconds()
        now = time.monotonic()

        # Timestamps are appended in increasing order, so expired entries are
        # always at the left end: drop them in O(1) each.
        while self.request_times and self.request_times[0] <= now - window:
            self.request_times.popleft()

        if len(self.request_times) >= self.rate_limit:
            wait_time = self.request_times[0] + window - now
            if wait_time > 0:
                logger.info(f"Rate limit reached, waiting {wait_time:.1f}s")
                await asyncio.sleep(wait_time)
5. Monitoring and Alerting
Add comprehensive monitoring:
import time

from backend.workers.metrics import worker_job_duration, worker_job_errors


class MonitoredWorker(ScheduledWorker):
    """Worker that records metrics and structured logs for every run."""

    async def execute(self) -> None:
        """Do the work, recording duration on success and errors on failure.

        NOTE(review): this guide states ScheduledWorker already records
        worker_job_duration / worker_job_errors automatically. If that covers
        this worker, recording them again here double-counts; prefer
        worker-specific metrics (see "Custom Metrics") for extra detail.
        """
        # perf_counter is monotonic and high-resolution; time.time() is wall
        # clock and can jump, producing wrong or negative durations.
        start = time.perf_counter()
        try:
            result = await self._do_work()
        except Exception as e:
            worker_job_errors.labels(
                worker=self.name,
                error_type=type(e).__name__,
            ).inc()
            logger.error(
                f"Worker failed: {e}",
                extra={
                    "worker": self.name,
                    "error": str(e),
                    "run_count": self.run_count,
                },
                exc_info=True,
            )
            raise

        # Success path: kept outside the try so a failure while recording
        # metrics or logging is not misreported as a job error.
        duration = time.perf_counter() - start
        worker_job_duration.labels(worker=self.name).observe(duration)
        logger.info(
            "Worker completed",
            extra={
                "worker": self.name,
                "duration": duration,
                "result": result,
                "run_count": self.run_count,
            },
        )
Troubleshooting
Worker Not Starting
Check if enabled:
echo $WORKERS_ENABLED # Should be "true"
Check registration:
from backend.workers.registry import get_registry
registry = get_registry()
workers = registry.list_workers()
print(f"Registered workers: {[w.name for w in workers]}")
Check logs:
# Systemd
sudo journalctl -u pdaas-workers -f
# Docker
docker logs -f pdaas-workers
# Direct
python -m backend.workers 2>&1 | tee worker.log
Worker Crashing
Enable debug logging:
import logging
logging.getLogger("backend.workers").setLevel(logging.DEBUG)
Check health endpoint:
curl http://localhost:8001/health
Review error counts:
python -m backend.workers.cli health
High Memory Usage
Reduce batch sizes:
- Process items in smaller batches
- Release resources promptly
- Use generators instead of lists
Monitor memory:
import os

import psutil


class MemoryMonitoredWorker(ScheduledWorker):
    """Logs this process's resident memory on every run."""

    async def execute(self) -> None:
        rss_bytes = psutil.Process(os.getpid()).memory_info().rss
        # Convert the resident set size from bytes to MiB for the log line.
        logger.info(f"Memory usage: {rss_bytes / (1024 * 1024):.1f} MB")
Schedule Not Running
Check cron expression:
from datetime import datetime, timezone

from apscheduler.triggers.cron import CronTrigger

# Validate a cron expression and preview its next fire time. Pass the same
# timezone the worker is configured with (the examples above use "UTC");
# without it, the trigger uses the local timezone of the machine running
# this check, which may not match the worker's schedule.
trigger = CronTrigger.from_crontab("0 * * * *", timezone="UTC")
next_run = trigger.get_next_fire_time(None, datetime.now(timezone.utc))
print(f"Next run: {next_run}")
Check timezone:
echo $WORKERS_TIMEZONE # Should match your desired timezone
Verify worker is enabled:
worker = get_registry().get("my_worker")
print(f"Enabled: {worker.enabled}")
Example: Complete Audit Cleanup Worker
Here's a complete example implementing the audit cleanup worker:
# backend/workers/audit_cleanup_worker.py
from datetime import datetime, timedelta, timezone
from typing import List, Optional

from backend.audit.client import OpenSearchClientFactory
from backend.audit.config import get_audit_config
from backend.utils.logger import get_logger
from backend.workers.base import ScheduledWorker

logger = get_logger(__name__)

# Index names follow audit-{org}-{acc}-{service}-{YYYY-MM-DD}. The date itself
# contains two hyphens, so a well-formed name splits on "-" into at least
# 4 + 3 = 7 parts, and the date is the LAST THREE parts (not just the last).
# NOTE(review): confirm against the naming used by the audit index writer.
_INDEX_DATE_FORMAT = "%Y-%m-%d"
_MIN_INDEX_NAME_PARTS = 7


class AuditCleanupWorker(ScheduledWorker):
    """
    Worker that cleans up old audit indices.
    Runs daily at 2 AM and deletes audit indices older than 90 days.
    """

    def __init__(self, retention_days: int = 90):
        """
        Initialize the cleanup worker.

        Args:
            retention_days: Number of days to retain audit data (default: 90)
        """
        super().__init__(
            name="audit_cleanup_worker",
            schedule="0 2 * * *",  # Daily at 2:00 AM
            enabled=True,
            timezone="UTC",
        )
        self.retention_days = retention_days
        self.config = get_audit_config()

    async def execute(self) -> None:
        """Find audit indices past the retention period and delete them."""
        logger.info(f"Starting audit cleanup (retention: {self.retention_days} days)")
        try:
            client = await OpenSearchClientFactory.get_client(self.config)

            old_indices = await self._find_old_indices(client)
            if not old_indices:
                logger.info("No old indices to delete")
                return

            deleted_count = await self._delete_indices(client, old_indices)
            logger.info(
                f"Cleanup completed: {deleted_count} indices deleted",
                extra={
                    "retention_days": self.retention_days,
                    "deleted_count": deleted_count,
                },
            )
        except Exception as e:
            logger.error(f"Cleanup failed: {e}", exc_info=True)
            raise

    async def _find_old_indices(self, client) -> List[str]:
        """Return names of audit indices dated before the retention cutoff."""
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=self.retention_days)

        all_indices = await client.cat.indices(index="audit-*", format="json")

        old_indices = []
        for index_info in all_indices:
            index_name = index_info["index"]
            index_date = self._parse_index_date(index_name)
            if index_date is not None and index_date < cutoff_date:
                old_indices.append(index_name)

        logger.info(f"Found {len(old_indices)} indices to delete")
        return old_indices

    @staticmethod
    def _parse_index_date(index_name: str) -> Optional[datetime]:
        """Extract the trailing YYYY-MM-DD date from an index name as aware UTC.

        Returns None for names too short to carry a date, or whose trailing
        date does not parse (the latter is logged as a warning).
        """
        parts = index_name.split("-")
        if len(parts) < _MIN_INDEX_NAME_PARTS:
            return None
        date_str = "-".join(parts[-3:])
        try:
            # Attach UTC: comparing a naive datetime with the aware cutoff
            # raises TypeError.
            return datetime.strptime(date_str, _INDEX_DATE_FORMAT).replace(
                tzinfo=timezone.utc
            )
        except ValueError:
            logger.warning(f"Could not parse date from index: {index_name}")
            return None

    async def _delete_indices(self, client, indices: List[str]) -> int:
        """Delete each index, best-effort; return how many were deleted."""
        deleted_count = 0
        for index_name in indices:
            try:
                await client.indices.delete(index=index_name)
                logger.info(f"Deleted index: {index_name}")
                deleted_count += 1
            except Exception as e:
                # Keep going so one failing index doesn't block the others.
                logger.error(f"Failed to delete index {index_name}: {e}")
        return deleted_count
Support
For worker development assistance:
- Review the worker examples in `backend/workers/examples/`
- Check the test suite for patterns: `backend/workers/tests/`
- Reference the epic documentation: `features/audit-module/epics/06-workers-infrastructure.md`
- Check the troubleshooting section above