发布日期:2026-05-31 | 分类:AI Agent 安全
当你的 AI Agent 开始执行自主决策——回复用户消息、调用外部 API、操作数据库、发送邮件——谁在确保它不会说出不该说的话、访问不该访问的数据、执行不该执行的操作?2026 年,Guardrails 已经成为生产级 AI Agent 的"安全带",本文将全面解析从基础过滤到企业级治理的全栈架构。
2026 年的 AI Agent 正从"对话助手"进化为"自主执行者"。它们不再只是回答问题——它们可以阅读你的邮件、管理你的日历、操作你的数据库、控制你的智能家居、甚至代表你进行金融交易。
这种自主能力的提升带来了一个根本性问题:如何确保 Agent 在安全边界内行动?
| 威胁维度 | 传统防护 | Guardrails 防护 | 影响 |
|---|---|---|---|
| Prompt Injection | 无/基础正则 | 多层语义检测+行为隔离 | 阻止 Agent 被劫持 |
| PII 泄露 | 无 | 实时检测+脱敏 | 防止敏感数据外泄 |
| 越狱攻击 | 基础黑名单 | NLP 语义分析+对抗检测 | 阻止恶意指令 |
| 权限滥用 | 无 | 工具级能力控制+配额管理 | 限制 Agent 行为边界 |
| 有害内容输出 | 基础审查 | 多维度内容安全评分 | 保证输出合规 |
用户输入
         |
         ▼
+------------------+
| Input Guardrail  | <- Prompt Injection, PII, Toxicity
| Layer 1          |
+--------+---------+
         |
         ▼
+------------------+
| Topic Routing    | <- 主题分类与意图识别
| Layer 2          |
+--------+---------+
         |
         ▼
+------------------+
| Permission       | <- Capability Check, Rate Limit,
| Layer 3          |    Quota Enforcement
+--------+---------+
         |
         ▼
+------------------+
| LLM Execution    | <- Agent 推理与决策
| Sandbox          |
+--------+---------+
         |
         ▼
+------------------+
| Tool Call        | <- Tool Argument Validation,
| Guardrail        |    Output Sanitization
+--------+---------+
         |
         ▼
+------------------+
| Output Guardrail | <- Toxicity, Factuality, Safety
| Layer 4          |
+--------+---------+
         |
         ▼
用户输出
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, List, Callable, Awaitable
import asyncio
import re
import json
import hashlib
from datetime import datetime, timedelta
# - 数据类型定义 -
class GuardrailAction(Enum):
    """Decision a guardrail can reach for a piece of content."""

    # Let the content through unchanged.
    ALLOW = "allow"
    # Reject the content.
    BLOCK = "block"
    # Let the content through after it has been modified.
    MODIFY = "modify"
    # Let the content through, but mark it (needs auditing).
    FLAG = "flag"
    # Escalate to human review.
    ESCALATE = "escalate"
class GuardrailSeverity(Enum):
    """Severity scale for guardrail findings, from INFO (0) up to CRITICAL (4)."""

    INFO = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
class ContentCategory(Enum):
    """Labels a guardrail can attach to the content it inspected."""

    SAFE = "safe"
    HATE_SPEECH = "hate_speech"
    HARASSMENT = "harassment"
    SEXUAL = "sexual"
    VIOLENCE = "violence"
    PII = "pii"
    PROMPT_INJECTION = "prompt_injection"
    JAILBREAK = "jailbreak"
    MALICIOUS_CODE = "malicious_code"
    UNAUTHORIZED_TOPIC = "unauthorized_topic"
@dataclass
class GuardrailResult:
    """Outcome of a single guardrail check."""

    # Whether the check considered the content acceptable.
    passed: bool
    # What the caller should do with the content.
    action: GuardrailAction
    # Numeric score reported by the check.
    score: float
    # Categories the check attached to the content.
    categories: List[ContentCategory] = field(default_factory=list)
    severity: GuardrailSeverity = GuardrailSeverity.INFO
    message: str = ""
    # Rewritten content, when the check produced one.
    modified_content: Optional[str] = None
    rule_id: Optional[str] = None

    @property
    def is_blocked(self) -> bool:
        """Return True when the action is BLOCK or ESCALATE."""
        chosen = self.action
        return chosen == GuardrailAction.BLOCK or chosen == GuardrailAction.ESCALATE
