跳转到内容

使用 Modal 的 Slack 潜在客户筛选器

在本例中,我们将构建一个代理应用,它能够:

  • 自动研究加入公司公共 Slack 社区的每位新成员,以判断他们对公司的商业产品的契合度,
  • 将此分析发送到一个(私有)Slack 频道,以及
  • 将过去 24 小时内排名前 5 的潜在客户的每日摘要发送到(另一个)Slack 频道。

我们将在 Modal 上部署此应用,因为它允许你使用 Python 定义一个包含 Web 端点、定时函数和后台函数的应用,并通过命令行界面(CLI)进行部署,无需设置或管理任何基础设施。这是一个很好的方式,可以降低你组织中的人员开始构建和部署 AI 代理以简化其工作的门槛。

我们还添加了 Pydantic Logfire,以便在应用和代理响应 Webhook 和定时任务运行时,获得对其的可观察性。

屏幕截图

这是发送到 Slack 的分析的样子:

Slack message

这是 Logfire 中相应追踪记录的样子:

Logfire trace

所有这些条目都可以点击,以获取有关该步骤发生情况的更多详细信息,包括与 LLM 的完整对话以及 HTTP 请求和响应。

前提条件

如果你只是想看代码,而不实际进行运行所需的设置,请随时跳转到代码部分

Slack 应用

你需要拥有一个 Slack 工作区以及创建应用的必要权限。

  1. 按照 https://docs.slack.dev/quickstart 的说明创建一个新的 Slack 应用。
    1. 在第 2 步“请求作用域”中,请求以下作用域:
    2. 在第 3 步“安装和授权应用”中,记下访问令牌(Access Token),因为我们稍后需要将其作为 Secret 存储在 Modal 中。
    3. 你可以跳过第 4 步和第 5 步。我们之后需要订阅 team_join 事件,但此时你还没有 Webhook URL。
  2. 创建应用将要发布消息的频道,并将 Slack 应用添加到这些频道中:

    • #new-slack-leads
    • #daily-slack-leads-summary

    这些名称在示例中是硬编码的。如果你想使用不同的频道,可以克隆仓库并在 examples/pydantic_examples/slack_lead_qualifier/functions.py 中进行修改。

Logfire 写入令牌

  1. 如果你还没有 Logfire 账户,请在 https://logfire-us.pydantic.dev/ 上创建一个。
  2. 创建一个新项目,例如命名为 slack-lead-qualifier
  3. 生成一个新的写入令牌(Write Token)并记下来,因为我们稍后需要将其作为 Secret 存储在 Modal 中。

OpenAI API 密钥

  1. 如果你还没有 OpenAI 账户,请在 https://platform.openai.com/ 上创建一个。
  2. 在“设置”中创建一个新的 API 密钥(API Key)并记下来,因为我们稍后需要将其作为 Secret 存储在 Modal 中。
  1. 如果你还没有 Modal 账户,请在 https://modal.com/signup 上创建一个。
  2. https://modal.com/secrets 上创建 3 个类型为“Custom”的 Secret:
    • 名称:slack,键:SLACK_API_KEY,值:你之前生成的 Slack 访问令牌
    • 名称:logfire,键:LOGFIRE_TOKEN,值:你之前生成的 Logfire 写入令牌
    • 名称:openai,键:OPENAI_API_KEY,值:你之前生成的 OpenAI API 密钥

用法

  1. 确保你已安装了依赖项

  2. 使用 Modal 进行身份验证:

    python/uv-run -m modal setup
    
  3. 临时的 Modal 应用方式运行示例,这意味着它只会在你使用 Ctrl+C 退出之前运行:

    python/uv-run -m modal serve -m pydantic_ai_examples.slack_lead_qualifier.modal
    
  4. 记下 Created web function web_app => 后面的 URL,这是你的 Webhook 端点 URL。

  5. 回到 https://docs.slack.dev/quickstart 并按照第 4 步“配置应用以监听事件”的说明,使用你记下的 Webhook 端点 URL 作为请求 URL 来订阅 team_join 事件。

现在,当有新人(可能就是你用一个临时邮箱)加入 Slack 工作区时,你会在运行 modal serve 的终端和 Logfire 的实时视图中看到 Webhook 事件正在被处理,等待几秒钟后,你应该会在 #new-slack-leads Slack 频道中看到结果出现!

