Appearance
AI 相关的面试题
langchain 相关知识
问题 1:LLM 出现幻觉(Hallucination)的深层原因是什么
- 语言模型是概率模型,不是事实模型:LLM 的本质是“预测下一个最可能的词”,不是在“查找真相”,而是在生成语言模式。当输入提示不明确或知识缺失时,会凭统计相关性“合理地编造”。
- 训练数据中存在噪声和虚假样本:大模型学习了互联网上的海量文本,而这些内容本身可能包含错误或臆测信息。模型学到这些偏差后,在回答中会自然复现。
- 缺乏事实验证机制:模型输出结果时不会自动校验真伪,也不会访问实时数据。在多轮推理中,错误会被“递进强化”——尤其是 Agent 模式下的反射循环,会放大错误逻辑。
- Prompt 上下文过短或缺乏约束:当上下文被截断、知识片段不完整,模型会自动“补空缺”,生成符合语义但不符合事实的回答。
- 任务模糊或目标歧义:如果任务没有明确评价标准,模型会更倾向于填补内容空白,从而编造细节。
问题 2:RAG(检索增强生成)是什么?
RAG(Retrieval-Augmented Generation)是当前企业级 AI 应用最核心的架构思路之一,让模型“具备最新知识”,而不依赖模型固有训练语料。
- 文档嵌入(Embedding)
- 把知识库(PDF、Markdown、数据库内容等)切成小块(Chunk)。
- 然后用 Embedding Model(如
text-embedding-3-large或bge-m3)将文本转为高维向量。
- 向量检索(Vector Search)
- 用户提问时,将 Query 也转成向量。
- 计算 Query 向量与文档向量的相似度。
- 检索出最相关的若干段落。
- 增强生成(Augmented Generation)
- 把检索结果拼入 Prompt 的上下文中。
- 交由 LLM 生成最终回答。
问题 3:提示词技巧有哪些
我一般会按这个结构来写提示词,稳定性会高很多:
- 先定角色和目标:比如“你是资深前端架构师,目标是给出可落地方案”。
- 补充上下文:项目背景、技术栈、约束条件、输入数据格式。
- 明确输出格式:比如“用分点回答,先结论后原因,最后给代码示例”。
- 写清边界和禁区:不能编造、不能省略风险、不能使用某些库。
- 给 1-2 个高质量示例:Few-shot 对输出风格和准确度帮助很大。
- 要求自检:让模型输出前做一次检查,比如“是否满足约束、是否有不确定项”。
- 迭代优化:一次不满意就补充反馈,告诉它哪里偏了。
问题 4:在上下文窗口大小限制的情况下,如何更合理的处理上下文
我的思路是“分层 + 按需注入”,不是把所有内容一股脑塞进去:
- 分层优先级:系统指令 > 当前任务核心信息 > 最近相关对话 > 历史背景。
- 只保留与当前问题强相关的信息:无关上下文直接丢掉。
- 用检索代替硬塞:历史知识放到向量库,提问时再召回。
- 短期窗口 + 长期摘要:最近几轮保留原文,久远内容保留摘要。
- 结构化注入:把上下文整理成
事实/约束/待办/结论,比原始聊天记录更省 token。 - 控制预算:给每类信息设 token 上限,防止某一块把窗口吃满。
langchain 中 SummarizationMiddleware 摘要中间件的使用和源码分析
对应链接:https://docs.langchain.com/oss/python/langchain/middleware/built-in#summarization
使用
在接近 token 限制时自动总结对话历史,保留最近的消息同时压缩旧的内容。摘要对以下情况很有用:
- 长时间对话超出上下文窗口
- 多轮对话,包含大量历史记录
- 需要保留完整对话上下文的应用场景
使用也非常简单:
py
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
agent = create_agent(
model="gpt-4.1",
middleware=[
SummarizationMiddleware(
model="gpt-4.1-mini",
trigger=("tokens", 4000),
keep=("messages", 20),
),
],
)源码
这个内容看似很多,其实核心就是写好一段提示词,根据用户希望的切割方式(例如:按message数量、按token数量、按比例),在每次调用大模型前的钩子进行处理,然后调用大模型对内容进行总结,总结完成后的内容再拼接到原来的消息队列中。
源码
py
"""Summarization middleware(带注释副本)。
这个文件是 LangChain `SummarizationMiddleware` 的注释版拷贝,
用于更直观地理解其控制流程。
"""
import uuid
import warnings
from collections.abc import Callable, Iterable, Mapping
from functools import partial
from typing import Any, Literal, cast
from langchain_core.messages import (
AIMessage,
AnyMessage,
MessageLikeRepresentation,
RemoveMessage,
ToolMessage,
)
from langchain_core.messages.human import HumanMessage
from langchain_core.messages.utils import (
count_tokens_approximately,
get_buffer_string,
trim_messages,
)
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.runtime import Runtime
from typing_extensions import override
from langchain.agents.middleware.types import AgentMiddleware, AgentState, ContextT, ResponseT
from langchain.chat_models import BaseChatModel, init_chat_model
# TokenCounter 接收消息序列并返回 token 数。
TokenCounter = Callable[[Iterable[MessageLikeRepresentation]], int]
DEFAULT_SUMMARY_PROMPT = """<role>
上下文提取助手
</role>
<primary_objective>
你在此任务中的唯一目标,是从下面的对话历史中提取最高质量、最相关的上下文信息。
</primary_objective>
<objective_information>
你即将接近可接受的输入 token 总上限,因此必须从对话历史中提取最高质量、最相关的信息。
这些上下文将覆盖下方展示的对话历史。因此,你提取的内容必须只包含对继续推进总体目标最重要的信息。
</objective_information>
# instructions 指示
<instructions>
下方的对话历史将被你在本步骤提取的上下文替换。
为了避免重复已经完成的操作,你提取的上下文应聚焦于对总体目标最关键的信息。
请使用以下章节组织你的摘要。每个章节都相当于一份检查清单:你必须填入相关信息;若该部分无内容,请明确写“无”。
## 会话意图
用户的主要目标或请求是什么?你正在尝试完成的整体任务是什么?内容应简洁,但要完整到足以理解整个会话的目的。
## 摘要
从对话历史中提取并记录所有最重要的上下文。包括本次对话中形成的重要选择、结论或策略;补充关键决策背后的理由;记录被否决的方案及未采纳原因。
## 产物
在本次对话中,创建、修改或访问了哪些产物、文件或资源?如果有文件修改,请列出具体文件路径并简要说明每个文件的改动。该部分用于避免产物信息被静默丢失。
## 下一步
为了达成会话意图,还剩哪些具体任务需要完成?你接下来应该做什么?
</instructions>
用户会向你发送完整的消息历史,你需要从中提取上下文以生成替代内容。请仔细通读全部内容,并深入判断哪些信息对总体目标最重要且必须保留:
基于以上要求,请认真阅读完整对话历史,提取最重要、最相关的上下文来替换原历史,从而释放对话上下文空间。
只输出提取后的上下文。不要添加任何额外信息,也不要在提取内容前后添加其他文字。
<messages>
待总结的消息:
{messages}
</messages>""" # noqa: E501
# 默认行为:
# - 摘要后保留最近 20 条消息。
# - 生成摘要前,先把摘要输入裁剪到 4000 token。
# - 如果裁剪异常,回退为最后 15 条消息。
_DEFAULT_MESSAGES_TO_KEEP = 20
_DEFAULT_TRIM_TOKEN_LIMIT = 4000
_DEFAULT_FALLBACK_MESSAGE_COUNT = 15
# 上下文策略的类型别名。
ContextFraction = tuple[Literal["fraction"], float]
ContextTokens = tuple[Literal["tokens"], int]
ContextMessages = tuple[Literal["messages"], int]
ContextSize = ContextFraction | ContextTokens | ContextMessages
"""
# 该函数返回一个偏函数(类似js的一个新函数)
res = _get_approximate_token_counter()
token_count = res('123') # 返回的是token数:15
"""
def _get_approximate_token_counter(model: BaseChatModel) -> TokenCounter:
"""按模型类型调整近似 token 计数参数。
Anthropic 聊天模型会使用专门的 chars-per-token 估计值。
"""
if model._llm_type == "anthropic-chat": # noqa: SLF001
return partial(
count_tokens_approximately,
use_usage_metadata_scaling=True,
chars_per_token=3.3,
)
return partial(count_tokens_approximately, use_usage_metadata_scaling=True)
class SummarizationMiddleware(AgentMiddleware[AgentState[ResponseT], ContextT, ResponseT]):
"""当达到阈值时,对旧会话历史做摘要压缩。
高层流程:
1) 每次模型调用前检查是否触发摘要。
2) 计算安全切分点(避免拆开 AI-Tool 成对消息)。
3) 对较早的前缀消息生成摘要。
4) 用“摘要消息 + 保留后缀”重写历史。
"""
"""
使用方法:
SummarizationMiddleware(
model="gpt-4.1-mini",
trigger=("tokens", 4000),
keep=("messages", 20),
),
"""
def __init__(
self,
model: str | BaseChatModel,
*,
trigger: ContextSize | list[ContextSize] | None = None,
keep: ContextSize = ("messages", _DEFAULT_MESSAGES_TO_KEEP),
token_counter: TokenCounter = count_tokens_approximately,
summary_prompt: str = DEFAULT_SUMMARY_PROMPT,
trim_tokens_to_summarize: int | None = _DEFAULT_TRIM_TOKEN_LIMIT,
**deprecated_kwargs: Any,
) -> None:
super().__init__()
# 支持模型名字符串或模型实例。
# isinstance()的核心作用是检查一个对象是否是指定类(或多个类)的实例,或者是该类子类的实例
if isinstance(model, str):
# 这是1.x新api,旧的是 ChatOpenAI
model = init_chat_model(model)
self.model = model
# 统一成列表形式,便于后续逐条判断触发条件。
# trigger=("tokens", 4000),
if trigger is None:
self.trigger: ContextSize | list[ContextSize] | None = None
trigger_conditions: list[ContextSize] = []
elif isinstance(trigger, list):
# 对列表里每一项做校验
validated_list = [self._validate_context_size(item, "trigger") for item in trigger]
self.trigger = validated_list
trigger_conditions = validated_list
else:
validated = self._validate_context_size(trigger, "trigger")
self.trigger = validated
trigger_conditions = [validated]
# self.trigger:保留用户传入的“原样”
# self._trigger_conditions:内部使用的、校验过的列表,长度可能是 0、1 或多个;后面逻辑只依赖它来做“是否触发摘要”的判断
self._trigger_conditions = trigger_conditions
# keep 只接受一个 ContextSize 元组。
# keep=("messages", 20)
self.keep = self._validate_context_size(keep, "keep")
# 未传自定义计数器时,使用按 provider 调优的近似计数器。
# 判断用户是否传递了自定义token计算,没有就使用上面定义的函数_get_approximate_token_counter
# 得到一个偏函数token_counter,后续通过token_counter()计算token数
if token_counter is count_tokens_approximately:
self.token_counter = _get_approximate_token_counter(self.model)
# 用于后缀二分比较时,关闭 usage_metadata 缩放。
self._partial_token_counter: TokenCounter = partial( # type: ignore[call-arg]
self.token_counter, use_usage_metadata_scaling=False
)
else:
self.token_counter = token_counter
self._partial_token_counter = token_counter
# 摘要提示词,默认为上面定义的DEFAULT_SUMMARY_PROMPT
self.summary_prompt = summary_prompt
# 摘要前裁剪的 token 上限,默认4000
self.trim_tokens_to_summarize = trim_tokens_to_summarize
# fraction 模式依赖 model.profile.max_input_tokens。
requires_profile = any(condition[0] == "fraction" for condition in self._trigger_conditions)
if self.keep[0] == "fraction":
requires_profile = True
if requires_profile and self._get_profile_limits() is None:
msg = (
"使用分数标记限制时,需要模型配置文件信息, "
"并且对于指定型号不可用。请使用绝对标记"
"改为计数,或通过"
'`\n\nChatModel(..., profile={"max_input_tokens": ...}).\n\n`'
"模型最大输入标记的期望整数值"
)
raise ValueError(msg)
@override
def before_model(
self, state: AgentState[Any], runtime: Runtime[ContextT]
) -> dict[str, Any] | None:
"""同步钩子:每次模型调用前执行。"""
# 实现功能:用来在上下文太长时先做摘要、再让模型只看到「摘要 + 最近几条消息」,避免超长上下文。
messages = state["messages"]
self._ensure_message_ids(messages)
# 计算token数,这是在init中初始化好的偏函数
total_tokens = self.token_counter(messages)
# 判断是否满足触发条件(messages/tokens/fraction)
# 没达到就不做事,返回 None,框架不会改 state。
if not self._should_summarize(messages, total_tokens):
return None
# 根据 keep 策略算一个下标:这条之前的算“旧历史”、之后的算“要保留的最近消息”。
cutoff_index = self._determine_cutoff_index(messages)
if cutoff_index <= 0:
return None
# 按 cutoff_index 切成两段:前面要拿去摘要,后面保留不动。
messages_to_summarize, preserved_messages = self._partition_messages(messages, cutoff_index)
# 用摘要模型对前面那段消息生成一段文字摘要(同步调用)
summary = self._create_summary(messages_to_summarize)
# 把摘要包成一条(或几条)固定格式的消息
new_messages = self._build_new_messages(summary)
# 用“摘要上下文 + 保留的最近消息”替换完整历史。
return {
"messages": [
RemoveMessage(id=REMOVE_ALL_MESSAGES),
*new_messages,
*preserved_messages,
]
}
@override
async def abefore_model(
self, state: AgentState[Any], runtime: Runtime[ContextT]
) -> dict[str, Any] | None:
"""异步钩子:每次模型调用前执行。"""
messages = state["messages"]
self._ensure_message_ids(messages)
total_tokens = self.token_counter(messages)
if not self._should_summarize(messages, total_tokens):
return None
cutoff_index = self._determine_cutoff_index(messages)
if cutoff_index <= 0:
return None
messages_to_summarize, preserved_messages = self._partition_messages(messages, cutoff_index)
summary = await self._acreate_summary(messages_to_summarize)
new_messages = self._build_new_messages(summary)
return {
"messages": [
RemoveMessage(id=REMOVE_ALL_MESSAGES),
*new_messages,
*preserved_messages,
]
}
def _should_summarize_based_on_reported_tokens(
self, messages: list[AnyMessage], threshold: float
) -> bool:
"""备用判断:使用最近 AIMessage 上报的 token 用量。"""
last_ai_message = next((msg for msg in reversed(messages) if isinstance(msg, AIMessage)), None)
if (
isinstance(last_ai_message, AIMessage)
and last_ai_message.usage_metadata is not None
and (reported_tokens := last_ai_message.usage_metadata.get("total_tokens", -1))
and reported_tokens >= threshold
and (message_provider := last_ai_message.response_metadata.get("model_provider"))
and message_provider == self.model._get_ls_params().get("ls_provider") # noqa: SLF001
):
return True
return False
# 判断当前对话是否该做一次摘要
def _should_summarize(self, messages: list[AnyMessage], total_tokens: int) -> bool:
"""判断是否满足触发条件(messages/tokens/fraction)。"""
if not self._trigger_conditions:
return False
for kind, value in self._trigger_conditions:
if kind == "messages" and len(messages) >= value:
return True
if kind == "tokens" and total_tokens >= value:
return True
if kind == "tokens" and self._should_summarize_based_on_reported_tokens(messages, value):
return True
if kind == "fraction":
max_input_tokens = self._get_profile_limits()
if max_input_tokens is None:
continue
threshold = int(max_input_tokens * value)
if threshold <= 0:
threshold = 1
if total_tokens >= threshold:
return True
if self._should_summarize_based_on_reported_tokens(messages, threshold):
return True
return False
# 根据 keep 策略计算历史切分点。
"""
self.keep 的 kind 决定怎么算切分点:
keep == ("messages", N):按条数保留,直接走「保留最近 N 条」的逻辑,得到初步切分点后再做安全调整。
keep == ("tokens", N) 或 ("fraction", r):先按 token 预算算切分点;若算不出来(例如没有 profile、或 token 逻辑失败),就退化成「按条数保留」,用默认条数(如 20 条)再算一次。
也就是说:要么按条数、要么按 token(含比例),按 token 失败就退回按条数。
1、按条数:先算目标下标,再调到“安全”边界
当按条数保留时,用 _find_safe_cutoff(messages, messages_to_keep):
在保证不拆开「AI–Tool 成对消息」的前提下,把切分点前移或后移到安全位置,返回最终下标。
2、按 token:二分找“最小后缀起点”,再调到安全边界
当按 token 保留时,用 _find_token_based_cutoff(messages):
二分查找最早下标,使后缀 token 数不超过 keep 预算,得到初步切分点后再做安全调整。
如果token在句子中,则整段保留
"""
def _determine_cutoff_index(self, messages: list[AnyMessage]) -> int:
"""根据 keep 策略计算历史切分点。"""
kind, value = self.keep
if kind in {"tokens", "fraction"}:
token_based_cutoff = self._find_token_based_cutoff(messages)
if token_based_cutoff is not None:
return token_based_cutoff
return self._find_safe_cutoff(messages, _DEFAULT_MESSAGES_TO_KEEP)
return self._find_safe_cutoff(messages, cast("int", value))
def _find_token_based_cutoff(self, messages: list[AnyMessage]) -> int | None:
"""找到最早下标,使后缀 token 数不超过 keep 预算。"""
if not messages:
return 0
kind, value = self.keep
if kind == "fraction":
max_input_tokens = self._get_profile_limits()
if max_input_tokens is None:
return None
target_token_count = int(max_input_tokens * value)
elif kind == "tokens":
target_token_count = int(value)
else:
return None
if target_token_count <= 0:
target_token_count = 1
if self.token_counter(messages) <= target_token_count:
return 0
# 二分查找:寻找满足 token 预算的最小后缀起点。
left, right = 0, len(messages)
cutoff_candidate = len(messages)
max_iterations = len(messages).bit_length() + 1
for _ in range(max_iterations):
if left >= right:
break
mid = (left + right) // 2
if self._partial_token_counter(messages[mid:]) <= target_token_count:
cutoff_candidate = mid
right = mid
else:
left = mid + 1
if cutoff_candidate == len(messages):
cutoff_candidate = left
if cutoff_candidate >= len(messages):
if len(messages) == 1:
return 0
cutoff_candidate = len(messages) - 1
# 避免把 tool 响应块与其来源 AI tool call 拆开。
return self._find_safe_cutoff_point(messages, cutoff_candidate)
def _get_profile_limits(self) -> int | None:
"""读取 model.profile['max_input_tokens'](若存在)。"""
try:
profile = self.model.profile
except AttributeError:
return None
if not isinstance(profile, Mapping):
return None
max_input_tokens = profile.get("max_input_tokens")
if not isinstance(max_input_tokens, int):
return None
return max_input_tokens
@staticmethod
def _validate_context_size(context: ContextSize, parameter_name: str) -> ContextSize:
"""校验配置元组格式和值范围。"""
kind, value = context
if kind == "fraction":
if not 0 < value <= 1:
raise ValueError(
f"Fractional {parameter_name} values must be between 0 and 1, got {value}."
)
elif kind in {"tokens", "messages"}:
if value <= 0:
raise ValueError(
f"{parameter_name} thresholds must be greater than 0, got {value}."
)
else:
raise ValueError(f"Unsupported context size type {kind} for {parameter_name}.")
return context
@staticmethod
def _build_new_messages(summary: str) -> list[HumanMessage]:
"""把摘要包装成单条 HumanMessage。"""
return [
HumanMessage(
content=f"Here is a summary of the conversation to date:\n\n{summary}",
additional_kwargs={"lc_source": "summarization"},
)
]
@staticmethod
def _ensure_message_ids(messages: list[AnyMessage]) -> None:
"""为缺失 ID 的消息补 ID,确保 reducer 可安全替换/删除。"""
for msg in messages:
if msg.id is None:
msg.id = str(uuid.uuid4())
@staticmethod
def _partition_messages(
conversation_messages: list[AnyMessage], cutoff_index: int
) -> tuple[list[AnyMessage], list[AnyMessage]]:
"""把消息切成前缀(要摘要)和后缀(要保留)。"""
messages_to_summarize = conversation_messages[:cutoff_index]
preserved_messages = conversation_messages[cutoff_index:]
return messages_to_summarize, preserved_messages
def _find_safe_cutoff(self, messages: list[AnyMessage], messages_to_keep: int) -> int:
"""先按消息条数算切分点,再调整到安全的 tool 边界。"""
if len(messages) <= messages_to_keep:
return 0
target_cutoff = len(messages) - messages_to_keep
return self._find_safe_cutoff_point(messages, target_cutoff)
@staticmethod
def _find_safe_cutoff_point(messages: list[AnyMessage], cutoff_index: int) -> int:
"""避免切在 AI-tool 调用链中间。
如果切分点落在 ToolMessage 上,优先向前回溯到发起该 tool_call 的 AIMessage;
若找不到匹配 AIMessage,则向后跳过连续 ToolMessage。
"""
if cutoff_index >= len(messages) or not isinstance(messages[cutoff_index], ToolMessage):
return cutoff_index
tool_call_ids: set[str] = set()
idx = cutoff_index
while idx < len(messages) and isinstance(messages[idx], ToolMessage):
tool_msg = cast("ToolMessage", messages[idx])
if tool_msg.tool_call_id:
tool_call_ids.add(tool_msg.tool_call_id)
idx += 1
for i in range(cutoff_index - 1, -1, -1):
msg = messages[i]
if isinstance(msg, AIMessage) and msg.tool_calls:
ai_tool_call_ids = {tc.get("id") for tc in msg.tool_calls if tc.get("id")}
if tool_call_ids & ai_tool_call_ids:
return i
return idx
def _create_summary(self, messages_to_summarize: list[AnyMessage]) -> str:
"""同步方式调用摘要模型生成摘要。"""
if not messages_to_summarize:
return "No previous conversation history."
trimmed_messages = self._trim_messages_for_summary(messages_to_summarize)
if not trimmed_messages:
return "Previous conversation was too long to summarize."
# 转成纯缓冲区文本,避免消息对象字符串化导致元信息膨胀。
formatted_messages = get_buffer_string(trimmed_messages)
try:
response = self.model.invoke(
self.summary_prompt.format(messages=formatted_messages).rstrip(),
config={"metadata": {"lc_source": "summarization"}},
)
return response.text.strip()
except Exception as e:
# 失败时返回错误文本,保证流程可继续。
return f"Error generating summary: {e!s}"
async def _acreate_summary(self, messages_to_summarize: list[AnyMessage]) -> str:
"""异步方式调用摘要模型生成摘要。"""
if not messages_to_summarize:
return "No previous conversation history."
trimmed_messages = self._trim_messages_for_summary(messages_to_summarize)
if not trimmed_messages:
return "Previous conversation was too long to summarize."
formatted_messages = get_buffer_string(trimmed_messages)
try:
response = await self.model.ainvoke(
self.summary_prompt.format(messages=formatted_messages).rstrip(),
config={"metadata": {"lc_source": "summarization"}},
)
return response.text.strip()
except Exception as e:
return f"Error generating summary: {e!s}"
def _trim_messages_for_summary(self, messages: list[AnyMessage]) -> list[AnyMessage]:
"""裁剪用于生成摘要的输入消息。
策略说明:
- 保留最新内容(`strategy='last'`)
- 尽量从 human 消息边界开始(`start_on='human'`)
- 保留 system 消息
- 允许部分裁剪
"""
try:
if self.trim_tokens_to_summarize is None:
return messages
return cast(
"list[AnyMessage]",
trim_messages(
messages,
max_tokens=self.trim_tokens_to_summarize,
token_counter=self.token_counter,
start_on="human",
strategy="last",
allow_partial=True,
include_system=True,
),
)
except Exception:
return messages[-_DEFAULT_FALLBACK_MESSAGE_COUNT:]Short-term memory 短期记忆
对应链接:https://docs.langchain.com/oss/python/langchain/short-term-memory
在langchain官网有说到:
启用短期记忆后,长对话可能会超过 LLM 的上下文窗口。常见解决方案有:
- 修剪消息:在调用 LLM 之前删除前 N 条或后 N 条消息
- 删除消息:永久从 LangGraph 状态中删除消息
- 总结消息:总结历史中的较早消息并用摘要替换它们
- 自定义策略:自定义策略(例如,消息过滤等)
这使得智能体能够在不超过 LLM 限制的情况下,持续跟踪对话内容。
例如:修剪消息
大多数 LLM 都有一个最大支持的上下文窗口(以 token 计)。
决定何时截断消息的一种方法是计算消息历史中的 token 数量,并在接近该限制时截断。如果你使用 LangChain,可以使用 trim messages 工具,并指定要保留的 token 数量,以及 strategy (例如,保留最后一个 max_tokens )来处理边界。
trim messages 工具
py
from langchain.messages import RemoveMessage
from langchain_ollama import ChatOllama
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import before_model
from langgraph.runtime import Runtime
from langchain_core.runnables import RunnableConfig
from typing import Any
@before_model
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
"""仅保留最后几条消息以适应上下文窗口。"""
messages = state["messages"]
if len(messages) <= 3:
return None # No changes needed
# 保留第一次消息和最近的3条或4条
first_msg = messages[0]
recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
new_messages = [first_msg] + recent_messages
return {
"messages": [
RemoveMessage(id=REMOVE_ALL_MESSAGES),
*new_messages
]
}
agent = create_agent(
model=ChatOllama(model="qwen3:0.6b"),
middleware=[trim_messages],
checkpointer=InMemorySaver(),
)
config: RunnableConfig = {"configurable": {"thread_id": "1"}}
agent.invoke({"messages": "你好,我叫鲍勃"}, config)
agent.invoke({"messages": "写一首关于猫的短诗"}, config)
agent.invoke({"messages": "现在为狗写一首同样的"}, config)
final_response = agent.invoke({"messages": "我叫什么名字?"}, config)
final_response["messages"][-1].pretty_print()问题 5:上下文 token 超过,如何对其进行压缩
我一般按“三步压缩”来做:
- 基础清洗(无损优先)
- 删空行、重复日志、无关注释、调试输出、重复片段。
- 语义压缩(有损但保核心)
- 把长文本变成结构化摘要:
结论 + 关键证据 + 限制条件。 - 代码场景保留:函数签名、关键逻辑、依赖关系、异常处理。
- 把长文本变成结构化摘要:
- 分块召回(避免一次塞满)
- 按主题切块并打标签,提问时只召回 Top-K 相关片段。
实战里我会优先无损压缩,不够再做语义摘要,最后再加检索召回。
问题 6:向量数据库你了解的有哪些
我接触过这些:
Pinecone(托管型,接入快)Milvus(开源,生态成熟)Qdrant(开源,过滤能力不错)Weaviate(内置语义特性比较多)Chroma(本地开发常用)pgvector(基于 PostgreSQL,适合和业务库结合)Elasticsearch/OpenSearch的向量检索能力(混合检索常见)
问题 7:向量数据库和传统数据库的区别
可以这么回答,更准确:
- 存储对象不同:传统库主要存结构化字段;向量库核心是高维向量。
- 查询方式不同:传统库是精确过滤(
=、>、LIKE);向量库是相似度检索(Top-K)。 - 索引不同:传统库常见 B+Tree、倒排;向量库常见 HNSW、IVF、PQ 等近似最近邻索引。
- 目标不同:传统库强调强事务和精确查询;向量库强调语义召回效率。
- 实际不是二选一:线上通常“混合用”,先结构化过滤,再向量召回。
问题 8:向量数据库的匹配相似度的方法
常见有三类:
- 余弦相似度(Cosine):看方向是否一致,文本语义检索最常见。
- 点积(Dot Product):和向量长度有关,很多 embedding 模型也支持。
- 欧氏距离(L2):看空间距离,距离越小越相似。
面试里补一句:具体用哪种,要和 embedding 模型的训练目标保持一致。
问题 9:对余弦相似度的理解,是如何计算的
可以这样说:
- 余弦相似度衡量的是两个向量夹角的余弦值,范围一般在
[-1, 1]。越接近1,方向越一致,语义越相近。
公式:cos(a,b) = (a·b) / (||a|| * ||b||) - 欧氏距离L2是两个向量在空间中的直线距离,越小越相似。
公式:dist(a,b) = sqrt(Σ(ai - bi)^2)
plaintext
B(4,6)
/|
/ |
d=5/ | Δy=4
/ |
/ |
A(1,2)───▶ Δx=3
横向差:Δx=3
纵向差:Δy=4
直线距离:斜边 d=5问题 10:MCP 由几部分组成
MCP 可以理解成 3 层:
- MCP Host(宿主):比如 IDE 或 Agent 应用,负责发起请求。
- MCP Client(客户端连接层):在 Host 内,负责和 MCP Server 建连、通信。
- MCP Server(能力提供方):把工具、数据资源、提示模板标准化暴露出来。
核心价值是:用统一协议把“模型”和“外部工具/数据”解耦。
问题 11:LangChain 去构建一个 Agent 的步骤是什么
我一般按 7 步走:
- 定义任务目标和成功标准。
- 选模型(推理能力、速度、成本)。
- 定义工具(搜索、数据库、代码执行等)和输入输出 schema。
- 设计 Agent 提示词(角色、规则、失败处理)。
- 组装执行流程(单 Agent 或工作流,是否循环反思)。
- 接入记忆(短期对话、长期用户画像/知识)。
- 做评测和观测(成功率、时延、成本、错误类型),再迭代。
问题 12:在 LangChain 中注册 tools 有哪些方法
- 通过
tools参数传入工具列表 - 通过
model.bind_tools方法绑定工具
问题 13:LangChain 大版本升级有哪些核心变化
从我的使用上,变化有:
- 在0.3版本的创建agent有create_react_agent和create_function_call_agent,1.x的版本统一为create_agent
- 在1.x的版本中添加了中间件,可以根据钩子作用到agent的不同时机,可扩展性更高
- langgraph变成了langchain的底层框架
问题 14:Memory 有使用过吗,对它的理解
我用过。我的理解是:Memory 的本质是让 Agent 具备“跨轮次连续性”。
- 没有 memory:每轮都像第一次聊天。
- 有 memory:能记住用户偏好、历史结论、上下文约束,回答更连贯。
但 memory 不是越多越好,重点是“该记什么、什么时候记、什么时候清理”。
问题 15:Memory 的短中长期记忆是什么
可以这样分:
- 短期记忆:当前会话最近几轮内容,保证对话连贯。
- 中期记忆:当前任务阶段的阶段性结论,比如“已排除的原因、已确认的方案”。内容过多可以进行上下文压缩。
- 长期记忆:跨会话保留的稳定信息,比如用户偏好、业务术语、常用配置。
我一般会把“高频稳定事实”放长期,把“临时过程信息”放短中期。
问题 16:Memory 裁剪压缩
我的做法是“窗口裁剪 + 摘要压缩 + 关键事实固化”:
- 窗口裁剪:只保留最近 N 轮原始消息。
- 摘要压缩:把更早消息压成结构化摘要(目标、约束、结论、待办)。
md
你是专业的对话摘要助手。请把下面**更早的历史消息**,提炼成结构化摘要,只输出结果,不要多余解释。
请严格按以下 4 个维度输出:
1. **目标**:用户想做什么、核心诉求
2. **约束**:限制条件、技术栈、规则、要求
3. **结论**:已完成、已确定、已解决的内容
4. **待办**:还没做、接下来要做、需要跟进的事项
要求:
- 只保留关键信息,极度精简
- 不重复、不展开、不抒情
- 用短句、要点式呈现
- 只输出摘要,不要其他文字- 关键事实固化:把用户长期偏好和关键实体单独存储,避免被摘要丢掉。
- 按 token 动态触发:不是固定轮数,接近上限就触发裁剪/压缩。
问题 17:Memory 在什么时机去存储,以及内容的召回和更新的时机
可以按“存储-召回-更新”三个阶段回答:
- 存储时机
- 用户明确偏好(如语言风格、输出格式)。
- 任务关键结论(如最终方案、已确认事实)。
- 可复用经验(如排障路径、最佳实践)。
- 召回时机
- 新问题开始时先召回一次用户画像和历史约束。
- 生成前再按当前 query 做一次语义召回。
解释
召回时机(拿出来用),需要召回两次
- 新问题一上来,先 “召回一次” = 先把这个人的基本信息调出来
md
比如:
- 他喜欢简洁 / 详细?
- 他是前端开发?
- 之前定过什么规则?
就像:见面前先看一眼这个人的档案,知道他是谁。- 正式回答前,再 “语义召回” = 根据他现在这句问题,再精准捞一遍相关历史
md
比如他现在问:“我这段 Vue 代码为啥报错”
系统就去历史里找:
- 他之前写过什么 Vue 代码
- 他之前报过什么错
- 他用的什么版本、什么环境
就像:他现在问啥,我就只把跟这个问题相关的记忆翻出来。- 更新时机
- 用户纠正旧信息时要覆盖更新。
- 长期未使用且置信度低的信息要降权或清理。
问题 18:如何设计记忆机制?以便在第 100 轮对话时还能精准记得第 1 轮的细节?
我会设计成“4 层记忆”:
- 原始日志层:完整保存所有对话,保证可追溯。
- 滑动窗口层:实时给模型最近 K 轮,保证当前连贯性。
- 摘要层:每隔一段对历史做结构化摘要,降低 token 成本。
- 实体/事实层:抽取人名、项目、偏好、约束等可检索事实,按需精准召回。
再加两点机制:
- 冲突检测:新旧事实冲突时不直接覆盖,标记版本和置信度。
- 遗忘机制:低价值、过期信息定期降权清理,避免噪声污染。
问题 19:工具权限的设计
我会从“原则 + 分级 + 落地”三个层面讲:
- 原则
- 最小权限:只给完成任务必需权限。
- 显式授权:高风险操作必须二次确认。
- 全链路审计:每次工具调用都可追踪。
- 分级
- L0:只读公开信息。
- L1:内部只读数据。
- L2:可写操作(发消息、改数据)需要审批或策略校验。
- L3:高危操作(删库、生产发布、转账)必须人工审批。
- 工程落地
- 工具注册时打风险标签。
- 执行前经过权限中间件校验。
- 动态暴露工具,没权限的工具不要给模型看到。
- 增加限流、超时、重试和熔断,防止误调用扩散。
问题 20:像 Gemini 的 Deep Research,你大概了解它怎么实现?如果让你实现一个类似“深度研究 Agent”,你会怎么开始?
我会把它当成一个“可循环的研究工作流”来做,而不是单次问答:
- 任务分流(Router)
- 先判断问题是否值得 deep research。
- 简单问答直接返回,复杂问题进入研究流程。
- 规划(Planner)
- 把大问题拆成多个可验证子问题,定义每个子问题的证据标准。
- 检索与阅读(Retriever + Reader)
- 多路检索(搜索引擎、指定站点、内部知识库)。
- 抽取关键信息并保留来源、时间、原文片段。
- 证据评估(Evaluator)
- 去重、冲突检测、来源可信度打分、时效性校验。
- 答案合成(Synthesizer)
- 按“结论-证据-不确定性-引用”输出,保证可核查。
- 反思迭代(Critic Loop)
- 如果证据不足或冲突未解决,自动回到 Planner 再补查一轮。
工程上我会先做 MVP:单 Agent + 工具链 + 可观测日志;跑通后再升级成多 Agent 协作。