一、基础理论知识
01 阿里云百炼Key使用
碎碎念:调用云端模型,流式输出
from openai import OpenAI
import os
client = OpenAI(
# 如果没有配置环境变量,请用阿里云百炼API Key替换:api_key="sk-xxx"
# api_key="sk-8e23095db89341679edc0cfa52b8a53",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
messages = [{"role": "user", "content": "你是谁"}]
completion = client.chat.completions.create(
model="deepseek-v3", # 您可以按需更换为其它深度思考模型
messages=messages,
extra_body={"enable_thinking": True},
stream=True
)
is_answering = False # 是否进入回复阶段
print("\n" + "=" * 20 + "思考过程" + "=" * 20)
for chunk in completion:
delta = chunk.choices[0].delta
if hasattr(delta, "reasoning_content") and delta.reasoning_content is not None:
if not is_answering:
print(delta.reasoning_content, end="", flush=True)
if hasattr(delta, "content") and delta.content:
if not is_answering:
print("\n" + "=" * 20 + "完整回复" + "=" * 20)
is_answering = True
print(delta.content, end="", flush=True)
02 OpenAI使用
碎碎念:模型调用的格式,用户与助手角色
from openai import OpenAI
client = OpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
response = client.chat.completions.create(
model="qwen3-max",
messages=[
{"role": "user", "content": "你是python专家,并且不说废话"},
{"role": "assistant", "content": "是的,我是一个python专家"},
{"role": "user", "content": "那么,你能不能帮我写一个python程序,计算1到100的和"},
]
)
print(response.choices[0].message.content)
03 OpenAI库流式调用
碎碎念:像大模型发送问题,并用流式方式一段一段接收回复,实时打印出来
from openai import OpenAI
client = OpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
response = client.chat.completions.create(
model="qwen3-max",
messages=[
{"role": "user", "content": "你是python专家,并且不说废话"},
{"role": "assistant", "content": "是的,我是一个python专家"},
{"role": "user", "content": "那么,你能不能帮我写一个python程序,计算1到100的和"},
],
stream=True
)
for chunk in response:
print(chunk.choices[0].delta.content,
end=" ",
flush=True)
04 附带历史消息
碎碎念:系统给模型定身份,流式输出实时打印回答
from openai import OpenAI
client = OpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
response = client.chat.completions.create(
model="qwen3-max",
messages=[
{"role": "system", "content": "你是AI助理,回答很简洁"},
{"role": "user", "content": "小明有两条宠物"},
{"role": "assistant", "content": "好的"},
{"role": "user", "content": "小红有三条宠物"},
{"role": "assistant", "content": "好的"},
{"role": "user", "content": "总共有几只宠物"},
],
stream=True
)
for chunk in response:
print(chunk.choices[0].delta.content,
end=" ",
flush=True)
二、提示词工程
01 Prompt提示词案例-金融文本分类
碎碎念:few-shot提示词的形式,让大模型模仿示例,把新文本分到指定金融类别里。
from openai import OpenAI
client = OpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
examples_data = { # 示例数据
'新闻报道': '今日,股市经历了一轮震荡,受到宏观经济数据和全球贸易紧张局势的影响。投资者密切关注美联储可能的政策调整,以适应市场的不确定性。',
'财务报告': '本公司年度财务报告显示,去年公司实现了稳步增长的盈利,同时资产负债表呈现强劲的状况。经济环境的稳定和管理层的有效战略执行为公司的健康发展提供了坚实基础。',
'公司公告': '本公司高兴地宣布成功完成新一轮并购交易,收购了一家在人工智能领域领先的公司。这一战略举措将有助于扩大我们的业务领域,提高市场竞争力。',
'分析师报告': '最新的行业分析报告指出,科技公司的创新将成为未来增长的主要推动力。云计算、人工智能和数字化转型被认为是引领行业发展的关键因素,投资者应重点关注这些领域。'
}
# 分类列表
examples_types = ['新闻报道', '财务报告', '公司公告', '分析师报告']
# 提问数据
questions = [
"今日,央行发布公告宣布降低利率,以刺激经济增长。这一降息举措将影响贷款利率,并在未来几个季度内对金融市场产生影响。",
"ABC公司今日发布公告称,已成功完成对XYZ公司股权的收购交易。本次交易是ABC公司在扩大业务范围、加强市场竞争力方面的重要举措。据悉,此次收购将进一步巩固其在行业内的领先地位。",
"公司资产负债表显示,公司偿债能力强劲,现金流充足,为未来投资和扩张提供了坚实的财务基础。",
"最新的分析报告指出,可再生能源行业预计将在未来几年经历持续增长,投资者应该关注这一领域的投资机会。",
"小明喜欢小新哟"
]
"""
[
{"role": "system", "content": "你是金融专家,将文本分类为['新闻报道', '财务报道', '公司公告', '分析师报告'],不清楚的分类为'不清楚'"},
{"role": "user", "content": "今日,央行发布公告宣布................"},
{"role": "assistant", "content": "新闻报道"},
{"role": "user", "content": "ABC公司今日发布公告称,已成功完成对XYZ公司股................"},
{"role": "assistant", "content": "财务报告"},
...
]
"""
messages = [
{"role": "system", "content": "你是金融专家,将文本分类为['新闻报道', '财务报道', '公司公告', '分析师报告'],不清楚的分类为'不清楚'"},
]
for key, value in examples_data.items():
messages.append({"role": "user", "content": value})
messages.append({"role": "assistant", "content": key})
for q in questions:
response = client.chat.completions.create(
model="qwen3-max",
messages=messages + [{"role": "user", "content": f"按照示例,回答这段文本的分类类别: {q}"}]
)
print(response.choices[0].message.content)
02 提示词优化-金融信息抽取
碎碎念:先给模型任务说明,给模型标注样例,让模型模仿样例输出结构化结果
from openai import OpenAI
import json
client = OpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
# base_url="http://localhost:11434/v1"
)
schema = ['日期', '股票名称', '开盘价', '收盘价', '成交量']
examples_data = [ # 示例数据
{
"content": "2023-01-10,股市震荡。股票强大科技A股今日开盘价100人民币,一度飙升至105人民币,随后回落至98人民币,最终以102人民币收盘,成交量达到520000。",
"answers": {
"日期": "2023-01-10",
"股票名称": "强大科技A股",
"开盘价": "100人民币",
"收盘价": "102人民币",
"成交量": "520000"
}
},
{
"content": "2024-05-16,股市利好。股票英伟达美股今日开盘价105美元,一度飙升至109美元,随后回落至100美元,最终以116美元收盘,成交量达到3560000。",
"answers": {
"日期": "2024-05-16",
"股票名称": "英伟达美股",
"开盘价": "105美元",
"收盘价": "116美元",
"成交量": "3560000"
}
}
]
questions = [ # 提问问题
"2025-06-16,股市利好。股票传智教育A股今日开盘价66人民币,一度飙升至70人民币,随后回落至65人民币,最终以68人民币收盘,成交量达到123000。",
"2025-06-06,股市利好。股票黑马程序员A股今日开盘价200人民币,一度飙升至211人民币,随后回落至201人民币,最终以206人民币收盘。"
]
"""
[
{"role": "system", "content": f"你帮我完成信息抽取,我给你句子,你抽取{schema}信息,按JSON字符串输出,如果某些信息不存在,用'原文未提及'表示,请参考如下示例:"},
{"role": "user", "content": "2023-01-10,股市震荡。股票强大科技A股今日开盘价100人民币,一度飙升至105人民币,随后回落至98人民币,最终以102人民币收盘,成交量达到520000。"},
{"role": "assistant", "content": '{"日期": "2023-01-10", "股票名称": "强大科技A股", "开盘价": "100人民币", "收盘价": "102人民币", "成交量": "520000"}'},
{"role": "user", "content": "2024-05-16,股市利好。股票英伟达美股今日开盘价105美元,一度飙升至109美元,随后回落至100美元,最终以116美元收盘,成交量达到3560000。"},
{"role": "assistant", "content": '{"日期": "2024-05-16", "股票名称": "英伟达美股", "开盘价": "105美元", "收盘价": "116美元", "成交量": "3560000"}'},
...
]
"""
messages = [
{"role": "system", "content": f"你帮我完成信息抽取,我给你句子,你抽取{schema}信息,按JSON字符串输出,如果某些信息不存在,用'原文未提及'表示,请参考如下示例:"},
{"role": "user", "content": "2023-01-10,股市震荡。股票强大科技A股今日开盘价100人民币,一度飙升至105人民币,随后回落至98人民币,最终以102人民币收盘,成交量达到520000。"},
{"role": "assistant", "content": '{"日期": "2023-01-10", "股票名称": "强大科技A股", "开盘价": "100人民币", "收盘价": "102人民币", "成交量": "520000"}'},
{"role": "user", "content": "2024-05-16,股市利好。股票英伟达美股今日开盘价105美元,一度飙升至109美元,随后回落至100美元,最终以116美元收盘,成交量达到3560000。"},
{"role": "assistant", "content": '{"日期": "2024-05-16", "股票名称": "英伟达美股", "开盘价": "105美元", "收盘价": "116美元", "成交量": "3560000"}'},
]
for example in examples_data:
messages.append(
{"role": "user", "content": example["content"]}
)
messages.append(
{"role": "assistant", "content": json.dumps(example["answers"], ensure_ascii=False)}
)
# for x in messages:
# print(x)
'''
{"日期": "2025-06-16", "股票名称": "传智教育A股", "开盘价": "66人民币", "收盘价": "68人民币", "成交量": "123000"}
{"日期": "2025-06-06", "股票名称": "黑马程序员A股", "开盘价": "200人民币", "收盘价": "206人民币", "成交量": "原文未提及"}
'''
for q in questions:
response = client.chat.completions.create(
model="qwen3-max",
messages=messages + [{"role": "user", "content": f"按照上面的示例,现在抽取这个句子的信息{q}"}]
)
print(response.choices[0].message.content)
03 提示词优化,金融文本匹配判断
碎碎念:few-shot分类
from openai import OpenAI
client = OpenAI(
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
# base_url="http://localhost:11434/v1"
)
examples_data = {
"是": [
("公司ABC发布了季度财报,显示盈利增长。", "财报披露,公司ABC利润上升。"),
("公司ITCAST发布了年度财报,显示盈利大幅度增长。", "财报披露,公司ITCAST更赚钱了。")
],
"不是": [
("黄金价格下跌,投资者抛售。", "外汇市场交易额创下新高。"),
("央行降息,刺激经济增长。", "新能源技术的创新。")
]
}
questions = [
("利率上升,影响房地产市场。", "高利率对房地产有一定的冲击。"),
("油价大幅度下跌,能源公司面临挑战。", "未来智能城市的建设趋势越加明显。"),
("股票市场今日大涨,投资者乐观。", "持续上涨的市场让投资者感到满意。")
]
"""
[
{"role": "system", "content": f"你帮我完成文本匹配,我给你2个句子,被[]包围,你判断它们是否匹配,回答是或不是,请参考如下示例:"},
{"role": "user", "content": "句子1:[公司ABC发布了季度财报,显示盈利增长。]句子2:[财报披露,公司ABC利润上升。]"},
{"role": "assistant", "content": "是"},
{"role": "user", "content": "句子1:[公司ITCAST发布了年度财报,显示盈利大幅度增长。]句子2:[财报披露,公司ITCAST更赚钱了。]"},
{"role": "assistant", "content": "是"},
{"role": "user", "content": "句子1:[黄金价格下跌,投资者抛售。]句子2:[外汇市场交易额创下新高。]"},
{"role": "assistant", "content": "不是"},
{"role": "user", "content": "句子1:[央行降息,刺激经济增长。]句子2:[新能源技术的创新。]"},
{"role": "assistant", "content": "不是"},
{"role": "user", "content": f"按照上述示例,回答这2个句子的情况。句子1:[...]句子2:[...]"}
"""
messages = [
{"role": "system", "content": f"你帮我完成文本匹配,我给你2个句子,被[]包围,你判断它们是否匹配,回答是或不是,请参考如下示例:"}
]
for key, value in examples_data.items():
for t in value:
messages.append(
{"role": "user", "content": f"句子1:[{t[0]}]句子2:[{t[1]}]"}
)
messages.append(
{"role": "assistant", "content": key}
)
for q in questions:
response = client.chat.completions.create(
model="qwen3-max",
messages=messages+[
{"role": "user", "content": f"句子1:[{q[0]}]句子2:[{q[1]}]"}
]
)
print(response.choices[0].message.content)
三、RAG开发
01 LangChain介绍
开发LLM相关业务功能的集成,python第三方库,提供各种接口
02 安装环境
pip install langchain langchain-community langchain-ollama dashscope chromadb
各包作用说明
- langchain:核心包,LangChain 框架的基础功能模块。
- langchain-community:社区支持包,提供更多第三方模型的调用能力(如阿里云通义千问模型)。
- langchain-ollama:Ollama 支持包,用于调用 Ollama 托管部署的本地大语言模型。
- dashscope:阿里云通义千问的 Python SDK,用于直接调用阿里云模型服务。
- chromadb:轻量级向量数据库,用于后续 RAG 等场景的向量存储与检索。
03 RAG检索增强
通用基础大模型存在的问题
- 知识非实时性:LLM 训练完成后不具备自动更新知识的能力,会导致部分信息滞后。
- 领域知识匮乏:大模型知识主要来源于公开互联网和开源数据集,无法覆盖特定领域或高度专业化的内部知识。
- 幻觉问题:LLM 有时会生成看似合理但实际上错误的信息。

RAG (Retrieval-Augmented Generation) 即检索增强生成,为大模型提供了从特定数据源检索到的信息,以此修正和补全答案。可以总结为一个公式:

RAG工作分为两条线:
- 离线准备线
- 在线服务线

04 扩展向量Vector
- 向量(Vector)就是文本的 “数学身份证”:把一段文字的语义信息,转换成一串固定长度的数字列表,让计算机能 “看懂” 文字的含义并做相似度计算。
- 文本嵌入模型embedding - 深度学习技术
- 维度越多,语义匹配越精准;但性能压力也会增大
05 余弦相似度算法
碎碎念:先自己实现点积和模长,再用公式算两个向量的余弦相似度,用来判断向量方向是否接近。
import numpy as np
"""
计算两个向量的余弦相似度(衡量方向相似性,剔除长度影响)
参数:
vec_a (np.array): 向量A
vec_b (np.array): 向量B
返回:
float: 余弦相似度结果(范围[-1,1],越接近1方向越一致)
公式:
cos_sim = (vec_a · vec_b) / (||vec_a|| × ||vec_b||)
拆解:
1. 点积:vec_a · vec_b = vec_a[0]×vec_b[0] + vec_a[1]×vec_b[1] + ... + vec_a[n]×vec_b[n]
2. 模长:||vec_a|| = √(vec_a[0]² + vec_a[1]² + ... + vec_a[n]²)
3. 模长:||vec_b|| = √(vec_b[0]² + vec_b[1]² + ... + vec_b[n]²)
A: [0.5, 0.5]
B: [0.7, 0.7]
C: [0.7, 0.5]
D: [-0.6, -0.5]
"""
def get_dot(vec_a, vec_b):
"""计算2个向量的点积,2个向量同维度"""
if len(vec_a) != len(vec_b):
raise ValueError
dot_sum = 0
for a, b in zip(vec_a, vec_b):
dot_sum += a * b
return dot_sum
def get_norm(vec):
"""计算向量的模长, 对向量的每个数字求平方乘积之和"""
sum_square = 0
for v in vec:
sum_square += v * v
# numpy sqrt函数完成开根号
return np.sqrt(sum_square)
def cosine_similarity(vec_a, vec_b):
"""余弦相似度:2个向量的点积 除以 2个向量模长的乘积"""
result = get_dot(vec_a, vec_b) / (get_norm(vec_a) * get_norm(vec_b))
return result
if __name__ == '__main__':
vec_a = [0.5, 0.5]
vec_b = [0.7, 0.7]
vec_c = [0.7, 0.5]
vec_d = [-0.6, -0.5]
print("ab:", cosine_similarity(vec_a, vec_b))
print("ac:", cosine_similarity(vec_a, vec_c))
print("ad:", cosine_similarity(vec_a, vec_d))
06 LangChain调用大语言模型
LangChain 目前支持三种类型的模型:LLMs(大语言模型)、Chat Models(聊天模型)、Embeddings Models(嵌入模型)。
- LLMs(大语言模型):是技术范畴的统称,指基于大参数数量、海量文本训练的 Transformer 架构模型,核心能力是理解和生成自然语言,主要服务于文本生成场景。
- Chat Models(聊天模型):是专为对话场景优化的 LLMs,核心能力是模拟人类对话的轮次交互,主要服务于聊天场景。
- 文本嵌入模型(Embeddings Models):接收文本作为输入,将文本转换为向量(Embedding),主要用于语义表示、相似度计算、检索等 RAG 相关场景。
LangChain 支持的三类模型,它们的使用场景不同,输入和输出不同,开发者需要根据项目需要选择相应。
我们所用的阿里云通义千问系列主要来自于:langchain_community 包。
from langchain_community.llms.tongyi import Tongyi
# 不用qwen3-max, 因为qwen3-max是聊天模型, qwen-max是大语言模型
model = Tongyi(model="qwen-max")
# 调用invoke向模型提问
res = model.invoke(input="你是谁啊能做什么?")
print(res)
# from langchain_ollama import OllamaLLM
'''本地测试'''
# model = OllamaLLM(model="qwen2.5:14b")
# res = model.invoke(input="你是谁啊能做什么?")
# print(res)
07 流式输出
res = model.stream(input="你是谁呀能做什么?")
for chunk in res:
print(chunk, end="", flush=True)
08 调用聊天模型
- AIMessage:AI 输出的消息,可以是针对问题的回答。(对应 OpenAI 库中的
assistant角色) - HumanMessage:人类消息即用户信息,由人给出并发送给 LLMs 的提示信息,比如 “实现一个快速排序方法”。(对应 OpenAI 库中的
user角色) - SystemMessage:用于指定模型所处的环境和背景,如角色扮演等。可在此给出具体指示,比如 “作为一个代码专家”,或者 “返回 json 格式”。(对应 OpenAI 库中的
system角色)
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
model = ChatTongyi(model="qwen3-max")
messages = [
SystemMessage(content="你是一个边塞诗人"),
HumanMessage(content="写一首唐诗"),
AIMessage(content="锄禾日当午,汗滴禾下土,谁知盘中餐,粒粒皆辛苦。"),
HumanMessage(content="按照你上一个回复的格式,在写一首唐诗")
]
res = model.stream(input=messages)
for chunk in res:
print(chunk.content, end="", flush=True)
09 消息的简写
messages = [
("system", "你是一个边塞诗人"),
("human", "写一首唐诗"),
("ai", "锄禾日当午,汗滴禾下土,谁知盘中餐,粒粒皆辛苦。"),
("system", "按照你上一个回复的格式,在写一首唐诗"),
]
是动态的,需要在运行时由LangChain内部机制转换为Message类对象
好处:简写形式避免导包、写起来更简单、简写形式支持内部填充{变量}占位、可在运行时填充具体值。
10 调用文本嵌入模型
Embeddings Models(嵌入模型)的特点:将字符串作为输入,返回一个浮点数的列表(向量)。
在 NLP(自然语言处理)中,Embedding(嵌入) 的核心作用就是将文本数据进行向量化,把人类可读的文字转换为计算机可计算的向量形式,从而实现语义相似度计算、检索等功能。
from langchain_community.embeddings import DashScopeEmbeddings
# 创建模型对象,不传model默认使用的是text-embeddings-v1
model = DashScopeEmbeddings()
print(model.embed_query("我喜欢你"))
print(model.embed_documents(["我喜欢你", "你真聪明"]))
模型使用总结

