1. 学习背景
一般在构建代理时,会经过较长时间的任务处理流程,对于这些任务,持久化和流式输出是非常重要的两个概念。
持久性:
允许您在特定时间点保留代理的状态,这可以让您返回到该状态,并在将来的交互中恢复该状态。
流式传输:
您可以发出正在进行的工作的信号列表,对于长时间运行的应用程序,您可以准确知道代理正在执行的操作。
2. 示例
2.1 准备环境
from dotenv import load_dotenv
_ = load_dotenv()
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import AnyMessage, SystemMessage, HumanMessage, ToolMessage
from langchain_openai import ChatOpenAI
from langchain_community.tools.tavily_search import TavilySearchResults
# 初始化工具
tool = TavilySearchResults(max_results=2)
2.2 编写代理
class AgentState(TypedDict):
messages: Annotated[list[AnyMessage], operator.add]
为了处理持久化,在langgraph中添加了checkpoint,检查点基本上在每个节点之后和之间对状态进行检查
# 准备持久化,使用SqliteSaver,内存级数据库,重启则丢失内容
from langgraph.checkpoint.sqlite import SqliteSaver
memory = SqliteSaver.from_conn_string(":memory:")
编写Agent,具体代码介绍见:https://blog.csdn.net/l8947943/article/details/140592234?spm=1001.2014.3001.5502
class Agent:
# 可以看到,初始化阶段提供了checkpointer
def __init__(self, model, tools, checkpointer, system=""):
self.system = system
graph = StateGraph(AgentState)
graph.add_node("llm", self.call_openai)
graph.add_node("action", self.take_action)
graph.add_conditional_edges("llm", self.exists_action, {True: "action", False: END})
graph.add_edge("action", "llm")
graph.set_entry_point("llm")
self.graph = graph.compile(checkpointer=checkpointer) # 在图编译阶段,将checkpointer内置
self.tools = {t.name: t for t in tools}
self.model = model.bind_tools(tools)
def call_openai(self, state: AgentState):
messages = state['messages']
if self.system:
messages = [SystemMessage(content=self.system)] + messages
message = self.model.invoke(messages)
return {'messages': [message]}
def exists_action(self, state: AgentState):
result = state['messages'][-1]
return len(result.tool_calls) > 0
def take_action(self, state: AgentState):
tool_calls = state['messages'][-1].tool_calls
results = []
for t in tool_calls:
print(f"Calling: {t}")
result = self.tools[t['name']].invoke(t['args'])
results.append(ToolMessage(tool_call_id=t['id'], name=t['name'], content=str(result)))
print("Back to the model!")
return {'messages': results}
2.3 构建prompt,初始化Agent
prompt = """You are a smart research assistant. Use the search engine to look up information. \
You are allowed to make multiple calls (either together or in sequence). \
Only look up information when you are sure of what you want. \
If you need to look up some information before asking a follow up question, you are allowed to do that!
"""
model = ChatOpenAI(model="gpt-4o")
abot = Agent(model, [tool], system=prompt, checkpointer=memory) # 注意初始化存储
# 构建HumanMessage
messages = [HumanMessage(content="What is the weather in sf?")]
# 设置线程字典,其实就是设置个id,用于执行过程的内容区分,可以理解为令牌
# 线程非常重要,特别是多个用户进行操作,可以用作隔离的标志
thread = {"configurable": {"thread_id": "1"}}
# 对graph的内容进行
for event in abot.graph.stream({"messages": messages}, thread):
for v in event.values():
print(v['messages'])
输出如下:
[AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_aqUDZtKH5HolN9ZVPANsz4ep', 'function': {'arguments': '{"query":"current weather in San Francisco"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 151, 'total_tokens': 173}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_c4e5b6fa31', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-7c06cd16-f52a-4d4a-95b5-868dfc91456d-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_aqUDZtKH5HolN9ZVPANsz4ep'}])]
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_aqUDZtKH5HolN9ZVPANsz4ep'}
Back to the model!
[ToolMessage(content='[{\'url\': \'https://www.weatherapi.com/\', \'content\': "{\'location\': {\'name\': \'San Francisco\', \'region\': \'California\', \'country\': \'United States of America\', \'lat\': 37.78, \'lon\': -122.42, \'tz_id\': \'America/Los_Angeles\', \'localtime_epoch\': 1721638208, \'localtime\': \'2024-07-22 1:50\'}, \'current\': {\'last_updated_epoch\': 1721637900, \'last_updated\': \'2024-07-22 01:45\', \'temp_c\': 13.0, \'temp_f\': 55.5, \'is_day\': 0, \'condition\': {\'text\': \'Partly Cloudy\', \'icon\': \'//cdn.weatherapi.com/weather/64x64/night/116.png\', \'code\': 1003}, \'wind_mph\': 7.2, \'wind_kph\': 11.5, \'wind_degree\': 204, \'wind_dir\': \'SSW\', \'pressure_mb\': 1011.0, \'pressure_in\': 29.86, \'precip_mm\': 0.0, \'precip_in\': 0.0, \'humidity\': 91, \'cloud\': 40, \'feelslike_c\': 12.1, \'feelslike_f\': 53.8, \'windchill_c\': 12.1, \'windchill_f\': 53.8, \'heatindex_c\': 13.0, \'heatindex_f\': 55.5, \'dewpoint_c\': 11.6, \'dewpoint_f\': 52.9, \'vis_km\': 10.0, \'vis_miles\': 6.0, \'uv\': 1.0, \'gust_mph\': 11.0, \'gust_kph\': 17.7}}"}, {\'url\': \'https://www.timeanddate.com/weather/usa/san-francisco/hourly\', \'content\': \'Hour-by-Hour Forecast for San Francisco, California, USA. Weather Today Weather Hourly 14 Day Forecast Yesterday/Past Weather Climate (Averages) Currently: 64 °F. Passing clouds. (Weather station: San Francisco International Airport, USA). See more current weather.\'}]', name='tavily_search_results_json', tool_call_id='call_aqUDZtKH5HolN9ZVPANsz4ep')]
[AIMessage(content='The current weather in San Francisco is partly cloudy with a temperature of 13°C (55.5°F). The wind is coming from the south-southwest at 7.2 mph (11.5 kph), and the humidity is at 91%. The visibility is 10 kilometers (6 miles), and the UV index is 1.', response_metadata={'token_usage': {'completion_tokens': 72, 'prompt_tokens': 675, 'total_tokens': 747}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_c4e5b6fa31', 'finish_reason': 'stop', 'logprobs': None}, id='run-7f7a1179-4d54-412f-afdc-50db86262313-0')]
可以看到,执行过程中,先是AIMessage,其次是ToolMessage,最后是响应的AIMessage。通过这种流方法,我们可以得到中间结果。
2.4 继续提问
messages = [HumanMessage(content="What about in la?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
for v in event.values():
print(v)
输出如下:
{'messages': [AIMessage(content='', additional_kwargs={'tool_calls': [{'id': 'call_FLX2DJNqEGmsb5mgdxbfNOpJ', 'function': {'arguments': '{"query":"current weather in Los Angeles"}', 'name': 'tavily_search_results_json'}, 'type': 'function'}]}, response_metadata={'token_usage': {'completion_tokens': 22, 'prompt_tokens': 759, 'total_tokens': 781}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_c4e5b6fa31', 'finish_reason': 'tool_calls', 'logprobs': None}, id='run-f2054e4e-984c-41dc-a55a-d017e1b52211-0', tool_calls=[{'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_FLX2DJNqEGmsb5mgdxbfNOpJ'}])]}
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in Los Angeles'}, 'id': 'call_FLX2DJNqEGmsb5mgdxbfNOpJ'}
Back to the model!
{'messages': [ToolMessage(content="[{'url': 'https://www.accuweather.com/en/us/los-angeles/90012/current-weather/347625', 'content': 'Current weather in Los Angeles, CA. Check current conditions in Los Angeles, CA with radar, hourly, and more.'}, {'url': 'https://forecast.weather.gov/zipcity.php?inputstring=Los Angeles,CA', 'content': 'Los Angeles CA 34.05°N 118.25°W (Elev. 377 ft) Last Update: 9:52 pm PDT Jul 19, 2024. Forecast Valid: 2am PDT Jul 20, 2024-6pm PDT Jul 26, 2024 . ... Severe Weather ; Current Outlook Maps ; Drought ; Fire Weather ; Fronts/Precipitation Maps ; Current Graphical Forecast Maps ; Rivers ; Marine ; Offshore and High Seas; Hurricanes ;'}]", name='tavily_search_results_json', tool_call_id='call_FLX2DJNqEGmsb5mgdxbfNOpJ')]}
{'messages': [AIMessage(content='For the most current weather information in Los Angeles, you can check the following sources:\n\n1. [AccuWeather - Los Angeles](https://www.accuweather.com/en/us/los-angeles/90012/current-weather/347625): Provides current conditions, radar, and hourly updates.\n\n2. [NOAA - Los Angeles](https://forecast.weather.gov/zipcity.php?inputstring=Los Angeles,CA): Offers detailed forecasts, severe weather alerts, and more.\n\nBy visiting these links, you will get the most up-to-date weather information for Los Angeles.', response_metadata={'token_usage': {'completion_tokens': 117, 'prompt_tokens': 981, 'total_tokens': 1098}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_400f27fa1f', 'finish_reason': 'stop', 'logprobs': None}, id='run-e64b680f-f97b-451c-b7f4-e09b0d0b2d51-0')]}
可以看到,也经历了AIMessage、ToolMessage、AIMessage的响应。在此回话的基础上,询问哪个温度较高。
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "1"}}
for event in abot.graph.stream({"messages": messages}, thread):
for v in event.values():
print(v)
输出如下:
{'messages': [AIMessage(content='Based on the provided data, San Francisco has a current temperature of 13°C (55.5°F). For Los Angeles, specific temperature data was not retrieved, but you can view the current conditions on the [AccuWeather - Los Angeles](https://www.accuweather.com/en/us/los-angeles/90012/current-weather/347625) page.\n\nHowever, it is generally known that Los Angeles tends to be warmer than San Francisco. For precise and up-to-date temperature comparisons, please check the provided links.', response_metadata={'token_usage': {'completion_tokens': 107, 'prompt_tokens': 1110, 'total_tokens': 1217}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_c4e5b6fa31', 'finish_reason': 'stop', 'logprobs': None}, id='run-4827c13a-d08d-4d32-b29e-3226454193cd-0')]}
可以看到,对话过程中是保留着上下文信息的,这是因为保留着检查点的持久内容。我们换个线程,看看执行结果。
messages = [HumanMessage(content="Which one is warmer?")]
thread = {"configurable": {"thread_id": "2"}}
for event in abot.graph.stream({"messages": messages}, thread):
for v in event.values():
print(v)
输出如下:
{'messages': [AIMessage(content="Could you please provide more context or specify the items or locations you're comparing in terms of warmth? This will help me give you a precise answer.", response_metadata={'token_usage': {'completion_tokens': 30, 'prompt_tokens': 149, 'total_tokens': 179}, 'model_name': 'gpt-4o', 'system_fingerprint': 'fp_c4e5b6fa31', 'finish_reason': 'stop', 'logprobs': None}, id='run-e5f989a4-01cf-4be7-8360-717609683ba4-0')]}
可以看到,这时候的Agent竟然不知所措,这是因为我们使用了不同的线程ID。
2.5 实时Streaming tokens
from langgraph.checkpoint.aiosqlite import AsyncSqliteSaver
memory = AsyncSqliteSaver.from_conn_string(":memory:")
abot = Agent(model, [tool], system=prompt, checkpointer=memory)
messages = [HumanMessage(content="What is the weather in SF?")]
thread = {"configurable": {"thread_id": "4"}}
async for event in abot.graph.astream_events({"messages": messages}, thread, version="v1"):
kind = event["event"]
if kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
# Empty content in the context of OpenAI means
# that the model is asking for a tool to be invoked.
# So we only print non-empty content
print(content, end="|")
输出如下:
Calling: {'name': 'tavily_search_results_json', 'args': {'query': 'current weather in San Francisco'}, 'id': 'call_8dBYQ8xGAXysN9TqtVX44adf'}
Back to the model!
The| current| weather| in| San| Francisco| is| partly| cloudy| with| a| temperature| of| |13|°C| (|55|.|5|°F|).| The| wind| is| blowing| from| the| south|-s|outh|west| at| |7|.|2| mph| (|11|.|5| k|ph|),| and| the| humidity| is| at| |91|%.| The| visibility| is| |10| km| (|6| miles|),| and| the| UV| index| is| |1|.|
最终的结果是一个个被执行出来的,这是因为使用了异步持久化,这种事件核心获取是on_chat_model_stream。最大的特点是一个个将token输出出来。
3.总结
本节学习了持久化和流式输出,最后还有个异步流式输出,按部就班即可。