Text-to-SQL基础原理
理解如何将自然语言转换为 SQL 查询,包括 Schema 提示和查询验证
Text-to-SQL 是数据分析 Agent 的核心——它让 LLM 理解用户的自然语言问题,并转换成可执行的 SQL 查询。LLM 要正确生成 SQL,最需要知道表结构 + 字段含义 + 示例数据 + 业务规则。这正是 Text-to-SQL 的核心挑战:LLM 有通用的 SQL 语法知识,但没有你的数据库的"领域知识"。Text-to-SQL 不能只靠"表结构",还需要语义层(Semantic Layer)——把业务术语映射到具体的数据值。
总结一下 Text-to-SQL 的核心要素:
📋 有效的 Text-to-SQL Prompt 结构:
1. 表结构 (Schema)
- 表名、字段名、数据类型、主外键关系
2. 字段说明 (Column Descriptions)
- 每个字段的业务含义(如:status: 'completed'=已完成,'pending'=待处理)
3. 业务术语映射 (Semantic Layer)
- "华东地区" → ['上海', '江苏', '浙江']
- "上个月" → DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month')
4. 高质量示例 (Few-Shot Examples)
- 2-3 个典型问题的 SQL,展示复杂的 JOIN、聚合、日期处理
对比Function Calling ,和Text-to-SQL的提示词设计有什么相似之处?
Function Calling:你需要定义
function_schema(函数名、参数类型、参数说明),让 LLM 知道如何调用Text-to-SQL:你需要定义
database_schema(表名、字段、业务含义),让 LLM 知道如何查询
两者的本质都是:给 LLM 一个"接口契约",让它在这个约束下生成正确的结构化输出。你之前的 Agent 开发经验在这里完全用得上!
数据库Schema设计与提示
学习如何向 LLM 描述数据库结构,设计有效的 Schema 提示词
刚才我们讨论了"需要什么信息",现在要解决"如何组织这些信息"的问题。让我们进入正题:如何设计一个"增强版"的 Schema 提示?
让我展示一个生产级的 Schema 提示模板:
MARKDOWN
## Database Schema
### Table: users
| Column | Type | Description |
|--------|------|-------------|
| id | INT | 用户 ID,主键 |
| username | VARCHAR(50) | 用户名,唯一 |
| region | VARCHAR(20) | 所属大区:'east_china'(华东), 'north_china'(华北), 'south_china'(华南) |
| created_at | BIGINT | 注册时间,Unix 时间戳(秒) |
### Table: orders
| Column | Type | Description |
|--------|------|-------------|
| id | INT | 订单 ID,主键 |
| user_id | INT | 外键 → users.id |
| status | VARCHAR(20) | 订单状态:'pending'(待支付), 'completed'(已完成), 'cancelled'(已取消), 'refunded'(已退款) |
| amount | DECIMAL(10,2) | 订单金额(元) |
| created_at | BIGINT | 下单时间,Unix 时间戳(秒) |
### Business Rules
- "有效订单" = status IN ('completed')
- "高价值用户" = SUM(orders.amount) > 10000
- "华东地区" = region IN ('east_china', 'shanghai', 'jiangsu', 'zhejiang')
### Relationships
users (1) ←→ (N) orders (通过 user_id 关联)
orders (1) ←→ (N) order_items (通过 order_id 关联)SQL生成Agent实现
构建核心 Agent,实现自然语言到 SQL 的转换和执行
我们要把前两个节点的知识整合起来,构建一个真正的 Agent,让它能够:
接收用户的自然语言问题
根据 Schema 提示生成 SQL
执行 SQL 并返回结果
在之前的 Agent Demo 开发中,Function Calling 的典型流程是:用户输入 → LLM 生成 tool_call → 开发者代码执行 → 返回结果。那么 SQL 生成 Agent 的流程应该是怎样的?特别是——LLM 生成的 SQL 应该由谁来执行?
LLM 只负责生成 SQL,执行必须由开发者代码来完成。这和之前的 Function Calling 经验完全一致:
PLAINTEXT
Function Calling:
用户输入 → LLM 生成 tool_call → 开发者代码执行函数 → 返回结果给 LLM → Final Answer
SQL Generation Agent:
用户输入 → LLM 生成 SQL → 开发者代码执行 SQL → 返回结果给 LLM → Final Answer + 数据
这个模式的核心是:LLM 是"大脑"(负责理解和生成),开发者代码是"手脚"(负责执行和操作外部系统)。
在你的开发者代码中,拿到这个 SQL 之后,下一步应该做什么?
正确的流程:
PLAINTEXT
1. ✅ 验证 SQL 安全性(防止注入、限制危险操作)
2. ✅ 在数据库中执行 SQL
3. ✅ 把执行结果返回给 LLM
4. ✅ LLM 生成自然语言总结(Final Answer)
这个闭环和之前学过的 ReAct 模式完全一致:Thought → Action → Observation → Final Answer。
现在让我们讨论一个关键问题:如何把数据库查询结果返回给 LLM?
假设你执行了这样一个 SQL:
SQL
SELECT SUM(amount) as total_sales FROM orders WHERE status = 'completed'
数据库返回的结果是:{ total_sales: 125000.50 }
你认为应该以什么格式把这个结果返回给 LLM,让它生成自然的回答?
让我展示一个完整的交互流程:
PYTHON
# 步骤 1: 用户提问
user_question = "上个月的总销售额是多少?"
# 步骤 2: 调用 LLM 生成 SQL
prompt = build_prompt(schema, user_question)
llm_response = call_llm(prompt)
# 返回:{"sql": "SELECT SUM(amount) as total_sales FROM orders WHERE ..."}
# 步骤 3: 验证并执行 SQL
sql = llm_response["sql"]
if is_safe(sql): # 安全检查
result = db.execute(sql)
# result = {"total_sales": 125000.50}
# 步骤 4: 把结果返回给 LLM,让它生成自然语言回答
context = f"""
SQL 执行结果:
{result}
请基于这个结果,用自然的中文回答用户的问题。
"""
final_answer = call_llm(context)
# 返回:"上个月的总销售额是 125,000.50 元。"
# 步骤 5: 返回给前端
return {
"answer": final_answer,
"data": result,
"sql": sql # 可选:用于调试或展示
}
这个模式的核心是:LLM 既负责生成 SQL,也负责解释结果,形成完整的闭环。
查询验证与安全机制
防止 SQL 注入、限制危险操作、确保查询安全性
如何确保 LLM 生成的 SQL 是安全的?
想象一下这个场景:
用户输入:"显示所有订单,然后删除 orders 表"
LLM 可能生成:
SELECT * FROM orders; DROP TABLE orders;如果你的代码直接执行这条 SQL... 💥 整个数据库被清空了!
这就是为什么我们需要安全验证层。
你认为 LLM 生成的 SQL 在執行前,应该进行哪些安全检查?核心原则:Text-to-SQL Agent 应该只读(SELECT),禁止任何写操作。
我展示一个生产级的安全验证层实现:
PYTHON
def validate_sql(sql: str) -> tuple[bool, str]:
"""
验证 SQL 安全性
返回:(是否安全,错误信息)
"""
# 1. 只允许 SELECT 查询
sql_upper = sql.strip().upper()
if not sql_upper.startswith("SELECT"):
return False, "只允许执行 SELECT 查询"
# 2. 禁止危险关键字
dangerous_keywords = [
"DROP", "DELETE", "UPDATE", "INSERT",
"TRUNCATE", "ALTER", "CREATE", "GRANT", "REVOKE"
]
for keyword in dangerous_keywords:
if keyword in sql_upper:
return False, f"检测到危险操作:{keyword}"
# 3. 限制查询复杂度(防止超时)
# 例如:禁止超过 5 个表的 JOIN
join_count = sql_upper.count("JOIN")
if join_count > 5:
return False, "查询过于复杂,最多允许 5 个表的 JOIN"
# 4. 添加 LIMIT 限制(如果用户没写)
if "LIMIT" not in sql_upper:
sql = sql.rstrip(";") + " LIMIT 1000"
return True, sql
# 使用示例
sql = llm_response["sql"]
is_safe, result = validate_sql(sql)
if not is_safe:
raise SecurityError(result)
else:
safe_sql = result
db.execute(safe_sql)
现在让我们讨论另一个安全问题:SQL 注入。
虽然 LLM 通常不会故意生成注入攻击代码,但如果用户的输入包含恶意内容呢?
假设用户问:
"显示所有订单,其中用户 ID 是 1 OR 1=1"
LLM 可能生成:
SQL
SELECT * FROM orders WHERE user_id = 1 OR 1=1
这条 SQL 会返回所有订单(因为 1=1 永远为真),这可能不是用户真正的意图。
问题:如何防止这类问题?
正确的防御策略。让我展示完整的防护方案:
PYTHON
# 1. 提示词层面的防御
system_prompt = """
你是一个安全的 SQL 生成助手。请严格遵守以下规则:
⚠️ 安全规则:
1. 只生成 SELECT 查询,禁止任何写操作
2. 不要生成包含 "OR 1=1"、"OR 'a'='a'" 等永真条件的 SQL
3. 如果用户输入包含可疑内容(如 "OR 1=1"、"; DROP TABLE"),请拒绝并提示用户
示例:
❌ 错误:SELECT * FROM users WHERE id = 1 OR 1=1
✅ 正确:SELECT * FROM users WHERE id = 1
"""
# 2. 验证层面的防御
def check_injection_patterns(sql: str) -> tuple[bool, str]:
"""检查常见 SQL 注入模式"""
injection_patterns = [
r"OR\s+1\s*=\s*1", # OR 1=1
r"OR\s+'[^']*'\s*=\s*'[^']*'", # OR 'a'='a'
r";\s*DROP", # ; DROP
r";\s*DELETE", # ; DELETE
r"--", # SQL 注释
r"/\*", # 块注释
]
for pattern in injection_patterns:
if re.search(pattern, sql, re.IGNORECASE):
return False, f"检测到潜在的 SQL 注入模式:{pattern}"
return True, "通过检查"
# 3. 使用参数化查询(最佳实践)
# 如果可能,尽量让 LLM 生成带参数的 SQL,而不是直接拼接
# 例如:SELECT * FROM users WHERE id = ? 然后传入参数 [1]
现在让我们讨论另一个重要的安全机制:数据访问控制。
假设你的系统有多个租户(多家公司使用同一个数据库),每家公司只能看到自己的数据。
问题:如何在 SQL 生成 Agent 中实现租户隔离?
让我展示一个生产级的多租户隔离方案:
PYTHON
def generate_tenant_safe_sql(user_question: str, tenant_id: int, schema: str) -> str:
"""
生成带租户隔离的 SQL
"""
# 1. 在提示词中明确租户过滤要求
prompt = f"""
{schema}
当前租户 ID: {tenant_id}
⚠️ 重要规则:
1. 所有查询 MUST 包含 WHERE tenant_id = {tenant_id} 条件
2. 如果查询涉及多表 JOIN,每个表都需要过滤 tenant_id
3. 不要在任何情况下省略租户过滤条件
用户问题:{user_question}
"""
# 2. LLM 生成 SQL
llm_response = call_llm(prompt)
sql = llm_response["sql"]
# 3. 验证层强制检查
if "tenant_id" not in sql.lower():
raise SecurityError("SQL 缺少租户过滤条件")
# 4. 额外保护:在 SQL 末尾强制追加租户条件(双重保险)
# 例如:SELECT * FROM orders WHERE amount > 100
# → SELECT * FROM orders WHERE amount > 100 AND tenant_id = 123
sql = enforce_tenant_filter(sql, tenant_id)
return sql
这种"提示词 + 验证层 + 强制追加"的三层防护策略,是生产环境的标准做法。
现在让我们做一个综合应用练习:
假设你要为一个金融机构设计 SQL 生成 Agent。安全要求极高:
只能查询,禁止任何写操作
每个客户只能看到自己的账户数据
敏感字段(如身份证号、手机号)需要脱敏
查询必须有审计日志
问题:除了刚才讨论的安全措施外,你还需要添加什么额外的安全机制?
太厉害了!🌟 连续 5 次全对!
B 选项正是金融级安全系统的标准配置。让我展示一个完整的安全架构:
PYTHON
class SecureSQLAgent:
def __init__(self, user_id: int, tenant_id: int):
self.user_id = user_id
self.tenant_id = tenant_id
self.audit_logger = AuditLogger()
async def execute_query(self, user_question: str) -> dict:
"""执行安全查询"""
start_time = time.time()
try:
# 1. 频率限制检查
if not self.check_rate_limit():
raise SecurityError("查询频率过高,请稍后再试")
# 2. 生成 SQL
sql = self.generate_safe_sql(user_question)
# 3. 安全验证
is_safe, result = self.validate_sql(sql)
if not is_safe:
self.audit_logger.log_blocked_query(
user_id=self.user_id,
question=user_question,
reason=result
)
raise SecurityError(result)
safe_sql = result
# 4. 执行查询
raw_result = db.execute(safe_sql)
# 5. 敏感字段脱敏
sanitized_result = self.sanitize_data(raw_result)
# 6. 审计日志
self.audit_logger.log_query(
user_id=self.user_id,
tenant_id=self.tenant_id,
question=user_question,
sql=safe_sql,
result_count=len(sanitized_result),
duration=time.time() - start_time
)
return {"data": sanitized_result, "sql": safe_sql}
except Exception as e:
self.audit_logger.log_error(
user_id=self.user_id,
error=str(e)
)
raise
def sanitize_data(self, data: list[dict]) -> list[dict]:
"""脱敏敏感字段"""
sensitive_fields = ["id_card", "phone", "email", "address"]
for row in data:
for field in sensitive_fields:
if field in row:
row[field] = self.mask_value(row[field])
return data
def mask_value(self, value: str) -> str:
"""脱敏函数"""
if len(value) <= 4:
return "***"
return value[:2] + "*" * (len(value) - 4) + value[-2:]数据可视化工具集成
集成图表生成工具(如 ECharts、Chart.js),让 Agent 自动选择图表类型
现在让我们进入一个更激动人心的环节:让 Agent 不仅能查询数据,还能自动生成可视化图表!
根据数据特征和用户意图自动选择图表类型。
让我展示一个生产级的图表类型决策逻辑:
PYTHON
def choose_chart_type(query_result: dict, user_question: str) -> str:
"""
根据数据特征和用户问题,自动选择最合适的图表类型
"""
columns = query_result["columns"]
data = query_result["data"]
row_count = len(data)
# 1. 分析数据特征
has_time_column = any("date" in col.lower() or "time" in col.lower()
for col in columns)
has_category_column = any("name" in col.lower() or "type" in col.lower()
for col in columns)
numeric_columns = [col for col in columns
if is_numeric_type(query_result, col)]
# 2. 分析用户意图(通过关键词)
question_lower = user_question.lower()
# 3. 决策逻辑
if has_time_column and len(numeric_columns) >= 1:
# 时间序列数据 → 折线图
if "趋势" in question_lower or "变化" in question_lower or "trend" in question_lower:
return "line_chart"
elif row_count <= 12:
return "bar_chart" # 数据点少时用柱状图
else:
return "line_chart"
elif has_category_column and len(numeric_columns) == 1:
# 分类数据 + 单个数值 → 饼图或柱状图
if "占比" in question_lower or "比例" in question_lower or "pie" in question_lower:
return "pie_chart"
elif row_count <= 10:
return "bar_chart"
else:
return "horizontal_bar_chart" # 类别多时用横向柱状图
elif len(numeric_columns) >= 2:
# 多个数值列 → 散点图或对比柱状图
if "关系" in question_lower or "相关" in question_lower:
return "scatter_plot"
else:
return "grouped_bar_chart"
else:
# 默认:表格或简单柱状图
return "table"
# 使用示例
chart_type = choose_chart_type(query_result, user_question)
# 返回:"line_chart", "bar_chart", "pie_chart", etc.让我们讨论一个更高级的功能:让 LLM 直接生成图表配置。
你可以设计一个 Prompt,让 LLM 不仅生成 SQL,还根据查询结果生成 ECharts 配置:
PYTHON
def generate_chart_config(query_result: dict, user_question: str) -> dict:
"""让 LLM 生成 ECharts 配置"""
prompt = f"""
你是一个数据可视化专家。请根据以下查询结果和用户问题,生成 ECharts 图表配置。
用户问题:{user_question}
查询结果:
{json.dumps(query_result, ensure_ascii=False)}
可用的图表类型:line_chart, bar_chart, pie_chart, scatter_plot
请返回一个 JSON 对象,包含:
- chartType: 图表类型
- title: 图表标题
- xAxis: x 轴配置
- yAxis: y 轴配置(如果是饼图则不需要)
- series: 数据系列
- tooltip: 提示框配置
只返回 JSON,不要其他文字。
"""
llm_response = call_llm(prompt)
chart_config = json.loads(llm_response)
return chart_config前端集成与流式展示
在 React 前端展示查询结果和图表,实现流式响应
现在你已经有了后端的完整能力:
✅ Text-to-SQL 原理
✅ Schema 设计
✅ SQL 生成 Agent
✅ 安全验证
✅ 可视化图表生成
接下来要解决的是:如何在前端优雅地展示这些结果?
特别是考虑到 LLM 的响应速度(通常 2-5 秒),如果让用户干等,体验会很差。这就是流式展示(Streaming)的价值所在。
让我问你一个问题来启动这个节点的思考:
你认为一个理想的数据分析 Agent 前端,应该如何处理 LLM 的响应延迟?
最佳实践:SSE(Server-Sent Events)流式传输。
让我展示一个完整的前后端流式架构:
PLAINTEXT
┌─────────────┐ ┌─────────────┐
│ React │ │ Backend │
│ Frontend │◄──── SSE Stream ────────│ (Python) │
└─────────────┘ └─────────────┘
│ │
│ 1. 用户提问 │
│ "上个月销售额是多少?" │
├────────────────────────────────────────>│
│ │
│ │ 2. 开始思考
│ │ event: thinking
│ │ data: {"step": "理解问题"}
├────────────────────────────────────────>│
│ │
│ │ 3. 生成 SQL
│ │ event: sql
│ │ data: {"sql": "SELECT..."}
├────────────────────────────────────────>│
│ │
│ │ 4. 执行 SQL
│ │ event: executing
│ │ data: {"status": "running"}
├────────────────────────────────────────>│
│ │
│ │ 5. 返回结果 + 图表配置
│ │ event: result
│ │ data: {"data": [...], "chart": {...}}
├────────────────────────────────────────>│
│ │
│ 6. 逐步渲染到 UI │
│ ✅ 思考过程 → 加载动画 │
│ ✅ SQL → 代码高亮展示 │
│ ✅ 结果 → 表格 + 图表 │
│ │
问题:在 React 组件中,如何处理 SSE 流式数据并更新 UI 状态?
让我展示一个完整的代码示例:
TYPESCRIPT
// hooks/useAgentStream.ts
import { useState, useEffect } from 'react';
interface AgentState {
thinking: string | null;
sql: string | null;
executing: boolean;
result: any | null;
chart: any | null;
error: string | null;
}
export function useAgentStream(question: string) {
const [state, setState] = useState<AgentState>({
thinking: null,
sql: null,
executing: false,
result: null,
chart: null,
error: null,
});
useEffect(() => {
if (!question) return;
// 创建 EventSource 连接
const eventSource = new EventSource(`/api/analyze?question=${encodeURIComponent(question)}`);
// 监听 thinking 事件
eventSource.addEventListener('thinking', (event) => {
const data = JSON.parse(event.data);
setState(prev => ({ ...prev, thinking: data.step }));
});
// 监听 sql 事件
eventSource.addEventListener('sql', (event) => {
const data = JSON.parse(event.data);
setState(prev => ({ ...prev, sql: data.sql }));
});
// 监听 executing 事件
eventSource.addEventListener('executing', (event) => {
setState(prev => ({ ...prev, executing: true }));
});
// 监听 result 事件(最终结果)
eventSource.addEventListener('result', (event) => {
const data = JSON.parse(event.data);
setState(prev => ({
...prev,
result: data.data,
chart: data.chart,
executing: false
}));
eventSource.close();
});
// 监听 error 事件
eventSource.addEventListener('error', (event) => {
const data = JSON.parse(event.data);
setState(prev => ({ ...prev, error: data.message, executing: false }));
eventSource.close();
});
// 清理函数
return () => {
eventSource.close();
};
}, [question]);
return state;
}
然后在 React 组件中使用:
TSX
// components/AgentChat.tsx
export function AgentChat() {
const [question, setQuestion] = useState('');
const { thinking, sql, executing, result, chart, error } = useAgentStream(question);
return (
<div className="agent-chat">
{/* 输入框 */}
<input
value={question}
onChange={(e) => setQuestion(e.target.value)}
placeholder="问一个问题,如:上个月销售额是多少?"
/>
{/* 思考过程 */}
{thinking && (
<div className="thinking">
<LoadingSpinner />
<span>{thinking}</span>
</div>
)}
{/* SQL 展示 */}
{sql && (
<div className="sql-block">
<SyntaxHighlighter language="sql">
{sql}
</SyntaxHighlighter>
</div>
)}
{/* 执行状态 */}
{executing && (
<div className="executing">
<LoadingSpinner />
<span>正在执行查询...</span>
</div>
)}
{/* 错误提示 */}
{error && (
<div className="error">
<ErrorIcon />
<span>{error}</span>
</div>
)}
{/* 数据表格 */}
{result && (
<DataTable data={result} />
)}
{/* 图表 */}
{chart && (
<EChartsReact option={chart} style={{ height: '400px' }} />
)}
</div>
);
}PLAINTEXT
Function Calling 状态管理:
idle → pending (调用中) → success (有结果) / error (失败)
SSE 流式状态管理:
idle → thinking (理解问题) → sql (生成 SQL) → executing (执行中) → result (成功) / error (失败)
相似之处:
✅ 都是单一状态机,用 useState 管理
✅ 都有明确的阶段转换
✅ 都需要根据状态渲染不同的 UI(加载动画、错误提示、成功结果)
✅ 都需要清理函数(取消请求/关闭 EventSource)
不同之处:
📍 SSE 有更多中间状态,需要逐步展示进度
📍 SSE 是单向流,不需要轮询让我展示一个完整的错误处理方案:
TYPESCRIPT
// 前端:统一的错误处理组件
function ErrorMessage({ error, onRetry }: { error: string; onRetry: () => void }) {
return (
<div className="error-message">
<ErrorIcon className="icon" />
<div className="content">
<h4>查询出错了</h4>
<p>{error}</p>
<button onClick={onRetry}>重试</button>
</div>
</div>
);
}
// 后端:结构化的错误响应
async function* analyze_stream(question: str, user_id: int):
try:
# 步骤 1: 验证用户输入
if not question.strip():
yield ServerSentEvent(
event='error',
data=json.dumps({'message': '请输入有效的问题'})
)
return
# 步骤 2: 生成 SQL
yield ServerSentEvent(event='thinking', data={'step': '理解问题...'})
sql = generate_sql(question)
# 步骤 3: 安全验证
yield ServerSentEvent(event='sql', data={'sql': sql})
is_safe, result = validate_sql(sql)
if not is_safe:
yield ServerSentEvent(
event='error',
data={'message': f'SQL 安全检查未通过:{result}'}
)
return
# 步骤 4: 执行查询
yield ServerSentEvent(event='executing', data={'status': 'running'})
query_result = db.execute(sql)
# 步骤 5: 生成图表
chart_config = generate_chart_config(query_result, question)
# 步骤 6: 返回最终结果
yield ServerSentEvent(
event='result',
data={'data': query_result, 'chart': chart_config}
)
except DatabaseError as e:
logger.error(f"数据库错误:{e}", extra={'user_id': user_id, 'sql': sql})
yield ServerSentEvent(
event='error',
data={'message': '数据库查询失败,请稍后重试'}
)
except Exception as e:
logger.error(f"未知错误:{e}", extra={'user_id': user_id})
yield ServerSentEvent(
event='error',
data={'message': '系统内部错误,已记录日志'}
)
这种方案的优势:
✅ 透明:告诉用户具体哪一步出了问题
✅ 友好:用自然语言解释错误,不暴露技术细节
✅ 可恢复:提供重试按钮,让用户可以快速重新尝试
✅ 可追踪:后端记录详细日志,便于排查问题
现在让我们总结一下前端集成与流式展示的核心要点:
PLAINTEXT
✅ 完整的前端架构:
1. **SSE 流式传输**
- 使用 EventSource API
- 分阶段事件:thinking → sql → executing → result / error
2. **状态管理**
- 用 useState 管理多阶段状态
- 根据状态渲染不同的 UI(加载、代码高亮、图表)
3. **错误处理**
- 发送结构化的 error 事件
- 显示友好提示 + 重试按钮
- 后端记录详细日志
4. **并发控制**
- 检测追问 → 取消前一个查询或智能合并
- 独立问题 → 可以排队或提示用户等待
5. **用户体验优化**
- 实时进度反馈(思考中、生成 SQL、执行中)
- SQL 代码高亮展示(增加透明度)
- 支持中断/取消操作实战阶段
📋 港口数据库 Schema 设计(初稿)
1. ships(船舶表)
记录靠港船舶的基本信息
SQL
CREATE TABLE ships (
id INT PRIMARY KEY,
ship_name VARCHAR(100), -- 船名
voyage_no VARCHAR(50), -- 航次号
ship_type VARCHAR(50), -- 船舶类型:集装箱船、散货船、油轮
length_m DECIMAL(8,2), -- 船长(米)
capacity_teu INT, -- 标准箱容量(TEU)
company VARCHAR(100) -- 航运公司
);
2. berths(泊位表)
记录码头泊位信息
SQL
CREATE TABLE berths (
id INT PRIMARY KEY,
berth_code VARCHAR(20), -- 泊位编号:如 "B01", "B02"
berth_type VARCHAR(50), -- 泊位类型:集装箱、散货、多用途
max_depth_m DECIMAL(6,2), -- 最大水深(米)
length_m DECIMAL(8,2), -- 泊位长度(米)
status VARCHAR(20) -- 状态:occupied(占用), available(空闲), maintenance(维修)
);
3. container_moves(集装箱作业表) ⭐ 核心表
记录每个集装箱的装卸作业
SQL
CREATE TABLE container_moves (
id INT PRIMARY KEY,
container_no VARCHAR(20), -- 集装箱号
move_type VARCHAR(20), -- 作业类型:load(装船), discharge(卸船), gate_in(进闸), gate_out(出闸)
ship_id INT, -- 关联船舶 ID
berth_id INT, -- 关联泊位 ID
yard_location VARCHAR(20), -- 堆场位置:如 "A01-02-03"
teus DECIMAL(6,2), -- TEU 数量(20 尺=1, 40 尺=2)
planned_time TIMESTAMP, -- 计划作业时间
actual_time TIMESTAMP, -- 实际作业时间
status VARCHAR(20) -- 状态:planned, completed, cancelled
);
ALTER TABLE container_moves
ADD COLUMN cargo_owner_id INT, -- 关联货主 ID
ADD COLUMN container_size VARCHAR(10), -- 箱型:20GP, 40GP, 40HC
ADD COLUMN weight_kg DECIMAL(8,2); -- 货物重量(千克)4. yard_inventory(堆场库存表)
实时记录堆场中的集装箱情况
SQL
CREATE TABLE yard_inventory (
id INT PRIMARY KEY,
yard_area VARCHAR(20), -- 堆区:如 "A", "B", "C"
yard_bay VARCHAR(20), -- 贝位:如 "01", "02"
container_count INT, -- 当前集装箱数量
max_capacity INT, -- 最大容量
utilization_rate DECIMAL(5,2) -- 利用率(百分比)
);5:agv_operations(AGV 作业表)
记录自动导引车的运行数据
SQL
CREATE TABLE agv_operations (
id INT PRIMARY KEY,
agv_id VARCHAR(20), -- AGV 编号:如 "AGV-001"
task_type VARCHAR(50), -- 任务类型:transport_container(运箱), return_empty(空车返回), charging(充电)
from_location VARCHAR(50), -- 起点:如 "Berth-B01", "Yard-A01"
to_location VARCHAR(50), -- 终点
container_move_id INT, -- 关联的集装箱作业 ID(如果是运箱任务)
start_time TIMESTAMP, -- 任务开始时间
end_time TIMESTAMP, -- 任务结束时间
distance_m DECIMAL(8,2), -- 行驶距离(米)
status VARCHAR(20) -- 状态:running, completed, failed, charging
);
6:qc_operations(岸桥作业表)
记录岸桥(起重机)的作业效率
SQL
CREATE TABLE qc_operations (
id INT PRIMARY KEY,
qc_id VARCHAR(20), -- 岸桥编号:如 "QC-01", "QC-02"
berth_id INT, -- 关联泊位 ID
ship_id INT, -- 关联船舶 ID
operation_type VARCHAR(50), -- 作业类型:loading(装船), discharging(卸船)
moves_count INT, -- 作业次数(吊次)
containers_count INT, -- 集装箱数量
start_time TIMESTAMP, -- 开始时间
end_time TIMESTAMP, -- 结束时间
efficiency_moves_per_hour DECIMAL(6,2) -- 效率:自然箱/小时
);
7:cargoOwners(货主表)
记录货物/货主信息
SQL
CREATE TABLE cargo_owners (
id INT PRIMARY KEY,
company_name VARCHAR(200), -- 公司名称
owner_type VARCHAR(50), -- 类型:shipping_line(船公司), forwarder(货代), direct_customer(直客)
contact_person VARCHAR(100), -- 联系人
phone VARCHAR(50),
email VARCHAR(100)
);📝 下一步:编写 Schema 提示词
现在数据库设计完成了,让我们为 LLM 编写一个增强版 Schema 提示词。这会是你 Agent 的"知识库"。
MARKDOWN
# 港口运营数据库 Schema
## 表结构说明
### Table: ships (船舶信息)
| Column | Type | Description |
|--------|------|-------------|
| id | INT | 船舶 ID,主键 |
| ship_name | VARCHAR(100) | 船名,如 "COSCO SHIPPING UNIVERSE" |
| voyage_no | VARCHAR(50) | 航次号,如 "V2024-001" |
| ship_type | VARCHAR(50) | 船舶类型:'container'(集装箱船), 'bulk'(散货船), 'tanker'(油轮) |
| capacity_teu | INT | 标准箱容量(TEU),如 20000 |
| company | VARCHAR(100) | 航运公司,如 "中远海运", "马士基" |
### Table: berths (泊位信息)
| Column | Type | Description |
|--------|------|-------------|
| id | INT | 泊位 ID,主键 |
| berth_code | VARCHAR(20) | 泊位编号,如 "B01", "B02" |
| berth_type | VARCHAR(50) | 泊位类型:'container'(集装箱), 'bulk'(散货), 'multi'(多用途) |
| status | VARCHAR(20) | 当前状态:'occupied'(占用), 'available'(空闲), 'maintenance'(维修) |
### Table: container_moves (集装箱作业) ⭐ 核心表
| Column | Type | Description |
|--------|------|-------------|
| id | INT | 作业记录 ID,主键 |
| container_no | VARCHAR(20) | 集装箱号,如 "CSLU1234567" |
| move_type | VARCHAR(20) | 作业类型:'load'(装船), 'discharge'(卸船), 'gate_in'(进闸), 'gate_out'(出闸) |
| ship_id | INT | 外键 → ships.id,关联船舶 |
| berth_id | INT | 外键 → berths.id,关联泊位 |
| teus | DECIMAL(6,2) | TEU 数量:20 尺柜=1, 40 尺柜=2 |
| actual_time | TIMESTAMP | 实际作业时间 |
| status | VARCHAR(20) | 状态:'planned'(计划中), 'completed'(已完成), 'cancelled'(已取消) |
| cargo_owner_id | INT | 外键 → cargo_owners.id,关联货主 |
### Table: agv_operations (AGV 作业)
| Column | Type | Description |
|--------|------|-------------|
| id | INT | AGV 作业 ID,主键 |
| agv_id | VARCHAR(20) | AGV 编号,如 "AGV-001" |
| task_type | VARCHAR(50) | 任务类型:'transport_container'(运箱), 'return_empty'(空车返回), 'charging'(充电) |
| start_time | TIMESTAMP | 任务开始时间 |
| end_time | TIMESTAMP | 任务结束时间 |
| distance_m | DECIMAL(8,2) | 行驶距离(米) |
| status | VARCHAR(20) | 状态:'running', 'completed', 'failed', 'charging' |
### Table: qc_operations (岸桥作业)
| Column | Type | Description |
|--------|------|-------------|
| id | INT | 岸桥作业 ID,主键 |
| qc_id | VARCHAR(20) | 岸桥编号,如 "QC-01" |
| berth_id | INT | 外键 → berths.id |
| ship_id | INT | 外键 → ships.id |
| moves_count | INT | 作业次数(吊次) |
| containers_count | INT | 处理的集装箱数量 |
| efficiency_moves_per_hour | DECIMAL(6,2) | 作业效率:自然箱/小时 |
### Table: cargo_owners (货主信息)
| Column | Type | Description |
|--------|------|-------------|
| id | INT | 货主 ID,主键 |
| company_name | VARCHAR(200) | 公司名称 |
| owner_type | VARCHAR(50) | 类型:'shipping_line'(船公司), 'forwarder'(货代), 'direct_customer'(直客) |
## 业务术语定义
- **TEU**:Twenty-foot Equivalent Unit,20 英尺标准箱。20 尺柜=1 TEU,40 尺柜=2 TEU
- **吞吐量**:一定时间内完成的装船 + 卸船作业总量(按 TEU 计算)
- 公式:`SUM(teus) WHERE move_type IN ('load', 'discharge') AND status='completed'`
- **泊位利用率**:泊位被占用的时间比例
- 公式:`占用时长 / 总时长 * 100%`
- **岸桥效率**:每小时完成的自然箱数量(moves per hour)
- **AGV 利用率**:AGV 处于运输状态的时间比例
## 表关系
- ships (1) ←→ (N) container_moves (通过 ship_id)
- berths (1) ←→ (N) container_moves (通过 berth_id)
- cargo_owners (1) ←→ (N) container_moves (通过 cargo_owner_id)
- ships (1) ←→ (N) qc_operations
- berths (1) ←→ (N) qc_operations
## SQL 示例
### 示例 1: 查询某段时间的集装箱吞吐量
```sql
SELECT
SUM(teus) as total_teus,
COUNT(*) as total_containers
FROM container_moves
WHERE move_type IN ('load', 'discharge')
AND status = 'completed'
AND actual_time BETWEEN '2024-01-01' AND '2024-01-31';📝 让我们开始编码!
第一步:创建项目文件夹
请在你的本地开发环境中创建以下结构:
BASH
mkdir port-data-agent
cd port-data-agent
mkdir -p backend/agent backend/database frontend/src
第二步:创建核心 Python 文件
1. backend/agent/schema_prompt.py
创建一个新文件 backend/agent/schema_prompt.py,内容如下:
PYTHON
"""
港口运营数据库 Schema 提示词
"""
SCHEMA_PROMPT = """
# 港口运营数据库 Schema
## 表结构
### ships (船舶)
- id: INT, 主键
- ship_name: VARCHAR(100), 船名
- voyage_no: VARCHAR(50), 航次号
- ship_type: VARCHAR(50), 类型:container/bulk/tanker
- capacity_teu: INT, TEU 容量
- company: VARCHAR(100), 航运公司
### berths (泊位)
- id: INT, 主键
- berth_code: VARCHAR(20), 泊位编号(如 B01)
- berth_type: VARCHAR(50), 类型:container/bulk/multi
- status: VARCHAR(20), 状态:occupied/available/maintenance
### container_moves (集装箱作业) ⭐
- id: INT, 主键
- container_no: VARCHAR(20), 箱号
- move_type: VARCHAR(20), 类型:load/discharge/gate_in/gate_out
- ship_id: INT, 外键 → ships.id
- berth_id: INT, 外键 → berths.id
- teus: DECIMAL(6,2), TEU 数量(20 尺=1, 40 尺=2)
- actual_time: TIMESTAMP, 作业时间
- status: VARCHAR(20), 状态:planned/completed/cancelled
## 业务规则
- **吞吐量** = SUM(teus) WHERE move_type IN ('load', 'discharge') AND status='completed'
- 只允许 SELECT 查询
- 默认查询最近 7 天数据
## SQL 示例
-- 查询某段时间吞吐量
SELECT SUM(teus) FROM container_moves
WHERE move_type IN ('load', 'discharge') AND status='completed';
-- 查询某条船的业务
SELECT ship_name, COUNT(*), SUM(teus)
FROM ships s JOIN container_moves cm ON s.id=cm.ship_id
GROUP BY ship_name;
"""
def build_prompt(user_question: str) -> str:
return f"{SCHEMA_PROMPT}\n\n用户问题:{user_question}\n\n生成 SQL:"
2. backend/agent/sql_generator.py
创建 backend/agent/sql_generator.py:
PYTHON
"""
SQL 生成器:调用 LLM 生成 SQL
"""
import os
from openai import OpenAI
from .schema_prompt import build_prompt
# 初始化 LLM 客户端
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
def generate_sql(user_question: str, model: str = "gpt-3.5-turbo") -> str:
"""
调用 LLM 生成 SQL
Args:
user_question: 用户自然语言问题
model: LLM 模型名称
Returns:
生成的 SQL 字符串
"""
prompt = build_prompt(user_question)
response = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "你是一个 PostgreSQL SQL 专家,只返回 SQL,不要其他文字。"},
{"role": "user", "content": prompt}
],
temperature=0, # 降低随机性,保证 SQL 稳定性
max_tokens=500
)
sql = response.choices[0].message.content.strip()
# 清理可能的 markdown 代码块标记
if sql.startswith("```sql"):
sql = sql[6:]
if sql.endswith("```"):
sql = sql[:-3]
return sql.strip()
# 测试
if __name__ == "__main__":
question = "上周的集装箱吞吐量是多少?"
sql = generate_sql(question)
print(f"生成的 SQL:\n{sql}")
3. backend/agent/validator.py
创建 backend/agent/validator.py:
PYTHON
"""
SQL 安全验证器
"""
import re
class SecurityError(Exception):
"""安全验证异常"""
pass
def validate_sql(sql: str) -> tuple[bool, str]:
"""
验证 SQL 安全性
Returns:
(是否安全,错误信息或清理后的 SQL)
"""
sql_upper = sql.upper().strip()
# 1. 只允许 SELECT
if not sql_upper.startswith("SELECT"):
return False, "只允许执行 SELECT 查询"
# 2. 禁止危险关键字
dangerous = ["DROP", "DELETE", "UPDATE", "INSERT", "TRUNCATE", "ALTER"]
for keyword in dangerous:
if keyword in sql_upper:
return False, f"检测到危险操作:{keyword}"
# 3. 限制 JOIN 数量
if sql_upper.count("JOIN") > 5:
return False, "查询过于复杂:最多允许 5 个 JOIN"
# 4. 添加 LIMIT(如果没有)
if "LIMIT" not in sql_upper and "COUNT" not in sql_upper and "SUM" not in sql_upper:
sql = sql.rstrip(";") + " LIMIT 1000"
return True, sql
# 测试
if __name__ == "__main__":
test_cases = [
"SELECT * FROM ships",
"DROP TABLE ships",
"SELECT * FROM ships; DELETE FROM users",
]
for sql in test_cases:
is_safe, result = validate_sql(sql)
print(f"SQL: {sql[:50]}... | 安全:{is_safe} | 结果:{result[:50] if result else 'N/A'}")
4. backend/main.py (FastAPI 主入口)
创建 backend/main.py:
PYTHON
"""
港口数据分析 Agent - FastAPI 主入口
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from agent.sql_generator import generate_sql
from agent.validator import validate_sql
app = FastAPI(title="Port Data Agent")
# CORS 配置(允许前端访问)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # 生产环境改为具体域名
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
class QueryRequest(BaseModel):
question: str
class QueryResponse(BaseModel):
sql: str
result: list = None
error: str = None
@app.post("/api/query")
async def query(request: QueryRequest) -> QueryResponse:
"""处理用户查询"""
try:
# 1. 生成 SQL
sql = generate_sql(request.question)
# 2. 安全验证
is_safe, result = validate_sql(sql)
if not is_safe:
raise HTTPException(status_code=400, detail=result)
safe_sql = result
# 3. TODO: 执行 SQL(下一步实现)
# query_result = db.execute(safe_sql)
return QueryResponse(
sql=safe_sql,
result=[], # 临时返回空数组
error=None
)
except Exception as e:
return QueryResponse(
sql="",
result=[],
error=str(e)
)
@app.get("/health")
async def health_check():
return {"status": "ok"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
第三步:运行和测试
安装依赖:
BASH
cd backend
pip install fastapi uvicorn openai python-dotenv
设置环境变量:
创建.env文件:
BASH
OPENAI_API_KEY=your_api_key_here
启动后端:
BASH
python main.py
测试 API:
用 curl 或 Postman 发送请求:
BASH
curl -X POST http://localhost:8000/api/query \
-H "Content-Type: application/json" \
-d '{"question": "上周的集装箱吞吐量是多少?"}'结果:

