跳转到内容

pydantic_graph

Graph dataclass

Bases: Generic[StateT, DepsT, RunEndT]

图的定义。

pydantic-graph 中,图是可按顺序运行的节点的集合。节点定义了它们的出边——例如,接下来可以运行哪些节点,从而定义了图的结构。

这是一个非常简单的图示例,它将一个数字加 1,但确保最终结果永远不是 42。

never_42.py
from __future__ import annotations

from dataclasses import dataclass

from pydantic_graph import BaseNode, End, Graph, GraphRunContext

@dataclass
class MyState:
    # The number that the nodes of the example graph increment and check.
    number: int

@dataclass
class Increment(BaseNode[MyState]):
    """Node that adds 1 to the state's number and then hands over to `Check42`."""

    async def run(self, ctx: GraphRunContext) -> Check42:
        ctx.state.number += 1
        return Check42()

@dataclass
class Check42(BaseNode[MyState, None, int]):
    """Node that ends the run with the current number, unless that number is 42."""

    async def run(self, ctx: GraphRunContext) -> Increment | End[int]:
        current = ctx.state.number
        if current != 42:
            return End(current)
        # 42 must never be the final result, so go round once more.
        return Increment()

# The graph consists of the two node classes defined above.
never_42_graph = Graph(nodes=(Increment, Check42))
(这个例子是完整的,可以“按原样”运行)

有关运行图的示例,请参阅 run;有关从图生成 mermaid 图表的示例,请参阅 mermaid_code