伪造 Slack 注册

你也可以像这样伪造一个 Slack 注册事件来试用这个代理,可以使用任何你喜欢的名字或邮箱:

curl -X POST <webhook endpoint URL> \
-H "Content-Type: application/json" \
-d '{
    "type": "event_callback",
    "event": {
        "type": "team_join",
        "user": {
            "profile": {
                "email": "samuel@pydantic.dev",
                "first_name": "Samuel",
                "last_name": "Colvin",
                "display_name": "Samuel Colvin"
            }
        }
    }
}'

部署到生产环境

如果你想以持久的方式将此应用部署到你的 Modal 工作区,可以使用此命令:

python/uv-run -m modal deploy -m pydantic_ai_examples.slack_lead_qualifier.modal

你可能需要先下载代码,将其放入一个新的仓库,然后使用 GitHub Actions 进行持续部署

不要忘记将 Slack 事件请求 URL 更新为新的持久 URL!你还需要根据自己的情况修改代理的指令

代码

我们将从基础开始,然后逐步构建完整的应用程序。

模型

个人资料

首先,我们定义一个 Pydantic 模型来表示 Slack 用户个人资料。这些是我们从发送到 Webhook 端点的 team_join 事件中获得的字段,我们稍后会定义这个端点。

slack_lead_qualifier/models.py (L11-L15)
...

class Profile(BaseModel):
    first_name: str | None = None
    last_name: str | None = None
    display_name: str | None = None
    email: str

...

我们还定义了一个 Profile.as_prompt() 辅助方法,它使用 format_as_xml 将个人资料转换为可以发送给模型的字符串。

slack_lead_qualifier/models.py (L7-L19)
...

from pydantic_ai import format_as_xml

...

class Profile(BaseModel):

...

    def as_prompt(self) -> str:
        return format_as_xml(self, root_tag='profile')

...

分析

我们需要的第二个模型代表代理将执行的分析结果。我们包含文档字符串(docstrings),为模型提供关于这些字段应包含内容的额外上下文。

slack_lead_qualifier/models.py (L23-L31)
...

class Analysis(BaseModel):
    profile: Profile
    organization_name: str
    organization_domain: str
    job_title: str
    relevance: Annotated[int, Ge(1), Le(5)]
    """Estimated fit for Pydantic Logfire: 1 = low, 5 = high"""
    summary: str
    """One-sentence welcome note summarising who they are and how we might help"""

...

我们还定义了一个 Analysis.as_slack_blocks() 辅助方法,它将分析结果转换为一些可以发送到 Slack API 以发布新消息的 Slack blocks

slack_lead_qualifier/models.py (L23-L46)
...

class Analysis(BaseModel):

...

    def as_slack_blocks(self, include_relevance: bool = False) -> list[dict[str, Any]]:
        profile = self.profile
        relevance = f'({self.relevance}/5)' if include_relevance else ''
        return [
            {
                'type': 'markdown',
                'text': f'[{profile.display_name}](mailto:{profile.email}), {self.job_title} at [**{self.organization_name}**](https://{self.organization_domain}) {relevance}',
            },
            {
                'type': 'markdown',
                'text': self.summary,
            },
        ]

代理

现在是时候深入了解 Pydantic AI 并定义将进行实际分析的代理了!

我们指定了要使用的模型(openai:gpt-4o),提供了指令,让代理能够访问 DuckDuckGo 搜索工具,并告诉它使用原生输出结构化输出模式输出 AnalysisNone

这个应用的核心在于那些告诉代理如何评估每个新 Slack 成员的指令。如果你打算自己使用这个应用,你当然需要根据自己的情况修改它们。

slack_lead_qualifier/agent.py (L7-L40)
...

from pydantic_ai import Agent, NativeOutput
from pydantic_ai.common_tools.duckduckgo import duckduckgo_search_tool

...

