我们涵盖了如何利用 Amazon EventBridge Scheduler 和 Amazon SES 来安排发送给客户的邮件,并提供了用 AWS CDK 和 TypeScript 编写的示例。
前言在这篇文章中,
✔️ 我们讨论了 Amazon EventBridge 调度器。
✔️ 我们讨论了整体架构的设计。
✔️ 我们讨论了一个完整的代码示例,展示如何实现。
本文将讨论如何使用Amazon EventBridge Scheduler和Amazon SES为虚构的在线手袋公司“LJ Gilmore手袋”安排发送产品提醒邮件。
您可以在这里找到这篇文章的完整可运行代码。
https://github.com/leegilmorecode/amazon-event-bridge-scheduler-and-amazon-ses (亚马逊 Event Bridge 调度器和 Amazon SES 项目)
👇 在我们继续之前, 请注册我的免费 Serverless Advocate 通讯(简报),获取实用技巧和提示、新文章、社区最新贡献、新的 AWS 服务等等,更多精彩内容:
[Serverless Advocate 通讯 | Substack 欢迎欢迎来到 Serverless Advocate 通讯!在这里,您可以每周享受最新的新闻和文章……serverlessadvocate.substack.com](https://serverlessadvocate.substack.com/?source=post_page-----c8b4442ed19d---------------------------------------)
Amazon EventBridge调度器是什么? ⏲️让我们先来讨论一下Amazon EventBridge Scheduler是什么以及它是如何运作的,在我们进入解决方案架构和代码之前。
“Amazon EventBridge Scheduler 是一个无服务器调度器,允许您从一个中央管理的服务中创建、运行和管理任务。高度可扩展的 EventBridge Scheduler 允许您安排数百万的任务,这些任务可以调用超过 270 个 AWS 服务和超过 6,000 个 API 操作。无需配置和管理基础设施,或与多个服务集成,EventBridge Scheduler 可以为您提供大规模交付调度并降低维护成本。
EventBridge Scheduler 可可靠地交付任务,并内置机制根据下游目标的可用性来调整调度。使用 EventBridge Scheduler,您可以使用 cron 和 rate 表达式来创建重复模式的调度,或配置一次性调用。您可以设置灵活的交付时间窗口,定义重试限制,并设定失败触发器的最长保留时间。” — https://docs.aws.amazon.com/scheduler/latest/UserGuide/what-is-scheduler.html
现在我们了解了它是啥以及它是怎么工作的,让我们看看接下来要建什么。
我们在建什么呢?⚙️我们可以从下面的架构示例中,看到我们将采用这种方式保持简单、解耦并且可扩展,利用一些关键的无服务器服务。
从上图我们可以看出:
- 我们客户的使用场景(在本例中没有展示)是他们使用我们的亚马逊的API Gateway REST API来创建新提醒。
2. Lambda 函数会在我们的 Amazon DynamoDB 数据库中创建新的提醒项。
3. 我们启用 DynamoDB 流功能,该 Lambda 函数读取这些流,以便安排新的提醒事件进入 Amazon EventBridge Scheduler 中。
4. 当调度器被调用的时候,它会将事件推送到 Amazon SQS 队列中,然后 Lambda 函数会从队列中读取提醒事件,并向用户发送邮件。
现在我们来看看下一节的关键代码。
聊聊关键代码👨💻咱们从无状态堆栈代码开始聊关键代码:gilmore-handbags/stateless/stateless.ts
。
无状态栈💡 注意:我们只讨论关键代码部分。如果你想查看完整的代码,请访问以下代码仓库:https://github.com/leegilmorecode/amazon-event-bridge-scheduler-and-amazon-ses
我们从下面展示的SQS队列开始:
// 创建用于处理提醒的SQS队列
this.queue = new sqs.Queue(this, 'ReminderProcessingQueue', {
queueName: `${stage} - gilmore-handbags-reminder-processing-queue`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
// 创建用于处理提醒的SQS死信队列
this.deadLetterQueue = new sqs.Queue(
this,
'ReminderProcessingDeadLetterQueue',
{
queueName: `${stage} - gilmore-handbags-reminder-processing-dead-letter-queue`,
removalPolicy: cdk.RemovalPolicy.DESTROY,
}
);
你可以从上面的代码中看出来,我们在架构的第4步中创建了SQS队列和DL队列,这将允许调度器向其发送提醒消息。
我们接下来创建我们的 EventBridge 调度器组,该组将把所有提醒事件集中在一起。同时,我们创建一个服务角色,以便 EventBridge 调度程序服务能够向 SQS 队列发送消息。
// 创建一个新的计划组(这将用来存储我们的计划事件)
const scheduleGroup = new scheduler.CfnScheduleGroup(
this,
'RemindersScheduleGroup',
{
name: `${stage}-gilmore-handbags-reminders-schedule-group`,
}
);
// 确保计划程序能够向SQS队列发送消息
const launchRole = new iam.Role(this, 'SchedulerRole', {
assumedBy: new iam.ServicePrincipal('scheduler.amazonaws.com'),
});
new iam.Policy(this, 'SchedulePolicy', {
policyName: 'ScheduleToSendSqSMessage',
roles: [launchRole],
statements: [
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'SQS:SendMessage',
'SQS:GetQueueAttributes',
'SQS:GetQueueUrl',
],
resources: [this.queue.queueArn],
}),
],
});
接下来,我们将使用 AWS CDK 代码创建三个 Lambda 函数,这里为了简洁起见,具体实现细节不详述。
- 从Amazon API Gateway创建初始提醒任务作为代理。
- 从Amazon DynamoDB流读取新项目并生成计划任务。
- 从SQS队列读取消息并发送电子邮件。
我们然后创建我们的RESTful API,以便客户端可以创建新的提醒,如下。
// 创建我们的REST API以创建提醒事项
this.api = new apigw.RestApi(this, 'Api', {
description: `(${stage}) 手袋提醒事项API接口`,
restApiName: `${stage}-gilmore-handbags-api`,
deploy: true,
deployOptions: {
stageName: 'api',
loggingLevel: apigw.MethodLoggingLevel.INFO, // 日志级别
},
});
const root: apigw.Resource = this.api.root.addResource('v1');
const appointments: apigw.Resource = root.addResource('reminders');
// POST请求指向创建提醒事项的Lambda函数
appointments.addMethod(
'POST',
new apigw.LambdaIntegration(createReminderLambda, {
proxy: true,
})
);
接着我们配置了事件源设置,一个用于DynamoDB流,另一个用于SQS队列:
// 允许流处理函数读取ddb流中的数据
streamProcessorLambda.addEventSource(
new lambdaEventSources.DynamoEventSource(this.table, {
startingPosition: lambda.StartingPosition.LATEST,
batchSize: 10,
retryAttempts: 1,
reportBatchItemFailures: true,
})
);
// 确保邮件队列处理函数能够读取sqs队列中的消息
queueProcessorLambda.addEventSource(
new lambdaEventSources.SqsEventSource(this.queue, {
batchSize: 10,
maxConcurrency: 2,
reportBatchItemFailures: true,
})
);
我们终于做到了,确保我们的队列处理Lambda函数可以将提醒邮件发送到SES。
// 让队列处理器 Lambda 函数能够使用 SES 发送电子邮件
queueProcessorLambda(queueProcessorLambda)添加到角色策略中(
new iam.PolicyStatement({
actions: ['ses:SendEmail', 'ses:SendRawEmail'],
resources: ['所有资源'],
})
);
现在让我们来看看我们应用程序中的真正 Typescript 代码。
应用代码我们从通过API网关创建新提醒并将其添加到DynamoDB表中的用例开始,如下。
import { CreateReminder, Reminder } from '@dto/reminder';
import { getISOString, logger, schemaValidator } from '@shared';
import { config } from '@config';
import { schema } from '@schemas/reminder';
import { upsert } from '@adapters/secondary/dynamodb-adapter';
import { v4 as uuid } from 'uuid';
const tableName = config.get('tableName');
export async function createReminderUseCase(
createReminder: CreateReminder
): Promise<Reminder> {
const createdDate = getISOString();
const reminder: Reminder = {
id: uuid(),
created: createdDate,
updated: createdDate,
...createReminder,
};
schemaValidator(schema, reminder);
await upsert<Reminder>(reminder, tableName, reminder.id);
logger.info(`已为用户 ${createReminder.userEmail} 创建提醒事项`);
return reminder;
}
这使用了一个AWS SDK的辅助适配器,用来把物品存到DynamoDB里,如下:
import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb';
import { logger } from '@shared/index';
import { marshall } from '@aws-sdk/util-dynamodb';
const dynamoDb = new DynamoDBClient({});
// 示例代码:
// const upsertedUser: User = await upsert<User>(newUser, "users", "user123");
export async function upsert<T>(
newItem: T,
tableName: string,
id: string
): Promise<T> {
const params = {
TableName: tableName,
Item: marshall(newItem),
};
try {
await dynamoDb.send(new PutItemCommand(params));
logger.info(`创建了ID为 ${id} 的项目到 ${tableName}`);
return newItem;
} catch (error) {
console.error(`创建项目时出错:`, error);
throw error;
}
}
接下来,我们有根据流中的新DynamoDB项来调度事件的示例代码:
import { Reminder } from '@dto/reminder';
import { config } from '@config';
import { logger } from '@shared';
import { scheduleEvent } from '@adapters/secondary/schedule-events';
const destinationQueueArn = config.get('destinationQueueArn');
const scheduleRoleArn = config.get('scheduleRoleArn');
const scheduleGroupName = config.get('scheduleGroupName');
const destinationDeadLetterQueueArn = config.get(
'destinationDeadLetterQueueArn'
);
export async function processStreamUseCase(
reminder: Reminder
): Promise<Reminder> {
await scheduleEvent(
reminder,
destinationQueueArn,
scheduleRoleArn,
scheduleGroupName,
destinationDeadLetterQueueArn
);
logger.info(
`提醒已为用户 ${reminder.userEmail} 在日期 ${reminder.reminderDate} 安排完成`
);
return reminder;
}
你从上面可以看到,它使用了一个次要适配器将事件调度到Amazon EventBridge Scheduler服务,如下图所示:
import {
ActionAfterCompletion,
CreateScheduleCommand,
FlexibleTimeWindowMode,
SchedulerClient,
} from '@aws-sdk/client-scheduler';
import { Reminder } from '@dto/reminder';
import { formatISODatetimeForScheduler } from '@shared/date-utils';
const client = new SchedulerClient({});
export async function scheduleEvent(
event: Reminder,
destinationQueueArn: string,
scheduleRoleArn: string,
scheduleGroupName: string,
destinationDeadLetterQueueArn: string
) {
await client.send(
new CreateScheduleCommand({
Name: event.id,
GroupName: scheduleGroupName,
ActionAfterCompletion: ActionAfterCompletion.DELETE,
Target: {
RoleArn: scheduleRoleArn,
Arn: destinationQueueArn,
Input: JSON.stringify(event),
DeadLetterConfig: { Arn: destinationDeadLetterQueueArn },
},
FlexibleTimeWindow: {
Mode: FlexibleTimeWindowMode.OFF,
},
Description: `${event.userEmail} 创建了 ${event.id}`,
ScheduleExpression: `at(${formatISODatetimeForScheduler(
event.reminderDate
)})`,
})
);
}
此代码是一个通用的次级适配器,它接收事件数据,目标队列,角色ARN,我们要创建事件的计划组的名称,以及DLQ ARN,之后使用AWS SDK创建该计划事件。
最后,我们有通过SES发送邮件和从队列中读取提醒信息的代码示例:
import { Reminder } from '@dto/reminder';
import { SQSRecord } from 'aws-lambda';
import { config } from '@config';
import { sendEmail } from '@adapters/secondary/email-adapter';
const fromEmailAddress = config.get('fromEmailAddress');
export async function queueProcessorUseCase(
newEvent: SQSRecord
): Promise<SQSRecord> {
const reminder = JSON.parse(newEvent.body) as Reminder;
const emailSubject = `产品提醒:产品ID ${reminder.productId}`;
const emailBody =
reminder.message ||
`这是一条关于您产品的提醒(ID: ${reminder.productId})。`;
await sendEmail(
fromEmailAddress,
[reminder.userEmail],
emailSubject,
emailBody
);
return newEvent;
}
与前两个用例相同,你将看到它使用了一个二次适配器来发送电子邮件到Amazon SES服务。
import { SESClient, SendEmailCommand } from '@aws-sdk/client-ses';
import { logger } from '@shared/logger';
const sesClient = new SESClient();
export async function sendEmail(
sourceEmail: string,
toAddresses: string[],
subject: string,
emailBody: string
): Promise<void> {
try {
const params = {
Source: sourceEmail,
Destination: {
ToAddresses: toAddresses,
},
Message: {
Subject: {
Data: subject,
},
Body: {
Text: {
Data: emailBody,
},
},
},
};
const command = new SendEmailCommand(params);
const response = await sesClient.send(command);
logger.info(`邮件发送成功,消息ID是 ${response.MessageId}`);
} catch (error) {
logger.error(`发送邮件时发生错误: ${error}`);
throw error;
}
}
这是一款相当通用的邮件适配器,它处理用于通过 AWS SDK 发送邮件所需的相关信息。
搞定核心代码的讨论后,我们来看看部署后的测试。
💡 提示:仓库中的 README 文件提供了在测试前部署解决方案的说明。
👇 在我们继续之前, 请在 LinkedIn 上关注我,以便了解未来的博客文章和无服务器新闻 https://www.linkedin.com/in/lee-james-gilmore/
一旦我们部署了代码,就可以使用这里的Postman文件 postman/Gilmore Handbag Store Reminders.postman_collection.json
来创建关于购买手袋的提醒,给妻子过生日时使用,趁促销活动期间!
这是一次发送到我们服务器的POST
请求,其JSON负载如下所示:
{
"productId": "handbag-001",
"reminderDate": "提醒日期: 2024-10-13T04:47:00Z",
"userEmail": "[email protected]",
"message": "提醒您在节假日促销结束前买这款手袋"
}
如果我们现在查看 AWS 控制台的话,我们会看到一个为提醒设置的计划事件,该提醒项也被存储在我们的 DynamoDB 表中。
当预定事件的日期和时间到达时,我们的预定事件将消息发布到Amazon SQS队列中,在那里我们的Lambda函数会抓取该消息并通过Amazon SES发送提醒邮件,作为一个非常基本的例子。
这是一款非常强大的工具,通过一个管理服务来处理潜在的数百万个计划事件的能力,来增强我们无服务器工具箱的功能。
请克隆这个仓库并亲自试试看,但记得先修改你的邮箱地址,然后再进行尝试。
结尾 🏆感谢您读完这篇文章,最后再简单总结一下。
✔️ 我们讨论了 Amazon EventBridge 调度器。
✔️ 我们讨论了整体架构设计。
✔️ 我们讨论了一个完整的代码示例。
希望你喜欢这篇文章,喜欢的话,欢迎分享和反馈!谢谢!
请去我的Youtube频道订阅,那里有类似的内容!
我也很想通过以下方式和你保持联系。
以下是Lee James Gilmore的LinkedIn和Twitter页面链接:
https://www.linkedin.com/in/lee-james-gilmore/
https://twitter.com/LeeJamesGilmore
如果你喜欢我的文章,请关注我的个人资料李詹姆斯吉尔莫,以获取更多文章和系列,并记得打个招呼并连接 👋
请你也点赞,如果喜欢这篇帖子!(你可以不止点一次赞)
关于我的一些信息“大家好,我是李,一名 AWS 无服务器英雄(AWS Serverless Hero)、博主,持有 AWS 认证的云架构师,同时也是总部位于英国的首席云架构师和云实践负责人;过去十年主要在 AWS 上做全栈 JavaScript 开发。”
我是一名无服务器技术的支持者和爱好者,热爱AWS、创新、软件架构和各种技术。
_** 这些只是我个人的观点,我不会为别人使用这些信息产生的后果负责。 _**
您可能还对以下感兴趣:
无服务器文章 🚀一个方便集中浏览的无服务器相关内容汇总,包括视频、文章等等..blog.serverlessadvocate.com共同學習,寫下你的評論
評論加載中...
作者其他優質文章