跳到内容

pydantic_graph

Graph dataclass

基类: Generic[StateT, DepsT, RunEndT]

图的定义。

pydantic-graph 中,图是可以按顺序运行的节点的集合。节点定义了它们的传出边——例如,哪些节点可以接下来运行,从而定义了图的结构。

这是一个非常简单的图的示例,该图将数字递增 1,但确保最终数字永远不是 42。

never_42.py
from __future__ import annotations

from dataclasses import dataclass

from pydantic_graph import BaseNode, End, Graph, GraphRunContext

@dataclass
class MyState:
    number: int

@dataclass
class Increment(BaseNode[MyState]):
    async def run(self, ctx: GraphRunContext) -> Check42:
        ctx.state.number += 1
        return Check42()

@dataclass
class Check42(BaseNode[MyState, None, int]):
    async def run(self, ctx: GraphRunContext) -> Increment | End[int]:
        if ctx.state.number == 42:
            return Increment()
        else:
            return End(ctx.state.number)

never_42_graph = Graph(nodes=(Increment, Check42))
(此示例是完整的,可以“按原样”运行)

有关运行图的示例,请参阅 run,有关从图生成 mermaid 图的示例,请参阅 mermaid_code

源代码位于 pydantic_graph/pydantic_graph/graph.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
@dataclass(init=False)
class Graph(Generic[StateT, DepsT, RunEndT]):
    """Definition of a graph.

    In `pydantic-graph`, a graph is a collection of nodes that can be run in sequence. The nodes define
    their outgoing edges — e.g. which nodes may be run next, and thereby the structure of the graph.

    Here's a very simple example of a graph which increments a number by 1, but makes sure the number is never
    42 at the end.

    ```py {title="never_42.py" noqa="I001" py="3.10"}
    from __future__ import annotations

    from dataclasses import dataclass

    from pydantic_graph import BaseNode, End, Graph, GraphRunContext

    @dataclass
    class MyState:
        number: int

    @dataclass
    class Increment(BaseNode[MyState]):
        async def run(self, ctx: GraphRunContext) -> Check42:
            ctx.state.number += 1
            return Check42()

    @dataclass
    class Check42(BaseNode[MyState, None, int]):
        async def run(self, ctx: GraphRunContext) -> Increment | End[int]:
            if ctx.state.number == 42:
                return Increment()
            else:
                return End(ctx.state.number)

    never_42_graph = Graph(nodes=(Increment, Check42))
    ```
    _(This example is complete, it can be run "as is")_

    See [`run`][pydantic_graph.graph.Graph.run] For an example of running graph, and
    [`mermaid_code`][pydantic_graph.graph.Graph.mermaid_code] for an example of generating a mermaid diagram
    from the graph.
    """

    name: str | None
    node_defs: dict[str, NodeDef[StateT, DepsT, RunEndT]]
    _state_type: type[StateT] | _utils.Unset = field(repr=False)
    _run_end_type: type[RunEndT] | _utils.Unset = field(repr=False)
    auto_instrument: bool = field(repr=False)

    def __init__(
        self,
        *,
        nodes: Sequence[type[BaseNode[StateT, DepsT, RunEndT]]],
        name: str | None = None,
        state_type: type[StateT] | _utils.Unset = _utils.UNSET,
        run_end_type: type[RunEndT] | _utils.Unset = _utils.UNSET,
        auto_instrument: bool = True,
    ):
        """Create a graph from a sequence of nodes.

        Args:
            nodes: The nodes which make up the graph, nodes need to be unique and all be generic in the same
                state type.
            name: Optional name for the graph, if not provided the name will be inferred from the calling frame
                on the first call to a graph method.
            state_type: The type of the state for the graph, this can generally be inferred from `nodes`.
            run_end_type: The type of the result of running the graph, this can generally be inferred from `nodes`.
            auto_instrument: Whether to create a span for the graph run and the execution of each node's run method.
        """
        self.name = name
        self._state_type = state_type
        self._run_end_type = run_end_type
        self.auto_instrument = auto_instrument

        parent_namespace = _utils.get_parent_namespace(inspect.currentframe())
        self.node_defs: dict[str, NodeDef[StateT, DepsT, RunEndT]] = {}
        for node in nodes:
            self._register_node(node, parent_namespace)

        self._validate_edges()

    async def run(
        self,
        start_node: BaseNode[StateT, DepsT, RunEndT],
        *,
        state: StateT = None,
        deps: DepsT = None,
        persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
        infer_name: bool = True,
        span: LogfireSpan | None = None,
    ) -> GraphRunResult[StateT, RunEndT]:
        """Run the graph from a starting node until it ends.

        Args:
            start_node: the first node to run, since the graph definition doesn't define the entry point in the graph,
                you need to provide the starting node.
            state: The initial state of the graph.
            deps: The dependencies of the graph.
            persistence: State persistence interface, defaults to
                [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] if `None`.
            infer_name: Whether to infer the graph name from the calling frame.
            span: The span to use for the graph run. If not provided, a span will be created depending on the value of
                the `auto_instrument` field.

        Returns:
            A `GraphRunResult` containing information about the run, including its final result.

        Here's an example of running the graph from [above][pydantic_graph.graph.Graph]:

        ```py {title="run_never_42.py" noqa="I001" py="3.10"}
        from never_42 import Increment, MyState, never_42_graph

        async def main():
            state = MyState(1)
            await never_42_graph.run(Increment(), state=state)
            print(state)
            #> MyState(number=2)

            state = MyState(41)
            await never_42_graph.run(Increment(), state=state)
            print(state)
            #> MyState(number=43)
        ```
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())

        async with self.iter(
            start_node, state=state, deps=deps, persistence=persistence, span=span, infer_name=False
        ) as graph_run:
            async for _node in graph_run:
                pass

        final_result = graph_run.result
        assert final_result is not None, 'GraphRun should have a final result'
        return final_result

    def run_sync(
        self,
        start_node: BaseNode[StateT, DepsT, RunEndT],
        *,
        state: StateT = None,
        deps: DepsT = None,
        persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
        infer_name: bool = True,
    ) -> GraphRunResult[StateT, RunEndT]:
        """Synchronously run the graph.

        This is a convenience method that wraps [`self.run`][pydantic_graph.Graph.run] with `loop.run_until_complete(...)`.
        You therefore can't use this method inside async code or if there's an active event loop.

        Args:
            start_node: the first node to run, since the graph definition doesn't define the entry point in the graph,
                you need to provide the starting node.
            state: The initial state of the graph.
            deps: The dependencies of the graph.
            persistence: State persistence interface, defaults to
                [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] if `None`.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns:
            The result type from ending the run and the history of the run.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())

        return _utils.get_event_loop().run_until_complete(
            self.run(start_node, state=state, deps=deps, persistence=persistence, infer_name=False)
        )

    @asynccontextmanager
    async def iter(
        self,
        start_node: BaseNode[StateT, DepsT, RunEndT],
        *,
        state: StateT = None,
        deps: DepsT = None,
        persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
        span: AbstractContextManager[Any] | None = None,
        infer_name: bool = True,
    ) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]:
        """A contextmanager which can be used to iterate over the graph's nodes as they are executed.

        This method returns a `GraphRun` object which can be used to async-iterate over the nodes of this `Graph` as
        they are executed. This is the API to use if you want to record or interact with the nodes as the graph
        execution unfolds.

        The `GraphRun` can also be used to manually drive the graph execution by calling
        [`GraphRun.next`][pydantic_graph.graph.GraphRun.next].

        The `GraphRun` provides access to the full run history, state, deps, and the final result of the run once
        it has completed.

        For more details, see the API documentation of [`GraphRun`][pydantic_graph.graph.GraphRun].

        Args:
            start_node: the first node to run. Since the graph definition doesn't define the entry point in the graph,
                you need to provide the starting node.
            state: The initial state of the graph.
            deps: The dependencies of the graph.
            persistence: State persistence interface, defaults to
                [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] if `None`.
            span: The span to use for the graph run. If not provided, a new span will be created.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns: A GraphRun that can be async iterated over to drive the graph to completion.
        """
        if infer_name and self.name is None:
            # f_back because `asynccontextmanager` adds one frame
            if frame := inspect.currentframe():  # pragma: no branch
                self._infer_name(frame.f_back)

        if persistence is None:
            persistence = SimpleStatePersistence()
        persistence.set_graph_types(self)

        if self.auto_instrument and span is None:
            span = logfire_api.span('run graph {graph.name}', graph=self)

        with ExitStack() as stack:
            if span is not None:
                stack.enter_context(span)
            yield GraphRun[StateT, DepsT, RunEndT](
                graph=self, start_node=start_node, persistence=persistence, state=state, deps=deps
            )

    @asynccontextmanager
    async def iter_from_persistence(
        self,
        persistence: BaseStatePersistence[StateT, RunEndT],
        *,
        deps: DepsT = None,
        span: AbstractContextManager[Any] | None = None,
        infer_name: bool = True,
    ) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]:
        """A contextmanager to iterate over the graph's nodes as they are executed, created from a persistence object.

        This method has similar functionality to [`iter`][pydantic_graph.graph.Graph.iter],
        but instead of passing the node to run, it will restore the node and state from state persistence.

        Args:
            persistence: The state persistence interface to use.
            deps: The dependencies of the graph.
            span: The span to use for the graph run. If not provided, a new span will be created.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns: A GraphRun that can be async iterated over to drive the graph to completion.
        """
        if infer_name and self.name is None:
            # f_back because `asynccontextmanager` adds one frame
            if frame := inspect.currentframe():  # pragma: no branch
                self._infer_name(frame.f_back)

        persistence.set_graph_types(self)

        snapshot = await persistence.load_next()
        if snapshot is None:
            raise exceptions.GraphRuntimeError('Unable to restore snapshot from state persistence.')

        snapshot.node.set_snapshot_id(snapshot.id)

        if self.auto_instrument and span is None:
            span = logfire_api.span('run graph {graph.name}', graph=self)

        with ExitStack() as stack:
            if span is not None:
                stack.enter_context(span)
            yield GraphRun[StateT, DepsT, RunEndT](
                graph=self,
                start_node=snapshot.node,
                persistence=persistence,
                state=snapshot.state,
                deps=deps,
                snapshot_id=snapshot.id,
            )

    async def initialize(
        self,
        node: BaseNode[StateT, DepsT, RunEndT],
        persistence: BaseStatePersistence[StateT, RunEndT],
        *,
        state: StateT = None,
        infer_name: bool = True,
    ) -> None:
        """Initialize a new graph run in persistence without running it.

        This is useful if you want to set up a graph run to be run later, e.g. via
        [`iter_from_persistence`][pydantic_graph.graph.Graph.iter_from_persistence].

        Args:
            node: The node to run first.
            persistence: State persistence interface.
            state: The start state of the graph.
            infer_name: Whether to infer the graph name from the calling frame.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())

        persistence.set_graph_types(self)
        await persistence.snapshot_node(state, node)

    @deprecated('`next` is deprecated, use `async with graph.iter(...) as run:  run.next()` instead')
    async def next(
        self,
        node: BaseNode[StateT, DepsT, RunEndT],
        persistence: BaseStatePersistence[StateT, RunEndT],
        *,
        state: StateT = None,
        deps: DepsT = None,
        infer_name: bool = True,
    ) -> BaseNode[StateT, DepsT, Any] | End[RunEndT]:
        """Run a node in the graph and return the next node to run.

        Args:
            node: The node to run.
            persistence: State persistence interface, defaults to
                [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] if `None`.
            state: The current state of the graph.
            deps: The dependencies of the graph.
            infer_name: Whether to infer the graph name from the calling frame.

        Returns:
            The next node to run or [`End`][pydantic_graph.nodes.End] if the graph has finished.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())

        persistence.set_graph_types(self)
        run = GraphRun[StateT, DepsT, RunEndT](
            graph=self,
            start_node=node,
            persistence=persistence,
            state=state,
            deps=deps,
        )
        return await run.next(node)

    def mermaid_code(
        self,
        *,
        start_node: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
        title: str | None | typing_extensions.Literal[False] = None,
        edge_labels: bool = True,
        notes: bool = True,
        highlighted_nodes: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
        highlight_css: str = mermaid.DEFAULT_HIGHLIGHT_CSS,
        infer_name: bool = True,
        direction: mermaid.StateDiagramDirection | None = None,
    ) -> str:
        """Generate a diagram representing the graph as [mermaid](https://mermaid.npmjs.net.cn/) diagram.

        This method calls [`pydantic_graph.mermaid.generate_code`][pydantic_graph.mermaid.generate_code].

        Args:
            start_node: The node or nodes which can start the graph.
            title: The title of the diagram, use `False` to not include a title.
            edge_labels: Whether to include edge labels.
            notes: Whether to include notes on each node.
            highlighted_nodes: Optional node or nodes to highlight.
            highlight_css: The CSS to use for highlighting nodes.
            infer_name: Whether to infer the graph name from the calling frame.
            direction: The direction of flow.

        Returns:
            The mermaid code for the graph, which can then be rendered as a diagram.

        Here's an example of generating a diagram for the graph from [above][pydantic_graph.graph.Graph]:

        ```py {title="mermaid_never_42.py" py="3.10"}
        from never_42 import Increment, never_42_graph

        print(never_42_graph.mermaid_code(start_node=Increment))
        '''
        ---
        title: never_42_graph
        ---
        stateDiagram-v2
          [*] --> Increment
          Increment --> Check42
          Check42 --> Increment
          Check42 --> [*]
        '''
        ```

        The rendered diagram will look like this:

        ```mermaid
        ---
        title: never_42_graph
        ---
        stateDiagram-v2
          [*] --> Increment
          Increment --> Check42
          Check42 --> Increment
          Check42 --> [*]
        ```
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())
        if title is None and self.name:
            title = self.name
        return mermaid.generate_code(
            self,
            start_node=start_node,
            highlighted_nodes=highlighted_nodes,
            highlight_css=highlight_css,
            title=title or None,
            edge_labels=edge_labels,
            notes=notes,
            direction=direction,
        )

    def mermaid_image(
        self, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
    ) -> bytes:
        """Generate a diagram representing the graph as an image.

        The format and diagram can be customized using `kwargs`,
        see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

        !!! note "Uses external service"
            This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink`
            is a free service not affiliated with Pydantic.

        Args:
            infer_name: Whether to infer the graph name from the calling frame.
            **kwargs: Additional arguments to pass to `mermaid.request_image`.

        Returns:
            The image bytes.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())
        if 'title' not in kwargs and self.name:
            kwargs['title'] = self.name
        return mermaid.request_image(self, **kwargs)

    def mermaid_save(
        self, path: Path | str, /, *, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
    ) -> None:
        """Generate a diagram representing the graph and save it as an image.

        The format and diagram can be customized using `kwargs`,
        see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

        !!! note "Uses external service"
            This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink`
            is a free service not affiliated with Pydantic.

        Args:
            path: The path to save the image to.
            infer_name: Whether to infer the graph name from the calling frame.
            **kwargs: Additional arguments to pass to `mermaid.save_image`.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())
        if 'title' not in kwargs and self.name:
            kwargs['title'] = self.name
        mermaid.save_image(path, self, **kwargs)

    def get_nodes(self) -> Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]:
        """Get the nodes in the graph."""
        return [node_def.node for node_def in self.node_defs.values()]

    @cached_property
    def inferred_types(self) -> tuple[type[StateT], type[RunEndT]]:
        # Get the types of the state and run end from the graph.
        if _utils.is_set(self._state_type) and _utils.is_set(self._run_end_type):
            return self._state_type, self._run_end_type

        state_type = self._state_type
        run_end_type = self._run_end_type

        for node_def in self.node_defs.values():
            for base in typing_extensions.get_original_bases(node_def.node):
                if typing_extensions.get_origin(base) is BaseNode:
                    args = typing_extensions.get_args(base)
                    if not _utils.is_set(state_type) and args:
                        state_type = args[0]

                    if not _utils.is_set(run_end_type) and len(args) == 3:
                        t = args[2]
                        if not typing_objects.is_never(t):
                            run_end_type = t
                    if _utils.is_set(state_type) and _utils.is_set(run_end_type):
                        return state_type, run_end_type  # pyright: ignore[reportReturnType]
                    # break the inner (bases) loop
                    break

        if not _utils.is_set(state_type):
            # state defaults to None, so use that if we can't infer it
            state_type = None
        if not _utils.is_set(run_end_type):
            # this happens if a graph has no return nodes, use None so any downstream errors are clear
            run_end_type = None
        return state_type, run_end_type  # pyright: ignore[reportReturnType]

    def _register_node(
        self,
        node: type[BaseNode[StateT, DepsT, RunEndT]],
        parent_namespace: dict[str, Any] | None,
    ) -> None:
        node_id = node.get_node_id()
        if existing_node := self.node_defs.get(node_id):
            raise exceptions.GraphSetupError(
                f'Node ID `{node_id}` is not unique — found on {existing_node.node} and {node}'
            )
        else:
            self.node_defs[node_id] = node.get_node_def(parent_namespace)

    def _validate_edges(self):
        known_node_ids = self.node_defs.keys()
        bad_edges: dict[str, list[str]] = {}

        for node_id, node_def in self.node_defs.items():
            for edge in node_def.next_node_edges.keys():
                if edge not in known_node_ids:
                    bad_edges.setdefault(edge, []).append(f'`{node_id}`')

        if bad_edges:
            bad_edges_list = [f'`{k}` is referenced by {_utils.comma_and(v)}' for k, v in bad_edges.items()]
            if len(bad_edges_list) == 1:
                raise exceptions.GraphSetupError(f'{bad_edges_list[0]} but not included in the graph.')
            else:
                b = '\n'.join(f' {be}' for be in bad_edges_list)
                raise exceptions.GraphSetupError(
                    f'Nodes are referenced in the graph but not included in the graph:\n{b}'
                )

    def _infer_name(self, function_frame: types.FrameType | None) -> None:
        """Infer the agent name from the call frame.

        Usage should be `self._infer_name(inspect.currentframe())`.

        Copied from `Agent`.
        """
        assert self.name is None, 'Name already set'
        if function_frame is not None and (parent_frame := function_frame.f_back):  # pragma: no branch
            for name, item in parent_frame.f_locals.items():
                if item is self:
                    self.name = name
                    return
            if parent_frame.f_locals != parent_frame.f_globals:
                # if we couldn't find the agent in locals and globals are a different dict, try globals
                for name, item in parent_frame.f_globals.items():
                    if item is self:
                        self.name = name
                        return

