Skip to main content

Background Workers Development Guide

PDaaS provides a robust infrastructure for implementing background workers and scheduled jobs. This guide covers how to create, test, deploy, and monitor custom workers for async tasks, periodic jobs, and long-running processes.

Overview

The worker infrastructure (backend/workers) provides:

  • Base Worker Classes: Reusable foundation for all workers
  • Scheduled Jobs: Cron-like scheduling with APScheduler
  • Lifecycle Management: Start, stop, graceful shutdown
  • Health Checks: Built-in health monitoring
  • Signal Handling: SIGTERM/SIGINT support
  • Registry System: Central worker registration and discovery
  • CLI Tools: Command-line worker management
  • Monitoring: Prometheus metrics integration

Quick Start

Creating Your First Worker

# backend/workers/examples/my_worker.py

from backend.utils.logger import get_logger
from backend.workers.base import ScheduledWorker

logger = get_logger(__name__)


class MyWorker(ScheduledWorker):
    """
    Minimal scheduled worker used as a starting template.

    Registers itself under the name ``my_worker`` with the cron schedule
    ``0 * * * *`` in the UTC timezone.
    """

    def __init__(self, enabled: bool = True):
        """
        Configure the worker's name, schedule, and timezone.

        Args:
            enabled: Forwarded to the base class as its ``enabled`` flag
                (default: True)
        """
        super().__init__(
            name="my_worker",
            schedule="0 * * * *",  # minute 0 of every hour
            enabled=enabled,
            timezone="UTC",
        )

    async def execute(self) -> None:
        """
        Run one scheduled job.

        Logs a start message, awaits ``_do_work()``, and logs its result.
        """
        logger.info("MyWorker is running!")

        # Business logic is delegated to _do_work()
        outcome = await self._do_work()

        logger.info(f"MyWorker completed with result: {outcome}")

    async def _do_work(self) -> str:
        """Placeholder for the real task; always returns ``"success"``."""
        return "success"

Registering the Worker

# backend/workers/__init__.py

from backend.workers.registry import register_worker
from backend.workers.examples.my_worker import MyWorker

# Register worker on module import
worker = MyWorker()
register_worker(worker)

Running Workers

# Run all registered workers
python -m backend.workers

# Or via CLI
python -m backend.workers.cli run

Worker Types

1. ScheduledWorker (Cron Jobs)

For periodic tasks that run on a schedule:

from backend.workers.base import ScheduledWorker

class CleanupWorker(ScheduledWorker):
    """Scheduled worker that removes expired sessions daily at 2 AM."""

    def __init__(self):
        """Register as ``cleanup_worker`` on the ``0 2 * * *`` schedule."""
        super().__init__(
            name="cleanup_worker",
            schedule="0 2 * * *",  # every day at 02:00
            enabled=True,
        )

    async def execute(self) -> None:
        """Delete expired sessions and log how many were removed."""
        logger.info("Starting cleanup...")

        # cleanup_expired_sessions() returns the value reported in the log
        removed = await cleanup_expired_sessions()

        logger.info(f"Cleanup completed: {removed} sessions deleted")

Cron Expression Format:

* * * * *
│ │ │ │ │
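│ │ │ │ └─── Day of week (0-6, Sunday=0 in standard cron; if the schedule is parsed by APScheduler's CronTrigger, numbers start at Monday=0, so prefer names such as sun or mon)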
│ │ │ └───── Month (1-12)
│ │ └─────── Day of month (1-31)
│ └───────── Hour (0-23)
└─────────── Minute (0-59)

Common Cron Patterns:

  • Every minute: * * * * *
  • Every hour: 0 * * * *
  • Daily at 2 AM: 0 2 * * *
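  • Weekly on Sunday at 3 AM: 0 3 * * sun (standard cron also accepts 0 for Sunday, but APScheduler's CronTrigger treats 0 as Monday, so the day name is the unambiguous form)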
  • Monthly on 1st at midnight: 0 0 1 * *

2. BaseWorker (Custom Loop)

For workers with custom execution logic:

import asyncio
from backend.workers.base import BaseWorker

class StreamProcessorWorker(BaseWorker):
    """Process messages from a stream continuously until shutdown is requested."""

    def __init__(self):
        """Register as ``stream_processor``; the stream client is created in run()."""
        super().__init__(name="stream_processor", enabled=True)
        # None means "no open client"; set in run() and cleared in cleanup().
        self.stream_client = None

    async def run(self) -> None:
        """
        Main worker loop.

        Reads batches from the stream client and processes each message until
        ``_should_shutdown()`` returns true. ``run_count`` is incremented once
        per message that was actually processed, so a batch cut short by a
        shutdown request (or by an exception) is not over-counted.

        Raises:
            Exception: Any error from reading or processing is logged and
                re-raised after cleanup() has run.
        """
        logger.info("Stream processor starting...")

        # Initialize resources
        self.stream_client = await create_stream_client()

        try:
            while not self._should_shutdown():
                # Process next batch
                messages = await self.stream_client.read_batch(timeout=5.0)

                for msg in messages:
                    await self._process_message(msg)

                    # Count only messages that finished processing
                    self.run_count += 1

                    # Check shutdown frequently so a long batch cannot delay it
                    if self._should_shutdown():
                        break

        except Exception as e:
            logger.error(f"Stream processor error: {e}", exc_info=True)
            raise

        finally:
            await self.cleanup()

    async def cleanup(self) -> None:
        """
        Close the stream client if one is open.

        Safe to call more than once: the client reference is cleared before
        closing, so a second call (for example from the worker lifecycle after
        run() has already cleaned up) does not close the client twice.
        """
        client, self.stream_client = self.stream_client, None
        if client:
            await client.close()
        logger.info("Stream processor stopped")

    async def _process_message(self, message: dict) -> None:
        """Process a single message (placeholder for real logic)."""
        # Your message processing logic
        pass

Worker Lifecycle

Lifecycle Stages

  1. Initialization: Worker instance created with configuration
  2. Registration: Worker registered in global registry
  3. Start: start() called, worker begins execution
  4. Running: Worker executes tasks (run() or scheduled execute())
  5. Shutdown Signal: SIGTERM/SIGINT received or manual stop
  6. Cleanup: cleanup() called to release resources
  7. Stopped: Worker fully stopped

Graceful Shutdown

Workers support graceful shutdown:

class GracefulWorker(BaseWorker):
    """Worker whose loop sleeps on the shutdown event so it can stop promptly."""

    async def run(self) -> None:
        """
        Process batches until shutdown is requested.

        Between batches the loop waits up to 5 seconds on
        ``self._shutdown_event``: if the event is set the loop ends, and if
        the wait times out the next batch is processed.
        """
        while not self._should_shutdown():
            await self._process_batch()

            try:
                await asyncio.wait_for(
                    self._shutdown_event.wait(),
                    timeout=5.0
                )
            except asyncio.TimeoutError:
                # No shutdown signal within the interval: keep working
                continue
            else:
                # The event was set: leave the loop
                break

    async def cleanup(self) -> None:
        """Close connections, then flush pending work, logging before and after."""
        logger.info("Cleaning up resources...")
        await self._close_connections()
        await self._flush_pending_work()
        logger.info("Cleanup complete")

Signal Handling

Workers automatically handle signals:

# SIGTERM (graceful shutdown)
# - Sets shutdown event
# - Waits for workers to finish current task
# - Calls cleanup()
# - Exits gracefully

# SIGINT (Ctrl+C)
# - Same as SIGTERM
# - Allows manual interruption

# Timeout
# - After 60 seconds, force kill (SIGKILL)

Configuration

Worker Configuration Class

from backend.workers.config import WorkerConfig

# Default construction: settings come from environment variables
config = WorkerConfig()

# Alternatively, pass each setting explicitly
config = WorkerConfig(
    enabled=True,
    timezone="UTC",
    health_check_port=8001,
    health_check_enabled=True,
    log_level="INFO"
)

Environment Variables

# General settings
WORKERS_ENABLED=true # Enable/disable all workers
WORKERS_TIMEZONE=UTC # Default timezone for schedules

# Health check settings
WORKERS_HEALTH_PORT=8001 # Health check HTTP port
WORKERS_HEALTH_ENABLED=true # Enable health check server

# Logging
WORKERS_LOG_LEVEL=INFO # Log level (DEBUG, INFO, WARNING, ERROR)

Per-Worker Configuration

class ConfigurableWorker(ScheduledWorker):
    """Hourly worker whose batch size and timeout are set per instance."""

    def __init__(self, batch_size: int = 100, timeout: float = 30.0):
        """
        Store per-worker settings alongside the base scheduling options.

        Args:
            batch_size: Passed to ``_process_batch`` as ``size`` (default: 100)
            timeout: Passed to ``_process_batch`` as ``timeout`` (default: 30.0)
        """
        super().__init__(
            name="configurable_worker",
            schedule="0 * * * *",
        )
        self.batch_size = batch_size
        self.timeout = timeout

    async def execute(self) -> None:
        """Process one batch using the instance's batch size and timeout."""
        await self._process_batch(size=self.batch_size, timeout=self.timeout)

# Register with custom config
worker = ConfigurableWorker(batch_size=200, timeout=60.0)
register_worker(worker)

Error Handling

Automatic Error Handling

The base worker classes provide automatic error handling:

class RobustWorker(ScheduledWorker):
    """Worker that relies on the base class for error handling."""

    async def execute(self) -> None:
        """
        Run the job without any local try/except.

        According to this guide, an exception raised here is handled by the
        base class, which:
        - logs it with a full traceback
        - stores it in self.last_error
        - increments self.error_count
        - reports it in health checks

        The next scheduled run still takes place.
        """
        await self._do_risky_work()

Custom Error Handling

For custom error handling:

class RetryWorker(ScheduledWorker):
    """Worker that retries temporary failures and fails fast on permanent ones."""

    async def execute(self) -> None:
        """
        Attempt the work up to three times, pausing 5 seconds between tries.

        Raises:
            TemporaryError: When the final attempt also fails.
            PermanentError: Immediately, without retrying.
        """
        max_retries = 3
        retry_delay = 5.0

        for attempt_no in range(1, max_retries + 1):
            try:
                await self._do_work()
                logger.info(f"Work completed on attempt {attempt_no}")
                return

            except TemporaryError as e:
                if attempt_no >= max_retries:
                    # Out of attempts: surface the last temporary error
                    logger.error(f"All {max_retries} attempts failed")
                    raise
                logger.warning(f"Attempt {attempt_no} failed, retrying: {e}")
                await asyncio.sleep(retry_delay)

            except PermanentError as e:
                logger.error(f"Permanent error, not retrying: {e}")
                raise

Circuit Breaker Pattern

For external service dependencies:

from datetime import datetime, timedelta

class CircuitBreakerWorker(ScheduledWorker):
    """Every-minute worker that stops calling a failing external service for a while."""

    def __init__(self):
        """Start with a closed circuit and a zeroed failure counter."""
        super().__init__(name="circuit_breaker_worker", schedule="* * * * *")
        self.circuit_open = False
        self.circuit_open_until = None  # set when the circuit opens
        self.failure_threshold = 5  # consecutive failures that open the circuit
        self.consecutive_failures = 0
        self.reset_timeout = timedelta(minutes=5)  # how long the circuit stays open

    async def execute(self) -> None:
        """
        Call the external service unless the circuit is currently open.

        While the circuit is open and its deadline has not passed, the run is
        skipped. Once the deadline passes, the circuit closes and the failure
        counter restarts from zero. ``ExternalServiceError`` is logged and
        counted here rather than re-raised.
        """
        if self.circuit_open:
            if datetime.now() < self.circuit_open_until:
                logger.info("Circuit breaker open, skipping execution")
                return

            logger.info("Circuit breaker reset timeout reached, attempting")
            self.circuit_open = False
            self.consecutive_failures = 0

        try:
            await self._call_external_service()
        except ExternalServiceError as e:
            self.consecutive_failures += 1
            logger.error(f"External service error: {e}")

            # Trip the breaker once the threshold is reached
            if self.consecutive_failures >= self.failure_threshold:
                self.circuit_open = True
                self.circuit_open_until = datetime.now() + self.reset_timeout
                logger.error(f"Circuit breaker opened for {self.reset_timeout}")
        else:
            # A successful call clears the failure streak
            self.consecutive_failures = 0

Testing Workers

Unit Tests

Test worker logic in isolation:

# backend/workers/tests/test_my_worker.py

import pytest
from backend.workers.examples.my_worker import MyWorker


@pytest.mark.asyncio
async def test_my_worker_execute():
    """Calling execute() directly succeeds and leaves the counters untouched."""
    instance = MyWorker(enabled=True)

    await instance.execute()

    # A direct execute() call does not increment run_count
    assert instance.run_count == 0
    assert instance.last_error is None


@pytest.mark.asyncio
async def test_my_worker_lifecycle():
    """A started worker can be stopped and is then no longer running."""
    import asyncio

    instance = MyWorker(enabled=True)

    # Run start() concurrently so the test can stop the worker afterwards
    runner = asyncio.create_task(instance.start())

    # Give the worker a moment before stopping it
    await asyncio.sleep(0.1)

    await instance.stop()

    # start() should return once the worker has stopped
    await runner

    assert not instance.is_running


@pytest.mark.asyncio
async def test_my_worker_health_check():
    """health_check() reports the worker's name, enabled flag, and a known status."""
    instance = MyWorker(enabled=True)

    report = await instance.health_check()

    assert report["name"] == "my_worker"
    assert report["enabled"] is True
    assert report["status"] in ["healthy", "unhealthy"]

Integration Tests

Test worker with real dependencies:

# backend/workers/tests/test_my_worker_integration.py

import pytest
from backend.workers.registry import WorkerRegistry
from backend.workers.examples.my_worker import MyWorker


@pytest.mark.asyncio
async def test_worker_registration():
    """A registered worker can be looked up again by its name."""
    reg = WorkerRegistry()
    instance = MyWorker()

    reg.register(instance)

    found = reg.get("my_worker")
    assert found is not None
    assert found.name == "my_worker"


@pytest.mark.asyncio
async def test_worker_execution_with_dependencies(db_session, opensearch_client):
    """Run the worker with the db_session and opensearch_client fixtures in place."""
    instance = MyWorker()

    await instance.execute()

    # The run is expected to have left rows in processed_items
    rows = await db_session.execute("SELECT COUNT(*) FROM processed_items")
    processed = rows.scalar()
    assert processed > 0

Testing Scheduled Execution

Mock APScheduler for testing:

import pytest
from unittest.mock import Mock, patch
from backend.workers.examples.my_worker import MyWorker


@pytest.mark.asyncio
@patch('backend.workers.base.AsyncIOScheduler')
async def test_scheduled_worker_job_added(mock_scheduler_class):
    """Starting the worker adds exactly one job and starts the scheduler once."""
    import asyncio

    # Whatever the worker instantiates from the patched class is this mock
    fake_scheduler = Mock()
    mock_scheduler_class.return_value = fake_scheduler

    instance = MyWorker()

    runner = asyncio.create_task(instance.start())
    await asyncio.sleep(0.1)

    fake_scheduler.add_job.assert_called_once()
    fake_scheduler.start.assert_called_once()

    # Shut the worker down so the background task finishes
    await instance.stop()
    await runner

Health Checks and Monitoring

Health Check Server

Workers automatically expose health checks:

# Check overall health
curl http://localhost:8001/health

# Response
{
  "status": "healthy",
  "runner_status": "running",
  "workers": {
    "my_worker": {
      "name": "my_worker",
      "enabled": true,
      "is_running": true,
      "run_count": 42,
      "error_count": 0,
      "last_run_at": "2025-10-03T12:34:56.789Z",
      "status": "healthy"
    }
  }
}

# Check specific worker
curl http://localhost:8001/health/my_worker

Prometheus Metrics

Workers expose Prometheus metrics:

from backend.workers.metrics import (
worker_starts,
worker_stops,
worker_job_duration,
worker_job_success,
worker_job_errors,
)

# Metrics are automatically recorded by ScheduledWorker
# Access at http://localhost:8001/metrics

# Example metrics:
# worker_starts_total{worker="my_worker"} 1
# worker_job_duration_seconds_bucket{worker="my_worker",le="1.0"} 42
# worker_job_success_total{worker="my_worker"} 42
# worker_job_errors_total{worker="my_worker",error_type="ValueError"} 2

Custom Metrics

Add custom metrics to your worker:

from prometheus_client import Counter, Gauge

# Define custom metrics at module level, next to the worker that uses them.

# Counter with one label, "item_type"
items_processed = Counter(
    "my_worker_items_processed_total",
    "Total items processed by MyWorker",
    ["item_type"]
)

# Gauge with no labels; set to the current queue size by the worker
queue_size = Gauge(
    "my_worker_queue_size",
    "Current queue size for MyWorker"
)


class MyWorker(ScheduledWorker):
    """Worker that records custom Prometheus metrics while processing items."""

    async def execute(self) -> None:
        """
        Process every item, counting each by type, then publish the queue size.

        ``items_processed`` is incremented once per item under the item's
        ``type`` label; ``queue_size`` is set after all items are handled.
        """
        pending = await self._get_items()
        for entry in pending:
            await self._process_item(entry)

            # One increment per processed item, labelled by its type
            items_processed.labels(item_type=entry.type).inc()

        current_size = await self._get_queue_size()
        queue_size.set(current_size)

Deployment

Docker Compose

Deploy workers with Docker Compose:

# docker-compose.yml

services:
  workers:
    build: .
    command: python -m backend.workers
    environment:
      - WORKERS_ENABLED=true
      - WORKERS_TIMEZONE=UTC
      - WORKERS_HEALTH_PORT=8001
      - DATABASE_URL=postgresql://...
      - OPENSEARCH_HOST=opensearch
    ports:
      - "8001:8001" # Health checks
    depends_on:
      - postgres
      - opensearch
    restart: unless-stopped
    # Give workers the 60-second graceful shutdown window described in this
    # guide; without this, Compose sends SIGKILL 10 seconds after SIGTERM.
    stop_grace_period: 60s
    healthcheck:
      # NOTE(review): requires curl to be installed in the image - confirm
      test: ["CMD", "curl", "-f", "http://localhost:8001/health"]
      interval: 30s
      timeout: 10s
      retries: 3

Systemd Service

Deploy with systemd:

# /etc/systemd/system/pdaas-workers.service

[Unit]
Description=PDaaS Background Workers
After=network.target postgresql.service opensearch.service

[Service]
Type=simple
User=pdaas
WorkingDirectory=/opt/pdaas
Environment="PYTHONPATH=/opt/pdaas"
Environment="WORKERS_ENABLED=true"
ExecStart=/opt/pdaas/.venv/bin/python -m backend.workers
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

# Graceful shutdown
TimeoutStopSec=60
KillMode=mixed
KillSignal=SIGTERM

[Install]
WantedBy=multi-user.target

Commands:

# Enable and start
sudo systemctl enable pdaas-workers
sudo systemctl start pdaas-workers

# Check status
sudo systemctl status pdaas-workers

# View logs
sudo journalctl -u pdaas-workers -f

# Restart
sudo systemctl restart pdaas-workers

# Stop
sudo systemctl stop pdaas-workers

Kubernetes

Deploy with Kubernetes:

# k8s/workers-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: pdaas-workers
spec:
  # NOTE(review): each replica runs `python -m backend.workers`, so scheduled
  # jobs presumably fire once per replica - confirm the workers coordinate
  # (or are idempotent) before running more than one.
  replicas: 2
  selector:
    matchLabels:
      app: pdaas-workers
  template:
    metadata:
      labels:
        app: pdaas-workers
    spec:
      # Give workers the 60-second graceful shutdown window described in this
      # guide; the Kubernetes default is 30 seconds before SIGKILL.
      terminationGracePeriodSeconds: 60
      containers:
        - name: workers
          image: pdaas:latest
          command: ["python", "-m", "backend.workers"]
          env:
            - name: WORKERS_ENABLED
              value: "true"
            - name: WORKERS_TIMEZONE
              value: "UTC"
            - name: DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: pdaas-secrets
                  key: database-url
          ports:
            - name: health
              containerPort: 8001
          livenessProbe:
            httpGet:
              path: /health
              port: 8001
            initialDelaySeconds: 30
            periodSeconds: 30
          readinessProbe:
            httpGet:
              path: /health
              port: 8001
            initialDelaySeconds: 10
            periodSeconds: 10
          resources:
            requests:
              memory: "256Mi"
              cpu: "100m"
            limits:
              memory: "512Mi"
              cpu: "500m"
---
apiVersion: v1
kind: Service
metadata:
  name: pdaas-workers
spec:
  selector:
    app: pdaas-workers
  ports:
    - name: health
      port: 8001
      targetPort: 8001

CLI Management

Worker CLI Tool

Manage workers via CLI:

# List all registered workers
python -m backend.workers.cli list

# Output:
# ┌──────────────┬─────────┬─────────┐
# │ Name │ Status │ State │
# ├──────────────┼─────────┼─────────┤
# │ hello_worker │ Enabled │ Running │
# │ my_worker │ Enabled │ Stopped │
# └──────────────┴─────────┴─────────┘
# Total workers: 2

# Check health of all workers
python -m backend.workers.cli health

# Output:
# Overall Status: HEALTHY
# Runner Status: running
#
# ┌──────────────┬─────────┬──────┬────────┬─────────────────────┐
# │ Worker │ Status │ Runs │ Errors │ Last Run │
# ├──────────────┼─────────┼──────┼────────┼─────────────────────┤
# │ hello_worker │ healthy │ 42 │ 0 │ 2025-10-03T12:34:56 │
# │ my_worker │ healthy │ 10 │ 1 │ 2025-10-03T11:00:00 │
# └──────────────┴─────────┴──────┴────────┴─────────────────────┘

# Run all workers
python -m backend.workers.cli run

Best Practices

1. Idempotency

Make workers idempotent (safe to run multiple times):

class IdempotentWorker(ScheduledWorker):
    """Worker with idempotent operations."""

    async def execute(self) -> None:
        """
        Process each pending item at most once.

        Every item is handled in its own transaction: the "already processed"
        check, the processing, and the "mark processed" write commit or roll
        back together, so re-running the job cannot process an item twice.

        ``_get_pending_item_ids`` is a placeholder, like the other ``_``
        helpers in this example; supply your own source of item ids.
        """
        for item_id in await self._get_pending_item_ids():
            # Use transactions
            async with db.transaction():
                # Check if already processed
                if await self._already_processed(item_id):
                    logger.info(f"Item {item_id} already processed, skipping")
                    continue

                # Process item
                await self._process_item(item_id)

                # Mark as processed
                await self._mark_processed(item_id)

2. Resource Management

Properly manage resources:

class ResourceAwareWorker(ScheduledWorker):
    """Worker showing two ways to guarantee that resources are released."""

    async def execute(self) -> None:
        """
        Use a connection via a context manager, then a manually managed resource.

        The second pattern releases the resource in ``finally`` and only if
        acquisition produced a truthy value.
        """
        # Pattern 1: the context manager releases the connection on exit
        async with self._get_connection() as conn:
            await self._process_with_connection(conn)

        # Pattern 2: explicit acquire/release with try/finally
        handle = None
        try:
            handle = await self._acquire_resource()
            await self._use_resource(handle)
        finally:
            if handle:
                await self._release_resource(handle)

3. Progress Tracking

Track progress for long-running jobs:

class ProgressTrackingWorker(ScheduledWorker):
    """Worker with progress tracking."""

    async def execute(self) -> None:
        """
        Process all items, logging progress roughly every 10% of the total.

        With fewer than 10 items, ``total // 10`` is 0, so the step is clamped
        to 1 (progress logged after every item) instead of dividing by zero.
        """
        items = await self._get_items_to_process()
        total = len(items)

        logger.info(f"Processing {total} items")

        # Number of items between progress messages; never 0
        step = max(1, total // 10)

        for i, item in enumerate(items, 1):
            await self._process_item(item)

            # Log progress every 10%
            if i % step == 0:
                progress = (i / total) * 100
                logger.info(f"Progress: {progress:.0f}% ({i}/{total})")

        logger.info(f"Completed processing {total} items")

4. Rate Limiting

Implement rate limiting for external APIs:

import asyncio
from datetime import datetime, timedelta

class RateLimitedWorker(ScheduledWorker):
    """Every-minute worker that throttles processing with a sliding time window."""

    def __init__(self):
        """Allow at most 100 requests per one-minute window."""
        super().__init__(name="rate_limited_worker", schedule="* * * * *")
        self.rate_limit = 100  # maximum requests per window
        self.rate_window = timedelta(minutes=1)
        self.request_times = []  # timestamps of recent requests, in append order

    async def execute(self) -> None:
        """Process every item, waiting before each one if the limit is reached."""
        pending = await self._get_items()

        for entry in pending:
            await self._wait_for_rate_limit()
            await self._process_item(entry)

            # Timestamp the request after it has been processed
            self.request_times.append(datetime.now())

    async def _wait_for_rate_limit(self) -> None:
        """
        Sleep until the oldest recorded request leaves the window, if needed.

        Timestamps outside the window are dropped first; nothing happens when
        fewer than ``rate_limit`` requests remain.
        """
        current = datetime.now()
        window_start = current - self.rate_window

        # Keep only requests that are still inside the window
        self.request_times = [
            stamp for stamp in self.request_times if stamp > window_start
        ]

        if len(self.request_times) < self.rate_limit:
            return

        earliest = self.request_times[0]
        delay = (earliest + self.rate_window - current).total_seconds()
        if delay > 0:
            logger.info(f"Rate limit reached, waiting {delay:.1f}s")
            await asyncio.sleep(delay)

5. Monitoring and Alerting

Add comprehensive monitoring:

from backend.workers.metrics import worker_job_duration, worker_job_errors

class MonitoredWorker(ScheduledWorker):
    """Worker with comprehensive monitoring."""

    async def execute(self) -> None:
        """
        Run the work, recording duration on success and an error count on failure.

        Duration is measured with ``time.perf_counter()``, a monotonic clock,
        so system clock adjustments cannot produce negative or skewed values.

        Raises:
            Exception: Any error from ``_do_work()`` is counted, logged with
                context, and re-raised.
        """
        import time

        start = time.perf_counter()

        try:
            # Your work here
            result = await self._do_work()

            # Record success metrics
            duration = time.perf_counter() - start
            worker_job_duration.labels(worker=self.name).observe(duration)

            # Log important metrics as structured fields
            logger.info(
                "Worker completed",
                extra={
                    "worker": self.name,
                    "duration": duration,
                    "result": result,
                    "run_count": self.run_count,
                }
            )

        except Exception as e:
            # Record error metrics, labelled by exception class name
            worker_job_errors.labels(
                worker=self.name,
                error_type=type(e).__name__
            ).inc()

            # Log error with context
            logger.error(
                f"Worker failed: {e}",
                extra={
                    "worker": self.name,
                    "error": str(e),
                    "run_count": self.run_count,
                },
                exc_info=True
            )
            raise

Troubleshooting

Worker Not Starting

Check if enabled:

echo $WORKERS_ENABLED  # Should be "true"

Check registration:

from backend.workers.registry import get_registry

registry = get_registry()
workers = registry.list_workers()
print(f"Registered workers: {[w.name for w in workers]}")

Check logs:

# Systemd
sudo journalctl -u pdaas-workers -f

# Docker
docker logs -f pdaas-workers

# Direct
python -m backend.workers 2>&1 | tee worker.log

Worker Crashing

Enable debug logging:

import logging
logging.getLogger("backend.workers").setLevel(logging.DEBUG)

Check health endpoint:

curl http://localhost:8001/health

Review error counts:

python -m backend.workers.cli health

High Memory Usage

Reduce batch sizes:

  • Process items in smaller batches
  • Release resources promptly
  • Use generators instead of lists

Monitor memory:

import psutil
import os

class MemoryMonitoredWorker(ScheduledWorker):
    """Worker that logs the current process's resident memory on each run."""

    async def execute(self) -> None:
        """Log the resident set size of this process in megabytes."""
        current_process = psutil.Process(os.getpid())
        rss_bytes = current_process.memory_info().rss
        memory_mb = rss_bytes / 1024 / 1024
        logger.info(f"Memory usage: {memory_mb:.1f} MB")

Schedule Not Running

Check cron expression:

from datetime import datetime, timezone

from apscheduler.triggers.cron import CronTrigger

# Validate cron expression: from_crontab raises ValueError if it is malformed
trigger = CronTrigger.from_crontab("0 * * * *")

# Use a timezone-aware "now" so the result does not depend on the machine's
# local timezone setting
next_run = trigger.get_next_fire_time(None, datetime.now(timezone.utc))
print(f"Next run: {next_run}")

Check timezone:

echo $WORKERS_TIMEZONE  # Should match your desired timezone

Verify worker is enabled:

worker = get_registry().get("my_worker")
print(f"Enabled: {worker.enabled}")

Example: Complete Audit Cleanup Worker

Here's a complete example implementing the audit cleanup worker:

# backend/workers/audit_cleanup_worker.py

from datetime import datetime, timedelta, timezone
from typing import List

from backend.audit.client import OpenSearchClientFactory
from backend.audit.config import get_audit_config
from backend.utils.logger import get_logger
from backend.workers.base import ScheduledWorker

logger = get_logger(__name__)


class AuditCleanupWorker(ScheduledWorker):
    """
    Worker that cleans up old audit indices.

    Runs daily at 2 AM (UTC) and deletes audit indices whose date suffix is
    older than the retention period (90 days by default).
    """

    def __init__(self, retention_days: int = 90):
        """
        Initialize the cleanup worker.

        Args:
            retention_days: Number of days to retain audit data (default: 90)
        """
        super().__init__(
            name="audit_cleanup_worker",
            schedule="0 2 * * *",  # Daily at 2:00 AM
            enabled=True,
            timezone="UTC",
        )
        self.retention_days = retention_days
        self.config = get_audit_config()

    async def execute(self) -> None:
        """
        Find and delete audit indices older than the retention period.

        Raises:
            Exception: Any failure while obtaining the client or listing
                indices is logged and re-raised. Failures to delete an
                individual index are logged by ``_delete_indices`` and do
                not abort the run.
        """
        logger.info(f"Starting audit cleanup (retention: {self.retention_days} days)")

        try:
            # Get OpenSearch client
            client = await OpenSearchClientFactory.get_client(self.config)

            # Find old indices
            old_indices = await self._find_old_indices(client)

            if not old_indices:
                logger.info("No old indices to delete")
                return

            # Delete old indices
            deleted_count = await self._delete_indices(client, old_indices)

            logger.info(
                f"Cleanup completed: {deleted_count} indices deleted",
                extra={
                    "retention_days": self.retention_days,
                    "deleted_count": deleted_count,
                }
            )

        except Exception as e:
            logger.error(f"Cleanup failed: {e}", exc_info=True)
            raise

    async def _find_old_indices(self, client) -> List[str]:
        """
        Return the names of audit indices older than the retention period.

        Index names are expected to look like
        ``audit-{org}-{acc}-{service}-{YYYY-MM-DD}``. Because the date itself
        contains hyphens, it is rebuilt from the last three hyphen-separated
        fields rather than taken from the last field alone. The parsed date
        is treated as UTC so it can be compared with the UTC-aware cutoff.

        Args:
            client: OpenSearch client used to list indices matching ``audit-*``.

        Returns:
            Names of indices whose date suffix is before the cutoff. Names
            with too few fields are skipped silently; names whose suffix is
            not a valid date are skipped with a warning.
        """
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=self.retention_days)

        # Get all audit indices
        all_indices = await client.cat.indices(index="audit-*", format="json")

        old_indices = []
        for index_info in all_indices:
            index_name = index_info["index"]

            # prefix + org + acc + service + year + month + day = 7 fields minimum
            parts = index_name.split("-")
            if len(parts) < 7:
                continue

            date_str = "-".join(parts[-3:])
            try:
                index_date = datetime.strptime(date_str, "%Y-%m-%d").replace(
                    tzinfo=timezone.utc
                )
            except ValueError:
                logger.warning(f"Could not parse date from index: {index_name}")
                continue

            if index_date < cutoff_date:
                old_indices.append(index_name)

        logger.info(f"Found {len(old_indices)} indices to delete")
        return old_indices

    async def _delete_indices(self, client, indices: List[str]) -> int:
        """
        Delete the given indices one at a time, continuing past failures.

        Args:
            client: OpenSearch client used to issue the deletes.
            indices: Names of the indices to delete.

        Returns:
            The number of indices that were deleted successfully.
        """
        deleted_count = 0

        for index_name in indices:
            try:
                await client.indices.delete(index=index_name)
                logger.info(f"Deleted index: {index_name}")
                deleted_count += 1
            except Exception as e:
                # Best effort: log and move on so one bad index does not
                # block deletion of the rest
                logger.error(f"Failed to delete index {index_name}: {e}")

        return deleted_count

Support

For worker development assistance:

  • Review the worker examples in backend/workers/examples/
  • Check the test suite for patterns: backend/workers/tests/
  • Reference the epic documentation: features/audit-module/epics/06-workers-infrastructure.md
  • Check the troubleshooting section above