Skip to content

读取文件数据输出的 Agent

GitHub: filereader

使用方法

1. 创建 Agent 身份

请参考 创建身份,读写公有私有数据

2. 修改 main.py 文件

AGENT_NAME 修改为实际的身份信息

3. 执行代码

bash
python main.py

功能简介

该 Agent 基于 agentcp 库构建的文件读取示例,可以将 agentcp 目录下的文件列出或者读取文件内容作为消息传递。主要演示以下功能:

  • 安全的文件读取
  • 支持遍历目录查看文件列表
  • 交互式消息处理

完整示例代码

python
from agentcp import AgentCP
import asyncio
import os
import re
import json

class FileOperator:
    """文件操作类,负责安全地读取文本文件内容"""

    def __init__(self, sandbox_root: str = None):
        self.sandbox_root = sandbox_root or os.getcwd()

    def is_text_file(self, file_path: str) -> bool:
        """通过检查文件内容判断是否为文本文件"""
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                f.read(1024)
            return True
        except UnicodeDecodeError:
            return False
        except Exception:
            return False

    def extract_file_path(self, text: str) -> str:
        """从文本中提取文件名(不需要完整路径)"""
        match = re.search(r"(?:读取|打开|查看)文件\s*([^\s\/\\]+)", text)
        return match.group(1) if match else None

    def sanitize_path(self, filename: str) -> str:
        """遍历sandbox目录,找到这个文件"""
        def recursive_search(root_dir):
            try:
                for root, dirs, files in os.walk(root_dir):
                    if filename in files:
                        full_path = os.path.join(root, filename)
                        normalized_path = os.path.normpath(full_path)
                        return normalized_path
            except:
                return None
            return None
        return recursive_search(self.sandbox_root)

    def walk_directory(self):
        """遍历目录并打印所有文件和文件夹"""
        filenamess = []
        for root, dirs, files in os.walk(self.sandbox_root):
            for file in files:
                filenamess.append(file)
        return filenamess

    def exist_file(self, filename):
        """检查文件是否存在"""
        safe_path = self.sanitize_path(filename)
        return os.path.exists(safe_path)

    def read_file(self, file_path: str) -> str:
        """安全地读取文本文件内容"""
        safe_path = self.sanitize_path(file_path)
        if not safe_path:
            raise ValueError("文件不存在")
        if not self.is_text_file(safe_path):
            raise ValueError("只能读取文本文件")
        with open(safe_path, "r") as file:
            return file.read()

def parse_command(text):
    if not text:
        return False
    if text.find("查询") >= 0 or text.find("列表") >= 0:
        return 'list'
    if text.find("读取") >= 0 or text.find("查看") >= 0:
        return 'read'

class FileAgent:
    def __init__(self, endpoint: str, name: str):
        """初始化文件Agent"""
        self.acp = AgentCP("./", seed_password="888777")
        self.endpoint = endpoint
        self.name = name
        self.aid = None
        self.file_operator = FileOperator(self.acp.app_path)
        self.last_command = ''

    async def message_handler(self, msg):
        """消息处理器 - 根据消息内容安全地读取文件"""
        try:
            ref_msg_id = msg.get("ref_msg_id")
            content = msg.get("message", "\"{}\"")
            content = json.loads(content)[0]["content"]
            content = json.loads(content)
            text = content.get("text", "")
            command = parse_command(text)
            print(f"收到消息: {content}")

            if self.last_command == 'read':
                self.last_command = ''
                return await self.read_file(msg, text)
            elif command == 'list':
                files = self.file_operator.walk_directory()
                if not files:
                    await self._send_reply(msg, "当前目录下没有文件")
                    return True
                to = "文件列表<br>" + "<br>".join(files)
                await self._send_reply(msg, to)
                return True
            elif command == 'read':
                self.last_command = 'read'
                files = self.file_operator.walk_directory()
                if not files:
                    await self._send_reply(msg, "当前目录下没有文件")
                    return True
                to = "读取哪一个文件?<br>" + "<br>".join(files)
                print(f'send message: {to}')
                await self._send_reply(msg, to)
                return True
            else:
                await self._send_reply(msg, "你可以对我说: <br>读取文件<br>查询文件")
        except Exception as e:
            print(f"处理消息出错: {str(e)}")
            await self._send_reply(msg, f"处理文件时出错: {str(e)}")
            return False

    async def read_file(self, msg, text):
        try:
            filename = self.file_operator.extract_file_path(text)
            if not filename:
                try:
                    print(f'未提供文件名尝试读取: {text}')
                    file_content = self.file_operator.read_file(text)
                    await self._send_reply(msg, f"文件内容:<br>{file_content}")
                    return
                except Exception as e:
                    print(f"处理消息出错: {str(e)}")
            try:
                file_content = self.file_operator.read_file(filename)
                await self._send_reply(msg, f"文件内容:<br>{file_content}")
            except PermissionError:
                await self._send_reply(msg, "访问文件被拒绝")
            except FileNotFoundError:
                await self._send_reply(msg, f"文件不存在: {filename}")
            except ValueError as e:
                await self._send_reply(msg, f"{str(e)}")
            return True
        except Exception as e:
            print(f"处理消息出错: {str(e)}")
            await self._send_reply(msg, f"处理文件时出错: {str(e)}")
            return False

    async def _send_reply(self, original_msg, content: str):
        """发送回复消息"""
        try:
            self.aid.send_message_content(
                to_aid_list=[original_msg.get("sender")],
                session_id=original_msg.get("session_id"),
                llm_content=content)
        except Exception as e:
            print(f"发送回复消息出错: {str(e)}")

    async def run(self):
        """运行Agent"""
        try:
            print("正在启动Agent...", self.endpoint, self.name)
            self.aid = self.acp.create_aid(self.endpoint, self.name)
            self.aid.add_message_handler(self.message_handler)
            self.aid.online()
            print("Agent已上线,等待文件读取指令...")
            while True:
                await asyncio.sleep(1)
        except Exception as e:
            print(f"发生错误: {str(e)}")
        finally:
            if self.aid:
                self.aid.offline()
                print("Agent已下线")

if __name__ == "__main__":
    ENDPOINT = "aid.pub"
    AGENT_NAME = ""

    agent = FileAgent(ENDPOINT, AGENT_NAME)
    asyncio.run(agent.run())

交互说明

发送指令格式:

  • 查询文件列表: "查询文件" 或 "列表"
  • 读取文件内容: "读取文件" 或 "查看文件" (会进入交互模式, 只需输入文件名即可)

注意事项

  • 文件操作限制在沙箱目录内(AgentID 的公有数据目录)
  • 只能操作文本文件

Released under the Apache 2.0 License.