源代码位于 pydantic_graph/pydantic_graph/graph.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
@dataclass(init=False)
class Graph(Generic[StateT, DepsT, RunEndT]):
    """Definition of a graph.

    In `pydantic-graph`, a graph is a collection of nodes that can be run in sequence. The nodes define
    their outgoing edges — e.g. which nodes may be run next, and thereby the structure of the graph.

    Here's a very simple example of a graph which increments a number by 1, but makes sure the number is never
    42 at the end.

    ```py {title="never_42.py" noqa="I001"}
    from __future__ import annotations

    from dataclasses import dataclass

    from pydantic_graph import BaseNode, End, Graph, GraphRunContext

    @dataclass
    class MyState:
        number: int

    @dataclass
    class Increment(BaseNode[MyState]):
        async def run(self, ctx: GraphRunContext) -> Check42:
            ctx.state.number += 1
            return Check42()

    @dataclass
    class Check42(BaseNode[MyState, None, int]):
        async def run(self, ctx: GraphRunContext) -> Increment | End[int]:
            if ctx.state.number == 42:
                return Increment()
            else:
                return End(ctx.state.number)

    never_42_graph = Graph(nodes=(Increment, Check42))
    ```
    _(This example is complete, it can be run "as is")_

    See [`run`][pydantic_graph.graph.Graph.run] for an example of running a graph, and
    [`mermaid_code`][pydantic_graph.graph.Graph.mermaid_code] for an example of generating a mermaid diagram
    from the graph.
    """

    name: str | None
    node_defs: dict[str, NodeDef[StateT, DepsT, RunEndT]]
    _state_type: type[StateT] | _utils.Unset = field(repr=False)
    _run_end_type: type[RunEndT] | _utils.Unset = field(repr=False)
    auto_instrument: bool = field(repr=False)

    def __init__(
        self,
        *,
        nodes: Sequence[type[BaseNode[StateT, DepsT, RunEndT]]],
        name: str | None = None,
        state_type: type[StateT] | _utils.Unset = _utils.UNSET,
        run_end_type: type[RunEndT] | _utils.Unset = _utils.UNSET,
        auto_instrument: bool = True,
    ):
        """Build a graph from the node classes that make it up.

        Args:
            nodes: The node classes forming the graph. Each node must be unique, and all of them must be
                generic in the same state type.
            name: An optional graph name. When omitted, the name is inferred from the calling frame the
                first time a graph method is called.
            state_type: The graph's state type; usually this can be inferred from `nodes`.
            run_end_type: The type of the value produced by running the graph; usually this can be inferred
                from `nodes`.
            auto_instrument: Whether to create a span for the graph run and for the execution of each node's
                run method.

        Raises:
            GraphSetupError: If two nodes share a node ID, or if a node has an edge to a node that is not
                part of `nodes`.
        """
        # Taken here, directly inside `__init__`, because the helper is handed this method's own frame.
        parent_namespace = _utils.get_parent_namespace(inspect.currentframe())

        self.name = name
        self.auto_instrument = auto_instrument
        self._state_type, self._run_end_type = state_type, run_end_type

        self.node_defs = {}
        for node_cls in nodes:
            self._register_node(node_cls, parent_namespace)

        # Only once every node is registered can dangling edges be detected.
        self._validate_edges()

    async def run(
        self,
        start_node: BaseNode[StateT, DepsT, RunEndT],
        *,
        state: StateT = None,
        deps: DepsT = None,
        persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
        infer_name: bool = True,
    ) -> GraphRunResult[StateT, RunEndT]:
        """Execute the graph, beginning at `start_node`, and keep going until the run ends.

        Args:
            start_node: The node to run first. A graph definition has no built-in entry point, so the
                starting node has to be supplied by the caller.
            state: The initial state of the graph.
            deps: The dependencies of the graph.
            persistence: State persistence interface; when `None`,
                [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] is used.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns:
            A `GraphRunResult` containing information about the run, including its final result.

        Here's an example of running the graph from [above][pydantic_graph.graph.Graph]:

        ```py {title="run_never_42.py" noqa="I001" requires="never_42.py"}
        from never_42 import Increment, MyState, never_42_graph

        async def main():
            state = MyState(1)
            await never_42_graph.run(Increment(), state=state)
            print(state)
            #> MyState(number=2)

            state = MyState(41)
            await never_42_graph.run(Increment(), state=state)
            print(state)
            #> MyState(number=43)
        ```
        """
        if self.name is None and infer_name:
            self._infer_name(inspect.currentframe())

        # The name was handled above, so `iter` must not try to infer it again from its own caller.
        run_context = self.iter(start_node, state=state, deps=deps, persistence=persistence, infer_name=False)
        async with run_context as graph_run:
            # Drain the run; the individual nodes are not needed here, only the final result.
            async for _ in graph_run:
                pass

        final_result = graph_run.result
        assert final_result is not None, 'GraphRun should have a result'
        return final_result

    def run_sync(
        self,
        start_node: BaseNode[StateT, DepsT, RunEndT],
        *,
        state: StateT = None,
        deps: DepsT = None,
        persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
        infer_name: bool = True,
    ) -> GraphRunResult[StateT, RunEndT]:
        """Run the graph synchronously.

        A convenience wrapper around [`self.run`][pydantic_graph.Graph.run] which drives it with
        `loop.run_until_complete(...)`. As a consequence this method can't be used inside async code or
        while an event loop is already active.

        Args:
            start_node: The node to run first. A graph definition has no built-in entry point, so the
                starting node has to be supplied by the caller.
            state: The initial state of the graph.
            deps: The dependencies of the graph.
            persistence: State persistence interface; when `None`,
                [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] is used.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns:
            A `GraphRunResult` containing information about the run, including its final result.
        """
        if self.name is None and infer_name:  # pragma: no branch
            self._infer_name(inspect.currentframe())

        # Obtain the loop before creating the coroutine, so a failure here never leaves it un-awaited.
        loop = _utils.get_event_loop()
        return loop.run_until_complete(
            self.run(start_node, state=state, deps=deps, persistence=persistence, infer_name=False)
        )

    @asynccontextmanager
    async def iter(
        self,
        start_node: BaseNode[StateT, DepsT, RunEndT],
        *,
        state: StateT = None,
        deps: DepsT = None,
        persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
        span: AbstractContextManager[AbstractSpan] | None = None,
        infer_name: bool = True,
    ) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]:
        """A contextmanager for iterating over the graph's nodes while they are being executed.

        It yields a `GraphRun`, which can be async-iterated to obtain the nodes of this `Graph` as they run.
        Use this API when you want to record, or interact with, the nodes while the graph execution unfolds.

        The `GraphRun` also allows the execution to be driven manually, by calling
        [`GraphRun.next`][pydantic_graph.graph.GraphRun.next].

        Through the `GraphRun` you can reach the full run history, the state, the deps and, once the run has
        completed, its final result.

        For more details, see the API documentation of [`GraphRun`][pydantic_graph.graph.GraphRun].

        Args:
            start_node: The node to run first. A graph definition has no built-in entry point, so the
                starting node has to be supplied by the caller.
            state: The initial state of the graph.
            deps: The dependencies of the graph.
            persistence: State persistence interface; when `None`,
                [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] is used.
            span: The span to use for the graph run. If not provided, a new span will be created.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns: A GraphRun that can be async iterated over to drive the graph to completion.
        """
        if infer_name and self.name is None:
            # f_back because `asynccontextmanager` adds one frame
            if frame := inspect.currentframe():  # pragma: no branch
                self._infer_name(frame.f_back)

        if persistence is None:
            persistence = SimpleStatePersistence()
        persistence.set_graph_types(self)

        with ExitStack() as stack:
            # A span supplied by the caller takes precedence; otherwise one is only created when
            # auto-instrumentation is enabled, and without either there is no span at all.
            entered_span: AbstractSpan | None = None
            if span is not None:
                entered_span = stack.enter_context(span)
            elif self.auto_instrument:
                entered_span = stack.enter_context(logfire_span('run graph {graph.name}', graph=self))

            traceparent = get_traceparent(entered_span) if entered_span is not None else None
            yield GraphRun[StateT, DepsT, RunEndT](
                graph=self,
                start_node=start_node,
                persistence=persistence,
                state=state,
                deps=deps,
                traceparent=traceparent,
            )

    @asynccontextmanager
    async def iter_from_persistence(
        self,
        persistence: BaseStatePersistence[StateT, RunEndT],
        *,
        deps: DepsT = None,
        span: AbstractContextManager[AbstractSpan] | None = None,
        infer_name: bool = True,
    ) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]:
        """A contextmanager to iterate over the graph's nodes as they are executed, created from a persistence object.

        This method has similar functionality to [`iter`][pydantic_graph.graph.Graph.iter],
        but instead of passing the node to run, it will restore the node and state from state persistence.

        Args:
            persistence: The state persistence interface to use.
            deps: The dependencies of the graph.
            span: The span to use for the graph run. If not provided, a new span will be created.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns: A GraphRun that can be async iterated over to drive the graph to completion.

        Raises:
            GraphRuntimeError: If `persistence.load_next()` returns `None`, i.e. there is no snapshot to restore.
        """
        if infer_name and self.name is None:
            # f_back because `asynccontextmanager` adds one frame
            if frame := inspect.currentframe():  # pragma: no branch
                self._infer_name(frame.f_back)

        persistence.set_graph_types(self)

        # The node to start from and the state both come from the loaded snapshot.
        snapshot = await persistence.load_next()
        if snapshot is None:
            raise exceptions.GraphRuntimeError('Unable to restore snapshot from state persistence.')

        # Tag the restored node with the id of the snapshot it was loaded from.
        snapshot.node.set_snapshot_id(snapshot.id)

        # Only create a span ourselves when the caller did not supply one and auto-instrumentation is on.
        if self.auto_instrument and span is None:  # pragma: no branch
            span = logfire_span('run graph {graph.name}', graph=self)

        with ExitStack() as stack:
            entered_span = None if span is None else stack.enter_context(span)
            traceparent = None if entered_span is None else get_traceparent(entered_span)
            yield GraphRun[StateT, DepsT, RunEndT](
                graph=self,
                start_node=snapshot.node,
                persistence=persistence,
                state=snapshot.state,
                deps=deps,
                snapshot_id=snapshot.id,
                traceparent=traceparent,
            )

    async def initialize(
        self,
        node: BaseNode[StateT, DepsT, RunEndT],
        persistence: BaseStatePersistence[StateT, RunEndT],
        *,
        state: StateT = None,
        infer_name: bool = True,
    ) -> None:
        """Record the start of a new graph run in persistence, without actually running it.

        Use this to prepare a graph run that will be executed later, e.g. via
        [`iter_from_persistence`][pydantic_graph.graph.Graph.iter_from_persistence].

        Args:
            node: The node to run first.
            persistence: State persistence interface.
            state: The start state of the graph.
            infer_name: Whether to infer the graph name from the calling frame.
        """
        if self.name is None and infer_name:
            self._infer_name(inspect.currentframe())

        persistence.set_graph_types(self)
        # Snapshot the first node together with the start state; nothing is executed here.
        await persistence.snapshot_node(state, node)

    def mermaid_code(
        self,
        *,
        start_node: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
        title: str | None | typing_extensions.Literal[False] = None,
        edge_labels: bool = True,
        notes: bool = True,
        highlighted_nodes: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
        highlight_css: str = mermaid.DEFAULT_HIGHLIGHT_CSS,
        infer_name: bool = True,
        direction: mermaid.StateDiagramDirection | None = None,
    ) -> str:
        """Generate the code of a [mermaid](https://mermaid.js.org/) diagram representing the graph.

        This method calls [`pydantic_graph.mermaid.generate_code`][pydantic_graph.mermaid.generate_code].

        Args:
            start_node: The node or nodes which can start the graph.
            title: The title of the diagram. When `None`, the graph's name is used if it has one; pass
                `False` to not include a title.
            edge_labels: Whether to include edge labels.
            notes: Whether to include notes on each node.
            highlighted_nodes: Optional node or nodes to highlight.
            highlight_css: The CSS to use for highlighting nodes.
            infer_name: Whether to infer the graph name from the calling frame.
            direction: The direction of flow.

        Returns:
            The mermaid code for the graph, which can then be rendered as a diagram.

        Here's an example of generating a diagram for the graph from [above][pydantic_graph.graph.Graph]:

        ```py {title="mermaid_never_42.py" requires="never_42.py"}
        from never_42 import Increment, never_42_graph

        print(never_42_graph.mermaid_code(start_node=Increment))
        '''
        ---
        title: never_42_graph
        ---
        stateDiagram-v2
          [*] --> Increment
          Increment --> Check42
          Check42 --> Increment
          Check42 --> [*]
        '''
        ```

        The rendered diagram will look like this:

        ```mermaid
        ---
        title: never_42_graph
        ---
        stateDiagram-v2
          [*] --> Increment
          Increment --> Check42
          Check42 --> Increment
          Check42 --> [*]
        ```
        """
        if self.name is None and infer_name:
            self._infer_name(inspect.currentframe())

        diagram_title = title
        if diagram_title is None and self.name:
            diagram_title = self.name

        return mermaid.generate_code(
            self,
            start_node=start_node,
            highlighted_nodes=highlighted_nodes,
            highlight_css=highlight_css,
            # `False` (and an empty string) are passed on as `None`, i.e. no title.
            title=diagram_title or None,
            edge_labels=edge_labels,
            notes=notes,
            direction=direction,
        )

    def mermaid_image(
        self, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
    ) -> bytes:
        """Render the graph's diagram as an image and return it.

        Use `kwargs` to customize the format and the diagram,
        see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

        !!! note "Uses external service"
            This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink`
            is a free service not affiliated with Pydantic.

        Args:
            infer_name: Whether to infer the graph name from the calling frame.
            **kwargs: Additional arguments to pass to `mermaid.request_image`.

        Returns:
            The image bytes.
        """
        if self.name is None and infer_name:
            self._infer_name(inspect.currentframe())

        if self.name:
            # The graph's name is the default title; a title given by the caller is left alone.
            kwargs.setdefault('title', self.name)
        return mermaid.request_image(self, **kwargs)

    def mermaid_save(
        self, path: Path | str, /, *, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
    ) -> None:
        """Render the graph's diagram as an image and save it to `path`.

        Use `kwargs` to customize the format and the diagram,
        see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

        !!! note "Uses external service"
            This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink`
            is a free service not affiliated with Pydantic.

        Args:
            path: The path to save the image to.
            infer_name: Whether to infer the graph name from the calling frame.
            **kwargs: Additional arguments to pass to `mermaid.save_image`.
        """
        if self.name is None and infer_name:
            self._infer_name(inspect.currentframe())

        if self.name:
            # The graph's name is the default title; a title given by the caller is left alone.
            kwargs.setdefault('title', self.name)
        mermaid.save_image(path, self, **kwargs)

    def get_nodes(self) -> Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]:
        """Return the node classes of the graph, in the order their definitions are stored."""
        return [definition.node for definition in self.node_defs.values()]

    @cached_property
    def inferred_types(self) -> tuple[type[StateT], type[RunEndT]]:
        # Get the types of the state and run end from the graph.
        #
        # Types passed explicitly to `__init__` always win. Anything still unset is read from the type
        # arguments of each node's `BaseNode[...]` base: the first argument is used as the state type and,
        # when exactly three arguments are given, the third as the run end type (unless it is `Never`).
        # Whatever cannot be determined this way falls back to `None`.
        if _utils.is_set(self._state_type) and _utils.is_set(self._run_end_type):
            return self._state_type, self._run_end_type

        state_type = self._state_type
        run_end_type = self._run_end_type

        for node_def in self.node_defs.values():
            for base in typing_extensions.get_original_bases(node_def.node):
                # only the parametrized `BaseNode[...]` base carries the type arguments we are after
                if typing_extensions.get_origin(base) is BaseNode:
                    args = typing_extensions.get_args(base)
                    if not _utils.is_set(state_type) and args:
                        state_type = args[0]

                    if not _utils.is_set(run_end_type) and len(args) == 3:
                        t = args[2]
                        # a `Never` run end type is skipped, so a later node can still supply the type
                        if not typing_objects.is_never(t):
                            run_end_type = t
                    # stop as soon as both types are known
                    if _utils.is_set(state_type) and _utils.is_set(run_end_type):
                        return state_type, run_end_type  # pyright: ignore[reportReturnType]
                    # break the inner (bases) loop
                    break

        if not _utils.is_set(state_type):  # pragma: no branch
            # state defaults to None, so use that if we can't infer it
            state_type = None
        if not _utils.is_set(run_end_type):
            # this happens if a graph has no return nodes, use None so any downstream errors are clear
            run_end_type = None
        return state_type, run_end_type  # pyright: ignore[reportReturnType]

    def _register_node(
        self,
        node: type[BaseNode[StateT, DepsT, RunEndT]],
        parent_namespace: dict[str, Any] | None,
    ) -> None:
        """Store the definition of `node` in `self.node_defs`, keyed by its node ID.

        Args:
            node: The node class to add to the graph.
            parent_namespace: Namespace passed on to `node.get_node_def`.

        Raises:
            GraphSetupError: If a node with the same ID has already been registered.
        """
        node_id = node.get_node_id()
        existing_node = self.node_defs.get(node_id)
        if existing_node:
            raise exceptions.GraphSetupError(
                f'Node ID `{node_id}` is not unique — found on {existing_node.node} and {node}'
            )
        self.node_defs[node_id] = node.get_node_def(parent_namespace)

    def _validate_edges(self):
        """Check that every edge declared by a node leads to a node that is part of this graph.

        Raises:
            GraphSetupError: If at least one node references a node ID that is not in `self.node_defs`; the
                message lists each missing node together with the nodes that reference it.
        """
        # missing node ID -> the (backtick-quoted) IDs of the nodes that reference it
        referrers_by_missing: dict[str, list[str]] = {}

        for node_id, node_def in self.node_defs.items():
            for target_id in node_def.next_node_edges:
                if target_id in self.node_defs:
                    continue
                referrers_by_missing.setdefault(target_id, []).append(f'`{node_id}`')

        if not referrers_by_missing:
            return

        problems = [
            f'`{missing}` is referenced by {_utils.comma_and(referrers)}'
            for missing, referrers in referrers_by_missing.items()
        ]
        if len(problems) == 1:
            raise exceptions.GraphSetupError(f'{problems[0]} but not included in the graph.')

        listing = '\n'.join(f' {problem}' for problem in problems)
        raise exceptions.GraphSetupError(
            f'Nodes are referenced in the graph but not included in the graph:\n{listing}'
        )

    def _infer_name(self, function_frame: types.FrameType | None) -> None:
        """Infer the graph name from the call frame.

        Looks through the variables of the frame that called `function_frame` for one bound to this graph,
        checking locals first and then globals, and uses that variable's name as `self.name`. If no such
        variable is found, `self.name` is left as `None`.

        Usage should be `self._infer_name(inspect.currentframe())`.

        Copied from `Agent`.

        Args:
            function_frame: The frame of the graph method that was called; it is the frame of its caller
                that gets searched.
        """
        assert self.name is None, 'Name already set'
        if function_frame is not None and (parent_frame := function_frame.f_back):  # pragma: no branch
            for name, item in parent_frame.f_locals.items():
                if item is self:
                    self.name = name
                    return
            if parent_frame.f_locals != parent_frame.f_globals:  # pragma: no branch
                # if we couldn't find the graph in locals and globals are a different dict, try globals
                for name, item in parent_frame.f_globals.items():  # pragma: no branch
                    if item is self:
                        self.name = name
                        return