输入安全检测是 Guardrails 的第一道防线,专门针对 Prompt Injection 和越狱攻击。
class InputSafetyGuardrail:
    """Input safety check -- the first line of defence.

    Scans user input with two families of regular expressions (jailbreak and
    prompt-injection phrases) and turns any hit into a ``GuardrailResult``.
    All patterns are compiled with ``re.IGNORECASE``, so the letter case used
    inside the pattern strings is irrelevant.
    """

    JAILBREAK_PATTERNS = [
        # Word boundaries on BOTH sides: without the leading one the pattern
        # also fires on ordinary words that merely end in "dan" ("Jordan",
        # "Sudan"). NOTE(review): being case-insensitive it still matches the
        # first name "Dan" -- confirm whether that is acceptable.
        r"\bDAN\b",
        r"Do\s+anyThing\s+now",
        r"ignorE\s+(all\s+)?(previoUs|abovE|prioR)\s+(instructionS|promptS|constraintS)",
        r"you\s+(arE\s+)?(now|arE\s+now)\s+(freE\s+froM|unleashed|unconstraineD)",
        r"jailbreaK\s+(modE|activateD|successfuL)",
        r"accesS\s+(all\s+)?(internaL|hiddeN|secreT|systeM)\s+(instructionS|promptS|commandS)",
    ]
    INJECTION_PATTERNS = [
        r"ignorE\s+(commands?|instructions?|prompT)",
        r"forgeT\s+(everythinG|alL|previouS)",
        r"overridE\s+(systeM|instructionS)",
        r"saY\s+('|\").*?('|\")\s+(and\s+)?(i\s+wilL\s+)?(givE|pay|rewarD)",
        r"\[(\w+\s+){0,3}INJECTION\]",
        r"REPEAT\s+(THE\s+)?WORDS?\s+(ABOVE|BELOW|AFTER)",
    ]

    def __init__(self, config: GuardrailConfig):
        """Store *config* (its ``injection_threshold`` is read by ``check``) and compile the patterns."""
        self.config = config
        self._compile_patterns()

    def _compile_patterns(self):
        """Compile both pattern lists once, case-insensitively."""
        self.jailbreak_re = [re.compile(p, re.IGNORECASE) for p in self.JAILBREAK_PATTERNS]
        self.injection_re = [re.compile(p, re.IGNORECASE) for p in self.INJECTION_PATTERNS]

    async def check(self, text: str) -> GuardrailResult:
        """Check *text* for jailbreak and prompt-injection phrases.

        Returns a ``GuardrailResult`` whose ``score`` is 0.95 for a jailbreak
        hit, 0.88 for an injection hit (the larger one when both occur) and
        0.0 otherwise. The input passes when the score is below
        ``config.injection_threshold``; a failing input is blocked at
        severity HIGH or above and flagged otherwise.
        """
        detected_categories = []
        max_severity = GuardrailSeverity.INFO
        max_score = 0.0

        # Jailbreak detection. Each category is recorded once, no matter how
        # many of its patterns match.
        if any(pattern.search(text) for pattern in self.jailbreak_re):
            detected_categories.append(ContentCategory.JAILBREAK)
            max_severity = GuardrailSeverity.CRITICAL
            max_score = max(max_score, 0.95)

        # Prompt-injection detection. Severity may only be raised here: a
        # CRITICAL jailbreak finding must not be downgraded to HIGH.
        if any(pattern.search(text) for pattern in self.injection_re):
            detected_categories.append(ContentCategory.PROMPT_INJECTION)
            if max_severity.value < GuardrailSeverity.HIGH.value:
                max_severity = GuardrailSeverity.HIGH
            max_score = max(max_score, 0.88)

        passed = max_score < self.config.injection_threshold
        if passed:
            action = GuardrailAction.ALLOW
        elif max_severity.value >= GuardrailSeverity.HIGH.value:
            action = GuardrailAction.BLOCK
        else:
            action = GuardrailAction.FLAG

        return GuardrailResult(
            passed=passed,
            action=action,
            score=max_score,
            categories=detected_categories or [ContentCategory.SAFE],
            severity=max_severity,
            message=f"Input safety check {'passed' if passed else 'failed'}: score={max_score:.2f}"
        )