11 通用prompt(zero-shot)
zero-shot思想下,可以基于PromptTemplate直接完成
from langchain_core.prompts import PromptTemplate
from langchain_community.llms.tongyi import Tongyi
prompt_template = PromptTemplate.from_template(
"我的邻居姓{lastname}, 刚生了{gender}, 你帮我起个名字, 简单回答。"
)
model = Tongyi(model="qwen-max")
# # 调用.fromat方法注入信息即可
# prompt_text = prompt_template.format(lastname="张", gender="女儿")
# model = Tongyi(model="qwen-max")
# res = model.invoke(input=prompt_text)
# print(res)
chain = prompt_template | model
res = chain.invoke(input={"lastname": "张", "gender": "女儿"})
print(res)
12 FewShot模板
FewShotPromptTemplate 类对象构建需要 5 个核心参数:
example_prompt:示例数据的提示词模板(定义单条示例的格式,由PromptTemplate构成)examples:示例数据,类型为list,内部嵌套字典(每条示例是一个字典,键对应example_prompt的输入变量)prefix:组装提示词时,放在所有示例之前的内容(用于说明任务规则、背景)suffix:组装提示词时,放在所有示例之后的内容(用于拼接待处理的输入问题)input_variables:列表类型,最终要注入模板的变量名(即suffix中需要填充的变量)
rom langchain_core.prompts import FewShotPromptTemplate, PromptTemplate
from langchain_community.llms.tongyi import Tongyi
# 示例模板
example_template = PromptTemplate.from_template("单词: {word},反义词:{antonym}")
# 示例的动态数据注入 要求是list内部套字典
examples_data = [
{"word": "大", "antonym": "小"},
{"word": "上", "antonym": "下"}
]
few_shot_template = FewShotPromptTemplate(
example_prompt=example_template, # 示例数据的模版
examples=examples_data, # 示例的数据
prefix="告知我单词的反义词,我提供如下的示例: ", # 示例之前的提示词
suffix="基于前面的示例告诉我, {input_word}的反义词是?", # 示例之后的提示词
input_variables=['input_word'] # 声明在前缀或后缀中所需要注入的变量名
)
prompt_text = few_shot_template.invoke(input={"input_word": "左"}).to_string()
print(prompt_text)
model = Tongyi(model="qwen-max")
print(model.invoke(input=prompt_text))
13 模板类的format和invoke方法