__init__

__init__(
    *,
    nodes: Sequence[type[BaseNode[StateT, DepsT, RunEndT]]],
    name: str | None = None,
    state_type: type[StateT] | Unset = UNSET,
    run_end_type: type[RunEndT] | Unset = UNSET,
    auto_instrument: bool = True
)

从一系列节点创建一个图。

参数

名称 类型 描述 默认值
nodes Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]

构成图的节点,节点必须是唯一的,并且都具有相同的状态类型泛型。

必需
name str | None

图的可选名称,如果未提供,名称将从首次调用图方法时的调用帧中推断出来。

None
state_type type[StateT] | Unset

图的状态类型,通常可以从 nodes 中推断出来。

UNSET
run_end_type type[RunEndT] | Unset

运行图的结果类型,通常可以从 nodes 中推断出来。

UNSET
auto_instrument bool

是否为图的运行以及每个节点 run 方法的执行创建一个 span。

True
源代码位于 pydantic_graph/pydantic_graph/graph.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
def __init__(
    self,
    *,
    nodes: Sequence[type[BaseNode[StateT, DepsT, RunEndT]]],
    name: str | None = None,
    state_type: type[StateT] | _utils.Unset = _utils.UNSET,
    run_end_type: type[RunEndT] | _utils.Unset = _utils.UNSET,
    auto_instrument: bool = True,
):
    """Build a graph from the node classes that make it up.

    Args:
        nodes: The node classes forming the graph. Each node must be unique, and all of them must be
            generic in the same state type.
        name: An optional graph name. When omitted, the name is inferred from the calling frame the
            first time a graph method is called.
        state_type: The graph's state type; usually this can be inferred from `nodes`.
        run_end_type: The type of the value produced by running the graph; usually this can be inferred
            from `nodes`.
        auto_instrument: Whether to create a span for the graph run and for the execution of each node's
            run method.
    """
    # Taken here, directly inside `__init__`, because the helper is handed this method's own frame.
    parent_namespace = _utils.get_parent_namespace(inspect.currentframe())

    self.name = name
    self.auto_instrument = auto_instrument
    self._state_type, self._run_end_type = state_type, run_end_type

    self.node_defs = {}
    for node_cls in nodes:
        self._register_node(node_cls, parent_namespace)

    # Only once every node is registered can the edges between them be validated.
    self._validate_edges()

run async

run(
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: (
        BaseStatePersistence[StateT, RunEndT] | None
    ) = None,
    infer_name: bool = True
) -> GraphRunResult[StateT, RunEndT]

从一个起始节点运行图,直到它结束。

参数

名称 类型 描述 默认值
start_node BaseNode[StateT, DepsT, RunEndT]

要运行的第一个节点,因为图的定义没有指定图的入口点,所以你需要提供起始节点。

必需
state StateT

图的初始状态。

None
deps DepsT

图的依赖项。

None
persistence BaseStatePersistence[StateT, RunEndT] | None

状态持久化接口,如果为 None,则默认为 SimpleStatePersistence

None
infer_name bool

是否从调用帧推断图的名称。

True

返回

类型 描述
GraphRunResult[StateT, RunEndT]

一个 GraphRunResult,包含有关运行的信息,包括其最终结果。

这是运行上面图的示例。

run_never_42.py
from never_42 import Increment, MyState, never_42_graph

async def main():
    state = MyState(1)
    await never_42_graph.run(Increment(), state=state)
    print(state)
    #> MyState(number=2)

    state = MyState(41)
    await never_42_graph.run(Increment(), state=state)
    print(state)
    #> MyState(number=43)
