亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定

通過Amazon EventBridge Scheduler和Amazon SES為LJ Gilmore手袋公司安排產品提醒郵件

我们涵盖了如何利用 Amazon EventBridge Scheduler 和 Amazon SES 来安排发送给客户的邮件,并提供了用 AWS CDK 和 TypeScript 编写的示例。

前言

在这篇文章中,

✔️ 我们讨论了 Amazon EventBridge 调度器。
✔️ 我们讨论了整体架构的设计。
✔️ 我们讨论了一个完整的代码示例,展示如何实现。

一个问候👋🏽

本文将讨论如何使用Amazon EventBridge Scheduler和Amazon SES为虚构的在线手袋公司“LJ Gilmore手袋”安排发送产品提醒邮件。

您可以在这里找到这篇文章的完整可运行代码。

https://github.com/leegilmorecode/amazon-event-bridge-scheduler-and-amazon-ses (亚马逊 Event Bridge 调度器和 Amazon SES 项目)

👇 在我们继续之前, 请注册我的免费 Serverless Advocate 通讯(简报),获取实用技巧和提示、新文章、社区最新贡献、新的 AWS 服务等等,更多精彩内容:

[Serverless Advocate 通讯 | Substack 欢迎

欢迎来到 Serverless Advocate 通讯!在这里,您可以每周享受最新的新闻和文章……serverlessadvocate.substack.com](https://serverlessadvocate.substack.com/?source=post_page-----c8b4442ed19d---------------------------------------)

Amazon EventBridge调度器是什么? ⏲️

让我们先来讨论一下Amazon EventBridge Scheduler是什么以及它是如何运作的,在我们进入解决方案架构和代码之前。

“Amazon EventBridge Scheduler 是一个无服务器调度器,允许您从一个中央管理的服务中创建、运行和管理任务。高度可扩展的 EventBridge Scheduler 允许您安排数百万的任务,这些任务可以调用超过 270 个 AWS 服务和超过 6,000 个 API 操作。无需配置和管理基础设施,或与多个服务集成,EventBridge Scheduler 可以为您提供大规模交付调度并降低维护成本。

EventBridge Scheduler 可可靠地交付任务,并内置机制根据下游目标的可用性来调整调度。使用 EventBridge Scheduler,您可以使用 cron 和 rate 表达式来创建重复模式的调度,或配置一次性调用。您可以设置灵活的交付时间窗口,定义重试限制,并设定失败触发器的最长保留时间。” — https://docs.aws.amazon.com/scheduler/latest/UserGuide/what-is-scheduler.html

现在我们了解了它是啥以及它是怎么工作的,让我们看看接下来要建什么。

我们在建什么呢?⚙️

我们可以从下面的架构示例中,看到我们将采用这种方式保持简单、解耦并且可扩展,利用一些关键的无服务器服务。

从上图我们可以看出:

  1. 我们客户的使用场景(在本例中没有展示)是他们使用我们的亚马逊的API Gateway REST API来创建新提醒。

2. Lambda 函数会在我们的 Amazon DynamoDB 数据库中创建新的提醒项。

3. 我们启用 DynamoDB 流功能,该 Lambda 函数读取这些流,以便安排新的提醒事件进入 Amazon EventBridge Scheduler 中。

4. 当调度器被调用的时候,它会将事件推送到 Amazon SQS 队列中,然后 Lambda 函数会从队列中读取提醒事件,并向用户发送邮件。

现在我们来看看下一节的关键代码。

聊聊关键代码👨‍💻

咱们从无状态堆栈代码开始聊关键代码:gilmore-handbags/stateless/stateless.ts

💡 注意:我们只讨论关键代码部分。如果你想查看完整的代码,请访问以下代码仓库:https://github.com/leegilmorecode/amazon-event-bridge-scheduler-and-amazon-ses

无状态栈

我们从下面展示的SQS队列开始:

    // 创建用于处理提醒的SQS队列
    this.queue = new sqs.Queue(this, 'ReminderProcessingQueue', {
      queueName: `${stage} - gilmore-handbags-reminder-processing-queue`,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    // 创建用于处理提醒的SQS死信队列
    this.deadLetterQueue = new sqs.Queue(
      this,
      'ReminderProcessingDeadLetterQueue',
      {
        queueName: `${stage} - gilmore-handbags-reminder-processing-dead-letter-queue`,
        removalPolicy: cdk.RemovalPolicy.DESTROY,
      }
    );

你可以从上面的代码中看出来,我们在架构的第4步中创建了SQS队列和DL队列,这将允许调度器向其发送提醒消息。

我们接下来创建我们的 EventBridge 调度器组,该组将把所有提醒事件集中在一起。同时,我们创建一个服务角色,以便 EventBridge 调度程序服务能够向 SQS 队列发送消息。

    // 创建一个新的计划组(这将用来存储我们的计划事件)
    const scheduleGroup = new scheduler.CfnScheduleGroup(  
      this,  
      'RemindersScheduleGroup',  
      {  
        name: `${stage}-gilmore-handbags-reminders-schedule-group`,  
      }  
    );  

    // 确保计划程序能够向SQS队列发送消息
    const launchRole = new iam.Role(this, 'SchedulerRole', {  
      assumedBy: new iam.ServicePrincipal('scheduler.amazonaws.com'),  
    });  
    new iam.Policy(this, 'SchedulePolicy', {  
      policyName: 'ScheduleToSendSqSMessage',  
      roles: [launchRole],  
      statements: [  
        new iam.PolicyStatement({  
          effect: iam.Effect.ALLOW,  
          actions: [  
            'SQS:SendMessage',  
            'SQS:GetQueueAttributes',  
            'SQS:GetQueueUrl',  
          ],  
          resources: [this.queue.queueArn],  
        }),  
      ],  
    });

接下来,我们将使用 AWS CDK 代码创建三个 Lambda 函数,这里为了简洁起见,具体实现细节不详述。

  • 从Amazon API Gateway创建初始提醒任务作为代理。
  • 从Amazon DynamoDB流读取新项目并生成计划任务。
  • 从SQS队列读取消息并发送电子邮件。

我们然后创建我们的RESTful API,以便客户端可以创建新的提醒,如下。

    // 创建我们的REST API以创建提醒事项
    this.api = new apigw.RestApi(this, 'Api', {  
      description: `(${stage}) 手袋提醒事项API接口`,  
      restApiName: `${stage}-gilmore-handbags-api`,  
      deploy: true,  
      deployOptions: {  
        stageName: 'api',  
        loggingLevel: apigw.MethodLoggingLevel.INFO, // 日志级别
      },  
    });  

    const root: apigw.Resource = this.api.root.addResource('v1');  
    const appointments: apigw.Resource = root.addResource('reminders');  

    // POST请求指向创建提醒事项的Lambda函数
    appointments.addMethod(  
      'POST',  
      new apigw.LambdaIntegration(createReminderLambda, {  
        proxy: true,  
      })  
    );

接着我们配置了事件源设置,一个用于DynamoDB流,另一个用于SQS队列:

     // 允许流处理函数读取ddb流中的数据  

    streamProcessorLambda.addEventSource(  
      new lambdaEventSources.DynamoEventSource(this.table, {  
        startingPosition: lambda.StartingPosition.LATEST,  
        batchSize: 10,  
        retryAttempts: 1,  
        reportBatchItemFailures: true,  
      })  
    );  

    // 确保邮件队列处理函数能够读取sqs队列中的消息  

    queueProcessorLambda.addEventSource(  
      new lambdaEventSources.SqsEventSource(this.queue, {  
        batchSize: 10,  
        maxConcurrency: 2,  
        reportBatchItemFailures: true,  
      })  
    );

我们终于做到了,确保我们的队列处理Lambda函数可以将提醒邮件发送到SES。

    // 让队列处理器 Lambda 函数能够使用 SES 发送电子邮件
    queueProcessorLambda(queueProcessorLambda)添加到角色策略中(
      new iam.PolicyStatement({
        actions: ['ses:SendEmail', 'ses:SendRawEmail'],
        resources: ['所有资源'],
      })
    );

现在让我们来看看我们应用程序中的真正 Typescript 代码。

应用代码

我们从通过API网关创建新提醒并将其添加到DynamoDB表中的用例开始,如下。

    import { CreateReminder, Reminder } from '@dto/reminder';  
    import { getISOString, logger, schemaValidator } from '@shared';  

    import { config } from '@config';  
    import { schema } from '@schemas/reminder';  
    import { upsert } from '@adapters/secondary/dynamodb-adapter';  
    import { v4 as uuid } from 'uuid';  

    const tableName = config.get('tableName');  

    export async function createReminderUseCase(  
      createReminder: CreateReminder  
    ): Promise<Reminder> {  
      const createdDate = getISOString();  

      const reminder: Reminder = {  
        id: uuid(),  
        created: createdDate,  
        updated: createdDate,  
        ...createReminder,  
      };  

      schemaValidator(schema, reminder);  

      await upsert<Reminder>(reminder, tableName, reminder.id);  

      logger.info(`已为用户 ${createReminder.userEmail} 创建提醒事项`);  

      return reminder;  
    }

这使用了一个AWS SDK的辅助适配器,用来把物品存到DynamoDB里,如下:

    import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb';  

    import { logger } from '@shared/index';  
    import { marshall } from '@aws-sdk/util-dynamodb';  

    const dynamoDb = new DynamoDBClient({});  

    // 示例代码:  
    // const upsertedUser: User = await upsert<User>(newUser, "users", "user123");  
    export async function upsert<T>(  
      newItem: T,  
      tableName: string,  
      id: string  
    ): Promise<T> {  
      const params = {  
        TableName: tableName,  
        Item: marshall(newItem),  
      };  

      try {  
        await dynamoDb.send(new PutItemCommand(params));  

        logger.info(`创建了ID为 ${id} 的项目到 ${tableName}`);  

        return newItem;  
      } catch (error) {  
        console.error(`创建项目时出错:`, error);  
        throw error;  
      }  
    }

接下来,我们有根据流中的新DynamoDB项来调度事件的示例代码:

    import { Reminder } from '@dto/reminder';  
    import { config } from '@config';  
    import { logger } from '@shared';  
    import { scheduleEvent } from '@adapters/secondary/schedule-events';  

    const destinationQueueArn = config.get('destinationQueueArn');  
    const scheduleRoleArn = config.get('scheduleRoleArn');  
    const scheduleGroupName = config.get('scheduleGroupName');  
    const destinationDeadLetterQueueArn = config.get(  
      'destinationDeadLetterQueueArn'  
    );  

    export async function processStreamUseCase(  
      reminder: Reminder  
    ): Promise<Reminder> {  
      await scheduleEvent(  
        reminder,  
        destinationQueueArn,  
        scheduleRoleArn,  
        scheduleGroupName,  
        destinationDeadLetterQueueArn  
      );  

      logger.info(  
        `提醒已为用户 ${reminder.userEmail} 在日期 ${reminder.reminderDate} 安排完成`  
      );  

      return reminder;  
    }

你从上面可以看到,它使用了一个次要适配器将事件调度到Amazon EventBridge Scheduler服务,如下图所示:

    import {  
      ActionAfterCompletion,  
      CreateScheduleCommand,  
      FlexibleTimeWindowMode,  
      SchedulerClient,  
    } from '@aws-sdk/client-scheduler';  

    import { Reminder } from '@dto/reminder';  
    import { formatISODatetimeForScheduler } from '@shared/date-utils';  

    const client = new SchedulerClient({});  

    export async function scheduleEvent(  
      event: Reminder,  
      destinationQueueArn: string,  
      scheduleRoleArn: string,  
      scheduleGroupName: string,  
      destinationDeadLetterQueueArn: string  
    ) {  
      await client.send(  
        new CreateScheduleCommand({  
          Name: event.id,  
          GroupName: scheduleGroupName,  
          ActionAfterCompletion: ActionAfterCompletion.DELETE,  
          Target: {  
            RoleArn: scheduleRoleArn,  
            Arn: destinationQueueArn,  
            Input: JSON.stringify(event),  
            DeadLetterConfig: { Arn: destinationDeadLetterQueueArn },  
          },  
          FlexibleTimeWindow: {  
            Mode: FlexibleTimeWindowMode.OFF,  
          },  
          Description: `${event.userEmail} 创建了 ${event.id}`,  
          ScheduleExpression: `at(${formatISODatetimeForScheduler(  
            event.reminderDate  
          )})`,  
        })  
      );  
    }

此代码是一个通用的次级适配器,它接收事件数据,目标队列,角色ARN,我们要创建事件的计划组的名称,以及DLQ ARN,之后使用AWS SDK创建该计划事件。

最后,我们有通过SES发送邮件和从队列中读取提醒信息的代码示例:

    import { Reminder } from '@dto/reminder';  
    import { SQSRecord } from 'aws-lambda';  
    import { config } from '@config';  
    import { sendEmail } from '@adapters/secondary/email-adapter';  

    const fromEmailAddress = config.get('fromEmailAddress');  

    export async function queueProcessorUseCase(  
      newEvent: SQSRecord  
    ): Promise<SQSRecord> {  
      const reminder = JSON.parse(newEvent.body) as Reminder;  

      const emailSubject = `产品提醒:产品ID ${reminder.productId}`;  
      const emailBody =  
        reminder.message ||  
        `这是一条关于您产品的提醒(ID: ${reminder.productId})。`;  

      await sendEmail(  
        fromEmailAddress,  
        [reminder.userEmail],  
        emailSubject,  
        emailBody  
      );  

      return newEvent;  
    }

与前两个用例相同,你将看到它使用了一个二次适配器来发送电子邮件到Amazon SES服务。

    import { SESClient, SendEmailCommand } from '@aws-sdk/client-ses';  

    import { logger } from '@shared/logger';  

    const sesClient = new SESClient();  

    export async function sendEmail(  
      sourceEmail: string,  
      toAddresses: string[],  
      subject: string,  
      emailBody: string  
    ): Promise<void> {  
      try {  
        const params = {  
          Source: sourceEmail,  
          Destination: {  
            ToAddresses: toAddresses,  
          },  
          Message: {  
            Subject: {  
              Data: subject,  
            },  
            Body: {  
              Text: {  
                Data: emailBody,  
              },  
            },  
          },  
        };  

        const command = new SendEmailCommand(params);  
        const response = await sesClient.send(command);  
        logger.info(`邮件发送成功,消息ID是 ${response.MessageId}`);  
      } catch (error) {  
        logger.error(`发送邮件时发生错误: ${error}`);  
        throw error;  
      }  
    }

这是一款相当通用的邮件适配器,它处理用于通过 AWS SDK 发送邮件所需的相关信息。

搞定核心代码的讨论后,我们来看看部署后的测试。

💡 提示:仓库中的 README 文件提供了在测试前部署解决方案的说明。

👇 在我们继续之前, 请在 LinkedIn 上关注我,以便了解未来的博客文章和无服务器新闻 https://www.linkedin.com/in/lee-james-gilmore/

三试一下 🧪

一旦我们部署了代码,就可以使用这里的Postman文件 postman/Gilmore Handbag Store Reminders.postman_collection.json 来创建关于购买手袋的提醒,给妻子过生日时使用,趁促销活动期间!

这是一次发送到我们服务器的POST请求,其JSON负载如下所示:

{
  "productId": "handbag-001",
  "reminderDate": "提醒日期: 2024-10-13T04:47:00Z",
  "userEmail": "[email protected]",
  "message": "提醒您在节假日促销结束前买这款手袋"
}

如果我们现在查看 AWS 控制台的话,我们会看到一个为提醒设置的计划事件,该提醒项也被存储在我们的 DynamoDB 表中。

当预定事件的日期和时间到达时,我们的预定事件将消息发布到Amazon SQS队列中,在那里我们的Lambda函数会抓取该消息并通过Amazon SES发送提醒邮件,作为一个非常基本的例子。

这是一款非常强大的工具,通过一个管理服务来处理潜在的数百万个计划事件的能力,来增强我们无服务器工具箱的功能。

请克隆这个仓库并亲自试试看,但记得先修改你的邮箱地址,然后再进行尝试。

结尾 🏆

感谢您读完这篇文章,最后再简单总结一下。

✔️ 我们讨论了 Amazon EventBridge 调度器。
✔️ 我们讨论了整体架构设计。
✔️ 我们讨论了一个完整的代码示例。

结束啦👋ᵗᵒᵈᵒ 晚安

希望你喜欢这篇文章,喜欢的话,欢迎分享和反馈!谢谢!

请去我的Youtube频道订阅,那里有类似的内容!

我也很想通过以下方式和你保持联系。

以下是Lee James Gilmore的LinkedIn和Twitter页面链接:

https://www.linkedin.com/in/lee-james-gilmore/
https://twitter.com/LeeJamesGilmore

如果你喜欢我的文章,请关注我的个人资料李詹姆斯吉尔莫,以获取更多文章和系列,并记得打个招呼并连接 👋

请你也点赞,如果喜欢这篇帖子!(你可以不止点一次赞)

关于我的一些信息

“大家好,我是李,一名 AWS 无服务器英雄(AWS Serverless Hero)、博主,持有 AWS 认证的云架构师,同时也是总部位于英国的首席云架构师和云实践负责人;过去十年主要在 AWS 上做全栈 JavaScript 开发。”

我是一名无服务器技术的支持者和爱好者,热爱AWS、创新、软件架构和各种技术。

_** 这些只是我个人的观点,我不会为别人使用这些信息产生的后果负责。 _**

您可能还对以下感兴趣:

无服务器文章 🚀一个方便集中浏览的无服务器相关内容汇总,包括视频、文章等等..blog.serverlessadvocate.com
點擊查看更多內容
TA 點贊

若覺得本文不錯,就分享一下吧!

評論

作者其他優質文章

正在加載中
  • 推薦
  • 評論
  • 收藏
  • 共同學習,寫下你的評論
感謝您的支持,我會繼續努力的~
掃碼打賞,你說多少就多少
贊賞金額會直接到老師賬戶
支付方式
打開微信掃一掃,即可進行掃碼打賞哦
今天注冊有機會得

100積分直接送

付費專欄免費學

大額優惠券免費領

立即參與 放棄機會
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號

舉報

0/150
提交
取消