
pydantic_ai.direct

Methods for making imperative requests to language models with minimal abstraction.

These methods allow you to make requests to LLMs where the only abstraction is input and output schema translation, so you can use the same API with all models.

These methods are thin wrappers around Model implementations.
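
The call shape is identical for every provider; only the model string changes. A minimal sketch, reusing the model strings that appear in the examples below:

from pydantic_ai.direct import model_request
from pydantic_ai.messages import ModelRequest


async def main():
    messages = [ModelRequest.user_text_prompt('What is the capital of France?')]
    # Identical call for different providers; only the model identifier differs.
    anthropic_response = await model_request('anthropic:claude-3-5-haiku-latest', messages)
    openai_response = await model_request('openai:gpt-4.1-mini', messages)
    print(anthropic_response.parts)
    print(openai_response.parts)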

model_request async

model_request(
    model: Model | KnownModelName | str,
    messages: list[ModelMessage],
    *,
    model_settings: ModelSettings | None = None,
    model_request_parameters: (
        ModelRequestParameters | None
    ) = None,
    instrument: InstrumentationSettings | bool | None = None
) -> ModelResponse

Make a non-streamed request to a model.

model_request_example.py
from pydantic_ai.direct import model_request
from pydantic_ai.messages import ModelRequest


async def main():
    model_response = await model_request(
        'anthropic:claude-3-5-haiku-latest',
        [ModelRequest.user_text_prompt('What is the capital of France?')]  # (1)!
    )
    print(model_response)
    '''
    ModelResponse(
        parts=[TextPart(content='The capital of France is Paris.')],
        usage=RequestUsage(input_tokens=56, output_tokens=7),
        model_name='claude-3-5-haiku-latest',
        timestamp=datetime.datetime(...),
    )
    '''
  1. See ModelRequest.user_text_prompt for details.

Parameters

model: Model | KnownModelName | str (required)
    The model to make a request to. We allow str here since the actual list of allowed models changes frequently.

messages: list[ModelMessage] (required)
    Messages to send to the model.

model_settings: ModelSettings | None (default: None)
    Optional model settings.

model_request_parameters: ModelRequestParameters | None (default: None)
    Optional model request parameters.

instrument: InstrumentationSettings | bool | None (default: None)
    Whether to instrument the request with OpenTelemetry/Logfire; if None, the value from logfire.instrument_pydantic_ai is used.

Returns

ModelResponse
    The model response and token usage associated with the request.
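
The optional arguments let you tune a request without changing the call shape. Below is a minimal sketch of passing model settings and a function tool; it assumes ToolDefinition's name, description, and parameters_json_schema fields and the function_tools / allow_text_output fields of ModelRequestParameters. Check those classes' reference pages for the exact signatures in your version.

from pydantic_ai.direct import model_request
from pydantic_ai.messages import ModelRequest
from pydantic_ai.models import ModelRequestParameters
from pydantic_ai.settings import ModelSettings
from pydantic_ai.tools import ToolDefinition


async def main():
    response = await model_request(
        'openai:gpt-4.1-mini',
        [ModelRequest.user_text_prompt('What is the weather in Paris?')],
        model_settings=ModelSettings(temperature=0.0),
        model_request_parameters=ModelRequestParameters(
            function_tools=[
                ToolDefinition(
                    name='get_weather',
                    description='Get the current weather for a city.',
                    parameters_json_schema={
                        'type': 'object',
                        'properties': {'city': {'type': 'string'}},
                        'required': ['city'],
                    },
                )
            ],
            allow_text_output=True,
        ),
    )
    # The response parts may now contain a ToolCallPart for get_weather.
    print(response.parts)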

Source code in pydantic_ai_slim/pydantic_ai/direct.py
async def model_request(
    model: models.Model | models.KnownModelName | str,
    messages: list[messages.ModelMessage],
    *,
    model_settings: settings.ModelSettings | None = None,
    model_request_parameters: models.ModelRequestParameters | None = None,
    instrument: instrumented_models.InstrumentationSettings | bool | None = None,
) -> messages.ModelResponse:
    """Make a non-streamed request to a model.

    ```py title="model_request_example.py"
    from pydantic_ai.direct import model_request
    from pydantic_ai.messages import ModelRequest


    async def main():
        model_response = await model_request(
            'anthropic:claude-3-5-haiku-latest',
            [ModelRequest.user_text_prompt('What is the capital of France?')]  # (1)!
        )
        print(model_response)
        '''
        ModelResponse(
            parts=[TextPart(content='The capital of France is Paris.')],
            usage=RequestUsage(input_tokens=56, output_tokens=7),
            model_name='claude-3-5-haiku-latest',
            timestamp=datetime.datetime(...),
        )
        '''
    ```

    1. See [`ModelRequest.user_text_prompt`][pydantic_ai.messages.ModelRequest.user_text_prompt] for details.

    Args:
        model: The model to make a request to. We allow `str` here since the actual list of allowed models changes frequently.
        messages: Messages to send to the model
        model_settings: optional model settings
        model_request_parameters: optional model request parameters
        instrument: Whether to instrument the request with OpenTelemetry/Logfire, if `None` the value from
            [`logfire.instrument_pydantic_ai`][logfire.Logfire.instrument_pydantic_ai] is used.

    Returns:
        The model response and token usage associated with the request.
    """
    model_instance = _prepare_model(model, instrument)
    return await model_instance.request(
        messages,
        model_settings,
        model_instance.customize_request_parameters(model_request_parameters or models.ModelRequestParameters()),
    )

model_request_sync

model_request_sync(
    model: Model | KnownModelName | str,
    messages: list[ModelMessage],
    *,
    model_settings: ModelSettings | None = None,
    model_request_parameters: (
        ModelRequestParameters | None
    ) = None,
    instrument: InstrumentationSettings | bool | None = None
) -> ModelResponse

Make a synchronous, non-streamed request to a model.

This is a convenience method that wraps model_request with loop.run_until_complete(...). You therefore can't use this method inside async code or if there's an active event loop.

model_request_sync_example.py
from pydantic_ai.direct import model_request_sync
from pydantic_ai.messages import ModelRequest

model_response = model_request_sync(
    'anthropic:claude-3-5-haiku-latest',
    [ModelRequest.user_text_prompt('What is the capital of France?')]  # (1)!
)
print(model_response)
'''
ModelResponse(
    parts=[TextPart(content='The capital of France is Paris.')],
    usage=RequestUsage(input_tokens=56, output_tokens=7),
    model_name='claude-3-5-haiku-latest',
    timestamp=datetime.datetime(...),
)
'''
  1. See ModelRequest.user_text_prompt for details.

Parameters

model: Model | KnownModelName | str (required)
    The model to make a request to. We allow str here since the actual list of allowed models changes frequently.

messages: list[ModelMessage] (required)
    Messages to send to the model.

model_settings: ModelSettings | None (default: None)
    Optional model settings.

model_request_parameters: ModelRequestParameters | None (default: None)
    Optional model request parameters.

instrument: InstrumentationSettings | bool | None (default: None)
    Whether to instrument the request with OpenTelemetry/Logfire; if None, the value from logfire.instrument_pydantic_ai is used.

Returns

ModelResponse
    The model response and token usage associated with the request.
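
Because of the event-loop restriction above, inside async code you should await the async variant directly instead. A minimal sketch:

import asyncio

from pydantic_ai.direct import model_request
from pydantic_ai.messages import ModelRequest


async def main():
    # Inside async code, await model_request instead of calling model_request_sync.
    model_response = await model_request(
        'anthropic:claude-3-5-haiku-latest',
        [ModelRequest.user_text_prompt('What is the capital of France?')],
    )
    print(model_response)


asyncio.run(main())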

Source code in pydantic_ai_slim/pydantic_ai/direct.py
def model_request_sync(
    model: models.Model | models.KnownModelName | str,
    messages: list[messages.ModelMessage],
    *,
    model_settings: settings.ModelSettings | None = None,
    model_request_parameters: models.ModelRequestParameters | None = None,
    instrument: instrumented_models.InstrumentationSettings | bool | None = None,
) -> messages.ModelResponse:
    """Make a Synchronous, non-streamed request to a model.

    This is a convenience method that wraps [`model_request`][pydantic_ai.direct.model_request] with
    `loop.run_until_complete(...)`. You therefore can't use this method inside async code or if there's an active event loop.

    ```py title="model_request_sync_example.py"
    from pydantic_ai.direct import model_request_sync
    from pydantic_ai.messages import ModelRequest

    model_response = model_request_sync(
        'anthropic:claude-3-5-haiku-latest',
        [ModelRequest.user_text_prompt('What is the capital of France?')]  # (1)!
    )
    print(model_response)
    '''
    ModelResponse(
        parts=[TextPart(content='The capital of France is Paris.')],
        usage=RequestUsage(input_tokens=56, output_tokens=7),
        model_name='claude-3-5-haiku-latest',
        timestamp=datetime.datetime(...),
    )
    '''
    ```

    1. See [`ModelRequest.user_text_prompt`][pydantic_ai.messages.ModelRequest.user_text_prompt] for details.

    Args:
        model: The model to make a request to. We allow `str` here since the actual list of allowed models changes frequently.
        messages: Messages to send to the model
        model_settings: optional model settings
        model_request_parameters: optional model request parameters
        instrument: Whether to instrument the request with OpenTelemetry/Logfire, if `None` the value from
            [`logfire.instrument_pydantic_ai`][logfire.Logfire.instrument_pydantic_ai] is used.

    Returns:
        The model response and token usage associated with the request.
    """
    return _get_event_loop().run_until_complete(
        model_request(
            model,
            messages,
            model_settings=model_settings,
            model_request_parameters=model_request_parameters,
            instrument=instrument,
        )
    )

model_request_stream

model_request_stream(
    model: Model | KnownModelName | str,
    messages: list[ModelMessage],
    *,
    model_settings: ModelSettings | None = None,
    model_request_parameters: (
        ModelRequestParameters | None
    ) = None,
    instrument: InstrumentationSettings | bool | None = None
) -> AbstractAsyncContextManager[StreamedResponse]

Make a streamed async request to a model.

model_request_stream_example.py
from pydantic_ai.direct import model_request_stream
from pydantic_ai.messages import ModelRequest


async def main():
    messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]  # (1)!
    async with model_request_stream('openai:gpt-4.1-mini', messages) as stream:
        chunks = []
        async for chunk in stream:
            chunks.append(chunk)
        print(chunks)
        '''
        [
            PartStartEvent(index=0, part=TextPart(content='Albert Einstein was ')),
            FinalResultEvent(tool_name=None, tool_call_id=None),
            PartDeltaEvent(
                index=0, delta=TextPartDelta(content_delta='a German-born theoretical ')
            ),
            PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='physicist.')),
        ]
        '''
  1. See ModelRequest.user_text_prompt for details.

Parameters

model: Model | KnownModelName | str (required)
    The model to make a request to. We allow str here since the actual list of allowed models changes frequently.

messages: list[ModelMessage] (required)
    Messages to send to the model.

model_settings: ModelSettings | None (default: None)
    Optional model settings.

model_request_parameters: ModelRequestParameters | None (default: None)
    Optional model request parameters.

instrument: InstrumentationSettings | bool | None (default: None)
    Whether to instrument the request with OpenTelemetry/Logfire; if None, the value from logfire.instrument_pydantic_ai is used.

Returns

AbstractAsyncContextManager[StreamedResponse]
    A stream response async context manager.
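
The streamed events can be turned into incremental text as they arrive. A minimal sketch that prints text parts and their deltas, using the event and part types shown in the example output above:

from pydantic_ai.direct import model_request_stream
from pydantic_ai.messages import (
    ModelRequest,
    PartDeltaEvent,
    PartStartEvent,
    TextPart,
    TextPartDelta,
)


async def main():
    messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]
    async with model_request_stream('openai:gpt-4.1-mini', messages) as stream:
        async for event in stream:
            # Print text as it arrives: the opening part, then its deltas.
            if isinstance(event, PartStartEvent) and isinstance(event.part, TextPart):
                print(event.part.content, end='', flush=True)
            elif isinstance(event, PartDeltaEvent) and isinstance(event.delta, TextPartDelta):
                print(event.delta.content_delta, end='', flush=True)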

Source code in pydantic_ai_slim/pydantic_ai/direct.py
def model_request_stream(
    model: models.Model | models.KnownModelName | str,
    messages: list[messages.ModelMessage],
    *,
    model_settings: settings.ModelSettings | None = None,
    model_request_parameters: models.ModelRequestParameters | None = None,
    instrument: instrumented_models.InstrumentationSettings | bool | None = None,
) -> AbstractAsyncContextManager[models.StreamedResponse]:
    """Make a streamed async request to a model.

    ```py {title="model_request_stream_example.py"}

    from pydantic_ai.direct import model_request_stream
    from pydantic_ai.messages import ModelRequest


    async def main():
        messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]  # (1)!
        async with model_request_stream('openai:gpt-4.1-mini', messages) as stream:
            chunks = []
            async for chunk in stream:
                chunks.append(chunk)
            print(chunks)
            '''
            [
                PartStartEvent(index=0, part=TextPart(content='Albert Einstein was ')),
                FinalResultEvent(tool_name=None, tool_call_id=None),
                PartDeltaEvent(
                    index=0, delta=TextPartDelta(content_delta='a German-born theoretical ')
                ),
                PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='physicist.')),
            ]
            '''
    ```

    1. See [`ModelRequest.user_text_prompt`][pydantic_ai.messages.ModelRequest.user_text_prompt] for details.

    Args:
        model: The model to make a request to. We allow `str` here since the actual list of allowed models changes frequently.
        messages: Messages to send to the model
        model_settings: optional model settings
        model_request_parameters: optional model request parameters
        instrument: Whether to instrument the request with OpenTelemetry/Logfire, if `None` the value from
            [`logfire.instrument_pydantic_ai`][logfire.Logfire.instrument_pydantic_ai] is used.

    Returns:
        A [stream response][pydantic_ai.models.StreamedResponse] async context manager.
    """
    model_instance = _prepare_model(model, instrument)
    return model_instance.request_stream(
        messages,
        model_settings,
        model_instance.customize_request_parameters(model_request_parameters or models.ModelRequestParameters()),
    )

model_request_stream_sync

model_request_stream_sync(
    model: Model | KnownModelName | str,
    messages: list[ModelMessage],
    *,
    model_settings: ModelSettings | None = None,
    model_request_parameters: (
        ModelRequestParameters | None
    ) = None,
    instrument: InstrumentationSettings | bool | None = None
) -> StreamedResponseSync

Make a streamed synchronous request to a model.

This is the synchronous version of model_request_stream. It uses threading to run the asynchronous stream in the background while providing a synchronous iterator interface.

model_request_stream_sync_example.py
from pydantic_ai.direct import model_request_stream_sync
from pydantic_ai.messages import ModelRequest

messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]
with model_request_stream_sync('openai:gpt-4.1-mini', messages) as stream:
    chunks = []
    for chunk in stream:
        chunks.append(chunk)
    print(chunks)
    '''
    [
        PartStartEvent(index=0, part=TextPart(content='Albert Einstein was ')),
        FinalResultEvent(tool_name=None, tool_call_id=None),
        PartDeltaEvent(
            index=0, delta=TextPartDelta(content_delta='a German-born theoretical ')
        ),
        PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='physicist.')),
    ]
    '''

Parameters

model: Model | KnownModelName | str (required)
    The model to make a request to. We allow str here since the actual list of allowed models changes frequently.

messages: list[ModelMessage] (required)
    Messages to send to the model.

model_settings: ModelSettings | None (default: None)
    Optional model settings.

model_request_parameters: ModelRequestParameters | None (default: None)
    Optional model request parameters.

instrument: InstrumentationSettings | bool | None (default: None)
    Whether to instrument the request with OpenTelemetry/Logfire; if None, the value from logfire.instrument_pydantic_ai is used.

Returns

StreamedResponseSync
    A sync stream response context manager.
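
The returned StreamedResponseSync also exposes get(), usage(), model_name, and timestamp (documented below), which reflect the data received so far. A minimal sketch:

from pydantic_ai.direct import model_request_stream_sync
from pydantic_ai.messages import ModelRequest

messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]
with model_request_stream_sync('openai:gpt-4.1-mini', messages) as stream:
    for _event in stream:
        pass  # consume the stream
    print(stream.get())      # ModelResponse built from the received events
    print(stream.usage())    # RequestUsage for the response so far
    print(stream.model_name) # model name reported by the provider
    print(stream.timestamp)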

Source code in pydantic_ai_slim/pydantic_ai/direct.py
def model_request_stream_sync(
    model: models.Model | models.KnownModelName | str,
    messages: list[messages.ModelMessage],
    *,
    model_settings: settings.ModelSettings | None = None,
    model_request_parameters: models.ModelRequestParameters | None = None,
    instrument: instrumented_models.InstrumentationSettings | bool | None = None,
) -> StreamedResponseSync:
    """Make a streamed synchronous request to a model.

    This is the synchronous version of [`model_request_stream`][pydantic_ai.direct.model_request_stream].
    It uses threading to run the asynchronous stream in the background while providing a synchronous iterator interface.

    ```py {title="model_request_stream_sync_example.py"}

    from pydantic_ai.direct import model_request_stream_sync
    from pydantic_ai.messages import ModelRequest

    messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]
    with model_request_stream_sync('openai:gpt-4.1-mini', messages) as stream:
        chunks = []
        for chunk in stream:
            chunks.append(chunk)
        print(chunks)
        '''
        [
            PartStartEvent(index=0, part=TextPart(content='Albert Einstein was ')),
            FinalResultEvent(tool_name=None, tool_call_id=None),
            PartDeltaEvent(
                index=0, delta=TextPartDelta(content_delta='a German-born theoretical ')
            ),
            PartDeltaEvent(index=0, delta=TextPartDelta(content_delta='physicist.')),
        ]
        '''
    ```

    Args:
        model: The model to make a request to. We allow `str` here since the actual list of allowed models changes frequently.
        messages: Messages to send to the model
        model_settings: optional model settings
        model_request_parameters: optional model request parameters
        instrument: Whether to instrument the request with OpenTelemetry/Logfire, if `None` the value from
            [`logfire.instrument_pydantic_ai`][logfire.Logfire.instrument_pydantic_ai] is used.

    Returns:
        A [sync stream response][pydantic_ai.direct.StreamedResponseSync] context manager.
    """
    async_stream_cm = model_request_stream(
        model=model,
        messages=messages,
        model_settings=model_settings,
        model_request_parameters=model_request_parameters,
        instrument=instrument,
    )

    return StreamedResponseSync(async_stream_cm)

StreamedResponseSync dataclass

Synchronous wrapper to async streaming responses by running the async producer in a background thread and providing a synchronous iterator.

This class must be used as a context manager with the `with` statement.
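
Iterating without entering the context manager raises a RuntimeError. A minimal sketch of that failure mode, based on the check in the source below:

from pydantic_ai.direct import model_request_stream_sync
from pydantic_ai.messages import ModelRequest

messages = [ModelRequest.user_text_prompt('Who was Albert Einstein?')]
stream = model_request_stream_sync('openai:gpt-4.1-mini', messages)
try:
    next(iter(stream))  # no `with` block: the context-manager check rejects this
except RuntimeError as e:
    print(e)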

Source code in pydantic_ai_slim/pydantic_ai/direct.py
@dataclass
class StreamedResponseSync:
    """Synchronous wrapper to async streaming responses by running the async producer in a background thread and providing a synchronous iterator.

    This class must be used as a context manager with the `with` statement.
    """

    _async_stream_cm: AbstractAsyncContextManager[StreamedResponse]
    _queue: queue.Queue[messages.ModelResponseStreamEvent | Exception | None] = field(
        default_factory=queue.Queue, init=False
    )
    _thread: threading.Thread | None = field(default=None, init=False)
    _stream_response: StreamedResponse | None = field(default=None, init=False)
    _exception: Exception | None = field(default=None, init=False)
    _context_entered: bool = field(default=False, init=False)
    _stream_ready: threading.Event = field(default_factory=threading.Event, init=False)

    def __enter__(self) -> StreamedResponseSync:
        self._context_entered = True
        self._start_producer()
        return self

    def __exit__(
        self,
        _exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: TracebackType | None,
    ) -> None:
        self._cleanup()

    def __iter__(self) -> Iterator[messages.ModelResponseStreamEvent]:
        """Stream the response as an iterable of [`ModelResponseStreamEvent`][pydantic_ai.messages.ModelResponseStreamEvent]s."""
        self._check_context_manager_usage()

        while True:
            item = self._queue.get()
            if item is None:  # End of stream
                break
            elif isinstance(item, Exception):
                raise item
            else:
                yield item

    def __repr__(self) -> str:
        if self._stream_response:
            return repr(self._stream_response)
        else:
            return f'{self.__class__.__name__}(context_entered={self._context_entered})'

    __str__ = __repr__

    def _check_context_manager_usage(self) -> None:
        if not self._context_entered:
            raise RuntimeError(
                'StreamedResponseSync must be used as a context manager. '
                'Use: `with model_request_stream_sync(...) as stream:`'
            )

    def _ensure_stream_ready(self) -> StreamedResponse:
        self._check_context_manager_usage()

        if self._stream_response is None:
            # Wait for the background thread to signal that the stream is ready
            if not self._stream_ready.wait(timeout=STREAM_INITIALIZATION_TIMEOUT):
                raise RuntimeError('Stream failed to initialize within timeout')

            if self._stream_response is None:  # pragma: no cover
                raise RuntimeError('Stream failed to initialize')

        return self._stream_response

    def _start_producer(self):
        self._thread = threading.Thread(target=self._async_producer, daemon=True)
        self._thread.start()

    def _async_producer(self):
        async def _consume_async_stream():
            try:
                async with self._async_stream_cm as stream:
                    self._stream_response = stream
                    # Signal that the stream is ready
                    self._stream_ready.set()
                    async for event in stream:
                        self._queue.put(event)
            except Exception as e:
                # Signal ready even on error so waiting threads don't hang
                self._stream_ready.set()
                self._queue.put(e)
            finally:
                self._queue.put(None)  # Signal end

        _get_event_loop().run_until_complete(_consume_async_stream())

    def _cleanup(self):
        if self._thread and self._thread.is_alive():
            self._thread.join()

    def get(self) -> messages.ModelResponse:
        """Build a ModelResponse from the data received from the stream so far."""
        return self._ensure_stream_ready().get()

    def usage(self) -> RequestUsage:
        """Get the usage of the response so far."""
        return self._ensure_stream_ready().usage()

    @property
    def model_name(self) -> str:
        """Get the model name of the response."""
        return self._ensure_stream_ready().model_name

    @property
    def timestamp(self) -> datetime:
        """Get the timestamp of the response."""
        return self._ensure_stream_ready().timestamp

__iter__

Stream the response as an iterable of ModelResponseStreamEvents.

Source code in pydantic_ai_slim/pydantic_ai/direct.py
def __iter__(self) -> Iterator[messages.ModelResponseStreamEvent]:
    """Stream the response as an iterable of [`ModelResponseStreamEvent`][pydantic_ai.messages.ModelResponseStreamEvent]s."""
    self._check_context_manager_usage()

    while True:
        item = self._queue.get()
        if item is None:  # End of stream
            break
        elif isinstance(item, Exception):
            raise item
        else:
            yield item

get

get() -> ModelResponse

Build a ModelResponse from the data received from the stream so far.

Source code in pydantic_ai_slim/pydantic_ai/direct.py
def get(self) -> messages.ModelResponse:
    """Build a ModelResponse from the data received from the stream so far."""
    return self._ensure_stream_ready().get()

usage

usage() -> RequestUsage

Get the usage of the response so far.

Source code in pydantic_ai_slim/pydantic_ai/direct.py
def usage(self) -> RequestUsage:
    """Get the usage of the response so far."""
    return self._ensure_stream_ready().usage()

model_name property

model_name: str

Get the model name of the response.

timestamp property

timestamp: datetime

Get the timestamp of the response.