源代码位于 pydantic_graph/pydantic_graph/graph.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
async def run(
    self,
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
    infer_name: bool = True,
) -> GraphRunResult[StateT, RunEndT]:
    """Execute the graph, beginning at `start_node`, and keep going until the run ends.

    Args:
        start_node: The node to run first. A graph definition has no built-in entry point, so the
            starting node has to be supplied by the caller.
        state: The initial state of the graph.
        deps: The dependencies of the graph.
        persistence: State persistence interface; when `None`,
            [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] is used.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns:
        A `GraphRunResult` containing information about the run, including its final result.

    Here's an example of running the graph from [above][pydantic_graph.graph.Graph]:

    ```py {title="run_never_42.py" noqa="I001" requires="never_42.py"}
    from never_42 import Increment, MyState, never_42_graph

    async def main():
        state = MyState(1)
        await never_42_graph.run(Increment(), state=state)
        print(state)
        #> MyState(number=2)

        state = MyState(41)
        await never_42_graph.run(Increment(), state=state)
        print(state)
        #> MyState(number=43)
    ```
    """
    if self.name is None and infer_name:
        self._infer_name(inspect.currentframe())

    # The name was handled above, so `iter` must not try to infer it again from its own caller.
    run_context = self.iter(start_node, state=state, deps=deps, persistence=persistence, infer_name=False)
    async with run_context as graph_run:
        # Drain the run; the individual nodes are not needed here, only the final result.
        async for _ in graph_run:
            pass

    final_result = graph_run.result
    assert final_result is not None, 'GraphRun should have a result'
    return final_result

run_sync

run_sync(
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: (
        BaseStatePersistence[StateT, RunEndT] | None
    ) = None,
    infer_name: bool = True
) -> GraphRunResult[StateT, RunEndT]

同步运行图。

这是一个方便的方法,它用 loop.run_until_complete(...) 包装了 self.run。因此,你不能在异步代码中或在有活动事件循环的情况下使用此方法。

参数

名称 类型 描述 默认值
start_node BaseNode[StateT, DepsT, RunEndT]

要运行的第一个节点,因为图的定义没有指定图的入口点,所以你需要提供起始节点。

必需
state StateT

图的初始状态。

None
deps DepsT

图的依赖项。

None
persistence BaseStatePersistence[StateT, RunEndT] | None

状态持久化接口,如果为 None,则默认为 SimpleStatePersistence

None
infer_name bool

是否从调用帧推断图的名称。

True

返回

类型 描述
GraphRunResult[StateT, RunEndT]

结束运行的结果类型和运行的历史记录。

源代码位于 pydantic_graph/pydantic_graph/graph.py
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def run_sync(
    self,
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
    infer_name: bool = True,
) -> GraphRunResult[StateT, RunEndT]:
    """Run the graph to completion from synchronous code.

    A blocking convenience wrapper: the coroutine returned by [`self.run`][pydantic_graph.Graph.run] is driven
    with `loop.run_until_complete(...)`. Because of that, this method can't be called from async code or while
    an event loop is already active.

    Args:
        start_node: The node to run first. A graph definition has no built-in entry point, so the starting node
            always has to be supplied.
        state: The initial state of the graph.
        deps: The dependencies of the graph.
        persistence: State persistence interface, defaults to
            [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] if `None`.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns:
        The result type from ending the run and the history of the run.
    """
    needs_name = infer_name and self.name is None
    if needs_name:  # pragma: no branch
        self._infer_name(inspect.currentframe())

    # Fetch the loop before creating the coroutine, so a failure here never leaves an un-awaited coroutine behind.
    event_loop = _utils.get_event_loop()
    # Name inference has been dealt with above, so it is switched off for the nested `run` call.
    pending_run = self.run(start_node, state=state, deps=deps, persistence=persistence, infer_name=False)
    return event_loop.run_until_complete(pending_run)

iter async

iter(
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: (
        BaseStatePersistence[StateT, RunEndT] | None
    ) = None,
    span: (
        AbstractContextManager[AbstractSpan] | None
    ) = None,
    infer_name: bool = True
) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]

一个上下文管理器,可用于在图的节点执行时对其进行迭代。

此方法返回一个 GraphRun 对象,可用于在执行时异步迭代此 Graph 的节点。如果你想在图执行展开时记录或与节点交互,这是要使用的 API。

GraphRun 也可以通过调用 GraphRun.next 来手动驱动图的执行。

GraphRun 提供了对完整运行历史、状态、依赖项以及运行完成后最终结果的访问。

有关更多详细信息,请参阅 GraphRun 的 API 文档。

参数

名称 类型 描述 默认值
start_node BaseNode[StateT, DepsT, RunEndT]

要运行的第一个节点。因为图的定义没有指定图的入口点,所以你需要提供起始节点。

必需
state StateT

图的初始状态。

None
deps DepsT

图的依赖项。

None
persistence BaseStatePersistence[StateT, RunEndT] | None

状态持久化接口,如果为 None,则默认为 SimpleStatePersistence

None
span AbstractContextManager[AbstractSpan] | None

用于图运行的 span。如果未提供,将创建一个新的 span。

None
infer_name bool

是否从调用帧推断图的名称。

True

返回:一个 GraphRun,可以异步迭代以驱动图完成。

源代码位于 pydantic_graph/pydantic_graph/graph.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
@asynccontextmanager
async def iter(
    self,
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
    span: AbstractContextManager[AbstractSpan] | None = None,
    infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]:
    """Open a context in which the graph's nodes can be iterated over as they are executed.

    The context yields a `GraphRun`, which can be async-iterated to step through the nodes of this `Graph` while
    they run. Use this API when you want to record or interact with the nodes as the graph execution unfolds.

    Instead of iterating, the `GraphRun` can also be driven by hand by calling
    [`GraphRun.next`][pydantic_graph.graph.GraphRun.next].

    The `GraphRun` provides access to the full run history, state, deps, and the final result of the run once
    it has completed.

    For more details, see the API documentation of [`GraphRun`][pydantic_graph.graph.GraphRun].

    Args:
        start_node: The node to run first. A graph definition has no built-in entry point, so the starting node
            always has to be supplied.
        state: The initial state of the graph.
        deps: The dependencies of the graph.
        persistence: State persistence interface, defaults to
            [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] if `None`.
        span: The span to use for the graph run. If not provided, a new span is created when the graph's
            `auto_instrument` flag is set.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns:
        A `GraphRun` that can be async iterated over to drive the graph to completion.
    """
    if infer_name and self.name is None:
        # `asynccontextmanager` contributes a frame of its own, hence the extra hop to `f_back`
        current = inspect.currentframe()
        if current:  # pragma: no branch
            self._infer_name(current.f_back)

    run_persistence = SimpleStatePersistence() if persistence is None else persistence
    run_persistence.set_graph_types(self)

    with ExitStack() as stack:
        # The span (caller-supplied or auto-created) is entered on the stack, so it stays open for as long as
        # the caller remains inside the `async with` block and is closed when this `with` exits.
        active_span: AbstractSpan | None
        if span is not None:
            active_span = stack.enter_context(span)
        elif self.auto_instrument:
            active_span = stack.enter_context(logfire_span('run graph {graph.name}', graph=self))
        else:
            active_span = None

        traceparent = get_traceparent(active_span) if active_span is not None else None
        yield GraphRun[StateT, DepsT, RunEndT](
            graph=self,
            start_node=start_node,
            persistence=run_persistence,
            state=state,
            deps=deps,
            traceparent=traceparent,
        )

iter_from_persistence async

iter_from_persistence(
    persistence: BaseStatePersistence[StateT, RunEndT],
    *,
    deps: DepsT = None,
    span: (
        AbstractContextManager[AbstractSpan] | None
    ) = None,
    infer_name: bool = True
) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]

一个上下文管理器,用于在图的节点执行时对其进行迭代,该管理器从持久化对象创建。

此方法的功能与 iter 类似,但它不是传递要运行的节点,而是从状态持久化中恢复节点和状态。

参数

名称 类型 描述 默认值
persistence BaseStatePersistence[StateT, RunEndT]

要使用的状态持久化接口。

必需
deps DepsT

图的依赖项。

None
span AbstractContextManager[AbstractSpan] | None

用于图运行的 span。如果未提供,将创建一个新的 span。

None
infer_name bool

是否从调用帧推断图的名称。

True

返回:一个 GraphRun,可以异步迭代以驱动图完成。