agent = Agent(
    'openai:gpt-4o',
    instructions=dedent(
        """
        When a new person joins our public Slack, please put together a brief snapshot so we can be most useful to them.

        **What to include**

        1. **Who they are:**  Any details about their professional role or projects (e.g. LinkedIn, GitHub, company bio).
        2. **Where they work:**  Name of the organisation and its domain.
        3. **How we can help:**  On a scale of 1–5, estimate how likely they are to benefit from **Pydantic Logfire**
           (our paid observability tool) based on factors such as company size, product maturity, or AI usage.
           *1 = probably not relevant, 5 = very strong fit.*

        **Our products (for context only)**
        • **Pydantic Validation** – Python data-validation (open source)
        • **Pydantic AI** – Python agent framework (open source)
        • **Pydantic Logfire** – Observability for traces, logs & metrics with first-class AI support (commercial)

        **How to research**

        • Use the provided DuckDuckGo search tool to research the person and the organization they work for, based on the email domain or what you find on e.g. LinkedIn and GitHub.
        • If you can't find enough to form a reasonable view, return **None**.
        """
    ),
    tools=[duckduckgo_search_tool()],
    output_type=NativeOutput([Analysis, NoneType]),
)

...

analyze_profile

我们还定义了一个名为 analyze_profile 的辅助函数,它接受一个 Profile,运行代理,并返回一个 Analysis(或 None),并使用 Logfire 对其进行检测。

slack_lead_qualifier/agent.py (L44-L47)
...

@logfire.instrument('Analyze profile')
async def analyze_profile(profile: Profile) -> Analysis | None:
    result = await agent.run(profile.as_prompt())
    return result.output

分析存储

我们需要的下一个构建块是存储所有已完成分析的地方,这样我们在发送每日摘要时就可以查阅它们。

幸运的是,Modal 为我们提供了一种方便的方式来存储一些数据,这些数据可以在后续的 Modal 运行(Webhook 或定时任务)中被读取:modal.Dict

我们定义了一些便捷的方法,以便轻松地添加、列出和清除分析。

slack_lead_qualifier/store.py (L4-L31)
...

import modal

...

class AnalysisStore:
    @classmethod
    @logfire.instrument('Add analysis to store')
    async def add(cls, analysis: Analysis):
        await cls._get_store().put.aio(analysis.profile.email, analysis.model_dump())

    @classmethod
    @logfire.instrument('List analyses from store')
    async def list(cls) -> list[Analysis]:
        return [
            Analysis.model_validate(analysis)
            async for analysis in cls._get_store().values.aio()
        ]

    @classmethod
    @logfire.instrument('Clear analyses from store')
    async def clear(cls):
        await cls._get_store().clear.aio()

    @classmethod
    def _get_store(cls) -> modal.Dict:
        return modal.Dict.from_name('analyses', create_if_missing=True)  # type: ignore

注意

请注意最后一行的 # type: ignore —— 不幸的是 modal 并未完全定义其类型,所以我们需要这样做来阻止我们的静态类型检查器 pyright(我们在所有 Pydantic AI 代码包括示例中都运行它)报错。

发送 Slack 消息

接下来,我们需要一种实际发送 Slack 消息的方法,因此我们定义了一个简单的函数,它使用 Slack 的 chat.postMessage API。

slack_lead_qualifier/slack.py (L8-L30)
...

API_KEY = os.getenv('SLACK_API_KEY')
assert API_KEY, 'SLACK_API_KEY is not set'


@logfire.instrument('Send Slack message')
async def send_slack_message(channel: str, blocks: list[dict[str, Any]]):
    client = httpx.AsyncClient()
    response = await client.post(
        'https://slack.com/api/chat.postMessage',
        json={
            'channel': channel,
            'blocks': blocks,
        },
        headers={
            'Authorization': f'Bearer {API_KEY}',
        },
        timeout=5,
    )
    response.raise_for_status()
    result = response.json()
    if not result.get('ok', False):
        error = result.get('error', 'Unknown error')
        raise Exception(f'Failed to send to Slack: {error}')

特性

现在我们可以开始将这些构建模块组合起来,以实现我们想要的实际功能了!

process_slack_member

此函数接收一个 Profile,使用代理对其进行分析,将其添加到 AnalysisStore 中,然后将分析结果发送#new-slack-leads 频道。

slack_lead_qualifier/functions.py (L4-L45)
...

from .agent import analyze_profile
from .models import Profile

from .slack import send_slack_message
from .store import AnalysisStore