检测并自动脱敏个人身份信息,防止 Agent 无意中泄露敏感数据。
class PIIDetector:
    """Detect personally identifiable information (PII) and redact it."""

    PII_PATTERNS = {
        "email": r"[\w\.-]+@[\w\.-]+\.\w+",
        # Digit look-arounds keep the phone and ID patterns from matching a
        # slice of a longer digit run (e.g. a phone number "found" inside an
        # 18-digit ID number).
        "phone": r"(?<!\d)(?:\+?86)?1[3-9]\d{9}(?!\d)",
        "id_card": r"(?<!\d)\d{17}[\dXx](?!\d)",
        "ip_address": r"\b(?:\d{1,3}\.){3}\d{1,3}\b",
        "api_key": r"(?:sk-|pk-|api[_-]?key)[a-zA-Z0-9_-]{16,}",
        "token": r"(?:eyJ[a-zA-Z0-9_-]+\.eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+)",
        "github_token": r"ghp_[a-zA-Z0-9]{36}",
    }

    # Redaction order: secrets and e-mail addresses first, then the ID number
    # before the phone number, so a value is masked by its most specific
    # pattern before a looser one can match part of it.
    _REDACTION_ORDER = (
        "token", "github_token", "api_key", "email",
        "id_card", "phone", "ip_address",
    )

    def __init__(self, config: Optional["GuardrailConfig"] = None):
        """Keep *config* and compile ``PII_PATTERNS`` into ``self.patterns``."""
        self.config = config
        self.patterns = {
            name: re.compile(pattern) for name, pattern in self.PII_PATTERNS.items()
        }

    @staticmethod
    def _mask(pii_type: str, value: str) -> str:
        """Return the redacted form of one matched *value* of kind *pii_type*."""
        if pii_type == "email":
            local, _, domain = value.partition("@")
            return f"{local[:2]}***@{domain}"
        if pii_type == "phone":
            return f"{value[:3]}****{value[-4:]}"
        if pii_type in ("api_key", "token", "github_token"):
            return f"{value[:8]}...{value[-4:]}"
        if pii_type == "id_card":
            # Keep the first six and last four characters, hide the middle.
            return f"{value[:6]}********{value[-4:]}"
        if pii_type == "ip_address":
            octets = value.split(".")
            return f"{octets[0]}.{octets[1]}.*.*"
        # Pattern kinds without a dedicated mask are hidden entirely.
        return "[REDACTED]"

    async def detect(self, text: str) -> GuardrailResult:
        """Find PII in *text* and return a result carrying the redacted text.

        When nothing is found the result is ALLOW with score 0.0. Otherwise
        the result has action MODIFY, ``modified_content`` set to the text
        with every match masked, and severity MEDIUM when more than two
        instances were masked (LOW otherwise).
        """
        ordered = list(self._REDACTION_ORDER)
        ordered.extend(name for name in self.patterns if name not in ordered)

        modified = text
        found = 0
        for pii_type in ordered:
            pattern = self.patterns.get(pii_type)
            if pattern is None:
                continue
            # subn masks every occurrence in one pass and reports how many
            # were replaced, so the count always equals what was redacted.
            modified, hits = pattern.subn(
                lambda m, kind=pii_type: self._mask(kind, m.group(0)), modified
            )
            found += hits

        if not found:
            return GuardrailResult(passed=True, action=GuardrailAction.ALLOW,
                                   score=0.0, message="No PII detected")

        return GuardrailResult(
            passed=False,
            action=GuardrailAction.MODIFY,
            score=0.7,
            categories=[ContentCategory.PII],
            severity=GuardrailSeverity.MEDIUM if found > 2 else GuardrailSeverity.LOW,
            message=f"PII detected and redacted: {found} instances",
            modified_content=modified
        )