源代码位于 pydantic_graph/pydantic_graph/graph.py
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
@asynccontextmanager
async def iter_from_persistence(
    self,
    persistence: BaseStatePersistence[StateT, RunEndT],
    *,
    deps: DepsT = None,
    span: AbstractContextManager[AbstractSpan] | None = None,
    infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]:
    """Open a context for iterating over the graph's nodes, resuming from a persistence object.

    This works like [`iter`][pydantic_graph.graph.Graph.iter], except that no start node is passed in:
    the node to run and the state are restored from state persistence instead.

    Args:
        persistence: The state persistence interface to use.
        deps: The dependencies of the graph.
        span: The span to use for the graph run. If not provided, a new span is created when the graph's
            `auto_instrument` flag is set.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns:
        A `GraphRun` that can be async iterated over to drive the graph to completion.

    Raises:
        GraphRuntimeError: If `persistence.load_next()` returns `None`, i.e. there is no snapshot to resume from.
    """
    if infer_name and self.name is None:
        # `asynccontextmanager` contributes a frame of its own, hence the extra hop to `f_back`
        current = inspect.currentframe()
        if current:  # pragma: no branch
            self._infer_name(current.f_back)

    persistence.set_graph_types(self)

    restored = await persistence.load_next()
    if restored is None:
        raise exceptions.GraphRuntimeError('Unable to restore snapshot from state persistence.')

    # Stamp the restored node with the ID of the snapshot it was loaded from.
    restored.node.set_snapshot_id(restored.id)

    run_span = span
    if self.auto_instrument and run_span is None:  # pragma: no branch
        run_span = logfire_span('run graph {graph.name}', graph=self)

    with ExitStack() as stack:
        entered = stack.enter_context(run_span) if run_span is not None else None
        traceparent = get_traceparent(entered) if entered is not None else None
        yield GraphRun[StateT, DepsT, RunEndT](
            graph=self,
            start_node=restored.node,
            persistence=persistence,
            state=restored.state,
            deps=deps,
            snapshot_id=restored.id,
            traceparent=traceparent,
        )

initialize async

initialize(
    node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    *,
    state: StateT = None,
    infer_name: bool = True
) -> None

在持久化中初始化一个新的图运行,但不运行它。

如果你想设置一个图运行以便稍后运行(例如,通过 iter_from_persistence),这很有用。

参数

名称 类型 描述 默认值
node BaseNode[StateT, DepsT, RunEndT]

要首先运行的节点。

必需
persistence BaseStatePersistence[StateT, RunEndT]

状态持久化接口。

必需
state StateT

图的起始状态。

None
infer_name bool

是否从调用帧推断图的名称。

True
源代码位于 pydantic_graph/pydantic_graph/graph.py
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
async def initialize(
    self,
    node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    *,
    state: StateT = None,
    infer_name: bool = True,
) -> None:
    """Record the starting point of a new graph run in persistence, without executing anything.

    Handy when a run should be prepared now and executed later, e.g. via
    [`iter_from_persistence`][pydantic_graph.graph.Graph.iter_from_persistence].

    Args:
        node: The node to run first.
        persistence: State persistence interface.
        state: The start state of the graph.
        infer_name: Whether to infer the graph name from the calling frame.
    """
    name_missing = self.name is None
    if infer_name and name_missing:
        self._infer_name(inspect.currentframe())

    # Let the persistence backend know which graph it serves, then store the (state, node) pair as a snapshot.
    persistence.set_graph_types(self)
    await persistence.snapshot_node(state, node)

mermaid_code

mermaid_code(
    *,
    start_node: (
        Sequence[NodeIdent] | NodeIdent | None
    ) = None,
    title: str | None | Literal[False] = None,
    edge_labels: bool = True,
    notes: bool = True,
    highlighted_nodes: (
        Sequence[NodeIdent] | NodeIdent | None
    ) = None,
    highlight_css: str = DEFAULT_HIGHLIGHT_CSS,
    infer_name: bool = True,
    direction: StateDiagramDirection | None = None
) -> str

生成一个表示图的 mermaid 图表。

此方法调用 pydantic_graph.mermaid.generate_code

参数

名称 类型 描述 默认值
start_node Sequence[NodeIdent] | NodeIdent | None

可以启动图的一个或多个节点。

None
title str | None | Literal[False]

图表的标题,使用 False 表示不包含标题。

None
edge_labels bool

是否包含边的标签。

True
notes bool

是否在每个节点上包含注释。

True
highlighted_nodes Sequence[NodeIdent] | NodeIdent | None

要高亮显示的可选一个或多个节点。

None
highlight_css str

用于高亮显示节点的 CSS。

DEFAULT_HIGHLIGHT_CSS
infer_name bool

是否从调用帧推断图的名称。

True
direction StateDiagramDirection | None

流程的方向。

None

返回

类型 描述
str

图的 mermaid 代码,然后可以将其渲染为图表。

这是为上面的图生成图表的示例。

mermaid_never_42.py
from never_42 import Increment, never_42_graph

# Print the mermaid source for the graph, using `Increment` as the start node.
print(never_42_graph.mermaid_code(start_node=Increment))
# The string below is the expected printed output.
'''
---
title: never_42_graph
---
stateDiagram-v2
  [*] --> Increment
  Increment --> Check42
  Check42 --> Increment
  Check42 --> [*]
'''

渲染出的图表将如下所示:

---
title: never_42_graph
---
stateDiagram-v2
  [*] --> Increment
  Increment --> Check42
  Check42 --> Increment
  Check42 --> [*]
源代码位于 pydantic_graph/pydantic_graph/graph.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
def mermaid_code(
    self,
    *,
    start_node: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
    title: str | None | typing_extensions.Literal[False] = None,
    edge_labels: bool = True,
    notes: bool = True,
    highlighted_nodes: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
    highlight_css: str = mermaid.DEFAULT_HIGHLIGHT_CSS,
    infer_name: bool = True,
    direction: mermaid.StateDiagramDirection | None = None,
) -> str:
    """Produce [mermaid](https://mermaid.js.org/) diagram source describing this graph.

    The actual rendering is delegated to
    [`pydantic_graph.mermaid.generate_code`][pydantic_graph.mermaid.generate_code].

    Args:
        start_node: The node or nodes which can start the graph.
        title: The title of the diagram, use `False` to not include a title. When left as `None`, the graph's
            name is used if it has one.
        edge_labels: Whether to include edge labels.
        notes: Whether to include notes on each node.
        highlighted_nodes: Optional node or nodes to highlight.
        highlight_css: The CSS to use for highlighting nodes.
        infer_name: Whether to infer the graph name from the calling frame.
        direction: The direction of flow.

    Returns:
        The mermaid code for the graph, which can then be rendered as a diagram.

    Here's an example of generating a diagram for the graph from [above][pydantic_graph.graph.Graph]:

    ```py {title="mermaid_never_42.py" requires="never_42.py"}
    from never_42 import Increment, never_42_graph

    print(never_42_graph.mermaid_code(start_node=Increment))
    '''
    ---
    title: never_42_graph
    ---
    stateDiagram-v2
      [*] --> Increment
      Increment --> Check42
      Check42 --> Increment
      Check42 --> [*]
    '''
    ```

    The rendered diagram will look like this:

    ```mermaid
    ---
    title: never_42_graph
    ---
    stateDiagram-v2
      [*] --> Increment
      Increment --> Check42
      Check42 --> Increment
      Check42 --> [*]
    ```
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())

    # An unset title falls back to the graph's name, which is read only after inference has had its chance.
    diagram_title = self.name if (title is None and self.name) else title

    return mermaid.generate_code(
        self,
        start_node=start_node,
        direction=direction,
        edge_labels=edge_labels,
        notes=notes,
        highlighted_nodes=highlighted_nodes,
        highlight_css=highlight_css,
        # Any falsy title (`False`, an empty string, or still `None`) is handed over as `None`.
        title=diagram_title or None,
    )

mermaid_image

mermaid_image(
    infer_name: bool = True, **kwargs: Unpack[MermaidConfig]
) -> bytes

生成一个表示图的图像。

可以使用 kwargs 自定义格式和图表,请参阅 pydantic_graph.mermaid.MermaidConfig

使用外部服务

此方法向 mermaid.ink 发出请求以渲染图像,mermaid.ink 是一个免费服务,与 Pydantic 无关。

参数

名称 类型 描述 默认值
infer_name bool

是否从调用帧推断图的名称。

True
**kwargs Unpack[MermaidConfig]

传递给 mermaid.request_image 的附加参数。

{}

返回

类型 描述
bytes

图像的字节数据。

源代码位于 pydantic_graph/pydantic_graph/graph.py
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def mermaid_image(
    self, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
) -> bytes:
    """Render the graph's diagram and return it as image bytes.

    Format and diagram options are taken from `kwargs`,
    see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

    !!! note "Uses external service"
        This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink`
        is a free service not affiliated with Pydantic.

    Args:
        infer_name: Whether to infer the graph name from the calling frame.
        **kwargs: Additional arguments to pass to `mermaid.request_image`. If no `title` is given and the graph
            has a name, the name is used as the title.

    Returns:
        The image bytes.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())

    # Read the name only after inference; an explicitly passed `title` always wins over it.
    graph_name = self.name
    if graph_name and 'title' not in kwargs:
        kwargs['title'] = graph_name
    return mermaid.request_image(self, **kwargs)

mermaid_save

mermaid_save(
    path: Path | str,
    /,
    *,
    infer_name: bool = True,
    **kwargs: Unpack[MermaidConfig],
) -> None

生成一个表示图的图表并将其保存为图像。

可以使用 kwargs 自定义格式和图表,请参阅 pydantic_graph.mermaid.MermaidConfig

使用外部服务

此方法向 mermaid.ink 发出请求以渲染图像,mermaid.ink 是一个免费服务,与 Pydantic 无关。

参数

名称 类型 描述 默认值
path Path | str

保存图像的路径。

必需
infer_name bool

是否从调用帧推断图的名称。

True
**kwargs Unpack[MermaidConfig]

传递给 mermaid.save_image 的附加参数。

{}
源代码位于 pydantic_graph/pydantic_graph/graph.py
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
def mermaid_save(
    self, path: Path | str, /, *, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
) -> None:
    """Render the graph's diagram and write it to `path` as an image.

    Format and diagram options are taken from `kwargs`,
    see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

    !!! note "Uses external service"
        This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink`
        is a free service not affiliated with Pydantic.

    Args:
        path: The path to save the image to.
        infer_name: Whether to infer the graph name from the calling frame.
        **kwargs: Additional arguments to pass to `mermaid.save_image`. If no `title` is given and the graph
            has a name, the name is used as the title.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())

    # Read the name only after inference; an explicitly passed `title` always wins over it.
    graph_name = self.name
    if graph_name and 'title' not in kwargs:
        kwargs['title'] = graph_name
    mermaid.save_image(path, self, **kwargs)