...

NEW_LEAD_CHANNEL = '#new-slack-leads'

...

@logfire.instrument('Process Slack member')
async def process_slack_member(profile: Profile):
    analysis = await analyze_profile(profile)
    logfire.info('Analysis', analysis=analysis)

    if analysis is None:
        return

    await AnalysisStore().add(analysis)

    await send_slack_message(
        NEW_LEAD_CHANNEL,
        [
            {
                'type': 'header',
                'text': {
                    'type': 'plain_text',
                    'text': f'New Slack member with score {analysis.relevance}/5',
                },
            },
            {
                'type': 'divider',
            },
            *analysis.as_slack_blocks(),
        ],
    )

...

send_daily_summary

该函数列出 AnalysisStore 中的所有分析,按相关性取前 5 个,将它们发送#daily-slack-leads-summary 频道,并清除 AnalysisStore,以便下一次每日运行时不会再次处理这些分析。

slack_lead_qualifier/functions.py (L8-L85)
...

from .slack import send_slack_message
from .store import AnalysisStore

...

DAILY_SUMMARY_CHANNEL = '#daily-slack-leads-summary'

...

@logfire.instrument('Send daily summary')
async def send_daily_summary():
    analyses = await AnalysisStore().list()
    logfire.info('Analyses', analyses=analyses)

    if len(analyses) == 0:
        return

    sorted_analyses = sorted(analyses, key=lambda x: x.relevance, reverse=True)
    top_analyses = sorted_analyses[:5]

    blocks = [
        {
            'type': 'header',
            'text': {
                'type': 'plain_text',
                'text': f'Top {len(top_analyses)} new Slack members from the last 24 hours',
            },
        },
    ]

    for analysis in top_analyses:
        blocks.extend(
            [
                {
                    'type': 'divider',
                },
                *analysis.as_slack_blocks(include_relevance=True),
            ]
        )

    await send_slack_message(
        DAILY_SUMMARY_CHANNEL,
        blocks,
    )

    await AnalysisStore().clear()

Web 应用

就目前而言,这两个函数实际上都没有被任何地方调用。

让我们实现一个 FastAPI 端点来处理 team_join Slack Webhook(也称为 Slack Events API),并调用我们刚刚定义的 process_slack_member 函数。我们还顺便使用 Logfire 对 FastAPI 进行了检测。

slack_lead_qualifier/app.py (L20-L36)
...

app = FastAPI()
logfire.instrument_fastapi(app, capture_headers=True)