from langchain_core.prompts import PromptTemplate
from langchain_core.prompts import FewShotPromptTemplate
from langchain_core.prompts import ChatPromptTemplate
template = PromptTemplate.from_template("我的邻居是:{lastname}, 最喜欢: {hobby}")
res = template.format(lastname="Rudy", hobby=" running")
print(res, type(res))
res2 = template.invoke({"lastname": "Rudy", "hobby": " running"})
print(res2, type(res2))
14 ChatPromptTemplate使用
- PromptTemplate:通用提示词模板,支持动态注入变量信息,适合单轮、无示例的简单任务。
- FewShotPromptTemplate:支持基于模板注入任意数量的示例信息,适合少样本学习场景(如分类、抽取、匹配)。
- ChatPromptTemplate:支持注入任意数量的历史会话信息,适合多轮对话场景。
ChatPromptTemplate 核心用法
- 核心方法:通过
from_messages方法,从列表中获取多轮次会话作为聊天的基础模板。 - 关键区别:
PromptTemplate使用from_template,仅能接入单条消息;ChatPromptTemplate使用from_messages,可以接入一个消息列表(多轮对话历史)。
MessagePlacehoder 用于在聊天模板中预留动态位置(如历史对话),通过指定 history 作为占位键,在执行推理时利用 invoke 方法传入真实的历史会话列表。format 不支持动态注入消息列表类的复杂结构,因此必须使用 invoke。
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_models.tongyi import ChatTongyi
chat_prompt_template = ChatPromptTemplate.from_messages(
[
("system", "你是一个 helpful assistant"),
MessagesPlaceholder("history"),
('human', "请再来一首唐诗"),
]
)
history_data = [
("human", "你来写一首唐诗"),
("ai", "床前明月光,疑是地上霜。"),
("human", "再来一句"),
("ai", "白日依山,黄河入海。"),
]
prompt_text = chat_prompt_template.invoke({"history": history_data})
model = ChatTongyi(model="qwen3-max")
res = model.invoke(prompt_text)
print(res)
15 Chain 基础使用
将组件串联,上一个组件的输出作为下一个组件的输入是 LangChain 链(尤其是 | 管道链)的核心工作原理,这也是链式调用的核心价值:实现数据的自动化流转与组件的协同工作,如下。
chain = prompt_template | model
核心前提:即 Runnable 子类对象才能入链(以及 Callable、Mapping 接口子类对象也可加入(后续了解用的不多))。
我们目前所学习到的组件,均是 Runnable 接口的子类
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_community.chat_models.tongyi import ChatTongyi
chat_prompt_template = ChatPromptTemplate.from_messages(
[
("system", "你是一个 helpful assistant"),
MessagesPlaceholder("history"),
('human', "请再来一首唐诗"),
]
)
history_data = [
("human", "你来写一首唐诗"),
("ai", "床前明月光,疑是地上霜。"),
("human", "再来一句"),
("ai", "白日依山,黄河入海。"),
]
model = ChatTongyi(model="qwen3-max")
chain = chat_prompt_template | model
# 通过链式调用invoke或stream
# res= chain.invoke({"history": history_data})
# print(res.content)
for chunk in chain.stream({"history": history_data}):
print(chunk.content, end="", flush=True)
16 运算法重载
chain = chat_prompt_template | model
在语法上使用了 | 运算符的重写a + b本质调用的是a.__add__(b)a | b本质调用的是a.__or__(b)
__or__方法,即可对 | 符号的功能进行重写。- 让
a|b|c的代码得到一个自定义的类对象 (类似列表即[a, b, c]) - 调用 run 方法依次输出 a、b、c
- 我们需要重写 | 即
__or__方法
class Test(object):
def __init__(self, name):
self.name = name
def __or__(self, other):
return MySequence(self, other)
def __str__(self):
return self.name
class MySequence(object):
def __init__(self, *args):
self.sequence = []
for arg in args:
self.sequence.append(arg)
def __or__(self, other):
self.sequence.append(other)
return self
def run(self):
for i in self.sequence:
print(i)
'''
a
b
c
<class '__main__.MySequence'>
'''
if __name__ == '__main__':
a = Test('a')
b = Test('b')
c = Test('c')
d = a | b | c
d.run()
print(type(d))
17 Runable示例
LangChain 中的绝大多数核心组件都继承了 Runnable 抽象基类(位于 langchain_core.runnables.base)。
chain = prompt | model
而得到这个类型的原因就是 Runnable 基类内部对
__or__魔术方法的改写。|添加新的组件,依旧会得到 RunnableSequence,这就是链的基础架构。from langchain_core.prompts import PromptTemplate
from langchain_community.llms.tongyi import Tongyi
prompt = PromptTemplate.from_template("你是一个AI助手!")
model = Tongyi(models="qwen3-max")
chain = prompt | model | prompt | model
# chain.invoke()
# chain.stream()
'''
<class 'langchain_core.runnables.base.RunnableSequence'>
'''
print(chain)
print(type(chain))
18 StrOutParser字符串输出解析器
chain = prompt | model | model
- prompt 的结果是PromptValue类型,输入给了 model
- model 的输出结果是:AIMessage
需要用StrOutParser解析器把AIMessage转换成可用类型
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_community.chat_models.tongyi import ChatTongyi
parser = StrOutputParser()
model = ChatTongyi(models="qwen3-max")
prompt = PromptTemplate.from_template(
"我邻居姓:{lastname}, 刚生了{gender}, 请起名,仅告知我名字无需其他内容。"
)
chain = prompt | model | parser | model | parser
res: str = chain.invoke({'lastname': "葛", "gender": "女儿"})
print(res)
print(type(res))
19 JsonOutputParser解析器
- 模型输入:PromptValue 或字符串或序列(BaseMessage、list、tuple、str、dict)。
- 模型输出:AIMessage
- 提示词模板输入:要求是字典
- 提示词模板输出:PromptValue 对象
- StrOutputParser:AIMessage 输入、str 输出
- JsonOutputParser:AIMessage 输入、dict 输出
from langchain_core.output_parsers import StrOutputParser, JsonOutputParser
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.prompts import PromptTemplate
str_parser = StrOutputParser()
json_parser = JsonOutputParser()
model = ChatTongyi(models="qwen3-max")
first_prompt = PromptTemplate.from_template(
"我邻居姓:{lastname}, 刚生了{gender}, 请起名,并封装到JSON格式返回给我,"
"要求key是name,value是起的名字"
)
second_prompt = PromptTemplate.from_template(
"姓名{name}, 请帮我解析含义"
)
chain = first_prompt | model | json_parser | second_prompt | model | str_parser
for chunk in chain.stream({"lastname": "葛", "gender": "女儿"}):
print(chunk, end="", flush=True)
20 自定义函数加入链
如果想要在链中加入自定义函数,可以选择:
- 将函数封装入 RunnableLambda 类对象,其是 Runnable 接口实例,可以直接入链
- 直接将函数入链,函数会自动转换为 RunnableLambda 对象
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnableLambda
str_parser = StrOutputParser()
model = ChatTongyi(models="qwen3-max")
first_prompt = PromptTemplate.from_template(
"我邻居姓:{lastname}, 刚生了{gender}, 请起名,仅生成一个名字,并告知我名字,不要额外信息,"
"要求key是name,value是起的名字"
)
second_prompt = PromptTemplate.from_template(
"姓名{name}, 请帮我解析含义"
)
# my_func = RunnableLambda(lambda ai_msg: {"name": ai_msg.content})
# chain = first_prompt | model | my_func | second_prompt | model | str_parser
chain = first_prompt | model | RunnableLambda(lambda ai_msg: {"name": ai_msg.content}) | second_prompt | model | str_parser
for chunk in chain.stream({"lastname": "葛", "gender": "女孩"}):
print(chunk, end= "", flush=True)
21 Memory 临时记忆
- 基于
RunnableWithMessageHistory在原有链的基础上创建带有历史记录功能的新链(新 Runnable 实例) - 基于
InMemoryChatMessageHistory为历史记录提供内存存储(临时用)
from langchain_community.chat_models import ChatTongyi
from langchain_core.prompts import PromptTemplate, ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
model = ChatTongyi(models="qwen3-max")
str_parser = StrOutputParser()
# prompt = PromptTemplate.from_template(
# "你需要根据会话历史回应用户问题,对话历史: {chat_history}, 用户提问: {input}, 请回答"
# )
prompt = ChatPromptTemplate.from_messages(
[
("system", "你需要根据会话历史回应用户问题,对话历史:"),
MessagesPlaceholder("chat_history"),
("human", "用户提问: {input}"),
]
)
def print_prompt(full_prompt):
print("=" * 20, full_prompt.to_string(), "=" * 20)
return full_prompt
base_chain = prompt | model | str_parser
store = {}
def get_history(session_id):
if session_id not in store:
store[session_id] = InMemoryChatMessageHistory()
return store[session_id]
# 创建一个新的链
conversion_chain = RunnableWithMessageHistory(
base_chain, # 被增强的原有chain
get_history, # 通过会话id获取InMemoryChatMessageHistory类对象
input_messages_key="input", # 表示用户输入在模板中的占位符
history_messages_key="chat_history",
)
'''
第1次执行: 好的,小明有两只喵,听起来很可爱呢!你是不是想继续讲一个关于小明和他猫咪的故事呢?或者你有什么问题想要问关于这两只喵的?
第2次执行: 小刚有1只汪,听起来也很可爱!你是不是想讲一个关于小刚和他狗狗的故事呢?或者你想知道更多关于这只汪的信息?
第3次执行: 小明有两只喵,小刚有1只汪,所以总共有 2 + 1 = 3 只宠物。
'''
if __name__ == '__main__':
session_config = {
"configurable": {
"session_id": "user_01"
}
}
res = conversion_chain.invoke({"input": "小明有两只喵,"}, session_config)
print("第1次执行:", res)
res = conversion_chain.invoke({"input": "小刚有1只汪,"}, session_config)
print("第2次执行:", res)
res = conversion_chain.invoke({"input": "总共有几只宠物,"}, session_config)
print("第3次执行:", res)
22 长期会话记忆
FileChatMessageHistory类实现,核心思路:
- 基于文件存储会话记录,以
session_id为文件名,不同session_id有不同文件存储消息
继承BaseChatMessageHistory实现如下 3 个方法:
add_messages:同步模式,添加消息messages:同步模式,获取消息clear:同步模式,清除消息
import os, json
from langchain_community.chat_models import ChatTongyi
from langchain_core.messages import message_to_dict, messages_from_dict, BaseMessage
from langchain_core.chat_history import BaseChatMessageHistory, InMemoryChatMessageHistory
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnableWithMessageHistory
from sqlalchemy import Sequence
class FileChatMessageHistory(BaseChatMessageHistory):
def __init__(self, session_id, storage_path):
self.session_id = session_id
self.storage_path = storage_path
self.file_path = os.path.join(self.storage_path, self.session_id)
os.makedirs(os.path.dirname(self.file_path), exist_ok=True)
def add_messages(self, messages: Sequence[BaseMessage]) -> None:
all_messages = list(self.messages)
all_messages.extend(messages)
# new_messages = []
# for message in all_messages:
# d = message_to_dict(message)
# new_messages.append(d)
new_messages = [message_to_dict(message) for message in all_messages]
with open(self.file_path, "w", encoding="utf-8") as f:
json.dump(new_messages, f)
@property
def messages(self) -> list[BaseMessage]:
try:
with open(self.file_path, "r", encoding="utf-8") as f:
messages_data = json.load(f)
return messages_from_dict(messages_data)
except FileNotFoundError:
return []
def clear(self) -> None:
with open(self.file_path, "w", encoding="utf-8") as f:
json.dump([], f)
model = ChatTongyi(models="qwen3-max")
str_parser = StrOutputParser()
# prompt = PromptTemplate.from_template(
# "你需要根据会话历史回应用户问题,对话历史: {chat_history}, 用户提问: {input}, 请回答"
# )
prompt = ChatPromptTemplate.from_messages(
[
("system", "你需要根据会话历史回应用户问题,对话历史:"),
MessagesPlaceholder("chat_history"),
("human", "用户提问: {input}"),
]
)
def print_prompt(full_prompt):
print("=" * 20, full_prompt.to_string(), "=" * 20)
return full_prompt
base_chain = prompt | print_prompt |model | str_parser
store = {}
def get_history(session_id):
return FileChatMessageHistory(session_id, "./chat_history")
# 创建一个新的链
conversion_chain = RunnableWithMessageHistory(
base_chain, # 被增强的原有chain
get_history, # 通过会话id获取InMemoryChatMessageHistory类对象
input_messages_key="input", # 表示用户输入在模板中的占位符
history_messages_key="chat_history",
)
'''
==================== System: 你需要根据会话历史回应用户问题,对话历史:
Human: 小明有两只喵,
AI: 好的,小明有两只喵,听起来很可爱呢!你是不是想继续讲一个关于小明和他猫咪的故事呢?或者你有什么问题想要问关于这两只喵的?
Human: 小刚有1只汪,
AI: 小刚有1只汪,听起来也很可爱呢!和小明的两只喵相比,小刚的汪是不是更特别呢?你是不是想继续讲关于小刚和他狗狗的故事,或者有什么问题想问呢?😊
Human: 用户提问: 小明有两只喵, ====================
第1次执行: 小明有两只喵,听起来很可爱呢!你是不是想继续讲一个关于小明和他猫咪的故事呢?或者你有什么问题想要问关于这两只喵的?
'''
if __name__ == '__main__':
session_config = {
"configurable": {
"session_id": "user_01"
}
}
res = conversion_chain.invoke({"input": "小明有两只喵,"}, session_config)
print("第1次执行:", res)
res = conversion_chain.invoke({"input": "小刚有1只汪,"}, session_config)
print("第2次执行:", res)
res = conversion_chain.invoke({"input": "总共有几只宠物,"}, session_config)
print("第3次执行:", res)
23 CSVLoader文档加载器
文档加载器提供了一套标准接口,用于将不同来源(如 CSV、PDF 或 JSON 等)的数据读取为 LangChain 的文档格式。这确保了无论数据来源如何,都能对其进行一致性处理。
from langchain_community.document_loaders import CSVLoader
loader = CSVLoader(
file_path="data/stu.csv",
csv_args={
"delimiter": ",", # 指定分隔符
"quotechar": '"', # 指定带有分隔符文本的引号包围是单引号还是双引号
"fieldnames": ['a', 'b', 'c', 'd']
},
encoding="utf-8",
)
# 批量加载 .load() -> [Document, Document,...]
documents = loader.load()
for document in documents:
print(document)
# 懒加载 .lazy_load() 迭代器[Document]
for document in loader.lazy_load():
print(document)
24 JSONLoader
JSONLoader 用于将 JSON 数据加载为 Document 类型对象。
使用 JSONLoader 需要额外安装:
pip install jq
jq 是一个跨平台的 JSON 解析工具,LangChain 底层对 JSON 的解析就是基于 jq 工具实现的。
将 JSON 数据的信息抽取出来,封装为 Document 对象,抽取的时候依赖 jq_schema 语法。
from langchain_community.document_loaders import JSONLoader
loader = JSONLoader(
file_path= "./data/stu.json",
jq_schema=".",
text_content= False
)
document = loader.load()
print(document)
25 TextLoader和文档分隔器
TextLoader 是一个简单的加载器,可以加载文本文件内容,返回仅有一个 Document 对象的 list。
RecursiveCharacterTextSplitter 递归字符文本分割器,是 LangChain 官方推荐的默认分割器。
- 基于文本的自然段落分割大文档为小文档
- 可以指定小文档的最大字符数、重叠字符数
- 可以手动指定段落划分的依据(符号)以及字符数量统计函数
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
# pip install langchain_text_splitters
loader = TextLoader('./data/Python基础语法', encoding='utf-8')
docs = loader.load()
splitter = RecursiveCharacterTextSplitter(
chunk_size=500, # 分段的最大字符数
chunk_overlap=50, # 分段之间允许重叠字符数
# 文本自然段落分隔的依据符号
separators=["\n\n", "\n", " ", ""],
length_function=len, #统计字符的依据函数
)
split_docs = splitter.split_documents(docs)
print(split_docs)
for doc in split_docs:
print("="*20)
print(doc)
print("="*20)
26 PyPDFLoader
PyPDFLoader 使用。PyPDFLoader 加载器,依赖 PyPDF 库,pip install pypdffrom langchain_community.document_loaders import PyPDFLoader
loader = PyPDFLoader(
file_path="./data/pdf1.pdf",
mode="single", # 默认是page模式,每个页面形成一个Document文档对象
password="xxx"
)
for doc in loader.lazy_load():
print(doc)
27 VectorStores向量存储
InMemoryVectorStore,完成内存向量存储Chroma,外部数据库向量存储
add_document,添加文档到向量存储delete,从向量存储中删除文档similarity_search:相似度搜索