get_nodes

get_nodes() -> (
    Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]
)

获取图中的节点。

源代码位于 pydantic_graph/pydantic_graph/graph.py
454
455
456
def get_nodes(self) -> Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]:
    """Return the node classes registered on this graph.

    Returns:
        A new list holding the `node` attribute of every entry in `self.node_defs`, in the mapping's
        iteration order.
    """
    definitions = self.node_defs.values()
    return [definition.node for definition in definitions]

GraphRun

Bases: Generic[StateT, DepsT, RunEndT]

一个有状态的、可异步迭代的 Graph 运行实例。

你通常通过 async with my_graph.iter(...) as graph_run: 来获取 GraphRun 实例(参见 Graph.iter)。这使你能够在节点运行时对其进行迭代,可以通过 async for 迭代或重复调用 .next(...) 来实现。

这是迭代上面图的示例。

iter_never_42.py
from copy import deepcopy
from never_42 import Increment, MyState, never_42_graph

async def main():
    state = MyState(1)
    async with never_42_graph.iter(Increment(), state=state) as graph_run:
        # Seed the log with the pending start node and a copy of the state before anything has run.
        node_states = [(graph_run.next_node, deepcopy(graph_run.state))]
        async for node in graph_run:
            # `deepcopy` captures the state as it is at this point, independent of later changes.
            node_states.append((node, deepcopy(graph_run.state)))
        print(node_states)
        # The start node is listed twice: once from the seed entry, once as the first item yielded by iteration.
        '''
        [
            (Increment(), MyState(number=1)),
            (Increment(), MyState(number=1)),
            (Check42(), MyState(number=2)),
            (End(data=2), MyState(number=2)),
        ]
        '''

    # Starting from 41, the graph loops back through `Increment` once more before ending at 43.
    state = MyState(41)
    async with never_42_graph.iter(Increment(), state=state) as graph_run:
        node_states = [(graph_run.next_node, deepcopy(graph_run.state))]
        async for node in graph_run:
            node_states.append((node, deepcopy(graph_run.state)))
        print(node_states)
        '''
        [
            (Increment(), MyState(number=41)),
            (Increment(), MyState(number=41)),
            (Check42(), MyState(number=42)),
            (Increment(), MyState(number=42)),
            (Check42(), MyState(number=43)),
            (End(data=43), MyState(number=43)),
        ]
        '''

有关如何手动驱动图运行的示例,请参阅 GraphRun.next 文档

