Skip to content

AI 相关的面试题

问题 1:LLM 出现幻觉(Hallucination)的深层原因是什么

  • 语言模型是概率模型,不是事实模型:LLM 的本质是“预测下一个最可能的词”,不是在“查找真相”,而是在生成语言模式。当输入提示不明确或知识缺失时,会凭统计相关性“合理地编造”。
  • 训练数据中存在噪声和虚假样本:大模型学习了互联网上的海量文本,而这些内容本身可能包含错误或臆测信息。模型学到这些偏差后,在回答中会自然复现。
  • 缺乏事实验证机制:模型输出结果时不会自动校验真伪,也不会访问实时数据。在多轮推理中,错误会被“递进强化”——尤其是 Agent 模式下的反射循环,会放大错误逻辑。
  • Prompt 上下文过短或缺乏约束:当上下文被截断、知识片段不完整,模型会自动“补空缺”,生成符合语义但不符合事实的回答。
  • 任务模糊或目标歧义:如果任务没有明确评价标准,模型会更倾向于填补内容空白,从而编造细节。

问题 2:RAG(检索增强生成)是什么?

RAG(Retrieval-Augmented Generation)是当前企业级 AI 应用最核心的架构思路之一,让模型“具备最新知识”,而不依赖模型固有训练语料。

  1. 文档嵌入(Embedding)
    • 把知识库(PDF、Markdown、数据库内容等)切成小块(Chunk)。
    • 然后用 Embedding Model(如 text-embedding-3-largebge-m3)将文本转为高维向量。
  2. 向量检索(Vector Search)
    • 用户提问时,将 Query 也转成向量。
    • 计算 Query 向量与文档向量的相似度。
    • 检索出最相关的若干段落。
  3. 增强生成(Augmented Generation)
    • 把检索结果拼入 Prompt 的上下文中。
    • 交由 LLM 生成最终回答。

问题 3:提示词技巧有哪些

我一般会按这个结构来写提示词,稳定性会高很多:

  1. 先定角色和目标:比如“你是资深前端架构师,目标是给出可落地方案”。
  2. 补充上下文:项目背景、技术栈、约束条件、输入数据格式。
  3. 明确输出格式:比如“用分点回答,先结论后原因,最后给代码示例”。
  4. 写清边界和禁区:不能编造、不能省略风险、不能使用某些库。
  5. 给 1-2 个高质量示例:Few-shot 对输出风格和准确度帮助很大。
  6. 要求自检:让模型输出前做一次检查,比如“是否满足约束、是否有不确定项”。
  7. 迭代优化:一次不满意就补充反馈,告诉它哪里偏了。

问题 4:在上下文窗口大小限制的情况下,如何更合理的处理上下文

我的思路是“分层 + 按需注入”,不是把所有内容一股脑塞进去:

  1. 分层优先级:系统指令 > 当前任务核心信息 > 最近相关对话 > 历史背景。
  2. 只保留与当前问题强相关的信息:无关上下文直接丢掉。
  3. 用检索代替硬塞:历史知识放到向量库,提问时再召回。
  4. 短期窗口 + 长期摘要:最近几轮保留原文,久远内容保留摘要。
  5. 结构化注入:把上下文整理成 事实/约束/待办/结论,比原始聊天记录更省 token。
  6. 控制预算:给每类信息设 token 上限,防止某一块把窗口吃满。
langchain 中 SummarizationMiddleware 摘要中间件的使用和源码分析

对应链接:https://docs.langchain.com/oss/python/langchain/middleware/built-in#summarization

使用

在接近 token 限制时自动总结对话历史,保留最近的消息同时压缩旧的内容。摘要对以下情况很有用:

  • 长时间对话超出上下文窗口
  • 多轮对话,包含大量历史记录
  • 需要保留完整对话上下文的应用场景

使用也非常简单:

py
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware

agent = create_agent(
    model="gpt-4.1",
    middleware=[
        SummarizationMiddleware(
            model="gpt-4.1-mini",
            trigger=("tokens", 4000),
            keep=("messages", 20),
        ),
    ],
)

源码

这个内容看似很多,其实核心就是写好一段提示词,根据用户希望的切割方式(例如:按message数量、按token数量、按比例),在每次调用大模型前的钩子进行处理,然后调用大模型对内容进行总结,总结完成后的内容再拼接到原来的消息队列中。

源码
py
"""Summarization middleware(带注释副本)。

这个文件是 LangChain `SummarizationMiddleware` 的注释版拷贝,
用于更直观地理解其控制流程。
"""

import uuid
import warnings
from collections.abc import Callable, Iterable, Mapping
from functools import partial
from typing import Any, Literal, cast

