Spring Batch 教程简单教程

lxf2023-05-12 00:59:24

在企业应用中,批处理很常见。但随着数据在互联网上变得越来越普遍,我们如何处理这些数据也变得很重要。有多种解决方案可用。Apache Storm或Apache Spark有助于以所需格式处理和转换数据。在这篇文章中,我们将更仔细地研究 Spring Batch。

什么是Spring Batch?

Spring Batch 是一个旨在促进批处理的轻量级框架。它允许开发人员创建批处理应用程序。反过来,这些批处理应用程序处理传入的数据并将其转换以供进一步使用。

使用Spring Batch的另一大优势是它允许对这些数据进行高性能处理。对于严重依赖数据的应用程序,数据即时可用至关重要。

Spring Batch 允许开发人员使用基于 POJO 的方法。在这种方法中,开发人员可以将批处理数据转换为数据模型,她可以进一步将其用于应用程序业务逻辑。

在这篇文章中,我将介绍一个示例,在该示例中,我们将批处理员工记录的数据密集型 CSV 文件,并转换、验证该数据以加载到我们的数据库中。

什么是批处理?

批处理是一种数据处理方式。它涉及使用所有数据、处理数据、转换数据,然后将其发送到另一个数据源。通常,这是通过自动化作业完成的。触发系统或用户触发作业,并且该作业处理作业定义。作业定义将是关于使用来自其源的数据。

批处理的主要优点是它可以处理大量数据。然而,这个操作可以是异步的。大多数应用程序独立于实时用户交互执行批处理。

接下来,我们将了解 Spring Batch 框架及其组成。

Spring Batch Framework

以下架构显示了 Spring Batch 框架的组件

Spring Batch 教程简单教程

首先,批处理涉及一个作业。用户安排作业在特定时间或基于特定条件运行。这也可能涉及作业触发器。

Spring Batch 框架还包括

  • 日志和追踪
  • 交易管理
  • job处理统计
  • job重启
  • 资源管理

通常,当您配置作业时,它会保存在作业存储库中。Job Repository 保存所有作业的元数据信息。触发器在预定时间启动这些作业。

A job launcher是在作业的预定时间到达时启动作业或运行作业的接口。

Job由作业参数定义。当作业开始时,作业实例会为该作业运行。作业实例的每次执行都有作业执行,它会跟踪作业的状态。一个作业可以有多个步骤。

Step是作业的一个独立阶段。一项工作可以由多个步骤组成。与作业类似,每个步骤都有执行步骤的步骤执行并跟踪步骤的状态。

每个步骤都有一个item reader基本上读取输入数据的步骤,一个item processor处理数据并转换它的步骤,以及一个item writer获取处理后的数据并将其输出的步骤。

现在,让我们在演示中查看所有这些组件。

一个简单的 Spring Batch 教程

作为演示的一部分,我们将通过 Spring Batch Framework 上传一个 csv 文件。因此,首先,创建 spring 项目并添加以下依赖项:

implementation 'org.springframework.boot:spring-boot-starter-batch'

这是我们项目的主要依赖。主应用程序也如下所示:

package com.betterjavacode.springbatchdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class SpringbatchdemoApplication
{

    public static void main(String[] args)
    {
        SpringApplication.run(SpringbatchdemoApplication.class, args);
    }

}

创建 DTO 对象

我将通过 CSV 文件上传员工数据,因此我将为员工创建 DTO 对象,如下所示:

package com.betterjavacode.springbatchdemo.dtos;

import com.betterjavacode.springbatchdemo.models.Company;
import com.betterjavacode.springbatchdemo.models.Employee;
import com.betterjavacode.springbatchdemo.repositories.CompanyRepository;
import org.springframework.beans.factory.annotation.Autowired;


import java.io.Serializable;

public class EmployeeDto implements Serializable
{
    private static final long serialVersionUID = 710566148641281929L;

    @Autowired
    public CompanyRepository companyRepository;

    private int employeeId;
    private int companyId;
    private String firstName;
    private String lastName;
    private String email;
    private String jobTitle;

    public EmployeeDto()
    {

    }

    public EmployeeDto(int employeeId, String firstName, String lastName, String email,
                        String jobTitle, int companyId)
    {
        this.employeeId = employeeId;
        this.firstName = firstName;
        this.lastName = lastName;
        this.email = email;
        this.jobTitle = jobTitle;
        this.companyId = companyId;
    }

    public Employee employeeDtoToEmployee()
    {
        Employee employee = new Employee();
        employee.setEmployeeId(this.employeeId);
        employee.setFirstName(this.firstName);
        employee.setLastName(this.lastName);
        employee.setEmail(this.email);
        Company company = companyRepository.findById(this.companyId).get();
        employee.setCompany(company);
        employee.setJobTitle(this.jobTitle);
        return employee;
    }