源代码位于 pydantic_graph/pydantic_graph/graph.py
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
class GraphRun(Generic[StateT, DepsT, RunEndT]):
    """A stateful, async-iterable run of a [`Graph`][pydantic_graph.graph.Graph].

    You typically get a `GraphRun` instance from calling `async with my_graph.iter(...) as graph_run:`
    (see [`Graph.iter`][pydantic_graph.graph.Graph.iter]). That gives you the ability to iterate
    through nodes as they run, either by `async for` iteration or by repeatedly calling `.next(...)`.

    Here's an example of iterating over the graph from [above][pydantic_graph.graph.Graph]:
    ```py {title="iter_never_42.py" noqa="I001" requires="never_42.py"}
    from copy import deepcopy
    from never_42 import Increment, MyState, never_42_graph

    async def main():
        state = MyState(1)
        async with never_42_graph.iter(Increment(), state=state) as graph_run:
            node_states = [(graph_run.next_node, deepcopy(graph_run.state))]
            async for node in graph_run:
                node_states.append((node, deepcopy(graph_run.state)))
            print(node_states)
            '''
            [
                (Increment(), MyState(number=1)),
                (Increment(), MyState(number=1)),
                (Check42(), MyState(number=2)),
                (End(data=2), MyState(number=2)),
            ]
            '''

        state = MyState(41)
        async with never_42_graph.iter(Increment(), state=state) as graph_run:
            node_states = [(graph_run.next_node, deepcopy(graph_run.state))]
            async for node in graph_run:
                node_states.append((node, deepcopy(graph_run.state)))
            print(node_states)
            '''
            [
                (Increment(), MyState(number=41)),
                (Increment(), MyState(number=41)),
                (Check42(), MyState(number=42)),
                (Increment(), MyState(number=42)),
                (Check42(), MyState(number=43)),
                (End(data=43), MyState(number=43)),
            ]
            '''
    ```

    See the [`GraphRun.next` documentation][pydantic_graph.graph.GraphRun.next] for an example of how to manually
    drive the graph run.
    """

    def __init__(
        self,
        *,
        graph: Graph[StateT, DepsT, RunEndT],
        start_node: BaseNode[StateT, DepsT, RunEndT],
        persistence: BaseStatePersistence[StateT, RunEndT],
        state: StateT,
        deps: DepsT,
        traceparent: str | None,
        snapshot_id: str | None = None,
    ):
        """Create a new run for a given graph, starting at the specified node.

        Typically, you'll use [`Graph.iter`][pydantic_graph.graph.Graph.iter] rather than calling this directly.

        Args:
            graph: The [`Graph`][pydantic_graph.graph.Graph] to run.
            start_node: The node where execution will begin.
            persistence: State persistence interface.
            state: A shared state object or primitive (like a counter, dataclass, etc.) that is available
                to all nodes via `ctx.state`.
            deps: Optional dependencies that each node can access via `ctx.deps`, e.g. database connections,
                configuration, or logging clients.
            traceparent: The traceparent for the span used for the graph run.
            snapshot_id: The ID of the snapshot the node came from.
        """
        self.graph = graph
        self.persistence = persistence
        self._snapshot_id: str | None = snapshot_id
        self.state = state
        self.deps = deps

        self.__traceparent = traceparent
        self._next_node: BaseNode[StateT, DepsT, RunEndT] | End[RunEndT] = start_node
        # Flipped by the first `__anext__` call, which yields the start node without running anything.
        self._is_started: bool = False

    @overload
    def _traceparent(self, *, required: typing_extensions.Literal[False]) -> str | None: ...
    @overload
    def _traceparent(self) -> str: ...
    def _traceparent(self, *, required: bool = True) -> str | None:
        """Return the traceparent passed to the constructor.

        Args:
            required: When true (the default), a missing traceparent is an error instead of being returned as `None`.

        Raises:
            GraphRuntimeError: If no traceparent is stored and `required` is true.
        """
        if self.__traceparent is None and required:  # pragma: no cover
            raise exceptions.GraphRuntimeError('No span was created for this graph run')
        return self.__traceparent

    @property
    def next_node(self) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]:
        """The next node that will be run in the graph.

        This is the next node that will be used during async iteration, or if a node is not passed to `self.next(...)`.
        """
        return self._next_node

    @property
    def result(self) -> GraphRunResult[StateT, RunEndT] | None:
        """The final result of the graph run if the run is completed, otherwise `None`."""
        if not isinstance(self._next_node, End):
            return None  # The GraphRun has not finished running
        return GraphRunResult[StateT, RunEndT](
            self._next_node.data,
            state=self.state,
            persistence=self.persistence,
            traceparent=self._traceparent(required=False),
        )

    async def next(
        self, node: BaseNode[StateT, DepsT, RunEndT] | None = None
    ) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]:
        """Manually drive the graph run by passing in the node you want to run next.

        This lets you inspect or mutate the node before continuing execution, or skip certain nodes
        under dynamic conditions. The graph run should stop when you return an [`End`][pydantic_graph.nodes.End] node.

        Here's an example of using `next` to drive the graph from [above][pydantic_graph.graph.Graph]:
        ```py {title="next_never_42.py" noqa="I001" requires="never_42.py"}
        from copy import deepcopy
        from pydantic_graph import End
        from never_42 import Increment, MyState, never_42_graph

        async def main():
            state = MyState(48)
            async with never_42_graph.iter(Increment(), state=state) as graph_run:
                next_node = graph_run.next_node  # start with the first node
                node_states = [(next_node, deepcopy(graph_run.state))]

                while not isinstance(next_node, End):
                    if graph_run.state.number == 50:
                        graph_run.state.number = 42
                    next_node = await graph_run.next(next_node)
                    node_states.append((next_node, deepcopy(graph_run.state)))

                print(node_states)
                '''
                [
                    (Increment(), MyState(number=48)),
                    (Check42(), MyState(number=49)),
                    (End(data=49), MyState(number=49)),
                ]
                '''
        ```

        Args:
            node: The node to run next in the graph. If not specified, uses `self.next_node`, which is initialized to
                the `start_node` of the run and updated each time a new node is returned.

        Returns:
            The next node returned by the graph logic, or an [`End`][pydantic_graph.nodes.End] node if
            the run has completed.

        Raises:
            TypeError: If the node to run is not a `BaseNode` instance, e.g. when `next()` is called again after
                the run has already produced an `End`.
            GraphRuntimeError: If the node is not part of the graph, or if running it returns something that is
                neither a `BaseNode` nor an `End`.
        """
        if node is None:
            # `self._next_node` is an `End` once the run has finished; the cast only satisfies the type checker,
            # the `isinstance` check right below rejects that case at runtime.
            node = cast(BaseNode[StateT, DepsT, RunEndT], self._next_node)

        # Validate before touching persistence, so an invalid argument is neither snapshotted nor reported as an
        # unrelated attribute error.
        if not isinstance(node, BaseNode):
            # While technically this is not compatible with the documented method signature, it's an easy mistake to
            # make, and we should eagerly provide a more helpful error message than you'd get otherwise.
            raise TypeError(f'`next` must be called with a `BaseNode` instance, got {node!r}.')

        node_snapshot_id = node.get_snapshot_id()
        if node_snapshot_id != self._snapshot_id:
            # The node differs from the one snapshotted last (e.g. the caller passed their own node).
            await self.persistence.snapshot_node_if_new(node_snapshot_id, self.state, node)
            self._snapshot_id = node_snapshot_id

        node_id = node.get_node_id()
        if node_id not in self.graph.node_defs:
            raise exceptions.GraphRuntimeError(f'Node `{node}` is not in the graph.')

        with ExitStack() as stack:
            if self.graph.auto_instrument:
                stack.enter_context(logfire_span('run node {node_id}', node_id=node_id, node=node))

            async with self.persistence.record_run(node_snapshot_id):
                ctx = GraphRunContext(state=self.state, deps=self.deps)
                self._next_node = await node.run(ctx)

        # Snapshot whatever the node returned, so the run can be picked up from persistence at this point.
        if isinstance(self._next_node, End):
            self._snapshot_id = self._next_node.get_snapshot_id()
            await self.persistence.snapshot_end(self.state, self._next_node)
        elif isinstance(self._next_node, BaseNode):
            self._snapshot_id = self._next_node.get_snapshot_id()
            await self.persistence.snapshot_node(self.state, self._next_node)
        else:
            raise exceptions.GraphRuntimeError(
                f'Invalid node return type: `{type(self._next_node).__name__}`. Expected `BaseNode` or `End`.'
            )

        return self._next_node

    def __aiter__(self) -> AsyncIterator[BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]]:
        """Return `self`; the run is its own async iterator."""
        return self

    async def __anext__(self) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]:
        """Use the last returned node as the input to `GraphRun.next`.

        The first call yields the start node without running it. Iteration stops once the pending node is an `End`.
        """
        if not self._is_started:
            self._is_started = True
            return self._next_node

        if isinstance(self._next_node, End):
            raise StopAsyncIteration

        return await self.next(self._next_node)

    def __repr__(self) -> str:
        """Show the graph's name, or `[unnamed]` when the graph has none."""
        return f'<GraphRun graph={self.graph.name or "[unnamed]"}>'

__init__

__init__(
    *,
    graph: Graph[StateT, DepsT, RunEndT],
    start_node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    state: StateT,
    deps: DepsT,
    traceparent: str | None,
    snapshot_id: str | None = None
)

为给定的图创建一个新的运行,从指定的节点开始。

通常,你会使用 Graph.iter 而不是直接调用它。

参数

名称 类型 描述 默认值
graph Graph[StateT, DepsT, RunEndT]

要运行的 Graph

必需
start_node BaseNode[StateT, DepsT, RunEndT]

执行将开始的节点。

必需
persistence BaseStatePersistence[StateT, RunEndT]

状态持久化接口。

必需
state StateT

一个共享的状态对象或原始类型(如计数器、数据类等),所有节点都可以通过 ctx.state 访问。

必需
deps DepsT

每个节点都可以通过 ctx.deps 访问的可选依赖项,例如数据库连接、配置或日志记录客户端。

必需
traceparent str | None

用于图运行的 span 的 traceparent。

必需
snapshot_id str | None

节点来源的快照 ID。

None
源代码位于 pydantic_graph/pydantic_graph/graph.py
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
def __init__(
    self,
    *,
    graph: Graph[StateT, DepsT, RunEndT],
    start_node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    state: StateT,
    deps: DepsT,
    traceparent: str | None,
    snapshot_id: str | None = None,
):
    """Set up a run of `graph` that begins at `start_node`.

    This is normally reached through [`Graph.iter`][pydantic_graph.graph.Graph.iter] instead of being called
    directly.

    Args:
        graph: The [`Graph`][pydantic_graph.graph.Graph] to run.
        start_node: The node where execution will begin.
        persistence: State persistence interface.
        state: A shared state object or primitive (like a counter, dataclass, etc.) that is available
            to all nodes via `ctx.state`.
        deps: Optional dependencies that each node can access via `ctx.deps`, e.g. database connections,
            configuration, or logging clients.
        traceparent: The traceparent for the span used for the graph run.
        snapshot_id: The ID of the snapshot the node came from.
    """
    # Public handles: the graph being run, its persistence backend, and the data shared with every node.
    self.graph = graph
    self.persistence = persistence
    self.state = state
    self.deps = deps

    # Private bookkeeping: the pending node, the snapshot it belongs to, and whether iteration has begun.
    self._next_node: BaseNode[StateT, DepsT, RunEndT] | End[RunEndT] = start_node
    self._snapshot_id: str | None = snapshot_id
    self._is_started: bool = False
    self.__traceparent = traceparent

next_node property

将在图中运行的下一个节点。

