分类
OpenAI开发框架
AI相关
2026-06-23
5

OpenAI Agent SDK

设计原则

  • 具有足够的功能,但不增加过多的新东西
  • 开箱即用,可以自定义内部细节

主要功能

  • 代理循环:内置,负责调用工具,结果给LLM并循环直到完成
  • 交接:多个代理之间的协调和委派
  • 护栏:与代理并运行输入验证和检查,如果检查失败则提前中断
  • Python优先:使用内置语言功能来协调和链接代理,不需要学习新的抽象概念
  • 函数工具:将任何Python函数转为工具
  • 跟踪:跟踪调试,监控,支持可视化

安装

pip install openai-agents

基础使用

from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    set_default_openai_api,
    set_tracing_disabled
)

# Ollama 配置
OPENAI_BASE_URL = "http://localhost:11434/v1"
OPENAI_API_KEY = "ollama"
OPENAI_MODEL_NAME = "qwen2.5:7b-instruct-q4_K_M"

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
)

# 使用自定义客户端
set_default_openai_client(client=client, use_for_tracing=False)
# 使用兼容的API模式
set_default_openai_api("chat_completions")
# 禁用OpenAI的跟踪服务
set_tracing_disabled(disabled=True)

from agents import Agent, Runner

# 关键:在 Agent 构造函数中直接指定 model 参数
agent = Agent(
    name="AI助手",
    instructions="你是一个友好的AI助手,使用中文交流",
    model=OPENAI_MODEL_NAME  # 添加这一行,明确指定模型名称
)

result = Runner.run_sync(agent, "你是谁")
print(result.final_output)

异步写法

from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    set_default_openai_api,
    set_tracing_disabled
)
from agents import Agent, Runner
import asyncio

# Ollama 配置
OPENAI_BASE_URL = "http://localhost:11434/v1"
OPENAI_API_KEY = "ollama"
OPENAI_MODEL_NAME = "qwen2.5:7b-instruct-q4_K_M"

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
)

# 使用自定义客户端
set_default_openai_client(client=client, use_for_tracing=False)
# 使用兼容的API模式
set_default_openai_api("chat_completions")
# 禁用OpenAI的跟踪服务
set_tracing_disabled(disabled=True)

agent = Agent(
    name="AI助手",
    instructions="你是一个友好的AI助手,使用中文交流",
    model=OPENAI_MODEL_NAME  # 添加这一行,明确指定模型名称
)

async def main():
    result = await Runner.run(agent, "你是谁")
    print(result.final_output)

if __name__  '__main__':
    asyncio.run(main())

流式传输

from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    set_default_openai_api,
    set_tracing_disabled
)
from agents import Agent, Runner
import asyncio

# Ollama 配置
OPENAI_BASE_URL = "http://localhost:11434/v1"
OPENAI_API_KEY = "ollama"
OPENAI_MODEL_NAME = "qwen2.5:7b-instruct-q4_K_M"

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
)

# 使用自定义客户端
set_default_openai_client(client=client, use_for_tracing=False)
# 使用兼容的API模式
set_default_openai_api("chat_completions")
# 禁用OpenAI的跟踪服务
set_tracing_disabled(disabled=True)

agent = Agent(
    name="AI助手",
    instructions="你是一个友好的AI助手,使用中文交流",
    model=OPENAI_MODEL_NAME  # 添加这一行,明确指定模型名称
)


async def main():
    # 注意:不需要 await,直接调用
    result = Runner.run_streamed(agent, "讲一个300字的故事")

    # 遍历流式事件
    async for event in result.stream_events():
        if event.type  "raw_response_event":
            # raw_response_event 来自LLM的响应
            if hasattr(event.data, 'delta') and event.data.delta:
                print(event.data.delta, end='', flush=True)
        elif event.type  "run_item_stream_event":
            # run_item_stream_event 来自runner事件(工具调用,交接)
            # 可以选择打印工具调用等信息
            pass

if __name__  '__main__':
    asyncio.run(main())

连续对话

from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    set_default_openai_api,
    set_tracing_disabled
)
from agents import Agent, Runner
import asyncio

# Ollama 配置
OPENAI_BASE_URL = "http://localhost:11434/v1"
OPENAI_API_KEY = "ollama"
OPENAI_MODEL_NAME = "qwen2.5:7b-instruct-q4_K_M"

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
)

# 使用自定义客户端
set_default_openai_client(client=client, use_for_tracing=False)
# 使用兼容的API模式
set_default_openai_api("chat_completions")
# 禁用OpenAI的跟踪服务
set_tracing_disabled(disabled=True)

