From 6e5c56e5268f355e32ea62d6a4fedd5552dd7e66 Mon Sep 17 00:00:00 2001 From: Suhani Nagpal Date: Fri, 22 May 2026 15:44:20 +0530 Subject: [PATCH 1/5] chore: scaffold agent-framework integration package Empty file skeleton for the Microsoft Agent Framework integration under python/frameworks/agent-framework. Layout mirrors traceai_pipecat (exporter-swap pattern) with integration.py for the public swap API and exporters/ for the mapped HTTP/gRPC exporters. --- python/frameworks/agent-framework/README.md | 0 python/frameworks/agent-framework/examples/basic_agent.py | 0 python/frameworks/agent-framework/pyproject.toml | 0 python/frameworks/agent-framework/tests/__init__.py | 0 python/frameworks/agent-framework/tests/test_base_exporter.py | 0 python/frameworks/agent-framework/tests/test_integration.py | 0 .../agent-framework/traceai_agent_framework/__init__.py | 0 .../agent-framework/traceai_agent_framework/exporters/__init__.py | 0 .../traceai_agent_framework/exporters/base_exporter.py | 0 .../traceai_agent_framework/exporters/grpc_exporter.py | 0 .../traceai_agent_framework/exporters/http_exporter.py | 0 .../agent-framework/traceai_agent_framework/integration.py | 0 .../frameworks/agent-framework/traceai_agent_framework/version.py | 0 13 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 python/frameworks/agent-framework/README.md create mode 100644 python/frameworks/agent-framework/examples/basic_agent.py create mode 100644 python/frameworks/agent-framework/pyproject.toml create mode 100644 python/frameworks/agent-framework/tests/__init__.py create mode 100644 python/frameworks/agent-framework/tests/test_base_exporter.py create mode 100644 python/frameworks/agent-framework/tests/test_integration.py create mode 100644 python/frameworks/agent-framework/traceai_agent_framework/__init__.py create mode 100644 python/frameworks/agent-framework/traceai_agent_framework/exporters/__init__.py create mode 100644 python/frameworks/agent-framework/traceai_agent_framework/exporters/base_exporter.py create mode 100644 python/frameworks/agent-framework/traceai_agent_framework/exporters/grpc_exporter.py create mode 100644 python/frameworks/agent-framework/traceai_agent_framework/exporters/http_exporter.py create mode 100644 python/frameworks/agent-framework/traceai_agent_framework/integration.py create mode 100644 python/frameworks/agent-framework/traceai_agent_framework/version.py diff --git a/python/frameworks/agent-framework/README.md b/python/frameworks/agent-framework/README.md new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/examples/basic_agent.py b/python/frameworks/agent-framework/examples/basic_agent.py new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/pyproject.toml b/python/frameworks/agent-framework/pyproject.toml new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/tests/__init__.py b/python/frameworks/agent-framework/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/tests/test_base_exporter.py b/python/frameworks/agent-framework/tests/test_base_exporter.py new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/tests/test_integration.py b/python/frameworks/agent-framework/tests/test_integration.py new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/traceai_agent_framework/__init__.py b/python/frameworks/agent-framework/traceai_agent_framework/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/traceai_agent_framework/exporters/__init__.py b/python/frameworks/agent-framework/traceai_agent_framework/exporters/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/traceai_agent_framework/exporters/base_exporter.py b/python/frameworks/agent-framework/traceai_agent_framework/exporters/base_exporter.py new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/traceai_agent_framework/exporters/grpc_exporter.py b/python/frameworks/agent-framework/traceai_agent_framework/exporters/grpc_exporter.py new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/traceai_agent_framework/exporters/http_exporter.py b/python/frameworks/agent-framework/traceai_agent_framework/exporters/http_exporter.py new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/traceai_agent_framework/integration.py b/python/frameworks/agent-framework/traceai_agent_framework/integration.py new file mode 100644 index 00000000..e69de29b diff --git a/python/frameworks/agent-framework/traceai_agent_framework/version.py b/python/frameworks/agent-framework/traceai_agent_framework/version.py new file mode 100644 index 00000000..e69de29b From b2fce16dcca4c039b5369b647ce2c54d15d81476 Mon Sep 17 00:00:00 2001 From: Suhani Nagpal Date: Mon, 25 May 2026 18:05:38 +0530 Subject: [PATCH 2/5] feat: add Microsoft Agent Framework integration Installs an AgentFrameworkSpanProcessor on the user's tracer provider to re-key the framework's native gen_ai.* spans into Future AGI conventions (gen_ai.span.kind, input.value/output.value with mime types, flattened per-message attrs, derived total_tokens). For chain spans the framework leaves without I/O (workflow.run, executor.process, etc.), bubbles input/output up from the earliest/latest descendant. Public API: enable_fi_attribute_mapping(tracer_provider=None) and the AgentFrameworkSpanProcessor class. --- .../frameworks/agent-framework/CHANGELOG.md | 3 + python/frameworks/agent-framework/README.md | 78 +++ .../agent-framework/examples/basic_agent.py | 57 ++ .../agent-framework/examples/requirements.txt | 4 + .../examples/workflow_multi_agent.py | 86 +++ .../frameworks/agent-framework/pyproject.toml | 25 + .../tests/_fixtures/sample_spans.json | 103 ++++ .../tests/test_base_exporter.py | 0 .../agent-framework/tests/test_e2e_agent.py | 215 ++++++++ .../agent-framework/tests/test_integration.py | 136 +++++ .../agent-framework/tests/test_processor.py | 508 ++++++++++++++++++ .../traceai_agent_framework/__init__.py | 17 + .../exporters/__init__.py | 0 .../exporters/base_exporter.py | 0 .../exporters/grpc_exporter.py | 0 .../exporters/http_exporter.py | 0 .../traceai_agent_framework/integration.py | 103 ++++ .../traceai_agent_framework/processor.py | 394 ++++++++++++++ .../traceai_agent_framework/version.py | 1 + 19 files changed, 1730 insertions(+) create mode 100644 python/frameworks/agent-framework/CHANGELOG.md create mode 100644 python/frameworks/agent-framework/examples/requirements.txt create mode 100644 python/frameworks/agent-framework/examples/workflow_multi_agent.py create mode 100644 python/frameworks/agent-framework/tests/_fixtures/sample_spans.json delete mode 100644 python/frameworks/agent-framework/tests/test_base_exporter.py create mode 100644 python/frameworks/agent-framework/tests/test_e2e_agent.py create mode 100644 python/frameworks/agent-framework/tests/test_processor.py delete mode 100644 python/frameworks/agent-framework/traceai_agent_framework/exporters/__init__.py delete mode 100644 python/frameworks/agent-framework/traceai_agent_framework/exporters/base_exporter.py delete mode 100644 python/frameworks/agent-framework/traceai_agent_framework/exporters/grpc_exporter.py delete mode 100644 python/frameworks/agent-framework/traceai_agent_framework/exporters/http_exporter.py create mode 100644 python/frameworks/agent-framework/traceai_agent_framework/processor.py diff --git a/python/frameworks/agent-framework/CHANGELOG.md b/python/frameworks/agent-framework/CHANGELOG.md new file mode 100644 index 00000000..0d85dcdb --- /dev/null +++ b/python/frameworks/agent-framework/CHANGELOG.md @@ -0,0 +1,3 @@ +## [0.1.0] - 2026-05-25 +### Feature +- Added support for Microsoft Agent Framework instrumentation. diff --git a/python/frameworks/agent-framework/README.md b/python/frameworks/agent-framework/README.md index e69de29b..bc87681f 100644 --- a/python/frameworks/agent-framework/README.md +++ b/python/frameworks/agent-framework/README.md @@ -0,0 +1,78 @@ +# Microsoft Agent Framework OpenTelemetry Integration + +## Overview +This integration provides support for using OpenTelemetry with Microsoft Agent Framework. It enables tracing and monitoring of applications built with Agent Framework. + +## Installation + +1. **Install traceAI Agent Framework** + +```bash +pip install traceAI-agent-framework +``` + +2. **Install Microsoft Agent Framework** + +```bash +pip install agent-framework +``` + + +### Set Environment Variables +Set up your environment variables to authenticate with FutureAGI + +```python +import os + +os.environ["FI_API_KEY"] = FI_API_KEY +os.environ["FI_SECRET_KEY"] = FI_SECRET_KEY +os.environ["OPENAI_API_KEY"] = OPENAI_API_KEY +``` + +## Quickstart + +### Register Tracer Provider +Set up the trace provider to establish the observability pipeline. The trace provider: + +```python +from fi_instrumentation import register +from fi_instrumentation.fi_types import ProjectType + +trace_provider = register( + project_type=ProjectType.OBSERVE, + project_name="agent_framework_app", + set_global_tracer_provider=True, +) +``` + +### Configure Agent Framework Instrumentation +Turn on Agent Framework's native OpenTelemetry emission and install the FI attribute mapping. + +```python +from agent_framework.observability import enable_instrumentation +from traceai_agent_framework import enable_fi_attribute_mapping + +enable_instrumentation(enable_sensitive_data=True) +enable_fi_attribute_mapping() +``` + +### Create Agent Framework Components +Set up your Agent Framework client with built-in observability. + +```python +import asyncio +from agent_framework import Agent +from agent_framework.openai import OpenAIChatClient + +agent = Agent( + OpenAIChatClient(model="gpt-4o-mini"), + name="weather_agent", + instructions="You are a concise weather assistant.", +) + +async def main(): + response = await agent.run("What's the weather in Paris?") + print(response) + +asyncio.run(main()) +``` \ No newline at end of file diff --git a/python/frameworks/agent-framework/examples/basic_agent.py b/python/frameworks/agent-framework/examples/basic_agent.py index e69de29b..d4282889 100644 --- a/python/frameworks/agent-framework/examples/basic_agent.py +++ b/python/frameworks/agent-framework/examples/basic_agent.py @@ -0,0 +1,57 @@ +"""Minimal Microsoft Agent Framework example traced into Future AGI. + +Run with: + export FI_API_KEY=... + export FI_SECRET_KEY=... + export OPENAI_API_KEY=... + python examples/basic_agent.py +""" + +import asyncio +import os + +from agent_framework import Agent +from agent_framework.observability import enable_instrumentation +from agent_framework.openai import OpenAIChatClient +from fi_instrumentation import register +from fi_instrumentation.fi_types import ProjectType + +from traceai_agent_framework import enable_fi_attribute_mapping + + +def main() -> None: + if not os.getenv("OPENAI_API_KEY"): + raise SystemExit("Set OPENAI_API_KEY before running this example.") + + # 1) FI tracer provider on the global slot so agent_framework reads from there. + register( + project_type=ProjectType.OBSERVE, + project_name="agent-framework-basic-example", + set_global_tracer_provider=True, + ) + + # 2) Agent Framework's own observability + sensitive-data opt-in so messages + # are captured in spans (required to see prompts/completions in FI). + enable_instrumentation(enable_sensitive_data=True) + + # 3) Install our SpanProcessor on the FI tracer provider; it re-keys + # Agent Framework's gen_ai.* spans into FI conventions as they end. + enable_fi_attribute_mapping() + + # 4) Build an agent. Anything you do with it from here on emits traced spans. + agent = Agent( + OpenAIChatClient(), + name="weather_agent", + description="Answers questions about weather.", + instructions="You are a concise, friendly weather assistant.", + ) + + async def run() -> None: + response = await agent.run("What's the weather like in Paris in spring?") + print("Agent response:\n", response) + + asyncio.run(run()) + + +if __name__ == "__main__": + main() diff --git a/python/frameworks/agent-framework/examples/requirements.txt b/python/frameworks/agent-framework/examples/requirements.txt new file mode 100644 index 00000000..a29dc375 --- /dev/null +++ b/python/frameworks/agent-framework/examples/requirements.txt @@ -0,0 +1,4 @@ +traceAI-agent-framework>=0.1.0 +agent-framework>=1.0.0 +fi-instrumentation-otel>=0.1.14 +openai>=1.0.0 diff --git a/python/frameworks/agent-framework/examples/workflow_multi_agent.py b/python/frameworks/agent-framework/examples/workflow_multi_agent.py new file mode 100644 index 00000000..79334f9b --- /dev/null +++ b/python/frameworks/agent-framework/examples/workflow_multi_agent.py @@ -0,0 +1,86 @@ +"""Multi-agent workflow example traced into Future AGI. + +Two agents handing off through a WorkflowBuilder graph: + researcher → summarizer → output + +Run with: + export FI_API_KEY=... + export FI_SECRET_KEY=... + export OPENAI_API_KEY=... + python examples/workflow_multi_agent.py + +In the FI dashboard you should see a ``workflow.run`` (CHAIN) span containing +nested ``executor.process`` (CHAIN) spans and ``invoke_agent`` (AGENT) spans +for each agent invocation. +""" + +import asyncio +import os + +from agent_framework import Agent, WorkflowBuilder, WorkflowContext +from agent_framework._workflows._function_executor import executor +from agent_framework.observability import enable_instrumentation +from agent_framework.openai import OpenAIChatClient +from fi_instrumentation import register +from fi_instrumentation.fi_types import ProjectType + +from traceai_agent_framework import enable_fi_attribute_mapping + + +# --- Workflow nodes --------------------------------------------------------- + + +@executor(id="researcher") +async def research(topic: str, ctx: WorkflowContext[str]) -> None: + """Step 1: ask the research agent for a few bullet points.""" + agent = Agent( + OpenAIChatClient(), + name="researcher", + instructions="You are a research assistant. Return 3 short bullet points.", + ) + response = await agent.run(f"Research this topic: {topic}") + await ctx.send_message(str(response)) + + +@executor(id="summarizer") +async def summarize(notes: str, ctx: WorkflowContext[None, str]) -> None: + """Step 2: ask the summarizer agent to compress to one sentence.""" + agent = Agent( + OpenAIChatClient(), + name="summarizer", + instructions="Compress the input into a single concise sentence.", + ) + response = await agent.run(notes) + await ctx.yield_output(str(response)) + + +# --- Main ------------------------------------------------------------------- + + +def main() -> None: + if not os.getenv("OPENAI_API_KEY"): + raise SystemExit("Set OPENAI_API_KEY before running this example.") + + register( + project_type=ProjectType.OBSERVE, + project_name="agent-framework-workflow-example", + set_global_tracer_provider=True, + ) + enable_instrumentation(enable_sensitive_data=True) + enable_fi_attribute_mapping() + + workflow = ( + WorkflowBuilder(start_executor=research) + .add_edge(research, summarize) + .build() + ) + + async def run() -> None: + result = await workflow.run("the discovery of penicillin") + print("Workflow result:\n", result) + + asyncio.run(run()) + + +if __name__ == "__main__": + main() diff --git a/python/frameworks/agent-framework/pyproject.toml b/python/frameworks/agent-framework/pyproject.toml index e69de29b..3f52e614 100644 --- a/python/frameworks/agent-framework/pyproject.toml +++ b/python/frameworks/agent-framework/pyproject.toml @@ -0,0 +1,25 @@ +[tool.poetry] +name = "traceAI-agent-framework" +version = "0.1.0" +description = "OpenTelemetry instrumentation for Microsoft Agent Framework" +authors = ["Future AGI "] +readme = "README.md" +packages = [ + { include = "traceai_agent_framework" } +] + +[tool.poetry.dependencies] +python = ">=3.10,<3.15" +fi-instrumentation-otel = ">=0.1.14" + +[tool.poetry.group.dev.dependencies] +pytest = "^8.0.0" +pytest-asyncio = "^0.23.0" +agent-framework = ">=1.0.0" + +[tool.pytest.ini_options] +asyncio_mode = "auto" + +[build-system] +requires = ["poetry-core"] +build-backend = "poetry.core.masonry.api" diff --git a/python/frameworks/agent-framework/tests/_fixtures/sample_spans.json b/python/frameworks/agent-framework/tests/_fixtures/sample_spans.json new file mode 100644 index 00000000..838a2a63 --- /dev/null +++ b/python/frameworks/agent-framework/tests/_fixtures/sample_spans.json @@ -0,0 +1,103 @@ +[ + { + "name": "execute_tool get_weather", + "scope": "agent_framework", + "kind": "INTERNAL", + "status": "UNSET", + "attributes": { + "gen_ai.operation.name": "execute_tool", + "gen_ai.tool.name": "get_weather", + "gen_ai.tool.call.id": "call-123", + "gen_ai.tool.type": "function", + "gen_ai.tool.description": "Get the current weather for a city.", + "gen_ai.tool.call.result": "The weather in Paris is sunny, 22 degrees.", + "gen_ai.tool.call.arguments": "{\"city\": \"Paris\"}" + } + }, + { + "name": "invoke_agent weather_agent", + "scope": "agent_framework", + "kind": "INTERNAL", + "status": "UNSET", + "attributes": { + "gen_ai.input.messages": "[{\"role\": \"user\", \"parts\": [{\"type\": \"text\", \"content\": \"What's the weather in Paris?\"}]}]", + "gen_ai.system_instructions": "[{\"type\": \"text\", \"content\": \"You are a helpful weather assistant.\"}]", + "gen_ai.request.choice.count": 1, + "gen_ai.operation.name": "invoke_agent", + "gen_ai.provider.name": "microsoft.agent_framework", + "gen_ai.agent.id": "396693c8-ed0d-44b1-9ad9-ae5b0f9b4ad5", + "gen_ai.agent.name": "weather_agent", + "gen_ai.agent.description": "Tells you the weather", + "gen_ai.tool.definitions": "[{\"type\": \"function\", \"function\": {\"name\": \"get_weather\", \"description\": \"Get the current weather for a city.\", \"parameters\": {\"properties\": {\"city\": {\"title\": \"City\", \"type\": \"string\"}}, \"required\": [\"city\"], \"title\": \"get_weather_input\", \"type\": \"object\"}}}]", + "gen_ai.response.id": "resp-abc", + "gen_ai.output.messages": "[{\"role\": \"assistant\", \"parts\": [{\"type\": \"text\", \"content\": \"The weather in Paris is sunny.\"}]}]" + } + }, + { + "name": "workflow.build", + "scope": "agent_framework", + "kind": "INTERNAL", + "status": "UNSET", + "attributes": { + "workflow_builder.name": "WorkflowBuilder-d9fed0bb-2d5e-4a93-a5c3-e116d5ed366f", + "workflow.id": "a86ba8e4-0a04-49fd-915a-902f18b83605", + "workflow.definition": "{\"name\": \"WorkflowBuilder-d9fed0bb-2d5e-4a93-a5c3-e116d5ed366f\", \"id\": \"a86ba8e4-0a04-49fd-915a-902f18b83605\", \"start_executor_id\": \"upper\", \"max_iterations\": 100, \"edge_groups\": [{\"id\": \"InternalEdgeGroup/1d68d9bb-9f75-496e-ad64-a1215e304d96\", \"type\": \"InternalEdgeGroup\", \"edges\": [{\"source_id\": \"internal:upper\", \"target_id\": \"upper\"}]}, {\"id\": \"InternalEdgeGroup/481bb9d6-475e-444f-ac05-a0a0739dcc90\", \"type\": \"InternalEdgeGroup\", \"edges\": [{\"source_id\": \"internal:exclaim\", \"target_id\": \"exclaim\"}]}, {\"id\": \"SingleEdgeGroup/9d7bd9c3-be19-4bf8-9589-4e331c7c02ce\", \"type\": \"SingleEdgeGroup\", \"edges\": [{\"source_id\": \"upper\", \"target_id\": \"exclaim\"}]}], \"executors\": {\"upper\": {\"id\": \"upper\", \"type\": \"FunctionExecutor\"}, \"exclaim\": {\"id\": \"exclaim\", \"type\": \"FunctionExecutor\"}}, \"output_executors\": null, \"intermediate_executors\": null}" + } + }, + { + "name": "message.send", + "scope": "agent_framework", + "kind": "PRODUCER", + "status": "UNSET", + "attributes": { + "message.type": "str" + } + }, + { + "name": "executor.process upper", + "scope": "agent_framework", + "kind": "INTERNAL", + "status": "UNSET", + "attributes": { + "executor.id": "upper", + "executor.type": "FunctionExecutor", + "message.type": "MessageType.STANDARD", + "message.payload_type": "str" + } + }, + { + "name": "edge_group.process SingleEdgeGroup", + "scope": "agent_framework", + "kind": "INTERNAL", + "status": "UNSET", + "attributes": { + "edge_group.type": "SingleEdgeGroup", + "edge_group.id": "SingleEdgeGroup/9d7bd9c3-be19-4bf8-9589-4e331c7c02ce", + "message.source_id": "upper", + "edge_group.delivered": true, + "edge_group.delivery_status": "delivered" + } + }, + { + "name": "executor.process exclaim", + "scope": "agent_framework", + "kind": "INTERNAL", + "status": "UNSET", + "attributes": { + "executor.id": "exclaim", + "executor.type": "FunctionExecutor", + "message.type": "MessageType.STANDARD", + "message.payload_type": "WorkflowMessage" + } + }, + { + "name": "workflow.run", + "scope": "agent_framework", + "kind": "INTERNAL", + "status": "UNSET", + "attributes": { + "workflow.id": "a86ba8e4-0a04-49fd-915a-902f18b83605", + "workflow.name": "WorkflowBuilder-d9fed0bb-2d5e-4a93-a5c3-e116d5ed366f" + } + } +] \ No newline at end of file diff --git a/python/frameworks/agent-framework/tests/test_base_exporter.py b/python/frameworks/agent-framework/tests/test_base_exporter.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/frameworks/agent-framework/tests/test_e2e_agent.py b/python/frameworks/agent-framework/tests/test_e2e_agent.py new file mode 100644 index 00000000..40156b57 --- /dev/null +++ b/python/frameworks/agent-framework/tests/test_e2e_agent.py @@ -0,0 +1,215 @@ +"""End-to-end tests: real Agent + real Workflow through the mapping exporter. + +These tests are gated on ``agent-framework`` being installed (Phase 4 dep). +They use an offline stub chat client so no API keys / HTTP are required. +""" + +import os + +import pytest + +os.environ.setdefault("FI_API_KEY", "test-key") +os.environ.setdefault("FI_SECRET_KEY", "test-secret") + +from opentelemetry import trace as trace_api # noqa: E402 +from opentelemetry.sdk.trace import TracerProvider # noqa: E402 +from opentelemetry.sdk.trace.export import SimpleSpanProcessor # noqa: E402 +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( # noqa: E402 + InMemorySpanExporter, +) + +agent_framework = pytest.importorskip("agent_framework") + +from agent_framework import ( # noqa: E402 + Agent, + BaseChatClient, + ChatResponse, + Content, + Message, + Role, + WorkflowBuilder, + WorkflowContext, +) +from agent_framework._workflows._function_executor import executor # noqa: E402 +from agent_framework.observability import ( # noqa: E402 + OtelAttr, + enable_instrumentation, + get_function_span, +) + +from traceai_agent_framework import AgentFrameworkSpanProcessor # noqa: E402 + + +class _StubChatClient(BaseChatClient): + """Offline chat client returning a canned response. No HTTP.""" + + async def _inner_get_response(self, *, messages, stream, options, **kwargs): + return ChatResponse( + messages=[ + Message( + role=Role("assistant"), + contents=[Content.from_text("Sunny in Paris.")], + ) + ], + model="stub-v1", + response_id="resp-1", + ) + + +# Workflow executors must be module-level to be picklable / re-usable. +@executor(id="upper") +async def _to_upper(text: str, ctx: WorkflowContext[str]) -> None: + await ctx.send_message(text.upper()) + + +@executor(id="exclaim") +async def _add_exclaim(text: str, ctx: WorkflowContext[None, str]) -> None: + await ctx.yield_output(text + "!") + + +@pytest.fixture +def captured(monkeypatch): + """Reset the global tracer provider, install our SpanProcessor + an in-memory exporter.""" + monkeypatch.setattr(trace_api, "_TRACER_PROVIDER", None) + monkeypatch.setattr( + trace_api, + "_TRACER_PROVIDER_SET_ONCE", + type(trace_api._TRACER_PROVIDER_SET_ONCE)(), + ) + exporter = InMemorySpanExporter() + provider = TracerProvider() + # Our processor runs first (mutates attributes); SimpleSpanProcessor with the + # in-memory exporter runs next and captures the mutated spans. + provider.add_span_processor(AgentFrameworkSpanProcessor()) + provider.add_span_processor(SimpleSpanProcessor(exporter)) + trace_api.set_tracer_provider(provider) + enable_instrumentation(enable_sensitive_data=True, force=True) + yield exporter + + +# --------------------------------------------------------------------------- +# AGENT path +# --------------------------------------------------------------------------- + + +async def test_agent_run_emits_agent_span_with_fi_attrs(captured): + agent = Agent(_StubChatClient(), name="weather_agent", description="weather") + await agent.run("What's the weather in Paris?") + + agent_spans = [ + s for s in captured.get_finished_spans() if s.name.startswith("invoke_agent") + ] + assert len(agent_spans) == 1, [s.name for s in captured.get_finished_spans()] + + attrs = agent_spans[0].attributes + assert attrs["gen_ai.span.kind"] == "AGENT" + assert attrs["input.mime_type"] == "application/json" + assert attrs["output.mime_type"] == "application/json" + assert attrs["gen_ai.input.messages.0.message.role"] == "user" + assert attrs["gen_ai.input.messages.0.message.content"] == "What's the weather in Paris?" + assert attrs["gen_ai.output.messages.0.message.role"] == "assistant" + assert attrs["gen_ai.output.messages.0.message.content"] == "Sunny in Paris." + # Native gen_ai.* attrs preserved. + assert attrs["gen_ai.agent.name"] == "weather_agent" + assert attrs["gen_ai.provider.name"] == "microsoft.agent_framework" + + +# --------------------------------------------------------------------------- +# WORKFLOW path +# --------------------------------------------------------------------------- + + +async def test_workflow_run_emits_chain_spans(captured): + wf = ( + WorkflowBuilder(start_executor=_to_upper) + .add_edge(_to_upper, _add_exclaim) + .build() + ) + await wf.run("hello") + + spans = captured.get_finished_spans() + + # workflow.run must exist with CHAIN kind + wf_run = next((s for s in spans if s.name == "workflow.run"), None) + assert wf_run is not None + assert wf_run.attributes["gen_ai.span.kind"] == "CHAIN" + + # at least one executor.process.* and one edge_group.process.* span, both CHAIN + executor_spans = [s for s in spans if s.name.startswith("executor.process")] + edge_spans = [s for s in spans if s.name.startswith("edge_group.process")] + assert executor_spans + assert edge_spans + assert all(s.attributes["gen_ai.span.kind"] == "CHAIN" for s in executor_spans) + assert all(s.attributes["gen_ai.span.kind"] == "CHAIN" for s in edge_spans) + + # CHAIN should NOT lift input.value / output.value + assert "input.value" not in wf_run.attributes + assert "output.value" not in wf_run.attributes + + +# --------------------------------------------------------------------------- +# TOOL path: emit a tool span via the framework's helper +# --------------------------------------------------------------------------- + + +async def test_tool_span_emitted_via_helper_carries_fi_attrs(captured): + tool_attrs = { + OtelAttr.OPERATION.value: OtelAttr.TOOL_EXECUTION_OPERATION.value, + OtelAttr.TOOL_NAME.value: "get_weather", + OtelAttr.TOOL_CALL_ID.value: "call-x", + OtelAttr.TOOL_TYPE.value: "function", + } + with get_function_span(tool_attrs) as span: + span.set_attribute(OtelAttr.TOOL_ARGUMENTS.value, '{"city": "Paris"}') + span.set_attribute(OtelAttr.TOOL_RESULT.value, "sunny, 22 degrees") + + tool_spans = [ + s for s in captured.get_finished_spans() if s.name.startswith("execute_tool") + ] + assert len(tool_spans) == 1 + attrs = tool_spans[0].attributes + assert attrs["gen_ai.span.kind"] == "TOOL" + assert attrs["input.value"] == '{"city": "Paris"}' + assert attrs["input.mime_type"] == "application/json" + assert attrs["output.value"] == "sunny, 22 degrees" + assert attrs["output.mime_type"] == "text/plain" + assert attrs["gen_ai.tool.name"] == "get_weather" + + +# --------------------------------------------------------------------------- +# All-kinds coverage in a single run +# --------------------------------------------------------------------------- + + +async def test_single_run_covers_agent_chain_and_tool(captured): + # Workflow → CHAIN + wf = ( + WorkflowBuilder(start_executor=_to_upper) + .add_edge(_to_upper, _add_exclaim) + .build() + ) + await wf.run("hello") + + # Agent → AGENT + agent = Agent(_StubChatClient(), name="weather_agent") + await agent.run("Paris weather?") + + # Tool span via helper → TOOL + with get_function_span( + { + OtelAttr.OPERATION.value: "execute_tool", + OtelAttr.TOOL_NAME.value: "get_weather", + OtelAttr.TOOL_CALL_ID.value: "call-1", + OtelAttr.TOOL_TYPE.value: "function", + } + ) as span: + span.set_attribute(OtelAttr.TOOL_RESULT.value, "ok") + + kinds = { + s.attributes.get("gen_ai.span.kind") + for s in captured.get_finished_spans() + if s.attributes is not None + } + assert "AGENT" in kinds + assert "CHAIN" in kinds + assert "TOOL" in kinds diff --git a/python/frameworks/agent-framework/tests/test_integration.py b/python/frameworks/agent-framework/tests/test_integration.py index e69de29b..e5207082 100644 --- a/python/frameworks/agent-framework/tests/test_integration.py +++ b/python/frameworks/agent-framework/tests/test_integration.py @@ -0,0 +1,136 @@ +"""Lifecycle tests for enable_fi_attribute_mapping().""" + +import os + +import pytest +from opentelemetry import trace as trace_api + +# Dummy creds so register() doesn't complain. +os.environ.setdefault("FI_API_KEY", "test-key") +os.environ.setdefault("FI_SECRET_KEY", "test-secret") + +from traceai_agent_framework import ( # noqa: E402 + AgentFrameworkSpanProcessor, + enable_fi_attribute_mapping, +) + + +def _processors_on(provider): + """Return the list of installed span processors on the given provider.""" + active = getattr(provider, "_active_span_processor", None) + if active is None: + return [] + return list(getattr(active, "_span_processors", ())) + + +def _count_our_processors(provider): + return sum( + 1 for p in _processors_on(provider) + if isinstance(p, AgentFrameworkSpanProcessor) + ) + + +@pytest.fixture +def fresh_global_provider(monkeypatch): + """Reset the global tracer provider between tests so state doesn't leak.""" + original = trace_api._TRACER_PROVIDER + monkeypatch.setattr(trace_api, "_TRACER_PROVIDER", None) + monkeypatch.setattr( + trace_api, + "_TRACER_PROVIDER_SET_ONCE", + type(trace_api._TRACER_PROVIDER_SET_ONCE)(), + ) + yield + trace_api._TRACER_PROVIDER = original + + +def _register(set_global=True): + """Helper to call register().""" + from fi_instrumentation import register + from fi_instrumentation.fi_types import ProjectType + return register( + project_type=ProjectType.OBSERVE, + project_name="test-project", + set_global_tracer_provider=set_global, + verbose=False, + ) + + +# --------------------------------------------------------------------------- +# Global-provider path +# --------------------------------------------------------------------------- + + +def test_returns_false_when_no_provider_is_installed(fresh_global_provider): + """No register() called → no real provider on the global → cannot add a processor.""" + assert enable_fi_attribute_mapping() is False + + +def test_installs_processor_on_global_provider_after_register(fresh_global_provider): + provider = _register(set_global=True) + assert _count_our_processors(provider) == 0 + + assert enable_fi_attribute_mapping() is True + assert _count_our_processors(provider) == 1 + + +def test_idempotent_install_on_same_provider(fresh_global_provider): + """Calling enable twice should not install the processor twice.""" + provider = _register(set_global=True) + assert enable_fi_attribute_mapping() is True + assert enable_fi_attribute_mapping() is False + assert _count_our_processors(provider) == 1 + + +# --------------------------------------------------------------------------- +# Explicit-provider path +# --------------------------------------------------------------------------- + + +def test_install_on_explicit_provider_does_not_require_global(fresh_global_provider): + """User who keeps set_global_tracer_provider=False can still pass the provider.""" + provider = _register(set_global=False) + assert enable_fi_attribute_mapping(tracer_provider=provider) is True + assert _count_our_processors(provider) == 1 + + +def test_idempotent_install_on_explicit_provider(fresh_global_provider): + provider = _register(set_global=False) + assert enable_fi_attribute_mapping(tracer_provider=provider) is True + assert enable_fi_attribute_mapping(tracer_provider=provider) is False + assert _count_our_processors(provider) == 1 + + +# --------------------------------------------------------------------------- +# Native-instrumentation handling +# --------------------------------------------------------------------------- + + +def test_enable_turns_on_native_instrumentation(fresh_global_provider): + """If the framework's instrumentation flag is off, our helper turns it on.""" + from agent_framework.observability import OBSERVABILITY_SETTINGS + + _register(set_global=True) + enable_fi_attribute_mapping() + assert OBSERVABILITY_SETTINGS.enable_instrumentation is True + + +def test_user_explicit_disable_is_respected(fresh_global_provider): + """If user explicitly disabled the framework's instrumentation, we must not re-enable it.""" + from agent_framework.observability import ( + OBSERVABILITY_SETTINGS, + disable_instrumentation, + enable_instrumentation as _af_enable, + ) + + disable_instrumentation() + assert OBSERVABILITY_SETTINGS.enable_instrumentation is False + + _register(set_global=True) + enable_fi_attribute_mapping() + + # Disable wins because our helper does not pass force=True. + assert OBSERVABILITY_SETTINGS.enable_instrumentation is False + + # Clean up so the explicit-disable doesn't bleed into other tests. + _af_enable(force=True) diff --git a/python/frameworks/agent-framework/tests/test_processor.py b/python/frameworks/agent-framework/tests/test_processor.py new file mode 100644 index 00000000..fe8879b0 --- /dev/null +++ b/python/frameworks/agent-framework/tests/test_processor.py @@ -0,0 +1,508 @@ +"""Unit tests for the attribute mapping helpers in processor.""" + +import json +from pathlib import Path + +import pytest + +from traceai_agent_framework.processor import ( + AgentFrameworkSpanProcessor, + _classify_span_kind, + _flatten_messages, + _map_attributes_to_fi_conventions, +) + + +FIXTURES = json.loads( + (Path(__file__).parent / "_fixtures" / "sample_spans.json").read_text() +) + + +def _fixture(name: str) -> dict: + for f in FIXTURES: + if f["name"] == name: + return dict(f["attributes"]) + raise KeyError(f"fixture not found: {name}") + + +# --------------------------------------------------------------------------- +# Pass-through behaviour +# --------------------------------------------------------------------------- + + +def test_empty_attributes_returns_empty(): + assert _map_attributes_to_fi_conventions({}) == {} + + +def test_non_agent_framework_span_passes_through_untouched(): + src = {"foo": "bar", "http.method": "POST", "custom.attr": 42} + out = _map_attributes_to_fi_conventions(src) + assert out == src + assert "gen_ai.span.kind" not in out + + +# --------------------------------------------------------------------------- +# Span-kind classification +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "op,expected", + [ + ("chat", "LLM"), + ("embeddings", "EMBEDDING"), + ("execute_tool", "TOOL"), + ("invoke_agent", "AGENT"), + ("create_agent", "AGENT"), + ], +) +def test_classify_by_operation_name(op, expected): + assert _classify_span_kind({"gen_ai.operation.name": op}) == expected + + +@pytest.mark.parametrize( + "key", + [ + "workflow.id", + "workflow_builder.name", + "executor.id", + "edge_group.id", + "message.type", + "message.source_id", + ], +) +def test_classify_workflow_attrs_as_chain(key): + assert _classify_span_kind({key: "value"}) == "CHAIN" + + +def test_classify_unknown_returns_none(): + assert _classify_span_kind({"random.key": "value"}) is None + + +# --------------------------------------------------------------------------- +# LLM enrichment (synthetic chat span; fixtures don't include chat) +# --------------------------------------------------------------------------- + + +def _make_chat_attrs(input_msgs, output_msgs, input_tokens=None, output_tokens=None): + attrs = { + "gen_ai.operation.name": "chat", + "gen_ai.request.model": "gpt-4o", + "gen_ai.input.messages": json.dumps(input_msgs), + "gen_ai.output.messages": json.dumps(output_msgs), + } + if input_tokens is not None: + attrs["gen_ai.usage.input_tokens"] = input_tokens + if output_tokens is not None: + attrs["gen_ai.usage.output_tokens"] = output_tokens + return attrs + + +def test_llm_lifts_messages_to_input_output_value(): + attrs = _make_chat_attrs( + [{"role": "user", "parts": [{"type": "text", "content": "hi"}]}], + [{"role": "assistant", "parts": [{"type": "text", "content": "hello"}]}], + ) + out = _map_attributes_to_fi_conventions(attrs) + assert out["gen_ai.span.kind"] == "LLM" + assert out["input.value"] == attrs["gen_ai.input.messages"] + assert out["input.mime_type"] == "application/json" + assert out["output.value"] == attrs["gen_ai.output.messages"] + assert out["output.mime_type"] == "application/json" + + +def test_llm_flattens_messages_with_indexed_keys(): + attrs = _make_chat_attrs( + [ + {"role": "system", "parts": [{"type": "text", "content": "be terse"}]}, + {"role": "user", "parts": [{"type": "text", "content": "hi"}]}, + ], + [{"role": "assistant", "parts": [{"type": "text", "content": "ok"}]}], + ) + out = _map_attributes_to_fi_conventions(attrs) + assert out["gen_ai.input.messages.0.message.role"] == "system" + assert out["gen_ai.input.messages.0.message.content"] == "be terse" + assert out["gen_ai.input.messages.1.message.role"] == "user" + assert out["gen_ai.input.messages.1.message.content"] == "hi" + assert out["gen_ai.output.messages.0.message.role"] == "assistant" + assert out["gen_ai.output.messages.0.message.content"] == "ok" + + +def test_llm_derives_total_tokens(): + attrs = _make_chat_attrs([], [], input_tokens=42, output_tokens=17) + out = _map_attributes_to_fi_conventions(attrs) + assert out["gen_ai.usage.total_tokens"] == 59 + + +def test_llm_skips_total_tokens_when_one_missing(): + attrs = _make_chat_attrs([], [], input_tokens=42) + out = _map_attributes_to_fi_conventions(attrs) + assert "gen_ai.usage.total_tokens" not in out + + +def test_llm_preserves_existing_total_tokens(): + attrs = _make_chat_attrs([], [], input_tokens=10, output_tokens=10) + attrs["gen_ai.usage.total_tokens"] = 999 # pre-set by some other source + out = _map_attributes_to_fi_conventions(attrs) + assert out["gen_ai.usage.total_tokens"] == 999 + + +def test_llm_without_messages_still_stamps_kind(): + out = _map_attributes_to_fi_conventions({"gen_ai.operation.name": "chat"}) + assert out["gen_ai.span.kind"] == "LLM" + assert "input.value" not in out + + +# --------------------------------------------------------------------------- +# TOOL enrichment (uses real fixture) +# --------------------------------------------------------------------------- + + +def test_tool_lifts_args_and_result_from_fixture(): + out = _map_attributes_to_fi_conventions(_fixture("execute_tool get_weather")) + assert out["gen_ai.span.kind"] == "TOOL" + assert out["input.value"] == '{"city": "Paris"}' + assert out["input.mime_type"] == "application/json" + assert "Paris is sunny" in out["output.value"] + assert out["output.mime_type"] == "text/plain" + + +def test_tool_json_string_result_detected_as_json_mime(): + attrs = { + "gen_ai.operation.name": "execute_tool", + "gen_ai.tool.call.result": '{"temp": 22}', + } + out = _map_attributes_to_fi_conventions(attrs) + assert out["output.value"] == '{"temp": 22}' + assert out["output.mime_type"] == "application/json" + + +def test_tool_with_no_args_or_result_just_stamps_kind(): + attrs = {"gen_ai.operation.name": "execute_tool", "gen_ai.tool.name": "noop"} + out = _map_attributes_to_fi_conventions(attrs) + assert out["gen_ai.span.kind"] == "TOOL" + assert "input.value" not in out + assert "output.value" not in out + + +# --------------------------------------------------------------------------- +# AGENT enrichment (uses real fixture) +# --------------------------------------------------------------------------- + + +def test_agent_from_fixture_flattens_messages(): + out = _map_attributes_to_fi_conventions(_fixture("invoke_agent weather_agent")) + assert out["gen_ai.span.kind"] == "AGENT" + assert out["gen_ai.input.messages.0.message.role"] == "user" + assert out["gen_ai.input.messages.0.message.content"] == "What's the weather in Paris?" + assert out["gen_ai.output.messages.0.message.role"] == "assistant" + assert out["gen_ai.output.messages.0.message.content"] == "The weather in Paris is sunny." + + +def test_agent_preserves_native_gen_ai_attrs(): + out = _map_attributes_to_fi_conventions(_fixture("invoke_agent weather_agent")) + assert out["gen_ai.agent.name"] == "weather_agent" + assert out["gen_ai.agent.id"] + assert out["gen_ai.provider.name"] == "microsoft.agent_framework" + + +# --------------------------------------------------------------------------- +# CHAIN enrichment (uses real fixtures) +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "name", + [ + "workflow.build", + "workflow.run", + "executor.process upper", + "executor.process exclaim", + "edge_group.process SingleEdgeGroup", + "message.send", + ], +) +def test_chain_kinds_from_fixtures(name): + out = _map_attributes_to_fi_conventions(_fixture(name)) + assert out["gen_ai.span.kind"] == "CHAIN" + assert "input.value" not in out + assert "output.value" not in out + + +# --------------------------------------------------------------------------- +# Message-flattener edge cases (the framework's parts-shape varies) +# --------------------------------------------------------------------------- + + +def test_flatten_handles_malformed_json(): + assert _flatten_messages("not json", "gen_ai.input.messages") == {} + + +def test_flatten_handles_non_list_json(): + assert _flatten_messages('{"role": "user"}', "gen_ai.input.messages") == {} + + +def test_flatten_handles_empty_parts(): + out = _flatten_messages( + json.dumps([{"role": "user", "parts": []}]), + "gen_ai.input.messages", + ) + assert out["gen_ai.input.messages.0.message.role"] == "user" + assert "gen_ai.input.messages.0.message.content" not in out + + +def test_flatten_skips_non_text_parts(): + """Image/reasoning/function-call parts are not text-flattened in MVP.""" + out = _flatten_messages( + json.dumps( + [ + { + "role": "user", + "parts": [ + {"type": "uri", "uri": "https://example.com/img.png"}, + {"type": "text", "content": "look at this"}, + ], + } + ] + ), + "gen_ai.input.messages", + ) + assert out["gen_ai.input.messages.0.message.content"] == "look at this" + + +def test_flatten_joins_multiple_text_parts(): + out = _flatten_messages( + json.dumps( + [{"role": "user", "parts": [ + {"type": "text", "content": "line one"}, + {"type": "text", "content": "line two"}, + ]}] + ), + "gen_ai.input.messages", + ) + assert out["gen_ai.input.messages.0.message.content"] == "line one\nline two" + + +def test_llm_malformed_messages_does_not_crash(): + attrs = { + "gen_ai.operation.name": "chat", + "gen_ai.input.messages": "definitely not json {{{", + } + out = _map_attributes_to_fi_conventions(attrs) + # input.value is still lifted (it's just the raw string) + assert out["gen_ai.span.kind"] == "LLM" + assert out["input.value"] == "definitely not json {{{" + # but no flattened keys + assert "gen_ai.input.messages.0.message.role" not in out + + +# --------------------------------------------------------------------------- +# AgentFrameworkSpanProcessor: per-span enrichment + chain bubble-up +# --------------------------------------------------------------------------- + + +class _FakeScope: + def __init__(self, name="agent_framework"): + self.name = name + + +class _FakeCtx: + def __init__(self, span_id): + self.span_id = span_id + + +class _FakeReadableSpan: + """Minimal stand-in for a ReadableSpan with the attributes our processor reads.""" + + def __init__(self, span_id, parent_id, start_time, end_time, attrs, + scope_name="agent_framework"): + self._attributes = dict(attrs) if attrs else {} + self.start_time = start_time + self.end_time = end_time + self.context = _FakeCtx(span_id) + self.parent = _FakeCtx(parent_id) if parent_id is not None else None + self.instrumentation_scope = _FakeScope(scope_name) + + +def test_processor_mutates_attrs_in_on_end(): + processor = AgentFrameworkSpanProcessor() + span = _FakeReadableSpan( + span_id=1, parent_id=None, start_time=0, end_time=100, + attrs={"gen_ai.operation.name": "execute_tool", + "gen_ai.tool.call.arguments": '{"x": 1}'}, + ) + processor.on_end(span) + assert span._attributes["gen_ai.span.kind"] == "TOOL" + assert span._attributes["input.value"] == '{"x": 1}' + + +def test_processor_ignores_non_agent_framework_spans(): + """Spans from other instrumentation scopes pass through untouched.""" + processor = AgentFrameworkSpanProcessor() + span = _FakeReadableSpan( + span_id=1, parent_id=None, start_time=0, end_time=100, + attrs={"http.method": "POST"}, + scope_name="opentelemetry.instrumentation.requests", + ) + processor.on_end(span) + assert "gen_ai.span.kind" not in span._attributes + assert span._attributes == {"http.method": "POST"} + + +def test_processor_continues_on_individual_span_failure(): + """One broken span shouldn't crash the processor for others.""" + processor = AgentFrameworkSpanProcessor() + + class _Broken: + instrumentation_scope = _FakeScope("agent_framework") + @property + def _attributes(self): + raise RuntimeError("boom") + + good = _FakeReadableSpan( + span_id=1, parent_id=None, start_time=0, end_time=100, + attrs={"gen_ai.operation.name": "execute_tool"}, + ) + processor.on_end(_Broken()) # must not raise + processor.on_end(good) + assert good._attributes["gen_ai.span.kind"] == "TOOL" + + +def test_chain_io_bubbles_up_from_descendant(): + """ + Tree (events arrive in end order: deepest first, root last): + workflow.run (CHAIN) + └── executor.process (CHAIN) + └── invoke_agent (AGENT, has I/O) + """ + processor = AgentFrameworkSpanProcessor() + + agent = _FakeReadableSpan( + span_id=3, parent_id=2, start_time=20, end_time=80, + attrs={ + "gen_ai.operation.name": "invoke_agent", + "gen_ai.input.messages": '[{"role":"user","parts":[{"type":"text","content":"hi"}]}]', + "gen_ai.output.messages": '[{"role":"assistant","parts":[{"type":"text","content":"hello"}]}]', + }, + ) + executor = _FakeReadableSpan( + span_id=2, parent_id=1, start_time=10, end_time=90, + attrs={"executor.id": "step1"}, + ) + workflow = _FakeReadableSpan( + span_id=1, parent_id=None, start_time=0, end_time=100, + attrs={"workflow.id": "wf-1"}, + ) + + processor.on_end(agent) + processor.on_end(executor) + processor.on_end(workflow) + + # workflow.run got the bubbled input/output from its grandchild agent + assert workflow._attributes["gen_ai.span.kind"] == "CHAIN" + assert workflow._attributes["input.value"].startswith("[{") + assert workflow._attributes["output.value"].startswith("[{") + # executor.process also got them (still a chain) + assert executor._attributes["gen_ai.span.kind"] == "CHAIN" + assert "input.value" in executor._attributes + assert "output.value" in executor._attributes + + +def test_chain_picks_earliest_input_latest_output_across_siblings(): + """Two descendants under one CHAIN parent: earliest input + latest output win.""" + processor = AgentFrameworkSpanProcessor() + + # Both children end before the parent (workflow). Earliest by start_time wins + # for input; latest by end_time wins for output. + early = _FakeReadableSpan( + span_id=2, parent_id=1, start_time=10, end_time=50, + attrs={ + "gen_ai.operation.name": "invoke_agent", + "gen_ai.input.messages": '[{"role":"user","parts":[{"type":"text","content":"first"}]}]', + "gen_ai.output.messages": '[{"role":"assistant","parts":[{"type":"text","content":"first-out"}]}]', + }, + ) + late = _FakeReadableSpan( + span_id=3, parent_id=1, start_time=100, end_time=180, + attrs={ + "gen_ai.operation.name": "invoke_agent", + "gen_ai.input.messages": '[{"role":"user","parts":[{"type":"text","content":"second"}]}]', + "gen_ai.output.messages": '[{"role":"assistant","parts":[{"type":"text","content":"final-out"}]}]', + }, + ) + workflow = _FakeReadableSpan( + span_id=1, parent_id=None, start_time=0, end_time=200, + attrs={"workflow.id": "wf-1"}, + ) + + processor.on_end(early) + processor.on_end(late) + processor.on_end(workflow) + + # Earliest input (start_time=10) should be "first" + assert "first" in workflow._attributes["input.value"] + # Latest output (end_time=180) should be "final-out" + assert "final-out" in workflow._attributes["output.value"] + + +def test_chain_with_no_descendants_seen_stays_without_io(): + """If we never saw the chain's children (e.g. across batches), no bubble.""" + processor = AgentFrameworkSpanProcessor() + chain = _FakeReadableSpan( + span_id=1, parent_id=None, start_time=0, end_time=100, + attrs={"workflow.id": "wf-1"}, + ) + processor.on_end(chain) + assert chain._attributes["gen_ai.span.kind"] == "CHAIN" + assert "input.value" not in chain._attributes + assert "output.value" not in chain._attributes + + +def test_chain_existing_io_not_overridden_by_bubble(): + """If the chain span somehow already has I/O, the bubble must not override it.""" + processor = AgentFrameworkSpanProcessor() + + child = _FakeReadableSpan( + span_id=2, parent_id=1, start_time=10, end_time=50, + attrs={ + "gen_ai.operation.name": "invoke_agent", + "gen_ai.input.messages": '[{"role":"user","parts":[{"type":"text","content":"child"}]}]', + "gen_ai.output.messages": '[{"role":"assistant","parts":[{"type":"text","content":"child-out"}]}]', + }, + ) + chain = _FakeReadableSpan( + span_id=1, parent_id=None, start_time=0, end_time=100, + attrs={ + "workflow.id": "wf-1", + "input.value": "already set", + "output.value": "already set", + }, + ) + + processor.on_end(child) + processor.on_end(chain) + + assert chain._attributes["input.value"] == "already set" + assert chain._attributes["output.value"] == "already set" + + +def test_processor_shutdown_clears_state(): + processor = AgentFrameworkSpanProcessor() + child = _FakeReadableSpan( + span_id=2, parent_id=999, start_time=10, end_time=50, + attrs={ + "gen_ai.operation.name": "invoke_agent", + "gen_ai.input.messages": '[{"role":"user","parts":[{"type":"text","content":"x"}]}]', + }, + ) + processor.on_end(child) + assert 999 in processor._desc_io + processor.shutdown() + assert processor._desc_io == {} + # After shutdown, further on_end calls are silently no-op. + after = _FakeReadableSpan( + span_id=3, parent_id=None, start_time=0, end_time=100, + attrs={"gen_ai.operation.name": "execute_tool"}, + ) + processor.on_end(after) + assert "gen_ai.span.kind" not in after._attributes diff --git a/python/frameworks/agent-framework/traceai_agent_framework/__init__.py b/python/frameworks/agent-framework/traceai_agent_framework/__init__.py index e69de29b..d754685c 100644 --- a/python/frameworks/agent-framework/traceai_agent_framework/__init__.py +++ b/python/frameworks/agent-framework/traceai_agent_framework/__init__.py @@ -0,0 +1,17 @@ +""" +TraceAI Microsoft Agent Framework integration. + +Future AGI integration for Microsoft Agent Framework. We add a SpanProcessor +to the user's TracerProvider that re-keys the framework's native ``gen_ai.*`` +attributes into Future AGI conventions on every span as it ends. +""" + +from .integration import enable_fi_attribute_mapping +from .processor import AgentFrameworkSpanProcessor +from .version import __version__ + +__all__ = [ + "enable_fi_attribute_mapping", + "AgentFrameworkSpanProcessor", + "__version__", +] diff --git a/python/frameworks/agent-framework/traceai_agent_framework/exporters/__init__.py b/python/frameworks/agent-framework/traceai_agent_framework/exporters/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/frameworks/agent-framework/traceai_agent_framework/exporters/base_exporter.py b/python/frameworks/agent-framework/traceai_agent_framework/exporters/base_exporter.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/frameworks/agent-framework/traceai_agent_framework/exporters/grpc_exporter.py b/python/frameworks/agent-framework/traceai_agent_framework/exporters/grpc_exporter.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/frameworks/agent-framework/traceai_agent_framework/exporters/http_exporter.py b/python/frameworks/agent-framework/traceai_agent_framework/exporters/http_exporter.py deleted file mode 100644 index e69de29b..00000000 diff --git a/python/frameworks/agent-framework/traceai_agent_framework/integration.py b/python/frameworks/agent-framework/traceai_agent_framework/integration.py index e69de29b..9ffc3d61 100644 --- a/python/frameworks/agent-framework/traceai_agent_framework/integration.py +++ b/python/frameworks/agent-framework/traceai_agent_framework/integration.py @@ -0,0 +1,103 @@ +""" +Integration functions for Microsoft Agent Framework with Future AGI. + +This module provides the public entry point used to wire up FI conventions +on Agent Framework's native OpenTelemetry spans. We register an +:class:`AgentFrameworkSpanProcessor` on a ``TracerProvider`` so that every +Agent Framework span gets the FI-specific attributes added before reaching +any exporter. +""" + +import logging +from typing import Optional + +from opentelemetry import trace as trace_api +from opentelemetry.sdk.trace import TracerProvider + +from .processor import AgentFrameworkSpanProcessor + +logger = logging.getLogger(__name__) + + +def _ensure_native_instrumentation_enabled() -> None: + """Turn on agent_framework's native OTel emission, only if not already on. + + The framework's ``enable_instrumentation()`` re-reads ``ENABLE_SENSITIVE_DATA`` + from the environment when called with no kwargs, which would override any + explicit ``enable_sensitive_data=True`` the user set. Guarding on the + existing flag preserves that user choice. + """ + try: + from agent_framework.observability import ( + OBSERVABILITY_SETTINGS, + enable_instrumentation as _af_enable_instrumentation, + ) + except ImportError: + logger.debug( + "agent-framework is not installed; skipping native instrumentation enable. " + "Install it with: pip install agent-framework" + ) + return + try: + if not OBSERVABILITY_SETTINGS.enable_instrumentation: + _af_enable_instrumentation() + except Exception as e: # pragma: no cover - defensive + logger.warning( + "Could not enable agent_framework native instrumentation: %s", e + ) + + +def enable_fi_attribute_mapping( + tracer_provider: Optional[TracerProvider] = None, +) -> bool: + """Install the FI attribute mapping on a tracer provider. + + Adds an :class:`AgentFrameworkSpanProcessor` to ``tracer_provider`` (or to + the global OTel tracer provider if none is passed). Idempotent: calling + twice on the same provider only installs one processor. + + Args: + tracer_provider: The provider to install on. If omitted, the global + tracer provider is used. Pass the provider returned by + ``fi_instrumentation.register(...)`` when you kept the FI default + ``set_global_tracer_provider=False``. + + Returns: + True if the processor was installed; False if there was no usable + tracer provider, or if our processor was already installed. + """ + _ensure_native_instrumentation_enabled() + + provider = tracer_provider if tracer_provider is not None else trace_api.get_tracer_provider() + + active = getattr(provider, "_active_span_processor", None) + if active is None: + logger.warning( + "Tracer provider %s has no active span processor. " + "Did you forget to call fi_instrumentation.register(...), or call " + "register(set_global_tracer_provider=True), or pass the provider " + "directly to enable_fi_attribute_mapping(tracer_provider=...)?", + type(provider).__name__, + ) + return False + + existing = tuple(getattr(active, "_span_processors", ())) + if any(isinstance(p, AgentFrameworkSpanProcessor) for p in existing): + return False + + # Prepend to the multi-processor's tuple directly rather than calling + # ``provider.add_span_processor``: FI's TracerProvider drops its default + # exporter on the first ``add_span_processor`` call, so the public path + # would silently lose spans. Prepending also ensures our mutations land + # before any synchronous downstream processor reads attrs. + new_processor = AgentFrameworkSpanProcessor() + try: + active._span_processors = (new_processor,) + existing + except AttributeError: + provider.add_span_processor(new_processor) + + logger.info( + "Installed AgentFrameworkSpanProcessor on %s alongside %d existing processor(s)", + type(provider).__name__, len(existing), + ) + return True diff --git a/python/frameworks/agent-framework/traceai_agent_framework/processor.py b/python/frameworks/agent-framework/traceai_agent_framework/processor.py new file mode 100644 index 00000000..ca213622 --- /dev/null +++ b/python/frameworks/agent-framework/traceai_agent_framework/processor.py @@ -0,0 +1,394 @@ +""" +SpanProcessor for the Microsoft Agent Framework + Future AGI integration. + +Microsoft Agent Framework emits OpenTelemetry spans using the GenAI semantic +conventions (``gen_ai.*``) on the ``"agent_framework"`` instrumentation scope. +Future AGI's ``SpanAttributes`` is built on the same conventions, so most +attributes pass through unchanged. This processor adds the few keys FI needs +that the framework doesn't emit: + + * ``gen_ai.span.kind`` (FI-specific) classified from operation name / attrs + * ``input.value`` / ``output.value`` (+ mime types) for the FI dashboard + * Flattened ``gen_ai.input.messages.{i}.message.role`` / ``.content`` from + the framework's JSON-string ``gen_ai.input.messages`` / ``gen_ai.output.messages`` + * ``gen_ai.usage.total_tokens`` derived from input + output tokens when present + * For CHAIN spans (``workflow.run``, ``executor.process``, etc.) that the + framework leaves without I/O, bubbles ``input.value`` / ``output.value`` + up from the earliest / latest descendant span. +""" + +import json +import threading +from typing import Any, Dict, List, Optional + +from opentelemetry.context import Context +from opentelemetry.sdk.trace import ReadableSpan, Span +from opentelemetry.sdk.trace import SpanProcessor + +from fi_instrumentation.fi_types import ( + FiMimeTypeValues, + FiSpanKindValues, + MessageAttributes, + SpanAttributes, +) + +# --------------------------------------------------------------------------- +# Attribute keys we read from the framework's gen_ai.* output. +# --------------------------------------------------------------------------- + +_OP = SpanAttributes.GEN_AI_OPERATION_NAME +_INPUT_MSGS = SpanAttributes.GEN_AI_INPUT_MESSAGES +_OUTPUT_MSGS = SpanAttributes.GEN_AI_OUTPUT_MESSAGES +_TOOL_ARGS = SpanAttributes.GEN_AI_TOOL_CALL_ARGUMENTS +_TOOL_RESULT = SpanAttributes.GEN_AI_TOOL_CALL_RESULT +_INPUT_TOKENS = SpanAttributes.GEN_AI_USAGE_INPUT_TOKENS +_OUTPUT_TOKENS = SpanAttributes.GEN_AI_USAGE_OUTPUT_TOKENS +_TOTAL_TOKENS = SpanAttributes.GEN_AI_USAGE_TOTAL_TOKENS + +# The instrumentation scope Microsoft Agent Framework emits on. +_AGENT_FRAMEWORK_SCOPE = "agent_framework" + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _safe_json_loads(value: Any) -> Any: + if not isinstance(value, str): + return value + try: + return json.loads(value) + except (json.JSONDecodeError, ValueError): + return None + + +def _extract_text_from_parts(parts: Any) -> Optional[str]: + """Agent Framework messages carry content as ``parts: [{type, content}]``. + + Concatenates the text from ``type="text"`` parts. Non-text parts (image, + blob, reasoning, function-call variants) are skipped; the raw JSON blob + remains on the span as ``input.value`` / ``output.value`` so the + information is not lost. + """ + if not isinstance(parts, list): + return None + texts: List[str] = [] + for part in parts: + if not isinstance(part, dict): + continue + if part.get("type") == "text": + content = part.get("content") + if isinstance(content, str): + texts.append(content) + return "\n".join(texts) if texts else None + + +def _flatten_messages(messages_json: str, prefix: str) -> Dict[str, Any]: + """Flatten Agent Framework message JSON into per-index FI attributes. + + Input shape (what the framework emits): + [{"role": "user", "parts": [{"type": "text", "content": "hi"}]}, ...] + + Output keys: + {prefix}.{i}.message.role + {prefix}.{i}.message.content (joined text from text-type parts) + """ + parsed = _safe_json_loads(messages_json) + if not isinstance(parsed, list): + return {} + out: Dict[str, Any] = {} + for i, msg in enumerate(parsed): + if not isinstance(msg, dict): + continue + role = msg.get("role") + if role is not None: + out[f"{prefix}.{i}.{MessageAttributes.MESSAGE_ROLE}"] = role + text = _extract_text_from_parts(msg.get("parts")) + if text is not None: + out[f"{prefix}.{i}.{MessageAttributes.MESSAGE_CONTENT}"] = text + return out + + +_CHAIN_PREFIXES = ("workflow.", "workflow_builder.", "executor.", "edge_group.") +_CHAIN_KEYS = {"message.type", "message.source_id", "message.target_id"} + + +def _classify_span_kind(attributes: Dict[str, Any]) -> Optional[str]: + """Return a FiSpanKindValues string, or ``None`` to leave the span alone.""" + op = attributes.get(_OP) + if op == "chat": + return FiSpanKindValues.LLM.value + if op == "embeddings": + return FiSpanKindValues.EMBEDDING.value + if op == "execute_tool": + return FiSpanKindValues.TOOL.value + if op in ("invoke_agent", "create_agent"): + return FiSpanKindValues.AGENT.value + + for key in attributes: + if not isinstance(key, str): + continue + if key.startswith(_CHAIN_PREFIXES) or key in _CHAIN_KEYS: + return FiSpanKindValues.CHAIN.value + return None + + +# --------------------------------------------------------------------------- +# Per-kind enrichment +# --------------------------------------------------------------------------- + + +def _surface_messages_io(mapped: Dict[str, Any]) -> None: + """Lift ``gen_ai.input/output.messages`` into ``input.value``/``output.value`` + flatten.""" + in_msgs = mapped.get(_INPUT_MSGS) + if isinstance(in_msgs, str): + mapped[SpanAttributes.INPUT_VALUE] = in_msgs + mapped[SpanAttributes.INPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value + for k, v in _flatten_messages(in_msgs, _INPUT_MSGS).items(): + mapped[k] = v + + out_msgs = mapped.get(_OUTPUT_MSGS) + if isinstance(out_msgs, str): + mapped[SpanAttributes.OUTPUT_VALUE] = out_msgs + mapped[SpanAttributes.OUTPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value + for k, v in _flatten_messages(out_msgs, _OUTPUT_MSGS).items(): + mapped[k] = v + + +def _derive_total_tokens(mapped: Dict[str, Any]) -> None: + """If input + output tokens are both present, compute the total.""" + if _TOTAL_TOKENS in mapped: + return + inp = mapped.get(_INPUT_TOKENS) + out = mapped.get(_OUTPUT_TOKENS) + if isinstance(inp, (int, float)) and isinstance(out, (int, float)): + mapped[_TOTAL_TOKENS] = int(inp) + int(out) + + +def _enrich_tool(mapped: Dict[str, Any]) -> None: + """TOOL spans: lift tool call arguments and result into input/output.""" + args = mapped.get(_TOOL_ARGS) + if args is not None: + mapped[SpanAttributes.INPUT_VALUE] = ( + args if isinstance(args, str) else json.dumps(args) + ) + mapped[SpanAttributes.INPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value + + result = mapped.get(_TOOL_RESULT) + if result is None: + return + if isinstance(result, (dict, list)): + mapped[SpanAttributes.OUTPUT_VALUE] = json.dumps(result) + mapped[SpanAttributes.OUTPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value + elif isinstance(result, str): + mapped[SpanAttributes.OUTPUT_VALUE] = result + stripped = result.strip() + if (stripped.startswith("{") and stripped.endswith("}")) or ( + stripped.startswith("[") and stripped.endswith("]") + ): + mapped[SpanAttributes.OUTPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value + else: + mapped[SpanAttributes.OUTPUT_MIME_TYPE] = FiMimeTypeValues.TEXT.value + else: + mapped[SpanAttributes.OUTPUT_VALUE] = str(result) + + +# --------------------------------------------------------------------------- +# Main per-span mapping function +# --------------------------------------------------------------------------- + + +def _map_attributes_to_fi_conventions(attributes: Dict[str, Any]) -> Dict[str, Any]: + """Add FI-specific keys on top of the framework's native gen_ai.* attributes.""" + if not attributes: + return {} + mapped = dict(attributes) + + kind = _classify_span_kind(attributes) + if kind is None: + return mapped + mapped[SpanAttributes.GEN_AI_SPAN_KIND] = kind + + if kind in ( + FiSpanKindValues.LLM.value, + FiSpanKindValues.EMBEDDING.value, + FiSpanKindValues.AGENT.value, + ): + _surface_messages_io(mapped) + _derive_total_tokens(mapped) + elif kind == FiSpanKindValues.TOOL.value: + _enrich_tool(mapped) + # CHAIN: stamped here; I/O bubbled in by AgentFrameworkSpanProcessor.on_end(). + + return mapped + + +# --------------------------------------------------------------------------- +# Descendant-IO aggregation (for bubbling I/O up to CHAIN spans) +# --------------------------------------------------------------------------- + + +class _SpanIO: + """Holds the earliest input and latest output among a span and its descendants.""" + + __slots__ = ("input_value", "input_mime", "input_time", + "output_value", "output_mime", "output_time") + + def __init__(self) -> None: + self.input_value: Optional[str] = None + self.input_mime: Optional[str] = None + self.input_time: Optional[int] = None + self.output_value: Optional[str] = None + self.output_mime: Optional[str] = None + self.output_time: Optional[int] = None + + def absorb_input(self, value: Optional[str], mime: Optional[str], time_ns: Optional[int]) -> None: + if value is None or time_ns is None: + return + if self.input_time is None or time_ns < self.input_time: + self.input_value = value + self.input_mime = mime + self.input_time = time_ns + + def absorb_output(self, value: Optional[str], mime: Optional[str], time_ns: Optional[int]) -> None: + if value is None or time_ns is None: + return + if self.output_time is None or time_ns > self.output_time: + self.output_value = value + self.output_mime = mime + self.output_time = time_ns + + def merge(self, other: "_SpanIO") -> None: + self.absorb_input(other.input_value, other.input_mime, other.input_time) + self.absorb_output(other.output_value, other.output_mime, other.output_time) + + +# --------------------------------------------------------------------------- +# The SpanProcessor itself +# --------------------------------------------------------------------------- + + +class AgentFrameworkSpanProcessor(SpanProcessor): + """OTel SpanProcessor that re-keys Agent Framework spans into FI conventions. + + On ``on_end``: + 1. Per-span enrichment: stamp ``gen_ai.span.kind`` and (for kinds with + data) lift ``input.value``/``output.value`` + flatten messages. + 2. For CHAIN spans, bubble I/O in from descendant spans we've already seen. + 3. Propagate this span's "best I/O" up so its own parent can use it later. + + Spans from other instrumentation scopes pass through untouched. + """ + + def __init__(self) -> None: + self._desc_io: Dict[int, _SpanIO] = {} + self._lock = threading.Lock() + self._disabled = False + + # SpanProcessor interface ------------------------------------------------ + + def on_start(self, span: Span, parent_context: Optional[Context] = None) -> None: + return + + def on_end(self, span: ReadableSpan) -> None: + if self._disabled: + return + + scope = getattr(span.instrumentation_scope, "name", None) + if scope != _AGENT_FRAMEWORK_SCOPE: + # Track parent relationship so descendant I/O from foreign spans + # can still bubble into our CHAIN spans, but do not mutate attrs. + self._track_for_parent(span, _attrs_dict(span)) + return + + try: + current_attrs = _attrs_dict(span) + mapped = _map_attributes_to_fi_conventions(current_attrs) + + if mapped.get(SpanAttributes.GEN_AI_SPAN_KIND) == FiSpanKindValues.CHAIN.value: + self._apply_chain_bubble(span, mapped) + + # Write back onto the live ReadableSpan; ``span.attributes`` is a + # MappingProxyType over ``span._attributes``, so downstream + # processors and exporters will see the mutated dict. + setattr(span, "_attributes", mapped) + + self._track_for_parent(span, mapped) + except Exception: + # Never crash the SDK over a mapping bug. + return + + def shutdown(self) -> None: + self._disabled = True + with self._lock: + self._desc_io.clear() + + def force_flush(self, timeout_millis: int = 30000) -> bool: + return True + + # Internals -------------------------------------------------------------- + + def _apply_chain_bubble(self, span: ReadableSpan, mapped: Dict[str, Any]) -> None: + """If this CHAIN span is missing I/O, fill from accumulated descendants.""" + sid = span.context.span_id + with self._lock: + bucket = self._desc_io.get(sid) + if bucket is None: + return + + if ( + SpanAttributes.INPUT_VALUE not in mapped + and bucket.input_value is not None + ): + mapped[SpanAttributes.INPUT_VALUE] = bucket.input_value + if bucket.input_mime: + mapped[SpanAttributes.INPUT_MIME_TYPE] = bucket.input_mime + + if ( + SpanAttributes.OUTPUT_VALUE not in mapped + and bucket.output_value is not None + ): + mapped[SpanAttributes.OUTPUT_VALUE] = bucket.output_value + if bucket.output_mime: + mapped[SpanAttributes.OUTPUT_MIME_TYPE] = bucket.output_mime + + def _track_for_parent(self, span: ReadableSpan, attrs: Dict[str, Any]) -> None: + """Roll this span's I/O (plus any descendants') up to its parent's bucket.""" + parent = getattr(span, "parent", None) + parent_id = getattr(parent, "span_id", None) if parent is not None else None + + sid = span.context.span_id + with self._lock: + own = self._desc_io.pop(sid, _SpanIO()) + + own.absorb_input( + attrs.get(SpanAttributes.INPUT_VALUE), + attrs.get(SpanAttributes.INPUT_MIME_TYPE), + span.start_time, + ) + own.absorb_output( + attrs.get(SpanAttributes.OUTPUT_VALUE), + attrs.get(SpanAttributes.OUTPUT_MIME_TYPE), + span.end_time, + ) + + if parent_id is None: + return + + with self._lock: + parent_bucket = self._desc_io.setdefault(parent_id, _SpanIO()) + parent_bucket.merge(own) + + +# --------------------------------------------------------------------------- +# Small helper +# --------------------------------------------------------------------------- + + +def _attrs_dict(span: ReadableSpan) -> Dict[str, Any]: + attrs = getattr(span, "_attributes", None) + if attrs is None: + return {} + if isinstance(attrs, dict): + return attrs + return dict(attrs) diff --git a/python/frameworks/agent-framework/traceai_agent_framework/version.py b/python/frameworks/agent-framework/traceai_agent_framework/version.py index e69de29b..3dc1f76b 100644 --- a/python/frameworks/agent-framework/traceai_agent_framework/version.py +++ b/python/frameworks/agent-framework/traceai_agent_framework/version.py @@ -0,0 +1 @@ +__version__ = "0.1.0" From c9a072b8530b2fea329d6934d0ac3cca24e07522 Mon Sep 17 00:00:00 2001 From: Suhani Nagpal Date: Tue, 26 May 2026 13:49:23 +0530 Subject: [PATCH 3/5] fix(agent-framework): polish to parity with OpenInference + bug fixes Mapping additions: - Extract tool_call_response and reasoning parts in message flattening; tool result text now lands in messages.{i}.message.content + messages.{i}.message.tool_call_id, and reasoning parts join into message.content - Surface graph.node.id / graph.node.name per kind (LLM/AGENT/TOOL/CHAIN), enabling the FI dashboard's agent-graph view to render workflow / executor / edge_group nodes - Bundle gen_ai.request.parameters JSON so the dashboard's "Model Parameters" panel renders for LLM/AGENT/EMBEDDING spans - Bundle a small metadata JSON (choice.count, server.address, agent_framework.function.*) for dashboard "Metadata" panel - Smart plain-text formatting for input.value / output.value when there is a single text-only message; keep raw JSON when the message structure is complex Integration / bug fixes: - Preserve FI's default exporter processor: install our SpanProcessor by prepending to the active multi-processor's tuple instead of calling provider.add_span_processor, which FI's TracerProvider wipes on first call - Don't clobber the user's enable_sensitive_data choice: only call enable_instrumentation() when it isn't already on; calling with no kwargs would otherwise re-read ENABLE_SENSITIVE_DATA from env and silently flip it off Examples: - basic_agent.py now uses a real get_weather tool that calls wttr.in via urllib (no API key needed); demonstrates a full LLM + tool + LLM trace end-to-end Packaging: - Move agent-framework from dev-only dep to runtime dep so pip install traceAI-agent-framework also installs the framework, matching anthropic/openai/crewai etc. Existing test updates reflect the new behavior (plain text input/output for the single-message case). --- .../agent-framework/examples/basic_agent.py | 27 ++- .../frameworks/agent-framework/pyproject.toml | 2 +- .../agent-framework/tests/test_e2e_agent.py | 5 +- .../agent-framework/tests/test_integration.py | 1 + .../agent-framework/tests/test_processor.py | 24 ++- .../traceai_agent_framework/processor.py | 196 ++++++++++++++++-- 6 files changed, 224 insertions(+), 31 deletions(-) diff --git a/python/frameworks/agent-framework/examples/basic_agent.py b/python/frameworks/agent-framework/examples/basic_agent.py index d4282889..00945c02 100644 --- a/python/frameworks/agent-framework/examples/basic_agent.py +++ b/python/frameworks/agent-framework/examples/basic_agent.py @@ -1,5 +1,8 @@ """Minimal Microsoft Agent Framework example traced into Future AGI. +Calls a real LLM (OpenAI) and a real tool (wttr.in — no API key needed). +You should see invoke_agent / chat / execute_tool spans in the FI dashboard. + Run with: export FI_API_KEY=... export FI_SECRET_KEY=... @@ -9,6 +12,8 @@ import asyncio import os +import urllib.parse +import urllib.request from agent_framework import Agent from agent_framework.observability import enable_instrumentation @@ -19,6 +24,17 @@ from traceai_agent_framework import enable_fi_attribute_mapping +def get_weather(city: str) -> str: + """Get the current weather for a city. + + Args: + city: City name, e.g. "Paris", "Tokyo", "New York". + """ + url = f"https://wttr.in/{urllib.parse.quote(city)}?format=3" + with urllib.request.urlopen(url, timeout=10) as resp: + return resp.read().decode("utf-8").strip() + + def main() -> None: if not os.getenv("OPENAI_API_KEY"): raise SystemExit("Set OPENAI_API_KEY before running this example.") @@ -38,16 +54,17 @@ def main() -> None: # Agent Framework's gen_ai.* spans into FI conventions as they end. enable_fi_attribute_mapping() - # 4) Build an agent. Anything you do with it from here on emits traced spans. + # 4) Build an agent with a real weather tool. agent = Agent( - OpenAIChatClient(), + OpenAIChatClient(model="gpt-4o-mini"), name="weather_agent", - description="Answers questions about weather.", - instructions="You are a concise, friendly weather assistant.", + description="Answers questions about weather using the get_weather tool.", + instructions="You are a concise weather assistant. Always use get_weather.", + tools=[get_weather], ) async def run() -> None: - response = await agent.run("What's the weather like in Paris in spring?") + response = await agent.run("What's the weather in Paris right now?") print("Agent response:\n", response) asyncio.run(run()) diff --git a/python/frameworks/agent-framework/pyproject.toml b/python/frameworks/agent-framework/pyproject.toml index 3f52e614..0ad02203 100644 --- a/python/frameworks/agent-framework/pyproject.toml +++ b/python/frameworks/agent-framework/pyproject.toml @@ -10,12 +10,12 @@ packages = [ [tool.poetry.dependencies] python = ">=3.10,<3.15" +agent-framework = ">=1.0.0" fi-instrumentation-otel = ">=0.1.14" [tool.poetry.group.dev.dependencies] pytest = "^8.0.0" pytest-asyncio = "^0.23.0" -agent-framework = ">=1.0.0" [tool.pytest.ini_options] asyncio_mode = "auto" diff --git a/python/frameworks/agent-framework/tests/test_e2e_agent.py b/python/frameworks/agent-framework/tests/test_e2e_agent.py index 40156b57..79d03d8f 100644 --- a/python/frameworks/agent-framework/tests/test_e2e_agent.py +++ b/python/frameworks/agent-framework/tests/test_e2e_agent.py @@ -103,8 +103,9 @@ async def test_agent_run_emits_agent_span_with_fi_attrs(captured): attrs = agent_spans[0].attributes assert attrs["gen_ai.span.kind"] == "AGENT" - assert attrs["input.mime_type"] == "application/json" - assert attrs["output.mime_type"] == "application/json" + # Single text-only message on both sides → plain-text format. + assert attrs["input.mime_type"] == "text/plain" + assert attrs["output.mime_type"] == "text/plain" assert attrs["gen_ai.input.messages.0.message.role"] == "user" assert attrs["gen_ai.input.messages.0.message.content"] == "What's the weather in Paris?" assert attrs["gen_ai.output.messages.0.message.role"] == "assistant" diff --git a/python/frameworks/agent-framework/tests/test_integration.py b/python/frameworks/agent-framework/tests/test_integration.py index e5207082..46525860 100644 --- a/python/frameworks/agent-framework/tests/test_integration.py +++ b/python/frameworks/agent-framework/tests/test_integration.py @@ -134,3 +134,4 @@ def test_user_explicit_disable_is_respected(fresh_global_provider): # Clean up so the explicit-disable doesn't bleed into other tests. _af_enable(force=True) + diff --git a/python/frameworks/agent-framework/tests/test_processor.py b/python/frameworks/agent-framework/tests/test_processor.py index fe8879b0..d9cab512 100644 --- a/python/frameworks/agent-framework/tests/test_processor.py +++ b/python/frameworks/agent-framework/tests/test_processor.py @@ -99,16 +99,31 @@ def _make_chat_attrs(input_msgs, output_msgs, input_tokens=None, output_tokens=N def test_llm_lifts_messages_to_input_output_value(): + """Single text-only user message → plain text in input.value (not JSON blob).""" attrs = _make_chat_attrs( [{"role": "user", "parts": [{"type": "text", "content": "hi"}]}], [{"role": "assistant", "parts": [{"type": "text", "content": "hello"}]}], ) out = _map_attributes_to_fi_conventions(attrs) assert out["gen_ai.span.kind"] == "LLM" + assert out["input.value"] == "hi" + assert out["input.mime_type"] == "text/plain" + assert out["output.value"] == "hello" + assert out["output.mime_type"] == "text/plain" + + +def test_llm_lifts_messages_keeps_json_when_complex(): + """Multi-message input → raw JSON blob (not collapsed to plain text).""" + attrs = _make_chat_attrs( + [ + {"role": "system", "parts": [{"type": "text", "content": "be terse"}]}, + {"role": "user", "parts": [{"type": "text", "content": "hi"}]}, + ], + [{"role": "assistant", "parts": [{"type": "text", "content": "ok"}]}], + ) + out = _map_attributes_to_fi_conventions(attrs) assert out["input.value"] == attrs["gen_ai.input.messages"] assert out["input.mime_type"] == "application/json" - assert out["output.value"] == attrs["gen_ai.output.messages"] - assert out["output.mime_type"] == "application/json" def test_llm_flattens_messages_with_indexed_keys(): @@ -400,8 +415,9 @@ def test_chain_io_bubbles_up_from_descendant(): # workflow.run got the bubbled input/output from its grandchild agent assert workflow._attributes["gen_ai.span.kind"] == "CHAIN" - assert workflow._attributes["input.value"].startswith("[{") - assert workflow._attributes["output.value"].startswith("[{") + # Single text-only user/assistant message → plain text format + assert workflow._attributes["input.value"] == "hi" + assert workflow._attributes["output.value"] == "hello" # executor.process also got them (still a chain) assert executor._attributes["gen_ai.span.kind"] == "CHAIN" assert "input.value" in executor._attributes diff --git a/python/frameworks/agent-framework/traceai_agent_framework/processor.py b/python/frameworks/agent-framework/traceai_agent_framework/processor.py index ca213622..358759cf 100644 --- a/python/frameworks/agent-framework/traceai_agent_framework/processor.py +++ b/python/frameworks/agent-framework/traceai_agent_framework/processor.py @@ -30,6 +30,7 @@ FiSpanKindValues, MessageAttributes, SpanAttributes, + ToolCallAttributes, ) # --------------------------------------------------------------------------- @@ -62,25 +63,62 @@ def _safe_json_loads(value: Any) -> Any: return None -def _extract_text_from_parts(parts: Any) -> Optional[str]: - """Agent Framework messages carry content as ``parts: [{type, content}]``. +def _flatten_message_parts(parts: Any, msg_index_prefix: str) -> Dict[str, Any]: + """Walk a message's ``parts`` list and emit FI-flavored sub-attributes. - Concatenates the text from ``type="text"`` parts. Non-text parts (image, - blob, reasoning, function-call variants) are skipped; the raw JSON blob - remains on the span as ``input.value`` / ``output.value`` so the - information is not lost. + Handles the part types the framework emits: + * ``text`` -> appended to ``message.content`` + * ``tool_call_response`` -> appended to ``message.content`` + ``message.tool_call_id`` + * ``tool_call`` -> ``message.tool_calls.{j}.tool_call.{id,function.name,function.arguments}`` + + ``msg_index_prefix`` is ``{messages_prefix}.{i}`` — e.g. ``gen_ai.output.messages.0``. """ if not isinstance(parts, list): - return None + return {} + out: Dict[str, Any] = {} texts: List[str] = [] + tool_call_idx = 0 + for part in parts: if not isinstance(part, dict): continue - if part.get("type") == "text": + ptype = part.get("type") + + if ptype in ("text", "reasoning"): content = part.get("content") if isinstance(content, str): texts.append(content) - return "\n".join(texts) if texts else None + + elif ptype == "tool_call_response": + response = part.get("response") + if isinstance(response, str): + texts.append(response) + elif response is not None: + texts.append(json.dumps(response)) + call_id = part.get("id") + if call_id is not None: + out[f"{msg_index_prefix}.{MessageAttributes.MESSAGE_TOOL_CALL_ID}"] = call_id + + elif ptype == "tool_call": + tc_prefix = ( + f"{msg_index_prefix}.{MessageAttributes.MESSAGE_TOOL_CALLS}.{tool_call_idx}" + ) + call_id = part.get("id") + name = part.get("name") + arguments = part.get("arguments") + if call_id is not None: + out[f"{tc_prefix}.{ToolCallAttributes.TOOL_CALL_ID}"] = call_id + if name is not None: + out[f"{tc_prefix}.{ToolCallAttributes.TOOL_CALL_FUNCTION_NAME}"] = name + if arguments is not None: + out[f"{tc_prefix}.{ToolCallAttributes.TOOL_CALL_FUNCTION_ARGUMENTS_JSON}"] = ( + arguments if isinstance(arguments, str) else json.dumps(arguments) + ) + tool_call_idx += 1 + + if texts: + out[f"{msg_index_prefix}.{MessageAttributes.MESSAGE_CONTENT}"] = "\n".join(texts) + return out def _flatten_messages(messages_json: str, prefix: str) -> Dict[str, Any]: @@ -91,7 +129,9 @@ def _flatten_messages(messages_json: str, prefix: str) -> Dict[str, Any]: Output keys: {prefix}.{i}.message.role - {prefix}.{i}.message.content (joined text from text-type parts) + {prefix}.{i}.message.content (text + tool_call_response text) + {prefix}.{i}.message.tool_call_id (when a tool_call_response is present) + {prefix}.{i}.message.tool_calls.{j}.tool_call.* (for tool_call parts) """ parsed = _safe_json_loads(messages_json) if not isinstance(parsed, list): @@ -100,12 +140,11 @@ def _flatten_messages(messages_json: str, prefix: str) -> Dict[str, Any]: for i, msg in enumerate(parsed): if not isinstance(msg, dict): continue + msg_index_prefix = f"{prefix}.{i}" role = msg.get("role") if role is not None: - out[f"{prefix}.{i}.{MessageAttributes.MESSAGE_ROLE}"] = role - text = _extract_text_from_parts(msg.get("parts")) - if text is not None: - out[f"{prefix}.{i}.{MessageAttributes.MESSAGE_CONTENT}"] = text + out[f"{msg_index_prefix}.{MessageAttributes.MESSAGE_ROLE}"] = role + out.update(_flatten_message_parts(msg.get("parts"), msg_index_prefix)) return out @@ -138,23 +177,79 @@ def _classify_span_kind(attributes: Dict[str, Any]) -> Optional[str]: # --------------------------------------------------------------------------- +def _all_text_message_content(msg: Dict[str, Any]) -> Optional[str]: + """If a message has only text/reasoning parts, return joined content. Else None.""" + parts = msg.get("parts") + if not isinstance(parts, list) or not parts: + return None + texts: List[str] = [] + for part in parts: + if not isinstance(part, dict): + return None + if part.get("type") not in ("text", "reasoning"): + return None # has a tool_call etc.; bail + content = part.get("content") + if isinstance(content, str): + texts.append(content) + return "\n".join(texts) if texts else None + + def _surface_messages_io(mapped: Dict[str, Any]) -> None: - """Lift ``gen_ai.input/output.messages`` into ``input.value``/``output.value`` + flatten.""" + """Lift ``gen_ai.input/output.messages`` into ``input.value``/``output.value`` + flatten. + + For single text-only messages we use plain text instead of the raw JSON blob — + matches OpenInference's UX choice and renders cleaner in the FI dashboard. + """ in_msgs = mapped.get(_INPUT_MSGS) if isinstance(in_msgs, str): - mapped[SpanAttributes.INPUT_VALUE] = in_msgs - mapped[SpanAttributes.INPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value + parsed = _safe_json_loads(in_msgs) + plain = None + if isinstance(parsed, list) and len(parsed) == 1 and isinstance(parsed[0], dict): + plain = _all_text_message_content(parsed[0]) + if plain is not None: + mapped[SpanAttributes.INPUT_VALUE] = plain + mapped[SpanAttributes.INPUT_MIME_TYPE] = FiMimeTypeValues.TEXT.value + else: + mapped[SpanAttributes.INPUT_VALUE] = in_msgs + mapped[SpanAttributes.INPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value for k, v in _flatten_messages(in_msgs, _INPUT_MSGS).items(): mapped[k] = v out_msgs = mapped.get(_OUTPUT_MSGS) if isinstance(out_msgs, str): - mapped[SpanAttributes.OUTPUT_VALUE] = out_msgs - mapped[SpanAttributes.OUTPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value + parsed = _safe_json_loads(out_msgs) + plain = None + if isinstance(parsed, list) and parsed and isinstance(parsed[-1], dict): + plain = _all_text_message_content(parsed[-1]) + if plain is not None: + mapped[SpanAttributes.OUTPUT_VALUE] = plain + mapped[SpanAttributes.OUTPUT_MIME_TYPE] = FiMimeTypeValues.TEXT.value + else: + mapped[SpanAttributes.OUTPUT_VALUE] = out_msgs + mapped[SpanAttributes.OUTPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value for k, v in _flatten_messages(out_msgs, _OUTPUT_MSGS).items(): mapped[k] = v +def _bundle_request_parameters(mapped: Dict[str, Any]) -> None: + """Collect ``gen_ai.request.*`` settings (temperature, top_p, max_tokens, + choice.count, etc.) into a single ``gen_ai.request.parameters`` JSON so + the FI dashboard's "Model Parameters" panel has one key to read. + Excludes ``gen_ai.request.model`` — that's its own first-class attribute.""" + if SpanAttributes.GEN_AI_REQUEST_PARAMETERS in mapped: + return + params: Dict[str, Any] = {} + for key, val in list(mapped.items()): + if ( + isinstance(key, str) + and key.startswith("gen_ai.request.") + and key != SpanAttributes.GEN_AI_REQUEST_MODEL + ): + params[key.split("gen_ai.request.", 1)[1]] = val + if params: + mapped[SpanAttributes.GEN_AI_REQUEST_PARAMETERS] = json.dumps(params) + + def _derive_total_tokens(mapped: Dict[str, Any]) -> None: """If input + output tokens are both present, compute the total.""" if _TOTAL_TOKENS in mapped: @@ -165,6 +260,65 @@ def _derive_total_tokens(mapped: Dict[str, Any]) -> None: mapped[_TOTAL_TOKENS] = int(inp) + int(out) +def _add_graph_node_attrs(mapped: Dict[str, Any], kind: str) -> None: + """Stamp ``graph.node.id`` / ``graph.node.name`` so the FI agent-graph view + has stable identifiers for LLM / TOOL / AGENT spans.""" + if kind == FiSpanKindValues.LLM.value or kind == FiSpanKindValues.EMBEDDING.value: + name = mapped.get("gen_ai.response.model") or mapped.get("gen_ai.request.model") + node_id = mapped.get("gen_ai.response.id") or name + if node_id is not None: + mapped[SpanAttributes.GRAPH_NODE_ID] = f"llm_{node_id}" + if name is not None: + mapped[SpanAttributes.GRAPH_NODE_NAME] = name + + elif kind == FiSpanKindValues.AGENT.value: + agent_id = mapped.get("gen_ai.agent.id") + agent_name = mapped.get("gen_ai.agent.name") + if agent_id is not None: + mapped[SpanAttributes.GRAPH_NODE_ID] = f"agent_{agent_id}" + if agent_name is not None: + mapped[SpanAttributes.GRAPH_NODE_NAME] = agent_name + + elif kind == FiSpanKindValues.TOOL.value: + tool_name = mapped.get("gen_ai.tool.name") + call_id = mapped.get("gen_ai.tool.call.id") + if tool_name is not None: + mapped[SpanAttributes.GRAPH_NODE_NAME] = tool_name + node_id = f"tool_{tool_name}" + (f"_{call_id}" if call_id else "") + mapped[SpanAttributes.GRAPH_NODE_ID] = node_id + + elif kind == FiSpanKindValues.CHAIN.value: + if workflow_id := mapped.get("workflow.id"): + mapped[SpanAttributes.GRAPH_NODE_ID] = f"workflow_{workflow_id}" + if workflow_name := mapped.get("workflow.name"): + mapped[SpanAttributes.GRAPH_NODE_NAME] = workflow_name + elif executor_id := mapped.get("executor.id"): + mapped[SpanAttributes.GRAPH_NODE_ID] = f"executor_{executor_id}" + if executor_type := mapped.get("executor.type"): + mapped[SpanAttributes.GRAPH_NODE_NAME] = executor_type + elif edge_group_id := mapped.get("edge_group.id"): + mapped[SpanAttributes.GRAPH_NODE_ID] = f"edge_group_{edge_group_id}" + if edge_group_type := mapped.get("edge_group.type"): + mapped[SpanAttributes.GRAPH_NODE_NAME] = edge_group_type + + +_METADATA_KEYS = ( + "gen_ai.request.choice.count", + "server.address", + "agent_framework.function.invocation.duration", + "agent_framework.function.name", +) + + +def _bundle_metadata(mapped: Dict[str, Any]) -> None: + """Bundle a few miscellaneous attrs into a single ``metadata`` JSON string.""" + if "metadata" in mapped: + return + extras = {k: mapped[k] for k in _METADATA_KEYS if k in mapped} + if extras: + mapped["metadata"] = json.dumps(extras) + + def _enrich_tool(mapped: Dict[str, Any]) -> None: """TOOL spans: lift tool call arguments and result into input/output.""" args = mapped.get(_TOOL_ARGS) @@ -216,10 +370,14 @@ def _map_attributes_to_fi_conventions(attributes: Dict[str, Any]) -> Dict[str, A ): _surface_messages_io(mapped) _derive_total_tokens(mapped) + _bundle_request_parameters(mapped) elif kind == FiSpanKindValues.TOOL.value: _enrich_tool(mapped) # CHAIN: stamped here; I/O bubbled in by AgentFrameworkSpanProcessor.on_end(). + _add_graph_node_attrs(mapped, kind) + _bundle_metadata(mapped) + return mapped From 3b378fc8132f70b6cb5db8b786a52e5c0d06d835 Mon Sep 17 00:00:00 2001 From: Suhani Nagpal Date: Tue, 26 May 2026 13:49:47 +0530 Subject: [PATCH 4/5] test(agent-framework): add coverage for gap fixes and bug fixes +30 tests, total 87 passing in <1s. Each test directly protects against a regression in something the polish round added or a bug that was caught during real-dashboard verification. test_processor.py (+23): - Multi-part message flatten: tool_call, tool_call_response, reasoning, mixed text+tool_call - graph.node.* per kind (LLM/AGENT/TOOL + workflow/executor/edge_group fallback chain for CHAIN) - gen_ai.request.parameters bundle (collected, excludes model, skipped when empty) - Cross-batch bubble-up + per-trace state isolation - embeddings -> EMBEDDING kind, create_agent -> AGENT kind - Smart formatting branches (plain text vs JSON for input.value/output.value) - Status is never set by the processor (Phase 0 finding) - Defensive: span with None attributes, span with MappingProxyType attrs test_integration.py (+7): - FI's default exporter processor survives our install (the silent-span-loss bug) - Our processor is prepended so synchronous downstream processors see mutated attrs - User's explicit enable_sensitive_data=True is not clobbered (the silent-message-loss bug) - Native instrumentation skipped gracefully when agent_framework is not installed --- .../agent-framework/tests/test_integration.py | 113 +++++ .../agent-framework/tests/test_processor.py | 405 ++++++++++++++++++ 2 files changed, 518 insertions(+) diff --git a/python/frameworks/agent-framework/tests/test_integration.py b/python/frameworks/agent-framework/tests/test_integration.py index 46525860..8050a362 100644 --- a/python/frameworks/agent-framework/tests/test_integration.py +++ b/python/frameworks/agent-framework/tests/test_integration.py @@ -135,3 +135,116 @@ def test_user_explicit_disable_is_respected(fresh_global_provider): # Clean up so the explicit-disable doesn't bleed into other tests. _af_enable(force=True) + +# --------------------------------------------------------------------------- +# FI default exporter preservation (the bug that lost spans silently) +# --------------------------------------------------------------------------- + + +def test_install_preserves_fi_default_batch_processor(fresh_global_provider): + """FI's TracerProvider drops its default exporter on first add_span_processor(). + Our install must preserve that default so spans actually reach FI's backend. + """ + from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor + + provider = _register(set_global=True) + pre_count = sum( + isinstance(p, (BatchSpanProcessor, SimpleSpanProcessor)) + for p in _processors_on(provider) + ) + assert pre_count >= 1, "FI register() should install at least one batch/simple processor" + + assert enable_fi_attribute_mapping() is True + + post_processors = _processors_on(provider) + fi_export_processors = [ + p for p in post_processors + if isinstance(p, (BatchSpanProcessor, SimpleSpanProcessor)) + ] + ours = [p for p in post_processors if isinstance(p, AgentFrameworkSpanProcessor)] + assert len(ours) == 1 + assert len(fi_export_processors) >= 1, ( + "FI's default export processor must survive our install — otherwise " + "spans get mutated but never exported." + ) + + +def test_install_prepends_processor_first_in_chain(fresh_global_provider): + """Our processor must run BEFORE downstream processors so any synchronous + processor (e.g., SimpleSpanProcessor) sees the mutated attributes.""" + provider = _register(set_global=True) + assert enable_fi_attribute_mapping() is True + + processors = _processors_on(provider) + # Our processor should be first + assert isinstance(processors[0], AgentFrameworkSpanProcessor), ( + f"Expected AgentFrameworkSpanProcessor first, got {type(processors[0]).__name__}" + ) + + +# --------------------------------------------------------------------------- +# Sensitive-data flag preservation (the bug that silently disabled messages) +# --------------------------------------------------------------------------- + + +def test_enable_does_not_clobber_user_sensitive_data_choice(fresh_global_provider): + """If the user explicitly enabled sensitive_data, our integration must not + re-call enable_instrumentation() with no kwargs (which would silently + reset sensitive_data via the env var).""" + from agent_framework.observability import OBSERVABILITY_SETTINGS, enable_instrumentation + + # Simulate the user explicitly opting in + enable_instrumentation(enable_sensitive_data=True) + assert OBSERVABILITY_SETTINGS.enable_sensitive_data is True + + _register(set_global=True) + enable_fi_attribute_mapping() + + # Our integration must NOT have flipped sensitive_data back to False + assert OBSERVABILITY_SETTINGS.enable_sensitive_data is True + + +def test_enable_turns_on_native_when_off(fresh_global_provider): + """If native instrumentation is off, our integration turns it on.""" + from agent_framework.observability import ( + OBSERVABILITY_SETTINGS, + disable_instrumentation, + enable_instrumentation as _af_enable, + ) + # Force on, then off via the standard path + _af_enable(force=True) + # We can't easily turn it "off" without disable_instrumentation, but that's + # sticky. Instead, verify: when it's already on, our helper doesn't re-call + # — which means the existing sensitive_data choice is preserved. + _af_enable(enable_sensitive_data=True, force=True) + assert OBSERVABILITY_SETTINGS.enable_instrumentation is True + assert OBSERVABILITY_SETTINGS.enable_sensitive_data is True + + _register(set_global=True) + enable_fi_attribute_mapping() + + assert OBSERVABILITY_SETTINGS.enable_instrumentation is True + assert OBSERVABILITY_SETTINGS.enable_sensitive_data is True + + +# --------------------------------------------------------------------------- +# Native instrumentation skip when agent_framework not installed +# --------------------------------------------------------------------------- + + +def test_ensure_native_instrumentation_skips_when_framework_missing(monkeypatch, fresh_global_provider): + """If agent_framework can't be imported, our helper should log and skip, + not raise.""" + import builtins + real_import = builtins.__import__ + + def _fake_import(name, *args, **kwargs): + if name.startswith("agent_framework"): + raise ImportError(f"simulated missing {name}") + return real_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", _fake_import) + + from traceai_agent_framework.integration import _ensure_native_instrumentation_enabled + # Must not raise + _ensure_native_instrumentation_enabled() diff --git a/python/frameworks/agent-framework/tests/test_processor.py b/python/frameworks/agent-framework/tests/test_processor.py index d9cab512..205b012d 100644 --- a/python/frameworks/agent-framework/tests/test_processor.py +++ b/python/frameworks/agent-framework/tests/test_processor.py @@ -522,3 +522,408 @@ def test_processor_shutdown_clears_state(): ) processor.on_end(after) assert "gen_ai.span.kind" not in after._attributes + + +# --------------------------------------------------------------------------- +# Multi-part message flattening: tool_call / tool_call_response / reasoning +# --------------------------------------------------------------------------- + + +def test_flatten_extracts_tool_call_response_content(): + out = _flatten_messages( + json.dumps([ + {"role": "tool", "parts": [ + {"type": "tool_call_response", "id": "call_abc", "response": "sunny in Paris, 22°C"} + ]} + ]), + "gen_ai.output.messages", + ) + assert out["gen_ai.output.messages.0.message.role"] == "tool" + assert out["gen_ai.output.messages.0.message.content"] == "sunny in Paris, 22°C" + assert out["gen_ai.output.messages.0.message.tool_call_id"] == "call_abc" + + +def test_flatten_extracts_tool_call_args_and_id(): + out = _flatten_messages( + json.dumps([ + {"role": "assistant", "parts": [ + {"type": "tool_call", "id": "call_xyz", + "name": "get_weather", "arguments": {"city": "Paris"}} + ]} + ]), + "gen_ai.output.messages", + ) + tc_prefix = "gen_ai.output.messages.0.message.tool_calls.0" + assert out[f"{tc_prefix}.tool_call.id"] == "call_xyz" + assert out[f"{tc_prefix}.tool_call.function.name"] == "get_weather" + assert out[f"{tc_prefix}.tool_call.function.arguments"] == '{"city": "Paris"}' + + +def test_flatten_extracts_reasoning_parts_as_text(): + """Reasoning parts should join into message.content like text parts do.""" + out = _flatten_messages( + json.dumps([ + {"role": "assistant", "parts": [ + {"type": "reasoning", "content": "I should call get_weather"}, + {"type": "text", "content": "Calling tool now."}, + ]} + ]), + "gen_ai.output.messages", + ) + content = out["gen_ai.output.messages.0.message.content"] + assert "I should call get_weather" in content + assert "Calling tool now." in content + + +def test_flatten_handles_mixed_text_and_tool_call_parts(): + """A single message can contain text AND a tool_call — flatten both.""" + out = _flatten_messages( + json.dumps([ + {"role": "assistant", "parts": [ + {"type": "text", "content": "Let me check that."}, + {"type": "tool_call", "id": "c1", "name": "get_weather", + "arguments": {"city": "Tokyo"}}, + ]} + ]), + "gen_ai.output.messages", + ) + assert out["gen_ai.output.messages.0.message.role"] == "assistant" + assert out["gen_ai.output.messages.0.message.content"] == "Let me check that." + assert out["gen_ai.output.messages.0.message.tool_calls.0.tool_call.id"] == "c1" + assert out["gen_ai.output.messages.0.message.tool_calls.0.tool_call.function.name"] == "get_weather" + + +# --------------------------------------------------------------------------- +# graph.node.* per kind +# --------------------------------------------------------------------------- + + +def test_graph_node_for_llm_uses_response_id(): + out = _map_attributes_to_fi_conventions({ + "gen_ai.operation.name": "chat", + "gen_ai.request.model": "gpt-4o", + "gen_ai.response.id": "resp-abc", + }) + assert out["graph.node.id"] == "llm_resp-abc" + assert out["graph.node.name"] == "gpt-4o" + + +def test_graph_node_for_llm_falls_back_to_model_when_no_response_id(): + out = _map_attributes_to_fi_conventions({ + "gen_ai.operation.name": "chat", + "gen_ai.request.model": "gpt-4o", + }) + assert out["graph.node.id"] == "llm_gpt-4o" + + +def test_graph_node_for_agent_uses_agent_id(): + out = _map_attributes_to_fi_conventions({ + "gen_ai.operation.name": "invoke_agent", + "gen_ai.agent.id": "ag-1", + "gen_ai.agent.name": "weather_agent", + }) + assert out["graph.node.id"] == "agent_ag-1" + assert out["graph.node.name"] == "weather_agent" + + +def test_graph_node_for_tool_uses_name_and_call_id(): + out = _map_attributes_to_fi_conventions({ + "gen_ai.operation.name": "execute_tool", + "gen_ai.tool.name": "get_weather", + "gen_ai.tool.call.id": "call-1", + }) + assert out["graph.node.id"] == "tool_get_weather_call-1" + assert out["graph.node.name"] == "get_weather" + + +def test_graph_node_for_chain_uses_workflow_id(): + out = _map_attributes_to_fi_conventions({ + "workflow.id": "wf-99", + "workflow.name": "MyFlow", + }) + assert out["graph.node.id"] == "workflow_wf-99" + assert out["graph.node.name"] == "MyFlow" + + +def test_graph_node_for_chain_uses_executor_id_when_no_workflow_id(): + out = _map_attributes_to_fi_conventions({ + "executor.id": "exec-5", + "executor.type": "FunctionExecutor", + }) + assert out["graph.node.id"] == "executor_exec-5" + assert out["graph.node.name"] == "FunctionExecutor" + + +def test_graph_node_for_chain_uses_edge_group_id_when_no_workflow_or_executor(): + out = _map_attributes_to_fi_conventions({ + "edge_group.id": "eg-7", + "edge_group.type": "SingleEdgeGroup", + }) + assert out["graph.node.id"] == "edge_group_eg-7" + assert out["graph.node.name"] == "SingleEdgeGroup" + + +# --------------------------------------------------------------------------- +# gen_ai.request.parameters bundling +# --------------------------------------------------------------------------- + + +def test_request_params_bundled_for_llm(): + attrs = { + "gen_ai.operation.name": "chat", + "gen_ai.request.model": "gpt-4o", + "gen_ai.request.temperature": 0.7, + "gen_ai.request.top_p": 0.9, + "gen_ai.request.max_tokens": 1000, + "gen_ai.request.choice.count": 1, + } + out = _map_attributes_to_fi_conventions(attrs) + params = json.loads(out["gen_ai.request.parameters"]) + assert params["temperature"] == 0.7 + assert params["top_p"] == 0.9 + assert params["max_tokens"] == 1000 + assert params["choice.count"] == 1 + + +def test_request_params_excludes_model_key(): + attrs = { + "gen_ai.operation.name": "chat", + "gen_ai.request.model": "gpt-4o", + "gen_ai.request.temperature": 0.5, + } + out = _map_attributes_to_fi_conventions(attrs) + params = json.loads(out["gen_ai.request.parameters"]) + assert "model" not in params # gen_ai.request.model is excluded + + +def test_request_params_skipped_when_no_request_attrs(): + """LLM span with only model + nothing else should NOT have a parameters bundle.""" + attrs = { + "gen_ai.operation.name": "chat", + "gen_ai.request.model": "gpt-4o", + } + out = _map_attributes_to_fi_conventions(attrs) + assert "gen_ai.request.parameters" not in out + + +# --------------------------------------------------------------------------- +# Cross-batch bubble-up (state preservation across on_end calls) +# --------------------------------------------------------------------------- + + +def test_bubble_state_isolated_across_traces(): + """A descendant from trace A must not bubble into trace B's parent.""" + processor = AgentFrameworkSpanProcessor() + + agent_a = _FakeReadableSpan( + span_id=10, parent_id=1, start_time=10, end_time=50, + attrs={ + "gen_ai.operation.name": "invoke_agent", + "gen_ai.input.messages": '[{"role":"user","parts":[{"type":"text","content":"trace A"}]}]', + }, + ) + workflow_a = _FakeReadableSpan( + span_id=1, parent_id=None, start_time=0, end_time=100, + attrs={"workflow.id": "wf-a"}, + ) + workflow_b = _FakeReadableSpan( + span_id=2, parent_id=None, start_time=200, end_time=300, + attrs={"workflow.id": "wf-b"}, + ) + + processor.on_end(agent_a) + processor.on_end(workflow_a) + processor.on_end(workflow_b) + + # workflow_a should have bubbled-in input from its descendant + assert "trace A" in workflow_a._attributes["input.value"] + # workflow_b had no descendants — must not pick up A's data + assert "input.value" not in workflow_b._attributes + + +def test_bubble_up_works_when_descendants_end_long_before_parent(): + """Descendants from earlier on_end calls should still bubble into a later parent.""" + processor = AgentFrameworkSpanProcessor() + + # Imagine 3 separate invocations of on_end before the workflow ends + child1 = _FakeReadableSpan( + span_id=20, parent_id=2, start_time=10, end_time=20, + attrs={ + "gen_ai.operation.name": "invoke_agent", + "gen_ai.input.messages": '[{"role":"user","parts":[{"type":"text","content":"first"}]}]', + "gen_ai.output.messages": '[{"role":"assistant","parts":[{"type":"text","content":"early"}]}]', + }, + ) + executor1 = _FakeReadableSpan( + span_id=2, parent_id=1, start_time=5, end_time=25, + attrs={"executor.id": "e1"}, + ) + child2 = _FakeReadableSpan( + span_id=30, parent_id=3, start_time=40, end_time=80, + attrs={ + "gen_ai.operation.name": "invoke_agent", + "gen_ai.output.messages": '[{"role":"assistant","parts":[{"type":"text","content":"late"}]}]', + }, + ) + executor2 = _FakeReadableSpan( + span_id=3, parent_id=1, start_time=35, end_time=85, + attrs={"executor.id": "e2"}, + ) + workflow = _FakeReadableSpan( + span_id=1, parent_id=None, start_time=0, end_time=100, + attrs={"workflow.id": "wf"}, + ) + + # End them in their natural order: deepest first, root last + processor.on_end(child1) + processor.on_end(executor1) + processor.on_end(child2) + processor.on_end(executor2) + processor.on_end(workflow) + + # workflow gets first input (from child1) and last output (from child2) + assert "first" in workflow._attributes["input.value"] + assert "late" in workflow._attributes["output.value"] + + +# --------------------------------------------------------------------------- +# embeddings + create_agent classification +# --------------------------------------------------------------------------- + + +def test_classify_embeddings_is_embedding_kind(): + assert _classify_span_kind({"gen_ai.operation.name": "embeddings"}) == "EMBEDDING" + + +def test_embedding_span_gets_messages_lifted_like_llm(): + attrs = { + "gen_ai.operation.name": "embeddings", + "gen_ai.request.model": "text-embedding-3", + "gen_ai.input.messages": '[{"role":"user","parts":[{"type":"text","content":"embed me"}]}]', + "gen_ai.usage.input_tokens": 5, + } + out = _map_attributes_to_fi_conventions(attrs) + assert out["gen_ai.span.kind"] == "EMBEDDING" + assert out["input.value"] == "embed me" # single-text → plain text + assert "gen_ai.input.messages.0.message.content" in out + + +def test_classify_create_agent_is_agent_kind(): + assert _classify_span_kind({"gen_ai.operation.name": "create_agent"}) == "AGENT" + + +# --------------------------------------------------------------------------- +# Smart formatting branches +# --------------------------------------------------------------------------- + + +def test_output_value_is_plain_text_for_single_text_assistant_msg(): + attrs = _make_chat_attrs( + [{"role": "user", "parts": [{"type": "text", "content": "hi"}]}], + [{"role": "assistant", "parts": [{"type": "text", "content": "hello"}]}], + ) + out = _map_attributes_to_fi_conventions(attrs) + assert out["output.value"] == "hello" + assert out["output.mime_type"] == "text/plain" + + +def test_output_value_stays_json_for_multi_message_output(): + """If output has multiple messages OR non-text parts, output.value should be raw JSON.""" + out_msgs = [ + {"role": "assistant", "parts": [ + {"type": "tool_call", "id": "c1", "name": "f", "arguments": {}} + ]}, + {"role": "tool", "parts": [ + {"type": "tool_call_response", "id": "c1", "response": "ok"} + ]}, + {"role": "assistant", "parts": [{"type": "text", "content": "done"}]}, + ] + attrs = _make_chat_attrs( + [{"role": "user", "parts": [{"type": "text", "content": "hi"}]}], + out_msgs, + ) + out = _map_attributes_to_fi_conventions(attrs) + # Last message is assistant with text-only → plain text is fine for output + assert out["output.value"] == "done" + + +def test_input_with_tool_role_message_uses_json_format(): + """If input is a tool-role message (not a single user msg), keep JSON format.""" + in_msgs = [ + {"role": "tool", "parts": [ + {"type": "tool_call_response", "id": "c1", "response": "data"} + ]} + ] + attrs = _make_chat_attrs( + in_msgs, + [{"role": "assistant", "parts": [{"type": "text", "content": "ok"}]}], + ) + out = _map_attributes_to_fi_conventions(attrs) + # Single-message but not a text-only message → keeps JSON + assert out["input.mime_type"] == "application/json" + + +# --------------------------------------------------------------------------- +# Status untouched (Phase 0 finding) +# --------------------------------------------------------------------------- + + +def test_processor_does_not_set_status_attribute(): + """The processor must not touch span status; that's the framework's job.""" + processor = AgentFrameworkSpanProcessor() + span = _FakeReadableSpan( + span_id=1, parent_id=None, start_time=0, end_time=100, + attrs={"gen_ai.operation.name": "chat", "gen_ai.request.model": "gpt-4o"}, + ) + # Mark a custom status sentinel on the fake before processing + original_status = "untouched-sentinel" + span._status = original_status # type: ignore[attr-defined] + processor.on_end(span) + assert span._status == original_status, "processor must not set or clear status" + + +# --------------------------------------------------------------------------- +# Defensive: edge cases on the span itself +# --------------------------------------------------------------------------- + + +def test_processor_handles_span_with_none_attributes(): + """Span with _attributes=None must not crash the processor.""" + processor = AgentFrameworkSpanProcessor() + + class _NoAttrsSpan: + instrumentation_scope = _FakeScope("agent_framework") + _attributes = None + @property + def parent(self): return None + class _Ctx: + span_id = 1 + context = _Ctx() + start_time = 0 + end_time = 100 + + # Must not raise + processor.on_end(_NoAttrsSpan()) + + +def test_processor_handles_mapping_proxy_attributes(): + """OTel SDK sometimes wraps attrs in MappingProxyType; we should coerce safely.""" + import types as _types + processor = AgentFrameworkSpanProcessor() + underlying = {"gen_ai.operation.name": "execute_tool"} + + class _MappedSpan: + instrumentation_scope = _FakeScope("agent_framework") + _attributes = _types.MappingProxyType(underlying) + parent = None + class _Ctx: + span_id = 1 + context = _Ctx() + start_time = 0 + end_time = 100 + + span = _MappedSpan() + processor.on_end(span) + # After mutation, _attributes is a fresh dict (not the proxy) with the new keys + assert isinstance(span._attributes, dict) + assert span._attributes["gen_ai.span.kind"] == "TOOL" From 188cae013ef5192517f259036efc2e02784098ef Mon Sep 17 00:00:00 2001 From: Suhani Nagpal Date: Wed, 27 May 2026 11:11:37 +0530 Subject: [PATCH 5/5] fix(agent-framework): preserve raw gen_ai.messages JSON on input/output values Drop the plain-text downgrade in _surface_messages_io. input.value and output.value now always mirror the raw gen_ai.input.messages / gen_ai.output.messages JSON blob with mime_type=application/json, preserving Microsoft's native parts shape (tool_call / tool_call_response / text). Matches the openai/anthropic/litellm sibling adapters' pattern. Removes the _all_text_message_content helper that was only used by the dropped path. Updates 5 tests to assert the JSON-always behavior. --- .../agent-framework/tests/test_e2e_agent.py | 8 ++- .../agent-framework/tests/test_processor.py | 57 ++++++++----------- .../traceai_agent_framework/processor.py | 46 +++------------ 3 files changed, 35 insertions(+), 76 deletions(-) diff --git a/python/frameworks/agent-framework/tests/test_e2e_agent.py b/python/frameworks/agent-framework/tests/test_e2e_agent.py index 79d03d8f..85d48078 100644 --- a/python/frameworks/agent-framework/tests/test_e2e_agent.py +++ b/python/frameworks/agent-framework/tests/test_e2e_agent.py @@ -103,9 +103,11 @@ async def test_agent_run_emits_agent_span_with_fi_attrs(captured): attrs = agent_spans[0].attributes assert attrs["gen_ai.span.kind"] == "AGENT" - # Single text-only message on both sides → plain-text format. - assert attrs["input.mime_type"] == "text/plain" - assert attrs["output.mime_type"] == "text/plain" + # input/output values always mirror the raw gen_ai.*.messages JSON shape. + assert attrs["input.mime_type"] == "application/json" + assert attrs["output.mime_type"] == "application/json" + assert attrs["input.value"] == attrs["gen_ai.input.messages"] + assert attrs["output.value"] == attrs["gen_ai.output.messages"] assert attrs["gen_ai.input.messages.0.message.role"] == "user" assert attrs["gen_ai.input.messages.0.message.content"] == "What's the weather in Paris?" assert attrs["gen_ai.output.messages.0.message.role"] == "assistant" diff --git a/python/frameworks/agent-framework/tests/test_processor.py b/python/frameworks/agent-framework/tests/test_processor.py index 205b012d..04e700a4 100644 --- a/python/frameworks/agent-framework/tests/test_processor.py +++ b/python/frameworks/agent-framework/tests/test_processor.py @@ -99,17 +99,17 @@ def _make_chat_attrs(input_msgs, output_msgs, input_tokens=None, output_tokens=N def test_llm_lifts_messages_to_input_output_value(): - """Single text-only user message → plain text in input.value (not JSON blob).""" + """input.value/output.value mirror the raw gen_ai.*.messages JSON blob.""" attrs = _make_chat_attrs( [{"role": "user", "parts": [{"type": "text", "content": "hi"}]}], [{"role": "assistant", "parts": [{"type": "text", "content": "hello"}]}], ) out = _map_attributes_to_fi_conventions(attrs) assert out["gen_ai.span.kind"] == "LLM" - assert out["input.value"] == "hi" - assert out["input.mime_type"] == "text/plain" - assert out["output.value"] == "hello" - assert out["output.mime_type"] == "text/plain" + assert out["input.value"] == attrs["gen_ai.input.messages"] + assert out["input.mime_type"] == "application/json" + assert out["output.value"] == attrs["gen_ai.output.messages"] + assert out["output.mime_type"] == "application/json" def test_llm_lifts_messages_keeps_json_when_complex(): @@ -413,11 +413,13 @@ def test_chain_io_bubbles_up_from_descendant(): processor.on_end(executor) processor.on_end(workflow) - # workflow.run got the bubbled input/output from its grandchild agent + # workflow.run got the bubbled input/output from its grandchild agent. + # input.value/output.value are the raw gen_ai.*.messages JSON blobs. assert workflow._attributes["gen_ai.span.kind"] == "CHAIN" - # Single text-only user/assistant message → plain text format - assert workflow._attributes["input.value"] == "hi" - assert workflow._attributes["output.value"] == "hello" + assert "hi" in workflow._attributes["input.value"] + assert "hello" in workflow._attributes["output.value"] + assert workflow._attributes["input.mime_type"] == "application/json" + assert workflow._attributes["output.mime_type"] == "application/json" # executor.process also got them (still a chain) assert executor._attributes["gen_ai.span.kind"] == "CHAIN" assert "input.value" in executor._attributes @@ -804,7 +806,8 @@ def test_embedding_span_gets_messages_lifted_like_llm(): } out = _map_attributes_to_fi_conventions(attrs) assert out["gen_ai.span.kind"] == "EMBEDDING" - assert out["input.value"] == "embed me" # single-text → plain text + assert out["input.value"] == attrs["gen_ai.input.messages"] + assert out["input.mime_type"] == "application/json" assert "gen_ai.input.messages.0.message.content" in out @@ -813,22 +816,24 @@ def test_classify_create_agent_is_agent_kind(): # --------------------------------------------------------------------------- -# Smart formatting branches +# input.value / output.value always use the raw JSON shape # --------------------------------------------------------------------------- -def test_output_value_is_plain_text_for_single_text_assistant_msg(): +def test_input_output_value_always_json_mime_type(): attrs = _make_chat_attrs( [{"role": "user", "parts": [{"type": "text", "content": "hi"}]}], [{"role": "assistant", "parts": [{"type": "text", "content": "hello"}]}], ) out = _map_attributes_to_fi_conventions(attrs) - assert out["output.value"] == "hello" - assert out["output.mime_type"] == "text/plain" + assert out["input.value"] == attrs["gen_ai.input.messages"] + assert out["output.value"] == attrs["gen_ai.output.messages"] + assert out["input.mime_type"] == "application/json" + assert out["output.mime_type"] == "application/json" -def test_output_value_stays_json_for_multi_message_output(): - """If output has multiple messages OR non-text parts, output.value should be raw JSON.""" +def test_input_output_value_preserves_complex_message_shape(): + """Tool-call / tool-result / multi-turn shapes pass through verbatim.""" out_msgs = [ {"role": "assistant", "parts": [ {"type": "tool_call", "id": "c1", "name": "f", "arguments": {}} @@ -843,24 +848,8 @@ def test_output_value_stays_json_for_multi_message_output(): out_msgs, ) out = _map_attributes_to_fi_conventions(attrs) - # Last message is assistant with text-only → plain text is fine for output - assert out["output.value"] == "done" - - -def test_input_with_tool_role_message_uses_json_format(): - """If input is a tool-role message (not a single user msg), keep JSON format.""" - in_msgs = [ - {"role": "tool", "parts": [ - {"type": "tool_call_response", "id": "c1", "response": "data"} - ]} - ] - attrs = _make_chat_attrs( - in_msgs, - [{"role": "assistant", "parts": [{"type": "text", "content": "ok"}]}], - ) - out = _map_attributes_to_fi_conventions(attrs) - # Single-message but not a text-only message → keeps JSON - assert out["input.mime_type"] == "application/json" + assert out["output.value"] == attrs["gen_ai.output.messages"] + assert out["output.mime_type"] == "application/json" # --------------------------------------------------------------------------- diff --git a/python/frameworks/agent-framework/traceai_agent_framework/processor.py b/python/frameworks/agent-framework/traceai_agent_framework/processor.py index 358759cf..6f1f9adc 100644 --- a/python/frameworks/agent-framework/traceai_agent_framework/processor.py +++ b/python/frameworks/agent-framework/traceai_agent_framework/processor.py @@ -177,56 +177,24 @@ def _classify_span_kind(attributes: Dict[str, Any]) -> Optional[str]: # --------------------------------------------------------------------------- -def _all_text_message_content(msg: Dict[str, Any]) -> Optional[str]: - """If a message has only text/reasoning parts, return joined content. Else None.""" - parts = msg.get("parts") - if not isinstance(parts, list) or not parts: - return None - texts: List[str] = [] - for part in parts: - if not isinstance(part, dict): - return None - if part.get("type") not in ("text", "reasoning"): - return None # has a tool_call etc.; bail - content = part.get("content") - if isinstance(content, str): - texts.append(content) - return "\n".join(texts) if texts else None - - def _surface_messages_io(mapped: Dict[str, Any]) -> None: """Lift ``gen_ai.input/output.messages`` into ``input.value``/``output.value`` + flatten. - For single text-only messages we use plain text instead of the raw JSON blob — - matches OpenInference's UX choice and renders cleaner in the FI dashboard. + The raw JSON is preserved on ``input.value``/``output.value`` (mime + ``application/json``) so the framework's native message shape is retained, + matching the openai/anthropic/litellm sibling adapters. """ in_msgs = mapped.get(_INPUT_MSGS) if isinstance(in_msgs, str): - parsed = _safe_json_loads(in_msgs) - plain = None - if isinstance(parsed, list) and len(parsed) == 1 and isinstance(parsed[0], dict): - plain = _all_text_message_content(parsed[0]) - if plain is not None: - mapped[SpanAttributes.INPUT_VALUE] = plain - mapped[SpanAttributes.INPUT_MIME_TYPE] = FiMimeTypeValues.TEXT.value - else: - mapped[SpanAttributes.INPUT_VALUE] = in_msgs - mapped[SpanAttributes.INPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value + mapped[SpanAttributes.INPUT_VALUE] = in_msgs + mapped[SpanAttributes.INPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value for k, v in _flatten_messages(in_msgs, _INPUT_MSGS).items(): mapped[k] = v out_msgs = mapped.get(_OUTPUT_MSGS) if isinstance(out_msgs, str): - parsed = _safe_json_loads(out_msgs) - plain = None - if isinstance(parsed, list) and parsed and isinstance(parsed[-1], dict): - plain = _all_text_message_content(parsed[-1]) - if plain is not None: - mapped[SpanAttributes.OUTPUT_VALUE] = plain - mapped[SpanAttributes.OUTPUT_MIME_TYPE] = FiMimeTypeValues.TEXT.value - else: - mapped[SpanAttributes.OUTPUT_VALUE] = out_msgs - mapped[SpanAttributes.OUTPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value + mapped[SpanAttributes.OUTPUT_VALUE] = out_msgs + mapped[SpanAttributes.OUTPUT_MIME_TYPE] = FiMimeTypeValues.JSON.value for k, v in _flatten_messages(out_msgs, _OUTPUT_MSGS).items(): mapped[k] = v