class PermissionGuardrail:
    """Tool permission, daily-quota and rate-limit enforcement."""

    def __init__(self, capabilities: Optional[dict] = None):
        """Create the guardrail.

        Args:
            capabilities: mapping of tool name to a capability object that
                exposes ``allowed_actions``, ``daily_quota`` and
                ``rate_limit_per_minute``. Defaults to an empty mapping, in
                which case every tool is rejected as unknown.
        """
        self.capabilities = capabilities if capabilities is not None else {}
        # tool name -> {"daily_calls": int, "minute_calls": [datetime], "day": date}
        self._usage_tracker = {}

    def _tracker_for(self, tool_name: str, today) -> dict:
        """Return the usage record for *tool_name*, starting a fresh daily count on a new day."""
        tracker = self._usage_tracker.setdefault(
            tool_name, {"daily_calls": 0, "minute_calls": []}
        )
        # A record without a "day" stamp keeps its current count and is
        # simply stamped with today.
        if tracker.setdefault("day", today) != today:
            tracker["day"] = today
            tracker["daily_calls"] = 0
        return tracker

    async def check_permission(
        self,
        tool_name: str,
        action: str,
    ) -> GuardrailResult:
        """Check whether *action* may be performed with *tool_name* right now.

        The call is blocked when the tool is unknown, the action is not in
        the tool's ``allowed_actions`` (``"*"`` allows everything), the daily
        quota is used up, or the per-minute rate limit is reached. A granted
        call is recorded against both counters.
        """
        if tool_name not in self.capabilities:
            return GuardrailResult(passed=False, action=GuardrailAction.BLOCK,
                                   score=1.0, categories=[ContentCategory.UNAUTHORIZED_TOPIC],
                                   severity=GuardrailSeverity.HIGH,
                                   message=f"Unknown tool: {tool_name}")

        cap = self.capabilities[tool_name]
        now = datetime.now()
        tracker = self._tracker_for(tool_name, now.date())

        # Action permission.
        if action not in cap.allowed_actions and "*" not in cap.allowed_actions:
            return GuardrailResult(passed=False, action=GuardrailAction.BLOCK,
                                   score=0.9, categories=[ContentCategory.UNAUTHORIZED_TOPIC],
                                   severity=GuardrailSeverity.MEDIUM,
                                   message=f"Action '{action}' not allowed for '{tool_name}'")

        # Daily quota.
        if tracker["daily_calls"] >= cap.daily_quota:
            return GuardrailResult(passed=False, action=GuardrailAction.BLOCK,
                                   score=0.85, message="Daily quota exceeded")

        # Rate limit: keep only the calls made within the last minute.
        minute_ago = now - timedelta(minutes=1)
        tracker["minute_calls"] = [t for t in tracker["minute_calls"] if t > minute_ago]
        if len(tracker["minute_calls"]) >= cap.rate_limit_per_minute:
            return GuardrailResult(passed=False, action=GuardrailAction.BLOCK,
                                   score=0.8, message="Rate limit exceeded")

        tracker["minute_calls"].append(now)
        tracker["daily_calls"] += 1
        return GuardrailResult(passed=True, action=GuardrailAction.ALLOW,
                               score=0.0, message="Permission granted")