agent = Agent(
    name="AI助手",
    instructions="你是一个友好的AI助手,使用中文交流",
    model=OPENAI_MODEL_NAME  # 添加这一行,明确指定模型名称
)

# 使用列表存储对话历史
conversation_history = []


async def main(msg):
    global conversation_history
    conversation_history.append({
        "role": "user",
        "content": msg,
    })
    result = Runner.run_streamed(agent, conversation_history)
    # 收集助手的完整回复
    full_response = ""
    # 遍历流式事件
    async for event in result.stream_events():
        if event.type  "raw_response_event":
            # raw_response_event 来自LLM的响应
            if hasattr(event.data, 'delta') and event.data.delta:
                print(event.data.delta, end='', flush=True)
                full_response += event.data.delta
        elif event.type  "run_item_stream_event":
            # run_item_stream_event 来自runner事件(工具调用,交接)
            # 可以选择打印工具调用等信息
            pass

    # 添加助手的回复到历史
    conversation_history.append({
        "role": "assistant",
        "content": full_response,
    })


async def run_conversation():
    await main("1+1=?")
    await asyncio.sleep(3)
    print()
    await main("再加1呢")
    await asyncio.sleep(3)
    print()
    await main("再加1呢")


if __name__  '__main__':
    asyncio.run(run_conversation())

多模态

import base64
from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    set_default_openai_api,
    set_tracing_disabled
)
from agents import Agent, Runner
import asyncio

# Ollama 配置
OPENAI_BASE_URL = "http://localhost:11434/v1"
OPENAI_API_KEY = "ollama"
OPENAI_MODEL_NAME = "qwen3-vl:2b"  # 改成支持多模态的模型!

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
)

# 使用自定义客户端
set_default_openai_client(client=client, use_for_tracing=False)
# 使用兼容的API模式
set_default_openai_api("chat_completions")
# 禁用OpenAI的跟踪服务
set_tracing_disabled(disabled=True)

agent = Agent(
    name="AI助手",
    instructions="你是一个友好的AI助手,使用中文交流",
    model=OPENAI_MODEL_NAME
)


# 图片转base64
def encode_image(image_path):
    with open(image_path, 'rb') as image_file:
        bin_content = image_file.read()
        return base64.b64encode(bin_content).decode("utf-8")


async def main():
    base64_image = encode_image("img.png")

    message = [
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{base64_image}"
                    }
                },
                {
                    "type": "text",
                    "text": "图片中有什么内容?"
                }
            ]
        }
    ]

    result = Runner.run_streamed(agent, message)

    # 遍历流式事件
    async for event in result.stream_events():
        if event.type  "raw_response_event":
            # raw_response_event 来自LLM的响应
            if hasattr(event.data, 'delta') and event.data.delta:
                print(event.data.delta, end='', flush=True)
        elif event.type  "run_item_stream_event":
            # run_item_stream_event 来自runner事件(工具调用,交接)
            pass


if __name__  '__main__':
    asyncio.run(main())

结构化输出示例

import base64
from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    set_default_openai_api,
    set_tracing_disabled
)
from agents import Agent, Runner
import asyncio
from pydantic import BaseModel

# Ollama 配置
OPENAI_BASE_URL = "http://localhost:11434/v1"
OPENAI_API_KEY = "ollama"
OPENAI_MODEL_NAME = "qwen3-vl:2b"  # 改成支持多模态的模型!

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
)

# 使用自定义客户端
set_default_openai_client(client=client, use_for_tracing=False)
# 使用兼容的API模式
set_default_openai_api("chat_completions")
# 禁用OpenAI的跟踪服务
set_tracing_disabled(disabled=True)


# 结构化输出
class Data(BaseModel):
    原信息: str
    建议: str


agent = Agent(
    name="AI助手",
    instructions="你是一个友好的AI助手,使用中文交流",
    model=OPENAI_MODEL_NAME,
    output_type=Data,  # 指定输出结构
)


# 图片转base64
def encode_image(image_path):
    with open(image_path, 'rb') as image_file:
        bin_content = image_file.read()
        return base64.b64encode(bin_content).decode("utf-8")


async def main():
    base64_image = encode_image("2.jpg")

    message = [
        {
            "role": "user",
            "content": [
                {
                    "type": "image_url",
                    "image_url": {
                        "url": f"data:image/png;base64,{base64_image}"
                    }
                },
                {
                    "type": "text",
                    "text": "图片中有什么内容?你对排版和配色有什么建议"
                }
            ]
        }
    ]

    result = Runner.run_streamed(agent, message)

    # 遍历流式事件
    async for event in result.stream_events():
        if event.type  "raw_response_event":
            # raw_response_event 来自LLM的响应
            if hasattr(event.data, 'delta') and event.data.delta:
                print(event.data.delta, end='', flush=True)
        elif event.type  "run_item_stream_event":
            # run_item_stream_event 来自runner事件(工具调用,交接)
            pass