__init__

__init__(
    *,
    nodes: Sequence[type[BaseNode[StateT, DepsT, RunEndT]]],
    name: str | None = None,
    state_type: type[StateT] | Unset = UNSET,
    run_end_type: type[RunEndT] | Unset = UNSET,
    auto_instrument: bool = True
)

从节点序列创建图。

参数

名称 类型 描述 默认
nodes Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]

构成图的节点,节点需要是唯一的,并且都属于相同的状态类型。

必需
name str | None

图的可选名称,如果未提供,则将在首次调用图方法时从调用帧中推断出名称。

None
state_type type[StateT] | Unset

图的状态类型,通常可以从 nodes 推断出来。

UNSET
run_end_type type[RunEndT] | Unset

运行图的结果类型,通常可以从 nodes 推断出来。

UNSET
auto_instrument bool

是否为图运行和每个节点的运行方法的执行创建 span。

True
源代码位于 pydantic_graph/pydantic_graph/graph.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def __init__(
    self,
    *,
    nodes: Sequence[type[BaseNode[StateT, DepsT, RunEndT]]],
    name: str | None = None,
    state_type: type[StateT] | _utils.Unset = _utils.UNSET,
    run_end_type: type[RunEndT] | _utils.Unset = _utils.UNSET,
    auto_instrument: bool = True,
):
    """Create a graph from a sequence of nodes.

    Args:
        nodes: The nodes which make up the graph, nodes need to be unique and all be generic in the same
            state type.
        name: Optional name for the graph, if not provided the name will be inferred from the calling frame
            on the first call to a graph method.
        state_type: The type of the state for the graph, this can generally be inferred from `nodes`.
        run_end_type: The type of the result of running the graph, this can generally be inferred from `nodes`.
        auto_instrument: Whether to create a span for the graph run and the execution of each node's run method.
    """
    self.name = name
    self._state_type = state_type
    self._run_end_type = run_end_type
    self.auto_instrument = auto_instrument

    parent_namespace = _utils.get_parent_namespace(inspect.currentframe())
    self.node_defs: dict[str, NodeDef[StateT, DepsT, RunEndT]] = {}
    for node in nodes:
        self._register_node(node, parent_namespace)

    self._validate_edges()