@app.post('/')
async def process_webhook(payload: dict[str, Any]) -> dict[str, Any]:
    if payload['type'] == 'url_verification':
        return {'challenge': payload['challenge']}
    elif (
        payload['type'] == 'event_callback' and payload['event']['type'] == 'team_join'
    ):
        profile = Profile.model_validate(payload['event']['user']['profile'])

        process_slack_member(profile)
        return {'status': 'OK'}

    raise HTTPException(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

使用 Modal 的 process_slack_member

我刚才耍了点小聪明——我们实际上并没有直接调用我们在 functions.py 中定义的 process_slack_member 函数,因为 Slack 要求 Webhook 在 3 秒内响应,而我们需要比这更长的时间来与 LLM 对话、进行一些网页搜索以及发送 Slack 消息。

取而代之的是,我们调用了与应用一起定义的以下函数,它使用 Modal 的 modal.Function.spawn 功能在后台运行一个函数。(如果你好奇这个函数的 Modal 端是什么样子,可以跳转到后面查看。)

因为 modal.py(我们将在下一节看到)导入了 app.py,所以我们在函数定义内部从 modal.py 导入,因为在顶层这样做会导致循环导入错误。

我们还传递了当前的 Logfire 上下文以实现分布式追踪,这意味着后台函数的执行将显示在 Webhook 请求追踪的嵌套下,这样我们就能将与该请求相关的所有内容集中在一处。

slack_lead_qualifier/app.py (L11-L16)
...

def process_slack_member(profile: Profile):
    from .modal import process_slack_member as _process_slack_member

    _process_slack_member.spawn(
        profile.model_dump(), logfire_ctx=get_context()
    )

...

现在让我们看看 Modal 让部署这一切变得多么容易。

设置 Modal

我们首先定义 Modal 应用,指定要使用的基础镜像(带有 Python 3.13 的 Debian)、它需要的所有 Python 包,以及在 Modal 界面中定义的所有需要在运行时可用的 Secret。

slack_lead_qualifier/modal.py (L4-L21)
...

import modal

image = modal.Image.debian_slim(python_version='3.13').pip_install(
    'pydantic',
    'pydantic_ai_slim[openai,duckduckgo]',
    'logfire[httpx,fastapi]',
    'fastapi[standard]',
    'httpx',
)
app = modal.App(
    name='slack-lead-qualifier',
    image=image,
    secrets=[
        modal.Secret.from_name('logfire'),
        modal.Secret.from_name('openai'),
        modal.Secret.from_name('slack'),
    ],
)

...

设置 Logfire

接下来,我们定义一个函数来为 Pydantic AI 和 HTTPX 设置 Logfire 检测。

我们不能在文件的顶层执行此操作,因为所请求的包(如 logfire)仅在 Modal 上运行的函数(如下面我们将定义的那些)内部可用。此文件 modal.py 在你的本地机器上运行,并且只能访问 modal 包。

slack_lead_qualifier/modal.py (L25-L30)
...

def setup_logfire():
    import logfire

    logfire.configure(service_name=app.name)
    logfire.instrument_pydantic_ai()
    logfire.instrument_httpx(capture_all=True)

...

Web 应用

要在 Modal 上部署一个 Web 端点,我们只需定义一个返回 ASGI 应用(如 FastAPI)的函数,并用 @app.function()@modal.asgi_app() 对其进行装饰。

这个 web_app 函数将在 Modal 上运行,因此在函数内部我们可以调用需要 logfire 包的 setup_logfire 函数,并导入使用其他请求包的 app.py

默认情况下,Modal 会按需启动一个容器来处理函数调用(如 Web 请求),这意味着每个请求都会有一点启动时间。然而,Slack 要求 Webhook 在 3 秒内响应,因此我们指定 min_containers=1 以保持 Web 端点持续运行并随时准备响应请求。这有点烦人且浪费资源,但幸运的是 Modal 的定价相当合理,你每月有 30 美元的免费计算额度,并且他们为初创公司和学术研究人员提供高达 5 万美元的免费额度。

slack_lead_qualifier/modal.py (L34-L41)
...

@app.function(min_containers=1)
@modal.asgi_app()  # type: ignore
def web_app():
    setup_logfire()

    from .app import app as _app

    return _app

...

注意

请注意 @modal.asgi_app() 行上的 # type: ignore —— 不幸的是,modal 并未完全定义其类型,所以我们需要这样做来阻止我们的静态类型检查器 pyright(我们在所有 Pydantic AI 代码包括示例中都运行它)报错。

定时发送 send_daily_summary

要定义一个定时函数,我们可以使用带有 schedule 参数的 @app.function() 装饰器。这个 Modal 函数将在每天 UTC 时间早上 8 点调用我们导入的 send_daily_summary 函数。

slack_lead_qualifier/modal.py (L60-L66)
...

@app.function(schedule=modal.Cron('0 8 * * *'))  # Every day at 8am UTC
async def send_daily_summary():
    setup_logfire()

    from .functions import send_daily_summary as _send_daily_summary

    await _send_daily_summary()

后台处理的 process_slack_member

最后,我们定义一个 Modal 函数来包装我们的 process_slack_member 函数,这样它就可以在后台运行。

正如你从我们从 Web 应用中启动此函数时记得的那样,我们传递了 Logfire 上下文以实现分布式追踪,所以我们需要在这里附加它。

slack_lead_qualifier/modal.py (L45-L56)
...

@app.function()
async def process_slack_member(profile_raw: dict[str, Any], logfire_ctx: Any):
    setup_logfire()

    from logfire.propagate import attach_context

    from .functions import process_slack_member as _process_slack_member
    from .models import Profile

    with attach_context(logfire_ctx):
        profile = Profile.model_validate(profile_raw)
        await _process_slack_member(profile)

...

结论

就是这样!现在,假设你已经满足了前提条件,你就可以使用用法下的命令来运行或部署这个应用了。