Posts Tagged ‘concurrency’

27
Apr
10

Multi-threading Using Spring Batch

This page describes how to convert a single-threaded Spring Batch application into a multi-threaded one. We will take a single-threaded job and, by adding a few configuration parameters, convert it to run on multiple threads. This page is heavy on demonstration and light on theory. For a better understanding, please refer to the Spring Batch User Guide in the reference section below.

Background

Performing batch operations in parallel can sometimes speed up processing of the whole batch. I was once working on a batch job that took 20 minutes to complete. I noticed during the batch run that the CPU, the database and the network bandwidth were not being fully utilized.

My current machine has an Intel Core i7 CPU. It has 8 processors running at 2.67 GHz each. I noticed during the batch run that only one of the 8 processors was being used. What a waste!

After applying a taskExecutor to my batch job, the process that previously took 20 minutes to complete took only 7 minutes. More of the processor, network and database capacity was being utilized.

Requirements

  1. Maven – Installed and configured (see tutorial on navigation bar)
  2. Basic Understanding of Spring Batch

Implementation

Project Setup

Create a project using the Maven quickstart archetype. Open a command prompt, navigate to an empty directory and run the following.

mvn archetype:generate -DarchetypeArtifactId=maven-archetype-quickstart

groupId: com.test
artifactId: springBatchMultiThreaded

Answer the rest of the questions with the defaults (just hit the Enter key).

Open up the pom.xml file and make sure it looks like the one below. In most cases you can just copy and paste the one below into your own.

If you are using Eclipse, regenerate your project files:

mvn eclipse:clean eclipse:eclipse

Refresh the project and you should have all the dependencies necessary to proceed.

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.test</groupId>
	<artifactId>springBatchMultiThreaded</artifactId>
	<packaging>jar</packaging>
	<version>1.0-SNAPSHOT</version>
	<name>springBatchMultiThreaded</name>
	<url>http://maven.apache.org</url>
	<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>3.8.1</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.batch</groupId>
			<artifactId>spring-batch-core</artifactId>
			<version>2.0.4.RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring</artifactId>
			<version>2.5.6</version>
		</dependency>
		<dependency>
			<groupId>commons-lang</groupId>
			<artifactId>commons-lang</artifactId>
			<version>2.5</version>
		</dependency>		
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>2.0.2</version>
				<configuration>
					<source>1.6</source>
					<target>1.6</target>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

Reader

src/main/java/com/test/DummyReader.java

package com.test;

import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

public class DummyReader implements ItemReader<String> {

	private int count = 0;

	@Override
	public synchronized String read() throws Exception, UnexpectedInputException,
			ParseException {

		if(count < 100) {
			count++;
			return "test " + count;
		}

		return null;
	}

}

Writer

The writer is implemented as an ItemWriterAdapter. This class simply calls a method of the specified object, passing each item that was read by the reader as the argument.

In the job configuration below, the writer is an adapter that calls the processSubmission() method of DummyModelImpl. See the dummyWriter bean in job-dummy.xml.

src/main/java/com/test/DummyModel.java

package com.test;

public interface DummyModel {
	public abstract void processSubmission(Object obj);
}

src/main/java/com/test/DummyModelImpl.java

package com.test;

public class DummyModelImpl implements DummyModel {
	public void processSubmission(Object obj) {
		System.out.printf("processing: %s record in thread: %s\n", obj, Thread.currentThread().toString());
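		try {
			// Simulate 2 seconds of work per record.
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			// Restore the interrupt flag so the calling code can see the interruption.
			Thread.currentThread().interrupt();
		}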
	}
}
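
To make the adapter idea more concrete, here is a rough sketch of what "call a named method on a target object for each item" can look like in plain Java. This is not Spring's ItemWriterAdapter source. MethodCallingWriter and RecordingModel are made-up names for illustration, and the sketch assumes the target method is public and takes a single Object argument, as processSubmission() does.

```java
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class AdapterSketch {

    /** Hypothetical stand-in for DummyModelImpl that records what it receives. */
    public static class RecordingModel {
        final List<Object> seen = new ArrayList<Object>();

        public void processSubmission(Object obj) {
            seen.add(obj);
        }
    }

    /** Hypothetical mini adapter: calls targetMethod on targetObject once per item. */
    static class MethodCallingWriter {
        private final Object targetObject;
        private final String targetMethod;

        MethodCallingWriter(Object targetObject, String targetMethod) {
            this.targetObject = targetObject;
            this.targetMethod = targetMethod;
        }

        void write(List<?> items) {
            try {
                // Look up a public one-argument method by name, then call it per item.
                Method m = targetObject.getClass().getMethod(targetMethod, Object.class);
                for (Object item : items) {
                    m.invoke(targetObject, item);
                }
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Could not call " + targetMethod, e);
            }
        }
    }

    /** Pushes two items through the adapter and returns what the model received. */
    static List<Object> demo() {
        RecordingModel model = new RecordingModel();
        new MethodCallingWriter(model, "processSubmission")
                .write(Arrays.asList("test 1", "test 2"));
        return model.seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [test 1, test 2]
    }
}
```

The point of the pattern is that the model class needs no knowledge of Spring Batch; the wiring lives entirely in configuration.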

Next we have the job definition.

Before we do that, please create a directory for the resources:

mkdir -p src/main/resources/com/test

src/main/resources/com/test/job-dummy.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/batch"
     xmlns:beans="http://www.springframework.org/schema/beans"
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
           http://www.springframework.org/schema/batch
           http://www.springframework.org/schema/batch/spring-batch-2.0.xsd">

<!-- This is the dummy job setup to test multi-threading. -->
<!--
mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner \
-Dexec.args="/com/test/job-dummy.xml \
dummyJob job.id=1"
-->

<!-- need to define a job repository -->
<beans:bean id="jobRepository"
    class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
    <beans:property name="transactionManager" ref="transactionManager"/>
</beans:bean>

<beans:bean id="jobLauncher"
    class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <beans:property name="jobRepository" ref="jobRepository" />
</beans:bean>

<beans:bean id="transactionManager" class="org.springframework.batch.support.transaction.ResourcelessTransactionManager"/>

<job id="dummyJob">

	<step id="dummyStep">
		<tasklet>
			<chunk reader="dummyReader" writer="dummyWriter" commit-interval="1"/>
		</tasklet>
	</step>
</job>

<!-- Readers -->
<beans:bean id="dummyReader" class="com.test.DummyReader"/>

<!-- Writers -->
<beans:bean id="dummyWriter" class="org.springframework.batch.item.adapter.ItemWriterAdapter">
    <beans:property name="targetObject" ref="dummyModel" />
    <beans:property name="targetMethod" value="processSubmission" />
</beans:bean>

<!-- Common objects -->
<beans:bean id="dummyModel" class="com.test.DummyModelImpl">
</beans:bean>

</beans:beans>

Testing the single threaded batch application

Drop to the command line and type the following in the project’s folder (where the pom.xml file is located).

mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner \
-Dexec.args="/com/test/job-dummy.xml \
dummyJob job.id=1"

You will see that each item takes 2 seconds to process, and that the items are processed one at a time on a single thread. Hit CTRL-C to exit.
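
As a back-of-the-envelope check on why you may not want to wait for the run to finish, the sketch below estimates the total run time from the numbers in DummyReader (100 items) and DummyModelImpl (a 2 second sleep). The class and method names are made up for illustration, the thread count of 10 is only an example value, and the estimate ignores framework overhead, so treat it as an idealized figure.

```java
public class BatchTimeEstimate {

    /**
     * Idealized wall-clock estimate: items are spread evenly over the threads
     * and every item costs the same fixed time. Framework overhead is ignored.
     */
    static long estimateSeconds(int items, int secondsPerItem, int threads) {
        // Ceiling division: the number of items the busiest thread must handle.
        int rounds = (items + threads - 1) / threads;
        return (long) rounds * secondsPerItem;
    }

    public static void main(String[] args) {
        // 100 items and a 2 second sleep come from DummyReader and DummyModelImpl.
        System.out.println("1 thread:   " + estimateSeconds(100, 2, 1) + " s");  // prints 200 s
        // 10 threads is an illustrative value chosen only for comparison.
        System.out.println("10 threads: " + estimateSeconds(100, 2, 10) + " s"); // prints 20 s
    }
}
```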

Converting to multi-threaded mode

To make the job multi-threaded, all you need to do is add the following to job-dummy.xml.

<beans:bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
	<beans:property name="concurrencyLimit" value="10"/>
</beans:bean>
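
SimpleAsyncTaskExecutor starts a new thread for each task, and concurrencyLimit caps how many of those threads may run at once. The sketch below imitates that behaviour with a Semaphore so you can see the cap in action. It is an illustration of the idea, not Spring's implementation, and ThrottledThreads and runAndMeasurePeak are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class ThrottledThreads {

    /**
     * Runs `tasks` short jobs, one new thread each, never more than `limit` at once.
     * Returns the highest number of jobs observed running at the same time.
     */
    static int runAndMeasurePeak(int tasks, int limit) {
        final Semaphore permits = new Semaphore(limit);
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger peak = new AtomicInteger();
        List<Thread> threads = new ArrayList<Thread>();
        try {
            for (int i = 0; i < tasks; i++) {
                // Blocks the submitting thread while `limit` jobs are already active.
                permits.acquire();
                Thread t = new Thread(() -> {
                    try {
                        int now = running.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        Thread.sleep(20); // stand-in for real work
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                        permits.release(); // free the slot for the next job
                    }
                });
                threads.add(t);
                t.start();
            }
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        // The peak never exceeds the limit passed in (3 here).
        System.out.println("peak concurrency: " + runAndMeasurePeak(12, 3));
    }
}
```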

Modify the “step” tag seen above as follows.

<step id="dummyStep">
	<tasklet task-executor="taskExecutor">
		<chunk reader="dummyReader" writer="dummyWriter" commit-interval="1"/>
	</tasklet>
</step>

Be sure to verify that the reader and writer are thread safe. In the example above, the DummyReader was made thread safe by synchronizing the read() method. The model class above is automatically thread safe since it does not maintain state.
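
If you want to convince yourself that a synchronized read() is enough for this reader, the following standalone sketch drains one shared reader from several threads and checks that every item is handed out exactly once. CountingReader copies the counting logic of DummyReader without the Spring interface. SharedReaderDemo and drain are made-up names, and the choice of 8 threads is arbitrary.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedReaderDemo {

    /** Same counting logic as DummyReader, minus the Spring interface. */
    static class CountingReader {
        private int count = 0;

        synchronized String read() {
            if (count < 100) {
                count++;
                return "test " + count;
            }
            return null; // null signals that there are no more items
        }
    }

    /** Drains one shared reader from several threads; returns {distinct items, total reads}. */
    static int[] drain(int threads) {
        final CountingReader reader = new CountingReader();
        final Set<String> seen = ConcurrentHashMap.newKeySet();
        final AtomicInteger total = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                String item;
                while ((item = reader.read()) != null) {
                    seen.add(item);
                    total.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { seen.size(), total.get() };
    }

    public static void main(String[] args) {
        int[] result = drain(8);
        // Equal numbers mean no item was skipped and none was handed out twice.
        System.out.println("distinct: " + result[0] + ", total reads: " + result[1]);
    }
}
```

Without the synchronized keyword, two threads could read and increment count at the same moment, so the two numbers would no longer be guaranteed to match.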

Testing the multi threaded batch application

Drop to the command line and type the following in the project’s folder (where the pom.xml file is located).

mvn exec:java -Dexec.mainClass=org.springframework.batch.core.launch.support.CommandLineJobRunner \
-Dexec.args="/com/test/job-dummy.xml \
dummyJob job.id=1"

You will notice that the system is now processing multiple records at a time.

Deploying the application

The following tutorial describes how to package and deploy the application as a self-contained jar.

References

That’s all for now.

24
Apr
10

Filling a byte array with Random Numbers using Threads

This page describes the process of filling a byte array with encryption-grade random numbers using Java threads.

Background

This little project started because I wanted to see how fast my computer could fill an array with random numbers. When I finished the implementation in the attempt1() method, I saw that only one of the 2 processors on my machine was at 100%. I figured that if I split the work across 2 threads, the operating system would use both processors and I would get the job done in half the time.

Results

The results are disappointing. Both processors were used, but it took a little more time than the single-threaded mode. I suspect that this was due to some array copy operations I had to do. I played with the constants at the top of the class, but the process still took a few hundred milliseconds longer than the single-threaded version.

Request for Help

If you know a better way to use multiple threads to achieve the same result faster, please let me know by posting some code in the comments box below.

The complete source code for the test program is included below.

package com.test;

import java.security.SecureRandom;
import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class App2 {
	private static final int DATA_SIZE = 1024*100000;
	private static final int BLOCK_SIZE = 8024;
	private static final int NUM_OF_THREADS = 16;
	private static final SecureRandom sr = new SecureRandom();

	public App2() {
		attempt1();
		attempt2();
	}
	public static void main(String args[]) {
		new App2();
	}
	private void attempt2() {
		System.out.println("Attempt 2");
		int size = DATA_SIZE;
		//System.out.printf("size of array: %s\n", new DecimalFormat("###,###,###,###").format(size));
		
		final byte data[] = new byte[size];
		
		ExecutorService es = Executors.newFixedThreadPool(NUM_OF_THREADS);
		int endcount = (size/BLOCK_SIZE) + 1;
		//System.out.println(endcount);
		long start = System.currentTimeMillis();
		for(int i=0; i< endcount; i++) {
			es.execute(new Run(data, i));		
		}
		es.shutdown();
		try {
			es.awaitTermination(3000, TimeUnit.SECONDS);
		} catch (InterruptedException e) {
			// Restore the interrupt flag and report the interruption.
			Thread.currentThread().interrupt();
			e.printStackTrace();
		}
		long end = System.currentTimeMillis();
		
		System.out.printf("System took %d ms to process.\n", end - start);

	}
	
	/**
	 * Tried to do it all in one shot with a single thread.
	 * Average time: 22.7 seconds.
	 */
	private void attempt1() {
		System.out.println("Attempt 1");
		int size = DATA_SIZE;
		System.out.printf("size of array: %s\n", new DecimalFormat("###,###,###,###").format(size));
		
		byte data[] = new byte[size];
		long start = System.currentTimeMillis();
		sr.nextBytes(data);
		long end = System.currentTimeMillis();
		//System.out.println(Arrays.toString(data));
		System.out.printf("System took %d ms to process.\n", end - start);
	}
	class Run implements Runnable {
		private int block;
		private byte[] data;
		
		public Run(byte[] data, int block) {
			this.block = block;
			this.data = data;
		}
		@Override
		public void run() {
			byte[] temp = new byte[BLOCK_SIZE];  // scratch buffer of BLOCK_SIZE bytes
			int offset = block*BLOCK_SIZE;
			// The last block may be shorter than BLOCK_SIZE, so copy only what fits.
			// (The original modulo expression would divide by zero for block 0.)
			int remainder = Math.min(BLOCK_SIZE, data.length - offset);
			//System.out.println("remainder: " + remainder);
			sr.nextBytes(temp);
			System.arraycopy(temp, 0, data, offset, remainder);
		}
	}
}
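
One variation that might be worth trying, in the spirit of the request for help above: in attempt2() every task shares the single static SecureRandom, and on older JDKs nextBytes() is a synchronized method, so the tasks may largely end up taking turns. That is an assumption about the setup, not something measured here. The sketch below gives each task its own SecureRandom and one large contiguous slice of the array instead of many small blocks. PerThreadFill, fill and hasNonZero are made-up names, and whether this is actually faster will depend on the JDK and the random number provider in use.

```java
import java.security.SecureRandom;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerThreadFill {

    /** Fills `data` using `workers` tasks, each with a private SecureRandom and one slice. */
    static void fill(final byte[] data, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        // Ceiling division so that the slices cover the whole array.
        final int slice = (data.length + workers - 1) / workers;
        for (int w = 0; w < workers; w++) {
            final int from = Math.min(data.length, w * slice);
            final int to = Math.min(data.length, from + slice);
            pool.execute(() -> {
                SecureRandom local = new SecureRandom(); // not shared between tasks
                // nextBytes() fills a whole array, so a temporary buffer is still needed.
                byte[] temp = new byte[to - from];
                local.nextBytes(temp);
                System.arraycopy(temp, 0, data, from, temp.length);
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(3000, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** True if any byte in [from, to) is non-zero; a cheap check that a region was written. */
    static boolean hasNonZero(byte[] data, int from, int to) {
        for (int i = from; i < to; i++) {
            if (data[i] != 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Smaller than DATA_SIZE in App2 to keep the demo quick.
        byte[] data = new byte[16 * 1024 * 1024];
        int workers = Runtime.getRuntime().availableProcessors();
        long start = System.currentTimeMillis();
        fill(data, workers);
        long elapsed = System.currentTimeMillis() - start;
        System.out.printf("Filled %d bytes with %d workers in %d ms.%n",
                data.length, workers, elapsed);
    }
}
```

Note that each task still pays for one array copy of its slice, and the temporary buffers briefly double the memory in use, so this is a trade-off rather than a guaranteed win.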