run async

run(
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: (
        BaseStatePersistence[StateT, RunEndT] | None
    ) = None,
    infer_name: bool = True,
    span: LogfireSpan | None = None
) -> GraphRunResult[StateT, RunEndT]

从起始节点运行图,直到结束。

参数

名称 类型 描述 默认
start_node BaseNode[StateT, DepsT, RunEndT]

要运行的第一个节点,由于图定义未定义图中的入口点,因此您需要提供起始节点。

必需
state StateT

图的初始状态。

None
deps DepsT

图的依赖项。

None
persistence BaseStatePersistence[StateT, RunEndT] | None

状态持久性接口,如果为 None,则默认为 SimpleStatePersistence

None
infer_name bool

是否从调用帧推断图名称。

True
span LogfireSpan | None

用于图运行的 span。如果未提供,则将根据 auto_instrument 字段的值创建 span。

None

返回

类型 描述
GraphRunResult[StateT, RunEndT]

一个 GraphRunResult,其中包含有关运行的信息,包括其最终结果。

这是一个从 上面 运行图的示例

run_never_42.py
from never_42 import Increment, MyState, never_42_graph

async def main():
    state = MyState(1)
    await never_42_graph.run(Increment(), state=state)
    print(state)
    #> MyState(number=2)

    state = MyState(41)
    await never_42_graph.run(Increment(), state=state)
    print(state)
    #> MyState(number=43)