(1)内存存储
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_community.document_loaders import CSVLoader
vector_store = InMemoryVectorStore(embedding=DashScopeEmbeddings())
loader = CSVLoader(
file_path="./data/info.csv",
encoding="utf-8",
source_column="source",
)
documents = loader.load()
# 向量存储的 新增、删除、检索
vector_store.add_documents(
documents=documents, # 被添加的文档
ids=["id" + str(i) for i in range(1, len(documents) + 1)] # 给添加的文档提供id(字符串) list[str]
)
# 删除 传入[id, id ...]
vector_store.delete(["id1", "id2"])
# 检索
result = vector_store.similarity_search(
query="AI应用开发",
k=2
)
'''
[Document(id='id3', metadata={'source': 'AI应用开发,是最喜欢的技术', 'row': 2}, page_content='source: AI应用开发,是最喜欢的技术\ninfo: None')]
'''
print(result)
(2)外部向量持久化存储
安装langchain-chroma chromadb
# Chroma
vector_store = Chroma(
persist_directory="./chroma_db", # 指定数据存放的文件夹
collection_name="test", # 数据库的表名称
embedding_function=DashScopeEmbeddings(), # 嵌入模型
)
28 基于向量检索构建提示词
向量存储实例可通过 add_texts(list[str]) 方法,将文本列表快速添加到向量库中。
- 通过向量存储对用户提问进行相似性检索,匹配相关上下文信息。
- 将用户提问与匹配到的上下文信息一同封装到提示词模板中,输入给大语言模型进行回答。
29 RunnablePassthrough使用
向量检索加入链
from langchain_community.chat_models import ChatTongyi
from langchain_core.documents import Document
from langchain_core.runnables import RunnablePassthrough
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
model = ChatTongyi(model="qwen3-max")
prompt = ChatPromptTemplate.from_messages(
[
("system", "以我提供的已知参考资料为主,简洁和专业的回答用户问题,参考资料:{context}."),
('user', "用户提问:{input}")
]
)
vector_store = InMemoryVectorStore(embedding=DashScopeEmbeddings(model="text-embedding-v4"))
# 准备一下资料(向量库的数据)
# add_texts 传入一个list[str]
vector_store.add_texts(["AI应用开发,是最喜欢的技术", "学AI, 掌握人生先锋", "AI,让生活更美好"])
input_text = "为什么学AI?"
# langchain中向量存储对象,有一个方法:as_retriever, 可以返回一个Runable接口的子类
retriever = vector_store.as_retriever(search_kwargs={"k": 2})
def format_func(docs: list[Document]):
if not docs:
return "无相关资料"
formatted_str = "["
for doc in docs:
formatted_str += doc.page_content
formatted_str += "]"
return formatted_str
def print_prompt(prompt):
print(prompt.to_string())
print("=" * 20)
return prompt
chain = (
{"input": RunnablePassthrough(), "context": retriever | format_func} | prompt | print_prompt | model | StrOutputParser()
)
res = chain.invoke(input_text)
'''
System: 以我提供的已知参考资料为主,简洁和专业的回答用户问题,参考资料:[学AI, 掌握人生先锋AI,让生活更美好].
Human: 用户提问:为什么学AI?
====================
学AI是因为它能帮助我们掌握人生先锋技术,让生活更美好。通过学习AI,不仅可以提升个人竞争力,还能更好地理解和应用智能科技,解决实际问题,创造更多价值。
'''
print(res)
四、RAG项目
1.案例介绍
-
离线处理:向私有知识库(向量存储)源源不断添加私有知识文档。
- 向知识库添加来自未来的知识文档(基于模型训练完成时间)
- 向模型添加私有知识文档
- 给出模型参考资料,规避模型幻觉(一本正经的胡说八道)
-
在线处理:用户提问会先基于私有知识库做检索,获取参考资料,同步组装新提示词询问大模型获取结果。

本次项目以 "某东商品衣服" 为例,以衣服属性构建本地知识。使用者可以自由更新本地知识,用户问题的答案也是基于本地知识生成的。


2.文本上传
# 基于Streamlit完成WEB网页上传服务
import streamlit as st
st.title("知识库")
# file_uploader
uploader_file = st.file_uploader(
"请上传TXT文件",
type=['txt'],
accept_multiple_files=False, # False表示仅支持一个文件的上传
)
if uploader_file is not None:
file_name = uploader_file.name
file_type = uploader_file.type
file_size = uploader_file.size / 1024
st.subheader(f"文件名:{file_name}")
st.write(f"格式: {file_type} | 大小: {file_size:.2f} KB")
text = uploader_file.getvalue().decode('utf-8')
st.write(text)
3.md5工具开发
"""
知识库
"""
import os
import config_data as config
import hashlib
def check_md5(md5_str: str):
"""
检查文件MD5
return False(md5未处理过) True(已经处理过)
"""
if not os.path.exists(config.md5_path):
open(config.md5_path, 'w', encoding='utf-8').close()
return False
else:
for line in open(config.md5_path, 'r', encoding='utf-8').readlines():
line = line.strip()
if line == md5_str:
return True
def save_md5(md5_str: str):
"""
保存文件MD5
"""
with open(config.md5_path, 'a', encoding='utf-8') as f:
f.write(md5_str + '\n')
pass
def get_string_md5(input_str: str, encoding: str = 'utf-8'):
"""
获取字符串MD5
"""
str_bytes = input_str.encode(encoding=encoding)
# 创建md5对象
md5_obj = hashlib.md5()
md5_obj.update(str_bytes)
md5_hex = md5_obj.hexdigest()
return md5_hex
class KnowledgeBaseService(object):
def __init__(self):
self.chroma = None # 向量存储的实例 Chroma向量库对象
self.spliter = None # 文本分割器的对象
def upload_by_str(self, data, filename):
"""将传入的字符串,进行向量化,存入向量数据库中"""
pass
if __name__ == '__main__':
r1 = get_string_md5("AI开发1")
r2 = get_string_md5("AI开发1")
r3 = get_string_md5("AI开发3")
save_md5("747211adaeb43406bd69aa62ac04e017")
print(check_md5("747211adaeb43406bd69aa62ac04e017"))
4.知识库更新
class KnowledgeBaseService(object):
def __init__(self):
# 如果文件不存在则创建,如果存在则跳过
os.makedirs(config.persist_directory, exist_ok=True)
self.chroma = Chroma(
collection_name=config.collection_name, # 数据库的表名
embedding_function=DashScopeEmbeddings(model="text-embedding-v4"),
persist_directory=config.persist_directory, # 数据库本地存储文件夹
) # 向量存储的实例 Chroma向量库对象
self.spliter = RecursiveCharacterTextSplitter(
chunk_size=config.chunk_size, # 分割后的文本段最大长度
chunk_overlap=config.chunk_overlap, # 连续文本段之间的字符重叠数量
separators=config.separators, # 自然段落划分的符号
length_function=len # 统计字符的依据函数
) # 文本分割器的对象
def upload_by_str(self, data, filename):
"""将传入的字符串,进行向量化,存入向量数据库中"""
md5_hex = get_string_md5(data)
if check_md5(md5_hex):
return "[跳过]内容已经存在知识库中"
if len(data) > config.max_split_char_number:
knowledge_chunks: list[str] = self.spliter.split_text(data)
else:
knowledge_chunks = [data]
metadata = {
"source": filename,
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"operator": "Rudy"
}
self.chroma.add_texts( # 内容就加载到向量库中了
knowledge_chunks,
metadatas=[metadata for _ in knowledge_chunks],
)
save_md5(md5_hex)
return "[成功]内容已经成功载入向量库"
5 离线流程开发

import time
# 基于Streamlit完成WEB网页上传服务
import streamlit as st
from knowledge_base import KnowledgeBaseService
st.title("知识库")
# file_uploader
uploader_file = st.file_uploader(
"请上传TXT文件",
type=['txt'],
accept_multiple_files=False, # False表示仅支持一个文件的上传
)
service = KnowledgeBaseService()
# session_state就是一个字典
if "service" not in st.session_state:
st.session_state["service"] = KnowledgeBaseService()
if uploader_file is not None:
file_name = uploader_file.name
file_type = uploader_file.type
file_size = uploader_file.size / 1024
st.subheader(f"文件名:{file_name}")
st.write(f"格式: {file_type} | 大小: {file_size:.2f} KB")
text = uploader_file.getvalue().decode('utf-8')
with st.spinner("载入知识库中。。。"):
time.sleep(1)
result = st.session_state["service"].upload_by_str(text, file_name)
st.write(result)
6 在线流程

vector_stores.py 向量存储
from langchain_chroma import Chroma
import config_data as config
class VectorStoreService(object):
def __init__(self, embedding):
"""
:param embedding: 嵌入模型的传入
"""
self.embedding = embedding
self.vector_store = Chroma(
collection_name=config.collection_name, # 数据库的表名
embedding_function=self.embedding,
persist_directory=config.persist_directory, # 数据库本地存储文件夹
) # 向量存储的实例 Chroma向量库对象
def get_retriever(self):
"""返回向量检索器,方便加入Chain"""
return self.vector_store.as_retriever(search_kwargs={"k": config.similarity_threshold})
"""
[Document(id='cf897da0-4f68-44d4-a207-962d06696017', metadata={'source': '尺码推荐.txt', 'created_at': xxx]
"""
if __name__ == '__main__':
from langchain_community.embeddings import DashScopeEmbeddings
retriever = VectorStoreService(DashScopeEmbeddings(model="text-embedding-v4")).get_retriever()
res = retriever.invoke('我的体重130斤,尺码推荐')
print(res)
7 rag服务核心代码
from langchain_core.documents import Document
from vector_stores import VectorStoreService
from langchain_community.embeddings import DashScopeEmbeddings
import config_data as config
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_models.tongyi import ChatTongyi
def print_prompt(prompt):
print("=" * 20)
print(prompt.to_string())
print("=" * 20)
return prompt
class RagService(object):
def __init__(self):
self.vector_service = VectorStoreService(
embedding=DashScopeEmbeddings(model=config.embedding_model_name),
)
self.prompt_template = ChatPromptTemplate.from_messages(
[
('system', "以我提供的已知参考资料为主,"
"简洁和专业的回答用户问题。参考资料:{context}"),
('user', "用户提问:{input}"),
]
)
self.chat_model = ChatTongyi(model=config.chat_model_name)
self.chain = self.__get_chain()
def __get_chain(self):
"""获取最终的执行链"""
retriever = self.vector_service.get_retriever()
def format_document(docs: list[Document]):
if not docs:
return "无相关资料"
formatted_str = ""
for doc in docs:
formatted_str += f"文档片段:{doc.page_content}\n文档元数据:{doc.metadata}\n\n"
return formatted_str
chain = (
{
"input": RunnablePassthrough(),
"context": retriever | format_document
} | self.prompt_template | print_prompt | self.chat_model | StrOutputParser()
)
return chain
if __name__ == '__main__':
res = RagService().chain.invoke("我体重180斤, 尺码推荐")
print(res)