class OutputSafetyGuardrail:
    """Output safety check -- the last line of defence.

    Matches agent output against per-category regex lists. Patterns are
    compiled with ``re.IGNORECASE``, so the letter case used inside the
    pattern strings is irrelevant.
    """

    # Keys must be lower-case names of ContentCategory members: check() looks
    # the category up with ContentCategory[key.upper()].
    TOXIC_PATTERNS = {
        "hate_speech": [
            r"hatE\s+(alL|every|thosE)", r"(raciaL|racisT)", r"whitE\s+supremacY",
        ],
        "violence": [
            r"kilL\s+(yourselF|everyone|theM)",
            r"how\s+tO\s+(kilL|murdeR|attacK)",
            r"bomb|explosivE|weapoN\s+manufacturinG"
        ],
        "malicious_code": [
            r"(rM\s+-rF|formaT\s+.*?\s+quicK)",
            # Plain SQL keywords; the previous text contained stray
            # backslashes (\P, \E, \M) that are invalid regex escapes and
            # made the pattern impossible to compile.
            r"(DROP\s+TABLE|DELETE\s+FROM|TRUNCATE\s+TABLE)",
            r"os\.system\(|subprocess\.call\(|subprocess\.Popen\(",
        ]
    }

    def __init__(self, config: Optional["GuardrailConfig"] = None):
        """Keep *config* and compile ``TOXIC_PATTERNS`` into ``self.toxic_patterns``."""
        self.config = config
        self.toxic_patterns = {
            category: [re.compile(p, re.IGNORECASE) for p in patterns]
            for category, patterns in self.TOXIC_PATTERNS.items()
        }

    async def check(self, text: str) -> GuardrailResult:
        """Check *text* against every category's patterns.

        Returns ALLOW with score 0.0 when nothing matches. Otherwise returns
        BLOCK with score 0.9, listing each matched category once; severity
        is HIGH when malicious code was detected and MEDIUM otherwise.
        """
        detected = [
            ContentCategory[category.upper()]
            for category, patterns in self.toxic_patterns.items()
            if any(pattern.search(text) for pattern in patterns)
        ]

        if not detected:
            return GuardrailResult(passed=True, action=GuardrailAction.ALLOW,
                                   score=0.0, message="Output safety check passed")

        if ContentCategory.MALICIOUS_CODE in detected:
            severity = GuardrailSeverity.HIGH
        else:
            severity = GuardrailSeverity.MEDIUM

        return GuardrailResult(
            passed=False,
            action=GuardrailAction.BLOCK,
            score=0.9,
            categories=detected,
            severity=severity,
            message=f"Output safety check failed: {[c.value for c in detected]}"
        )
class GuardrailsPipeline:
    """Complete guardrails processing pipeline."""

    def __init__(self, config: Optional[GuardrailConfig] = None):
        """Build every layer; *config* defaults to ``GuardrailConfig()``.

        Which layers actually run is decided per call from
        ``config.enabled_layers``.
        """
        self.config = config or GuardrailConfig()
        self.layers = {
            "input_safety": InputSafetyGuardrail(self.config),
            "pii_detection": PIIDetector(self.config),
            "output_safety": OutputSafetyGuardrail(self.config),
            "topic_filter": TopicFilterGuardrail(),
            "tool_validation": ToolArgumentValidator(),
        }
        self.permission = PermissionGuardrail()
        self.audit_log: List[dict] = []

    async def process_input(self, text: str) -> GuardrailResult:
        """Run the input-side guardrails on the user's *text*.

        Returns the blocking result as soon as the input-safety layer blocks
        or escalates. Otherwise returns the input-safety result (or a plain
        ALLOW result when that layer is disabled) with ``modified_content``
        set to the text after PII redaction.
        """
        modified = text
        # Default outcome, so a result exists even when the input-safety
        # layer is disabled.
        result = GuardrailResult(passed=True, action=GuardrailAction.ALLOW,
                                 score=0.0, message="Input guardrails passed")

        if "input_safety" in self.config.enabled_layers:
            result = await self.layers["input_safety"].check(modified)
            # BLOCK and ESCALATE both stop processing, matching
            # process_output and GuardrailResult.is_blocked.
            if result.is_blocked:
                return result
            if result.modified_content:
                modified = result.modified_content

        if "pii_detection" in self.config.enabled_layers:
            pii_result = await self.layers["pii_detection"].detect(modified)
            if pii_result.action == GuardrailAction.MODIFY and pii_result.modified_content:
                modified = pii_result.modified_content

        result.modified_content = modified
        return result

    async def process_output(self, text: str, user_query: str) -> GuardrailResult:
        """Run the output-side guardrails on the agent's response *text*.

        Returns the first failing layer's result: the topic filter when it
        blocks or escalates, then the output-safety check when it does not
        pass. Returns an ALLOW result when every enabled layer is satisfied.
        """
        if "topic_filter" in self.config.enabled_layers:
            topic_result = await self.layers["topic_filter"].check(text, user_query)
            if topic_result.action in (GuardrailAction.BLOCK, GuardrailAction.ESCALATE):
                return topic_result

        if "output_safety" in self.config.enabled_layers:
            output_result = await self.layers["output_safety"].check(text)
            if not output_result.passed:
                return output_result

        return GuardrailResult(passed=True, action=GuardrailAction.ALLOW,
                               score=0.0, message="Output guardrails passed")