源代码位于 pydantic_graph/pydantic_graph/graph.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
async def run(
    self,
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
    infer_name: bool = True,
    span: LogfireSpan | None = None,
) -> GraphRunResult[StateT, RunEndT]:
    """Run the graph from a starting node until it ends.

    Args:
        start_node: the first node to run, since the graph definition doesn't define the entry point in the graph,
            you need to provide the starting node.
        state: The initial state of the graph.
        deps: The dependencies of the graph.
        persistence: State persistence interface, defaults to
            [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] if `None`.
        infer_name: Whether to infer the graph name from the calling frame.
        span: The span to use for the graph run. If not provided, a span will be created depending on the value of
            the `auto_instrument` field.

    Returns:
        A `GraphRunResult` containing information about the run, including its final result.

    Here's an example of running the graph from [above][pydantic_graph.graph.Graph]:

    ```py {title="run_never_42.py" noqa="I001" py="3.10"}
    from never_42 import Increment, MyState, never_42_graph

    async def main():
        state = MyState(1)
        await never_42_graph.run(Increment(), state=state)
        print(state)
        #> MyState(number=2)

        state = MyState(41)
        await never_42_graph.run(Increment(), state=state)
        print(state)
        #> MyState(number=43)
    ```
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())

    async with self.iter(
        start_node, state=state, deps=deps, persistence=persistence, span=span, infer_name=False
    ) as graph_run:
        async for _node in graph_run:
            pass

    final_result = graph_run.result
    assert final_result is not None, 'GraphRun should have a final result'
    return final_result

run_sync

run_sync(
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: (
        BaseStatePersistence[StateT, RunEndT] | None
    ) = None,
    infer_name: bool = True
) -> GraphRunResult[StateT, RunEndT]

同步运行图。

这是一种便捷方法,它使用 loop.run_until_complete(...) 包装 self.run。因此,您不能在异步代码中或存在活动事件循环的情况下使用此方法。

参数

名称 类型 描述 默认
start_node BaseNode[StateT, DepsT, RunEndT]

要运行的第一个节点,由于图定义未定义图中的入口点,因此您需要提供起始节点。

必需
state StateT

图的初始状态。

None
deps DepsT

图的依赖项。

None
persistence BaseStatePersistence[StateT, RunEndT] | None

状态持久性接口,如果为 None,则默认为 SimpleStatePersistence

None
infer_name bool

是否从调用帧推断图名称。

True

返回

类型 描述
GraphRunResult[StateT, RunEndT]

结束运行的结果类型和运行历史记录。

源代码位于 pydantic_graph/pydantic_graph/graph.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def run_sync(
    self,
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
    infer_name: bool = True,
) -> GraphRunResult[StateT, RunEndT]:
    """Synchronously run the graph.

    This is a convenience method that wraps [`self.run`][pydantic_graph.Graph.run] with `loop.run_until_complete(...)`.
    You therefore can't use this method inside async code or if there's an active event loop.

    Args:
        start_node: the first node to run, since the graph definition doesn't define the entry point in the graph,
            you need to provide the starting node.
        state: The initial state of the graph.
        deps: The dependencies of the graph.
        persistence: State persistence interface, defaults to
            [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] if `None`.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns:
        The result type from ending the run and the history of the run.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())

    return _utils.get_event_loop().run_until_complete(
        self.run(start_node, state=state, deps=deps, persistence=persistence, infer_name=False)
    )

iter async

iter(
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: (
        BaseStatePersistence[StateT, RunEndT] | None
    ) = None,
    span: AbstractContextManager[Any] | None = None,
    infer_name: bool = True
) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]

一个上下文管理器,可用于在执行图的节点时对其进行迭代。

此方法返回一个 GraphRun 对象,该对象可用于异步迭代此 Graph 的节点,因为它们正在执行。如果您想在图执行展开时记录节点或与之交互,则可以使用此 API。

GraphRun 也可用于通过调用 GraphRun.next 手动驱动图执行。

GraphRun 提供对完整运行历史记录、状态、依赖项以及运行完成后的最终结果的访问。

有关更多详细信息,请参阅 GraphRun 的 API 文档。

参数

名称 类型 描述 默认
start_node BaseNode[StateT, DepsT, RunEndT]

要运行的第一个节点。由于图定义未定义图中的入口点,因此您需要提供起始节点。

必需
state StateT

图的初始状态。

None
deps DepsT

图的依赖项。

None
persistence BaseStatePersistence[StateT, RunEndT] | None

状态持久性接口,如果为 None,则默认为 SimpleStatePersistence

None
span AbstractContextManager[Any] | None

用于图运行的 span。如果未提供,则将创建新的 span。

None
infer_name bool

是否从调用帧推断图名称。

True

返回: 可以异步迭代以驱动图完成的 GraphRun。

源代码位于 pydantic_graph/pydantic_graph/graph.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
@asynccontextmanager
async def iter(
    self,
    start_node: BaseNode[StateT, DepsT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    persistence: BaseStatePersistence[StateT, RunEndT] | None = None,
    span: AbstractContextManager[Any] | None = None,
    infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]:
    """A contextmanager which can be used to iterate over the graph's nodes as they are executed.

    This method returns a `GraphRun` object which can be used to async-iterate over the nodes of this `Graph` as
    they are executed. This is the API to use if you want to record or interact with the nodes as the graph
    execution unfolds.

    The `GraphRun` can also be used to manually drive the graph execution by calling
    [`GraphRun.next`][pydantic_graph.graph.GraphRun.next].

    The `GraphRun` provides access to the full run history, state, deps, and the final result of the run once
    it has completed.

    For more details, see the API documentation of [`GraphRun`][pydantic_graph.graph.GraphRun].

    Args:
        start_node: the first node to run. Since the graph definition doesn't define the entry point in the graph,
            you need to provide the starting node.
        state: The initial state of the graph.
        deps: The dependencies of the graph.
        persistence: State persistence interface, defaults to
            [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] if `None`.
        span: The span to use for the graph run. If not provided, a new span will be created.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns: A GraphRun that can be async iterated over to drive the graph to completion.
    """
    if infer_name and self.name is None:
        # f_back because `asynccontextmanager` adds one frame
        if frame := inspect.currentframe():  # pragma: no branch
            self._infer_name(frame.f_back)

    if persistence is None:
        persistence = SimpleStatePersistence()
    persistence.set_graph_types(self)

    if self.auto_instrument and span is None:
        span = logfire_api.span('run graph {graph.name}', graph=self)

    with ExitStack() as stack:
        if span is not None:
            stack.enter_context(span)
        yield GraphRun[StateT, DepsT, RunEndT](
            graph=self, start_node=start_node, persistence=persistence, state=state, deps=deps
        )

iter_from_persistence async

iter_from_persistence(
    persistence: BaseStatePersistence[StateT, RunEndT],
    *,
    deps: DepsT = None,
    span: AbstractContextManager[Any] | None = None,
    infer_name: bool = True
) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]

一个上下文管理器,用于在执行图的节点时对其进行迭代,从持久性对象创建。

此方法具有与 iter 类似的功能,但它不是传递要运行的节点,而是从状态持久性中恢复节点和状态。

参数

名称 类型 描述 默认
persistence BaseStatePersistence[StateT, RunEndT]

要使用的状态持久性接口。

必需
deps DepsT

图的依赖项。

None
span AbstractContextManager[Any] | None

用于图运行的 span。如果未提供,则将创建新的 span。

None
infer_name bool

是否从调用帧推断图名称。

True

返回: 可以异步迭代以驱动图完成的 GraphRun。