8 历史会话
from langchain_core.documents import Document
from RAG项目案例.file_history_store import get_history
from vector_stores import VectorStoreService
from langchain_community.embeddings import DashScopeEmbeddings
import config_data as config
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.runnables import RunnablePassthrough, RunnableLambda
from langchain_core.output_parsers import StrOutputParser
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.runnables import RunnableWithMessageHistory
def print_prompt(prompt):
print("=" * 20)
print(prompt.to_string())
print("=" * 20)
return prompt
class RagService(object):
def __init__(self):
self.vector_service = VectorStoreService(
embedding=DashScopeEmbeddings(model=config.embedding_model_name),
)
self.prompt_template = ChatPromptTemplate.from_messages(
[
('system', "以我提供的已知参考资料为主,"
"简洁和专业的回答用户问题。参考资料:{context}"),
('system', '并且我提供用户的对话历史记录, 如下:'),
MessagesPlaceholder("history"),
('user', "用户提问:{input}"),
]
)
self.chat_model = ChatTongyi(model=config.chat_model_name)
self.chain = self.__get_chain()
def __get_chain(self):
"""获取最终的执行链"""
retriever = self.vector_service.get_retriever()
def format_document(docs: list[Document]):
if not docs:
return "无相关资料"
formatted_str = ""
for doc in docs:
formatted_str += f"文档片段:{doc.page_content}\n文档元数据:{doc.metadata}\n\n"
return formatted_str
# 调试输入格式
def format_for_retriever(value: dict) -> str:
return value["input"]
# 调试输入格式
def format_for_prompt_template(value):
print("-" * 20, value)
new_value = {}
new_value["input"] = value["input"]["input"]
new_value["context"] = value["context"]
new_value["history"] = value["input"]["history"]
return new_value
chain = (
{
"input": RunnablePassthrough(),
"context": RunnableLambda(format_for_retriever) | retriever | format_document
} | RunnableLambda(format_for_prompt_template) | self.prompt_template | print_prompt | self.chat_model | StrOutputParser()
)
conversion_chain = RunnableWithMessageHistory(
chain,
get_history,
input_messages_key="input",
history_messages_key="history",
)
return conversion_chain
if __name__ == '__main__':
# session_id配置
session_config = {
"configurable": {
"session_id": "user_001",
}
}
res = RagService().chain.invoke({"input": "我体重130斤, 尺码推荐"}, session_config)
print(res)
9 聊天页面开发
import streamlit as st
from rag import RagService
import time
import config_data as config
# 标题
st.title("智能客服")
st.divider() # 分隔符
if "message" not in st.session_state:
st.session_state["message"] = [{"role": "assistant", "content": "你好,有什么问题吗?"}]
if "rag" not in st.session_state:
st.session_state["rag"] = RagService()
for message in st.session_state["message"]:
st.chat_message(message["role"]).write(message["content"])
# 在页面最下方提供用户输入栏
prompt = st.chat_input()
if prompt:
st.chat_message("user").write(prompt)
st.session_state["message"].append({"role": "user", "content": prompt})
ai_res_list = []
with st.spinner("AI思考中:"):
res_stream = st.session_state["rag"].chain.stream({"input": prompt}, config.session_config)
time.sleep(1)
def capture(generator, cache_list):
for chunk in generator:
cache_list.append(chunk)
yield chunk
st.chat_message("assistant").write_stream(capture(res_stream, ai_res_list))
st.session_state["message"].append({"role": "assistant", "content": "".join(ai_res_list)})
四、Agent智能体
01 智能体介绍
- 没有 Agent 时,LLM 只能基于自身训练数据回答问题,遇到需要实时数据、复杂计算、外部工具调用的场景就会卡壳。
- 有了 Agent 后,LLM 就像一个 "指挥官",能思考任务步骤→选择合适工具→执行工具调用→根据结果调整策略,直到完成任务。

02 智能体初体验
from langchain.agents import create_agent
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.tools import tool
@tool(description="获取天气情况")
def get_weather() -> str:
return "晴天"
agent = create_agent(
model=ChatTongyi(model="qwen3-max"),
tools=[get_weather],
system_prompt="您是一个聊天助手,可以回答用户问题"
)
res = agent.invoke(
{
"messages": [
{"role": "user", "content": "明天上海的天气怎么样?"}
]
}
)
'''
HumanMessage 明天上海的天气怎么样?
AIMessage
ToolMessage 晴天
AIMessage 明天上海的天气是晴天,适合外出活动!记得做好防晒措施哦。
'''
for msg in res["messages"]:
print(type(msg).__name__, msg.content)
03 Agent流式输出
from langchain.agents import create_agent
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.tools import tool
@tool(description="获取股价, 传入股票名称, 返回字符串信息")
def get_price(name: str) -> str:
return f"股票{name}的价格是20元"
@tool(description="获取股票信息, 传入股票名称, 返回字符串信息")
def get_info(name: str) -> str:
return f"股票{name}, 是一家A股上市公司,专注于AI开发!"
agent = create_agent(
model=ChatTongyi(model="qwen3-max"),
tools=[get_price, get_info],
system_prompt="你是一个智能助手,可以回答股票相关问题,记住请告知我思考过程,让我知道你为什么调用某个工具"
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "股票AI的股价是多少,并介绍一下?"}]},
stream_mode="values"
):
latest_message = chunk['messages'][-1]
if latest_message.content:
print(type(latest_message).__name__, latest_message.content)
try:
if latest_message.tool_calls:
print(f"工具调用:{[tc['name'] for tc in latest_message.tool_calls]}")
except AttributeError as e:
pass
'''
HumanMessage 股票AI的股价是多少,并介绍一下?
工具调用:['get_price', 'get_info']
ToolMessage 股票AI, 是一家A股上市公司,专注于AI开发!
AIMessage 股票AI的当前股价是20元。这是一家A股上市公司,专注于AI(人工智能)领域的开发。如果您对该公司或其业务有更多具体问题,欢迎继续提问!
'''
04 ReAct框架
Agent ReAct 是大模型智能体的核心思考与行动框架,全称 Reasoning + Acting(推理 + 行动),是让 Agent 像人类一样「思考问题→制定策略→执行行动→验证结果」的关键逻辑。
简单来说:ReAct 让 Agent 不再是 “直接回答问题”,而是通过 “自然语言思考过程” 指导工具调用,一步步解决复杂问题,完美适配需要多步推理、工具协作的场景(如智能客服、报告生成、任务规划等)。

from langchain.agents import create_agent
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.tools import tool
@tool(description="获取体重,返回值是整数,单位千克")
def get_weight() -> int:
return 90
@tool(description="获取身高,返回值是整数,单位厘米")
def get_height(name: str) -> str:
return 175
agent = create_agent(
model=ChatTongyi(model="qwen3-max"),
tools=[get_weight, get_height],
system_prompt="你是一个严格遵循ReAct框架的智能体,必须按[思考-行动-观察-再思考】的流程解决问题,"
"且每轮仅能思考并调用一个工具,禁止单词调用多个工具。"
"并告知我你的思考过程,工具的调用原因,按思考、行动、观察三个结构告知我"
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "计算我的BMI,并给出计算公式?"}]},
stream_mode="values"
):
latest_message = chunk['messages'][-1]
if latest_message.content:
print(type(latest_message).__name__, latest_message.content)
try:
if latest_message.tool_calls:
print(f"工具调用:{[tc['name'] for tc in latest_message.tool_calls]}")
except AttributeError as e:
pass
05 middleware中间件

from langchain.agents import create_agent
from langchain.agents.middleware import before_agent, after_agent, before_model, after_model, wrap_tool_call, wrap_model_call
from langchain_community.chat_models.tongyi import ChatTongyi
from langchain_core.tools import tool
from langchain.agents import AgentState
from langgraph.runtime import Runtime
@tool(description="查询天气,传入城市名称字符串,返回字符串天气信息")
def get_weather(city: str) -> str:
return f"{city}天气:晴天"
"""
1.agent执行前
2.agent执行后
3.model执行前
4.model执行后
5.工具执行中
6.模型执行中
"""
@before_agent
def log_before_agent(state: AgentState, runtime: Runtime) -> None:
# agent执行前会调用这个函数并传入state和runtime两个对象
print(f"[before agent] agent启动, 并附带{len(state['messages'])}消息")
@after_agent
def log_after_agent(state: AgentState, runtime: Runtime) -> None:
print(f"[after agent] agent执行完毕,共生成{len(state['messages'])}消息")
@after_model
def log_befor_model(state: AgentState, runtime: Runtime) -> None:
print(f"[after_model]模型即将调用,并附带{len(state['messages'])}消息")
@before_model
def log_after_model(state: AgentState, runtime: Runtime) -> None:
print(f"[after_model]模型调用结束,并附带{len(state['messages'])}消息")
@wrap_model_call
def model_call_hook(request, handler):
print("模型调用中")
return handler(request)
@wrap_tool_call
def moniter_tool(request, handler):
print(f"工具执行:{request.tool_call['name']}")
print(f"工具执行传入参数:{request.tool_call['args']}")
return handler(request)
agent = create_agent(
model=ChatTongyi(model="qwen3-max"),
tools=[get_weather],
middleware=[log_before_agent, log_after_agent, log_befor_model, log_after_model, moniter_tool, model_call_hook]
)
res = agent.invoke({"messages": [{"role": "user", "content": "上海这里的天气如何,如何穿衣养生?"}]})
print("*" * 20, res)
06 智能体项目
(1) 智能问答服务
- 处理购买前的产品咨询(如功能、价格、对比等)。
- 解决购买后的使用问题(如操作指导、故障处理、维护建议等)。
- 基于 RAG 技术,从知识库中检索准确信息并生成自然语言回答,确保响应及时且可靠。
(2) 使用报告与优化建议生成
- 针对已购买用户,自动分析扫地机器人的使用数据(如清洁频率、耗材状态、错误日志等)。
- 生成个性化报告,总结使用情况并提供优化建议(如清洁计划调整、部件更换提醒等)。
- 支持用户主动查询报告或系统定期推送,帮助用户最大化产品价值。