from langchain_core.messages import (
    AIMessage,
    AnyMessage,
    MessageLikeRepresentation,
    RemoveMessage,
    ToolMessage,
)
from langchain_core.messages.human import HumanMessage
from langchain_core.messages.utils import (
    count_tokens_approximately,
    get_buffer_string,
    trim_messages,
)
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.runtime import Runtime
from typing_extensions import override

from langchain.agents.middleware.types import AgentMiddleware, AgentState, ContextT, ResponseT
from langchain.chat_models import BaseChatModel, init_chat_model

# TokenCounter 接收消息序列并返回 token 数。
TokenCounter = Callable[[Iterable[MessageLikeRepresentation]], int]

DEFAULT_SUMMARY_PROMPT = """<role>
上下文提取助手
</role>

<primary_objective>
你在此任务中的唯一目标,是从下面的对话历史中提取最高质量、最相关的上下文信息。
</primary_objective>

<objective_information>
你即将接近可接受的输入 token 总上限,因此必须从对话历史中提取最高质量、最相关的信息。
这些上下文将覆盖下方展示的对话历史。因此,你提取的内容必须只包含对继续推进总体目标最重要的信息。
</objective_information>

# instructions 指示
<instructions>
下方的对话历史将被你在本步骤提取的上下文替换。
为了避免重复已经完成的操作,你提取的上下文应聚焦于对总体目标最关键的信息。

请使用以下章节组织你的摘要。每个章节都相当于一份检查清单:你必须填入相关信息;若该部分无内容,请明确写“无”。

## 会话意图
用户的主要目标或请求是什么?你正在尝试完成的整体任务是什么?内容应简洁,但要完整到足以理解整个会话的目的。

## 摘要
从对话历史中提取并记录所有最重要的上下文。包括本次对话中形成的重要选择、结论或策略;补充关键决策背后的理由;记录被否决的方案及未采纳原因。

## 产物
在本次对话中,创建、修改或访问了哪些产物、文件或资源?如果有文件修改,请列出具体文件路径并简要说明每个文件的改动。该部分用于避免产物信息被静默丢失。

## 下一步
为了达成会话意图,还剩哪些具体任务需要完成?你接下来应该做什么?

</instructions>

用户会向你发送完整的消息历史,你需要从中提取上下文以生成替代内容。请仔细通读全部内容,并深入判断哪些信息对总体目标最重要且必须保留:

基于以上要求,请认真阅读完整对话历史,提取最重要、最相关的上下文来替换原历史,从而释放对话上下文空间。
只输出提取后的上下文。不要添加任何额外信息,也不要在提取内容前后添加其他文字。

<messages>
待总结的消息:
{messages}
</messages>"""  # noqa: E501

# 默认行为:
# - 摘要后保留最近 20 条消息。
# - 生成摘要前,先把摘要输入裁剪到 4000 token。
# - 如果裁剪异常,回退为最后 15 条消息。
_DEFAULT_MESSAGES_TO_KEEP = 20
_DEFAULT_TRIM_TOKEN_LIMIT = 4000
_DEFAULT_FALLBACK_MESSAGE_COUNT = 15

# 上下文策略的类型别名。
ContextFraction = tuple[Literal["fraction"], float]
ContextTokens = tuple[Literal["tokens"], int]
ContextMessages = tuple[Literal["messages"], int]
ContextSize = ContextFraction | ContextTokens | ContextMessages

"""
# 该函数返回一个偏函数(类似js的一个新函数)
res = _get_approximate_token_counter()
token_count = res('123') # 返回的是token数:15
"""
def _get_approximate_token_counter(model: BaseChatModel) -> TokenCounter:
    """按模型类型调整近似 token 计数参数。

    Anthropic 聊天模型会使用专门的 chars-per-token 估计值。
    """
    if model._llm_type == "anthropic-chat":  # noqa: SLF001
        return partial(
            count_tokens_approximately,
            use_usage_metadata_scaling=True,
            chars_per_token=3.3,
        )
    return partial(count_tokens_approximately, use_usage_metadata_scaling=True)