源代码位于 pydantic_graph/pydantic_graph/graph.py
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
@asynccontextmanager
async def iter_from_persistence(
    self,
    persistence: BaseStatePersistence[StateT, RunEndT],
    *,
    deps: DepsT = None,
    span: AbstractContextManager[Any] | None = None,
    infer_name: bool = True,
) -> AsyncIterator[GraphRun[StateT, DepsT, RunEndT]]:
    """A contextmanager to iterate over the graph's nodes as they are executed, created from a persistence object.

    This method has similar functionality to [`iter`][pydantic_graph.graph.Graph.iter],
    but instead of passing the node to run, it will restore the node and state from state persistence.

    Args:
        persistence: The state persistence interface to use.
        deps: The dependencies of the graph.
        span: The span to use for the graph run. If not provided, a new span will be created.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns: A GraphRun that can be async iterated over to drive the graph to completion.
    """
    if infer_name and self.name is None:
        # f_back because `asynccontextmanager` adds one frame
        if frame := inspect.currentframe():  # pragma: no branch
            self._infer_name(frame.f_back)

    persistence.set_graph_types(self)

    snapshot = await persistence.load_next()
    if snapshot is None:
        raise exceptions.GraphRuntimeError('Unable to restore snapshot from state persistence.')

    snapshot.node.set_snapshot_id(snapshot.id)

    if self.auto_instrument and span is None:
        span = logfire_api.span('run graph {graph.name}', graph=self)

    with ExitStack() as stack:
        if span is not None:
            stack.enter_context(span)
        yield GraphRun[StateT, DepsT, RunEndT](
            graph=self,
            start_node=snapshot.node,
            persistence=persistence,
            state=snapshot.state,
            deps=deps,
            snapshot_id=snapshot.id,
        )

initialize async

initialize(
    node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    *,
    state: StateT = None,
    infer_name: bool = True
) -> None

在持久性中初始化新的图运行,而无需运行它。

如果您想设置稍后运行的图运行,例如通过 iter_from_persistence,这将非常有用。

参数

名称 类型 描述 默认
node BaseNode[StateT, DepsT, RunEndT]

要首先运行的节点。

必需
persistence BaseStatePersistence[StateT, RunEndT]

状态持久性接口。

必需
state StateT

图的起始状态。

None
infer_name bool

是否从调用帧推断图名称。

True
源代码位于 pydantic_graph/pydantic_graph/graph.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
async def initialize(
    self,
    node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    *,
    state: StateT = None,
    infer_name: bool = True,
) -> None:
    """Initialize a new graph run in persistence without running it.

    This is useful if you want to set up a graph run to be run later, e.g. via
    [`iter_from_persistence`][pydantic_graph.graph.Graph.iter_from_persistence].

    Args:
        node: The node to run first.
        persistence: State persistence interface.
        state: The start state of the graph.
        infer_name: Whether to infer the graph name from the calling frame.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())

    persistence.set_graph_types(self)
    await persistence.snapshot_node(state, node)

next async

next(
    node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    infer_name: bool = True
) -> BaseNode[StateT, DepsT, Any] | End[RunEndT]

运行图中的一个节点,并返回要运行的下一个节点。

参数

名称 类型 描述 默认
node BaseNode[StateT, DepsT, RunEndT]

要运行的节点。

必需
persistence BaseStatePersistence[StateT, RunEndT]

状态持久性接口,如果为 None,则默认为 SimpleStatePersistence

必需
state StateT

图的当前状态。

None
deps DepsT

图的依赖项。

None
infer_name bool

是否从调用帧推断图名称。

True

返回

类型 描述
BaseNode[StateT, DepsT, Any] | End[RunEndT]

要运行的下一个节点,如果图已完成,则为 End

源代码位于 pydantic_graph/pydantic_graph/graph.py
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
@deprecated('`next` is deprecated, use `async with graph.iter(...) as run:  run.next()` instead')
async def next(
    self,
    node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    *,
    state: StateT = None,
    deps: DepsT = None,
    infer_name: bool = True,
) -> BaseNode[StateT, DepsT, Any] | End[RunEndT]:
    """Run a node in the graph and return the next node to run.

    Args:
        node: The node to run.
        persistence: State persistence interface, defaults to
            [`SimpleStatePersistence`][pydantic_graph.SimpleStatePersistence] if `None`.
        state: The current state of the graph.
        deps: The dependencies of the graph.
        infer_name: Whether to infer the graph name from the calling frame.

    Returns:
        The next node to run or [`End`][pydantic_graph.nodes.End] if the graph has finished.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())

    persistence.set_graph_types(self)
    run = GraphRun[StateT, DepsT, RunEndT](
        graph=self,
        start_node=node,
        persistence=persistence,
        state=state,
        deps=deps,
    )
    return await run.next(node)

mermaid_code

mermaid_code(
    *,
    start_node: (
        Sequence[NodeIdent] | NodeIdent | None
    ) = None,
    title: str | None | Literal[False] = None,
    edge_labels: bool = True,
    notes: bool = True,
    highlighted_nodes: (
        Sequence[NodeIdent] | NodeIdent | None
    ) = None,
    highlight_css: str = DEFAULT_HIGHLIGHT_CSS,
    infer_name: bool = True,
    direction: StateDiagramDirection | None = None
) -> str

生成一个图表,将图表示为 mermaid 图。

此方法调用 pydantic_graph.mermaid.generate_code

参数

名称 类型 描述 默认
start_node Sequence[NodeIdent] | NodeIdent | None

可以启动图的节点或多个节点。

None
title str | None | Literal[False]

图表的标题,使用 False 不包含标题。

None
edge_labels bool

是否包含边标签。

True
notes bool

是否在每个节点上包含注释。

True
highlighted_nodes Sequence[NodeIdent] | NodeIdent | None

要突出显示的可选节点或多个节点。

None
highlight_css str

用于突出显示节点的 CSS。

DEFAULT_HIGHLIGHT_CSS
infer_name bool

是否从调用帧推断图名称。

True
direction StateDiagramDirection | None

流向。

None

返回

类型 描述
str

图的 mermaid 代码,然后可以将其渲染为图表。

这是一个从 上面 生成图表的示例

mermaid_never_42.py
from never_42 import Increment, never_42_graph

print(never_42_graph.mermaid_code(start_node=Increment))
'''
---
title: never_42_graph
---
stateDiagram-v2
  [*] --> Increment
  Increment --> Check42
  Check42 --> Increment
  Check42 --> [*]
'''

渲染后的图表将如下所示

---
title: never_42_graph
---
stateDiagram-v2
  [*] --> Increment
  Increment --> Check42
  Check42 --> Increment
  Check42 --> [*]
