pydantic_ai.agent

Agent dataclass

Bases: `AbstractAgent[AgentDepsT, OutputDataT]`

Class for defining "agents" - a way to have a specific type of "conversation" with an LLM.

Agents are generic in the dependency type they take, `AgentDepsT`, and the output type they return, `OutputDataT`.

By default, if neither generic parameter is customised, agents have type `Agent[None, str]`.

Minimal usage example:

```python
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')
result = agent.run_sync('What is the capital of France?')
print(result.output)
#> The capital of France is Paris.
```
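
To make the generic parameters concrete, here is a hedged sketch of a fully parameterised agent; the `SupportDeps` dataclass and `Answer` model are illustrative names, not part of the library:

```python
from dataclasses import dataclass

from pydantic import BaseModel

from pydantic_ai import Agent


@dataclass
class SupportDeps:
    customer_name: str


class Answer(BaseModel):
    text: str


# This agent is typed as Agent[SupportDeps, Answer] rather than the
# default Agent[None, str].
agent = Agent(
    'openai:gpt-4o',
    deps_type=SupportDeps,
    output_type=Answer,
    instructions='Answer briefly.',
)
```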
Source code in `pydantic_ai_slim/pydantic_ai/agent/__init__.py`
@dataclasses.dataclass(init=False)
class Agent(AbstractAgent[AgentDepsT, OutputDataT]):
    """Class for defining "agents" - a way to have a specific type of "conversation" with an LLM.

    Agents are generic in the dependency type they take [`AgentDepsT`][pydantic_ai.tools.AgentDepsT]
    and the output type they return, [`OutputDataT`][pydantic_ai.output.OutputDataT].

    By default, if neither generic parameter is customised, agents have type `Agent[None, str]`.

    Minimal usage example:

    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')
    result = agent.run_sync('What is the capital of France?')
    print(result.output)
    #> The capital of France is Paris.
    ```
    """

    _model: models.Model | models.KnownModelName | str | None

    _name: str | None
    end_strategy: EndStrategy
    """Strategy for handling tool calls when a final result is found."""

    model_settings: ModelSettings | None
    """Optional model request settings to use for this agents's runs, by default.

    Note, if `model_settings` is provided by `run`, `run_sync`, or `run_stream`, those settings will
    be merged with this value, with the runtime argument taking priority.
    """

    _output_type: OutputSpec[OutputDataT]

    instrument: InstrumentationSettings | bool | None
    """Options to automatically instrument with OpenTelemetry."""

    _instrument_default: ClassVar[InstrumentationSettings | bool] = False

    _deps_type: type[AgentDepsT] = dataclasses.field(repr=False)
    _output_schema: _output.BaseOutputSchema[OutputDataT] = dataclasses.field(repr=False)
    _output_validators: list[_output.OutputValidator[AgentDepsT, OutputDataT]] = dataclasses.field(repr=False)
    _instructions: str | None = dataclasses.field(repr=False)
    _instructions_functions: list[_system_prompt.SystemPromptRunner[AgentDepsT]] = dataclasses.field(repr=False)
    _system_prompts: tuple[str, ...] = dataclasses.field(repr=False)
    _system_prompt_functions: list[_system_prompt.SystemPromptRunner[AgentDepsT]] = dataclasses.field(repr=False)
    _system_prompt_dynamic_functions: dict[str, _system_prompt.SystemPromptRunner[AgentDepsT]] = dataclasses.field(
        repr=False
    )
    _function_toolset: FunctionToolset[AgentDepsT] = dataclasses.field(repr=False)
    _output_toolset: OutputToolset[AgentDepsT] | None = dataclasses.field(repr=False)
    _user_toolsets: list[AbstractToolset[AgentDepsT]] = dataclasses.field(repr=False)
    _prepare_tools: ToolsPrepareFunc[AgentDepsT] | None = dataclasses.field(repr=False)
    _prepare_output_tools: ToolsPrepareFunc[AgentDepsT] | None = dataclasses.field(repr=False)
    _max_result_retries: int = dataclasses.field(repr=False)
    _max_tool_retries: int = dataclasses.field(repr=False)

    _event_stream_handler: EventStreamHandler[AgentDepsT] | None = dataclasses.field(repr=False)

    _enter_lock: Lock = dataclasses.field(repr=False)
    _entered_count: int = dataclasses.field(repr=False)
    _exit_stack: AsyncExitStack | None = dataclasses.field(repr=False)

    @overload
    def __init__(
        self,
        model: models.Model | models.KnownModelName | str | None = None,
        *,
        output_type: OutputSpec[OutputDataT] = str,
        instructions: str
        | _system_prompt.SystemPromptFunc[AgentDepsT]
        | Sequence[str | _system_prompt.SystemPromptFunc[AgentDepsT]]
        | None = None,
        system_prompt: str | Sequence[str] = (),
        deps_type: type[AgentDepsT] = NoneType,
        name: str | None = None,
        model_settings: ModelSettings | None = None,
        retries: int = 1,
        output_retries: int | None = None,
        tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] = (),
        builtin_tools: Sequence[AbstractBuiltinTool] = (),
        prepare_tools: ToolsPrepareFunc[AgentDepsT] | None = None,
        prepare_output_tools: ToolsPrepareFunc[AgentDepsT] | None = None,
        toolsets: Sequence[AbstractToolset[AgentDepsT] | ToolsetFunc[AgentDepsT]] | None = None,
        defer_model_check: bool = False,
        end_strategy: EndStrategy = 'early',
        instrument: InstrumentationSettings | bool | None = None,
        history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> None: ...

    @overload
    @deprecated('`mcp_servers` is deprecated, use `toolsets` instead.')
    def __init__(
        self,
        model: models.Model | models.KnownModelName | str | None = None,
        *,
        output_type: OutputSpec[OutputDataT] = str,
        instructions: str
        | _system_prompt.SystemPromptFunc[AgentDepsT]
        | Sequence[str | _system_prompt.SystemPromptFunc[AgentDepsT]]
        | None = None,
        system_prompt: str | Sequence[str] = (),
        deps_type: type[AgentDepsT] = NoneType,
        name: str | None = None,
        model_settings: ModelSettings | None = None,
        retries: int = 1,
        output_retries: int | None = None,
        tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] = (),
        builtin_tools: Sequence[AbstractBuiltinTool] = (),
        prepare_tools: ToolsPrepareFunc[AgentDepsT] | None = None,
        prepare_output_tools: ToolsPrepareFunc[AgentDepsT] | None = None,
        mcp_servers: Sequence[MCPServer] = (),
        defer_model_check: bool = False,
        end_strategy: EndStrategy = 'early',
        instrument: InstrumentationSettings | bool | None = None,
        history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> None: ...

    def __init__(
        self,
        model: models.Model | models.KnownModelName | str | None = None,
        *,
        output_type: OutputSpec[OutputDataT] = str,
        instructions: str
        | _system_prompt.SystemPromptFunc[AgentDepsT]
        | Sequence[str | _system_prompt.SystemPromptFunc[AgentDepsT]]
        | None = None,
        system_prompt: str | Sequence[str] = (),
        deps_type: type[AgentDepsT] = NoneType,
        name: str | None = None,
        model_settings: ModelSettings | None = None,
        retries: int = 1,
        output_retries: int | None = None,
        tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] = (),
        builtin_tools: Sequence[AbstractBuiltinTool] = (),
        prepare_tools: ToolsPrepareFunc[AgentDepsT] | None = None,
        prepare_output_tools: ToolsPrepareFunc[AgentDepsT] | None = None,
        toolsets: Sequence[AbstractToolset[AgentDepsT] | ToolsetFunc[AgentDepsT]] | None = None,
        defer_model_check: bool = False,
        end_strategy: EndStrategy = 'early',
        instrument: InstrumentationSettings | bool | None = None,
        history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
        **_deprecated_kwargs: Any,
    ):
        """Create an agent.

        Args:
            model: The default model to use for this agent. If not provided,
                you must provide the model when calling it. We allow `str` here since the actual list of allowed models changes frequently.
            output_type: The type of the output data, used to validate the data returned by the model,
                defaults to `str`.
            instructions: Instructions to use for this agent, you can also register instructions via a function with
                [`instructions`][pydantic_ai.Agent.instructions].
            system_prompt: Static system prompts to use for this agent, you can also register system
                prompts via a function with [`system_prompt`][pydantic_ai.Agent.system_prompt].
            deps_type: The type used for dependency injection, this parameter exists solely to allow you to fully
                parameterize the agent, and therefore get the best out of static type checking.
                If you're not using deps, but want type checking to pass, you can set `deps=None` to satisfy Pyright
                or add a type hint `: Agent[None, <return type>]`.
            name: The name of the agent, used for logging. If `None`, we try to infer the agent name from the call frame
                when the agent is first run.
            model_settings: Optional model request settings to use for this agent's runs, by default.
            retries: The default number of retries to allow before raising an error.
            output_retries: The maximum number of retries to allow for output validation, defaults to `retries`.
            tools: Tools to register with the agent, you can also register tools via the decorators
                [`@agent.tool`][pydantic_ai.Agent.tool] and [`@agent.tool_plain`][pydantic_ai.Agent.tool_plain].
            builtin_tools: The builtin tools that the agent will use. This depends on the model, as some models may not
                support certain tools. If the model doesn't support the builtin tools, an error will be raised.
            prepare_tools: Custom function to prepare the tool definition of all tools for each step, except output tools.
                This is useful if you want to customize the definition of multiple tools or you want to register
                a subset of tools for a given step. See [`ToolsPrepareFunc`][pydantic_ai.tools.ToolsPrepareFunc]
            prepare_output_tools: Custom function to prepare the tool definition of all output tools for each step.
                This is useful if you want to customize the definition of multiple output tools or you want to register
                a subset of output tools for a given step. See [`ToolsPrepareFunc`][pydantic_ai.tools.ToolsPrepareFunc]
            toolsets: Toolsets to register with the agent, including MCP servers and functions which take a run context
                and return a toolset. See [`ToolsetFunc`][pydantic_ai.toolsets.ToolsetFunc] for more information.
            defer_model_check: by default, if you provide a [named][pydantic_ai.models.KnownModelName] model,
                it's evaluated to create a [`Model`][pydantic_ai.models.Model] instance immediately,
                which checks for the necessary environment variables. Set this to `False`
                to defer the evaluation until the first run. Useful if you want to
                [override the model][pydantic_ai.Agent.override] for testing.
            end_strategy: Strategy for handling tool calls that are requested alongside a final result.
                See [`EndStrategy`][pydantic_ai.agent.EndStrategy] for more information.
            instrument: Set to True to automatically instrument with OpenTelemetry,
                which will use Logfire if it's configured.
                Set to an instance of [`InstrumentationSettings`][pydantic_ai.agent.InstrumentationSettings] to customize.
                If this isn't set, then the last value set by
                [`Agent.instrument_all()`][pydantic_ai.Agent.instrument_all]
                will be used, which defaults to False.
                See the [Debugging and Monitoring guide](https://ai.pydantic.org.cn/logfire/) for more info.
            history_processors: Optional list of callables to process the message history before sending it to the model.
                Each processor takes a list of messages and returns a modified list of messages.
                Processors can be sync or async and are applied in sequence.
            event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools.
        """
        if model is None or defer_model_check:
            self._model = model
        else:
            self._model = models.infer_model(model)

        self._name = name
        self.end_strategy = end_strategy
        self.model_settings = model_settings

        self._output_type = output_type
        self.instrument = instrument
        self._deps_type = deps_type

        if mcp_servers := _deprecated_kwargs.pop('mcp_servers', None):
            if toolsets is not None:  # pragma: no cover
                raise TypeError('`mcp_servers` and `toolsets` cannot be set at the same time.')
            warnings.warn('`mcp_servers` is deprecated, use `toolsets` instead', DeprecationWarning)
            toolsets = mcp_servers

        _utils.validate_empty_kwargs(_deprecated_kwargs)

        default_output_mode = (
            self.model.profile.default_structured_output_mode if isinstance(self.model, models.Model) else None
        )

        self._output_schema = _output.OutputSchema[OutputDataT].build(output_type, default_mode=default_output_mode)
        self._output_validators = []

        self._instructions = ''
        self._instructions_functions = []
        if isinstance(instructions, str | Callable):
            instructions = [instructions]
        for instruction in instructions or []:
            if isinstance(instruction, str):
                self._instructions += instruction + '\n'
            else:
                self._instructions_functions.append(_system_prompt.SystemPromptRunner(instruction))
        self._instructions = self._instructions.strip() or None

        self._system_prompts = (system_prompt,) if isinstance(system_prompt, str) else tuple(system_prompt)
        self._system_prompt_functions = []
        self._system_prompt_dynamic_functions = {}

        self._max_result_retries = output_retries if output_retries is not None else retries
        self._max_tool_retries = retries

        self._builtin_tools = builtin_tools

        self._prepare_tools = prepare_tools
        self._prepare_output_tools = prepare_output_tools

        self._output_toolset = self._output_schema.toolset
        if self._output_toolset:
            self._output_toolset.max_retries = self._max_result_retries

        self._function_toolset = _AgentFunctionToolset(
            tools, max_retries=self._max_tool_retries, output_schema=self._output_schema
        )
        self._dynamic_toolsets = [
            DynamicToolset[AgentDepsT](toolset_func=toolset)
            for toolset in toolsets or []
            if not isinstance(toolset, AbstractToolset)
        ]
        self._user_toolsets = [toolset for toolset in toolsets or [] if isinstance(toolset, AbstractToolset)]

        self.history_processors = history_processors or []

        self._event_stream_handler = event_stream_handler

        self._override_deps: ContextVar[_utils.Option[AgentDepsT]] = ContextVar('_override_deps', default=None)
        self._override_model: ContextVar[_utils.Option[models.Model]] = ContextVar('_override_model', default=None)
        self._override_toolsets: ContextVar[_utils.Option[Sequence[AbstractToolset[AgentDepsT]]]] = ContextVar(
            '_override_toolsets', default=None
        )
        self._override_tools: ContextVar[
            _utils.Option[Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]]]
        ] = ContextVar('_override_tools', default=None)

        self._enter_lock = Lock()
        self._entered_count = 0
        self._exit_stack = None

    @staticmethod
    def instrument_all(instrument: InstrumentationSettings | bool = True) -> None:
        """Set the instrumentation options for all agents where `instrument` is not set."""
        Agent._instrument_default = instrument
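
    # Usage sketch (illustrative, assuming an OpenTelemetry backend such as
    # Logfire is configured): `Agent.instrument_all(True)` enables
    # instrumentation for every agent that doesn't set `instrument` itself.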

    @property
    def model(self) -> models.Model | models.KnownModelName | str | None:
        """The default model configured for this agent."""
        return self._model

    @model.setter
    def model(self, value: models.Model | models.KnownModelName | str | None) -> None:
        """Set the default model configured for this agent.

        We allow `str` here since the actual list of allowed models changes frequently.
        """
        self._model = value

    @property
    def name(self) -> str | None:
        """The name of the agent, used for logging.

        If `None`, we try to infer the agent name from the call frame when the agent is first run.
        """
        return self._name

    @name.setter
    def name(self, value: str | None) -> None:
        """Set the name of the agent, used for logging."""
        self._name = value

    @property
    def deps_type(self) -> type:
        """The type of dependencies used by the agent."""
        return self._deps_type

    @property
    def output_type(self) -> OutputSpec[OutputDataT]:
        """The type of data output by agent runs, used to validate the data returned by the model, defaults to `str`."""
        return self._output_type

    @property
    def event_stream_handler(self) -> EventStreamHandler[AgentDepsT] | None:
        """Optional handler for events from the model's streaming response and the agent's execution of tools."""
        return self._event_stream_handler

    def __repr__(self) -> str:
        return f'{type(self).__name__}(model={self.model!r}, name={self.name!r}, end_strategy={self.end_strategy!r}, model_settings={self.model_settings!r}, output_type={self.output_type!r}, instrument={self.instrument!r})'

    @overload
    def iter(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    ) -> AbstractAsyncContextManager[AgentRun[AgentDepsT, OutputDataT]]: ...

    @overload
    def iter(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT],
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    ) -> AbstractAsyncContextManager[AgentRun[AgentDepsT, RunOutputDataT]]: ...

    @asynccontextmanager
    async def iter(  # noqa: C901
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT] | None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    ) -> AsyncIterator[AgentRun[AgentDepsT, Any]]:
        """A contextmanager which can be used to iterate over the agent graph's nodes as they are executed.

        This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an
        `AgentRun` object. The `AgentRun` can be used to async-iterate over the nodes of the graph as they are
        executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the
        stream of events coming from the execution of tools.

        The `AgentRun` also provides methods to access the full message history, new messages, and usage statistics,
        and the final result of the run once it has completed.

        For more details, see the documentation of `AgentRun`.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')

        async def main():
            nodes = []
            async with agent.iter('What is the capital of France?') as agent_run:
                async for node in agent_run:
                    nodes.append(node)
            print(nodes)
            '''
            [
                UserPromptNode(
                    user_prompt='What is the capital of France?',
                    instructions=None,
                    instructions_functions=[],
                    system_prompts=(),
                    system_prompt_functions=[],
                    system_prompt_dynamic_functions={},
                ),
                ModelRequestNode(
                    request=ModelRequest(
                        parts=[
                            UserPromptPart(
                                content='What is the capital of France?',
                                timestamp=datetime.datetime(...),
                            )
                        ]
                    )
                ),
                CallToolsNode(
                    model_response=ModelResponse(
                        parts=[TextPart(content='The capital of France is Paris.')],
                        usage=RequestUsage(input_tokens=56, output_tokens=7),
                        model_name='gpt-4o',
                        timestamp=datetime.datetime(...),
                    )
                ),
                End(data=FinalResult(output='The capital of France is Paris.')),
            ]
            '''
            print(agent_run.result.output)
            #> The capital of France is Paris.
        ```

        Args:
            user_prompt: User input to start/continue the conversation.
            output_type: Custom output type to use for this run. `output_type` may only be used if the agent has no
                output validators, since output validators would expect an argument that matches the agent's output type.
            message_history: History of the conversation so far.
            deferred_tool_results: Optional results for deferred tool calls in the message history.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.
            model_settings: Optional settings to use for this model's request.
            usage_limits: Optional limits on model request count or token usage.
            usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
            infer_name: Whether to try to infer the agent name from the call frame if it's not set.
            toolsets: Optional additional toolsets for this run.

        Returns:
            The result of the run.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())
        model_used = self._get_model(model)
        del model

        deps = self._get_deps(deps)
        new_message_index = len(message_history) if message_history else 0
        output_schema = self._prepare_output_schema(output_type, model_used.profile)

        output_type_ = output_type or self.output_type

        # We consider it a user error if a user tries to restrict the result type while having an output validator that
        # may change the result type from the restricted type to something else. Therefore, we consider the following
        # typecast reasonable, even though it is possible to violate it with otherwise-type-checked code.
        output_validators = cast(list[_output.OutputValidator[AgentDepsT, RunOutputDataT]], self._output_validators)

        output_toolset = self._output_toolset
        if output_schema != self._output_schema or output_validators:
            output_toolset = cast(OutputToolset[AgentDepsT], output_schema.toolset)
            if output_toolset:
                output_toolset.max_retries = self._max_result_retries
                output_toolset.output_validators = output_validators
        toolset = self._get_toolset(output_toolset=output_toolset, additional_toolsets=toolsets)
        tool_manager = ToolManager[AgentDepsT](toolset)

        # Build the graph
        graph: Graph[_agent_graph.GraphAgentState, _agent_graph.GraphAgentDeps[AgentDepsT, Any], FinalResult[Any]] = (
            _agent_graph.build_agent_graph(self.name, self._deps_type, output_type_)
        )

        # Build the initial state
        usage = usage or _usage.RunUsage()
        state = _agent_graph.GraphAgentState(
            message_history=message_history[:] if message_history else [],
            usage=usage,
            retries=0,
            run_step=0,
        )

        # Merge model settings in order of precedence: run > agent > model
        merged_settings = merge_model_settings(model_used.settings, self.model_settings)
        model_settings = merge_model_settings(merged_settings, model_settings)
        usage_limits = usage_limits or _usage.UsageLimits()

        async def get_instructions(run_context: RunContext[AgentDepsT]) -> str | None:
            parts = [
                self._instructions,
                *[await func.run(run_context) for func in self._instructions_functions],
            ]

            model_profile = model_used.profile
            if isinstance(output_schema, _output.PromptedOutputSchema):
                instructions = output_schema.instructions(model_profile.prompted_output_template)
                parts.append(instructions)

            parts = [p for p in parts if p]
            if not parts:
                return None
            return '\n\n'.join(parts).strip()

        if isinstance(model_used, InstrumentedModel):
            instrumentation_settings = model_used.instrumentation_settings
            tracer = model_used.instrumentation_settings.tracer
        else:
            instrumentation_settings = None
            tracer = NoOpTracer()

        tool_call_results: dict[str, DeferredToolResult] | None = None
        if deferred_tool_results is not None:
            tool_call_results = {}
            for tool_call_id, approval in deferred_tool_results.approvals.items():
                if approval is True:
                    approval = ToolApproved()
                elif approval is False:
                    approval = ToolDenied()
                tool_call_results[tool_call_id] = approval

            if calls := deferred_tool_results.calls:
                call_result_types = _utils.get_union_args(DeferredToolCallResult)
                for tool_call_id, result in calls.items():
                    if not isinstance(result, call_result_types):
                        result = _messages.ToolReturn(result)
                    tool_call_results[tool_call_id] = result

        graph_deps = _agent_graph.GraphAgentDeps[AgentDepsT, RunOutputDataT](
            user_deps=deps,
            prompt=user_prompt,
            new_message_index=new_message_index,
            model=model_used,
            model_settings=model_settings,
            usage_limits=usage_limits,
            max_result_retries=self._max_result_retries,
            end_strategy=self.end_strategy,
            output_schema=output_schema,
            output_validators=output_validators,
            history_processors=self.history_processors,
            builtin_tools=list(self._builtin_tools),
            tool_manager=tool_manager,
            tool_call_results=tool_call_results,
            tracer=tracer,
            get_instructions=get_instructions,
            instrumentation_settings=instrumentation_settings,
        )
        start_node = _agent_graph.UserPromptNode[AgentDepsT](
            user_prompt=user_prompt,
            instructions=self._instructions,
            instructions_functions=self._instructions_functions,
            system_prompts=self._system_prompts,
            system_prompt_functions=self._system_prompt_functions,
            system_prompt_dynamic_functions=self._system_prompt_dynamic_functions,
        )

        agent_name = self.name or 'agent'
        run_span = tracer.start_span(
            'agent run',
            attributes={
                'model_name': model_used.model_name if model_used else 'no-model',
                'agent_name': agent_name,
                'logfire.msg': f'{agent_name} run',
            },
        )

        try:
            async with toolset:
                async with graph.iter(
                    start_node,
                    state=state,
                    deps=graph_deps,
                    span=use_span(run_span) if run_span.is_recording() else None,
                    infer_name=False,
                ) as graph_run:
                    agent_run = AgentRun(graph_run)
                    yield agent_run
                    if (final_result := agent_run.result) is not None and run_span.is_recording():
                        if instrumentation_settings and instrumentation_settings.include_content:
                            run_span.set_attribute(
                                'final_result',
                                (
                                    final_result.output
                                    if isinstance(final_result.output, str)
                                    else json.dumps(InstrumentedModel.serialize_any(final_result.output))
                                ),
                            )
        finally:
            try:
                if instrumentation_settings and run_span.is_recording():
                    run_span.set_attributes(self._run_span_end_attributes(state, usage, instrumentation_settings))
            finally:
                run_span.end()

    def _run_span_end_attributes(
        self, state: _agent_graph.GraphAgentState, usage: _usage.RunUsage, settings: InstrumentationSettings
    ):
        if settings.version == 1:
            attrs = {
                'all_messages_events': json.dumps(
                    [
                        InstrumentedModel.event_to_dict(e)
                        for e in settings.messages_to_otel_events(state.message_history)
                    ]
                )
            }
        else:
            attrs = {
                'pydantic_ai.all_messages': json.dumps(settings.messages_to_otel_messages(state.message_history)),
                **settings.system_instructions_attributes(self._instructions),
            }

        return {
            **usage.opentelemetry_attributes(),
            **attrs,
            'logfire.json_schema': json.dumps(
                {
                    'type': 'object',
                    'properties': {
                        **{attr: {'type': 'array'} for attr in attrs.keys()},
                        'final_result': {'type': 'object'},
                    },
                }
            ),
        }

    @contextmanager
    def override(
        self,
        *,
        deps: AgentDepsT | _utils.Unset = _utils.UNSET,
        model: models.Model | models.KnownModelName | str | _utils.Unset = _utils.UNSET,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | _utils.Unset = _utils.UNSET,
        tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | _utils.Unset = _utils.UNSET,
    ) -> Iterator[None]:
        """Context manager to temporarily override agent dependencies, model, toolsets, or tools.

        This is particularly useful when testing.
        You can find an example of this [here](../testing.md#overriding-model-via-pytest-fixtures).

        Args:
            deps: The dependencies to use instead of the dependencies passed to the agent run.
            model: The model to use instead of the model passed to the agent run.
            toolsets: The toolsets to use instead of the toolsets passed to the agent constructor and agent run.
            tools: The tools to use instead of the tools registered with the agent.
        """
        if _utils.is_set(deps):
            deps_token = self._override_deps.set(_utils.Some(deps))
        else:
            deps_token = None

        if _utils.is_set(model):
            model_token = self._override_model.set(_utils.Some(models.infer_model(model)))
        else:
            model_token = None

        if _utils.is_set(toolsets):
            toolsets_token = self._override_toolsets.set(_utils.Some(toolsets))
        else:
            toolsets_token = None

        if _utils.is_set(tools):
            tools_token = self._override_tools.set(_utils.Some(tools))
        else:
            tools_token = None

        try:
            yield
        finally:
            if deps_token is not None:
                self._override_deps.reset(deps_token)
            if model_token is not None:
                self._override_model.reset(model_token)
            if toolsets_token is not None:
                self._override_toolsets.reset(toolsets_token)
            if tools_token is not None:
                self._override_tools.reset(tools_token)

    @overload
    def instructions(
        self, func: Callable[[RunContext[AgentDepsT]], str], /
    ) -> Callable[[RunContext[AgentDepsT]], str]: ...

    @overload
    def instructions(
        self, func: Callable[[RunContext[AgentDepsT]], Awaitable[str]], /
    ) -> Callable[[RunContext[AgentDepsT]], Awaitable[str]]: ...

    @overload
    def instructions(self, func: Callable[[], str], /) -> Callable[[], str]: ...

    @overload
    def instructions(self, func: Callable[[], Awaitable[str]], /) -> Callable[[], Awaitable[str]]: ...

    @overload
    def instructions(
        self, /
    ) -> Callable[[_system_prompt.SystemPromptFunc[AgentDepsT]], _system_prompt.SystemPromptFunc[AgentDepsT]]: ...

    def instructions(
        self,
        func: _system_prompt.SystemPromptFunc[AgentDepsT] | None = None,
        /,
    ) -> (
        Callable[[_system_prompt.SystemPromptFunc[AgentDepsT]], _system_prompt.SystemPromptFunc[AgentDepsT]]
        | _system_prompt.SystemPromptFunc[AgentDepsT]
    ):
        """Decorator to register an instructions function.

        Optionally takes [`RunContext`][pydantic_ai.tools.RunContext] as its only argument.
        Can decorate sync or async functions.

        The decorator can be used bare (`agent.instructions`).

        Overloads for every possible signature of `instructions` are included so the decorator doesn't obscure
        the type of the function.

        Example:
        ```python
        from pydantic_ai import Agent, RunContext

        agent = Agent('test', deps_type=str)

        @agent.instructions
        def simple_instructions() -> str:
            return 'foobar'

        @agent.instructions
        async def async_instructions(ctx: RunContext[str]) -> str:
            return f'{ctx.deps} is the best'
        ```
        """
        if func is None:

            def decorator(
                func_: _system_prompt.SystemPromptFunc[AgentDepsT],
            ) -> _system_prompt.SystemPromptFunc[AgentDepsT]:
                self._instructions_functions.append(_system_prompt.SystemPromptRunner(func_))
                return func_

            return decorator
        else:
            self._instructions_functions.append(_system_prompt.SystemPromptRunner(func))
            return func

    @overload
    def system_prompt(
        self, func: Callable[[RunContext[AgentDepsT]], str], /
    ) -> Callable[[RunContext[AgentDepsT]], str]: ...

    @overload
    def system_prompt(
        self, func: Callable[[RunContext[AgentDepsT]], Awaitable[str]], /
    ) -> Callable[[RunContext[AgentDepsT]], Awaitable[str]]: ...

    @overload
    def system_prompt(self, func: Callable[[], str], /) -> Callable[[], str]: ...

    @overload
    def system_prompt(self, func: Callable[[], Awaitable[str]], /) -> Callable[[], Awaitable[str]]: ...

    @overload
    def system_prompt(
        self, /, *, dynamic: bool = False
    ) -> Callable[[_system_prompt.SystemPromptFunc[AgentDepsT]], _system_prompt.SystemPromptFunc[AgentDepsT]]: ...

    def system_prompt(
        self,
        func: _system_prompt.SystemPromptFunc[AgentDepsT] | None = None,
        /,
        *,
        dynamic: bool = False,
    ) -> (
        Callable[[_system_prompt.SystemPromptFunc[AgentDepsT]], _system_prompt.SystemPromptFunc[AgentDepsT]]
        | _system_prompt.SystemPromptFunc[AgentDepsT]
    ):
        """Decorator to register a system prompt function.

        Optionally takes [`RunContext`][pydantic_ai.tools.RunContext] as its only argument.
        Can decorate sync or async functions.

        The decorator can be used either bare (`agent.system_prompt`) or as a function call
        (`agent.system_prompt(...)`), see the examples below.

        Overloads for every possible signature of `system_prompt` are included so the decorator doesn't obscure
        the type of the function, see `tests/typed_agent.py` for tests.

        Args:
            func: The function to decorate
            dynamic: If True, the system prompt will be reevaluated even when `message_history` is provided,
                see [`SystemPromptPart.dynamic_ref`][pydantic_ai.messages.SystemPromptPart.dynamic_ref]

        Example:
        ```python
        from pydantic_ai import Agent, RunContext

        agent = Agent('test', deps_type=str)

        @agent.system_prompt
        def simple_system_prompt() -> str:
            return 'foobar'

        @agent.system_prompt(dynamic=True)
        async def async_system_prompt(ctx: RunContext[str]) -> str:
            return f'{ctx.deps} is the best'
        ```
        """
        if func is None:

            def decorator(
                func_: _system_prompt.SystemPromptFunc[AgentDepsT],
            ) -> _system_prompt.SystemPromptFunc[AgentDepsT]:
                runner = _system_prompt.SystemPromptRunner[AgentDepsT](func_, dynamic=dynamic)
                self._system_prompt_functions.append(runner)
                if dynamic:  # pragma: lax no cover
                    self._system_prompt_dynamic_functions[func_.__qualname__] = runner
                return func_

            return decorator
        else:
            assert not dynamic, "dynamic can't be True in this case"
            self._system_prompt_functions.append(_system_prompt.SystemPromptRunner[AgentDepsT](func, dynamic=dynamic))
            return func

    @overload
    def output_validator(
        self, func: Callable[[RunContext[AgentDepsT], OutputDataT], OutputDataT], /
    ) -> Callable[[RunContext[AgentDepsT], OutputDataT], OutputDataT]: ...

    @overload
    def output_validator(
        self, func: Callable[[RunContext[AgentDepsT], OutputDataT], Awaitable[OutputDataT]], /
    ) -> Callable[[RunContext[AgentDepsT], OutputDataT], Awaitable[OutputDataT]]: ...

    @overload
    def output_validator(
        self, func: Callable[[OutputDataT], OutputDataT], /
    ) -> Callable[[OutputDataT], OutputDataT]: ...

    @overload
    def output_validator(
        self, func: Callable[[OutputDataT], Awaitable[OutputDataT]], /
    ) -> Callable[[OutputDataT], Awaitable[OutputDataT]]: ...

    def output_validator(
        self, func: _output.OutputValidatorFunc[AgentDepsT, OutputDataT], /
    ) -> _output.OutputValidatorFunc[AgentDepsT, OutputDataT]:
        """Decorator to register an output validator function.

        Optionally takes [`RunContext`][pydantic_ai.tools.RunContext] as its first argument.
        Can decorate sync or async functions.

        Overloads for every possible signature of `output_validator` are included so the decorator doesn't obscure
        the type of the function, see `tests/typed_agent.py` for tests.

        Example:
        ```python
        from pydantic_ai import Agent, ModelRetry, RunContext

        agent = Agent('test', deps_type=str)

        @agent.output_validator
        def output_validator_simple(data: str) -> str:
            if 'wrong' in data:
                raise ModelRetry('wrong response')
            return data

        @agent.output_validator
        async def output_validator_deps(ctx: RunContext[str], data: str) -> str:
            if ctx.deps in data:
                raise ModelRetry('wrong response')
            return data

        result = agent.run_sync('foobar', deps='spam')
        print(result.output)
        #> success (no tool calls)
        ```
        """
        self._output_validators.append(_output.OutputValidator[AgentDepsT, Any](func))
        return func

    @overload
    def tool(self, func: ToolFuncContext[AgentDepsT, ToolParams], /) -> ToolFuncContext[AgentDepsT, ToolParams]: ...

    @overload
    def tool(
        self,
        /,
        *,
        name: str | None = None,
        retries: int | None = None,
        prepare: ToolPrepareFunc[AgentDepsT] | None = None,
        docstring_format: DocstringFormat = 'auto',
        require_parameter_descriptions: bool = False,
        schema_generator: type[GenerateJsonSchema] = GenerateToolJsonSchema,
        strict: bool | None = None,
        requires_approval: bool = False,
    ) -> Callable[[ToolFuncContext[AgentDepsT, ToolParams]], ToolFuncContext[AgentDepsT, ToolParams]]: ...

    def tool(
        self,
        func: ToolFuncContext[AgentDepsT, ToolParams] | None = None,
        /,
        *,
        name: str | None = None,
        retries: int | None = None,
        prepare: ToolPrepareFunc[AgentDepsT] | None = None,
        docstring_format: DocstringFormat = 'auto',
        require_parameter_descriptions: bool = False,
        schema_generator: type[GenerateJsonSchema] = GenerateToolJsonSchema,
        strict: bool | None = None,
        requires_approval: bool = False,
    ) -> Any:
        """Decorator to register a tool function which takes [`RunContext`][pydantic_ai.tools.RunContext] as its first argument.

        Can decorate sync or async functions.

        The docstring is inspected to extract both the tool description and description of each parameter,
        [learn more](../tools.md#function-tools-and-schema).

        We can't add overloads for every possible signature of tool, since the return type is a recursive union
        so the signature of functions decorated with `@agent.tool` is obscured.

        Example:
        ```python
        from pydantic_ai import Agent, RunContext

        agent = Agent('test', deps_type=int)

        @agent.tool
        def foobar(ctx: RunContext[int], x: int) -> int:
            return ctx.deps + x

        @agent.tool(retries=2)
        async def spam(ctx: RunContext[int], y: float) -> float:
            return ctx.deps + y

        result = agent.run_sync('foobar', deps=1)
        print(result.output)
        #> {"foobar":1,"spam":1.0}
        ```

        Args:
            func: The tool function to register.
            name: The name of the tool, defaults to the function name.
            retries: The number of retries to allow for this tool, defaults to the agent's default retries,
                which defaults to 1.
            prepare: Custom method to prepare the tool definition for each step; return `None` to omit this
                tool from a given step. This is useful if you want to customise a tool at call time,
                or omit it completely from a step. See [`ToolPrepareFunc`][pydantic_ai.tools.ToolPrepareFunc].
            docstring_format: The format of the docstring, see [`DocstringFormat`][pydantic_ai.tools.DocstringFormat].
                Defaults to `'auto'`, such that the format is inferred from the structure of the docstring.
            require_parameter_descriptions: If True, raise an error if a parameter description is missing. Defaults to False.
            schema_generator: The JSON schema generator class to use for this tool. Defaults to `GenerateToolJsonSchema`.
            strict: Whether to enforce JSON schema compliance (only affects OpenAI).
                See [`ToolDefinition`][pydantic_ai.tools.ToolDefinition] for more info.
            requires_approval: Whether this tool requires human-in-the-loop approval. Defaults to False.
                See the [tools documentation](../deferred-tools.md#human-in-the-loop-tool-approval) for more info.
        """

        def tool_decorator(
            func_: ToolFuncContext[AgentDepsT, ToolParams],
        ) -> ToolFuncContext[AgentDepsT, ToolParams]:
            # noinspection PyTypeChecker
            self._function_toolset.add_function(
                func_,
                True,
                name,
                retries,
                prepare,
                docstring_format,
                require_parameter_descriptions,
                schema_generator,
                strict,
                requires_approval,
            )
            return func_

        return tool_decorator if func is None else tool_decorator(func)

    @overload
    def tool_plain(self, func: ToolFuncPlain[ToolParams], /) -> ToolFuncPlain[ToolParams]: ...

    @overload
    def tool_plain(
        self,
        /,
        *,
        name: str | None = None,
        retries: int | None = None,
        prepare: ToolPrepareFunc[AgentDepsT] | None = None,
        docstring_format: DocstringFormat = 'auto',
        require_parameter_descriptions: bool = False,
        schema_generator: type[GenerateJsonSchema] = GenerateToolJsonSchema,
        strict: bool | None = None,
        requires_approval: bool = False,
    ) -> Callable[[ToolFuncPlain[ToolParams]], ToolFuncPlain[ToolParams]]: ...

    def tool_plain(
        self,
        func: ToolFuncPlain[ToolParams] | None = None,
        /,
        *,
        name: str | None = None,
        retries: int | None = None,
        prepare: ToolPrepareFunc[AgentDepsT] | None = None,
        docstring_format: DocstringFormat = 'auto',
        require_parameter_descriptions: bool = False,
        schema_generator: type[GenerateJsonSchema] = GenerateToolJsonSchema,
        strict: bool | None = None,
        requires_approval: bool = False,
    ) -> Any:
        """Decorator to register a tool function which DOES NOT take `RunContext` as an argument.

        Can decorate sync or async functions.

        The docstring is inspected to extract both the tool description and description of each parameter,
        [learn more](../tools.md#function-tools-and-schema).

        We can't add overloads for every possible signature of tool, since the return type is a recursive union
        so the signature of functions decorated with `@agent.tool_plain` is obscured.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('test')

        @agent.tool_plain
        def foobar() -> int:
            return 123

        @agent.tool_plain(retries=2)
        async def spam() -> float:
            return 3.14

        result = agent.run_sync('foobar')
        print(result.output)
        #> {"foobar":123,"spam":3.14}
        ```

        Args:
            func: The tool function to register.
            name: The name of the tool, defaults to the function name.
            retries: The number of retries to allow for this tool, defaults to the agent's default retries,
                which defaults to 1.
            prepare: Custom method to prepare the tool definition for each step; return `None` to omit this
                tool from a given step. This is useful if you want to customise a tool at call time,
                or omit it completely from a step. See [`ToolPrepareFunc`][pydantic_ai.tools.ToolPrepareFunc].
            docstring_format: The format of the docstring, see [`DocstringFormat`][pydantic_ai.tools.DocstringFormat].
                Defaults to `'auto'`, such that the format is inferred from the structure of the docstring.
            require_parameter_descriptions: If True, raise an error if a parameter description is missing. Defaults to False.
            schema_generator: The JSON schema generator class to use for this tool. Defaults to `GenerateToolJsonSchema`.
            strict: Whether to enforce JSON schema compliance (only affects OpenAI).
                See [`ToolDefinition`][pydantic_ai.tools.ToolDefinition] for more info.
            requires_approval: Whether this tool requires human-in-the-loop approval. Defaults to False.
                See the [tools documentation](../deferred-tools.md#human-in-the-loop-tool-approval) for more info.
        """

        def tool_decorator(func_: ToolFuncPlain[ToolParams]) -> ToolFuncPlain[ToolParams]:
            # noinspection PyTypeChecker
            self._function_toolset.add_function(
                func_,
                False,
                name,
                retries,
                prepare,
                docstring_format,
                require_parameter_descriptions,
                schema_generator,
                strict,
                requires_approval,
            )
            return func_

        return tool_decorator if func is None else tool_decorator(func)

    @overload
    def toolset(self, func: ToolsetFunc[AgentDepsT], /) -> ToolsetFunc[AgentDepsT]: ...

    @overload
    def toolset(
        self,
        /,
        *,
        per_run_step: bool = True,
    ) -> Callable[[ToolsetFunc[AgentDepsT]], ToolsetFunc[AgentDepsT]]: ...

    def toolset(
        self,
        func: ToolsetFunc[AgentDepsT] | None = None,
        /,
        *,
        per_run_step: bool = True,
    ) -> Any:
        """Decorator to register a toolset function which takes [`RunContext`][pydantic_ai.tools.RunContext] as its only argument.

        Can decorate sync or async functions.

        The decorator can be used bare (`agent.toolset`).

        Example:
        ```python
        from pydantic_ai import Agent, RunContext
        from pydantic_ai.toolsets import AbstractToolset, FunctionToolset

        agent = Agent('test', deps_type=str)

        @agent.toolset
        async def simple_toolset(ctx: RunContext[str]) -> AbstractToolset[str]:
            return FunctionToolset()
        ```

        Args:
            func: The toolset function to register.
            per_run_step: Whether to re-evaluate the toolset for each run step. Defaults to True.
        """

        def toolset_decorator(func_: ToolsetFunc[AgentDepsT]) -> ToolsetFunc[AgentDepsT]:
            self._dynamic_toolsets.append(DynamicToolset(func_, per_run_step=per_run_step))
            return func_

        return toolset_decorator if func is None else toolset_decorator(func)

    def _get_model(self, model: models.Model | models.KnownModelName | str | None) -> models.Model:
        """Create a model configured for this agent.

        Args:
            model: model to use for this run, required if `model` was not set when creating the agent.

        Returns:
            The model used
        """
        model_: models.Model
        if some_model := self._override_model.get():
            # we don't want `override()` to cover up errors from the model not being defined, hence this check
            if model is None and self.model is None:
                raise exceptions.UserError(
                    '`model` must either be set on the agent or included when calling it. '
                    '(Even when `override(model=...)` is customizing the model that will actually be called)'
                )
            model_ = some_model.value
        elif model is not None:
            model_ = models.infer_model(model)
        elif self.model is not None:
            # noinspection PyTypeChecker
            model_ = self.model = models.infer_model(self.model)
        else:
            raise exceptions.UserError('`model` must either be set on the agent or included when calling it.')

        instrument = self.instrument
        if instrument is None:
            instrument = self._instrument_default

        return instrument_model(model_, instrument)

    def _get_deps(self: Agent[T, OutputDataT], deps: T) -> T:
        """Get deps for a run.

        If we've overridden deps via `_override_deps`, use that, otherwise use the deps passed to the call.

        We could do runtime type checking of deps against `self._deps_type`, but that's a slippery slope.
        """
        if some_deps := self._override_deps.get():
            return some_deps.value
        else:
            return deps

    def _get_toolset(
        self,
        output_toolset: AbstractToolset[AgentDepsT] | None | _utils.Unset = _utils.UNSET,
        additional_toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    ) -> AbstractToolset[AgentDepsT]:
        """Get the complete toolset.

        Args:
            output_toolset: The output toolset to use instead of the one built at agent construction time.
            additional_toolsets: Additional toolsets to add, unless toolsets have been overridden.
        """
        toolsets = self.toolsets
        # Don't add additional toolsets if the toolsets have been overridden
        if additional_toolsets and self._override_toolsets.get() is None:
            toolsets = [*toolsets, *additional_toolsets]

        toolset = CombinedToolset(toolsets)

        # Copy the dynamic toolsets to ensure each run has its own instances
        def copy_dynamic_toolsets(toolset: AbstractToolset[AgentDepsT]) -> AbstractToolset[AgentDepsT]:
            if isinstance(toolset, DynamicToolset):
                return dataclasses.replace(toolset)
            else:
                return toolset

        toolset = toolset.visit_and_replace(copy_dynamic_toolsets)

        if self._prepare_tools:
            toolset = PreparedToolset(toolset, self._prepare_tools)

        output_toolset = output_toolset if _utils.is_set(output_toolset) else self._output_toolset
        if output_toolset is not None:
            if self._prepare_output_tools:
                output_toolset = PreparedToolset(output_toolset, self._prepare_output_tools)
            toolset = CombinedToolset([output_toolset, toolset])

        return toolset

    @property
    def toolsets(self) -> Sequence[AbstractToolset[AgentDepsT]]:
        """All toolsets registered on the agent, including a function toolset holding tools that were registered on the agent directly.

        Output tools are not included.
        """
        toolsets: list[AbstractToolset[AgentDepsT]] = []

        if some_tools := self._override_tools.get():
            function_toolset = _AgentFunctionToolset(
                some_tools.value, max_retries=self._max_tool_retries, output_schema=self._output_schema
            )
        else:
            function_toolset = self._function_toolset
        toolsets.append(function_toolset)

        if some_user_toolsets := self._override_toolsets.get():
            user_toolsets = some_user_toolsets.value
        else:
            user_toolsets = [*self._user_toolsets, *self._dynamic_toolsets]
        toolsets.extend(user_toolsets)

        return toolsets

    def _prepare_output_schema(
        self, output_type: OutputSpec[RunOutputDataT] | None, model_profile: ModelProfile
    ) -> _output.OutputSchema[RunOutputDataT]:
        if output_type is not None:
            if self._output_validators:
                raise exceptions.UserError('Cannot set a custom run `output_type` when the agent has output validators')
            schema = _output.OutputSchema[RunOutputDataT].build(
                output_type, default_mode=model_profile.default_structured_output_mode
            )
        else:
            schema = self._output_schema.with_default_mode(model_profile.default_structured_output_mode)

        schema.raise_if_unsupported(model_profile)

        return schema  # pyright: ignore[reportReturnType]

    async def __aenter__(self) -> Self:
        """Enter the agent context.

        This will start all [`MCPServerStdio`s][pydantic_ai.mcp.MCPServerStdio] registered as `toolsets` so they are ready to be used.

        This is a no-op if the agent has already been entered.
        """
        async with self._enter_lock:
            if self._entered_count == 0:
                async with AsyncExitStack() as exit_stack:
                    toolset = self._get_toolset()
                    await exit_stack.enter_async_context(toolset)

                    self._exit_stack = exit_stack.pop_all()
            self._entered_count += 1
        return self

    async def __aexit__(self, *args: Any) -> bool | None:
        async with self._enter_lock:
            self._entered_count -= 1
            if self._entered_count == 0 and self._exit_stack is not None:
                await self._exit_stack.aclose()
                self._exit_stack = None

    def set_mcp_sampling_model(self, model: models.Model | models.KnownModelName | str | None = None) -> None:
        """Set the sampling model on all MCP servers registered with the agent.

        If no sampling model is provided, the agent's model will be used.
        """
        try:
            sampling_model = models.infer_model(model) if model else self._get_model(None)
        except exceptions.UserError as e:
            raise exceptions.UserError('No sampling model provided and no model set on the agent.') from e

        from ..mcp import MCPServer

        def _set_sampling_model(toolset: AbstractToolset[AgentDepsT]) -> None:
            if isinstance(toolset, MCPServer):
                toolset.sampling_model = sampling_model

        self._get_toolset().apply(_set_sampling_model)

    @asynccontextmanager
    @deprecated(
        '`run_mcp_servers` is deprecated, use `async with agent:` instead. If you need to set a sampling model on all MCP servers, use `agent.set_mcp_sampling_model()`.'
    )
    async def run_mcp_servers(
        self, model: models.Model | models.KnownModelName | str | None = None
    ) -> AsyncIterator[None]:
        """Run [`MCPServerStdio`s][pydantic_ai.mcp.MCPServerStdio] so they can be used by the agent.

        Deprecated: use [`async with agent`][pydantic_ai.agent.Agent.__aenter__] instead.
        If you need to set a sampling model on all MCP servers, use [`agent.set_mcp_sampling_model()`][pydantic_ai.agent.Agent.set_mcp_sampling_model].

        Returns: a context manager to start and shutdown the servers.
        """
        try:
            self.set_mcp_sampling_model(model)
        except exceptions.UserError:
            if model is not None:
                raise

        async with self:
            yield
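
Since run_mcp_servers is deprecated, here is a minimal migration sketch (the 'python' command and mcp_server.py script are illustrative, not part of the API):

from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStdio

server = MCPServerStdio('python', args=['mcp_server.py'])
agent = Agent('openai:gpt-4o', toolsets=[server])

async def main():
    agent.set_mcp_sampling_model()  # defaults to the agent's own model
    async with agent:  # replaces the deprecated run_mcp_servers()
        await agent.run('hello')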

__init__

__init__(
    model: Model | KnownModelName | str | None = None,
    *,
    output_type: OutputSpec[OutputDataT] = str,
    instructions: (
        str
        | SystemPromptFunc[AgentDepsT]
        | Sequence[str | SystemPromptFunc[AgentDepsT]]
        | None
    ) = None,
    system_prompt: str | Sequence[str] = (),
    deps_type: type[AgentDepsT] = NoneType,
    name: str | None = None,
    model_settings: ModelSettings | None = None,
    retries: int = 1,
    output_retries: int | None = None,
    tools: Sequence[
        Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]
    ] = (),
    builtin_tools: Sequence[AbstractBuiltinTool] = (),
    prepare_tools: (
        ToolsPrepareFunc[AgentDepsT] | None
    ) = None,
    prepare_output_tools: (
        ToolsPrepareFunc[AgentDepsT] | None
    ) = None,
    toolsets: (
        Sequence[
            AbstractToolset[AgentDepsT]
            | ToolsetFunc[AgentDepsT]
        ]
        | None
    ) = None,
    defer_model_check: bool = False,
    end_strategy: EndStrategy = "early",
    instrument: (
        InstrumentationSettings | bool | None
    ) = None,
    history_processors: (
        Sequence[HistoryProcessor[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None
) -> None
__init__(
    model: Model | KnownModelName | str | None = None,
    *,
    output_type: OutputSpec[OutputDataT] = str,
    instructions: (
        str
        | SystemPromptFunc[AgentDepsT]
        | Sequence[str | SystemPromptFunc[AgentDepsT]]
        | None
    ) = None,
    system_prompt: str | Sequence[str] = (),
    deps_type: type[AgentDepsT] = NoneType,
    name: str | None = None,
    model_settings: ModelSettings | None = None,
    retries: int = 1,
    output_retries: int | None = None,
    tools: Sequence[
        Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]
    ] = (),
    builtin_tools: Sequence[AbstractBuiltinTool] = (),
    prepare_tools: (
        ToolsPrepareFunc[AgentDepsT] | None
    ) = None,
    prepare_output_tools: (
        ToolsPrepareFunc[AgentDepsT] | None
    ) = None,
    mcp_servers: Sequence[MCPServer] = (),
    defer_model_check: bool = False,
    end_strategy: EndStrategy = "early",
    instrument: (
        InstrumentationSettings | bool | None
    ) = None,
    history_processors: (
        Sequence[HistoryProcessor[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None
) -> None
__init__(
    model: Model | KnownModelName | str | None = None,
    *,
    output_type: OutputSpec[OutputDataT] = str,
    instructions: (
        str
        | SystemPromptFunc[AgentDepsT]
        | Sequence[str | SystemPromptFunc[AgentDepsT]]
        | None
    ) = None,
    system_prompt: str | Sequence[str] = (),
    deps_type: type[AgentDepsT] = NoneType,
    name: str | None = None,
    model_settings: ModelSettings | None = None,
    retries: int = 1,
    output_retries: int | None = None,
    tools: Sequence[
        Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]
    ] = (),
    builtin_tools: Sequence[AbstractBuiltinTool] = (),
    prepare_tools: (
        ToolsPrepareFunc[AgentDepsT] | None
    ) = None,
    prepare_output_tools: (
        ToolsPrepareFunc[AgentDepsT] | None
    ) = None,
    toolsets: (
        Sequence[
            AbstractToolset[AgentDepsT]
            | ToolsetFunc[AgentDepsT]
        ]
        | None
    ) = None,
    defer_model_check: bool = False,
    end_strategy: EndStrategy = "early",
    instrument: (
        InstrumentationSettings | bool | None
    ) = None,
    history_processors: (
        Sequence[HistoryProcessor[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None,
    **_deprecated_kwargs: Any
)

Create an agent.

Parameters

Name Type Description Default
model Model | KnownModelName | str | None

The default model to use for this agent. If not provided, a model must be provided when calling the agent. We allow str here because the actual list of allowed models changes frequently.

None
output_type OutputSpec[OutputDataT]

The type of the output data, used to validate the data returned by the model; defaults to str.

str
instructions str | SystemPromptFunc[AgentDepsT] | Sequence[str | SystemPromptFunc[AgentDepsT]] | None

Instructions to use for this agent; you can also register instructions via a function with instructions.

None
system_prompt str | Sequence[str]

Static system prompts to use for this agent; you can also register system prompts via a function with system_prompt.

()
deps_type type[AgentDepsT]

The type used for dependency injection. This parameter exists solely to allow you to fully parameterize the agent and get the most out of static type checking. If you're not using deps but want type checking to pass, you can set deps=None to satisfy Pyright, or add a type hint : Agent[None, <return type>].

NoneType
name str | None

The name of the agent, used for logging. If None, we try to infer the agent name from the call frame when the agent is first run.

None
model_settings ModelSettings | None

Optional model request settings to use for this agent's runs by default.

None
retries int

The default number of retries to allow before raising an error.

1
output_retries int | None

The maximum number of retries to allow for output validation; defaults to retries.

None
tools Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]]

Tools to register with the agent; you can also register tools via the decorators @agent.tool and @agent.tool_plain.

()
builtin_tools Sequence[AbstractBuiltinTool]

The builtin tools the agent will use. Support depends on the model, as some models may not support certain tools; if the model doesn't support a builtin tool, an error will be raised.

()
prepare_tools ToolsPrepareFunc[AgentDepsT] | None

Custom function to prepare the definitions of all tools (except output tools) for each step. Useful if you want to customize the definitions of multiple tools, or register only a subset of tools for a given step. See ToolsPrepareFunc.

None
prepare_output_tools ToolsPrepareFunc[AgentDepsT] | None

Custom function to prepare the definitions of all output tools for each step. Useful if you want to customize the definitions of multiple output tools, or register only a subset of output tools for a given step. See ToolsPrepareFunc.

None
toolsets Sequence[AbstractToolset[AgentDepsT] | ToolsetFunc[AgentDepsT]] | None

Toolsets to register with the agent, including MCP servers and functions that take a run context and return a toolset. See ToolsetFunc for more information.

None
defer_model_check bool

By default, if you provide a named model, it's evaluated immediately to create a Model instance, which checks for the necessary environment variables. Set this to False to defer the evaluation until the first run. Useful if you want to override the model for testing.

False
end_strategy EndStrategy

Strategy for handling tool calls that are requested alongside a final result. See EndStrategy for more information.

'early'
instrument InstrumentationSettings | bool | None

Set to True to automatically instrument with OpenTelemetry, which will use Logfire if it's configured. Set to an instance of InstrumentationSettings to customize. If this isn't set, the last value set by Agent.instrument_all() will be used, which defaults to False. See the Debugging and Monitoring guide for more info.

None
history_processors Sequence[HistoryProcessor[AgentDepsT]] | None

Optional list of callables to process the message history before sending it to the model. Each processor takes a list of messages and returns a modified list. Processors can be sync or async and are applied in sequence; a sketch follows this parameter list.

None
event_stream_handler EventStreamHandler[AgentDepsT] | None

Optional handler for events from the model's streaming response and the agent's execution of tools.

None
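
A minimal sketch of a history processor, as referenced above (the keep_recent name and the 10-message cutoff are illustrative, not part of the API):

from pydantic_ai import Agent
from pydantic_ai.messages import ModelMessage

def keep_recent(messages: list[ModelMessage]) -> list[ModelMessage]:
    # Illustrative rule: only send the 10 most recent messages to the model
    return messages[-10:]

agent = Agent('test', history_processors=[keep_recent])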
Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
def __init__(
    self,
    model: models.Model | models.KnownModelName | str | None = None,
    *,
    output_type: OutputSpec[OutputDataT] = str,
    instructions: str
    | _system_prompt.SystemPromptFunc[AgentDepsT]
    | Sequence[str | _system_prompt.SystemPromptFunc[AgentDepsT]]
    | None = None,
    system_prompt: str | Sequence[str] = (),
    deps_type: type[AgentDepsT] = NoneType,
    name: str | None = None,
    model_settings: ModelSettings | None = None,
    retries: int = 1,
    output_retries: int | None = None,
    tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] = (),
    builtin_tools: Sequence[AbstractBuiltinTool] = (),
    prepare_tools: ToolsPrepareFunc[AgentDepsT] | None = None,
    prepare_output_tools: ToolsPrepareFunc[AgentDepsT] | None = None,
    toolsets: Sequence[AbstractToolset[AgentDepsT] | ToolsetFunc[AgentDepsT]] | None = None,
    defer_model_check: bool = False,
    end_strategy: EndStrategy = 'early',
    instrument: InstrumentationSettings | bool | None = None,
    history_processors: Sequence[HistoryProcessor[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    **_deprecated_kwargs: Any,
):
    """Create an agent.

    Args:
        model: The default model to use for this agent, if not provided,
            you must provide the model when calling it. We allow `str` here since the actual list of allowed models changes frequently.
        output_type: The type of the output data, used to validate the data returned by the model,
            defaults to `str`.
        instructions: Instructions to use for this agent, you can also register instructions via a function with
            [`instructions`][pydantic_ai.Agent.instructions].
        system_prompt: Static system prompts to use for this agent, you can also register system
            prompts via a function with [`system_prompt`][pydantic_ai.Agent.system_prompt].
        deps_type: The type used for dependency injection, this parameter exists solely to allow you to fully
            parameterize the agent, and therefore get the best out of static type checking.
            If you're not using deps, but want type checking to pass, you can set `deps=None` to satisfy Pyright
            or add a type hint `: Agent[None, <return type>]`.
        name: The name of the agent, used for logging. If `None`, we try to infer the agent name from the call frame
            when the agent is first run.
        model_settings: Optional model request settings to use for this agent's runs, by default.
        retries: The default number of retries to allow before raising an error.
        output_retries: The maximum number of retries to allow for output validation, defaults to `retries`.
        tools: Tools to register with the agent, you can also register tools via the decorators
            [`@agent.tool`][pydantic_ai.Agent.tool] and [`@agent.tool_plain`][pydantic_ai.Agent.tool_plain].
        builtin_tools: The builtin tools that the agent will use. This depends on the model, as some models may not
            support certain tools. If the model doesn't support the builtin tools, an error will be raised.
        prepare_tools: Custom function to prepare the tool definition of all tools for each step, except output tools.
            This is useful if you want to customize the definition of multiple tools or you want to register
            a subset of tools for a given step. See [`ToolsPrepareFunc`][pydantic_ai.tools.ToolsPrepareFunc]
        prepare_output_tools: Custom function to prepare the tool definition of all output tools for each step.
            This is useful if you want to customize the definition of multiple output tools or you want to register
            a subset of output tools for a given step. See [`ToolsPrepareFunc`][pydantic_ai.tools.ToolsPrepareFunc]
        toolsets: Toolsets to register with the agent, including MCP servers and functions which take a run context
            and return a toolset. See [`ToolsetFunc`][pydantic_ai.toolsets.ToolsetFunc] for more information.
        defer_model_check: by default, if you provide a [named][pydantic_ai.models.KnownModelName] model,
            it's evaluated to create a [`Model`][pydantic_ai.models.Model] instance immediately,
            which checks for the necessary environment variables. Set this to `false`
            to defer the evaluation until the first run. Useful if you want to
            [override the model][pydantic_ai.Agent.override] for testing.
        end_strategy: Strategy for handling tool calls that are requested alongside a final result.
            See [`EndStrategy`][pydantic_ai.agent.EndStrategy] for more information.
        instrument: Set to True to automatically instrument with OpenTelemetry,
            which will use Logfire if it's configured.
            Set to an instance of [`InstrumentationSettings`][pydantic_ai.agent.InstrumentationSettings] to customize.
            If this isn't set, then the last value set by
            [`Agent.instrument_all()`][pydantic_ai.Agent.instrument_all]
            will be used, which defaults to False.
            See the [Debugging and Monitoring guide](https://ai.pydantic.org.cn/logfire/) for more info.
        history_processors: Optional list of callables to process the message history before sending it to the model.
            Each processor takes a list of messages and returns a modified list of messages.
            Processors can be sync or async and are applied in sequence.
        event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools.
    """
    if model is None or defer_model_check:
        self._model = model
    else:
        self._model = models.infer_model(model)

    self._name = name
    self.end_strategy = end_strategy
    self.model_settings = model_settings

    self._output_type = output_type
    self.instrument = instrument
    self._deps_type = deps_type

    if mcp_servers := _deprecated_kwargs.pop('mcp_servers', None):
        if toolsets is not None:  # pragma: no cover
            raise TypeError('`mcp_servers` and `toolsets` cannot be set at the same time.')
        warnings.warn('`mcp_servers` is deprecated, use `toolsets` instead', DeprecationWarning)
        toolsets = mcp_servers

    _utils.validate_empty_kwargs(_deprecated_kwargs)

    default_output_mode = (
        self.model.profile.default_structured_output_mode if isinstance(self.model, models.Model) else None
    )

    self._output_schema = _output.OutputSchema[OutputDataT].build(output_type, default_mode=default_output_mode)
    self._output_validators = []

    self._instructions = ''
    self._instructions_functions = []
    if isinstance(instructions, str | Callable):
        instructions = [instructions]
    for instruction in instructions or []:
        if isinstance(instruction, str):
            self._instructions += instruction + '\n'
        else:
            self._instructions_functions.append(_system_prompt.SystemPromptRunner(instruction))
    self._instructions = self._instructions.strip() or None

    self._system_prompts = (system_prompt,) if isinstance(system_prompt, str) else tuple(system_prompt)
    self._system_prompt_functions = []
    self._system_prompt_dynamic_functions = {}

    self._max_result_retries = output_retries if output_retries is not None else retries
    self._max_tool_retries = retries

    self._builtin_tools = builtin_tools

    self._prepare_tools = prepare_tools
    self._prepare_output_tools = prepare_output_tools

    self._output_toolset = self._output_schema.toolset
    if self._output_toolset:
        self._output_toolset.max_retries = self._max_result_retries

    self._function_toolset = _AgentFunctionToolset(
        tools, max_retries=self._max_tool_retries, output_schema=self._output_schema
    )
    self._dynamic_toolsets = [
        DynamicToolset[AgentDepsT](toolset_func=toolset)
        for toolset in toolsets or []
        if not isinstance(toolset, AbstractToolset)
    ]
    self._user_toolsets = [toolset for toolset in toolsets or [] if isinstance(toolset, AbstractToolset)]

    self.history_processors = history_processors or []

    self._event_stream_handler = event_stream_handler

    self._override_deps: ContextVar[_utils.Option[AgentDepsT]] = ContextVar('_override_deps', default=None)
    self._override_model: ContextVar[_utils.Option[models.Model]] = ContextVar('_override_model', default=None)
    self._override_toolsets: ContextVar[_utils.Option[Sequence[AbstractToolset[AgentDepsT]]]] = ContextVar(
        '_override_toolsets', default=None
    )
    self._override_tools: ContextVar[
        _utils.Option[Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]]]
    ] = ContextVar('_override_tools', default=None)

    self._enter_lock = Lock()
    self._entered_count = 0
    self._exit_stack = None

end_strategy instance-attribute

end_strategy: EndStrategy = end_strategy

The strategy for handling tool calls when a final result is found.

model_settings instance-attribute

model_settings: ModelSettings | None = model_settings

Optional model request settings to use for this agent's runs by default.

Note that if model_settings is provided at run time (to run, run_sync, or run_stream), it will be merged with this value, with the runtime argument taking precedence, as sketched below.
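
A minimal sketch of the merge behavior (the temperature values are illustrative):

from pydantic_ai import Agent

agent = Agent('test', model_settings={'temperature': 0.0})

# The run-level value takes precedence, so this request uses temperature=1.0.
result = agent.run_sync('hello', model_settings={'temperature': 1.0})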

instrument instance-attribute

instrument: InstrumentationSettings | bool | None = (
    instrument
)

Options for automatic instrumentation with OpenTelemetry.

instrument_all staticmethod

instrument_all(
    instrument: InstrumentationSettings | bool = True,
) -> None

Set the instrumentation options for all agents where instrument is not set; see the example after the source below.

Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
@staticmethod
def instrument_all(instrument: InstrumentationSettings | bool = True) -> None:
    """Set the instrumentation options for all agents where `instrument` is not set."""
    Agent._instrument_default = instrument

model property writable

model: Model | KnownModelName | str | None

The default model configured for this agent.

name property writable

name: str | None

The name of the agent, used for logging.

If None, we try to infer the agent name from the call frame when the agent is first run.

deps_type property

deps_type: type

The type of dependencies used by the agent.

output_type property

output_type: OutputSpec[OutputDataT]

The type of data output by agent runs, used to validate the data returned by the model; defaults to str.
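
For instance, a structured output type (a sketch; the CityInfo model is hypothetical):

from pydantic import BaseModel
from pydantic_ai import Agent

class CityInfo(BaseModel):
    city: str
    country: str

agent = Agent('test', output_type=CityInfo)  # typed as Agent[None, CityInfo]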

event_stream_handler property

event_stream_handler: EventStreamHandler[AgentDepsT] | None

Optional handler for events from the model's streaming response and the agent's execution of tools.
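
A minimal sketch of such a handler (the print_events name is illustrative, and the exact union of event types may vary by version):

from collections.abc import AsyncIterable

from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import AgentStreamEvent

async def print_events(
    ctx: RunContext[None], events: AsyncIterable[AgentStreamEvent]
) -> None:
    # Illustrative: log each streamed event as it arrives
    async for event in events:
        print(event)

agent = Agent('test', event_stream_handler=print_events)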

iter async

iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None
) -> AbstractAsyncContextManager[
    AgentRun[AgentDepsT, OutputDataT]
]
iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT],
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None
) -> AbstractAsyncContextManager[
    AgentRun[AgentDepsT, RunOutputDataT]
]
iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None
) -> AsyncIterator[AgentRun[AgentDepsT, Any]]

A context manager that can be used to iterate over the agent graph's nodes as they are executed.

This method builds an internal agent graph (using system prompts, tools, and output schemas) and then returns an AgentRun object. The AgentRun can be used to async-iterate over the nodes of the graph as they are executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the stream of events from the execution of tools.

The AgentRun also provides methods to access the full message history, new messages, usage statistics, and the final result of the run once it has completed.

For more details, see the documentation of AgentRun.

Example

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    nodes = []
    async with agent.iter('What is the capital of France?') as agent_run:
        async for node in agent_run:
            nodes.append(node)
    print(nodes)
    '''
    [
        UserPromptNode(
            user_prompt='What is the capital of France?',
            instructions=None,
            instructions_functions=[],
            system_prompts=(),
            system_prompt_functions=[],
            system_prompt_dynamic_functions={},
        ),
        ModelRequestNode(
            request=ModelRequest(
                parts=[
                    UserPromptPart(
                        content='What is the capital of France?',
                        timestamp=datetime.datetime(...),
                    )
                ]
            )
        ),
        CallToolsNode(
            model_response=ModelResponse(
                parts=[TextPart(content='The capital of France is Paris.')],
                usage=RequestUsage(input_tokens=56, output_tokens=7),
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
            )
        ),
        End(data=FinalResult(output='The capital of France is Paris.')),
    ]
    '''
    print(agent_run.result.output)
    #> The capital of France is Paris.

Parameters

Name Type Description Default
user_prompt str | Sequence[UserContent] | None

User input to start/continue the conversation.

None
output_type OutputSpec[RunOutputDataT] | None

Custom output type to use for this run; output_type may only be used if the agent has no output validators, since output validators expect an argument matching the agent's output type.

None
message_history list[ModelMessage] | None

History of the conversation so far.

None
deferred_tool_results DeferredToolResults | None

Optional results for deferred tool calls in the message history.

None
model Model | KnownModelName | str | None

Optional model to use for this run; required if model was not set when creating the agent.

None
deps AgentDepsT

Optional dependencies to use for this run.

None
model_settings ModelSettings | None

Optional settings to use for this model's request.

None
usage_limits UsageLimits | None

Optional limits on model request count or token usage.

None
usage RunUsage | None

Optional usage to start with; useful for resuming a conversation or for agents used within tools.

None
infer_name bool

Whether to try to infer the agent name from the call frame if it's not set.

True
toolsets Sequence[AbstractToolset[AgentDepsT]] | None

Optional additional toolsets for this run.

None

Returns

Type Description
AsyncIterator[AgentRun[AgentDepsT, Any]]

The result of the run.

Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
@asynccontextmanager
async def iter(  # noqa: C901
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[_messages.ModelMessage] | None = None,
    deferred_tool_results: DeferredToolResults | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.RunUsage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
) -> AsyncIterator[AgentRun[AgentDepsT, Any]]:
    """A contextmanager which can be used to iterate over the agent graph's nodes as they are executed.

    This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an
    `AgentRun` object. The `AgentRun` can be used to async-iterate over the nodes of the graph as they are
    executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the
    stream of events coming from the execution of tools.

    The `AgentRun` also provides methods to access the full message history, new messages, and usage statistics,
    and the final result of the run once it has completed.

    For more details, see the documentation of `AgentRun`.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')

    async def main():
        nodes = []
        async with agent.iter('What is the capital of France?') as agent_run:
            async for node in agent_run:
                nodes.append(node)
        print(nodes)
        '''
        [
            UserPromptNode(
                user_prompt='What is the capital of France?',
                instructions=None,
                instructions_functions=[],
                system_prompts=(),
                system_prompt_functions=[],
                system_prompt_dynamic_functions={},
            ),
            ModelRequestNode(
                request=ModelRequest(
                    parts=[
                        UserPromptPart(
                            content='What is the capital of France?',
                            timestamp=datetime.datetime(...),
                        )
                    ]
                )
            ),
            CallToolsNode(
                model_response=ModelResponse(
                    parts=[TextPart(content='The capital of France is Paris.')],
                    usage=RequestUsage(input_tokens=56, output_tokens=7),
                    model_name='gpt-4o',
                    timestamp=datetime.datetime(...),
                )
            ),
            End(data=FinalResult(output='The capital of France is Paris.')),
        ]
        '''
        print(agent_run.result.output)
        #> The capital of France is Paris.
    ```

    Args:
        user_prompt: User input to start/continue the conversation.
        output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
            output validators since output validators would expect an argument that matches the agent's output type.
        message_history: History of the conversation so far.
        deferred_tool_results: Optional results for deferred tool calls in the message history.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.
        model_settings: Optional settings to use for this model's request.
        usage_limits: Optional limits on model request count or token usage.
        usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
        infer_name: Whether to try to infer the agent name from the call frame if it's not set.
        toolsets: Optional additional toolsets for this run.

    Returns:
        The result of the run.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())
    model_used = self._get_model(model)
    del model

    deps = self._get_deps(deps)
    new_message_index = len(message_history) if message_history else 0
    output_schema = self._prepare_output_schema(output_type, model_used.profile)

    output_type_ = output_type or self.output_type

    # We consider it a user error if a user tries to restrict the result type while having an output validator that
    # may change the result type from the restricted type to something else. Therefore, we consider the following
    # typecast reasonable, even though it is possible to violate it with otherwise-type-checked code.
    output_validators = cast(list[_output.OutputValidator[AgentDepsT, RunOutputDataT]], self._output_validators)

    output_toolset = self._output_toolset
    if output_schema != self._output_schema or output_validators:
        output_toolset = cast(OutputToolset[AgentDepsT], output_schema.toolset)
        if output_toolset:
            output_toolset.max_retries = self._max_result_retries
            output_toolset.output_validators = output_validators
    toolset = self._get_toolset(output_toolset=output_toolset, additional_toolsets=toolsets)
    tool_manager = ToolManager[AgentDepsT](toolset)

    # Build the graph
    graph: Graph[_agent_graph.GraphAgentState, _agent_graph.GraphAgentDeps[AgentDepsT, Any], FinalResult[Any]] = (
        _agent_graph.build_agent_graph(self.name, self._deps_type, output_type_)
    )

    # Build the initial state
    usage = usage or _usage.RunUsage()
    state = _agent_graph.GraphAgentState(
        message_history=message_history[:] if message_history else [],
        usage=usage,
        retries=0,
        run_step=0,
    )

    # Merge model settings in order of precedence: run > agent > model
    merged_settings = merge_model_settings(model_used.settings, self.model_settings)
    model_settings = merge_model_settings(merged_settings, model_settings)
    usage_limits = usage_limits or _usage.UsageLimits()

    async def get_instructions(run_context: RunContext[AgentDepsT]) -> str | None:
        parts = [
            self._instructions,
            *[await func.run(run_context) for func in self._instructions_functions],
        ]

        model_profile = model_used.profile
        if isinstance(output_schema, _output.PromptedOutputSchema):
            instructions = output_schema.instructions(model_profile.prompted_output_template)
            parts.append(instructions)

        parts = [p for p in parts if p]
        if not parts:
            return None
        return '\n\n'.join(parts).strip()

    if isinstance(model_used, InstrumentedModel):
        instrumentation_settings = model_used.instrumentation_settings
        tracer = model_used.instrumentation_settings.tracer
    else:
        instrumentation_settings = None
        tracer = NoOpTracer()

    tool_call_results: dict[str, DeferredToolResult] | None = None
    if deferred_tool_results is not None:
        tool_call_results = {}
        for tool_call_id, approval in deferred_tool_results.approvals.items():
            if approval is True:
                approval = ToolApproved()
            elif approval is False:
                approval = ToolDenied()
            tool_call_results[tool_call_id] = approval

        if calls := deferred_tool_results.calls:
            call_result_types = _utils.get_union_args(DeferredToolCallResult)
            for tool_call_id, result in calls.items():
                if not isinstance(result, call_result_types):
                    result = _messages.ToolReturn(result)
                tool_call_results[tool_call_id] = result

    graph_deps = _agent_graph.GraphAgentDeps[AgentDepsT, RunOutputDataT](
        user_deps=deps,
        prompt=user_prompt,
        new_message_index=new_message_index,
        model=model_used,
        model_settings=model_settings,
        usage_limits=usage_limits,
        max_result_retries=self._max_result_retries,
        end_strategy=self.end_strategy,
        output_schema=output_schema,
        output_validators=output_validators,
        history_processors=self.history_processors,
        builtin_tools=list(self._builtin_tools),
        tool_manager=tool_manager,
        tool_call_results=tool_call_results,
        tracer=tracer,
        get_instructions=get_instructions,
        instrumentation_settings=instrumentation_settings,
    )
    start_node = _agent_graph.UserPromptNode[AgentDepsT](
        user_prompt=user_prompt,
        instructions=self._instructions,
        instructions_functions=self._instructions_functions,
        system_prompts=self._system_prompts,
        system_prompt_functions=self._system_prompt_functions,
        system_prompt_dynamic_functions=self._system_prompt_dynamic_functions,
    )

    agent_name = self.name or 'agent'
    run_span = tracer.start_span(
        'agent run',
        attributes={
            'model_name': model_used.model_name if model_used else 'no-model',
            'agent_name': agent_name,
            'logfire.msg': f'{agent_name} run',
        },
    )

    try:
        async with toolset:
            async with graph.iter(
                start_node,
                state=state,
                deps=graph_deps,
                span=use_span(run_span) if run_span.is_recording() else None,
                infer_name=False,
            ) as graph_run:
                agent_run = AgentRun(graph_run)
                yield agent_run
                if (final_result := agent_run.result) is not None and run_span.is_recording():
                    if instrumentation_settings and instrumentation_settings.include_content:
                        run_span.set_attribute(
                            'final_result',
                            (
                                final_result.output
                                if isinstance(final_result.output, str)
                                else json.dumps(InstrumentedModel.serialize_any(final_result.output))
                            ),
                        )
    finally:
        try:
            if instrumentation_settings and run_span.is_recording():
                run_span.set_attributes(self._run_span_end_attributes(state, usage, instrumentation_settings))
        finally:
            run_span.end()

override

override(
    *,
    deps: AgentDepsT | Unset = UNSET,
    model: Model | KnownModelName | str | Unset = UNSET,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | Unset
    ) = UNSET,
    tools: (
        Sequence[
            Tool[AgentDepsT]
            | ToolFuncEither[AgentDepsT, ...]
        ]
        | Unset
    ) = UNSET
) -> Iterator[None]

Context manager to temporarily override agent dependencies, model, toolsets, or tools.

This is particularly useful when testing. You can find an example of this here; a sketch also follows the source below.

Parameters

Name Type Description Default
deps AgentDepsT | Unset

The dependencies to use instead of the dependencies passed to the agent run.

UNSET
model Model | KnownModelName | str | Unset

The model to use instead of the model passed to the agent run.

UNSET
toolsets Sequence[AbstractToolset[AgentDepsT]] | Unset

The toolsets to use instead of the toolsets passed to the agent constructor and agent run.

UNSET
tools Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | Unset

The tools to use instead of the tools registered with the agent.

UNSET
Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
@contextmanager
def override(
    self,
    *,
    deps: AgentDepsT | _utils.Unset = _utils.UNSET,
    model: models.Model | models.KnownModelName | str | _utils.Unset = _utils.UNSET,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | _utils.Unset = _utils.UNSET,
    tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | _utils.Unset = _utils.UNSET,
) -> Iterator[None]:
    """Context manager to temporarily override agent dependencies, model, toolsets, or tools.

    This is particularly useful when testing.
    You can find an example of this [here](../testing.md#overriding-model-via-pytest-fixtures).

    Args:
        deps: The dependencies to use instead of the dependencies passed to the agent run.
        model: The model to use instead of the model passed to the agent run.
        toolsets: The toolsets to use instead of the toolsets passed to the agent constructor and agent run.
        tools: The tools to use instead of the tools registered with the agent.
    """
    if _utils.is_set(deps):
        deps_token = self._override_deps.set(_utils.Some(deps))
    else:
        deps_token = None

    if _utils.is_set(model):
        model_token = self._override_model.set(_utils.Some(models.infer_model(model)))
    else:
        model_token = None

    if _utils.is_set(toolsets):
        toolsets_token = self._override_toolsets.set(_utils.Some(toolsets))
    else:
        toolsets_token = None

    if _utils.is_set(tools):
        tools_token = self._override_tools.set(_utils.Some(tools))
    else:
        tools_token = None

    try:
        yield
    finally:
        if deps_token is not None:
            self._override_deps.reset(deps_token)
        if model_token is not None:
            self._override_model.reset(model_token)
        if toolsets_token is not None:
            self._override_toolsets.reset(toolsets_token)
        if tools_token is not None:
            self._override_tools.reset(tools_token)
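
For instance, the testing pattern referenced above (a minimal sketch using the built-in TestModel):

from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel

agent = Agent('openai:gpt-4o')

with agent.override(model=TestModel()):
    # No real API call is made; TestModel generates a synthetic response.
    result = agent.run_sync('What is the capital of France?')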

instructions

instructions(
    func: Callable[[RunContext[AgentDepsT]], str],
) -> Callable[[RunContext[AgentDepsT]], str]
instructions(func: Callable[[], str]) -> Callable[[], str]
instructions(
    func: Callable[[], Awaitable[str]],
) -> Callable[[], Awaitable[str]]

Decorator to register an instructions function.

Optionally takes RunContext as its only argument. Can decorate sync or async functions.

The decorator can be used bare (agent.instructions).

Overloads for every possible signature of instructions are included so the decorator doesn't obscure the type of the function.

Example

from pydantic_ai import Agent, RunContext

agent = Agent('test', deps_type=str)

@agent.instructions
def simple_instructions() -> str:
    return 'foobar'

@agent.instructions
async def async_instructions(ctx: RunContext[str]) -> str:
    return f'{ctx.deps} is the best'

源代码位于 pydantic_ai_slim/pydantic_ai/agent/__init__.py
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
def instructions(
    self,
    func: _system_prompt.SystemPromptFunc[AgentDepsT] | None = None,
    /,
) -> (
    Callable[[_system_prompt.SystemPromptFunc[AgentDepsT]], _system_prompt.SystemPromptFunc[AgentDepsT]]
    | _system_prompt.SystemPromptFunc[AgentDepsT]
):
    """Decorator to register an instructions function.

    Optionally takes [`RunContext`][pydantic_ai.tools.RunContext] as its only argument.
    Can decorate sync or async functions.

    The decorator can be used bare (`agent.instructions`).

    Overloads for every possible signature of `instructions` are included so the decorator doesn't obscure
    the type of the function.

    Example:
    ```python
    from pydantic_ai import Agent, RunContext

    agent = Agent('test', deps_type=str)

    @agent.instructions
    def simple_instructions() -> str:
        return 'foobar'

    @agent.instructions
    async def async_instructions(ctx: RunContext[str]) -> str:
        return f'{ctx.deps} is the best'
    ```
    """
    if func is None:

        def decorator(
            func_: _system_prompt.SystemPromptFunc[AgentDepsT],
        ) -> _system_prompt.SystemPromptFunc[AgentDepsT]:
            self._instructions_functions.append(_system_prompt.SystemPromptRunner(func_))
            return func_

        return decorator
    else:
        self._instructions_functions.append(_system_prompt.SystemPromptRunner(func))
        return func

system_prompt

system_prompt(
    func: Callable[[RunContext[AgentDepsT]], str],
) -> Callable[[RunContext[AgentDepsT]], str]
system_prompt(func: Callable[[], str]) -> Callable[[], str]
system_prompt(
    func: Callable[[], Awaitable[str]],
) -> Callable[[], Awaitable[str]]
system_prompt(*, dynamic: bool = False) -> Callable[
    [SystemPromptFunc[AgentDepsT]],
    SystemPromptFunc[AgentDepsT],
]
system_prompt(
    func: SystemPromptFunc[AgentDepsT] | None = None,
    /,
    *,
    dynamic: bool = False,
) -> (
    Callable[
        [SystemPromptFunc[AgentDepsT]],
        SystemPromptFunc[AgentDepsT],
    ]
    | SystemPromptFunc[AgentDepsT]
)

Decorator to register a system prompt function.

Optionally takes RunContext as its only argument. Can decorate sync or async functions.

The decorator can be used either bare (agent.system_prompt) or as a function call (agent.system_prompt(...)); see the examples below.

Overloads for every possible signature of system_prompt are included so the decorator doesn't obscure the type of the function; see tests/typed_agent.py for tests.

Parameters

Name Type Description Default
func SystemPromptFunc[AgentDepsT] | None

The function to decorate

None
dynamic bool

If True, the system prompt will be re-evaluated even when messages_history is provided; see SystemPromptPart.dynamic_ref

False

Example

from pydantic_ai import Agent, RunContext

agent = Agent('test', deps_type=str)

@agent.system_prompt
def simple_system_prompt() -> str:
    return 'foobar'

@agent.system_prompt(dynamic=True)
async def async_system_prompt(ctx: RunContext[str]) -> str:
    return f'{ctx.deps} is the best'

Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
def system_prompt(
    self,
    func: _system_prompt.SystemPromptFunc[AgentDepsT] | None = None,
    /,
    *,
    dynamic: bool = False,
) -> (
    Callable[[_system_prompt.SystemPromptFunc[AgentDepsT]], _system_prompt.SystemPromptFunc[AgentDepsT]]
    | _system_prompt.SystemPromptFunc[AgentDepsT]
):
    """Decorator to register a system prompt function.

    Optionally takes [`RunContext`][pydantic_ai.tools.RunContext] as its only argument.
    Can decorate sync or async functions.

    The decorator can be used either bare (`agent.system_prompt`) or as a function call
    (`agent.system_prompt(...)`), see the examples below.

    Overloads for every possible signature of `system_prompt` are included so the decorator doesn't obscure
    the type of the function, see `tests/typed_agent.py` for tests.

    Args:
        func: The function to decorate
        dynamic: If True, the system prompt will be reevaluated even when `messages_history` is provided,
            see [`SystemPromptPart.dynamic_ref`][pydantic_ai.messages.SystemPromptPart.dynamic_ref]

    Example:
    ```python
    from pydantic_ai import Agent, RunContext

    agent = Agent('test', deps_type=str)

    @agent.system_prompt
    def simple_system_prompt() -> str:
        return 'foobar'

    @agent.system_prompt(dynamic=True)
    async def async_system_prompt(ctx: RunContext[str]) -> str:
        return f'{ctx.deps} is the best'
    ```
    """
    if func is None:

        def decorator(
            func_: _system_prompt.SystemPromptFunc[AgentDepsT],
        ) -> _system_prompt.SystemPromptFunc[AgentDepsT]:
            runner = _system_prompt.SystemPromptRunner[AgentDepsT](func_, dynamic=dynamic)
            self._system_prompt_functions.append(runner)
            if dynamic:  # pragma: lax no cover
                self._system_prompt_dynamic_functions[func_.__qualname__] = runner
            return func_

        return decorator
    else:
        assert not dynamic, "dynamic can't be True in this case"
        self._system_prompt_functions.append(_system_prompt.SystemPromptRunner[AgentDepsT](func, dynamic=dynamic))
        return func

output_validator

output_validator(
    func: OutputValidatorFunc[AgentDepsT, OutputDataT],
) -> OutputValidatorFunc[AgentDepsT, OutputDataT]

Decorator to register an output validator function.

Optionally takes RunContext as its first argument. Can decorate sync or async functions.

Overloads for every possible signature of output_validator are included so the decorator doesn't obscure the type of the function; see tests/typed_agent.py for tests.

Example

from pydantic_ai import Agent, ModelRetry, RunContext

agent = Agent('test', deps_type=str)

@agent.output_validator
def output_validator_simple(data: str) -> str:
    if 'wrong' in data:
        raise ModelRetry('wrong response')
    return data

@agent.output_validator
async def output_validator_deps(ctx: RunContext[str], data: str) -> str:
    if ctx.deps in data:
        raise ModelRetry('wrong response')
    return data

result = agent.run_sync('foobar', deps='spam')
print(result.output)
#> success (no tool calls)

Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
def output_validator(
    self, func: _output.OutputValidatorFunc[AgentDepsT, OutputDataT], /
) -> _output.OutputValidatorFunc[AgentDepsT, OutputDataT]:
    """Decorator to register an output validator function.

    Optionally takes [`RunContext`][pydantic_ai.tools.RunContext] as its first argument.
    Can decorate sync or async functions.

    Overloads for every possible signature of `output_validator` are included so the decorator doesn't obscure
    the type of the function, see `tests/typed_agent.py` for tests.

    Example:
    ```python
    from pydantic_ai import Agent, ModelRetry, RunContext

    agent = Agent('test', deps_type=str)

    @agent.output_validator
    def output_validator_simple(data: str) -> str:
        if 'wrong' in data:
            raise ModelRetry('wrong response')
        return data

    @agent.output_validator
    async def output_validator_deps(ctx: RunContext[str], data: str) -> str:
        if ctx.deps in data:
            raise ModelRetry('wrong response')
        return data

    result = agent.run_sync('foobar', deps='spam')
    print(result.output)
    #> success (no tool calls)
    ```
    """
    self._output_validators.append(_output.OutputValidator[AgentDepsT, Any](func))
    return func

tool

tool(
    *,
    name: str | None = None,
    retries: int | None = None,
    prepare: ToolPrepareFunc[AgentDepsT] | None = None,
    docstring_format: DocstringFormat = "auto",
    require_parameter_descriptions: bool = False,
    schema_generator: type[
        GenerateJsonSchema
    ] = GenerateToolJsonSchema,
    strict: bool | None = None,
    requires_approval: bool = False
) -> Callable[
    [ToolFuncContext[AgentDepsT, ToolParams]],
    ToolFuncContext[AgentDepsT, ToolParams],
]
tool(
    func: (
        ToolFuncContext[AgentDepsT, ToolParams] | None
    ) = None,
    /,
    *,
    name: str | None = None,
    retries: int | None = None,
    prepare: ToolPrepareFunc[AgentDepsT] | None = None,
    docstring_format: DocstringFormat = "auto",
    require_parameter_descriptions: bool = False,
    schema_generator: type[
        GenerateJsonSchema
    ] = GenerateToolJsonSchema,
    strict: bool | None = None,
    requires_approval: bool = False,
) -> Any

Decorator to register a tool function which takes RunContext as its first argument.

Can decorate sync or async functions.

The docstring is inspected to extract both the tool description and the description of each parameter; learn more.

We can't add overloads for every possible tool signature, since the return type is a recursive union, so the signature of functions decorated with @agent.tool is obscured.

Example

from pydantic_ai import Agent, RunContext

agent = Agent('test', deps_type=int)

@agent.tool
def foobar(ctx: RunContext[int], x: int) -> int:
    return ctx.deps + x

@agent.tool(retries=2)
async def spam(ctx: RunContext[int], y: float) -> float:
    return ctx.deps + y

result = agent.run_sync('foobar', deps=1)
print(result.output)
#> {"foobar":1,"spam":1.0}

Parameters

Name Type Description Default
func ToolFuncContext[AgentDepsT, ToolParams] | None

The tool function to register.

None
name str | None

The name of the tool; defaults to the function name.

None
retries int | None

The number of retries to allow for this tool; defaults to the agent's default retries, which defaults to 1.

None
prepare ToolPrepareFunc[AgentDepsT] | None

Custom method to prepare the tool definition for each step; return None to omit this tool from a given step. Useful if you want to customize a tool at call time, or omit it entirely from a step. See ToolPrepareFunc; a sketch follows this parameter list.

None
docstring_format DocstringFormat

The format of the docstring; see DocstringFormat. Defaults to 'auto', so the format is inferred from the structure of the docstring.

'auto'
require_parameter_descriptions bool

If True, raise an error if a parameter description is missing. Defaults to False.

False
schema_generator type[GenerateJsonSchema]

The JSON schema generator class to use for this tool. Defaults to GenerateToolJsonSchema.

GenerateToolJsonSchema
strict bool | None

Whether to enforce JSON schema compliance (only affects OpenAI). See ToolDefinition for more info.

None
requires_approval bool

Whether this tool requires human-in-the-loop approval. Defaults to False. See the tools documentation for more info.

False
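
A minimal sketch of a prepare function, as referenced in the parameter list above (the only_if_enabled and my_tool names are illustrative; boolean deps act as a feature flag):

from pydantic_ai import Agent, RunContext
from pydantic_ai.tools import ToolDefinition

agent = Agent('test', deps_type=bool)

async def only_if_enabled(
    ctx: RunContext[bool], tool_def: ToolDefinition
) -> ToolDefinition | None:
    # Returning None omits the tool from this step
    return tool_def if ctx.deps else None

@agent.tool(prepare=only_if_enabled)
def my_tool(ctx: RunContext[bool], x: int) -> int:
    return x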
Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
def tool(
    self,
    func: ToolFuncContext[AgentDepsT, ToolParams] | None = None,
    /,
    *,
    name: str | None = None,
    retries: int | None = None,
    prepare: ToolPrepareFunc[AgentDepsT] | None = None,
    docstring_format: DocstringFormat = 'auto',
    require_parameter_descriptions: bool = False,
    schema_generator: type[GenerateJsonSchema] = GenerateToolJsonSchema,
    strict: bool | None = None,
    requires_approval: bool = False,
) -> Any:
    """Decorator to register a tool function which takes [`RunContext`][pydantic_ai.tools.RunContext] as its first argument.

    Can decorate sync or async functions.

    The docstring is inspected to extract both the tool description and description of each parameter,
    [learn more](../tools.md#function-tools-and-schema).

    We can't add overloads for every possible signature of tool, since the return type is a recursive union
    so the signature of functions decorated with `@agent.tool` is obscured.

    Example:
    ```python
    from pydantic_ai import Agent, RunContext

    agent = Agent('test', deps_type=int)

    @agent.tool
    def foobar(ctx: RunContext[int], x: int) -> int:
        return ctx.deps + x

    @agent.tool(retries=2)
    async def spam(ctx: RunContext[int], y: float) -> float:
        return ctx.deps + y

    result = agent.run_sync('foobar', deps=1)
    print(result.output)
    #> {"foobar":1,"spam":1.0}
    ```

    Args:
        func: The tool function to register.
        name: The name of the tool, defaults to the function name.
        retries: The number of retries to allow for this tool, defaults to the agent's default retries,
            which defaults to 1.
        prepare: custom method to prepare the tool definition for each step, return `None` to omit this
            tool from a given step. This is useful if you want to customise a tool at call time,
            or omit it completely from a step. See [`ToolPrepareFunc`][pydantic_ai.tools.ToolPrepareFunc].
        docstring_format: The format of the docstring, see [`DocstringFormat`][pydantic_ai.tools.DocstringFormat].
            Defaults to `'auto'`, such that the format is inferred from the structure of the docstring.
        require_parameter_descriptions: If True, raise an error if a parameter description is missing. Defaults to False.
        schema_generator: The JSON schema generator class to use for this tool. Defaults to `GenerateToolJsonSchema`.
        strict: Whether to enforce JSON schema compliance (only affects OpenAI).
            See [`ToolDefinition`][pydantic_ai.tools.ToolDefinition] for more info.
        requires_approval: Whether this tool requires human-in-the-loop approval. Defaults to False.
            See the [tools documentation](../deferred-tools.md#human-in-the-loop-tool-approval) for more info.
    """

    def tool_decorator(
        func_: ToolFuncContext[AgentDepsT, ToolParams],
    ) -> ToolFuncContext[AgentDepsT, ToolParams]:
        # noinspection PyTypeChecker
        self._function_toolset.add_function(
            func_,
            True,
            name,
            retries,
            prepare,
            docstring_format,
            require_parameter_descriptions,
            schema_generator,
            strict,
            requires_approval,
        )
        return func_

    return tool_decorator if func is None else tool_decorator(func)

tool_plain

tool_plain(
    *,
    name: str | None = None,
    retries: int | None = None,
    prepare: ToolPrepareFunc[AgentDepsT] | None = None,
    docstring_format: DocstringFormat = "auto",
    require_parameter_descriptions: bool = False,
    schema_generator: type[
        GenerateJsonSchema
    ] = GenerateToolJsonSchema,
    strict: bool | None = None,
    requires_approval: bool = False
) -> Callable[
    [ToolFuncPlain[ToolParams]], ToolFuncPlain[ToolParams]
]
tool_plain(
    func: ToolFuncPlain[ToolParams] | None = None,
    /,
    *,
    name: str | None = None,
    retries: int | None = None,
    prepare: ToolPrepareFunc[AgentDepsT] | None = None,
    docstring_format: DocstringFormat = "auto",
    require_parameter_descriptions: bool = False,
    schema_generator: type[
        GenerateJsonSchema
    ] = GenerateToolJsonSchema,
    strict: bool | None = None,
    requires_approval: bool = False,
) -> Any

Decorator to register a tool function which does not take RunContext as an argument.

Can decorate a sync or async function.

The docstring is inspected to extract both the tool description and the description of each parameter; learn more in the function tools documentation.

We can't add overloads for every possible signature of tool, since the return type is a recursive union, so the signature of functions decorated with @agent.tool_plain is obscured.

Example

from pydantic_ai import Agent

agent = Agent('test')

@agent.tool_plain
def foobar() -> int:
    return 123

@agent.tool_plain(retries=2)
async def spam() -> float:
    return 3.14

result = agent.run_sync('foobar')
print(result.output)
#> {"foobar":123,"spam":3.14}

Parameters

func: ToolFuncPlain[ToolParams] | None
    The tool function to register. Default: None.

name: str | None
    The name of the tool, defaults to the function name. Default: None.

retries: int | None
    The number of retries to allow for this tool, defaults to the agent's default retries, which defaults to 1. Default: None.

prepare: ToolPrepareFunc[AgentDepsT] | None
    Custom method to prepare the tool definition for each step; return None to omit the tool from a given step. Useful if you want to customise a tool at call time, or omit it completely from a step. See ToolPrepareFunc. Default: None.

docstring_format: DocstringFormat
    The format of the docstring, see DocstringFormat. 'auto' means the format is inferred from the structure of the docstring. Default: 'auto'.

require_parameter_descriptions: bool
    If True, raise an error if a parameter description is missing. Default: False.

schema_generator: type[GenerateJsonSchema]
    The JSON schema generator class to use for this tool. Default: GenerateToolJsonSchema.

strict: bool | None
    Whether to enforce JSON schema compliance (only affects OpenAI). See ToolDefinition for more info. Default: None.

requires_approval: bool
    Whether this tool requires human-in-the-loop approval. See the tools documentation for more info. Default: False.
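A sketch of how `docstring_format` and `require_parameter_descriptions` interact, assuming a Google-style docstring; the `convert` function is hypothetical:

```python
from pydantic_ai import Agent

agent = Agent('test')

@agent.tool_plain(docstring_format='google', require_parameter_descriptions=True)
def convert(value: float, unit: str) -> float:
    """Convert a value to the given unit.

    Args:
        value: The numeric value to convert.
        unit: The target unit, e.g. 'km'.
    """
    # Hypothetical no-op conversion, just for illustration.
    return value
```

With `require_parameter_descriptions=True`, omitting either `Args:` entry would raise an error at registration time.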
Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
def tool_plain(
    self,
    func: ToolFuncPlain[ToolParams] | None = None,
    /,
    *,
    name: str | None = None,
    retries: int | None = None,
    prepare: ToolPrepareFunc[AgentDepsT] | None = None,
    docstring_format: DocstringFormat = 'auto',
    require_parameter_descriptions: bool = False,
    schema_generator: type[GenerateJsonSchema] = GenerateToolJsonSchema,
    strict: bool | None = None,
    requires_approval: bool = False,
) -> Any:
    """Decorator to register a tool function which DOES NOT take `RunContext` as an argument.

    Can decorate a sync or async function.

    The docstring is inspected to extract both the tool description and description of each parameter,
    [learn more](../tools.md#function-tools-and-schema).

    We can't add overloads for every possible signature of tool, since the return type is a recursive union
    so the signature of functions decorated with `@agent.tool_plain` is obscured.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('test')

    @agent.tool_plain
    def foobar() -> int:
        return 123

    @agent.tool_plain(retries=2)
    async def spam() -> float:
        return 3.14

    result = agent.run_sync('foobar')
    print(result.output)
    #> {"foobar":123,"spam":3.14}
    ```

    Args:
        func: The tool function to register.
        name: The name of the tool, defaults to the function name.
        retries: The number of retries to allow for this tool, defaults to the agent's default retries,
            which defaults to 1.
        prepare: custom method to prepare the tool definition for each step, return `None` to omit this
            tool from a given step. This is useful if you want to customise a tool at call time,
            or omit it completely from a step. See [`ToolPrepareFunc`][pydantic_ai.tools.ToolPrepareFunc].
        docstring_format: The format of the docstring, see [`DocstringFormat`][pydantic_ai.tools.DocstringFormat].
            Defaults to `'auto'`, such that the format is inferred from the structure of the docstring.
        require_parameter_descriptions: If True, raise an error if a parameter description is missing. Defaults to False.
        schema_generator: The JSON schema generator class to use for this tool. Defaults to `GenerateToolJsonSchema`.
        strict: Whether to enforce JSON schema compliance (only affects OpenAI).
            See [`ToolDefinition`][pydantic_ai.tools.ToolDefinition] for more info.
        requires_approval: Whether this tool requires human-in-the-loop approval. Defaults to False.
            See the [tools documentation](../deferred-tools.md#human-in-the-loop-tool-approval) for more info.
    """

    def tool_decorator(func_: ToolFuncPlain[ToolParams]) -> ToolFuncPlain[ToolParams]:
        # noinspection PyTypeChecker
        self._function_toolset.add_function(
            func_,
            False,
            name,
            retries,
            prepare,
            docstring_format,
            require_parameter_descriptions,
            schema_generator,
            strict,
            requires_approval,
        )
        return func_

    return tool_decorator if func is None else tool_decorator(func)

toolset

toolset(
    *, per_run_step: bool = True
) -> Callable[
    [ToolsetFunc[AgentDepsT]], ToolsetFunc[AgentDepsT]
]
toolset(
    func: ToolsetFunc[AgentDepsT] | None = None,
    /,
    *,
    per_run_step: bool = True,
) -> Any

Decorator to register a toolset function which takes RunContext as its only argument.

Can decorate a sync or async function.

The decorator can be used bare (agent.toolset).

Example

from pydantic_ai import Agent, RunContext
from pydantic_ai.toolsets import AbstractToolset, FunctionToolset

agent = Agent('test', deps_type=str)

@agent.toolset
async def simple_toolset(ctx: RunContext[str]) -> AbstractToolset[str]:
    return FunctionToolset()

Parameters

func: ToolsetFunc[AgentDepsT] | None
    The toolset function to register. Default: None.

per_run_step: bool
    Whether to re-evaluate the toolset for each run step. Default: True.
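A minimal sketch of `per_run_step=False`, where the toolset function is evaluated once at the start of the run instead of before every step; the selection logic is hypothetical:

```python
from pydantic_ai import Agent, RunContext
from pydantic_ai.toolsets import AbstractToolset, FunctionToolset

agent = Agent('test', deps_type=bool)

def greet() -> str:
    return 'hello'

@agent.toolset(per_run_step=False)
async def static_toolset(ctx: RunContext[bool]) -> AbstractToolset[bool]:
    # Hypothetical: pick the available tools once per run, based on deps.
    return FunctionToolset([greet] if ctx.deps else [])
```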
Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
def toolset(
    self,
    func: ToolsetFunc[AgentDepsT] | None = None,
    /,
    *,
    per_run_step: bool = True,
) -> Any:
    """Decorator to register a toolset function which takes [`RunContext`][pydantic_ai.tools.RunContext] as its only argument.

    Can decorate a sync or async function.

    The decorator can be used bare (`agent.toolset`).

    Example:
    ```python
    from pydantic_ai import Agent, RunContext
    from pydantic_ai.toolsets import AbstractToolset, FunctionToolset

    agent = Agent('test', deps_type=str)

    @agent.toolset
    async def simple_toolset(ctx: RunContext[str]) -> AbstractToolset[str]:
        return FunctionToolset()
    ```

    Args:
        func: The toolset function to register.
        per_run_step: Whether to re-evaluate the toolset for each run step. Defaults to True.
    """

    def toolset_decorator(func_: ToolsetFunc[AgentDepsT]) -> ToolsetFunc[AgentDepsT]:
        self._dynamic_toolsets.append(DynamicToolset(func_, per_run_step=per_run_step))
        return func_

    return toolset_decorator if func is None else toolset_decorator(func)

toolsets property

All toolsets registered on the agent, including a function toolset holding the tools registered directly on the agent.

Output tools are not included.

__aenter__ async

__aenter__() -> Self

Enter the agent context.

This will start all MCPServerStdios registered as toolsets so they are ready to be used.

This is a no-op if the agent has already been entered.
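A minimal usage sketch; the MCP server command is a placeholder, not a real server:

```python
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStdio

server = MCPServerStdio('python', args=['my_mcp_server.py'])  # hypothetical command
agent = Agent('test', toolsets=[server])

async def main():
    async with agent:  # starts the MCP server via __aenter__
        result = await agent.run('hello')
        print(result.output)
```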

Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
async def __aenter__(self) -> Self:
    """Enter the agent context.

    This will start all [`MCPServerStdio`s][pydantic_ai.mcp.MCPServerStdio] registered as `toolsets` so they are ready to be used.

    This is a no-op if the agent has already been entered.
    """
    async with self._enter_lock:
        if self._entered_count == 0:
            async with AsyncExitStack() as exit_stack:
                toolset = self._get_toolset()
                await exit_stack.enter_async_context(toolset)

                self._exit_stack = exit_stack.pop_all()
        self._entered_count += 1
    return self

set_mcp_sampling_model

set_mcp_sampling_model(
    model: Model | KnownModelName | str | None = None,
) -> None

Set the sampling model on all MCP servers registered with the agent.

If no sampling model is provided, the agent's model will be used.
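A usage sketch; the server command and the explicit model name are placeholders:

```python
from pydantic_ai import Agent
from pydantic_ai.mcp import MCPServerStdio

server = MCPServerStdio('python', args=['my_mcp_server.py'])  # hypothetical command
agent = Agent('openai:gpt-4o', toolsets=[server])

# Use the agent's own model for MCP sampling requests:
agent.set_mcp_sampling_model()

# Or set an explicit sampling model:
agent.set_mcp_sampling_model('openai:gpt-4o-mini')
```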

Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
def set_mcp_sampling_model(self, model: models.Model | models.KnownModelName | str | None = None) -> None:
    """Set the sampling model on all MCP servers registered with the agent.

    If no sampling model is provided, the agent's model will be used.
    """
    try:
        sampling_model = models.infer_model(model) if model else self._get_model(None)
    except exceptions.UserError as e:
        raise exceptions.UserError('No sampling model provided and no model set on the agent.') from e

    from ..mcp import MCPServer

    def _set_sampling_model(toolset: AbstractToolset[AgentDepsT]) -> None:
        if isinstance(toolset, MCPServer):
            toolset.sampling_model = sampling_model

    self._get_toolset().apply(_set_sampling_model)

run_mcp_servers async deprecated

run_mcp_servers(
    model: Model | KnownModelName | str | None = None,
) -> AsyncIterator[None]
Deprecated

run_mcp_servers is deprecated, use async with agent: instead. If you need to set a sampling model on all MCP servers, use agent.set_mcp_sampling_model().

Run MCPServerStdios so they can be used by the agent.

Deprecated: use async with agent instead. If you need to set a sampling model on all MCP servers, use agent.set_mcp_sampling_model().

Returns: a context manager to start and shut down the servers.
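A migration sketch, assuming an `agent` with MCP servers registered as in the example above:

```python
async def main():
    # Before (deprecated):
    #   async with agent.run_mcp_servers():
    #       result = await agent.run('hello')
    # After:
    agent.set_mcp_sampling_model()  # optional, only if MCP sampling is needed
    async with agent:
        result = await agent.run('hello')
        print(result.output)
```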

Source code in pydantic_ai_slim/pydantic_ai/agent/__init__.py
@asynccontextmanager
@deprecated(
    '`run_mcp_servers` is deprecated, use `async with agent:` instead. If you need to set a sampling model on all MCP servers, use `agent.set_mcp_sampling_model()`.'
)
async def run_mcp_servers(
    self, model: models.Model | models.KnownModelName | str | None = None
) -> AsyncIterator[None]:
    """Run [`MCPServerStdio`s][pydantic_ai.mcp.MCPServerStdio] so they can be used by the agent.

    Deprecated: use [`async with agent`][pydantic_ai.agent.Agent.__aenter__] instead.
    If you need to set a sampling model on all MCP servers, use [`agent.set_mcp_sampling_model()`][pydantic_ai.agent.Agent.set_mcp_sampling_model].

    Returns: a context manager to start and shutdown the servers.
    """
    try:
        self.set_mcp_sampling_model(model)
    except exceptions.UserError:
        if model is not None:
            raise

    async with self:
        yield

AbstractAgent

Bases: Generic[AgentDepsT, OutputDataT], ABC

Abstract superclass for Agent, WrapperAgent, and your own custom agent implementations.

Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
class AbstractAgent(Generic[AgentDepsT, OutputDataT], ABC):
    """Abstract superclass for [`Agent`][pydantic_ai.agent.Agent], [`WrapperAgent`][pydantic_ai.agent.WrapperAgent], and your own custom agent implementations."""

    @property
    @abstractmethod
    def model(self) -> models.Model | models.KnownModelName | str | None:
        """The default model configured for this agent."""
        raise NotImplementedError

    @property
    @abstractmethod
    def name(self) -> str | None:
        """The name of the agent, used for logging.

        If `None`, we try to infer the agent name from the call frame when the agent is first run.
        """
        raise NotImplementedError

    @name.setter
    @abstractmethod
    def name(self, value: str | None) -> None:
        """Set the name of the agent, used for logging."""
        raise NotImplementedError

    @property
    @abstractmethod
    def deps_type(self) -> type:
        """The type of dependencies used by the agent."""
        raise NotImplementedError

    @property
    @abstractmethod
    def output_type(self) -> OutputSpec[OutputDataT]:
        """The type of data output by agent runs, used to validate the data returned by the model, defaults to `str`."""
        raise NotImplementedError

    @property
    @abstractmethod
    def event_stream_handler(self) -> EventStreamHandler[AgentDepsT] | None:
        """Optional handler for events from the model's streaming response and the agent's execution of tools."""
        raise NotImplementedError

    @property
    @abstractmethod
    def toolsets(self) -> Sequence[AbstractToolset[AgentDepsT]]:
        """All toolsets registered on the agent.

        Output tools are not included.
        """
        raise NotImplementedError

    @overload
    async def run(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AgentRunResult[OutputDataT]: ...

    @overload
    async def run(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT],
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AgentRunResult[RunOutputDataT]: ...

    async def run(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT] | None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AgentRunResult[Any]:
        """Run the agent with a user prompt in async mode.

        This method builds an internal agent graph (using system prompts, tools and output schemas) and then
        runs the graph to completion. The result of the run is returned.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')

        async def main():
            agent_run = await agent.run('What is the capital of France?')
            print(agent_run.output)
            #> The capital of France is Paris.
        ```

        Args:
            user_prompt: User input to start/continue the conversation.
            output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
                output validators since output validators would expect an argument that matches the agent's output type.
            message_history: History of the conversation so far.
            deferred_tool_results: Optional results for deferred tool calls in the message history.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.
            model_settings: Optional settings to use for this model's request.
            usage_limits: Optional limits on model request count or token usage.
            usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
            infer_name: Whether to try to infer the agent name from the call frame if it's not set.
            toolsets: Optional additional toolsets for this run.
            event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run.

        Returns:
            The result of the run.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())

        event_stream_handler = event_stream_handler or self.event_stream_handler

        async with self.iter(
            user_prompt=user_prompt,
            output_type=output_type,
            message_history=message_history,
            deferred_tool_results=deferred_tool_results,
            model=model,
            deps=deps,
            model_settings=model_settings,
            usage_limits=usage_limits,
            usage=usage,
            toolsets=toolsets,
        ) as agent_run:
            async for node in agent_run:
                if event_stream_handler is not None and (
                    self.is_model_request_node(node) or self.is_call_tools_node(node)
                ):
                    async with node.stream(agent_run.ctx) as stream:
                        await event_stream_handler(_agent_graph.build_run_context(agent_run.ctx), stream)

        assert agent_run.result is not None, 'The graph run did not finish properly'
        return agent_run.result

    @overload
    def run_sync(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AgentRunResult[OutputDataT]: ...

    @overload
    def run_sync(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT],
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AgentRunResult[RunOutputDataT]: ...

    def run_sync(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT] | None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AgentRunResult[Any]:
        """Synchronously run the agent with a user prompt.

        This is a convenience method that wraps [`self.run`][pydantic_ai.agent.AbstractAgent.run] with `loop.run_until_complete(...)`.
        You therefore can't use this method inside async code or if there's an active event loop.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')

        result_sync = agent.run_sync('What is the capital of Italy?')
        print(result_sync.output)
        #> The capital of Italy is Rome.
        ```

        Args:
            user_prompt: User input to start/continue the conversation.
            output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
                output validators since output validators would expect an argument that matches the agent's output type.
            message_history: History of the conversation so far.
            deferred_tool_results: Optional results for deferred tool calls in the message history.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.
            model_settings: Optional settings to use for this model's request.
            usage_limits: Optional limits on model request count or token usage.
            usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
            infer_name: Whether to try to infer the agent name from the call frame if it's not set.
            toolsets: Optional additional toolsets for this run.
            event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run.

        Returns:
            The result of the run.
        """
        if infer_name and self.name is None:
            self._infer_name(inspect.currentframe())

        return get_event_loop().run_until_complete(
            self.run(
                user_prompt,
                output_type=output_type,
                message_history=message_history,
                deferred_tool_results=deferred_tool_results,
                model=model,
                deps=deps,
                model_settings=model_settings,
                usage_limits=usage_limits,
                usage=usage,
                infer_name=False,
                toolsets=toolsets,
                event_stream_handler=event_stream_handler,
            )
        )

    @overload
    def run_stream(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AbstractAsyncContextManager[result.StreamedRunResult[AgentDepsT, OutputDataT]]: ...

    @overload
    def run_stream(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT],
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AbstractAsyncContextManager[result.StreamedRunResult[AgentDepsT, RunOutputDataT]]: ...

    @asynccontextmanager
    async def run_stream(  # noqa C901
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT] | None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
    ) -> AsyncIterator[result.StreamedRunResult[AgentDepsT, Any]]:
        """Run the agent with a user prompt in async streaming mode.

        This method builds an internal agent graph (using system prompts, tools and output schemas) and then
        runs the graph until the model produces output matching the `output_type`, for example text or structured data.
        At this point, a streaming run result object is yielded from which you can stream the output as it comes in,
        and -- once this output has completed streaming -- get the complete output, message history, and usage.

        As this method will consider the first output matching the `output_type` to be the final output,
        it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output.
        If you want to always run the agent graph to completion and stream events and output at the same time,
        use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')

        async def main():
            async with agent.run_stream('What is the capital of the UK?') as response:
                print(await response.get_output())
                #> The capital of the UK is London.
        ```

        Args:
            user_prompt: User input to start/continue the conversation.
            output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
                output validators since output validators would expect an argument that matches the agent's output type.
            message_history: History of the conversation so far.
            deferred_tool_results: Optional results for deferred tool calls in the message history.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.
            model_settings: Optional settings to use for this model's request.
            usage_limits: Optional limits on model request count or token usage.
            usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
            infer_name: Whether to try to infer the agent name from the call frame if it's not set.
            toolsets: Optional additional toolsets for this run.
            event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run.
                It will receive all the events up until the final result is found, which you can then read or stream from inside the context manager.
                Note that it does _not_ receive any events after the final result is found.

        Returns:
            The result of the run.
        """
        if infer_name and self.name is None:
            # f_back because `asynccontextmanager` adds one frame
            if frame := inspect.currentframe():  # pragma: no branch
                self._infer_name(frame.f_back)

        event_stream_handler = event_stream_handler or self.event_stream_handler

        yielded = False
        async with self.iter(
            user_prompt,
            output_type=output_type,
            message_history=message_history,
            deferred_tool_results=deferred_tool_results,
            model=model,
            deps=deps,
            model_settings=model_settings,
            usage_limits=usage_limits,
            usage=usage,
            infer_name=False,
            toolsets=toolsets,
        ) as agent_run:
            first_node = agent_run.next_node  # start with the first node
            assert isinstance(first_node, _agent_graph.UserPromptNode)  # the first node should be a user prompt node
            node = first_node
            while True:
                graph_ctx = agent_run.ctx
                if self.is_model_request_node(node):
                    async with node.stream(graph_ctx) as stream:
                        final_result_event = None

                        async def stream_to_final(
                            stream: AgentStream,
                        ) -> AsyncIterator[_messages.ModelResponseStreamEvent]:
                            nonlocal final_result_event
                            async for event in stream:
                                yield event
                                if isinstance(event, _messages.FinalResultEvent):
                                    final_result_event = event
                                    break

                        if event_stream_handler is not None:
                            await event_stream_handler(
                                _agent_graph.build_run_context(graph_ctx), stream_to_final(stream)
                            )
                        else:
                            async for _ in stream_to_final(stream):
                                pass

                        if final_result_event is not None:
                            final_result = FinalResult(
                                stream, final_result_event.tool_name, final_result_event.tool_call_id
                            )
                            if yielded:
                                raise exceptions.AgentRunError('Agent run produced final results')  # pragma: no cover
                            yielded = True

                            messages = graph_ctx.state.message_history.copy()

                            async def on_complete() -> None:
                                """Called when the stream has completed.

                                The model response will have been added to messages by now
                                by `StreamedRunResult._marked_completed`.
                                """
                                last_message = messages[-1]
                                assert isinstance(last_message, _messages.ModelResponse)
                                tool_calls = [
                                    part for part in last_message.parts if isinstance(part, _messages.ToolCallPart)
                                ]

                                parts: list[_messages.ModelRequestPart] = []
                                async for _event in _agent_graph.process_function_tools(
                                    graph_ctx.deps.tool_manager,
                                    tool_calls,
                                    final_result,
                                    graph_ctx,
                                    parts,
                                ):
                                    pass
                                if parts:
                                    messages.append(_messages.ModelRequest(parts))

                            yield StreamedRunResult(
                                messages,
                                graph_ctx.deps.new_message_index,
                                stream,
                                on_complete,
                            )
                            break
                elif self.is_call_tools_node(node) and event_stream_handler is not None:
                    async with node.stream(agent_run.ctx) as stream:
                        await event_stream_handler(_agent_graph.build_run_context(agent_run.ctx), stream)

                next_node = await agent_run.next(node)
                if isinstance(next_node, End) and agent_run.result is not None:
                    # A final output could have been produced by the CallToolsNode rather than the ModelRequestNode,
                    # if a tool function raised CallDeferred or ApprovalRequired.
                    # In this case there's no response to stream, but we still let the user access the output etc as normal.
                    yield StreamedRunResult(
                        graph_ctx.state.message_history,
                        graph_ctx.deps.new_message_index,
                        run_result=agent_run.result,
                    )
                    yielded = True
                    break
                if not isinstance(next_node, _agent_graph.AgentNode):
                    raise exceptions.AgentRunError(  # pragma: no cover
                        'Should have produced a StreamedRunResult before getting here'
                    )
                node = cast(_agent_graph.AgentNode[Any, Any], next_node)

        if not yielded:
            raise exceptions.AgentRunError('Agent run finished without producing a final result')  # pragma: no cover

    @overload
    def iter(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    ) -> AbstractAsyncContextManager[AgentRun[AgentDepsT, OutputDataT]]: ...

    @overload
    def iter(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT],
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    ) -> AbstractAsyncContextManager[AgentRun[AgentDepsT, RunOutputDataT]]: ...

    @asynccontextmanager
    @abstractmethod
    async def iter(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT] | None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    ) -> AsyncIterator[AgentRun[AgentDepsT, Any]]:
        """A contextmanager which can be used to iterate over the agent graph's nodes as they are executed.

        This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an
        `AgentRun` object. The `AgentRun` can be used to async-iterate over the nodes of the graph as they are
        executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the
        stream of events coming from the execution of tools.

        The `AgentRun` also provides methods to access the full message history, new messages, and usage statistics,
        and the final result of the run once it has completed.

        For more details, see the documentation of `AgentRun`.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')

        async def main():
            nodes = []
            async with agent.iter('What is the capital of France?') as agent_run:
                async for node in agent_run:
                    nodes.append(node)
            print(nodes)
            '''
            [
                UserPromptNode(
                    user_prompt='What is the capital of France?',
                    instructions=None,
                    instructions_functions=[],
                    system_prompts=(),
                    system_prompt_functions=[],
                    system_prompt_dynamic_functions={},
                ),
                ModelRequestNode(
                    request=ModelRequest(
                        parts=[
                            UserPromptPart(
                                content='What is the capital of France?',
                                timestamp=datetime.datetime(...),
                            )
                        ]
                    )
                ),
                CallToolsNode(
                    model_response=ModelResponse(
                        parts=[TextPart(content='The capital of France is Paris.')],
                        usage=RequestUsage(input_tokens=56, output_tokens=7),
                        model_name='gpt-4o',
                        timestamp=datetime.datetime(...),
                    )
                ),
                End(data=FinalResult(output='The capital of France is Paris.')),
            ]
            '''
            print(agent_run.result.output)
            #> The capital of France is Paris.
        ```

        Args:
            user_prompt: User input to start/continue the conversation.
            output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
                output validators since output validators would expect an argument that matches the agent's output type.
            message_history: History of the conversation so far.
            deferred_tool_results: Optional results for deferred tool calls in the message history.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.
            model_settings: Optional settings to use for this model's request.
            usage_limits: Optional limits on model request count or token usage.
            usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
            infer_name: Whether to try to infer the agent name from the call frame if it's not set.
            toolsets: Optional additional toolsets for this run.

        Returns:
            The result of the run.
        """
        raise NotImplementedError
        yield

    @contextmanager
    @abstractmethod
    def override(
        self,
        *,
        deps: AgentDepsT | _utils.Unset = _utils.UNSET,
        model: models.Model | models.KnownModelName | str | _utils.Unset = _utils.UNSET,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | _utils.Unset = _utils.UNSET,
        tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | _utils.Unset = _utils.UNSET,
    ) -> Iterator[None]:
        """Context manager to temporarily override agent dependencies, model, toolsets, or tools.

        This is particularly useful when testing.
        You can find an example of this [here](../testing.md#overriding-model-via-pytest-fixtures).

        Args:
            deps: The dependencies to use instead of the dependencies passed to the agent run.
            model: The model to use instead of the model passed to the agent run.
            toolsets: The toolsets to use instead of the toolsets passed to the agent constructor and agent run.
            tools: The tools to use instead of the tools registered with the agent.
        """
        raise NotImplementedError
        yield

    def _infer_name(self, function_frame: FrameType | None) -> None:
        """Infer the agent name from the call frame.

        Usage should be `self._infer_name(inspect.currentframe())`.
        """
        assert self.name is None, 'Name already set'
        if function_frame is not None:  # pragma: no branch
            if parent_frame := function_frame.f_back:  # pragma: no branch
                for name, item in parent_frame.f_locals.items():
                    if item is self:
                        self.name = name
                        return
                if parent_frame.f_locals != parent_frame.f_globals:  # pragma: no branch
                    # if we couldn't find the agent in locals and globals are a different dict, try globals
                    for name, item in parent_frame.f_globals.items():
                        if item is self:
                            self.name = name
                            return

    @staticmethod
    def is_model_request_node(
        node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]],
    ) -> TypeIs[_agent_graph.ModelRequestNode[T, S]]:
        """Check if the node is a `ModelRequestNode`, narrowing the type if it is.

        This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`.
        """
        return isinstance(node, _agent_graph.ModelRequestNode)

    @staticmethod
    def is_call_tools_node(
        node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]],
    ) -> TypeIs[_agent_graph.CallToolsNode[T, S]]:
        """Check if the node is a `CallToolsNode`, narrowing the type if it is.

        This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`.
        """
        return isinstance(node, _agent_graph.CallToolsNode)

    @staticmethod
    def is_user_prompt_node(
        node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]],
    ) -> TypeIs[_agent_graph.UserPromptNode[T, S]]:
        """Check if the node is a `UserPromptNode`, narrowing the type if it is.

        This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`.
        """
        return isinstance(node, _agent_graph.UserPromptNode)

    @staticmethod
    def is_end_node(
        node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]],
    ) -> TypeIs[End[result.FinalResult[S]]]:
        """Check if the node is a `End`, narrowing the type if it is.

        This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`.
        """
        return isinstance(node, End)

    @abstractmethod
    async def __aenter__(self) -> AbstractAgent[AgentDepsT, OutputDataT]:
        raise NotImplementedError

    @abstractmethod
    async def __aexit__(self, *args: Any) -> bool | None:
        raise NotImplementedError

    def to_ag_ui(
        self,
        *,
        # Agent.iter parameters
        output_type: OutputSpec[OutputDataT] | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: UsageLimits | None = None,
        usage: RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
        # Starlette
        debug: bool = False,
        routes: Sequence[BaseRoute] | None = None,
        middleware: Sequence[Middleware] | None = None,
        exception_handlers: Mapping[Any, ExceptionHandler] | None = None,
        on_startup: Sequence[Callable[[], Any]] | None = None,
        on_shutdown: Sequence[Callable[[], Any]] | None = None,
        lifespan: Lifespan[AGUIApp[AgentDepsT, OutputDataT]] | None = None,
    ) -> AGUIApp[AgentDepsT, OutputDataT]:
        """Returns an ASGI application that handles every AG-UI request by running the agent.

        Note that the `deps` will be the same for each request, with the exception of the AG-UI state that's
        injected into the `state` field of a `deps` object that implements the [`StateHandler`][pydantic_ai.ag_ui.StateHandler] protocol.
        To provide different `deps` for each request (e.g. based on the authenticated user),
        use [`pydantic_ai.ag_ui.run_ag_ui`][pydantic_ai.ag_ui.run_ag_ui] or
        [`pydantic_ai.ag_ui.handle_ag_ui_request`][pydantic_ai.ag_ui.handle_ag_ui_request] instead.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')
        app = agent.to_ag_ui()
        ```

        The `app` is an ASGI application that can be used with any ASGI server.

        To run the application, you can use the following command:

        ```bash
        uvicorn app:app --host 0.0.0.0 --port 8000
        ```

        See [AG-UI docs](../ag-ui.md) for more information.

        Args:
            output_type: Custom output type to use for this run, `output_type` may only be used if the agent has
                no output validators since output validators would expect an argument that matches the agent's
                output type.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.
            model_settings: Optional settings to use for this model's request.
            usage_limits: Optional limits on model request count or token usage.
            usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
            infer_name: Whether to try to infer the agent name from the call frame if it's not set.
            toolsets: Optional additional toolsets for this run.

            debug: Boolean indicating if debug tracebacks should be returned on errors.
            routes: A list of routes to serve incoming HTTP and WebSocket requests.
            middleware: A list of middleware to run for every request. A starlette application will always
                automatically include two middleware classes. `ServerErrorMiddleware` is added as the very
                outermost middleware, to handle any uncaught errors occurring anywhere in the entire stack.
                `ExceptionMiddleware` is added as the very innermost middleware, to deal with handled
                exception cases occurring in the routing or endpoints.
            exception_handlers: A mapping of either integer status codes, or exception class types onto
                callables which handle the exceptions. Exception handler callables should be of the form
                `handler(request, exc) -> response` and may be either standard functions, or async functions.
            on_startup: A list of callables to run on application startup. Startup handler callables do not
                take any arguments, and may be either standard functions, or async functions.
            on_shutdown: A list of callables to run on application shutdown. Shutdown handler callables do
                not take any arguments, and may be either standard functions, or async functions.
            lifespan: A lifespan context function, which can be used to perform startup and shutdown tasks.
                This is a newer style that replaces the `on_startup` and `on_shutdown` handlers. Use one or
                the other, not both.

        Returns:
            An ASGI application for running Pydantic AI agents with AG-UI protocol support.
        """
        from ..ag_ui import AGUIApp

        return AGUIApp(
            agent=self,
            # Agent.iter parameters
            output_type=output_type,
            model=model,
            deps=deps,
            model_settings=model_settings,
            usage_limits=usage_limits,
            usage=usage,
            infer_name=infer_name,
            toolsets=toolsets,
            # Starlette
            debug=debug,
            routes=routes,
            middleware=middleware,
            exception_handlers=exception_handlers,
            on_startup=on_startup,
            on_shutdown=on_shutdown,
            lifespan=lifespan,
        )

    def to_a2a(
        self,
        *,
        storage: Storage | None = None,
        broker: Broker | None = None,
        # Agent card
        name: str | None = None,
        url: str = 'http://localhost:8000',
        version: str = '1.0.0',
        description: str | None = None,
        provider: AgentProvider | None = None,
        skills: list[Skill] | None = None,
        # Starlette
        debug: bool = False,
        routes: Sequence[Route] | None = None,
        middleware: Sequence[Middleware] | None = None,
        exception_handlers: dict[Any, ExceptionHandler] | None = None,
        lifespan: Lifespan[FastA2A] | None = None,
    ) -> FastA2A:
        """Convert the agent to a FastA2A application.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')
        app = agent.to_a2a()
        ```

        The `app` is an ASGI application that can be used with any ASGI server.

        To run the application, you can use the following command:

        ```bash
        uvicorn app:app --host 0.0.0.0 --port 8000
        ```
        """
        from .._a2a import agent_to_a2a

        return agent_to_a2a(
            self,
            storage=storage,
            broker=broker,
            name=name,
            url=url,
            version=version,
            description=description,
            provider=provider,
            skills=skills,
            debug=debug,
            routes=routes,
            middleware=middleware,
            exception_handlers=exception_handlers,
            lifespan=lifespan,
        )

    async def to_cli(
        self: Self,
        deps: AgentDepsT = None,
        prog_name: str = 'pydantic-ai',
        message_history: list[_messages.ModelMessage] | None = None,
    ) -> None:
        """Run the agent in a CLI chat interface.

        Args:
            deps: The dependencies to pass to the agent.
            prog_name: The name of the program to use for the CLI. Defaults to 'pydantic-ai'.
            message_history: History of the conversation so far.

        Example:
        ```python {title="agent_to_cli.py" test="skip"}
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o', instructions='You always respond in Italian.')

        async def main():
            await agent.to_cli()
        ```
        """
        from rich.console import Console

        from pydantic_ai._cli import run_chat

        await run_chat(
            stream=True,
            agent=self,
            deps=deps,
            console=Console(),
            code_theme='monokai',
            prog_name=prog_name,
            message_history=message_history,
        )

    def to_cli_sync(
        self: Self,
        deps: AgentDepsT = None,
        prog_name: str = 'pydantic-ai',
        message_history: list[_messages.ModelMessage] | None = None,
    ) -> None:
        """Run the agent in a CLI chat interface with the non-async interface.

        Args:
            deps: The dependencies to pass to the agent.
            prog_name: The name of the program to use for the CLI. Defaults to 'pydantic-ai'.
            message_history: History of the conversation so far.

        ```python {title="agent_to_cli_sync.py" test="skip"}
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o', instructions='You always respond in Italian.')
        agent.to_cli_sync()
        agent.to_cli_sync(prog_name='assistant')
        ```
        """
        return get_event_loop().run_until_complete(
            self.to_cli(deps=deps, prog_name=prog_name, message_history=message_history)
        )

model abstractmethod property

model: Model | KnownModelName | str | None

The default model configured for this agent.

name abstractmethod property writable

name: str | None

The name of the agent, used for logging.

If `None`, we try to infer the agent name from the call frame when the agent is first run.

deps_type abstractmethod property

deps_type: type

The type of dependencies used by the agent.

output_type abstractmethod property

output_type: OutputSpec[OutputDataT]

The type of data output by agent runs, used to validate the data returned by the model; defaults to `str`.

event_stream_handler abstractmethod property

event_stream_handler: EventStreamHandler[AgentDepsT] | None

Optional handler for events from the model's streaming response and the agent's execution of tools.
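
A minimal sketch of such a handler follows; the signature matches the `EventStreamHandler` type, while the `print_events` name and passing the handler to the `Agent` constructor are assumptions for illustration:

```python
from collections.abc import AsyncIterable

from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import AgentStreamEvent


async def print_events(ctx: RunContext[None], events: AsyncIterable[AgentStreamEvent]) -> None:
    # Illustrative handler: print each streamed event as it arrives.
    async for event in events:
        print(event)


agent = Agent('openai:gpt-4o', event_stream_handler=print_events)
```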

toolsets abstractmethod property

All toolsets registered on the agent.

Output tools are not included.

run async

run(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None
) -> AgentRunResult[OutputDataT]
run(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT],
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None
) -> AgentRunResult[RunOutputDataT]
run(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None
) -> AgentRunResult[Any]

Run the agent with a user prompt in async mode.

This method builds an internal agent graph (using system prompts, tools and output schemas) and then runs the graph to completion. The result of the run is returned.

Example

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    agent_run = await agent.run('What is the capital of France?')
    print(agent_run.output)
    #> The capital of France is Paris.

Parameters

user_prompt (str | Sequence[UserContent] | None, default None): User input to start/continue the conversation.
output_type (OutputSpec[RunOutputDataT] | None, default None): Custom output type to use for this run; `output_type` may only be used if the agent has no output validators, since output validators expect an argument that matches the agent's output type.
message_history (list[ModelMessage] | None, default None): History of the conversation so far.
deferred_tool_results (DeferredToolResults | None, default None): Optional results for deferred tool calls in the message history.
model (Model | KnownModelName | str | None, default None): Optional model to use for this run; required if `model` was not set when creating the agent.
deps (AgentDepsT, default None): Optional dependencies to use for this run.
model_settings (ModelSettings | None, default None): Optional settings to use for this model's request.
usage_limits (UsageLimits | None, default None): Optional limits on model request count or token usage.
usage (RunUsage | None, default None): Optional usage to start with, useful for resuming a conversation or for agents used in tools.
infer_name (bool, default True): Whether to try to infer the agent name from the call frame if it's not set.
toolsets (Sequence[AbstractToolset[AgentDepsT]] | None, default None): Optional additional toolsets for this run.
event_stream_handler (EventStreamHandler[AgentDepsT] | None, default None): Optional handler for events from the model's streaming response and the agent's execution of tools for this run.

Returns

AgentRunResult[Any]: The result of the run.
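
For example, a conversation can be continued across runs by passing the previous run's new messages back in via `message_history` (a minimal sketch):

```python
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


async def main():
    first = await agent.run('What is the capital of France?')
    # Feed the messages from the first run back in to continue the conversation.
    follow_up = await agent.run(
        'How many people live there?',
        message_history=first.new_messages(),
    )
    print(follow_up.output)
```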

Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
async def run(
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[_messages.ModelMessage] | None = None,
    deferred_tool_results: DeferredToolResults | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.RunUsage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
) -> AgentRunResult[Any]:
    """Run the agent with a user prompt in async mode.

    This method builds an internal agent graph (using system prompts, tools and output schemas) and then
    runs the graph to completion. The result of the run is returned.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')

    async def main():
        agent_run = await agent.run('What is the capital of France?')
        print(agent_run.output)
        #> The capital of France is Paris.
    ```

    Args:
        user_prompt: User input to start/continue the conversation.
        output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
            output validators since output validators would expect an argument that matches the agent's output type.
        message_history: History of the conversation so far.
        deferred_tool_results: Optional results for deferred tool calls in the message history.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.
        model_settings: Optional settings to use for this model's request.
        usage_limits: Optional limits on model request count or token usage.
        usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
        infer_name: Whether to try to infer the agent name from the call frame if it's not set.
        toolsets: Optional additional toolsets for this run.
        event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run.

    Returns:
        The result of the run.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())

    event_stream_handler = event_stream_handler or self.event_stream_handler

    async with self.iter(
        user_prompt=user_prompt,
        output_type=output_type,
        message_history=message_history,
        deferred_tool_results=deferred_tool_results,
        model=model,
        deps=deps,
        model_settings=model_settings,
        usage_limits=usage_limits,
        usage=usage,
        toolsets=toolsets,
    ) as agent_run:
        async for node in agent_run:
            if event_stream_handler is not None and (
                self.is_model_request_node(node) or self.is_call_tools_node(node)
            ):
                async with node.stream(agent_run.ctx) as stream:
                    await event_stream_handler(_agent_graph.build_run_context(agent_run.ctx), stream)

    assert agent_run.result is not None, 'The graph run did not finish properly'
    return agent_run.result

run_sync

run_sync(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None
) -> AgentRunResult[OutputDataT]
run_sync(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT],
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None
) -> AgentRunResult[RunOutputDataT]
run_sync(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None
) -> AgentRunResult[Any]

Synchronously run the agent with a user prompt.

This is a convenience method that wraps self.run with loop.run_until_complete(...). You therefore can't use this method inside async code or when there is an active event loop.

Example

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

result_sync = agent.run_sync('What is the capital of Italy?')
print(result_sync.output)
#> The capital of Italy is Rome.

Parameters

user_prompt (str | Sequence[UserContent] | None, default None): User input to start/continue the conversation.
output_type (OutputSpec[RunOutputDataT] | None, default None): Custom output type to use for this run; `output_type` may only be used if the agent has no output validators, since output validators expect an argument that matches the agent's output type.
message_history (list[ModelMessage] | None, default None): History of the conversation so far.
deferred_tool_results (DeferredToolResults | None, default None): Optional results for deferred tool calls in the message history.
model (Model | KnownModelName | str | None, default None): Optional model to use for this run; required if `model` was not set when creating the agent.
deps (AgentDepsT, default None): Optional dependencies to use for this run.
model_settings (ModelSettings | None, default None): Optional settings to use for this model's request.
usage_limits (UsageLimits | None, default None): Optional limits on model request count or token usage.
usage (RunUsage | None, default None): Optional usage to start with, useful for resuming a conversation or for agents used in tools.
infer_name (bool, default True): Whether to try to infer the agent name from the call frame if it's not set.
toolsets (Sequence[AbstractToolset[AgentDepsT]] | None, default None): Optional additional toolsets for this run.
event_stream_handler (EventStreamHandler[AgentDepsT] | None, default None): Optional handler for events from the model's streaming response and the agent's execution of tools for this run.

Returns

AgentRunResult[Any]: The result of the run.
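
A minimal sketch of applying `usage_limits` to a synchronous run, assuming `UsageLimits` is imported from `pydantic_ai.usage`:

```python
from pydantic_ai import Agent
from pydantic_ai.usage import UsageLimits

agent = Agent('openai:gpt-4o')

# Cap this run at five model requests; exceeding a limit raises an error.
result = agent.run_sync(
    'What is the capital of Italy?',
    usage_limits=UsageLimits(request_limit=5),
)
print(result.output)
```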

Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
def run_sync(
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[_messages.ModelMessage] | None = None,
    deferred_tool_results: DeferredToolResults | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.RunUsage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
) -> AgentRunResult[Any]:
    """Synchronously run the agent with a user prompt.

    This is a convenience method that wraps [`self.run`][pydantic_ai.agent.AbstractAgent.run] with `loop.run_until_complete(...)`.
    You therefore can't use this method inside async code or if there's an active event loop.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')

    result_sync = agent.run_sync('What is the capital of Italy?')
    print(result_sync.output)
    #> The capital of Italy is Rome.
    ```

    Args:
        user_prompt: User input to start/continue the conversation.
        output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
            output validators since output validators would expect an argument that matches the agent's output type.
        message_history: History of the conversation so far.
        deferred_tool_results: Optional results for deferred tool calls in the message history.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.
        model_settings: Optional settings to use for this model's request.
        usage_limits: Optional limits on model request count or token usage.
        usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
        infer_name: Whether to try to infer the agent name from the call frame if it's not set.
        toolsets: Optional additional toolsets for this run.
        event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run.

    Returns:
        The result of the run.
    """
    if infer_name and self.name is None:
        self._infer_name(inspect.currentframe())

    return get_event_loop().run_until_complete(
        self.run(
            user_prompt,
            output_type=output_type,
            message_history=message_history,
            deferred_tool_results=deferred_tool_results,
            model=model,
            deps=deps,
            model_settings=model_settings,
            usage_limits=usage_limits,
            usage=usage,
            infer_name=False,
            toolsets=toolsets,
            event_stream_handler=event_stream_handler,
        )
    )

run_stream async

run_stream(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None
) -> AbstractAsyncContextManager[
    StreamedRunResult[AgentDepsT, OutputDataT]
]
run_stream(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT],
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None
) -> AbstractAsyncContextManager[
    StreamedRunResult[AgentDepsT, RunOutputDataT]
]
run_stream(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    event_stream_handler: (
        EventStreamHandler[AgentDepsT] | None
    ) = None
) -> AsyncIterator[StreamedRunResult[AgentDepsT, Any]]

Run the agent with a user prompt in async streaming mode.

This method builds an internal agent graph (using system prompts, tools and output schemas) and then runs the graph until the model produces output matching the `output_type`, for example text or structured data. At this point, a streaming run result object is yielded, from which you can stream the output as it comes in and, once the output has completed streaming, get the complete output, message history, and usage.

As this method treats the first output matching the `output_type` as the final output, it stops running the agent graph and will not execute any tool calls made by the model after this "final" output. If you want to always run the agent graph to completion and stream events and output at the same time, use agent.run() with an `event_stream_handler`, or agent.iter(), instead.

Example

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    async with agent.run_stream('What is the capital of the UK?') as response:
        print(await response.get_output())
        #> The capital of the UK is London.

Parameters

user_prompt (str | Sequence[UserContent] | None, default None): User input to start/continue the conversation.
output_type (OutputSpec[RunOutputDataT] | None, default None): Custom output type to use for this run; `output_type` may only be used if the agent has no output validators, since output validators expect an argument that matches the agent's output type.
message_history (list[ModelMessage] | None, default None): History of the conversation so far.
deferred_tool_results (DeferredToolResults | None, default None): Optional results for deferred tool calls in the message history.
model (Model | KnownModelName | str | None, default None): Optional model to use for this run; required if `model` was not set when creating the agent.
deps (AgentDepsT, default None): Optional dependencies to use for this run.
model_settings (ModelSettings | None, default None): Optional settings to use for this model's request.
usage_limits (UsageLimits | None, default None): Optional limits on model request count or token usage.
usage (RunUsage | None, default None): Optional usage to start with, useful for resuming a conversation or for agents used in tools.
infer_name (bool, default True): Whether to try to infer the agent name from the call frame if it's not set.
toolsets (Sequence[AbstractToolset[AgentDepsT]] | None, default None): Optional additional toolsets for this run.
event_stream_handler (EventStreamHandler[AgentDepsT] | None, default None): Optional handler for events from the model's streaming response and the agent's execution of tools for this run. It receives all events up until the final result is found, which you can then read or stream from inside the context manager. Note that it does *not* receive any events after the final result is found.

Returns

AsyncIterator[StreamedRunResult[AgentDepsT, Any]]: The result of the run.
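
A minimal sketch of streaming the text output as it arrives instead of waiting for the complete output, assuming a plain-text output type:

```python
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


async def main():
    async with agent.run_stream('What is the capital of the UK?') as response:
        # Each iteration yields the text accumulated so far.
        async for text in response.stream_text():
            print(text)
```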

Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
@asynccontextmanager
async def run_stream(  # noqa C901
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[_messages.ModelMessage] | None = None,
    deferred_tool_results: DeferredToolResults | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.RunUsage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    event_stream_handler: EventStreamHandler[AgentDepsT] | None = None,
) -> AsyncIterator[result.StreamedRunResult[AgentDepsT, Any]]:
    """Run the agent with a user prompt in async streaming mode.

    This method builds an internal agent graph (using system prompts, tools and output schemas) and then
    runs the graph until the model produces output matching the `output_type`, for example text or structured data.
    At this point, a streaming run result object is yielded from which you can stream the output as it comes in,
    and -- once this output has completed streaming -- get the complete output, message history, and usage.

    As this method will consider the first output matching the `output_type` to be the final output,
    it will stop running the agent graph and will not execute any tool calls made by the model after this "final" output.
    If you want to always run the agent graph to completion and stream events and output at the same time,
    use [`agent.run()`][pydantic_ai.agent.AbstractAgent.run] with an `event_stream_handler` or [`agent.iter()`][pydantic_ai.agent.AbstractAgent.iter] instead.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')

    async def main():
        async with agent.run_stream('What is the capital of the UK?') as response:
            print(await response.get_output())
            #> The capital of the UK is London.
    ```

    Args:
        user_prompt: User input to start/continue the conversation.
        output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
            output validators since output validators would expect an argument that matches the agent's output type.
        message_history: History of the conversation so far.
        deferred_tool_results: Optional results for deferred tool calls in the message history.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.
        model_settings: Optional settings to use for this model's request.
        usage_limits: Optional limits on model request count or token usage.
        usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
        infer_name: Whether to try to infer the agent name from the call frame if it's not set.
        toolsets: Optional additional toolsets for this run.
        event_stream_handler: Optional handler for events from the model's streaming response and the agent's execution of tools to use for this run.
            It will receive all the events up until the final result is found, which you can then read or stream from inside the context manager.
            Note that it does _not_ receive any events after the final result is found.

    Returns:
        The result of the run.
    """
    if infer_name and self.name is None:
        # f_back because `asynccontextmanager` adds one frame
        if frame := inspect.currentframe():  # pragma: no branch
            self._infer_name(frame.f_back)

    event_stream_handler = event_stream_handler or self.event_stream_handler

    yielded = False
    async with self.iter(
        user_prompt,
        output_type=output_type,
        message_history=message_history,
        deferred_tool_results=deferred_tool_results,
        model=model,
        deps=deps,
        model_settings=model_settings,
        usage_limits=usage_limits,
        usage=usage,
        infer_name=False,
        toolsets=toolsets,
    ) as agent_run:
        first_node = agent_run.next_node  # start with the first node
        assert isinstance(first_node, _agent_graph.UserPromptNode)  # the first node should be a user prompt node
        node = first_node
        while True:
            graph_ctx = agent_run.ctx
            if self.is_model_request_node(node):
                async with node.stream(graph_ctx) as stream:
                    final_result_event = None

                    async def stream_to_final(
                        stream: AgentStream,
                    ) -> AsyncIterator[_messages.ModelResponseStreamEvent]:
                        nonlocal final_result_event
                        async for event in stream:
                            yield event
                            if isinstance(event, _messages.FinalResultEvent):
                                final_result_event = event
                                break

                    if event_stream_handler is not None:
                        await event_stream_handler(
                            _agent_graph.build_run_context(graph_ctx), stream_to_final(stream)
                        )
                    else:
                        async for _ in stream_to_final(stream):
                            pass

                    if final_result_event is not None:
                        final_result = FinalResult(
                            stream, final_result_event.tool_name, final_result_event.tool_call_id
                        )
                        if yielded:
                            raise exceptions.AgentRunError('Agent run produced final results')  # pragma: no cover
                        yielded = True

                        messages = graph_ctx.state.message_history.copy()

                        async def on_complete() -> None:
                            """Called when the stream has completed.

                            The model response will have been added to messages by now
                            by `StreamedRunResult._marked_completed`.
                            """
                            last_message = messages[-1]
                            assert isinstance(last_message, _messages.ModelResponse)
                            tool_calls = [
                                part for part in last_message.parts if isinstance(part, _messages.ToolCallPart)
                            ]

                            parts: list[_messages.ModelRequestPart] = []
                            async for _event in _agent_graph.process_function_tools(
                                graph_ctx.deps.tool_manager,
                                tool_calls,
                                final_result,
                                graph_ctx,
                                parts,
                            ):
                                pass
                            if parts:
                                messages.append(_messages.ModelRequest(parts))

                        yield StreamedRunResult(
                            messages,
                            graph_ctx.deps.new_message_index,
                            stream,
                            on_complete,
                        )
                        break
            elif self.is_call_tools_node(node) and event_stream_handler is not None:
                async with node.stream(agent_run.ctx) as stream:
                    await event_stream_handler(_agent_graph.build_run_context(agent_run.ctx), stream)

            next_node = await agent_run.next(node)
            if isinstance(next_node, End) and agent_run.result is not None:
                # A final output could have been produced by the CallToolsNode rather than the ModelRequestNode,
                # if a tool function raised CallDeferred or ApprovalRequired.
                # In this case there's no response to stream, but we still let the user access the output etc as normal.
                yield StreamedRunResult(
                    graph_ctx.state.message_history,
                    graph_ctx.deps.new_message_index,
                    run_result=agent_run.result,
                )
                yielded = True
                break
            if not isinstance(next_node, _agent_graph.AgentNode):
                raise exceptions.AgentRunError(  # pragma: no cover
                    'Should have produced a StreamedRunResult before getting here'
                )
            node = cast(_agent_graph.AgentNode[Any, Any], next_node)

    if not yielded:
        raise exceptions.AgentRunError('Agent run finished without producing a final result')  # pragma: no cover

iter abstractmethod async

iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None
) -> AbstractAsyncContextManager[
    AgentRun[AgentDepsT, OutputDataT]
]
iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT],
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None
) -> AbstractAsyncContextManager[
    AgentRun[AgentDepsT, RunOutputDataT]
]
iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None
) -> AsyncIterator[AgentRun[AgentDepsT, Any]]

A context manager which can be used to iterate over the agent graph's nodes as they are executed.

This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an AgentRun object. The AgentRun can be used to async-iterate over the nodes of the graph as they are executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the stream of events coming from the execution of tools.

The AgentRun also provides methods to access the full message history, new messages, and usage statistics, and the final result of the run once it has completed.

For more details, see the documentation of AgentRun.

Example

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    nodes = []
    async with agent.iter('What is the capital of France?') as agent_run:
        async for node in agent_run:
            nodes.append(node)
    print(nodes)
    '''
    [
        UserPromptNode(
            user_prompt='What is the capital of France?',
            instructions=None,
            instructions_functions=[],
            system_prompts=(),
            system_prompt_functions=[],
            system_prompt_dynamic_functions={},
        ),
        ModelRequestNode(
            request=ModelRequest(
                parts=[
                    UserPromptPart(
                        content='What is the capital of France?',
                        timestamp=datetime.datetime(...),
                    )
                ]
            )
        ),
        CallToolsNode(
            model_response=ModelResponse(
                parts=[TextPart(content='The capital of France is Paris.')],
                usage=RequestUsage(input_tokens=56, output_tokens=7),
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
            )
        ),
        End(data=FinalResult(output='The capital of France is Paris.')),
    ]
    '''
    print(agent_run.result.output)
    #> The capital of France is Paris.

Parameters

user_prompt (str | Sequence[UserContent] | None, default None): User input to start/continue the conversation.
output_type (OutputSpec[RunOutputDataT] | None, default None): Custom output type to use for this run; `output_type` may only be used if the agent has no output validators, since output validators expect an argument that matches the agent's output type.
message_history (list[ModelMessage] | None, default None): History of the conversation so far.
deferred_tool_results (DeferredToolResults | None, default None): Optional results for deferred tool calls in the message history.
model (Model | KnownModelName | str | None, default None): Optional model to use for this run; required if `model` was not set when creating the agent.
deps (AgentDepsT, default None): Optional dependencies to use for this run.
model_settings (ModelSettings | None, default None): Optional settings to use for this model's request.
usage_limits (UsageLimits | None, default None): Optional limits on model request count or token usage.
usage (RunUsage | None, default None): Optional usage to start with, useful for resuming a conversation or for agents used in tools.
infer_name (bool, default True): Whether to try to infer the agent name from the call frame if it's not set.
toolsets (Sequence[AbstractToolset[AgentDepsT]] | None, default None): Optional additional toolsets for this run.

Returns

AsyncIterator[AgentRun[AgentDepsT, Any]]: The result of the run.
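
Besides async-iterating over the run, the graph can be driven manually with `AgentRun.next()`, as in this minimal sketch:

```python
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


async def main():
    async with agent.iter('What is the capital of France?') as agent_run:
        node = agent_run.next_node  # the first node that will be run
        while not agent.is_end_node(node):
            # Execute the current node and receive the one that follows it.
            node = await agent_run.next(node)
        print(agent_run.result.output)
```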

Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
@asynccontextmanager
@abstractmethod
async def iter(
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[_messages.ModelMessage] | None = None,
    deferred_tool_results: DeferredToolResults | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.RunUsage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
) -> AsyncIterator[AgentRun[AgentDepsT, Any]]:
    """A contextmanager which can be used to iterate over the agent graph's nodes as they are executed.

    This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an
    `AgentRun` object. The `AgentRun` can be used to async-iterate over the nodes of the graph as they are
    executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the
    stream of events coming from the execution of tools.

    The `AgentRun` also provides methods to access the full message history, new messages, and usage statistics,
    and the final result of the run once it has completed.

    For more details, see the documentation of `AgentRun`.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')

    async def main():
        nodes = []
        async with agent.iter('What is the capital of France?') as agent_run:
            async for node in agent_run:
                nodes.append(node)
        print(nodes)
        '''
        [
            UserPromptNode(
                user_prompt='What is the capital of France?',
                instructions=None,
                instructions_functions=[],
                system_prompts=(),
                system_prompt_functions=[],
                system_prompt_dynamic_functions={},
            ),
            ModelRequestNode(
                request=ModelRequest(
                    parts=[
                        UserPromptPart(
                            content='What is the capital of France?',
                            timestamp=datetime.datetime(...),
                        )
                    ]
                )
            ),
            CallToolsNode(
                model_response=ModelResponse(
                    parts=[TextPart(content='The capital of France is Paris.')],
                    usage=RequestUsage(input_tokens=56, output_tokens=7),
                    model_name='gpt-4o',
                    timestamp=datetime.datetime(...),
                )
            ),
            End(data=FinalResult(output='The capital of France is Paris.')),
        ]
        '''
        print(agent_run.result.output)
        #> The capital of France is Paris.
    ```

    Args:
        user_prompt: User input to start/continue the conversation.
        output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
            output validators since output validators would expect an argument that matches the agent's output type.
        message_history: History of the conversation so far.
        deferred_tool_results: Optional results for deferred tool calls in the message history.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.
        model_settings: Optional settings to use for this model's request.
        usage_limits: Optional limits on model request count or token usage.
        usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
        infer_name: Whether to try to infer the agent name from the call frame if it's not set.
        toolsets: Optional additional toolsets for this run.

    Returns:
        The result of the run.
    """
    raise NotImplementedError
    yield

override abstractmethod

override(
    *,
    deps: AgentDepsT | Unset = UNSET,
    model: Model | KnownModelName | str | Unset = UNSET,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | Unset
    ) = UNSET,
    tools: (
        Sequence[
            Tool[AgentDepsT]
            | ToolFuncEither[AgentDepsT, ...]
        ]
        | Unset
    ) = UNSET
) -> Iterator[None]

Context manager to temporarily override agent dependencies, model, toolsets, or tools.

This is particularly useful when testing. You can find an example of this in the testing documentation.

Parameters

deps (AgentDepsT | Unset, default UNSET): The dependencies to use instead of the dependencies passed to the agent run.
model (Model | KnownModelName | str | Unset, default UNSET): The model to use instead of the model passed to the agent run.
toolsets (Sequence[AbstractToolset[AgentDepsT]] | Unset, default UNSET): The toolsets to use instead of the toolsets passed to the agent constructor and agent run.
tools (Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | Unset, default UNSET): The tools to use instead of the tools registered with the agent.
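
A minimal sketch of overriding the model in a test, assuming `TestModel` from `pydantic_ai.models.test`:

```python
from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel

agent = Agent('openai:gpt-4o')


def test_my_agent():
    # Inside this block, runs use TestModel instead of the configured model.
    with agent.override(model=TestModel()):
        result = agent.run_sync('What is the capital of France?')
        assert result.output
```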
Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
@contextmanager
@abstractmethod
def override(
    self,
    *,
    deps: AgentDepsT | _utils.Unset = _utils.UNSET,
    model: models.Model | models.KnownModelName | str | _utils.Unset = _utils.UNSET,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | _utils.Unset = _utils.UNSET,
    tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | _utils.Unset = _utils.UNSET,
) -> Iterator[None]:
    """Context manager to temporarily override agent dependencies, model, toolsets, or tools.

    This is particularly useful when testing.
    You can find an example of this [here](../testing.md#overriding-model-via-pytest-fixtures).

    Args:
        deps: The dependencies to use instead of the dependencies passed to the agent run.
        model: The model to use instead of the model passed to the agent run.
        toolsets: The toolsets to use instead of the toolsets passed to the agent constructor and agent run.
        tools: The tools to use instead of the tools registered with the agent.
    """
    raise NotImplementedError
    yield

is_model_request_node staticmethod

is_model_request_node(
    node: AgentNode[T, S] | End[FinalResult[S]],
) -> TypeIs[ModelRequestNode[T, S]]

Check if the node is a ModelRequestNode, narrowing the type if it is.

This method preserves the generic parameters while narrowing the type, unlike a direct call to isinstance.
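
A minimal sketch of using these checks to narrow node types while iterating over a run:

```python
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')


async def main():
    async with agent.iter('What is the capital of France?') as agent_run:
        async for node in agent_run:
            if agent.is_model_request_node(node):
                # `node` is narrowed to ModelRequestNode here, so `.request`
                # is visible to the type checker.
                print(node.request)
```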

Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
@staticmethod
def is_model_request_node(
    node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]],
) -> TypeIs[_agent_graph.ModelRequestNode[T, S]]:
    """Check if the node is a `ModelRequestNode`, narrowing the type if it is.

    This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`.
    """
    return isinstance(node, _agent_graph.ModelRequestNode)

is_call_tools_node staticmethod

is_call_tools_node(
    node: AgentNode[T, S] | End[FinalResult[S]],
) -> TypeIs[CallToolsNode[T, S]]

Check if the node is a CallToolsNode, narrowing the type if it is.

This method preserves the generic parameters while narrowing the type, unlike a direct call to isinstance.

Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
@staticmethod
def is_call_tools_node(
    node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]],
) -> TypeIs[_agent_graph.CallToolsNode[T, S]]:
    """Check if the node is a `CallToolsNode`, narrowing the type if it is.

    This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`.
    """
    return isinstance(node, _agent_graph.CallToolsNode)

is_user_prompt_node staticmethod

is_user_prompt_node(
    node: AgentNode[T, S] | End[FinalResult[S]],
) -> TypeIs[UserPromptNode[T, S]]

Check if the node is a UserPromptNode, narrowing the type if it is.

This method preserves the generic parameters while narrowing the type, unlike a direct call to isinstance.

Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
@staticmethod
def is_user_prompt_node(
    node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]],
) -> TypeIs[_agent_graph.UserPromptNode[T, S]]:
    """Check if the node is a `UserPromptNode`, narrowing the type if it is.

    This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`.
    """
    return isinstance(node, _agent_graph.UserPromptNode)

is_end_node staticmethod

is_end_node(
    node: AgentNode[T, S] | End[FinalResult[S]],
) -> TypeIs[End[FinalResult[S]]]

Check if the node is an End node, narrowing the type if it is.

This method preserves the generic parameters while narrowing the type, unlike a direct call to isinstance.

Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
@staticmethod
def is_end_node(
    node: _agent_graph.AgentNode[T, S] | End[result.FinalResult[S]],
) -> TypeIs[End[result.FinalResult[S]]]:
    """Check if the node is a `End`, narrowing the type if it is.

    This method preserves the generic parameters while narrowing the type, unlike a direct call to `isinstance`.
    """
    return isinstance(node, End)

to_ag_ui

to_ag_ui(
    *,
    output_type: OutputSpec[OutputDataT] | None = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None,
    debug: bool = False,
    routes: Sequence[BaseRoute] | None = None,
    middleware: Sequence[Middleware] | None = None,
    exception_handlers: (
        Mapping[Any, ExceptionHandler] | None
    ) = None,
    on_startup: Sequence[Callable[[], Any]] | None = None,
    on_shutdown: Sequence[Callable[[], Any]] | None = None,
    lifespan: (
        Lifespan[AGUIApp[AgentDepsT, OutputDataT]] | None
    ) = None
) -> AGUIApp[AgentDepsT, OutputDataT]

Returns an ASGI application that handles every AG-UI request by running the agent.

Note that the `deps` will be the same for each request, with the exception of the AG-UI state that's injected into the `state` field of a `deps` object that implements the StateHandler protocol. To provide different `deps` for each request (e.g. based on the authenticated user), use pydantic_ai.ag_ui.run_ag_ui or pydantic_ai.ag_ui.handle_ag_ui_request instead.

Example

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')
app = agent.to_ag_ui()

The `app` is an ASGI application that can be used with any ASGI server.

To run the application, you can use the following command:

uvicorn app:app --host 0.0.0.0 --port 8000

See the AG-UI docs for more information.

Parameters

output_type (OutputSpec[OutputDataT] | None, default None): Custom output type to use for this run; `output_type` may only be used if the agent has no output validators, since output validators expect an argument that matches the agent's output type.
model (Model | KnownModelName | str | None, default None): Optional model to use for this run; required if `model` was not set when creating the agent.
deps (AgentDepsT, default None): Optional dependencies to use for this run.
model_settings (ModelSettings | None, default None): Optional settings to use for this model's request.
usage_limits (UsageLimits | None, default None): Optional limits on model request count or token usage.
usage (RunUsage | None, default None): Optional usage to start with, useful for resuming a conversation or for agents used in tools.
infer_name (bool, default True): Whether to try to infer the agent name from the call frame if it's not set.
toolsets (Sequence[AbstractToolset[AgentDepsT]] | None, default None): Optional additional toolsets for this run.
debug (bool, default False): Boolean indicating if debug tracebacks should be returned on errors.
routes (Sequence[BaseRoute] | None, default None): A list of routes to serve incoming HTTP and WebSocket requests.
middleware (Sequence[Middleware] | None, default None): A list of middleware to run for every request. A Starlette application will always automatically include two middleware classes: `ServerErrorMiddleware` is added as the very outermost middleware, to handle any uncaught errors occurring anywhere in the entire stack, and `ExceptionMiddleware` is added as the very innermost middleware, to deal with handled exception cases occurring in the routing or endpoints.
exception_handlers (Mapping[Any, ExceptionHandler] | None, default None): A mapping of either integer status codes or exception class types onto callables which handle the exceptions. Exception handler callables should be of the form `handler(request, exc) -> response` and may be either standard functions or async functions.
on_startup (Sequence[Callable[[], Any]] | None, default None): A list of callables to run on application startup. Startup handler callables do not take any arguments, and may be either standard functions or async functions.
on_shutdown (Sequence[Callable[[], Any]] | None, default None): A list of callables to run on application shutdown. Shutdown handler callables do not take any arguments, and may be either standard functions or async functions.
lifespan (Lifespan[AGUIApp[AgentDepsT, OutputDataT]] | None, default None): A lifespan context function, which can be used to perform startup and shutdown tasks. This is a newer style that replaces the `on_startup` and `on_shutdown` handlers; use one or the other, not both.

Returns

AGUIApp[AgentDepsT, OutputDataT]: An ASGI application for running Pydantic AI agents with AG-UI protocol support.

Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
def to_ag_ui(
    self,
    *,
    # Agent.iter parameters
    output_type: OutputSpec[OutputDataT] | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    # Starlette
    debug: bool = False,
    routes: Sequence[BaseRoute] | None = None,
    middleware: Sequence[Middleware] | None = None,
    exception_handlers: Mapping[Any, ExceptionHandler] | None = None,
    on_startup: Sequence[Callable[[], Any]] | None = None,
    on_shutdown: Sequence[Callable[[], Any]] | None = None,
    lifespan: Lifespan[AGUIApp[AgentDepsT, OutputDataT]] | None = None,
) -> AGUIApp[AgentDepsT, OutputDataT]:
    """Returns an ASGI application that handles every AG-UI request by running the agent.

    Note that the `deps` will be the same for each request, with the exception of the AG-UI state that's
    injected into the `state` field of a `deps` object that implements the [`StateHandler`][pydantic_ai.ag_ui.StateHandler] protocol.
    To provide different `deps` for each request (e.g. based on the authenticated user),
    use [`pydantic_ai.ag_ui.run_ag_ui`][pydantic_ai.ag_ui.run_ag_ui] or
    [`pydantic_ai.ag_ui.handle_ag_ui_request`][pydantic_ai.ag_ui.handle_ag_ui_request] instead.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')
    app = agent.to_ag_ui()
    ```

    The `app` is an ASGI application that can be used with any ASGI server.

    To run the application, you can use the following command:

    ```bash
    uvicorn app:app --host 0.0.0.0 --port 8000
    ```

    See [AG-UI docs](../ag-ui.md) for more information.

    Args:
        output_type: Custom output type to use for this run, `output_type` may only be used if the agent has
            no output validators since output validators would expect an argument that matches the agent's
            output type.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.
        model_settings: Optional settings to use for this model's request.
        usage_limits: Optional limits on model request count or token usage.
        usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
        infer_name: Whether to try to infer the agent name from the call frame if it's not set.
        toolsets: Optional additional toolsets for this run.

        debug: Boolean indicating if debug tracebacks should be returned on errors.
        routes: A list of routes to serve incoming HTTP and WebSocket requests.
        middleware: A list of middleware to run for every request. A starlette application will always
            automatically include two middleware classes. `ServerErrorMiddleware` is added as the very
            outermost middleware, to handle any uncaught errors occurring anywhere in the entire stack.
            `ExceptionMiddleware` is added as the very innermost middleware, to deal with handled
            exception cases occurring in the routing or endpoints.
        exception_handlers: A mapping of either integer status codes, or exception class types onto
            callables which handle the exceptions. Exception handler callables should be of the form
            `handler(request, exc) -> response` and may be either standard functions, or async functions.
        on_startup: A list of callables to run on application startup. Startup handler callables do not
            take any arguments, and may be either standard functions, or async functions.
        on_shutdown: A list of callables to run on application shutdown. Shutdown handler callables do
            not take any arguments, and may be either standard functions, or async functions.
        lifespan: A lifespan context function, which can be used to perform startup and shutdown tasks.
            This is a newer style that replaces the `on_startup` and `on_shutdown` handlers. Use one or
            the other, not both.

    Returns:
        An ASGI application for running Pydantic AI agents with AG-UI protocol support.
    """
    from ..ag_ui import AGUIApp

    return AGUIApp(
        agent=self,
        # Agent.iter parameters
        output_type=output_type,
        model=model,
        deps=deps,
        model_settings=model_settings,
        usage_limits=usage_limits,
        usage=usage,
        infer_name=infer_name,
        toolsets=toolsets,
        # Starlette
        debug=debug,
        routes=routes,
        middleware=middleware,
        exception_handlers=exception_handlers,
        on_startup=on_startup,
        on_shutdown=on_shutdown,
        lifespan=lifespan,
    )

to_a2a

to_a2a(
    *,
    storage: Storage | None = None,
    broker: Broker | None = None,
    name: str | None = None,
    url: str = "http://localhost:8000",
    version: str = "1.0.0",
    description: str | None = None,
    provider: AgentProvider | None = None,
    skills: list[Skill] | None = None,
    debug: bool = False,
    routes: Sequence[Route] | None = None,
    middleware: Sequence[Middleware] | None = None,
    exception_handlers: (
        dict[Any, ExceptionHandler] | None
    ) = None,
    lifespan: Lifespan[FastA2A] | None = None
) -> FastA2A

Convert the agent to a FastA2A application.

Example

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')
app = agent.to_a2a()

The `app` is an ASGI application that can be used with any ASGI server.

To run the application, you can use the following command:

uvicorn app:app --host 0.0.0.0 --port 8000
Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
def to_a2a(
    self,
    *,
    storage: Storage | None = None,
    broker: Broker | None = None,
    # Agent card
    name: str | None = None,
    url: str = 'http://localhost:8000',
    version: str = '1.0.0',
    description: str | None = None,
    provider: AgentProvider | None = None,
    skills: list[Skill] | None = None,
    # Starlette
    debug: bool = False,
    routes: Sequence[Route] | None = None,
    middleware: Sequence[Middleware] | None = None,
    exception_handlers: dict[Any, ExceptionHandler] | None = None,
    lifespan: Lifespan[FastA2A] | None = None,
) -> FastA2A:
    """Convert the agent to a FastA2A application.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')
    app = agent.to_a2a()
    ```

    The `app` is an ASGI application that can be used with any ASGI server.

    To run the application, you can use the following command:

    ```bash
    uvicorn app:app --host 0.0.0.0 --port 8000
    ```
    """
    from .._a2a import agent_to_a2a

    return agent_to_a2a(
        self,
        storage=storage,
        broker=broker,
        name=name,
        url=url,
        version=version,
        description=description,
        provider=provider,
        skills=skills,
        debug=debug,
        routes=routes,
        middleware=middleware,
        exception_handlers=exception_handlers,
        lifespan=lifespan,
    )

to_cli async

to_cli(
    deps: AgentDepsT = None,
    prog_name: str = "pydantic-ai",
    message_history: list[ModelMessage] | None = None,
) -> None

Run the agent in a CLI chat interface.

Parameters

deps (AgentDepsT, default None): The dependencies to pass to the agent.
prog_name (str, default 'pydantic-ai'): The name of the program to use for the CLI. Defaults to 'pydantic-ai'.
message_history (list[ModelMessage] | None, default None): History of the conversation so far.

Example

agent_to_cli.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', instructions='You always respond in Italian.')

async def main():
    await agent.to_cli()

Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
async def to_cli(
    self: Self,
    deps: AgentDepsT = None,
    prog_name: str = 'pydantic-ai',
    message_history: list[_messages.ModelMessage] | None = None,
) -> None:
    """Run the agent in a CLI chat interface.

    Args:
        deps: The dependencies to pass to the agent.
        prog_name: The name of the program to use for the CLI. Defaults to 'pydantic-ai'.
        message_history: History of the conversation so far.

    Example:
    ```python {title="agent_to_cli.py" test="skip"}
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o', instructions='You always respond in Italian.')

    async def main():
        await agent.to_cli()
    ```
    """
    from rich.console import Console

    from pydantic_ai._cli import run_chat

    await run_chat(
        stream=True,
        agent=self,
        deps=deps,
        console=Console(),
        code_theme='monokai',
        prog_name=prog_name,
        message_history=message_history,
    )

to_cli_sync

to_cli_sync(
    deps: AgentDepsT = None,
    prog_name: str = "pydantic-ai",
    message_history: list[ModelMessage] | None = None,
) -> None

Run the agent in a CLI chat interface with the non-async interface.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `deps` | `AgentDepsT` | The dependencies to pass to the agent. | `None` |
| `prog_name` | `str` | The name of the program to use for the CLI. Defaults to `'pydantic-ai'`. | `'pydantic-ai'` |
| `message_history` | `list[ModelMessage] \| None` | History of the conversation so far. | `None` |

agent_to_cli_sync.py
from pydantic_ai import Agent

agent = Agent('openai:gpt-4o', instructions='You always respond in Italian.')
agent.to_cli_sync()
agent.to_cli_sync(prog_name='assistant')
Source code in pydantic_ai_slim/pydantic_ai/agent/abstract.py
def to_cli_sync(
    self: Self,
    deps: AgentDepsT = None,
    prog_name: str = 'pydantic-ai',
    message_history: list[_messages.ModelMessage] | None = None,
) -> None:
    """Run the agent in a CLI chat interface with the non-async interface.

    Args:
        deps: The dependencies to pass to the agent.
        prog_name: The name of the program to use for the CLI. Defaults to 'pydantic-ai'.
        message_history: History of the conversation so far.

    ```python {title="agent_to_cli_sync.py" test="skip"}
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o', instructions='You always respond in Italian.')
    agent.to_cli_sync()
    agent.to_cli_sync(prog_name='assistant')
    ```
    """
    return get_event_loop().run_until_complete(
        self.to_cli(deps=deps, prog_name=prog_name, message_history=message_history)
    )

WrapperAgent

Bases: AbstractAgent[AgentDepsT, OutputDataT]

Agent which wraps another agent.

Does nothing on its own, used as a base class.
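
A minimal sketch of how it might be subclassed (the `LoggingAgent` below is hypothetical, not part of the library), assuming `WrapperAgent` is imported from `pydantic_ai.agent`:

from contextlib import asynccontextmanager

from pydantic_ai import Agent
from pydantic_ai.agent import WrapperAgent

class LoggingAgent(WrapperAgent):
    # Log each prompt, then delegate everything else to the wrapped agent.
    @asynccontextmanager
    async def iter(self, user_prompt=None, **kwargs):
        print(f'starting run: {user_prompt!r}')
        async with self.wrapped.iter(user_prompt, **kwargs) as run:
            yield run

agent = LoggingAgent(Agent('openai:gpt-4o'))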

Source code in pydantic_ai_slim/pydantic_ai/agent/wrapper.py
class WrapperAgent(AbstractAgent[AgentDepsT, OutputDataT]):
    """Agent which wraps another agent.

    Does nothing on its own, used as a base class.
    """

    def __init__(self, wrapped: AbstractAgent[AgentDepsT, OutputDataT]):
        self.wrapped = wrapped

    @property
    def model(self) -> models.Model | models.KnownModelName | str | None:
        return self.wrapped.model

    @property
    def name(self) -> str | None:
        return self.wrapped.name

    @name.setter
    def name(self, value: str | None) -> None:
        self.wrapped.name = value

    @property
    def deps_type(self) -> type:
        return self.wrapped.deps_type

    @property
    def output_type(self) -> OutputSpec[OutputDataT]:
        return self.wrapped.output_type

    @property
    def event_stream_handler(self) -> EventStreamHandler[AgentDepsT] | None:
        return self.wrapped.event_stream_handler

    @property
    def toolsets(self) -> Sequence[AbstractToolset[AgentDepsT]]:
        return self.wrapped.toolsets

    async def __aenter__(self) -> AbstractAgent[AgentDepsT, OutputDataT]:
        return await self.wrapped.__aenter__()

    async def __aexit__(self, *args: Any) -> bool | None:
        return await self.wrapped.__aexit__(*args)

    @overload
    def iter(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    ) -> AbstractAsyncContextManager[AgentRun[AgentDepsT, OutputDataT]]: ...

    @overload
    def iter(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT],
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    ) -> AbstractAsyncContextManager[AgentRun[AgentDepsT, RunOutputDataT]]: ...

    @asynccontextmanager
    async def iter(
        self,
        user_prompt: str | Sequence[_messages.UserContent] | None = None,
        *,
        output_type: OutputSpec[RunOutputDataT] | None = None,
        message_history: list[_messages.ModelMessage] | None = None,
        deferred_tool_results: DeferredToolResults | None = None,
        model: models.Model | models.KnownModelName | str | None = None,
        deps: AgentDepsT = None,
        model_settings: ModelSettings | None = None,
        usage_limits: _usage.UsageLimits | None = None,
        usage: _usage.RunUsage | None = None,
        infer_name: bool = True,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
    ) -> AsyncIterator[AgentRun[AgentDepsT, Any]]:
        """A contextmanager which can be used to iterate over the agent graph's nodes as they are executed.

        This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an
        `AgentRun` object. The `AgentRun` can be used to async-iterate over the nodes of the graph as they are
        executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the
        stream of events coming from the execution of tools.

        The `AgentRun` also provides methods to access the full message history, new messages, and usage statistics,
        and the final result of the run once it has completed.

        For more details, see the documentation of `AgentRun`.

        Example:
        ```python
        from pydantic_ai import Agent

        agent = Agent('openai:gpt-4o')

        async def main():
            nodes = []
            async with agent.iter('What is the capital of France?') as agent_run:
                async for node in agent_run:
                    nodes.append(node)
            print(nodes)
            '''
            [
                UserPromptNode(
                    user_prompt='What is the capital of France?',
                    instructions=None,
                    instructions_functions=[],
                    system_prompts=(),
                    system_prompt_functions=[],
                    system_prompt_dynamic_functions={},
                ),
                ModelRequestNode(
                    request=ModelRequest(
                        parts=[
                            UserPromptPart(
                                content='What is the capital of France?',
                                timestamp=datetime.datetime(...),
                            )
                        ]
                    )
                ),
                CallToolsNode(
                    model_response=ModelResponse(
                        parts=[TextPart(content='The capital of France is Paris.')],
                        usage=RequestUsage(input_tokens=56, output_tokens=7),
                        model_name='gpt-4o',
                        timestamp=datetime.datetime(...),
                    )
                ),
                End(data=FinalResult(output='The capital of France is Paris.')),
            ]
            '''
            print(agent_run.result.output)
            #> The capital of France is Paris.
        ```

        Args:
            user_prompt: User input to start/continue the conversation.
            output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
                output validators since output validators would expect an argument that matches the agent's output type.
            message_history: History of the conversation so far.
            deferred_tool_results: Optional results for deferred tool calls in the message history.
            model: Optional model to use for this run, required if `model` was not set when creating the agent.
            deps: Optional dependencies to use for this run.
            model_settings: Optional settings to use for this model's request.
            usage_limits: Optional limits on model request count or token usage.
            usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
            infer_name: Whether to try to infer the agent name from the call frame if it's not set.
            toolsets: Optional additional toolsets for this run.

        Returns:
            The result of the run.
        """
        async with self.wrapped.iter(
            user_prompt=user_prompt,
            output_type=output_type,
            message_history=message_history,
            deferred_tool_results=deferred_tool_results,
            model=model,
            deps=deps,
            model_settings=model_settings,
            usage_limits=usage_limits,
            usage=usage,
            infer_name=infer_name,
            toolsets=toolsets,
        ) as run:
            yield run

    @contextmanager
    def override(
        self,
        *,
        deps: AgentDepsT | _utils.Unset = _utils.UNSET,
        model: models.Model | models.KnownModelName | str | _utils.Unset = _utils.UNSET,
        toolsets: Sequence[AbstractToolset[AgentDepsT]] | _utils.Unset = _utils.UNSET,
        tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | _utils.Unset = _utils.UNSET,
    ) -> Iterator[None]:
        """Context manager to temporarily override agent dependencies, model, toolsets, or tools.

        This is particularly useful when testing.
        You can find an example of this [here](../testing.md#overriding-model-via-pytest-fixtures).

        Args:
            deps: The dependencies to use instead of the dependencies passed to the agent run.
            model: The model to use instead of the model passed to the agent run.
            toolsets: The toolsets to use instead of the toolsets passed to the agent constructor and agent run.
            tools: The tools to use instead of the tools registered with the agent.
        """
        with self.wrapped.override(deps=deps, model=model, toolsets=toolsets, tools=tools):
            yield

iter async

iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None
) -> AbstractAsyncContextManager[
    AgentRun[AgentDepsT, OutputDataT]
]
iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT],
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None
) -> AbstractAsyncContextManager[
    AgentRun[AgentDepsT, RunOutputDataT]
]
iter(
    user_prompt: str | Sequence[UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[ModelMessage] | None = None,
    deferred_tool_results: (
        DeferredToolResults | None
    ) = None,
    model: Model | KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: UsageLimits | None = None,
    usage: RunUsage | None = None,
    infer_name: bool = True,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | None
    ) = None
) -> AsyncIterator[AgentRun[AgentDepsT, Any]]

A context manager which can be used to iterate over the agent graph's nodes as they are executed.

This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an `AgentRun` object. The `AgentRun` can be used to async-iterate over the nodes of the graph as they are executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the stream of events coming from the execution of tools.

The `AgentRun` also provides methods to access the full message history, new messages, and usage statistics, as well as the final result of the run once it has completed.

For more details, see the documentation of `AgentRun`.

Example:

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    nodes = []
    async with agent.iter('What is the capital of France?') as agent_run:
        async for node in agent_run:
            nodes.append(node)
    print(nodes)
    '''
    [
        UserPromptNode(
            user_prompt='What is the capital of France?',
            instructions=None,
            instructions_functions=[],
            system_prompts=(),
            system_prompt_functions=[],
            system_prompt_dynamic_functions={},
        ),
        ModelRequestNode(
            request=ModelRequest(
                parts=[
                    UserPromptPart(
                        content='What is the capital of France?',
                        timestamp=datetime.datetime(...),
                    )
                ]
            )
        ),
        CallToolsNode(
            model_response=ModelResponse(
                parts=[TextPart(content='The capital of France is Paris.')],
                usage=RequestUsage(input_tokens=56, output_tokens=7),
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
            )
        ),
        End(data=FinalResult(output='The capital of France is Paris.')),
    ]
    '''
    print(agent_run.result.output)
    #> The capital of France is Paris.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `user_prompt` | `str \| Sequence[UserContent] \| None` | User input to start/continue the conversation. | `None` |
| `output_type` | `OutputSpec[RunOutputDataT] \| None` | Custom output type to use for this run, `output_type` may only be used if the agent has no output validators since output validators would expect an argument that matches the agent's output type. | `None` |
| `message_history` | `list[ModelMessage] \| None` | History of the conversation so far. | `None` |
| `deferred_tool_results` | `DeferredToolResults \| None` | Optional results for deferred tool calls in the message history. | `None` |
| `model` | `Model \| KnownModelName \| str \| None` | Optional model to use for this run, required if `model` was not set when creating the agent. | `None` |
| `deps` | `AgentDepsT` | Optional dependencies to use for this run. | `None` |
| `model_settings` | `ModelSettings \| None` | Optional settings to use for this model's request. | `None` |
| `usage_limits` | `UsageLimits \| None` | Optional limits on model request count or token usage. | `None` |
| `usage` | `RunUsage \| None` | Optional usage to start with, useful for resuming a conversation or agents used in tools. | `None` |
| `infer_name` | `bool` | Whether to try to infer the agent name from the call frame if it's not set. | `True` |
| `toolsets` | `Sequence[AbstractToolset[AgentDepsT]] \| None` | Optional additional toolsets for this run. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `AsyncIterator[AgentRun[AgentDepsT, Any]]` | The result of the run. |

Source code in pydantic_ai_slim/pydantic_ai/agent/wrapper.py
@asynccontextmanager
async def iter(
    self,
    user_prompt: str | Sequence[_messages.UserContent] | None = None,
    *,
    output_type: OutputSpec[RunOutputDataT] | None = None,
    message_history: list[_messages.ModelMessage] | None = None,
    deferred_tool_results: DeferredToolResults | None = None,
    model: models.Model | models.KnownModelName | str | None = None,
    deps: AgentDepsT = None,
    model_settings: ModelSettings | None = None,
    usage_limits: _usage.UsageLimits | None = None,
    usage: _usage.RunUsage | None = None,
    infer_name: bool = True,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | None = None,
) -> AsyncIterator[AgentRun[AgentDepsT, Any]]:
    """A contextmanager which can be used to iterate over the agent graph's nodes as they are executed.

    This method builds an internal agent graph (using system prompts, tools and output schemas) and then returns an
    `AgentRun` object. The `AgentRun` can be used to async-iterate over the nodes of the graph as they are
    executed. This is the API to use if you want to consume the outputs coming from each LLM model response, or the
    stream of events coming from the execution of tools.

    The `AgentRun` also provides methods to access the full message history, new messages, and usage statistics,
    and the final result of the run once it has completed.

    For more details, see the documentation of `AgentRun`.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')

    async def main():
        nodes = []
        async with agent.iter('What is the capital of France?') as agent_run:
            async for node in agent_run:
                nodes.append(node)
        print(nodes)
        '''
        [
            UserPromptNode(
                user_prompt='What is the capital of France?',
                instructions=None,
                instructions_functions=[],
                system_prompts=(),
                system_prompt_functions=[],
                system_prompt_dynamic_functions={},
            ),
            ModelRequestNode(
                request=ModelRequest(
                    parts=[
                        UserPromptPart(
                            content='What is the capital of France?',
                            timestamp=datetime.datetime(...),
                        )
                    ]
                )
            ),
            CallToolsNode(
                model_response=ModelResponse(
                    parts=[TextPart(content='The capital of France is Paris.')],
                    usage=RequestUsage(input_tokens=56, output_tokens=7),
                    model_name='gpt-4o',
                    timestamp=datetime.datetime(...),
                )
            ),
            End(data=FinalResult(output='The capital of France is Paris.')),
        ]
        '''
        print(agent_run.result.output)
        #> The capital of France is Paris.
    ```

    Args:
        user_prompt: User input to start/continue the conversation.
        output_type: Custom output type to use for this run, `output_type` may only be used if the agent has no
            output validators since output validators would expect an argument that matches the agent's output type.
        message_history: History of the conversation so far.
        deferred_tool_results: Optional results for deferred tool calls in the message history.
        model: Optional model to use for this run, required if `model` was not set when creating the agent.
        deps: Optional dependencies to use for this run.
        model_settings: Optional settings to use for this model's request.
        usage_limits: Optional limits on model request count or token usage.
        usage: Optional usage to start with, useful for resuming a conversation or agents used in tools.
        infer_name: Whether to try to infer the agent name from the call frame if it's not set.
        toolsets: Optional additional toolsets for this run.

    Returns:
        The result of the run.
    """
    async with self.wrapped.iter(
        user_prompt=user_prompt,
        output_type=output_type,
        message_history=message_history,
        deferred_tool_results=deferred_tool_results,
        model=model,
        deps=deps,
        model_settings=model_settings,
        usage_limits=usage_limits,
        usage=usage,
        infer_name=infer_name,
        toolsets=toolsets,
    ) as run:
        yield run

override

override(
    *,
    deps: AgentDepsT | Unset = UNSET,
    model: Model | KnownModelName | str | Unset = UNSET,
    toolsets: (
        Sequence[AbstractToolset[AgentDepsT]] | Unset
    ) = UNSET,
    tools: (
        Sequence[
            Tool[AgentDepsT]
            | ToolFuncEither[AgentDepsT, ...]
        ]
        | Unset
    ) = UNSET
) -> Iterator[None]

Context manager to temporarily override agent dependencies, model, toolsets, or tools.

This is particularly useful when testing; you can find an example in the testing documentation.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `deps` | `AgentDepsT \| Unset` | The dependencies to use instead of the dependencies passed to the agent run. | `UNSET` |
| `model` | `Model \| KnownModelName \| str \| Unset` | The model to use instead of the model passed to the agent run. | `UNSET` |
| `toolsets` | `Sequence[AbstractToolset[AgentDepsT]] \| Unset` | The toolsets to use instead of the toolsets passed to the agent constructor and agent run. | `UNSET` |
| `tools` | `Sequence[Tool[AgentDepsT] \| ToolFuncEither[AgentDepsT, ...]] \| Unset` | The tools to use instead of the tools registered with the agent. | `UNSET` |
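
A minimal sketch of using `override` in a test, assuming `TestModel` from `pydantic_ai.models.test` (which returns canned responses without calling a real LLM):

from pydantic_ai import Agent
from pydantic_ai.models.test import TestModel

agent = Agent('openai:gpt-4o')

def test_agent():
    # No real API call is made while the override is active.
    with agent.override(model=TestModel()):
        result = agent.run_sync('What is the capital of France?')
        assert result.output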
Source code in pydantic_ai_slim/pydantic_ai/agent/wrapper.py
@contextmanager
def override(
    self,
    *,
    deps: AgentDepsT | _utils.Unset = _utils.UNSET,
    model: models.Model | models.KnownModelName | str | _utils.Unset = _utils.UNSET,
    toolsets: Sequence[AbstractToolset[AgentDepsT]] | _utils.Unset = _utils.UNSET,
    tools: Sequence[Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]] | _utils.Unset = _utils.UNSET,
) -> Iterator[None]:
    """Context manager to temporarily override agent dependencies, model, toolsets, or tools.

    This is particularly useful when testing.
    You can find an example of this [here](../testing.md#overriding-model-via-pytest-fixtures).

    Args:
        deps: The dependencies to use instead of the dependencies passed to the agent run.
        model: The model to use instead of the model passed to the agent run.
        toolsets: The toolsets to use instead of the toolsets passed to the agent constructor and agent run.
        tools: The tools to use instead of the tools registered with the agent.
    """
    with self.wrapped.override(deps=deps, model=model, toolsets=toolsets, tools=tools):
        yield

AgentRun dataclass

Bases: Generic[AgentDepsT, OutputDataT]

A stateful, async-iterable run of an `Agent`.

You generally obtain an `AgentRun` instance by calling `async with my_agent.iter(...) as agent_run:`.

Once you have an instance, you can use it to iterate through the run's nodes as they execute. When an `End` is reached, the run finishes and `result` becomes available.

Example:

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    nodes = []
    # Iterate through the run, recording each node along the way:
    async with agent.iter('What is the capital of France?') as agent_run:
        async for node in agent_run:
            nodes.append(node)
    print(nodes)
    '''
    [
        UserPromptNode(
            user_prompt='What is the capital of France?',
            instructions=None,
            instructions_functions=[],
            system_prompts=(),
            system_prompt_functions=[],
            system_prompt_dynamic_functions={},
        ),
        ModelRequestNode(
            request=ModelRequest(
                parts=[
                    UserPromptPart(
                        content='What is the capital of France?',
                        timestamp=datetime.datetime(...),
                    )
                ]
            )
        ),
        CallToolsNode(
            model_response=ModelResponse(
                parts=[TextPart(content='The capital of France is Paris.')],
                usage=RequestUsage(input_tokens=56, output_tokens=7),
                model_name='gpt-4o',
                timestamp=datetime.datetime(...),
            )
        ),
        End(data=FinalResult(output='The capital of France is Paris.')),
    ]
    '''
    print(agent_run.result.output)
    #> The capital of France is Paris.

You can also manually drive the iteration using the `next` method for more granular control.

Source code in pydantic_ai_slim/pydantic_ai/run.py
@dataclasses.dataclass(repr=False)
class AgentRun(Generic[AgentDepsT, OutputDataT]):
    """A stateful, async-iterable run of an [`Agent`][pydantic_ai.agent.Agent].

    You generally obtain an `AgentRun` instance by calling `async with my_agent.iter(...) as agent_run:`.

    Once you have an instance, you can use it to iterate through the run's nodes as they execute. When an
    [`End`][pydantic_graph.nodes.End] is reached, the run finishes and [`result`][pydantic_ai.agent.AgentRun.result]
    becomes available.

    Example:
    ```python
    from pydantic_ai import Agent

    agent = Agent('openai:gpt-4o')

    async def main():
        nodes = []
        # Iterate through the run, recording each node along the way:
        async with agent.iter('What is the capital of France?') as agent_run:
            async for node in agent_run:
                nodes.append(node)
        print(nodes)
        '''
        [
            UserPromptNode(
                user_prompt='What is the capital of France?',
                instructions=None,
                instructions_functions=[],
                system_prompts=(),
                system_prompt_functions=[],
                system_prompt_dynamic_functions={},
            ),
            ModelRequestNode(
                request=ModelRequest(
                    parts=[
                        UserPromptPart(
                            content='What is the capital of France?',
                            timestamp=datetime.datetime(...),
                        )
                    ]
                )
            ),
            CallToolsNode(
                model_response=ModelResponse(
                    parts=[TextPart(content='The capital of France is Paris.')],
                    usage=RequestUsage(input_tokens=56, output_tokens=7),
                    model_name='gpt-4o',
                    timestamp=datetime.datetime(...),
                )
            ),
            End(data=FinalResult(output='The capital of France is Paris.')),
        ]
        '''
        print(agent_run.result.output)
        #> The capital of France is Paris.
    ```

    You can also manually drive the iteration using the [`next`][pydantic_ai.agent.AgentRun.next] method for
    more granular control.
    """

    _graph_run: GraphRun[
        _agent_graph.GraphAgentState, _agent_graph.GraphAgentDeps[AgentDepsT, Any], FinalResult[OutputDataT]
    ]

    @overload
    def _traceparent(self, *, required: Literal[False]) -> str | None: ...
    @overload
    def _traceparent(self) -> str: ...
    def _traceparent(self, *, required: bool = True) -> str | None:
        traceparent = self._graph_run._traceparent(required=False)  # type: ignore[reportPrivateUsage]
        if traceparent is None and required:  # pragma: no cover
            raise AttributeError('No span was created for this agent run')
        return traceparent

    @property
    def ctx(self) -> GraphRunContext[_agent_graph.GraphAgentState, _agent_graph.GraphAgentDeps[AgentDepsT, Any]]:
        """The current context of the agent run."""
        return GraphRunContext[_agent_graph.GraphAgentState, _agent_graph.GraphAgentDeps[AgentDepsT, Any]](
            state=self._graph_run.state, deps=self._graph_run.deps
        )

    @property
    def next_node(
        self,
    ) -> _agent_graph.AgentNode[AgentDepsT, OutputDataT] | End[FinalResult[OutputDataT]]:
        """The next node that will be run in the agent graph.

        This is the next node that will be used during async iteration, or if a node is not passed to `self.next(...)`.
        """
        next_node = self._graph_run.next_node
        if isinstance(next_node, End):
            return next_node
        if _agent_graph.is_agent_node(next_node):
            return next_node
        raise exceptions.AgentRunError(f'Unexpected node type: {type(next_node)}')  # pragma: no cover

    @property
    def result(self) -> AgentRunResult[OutputDataT] | None:
        """The final result of the run if it has ended, otherwise `None`.

        Once the run returns an [`End`][pydantic_graph.nodes.End] node, `result` is populated
        with an [`AgentRunResult`][pydantic_ai.agent.AgentRunResult].
        """
        graph_run_result = self._graph_run.result
        if graph_run_result is None:
            return None
        return AgentRunResult(
            graph_run_result.output.output,
            graph_run_result.output.tool_name,
            graph_run_result.state,
            self._graph_run.deps.new_message_index,
            self._traceparent(required=False),
        )

    def __aiter__(
        self,
    ) -> AsyncIterator[_agent_graph.AgentNode[AgentDepsT, OutputDataT] | End[FinalResult[OutputDataT]]]:
        """Provide async-iteration over the nodes in the agent run."""
        return self

    async def __anext__(
        self,
    ) -> _agent_graph.AgentNode[AgentDepsT, OutputDataT] | End[FinalResult[OutputDataT]]:
        """Advance to the next node automatically based on the last returned node."""
        next_node = await self._graph_run.__anext__()
        if _agent_graph.is_agent_node(node=next_node):
            return next_node
        assert isinstance(next_node, End), f'Unexpected node type: {type(next_node)}'
        return next_node

    async def next(
        self,
        node: _agent_graph.AgentNode[AgentDepsT, OutputDataT],
    ) -> _agent_graph.AgentNode[AgentDepsT, OutputDataT] | End[FinalResult[OutputDataT]]:
        """Manually drive the agent run by passing in the node you want to run next.

        This lets you inspect or mutate the node before continuing execution, or skip certain nodes
        under dynamic conditions. The agent run should be stopped when you return an [`End`][pydantic_graph.nodes.End]
        node.

        Example:
        ```python
        from pydantic_ai import Agent
        from pydantic_graph import End

        agent = Agent('openai:gpt-4o')

        async def main():
            async with agent.iter('What is the capital of France?') as agent_run:
                next_node = agent_run.next_node  # start with the first node
                nodes = [next_node]
                while not isinstance(next_node, End):
                    next_node = await agent_run.next(next_node)
                    nodes.append(next_node)
                # Once `next_node` is an End, we've finished:
                print(nodes)
                '''
                [
                    UserPromptNode(
                        user_prompt='What is the capital of France?',
                        instructions=None,
                        instructions_functions=[],
                        system_prompts=(),
                        system_prompt_functions=[],
                        system_prompt_dynamic_functions={},
                    ),
                    ModelRequestNode(
                        request=ModelRequest(
                            parts=[
                                UserPromptPart(
                                    content='What is the capital of France?',
                                    timestamp=datetime.datetime(...),
                                )
                            ]
                        )
                    ),
                    CallToolsNode(
                        model_response=ModelResponse(
                            parts=[TextPart(content='The capital of France is Paris.')],
                            usage=RequestUsage(input_tokens=56, output_tokens=7),
                            model_name='gpt-4o',
                            timestamp=datetime.datetime(...),
                        )
                    ),
                    End(data=FinalResult(output='The capital of France is Paris.')),
                ]
                '''
                print('Final result:', agent_run.result.output)
                #> Final result: The capital of France is Paris.
        ```

        Args:
            node: The node to run next in the graph.

        Returns:
            The next node returned by the graph logic, or an [`End`][pydantic_graph.nodes.End] node if
            the run has completed.
        """
        # Note: It might be nice to expose a synchronous interface for iteration, but we shouldn't do it
        # on this class, or else IDEs won't warn you if you accidentally use `for` instead of `async for` to iterate.
        next_node = await self._graph_run.next(node)
        if _agent_graph.is_agent_node(next_node):
            return next_node
        assert isinstance(next_node, End), f'Unexpected node type: {type(next_node)}'
        return next_node

    def usage(self) -> _usage.RunUsage:
        """Get usage statistics for the run so far, including token usage, model requests, and so on."""
        return self._graph_run.state.usage

    def __repr__(self) -> str:  # pragma: no cover
        result = self._graph_run.result
        result_repr = '<run not finished>' if result is None else repr(result.output)
        return f'<{type(self).__name__} result={result_repr} usage={self.usage()}>'

ctx property

ctx: GraphRunContext[
    GraphAgentState, GraphAgentDeps[AgentDepsT, Any]
]

The current context of the agent run.

next_node property

next_node: (
    AgentNode[AgentDepsT, OutputDataT]
    | End[FinalResult[OutputDataT]]
)

The next node that will be run in the agent graph.

This is the next node that will be used during async iteration, or if a node is not passed to `self.next(...)`.

result property

result: AgentRunResult[OutputDataT] | None

The final result of the run if it has ended, otherwise `None`.

Once the run returns an `End` node, `result` is populated with an `AgentRunResult`.

__aiter__

__aiter__() -> (
    AsyncIterator[
        AgentNode[AgentDepsT, OutputDataT]
        | End[FinalResult[OutputDataT]]
    ]
)

Provide async-iteration over the nodes in the agent run.

Source code in pydantic_ai_slim/pydantic_ai/run.py
def __aiter__(
    self,
) -> AsyncIterator[_agent_graph.AgentNode[AgentDepsT, OutputDataT] | End[FinalResult[OutputDataT]]]:
    """Provide async-iteration over the nodes in the agent run."""
    return self

__anext__ async

__anext__() -> (
    AgentNode[AgentDepsT, OutputDataT]
    | End[FinalResult[OutputDataT]]
)

Advance to the next node automatically based on the last returned node.

Source code in pydantic_ai_slim/pydantic_ai/run.py
async def __anext__(
    self,
) -> _agent_graph.AgentNode[AgentDepsT, OutputDataT] | End[FinalResult[OutputDataT]]:
    """Advance to the next node automatically based on the last returned node."""
    next_node = await self._graph_run.__anext__()
    if _agent_graph.is_agent_node(node=next_node):
        return next_node
    assert isinstance(next_node, End), f'Unexpected node type: {type(next_node)}'
    return next_node

next async

next(
    node: AgentNode[AgentDepsT, OutputDataT],
) -> (
    AgentNode[AgentDepsT, OutputDataT]
    | End[FinalResult[OutputDataT]]
)

Manually drive the agent run by passing in the node you want to run next.

This lets you inspect or mutate the node before continuing execution, or skip certain nodes under dynamic conditions. The agent run should be stopped when you return an `End` node.

Example:

from pydantic_ai import Agent
from pydantic_graph import End

agent = Agent('openai:gpt-4o')

async def main():
    async with agent.iter('What is the capital of France?') as agent_run:
        next_node = agent_run.next_node  # start with the first node
        nodes = [next_node]
        while not isinstance(next_node, End):
            next_node = await agent_run.next(next_node)
            nodes.append(next_node)
        # Once `next_node` is an End, we've finished:
        print(nodes)
        '''
        [
            UserPromptNode(
                user_prompt='What is the capital of France?',
                instructions=None,
                instructions_functions=[],
                system_prompts=(),
                system_prompt_functions=[],
                system_prompt_dynamic_functions={},
            ),
            ModelRequestNode(
                request=ModelRequest(
                    parts=[
                        UserPromptPart(
                            content='What is the capital of France?',
                            timestamp=datetime.datetime(...),
                        )
                    ]
                )
            ),
            CallToolsNode(
                model_response=ModelResponse(
                    parts=[TextPart(content='The capital of France is Paris.')],
                    usage=RequestUsage(input_tokens=56, output_tokens=7),
                    model_name='gpt-4o',
                    timestamp=datetime.datetime(...),
                )
            ),
            End(data=FinalResult(output='The capital of France is Paris.')),
        ]
        '''
        print('Final result:', agent_run.result.output)
        #> Final result: The capital of France is Paris.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `node` | `AgentNode[AgentDepsT, OutputDataT]` | The node to run next in the graph. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `AgentNode[AgentDepsT, OutputDataT] \| End[FinalResult[OutputDataT]]` | The next node returned by the graph logic, or an `End` node if the run has completed. |

Source code in pydantic_ai_slim/pydantic_ai/run.py
async def next(
    self,
    node: _agent_graph.AgentNode[AgentDepsT, OutputDataT],
) -> _agent_graph.AgentNode[AgentDepsT, OutputDataT] | End[FinalResult[OutputDataT]]:
    """Manually drive the agent run by passing in the node you want to run next.

    This lets you inspect or mutate the node before continuing execution, or skip certain nodes
    under dynamic conditions. The agent run should be stopped when you return an [`End`][pydantic_graph.nodes.End]
    node.

    Example:
    ```python
    from pydantic_ai import Agent
    from pydantic_graph import End

    agent = Agent('openai:gpt-4o')

    async def main():
        async with agent.iter('What is the capital of France?') as agent_run:
            next_node = agent_run.next_node  # start with the first node
            nodes = [next_node]
            while not isinstance(next_node, End):
                next_node = await agent_run.next(next_node)
                nodes.append(next_node)
            # Once `next_node` is an End, we've finished:
            print(nodes)
            '''
            [
                UserPromptNode(
                    user_prompt='What is the capital of France?',
                    instructions=None,
                    instructions_functions=[],
                    system_prompts=(),
                    system_prompt_functions=[],
                    system_prompt_dynamic_functions={},
                ),
                ModelRequestNode(
                    request=ModelRequest(
                        parts=[
                            UserPromptPart(
                                content='What is the capital of France?',
                                timestamp=datetime.datetime(...),
                            )
                        ]
                    )
                ),
                CallToolsNode(
                    model_response=ModelResponse(
                        parts=[TextPart(content='The capital of France is Paris.')],
                        usage=RequestUsage(input_tokens=56, output_tokens=7),
                        model_name='gpt-4o',
                        timestamp=datetime.datetime(...),
                    )
                ),
                End(data=FinalResult(output='The capital of France is Paris.')),
            ]
            '''
            print('Final result:', agent_run.result.output)
            #> Final result: The capital of France is Paris.
    ```

    Args:
        node: The node to run next in the graph.

    Returns:
        The next node returned by the graph logic, or an [`End`][pydantic_graph.nodes.End] node if
        the run has completed.
    """
    # Note: It might be nice to expose a synchronous interface for iteration, but we shouldn't do it
    # on this class, or else IDEs won't warn you if you accidentally use `for` instead of `async for` to iterate.
    next_node = await self._graph_run.next(node)
    if _agent_graph.is_agent_node(next_node):
        return next_node
    assert isinstance(next_node, End), f'Unexpected node type: {type(next_node)}'
    return next_node

usage

usage() -> RunUsage

Get usage statistics for the run so far, including token usage, model requests, and so on.

Source code in pydantic_ai_slim/pydantic_ai/run.py
def usage(self) -> _usage.RunUsage:
    """Get usage statistics for the run so far, including token usage, model requests, and so on."""
    return self._graph_run.state.usage

AgentRunResult dataclass

Bases: Generic[OutputDataT]

The final result of an agent run.

Source code in pydantic_ai_slim/pydantic_ai/run.py
@dataclasses.dataclass
class AgentRunResult(Generic[OutputDataT]):
    """The final result of an agent run."""

    output: OutputDataT
    """The output data from the agent run."""

    _output_tool_name: str | None = dataclasses.field(repr=False)
    _state: _agent_graph.GraphAgentState = dataclasses.field(repr=False)
    _new_message_index: int = dataclasses.field(repr=False)
    _traceparent_value: str | None = dataclasses.field(repr=False)

    @overload
    def _traceparent(self, *, required: Literal[False]) -> str | None: ...
    @overload
    def _traceparent(self) -> str: ...
    def _traceparent(self, *, required: bool = True) -> str | None:
        if self._traceparent_value is None and required:  # pragma: no cover
            raise AttributeError('No span was created for this agent run')
        return self._traceparent_value

    def _set_output_tool_return(self, return_content: str) -> list[_messages.ModelMessage]:
        """Set return content for the output tool.

        Useful if you want to continue the conversation and want to set the response to the output tool call.
        """
        if not self._output_tool_name:
            raise ValueError('Cannot set output tool return content when the return type is `str`.')

        messages = self._state.message_history
        last_message = messages[-1]
        for idx, part in enumerate(last_message.parts):
            if isinstance(part, _messages.ToolReturnPart) and part.tool_name == self._output_tool_name:
                # Only do deepcopy when we have to modify
                copied_messages = list(messages)
                copied_last = deepcopy(last_message)
                copied_last.parts[idx].content = return_content  # type: ignore[misc]
                copied_messages[-1] = copied_last
                return copied_messages

        raise LookupError(f'No tool call found with tool name {self._output_tool_name!r}.')

    def all_messages(self, *, output_tool_return_content: str | None = None) -> list[_messages.ModelMessage]:
        """Return the history of _messages.

        Args:
            output_tool_return_content: The return content of the tool call to set in the last message.
                This provides a convenient way to modify the content of the output tool call if you want to continue
                the conversation and want to set the response to the output tool call. If `None`, the last message will
                not be modified.

        Returns:
            List of messages.
        """
        if output_tool_return_content is not None:
            return self._set_output_tool_return(output_tool_return_content)
        else:
            return self._state.message_history

    def all_messages_json(self, *, output_tool_return_content: str | None = None) -> bytes:
        """Return all messages from [`all_messages`][pydantic_ai.agent.AgentRunResult.all_messages] as JSON bytes.

        Args:
            output_tool_return_content: The return content of the tool call to set in the last message.
                This provides a convenient way to modify the content of the output tool call if you want to continue
                the conversation and want to set the response to the output tool call. If `None`, the last message will
                not be modified.

        Returns:
            JSON bytes representing the messages.
        """
        return _messages.ModelMessagesTypeAdapter.dump_json(
            self.all_messages(output_tool_return_content=output_tool_return_content)
        )

    def new_messages(self, *, output_tool_return_content: str | None = None) -> list[_messages.ModelMessage]:
        """Return new messages associated with this run.

        Messages from older runs are excluded.

        Args:
            output_tool_return_content: The return content of the tool call to set in the last message.
                This provides a convenient way to modify the content of the output tool call if you want to continue
                the conversation and want to set the response to the output tool call. If `None`, the last message will
                not be modified.

        Returns:
            List of new messages.
        """
        return self.all_messages(output_tool_return_content=output_tool_return_content)[self._new_message_index :]

    def new_messages_json(self, *, output_tool_return_content: str | None = None) -> bytes:
        """Return new messages from [`new_messages`][pydantic_ai.agent.AgentRunResult.new_messages] as JSON bytes.

        Args:
            output_tool_return_content: The return content of the tool call to set in the last message.
                This provides a convenient way to modify the content of the output tool call if you want to continue
                the conversation and want to set the response to the output tool call. If `None`, the last message will
                not be modified.

        Returns:
            JSON bytes representing the new messages.
        """
        return _messages.ModelMessagesTypeAdapter.dump_json(
            self.new_messages(output_tool_return_content=output_tool_return_content)
        )

    def usage(self) -> _usage.RunUsage:
        """Return the usage of the whole run."""
        return self._state.usage

    def timestamp(self) -> datetime:
        """Return the timestamp of last response."""
        model_response = self.all_messages()[-1]
        assert isinstance(model_response, _messages.ModelResponse)
        return model_response.timestamp

output instance-attribute

output: OutputDataT

The output data from the agent run.

all_messages

all_messages(
    *, output_tool_return_content: str | None = None
) -> list[ModelMessage]

Return the history of messages.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `output_tool_return_content` | `str \| None` | The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the output tool call if you want to continue the conversation and want to set the response to the output tool call. If `None`, the last message will not be modified. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `list[ModelMessage]` | List of messages. |
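
A common use of the message history is continuing the conversation in a follow-up run; a minimal sketch:

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

result = agent.run_sync('What is the capital of France?')
# Feed the accumulated history into the next run to continue the conversation:
result2 = agent.run_sync('And of Germany?', message_history=result.all_messages())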

Source code in pydantic_ai_slim/pydantic_ai/run.py
def all_messages(self, *, output_tool_return_content: str | None = None) -> list[_messages.ModelMessage]:
    """Return the history of _messages.

    Args:
        output_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the output tool call if you want to continue
            the conversation and want to set the response to the output tool call. If `None`, the last message will
            not be modified.

    Returns:
        List of messages.
    """
    if output_tool_return_content is not None:
        return self._set_output_tool_return(output_tool_return_content)
    else:
        return self._state.message_history

all_messages_json

all_messages_json(
    *, output_tool_return_content: str | None = None
) -> bytes

all_messages 中的所有消息作为 JSON 字节返回。

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `output_tool_return_content` | `str \| None` | The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the output tool call if you want to continue the conversation and want to set the response to the output tool call. If `None`, the last message will not be modified. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `bytes` | JSON bytes representing the messages. |

Source code in pydantic_ai_slim/pydantic_ai/run.py
def all_messages_json(self, *, output_tool_return_content: str | None = None) -> bytes:
    """Return all messages from [`all_messages`][pydantic_ai.agent.AgentRunResult.all_messages] as JSON bytes.

    Args:
        output_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the output tool call if you want to continue
            the conversation and want to set the response to the output tool call. If `None`, the last message will
            not be modified.

    Returns:
        JSON bytes representing the messages.
    """
    return _messages.ModelMessagesTypeAdapter.dump_json(
        self.all_messages(output_tool_return_content=output_tool_return_content)
    )

new_messages

new_messages(
    *, output_tool_return_content: str | None = None
) -> list[ModelMessage]

Return new messages associated with this run.

Messages from older runs are excluded.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `output_tool_return_content` | `str \| None` | The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the output tool call if you want to continue the conversation and want to set the response to the output tool call. If `None`, the last message will not be modified. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `list[ModelMessage]` | List of new messages. |

Source code in pydantic_ai_slim/pydantic_ai/run.py
def new_messages(self, *, output_tool_return_content: str | None = None) -> list[_messages.ModelMessage]:
    """Return new messages associated with this run.

    Messages from older runs are excluded.

    Args:
        output_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the output tool call if you want to continue
            the conversation and want to set the response to the output tool call. If `None`, the last message will
            not be modified.

    Returns:
        List of new messages.
    """
    return self.all_messages(output_tool_return_content=output_tool_return_content)[self._new_message_index :]

new_messages_json

new_messages_json(
    *, output_tool_return_content: str | None = None
) -> bytes

new_messages 中的新消息作为 JSON 字节返回。

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `output_tool_return_content` | `str \| None` | The return content of the tool call to set in the last message. This provides a convenient way to modify the content of the output tool call if you want to continue the conversation and want to set the response to the output tool call. If `None`, the last message will not be modified. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `bytes` | JSON bytes representing the new messages. |

Source code in pydantic_ai_slim/pydantic_ai/run.py
def new_messages_json(self, *, output_tool_return_content: str | None = None) -> bytes:
    """Return new messages from [`new_messages`][pydantic_ai.agent.AgentRunResult.new_messages] as JSON bytes.

    Args:
        output_tool_return_content: The return content of the tool call to set in the last message.
            This provides a convenient way to modify the content of the output tool call if you want to continue
            the conversation and want to set the response to the output tool call. If `None`, the last message will
            not be modified.

    Returns:
        JSON bytes representing the new messages.
    """
    return _messages.ModelMessagesTypeAdapter.dump_json(
        self.new_messages(output_tool_return_content=output_tool_return_content)
    )

usage

usage() -> RunUsage

Return the usage of the whole run.

Source code in pydantic_ai_slim/pydantic_ai/run.py
def usage(self) -> _usage.RunUsage:
    """Return the usage of the whole run."""
    return self._state.usage

timestamp

timestamp() -> datetime

Return the timestamp of the last response.

Source code in pydantic_ai_slim/pydantic_ai/run.py
def timestamp(self) -> datetime:
    """Return the timestamp of last response."""
    model_response = self.all_messages()[-1]
    assert isinstance(model_response, _messages.ModelResponse)
    return model_response.timestamp

EndStrategy module-attribute

EndStrategy = Literal['early', 'exhaustive']

The strategy for handling multiple tool calls when a final result is found.

  • `'early'`: Stop processing other tool calls once a final result is found
  • `'exhaustive'`: Process all tool calls even after a final result is found
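
`EndStrategy` is the type of the `end_strategy` argument accepted by the `Agent` constructor; for example:

from pydantic_ai import Agent

# Keep processing every pending tool call even after a final result is found:
agent = Agent('openai:gpt-4o', end_strategy='exhaustive')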

RunOutputDataT module-attribute

RunOutputDataT = TypeVar('RunOutputDataT')

Type variable for the result data of a run where `output_type` was customized on the run call.
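
For instance, passing a custom `output_type` to a run call types the result accordingly; a minimal sketch:

from pydantic_ai import Agent

agent = Agent('openai:gpt-4o')

async def main():
    # The result is an AgentRunResult[int] because output_type is customized:
    result = await agent.run('How many continents are there?', output_type=int)
    print(result.output)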

capture_run_messages

capture_run_messages() -> Iterator[list[ModelMessage]]

Context manager to access the messages used in a `run`, `run_sync`, or `run_stream` call.

Useful when a run may raise an exception, see model errors for more information.

Examples:

from pydantic_ai import Agent, capture_run_messages

agent = Agent('test')

with capture_run_messages() as messages:
    try:
        result = agent.run_sync('foobar')
    except Exception:
        print(messages)
        raise

Note

If you call `run`, `run_sync`, or `run_stream` more than once within a single `capture_run_messages` context, `messages` will represent the messages exchanged during the first call only.

Source code in pydantic_ai_slim/pydantic_ai/_agent_graph.py
@contextmanager
def capture_run_messages() -> Iterator[list[_messages.ModelMessage]]:
    """Context manager to access the messages used in a [`run`][pydantic_ai.agent.AbstractAgent.run], [`run_sync`][pydantic_ai.agent.AbstractAgent.run_sync], or [`run_stream`][pydantic_ai.agent.AbstractAgent.run_stream] call.

    Useful when a run may raise an exception, see [model errors](../agents.md#model-errors) for more information.

    Examples:
    ```python
    from pydantic_ai import Agent, capture_run_messages

    agent = Agent('test')

    with capture_run_messages() as messages:
        try:
            result = agent.run_sync('foobar')
        except Exception:
            print(messages)
            raise
    ```

    !!! note
        If you call `run`, `run_sync`, or `run_stream` more than once within a single `capture_run_messages` context,
        `messages` will represent the messages exchanged during the first call only.
    """
    token = None
    messages: list[_messages.ModelMessage] = []

    # Try to reuse existing message context if available
    try:
        messages = _messages_ctx_var.get().messages
    except LookupError:
        # No existing context, create a new one
        token = _messages_ctx_var.set(_RunMessages(messages))

    try:
        yield messages
    finally:
        # Clean up context if we created it
        if token is not None:
            _messages_ctx_var.reset(token)

InstrumentationSettings dataclass

Options for instrumenting models and agents with OpenTelemetry.

Used in:

  • `Agent(instrument=...)`
  • `Agent.instrument_all()`
  • `InstrumentedModel`

See the Debugging and Monitoring guide for more info.
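
A minimal sketch of enabling instrumentation with custom settings (the option values here are illustrative):

from pydantic_ai import Agent
from pydantic_ai.models.instrumented import InstrumentationSettings

# Keep tracing enabled but omit prompt/completion content from spans:
settings = InstrumentationSettings(include_content=False)
agent = Agent('openai:gpt-4o', instrument=settings)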

Source code in pydantic_ai_slim/pydantic_ai/models/instrumented.py
@dataclass(init=False)
class InstrumentationSettings:
    """Options for instrumenting models and agents with OpenTelemetry.

    Used in:

    - `Agent(instrument=...)`
    - [`Agent.instrument_all()`][pydantic_ai.agent.Agent.instrument_all]
    - [`InstrumentedModel`][pydantic_ai.models.instrumented.InstrumentedModel]

    See the [Debugging and Monitoring guide](https://ai.pydantic.org.cn/logfire/) for more info.
    """

    tracer: Tracer = field(repr=False)
    event_logger: EventLogger = field(repr=False)
    event_mode: Literal['attributes', 'logs'] = 'attributes'
    include_binary_content: bool = True
    include_content: bool = True
    version: Literal[1, 2] = 1

    def __init__(
        self,
        *,
        tracer_provider: TracerProvider | None = None,
        meter_provider: MeterProvider | None = None,
        include_binary_content: bool = True,
        include_content: bool = True,
        version: Literal[1, 2] = 2,
        event_mode: Literal['attributes', 'logs'] = 'attributes',
        event_logger_provider: EventLoggerProvider | None = None,
    ):
        """Create instrumentation options.

        Args:
            tracer_provider: The OpenTelemetry tracer provider to use.
                If not provided, the global tracer provider is used.
                Calling `logfire.configure()` sets the global tracer provider, so most users don't need this.
            meter_provider: The OpenTelemetry meter provider to use.
                If not provided, the global meter provider is used.
                Calling `logfire.configure()` sets the global meter provider, so most users don't need this.
            include_binary_content: Whether to include binary content in the instrumentation events.
            include_content: Whether to include prompts, completions, and tool call arguments and responses
                in the instrumentation events.
            version: Version of the data format. This is unrelated to the Pydantic AI package version.
                Version 1 is based on the legacy event-based OpenTelemetry GenAI spec
                    and will be removed in a future release.
                    The parameters `event_mode` and `event_logger_provider` are only relevant for version 1.
                Version 2 uses the newer OpenTelemetry GenAI spec and stores messages in the following attributes:
                    - `gen_ai.system_instructions` for instructions passed to the agent.
                    - `gen_ai.input.messages` and `gen_ai.output.messages` on model request spans.
                    - `pydantic_ai.all_messages` on agent run spans.
            event_mode: The mode for emitting events in version 1.
                If `'attributes'`, events are attached to the span as attributes.
                If `'logs'`, events are emitted as OpenTelemetry log-based events.
            event_logger_provider: The OpenTelemetry event logger provider to use.
                If not provided, the global event logger provider is used.
                Calling `logfire.configure()` sets the global event logger provider, so most users don't need this.
                This is only used if `event_mode='logs'` and `version=1`.
        """
        from pydantic_ai import __version__

        tracer_provider = tracer_provider or get_tracer_provider()
        meter_provider = meter_provider or get_meter_provider()
        event_logger_provider = event_logger_provider or get_event_logger_provider()
        scope_name = 'pydantic-ai'
        self.tracer = tracer_provider.get_tracer(scope_name, __version__)
        self.meter = meter_provider.get_meter(scope_name, __version__)
        self.event_logger = event_logger_provider.get_event_logger(scope_name, __version__)
        self.event_mode = event_mode
        self.include_binary_content = include_binary_content
        self.include_content = include_content

        if event_mode == 'logs' and version != 1:
            warnings.warn(
                'event_mode is only relevant for version=1 which is deprecated and will be removed in a future release.',
                stacklevel=2,
            )
            version = 1

        self.version = version

        # As specified in the OpenTelemetry GenAI metrics spec:
        # https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-metrics/#metric-gen_aiclienttokenusage
        tokens_histogram_kwargs = dict(
            name='gen_ai.client.token.usage',
            unit='{token}',
            description='Measures number of input and output tokens used',
        )
        try:
            self.tokens_histogram = self.meter.create_histogram(
                **tokens_histogram_kwargs,
                explicit_bucket_boundaries_advisory=TOKEN_HISTOGRAM_BOUNDARIES,
            )
        except TypeError:  # pragma: lax no cover
            # Older OTel/logfire versions don't support explicit_bucket_boundaries_advisory
            self.tokens_histogram = self.meter.create_histogram(
                **tokens_histogram_kwargs,  # pyright: ignore
            )

    def messages_to_otel_events(self, messages: list[ModelMessage]) -> list[Event]:
        """Convert a list of model messages to OpenTelemetry events.

        Args:
            messages: The messages to convert.

        Returns:
            A list of OpenTelemetry events.
        """
        events: list[Event] = []
        instructions = InstrumentedModel._get_instructions(messages)  # pyright: ignore [reportPrivateUsage]
        if instructions is not None:
            events.append(
                Event(
                    'gen_ai.system.message',
                    body={**({'content': instructions} if self.include_content else {}), 'role': 'system'},
                )
            )

        for message_index, message in enumerate(messages):
            message_events: list[Event] = []
            if isinstance(message, ModelRequest):
                for part in message.parts:
                    if hasattr(part, 'otel_event'):
                        message_events.append(part.otel_event(self))
            elif isinstance(message, ModelResponse):  # pragma: no branch
                message_events = message.otel_events(self)
            for event in message_events:
                event.attributes = {
                    'gen_ai.message.index': message_index,
                    **(event.attributes or {}),
                }
            events.extend(message_events)

        for event in events:
            event.body = InstrumentedModel.serialize_any(event.body)
        return events

    def messages_to_otel_messages(self, messages: list[ModelMessage]) -> list[_otel_messages.ChatMessage]:
        result: list[_otel_messages.ChatMessage] = []
        for message in messages:
            if isinstance(message, ModelRequest):
                for is_system, group in itertools.groupby(message.parts, key=lambda p: isinstance(p, SystemPromptPart)):
                    message_parts: list[_otel_messages.MessagePart] = []
                    for part in group:
                        if hasattr(part, 'otel_message_parts'):
                            message_parts.extend(part.otel_message_parts(self))
                    result.append(
                        _otel_messages.ChatMessage(role='system' if is_system else 'user', parts=message_parts)
                    )
            elif isinstance(message, ModelResponse):  # pragma: no branch
                result.append(_otel_messages.ChatMessage(role='assistant', parts=message.otel_message_parts(self)))
        return result

    def handle_messages(self, input_messages: list[ModelMessage], response: ModelResponse, system: str, span: Span):
        if self.version == 1:
            events = self.messages_to_otel_events(input_messages)
            for event in self.messages_to_otel_events([response]):
                events.append(
                    Event(
                        'gen_ai.choice',
                        body={
                            'index': 0,
                            'message': event.body,
                        },
                    )
                )
            for event in events:
                event.attributes = {
                    GEN_AI_SYSTEM_ATTRIBUTE: system,
                    **(event.attributes or {}),
                }
            self._emit_events(span, events)
        else:
            output_messages = self.messages_to_otel_messages([response])
            assert len(output_messages) == 1
            output_message = cast(_otel_messages.OutputMessage, output_messages[0])
            if response.provider_details and 'finish_reason' in response.provider_details:
                output_message['finish_reason'] = response.provider_details['finish_reason']
            instructions = InstrumentedModel._get_instructions(input_messages)  # pyright: ignore [reportPrivateUsage]
            system_instructions_attributes = self.system_instructions_attributes(instructions)
            attributes = {
                'gen_ai.input.messages': json.dumps(self.messages_to_otel_messages(input_messages)),
                'gen_ai.output.messages': json.dumps([output_message]),
                **system_instructions_attributes,
                'logfire.json_schema': json.dumps(
                    {
                        'type': 'object',
                        'properties': {
                            'gen_ai.input.messages': {'type': 'array'},
                            'gen_ai.output.messages': {'type': 'array'},
                            **(
                                {'gen_ai.system_instructions': {'type': 'array'}}
                                if system_instructions_attributes
                                else {}
                            ),
                            'model_request_parameters': {'type': 'object'},
                        },
                    }
                ),
            }
            span.set_attributes(attributes)

    def system_instructions_attributes(self, instructions: str | None) -> dict[str, str]:
        if instructions and self.include_content:
            return {
                'gen_ai.system_instructions': json.dumps([_otel_messages.TextPart(type='text', content=instructions)]),
            }
        return {}

    def _emit_events(self, span: Span, events: list[Event]) -> None:
        if self.event_mode == 'logs':
            for event in events:
                self.event_logger.emit(event)
        else:
            attr_name = 'events'
            span.set_attributes(
                {
                    attr_name: json.dumps([InstrumentedModel.event_to_dict(event) for event in events]),
                    'logfire.json_schema': json.dumps(
                        {
                            'type': 'object',
                            'properties': {
                                attr_name: {'type': 'array'},
                                'model_request_parameters': {'type': 'object'},
                            },
                        }
                    ),
                }
            )

__init__

__init__(
    *,
    tracer_provider: TracerProvider | None = None,
    meter_provider: MeterProvider | None = None,
    include_binary_content: bool = True,
    include_content: bool = True,
    version: Literal[1, 2] = 2,
    event_mode: Literal[
        "attributes", "logs"
    ] = "attributes",
    event_logger_provider: EventLoggerProvider | None = None
)

Create instrumentation options.

Parameters:

  • tracer_provider (TracerProvider | None): The OpenTelemetry tracer provider to use. If not provided, the global tracer provider is used. Calling `logfire.configure()` sets the global tracer provider, so most users don't need this. Default: None

  • meter_provider (MeterProvider | None): The OpenTelemetry meter provider to use. If not provided, the global meter provider is used. Calling `logfire.configure()` sets the global meter provider, so most users don't need this. Default: None

  • include_binary_content (bool): Whether to include binary content in the instrumentation events. Default: True

  • include_content (bool): Whether to include prompts, completions, and tool call arguments and responses in the instrumentation events. Default: True

  • version (Literal[1, 2]): Version of the data format. This is unrelated to the Pydantic AI package version. Version 1 is based on the legacy event-based OpenTelemetry GenAI spec and will be removed in a future release; the parameters `event_mode` and `event_logger_provider` are only relevant for version 1. Version 2 uses the newer OpenTelemetry GenAI spec and stores messages in the following attributes: `gen_ai.system_instructions` for instructions passed to the agent, `gen_ai.input.messages` and `gen_ai.output.messages` on model request spans, and `pydantic_ai.all_messages` on agent run spans. Default: 2

  • event_mode (Literal['attributes', 'logs']): The mode for emitting events in version 1. If 'attributes', events are attached to the span as attributes. If 'logs', events are emitted as OpenTelemetry log-based events. Default: 'attributes'

  • event_logger_provider (EventLoggerProvider | None): The OpenTelemetry event logger provider to use. If not provided, the global event logger provider is used. Calling `logfire.configure()` sets the global event logger provider, so most users don't need this. Only used if `event_mode='logs'` and `version=1`. Default: None
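As a sketch of the version/event_mode interaction documented above (and enforced in the constructor source below): requesting `event_mode='logs'` while leaving `version` at its default of 2 triggers a deprecation warning and forces the version back to 1:

from pydantic_ai.models.instrumented import InstrumentationSettings

settings = InstrumentationSettings(event_mode='logs')  # version defaults to 2
print(settings.version)
#> 1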
Source code in pydantic_ai_slim/pydantic_ai/models/instrumented.py
def __init__(
    self,
    *,
    tracer_provider: TracerProvider | None = None,
    meter_provider: MeterProvider | None = None,
    include_binary_content: bool = True,
    include_content: bool = True,
    version: Literal[1, 2] = 2,
    event_mode: Literal['attributes', 'logs'] = 'attributes',
    event_logger_provider: EventLoggerProvider | None = None,
):
    """Create instrumentation options.

    Args:
        tracer_provider: The OpenTelemetry tracer provider to use.
            If not provided, the global tracer provider is used.
            Calling `logfire.configure()` sets the global tracer provider, so most users don't need this.
        meter_provider: The OpenTelemetry meter provider to use.
            If not provided, the global meter provider is used.
            Calling `logfire.configure()` sets the global meter provider, so most users don't need this.
        include_binary_content: Whether to include binary content in the instrumentation events.
        include_content: Whether to include prompts, completions, and tool call arguments and responses
            in the instrumentation events.
        version: Version of the data format. This is unrelated to the Pydantic AI package version.
            Version 1 is based on the legacy event-based OpenTelemetry GenAI spec
                and will be removed in a future release.
                The parameters `event_mode` and `event_logger_provider` are only relevant for version 1.
            Version 2 uses the newer OpenTelemetry GenAI spec and stores messages in the following attributes:
                - `gen_ai.system_instructions` for instructions passed to the agent.
                - `gen_ai.input.messages` and `gen_ai.output.messages` on model request spans.
                - `pydantic_ai.all_messages` on agent run spans.
        event_mode: The mode for emitting events in version 1.
            If `'attributes'`, events are attached to the span as attributes.
            If `'logs'`, events are emitted as OpenTelemetry log-based events.
        event_logger_provider: The OpenTelemetry event logger provider to use.
            If not provided, the global event logger provider is used.
            Calling `logfire.configure()` sets the global event logger provider, so most users don't need this.
            This is only used if `event_mode='logs'` and `version=1`.
    """
    from pydantic_ai import __version__

    tracer_provider = tracer_provider or get_tracer_provider()
    meter_provider = meter_provider or get_meter_provider()
    event_logger_provider = event_logger_provider or get_event_logger_provider()
    scope_name = 'pydantic-ai'
    self.tracer = tracer_provider.get_tracer(scope_name, __version__)
    self.meter = meter_provider.get_meter(scope_name, __version__)
    self.event_logger = event_logger_provider.get_event_logger(scope_name, __version__)
    self.event_mode = event_mode
    self.include_binary_content = include_binary_content
    self.include_content = include_content

    if event_mode == 'logs' and version != 1:
        warnings.warn(
            'event_mode is only relevant for version=1 which is deprecated and will be removed in a future release.',
            stacklevel=2,
        )
        version = 1

    self.version = version

    # As specified in the OpenTelemetry GenAI metrics spec:
    # https://opentelemetry.io/docs/specs/semconv/gen-ai/gen-ai-metrics/#metric-gen_aiclienttokenusage
    tokens_histogram_kwargs = dict(
        name='gen_ai.client.token.usage',
        unit='{token}',
        description='Measures number of input and output tokens used',
    )
    try:
        self.tokens_histogram = self.meter.create_histogram(
            **tokens_histogram_kwargs,
            explicit_bucket_boundaries_advisory=TOKEN_HISTOGRAM_BOUNDARIES,
        )
    except TypeError:  # pragma: lax no cover
        # Older OTel/logfire versions don't support explicit_bucket_boundaries_advisory
        self.tokens_histogram = self.meter.create_histogram(
            **tokens_histogram_kwargs,  # pyright: ignore
        )

messages_to_otel_events

messages_to_otel_events(
    messages: list[ModelMessage],
) -> list[Event]

Convert a list of model messages to OpenTelemetry events.

Parameters:

  • messages (list[ModelMessage]): The messages to convert. Required.

Returns:

  • list[Event]: A list of OpenTelemetry events.
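A hedged sketch of calling this method directly with hand-built messages; the message classes come from pydantic_ai.messages, and in normal use InstrumentedModel invokes this conversion for you:

from pydantic_ai.messages import ModelRequest, ModelResponse, TextPart, UserPromptPart
from pydantic_ai.models.instrumented import InstrumentationSettings

settings = InstrumentationSettings()
events = settings.messages_to_otel_events([
    ModelRequest(parts=[UserPromptPart(content='What is the capital of France?')]),
    ModelResponse(parts=[TextPart(content='Paris')]),
])

# Each event carries a `gen_ai.message.index` attribute linking it to its source message.
for event in events:
    print(event.attributes)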

Source code in pydantic_ai_slim/pydantic_ai/models/instrumented.py
def messages_to_otel_events(self, messages: list[ModelMessage]) -> list[Event]:
    """Convert a list of model messages to OpenTelemetry events.

    Args:
        messages: The messages to convert.

    Returns:
        A list of OpenTelemetry events.
    """
    events: list[Event] = []
    instructions = InstrumentedModel._get_instructions(messages)  # pyright: ignore [reportPrivateUsage]
    if instructions is not None:
        events.append(
            Event(
                'gen_ai.system.message',
                body={**({'content': instructions} if self.include_content else {}), 'role': 'system'},
            )
        )

    for message_index, message in enumerate(messages):
        message_events: list[Event] = []
        if isinstance(message, ModelRequest):
            for part in message.parts:
                if hasattr(part, 'otel_event'):
                    message_events.append(part.otel_event(self))
        elif isinstance(message, ModelResponse):  # pragma: no branch
            message_events = message.otel_events(self)
        for event in message_events:
            event.attributes = {
                'gen_ai.message.index': message_index,
                **(event.attributes or {}),
            }
        events.extend(message_events)

    for event in events:
        event.body = InstrumentedModel.serialize_any(event.body)
    return events

EventStreamHandler module-attribute

A function that receives the agent's RunContext and an async iterable of events from the model's streaming response and the agent's execution of tools.
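A minimal sketch of such a handler; this assumes the `event_stream_handler` keyword that agent run methods accept, and uses a loose `object` annotation for the event type rather than the precise event union:

from collections.abc import AsyncIterable

from pydantic_ai import Agent, RunContext

async def print_events(ctx: RunContext[None], events: AsyncIterable[object]) -> None:
    # Inspect streamed model and tool-execution events as they arrive.
    async for event in events:
        print(type(event).__name__)

agent = Agent('test')
result = agent.run_sync('Hello', event_stream_handler=print_events)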