Posts Tagged ‘batch’

28
Dec
13

The 3 R’s of Spring Batch 3.0.x with Annotations

This page describes how you can Read, wRite and perform aRithmetic on flat files using the Spring Batch framework. We will take a comma-separated (CSV) file that contains employee information, add some information to it, and write it back to the file system.

Full downloadable source for this page is available here. Corrections and enhancements are welcome: fork, change and push back to GitHub.

The basic building blocks of any batch process are:

  1. Reading an Item
  2. Performing an operation on it
  3. Writing the Item back

Please take some time to review The Domain Language of Batch before proceeding. It covers many of the fundamental concepts we will be using here.

Batch Steps

This page is focused on an individual step of the batch process.

The following is from the Spring Batch documentation:

A Step is a domain object that encapsulates an independent, sequential phase of a batch job. Therefore, every Job is composed entirely of one or more steps. A Step contains all of the information necessary to define and control the actual batch processing. This is a necessarily vague description because the contents of any given Step are at the discretion of the developer writing a Job. A Step can be as simple or complex as the developer desires. A simple Step might load data from a file into the database, requiring little or no code. (depending upon the implementations used) A more complex Step may have complicated business rules that are applied as part of the processing.

Step Processing types

There are two ways a step can process data:

Tasklet

If the step only needs to execute a single task, you can use a tasklet. Typical use cases are running a stored procedure or copying a file from one location to another. In the “Hello World” example we used a Tasklet to print the message to the console.

Chunk oriented

Chunk oriented processing involves specifying a reader, a processor and a writer. The input is read one item at a time, passed to the processor, and collected into a chunk within a transaction boundary. Once the number of items reaches the commit interval, the whole chunk is handed to the writer and the transaction is committed. Chunk oriented processing is what we will cover on this page.
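To make the commit interval concrete, here is a minimal, framework-free sketch of the loop. It is a simplified model and not how Spring Batch is implemented internally: ChunkLoopSketch and runChunks are hypothetical names, the "processor" just upper-cases each item, and each inner list stands for one call to the writer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified, framework-free model of chunk oriented processing.
// ChunkLoopSketch and runChunks are hypothetical names, not Spring Batch API.
class ChunkLoopSketch {

    // Reads items one at a time, "processes" each one, and hands the buffered
    // items to the "writer" whenever the commit interval is reached.
    static List<List<String>> runChunks(List<String> input, int commitInterval) {
        List<List<String>> writes = new ArrayList<>();  // one entry per writer call
        List<String> chunk = new ArrayList<>();
        Iterator<String> reader = input.iterator();
        while (reader.hasNext()) {
            String item = reader.next();                // read one item
            chunk.add(item.toUpperCase());              // process it
            if (chunk.size() == commitInterval) {
                writes.add(chunk);                      // write and "commit" the chunk
                chunk = new ArrayList<>();
            }
        }
        if (!chunk.isEmpty()) {
            writes.add(chunk);                          // final, partial chunk
        }
        return writes;
    }

    public static void main(String[] args) {
        // Five items with a commit interval of 2 give three writer calls.
        System.out.println(runChunks(Arrays.asList("a", "b", "c", "d", "e"), 2));
    }
}
```

With five items and a commit interval of 2, the writer is called three times: twice with a full chunk and once with the leftover item.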

Library Versions

  • Spring Batch 3.0.0-M3 or above

Input Data

The following is the input CSV file that will be read. Please create it in the project's resources directory.

vi src/main/resources/input_data.txt

7876,ADAMS,CLERK,1100
7499,ALLEN,SALESMAN,1600
7698,BLAKE,MANAGER,2850
7782,CLARK,MANAGER,2450
7902,FORD,ANALYST,3000
7900,JAMES,CLERK,950
7566,JONES,MANAGER,2975
7839,KING,PRESIDENT,-5000
7654,MARTIN,SALESMAN,1250
7934,MILLER,CLERK,1300
7788,SCOTT,ANALYST,3000
7369,SMITH,CLERK,800
7844,TURNER,SALESMAN,1500
7521,WARD,SALESMAN,1250

Employee Bean

This is a simple bean that represents a single Employee.

vi src/main/java/com/test/Employee.java

package com.test;

public class Employee {

	private Integer empId;
	private String lastName;
	private String title;
	private Integer salary;
	private String rank;
	
	public Integer getEmpId() {
		return empId;
	}
	public void setEmpId(Integer empId) {
		this.empId = empId;
	}
	public String getLastName() {
		return lastName;
	}
	public void setLastName(String lastName) {
		this.lastName = lastName;
	}
	public String getTitle() {
		return title;
	}
	public void setTitle(String title) {
		this.title = title;
	}
	public Integer getSalary() {
		return salary;
	}
	public void setSalary(Integer salary) {
		this.salary = salary;
	}
	public void setRank(String rank) {
		this.rank = rank;
	}
	public String getRank() {
		return rank;
	}
	@Override
	public String toString() {
		return "Employee [empId=" + empId + ", lastName=" + lastName
				+ ", title=" + title + ", salary=" + salary + ", rank=" + rank
				+ "]";
	}	
}

Reading

The reader is configured in ThreeRJobConfig.java (see the reader() method).
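Conceptually, the reader splits each line on the delimiter and pairs the tokens with field names before a bean is populated. The following is a rough, framework-free sketch of that idea only. LineTokenizeSketch is a hypothetical name, and the sketch does not handle quoted fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Framework-free sketch: split one comma separated line and pair each token
// with a field name, in order. LineTokenizeSketch is a hypothetical name.
class LineTokenizeSketch {

    static Map<String, String> tokenize(String line, String[] names) {
        String[] tokens = line.split(",", -1);  // -1 keeps trailing empty fields
        if (tokens.length != names.length) {
            throw new IllegalArgumentException(
                    "expected " + names.length + " fields but found " + tokens.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < names.length; i++) {
            fields.put(names[i], tokens[i].trim());
        }
        return fields;
    }

    public static void main(String[] args) {
        String[] names = {"empId", "lastName", "title", "salary"};
        System.out.println(tokenize("7876,ADAMS,CLERK,1100", names));
    }
}
```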

Arithmetic

Not really! All we are doing is assigning a rank based on the salary amount. The item processor takes an input bean and converts it to an output bean. In this case the two beans are of the same type, but they don’t have to be.

vi src/main/java/com/test/EmployeeProcessor.java

package com.test;

import org.springframework.batch.item.ItemProcessor;

public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {

	public Employee process(Employee emp) throws Exception {
		// if salary >= 2500 then set rank as "Director"		
		if(emp.getSalary() >= 2500 ) {
			emp.setRank("Director");			
		} else {
			emp.setRank("N/A");
		}
		return emp;
	}

}
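The rule itself can be tried without running a job. The sketch below restates it as a small helper. RankRuleSketch is a hypothetical class that is not part of the tutorial's source, and treating a missing salary as "N/A" is an assumption: EmployeeProcessor as written would throw a NullPointerException if the salary were ever null.

```java
// Framework-free restatement of the rank rule used by EmployeeProcessor.
// RankRuleSketch is a hypothetical helper, not part of the tutorial's source.
class RankRuleSketch {

    static final int DIRECTOR_THRESHOLD = 2500;

    static String rankFor(Integer salary) {
        // Assumption: a missing salary is ranked "N/A" instead of failing.
        if (salary != null && salary >= DIRECTOR_THRESHOLD) {
            return "Director";
        }
        return "N/A";
    }

    public static void main(String[] args) {
        // Salaries taken from the sample input file.
        System.out.println("BLAKE " + rankFor(2850));
        System.out.println("ADAMS " + rankFor(1100));
        System.out.println("KING  " + rankFor(-5000));
    }
}
```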

Writing

The writer is configured in ThreeRJobConfig.java (see the writer() method).
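On the way out, the bean's fields are extracted in a fixed order and joined with the delimiter to form one line. As a rough, framework-free sketch of that joining step (LineAggregateSketch is a hypothetical name, and the values are from the BLAKE row plus the rank the processor would assign):

```java
import java.util.StringJoiner;

// Framework-free sketch: join already extracted field values into one
// delimited output line. LineAggregateSketch is a hypothetical name.
class LineAggregateSketch {

    static String aggregate(Object[] fields, String delimiter) {
        StringJoiner joiner = new StringJoiner(delimiter);
        for (Object field : fields) {
            joiner.add(String.valueOf(field));  // a null field becomes the text "null"
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Object[] fields = {7698, "BLAKE", "MANAGER", 2850, "Director"};
        System.out.println(aggregate(fields, ","));
    }
}
```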

Job Configuration

vi src/main/java/com/test/config/ThreeRJobConfig.java

package com.test.config;

import java.io.File;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileItemWriter;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor;
import org.springframework.batch.item.file.transform.DelimitedLineAggregator;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;

import com.test.Employee;
import com.test.EmployeeProcessor;

@Configuration
@Import(StandaloneInfrastructureConfiguration.class)
public class ThreeRJobConfig {

	@Autowired
	private JobBuilderFactory jobBuilders;
	
	@Autowired
	private StepBuilderFactory stepBuilders;
	
	@Autowired
	private InfrastructureConfiguration infrastructureConfiguration;
	
	@Autowired
	private DataSource dataSource; // just for show...
	
	@Bean
	public Job threeRJob(){
		return jobBuilders.get("threeRJob")
				.start(step())
				.build();
	}
	
	@Bean
	public Step step(){
		return stepBuilders.get("step")
				.<Employee,Employee>chunk(1)
				.reader(reader())
				.processor(processor())
				.writer(writer())
				.build();
	}

	private ItemWriter<Employee> writer() {
		FlatFileItemWriter<Employee> itemWriter = new FlatFileItemWriter<Employee>();
		DelimitedLineAggregator<Employee> la = new DelimitedLineAggregator<Employee>();
		la.setDelimiter(",");
		BeanWrapperFieldExtractor<Employee> fieldExtractor = new BeanWrapperFieldExtractor<Employee>();
		fieldExtractor.setNames(new String[]{"empId","lastName","title","salary","rank"});
		la.setFieldExtractor(fieldExtractor);
		itemWriter.setLineAggregator(la);
		
		itemWriter.setResource(new FileSystemResource(new File("target/output_data.txt")));
		return itemWriter;
	}

	private ItemProcessor<Employee,Employee> processor() {
		return new EmployeeProcessor();
	}

	private ItemReader<Employee> reader() {
		FlatFileItemReader<Employee> itemReader = new FlatFileItemReader<Employee>();
		itemReader.setLineMapper(lineMapper());
		itemReader.setResource(new ClassPathResource("input_data.txt"));
		return itemReader;
	}

	private LineMapper<Employee> lineMapper() {
		DefaultLineMapper<Employee> lineMapper = new DefaultLineMapper<Employee>();
		DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
		lineTokenizer.setNames(new String[]{"empId","lastName","title","salary"});
		lineTokenizer.setIncludedFields(new int[]{0,1,2,3});
		BeanWrapperFieldSetMapper<Employee> fieldSetMapper = new BeanWrapperFieldSetMapper<Employee>();
		fieldSetMapper.setTargetType(Employee.class);
		lineMapper.setLineTokenizer(lineTokenizer);
		lineMapper.setFieldSetMapper(fieldSetMapper);
		return lineMapper;
	}
	

}

Execute the job

Go to the command line and type the following:

mvn compile exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner -Dexec.args="com.test.config.ThreeRJobConfig threeRJob"

View the Results

The output file will appear in the target/ folder of the project.

Further Reading

To keep things simple we read and wrote files located in the project's own folders. There are many enterprise design patterns that describe best practices for feeding data into batch programs. For further reading on this topic please see the Spring Integration framework homepage.

27
Dec
13

Hello World With Spring Batch 3.0.x with Pure Annotations

This page describes how to get a Spring Batch application to print hello world to the console. This page provides a stepping stone to help you get up and running quickly. Since this is a quick and dirty method of getting up and running with spring batch, it does not cover the fundamental concepts. For further information please visit the Spring Batch project documentation site.

This page shows the latest techniques of configuring spring batch using pure java annotations. This results in a significant reduction in work necessary to get a spring batch job running.

An older version of this page is available here: https://numberformat.wordpress.com/2010/02/05/hello-world-with-spring-batch/

Full downloadable source for this page is available here. Corrections and enhancements are welcome: fork, change and push back to GitHub.

This page takes about 10 minutes to complete, after which you will have a working Spring Batch application.

Background

The following Spring Batch example program is the simplest way you could set up a job to run in Spring Batch. As such, there are some limitations with this approach.

Use of an In Memory Database

The following program uses an in-memory database to store information about batch execution runs. Because nothing persists once the JVM exits, there is no protection against duplicate job runs across invocations, and no lasting record of when a job was started or completed. Since Spring has a pluggable architecture you can always switch to a persistent database like MySQL or Oracle to store job information. I will describe this process in a future blog entry.

If you use scheduling tools like Autosys you should already have a system that maintains information about job executions. These tools maintain the history of past runs, whether each job succeeded, and so on, so using an in-memory database should not be a big deal.

Library Versions

  • Spring Batch 3.0.0-M3 or above

Modify the pom.xml

vi pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.test</groupId>
	<artifactId>spring-batch-helloworld</artifactId>
	<version>20131227</version>
	<name>spring batch hello world</name>
	<packaging>jar</packaging>

	<pluginRepositories>
		<pluginRepository> <!-- Ignore this repository. Its only used for document publication. -->
			<id>numberformat-releases</id>
			<url>https://raw.github.com/numberformat/20130213/master/repo</url>
		</pluginRepository>
	</pluginRepositories>

	<properties>
		<spring.framework.version>3.2.1.RELEASE</spring.framework.version>
		<spring.batch.version>3.0.0.M2</spring.batch.version>
	</properties>

	<repositories>
		<repository>
			<id>spring-s3</id>
			<name>Spring Maven MILESTONE Repository</name>
			<url>http://maven.springframework.org/milestone</url>
		</repository>
	</repositories>
	<dependencies>
		<dependency>
			<groupId>commons-lang</groupId>
			<artifactId>commons-lang</artifactId>
			<version>2.6</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-core</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-infrastructure</artifactId>
			<version>${spring.batch.version}</version>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>4.8.2</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-tx</artifactId>
			<version>${spring.framework.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jdbc</artifactId>
			<version>${spring.framework.version}</version>
		</dependency>
		<dependency>
			<groupId>hsqldb</groupId>
			<artifactId>hsqldb</artifactId>
			<version>1.8.0.7</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin> <!-- Ignore this plugin. Its only used for document publication. -->
				<groupId>github.numberformat</groupId>
				<artifactId>blog-plugin</artifactId>
				<version>1.0-SNAPSHOT</version>
				<configuration>
					<gitUrl>https://github.com/numberformat/wordpress/tree/master/${project.version}/${project.artifactId}</gitUrl>
				</configuration>
				<executions>
					<execution>
						<id>1</id>
						<phase>site</phase>
						<goals>
							<goal>generate</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>

<!-- Run the application using: 
mvn compile exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner -Dexec.args="com.test.config.HelloWorldJobConfig helloWorldJob"
-->
</project>

Set up the log4j configuration file. We will be using a very basic file that outputs to the console.
vi src/main/resources/log4j.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
   
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
        <param name="Target" value="System.out"/>
        <param name="Threshold" value="INFO" />
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%d %-5p %c - %m%n"/>
        </layout>
    </appender>
    <logger name="org.springframework" additivity="false">
        <level value="INFO"/>
        <appender-ref ref="CONSOLE"/>
    </logger>
    <root>
        <level value="ERROR"/>
        <appender-ref ref="CONSOLE"/>
    </root>
</log4j:configuration>

The configuration for both the infrastructure and the job is stored in the com.test.config package.

Config Interface

vi src/main/java/com/test/config/InfrastructureConfiguration.java

package com.test.config;

import javax.sql.DataSource;

import org.springframework.context.annotation.Bean;

public interface InfrastructureConfiguration {

	@Bean
	public abstract DataSource dataSource();

}

The implementation

vi src/main/java/com/test/config/StandaloneInfrastructureConfiguration.java

package com.test.config;

import javax.sql.DataSource;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseBuilder;
import org.springframework.jdbc.datasource.embedded.EmbeddedDatabaseType;

@Configuration
@EnableBatchProcessing
public class StandaloneInfrastructureConfiguration implements InfrastructureConfiguration {
	
	@Bean
	public DataSource dataSource(){
		EmbeddedDatabaseBuilder embeddedDatabaseBuilder = new EmbeddedDatabaseBuilder();
		return embeddedDatabaseBuilder.addScript("classpath:org/springframework/batch/core/schema-drop-hsqldb.sql")
				.addScript("classpath:org/springframework/batch/core/schema-hsqldb.sql")
				.setType(EmbeddedDatabaseType.HSQL)
				.build();
	}

}

Job Configuration

vi src/main/java/com/test/config/HelloWorldJobConfig.java

package com.test.config;

import javax.sql.DataSource;

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

import com.test.HelloWorldTasklet;

@Configuration
@Import(StandaloneInfrastructureConfiguration.class)
public class HelloWorldJobConfig {

	@Autowired
	private JobBuilderFactory jobBuilders;
	
	@Autowired
	private StepBuilderFactory stepBuilders;
	
	@Autowired
	private InfrastructureConfiguration infrastructureConfiguration;
	
	@Autowired
	private DataSource dataSource; // just for show...
	
	@Bean
	public Job helloWorldJob(){
		return jobBuilders.get("helloWorldJob")
				.start(step())
				.build();
	}
	
	@Bean
	public Step step(){
		return stepBuilders.get("step")
				.tasklet(tasklet())
				.build();
	}
	
	@Bean
	public Tasklet tasklet() {
		return new HelloWorldTasklet();
	}
}

The Tasklet

This class does the actual printing of the message to the console.

vi src/main/java/com/test/HelloWorldTasklet.java

package com.test;

import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;

public class HelloWorldTasklet implements Tasklet {
	 
    public RepeatStatus execute(StepContribution arg0, ChunkContext arg1)
            throws Exception {
        System.out.println("");
        System.out.println(" XXX XXX           XX      XX             ");
        System.out.println("  X   X             X       X             ");
        System.out.println("  X   X             X       X             ");
        System.out.println("  X   X   XXXXX     X       X     XXXXX   ");
        System.out.println("  XXXXX  X     X    X       X    X     X  ");
        System.out.println("  X   X  XXXXXXX    X       X    X     X  ");
        System.out.println("  X   X  X          X       X    X     X  ");
        System.out.println("  X   X  X     X    X       X    X     X  ");
        System.out.println(" XXX XXX  XXXXX   XXXXX   XXXXX   XXXXX   ");
        System.out.println("                                          ");
        System.out.println("                                          ");
        System.out.println("                                          ");
        System.out.println("                                          ");
        System.out.println(" XXX XXX                   XX        XX   ");
        System.out.println("  X   X                     X         X   ");
        System.out.println("  X   X                     X         X   ");
        System.out.println("  X   X   XXXXX  XXX XX     X     XXXXX   ");
        System.out.println("  X X X  X     X   XX  X    X    X    X   ");
        System.out.println("  X X X  X     X   X        X    X    X   ");
        System.out.println("  X X X  X     X   X        X    X    X   ");
        System.out.println("   X X   X     X   X        X    X    X   ");
        System.out.println("   X X    XXXXX  XXXXX    XXXXX   XXXXXX  ");
        System.out.println("");
        return RepeatStatus.FINISHED;
    }
}

Execute the Job

To run the job from the command line type the following.

mvn compile exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner -Dexec.args="com.test.config.HelloWorldJobConfig helloWorldJob"

Deploying the application

The following tutorial describes how to package and deploy the application as a self-contained jar. (Just be mindful to change the command line args to the ones you see here.)

What’s Next?

In the next few articles I plan on describing how to:

  1. Read and write flat files.
  2. Write header and footer records in the output file.
  3. Replace the in-memory database with a HyperSQL Java database so we can have job information persist between job invocations.
  4. Throw an exception in the middle of a large batch job and restart the job execution from the point where it left off.
  5. Use a validation framework like “commons-validator” to perform input file validation and create reject records for manual correction and later processing.
24
Apr
12

Shutting Down Spring Batch Jobs Gracefully

This page describes how to exit batch jobs gracefully when the operating system sends a signal such as SIGINT to the JVM. If we ignore these signals the jobs will be left hanging. The solution is to ask the Spring Batch JobOperator to stop the job gracefully and, in the process, save any intermediate data to the Spring Batch status tables.

Requirements

  • Fully working Spring Batch application
  • Configured Spring batch Job Repository

Start by adding the following to the spring configuration file:

<beans:bean id="jobRegistry" 
	class="org.springframework.batch.core.configuration.support.MapJobRegistry" />
<beans:bean id="jobRegistryBeanPostProcessor"
	class="org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor">
    <beans:property name="jobRegistry" ref="jobRegistry"/>
</beans:bean>	
<beans:bean id="jobOperator" 
	class="org.springframework.batch.core.launch.support.SimpleJobOperator">
		<beans:property name="jobExplorer" ref="jobExplorer"/>
		<beans:property name="jobRepository" ref="jobRepository" />
		<beans:property name="jobRegistry" ref="jobRegistry" />
		<beans:property name="jobLauncher" ref="jobLauncher" />
</beans:bean>
<beans:bean id="jobExplorer" 
	class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
	<beans:property name="dataSource" ref="dataSource"/>
</beans:bean>
<beans:bean name="processShutdownListener" 
	class="com.test.batch.listeners.ProcessShutdownListener">
	<beans:property name="jobOperator" ref="jobOperator"/>
</beans:bean>

Inside the job.xml

	<listeners>
		<listener ref="processShutdownListener"/>
	</listeners>

And finally the listener

package com.test.batch.listeners;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.launch.JobExecutionNotRunningException;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.launch.NoSuchJobExecutionException;

/**
 * This class listens to events from the Operating System requesting the Batch
 * to shutdown. For example when the user hits CTRL-C or the system is shutting
 * down. If we ignore these signals the jobs will be left hanging. This class
 * attempts to remedy this situation by requesting the JobOperator to gracefully
 * stop a job when the JVM calls the shutdown hook.
 */
public class ProcessShutdownListener implements JobExecutionListener {
	private static final Log logger = 
		LogFactory.getLog(ProcessShutdownListener.class);
	
	private JobOperator jobOperator;
	
	@Override
	public void afterJob(JobExecution jobExecution) { /* do nothing. */ }

	@Override
	public void beforeJob(final JobExecution jobExecution) {
		Runtime.getRuntime().addShutdownHook(new Thread() {
			@Override
			public void run() {
				super.run();
				try {
					jobOperator.stop(jobExecution.getId());
					while(jobExecution.isRunning()) {
						logger.info("waiting for job to stop...");
						try {
							Thread.sleep(100);
						} catch (InterruptedException e) {
							// restore the interrupt flag and stop waiting
							Thread.currentThread().interrupt();
							break;
						}
					}
				} catch (NoSuchJobExecutionException e) { // ignore
				} catch (JobExecutionNotRunningException e) { // ignore
				}
			}
		});
	}

	public void setJobOperator(JobOperator jobOperator) {
		this.jobOperator = jobOperator;
	}

}
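The same "ask to stop, then wait" pattern can be tried outside Spring Batch. The following is a minimal sketch under stated assumptions: StoppableWorker is a hypothetical class standing in for a running job, its stop flag plays the role of the JobOperator stop request, and the 2000 ms timeout is arbitrary.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Framework-free sketch of the "ask to stop, then wait" pattern.
// StoppableWorker is a hypothetical class; it stands in for a running job.
class StoppableWorker implements Runnable {

    private volatile boolean stopRequested = false;
    private final CountDownLatch finished = new CountDownLatch(1);
    private final AtomicInteger processed = new AtomicInteger();

    @Override
    public void run() {
        try {
            while (!stopRequested) {
                processed.incrementAndGet();  // pretend to process one item
                Thread.sleep(5);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            finished.countDown();             // signal that the worker has ended
        }
    }

    // Requests a stop and waits up to timeoutMillis for the worker to finish.
    boolean stopAndWait(long timeoutMillis) throws InterruptedException {
        stopRequested = true;
        return finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    int itemsDone() {
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        final StoppableWorker worker = new StoppableWorker();
        new Thread(worker).start();
        // On CTRL-C or JVM exit the hook asks the worker to stop and waits for it.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            try {
                worker.stopAndWait(2000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        Thread.sleep(50);
        System.out.println("stopped cleanly: " + worker.stopAndWait(2000));
    }
}
```

The bounded wait is a deliberate choice in this sketch: a shutdown hook that waits forever can keep the JVM from exiting if the worker never notices the stop request.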
17
Oct
11

Spring Batch Validation

This page demonstrates the process of using Spring Batch and the Apache Commons Validator Framework to validate each record in a flat file. To keep this tutorial brief we will stop processing the file and print out the validation failure message on the first occurrence.

Background

The Commons Validator Framework is composed of a core set of classes responsible for validating data. Since the framework is versatile and does not make assumptions about implementation, you can’t use it out of the box unless you write wrapper code to adapt it to your needs.

Years ago the popular Struts framework provided a set of wrapper classes in the “org.apache.struts.validator” package. The Spring MVC framework made similar adaptations. I found it strange that the makers of Spring Batch did not offer a drop-in replacement, and that led me to write this page.

Requirements

This tutorial continues from where the previous ones left off, so please complete the examples in those articles before proceeding.

Procedure

Start by adding the following dependencies into your pom.xml file.

pom.xml

<!-- The commons Validator Framework -->
	<dependency>
	    <groupId>commons-validator</groupId>
	    <artifactId>commons-validator</artifactId>
	    <version>1.1.4</version>
	</dependency>
<!-- Spring Wrapper classes for the Validator Framework -->
        <dependency>
	    <groupId>org.springmodules</groupId>
	    <artifactId>spring-modules-validation</artifactId>
	    <version>0.8</version>
	</dependency>

The following is the core class that makes this all happen.

src/main/java/com/test/SpringCommonsValidator.java

package com.test;

import java.util.List;

import org.springframework.batch.item.validator.ValidationException;
import org.springframework.batch.item.validator.Validator;
import org.springframework.context.MessageSource;
import org.springframework.context.MessageSourceResolvable;
import org.springframework.validation.BeanPropertyBindingResult;
import org.springframework.validation.Errors;

/**
 * Allows the Commons Validator framework to print human readable
 * validation failure messages. It also adapts the
 * Spring Validator interface to the Spring Batch Validator, thus allowing
 * this class to be used as a Validator in the Spring Batch framework.
 * 
 * This class delegates most of its work to the spring validator.
 * 
 */
public class SpringCommonsValidator<T> implements Validator<T> {

	private org.springframework.validation.Validator validator;
	private MessageSource messageSource;

	public void validate(T item) throws ValidationException {

		if (!validator.supports(item.getClass())) {
			throw new ValidationException("Validation failed for " + item + ": " + item.getClass().getName()
					+ " class is not supported by validator.");
		}

		BeanPropertyBindingResult errors = new BeanPropertyBindingResult(item, "item");

		validator.validate(item, errors);

		if (errors.hasErrors()) {
			throw new ValidationException("Validation failed for " + item + ": " + errorsToString(errors));
		}
	}

	private String errorsToString(Errors errors) {
		@SuppressWarnings("rawtypes")
		List errorList = errors.getAllErrors();
		StringBuffer buffer = new StringBuffer();
		
		boolean first = true;
		for (Object object : errorList) {
			if(object instanceof MessageSourceResolvable) {
				MessageSourceResolvable r = (MessageSourceResolvable)object;
				if(!first) { buffer.append("; "); } 
				buffer.append(messageSource.getMessage(r, null));
				first = false;
			}
		}
		return buffer.toString();
	}


	public void setValidator(org.springframework.validation.Validator validator) {
		this.validator = validator;
	}

	public void setMessageSource(MessageSource messageSource) {
		this.messageSource = messageSource;
	}

	public MessageSource getMessageSource() {
		return messageSource;
	}
}

Resource Bundle

The following property file is used to display the error messages. I guess some additional work can be done to internationalize these messages but since batch programs are not end-client facing… what’s the point?

On the other hand, having error messages in a properties file does have its advantages.

src/main/resources/messages.properties

errors.required={0} is required.
errors.minlength={0} can not be less than {1} characters.
errors.maxlength={0} can not be greater than {1} characters.
errors.invalid={0} is invalid.
 
errors.byte={0} must be a byte.
errors.short={0} must be a short.
errors.integer={0} must be an integer.
errors.long={0} must be a long.
errors.float={0} must be a float.
errors.double={0} must be a double.
 
errors.date={0} is not a date.
errors.range={0} is not in the range {1} through {2}.
errors.creditcard={0} is an invalid credit card number.
errors.email={0} is an invalid e-mail address.
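The {0}, {1} and {2} placeholders follow java.text.MessageFormat syntax, which is what Spring's message sources use to fill in arguments. The following is a small sketch of how two of the templates above render. MessageTemplateSketch and the argument values are hypothetical, and the numbers are passed as strings so that no locale-specific number formatting is applied.

```java
import java.text.MessageFormat;

// Sketch of how the {0}, {1}, {2} placeholders in messages.properties are
// filled in. MessageTemplateSketch and the argument values are hypothetical.
class MessageTemplateSketch {

    static String render(String template, Object... args) {
        return MessageFormat.format(template, args);
    }

    public static void main(String[] args) {
        System.out.println(render("{0} is required.", "lastName"));
        // Numbers are passed as strings to avoid locale-specific grouping.
        System.out.println(render("{0} is not in the range {1} through {2}.",
                "salary", "0", "10000"));
    }
}
```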

Configuration

Modify the spring configuration file to add the validatorFactory, beanValidator and springValidator. These classes allow you to have the ValidatingItemProcessor validate each item that is passed to it.

src/main/resources/simpleJob.xml

Replace the following section

<!-- Processor -->
<beans:bean name="empProcessor" class="com.test.EmployeeProcessor">
</beans:bean>

with the following

<!-- Processor -->
<beans:bean name="empProcessor" class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <beans:property name="validator" ref="springValidator"/>    
</beans:bean>

<!-- Validator -->
    <beans:bean id="validatorFactory"
          class="org.springmodules.validation.commons.DefaultValidatorFactory">
    <beans:property name="validationConfigLocations">
        <beans:list>
          <beans:value>validation.xml</beans:value>
          <beans:value>validator-rules.xml</beans:value>
        </beans:list>
      </beans:property>
    </beans:bean>
     
    <beans:bean id="beanValidator" class="org.springmodules.validation.commons.DefaultBeanValidator">
        <beans:property name="validatorFactory" ref="validatorFactory"/>
        <beans:property name="useFullyQualifiedClassName" value="true"/>
    </beans:bean> 

    <beans:bean id="springValidator" class="com.test.SpringCommonsValidator">
        <beans:property name="validator" ref="beanValidator"/>
        <beans:property name="messageSource" ref="resourceBundleMessageSource"/>        
    </beans:bean> 

    <beans:bean id="resourceBundleMessageSource" class="org.springframework.context.support.ResourceBundleMessageSource">
        <beans:property name="basename" value="messages"/>    
    </beans:bean> 

Commons validator configuration

The following is the validation configuration file used by commons-validator. If you have used the struts framework before then this file may look familiar to you. This file contains some default “shrink-wrapped” validation checks that were originally included with struts. There is nothing to customize here for now so please just copy and paste this into your program and continue on.

src/main/resources/validator-rules.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE form-validation PUBLIC
          "-//Apache Software Foundation//DTD Commons Validator Rules Configuration 1.1.3//EN"
          "http://jakarta.apache.org/commons/dtds/validator_1_1_3.dtd">
 
<form-validation>
    <global>
        <validator name="required"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateRequired"
            methodParams="java.lang.Object,
                        org.apache.commons.validator.ValidatorAction,
                        org.apache.commons.validator.Field,
                        org.springframework.validation.Errors"
            msg="errors.required">
        </validator>
        <validator name="requiredif"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateRequiredIf"
            methodParams="java.lang.Object,
                               org.apache.commons.validator.ValidatorAction,
                               org.apache.commons.validator.Field,
                               org.springframework.validation.Errors"
            msg="errors.required" />
 
        <validator name="validwhen" msg="errors.required"
            classname="org.apache.struts.validator.validwhen.ValidWhen" method="validateValidWhen"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors" />
 
        <validator name="minlength"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateMinLength"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.minlength"
            jsFunction="org.apache.commons.validator.javascript.validateMinLength" />
 
        <validator name="maxlength"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateMaxLength"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.maxlength"
            jsFunction="org.apache.commons.validator.javascript.validateMaxLength" />
 
        <validator name="mask"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateMask"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.invalid" />
 
        <validator name="byte"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateByte"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.byte" jsFunctionName="ByteValidations" />
 
        <validator name="short"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateShort"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.short" jsFunctionName="ShortValidations" />
 
        <validator name="integer"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateInteger"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.integer" jsFunctionName="IntegerValidations" />
 
        <validator name="long"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateLong"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.long" />
 
        <validator name="float"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateFloat"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.float" jsFunctionName="FloatValidations" />
 
        <validator name="double"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateDouble"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.double" />
 
        <validator name="date"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateDate"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.date" jsFunctionName="DateValidations" />
 
        <validator name="intRange"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateIntRange"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="integer" msg="errors.range" />
 
        <validator name="floatRange"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateFloatRange"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="float" msg="errors.range" />
 
        <validator name="doubleRange"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateDoubleRange"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="double" msg="errors.range" />
 
        <validator name="creditCard"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateCreditCard"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.creditcard" />
 
        <validator name="email"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateEmail"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.email" />
 
        <validator name="url"
            classname="org.springmodules.validation.commons.FieldChecks" method="validateUrl"
            methodParams="java.lang.Object,
                       org.apache.commons.validator.ValidatorAction,
                       org.apache.commons.validator.Field,
                       org.springframework.validation.Errors"
            depends="" msg="errors.url" />
    </global>
</form-validation>

Validation Rules

The next file contains the actual validations that need to be performed.

These validations are:

  1. lastName is required
  2. lastName must be between 20 and 35 characters in length
  3. rank is required.

src/main/resources/validation.xml

<!DOCTYPE form-validation PUBLIC
          "-//Apache Software Foundation//DTD Commons Validator Rules Configuration 1.1.3//EN"
          "http://jakarta.apache.org/commons/dtds/validator_1_1_3.dtd">
<form-validation>
 
    <formset>
        <form name="com.test.Employee">
            <field property="lastName" depends="required,minlength,maxlength">
                <arg0 key="lastName" />
                <arg1 name="minlength" key="${var:minlength}" resource="false"/>
                <arg1 name="maxlength" key="${var:maxlength}" resource="false"/>
                <var>
                    <var-name>minlength</var-name>
                    <var-value>20</var-value>
                </var>
                <var>
                    <var-name>maxlength</var-name>
                    <var-value>35</var-value>
                </var>                
            </field>
            <field property="rank" depends="required">
                <arg key="rank" />
            </field>
        </form>
    </formset>
</form-validation>
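
Read together, the three rules amount to the checks below. This is only a plain-Java sketch to make the rules concrete. The class and method names (EmployeeRules, check) are made up for illustration, and the messages merely imitate the ones in the console output shown later. In the sample project the real checks are performed by commons-validator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of the sample project: it restates the
// rules from validation.xml as ordinary Java conditions.
final class EmployeeRules {
    static final int MIN_LENGTH = 20; // <var-value> for minlength
    static final int MAX_LENGTH = 35; // <var-value> for maxlength

    static List<String> check(String lastName, String rank) {
        List<String> errors = new ArrayList<String>();
        if (lastName == null || lastName.trim().isEmpty()) {
            errors.add("lastName is required.");
        } else if (lastName.length() < MIN_LENGTH) {
            errors.add("lastName can not be less than " + MIN_LENGTH + " characters.");
        } else if (lastName.length() > MAX_LENGTH) {
            errors.add("lastName can not be greater than " + MAX_LENGTH + " characters.");
        }
        if (rank == null || rank.trim().isEmpty()) {
            errors.add("rank is required.");
        }
        return errors;
    }
}
```

For a record with lastName ADAMS and no rank, EmployeeRules.check("ADAMS", null) reports two problems: the name is shorter than 20 characters and the rank is missing.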

Run the batch

Run the batch application by typing the following on the command line.

mvn clean compile exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner -Dexec.args="simpleJob.xml helloWorldJob"

Observe the Results

The following message should display on the console indicating a validation failure.

SEVERE: Encountered an error executing the step
org.springframework.batch.item.validator.ValidationException: Validation failed for Employee [empId=7876, lastName=ADAMS, title=CLERK, salary=1100, rank=null]: lastName can not be less than 20 characters.; rank is required.
	at com.test.SpringCommonsValidator.validate(SpringCommonsValidator.java:38)
	at org.springframework.batch.item.validator.ValidatingItemProcessor.process(ValidatingItemProcessor.java:77)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:125)
	at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:288)

Next Steps

The application described here is a quick and dirty way to illustrate batch validation. In a typical production-grade application you will want the job to continue processing the remaining records in the file and send the bad records to an error file. The error file can then be reviewed manually and re-processed on the next run.
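
The idea of letting the job continue can be sketched without any framework code. In the hypothetical example below, records that fail a check are set aside in a reject list instead of stopping the run. RejectingRunner, Validator and Result are names invented for this illustration. Spring Batch has its own skip support for this purpose, which this sketch does not attempt to reproduce.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: process every record, collect failures separately.
final class RejectingRunner {

    /** Throws IllegalArgumentException when a record is not acceptable. */
    interface Validator {
        void validate(String record);
    }

    static final class Result {
        final List<String> accepted = new ArrayList<String>();
        final List<String> rejected = new ArrayList<String>();
    }

    static Result run(List<String> records, Validator validator) {
        Result result = new Result();
        for (String record : records) {
            try {
                validator.validate(record);
                result.accepted.add(record);
            } catch (IllegalArgumentException e) {
                // Keep the bad record and the reason so it can be reviewed
                // and re-processed on a later run.
                result.rejected.add(record + " | " + e.getMessage());
            }
        }
        return result;
    }
}
```

The rejected list plays the role of the error file: it could be written out at the end of the run and fed back in once the records are corrected.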

Comments and feedback are welcome.

Thanks!

07
Oct
11

Restartable Jobs With Spring Batch

This page demonstrates how Spring Batch handles a job failure and how to recover from it.

Background

Please review The Domain Language of Spring Batch prior to reading this further.

StepExecution

In simple terms, each “job” is broken up into individual “steps”. The process of executing a step is called a “StepExecution”. Each execution of a step results in either success or failure. If an individual step of a larger job fails, it is useful to know where that step left off. The designers of the framework recognized this and provided a way for steps to resume. The JobRepository stores the status, start and end times, exit codes and count information for each execution of a step within a job. Readers and writers can store their own state in the ExecutionContext, which is saved at each commit interval.

ExecutionContext

The “ExecutionContext” is a collection of name/value pairs that is persisted to the database. Each execution of a step by the Spring Batch framework has its own ExecutionContext. Many of the standard Reader/Writer implementations publish information to the ExecutionContext by default.

Outline of this page

Here is a breakdown of what will be demonstrated:

  1. Simulate a job failure by modifying EmployeeProcessor.java to throw an exception towards the middle of the file.
  2. Examine the output file and verify that all prior records were written out.
  3. Examine the JobRepository database to verify the ExecutionContext record contains the recovery information.
  4. Modify EmployeeProcessor.java so it no longer throws the test exception, and re-run the job.
  5. Examine the output file and verify that all the remaining records were written out.
  6. Note that the Footer of the file is incorrect.
  7. Fix the EmployeeItemWriter.java to read the ExecutionContext and output the correct footer.

This page picks up where the last one left off. If you have not done so already, please implement the example application described here.

Requirements

  • Java 5 or above and Maven 2
  • Spring Batch 2.x
  • Successful completion of xxx

Inserting An Error

The code below stops the process by throwing an exception when the record for the “PRESIDENT” is processed.

package com.test;

import org.springframework.batch.item.ItemProcessor;

public class EmployeeProcessor implements ItemProcessor<Employee, Employee> {

	public Employee process(Employee emp) throws Exception {
		if("PRESIDENT".equals(emp.getTitle())) throw new Exception("test exception: found President");
		// if salary >= 2500 then set rank as "Director"		
		if(emp.getSalary() >= 2500 ) {
			emp.setRank("Director");
		} else {
			emp.setRank("N/A");
		}
		return emp;
	}

}

Run the command:

mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner -Dexec.args="simpleJob.xml helloWorldJob"

Open the target/output_data.txt file and observe that only 7 records were processed.

Comment out line 8 of EmployeeProcessor.java above (the line that throws the test exception) and re-run the batch.

mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner -Dexec.args="simpleJob.xml helloWorldJob"

Open target/output_data.txt one more time and observe that the remaining records were processed. Note, however, that the footer information is incorrect: it should say 14 records instead of 7.

Making Spring Batch Restartable

If you remember, in the previous article we modified the EmployeeItemWriter class to print header and footer records. The footer record is generated from state variables (totalAmount and recordCount). Because the EmployeeItemWriter keeps this state in memory, the variables are re-initialized to their original values when the batch job restarts. The second invocation of the job starts from the middle of the input file, so the writer never sees the records that were written before the failure and cannot rebuild the totals on its own.

To fix this problem we will use the ExecutionContext. The ExecutionContext is updated at each commit interval. It is the place where Spring Batch keeps information that is important to the currently running job execution.

We will modify the open() and update() methods of the EmployeeItemWriter: open() looks up the saved values in the ExecutionContext, and update() stores the current values in it.

/src/main/java/com/test/EmployeeItemWriter.java

	public void open(ExecutionContext executionContext) throws ItemStreamException {
		// Restore the running totals saved by a previous execution, if any.
		if(executionContext.containsKey("current.recordCount")) {
			recordCount = (int) executionContext.getLong("current.recordCount");
		} else {
			recordCount = 0;
		}

		if(executionContext.containsKey("current.totalAmount")) {
			totalAmount = new BigDecimal(executionContext.getDouble("current.totalAmount"));
		} else {
			totalAmount = BigDecimal.ZERO;
		}
		this.delegate.open(executionContext);
	}

	public void update(ExecutionContext executionContext) throws ItemStreamException {
		// Save the running totals so that a restarted job can pick them up.
		executionContext.putLong("current.recordCount", recordCount);
		executionContext.putDouble("current.totalAmount", totalAmount.doubleValue());

		this.delegate.update(executionContext);
	}
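
To see why saving the counters matters, here is a small stand-alone simulation. It does not use Spring Batch: a plain Map stands in for the ExecutionContext, the class name RestartDemo is invented, and the "read.position" key is made up as well (in the real job the reader keeps track of its own position). The commit interval is assumed to be 1 to keep the sketch short.

```java
import java.util.Map;

// Hypothetical simulation of a restartable step; not Spring Batch code.
final class RestartDemo {

    /**
     * Processes the salaries, starting at the position stored in the context,
     * and returns the record count that would end up in the footer.
     * A negative failAt means "never fail".
     */
    static int run(int[] salaries, Map<String, Long> context, int failAt) {
        // open(): restore state saved by an earlier execution, if any
        int position = context.containsKey("read.position")
                ? context.get("read.position").intValue() : 0;
        int recordCount = context.containsKey("current.recordCount")
                ? context.get("current.recordCount").intValue() : 0;

        for (int i = position; i < salaries.length; i++) {
            if (i == failAt) {
                throw new IllegalStateException("test exception at record " + i);
            }
            recordCount++;
            // update(): save state after each commit
            context.put("read.position", (long) (i + 1));
            context.put("current.recordCount", (long) recordCount);
        }
        return recordCount;
    }
}
```

Running it over 14 records with failAt set to 7 stops after seven records. Calling it again with the same map and failAt set to -1 resumes at the eighth record and returns 14. If the "current.recordCount" entry were not saved, the second call would start counting from zero and return 7, which is the incorrect footer seen earlier.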

We don’t need to modify the “empReader” bean since it is backed by “FlatFileItemReader”, whose implementation is restartable by default.

Clear the database

In order to start fresh, delete all files under src/main/resources/db. This is where HSQLDB keeps its database files. Once you delete them, the DDLUtils framework will regenerate the database with all the schema information on the next run.

Re-run the job

Restore the line in EmployeeProcessor.java that throws an exception when “PRESIDENT” is encountered, and re-run the job.

mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner -Dexec.args="simpleJob.xml helloWorldJob"

Comment out the line that throws the exception in EmployeeProcessor.java and re-run the job.

mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner -Dexec.args="simpleJob.xml helloWorldJob"

Verify the output

Open target/output_data.txt one more time and observe that the remaining records were processed and that the footer now shows a count of 14 records.

That’s all for now.

02
Oct
11

Spring Batch HyperSQL Job Repository

This page describes the process of setting up a semi-permanent job repository that will maintain state between job invocations. No external database is necessary since HSQLDB is a pure Java implementation of a database.

Background

The Spring Batch framework offers an in-memory repository that does not persist domain objects once the batch job is complete. This is a severe limitation since:

  • Job restarts are not possible (no resume-on-restart functionality)
  • There is no guarantee that two job instances with the same parameters are not launched simultaneously
  • It is not suitable for multi-threaded job invocations

Requirements

Solution

In order to have the job repository stored in our application we will be using HSQLDB as our database implementation and DDLUtils to manage the schema.

Modify the pom.xml and insert the following dependencies.

pom.xml

    <dependency>
        <groupId>org.apache.ddlutils</groupId>
        <artifactId>ddlutils</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.hsqldb</groupId>
        <artifactId>hsqldb</artifactId>
        <version>1.8.0.10</version>
    </dependency>

DDL

The following XML file is used by Apache DDLUtils to create the Spring Batch JobRepository database. When the application starts for the first time the tables are created. On each subsequent invocation DDLUtils checks the database schema against the XML file. If the database differs, it issues the necessary ALTER statements to bring the database in line with the schema defined in the file below.

src/main/resources/ddl.xml

<?xml version="1.0"?>
<!DOCTYPE database SYSTEM "http://db.apache.org/torque/dtd/database.dtd">
<database name="testdb">

	<table name="BATCH_JOB_INSTANCE">
		<column name="JOB_INSTANCE_ID" type="BIGINT" primaryKey="true"
			autoIncrement="true"/>
		<column name="VERSION" type="BIGINT" />
		<column name="JOB_NAME" type="VARCHAR" size="100" required="true" />
		<column name="JOB_KEY" type="VARCHAR" size="32" required="true" />
		<unique name="JOB_INST_UN">
		  <unique-column name="JOB_NAME"/>
		  <unique-column name="JOB_KEY"/>
		</unique>
	</table>

    <table name="BATCH_JOB_EXECUTION">
        <column name="JOB_EXECUTION_ID" type="BIGINT" primaryKey="true"/>
        <column name="VERSION" type="BIGINT" />
	    <column name="JOB_INSTANCE_ID" type="BIGINT" required="true"/>
	    <column name="CREATE_TIME" type="TIMESTAMP" required="true"/>
	    <column name="START_TIME" type="TIMESTAMP"/>
	    <column name="END_TIME" type="TIMESTAMP"/>
	    <column name="STATUS" type="VARCHAR" size="10"/>
	    <column name="EXIT_CODE" type="VARCHAR" size="20"/>
	    <column name="EXIT_MESSAGE" type="VARCHAR" size="2500"/>
	    <column name="LAST_UPDATED" type="TIMESTAMP"/>
	    <foreign-key name="JOB_INST_EXEC_FK" foreignTable="BATCH_JOB_INSTANCE">
	       <reference foreign="JOB_INSTANCE_ID" local="JOB_INSTANCE_ID"/>
	    </foreign-key>
    </table>

    <table name="BATCH_JOB_PARAMS">
        <column name="JOB_INSTANCE_ID" type="BIGINT" required="true"/>
	    <column name="TYPE_CD" type="VARCHAR" size="6" required="true"/>
	    <column name="KEY_NAME" type="VARCHAR" size="100" required="true"/> 
	    <column name="STRING_VAL" type="VARCHAR" size="250"/> 
	    <column name="DATE_VAL" type="TIMESTAMP"/> 
	    <column name="LONG_VAL" type="BIGINT"/>
	    <column name="DOUBLE_VAL" type="DOUBLE"/>
        <foreign-key name="JOB_INST_PARAMS_FK" foreignTable="BATCH_JOB_INSTANCE">
           <reference foreign="JOB_INSTANCE_ID" local="JOB_INSTANCE_ID"/>
        </foreign-key>
    </table>

    <table name="BATCH_STEP_EXECUTION">
		<column name="STEP_EXECUTION_ID" type="BIGINT" primaryKey="true" 
		  autoIncrement="true" />
		<column name="VERSION" type="BIGINT" required="true"/>
		<column name="STEP_NAME" type="VARCHAR" size="100" required="true"/>
		<column name="JOB_EXECUTION_ID" type="BIGINT" required="true" />
		<column name="START_TIME" type="TIMESTAMP" required="true"/> 
		<column name="END_TIME" type="TIMESTAMP"/>  
		<column name="STATUS" type="VARCHAR" size="10"/>
		<column name="COMMIT_COUNT" type="BIGINT"/> 
		<column name="READ_COUNT" type="BIGINT"/>
		<column name="FILTER_COUNT" type="BIGINT"/>
		<column name="WRITE_COUNT" type="BIGINT"/>
		<column name="READ_SKIP_COUNT" type="BIGINT"/>
		<column name="WRITE_SKIP_COUNT" type="BIGINT"/>
		<column name="PROCESS_SKIP_COUNT" type="BIGINT"/>
		<column name="ROLLBACK_COUNT" type="BIGINT"/> 
		<column name="EXIT_CODE" type="VARCHAR" size="20"/>
		<column name="EXIT_MESSAGE" type="VARCHAR" size="2500"/>
		<column name="LAST_UPDATED" type="TIMESTAMP"/>
        <foreign-key name="JOB_EXEC_STEP_FK" foreignTable="BATCH_JOB_EXECUTION">
           <reference foreign="JOB_EXECUTION_ID" local="JOB_EXECUTION_ID"/>
        </foreign-key>
    </table>

    <table name="BATCH_STEP_EXECUTION_CONTEXT">
	    <column name="STEP_EXECUTION_ID" type="BIGINT" primaryKey="true"/>
	    <column name="SHORT_CONTEXT" type="VARCHAR" size="2500" required="true"/>
	    <column name="SERIALIZED_CONTEXT" type="VARCHAR" />
        <foreign-key name="STEP_EXEC_CTX_FK" foreignTable="BATCH_STEP_EXECUTION">
           <reference foreign="STEP_EXECUTION_ID" local="STEP_EXECUTION_ID"/>
        </foreign-key>
    </table>

    <table name="BATCH_JOB_EXECUTION_CONTEXT">
		<column name="JOB_EXECUTION_ID" type="BIGINT" primaryKey="true"/>
		<column name="SHORT_CONTEXT" type="VARCHAR" size="2500" required="true"/>
		<column name="SERIALIZED_CONTEXT" type="VARCHAR"/> 
        <foreign-key name="JOB_EXEC_CTX_FK" foreignTable="BATCH_JOB_EXECUTION">
           <reference foreign="JOB_EXECUTION_ID" local="JOB_EXECUTION_ID"/>
        </foreign-key>        
    </table>
	<table name="BATCH_STEP_EXECUTION_SEQ">
		<column name="ID" type="BIGINT" primaryKey="true" autoIncrement="true" />
	</table>
	<table name="BATCH_JOB_EXECUTION_SEQ">
		<column name="ID" type="BIGINT" primaryKey="true" autoIncrement="true" />
	</table>
	<table name="BATCH_JOB_SEQ">
		<column name="ID" type="BIGINT" primaryKey="true" autoIncrement="true" />
	</table>
    
</database>
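
The compare-and-alter behaviour described above can be pictured with a toy model. The sketch below is not the DDLUtils API. SchemaDiff is a hypothetical class that only compares column names and ignores types, keys and constraints, and the strings it returns are descriptions rather than SQL that is guaranteed to run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical model of "compare the desired schema with the live one".
final class SchemaDiff {

    static List<String> plan(String table, Set<String> desired, Set<String> existing) {
        List<String> actions = new ArrayList<String>();
        if (existing.isEmpty()) {
            // First run: nothing is there yet, so the whole table is created.
            actions.add("CREATE TABLE " + table + " " + desired);
            return actions;
        }
        for (String column : desired) {
            if (!existing.contains(column)) {
                actions.add("ALTER TABLE " + table + " ADD COLUMN " + column);
            }
        }
        for (String column : existing) {
            if (!desired.contains(column)) {
                actions.add("ALTER TABLE " + table + " DROP COLUMN " + column);
            }
        }
        return actions;
    }
}
```

When the live table already matches the file the plan is empty, which corresponds to a normal start-up in which nothing has to change.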

Java Code

The following file initializes the Job Repository and creates the schema based on a DDL contained in an xml configuration file. It also allows the system to gracefully shut-down the database and persist the in-memory data back to the file system after batch job completion.

src/main/java/com/test/JobRepositoryInitializer.java

package com.test;

import java.io.InputStreamReader;

import javax.sql.DataSource;

import org.apache.ddlutils.Platform;
import org.apache.ddlutils.PlatformFactory;
import org.apache.ddlutils.io.DatabaseIO;
import org.apache.ddlutils.model.Database;
import org.springframework.jdbc.core.simple.SimpleJdbcTemplate;

public class JobRepositoryInitializer {
	private DataSource dataSource;
	private String configFile;
	
	/**
	 * This method reads the schema from an xml file and determines
	 * if it needs to issue DDL statements to create or modify tables
	 * already in the database.
	 * 
	 * @throws Exception
	 */
	public void checkDDL() throws Exception {
		Platform platform = PlatformFactory
				.createNewPlatformInstance(dataSource);

		// Apache DDLUtils framework is used to create/modify schema.
		Database database = new DatabaseIO().read(new InputStreamReader(
				getClass().getResourceAsStream(configFile)));

		platform.alterTables(database, false);
		System.out.println("HSQLDB is ready: " + platform);
	}

	public void shutdownDatabase() {
		SimpleJdbcTemplate template = new SimpleJdbcTemplate(dataSource);
		template.update("SHUTDOWN;");
		System.out.println("HSQLDB was gracefully shutdown. ");
	}
	public void setDataSource(DataSource dataSource) {
		this.dataSource = dataSource;
	}

	public DataSource getDataSource() {
		return dataSource;
	}

	public void setConfigFile(String configFile) {
		this.configFile = configFile;
	}

	public String getConfigFile() {
		return configFile;
	}	
}

Configuration

In order to get the application to use HSQLDB and generate the JobRepository DDL Schema you need to do the following.

Configure the database

The following bean allows you to define a dataSource that will be used to connect to the HSQLDB database. The database files will be located in the src/main/resources/db folder. (This is specified in the database URL property)

src/main/resources/applicationContext.xml

    <beans:bean name="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
        <beans:property name="driverClassName" value="org.hsqldb.jdbcDriver"/>
        <beans:property name="url" value="jdbc:hsqldb:file:src/main/resources/db/testdb"/>
        <beans:property name="username" value="sa"/>
        <beans:property name="password" value=""/>
    </beans:bean>

Job Repository Configuration

Update the configuration for the job repository to look like this:

src/main/resources/applicationContext.xml

    <beans:bean id="jobRepository"
        class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean">
        <beans:property name="databaseType" value="HSQL"/>
        <beans:property name="dataSource" ref="dataSource"/>
        <beans:property name="transactionManager" ref="transactionManager" />
    </beans:bean>

Apache DDLUtil Configuration

When the application starts for the first time the system needs to create the JobRepository tables. The following Spring bean initializes the job repository and creates the tables that are defined in ddl.xml.

src/main/resources/applicationContext.xml

	<beans:bean id="jobRepositoryInitializer" class="com.test.JobRepositoryInitializer"
		init-method="checkDDL" destroy-method="shutdownDatabase">
		<beans:property name="dataSource" ref="dataSource" />
		<beans:property name="configFile" value="/ddl.xml" />
	</beans:bean>

Run the application

To run the job from the command line type the following:

mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner -Dexec.args="simpleJob.xml helloWorldJob"

You will notice that if you run the job again the console will print the following:

SEVERE: Job Terminated in error: A job instance already exists and is complete for parameters={}. If you want to run this job again, change the parameters.
org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={}. If you want to run this job again, change the parameters.

This is an indication that HSQLDB is working and persisting the data. The Spring Batch framework is reporting that the helloWorldJob has already completed with these parameters; more than one completed invocation with the same parameters is not allowed. If you want to read more about this then please read the following article.

You can also read the database script file located here:

src/main/resources/db/testdb.script

INSERT INTO BATCH_JOB_INSTANCE VALUES(0,0,'helloWorldJob','d41d8cd98f00b204e9800998ecf8427e')
INSERT INTO BATCH_JOB_EXECUTION VALUES(0,2,0,'2011-10-02 19:53:54.601000000','2011-10-02 19:53:54.617000000','2011-10-02 19:53:54.642000000','COMPLETED','COMPLETED','','2011-10-02 19:53:54.642000000')
INSERT INTO BATCH_STEP_EXECUTION VALUES(0,3,'step1',0,'2011-10-02 19:53:54.626000000','2011-10-02 19:53:54.639000000','COMPLETED',1,0,0,0,0,0,0,0,'COMPLETED','','2011-10-02 19:53:54.639000000')
INSERT INTO BATCH_STEP_EXECUTION_CONTEXT VALUES(0,'{"map":""}',NULL)
INSERT INTO BATCH_JOB_EXECUTION_CONTEXT VALUES(0,'{"map":""}',NULL)
INSERT INTO BATCH_STEP_EXECUTION_SEQ VALUES(0)
INSERT INTO BATCH_JOB_EXECUTION_SEQ VALUES(0)
INSERT INTO BATCH_JOB_SEQ VALUES(0)

HSQLDB uses this file to persist data to the file system in-between batch invocations.
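
One detail in the script is worth a second look: the JOB_KEY column holds d41d8cd98f00b204e9800998ecf8427e, which is the MD5 digest of an empty string. That fits the parameters={} in the error message above, on the assumption that this version of Spring Batch derives the key by hashing the job parameters. The snippet below only checks the digest itself. The class name JobKeyCheck is made up for this illustration.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper used only to reproduce the digest seen in testdb.script.
final class JobKeyCheck {

    /** Returns the MD5 digest of the text as 32 lowercase hex characters. */
    static String md5Hex(String text) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            byte[] digest = md5.digest(text.getBytes(StandardCharsets.UTF_8));
            // %032x pads with leading zeros so the result is always 32 characters
            return String.format("%032x", new BigInteger(1, digest));
        } catch (NoSuchAlgorithmException e) {
            // MD5 is one of the algorithms every Java platform must provide.
            throw new IllegalStateException(e);
        }
    }
}
```

Calling JobKeyCheck.md5Hex("") returns d41d8cd98f00b204e9800998ecf8427e, the same value that appears in the BATCH_JOB_INSTANCE row.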

That’s all for now!

01
Oct
11

Spring Batch Headers and Footers

This page describes the process of creating and running a Spring Batch job that produces an output file with header and footer information.

Background

In prior articles we created a batch job that read information and wrote the result to another file. The output file was missing header and footer lines. In this article we will look into generating this information.

Requirements

  • Java 5
  • Maven 2
  • This page picks up where the following page left off. Please Review and Implement the following article.

Header

The header of the output file will simply contain the filename and the timestamp.

fileName,Timestamp

Footer

The footer of the output file will be composed of 2 fields.

#ofRecords,salarySum

where:
#ofRecords – Count of the number of records in the output
salarySum – Sum of all values in the salary field
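
As a quick illustration of the footer arithmetic, the sketch below builds the footer line for a handful of salaries. The class name FooterExample and the sample salaries are invented for this example. The writer that is actually used by the job appears in the Solution section.

```java
import java.math.BigDecimal;

// Hypothetical helper showing how the two footer fields are derived.
final class FooterExample {

    /** Builds "#ofRecords,salarySum" for the given salaries. */
    static String footer(int[] salaries) {
        BigDecimal salarySum = BigDecimal.ZERO;
        for (int salary : salaries) {
            salarySum = salarySum.add(BigDecimal.valueOf(salary));
        }
        return salaries.length + "," + salarySum;
    }
}
```

For the salaries 1100, 800 and 2800 the footer is 3,4700: three records with a salary sum of 4700.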

Solution

Create an ItemWriter that wraps the “FlatFileItemWriter”. The callbacks registered as the FlatFileItemWriter’s “headerCallback” and “footerCallback” are called at the beginning and end of the file. This lets developers write their own information to the header and footer of the output file.

src/main/java/com/test/EmployeeItemWriter.java

package com.test;

import java.io.IOException;
import java.io.Writer;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;

import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileFooterCallback;
import org.springframework.batch.item.file.FlatFileHeaderCallback;
import org.springframework.batch.item.file.FlatFileItemWriter;

public class EmployeeItemWriter implements ItemWriter<Employee>,
        FlatFileFooterCallback, FlatFileHeaderCallback, ItemStream {

    // The FlatFileItemWriter that does the actual writing.
    private FlatFileItemWriter<Employee> delegate;

    // Running totals that end up in the footer line.
    private BigDecimal totalAmount = BigDecimal.ZERO;

    private int recordCount = 0;

    // Footer line: record count and total salary.
    public void writeFooter(Writer writer) throws IOException {
        writer.write(recordCount + "," + totalAmount);
    }

    // Header line: a hard-coded file label and the current date.
    public void writeHeader(Writer writer) throws IOException {
        writer.write("output_file.txt" + "," + new Date());
    }

    public void write(List<? extends Employee> items) throws Exception {
        BigDecimal chunkTotal = BigDecimal.ZERO;
        int chunkRecord = 0;
        for (Employee employee : items) {
            chunkRecord++;
            chunkTotal = chunkTotal.add(new BigDecimal(employee.getSalary()));
        }
        delegate.write(items);
        // Update the running totals only after the delegate has written the chunk.
        totalAmount = totalAmount.add(chunkTotal);
        recordCount += chunkRecord;
    }

    public void setDelegate(FlatFileItemWriter<Employee> delegate) {
        this.delegate = delegate;
    }

    // The ItemStream methods pass straight through to the delegate,
    // so the step opens, updates and closes the underlying file.
    public void close() throws ItemStreamException {
        this.delegate.close();
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.delegate.open(executionContext);
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
        this.delegate.update(executionContext);
    }
}
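The ordering inside write() matters: the totals are only updated after the delegate has written the chunk, so a chunk that fails to write never reaches the footer. The following is a minimal sketch of that idea in plain Java, without Spring Batch on the classpath. SalaryWriter, TotallingWriter and the failing delegate are hypothetical stand-ins made up for this illustration, not framework types, and a real step would apply its own retry or skip policy rather than the simple catch used here.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;

class RunningTotalSketch {

    /** Hypothetical stand-in for the delegate; not a Spring Batch interface. */
    interface SalaryWriter {
        void write(List<Integer> salaries) throws Exception;
    }

    /** Same ordering as EmployeeItemWriter.write: delegate first, totals second. */
    static class TotallingWriter implements SalaryWriter {
        private final SalaryWriter delegate;
        private BigDecimal totalAmount = BigDecimal.ZERO;
        private int recordCount = 0;

        TotallingWriter(SalaryWriter delegate) {
            this.delegate = delegate;
        }

        @Override
        public void write(List<Integer> salaries) throws Exception {
            BigDecimal chunkTotal = BigDecimal.ZERO;
            for (int salary : salaries) {
                chunkTotal = chunkTotal.add(BigDecimal.valueOf(salary));
            }
            delegate.write(salaries); // may throw; totals stay untouched if it does
            totalAmount = totalAmount.add(chunkTotal);
            recordCount += salaries.size();
        }

        /** Builds the footer text in the same "count,total" shape as writeFooter. */
        String footer() {
            return recordCount + "," + totalAmount;
        }
    }

    /** Writes three one-item chunks; the assumed delegate rejects the second one. */
    static String demo() {
        SalaryWriter failsOn1600 = salaries -> {
            if (salaries.contains(1600)) {
                throw new IllegalStateException("simulated write failure");
            }
        };
        TotallingWriter writer = new TotallingWriter(failsOn1600);
        for (int salary : new int[] {1100, 1600, 2850}) {
            try {
                writer.write(Arrays.asList(salary));
            } catch (Exception e) {
                // The failed chunk is simply dropped in this sketch.
            }
        }
        return writer.footer();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2,3950
    }
}
```

Only the two chunks that were written (1100 and 2850) are counted. Had the totals been updated before the call to the delegate, the footer would also have included the 1600 that never made it to the file.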

Batch XML Configuration

The following XML configures the ItemWriter to wrap the FlatFileItemWriter and maintain a running total.

The highlighted lines are the changes from the prior version.

src/main/resources/simpleJob.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
     xmlns:beans="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.0.xsd">
 
    <beans:import resource="applicationContext.xml"/>
 
<!-- Tokenizer - Converts a delimited string into a FieldSet -->
<beans:bean name="defaultTokenizer" 
    class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer"/>

<!-- FieldSetMapper - Populates a bean's attributes using the FieldSet -->
<beans:bean name="employeeFieldSetMapper" class="com.test.EmployeeFieldSetMapper"/>

<!-- LineMapper - Uses the tokenizer and Mapper to create instances of a Bean. -->
<beans:bean name="employeeLineMapper" class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
    <beans:property name="lineTokenizer" ref="defaultTokenizer"/>    
    <beans:property name="fieldSetMapper" ref="employeeFieldSetMapper"/>        
</beans:bean>

<!-- Reader - used by the tasklet to read one Item at a time from the input. -->
<beans:bean name="empReader" class="org.springframework.batch.item.file.FlatFileItemReader">
    <beans:property name="lineMapper" ref="employeeLineMapper"/>
    
    <!-- use spring integrations for the following, but for now filename is hard coded -->
    <beans:property name="resource" value="input_data.txt"/>    
</beans:bean>

<!-- Processor -->

<beans:bean name="empProcessor" class="com.test.EmployeeProcessor">
</beans:bean>

<!-- Writer -->
<beans:bean id="empWriter" class="org.springframework.batch.item.file.FlatFileItemWriter">
    <beans:property name="resource" value="file:target/output_data.txt" />
    <beans:property name="lineAggregator">
        <beans:bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
            <beans:property name="delimiter" value=","/>
            <beans:property name="fieldExtractor">
                <beans:bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                    <beans:property name="names" value="empId,lastName,title,salary,rank"/>
                </beans:bean>
            </beans:property>
        </beans:bean>
    </beans:property>
    <beans:property name="footerCallback" ref="empHeaderFooterWriter" />
    <beans:property name="headerCallback" ref="empHeaderFooterWriter" />
</beans:bean>

<beans:bean id="empHeaderFooterWriter" class="com.test.EmployeeItemWriter">
    <beans:property name="delegate" ref="empWriter"/>
</beans:bean>

<job id="helloWorldJob">
    <step id="step1" next="step2">
        <tasklet ref="helloWorldTasklet" />
    </step>
    <step id="step2">
        <tasklet>
            <chunk reader="empReader" processor="empProcessor" writer="empHeaderFooterWriter" commit-interval="1"/>
        </tasklet>
    </step>
</job>
 
<beans:bean name="helloWorldTasklet" class="com.test.HelloWorldTasklet"/>
 
<!--
To run the job from the command line type the following:
mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner -Dexec.args="simpleJob.xml helloWorldJob"
 -->
</beans:beans>

Execute the job

Go to the command line and type the following:

mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner -Dexec.args="simpleJob.xml helloWorldJob"

View the results

The output file will appear in the target/ folder of the project.

The file should look like this:

output_file.txt,Sat Oct 01 22:50:17 EDT 2011
7876,ADAMS,CLERK,1100,N/A
7499,ALLEN,SALESMAN,1600,N/A
7698,BLAKE,MANAGER,2850,Director
7782,CLARK,MANAGER,2450,N/A
7902,FORD,ANALYST,3000,Director
7900,JAMES,CLERK,950,N/A
7566,JONES,MANAGER,2975,Director
7839,KING,PRESIDENT,5000,Director
7654,MARTIN,SALESMAN,1250,N/A
7934,MILLER,CLERK,1300,N/A
7788,SCOTT,ANALYST,3000,Director
7369,SMITH,CLERK,800,N/A
7844,TURNER,SALESMAN,1500,N/A
7521,WARD,SALESMAN,1250,N/A
14,29025
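The footer can be checked by hand: it holds the number of detail lines and the sum of the salary column. The following is a small plain-Java sketch that recomputes it from the detail lines shown above. The class name FooterCheck and its helper are made up for this check and are not part of the job.

```java
import java.math.BigDecimal;

class FooterCheck {

    /** The detail lines from the sample output (header and footer left out). */
    static final String[] SAMPLE = {
        "7876,ADAMS,CLERK,1100,N/A",
        "7499,ALLEN,SALESMAN,1600,N/A",
        "7698,BLAKE,MANAGER,2850,Director",
        "7782,CLARK,MANAGER,2450,N/A",
        "7902,FORD,ANALYST,3000,Director",
        "7900,JAMES,CLERK,950,N/A",
        "7566,JONES,MANAGER,2975,Director",
        "7839,KING,PRESIDENT,5000,Director",
        "7654,MARTIN,SALESMAN,1250,N/A",
        "7934,MILLER,CLERK,1300,N/A",
        "7788,SCOTT,ANALYST,3000,Director",
        "7369,SMITH,CLERK,800,N/A",
        "7844,TURNER,SALESMAN,1500,N/A",
        "7521,WARD,SALESMAN,1250,N/A"
    };

    /** Recomputes "count,total" from lines whose fourth field is the salary. */
    static String footerFor(String[] detailLines) {
        BigDecimal total = BigDecimal.ZERO;
        for (String line : detailLines) {
            String[] fields = line.split(",");
            total = total.add(new BigDecimal(fields[3]));
        }
        return detailLines.length + "," + total;
    }

    public static void main(String[] args) {
        System.out.println(footerFor(SAMPLE)); // prints 14,29025
    }
}
```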

That’s all for now!



