Engineering Notes

Thoughts and Ideas on AI by Muthukrishnan

Agent Debugging and Observability for Seeing Inside the Black Box

05 Nov 2025

When your AI agent fails, hallucinates, or makes unexpected decisions, how do you find out why? Unlike traditional software where you can step through code line-by-line, AI agents operate through chains of LLM calls, tool invocations, and state transitions that can be opaque and non-deterministic. This article explores the essential techniques for making agent behavior visible, debuggable, and monitorable.

Concept Introduction

Observability means instrumenting your agent so you can see what it did at each step: which prompts it sent, which tools it called, and what led to each decision.

Agent observability encompasses several layers:

Unlike traditional application observability (metrics, logs, traces), agent observability must capture the semantic content of LLM interactions, not just HTTP status codes.
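To make the contrast concrete, here is a minimal sketch of the two kinds of record. The function names, field names, and the "example-model" value are illustrative assumptions, not part of any particular logging library.

```python
import json
from datetime import datetime, timezone

def transport_record(status_code: int, latency_ms: float) -> dict:
    """What traditional request logging sees: the call worked, and how fast."""
    return {"status": status_code, "latency_ms": latency_ms}

def semantic_record(prompt: str, completion: str, model: str,
                    prompt_tokens: int, completion_tokens: int,
                    latency_ms: float) -> dict:
    """What agent observability adds: the content of the interaction itself."""
    return {
        "time": datetime.now(timezone.utc).isoformat(),
        "model": model,
        "prompt": prompt,
        "completion": completion,
        "tokens": {
            "prompt": prompt_tokens,
            "completion": completion_tokens,
            "total": prompt_tokens + completion_tokens,
        },
        "latency_ms": latency_ms,
    }

# Both records describe the same (hypothetical) call
record = semantic_record("What is 2+2?", "4", "example-model", 12, 1, 350.0)
print(json.dumps(transport_record(200, 350.0)))
print(json.dumps(record, indent=2))
```

A 200 status says nothing about whether the answer was right; only the second record lets you review what the model was asked and what it said.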

Historical & Theoretical Context

The challenge of AI system debugging isn’t new. In the 1980s, expert systems faced similar opacity problems: rules fired in unexpected orders, and inference chains were hard to trace. This led to the development of explanation systems that could justify their reasoning.

Modern agent observability evolved from three traditions:

The key insight: agents are both programs (requiring traditional debugging) and cognitive systems (requiring semantic understanding).

Core Observability Patterns

The Trace Hierarchy

Every agent execution forms a tree of operations:

Agent Run
├── LLM Call #1 (planning)
│   ├── Prompt construction
│   ├── API request
│   └── Response parsing
├── Tool Call: search_database
│   ├── Input validation
│   ├── Query execution
│   └── Result formatting
├── LLM Call #2 (synthesis)
│   └── ...
└── Final response

Each node should capture:

The Event Structure Pattern

from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime

@dataclass
class AgentEvent:
    event_id: str
    parent_id: Optional[str]  # For nesting
    event_type: str  # "llm_call", "tool_use", "decision"
    timestamp: datetime
    inputs: dict[str, Any]
    outputs: dict[str, Any]
    metadata: dict[str, Any]

    def to_json(self) -> dict:
        """Serialize for storage/analysis"""
        return {
            "id": self.event_id,
            "parent": self.parent_id,
            "type": self.event_type,
            "time": self.timestamp.isoformat(),
            "inputs": self.inputs,
            "outputs": self.outputs,
            "meta": self.metadata
        }
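Because each event carries a parent ID, a flat list of stored events can be turned back into the trace tree. The sketch below assumes events in the dictionary shape produced by to_json above (keys "id", "parent", "type"); the function names and the sample events are made up for illustration.

```python
from collections import defaultdict

def group_by_parent(events: list) -> dict:
    """Index events by their parent ID (None marks a root event)."""
    children = defaultdict(list)
    for event in events:
        children[event["parent"]].append(event)
    return children

def render_tree(events: list) -> list:
    """Return one indented line per event, in depth-first order."""
    children = group_by_parent(events)
    lines = []

    def walk(parent_id, depth):
        for event in children.get(parent_id, []):
            lines.append("  " * depth + event["type"])
            walk(event["id"], depth + 1)

    walk(None, 0)
    return lines

# Hypothetical flat event log, as it might come back from storage
events = [
    {"id": "run", "parent": None, "type": "agent_run"},
    {"id": "llm1", "parent": "run", "type": "llm_call"},
    {"id": "tool1", "parent": "run", "type": "tool_use"},
    {"id": "q1", "parent": "tool1", "type": "query_execution"},
    {"id": "llm2", "parent": "run", "type": "llm_call"},
]
tree_lines = render_tree(events)
print("\n".join(tree_lines))
```

Storing events flat and rebuilding the hierarchy on read keeps the write path simple, which matters when events are emitted from inside hot code paths.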

Design Patterns & Architectures

The Decorator Pattern wraps agent operations with observability:

from datetime import datetime
from functools import wraps
import time
import uuid

# Relies on AgentEvent from the previous section, plus two helpers supplied by
# your tracing setup: get_current_context() and log_event()

def trace_operation(operation_type: str):
    """Decorator to automatically trace agent operations"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            event_id = str(uuid.uuid4())
            # perf_counter is monotonic, so durations survive wall-clock adjustments
            start_time = time.perf_counter()

            # Capture inputs
            event = AgentEvent(
                event_id=event_id,
                parent_id=get_current_context().parent_id,
                event_type=operation_type,
                timestamp=datetime.now(),
                inputs={"args": args, "kwargs": kwargs},
                outputs={},
                metadata={}
            )

            try:
                result = func(*args, **kwargs)
                event.outputs = {"result": result}
                event.metadata["success"] = True
                return result
            except Exception as e:
                event.outputs = {"error": str(e)}
                event.metadata["success"] = False
                raise
            finally:
                # Runs on both success and failure, so every call is logged
                event.metadata["duration_ms"] = (time.perf_counter() - start_time) * 1000
                log_event(event)

        return wrapper
    return decorator

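Context Propagation uses context variables (Python's contextvars module) to maintain the trace hierarchy. Unlike plain thread-local storage, context variables are also isolated per asyncio task:

import uuid
from contextvars import ContextVar
from typing import Optional

# Context variable holding the active TraceContext for the current thread or task
current_trace_context: ContextVar["TraceContext"] = ContextVar('trace_context')

class TraceContext:
    def __init__(self, parent_id: Optional[str] = None):
        # With no parent given, start a new trace with a fresh root ID
        self.parent_id = parent_id or str(uuid.uuid4())
        self.events = []

    def __enter__(self):
        self.token = current_trace_context.set(self)
        return self

    def __exit__(self, *args):
        # Restore whichever context was active before this one
        current_trace_context.reset(self.token)
        # Flush events to storage (flush_events is an application-supplied sink)
        flush_events(self.events)

def get_current_context() -> TraceContext:
    """Return the active TraceContext; raises LookupError outside a `with TraceContext()` block"""
    return current_trace_context.get()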

Practical Application

Here’s an example that uses a LangChain callback handler to capture a trace:

import uuid
from datetime import datetime

from langchain.agents import AgentExecutor, create_openai_functions_agent
from langchain_openai import ChatOpenAI
from langchain.tools import Tool
from langchain.callbacks.base import BaseCallbackHandler

class ObservabilityCallback(BaseCallbackHandler):
    """Custom callback to capture agent execution events"""

    def __init__(self):
        self.events = []
        self.current_chain_id = None

    def on_llm_start(self, serialized, prompts, **kwargs):
        event_id = str(uuid.uuid4())
        self.events.append({
            "id": event_id,
            "type": "llm_start",
            "timestamp": datetime.now().isoformat(),
            "prompts": prompts,
            "model": serialized.get("id", ["unknown"])[-1]
        })
        return event_id

    def on_llm_end(self, response, **kwargs):
        # llm_output can be None for some providers, so fall back to an empty dict
        llm_output = response.llm_output or {}
        self.events.append({
            "type": "llm_end",
            "timestamp": datetime.now().isoformat(),
            "generations": [gen.text for gen in response.generations[0]],
            "tokens": llm_output.get("token_usage", {})
        })

    def on_tool_start(self, serialized, input_str, **kwargs):
        self.events.append({
            "type": "tool_start",
            "timestamp": datetime.now().isoformat(),
            "tool": serialized.get("name"),
            "input": input_str
        })

    def on_tool_end(self, output, **kwargs):
        self.events.append({
            "type": "tool_end",
            "timestamp": datetime.now().isoformat(),
            "output": output
        })

    def get_trace(self):
        """Return complete trace as structured data"""
        return {
            "events": self.events,
            "total_tokens": sum(
                e.get("tokens", {}).get("total_tokens", 0)
                for e in self.events
            ),
            "duration_ms": self._calculate_duration()
        }

    def _calculate_duration(self):
        if len(self.events) < 2:
            return 0
        start = datetime.fromisoformat(self.events[0]["timestamp"])
        end = datetime.fromisoformat(self.events[-1]["timestamp"])
        return (end - start).total_seconds() * 1000

# Usage (assumes `tools` and `prompt` are defined elsewhere)
callback = ObservabilityCallback()
llm = ChatOpenAI(temperature=0, callbacks=[callback])

agent = create_openai_functions_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, callbacks=[callback])

result = executor.invoke({"input": "What's the weather in Paris?"})

# Get complete trace
trace = callback.get_trace()
print(f"Total tokens used: {trace['total_tokens']}")
print(f"Execution time: {trace['duration_ms']}ms")
print(f"Event sequence: {[e['type'] for e in trace['events']]}")

Latest Developments & Research

OpenTelemetry for LLMs (2024)

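The OpenTelemetry project added semantic conventions for generative AI, which standardize span attributes under the gen_ai.* namespace:

from openai import OpenAI
from opentelemetry import trace

client = OpenAI()
tracer = trace.get_tracer(__name__)

# `prompt` is assumed to be a string defined elsewhere.
# These conventions are still evolving, so check the attribute names
# against the semconv version you target.
with tracer.start_as_current_span("llm_call") as span:
    span.set_attribute("gen_ai.system", "openai")
    span.set_attribute("gen_ai.request.model", "gpt-4")
    span.set_attribute("gen_ai.request.temperature", 0.7)

    response = client.chat.completions.create(
        model="gpt-4",
        temperature=0.7,
        messages=[{"role": "user", "content": prompt}],
    )

    span.set_attribute("gen_ai.usage.input_tokens", response.usage.prompt_tokens)
    span.set_attribute("gen_ai.usage.output_tokens", response.usage.completion_tokens)
    span.set_attribute("gen_ai.response.finish_reasons", [response.choices[0].finish_reason])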

Agent Replay Systems (2024)

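Frameworks like LangGraph support checkpoint-based replay. Steps before the chosen checkpoint are restored from saved state; steps after it are re-executed, so any LLM calls in them can still produce different output:

from langgraph.checkpoint.memory import MemorySaver

# Record execution: each step is checkpointed under the thread ID
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)
config = {"configurable": {"thread_id": "1"}}
result = app.invoke(input, config=config)

# List the saved checkpoints (most recent first) and pick one to resume from
history = list(app.get_state_history(config))
checkpoint = history[3]  # index chosen for illustration

# Passing None as input resumes from the saved state instead of starting over
replay = app.invoke(None, config=checkpoint.config)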

Research Directions

Cross-Disciplinary Insight

Agent observability mirrors control theory from engineering. In a control system (thermostat, autopilot), you need:

  1. State estimation: What’s the current state? (Agent observability)
  2. Error detection: Is behavior deviating? (Monitoring/alerting)
  3. Feedback loops: Adjust parameters based on observations (Prompt tuning)

Like a control engineer designing a dashboard, you’re building instrumentation to understand a complex dynamic system. The key difference is that your “system” makes decisions through learned patterns rather than explicit equations.
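As a rough sketch of the first two roles, the monitor below estimates current latency from a sliding window of observations and flags deviation from a threshold. The class name, window size, threshold, and sample durations are all assumptions chosen for illustration; the feedback step is left as a comment because it depends on what your agent lets you adjust.

```python
from collections import deque
from statistics import mean

class LatencyMonitor:
    """Sliding-window monitor: estimate the current state, detect deviation."""

    def __init__(self, window: int = 5, threshold_ms: float = 2000.0):
        self.samples = deque(maxlen=window)  # old samples fall off automatically
        self.threshold_ms = threshold_ms

    def estimate(self) -> float:
        """State estimation: mean latency over the recent window."""
        return mean(self.samples) if self.samples else 0.0

    def is_deviating(self) -> bool:
        """Error detection: is the estimate above the acceptable threshold?"""
        return self.estimate() > self.threshold_ms

    def observe(self, duration_ms: float) -> bool:
        """Record one observation and report whether an alert should fire."""
        self.samples.append(duration_ms)
        return self.is_deviating()

monitor = LatencyMonitor(window=3, threshold_ms=1000.0)
durations = [400.0, 600.0, 800.0, 2500.0, 3000.0]  # hypothetical step durations
alerts = [monitor.observe(d) for d in durations]

for duration, alert in zip(durations, alerts):
    # Feedback loop: on an alert you might shorten the prompt, lower the
    # step budget, or switch models; that choice is application-specific.
    print(f"{duration:>7.1f} ms  alert={alert}")
```

The same structure works for other signals a trace exposes, such as tokens per run or tool error rate.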

Daily Challenge

Exercise: Build a Trace Analyzer

Create a tool that takes an agent trace (JSON format) and answers:

  1. Which step took the longest?
  2. What was the token cost breakdown by operation?
  3. Were there any retry attempts? Why?
  4. Visualize the execution timeline

Starter code:

import json

def analyze_trace(trace_file: str):
    """Analyze agent execution trace"""
    with open(trace_file) as f:
        trace = json.load(f)

    # Your implementation here:
    # 1. Calculate durations
    # 2. Sum token costs
    # 3. Detect retries (failed then successful tool calls)
    # 4. Generate timeline visualization (matplotlib or mermaid)

    return {
        "slowest_step": ...,
        "token_breakdown": {...},
        "retry_events": [...],
        "timeline": "..."
    }

Bonus: Implement a “diff” function that compares two traces and highlights differences.