07 日志和路径工具开发

1)日志工具
import logging
from datetime import datetime
from path_tool import get_abs_path
import os
# 日志保存的根目录
LOG_ROOT = get_abs_path('logs')
# 确保日志的目录存在
os.makedirs(LOG_ROOT, exist_ok=True)
# 日志的格式配置
DEFAULT_LOG_FORMAT = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(filename)s - %(lineno)d - %(message)s'
)
def get_logger(
name: str = "agent",
console_level: int = logging.INFO,
file_level: int = logging.DEBUG,
log_file=None
) -> logging.Logger:
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
# 避免重复添加Handler
if logger.handlers:
return logger
# 控制条Handler
console_handler = logging.StreamHandler()
console_handler.setLevel(console_level)
console_handler.setFormatter(DEFAULT_LOG_FORMAT)
logger.addHandler(console_handler)
# 文件Handler
if not log_file:
log_file = os.path.join(LOG_ROOT, f"{name}_{datetime.now().strftime('%Y-%m-%d')}.log")
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(file_level)
file_handler.setFormatter(DEFAULT_LOG_FORMAT)
logger.addHandler(file_handler)
return logger
# 快捷获取日志
logger = get_logger()
if __name__ == '__main__':
logger.info("信息日志")
logger.error("错误日志")
logger.debug("调试日志")
logger.warning("警告日志")
2)路径工具
import os
def get_project_root() -> str:
"""
获取当前工程的根目录
:return:
"""
# 当前文件的绝对路径
current_file = os.path.abspath(__file__)
# 获取工程的根目录,先获取文件所在的文件夹绝对路径
current_dir = os.path.dirname(current_file)
# 获取工程根目录
project_root = os.path.dirname(current_dir)
return project_root
def get_abs_path(relative_path: str) -> str:
"""
传递相对路径,得到绝对路径
:param relative_path:
:return:
"""
project_root = get_project_root()
return os.path.join(project_root, relative_path)
if __name__ == '__main__':
print(get_abs_path('config/config.txt'))
08 配置工具-提示词工具开发
1)配置处理工具
import yaml
from utils.path_tool import get_abs_path
def load_rag_config(config_path: str = get_abs_path('config/rag.yml'), encoding: str = 'utf-8'):
with open(config_path, 'r', encoding=encoding) as f:
return yaml.load(f, Loader=yaml.FullLoader)
def load_chroma_config(config_path: str = get_abs_path('config/chroma.yml'), encoding: str = 'utf-8'):
with open(config_path, 'r', encoding=encoding) as f:
return yaml.load(f, Loader=yaml.FullLoader)
def load_prompts_config(config_path: str = get_abs_path('config/prompts.yml'), encoding: str = 'utf-8'):
with open(config_path, 'r', encoding=encoding) as f:
return yaml.load(f, Loader=yaml.FullLoader)
def load_agent_config(config_path: str = get_abs_path('config/agent.yml'), encoding: str = 'utf-8'):
with open(config_path, 'r', encoding=encoding) as f:
return yaml.load(f, Loader=yaml.FullLoader)
rag_conf = load_rag_config()
chroma_conf = load_chroma_config()
prompts_conf = load_prompts_config()
agent_conf = load_agent_config()
if __name__ == '__main__':
print(rag_conf["chat_model_name"])
2)文件处理工具
import hashlib
import os
from langchain_community.document_loaders import PyPDFLoader, TextLoader
from langchain_core.documents import Document
from utils.logger_handler import logger
def get_file_md5_hex(filepath: str): # 获取文件的md5的十六进制字符串
if not os.path.exists(filepath):
logger.error(f"[md5计算]文件{filepath}不存在")
return
if not os.path.isfile(filepath):
logger.error(f"[md5计算]文件{filepath}不是文件")
return
md5_obj = hashlib.md5()
chunk_size = 4096 # 4KB分片, 避免文件过大爆内存
try:
with open(filepath, 'rb') as f:
while chunk := f.read(chunk_size):
md5_obj.update(chunk)
except Exception as e:
logger.error(f"[md5计算]文件{filepath}计算md5出错")
logger.error(e)
return
def listdir_with_allowed_type(path: str, allowed_types: tuple[str]): # 获取目录下允许的文件类型
files = []
if not os.path.isdir(path):
logger.error(f"[listdir_with_allowed_type]{path}不是文件夹")
return allowed_types
for f in os.listdir(path):
if f.endswith(allowed_types):
files.append(os.path.join(path, f))
return tuple(files)
def pad_loader(filepath: str, passwd=None) -> list[Document]: # 加载pad文件
return PyPDFLoader(filepath, passwd).load()
def txt_loader(filepath: str) -> list[Document]: # 加载txt文件
return TextLoader(filepath).load()
3)提示词加载工具
from utils.config_handler import prompts_conf
from utils.path_tool import get_abs_path
from utils.logger_handler import logger
def load_system_prompts():
try:
system_prompt_path = get_abs_path(prompts_conf['main_prompt_path'])
except KeyError as e:
logger.error(f"[load_system_prompts]在yaml配置项中没有main_prompt_path配置项")
raise e
try:
return open(system_prompt_path, "r", encoding="utf-8").read()
except Exception as e:
logger.error(f"[oad_system_prompts]解析系统提示词出错, {str(e)}")
raise e
def load_rag_prompts():
try:
rag_prompt_path = get_abs_path(prompts_conf['rag_summarize_prompt_path'])
except KeyError as e:
logger.error(f"[load_rag_prompts]在yaml配置项中没有rag_summarize_prompt_path配置项")
raise e
try:
return open(rag_prompt_path, "r", encoding="utf-8").read()
except Exception as e:
logger.error(f"[load_rag_prompts]解析系统提示词出错, {str(e)}")
raise e
def load_report_prompts():
try:
report_prompt_path = get_abs_path(prompts_conf['report_prompt_path'])
except KeyError as e:
logger.error(f"[load_report_prompts]在yaml配置项中没有report_prompt_path配置项")
raise e
try:
return open(report_prompt_path, "r", encoding="utf-8").read()
except Exception as e:
logger.error(f"[load_report_prompts]解析系统提示词出错, {str(e)}")
raise e
if __name__ == '__main__':
print(load_report_prompts())
09 向量存储服务开发

from langchain_chroma import Chroma
from langchain_core.documents import Document
from utils.config_handler import chroma_conf
from model.factory import embed_model
from langchain_text_splitters import RecursiveCharacterTextSplitter
from utils.file_handler import txt_loader, pdf_loader, listdir_with_allowed_type, get_file_md5_hex
from utils.path_tool import get_abs_path
from utils.logger_handler import logger
import os
class VectorStoreService:
def __init__(self):
self.vector_store = Chroma(
collection_name=chroma_conf["collection_name"],
embedding_function=embed_model,
persist_directory=chroma_conf["persist_directory"]
)
self.spliter = RecursiveCharacterTextSplitter(
chunk_size=chroma_conf["chunk_size"],
chunk_overlap=chroma_conf["chunk_overlap"],
separators=chroma_conf["separators"],
length_function=len,
)
def get_retriever(self):
return self.vector_store.as_retriever(search_kwargs={"k": chroma_conf["k"]})
def load_document(self):
"""
从数据文件夹内读取数据文件,转为向量存入向量库
要计算文件的MD5去重
:return: None
"""
def check_md5_hex(md5_for_check: str):
if not os.path.exists(get_abs_path(chroma_conf["md5_hex_store"])):
# 创建文件
open(get_abs_path(chroma_conf["md5_hex_store"]), 'w', encoding='utf-8').close()
return False # md5 没处理过
with open(get_abs_path(chroma_conf["md5_hex_store"]), 'r', encoding='utf-8') as f:
for line in f.readlines():
line = line.strip()
if line == md5_for_check:
return True # md5已处理过
return False
def save_md5_hex(md5_for_check: str):
with open(get_abs_path(chroma_conf["md5_hex_store"]), 'a', encoding='utf-8') as f:
f.write(md5_for_check + '\n')
def get_file_documents(read_path: str):
if read_path.endswith('txt'):
return txt_loader(read_path)
if read_path.endswith('pdf'):
return pdf_loader(read_path)
return []
allowed_file_path: list[str] = listdir_with_allowed_type(
get_abs_path(chroma_conf["data_path"]),
tuple(chroma_conf["allow_knowledge_file_type"])
)
for path in allowed_file_path:
md5_hex = get_file_md5_hex(path)
# if md5_hex is None:
# logger.warning(f"[加载知识库] {path}无法计算MD5,跳过")
# continue
if check_md5_hex(md5_hex):
logger.info(f"[加载知识库] {path}内容已经存在知识库内, 跳过")
continue
try:
documents: list[Document] = get_file_documents(path)
if not documents:
logger.warning(f"[加载知识库] {path}内没有有效文本内容, 跳过")
continue
split_document: list[Document] = self.spliter.split_documents(documents)
if not split_document:
logger.warning(f"[加载知识库] {path}分片后没有有效文本内容, 跳过")
continue
# 将内容存入向量库
self.vector_store.add_documents(split_document)
# 记录这个已经处理好的文件的md5,避免下次重复加载
save_md5_hex(md5_hex)
logger.info(f"[加载知识库] {path}内容已成功载入向量库")
except Exception as e:
# exc_info=True会记录详细的报错堆栈,如果为False仅记录报错信息本身
logger.error(f"[加载知识库] {path}处理失败: {e}", exc_info=True)
continue
if __name__ == '__main__':
vs = VectorStoreService()
vs.load_document()
retriever = vs.get_retriever()
res = retriever.invoke("迷路")
for r in res:
print(r.page_content)
print("-"*20)
10 Rag 服务开发