源代码位于 pydantic_graph/pydantic_graph/graph.py
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
def mermaid_code(
    self,
    *,
    start_node: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
    title: str | None | typing_extensions.Literal[False] = None,
    edge_labels: bool = True,
    notes: bool = True,
    highlighted_nodes: Sequence[mermaid.NodeIdent] | mermaid.NodeIdent | None = None,
    highlight_css: str = mermaid.DEFAULT_HIGHLIGHT_CSS,
    infer_name: bool = True,
    direction: mermaid.StateDiagramDirection | None = None,
) -> str:
    """Generate a diagram representing the graph as [mermaid](https://mermaid.npmjs.net.cn/) diagram.

    This method calls [`pydantic_graph.mermaid.generate_code`][pydantic_graph.mermaid.generate_code].

    Args:
        start_node: The node or nodes which can start the graph.
        title: The title of the diagram, use `False` to not include a title.
        edge_labels: Whether to include edge labels.
        notes: Whether to include notes on each node.
        highlighted_nodes: Optional node or nodes to highlight.
        highlight_css: The CSS to use for highlighting nodes.
        infer_name: Whether to infer the graph name from the calling frame.
        direction: The direction of flow.

    Returns:
        The mermaid code for the graph, which can then be rendered as a diagram.

    Here's an example of generating a diagram for the graph from [above][pydantic_graph.graph.Graph]:

    ```py {title="mermaid_never_42.py" py="3.10"}
    from never_42 import Increment, never_42_graph

    print(never_42_graph.mermaid_code(start_node=Increment))
    '''
    ---
    title: never_42_graph
    ---
    stateDiagram-v2
      [*] --> Increment
      Increment --> Check42
      Check42 --> Increment
      Check42 --> [*]
    '''
    ```

    The rendered diagram will look like this:

    ```mermaid
    ---
    title: never_42_graph
    ---
    stateDiagram-v2
      [*] --> Increment
      Increment --> Check42
      Check42 --> Increment
      Check42 --> [*]
    ```
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())
    if title is None and self.name:
        title = self.name
    return mermaid.generate_code(
        self,
        start_node=start_node,
        highlighted_nodes=highlighted_nodes,
        highlight_css=highlight_css,
        title=title or None,
        edge_labels=edge_labels,
        notes=notes,
        direction=direction,
    )

mermaid_image

mermaid_image(
    infer_name: bool = True, **kwargs: Unpack[MermaidConfig]
) -> bytes

生成一个图表,将图表示为图像。

可以使用 kwargs 自定义格式和图表,请参阅 pydantic_graph.mermaid.MermaidConfig

使用外部服务

此方法向 mermaid.ink 发出请求以渲染图像,mermaid.ink 是一项免费服务,与 Pydantic 无关。

参数

名称 类型 描述 默认
infer_name bool

是否从调用帧推断图名称。

True
**kwargs Unpack[MermaidConfig]

要传递给 mermaid.request_image 的其他参数。

{}

返回

类型 描述
bytes

图像字节。

源代码位于 pydantic_graph/pydantic_graph/graph.py
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
def mermaid_image(
    self, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
) -> bytes:
    """Generate a diagram representing the graph as an image.

    The format and diagram can be customized using `kwargs`,
    see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

    !!! note "Uses external service"
        This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink`
        is a free service not affiliated with Pydantic.

    Args:
        infer_name: Whether to infer the graph name from the calling frame.
        **kwargs: Additional arguments to pass to `mermaid.request_image`.

    Returns:
        The image bytes.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())
    if 'title' not in kwargs and self.name:
        kwargs['title'] = self.name
    return mermaid.request_image(self, **kwargs)

mermaid_save

mermaid_save(
    path: Path | str,
    /,
    *,
    infer_name: bool = True,
    **kwargs: Unpack[MermaidConfig],
) -> None

生成一个图表,表示图并将其另存为图像。

可以使用 kwargs 自定义格式和图表,请参阅 pydantic_graph.mermaid.MermaidConfig

使用外部服务

此方法向 mermaid.ink 发出请求以渲染图像,mermaid.ink 是一项免费服务,与 Pydantic 无关。

参数

名称 类型 描述 默认
path Path | str

要将图像保存到的路径。

必需
infer_name bool

是否从调用帧推断图名称。

True
**kwargs Unpack[MermaidConfig]

要传递给 mermaid.save_image 的其他参数。

{}
源代码位于 pydantic_graph/pydantic_graph/graph.py
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
def mermaid_save(
    self, path: Path | str, /, *, infer_name: bool = True, **kwargs: typing_extensions.Unpack[mermaid.MermaidConfig]
) -> None:
    """Generate a diagram representing the graph and save it as an image.

    The format and diagram can be customized using `kwargs`,
    see [`pydantic_graph.mermaid.MermaidConfig`][pydantic_graph.mermaid.MermaidConfig].

    !!! note "Uses external service"
        This method makes a request to [mermaid.ink](https://mermaid.ink) to render the image, `mermaid.ink`
        is a free service not affiliated with Pydantic.

    Args:
        path: The path to save the image to.
        infer_name: Whether to infer the graph name from the calling frame.
        **kwargs: Additional arguments to pass to `mermaid.save_image`.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())
    if 'title' not in kwargs and self.name:
        kwargs['title'] = self.name
    mermaid.save_image(path, self, **kwargs)

get_nodes

get_nodes() -> (
    Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]
)

获取图中的节点。

源代码位于 pydantic_graph/pydantic_graph/graph.py
499
500
501
def get_nodes(self) -> Sequence[type[BaseNode[StateT, DepsT, RunEndT]]]:
    """Get the nodes in the graph."""
    return [node_def.node for node_def in self.node_defs.values()]

GraphRun

基类: Generic[StateT, DepsT, RunEndT]

Graph 的有状态、异步可迭代运行。

您通常通过调用 async with [my_graph.iter(...)][pydantic_graph.graph.Graph.iter] as graph_run: 来获得 GraphRun 实例。这使您能够通过 async for 迭代或重复调用 .next(...) 来迭代节点,因为它们会运行。

这是一个从 上面 迭代图的示例

iter_never_42.py
from copy import deepcopy
from never_42 import Increment, MyState, never_42_graph

async def main():
    state = MyState(1)
    async with never_42_graph.iter(Increment(), state=state) as graph_run:
        node_states = [(graph_run.next_node, deepcopy(graph_run.state))]
        async for node in graph_run:
            node_states.append((node, deepcopy(graph_run.state)))
        print(node_states)
        '''
        [
            (Increment(), MyState(number=1)),
            (Check42(), MyState(number=2)),
            (End(data=2), MyState(number=2)),
        ]
        '''

    state = MyState(41)
    async with never_42_graph.iter(Increment(), state=state) as graph_run:
        node_states = [(graph_run.next_node, deepcopy(graph_run.state))]
        async for node in graph_run:
            node_states.append((node, deepcopy(graph_run.state)))
        print(node_states)
        '''
        [
            (Increment(), MyState(number=41)),
            (Check42(), MyState(number=42)),
            (Increment(), MyState(number=42)),
            (Check42(), MyState(number=43)),
            (End(data=43), MyState(number=43)),
        ]
        '''

有关如何手动驱动图运行的示例,请参阅 GraphRun.next 文档

源代码位于 pydantic_graph/pydantic_graph/graph.py
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
class GraphRun(Generic[StateT, DepsT, RunEndT]):
    """A stateful, async-iterable run of a [`Graph`][pydantic_graph.graph.Graph].

    You typically get a `GraphRun` instance from calling
    `async with [my_graph.iter(...)][pydantic_graph.graph.Graph.iter] as graph_run:`. That gives you the ability to iterate
    through nodes as they run, either by `async for` iteration or by repeatedly calling `.next(...)`.

    Here's an example of iterating over the graph from [above][pydantic_graph.graph.Graph]:
    ```py {title="iter_never_42.py" noqa="I001" py="3.10"}
    from copy import deepcopy
    from never_42 import Increment, MyState, never_42_graph

    async def main():
        state = MyState(1)
        async with never_42_graph.iter(Increment(), state=state) as graph_run:
            node_states = [(graph_run.next_node, deepcopy(graph_run.state))]
            async for node in graph_run:
                node_states.append((node, deepcopy(graph_run.state)))
            print(node_states)
            '''
            [
                (Increment(), MyState(number=1)),
                (Check42(), MyState(number=2)),
                (End(data=2), MyState(number=2)),
            ]
            '''

        state = MyState(41)
        async with never_42_graph.iter(Increment(), state=state) as graph_run:
            node_states = [(graph_run.next_node, deepcopy(graph_run.state))]
            async for node in graph_run:
                node_states.append((node, deepcopy(graph_run.state)))
            print(node_states)
            '''
            [
                (Increment(), MyState(number=41)),
                (Check42(), MyState(number=42)),
                (Increment(), MyState(number=42)),
                (Check42(), MyState(number=43)),
                (End(data=43), MyState(number=43)),
            ]
            '''
    ```

    See the [`GraphRun.next` documentation][pydantic_graph.graph.GraphRun.next] for an example of how to manually
    drive the graph run.
    """

    def __init__(
        self,
        *,
        graph: Graph[StateT, DepsT, RunEndT],
        start_node: BaseNode[StateT, DepsT, RunEndT],
        persistence: BaseStatePersistence[StateT, RunEndT],
        state: StateT,
        deps: DepsT,
        snapshot_id: str | None = None,
    ):
        """Create a new run for a given graph, starting at the specified node.

        Typically, you'll use [`Graph.iter`][pydantic_graph.graph.Graph.iter] rather than calling this directly.

        Args:
            graph: The [`Graph`][pydantic_graph.graph.Graph] to run.
            start_node: The node where execution will begin.
            persistence: State persistence interface.
            state: A shared state object or primitive (like a counter, dataclass, etc.) that is available
                to all nodes via `ctx.state`.
            deps: Optional dependencies that each node can access via `ctx.deps`, e.g. database connections,
                configuration, or logging clients.
            snapshot_id: The ID of the snapshot the node came from.
        """
        self.graph = graph
        self.persistence = persistence
        self._snapshot_id: str | None = snapshot_id
        self.state = state
        self.deps = deps

        self._next_node: BaseNode[StateT, DepsT, RunEndT] | End[RunEndT] = start_node

    @property
    def next_node(self) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]:
        """The next node that will be run in the graph.

        This is the next node that will be used during async iteration, or if a node is not passed to `self.next(...)`.
        """
        return self._next_node

    @property
    def result(self) -> GraphRunResult[StateT, RunEndT] | None:
        """The final result of the graph run if the run is completed, otherwise `None`."""
        if not isinstance(self._next_node, End):
            return None  # The GraphRun has not finished running
        return GraphRunResult(
            self._next_node.data,
            state=self.state,
            persistence=self.persistence,
        )

    async def next(
        self, node: BaseNode[StateT, DepsT, RunEndT] | None = None
    ) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]:
        """Manually drive the graph run by passing in the node you want to run next.

        This lets you inspect or mutate the node before continuing execution, or skip certain nodes
        under dynamic conditions. The graph run should stop when you return an [`End`][pydantic_graph.nodes.End] node.

        Here's an example of using `next` to drive the graph from [above][pydantic_graph.graph.Graph]:
        ```py {title="next_never_42.py" noqa="I001" py="3.10"}
        from copy import deepcopy
        from pydantic_graph import End
        from never_42 import Increment, MyState, never_42_graph

        async def main():
            state = MyState(48)
            async with never_42_graph.iter(Increment(), state=state) as graph_run:
                next_node = graph_run.next_node  # start with the first node
                node_states = [(next_node, deepcopy(graph_run.state))]

                while not isinstance(next_node, End):
                    if graph_run.state.number == 50:
                        graph_run.state.number = 42
                    next_node = await graph_run.next(next_node)
                    node_states.append((next_node, deepcopy(graph_run.state)))

                print(node_states)
                '''
                [
                    (Increment(), MyState(number=48)),
                    (Check42(), MyState(number=49)),
                    (End(data=49), MyState(number=49)),
                ]
                '''
        ```

        Args:
            node: The node to run next in the graph. If not specified, uses `self.next_node`, which is initialized to
                the `start_node` of the run and updated each time a new node is returned.

        Returns:
            The next node returned by the graph logic, or an [`End`][pydantic_graph.nodes.End] node if
            the run has completed.
        """
        if node is None:
            # This cast is necessary because self._next_node could be an `End`. You'll get a runtime error if that's
            # the case, but if it is, the only way to get there would be to have tried calling next manually after
            # the run finished. Either way, maybe it would be better to not do this cast...
            node = cast(BaseNode[StateT, DepsT, RunEndT], self._next_node)
            node_snapshot_id = node.get_snapshot_id()
        else:
            node_snapshot_id = node.get_snapshot_id()

        if node_snapshot_id != self._snapshot_id:
            await self.persistence.snapshot_node_if_new(node_snapshot_id, self.state, node)
            self._snapshot_id = node_snapshot_id

        if not isinstance(node, BaseNode):
            # While technically this is not compatible with the documented method signature, it's an easy mistake to
            # make, and we should eagerly provide a more helpful error message than you'd get otherwise.
            raise TypeError(f'`next` must be called with a `BaseNode` instance, got {node!r}.')

        node_id = node.get_node_id()
        if node_id not in self.graph.node_defs:
            raise exceptions.GraphRuntimeError(f'Node `{node}` is not in the graph.')

        with ExitStack() as stack:
            if self.graph.auto_instrument:
                stack.enter_context(_logfire.span('run node {node_id}', node_id=node_id, node=node))

            async with self.persistence.record_run(node_snapshot_id):
                ctx = GraphRunContext(self.state, self.deps)
                self._next_node = await node.run(ctx)

        if isinstance(self._next_node, End):
            self._snapshot_id = self._next_node.get_snapshot_id()
            await self.persistence.snapshot_end(self.state, self._next_node)
        elif isinstance(self._next_node, BaseNode):
            self._snapshot_id = self._next_node.get_snapshot_id()
            await self.persistence.snapshot_node(self.state, self._next_node)
        else:
            raise exceptions.GraphRuntimeError(
                f'Invalid node return type: `{type(self._next_node).__name__}`. Expected `BaseNode` or `End`.'
            )

        return self._next_node

    def __aiter__(self) -> AsyncIterator[BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]]:
        return self

    async def __anext__(self) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]:
        """Use the last returned node as the input to `Graph.next`."""
        if isinstance(self._next_node, End):
            raise StopAsyncIteration
        return await self.next(self._next_node)

    def __repr__(self) -> str:
        return f'<GraphRun graph={self.graph.name or "[unnamed]"}>'

__init__

__init__(
    *,
    graph: Graph[StateT, DepsT, RunEndT],
    start_node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    state: StateT,
    deps: DepsT,
    snapshot_id: str | None = None
)

为给定的图创建一个新的运行,从指定的节点开始。

通常,您将使用 Graph.iter 而不是直接调用此方法。

参数

名称 类型 描述 默认
graph Graph[StateT, DepsT, RunEndT]

要运行的 Graph

必需
start_node BaseNode[StateT, DepsT, RunEndT]

执行将开始的节点。

必需
persistence BaseStatePersistence[StateT, RunEndT]

状态持久性接口。

必需
state StateT

一个共享状态对象或基元(如计数器、数据类等),所有节点都可以通过 ctx.state 访问它。

必需
deps DepsT

每个节点都可以通过 ctx.deps 访问的可选依赖项,例如数据库连接、配置或日志记录客户端。

必需
snapshot_id str | None

节点来自的快照的 ID。

None
源代码位于 pydantic_graph/pydantic_graph/graph.py
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
def __init__(
    self,
    *,
    graph: Graph[StateT, DepsT, RunEndT],
    start_node: BaseNode[StateT, DepsT, RunEndT],
    persistence: BaseStatePersistence[StateT, RunEndT],
    state: StateT,
    deps: DepsT,
    snapshot_id: str | None = None,
):
    """Create a new run for a given graph, starting at the specified node.

    Typically, you'll use [`Graph.iter`][pydantic_graph.graph.Graph.iter] rather than calling this directly.

    Args:
        graph: The [`Graph`][pydantic_graph.graph.Graph] to run.
        start_node: The node where execution will begin.
        persistence: State persistence interface.
        state: A shared state object or primitive (like a counter, dataclass, etc.) that is available
            to all nodes via `ctx.state`.
        deps: Optional dependencies that each node can access via `ctx.deps`, e.g. database connections,
            configuration, or logging clients.
        snapshot_id: The ID of the snapshot the node came from.
    """
    self.graph = graph
    self.persistence = persistence
    self._snapshot_id: str | None = snapshot_id
    self.state = state
    self.deps = deps

    self._next_node: BaseNode[StateT, DepsT, RunEndT] | End[RunEndT] = start_node

next_node property

图中将要运行的下一个节点。

这是异步迭代期间将使用的下一个节点,或者如果没有将节点传递给 self.next(...)

result property

result: GraphRunResult[StateT, RunEndT] | None

如果运行已完成,则为图运行的最终结果,否则为 None

next async

next(
    node: BaseNode[StateT, DepsT, RunEndT] | None = None,
) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]

通过传入您要接下来运行的节点来手动驱动图运行。

这使您可以在继续执行之前检查或更改节点,或者在动态条件下跳过某些节点。当您返回 End 节点时,图运行应停止。

这是一个使用 next上面 驱动图的示例

next_never_42.py
from copy import deepcopy
from pydantic_graph import End
from never_42 import Increment, MyState, never_42_graph

async def main():
    state = MyState(48)
    async with never_42_graph.iter(Increment(), state=state) as graph_run:
        next_node = graph_run.next_node  # start with the first node
        node_states = [(next_node, deepcopy(graph_run.state))]

        while not isinstance(next_node, End):
            if graph_run.state.number == 50:
                graph_run.state.number = 42
            next_node = await graph_run.next(next_node)
            node_states.append((next_node, deepcopy(graph_run.state)))

        print(node_states)
        '''
        [
            (Increment(), MyState(number=48)),
            (Check42(), MyState(number=49)),
            (End(data=49), MyState(number=49)),
        ]
        '''

参数

名称 类型 描述 默认
node BaseNode[StateT, DepsT, RunEndT] | None

图中接下来要运行的节点。如果未指定,则使用 self.next_node,它初始化为运行的 start_node,并在每次返回新节点时更新。

None

返回

类型 描述
BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]

图逻辑返回的下一个节点,如果 End 节点,则表示

BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]

运行已完成。

源代码位于 pydantic_graph/pydantic_graph/graph.py
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
async def next(
    self, node: BaseNode[StateT, DepsT, RunEndT] | None = None
) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]:
    """Manually drive the graph run by passing in the node you want to run next.

    This lets you inspect or mutate the node before continuing execution, or skip certain nodes
    under dynamic conditions. The graph run should stop when you return an [`End`][pydantic_graph.nodes.End] node.

    Here's an example of using `next` to drive the graph from [above][pydantic_graph.graph.Graph]:
    ```py {title="next_never_42.py" noqa="I001" py="3.10"}
    from copy import deepcopy
    from pydantic_graph import End
    from never_42 import Increment, MyState, never_42_graph

    async def main():
        state = MyState(48)
        async with never_42_graph.iter(Increment(), state=state) as graph_run:
            next_node = graph_run.next_node  # start with the first node
            node_states = [(next_node, deepcopy(graph_run.state))]

            while not isinstance(next_node, End):
                if graph_run.state.number == 50:
                    graph_run.state.number = 42
                next_node = await graph_run.next(next_node)
                node_states.append((next_node, deepcopy(graph_run.state)))

            print(node_states)
            '''
            [
                (Increment(), MyState(number=48)),
                (Check42(), MyState(number=49)),
                (End(data=49), MyState(number=49)),
            ]
            '''
    ```

    Args:
        node: The node to run next in the graph. If not specified, uses `self.next_node`, which is initialized to
            the `start_node` of the run and updated each time a new node is returned.

    Returns:
        The next node returned by the graph logic, or an [`End`][pydantic_graph.nodes.End] node if
        the run has completed.
    """
    if node is None:
        # This cast is necessary because self._next_node could be an `End`. You'll get a runtime error if that's
        # the case, but if it is, the only way to get there would be to have tried calling next manually after
        # the run finished. Either way, maybe it would be better to not do this cast...
        node = cast(BaseNode[StateT, DepsT, RunEndT], self._next_node)
        node_snapshot_id = node.get_snapshot_id()
    else:
        node_snapshot_id = node.get_snapshot_id()

    if node_snapshot_id != self._snapshot_id:
        await self.persistence.snapshot_node_if_new(node_snapshot_id, self.state, node)
        self._snapshot_id = node_snapshot_id

    if not isinstance(node, BaseNode):
        # While technically this is not compatible with the documented method signature, it's an easy mistake to
        # make, and we should eagerly provide a more helpful error message than you'd get otherwise.
        raise TypeError(f'`next` must be called with a `BaseNode` instance, got {node!r}.')

    node_id = node.get_node_id()
    if node_id not in self.graph.node_defs:
        raise exceptions.GraphRuntimeError(f'Node `{node}` is not in the graph.')

    with ExitStack() as stack:
        if self.graph.auto_instrument:
            stack.enter_context(_logfire.span('run node {node_id}', node_id=node_id, node=node))

        async with self.persistence.record_run(node_snapshot_id):
            ctx = GraphRunContext(self.state, self.deps)
            self._next_node = await node.run(ctx)

    if isinstance(self._next_node, End):
        self._snapshot_id = self._next_node.get_snapshot_id()
        await self.persistence.snapshot_end(self.state, self._next_node)
    elif isinstance(self._next_node, BaseNode):
        self._snapshot_id = self._next_node.get_snapshot_id()
        await self.persistence.snapshot_node(self.state, self._next_node)
    else:
        raise exceptions.GraphRuntimeError(
            f'Invalid node return type: `{type(self._next_node).__name__}`. Expected `BaseNode` or `End`.'
        )

    return self._next_node

__anext__ async

__anext__() -> (
    BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]
)

使用上次返回的节点作为 Graph.next 的输入。

源代码位于 pydantic_graph/pydantic_graph/graph.py
778
779
780
781
782
async def __anext__(self) -> BaseNode[StateT, DepsT, RunEndT] | End[RunEndT]:
    """Use the last returned node as the input to `Graph.next`."""
    if isinstance(self._next_node, End):
        raise StopAsyncIteration
    return await self.next(self._next_node)

GraphRunResult dataclass

基类: Generic[StateT, RunEndT]

运行图的最终结果。

源代码位于 pydantic_graph/pydantic_graph/graph.py
788
789
790
791
792
793
794
@dataclass
class GraphRunResult(Generic[StateT, RunEndT]):
    """The final result of running a graph."""

    output: RunEndT
    state: StateT
    persistence: BaseStatePersistence[StateT, RunEndT] = field(repr=False)