使用構建器設計模式處理Node.js中的工作流:詳盡指南
工作流基本上是一系列按顺序执行的任务列表,每个任务可以选择将其输出作为下一个任务的输入。这里有一些工作流的不同应用场景,我们举几个例子。
用户注册的流程
- 注册一个用户,包括将用户头像上传到存储空间,然后在数据库中创建用户并将头像链接与其关联,最后发送验证邮件。
- 用户通过网站的联系表单发送消息时,我们将他们作为新的潜在客户添加到CRM中,将他们的消息推送到Slack销售通道,然后确认他们是否为新联系人,并相应发送营销邮件。
在这份指南中,我们将一步步地使用构建器模式来创建我们自己的工作流处理程序,并且为了使这个过程更加有趣,我们将使用注册的例子。
在开始之前,请确保您具备以下条件:在开始实现之前,
- Node.js 和 npm
- 一些基本的 TypeScript 知识
在进入正题之前,我们先试着看看如何用基本的方法来实现这个流程。
1 创建一个新的Node.js应用项目 mkdir workflow-sample
cd workflow-sample
# 使用npm init -y 初始化项目,自动配置 package.json
npm init -y
2. 添加 TypeScript 支持
首先,我们将安装所需的软件包
在命令行中输入以下命令来安装:
npm install -D typescript ts-node @types/node
接下来,我们将在根文件夹中创建一个新的 tsconfig.json 文件,并粘贴以下代码:
{
"compilerOptions": {
"target": "目标",
"module": "模块",
"outDir": "输出目录",
"esModuleInterop": "es模块互操作",
"forceConsistentCasingInFileNames": "强制文件名大小写一致",
"strict": "严格",
"skipLibCheck": "跳过库检查"
}
}
为了能通过命令“npm run dev”运行代码,我们将在package.json中添加这个脚本。
{
...
"scripts": {
"dev": "ts-node index.ts" // 开发脚本,运行 ts-node index.ts
}
}
最后,我们将创建一个新的 index.ts 文件并开始写代码
3. 创建注册功能部分
由于本指南的目标是处理工作流而不是深入探讨每一步的工作原理,我们将采用一种叫做 — 关注分离 的方法。
注册流程中有多个关注点,我们将主要功能拆分成多个子功能,只关心每个子功能的输入和输出。
所以,我们在 index.ts 文件中的主要功能如下:
// 注册需要用户的电子邮件、密码和图像文件
// 这可以通过创建一个API并使用类似multer的库来完成
const register = async (email: string, password: string, image: Blob) => {
try {
// 1. 此函数上传图像文件并返回该图像的链接
let imageLink = await uploadImage(image);
// 2. 此函数将用户存入数据库
let savedUser = await saveUser({ email, password, imageLink });
// 3. 此函数向用户发送确认邮件
let result = await sendVerificationEmail(email);
} catch (error) {
console.log("处理失败了");
}
};
现在,我们将尝试模拟每个函数的输入和输出情况。因为每个子功能运行都需要花费一定的时间,我们将使用一个自定义的超时函数来模拟这种情况。
为了做到这一点,让我们把这个代码添加到 index.ts 文件中
function timeout(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
// 1. 这个函数上传图像文件并返回图像链接
const uploadImage = async (image: Blob) => {
await timeout(1000);
return { imageLink: "图像链接" };
};
// 2. 这个函数将用户信息保存到数据库
const saveUser = async ({
email,
password,
imageLink,
}: {
email: string;
password: string;
imageLink: string;
}) => {
await timeout(1000);
return {
id: "1",
email,
password,
image: imageLink,
};
};
// 3. 这个函数向用户发送验证邮件
const sendVerificationEmail = async ({ email }: { email: string }) => {
await timeout(1000);
console.log("邮件已成功发送至 " + email + "。");
};
关于这种方法的问题
即使这个过程成功完成,还是有一些问题需要处理,促使我们创建一个处理这些问题的流程。
第一个问题是缺少重试机制。所以,例如,如果由于连接问题存储服务没有响应,整个流程就会失败。这时你可能会说,我们可以尝试在流程失败时重新运行它,虽然这个解决方案有效,但它仍有一个缺点,即如果错误不是暂时的连接问题,就会进入无限循环。
第二个问题是,例如,当第二步失败时,我们需要重新从第一步开始运行所有步骤。所以,如果第一步很复杂,需要10秒钟才能运行,那么每次后续步骤失败时,都需要重新运行第一步。
接下来,我们将使用建造者模式创建一个类,来解决所有提到的问题,这个模式简洁实用。
创建我们的工作流类(workflow类)根据我们看到的问题,我们的班级应该能够符合几个条件:
- 我们可以创建一个函数管道
- 每个函数的输出可以作为下一个函数的输入
- 如果一个函数抛出错误,应从该函数开始重新尝试
- 工作流可以设置重试次数限制(这意味着如果设置了3次重试限制,一旦达到3次仍然失败,整个工作流将会被终止)
执行工作流中的各个步骤
让我们创建一个新的 /core/workflow.ts 并从其中导出一个类
export class 工作流 {
private 重试次数限制: number;
constructor(重试次数限制: number = 3) {
this.重试次数限制 = 重试次数限制;
}
static 创建工作流程实例(
重试次数限制: number,
回调函数: (工作流: 工作流) => void
): 工作流 {
const 实例 = new 工作流(重试次数限制);
回调函数(实例);
return 实例;
}
async 执行(): Promise<void> {}
}
我们的类中有一个“createWorkflow”方法,可以设置重试次数的限制,并将创建的工作流实例传递给回调函数。调用“run”方法将触发工作流的开始。
我们现在一个个开始看看能不能符合这些条件。
1- 我们可以设立一个函数的流水线。我们希望我们的类可以创建任意数量的步骤(函数),并且可以创建最终步骤。为此,我们将添加两个方法,‘create’方法会创建一个新的StepFunction,以及‘finally’方法将设置最终的FinallyFunction。让我们修改我们的类。
type StepFunction = (input: any) => Promise<any>;
type FinallyFunction = (input: any) => void;
/**
* 这是一个用于实现工作流处理的类
*/
export class Workflow {
private steps: StepFunction[] = [];
private retryLimit: number;
private finallyCallback?: FinallyFunction;
constructor(retryLimit: number = 3) {
this.retryLimit = retryLimit;
// 构造函数用于初始化工作流实例,并设置重试限制
}
/**
* 创建一个工作流实例并调用回调函数传递该实例
*/
static createWorkflow(
retryLimit: number,
callback: (workflow: Workflow) => void
): Workflow {
const workflow = new Workflow(retryLimit);
callback(workflow);
return workflow;
}
/**
* 创建一个步骤函数,并将其添加到步骤数组中
*/
create(stepFunction: StepFunction): this {
this.steps.push(stepFunction);
return this;
}
/**
* 设置一个最终回调函数
*/
finally(callback: FinallyFunction) {
this.finallyCallback = callback;
}
/**
* 异步执行工作流,返回一个任何类型的Promise对象
*/
async run(): Promise<void> {}
}
你可能已经注意到了 “create” 方法返回了“this”,其目的是实现链式调用(你会看到它是如何工作的)。
对于其他情况,我们将修改我们程序的“运行”方法。
2- 每个函数的输出都能够传递给下一个函数作为输入 // core/workflow.ts
async run(initialInput?: any): Promise<void> {
// 提供初始输入给第一个步骤来启动流程
let input = initialInput;
for (let i = 0; i < this.steps.length; i++) {
const step = this.steps[i];
try {
// 将输入传递给下一个步骤
input = await step(input);
} catch (error) {
console.error("因步骤出错导致工作流中断。");
break;
}
}
if (this.finallyCallback) {
try {
this.finallyCallback(input);
} catch (error) {
console.error("最终步骤出错:", error);
}
}
}
3- 处理错误与重试
为了处理重试情况,我们将不再抛出错误来终止工作流,而是会重新运行这个步骤,直到达到重试次数的上限。为此,我们将按照以下方式调整我们的方法。
// core/workflow.ts
async run(initialInput?: any): Promise<void> {
// 提供初始输入给第一个步骤执行
let input = initialInput;
let attempts = 0;
let success = false;
for (let i = 0; i < this.steps.length; i++) {
const step = this.steps[i];
// 在未达到重试限制并且步骤未成功的情况下,继续重试
// 这也确保了我们从失败的步骤重新开始执行
while (attempts < this.retryLimit && !success) {
try {
// 传递步骤的输入到下一个步骤
input = await step(input);
success = true;
} catch (error) {
attempts++;
if (attempts === this.retryLimit) {
console.error(
`步骤 ${i + 1} 在尝试了 ${attempts} 次后失败,错误信息为:`,
error
);
}
}
}
// 如果在所有尝试之后步骤仍然失败,工作流将被中断并终止
if (!success) {
console.error("由于步骤失败,工作流将被中断并终止。");
break;
}
}
if (this.finallyCallback) {
try {
this.finallyCallback(input);
} catch (error) {
console.error("在最终步骤中出现错误:", error);
}
}
}
这样一来,我们的工作流类就符合所有条件并解决了所有问题。
使用我们的工作流对象在这里,我们将用我们自己的类来处理注册过程。
import { Workflow } from "./core/workflow";
const register = (email: string, password: string, image: Blob) => {
Workflow.createWorkflow(3, (workflow) => {
workflow
.create(上传图片功能)
.create(({ imageLink }) =>
保存用户信息({
email,
password,
imageLink,
})
)
.finally(发送验证邮件的步骤);
}).run(image);
};
register("[email protected]", "password", new Blob());
尽管这种方法完全可行,但建议保持所有步骤的输入和输出的可见性。我们可以这样调整注册功能:
const register = (email: string, password: string, image: Blob) => {
Workflow.createWorkflow(3, (workflow) => {
workflow
.create(async (image) => {
let imageLink = await uploadImage(image);
return { imageLink };
})
.create(async ({ imageLink }) => {
let user = await saveUser({
email,
password,
imageLink,
});
return user; // 返回 { id, email, password, image }
})
.finally(async ({ email }) => {
await sendVerificationEmail给({ email });
// 如果你在API中使用这个工作流,你可以在最后这里响应
// res.status(200).send("用户创建成功")
});
}).run(image);
};
下一步会是什么?
我知道!这本指南太枯燥了,细节太多。当然,你可以直接查看 workflow.ts 文件的代码,并了解最后部分的使用方法。编写这个指南的目的就是让你理解代码的每个部分,这样你就能自己动手修改了。
例如,你可以在这个流程中添加一个额外的条件,来测试自己是否理解。
如果在多次尝试后仍有一步失败,工作流应执行回滚。
试着在相同的注册过程中测试它,通过在保存用户时触发错误,并通过回滚删除上传到存储的图片文件。
感谢阅读!请随时欢迎提供您的反馈,告诉我们怎样可以做得更好🙏共同學習,寫下你的評論
評論加載中...
作者其他優質文章