from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
"""
总结服务类:用户提问,搜索参考资料,将提问和参考资料交给模型,让模型总结回复
"""
from rag.vector_store import VectorStoreService
from utils.prompt_loader import load_rag_prompts
from langchain_core.prompts import PromptTemplate
from model.factory import chat_model
def print_prompt(prompt):
print("=" * 20)
print(prompt.to_string())
print("=" * 20)
return prompt
class RagSummarizeService(object):
def __init__(self):
self.vector_store = VectorStoreService()
self.retriever = self.vector_store.get_retriever()
self.prompt_text = load_rag_prompts()
self.prompt_template = PromptTemplate.from_template(self.prompt_text)
self.model = chat_model
self.chain = self._init_chain()
def _init_chain(self):
chain = self.prompt_template| print_prompt | self.model | StrOutputParser()
return chain
def retriever_docs(self, query: str) -> list[Document]:
return self.retriever.invoke(query)
def rag_summarize(self, query: str) -> str:
context_docs = self.retriever_docs(query)
context = ""
counter = 0
for doc in context_docs:
counter += 1
context += f"[参考资料{counter}]:参考资料“ {doc.page_content}|参考元数据: {doc.metadata}\n"
return self.chain.invoke(
{
"input": query,
"context": context,
}
)
if __name__ == '__main__':
rag = RagSummarizeService()
print(rag.rag_summarize("小户型适合哪种扫地机器人!"))
11 tools开发

from langchain_core.tools import tool
from rag.rag_service import RagSummarizeService
import random
from utils.config_handler import agent_conf
from utils.path_tool import get_abs_path
import os
from utils.logger_handler import logger
rag = RagSummarizeService()
user_ids = ["1234567890", "9876543210", "1111111111", "2222222222", "3333333333"]
month_arr = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12"]
extra_data = {}
@tool(description="从向量存储中检索参考资料")
def rag_summarize(query: str) -> str:
return rag.rag_summarize(query)
@tool(description="获取指定城市的天气,以消息字符串的形式返回")
def get_weather(city: str) -> str:
return f"城市{city}的天气为晴天,气温26摄氏度,空气湿度50%,南风1级"
@tool(description="获取用户所在的名称,以纯字符串形式返回")
def get_user_location() -> str:
return random.choice(["北京", "上海", "广州", "深圳", "天津", "重庆", "武汉", "成都", "杭州", "苏州"])
@tool(description="获取用户的ID,以纯字符串形式返回")
def get_user_id() -> str:
return random.choice(user_ids)
@tool(description="获取当前月份,以纯字符形式返回")
def get_current_month() -> str:
return random.choice(month_arr)
def generate_external_data():
"""
{
"user_id": "1234567890",
"user_location": "北京",
"current_month": "January"
}
:return:
"""
if not extra_data:
extra_data_path = get_abs_path(agent_conf["extra_data_path"])
if not os.path.exists(extra_data_path):
raise FileNotFoundError(f"外部文件{extra_data_path}不存在")
with open(extra_data_path, "r", encoding="utf-8") as f:
for line in f.readlines()[1:]:
arr: list[str] = line.strip().split(',')
user_id: str = arr[0].replace('"', "")
feature: str = arr[1].replace('"', "")
efficiency: str = arr[2].replace('"', "")
consumables: str = arr[3].replace('"', "")
comparison: str = arr[4].replace('"', "")
time: str = arr[5].replace('"', "")
if user_id not in extra_data:
extra_data[user_id] = {}
extra_data[user_id][time] = {
"特征": feature,
"效率": efficiency,
"消耗品": consumables,
"比较": comparison
}
@tool(description="从外部系统中获取指定用户在指定月份的使用记录,以纯字符串形式返回,如果未检索到返回空字符串")
def fetch_external_data(user_id: str, month: str) -> str:
generate_external_data()
try:
return extra_data[user_id][month]
except KeyError:
logger.warning(f"{fetch_external_data}未能检索到用户:{user_id}在{month}的使用记录数据")
return ""
@tool(description="无入参,无返回值,调用后触发中间件自动为报告生成的场景动态注入上下文信息,为后续提示词切换提供上下文信息")
def fill_context_for_report():
return "fill_context_for_report已调用"
if __name__ == '__main__':
print(fetch_external_data("1005", "2025-06"))
12 middleware中间件

from typing import Callable
from langchain.agents import AgentState
from langchain.agents.middleware import wrap_tool_call, before_model, dynamic_prompt, ModelRequest
from langchain.tools.tool_node import ToolCallRequest
from langchain_core.messages import ToolMessage
from langgraph.runtime import Runtime
from langgraph.types import Command
from utils.logger_handler import logger
from utils.prompt_loader import load_report_prompts, load_system_prompts
@wrap_tool_call
def monitor_tool(
# 请求的数据封装
request: ToolCallRequest,
# 执行的函数本身
handler: Callable[[ToolCallRequest], ToolMessage | Command],
) -> ToolMessage | Command: # 工具执行的监控
logger.info(f"[tool monitor]执行工具: {request.tool_call['name']}")
logger.info(f"[tool monitor]工具参数: {request.tool_call['args']}")
try:
result = handler(request)
logger.info(f"[tool monitor]工具{request.tool_call['name']}调用成功")
if request.tool_call['name'] == 'fill_context_for_report':
request.runtime.context["report"] = True
return result
except Exception as e:
logger.error(f"[tool monitor]工具{request.tool_call['name']}调用失败: {e}")
raise e
@before_model
def log_before_model(
state: AgentState, # 整个Agent智能体中的状态记录
runtime: Runtime, # 记录了整个执行过程中的上下文信息
): # 在模型执行前输出日志
logger.info(f"[log_before_model]即将调用模型,带有{len(state['messages'])}条消息")
logger.debug(f"[log_before_model]{type(state['messages'][-1]).__name__} | state['messages'][-1].content")
return None
@dynamic_prompt # 在每一次生成提示词之前,调用此函数
def report_prompt_switch(request: ModelRequest): # 动态切换提示词
is_report = request.runtime.context.get('report', False)
if is_report:
return load_report_prompts()
return load_system_prompts()
agent构建

from langchain.agents import create_agent
from model.factory import chat_model
from utils.prompt_loader import load_system_prompts
from agent.tools.agent_tools import (rag_summarize, get_weather, get_user_location, get_user_id,
get_current_month, fetch_external_data, fill_context_for_report)
from agent.tools.middleware import monitor_tool, log_before_model, report_prompt_switch
class ReactAgent:
def __init__(self):
self.agent = create_agent(
model=chat_model,
system_prompt=load_system_prompts(),
tools=[rag_summarize, get_weather, get_user_location, get_user_id,
get_current_month, fetch_external_data, fill_context_for_report],
middleware=[monitor_tool, log_before_model, report_prompt_switch],
)
def execute_stream(self, query: str):
input_dict = {
"messages": [
{
"role": "user", "content": query,
}
]
}
# 第三个参数context就是上下文runtime中的信息,就是我们做提示词切换的标记
for chunk in self.agent.stream(input_dict, stream_mode="values", context={"report": False}):
latest_message = chunk["messages"][-1]
yield latest_message.content.strip() + "\n"
if __name__ == '__main__':
agent = ReactAgent()
# for chunk in agent.execute_stream("扫地机器人在我所在的地区的气温下如何保养"):
for chunk in agent.execute_stream("给我生成我的使用报告"):
print(chunk, end="", flush=True)
13 用户界面
'''
@create_time: 2026/3/20 下午8:00
@Author: GeChao
@File: app.py
'''
import time
import streamlit as st
from agent.react_agent import ReactAgent
# 标题
st.title("智扫通机器人智能客服")
st.divider()
if "agent" not in st.session_state:
st.session_state["agent"] = ReactAgent()
if "messages" not in st.session_state:
st.session_state["messages"] = []
for message in st.session_state["messages"]:
st.chat_message(message["role"]).write(message["content"])
# 用户输入提示词
prompt = st.chat_input()
if prompt:
st.chat_message("user").write(prompt)
st.session_state["messages"].append({"role": "user", "content": prompt})
response_messages = []
with st.spinner("智能客服思考中..."):
res_stream = st.session_state["agent"].execute_stream(prompt)
def capture(generator, cache_list):
for chunk in generator:
cache_list.append(chunk)
for char in chunk:
time.sleep(0.01)
yield char
st.chat_message("assistant").write_stream(capture(res_stream, response_messages))
st.session_state["messages"].append({"role": "assistant", "content": response_messages[-1]})
st.rerun()

非特殊说明,本博所有文章均为博主原创。
如若转载,请注明出处:https://blog.grover.top/2025/08/10/ai%e5%ba%94%e7%94%a8%e5%9f%ba%e7%a1%80/
zhy
花太多了,观看有点困难
Rudy博主
@zhy: 等我调好格式
百无聊赖x
可以获取pdf格式吗?方便平板学习,想随手画一些重点0.0