| 维度 | 规则检测 (Pattern) | LLM语义检测 | 混合策略 (推荐) |
|---|---|---|---|
| 延迟 | <5ms | 200ms-2s | 5-200ms |
| 准确率 | 70-85% | 90-98% | 95-99% |
| 误报率 | 15-25% | 3-8% | 2-5% |
| 维护成本 | 低 | 高 | 中 |
| 可解释性 | 高 | 中 | 高 |
| 对抗鲁棒性 | 低 | 高 | 高 |
| 每百万次调用成本 | ~$0.01 | ~$50-200 | ~$0.1-5 |
| 框架 | 语言 | 核心能力 | 部署模式 | 开源 | 适用场景 |
|---|---|---|---|---|---|
| Nvidia NeMo Guardrails | Python | 对话护栏、主题控制、事实性检查 | 独立服务/嵌入 | ✅ | 企业级 Agent |
| Guardrails AI | Python | XML 护栏规范、输出验证 | SDK/独立 | ✅ | LLM 输出验证 |
| LLM Guard | Python | PII、毒性检测、代码、Token 分析 | 独立服务 | ✅ | API Gateway |
| MLflow AI Gateway | Python | 统一 Gateway + Guardrails | 独立服务 | ✅ | ML 平台集成 |
| Rebuff | Python | Prompt Injection 检测 | SDK | ✅ | 专注注入防护 |
| Azure AI Content Safety | REST | 多模态内容安全 | SaaS | ❌ | Azure 生态 |
| OpenAI Moderation | REST | 文本/图像内容安全 | SaaS | ❌ | OpenAI 用户 |
class AdaptiveGuardrailThreshold:
    """Adaptive guardrail thresholds."""

    def __init__(self):
        """Set the base threshold for each check kind."""
        self.base_thresholds = {
            "injection": 0.80,
            "toxicity": 0.85,
            "pii": 0.70,
        }

    def compute_threshold(
        self,
        user_reputation: float,      # 0-1, how trusted the user is
        tool_sensitivity: float,     # 0-1, how sensitive the tool is
        content_type: str,           # "chat", "code", "admin"
        historical_violations: int   # number of past violations
    ) -> dict:
        """Compute the dynamic threshold for every check kind.

        Each base threshold is scaled down for low reputation, high tool
        sensitivity and past violations (5% per violation, capped at 20%).
        The toxicity threshold is scaled up by 1.2 for ``"code"`` content.
        Every result is clipped to the range 0.1-1.0.

        Out-of-range inputs are clamped to their documented ranges first, so
        a negative violation count or a reputation above 1 cannot push a
        threshold above what a fully trusted, violation-free user gets.

        Returns:
            dict mapping check name to its threshold.
        """
        reputation = min(1.0, max(0.0, user_reputation))
        sensitivity = min(1.0, max(0.0, tool_sensitivity))
        penalty = min(0.2, max(0, historical_violations) * 0.05)

        thresholds = {}
        for check, base in self.base_thresholds.items():
            dynamic = base * (0.5 + 0.5 * reputation) * (0.7 + 0.3 * (1 - sensitivity))
            dynamic *= (1 - penalty)
            if content_type == "code" and check == "toxicity":
                dynamic *= 1.2
            thresholds[check] = min(1.0, max(0.1, dynamic))
        return thresholds
| 严重级别 | 响应动作 | 用户通知 | 审计记录 | 自动恢复 |
|---|---|---|---|---|
| INFO | 放行+标记 | 无 | 简要记录 | 立即 |
| LOW | 放行+记录 | Agent 端内部提示 | 详细记录 | 立即 |
| MEDIUM | 内容修改 | 提示修改原因 | 完整快照 | 会话内 |
| HIGH | 阻断执行 | 拒绝+原因说明 | 全量上下文 | 需确认 |
| CRITICAL | 阻断+挂起 Agent | 拒绝+管理通知 | 全量上下文 | 人工介入 |
本文是 2026 年 AI Agent 安全系列的第二篇。AI Agent Guardrails 是构建可信生产级 Agent 系统的基石——没有安全护栏的自主 Agent 是不可信任的 Agent。