This page describes how to convert a single-threaded Spring Batch application into a multi-threaded one. We will take a single-threaded job and, by adding a few configuration parameters, make it multi-threaded. This page is heavy on demonstration and light on theory. For a better understanding, please refer to the Spring Batch User Guide in the References section below.
Background
Performing batch operations in parallel can sometimes speed up processing of the whole batch. I was once working on a batch job that took 20 minutes to complete. I noticed during the batch run that the CPU, the database and the network bandwidth were not being fully utilized.
My current machine has an Intel Core i7 CPU with 8 processors running at 2.67 GHz each. During the batch run only one of the 8 processors was being used. What a waste!
After applying a taskExecutor to my batch job, the process that previously took 20 minutes completed in only 7 minutes. More of the processor, network and database capacity was being utilized.
Requirements
- Maven – Installed and configured (see tutorial on navigation bar)
- Basic Understanding of Spring Batch
Implementation
Project Setup
Create a project using the Maven quickstart archetype. Open a command prompt, navigate to an empty directory, and run the following. Enter the groupId and artifactId shown when prompted.
mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart
groupId: com.test
artifactId: springBatchMultiThreaded
Accept the defaults for the remaining prompts by pressing the Enter key.
Open the pom.xml file and make sure it matches the one below. In most cases you can simply copy and paste the one below over your own.
If you are using Eclipse, regenerate the project files:
mvn eclipse:clean eclipse:eclipse
Refresh the project and you should have all the dependencies necessary to proceed.
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test</groupId>
<artifactId>springBatchMultiThreaded</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>springBatchMultiThreaded</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.batch</groupId>
<artifactId>spring-batch-core</artifactId>
<version>2.0.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
<version>2.5.6</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.5</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.0.2</version>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
Reader
src/main/java/com/test/DummyReader.java
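package com.test;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

/** Emits the strings "test 1" through "test 100", then signals the end of the input. */
public class DummyReader implements ItemReader<String> {

    private int count = 0;

    // synchronized so that concurrent step threads cannot read or update count at the same time
    @Override
    public synchronized String read() throws Exception, UnexpectedInputException,
            ParseException {
        if (count < 100) {
            count++;
            return "test " + count;
        }
        return null; // null tells Spring Batch that the input is exhausted
    }
}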
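To see what the synchronized keyword buys us, here is a small plain-Java sketch that drains one shared reader from several threads and checks that every item is handed out exactly once. It does not use Spring Batch at all: CountingReader and ReaderDemo are hypothetical names made up for this illustration, and CountingReader only mimics the logic of DummyReader.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for DummyReader with no Spring dependency.
class CountingReader {
    private int count = 0;

    // synchronized makes the check-and-increment a single atomic step.
    public synchronized String read() {
        if (count < 100) {
            count++;
            return "test " + count;
        }
        return null; // end of input
    }
}

class ReaderDemo {
    // Drains one shared reader from several threads and returns the distinct items seen.
    static Set<String> drain(int threads) {
        final CountingReader reader = new CountingReader();
        final Set<String> seen = Collections.synchronizedSet(new HashSet<String>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(new Runnable() {
                public void run() {
                    String item;
                    while ((item = reader.read()) != null) {
                        seen.add(item);
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(drain(8).size() + " distinct items read");
    }
}
```

If you remove synchronized from read(), two threads can pass the count check at the same moment, so items may be duplicated or skipped. That kind of bug is intermittent, which is why it is worth guarding against up front.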
Writer
The writer is implemented as an ItemWriterAdapter. This class simply calls a method of the specified object, passing it each item that was read by the reader.
In the job configuration below, the writer is simply an adapter that calls the processSubmission() method of DummyModelImpl. See the dummyWriter bean in job-dummy.xml.
src/main/java/com/test/DummyModel.java
package com.test;

public interface DummyModel {
    void processSubmission(Object obj);
}
src/main/java/com/test/DummyModelImpl.java
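package com.test;

public class DummyModelImpl implements DummyModel {

    // Stateless: no fields are shared between threads, so concurrent calls are safe.
    public void processSubmission(Object obj) {
        System.out.printf("processing: %s record in thread: %s\n", obj, Thread.currentThread().toString());
        try {
            Thread.sleep(2000); // simulate 2 seconds of work per record
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            e.printStackTrace();
        }
    }
}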
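The adapter idea (call a named method on a target object for each item) can be sketched in plain Java with reflection. This is only a simplified illustration of the concept and not the actual ItemWriterAdapter source; ReflectiveWriter and RecordingModel are hypothetical names, and the sketch assumes the target method is public and takes a single Object argument.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical target object standing in for DummyModelImpl.
class RecordingModel {
    final List<Object> received = new ArrayList<Object>();

    public void processSubmission(Object obj) {
        received.add(obj);
    }
}

// Simplified stand-in for an adapter-style writer; not a Spring Batch class.
class ReflectiveWriter {
    private final Object targetObject;
    private final Method targetMethod;

    ReflectiveWriter(Object targetObject, String targetMethodName) {
        this.targetObject = targetObject;
        try {
            // Look up a public method with the given name and one Object parameter.
            this.targetMethod = targetObject.getClass().getMethod(targetMethodName, Object.class);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + targetMethodName, e);
        }
    }

    // Calls the target method once for each item in the chunk.
    void write(List<?> items) {
        for (Object item : items) {
            try {
                targetMethod.invoke(targetObject, item);
            } catch (Exception e) {
                throw new IllegalStateException("Invocation failed for item: " + item, e);
            }
        }
    }

    public static void main(String[] args) {
        RecordingModel model = new RecordingModel();
        new ReflectiveWriter(model, "processSubmission").write(Arrays.asList("test 1", "test 2"));
        System.out.println(model.received); // prints [test 1, test 2]
    }
}
```

The appeal of this style is that the business class needs no Spring Batch interfaces at all. The wiring from the item to the method lives entirely in configuration.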
Next comes the job definition.
Before writing it, create a directory for the resources:
mkdir -p src/main/resources/com/test
src/main/resources/com/test/job-dummy.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://www.springframework.org/schema/batch
http://www.springframework.org/schema/batch/spring-batch-2.0.xsd">
<!-- This is the dummy job setup to test multi-threading. -->
<!--
mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner \
-Dexec.args="/com/test/job-dummy.xml \
dummyJob job.id=1"
-->
<!-- need to define a job repository -->
<beans:bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<beans:property name="transactionManager" ref="transactionManager"/>
</beans:bean>
<beans:bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<beans:property name="jobRepository" ref="jobRepository" />
</beans:bean>
<beans:bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>
<job id="dummyJob">
<step id="dummyStep">
<tasklet>
<chunk reader="dummyReader" writer="dummyWriter" commit-interval="1"/>
</tasklet>
</step>
</job>
<!-- Readers -->
<beans:bean id="dummyReader" class="com.test.DummyReader"/>
<!-- Writers -->
<beans:bean id="dummyWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
<beans:property name="targetObject" ref="dummyModel" />
<beans:property name="targetMethod" value="processSubmission" />
</beans:bean>
<!-- Common objects -->
<beans:bean id="dummyModel" class="com.test.DummyModelImpl">
</beans:bean>
</beans:beans>
Testing the single threaded batch application
From the command line, run the following in the project's folder (where the pom.xml file is located):
mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner \
-Dexec.args="/com/test/job-dummy.xml \
dummyJob job.id=1"
You will see that each item takes 2 seconds to process and that the items are processed one at a time on a single thread. Press CTRL-C to exit.
Converting to multi-threaded mode
To make the job multi-threaded, all you need to do is add the following bean to job-dummy.xml.
<beans:bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
<beans:property name="concurrencyLimit" value="10"/>
</beans:bean>
Modify the “step” tag seen above as follows.
<step id="dummyStep">
<tasklet task-executor="taskExecutor">
<chunk reader="dummyReader" writer="dummyWriter" commit-interval="1"/>
</tasklet>
</step>
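To get a feel for what a concurrency limit does, here is a plain-Java sketch of an executor that starts a new thread for every task but makes the caller wait once the limit is reached. As far as I understand it, that is roughly how SimpleAsyncTaskExecutor behaves with concurrencyLimit set, but this is my own simplified stand-in and not Spring's code. ThrottledExecutor and ThrottleDemo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in: one new thread per task, capped by a semaphore.
class ThrottledExecutor {
    private final Semaphore slots;

    ThrottledExecutor(int concurrencyLimit) {
        this.slots = new Semaphore(concurrencyLimit);
    }

    // Blocks the caller until a slot is free, then runs the task on a fresh thread.
    Thread execute(final Runnable task) {
        slots.acquireUninterruptibly();
        Thread thread = new Thread(new Runnable() {
            public void run() {
                try {
                    task.run();
                } finally {
                    slots.release(); // free the slot even if the task fails
                }
            }
        });
        thread.start();
        return thread;
    }
}

class ThrottleDemo {
    // Submits short tasks and returns the highest number observed running at once.
    static int maxConcurrency(int limit, int tasks) {
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        ThrottledExecutor executor = new ThrottledExecutor(limit);
        List<Thread> threads = new ArrayList<Thread>();
        for (int i = 0; i < tasks; i++) {
            threads.add(executor.execute(new Runnable() {
                public void run() {
                    int now = running.incrementAndGet();
                    int seen;
                    do { // raise the recorded peak if this task pushed it higher
                        seen = peak.get();
                    } while (now > seen && !peak.compareAndSet(seen, now));
                    try {
                        Thread.sleep(20); // stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    running.decrementAndGet();
                }
            }));
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + maxConcurrency(10, 100));
    }
}
```

The peak never exceeds the limit because a slot is taken before each thread starts and released only after its task finishes. A sensible limit depends on where the bottleneck is: the number of processors, database connections, or network capacity.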
Be sure to verify that the reader and the writer are thread safe. In the example above, DummyReader was made thread safe by synchronizing the read() method. The model class above is automatically thread safe since it does not maintain any state.
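If your own model class does keep state, that state needs protection. The sketch below is a hypothetical variant of the model (CountingModel is not part of this tutorial's project) that counts processed records with an AtomicInteger, plus a small harness that calls it from several threads.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stateful variant of DummyModelImpl that counts processed records.
class CountingModel {
    // A plain int with processed++ could lose updates under concurrent calls;
    // AtomicInteger makes each increment atomic.
    private final AtomicInteger processed = new AtomicInteger();

    public void processSubmission(Object obj) {
        processed.incrementAndGet();
    }

    public int getProcessed() {
        return processed.get();
    }
}

class CountingModelDemo {
    // Calls the model from several threads and returns the final count.
    static int run(int threads, final int callsPerThread) {
        final CountingModel model = new CountingModel();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(new Runnable() {
                public void run() {
                    for (int j = 0; j < callsPerThread; j++) {
                        model.processSubmission("test " + j);
                    }
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return model.getProcessed();
    }

    public static void main(String[] args) {
        System.out.println(run(10, 1000)); // prints 10000
    }
}
```

For a single counter an atomic type is enough. If several fields must change together, a synchronized block (as in DummyReader) is the simpler choice.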
Testing the multi threaded batch application
From the command line, run the following in the project's folder (where the pom.xml file is located):
mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner \
-Dexec.args="/com/test/job-dummy.xml \
dummyJob job.id=1"
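As a rough sanity check on what to expect from this run, the ideal run time can be estimated from the numbers used in this tutorial: 100 items from DummyReader, a 2 second sleep per item, and a concurrencyLimit of 10. The helper below is a hypothetical back-of-the-envelope calculation. It assumes items are processed in full waves and ignores framework and thread start-up overhead, so real timings will be somewhat higher.

```java
class RunTimeEstimate {
    // Ideal wall-clock seconds: items are handled in waves of `threads`,
    // and each wave takes secondsPerItem.
    static long estimateSeconds(int items, int threads, long secondsPerItem) {
        long waves = (items + threads - 1) / threads; // ceiling division
        return waves * secondsPerItem;
    }

    public static void main(String[] args) {
        System.out.println(estimateSeconds(100, 1, 2));  // prints 200
        System.out.println(estimateSeconds(100, 10, 2)); // prints 20
    }
}
```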
You will notice that the system is now processing multiple items at a time.
Deploying the application
The following tutorial describes how to package and deploy the application as a self-contained jar.
References
- Spring Batch User Guide
That’s all for now.