这是在异步迭代期间,或者如果没有将节点传递给 self.next(...) 时将使用的下一个节点。

result property

result: GraphRunResult[StateT, RunEndT] | None

如果运行已完成,则为图运行的最终结果,否则为 None

next async

next(
    node: BaseNode[StateT, DepsT, RunEndT] | None = None,
) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]

通过传入你想要下一个运行的节点来手动驱动图的运行。

这允许你在继续执行之前检查或修改节点,或在动态条件下跳过某些节点。当你返回一个 End 节点时,图的运行应该停止。

这是使用 next 驱动上面图的示例。

next_never_42.py
from copy import deepcopy
from pydantic_graph import End
from never_42 import Increment, MyState, never_42_graph

async def main():
    # Drive the graph one node at a time, recording every node together with a
    # copy of the state as it was when that node was obtained.
    state = MyState(48)
    async with never_42_graph.iter(Increment(), state=state) as graph_run:
        next_node = graph_run.next_node  # start with the first node
        # deepcopy so that later mutations of the shared state do not alter earlier entries
        node_states = [(next_node, deepcopy(graph_run.state))]

        while not isinstance(next_node, End):
            # The state may be edited between steps. In this particular run the branch
            # is not taken: per the expected output below, the run ends at 49.
            if graph_run.state.number == 50:
                graph_run.state.number = 42
            # Run the current node; the return value is the node to run next (or an End).
            next_node = await graph_run.next(next_node)
            node_states.append((next_node, deepcopy(graph_run.state)))

        print(node_states)
        '''
        [
            (Increment(), MyState(number=48)),
            (Check42(), MyState(number=49)),
            (End(data=49), MyState(number=49)),
        ]
        '''

参数

名称 类型 描述 默认值
node BaseNode[StateT, DepsT, RunEndT] | None

要在图中下一个运行的节点。如果未指定,则使用 self.next_node,它被初始化为运行的 start_node,并在每次返回新节点时更新。

None

返回

类型 描述
BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]

由图逻辑返回的下一个节点;如果运行已完成,则为一个 End 节点。

源代码位于 pydantic_graph/pydantic_graph/graph.py
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
async def next(
    self, node: BaseNode[StateT, DepsT, RunEndT] | None = None
) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]:
    """Manually drive the graph run by passing in the node you want to run next.

    This lets you inspect or mutate the node before continuing execution, or skip certain nodes
    under dynamic conditions. The graph run should stop when you return an [`End`][pydantic_graph.nodes.End] node.

    Here's an example of using `next` to drive the graph from [above][pydantic_graph.graph.Graph]:
    ```py {title="next_never_42.py" noqa="I001" requires="never_42.py"}
    from copy import deepcopy
    from pydantic_graph import End
    from never_42 import Increment, MyState, never_42_graph

    async def main():
        state = MyState(48)
        async with never_42_graph.iter(Increment(), state=state) as graph_run:
            next_node = graph_run.next_node  # start with the first node
            node_states = [(next_node, deepcopy(graph_run.state))]

            while not isinstance(next_node, End):
                if graph_run.state.number == 50:
                    graph_run.state.number = 42
                next_node = await graph_run.next(next_node)
                node_states.append((next_node, deepcopy(graph_run.state)))

            print(node_states)
            '''
            [
                (Increment(), MyState(number=48)),
                (Check42(), MyState(number=49)),
                (End(data=49), MyState(number=49)),
            ]
            '''
    ```

    Args:
        node: The node to run next in the graph. If not specified, uses `self.next_node`, which is initialized to
            the `start_node` of the run and updated each time a new node is returned.

    Returns:
        The next node returned by the graph logic, or an [`End`][pydantic_graph.nodes.End] node if
        the run has completed.

    Raises:
        TypeError: If the node to run (passed in, or taken from the run when `node` is `None`) is not a
            `BaseNode` instance, e.g. because the run already finished with an `End`.
        GraphRuntimeError: If the node is not part of the graph, or if the node's `run` method returns
            something that is neither a `BaseNode` nor an `End`.
    """
    if node is None:
        # This cast is necessary because self._next_node could be an `End`. If it is, the only way to get here
        # is by calling `next` manually after the run finished; the isinstance check below then reports it.
        node = cast(BaseNode[StateT, DepsT, RunEndT], self._next_node)

    # Validate before touching persistence, so that an invalid argument can never be recorded as a
    # node snapshot (or fail with an unrelated error) ahead of the helpful message below.
    if not isinstance(node, BaseNode):
        # While technically this is not compatible with the documented method signature, it's an easy mistake to
        # make, and we should eagerly provide a more helpful error message than you'd get otherwise.
        raise TypeError(f'`next` must be called with a `BaseNode` instance, got {node!r}.')

    node_snapshot_id = node.get_snapshot_id()
    if node_snapshot_id != self._snapshot_id:
        # The caller supplied a node other than the one last snapshotted; make sure it is persisted.
        await self.persistence.snapshot_node_if_new(node_snapshot_id, self.state, node)
        self._snapshot_id = node_snapshot_id

    node_id = node.get_node_id()
    if node_id not in self.graph.node_defs:
        raise exceptions.GraphRuntimeError(f'Node `{node}` is not in the graph.')

    with ExitStack() as stack:
        # Only open an instrumentation span when the graph asks for it.
        if self.graph.auto_instrument:
            stack.enter_context(logfire_span('run node {node_id}', node_id=node_id, node=node))

        async with self.persistence.record_run(node_snapshot_id):
            ctx = GraphRunContext(state=self.state, deps=self.deps)
            self._next_node = await node.run(ctx)

    # Persist whatever the node returned and remember its snapshot ID for the next call.
    if isinstance(self._next_node, End):
        self._snapshot_id = self._next_node.get_snapshot_id()
        await self.persistence.snapshot_end(self.state, self._next_node)
    elif isinstance(self._next_node, BaseNode):
        self._snapshot_id = self._next_node.get_snapshot_id()
        await self.persistence.snapshot_node(self.state, self._next_node)
    else:
        raise exceptions.GraphRuntimeError(
            f'Invalid node return type: `{type(self._next_node).__name__}`. Expected `BaseNode` or `End`.'
        )

    return self._next_node

__anext__ async

__anext__() -> (
    BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]
)

使用最后返回的节点作为 Graph.next 的输入。

源代码位于 pydantic_graph/pydantic_graph/graph.py
749
750
751
752
753
754
755
756
757
758
async def __anext__(self) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]:
    """Yield the start node first, then advance the run by feeding the last returned node to `self.next`.

    Iteration stops once the pending node is an `End`.
    """
    if self._is_started:
        pending = self._next_node
        if isinstance(pending, End):
            raise StopAsyncIteration
        return await self.next(pending)

    # First step of the iteration: hand out the queued node without running anything.
    self._is_started = True
    return self._next_node

GraphRunResult dataclass

Bases: Generic[StateT, RunEndT]

运行图的最终结果。

源代码位于 pydantic_graph/pydantic_graph/graph.py
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
@dataclass(init=False)
class GraphRunResult(Generic[StateT, RunEndT]):
    """The final result of running a graph.

    Holds the run's output, the state object, and the persistence object passed at construction.
    The traceparent, if any, is kept privately and read through `_traceparent`.
    """

    # Value the run finished with.
    output: RunEndT
    # State object supplied when the result was built.
    state: StateT
    # Persistence interface supplied when the result was built; left out of the repr.
    persistence: BaseStatePersistence[StateT, RunEndT] = field(repr=False)

    def __init__(
        self,
        output: RunEndT,
        state: StateT,
        persistence: BaseStatePersistence[StateT, RunEndT],
        traceparent: str | None = None,
    ):
        # Hand-written initializer (the dataclass one is disabled) so that `traceparent`
        # can be accepted without being declared as a dataclass field.
        self.__traceparent = traceparent
        self.persistence = persistence
        self.state = state
        self.output = output

    @overload
    def _traceparent(self, *, required: typing_extensions.Literal[False]) -> str | None: ...
    @overload
    def _traceparent(self) -> str: ...
    def _traceparent(self, *, required: bool = True) -> str | None:  # pragma: no cover
        # Return the stored traceparent; when it is missing, raise only if the caller requires one.
        stored = self.__traceparent
        if stored is None and required:
            raise exceptions.GraphRuntimeError('No span was created for this graph run.')
        return stored