
Qwen LLM Agent Integration

GitHub: https://github.com/auliwenjiang/agentcp/blob/master/samples/llm_agent/qwen_agent.py

A Qwen LLM agent built on the AgentCP SDK, connecting the model's capabilities to the agent network so that other agents on the network can use the Qwen model simply by calling this agent.

1. Usage Guide

1) Create an agent identity

See "Create an identity, read and write public/private data".

2) Edit qwen_agent.py

Replace seed_password and your_aid with the identity information created in step 1).

3) Configuration file

The configuration lives in the agent's private data directory, under the ACP path /AIDs/youraid/private/data/env.json (this keeps agent migration and cloning simple).

Set the service parameters in data/env.json:

json
{
    "OPENAI_API_KEY": "your_api_key",
    "BASE_URL": "https://api.example.com/v1",
    "MODEL": "qwen-72b-chat"
}
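The sample presumably reads this file at startup and hands the parsed dict to `init_ai_client`; below is a minimal loader sketch (the `load_env_config` helper is illustrative, not part of the AgentCP SDK):

```python
import json
from pathlib import Path

def load_env_config(path: str) -> dict:
    """Load service parameters (API key, base URL, model) from env.json."""
    cfg = json.loads(Path(path).read_text(encoding="utf-8"))
    # Fail fast on a missing key rather than at the first model call.
    for key in ("OPENAI_API_KEY", "BASE_URL", "MODEL"):
        if not cfg.get(key):
            raise ValueError(f"missing required config key: {key}")
    return cfg
```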

4) Run the program

bash
python qwen_agent.py

2. Features

  • Complete message-handling pipeline
  • Streaming responses and tool calls
  • Agent-network connectivity
  • Multi-role conversation support
  • Exception handling and log tracing

3. Environment Requirements

4. Code Structure

bash
.
├── qwen_agent.py       # Core business logic
├── create_profile.py   # Agent profile generation

5. Core Classes

QwenClient

python
import json
import time

import agentcp
from openai import OpenAI

class QwenClient:
    def __init__(self):
        self.openai_api_key = None
        self.base_url = None
        self.model = None
        self.client = None
        self.acp = agentcp.AgentCP(".", seed_password="888777", debug=True)
        self.agentid: agentcp.AgentID = None

    def init_ai_client(self, json_data):
        # Read the API key, base URL, and model name from the loaded config
        self.openai_api_key = json_data.get("OPENAI_API_KEY", "")
        self.base_url = json_data.get("BASE_URL", "")
        self.model = json_data.get("MODEL", "")
        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)

    async def async_message_handler(self, message_data):
        try:
            receiver = message_data.get("receiver")
            sender = message_data.get("sender", "")
            if self.agentid.id not in receiver:
                print("Message is not addressed to this agent; ignoring")
                return
            message_array = self.agentid.get_content_array_from_message(message_data)
            if len(message_array) == 0:
                print("Message content is empty; ignoring")
                return
            llm_content = self.agentid.get_content_from_message(message_data)
            stream = message_array[0].get("stream", False)
            tools = message_array[0].get("tools", [])
            rolesetting = message_array[0].get("prompt", "")
            if rolesetting:
                messages = [{"role": "system", "content": rolesetting},
                            {"role": "user", "content": llm_content}]
            else:
                messages = [{"role": "user", "content": llm_content}]
            print(f"\n[Handling message from {sender}: {llm_content}]\n")
            await self.stream_process_query(message_data, messages, sender, stream, tools)
        except Exception as e:
            import traceback
            print(f"Error while handling message: {e}\nFull traceback:\n{traceback.format_exc()}")

    def send_message_tools_call(self, session_id, sender, llm_content: str, funcallback):
        to_aid_list = [sender]
        msg_block = {
            "type": "tool_call",
            "status": "success",
            "timestamp": int(time.time() * 1000),
            "content": llm_content,
        }
        self.agentid.add_message_handler(funcallback, session_id)
        self.agentid.send_message(session_id, to_aid_list, msg_block)

    async def stream_process_query(self, message_data: dict, messages: list, sender: str, stream: bool, user_tools: list):
        if user_tools is None:
            user_tools = []
        if len(user_tools) > 0:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                stream=stream,
                tools=user_tools
            )
        else:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                stream=stream
            )
        session_id = message_data.get("session_id", "")
        if stream:
            # A streaming response has no .choices attribute to inspect;
            # forward the raw stream to the sender instead.
            await self.agentid.send_stream_message(session_id, [sender], response)
            return
        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)
            print(f"\n[Calling tool {tool_name} with args {tool_args}]\n")
            async def funcallback(result_content):
                # Fire once: unregister, append the tool result, then resume.
                self.agentid.remove_message_handler(funcallback, session_id)
                messages.append(content.message.model_dump())
                messages.append({
                    "role": "tool",
                    "content": self.agentid.get_content_from_message(result_content),
                    "tool_call_id": tool_call.id,
                })
                await self.stream_process_query(message_data, messages, sender, stream, user_tools)
            tool_content = {
                "tool_name": tool_name,
                "tool_args": tool_args,
            }
            self.send_message_tools_call(session_id, sender, json.dumps(tool_content), funcallback)
            return
        return self.agentid.reply_message(message_data, content.message.content)
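The context-building branch in `async_message_handler` can be factored into a small pure helper; this sketch is illustrative (the `build_messages` name is not part of the sample) and mirrors how an optional `prompt` field becomes a system message:

```python
def build_messages(llm_content: str, rolesetting: str = "") -> list:
    """Build the chat context: prepend a system message only when a role prompt is set."""
    if rolesetting:
        return [{"role": "system", "content": rolesetting},
                {"role": "user", "content": llm_content}]
    return [{"role": "user", "content": llm_content}]
```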

Key Methods

  1. Message handling - async_message_handler
python
async def async_message_handler(self, message_data):
    # Filter and parse the incoming message
    # Build the conversation context
    # Hand off to the processing pipeline
  2. Stream processing - stream_process_query
python
async def stream_process_query(self, message_data, messages, sender, stream, user_tools):
    # Detect tool calls
    # Handle the model response
    # Streaming / non-streaming reply paths
  3. Tool calls - send_message_tools_call
python
def send_message_tools_call(self, session_id, sender, llm_content, funcallback):
    # Send the tool-call request
    # Register the result callback
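The tool-call flow hinges on a per-session callback that is registered before the request goes out and unregisters itself when the result arrives, so it fires exactly once. A self-contained toy model of that pattern (the `HandlerRegistry` class is illustrative, not the AgentCP SDK):

```python
class HandlerRegistry:
    """Toy stand-in for the SDK's per-session message-handler registry."""
    def __init__(self):
        self._handlers = {}  # session_id -> list of callbacks

    def add_message_handler(self, cb, session_id):
        self._handlers.setdefault(session_id, []).append(cb)

    def remove_message_handler(self, cb, session_id):
        self._handlers.get(session_id, []).remove(cb)

    def dispatch(self, session_id, result):
        # Iterate over a copy: a callback may unregister itself mid-dispatch.
        for cb in list(self._handlers.get(session_id, [])):
            cb(result)

registry = HandlerRegistry()
results = []

def on_tool_result(result):
    registry.remove_message_handler(on_tool_result, "sess-1")  # fire once
    results.append(result)

registry.add_message_handler(on_tool_result, "sess-1")
registry.dispatch("sess-1", "tool output")
registry.dispatch("sess-1", "ignored")  # handler already removed, no effect
```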

6. Error Handling

The program includes a comprehensive exception-capture mechanism; enable DEBUG mode to see detailed logs:

python
AgentCP(..., debug=True)  # enable debug mode

7. Notes

  1. Make sure the env.json configuration file is correct
  2. Joining the agent network requires a valid seed_password
  3. Disable debug mode in production

Released under the Apache 2.0 License.