    public int getEmployeeId ()
    {
        return employeeId;
    }

    public void setEmployeeId (int employeeId)
    {
        this.employeeId = employeeId;
    }

    public int getCompanyId ()
    {
        return companyId;
    }

    public void setCompanyId (int companyId)
    {
        this.companyId = companyId;
    }

    public String getFirstName ()
    {
        return firstName;
    }

    public void setFirstName (String firstName)
    {
        this.firstName = firstName;
    }

    public String getLastName ()
    {
        return lastName;
    }

    public void setLastName (String lastName)
    {
        this.lastName = lastName;
    }

    public String getEmail ()
    {
        return email;
    }

    public void setEmail (String email)
    {
        this.email = email;
    }

    public String getJobTitle ()
    {
        return jobTitle;
    }

    public void setJobTitle (String jobTitle)
    {
        this.jobTitle = jobTitle;
    }
}

此 DTO 类还使用存储库CompanyRepository来获取公司对象并将 DTO 转换为数据库对象。

设置 Spring Batch 配置

现在,我们将为我们的作业设置批处理配置,该作业将运行以将 CSV 文件上传到数据库中。我们的类BatchConfig包含一个注解@EnableBatchProcessing。此注释启用 Spring Batch 功能并提供基本配置以在类中设置批处理作业@Configuration

@Configuration
@EnableBatchProcessing
public class BatchConfig
{

}

此批处理配置将包括我们作业的定义、作业中涉及的步骤。它还将包括我们希望如何读取文件数据并进一步处理它。

    @Bean
    public Job processJob(Step step)
    {
        return jobBuilderFactory.get("processJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .flow(step).end().build();
    }

    @Bean
    public Step orderStep1(JdbcBatchItemWriter writer)
    {
        return stepBuilderFactory.get("orderStep1").<EmployeeDto, EmployeeDto> chunk(10)
                .reader(flatFileItemReader())
                .processor(employeeItemProcessor())
                .writer(writer).build();
    }

上面的 bean 声明作业processJob.incrementer添加作业参数。listener将听取工作并处理工作状态。侦听器的 bean 将处理作业完成或作业失败通知。正如 Spring Batch 架构中所讨论的,每个作业都包含多个步骤。

@Beanfor step 用于stepBuilderFactory创建一个步骤。此步骤处理大小为 10 的数据块。它有一个 Flat File Reader flatFileItemReader()。处理器employeeItemReader将处理 Flat File Item Reader 读取的数据。

    @Bean
    public FlatFileItemReader flatFileItemReader()
    {
        return new FlatFileItemReaderBuilder()
                .name("flatFileItemReader")
                .resource(new ClassPathResource("input/employeedata.csv"))
                .delimited()
                .names(format)
                .linesToSkip(1)
                .lineMapper(lineMapper())
                .fieldSetMapper(new BeanWrapperFieldSetMapper(){{
                    setTargetType(EmployeeDto.class);
                }})
                .build();
    }

    @Bean
    public LineMapper lineMapper()
    {
        final DefaultLineMapper defaultLineMapper = new DefaultLineMapper<>();
        final DelimitedLineTokenizer delimitedLineTokenizer = new DelimitedLineTokenizer();
        delimitedLineTokenizer.setDelimiter(",");
        delimitedLineTokenizer.setStrict(false);
        delimitedLineTokenizer.setNames(format);

        defaultLineMapper.setLineTokenizer(delimitedLineTokenizer);
        defaultLineMapper.setFieldSetMapper(employeeDtoFieldSetMapper);

        return defaultLineMapper;
    }

    @Bean
    public EmployeeItemProcessor employeeItemProcessor()
    {
        return new EmployeeItemProcessor();
    }

    @Bean
    public JobExecutionListener listener()
    {
        return new JobCompletionListener();
    }

    @Bean
    public JdbcBatchItemWriter writer(final DataSource dataSource)
    {
        return new JdbcBatchItemWriterBuilder()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<>())
                .sql("INSERT INTO employee(employeeId, firstName, lastName, jobTitle, email, " +
                        "companyId) VALUES(:employeeId, :firstName, :lastName, :jobTitle, :email," +
                        " " +
                        ":companyId)")
                .dataSource(dataSource)
                .build();
    }

我们现在将看看这些 bean 中的每一个。

FlatFileItemReader将从平面文件中读取数据。我们正在使用 FlatFileItemReaderBuilder 创建一个 EmployeeDto 类型的 FlatFileItemReader。

resource指示文件的位置。

delimited– 这构建了一个带分隔符的分词器。

names– 将显示文件中字段的顺序。

