pydantic_graph.persistence
SnapshotStatus module-attribute
SnapshotStatus = Literal[
"created", "pending", "running", "success", "error"
]
快照的状态。
'created'
: 快照已创建,但尚未运行。'pending'
: 快照已被load_next
检索,但尚未运行。'running'
: 快照当前正在运行。'success'
: 快照已成功运行。'error'
: 快照已运行,但发生了错误。
NodeSnapshot dataclass
Bases: Generic[StateT, RunEndT]
描述图中节点执行的历史步骤。
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
|
state 实例属性
state: StateT
节点运行前图的状态。
node instance-attribute
下一个要运行的节点。
start_ts class-attribute
instance-attribute
start_ts: datetime | None = None
节点开始运行的时间戳,在运行开始前为 None
。
EndSnapshot dataclass
Bases: Generic[StateT, RunEndT]
描述图运行结束的历史步骤。
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
|
state 实例属性
state: StateT
运行结束时图的状态。
Snapshot module-attribute
Snapshot = (
NodeSnapshot[StateT, RunEndT]
| EndSnapshot[StateT, RunEndT]
)
图运行历史中的一个步骤。
Graph.run
返回一个描述图执行过程的步骤列表,以及运行的返回值。
BaseStatePersistence
Bases: ABC
, Generic[StateT, RunEndT]
用于存储图运行状态的抽象基类。
BaseStatePersistence
子类的每个实例应用于单次图运行。
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 |
|
snapshot_node abstractmethod
async
当下一步是运行节点时,对图的状态进行快照。
此方法应向持久化存储中添加一个 NodeSnapshot
。
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
state
|
StateT
|
图的状态。 |
必需 |
next_node
|
BaseNode[StateT, Any, RunEndT]
|
下一个要运行的节点。 |
必需 |
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
112 113 114 115 116 117 118 119 120 121 122 |
|
snapshot_node_if_new abstractmethod
async
snapshot_node_if_new(
snapshot_id: str,
state: StateT,
next_node: BaseNode[StateT, Any, RunEndT],
) -> None
如果快照ID在持久化存储中尚不存在,则对图的状态进行快照。
此方法通常会调用 snapshot_node
,但应以原子方式执行。
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
snapshot_id
|
str
|
要检查的快照ID。 |
必需 |
state
|
StateT
|
图的状态。 |
必需 |
next_node
|
BaseNode[StateT, Any, RunEndT]
|
下一个要运行的节点。 |
必需 |
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
|
snapshot_end abstractmethod
async
snapshot_end(state: StateT, end: End[RunEndT]) -> None
当图运行结束时,对图的状态进行快照。
此方法应向持久化存储中添加一个 EndSnapshot
。
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
state
|
StateT
|
图的状态。 |
必需 |
end
|
End[RunEndT]
|
来自运行结束时的数据。 |
必需 |
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
140 141 142 143 144 145 146 147 148 149 150 |
|
record_run abstractmethod
record_run(
snapshot_id: str,
) -> AbstractAsyncContextManager[None]
记录节点的运行,如果节点已在运行,则报错。
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
snapshot_id
|
str
|
要记录的快照ID。 |
必需 |
引发
类型 | 描述 |
---|---|
GraphNodeRunningError
|
如果节点状态不是 |
LookupError
|
如果在持久化存储中未找到快照ID。 |
返回
类型 | 描述 |
---|---|
AbstractAsyncContextManager[None]
|
一个记录节点运行的异步上下文管理器。 |
具体来说,它应在运行开始时设置
NodeSnapshot.status
为'running'
以及设置NodeSnapshot.start_ts
。- 在运行结束时设置
NodeSnapshot.status
为'success'
或'error'
以及设置NodeSnapshot.duration
。
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 |
|
load_next abstractmethod
async
load_next() -> NodeSnapshot[StateT, RunEndT] | None
检索一个状态为 'created'
的节点快照,并将其状态设置为 'pending'
。
此方法由 Graph.iter_from_persistence
用来获取下一个要运行的节点。
返回:快照,如果不存在状态为 'created'
的快照,则返回 None
。
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
175 176 177 178 179 180 181 182 183 184 |
|
load_all abstractmethod
async
加载整个快照历史记录。
load_all
不被 pydantic-graph 本身使用,而是为了方便从持久化存储中获取所有快照而提供。
返回:快照列表。
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
186 187 188 189 190 191 192 193 194 195 |
|
set_graph_types
从一个图中设置状态和运行结束的类型。
通常您不需要自定义此方法,而是实现 set_types
和 should_set_types
。
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
197 198 199 200 201 202 203 204 205 206 |
|
should_set_types
should_set_types() -> bool
类型是否需要被设置。
如果需要类型但尚未设置,实现应重写此方法以返回 True
。
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
208 209 210 211 212 213 |
|
set_types
设置状态和运行结束的类型。
此方法可用于创建用于序列化和反序列化快照的类型适配器,例如使用 build_snapshot_list_type_adapter
。
参数
名称 | 类型 | 描述 | 默认值 |
---|---|---|---|
state_type
|
type[StateT]
|
状态类型。 |
必需 |
run_end_type
|
type[RunEndT]
|
运行结束类型。 |
必需 |
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
215 216 217 218 219 220 221 222 223 224 225 |
|
build_snapshot_list_type_adapter
build_snapshot_list_type_adapter(
state_t: type[StateT], run_end_t: type[RunEndT]
) -> TypeAdapter[list[Snapshot[StateT, RunEndT]]]
为快照列表构建一个类型适配器。
此方法应在 set_types
内部调用,其中会设置上下文变量,以便 Pydantic 可以为 NodeSnapshot.node
创建模式(schema)。
源代码位于 pydantic_graph/pydantic_graph/persistence/__init__.py
228 229 230 231 232 233 234 235 236 237 238 |
|
内存中的状态持久化。
此模块为图提供了简单的内存状态持久化功能。
SimpleStatePersistence dataclass
Bases: BaseStatePersistence[StateT, RunEndT]
简单的内存状态持久化,仅保存最新的快照。
如果在运行图时未提供状态持久化实现,则默认使用此实现。
源代码位于 pydantic_graph/pydantic_graph/persistence/in_mem.py
31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 |
|
last_snapshot class-attribute
instance-attribute
last_snapshot: Snapshot[StateT, RunEndT] | None = None
最后一个快照。
FullStatePersistence dataclass
Bases: BaseStatePersistence[StateT, RunEndT]
保存快照列表的内存状态持久化。
源代码位于 pydantic_graph/pydantic_graph/persistence/in_mem.py
85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
|
deep_copy class-attribute
instance-attribute
deep_copy: bool = True
在存储状态和节点时是否进行深拷贝。
默认为 True
,因此即使在拍摄快照后修改了节点或状态,持久化历史记录也会记录快照拍摄时的值。
history class-attribute
instance-attribute
图运行期间拍摄的快照列表。
dump_json
将历史记录转储为 JSON 字节。
源代码位于 pydantic_graph/pydantic_graph/persistence/in_mem.py
157 158 159 160 |
|
load_json
从 JSON 加载历史记录。
源代码位于 pydantic_graph/pydantic_graph/persistence/in_mem.py
162 163 164 165 |
|
FileStatePersistence dataclass
Bases: BaseStatePersistence[StateT, RunEndT]
基于文件的状态持久化,将图运行状态保存在 JSON 文件中。
源代码位于 pydantic_graph/pydantic_graph/persistence/file.py
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
|
json_file instance-attribute
json_file: Path
存储快照的 JSON 文件的路径。
您应该为每次图运行使用不同的文件,但对于同一次运行的多个步骤应重用同一个文件。
例如,如果您有一个形式为 run_123abc
的运行 ID,您可以这样创建一个 FileStatePersistence
from pathlib import Path
from pydantic_graph import FullStatePersistence
run_id = 'run_123abc'
persistence = FullStatePersistence(Path('runs') / f'{run_id}.json')
should_set_types
should_set_types() -> bool
类型是否需要被设置。
源代码位于 pydantic_graph/pydantic_graph/persistence/file.py
104 105 106 |
|