class SummarizationMiddleware(AgentMiddleware[AgentState[ResponseT], ContextT, ResponseT]):
    """当达到阈值时,对旧会话历史做摘要压缩。

    高层流程:
    1) 每次模型调用前检查是否触发摘要。
    2) 计算安全切分点(避免拆开 AI-Tool 成对消息)。
    3) 对较早的前缀消息生成摘要。
    4) 用“摘要消息 + 保留后缀”重写历史。
    """

    """
    使用方法:
    SummarizationMiddleware(
        model="gpt-4.1-mini",
        trigger=("tokens", 4000),
        keep=("messages", 20),
    ),
    """

    def __init__(
        self,
        model: str | BaseChatModel,
        *,
        trigger: ContextSize | list[ContextSize] | None = None,
        keep: ContextSize = ("messages", _DEFAULT_MESSAGES_TO_KEEP),
        token_counter: TokenCounter = count_tokens_approximately,
        summary_prompt: str = DEFAULT_SUMMARY_PROMPT,
        trim_tokens_to_summarize: int | None = _DEFAULT_TRIM_TOKEN_LIMIT,
        **deprecated_kwargs: Any,
    ) -> None:
        super().__init__()

        # 支持模型名字符串或模型实例。
        # isinstance()的核心作用是检查一个对象是否是指定类(或多个类)的实例,或者是该类子类的实例
        if isinstance(model, str):
            # 这是1.x新api,旧的是 ChatOpenAI
            model = init_chat_model(model)

        self.model = model

        # 统一成列表形式,便于后续逐条判断触发条件。
        # trigger=("tokens", 4000),
        if trigger is None:
            self.trigger: ContextSize | list[ContextSize] | None = None
            trigger_conditions: list[ContextSize] = []
        elif isinstance(trigger, list):
            # 对列表里每一项做校验
            validated_list = [self._validate_context_size(item, "trigger") for item in trigger]
            self.trigger = validated_list
            trigger_conditions = validated_list
        else:
            validated = self._validate_context_size(trigger, "trigger")
            self.trigger = validated
            trigger_conditions = [validated]
        # self.trigger:保留用户传入的“原样”
        # self._trigger_conditions:内部使用的、校验过的列表,长度可能是 0、1 或多个;后面逻辑只依赖它来做“是否触发摘要”的判断
        self._trigger_conditions = trigger_conditions

        # keep 只接受一个 ContextSize 元组。
        # keep=("messages", 20)
        self.keep = self._validate_context_size(keep, "keep")

        # 未传自定义计数器时,使用按 provider 调优的近似计数器。
        # 判断用户是否传递了自定义token计算,没有就使用上面定义的函数_get_approximate_token_counter
        # 得到一个偏函数token_counter,后续通过token_counter()计算token数
        if token_counter is count_tokens_approximately:
            self.token_counter = _get_approximate_token_counter(self.model)
            # 用于后缀二分比较时,关闭 usage_metadata 缩放。
            self._partial_token_counter: TokenCounter = partial(  # type: ignore[call-arg]
                self.token_counter, use_usage_metadata_scaling=False
            )
        else:
            self.token_counter = token_counter
            self._partial_token_counter = token_counter

        # 摘要提示词,默认为上面定义的DEFAULT_SUMMARY_PROMPT
        self.summary_prompt = summary_prompt
        # 摘要前裁剪的 token 上限,默认4000
        self.trim_tokens_to_summarize = trim_tokens_to_summarize

        # fraction 模式依赖 model.profile.max_input_tokens。
        requires_profile = any(condition[0] == "fraction" for condition in self._trigger_conditions)
        if self.keep[0] == "fraction":
            requires_profile = True
        if requires_profile and self._get_profile_limits() is None:
            msg = (
                "使用分数标记限制时,需要模型配置文件信息, "
                "并且对于指定型号不可用。请使用绝对标记"
                "改为计数,或通过"
                '`\n\nChatModel(..., profile={"max_input_tokens": ...}).\n\n`'
                "模型最大输入标记的期望整数值"
            )
            raise ValueError(msg)

    @override
    def before_model(
        self, state: AgentState[Any], runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None:
        """同步钩子:每次模型调用前执行。"""
        # 实现功能:用来在上下文太长时先做摘要、再让模型只看到「摘要 + 最近几条消息」,避免超长上下文。
        messages = state["messages"]
        self._ensure_message_ids(messages)

        # 计算token数,这是在init中初始化好的偏函数
        total_tokens = self.token_counter(messages)
        # 判断是否满足触发条件(messages/tokens/fraction)
        # 没达到就不做事,返回 None,框架不会改 state。
        if not self._should_summarize(messages, total_tokens):
            return None

        # 根据 keep 策略算一个下标:这条之前的算“旧历史”、之后的算“要保留的最近消息”。
        cutoff_index = self._determine_cutoff_index(messages)
        if cutoff_index <= 0:
            return None

        # 按 cutoff_index 切成两段:前面要拿去摘要,后面保留不动。
        messages_to_summarize, preserved_messages = self._partition_messages(messages, cutoff_index)

        # 用摘要模型对前面那段消息生成一段文字摘要(同步调用)
        summary = self._create_summary(messages_to_summarize)
        # 把摘要包成一条(或几条)固定格式的消息
        new_messages = self._build_new_messages(summary)

        # 用“摘要上下文 + 保留的最近消息”替换完整历史。
        return {
            "messages": [
                RemoveMessage(id=REMOVE_ALL_MESSAGES),
                *new_messages,
                *preserved_messages,
            ]
        }

    @override
    async def abefore_model(
        self, state: AgentState[Any], runtime: Runtime[ContextT]
    ) -> dict[str, Any] | None:
        """异步钩子:每次模型调用前执行。"""
        messages = state["messages"]
        self._ensure_message_ids(messages)

        total_tokens = self.token_counter(messages)
        if not self._should_summarize(messages, total_tokens):
            return None

        cutoff_index = self._determine_cutoff_index(messages)
        if cutoff_index <= 0:
            return None

        messages_to_summarize, preserved_messages = self._partition_messages(messages, cutoff_index)

        summary = await self._acreate_summary(messages_to_summarize)
        new_messages = self._build_new_messages(summary)

        return {
            "messages": [
                RemoveMessage(id=REMOVE_ALL_MESSAGES),
                *new_messages,
                *preserved_messages,
            ]
        }

    def _should_summarize_based_on_reported_tokens(
        self, messages: list[AnyMessage], threshold: float
    ) -> bool:
        """备用判断:使用最近 AIMessage 上报的 token 用量。"""
        last_ai_message = next((msg for msg in reversed(messages) if isinstance(msg, AIMessage)), None)

        if (
            isinstance(last_ai_message, AIMessage)
            and last_ai_message.usage_metadata is not None
            and (reported_tokens := last_ai_message.usage_metadata.get("total_tokens", -1))
            and reported_tokens >= threshold
            and (message_provider := last_ai_message.response_metadata.get("model_provider"))
            and message_provider == self.model._get_ls_params().get("ls_provider")  # noqa: SLF001
        ):
            return True
        return False

    # 判断当前对话是否该做一次摘要
    def _should_summarize(self, messages: list[AnyMessage], total_tokens: int) -> bool:
        """判断是否满足触发条件(messages/tokens/fraction)。"""
        if not self._trigger_conditions:
            return False

        for kind, value in self._trigger_conditions:
            if kind == "messages" and len(messages) >= value:
                return True
            if kind == "tokens" and total_tokens >= value:
                return True
            if kind == "tokens" and self._should_summarize_based_on_reported_tokens(messages, value):
                return True
            if kind == "fraction":
                max_input_tokens = self._get_profile_limits()
                if max_input_tokens is None:
                    continue
                threshold = int(max_input_tokens * value)
                if threshold <= 0:
                    threshold = 1
                if total_tokens >= threshold:
                    return True
                if self._should_summarize_based_on_reported_tokens(messages, threshold):
                    return True

        return False

    # 根据 keep 策略计算历史切分点。
    """
    self.keep 的 kind 决定怎么算切分点:
        keep == ("messages", N):按条数保留,直接走「保留最近 N 条」的逻辑,得到初步切分点后再做安全调整。
        keep == ("tokens", N) 或 ("fraction", r):先按 token 预算算切分点;若算不出来(例如没有 profile、或 token 逻辑失败),就退化成「按条数保留」,用默认条数(如 20 条)再算一次。
    也就是说:要么按条数、要么按 token(含比例),按 token 失败就退回按条数。

    1、按条数:先算目标下标,再调到“安全”边界
        当按条数保留时,用 _find_safe_cutoff(messages, messages_to_keep):
        在保证不拆开「AI–Tool 成对消息」的前提下,把切分点前移或后移到安全位置,返回最终下标。
    2、按 token:二分找“最小后缀起点”,再调到安全边界
        当按 token 保留时,用 _find_token_based_cutoff(messages):
        二分查找最早下标,使后缀 token 数不超过 keep 预算,得到初步切分点后再做安全调整。
        如果token在句子中,则整段保留
    """
    def _determine_cutoff_index(self, messages: list[AnyMessage]) -> int:
        """根据 keep 策略计算历史切分点。"""
        kind, value = self.keep
        if kind in {"tokens", "fraction"}:
            token_based_cutoff = self._find_token_based_cutoff(messages)
            if token_based_cutoff is not None:
                return token_based_cutoff
            return self._find_safe_cutoff(messages, _DEFAULT_MESSAGES_TO_KEEP)
        return self._find_safe_cutoff(messages, cast("int", value))

    def _find_token_based_cutoff(self, messages: list[AnyMessage]) -> int | None:
        """找到最早下标,使后缀 token 数不超过 keep 预算。"""
        if not messages:
            return 0

        kind, value = self.keep
        if kind == "fraction":
            max_input_tokens = self._get_profile_limits()
            if max_input_tokens is None:
                return None
            target_token_count = int(max_input_tokens * value)
        elif kind == "tokens":
            target_token_count = int(value)
        else:
            return None

        if target_token_count <= 0:
            target_token_count = 1

        if self.token_counter(messages) <= target_token_count:
            return 0

        # 二分查找:寻找满足 token 预算的最小后缀起点。
        left, right = 0, len(messages)
        cutoff_candidate = len(messages)
        max_iterations = len(messages).bit_length() + 1

        for _ in range(max_iterations):
            if left >= right:
                break
            mid = (left + right) // 2
            if self._partial_token_counter(messages[mid:]) <= target_token_count:
                cutoff_candidate = mid
                right = mid
            else:
                left = mid + 1

        if cutoff_candidate == len(messages):
            cutoff_candidate = left

        if cutoff_candidate >= len(messages):
            if len(messages) == 1:
                return 0
            cutoff_candidate = len(messages) - 1

        # 避免把 tool 响应块与其来源 AI tool call 拆开。
        return self._find_safe_cutoff_point(messages, cutoff_candidate)

    def _get_profile_limits(self) -> int | None:
        """读取 model.profile['max_input_tokens'](若存在)。"""
        try:
            profile = self.model.profile
        except AttributeError:
            return None

        if not isinstance(profile, Mapping):
            return None

        max_input_tokens = profile.get("max_input_tokens")
        if not isinstance(max_input_tokens, int):
            return None

        return max_input_tokens

    @staticmethod
    def _validate_context_size(context: ContextSize, parameter_name: str) -> ContextSize:
        """校验配置元组格式和值范围。"""
        kind, value = context
        if kind == "fraction":
            if not 0 < value <= 1:
                raise ValueError(
                    f"Fractional {parameter_name} values must be between 0 and 1, got {value}."
                )
        elif kind in {"tokens", "messages"}:
            if value <= 0:
                raise ValueError(
                    f"{parameter_name} thresholds must be greater than 0, got {value}."
                )
        else:
            raise ValueError(f"Unsupported context size type {kind} for {parameter_name}.")

        return context

    @staticmethod
    def _build_new_messages(summary: str) -> list[HumanMessage]:
        """把摘要包装成单条 HumanMessage。"""
        return [
            HumanMessage(
                content=f"Here is a summary of the conversation to date:\n\n{summary}",
                additional_kwargs={"lc_source": "summarization"},
            )
        ]

    @staticmethod
    def _ensure_message_ids(messages: list[AnyMessage]) -> None:
        """为缺失 ID 的消息补 ID,确保 reducer 可安全替换/删除。"""
        for msg in messages:
            if msg.id is None:
                msg.id = str(uuid.uuid4())

    @staticmethod
    def _partition_messages(
        conversation_messages: list[AnyMessage], cutoff_index: int
    ) -> tuple[list[AnyMessage], list[AnyMessage]]:
        """把消息切成前缀(要摘要)和后缀(要保留)。"""
        messages_to_summarize = conversation_messages[:cutoff_index]
        preserved_messages = conversation_messages[cutoff_index:]
        return messages_to_summarize, preserved_messages

    def _find_safe_cutoff(self, messages: list[AnyMessage], messages_to_keep: int) -> int:
        """先按消息条数算切分点,再调整到安全的 tool 边界。"""
        if len(messages) <= messages_to_keep:
            return 0

        target_cutoff = len(messages) - messages_to_keep
        return self._find_safe_cutoff_point(messages, target_cutoff)

    @staticmethod
    def _find_safe_cutoff_point(messages: list[AnyMessage], cutoff_index: int) -> int:
        """避免切在 AI-tool 调用链中间。

        如果切分点落在 ToolMessage 上,优先向前回溯到发起该 tool_call 的 AIMessage;
        若找不到匹配 AIMessage,则向后跳过连续 ToolMessage。
        """
        if cutoff_index >= len(messages) or not isinstance(messages[cutoff_index], ToolMessage):
            return cutoff_index

        tool_call_ids: set[str] = set()
        idx = cutoff_index
        while idx < len(messages) and isinstance(messages[idx], ToolMessage):
            tool_msg = cast("ToolMessage", messages[idx])
            if tool_msg.tool_call_id:
                tool_call_ids.add(tool_msg.tool_call_id)
            idx += 1

        for i in range(cutoff_index - 1, -1, -1):
            msg = messages[i]
            if isinstance(msg, AIMessage) and msg.tool_calls:
                ai_tool_call_ids = {tc.get("id") for tc in msg.tool_calls if tc.get("id")}
                if tool_call_ids & ai_tool_call_ids:
                    return i

        return idx

    def _create_summary(self, messages_to_summarize: list[AnyMessage]) -> str:
        """同步方式调用摘要模型生成摘要。"""
        if not messages_to_summarize:
            return "No previous conversation history."

        trimmed_messages = self._trim_messages_for_summary(messages_to_summarize)
        if not trimmed_messages:
            return "Previous conversation was too long to summarize."

        # 转成纯缓冲区文本,避免消息对象字符串化导致元信息膨胀。
        formatted_messages = get_buffer_string(trimmed_messages)

        try:
            response = self.model.invoke(
                self.summary_prompt.format(messages=formatted_messages).rstrip(),
                config={"metadata": {"lc_source": "summarization"}},
            )
            return response.text.strip()
        except Exception as e:
            # 失败时返回错误文本,保证流程可继续。
            return f"Error generating summary: {e!s}"

    async def _acreate_summary(self, messages_to_summarize: list[AnyMessage]) -> str:
        """异步方式调用摘要模型生成摘要。"""
        if not messages_to_summarize:
            return "No previous conversation history."

        trimmed_messages = self._trim_messages_for_summary(messages_to_summarize)
        if not trimmed_messages:
            return "Previous conversation was too long to summarize."

        formatted_messages = get_buffer_string(trimmed_messages)

        try:
            response = await self.model.ainvoke(
                self.summary_prompt.format(messages=formatted_messages).rstrip(),
                config={"metadata": {"lc_source": "summarization"}},
            )
            return response.text.strip()
        except Exception as e:
            return f"Error generating summary: {e!s}"

    def _trim_messages_for_summary(self, messages: list[AnyMessage]) -> list[AnyMessage]:
        """裁剪用于生成摘要的输入消息。

        策略说明:
        - 保留最新内容(`strategy='last'`)
        - 尽量从 human 消息边界开始(`start_on='human'`)
        - 保留 system 消息
        - 允许部分裁剪
        """
        try:
            if self.trim_tokens_to_summarize is None:
                return messages

            return cast(
                "list[AnyMessage]",
                trim_messages(
                    messages,
                    max_tokens=self.trim_tokens_to_summarize,
                    token_counter=self.token_counter,
                    start_on="human",
                    strategy="last",
                    allow_partial=True,
                    include_system=True,
                ),
            )
        except Exception:
            return messages[-_DEFAULT_FALLBACK_MESSAGE_COUNT:]
Short-term memory 短期记忆

对应链接:https://docs.langchain.com/oss/python/langchain/short-term-memory

在langchain官网有说到:

启用短期记忆后,长对话可能会超过 LLM 的上下文窗口。常见解决方案有:

  • 修剪消息:在调用 LLM 之前删除前 N 条或后 N 条消息
  • 删除消息:永久从 LangGraph 状态中删除消息
  • 总结消息:总结历史中的较早消息并用摘要替换它们
  • 自定义策略:自定义策略(例如,消息过滤等)

这使得智能体能够在不超过 LLM 限制的情况下,持续跟踪对话内容。

例如:修剪消息

大多数 LLM 都有一个最大支持的上下文窗口(以 token 计)。

决定何时截断消息的一种方法是计算消息历史中的 token 数量,并在接近该限制时截断。如果你使用 LangChain,可以使用 trim messages 工具,并指定要保留的 token 数量,以及 strategy (例如,保留最后一个 max_tokens )来处理边界。

trim messages 工具
py
from langchain.messages import RemoveMessage
from langchain_ollama import ChatOllama
from langgraph.graph.message import REMOVE_ALL_MESSAGES
from langgraph.checkpoint.memory import InMemorySaver
from langchain.agents import create_agent, AgentState
from langchain.agents.middleware import before_model
from langgraph.runtime import Runtime
from langchain_core.runnables import RunnableConfig
from typing import Any


@before_model
def trim_messages(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    """仅保留最后几条消息以适应上下文窗口。"""
    messages = state["messages"]

    if len(messages) <= 3:
        return None  # No changes needed

    # 保留第一次消息和最近的3条或4条
    first_msg = messages[0]
    recent_messages = messages[-3:] if len(messages) % 2 == 0 else messages[-4:]
    new_messages = [first_msg] + recent_messages

    return {
        "messages": [
            RemoveMessage(id=REMOVE_ALL_MESSAGES),
            *new_messages
        ]
    }

agent = create_agent(
    model=ChatOllama(model="qwen3:0.6b"),
    middleware=[trim_messages],
    checkpointer=InMemorySaver(),
)

config: RunnableConfig = {"configurable": {"thread_id": "1"}}

agent.invoke({"messages": "你好,我叫鲍勃"}, config)
agent.invoke({"messages": "写一首关于猫的短诗"}, config)
agent.invoke({"messages": "现在为狗写一首同样的"}, config)
final_response = agent.invoke({"messages": "我叫什么名字?"}, config)

final_response["messages"][-1].pretty_print()

问题 5:上下文 token 超过,如何对其进行压缩

我一般按“三步压缩”来做:

  1. 基础清洗(无损优先)
    • 删空行、重复日志、无关注释、调试输出、重复片段。
  2. 语义压缩(有损但保核心)
    • 把长文本变成结构化摘要:结论 + 关键证据 + 限制条件
    • 代码场景保留:函数签名、关键逻辑、依赖关系、异常处理。
  3. 分块召回(避免一次塞满)
    • 按主题切块并打标签,提问时只召回 Top-K 相关片段。

实战里我会优先无损压缩,不够再做语义摘要,最后再加检索召回。

问题 6:向量数据库你了解的有哪些

我接触过这些:

  • Pinecone(托管型,接入快)
  • Milvus(开源,生态成熟)
  • Qdrant(开源,过滤能力不错)
  • Weaviate(内置语义特性比较多)
  • Chroma(本地开发常用)
  • pgvector(基于 PostgreSQL,适合和业务库结合)
  • Elasticsearch/OpenSearch 的向量检索能力(混合检索常见)

问题 7:向量数据库和传统数据库的区别

可以这么回答,更准确:

  1. 存储对象不同:传统库主要存结构化字段;向量库核心是高维向量。
  2. 查询方式不同:传统库是精确过滤(=>LIKE);向量库是相似度检索(Top-K)。
  3. 索引不同:传统库常见 B+Tree、倒排;向量库常见 HNSW、IVF、PQ 等近似最近邻索引。
  4. 目标不同:传统库强调强事务和精确查询;向量库强调语义召回效率。
  5. 实际不是二选一:线上通常“混合用”,先结构化过滤,再向量召回。

问题 8:向量数据库的匹配相似度的方法

常见有三类:

  1. 余弦相似度(Cosine):看方向是否一致,文本语义检索最常见。
  2. 点积(Dot Product):和向量长度有关,很多 embedding 模型也支持。
  3. 欧氏距离(L2):看空间距离,距离越小越相似。

面试里补一句:具体用哪种,要和 embedding 模型的训练目标保持一致。

问题 9:对余弦相似度的理解,是如何计算的

可以这样说:

  • 余弦相似度衡量的是两个向量夹角的余弦值,范围一般在 [-1, 1]。越接近 1,方向越一致,语义越相近。
    公式:cos(a,b) = (a·b) / (||a|| * ||b||)
  • 欧氏距离L2是两个向量在空间中的直线距离,越小越相似。
    公式:dist(a,b) = sqrt(Σ(ai - bi)^2)
plaintext
        B(4,6)
       /|
      / |
  d=5/  | Δy=4
    /   |
   /    |
  A(1,2)───▶ Δx=3

横向差:Δx=3
纵向差:Δy=4
直线距离:斜边 d=5

问题 10:MCP 由几部分组成

MCP 可以理解成 3 层:

  1. MCP Host(宿主):比如 IDE 或 Agent 应用,负责发起请求。
  2. MCP Client(客户端连接层):在 Host 内,负责和 MCP Server 建连、通信。
  3. MCP Server(能力提供方):把工具、数据资源、提示模板标准化暴露出来。

核心价值是:用统一协议把“模型”和“外部工具/数据”解耦。

问题 11:LangChain 去构建一个 Agent 的步骤是什么

我一般按 7 步走:

  1. 定义任务目标和成功标准。
  2. 选模型(推理能力、速度、成本)。
  3. 定义工具(搜索、数据库、代码执行等)和输入输出 schema。
  4. 设计 Agent 提示词(角色、规则、失败处理)。
  5. 组装执行流程(单 Agent 或工作流,是否循环反思)。
  6. 接入记忆(短期对话、长期用户画像/知识)。
  7. 做评测和观测(成功率、时延、成本、错误类型),再迭代。

问题 12:在 LangChain 中注册 tools 有哪些方法

  1. 通过 tools 参数传入工具列表
  2. 通过 model.bind_tools 方法绑定工具

问题 13:LangChain 大版本升级有哪些核心变化

从我的使用上,变化有:

  1. 在0.3版本的创建agent有create_react_agent和create_function_call_agent,1.x的版本统一为create_agent
  2. 在1.x的版本中添加了中间件,可以根据钩子作用到agent的不同时机,可扩展性更高
  3. langgraph变成了langchain的底层框架

问题 14:Memory 有使用过吗,对它的理解

我用过。我的理解是:Memory 的本质是让 Agent 具备“跨轮次连续性”。

  • 没有 memory:每轮都像第一次聊天。
  • 有 memory:能记住用户偏好、历史结论、上下文约束,回答更连贯。

但 memory 不是越多越好,重点是“该记什么、什么时候记、什么时候清理”。

问题 15:Memory 的短中长期记忆是什么

可以这样分:

  1. 短期记忆:当前会话最近几轮内容,保证对话连贯。
  2. 中期记忆:当前任务阶段的阶段性结论,比如“已排除的原因、已确认的方案”。内容过多可以进行上下文压缩。
  3. 长期记忆:跨会话保留的稳定信息,比如用户偏好、业务术语、常用配置。

我一般会把“高频稳定事实”放长期,把“临时过程信息”放短中期。

问题 16:Memory 裁剪压缩

我的做法是“窗口裁剪 + 摘要压缩 + 关键事实固化”:

  1. 窗口裁剪:只保留最近 N 轮原始消息。
  2. 摘要压缩:把更早消息压成结构化摘要(目标、约束、结论、待办)。
md
你是专业的对话摘要助手。请把下面**更早的历史消息**,提炼成结构化摘要,只输出结果,不要多余解释。

请严格按以下 4 个维度输出:
1. **目标**:用户想做什么、核心诉求
2. **约束**:限制条件、技术栈、规则、要求
3. **结论**:已完成、已确定、已解决的内容
4. **待办**:还没做、接下来要做、需要跟进的事项

要求:
- 只保留关键信息,极度精简
- 不重复、不展开、不抒情
- 用短句、要点式呈现
- 只输出摘要,不要其他文字
  1. 关键事实固化:把用户长期偏好和关键实体单独存储,避免被摘要丢掉。
  2. 按 token 动态触发:不是固定轮数,接近上限就触发裁剪/压缩。

问题 17:Memory 在什么时机去存储,以及内容的召回和更新的时机

可以按“存储-召回-更新”三个阶段回答:

  1. 存储时机
    • 用户明确偏好(如语言风格、输出格式)。
    • 任务关键结论(如最终方案、已确认事实)。
    • 可复用经验(如排障路径、最佳实践)。
  2. 召回时机
    • 新问题开始时先召回一次用户画像和历史约束。
    • 生成前再按当前 query 做一次语义召回。
解释

召回时机(拿出来用),需要召回两次

  1. 新问题一上来,先 “召回一次” = 先把这个人的基本信息调出来
md
比如:
- 他喜欢简洁 / 详细?
- 他是前端开发?
- 之前定过什么规则?
就像:见面前先看一眼这个人的档案,知道他是谁。
  1. 正式回答前,再 “语义召回” = 根据他现在这句问题,再精准捞一遍相关历史
md
比如他现在问:“我这段 Vue 代码为啥报错”
系统就去历史里找:
- 他之前写过什么 Vue 代码
- 他之前报过什么错
- 他用的什么版本、什么环境
就像:他现在问啥,我就只把跟这个问题相关的记忆翻出来。
  1. 更新时机
    • 用户纠正旧信息时要覆盖更新。
    • 长期未使用且置信度低的信息要降权或清理。

问题 18:如何设计记忆机制?以便在第 100 轮对话时还能精准记得第 1 轮的细节?

我会设计成“4 层记忆”:

  1. 原始日志层:完整保存所有对话,保证可追溯。
  2. 滑动窗口层:实时给模型最近 K 轮,保证当前连贯性。
  3. 摘要层:每隔一段对历史做结构化摘要,降低 token 成本。
  4. 实体/事实层:抽取人名、项目、偏好、约束等可检索事实,按需精准召回。

再加两点机制:

  • 冲突检测:新旧事实冲突时不直接覆盖,标记版本和置信度。
  • 遗忘机制:低价值、过期信息定期降权清理,避免噪声污染。

问题 19:工具权限的设计

我会从“原则 + 分级 + 落地”三个层面讲:

  1. 原则
    • 最小权限:只给完成任务必需权限。
    • 显式授权:高风险操作必须二次确认。
    • 全链路审计:每次工具调用都可追踪。
  2. 分级
    • L0:只读公开信息。
    • L1:内部只读数据。
    • L2:可写操作(发消息、改数据)需要审批或策略校验。
    • L3:高危操作(删库、生产发布、转账)必须人工审批。
  3. 工程落地
    • 工具注册时打风险标签。
    • 执行前经过权限中间件校验。
    • 动态暴露工具,没权限的工具不要给模型看到。
    • 增加限流、超时、重试和熔断,防止误调用扩散。

问题 20:像 Gemini 的 Deep Research,你大概了解它怎么实现?如果让你实现一个类似“深度研究 Agent”,你会怎么开始?

我会把它当成一个“可循环的研究工作流”来做,而不是单次问答:

  1. 任务分流(Router)
    • 先判断问题是否值得 deep research。
    • 简单问答直接返回,复杂问题进入研究流程。
  2. 规划(Planner)
    • 把大问题拆成多个可验证子问题,定义每个子问题的证据标准。
  3. 检索与阅读(Retriever + Reader)
    • 多路检索(搜索引擎、指定站点、内部知识库)。
    • 抽取关键信息并保留来源、时间、原文片段。
  4. 证据评估(Evaluator)
    • 去重、冲突检测、来源可信度打分、时效性校验。
  5. 答案合成(Synthesizer)
    • 按“结论-证据-不确定性-引用”输出,保证可核查。
  6. 反思迭代(Critic Loop)
    • 如果证据不足或冲突未解决,自动回到 Planner 再补查一轮。

工程上我会先做 MVP:单 Agent + 工具链 + 可观测日志;跑通后再升级成多 Agent 协作。