1 回答

TA貢獻1982條經驗 獲得超2個贊
我想知道 spring 批處理是否能夠在單個作業中讀取由不同格式組成的多個 CSV 文件?
是的,您可以有一個包含多個步驟的作業,每個步驟處理一個給定類型的文件。關鍵是如何設計工作。您可以應用的一種技術是使用臨時表。批處理作業可以創建臨時臨時表,在其中加載所需的所有數據,然后在完成后將其刪除。
在您的情況下,您可以通過兩個步驟將每個文件加載到特定的暫存表中。每個步驟都可以應用特定于每個文件的驗證邏輯。如果這些步驟之一失敗,您的工作就會失敗。臨時表可以有一個用于無效記錄的標記列(這對報告很有用)。
完成這兩個準備步驟后,您可以在另一個步驟中從兩個臨時表中讀取數據,并對連接的數據應用交叉驗證規則(例如,從兩個表中選擇并通過BatchId和連接PersonId)。如果此步驟失敗,則作業失敗。否則,您將在適當的地方寫入數據。
這種技術的優點是數據在整個作業期間都可以在臨時表中使用。因此,只要驗證步驟失敗,您就可以使用流將失敗的步驟重定向到“報告步驟”(讀取無效數據并發送報告),然后使作業失敗。這是您可以使用的自包含示例:
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
@EnableBatchProcessing
public class FlowJobSample {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public Step personLoadingStep() {
return steps.get("personLoadingStep")
.tasklet((contribution, chunkContext) -> {
System.out.println("personLoadingStep");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step addressLoadingStep() {
return steps.get("addressLoadingStep")
.tasklet((contribution, chunkContext) -> {
System.out.println("addressLoadingStep");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step crossValidationStep() {
return steps.get("crossValidationStep")
.tasklet((contribution, chunkContext) -> {
System.out.println("crossValidationStep");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Step reportingStep() {
return steps.get("reportingStep")
.tasklet((contribution, chunkContext) -> {
System.out.println("reportingStep");
return RepeatStatus.FINISHED;
})
.build();
}
@Bean
public Job job() {
return jobs.get("job")
.start(personLoadingStep()).on("INVALID").to(reportingStep())
.from(personLoadingStep()).on("*").to(addressLoadingStep())
.from(addressLoadingStep()).on("INVALID").to(reportingStep())
.from(addressLoadingStep()).on("*").to(crossValidationStep())
.from(crossValidationStep()).on("INVALID").to(reportingStep())
.from(crossValidationStep()).on("*").end()
.from(reportingStep()).on("*").fail()
.build()
.build();
}
public static void main(String[] args) throws Exception {
ApplicationContext context = new AnnotationConfigApplicationContext(FlowJobSample.class);
JobLauncher jobLauncher = context.getBean(JobLauncher.class);
Job job = context.getBean(Job.class);
jobLauncher.run(job, new JobParameters());
}
}
要使其中一個步驟失敗,請將退出狀態設置為INVALID,例如:
@Bean
public Step personLoadingStep() {
return steps.get("personLoadingStep")
.tasklet((contribution, chunkContext) -> {
System.out.println("personLoadingStep");
chunkContext.getStepContext().getStepExecution().setExitStatus(new ExitStatus("INVALID"));
return RepeatStatus.FINISHED;
})
.build();
}
我希望這有幫助。
添加回答
舉報