lineMapper是将行从文件映射到域对象的接口。

fieldSetMapper将数据从 fieldset 映射到一个对象。

lineMapperbean 需要 tokenizer 和 fieldsetmapper。

employeeDtoFieldSetMapper是我们在这个类中自动装配的另一个 bean。


package com.betterjavacode.springbatchdemo.configurations.processor;

import com.betterjavacode.springbatchdemo.dtos.EmployeeDto;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindException;

@Component
public class EmployeeDtoFieldSetMapper implements FieldSetMapper
{

    @Override
    public EmployeeDto mapFieldSet (FieldSet fieldSet) throws BindException
    {
        int employeeId = fieldSet.readInt("employeeId");
        String firstName = fieldSet.readRawString("firstName");
        String lastName = fieldSet.readRawString("lastName");
        String jobTitle = fieldSet.readRawString("jobTitle");
        String email = fieldSet.readRawString("email");
        int companyId = fieldSet.readInt("companyId");

        return new EmployeeDto(employeeId, firstName, lastName, jobTitle, email, companyId);
    }
}

如您所见,此 FieldSetMapper 将字段映射到各个对象以创建一个EmployeeDto.

EmployeeItemProcessor实现接口 ItemProcessor。基本上在这个类中,我们验证 EmployeeDto 数据以验证员工所属的公司是否存在。

JobCompletionListener检查作业完成状态。

    @Override
    public void afterJob(JobExecution jobExecution)
    {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED)
        {
            // Log statement
            System.out.println("BATCH JOB COMPLETED SUCCESSFULLY");
        }
    }

现在,让我们来看看ItemWriter。这个bean基本上使用JdbcBatchItemWriterJdbcBatchItemWriter使用 INSERT sql 语句将处理后的 EmployeeDto 数据插入到配置的数据源中。

配置应用程序属性

在我们运行我们的应用程序来处理文件之前,让我们看一下application.properties.


spring.datasource.url=jdbc:mysql://127.0.0.1/springbatchdemo?autoReconnect=true&useSSL=false
spring.datasource.username = root
spring.datasource.password=*******
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.jpa.show-sql=true
spring.jpa.properties.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
spring.datasource.hikari.connection-test-query=SELECT 1
spring.batch.initialize-schema=ALWAYS

除了常规的数据源属性,我们还应该了解 property spring.batch.initialize-schema=ALWAYS。如果我们不使用这个属性并启动应用程序,应用程序就会报错Table batch_job_instance doesn't exist

为避免此错误,我们基本上告诉您在启动期间创建与批处理作业相关的元数据。此属性将在您的数据库中创建其他数据库表batch_job_execution,如batch_job_execution_contextbatch_job_execution_paramsbatch_job_instance等。

演示

现在,如果我执行我的 Spring Boot 应用程序,它将运行并执行作业。有多种方法可以触发作业。在企业应用程序中,您将在某种存储位置(S3 或 Amazon SNS-SQS)中收到文件或数据,您将有一个作业将监视此位置以触发文件加载 Spring Batch 作业。

Spring Batch 教程简单教程

您可以在执行中看到有关作业完成的消息 –  “BATCH JOB COMPLETED SUCCESSFULLY“ 。如果我们检查我们的数据库表,我们将看到加载的数据。
Spring Batch 教程简单教程

更多功能

我在这里介绍了 Spring Batch 教程,但这还不是全部。Spring Batch 的内容远不止这个介绍性部分。您可以有不同的输入数据源,也可以使用各种数据处理规则将数据从一个文件加载到另一个文件。

还有一些方法可以使这些作业自动化并以高效的方式处理大量数据。

结论

在这篇文章中,我逐步展示了 Spring Batch 教程。有很多方法可以处理批处理作业,但 Spring Batch 使这变得非常简单。

本网站是一个以CSS、JavaScript、Vue、HTML为核心的前端开发技术网站。我们致力于为广大前端开发者提供专业、全面、实用的前端开发知识和技术支持。 在本网站中,您可以学习到最新的前端开发技术,了解前端开发的最新趋势和最佳实践。我们提供丰富的教程和案例,让您可以快速掌握前端开发的核心技术和流程。 本网站还提供一系列实用的工具和插件,帮助您更加高效地进行前端开发工作。我们提供的工具和插件都经过精心设计和优化,可以帮助您节省时间和精力,提升开发效率。 除此之外,本网站还拥有一个活跃的社区,您可以在社区中与其他前端开发者交流技术、分享经验、解决问题。我们相信,社区的力量可以帮助您更好地成长和进步。 在本网站中,您可以找到您需要的一切前端开发资源,让您成为一名更加优秀的前端开发者。欢迎您加入我们的大家庭,一起探索前端开发的无限可能!