if __name__  '__main__':
    asyncio.run(main())

工具调用

使用"""描述好工具说明,agent会自己调用,并再次执行,不需手动执行

import httpx
from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    set_default_openai_api,
    set_tracing_disabled
)
from agents import Agent, Runner, function_tool
import asyncio

# Ollama 配置
OPENAI_BASE_URL = "http://localhost:11434/v1"
OPENAI_API_KEY = "ollama"
OPENAI_MODEL_NAME = "qwen2.5:7b-instruct-q4_K_M"

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
)

# 使用自定义客户端
set_default_openai_client(client=client, use_for_tracing=False)
# 使用兼容的API模式
set_default_openai_api("chat_completions")
# 禁用OpenAI的跟踪服务
set_tracing_disabled(disabled=True)


# 天气工具
@function_tool
def get_weather(latitude, longitude):
    """根据指定坐标提供当地摄氏温度"""
    url = f"https://api.open-meteo.com/v1/forecast"
    params = {
        'latitude': latitude,
        'longitude': longitude,
        'current': 'temperature_2m,relative_humidity_2m,wind_speed_10m',
        'hourly': 'temperature_2m,relative_humidity_2m,precipitation,wind_speed_10m,cloud_cover',
        'timezone': 'Asia/Shanghai',
        'forecast_days': 1
    }

    response = httpx.get(url, params=params)
    data = response.json()

    return f"当前温度: {data['current']['temperature_2m']}°C"


agent = Agent(
    name="AI助手",
    instructions="你是一个友好的AI助手,使用中文交流",
    model=OPENAI_MODEL_NAME,
    tools=[get_weather]  # 工具,可以多个

)


async def main():
    # 注意:不需要 await,直接调用
    result = Runner.run_streamed(agent, "今天贵阳天气如何")

    # 遍历流式事件
    async for event in result.stream_events():
        if event.type  "raw_response_event":
            # raw_response_event 来自LLM的响应
            if hasattr(event.data, 'delta') and event.data.delta:
                print(event.data.delta, end='', flush=True)
        elif event.type  "run_item_stream_event":
            # run_item_stream_event 来自runner事件(工具调用,交接)
            # 可以选择打印工具调用等信息
            pass


if __name__  '__main__':
    asyncio.run(main())

MCP

pip install mcp
  • 启动MCPServer
  • 把MCPServer传递给Agent

MCP脚本

import os
from mcp.server.fastmcp import FastMCP

mcp = FastMCP("MCP Demo")

@mcp.tool()
def ls():
    """列出目录中的文件名"""
    return os.listdir(".")

if __name__  '__main__':
    mcp.run()

测试运行

from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    set_default_openai_api,
    set_tracing_disabled
)
from agents import Agent, Runner
import asyncio
from agents.mcp import MCPServerStdio

# Ollama 配置
OPENAI_BASE_URL = "http://localhost:11434/v1"
OPENAI_API_KEY = "ollama"
OPENAI_MODEL_NAME = "qwen2.5:7b-instruct-q4_K_M"

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
)

# 使用自定义客户端
set_default_openai_client(client=client, use_for_tracing=False)
set_default_openai_api("chat_completions")
set_tracing_disabled(disabled=True)


async def main():
    # 启动独立的 MCP 服务器进程
    async with MCPServerStdio(
            name="MCP Demo",
            params={
                "command": "python",
                "args": ["mcp_server.py"]  # 指向服务器文件
            }
    ) as mcp_server:

        agent = Agent(
            name="AI助手",
            instructions="你是一个友好的AI助手,使用中文交流,可以使用 ls 工具查看文件列表",
            model=OPENAI_MODEL_NAME,
            mcp_servers=[mcp_server],
        )

        result = Runner.run_streamed(agent, "当前目录有哪些文件")

        async for event in result.stream_events():
            if event.type  "raw_response_event":
                if hasattr(event.data, 'delta') and event.data.delta:
                    print(event.data.delta, end='', flush=True)


if __name__  '__main__':
    asyncio.run(main())

多Agent编排

模式:

  • 顺序
  • 并行
  • 路由(交接)
  • Agent即工具
  • 监督
  • 护栏(并行监督)

顺序

from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    set_default_openai_api,
    set_tracing_disabled
)
from agents import Agent, Runner
import asyncio

# Ollama 配置
OPENAI_BASE_URL = "http://localhost:11434/v1"
OPENAI_API_KEY = "ollama"
OPENAI_MODEL_NAME = "qwen3-vl:2b"

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
)

# 使用自定义客户端
set_default_openai_client(client=client, use_for_tracing=False)
# 使用兼容的API模式
set_default_openai_api("chat_completions")
# 禁用OpenAI的跟踪服务
set_tracing_disabled(disabled=True)

agent_a = Agent(
    name="AI助手",
    instructions="你是一个友好的AI助手,使用中文交流",
    model=OPENAI_MODEL_NAME  # 添加这一行,明确指定模型名称
)

agent_b = Agent(
    name="AI助手",
    instructions="对前一个Agent回答进行简要总结",
    model=OPENAI_MODEL_NAME  # 添加这一行,明确指定模型名称
)


async def main():
    result1 = await Runner.run(agent_a, "太阳围绕地球,还是地球围绕太阳?")
    print(result1.final_output)
    print("----------------------------")
    result2 = await Runner.run(agent_b, result1.final_output)
    print(result2.final_output)


if __name__  '__main__':
    asyncio.run(main())

并行

from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    set_default_openai_api,
    set_tracing_disabled
)
from agents import Agent, Runner
import asyncio

# Ollama 配置
OPENAI_BASE_URL = "http://localhost:11434/v1"
OPENAI_API_KEY = "ollama"
OPENAI_MODEL_NAME = "qwen3-vl:2b"

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
)

# 使用自定义客户端
set_default_openai_client(client=client, use_for_tracing=False)
# 使用兼容的API模式
set_default_openai_api("chat_completions")
# 禁用OpenAI的跟踪服务
set_tracing_disabled(disabled=True)

agent_a = Agent(
    name="AI助手",
    instructions="你是一个友好的AI助手,只使用中文交流",
    model=OPENAI_MODEL_NAME  # 添加这一行,明确指定模型名称
)

agent_b = Agent(
    name="AI助手",
    instructions="你是一个友好的AI助手,只使用英文交流",
    model=OPENAI_MODEL_NAME  # 添加这一行,明确指定模型名称
)


async def main():
    result1, result2 = await asyncio.gather(
        Runner.run(agent_a, "每天应该喝多少水?"),
        Runner.run(agent_b, "每天应该喝多少水?")
    )
    print(result1.final_output)
    print("----------------------------")
    print(result2.final_output)


if __name__  '__main__':
    asyncio.run(main())

路由/交接

原理是把其他的Agent封装为了函数进行调用

from openai import AsyncOpenAI
from agents import (
    set_default_openai_client,
    set_default_openai_api,
    set_tracing_disabled
)
from agents import Agent, Runner
import asyncio

# Ollama 配置
OPENAI_BASE_URL = "http://localhost:11434/v1"
OPENAI_API_KEY = "ollama"
OPENAI_MODEL_NAME = "qwen3-vl:2b"

client = AsyncOpenAI(
    base_url=OPENAI_BASE_URL,
    api_key=OPENAI_API_KEY,
)

# 使用自定义客户端
set_default_openai_client(client=client, use_for_tracing=False)
# 使用兼容的API模式
set_default_openai_api("chat_completions")
# 禁用OpenAI的跟踪服务
set_tracing_disabled(disabled=True)

agent_a = Agent(
    name="AI助手Java",
    instructions="很擅长Java解答,只用英文交流",
    model=OPENAI_MODEL_NAME,
    handoff_description="只使用英文回答"
)

agent_b = Agent(
    name="AI助手Python",
    instructions="很擅长Python解答,只用中文交流",
    model=OPENAI_MODEL_NAME,
    handoff_description="只是用中文回答"
)

agent_c = Agent(
    name="前台助手",
    instructions="不回答问题,根据问题范畴把问题交给适合的Agent",
    model=OPENAI_MODEL_NAME,
    handoffs=[agent_a, agent_b] # 可交接Agent
)


async def main():
    result = Runner.run_streamed(agent_c, "Java的基础数据类型,直接说答案")
    async for event in result.stream_events():
        # print(event)
        if event.type  "raw_response_event":
            # raw_response_event 来自LLM的响应
            if hasattr(event.data, 'delta') and event.data.delta:
                print(event.data.delta, end='', flush=True)
        elif event.type  "run_item_stream_event":
            # run_item_stream_event 来自runner事件(工具调用,交接)
            # 可以选择打印工具调用等信息
            pass


if __name__  '__main__':
    asyncio.run(main())
目录
统计
21
分类
206
文档
2
坚持