Datadog 기반 모니터링, APM, 로깅, 캐싱, 스케줄링 패턴 레퍼런스. Use when: Datadog 설정, APM 트레이싱, ddtrace 설정, 트레이스 연결, 로그 설정, structlog 설정, JSON 로그, 로그 포맷, Datadog 로그 수집, 헬스체크 엔드포인트, /health 만들기, 상태 확인 API, 커스텀 메트릭, DogStatsD, 요청 추적, request ID, X-Request-ID, 요청 로깅 미들웨어, 응답 시간 측정, RequestLoggingMiddleware, 캐시 설정, 캐시 전략, cashews 캐시, redis 캐시, 캐시 무효화, TTL 설정, 스케줄링, 정기 작업, 크론잡, APScheduler, BackgroundTasks, Celery 워커. NOT for: Prometheus 설정, Grafana 대시보드, 일반 Redis 사용법.
Install
npx skillscat add nomik94/claude-code-preset/monitoring Install via the SkillsCat registry.
SKILL.md
모니터링 스킬 (Datadog)
Datadog APM (ddtrace)
설치 및 기본 설정
poetry add ddtrace# app/main.py
from ddtrace import patch_all, tracer
patch_all()
tracer.configure(
hostname="localhost", # DD Agent 호스트 (컨테이너: datadog-agent)
port=8126, # DD Agent APM 포트
)
app = FastAPI()자동 계측
ddtrace-run으로 실행하면 FastAPI, SQLAlchemy, httpx, redis 등 자동 계측:
ddtrace-run uvicorn app.main:app --host 0.0.0.0 --port 8000Docker Compose에 Datadog Agent 추가
services:
datadog-agent:
image: gcr.io/datadoghq/agent:7
environment:
DD_API_KEY: ${DD_API_KEY}
DD_SITE: "datadoghq.com" # 또는 ap1.datadoghq.com (Asia)
DD_APM_ENABLED: "true"
DD_APM_NON_LOCAL_TRAFFIC: "true"
DD_LOGS_ENABLED: "true"
DD_LOGS_CONFIG_CONTAINER_COLLECT_ALL: "true"
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
- /proc/:/host/proc/:ro
- /sys/fs/cgroup/:/host/sys/fs/cgroup:ro
ports:
- "8126:8126" # APM
- "8125:8125" # DogStatsD
app:
build: .
command: ddtrace-run uvicorn app.main:app --host 0.0.0.0 --port 8000
environment:
DD_SERVICE: "my-api"
DD_ENV: ${DD_ENV:-local}
DD_VERSION: ${APP_VERSION:-0.1.0}
DD_TRACE_AGENT_HOSTNAME: "datadog-agent"
DD_LOGS_INJECTION: "true" # 로그에 trace_id 자동 주입
labels:
com.datadoghq.ad.logs: '[{"source": "python", "service": "my-api"}]'
depends_on:
- datadog-agent커스텀 스팬
from ddtrace import tracer
@tracer.wrap(service="my-api", resource="process_order")
async def process_order(order_id: int) -> OrderEntity:
...
async def complex_operation():
with tracer.trace("custom.operation", service="my-api") as span:
span.set_tag("order_id", order_id)
span.set_tag("user_id", user_id)
result = await do_work()
span.set_metric("items_count", len(result.items))
return result구조화 로깅 (structlog + Datadog)
Datadog 연동 설정
import structlog
from ddtrace import tracer
def add_datadog_trace_context(logger, method_name, event_dict):
"""structlog에 Datadog trace/span ID 자동 주입"""
span = tracer.current_span()
if span:
event_dict["dd.trace_id"] = str(span.trace_id)
event_dict["dd.span_id"] = str(span.span_id)
event_dict["dd.service"] = span.service
event_dict["dd.env"] = tracer.config.env or ""
event_dict["dd.version"] = tracer.config.version or ""
return event_dict
def setup_logging(debug: bool = False):
shared_processors = [
structlog.contextvars.merge_contextvars,
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.UnicodeDecoder(),
add_datadog_trace_context,
]
if debug:
renderer = structlog.dev.ConsoleRenderer(colors=True)
else:
renderer = structlog.processors.JSONRenderer()
structlog.configure(
processors=[*shared_processors, renderer],
wrapper_class=structlog.stdlib.BoundLogger,
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
)사용 예시
logger = structlog.get_logger()
logger.info("user_created", user_id=42, email="user@example.com")
# JSON 출력 (Datadog에서 자동 파싱):
# {"event": "user_created", "user_id": 42, "dd.trace_id": "123...", "dd.span_id": "456...", ...}커스텀 메트릭 (DogStatsD)
from datadog import DogStatsd
statsd = DogStatsd(host="datadog-agent", port=8125)
statsd.increment("api.request.count", tags=["endpoint:/users", "method:GET"])
statsd.histogram("api.response_time", elapsed_ms, tags=["endpoint:/users"])
statsd.gauge("db.connection_pool.active", pool.checkedin(), tags=["db:primary"])
statsd.set("api.unique_users", user_id, tags=["env:production"])요청 로깅 미들웨어
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
request_id = request.headers.get("X-Request-ID", str(uuid.uuid4())[:8])
start = time.perf_counter()
structlog.contextvars.bind_contextvars(
request_id=request_id, method=request.method, path=request.url.path)
span = tracer.current_span()
if span:
span.set_tag("http.request_id", request_id)
response = await call_next(request)
elapsed = time.perf_counter() - start
logger.info("request_completed", status_code=response.status_code,
elapsed_ms=round(elapsed * 1000, 2))
statsd.histogram("http.request.duration", elapsed * 1000,
tags=[f"path:{request.url.path}", f"status:{response.status_code}"])
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = f"{elapsed:.4f}"
return response헬스 체크
@router.get("/health")
async def health_check(db: AsyncSession = Depends(get_db)):
components = {}
try:
await db.execute(text("SELECT 1"))
components["database"] = {"status": "up"}
except Exception as e:
components["database"] = {"status": "down", "error": str(e)}
try:
r = aioredis.from_url(settings.redis.url)
await r.ping()
components["redis"] = {"status": "up"}
except Exception as e:
components["redis"] = {"status": "down", "error": str(e)}
overall = "up" if all(c["status"] == "up" for c in components.values()) else "degraded"
statsd.service_check("api.health", 0 if overall == "up" else 2,
tags=[f"component:{k}" for k in components])
return {"status": overall, "version": settings.app_version, "components": components}스케줄링
# APScheduler — periodic
scheduler = AsyncIOScheduler()
scheduler.add_job(cleanup_expired_tokens, CronTrigger(hour=3), id="cleanup")
scheduler.add_job(sync_external_data, IntervalTrigger(minutes=30), id="sync")
# BackgroundTasks — one-off
background_tasks.add_task(send_welcome_email, user.email)
# Celery — heavy async (ddtrace 자동 계측)
@celery.task(bind=True, max_retries=3)
def generate_report(self, report_id: int):
try: ...
except Exception as exc: self.retry(exc=exc, countdown=60)캐싱
# Declarative (cashews)
@cache(ttl="10m", key="user:{user_id}")
async def get_by_id(self, user_id: int) -> dict: ...
@cache.invalidate("user:{user_id}")
@cache.invalidate("users:list:*")
async def update(self, user_id: int, data) -> UserEntity: ...
# Manual (redis.asyncio)
class CacheService:
async def get(self, key: str) -> dict | None: ...
async def set(self, key: str, value: dict, ttl: int = 300) -> None: ...
async def delete_pattern(self, pattern: str) -> None: ...