Showing posts with label Threads. Show all posts
Showing posts with label Threads. Show all posts

Saturday, 4 November 2023

Customizing Thread Creation with ThreadFactory

ThreadFactory is a Functional interface that provides a convenient a way to create new threads on demand. ThreadFactory is used to decouple the process of thread creation from the rest of your application's logic.

Definition of ThreadFactory interface

public interface ThreadFactory {

    Thread newThread(Runnable r);
}

 

The newThread method takes a Runnable object as a parameter and returns a new Thread object.

 

For example, following CustomThreaFactory class generate thread with priority 7.

 


CustomThreadFactory.java
package com.sample.app.threads;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

public class CustomThreadFactory implements ThreadFactory{
	private int threadCount = 1;
	
	@Override
	public Thread newThread(Runnable runnable) {
		Thread thread = Executors.defaultThreadFactory().newThread(runnable);
        thread.setName("MyThread-" + threadCount++);
        thread.setPriority(7);
        return thread;
	}

}

 

‘CustomThreadFactoryDemo’ use CustomThreadFactory to create new threads for you.

 

CustomThreadFactoryDemo.java

 

package com.sample.app;

import java.util.Random;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import com.sample.app.threads.CustomThreadFactory;

public class CustomThreadFactoryDemo {
	
	public static void main(String[] args) {
		Runnable runnable = () -> {
			System.out.println(Thread.currentThread().getName() + " executing this code");
			sleepRandomSeconds(5);
			System.out.println(Thread.currentThread().getName() + " finished executing this code");
			
		};
		
		ThreadFactory threadFactory = new CustomThreadFactory();
		
		for(int i = 0; i<10; i++) {
			threadFactory.newThread(runnable).start();
		}
		
	}
	
	private static void sleepRandomSeconds(int upperBound) {
		try {
			TimeUnit.SECONDS.sleep(new Random().nextInt(upperBound));
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

 

Sample Output

MyThread-1 executing this code
MyThread-2 executing this code
MyThread-3 executing this code
MyThread-4 executing this code
MyThread-5 executing this code
MyThread-6 executing this code
MyThread-7 executing this code
MyThread-8 executing this code
MyThread-9 executing this code
MyThread-10 executing this code
MyThread-10 finished executing this code
MyThread-1 finished executing this code
MyThread-4 finished executing this code
MyThread-3 finished executing this code
MyThread-7 finished executing this code
MyThread-2 finished executing this code
MyThread-5 finished executing this code
MyThread-6 finished executing this code
MyThread-8 finished executing this code
MyThread-9 finished executing this code

 

In this example, CustomThreadFactory implements the ThreadFactory interface and overrides the newThread method. It uses the defaultThreadFactory() method from Executors to create the underlying Thread object and then customizes the thread's name by appending a count to it, and set the priority to 7.

 

Using Custom Thread factory, we can customize the thread creation in number of ways, for example, we can

 

a.   Set the thread name prefix

b.   Set the thread priority

c.    Set the thread daemon status

d.   Set the thread stack size

e.   Set the thread security context

f.     Create threads from a specific thread group

 

Let’s improvise CustomThreadFactory by passing the thread priority as argument.

 

CustomThreadFactory.java

 

package com.sample.app.threads;

import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomThreadFactory implements ThreadFactory{
	private static final AtomicInteger THREAD_COUNT = new AtomicInteger();
	private int priority;
	
	public CustomThreadFactory(int priority) {
		this.priority = priority;
	}
	
	@Override
	public Thread newThread(Runnable runnable) {
		Thread thread = Executors.defaultThreadFactory().newThread(runnable);
        thread.setName("MyThread-" + THREAD_COUNT.incrementAndGet());
        thread.setPriority(priority);
        return thread;
	}

}

Use ThredFactory in ThreadPool consruction

ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), new CustomThreadFactory(7));

 

The executor service will now use the CustomThreadFactory to create new threads as needed. All of the new threads will have the name prefix "MyThread-" and the thread priority 7.

 

Find the below working application.

 

ThreadPoolDemo.java

package com.sample.app;

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.sample.app.threads.CustomThreadFactory;

public class ThreadPoolDemo {

	private static void sleepRandomSeconds(int upperBound) {
		try {
			TimeUnit.SECONDS.sleep(new Random().nextInt(upperBound));
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 20, 0L, TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<>(), new CustomThreadFactory(7));

		Runnable runnable = () -> {
			System.out.println(Thread.currentThread().getName() + " executing this code");
			sleepRandomSeconds(5);
			System.out.println(Thread.currentThread().getName() + " finished executing this code");

		};
		
		for(int i=0; i < 10; i++) {
			executorService.submit(runnable);
		}
		
		executorService.shutdown();

	}

}

Sample Output

MyThread-1 executing this code
MyThread-3 executing this code
MyThread-2 executing this code
MyThread-4 executing this code
MyThread-5 executing this code
MyThread-3 finished executing this code
MyThread-6 executing this code
MyThread-7 executing this code
MyThread-8 executing this code
MyThread-9 executing this code
MyThread-10 executing this code
MyThread-10 finished executing this code
MyThread-9 finished executing this code
MyThread-4 finished executing this code
MyThread-7 finished executing this code
MyThread-6 finished executing this code
MyThread-2 finished executing this code
MyThread-1 finished executing this code
MyThread-8 finished executing this code
MyThread-5 finished executing this code



Previous                                                    Next                                                    Home

Saturday, 29 April 2023

Extend the thread to check it’s running status

Extend thread class to support following functionalities.

 

boolean isFinished()

Return true then thread finishes its execution, else false.

 

public void waitUntilFinished(long timeout)

Wait until the thread finished it’s execution or the timeout is elapsed.

 

Find the below working application.

 


WorkerThread.java

package com.sample.app.threads;

public class WorkerThread extends Thread {
	private Object notify;
	private volatile boolean finished = false;
	private final Runnable runnable;

	public WorkerThread(Runnable runnable, Object notify) {
		this.runnable = runnable;
		this.notify = notify != null ? notify : this;
	}

	public WorkerThread(Runnable runnable) {
		this(runnable, null);
	}

	public synchronized boolean isFinished() {
		return finished;
	}

	public void waitUntilFinished(long timeout) throws InterruptedException {
		long endTime = System.currentTimeMillis() + timeout;
		synchronized (notify) {
			long currentTime = System.currentTimeMillis();
			while (!finished && currentTime < endTime) {
				notify.wait(endTime - currentTime);
				currentTime = System.currentTimeMillis();
			}
		}
	}

	public void run() {
		try {
			runnable.run();
		} catch (Throwable t) {
			t.printStackTrace(System.err);
			throw t;
		} finally {
			synchronized (notify) {
				finished = true;
				notify.notifyAll();
			}
		}
	}
}

 

WorkerThreadDemo.java

package com.sample.app.threads;

import java.util.concurrent.TimeUnit;

public class WorkerThreadDemo {

	public static void main(String[] args) throws InterruptedException {
		Runnable runnable = new Runnable() {

			@Override
			public void run() {
				System.out.println("Started executing the process");
				try {
					TimeUnit.SECONDS.sleep(6);
				} catch (InterruptedException e) {
					e.printStackTrace(System.err);
				}
				System.out.println("Finished execution....");
			}
		};

		WorkerThread workerThread = new WorkerThread(runnable);

		workerThread.start();

		workerThread.waitUntilFinished(4000l);
		System.out.println("Is finished : " + workerThread.isFinished());

		workerThread.waitUntilFinished(3000l);
		System.out.println("Is finished : " + workerThread.isFinished());

	}

}

 

Output

Started executing the process
Is finished : false
Finished execution....
Is finished : true

 

 

 

You may like

Interview Questions

Quick guide to assertions in Java

java.util.Date vs java.sql.Date

How to break out of nested loops in Java?

Implementation of Bag data structure in Java

Scanner throws java.util.NoSuchElementException while reading the input

How to get all the enum values in Java?

Thursday, 19 May 2022

Quick guide to race condition in Java with examples

Race condition is one of the most common problem in multi-threaded applications.

 

Race condition occur when two or more threads try to perform some operation on a Shared variable at a time.

 

Let me explain it with an example. Suppose, there are 3 persons A, B, and C.

 

a.   A has bank balance of 500

b.   B has bank balance of 200

c.    C has bank balance of 400

 

Assume that Person A initiated a transfer of 400 to both the accounts B and C at a time and two threads started to complete the requests simultaneously.

a.   Thread1: Transfer 400 from account A to account B

b.   Thread2: Transfer 400 from account A to account C.

 

 


As you see the above diagram, even though Account A has 500 rupees, 800 rupees are deducted and 400 credited to account B and 400 credited to account C.

 

How to address this problem?

By synchronizing the amount transfer operation, we can address this problem. In this example,

a.   Both the threads try to acquire the lock on account A.

b.   Assume thread 1 get the lock on account A and thread 2 waits for the lock.

c.    Thread 1 transfer the amount from account A to account B successfully and release the lock.

d.   Thread 2 gets the lock on Account A and confirms that Account A do not have sufficient funds and terminate the transaction.

 

Flow diagram with synchronization looks like below.

 


Java examples of race condition

There are two common code patterns that leads to race condition.

a.   Check and act pattern

b.   Read-modify-update pattern

 

Check and act pattern

Let me explain this with singleton pattern example.

public static Connection getConnection() {
	if(connection == null) { // Race condition occur when two threads see the conneciton as null at a time
		connection = new Connection();
	}
	
	return connection;
}

If the getConnection() method called by more than one thread at a time, there is a possibility that more than one thread can pass the connection == null check and end up in creating more than one object, which breaks the singleton pattern.

 

Let me add some delay in getConnection() method to show the race condition.

 

DataSource.java

package com.sample.app;

import java.util.concurrent.TimeUnit;

public class DataSource {
	
	public static Connection connection = null;
	
	public static Connection getConnection() {
		if(connection == null) { // Race condition occur when two threads see the conneciton as null at a time
			sleep(2);
			connection = new Connection();
		}
		
		return connection;
	}
	
	public static class Connection{
		
		public Connection() {
			System.out.println("Connection object created : " + this);
		}
	}
	
	public static void sleep(int noOfSeconds) {
		try {
			TimeUnit.SECONDS.sleep(noOfSeconds);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

DataSourceTest.java

package com.sample.app;

public class DataSourceTest {

	public static void main(String[] args) {
		Runnable task1 = () -> {
			DataSource.getConnection();
		};

		Thread thread1 = new Thread(task1);
		Thread thread2 = new Thread(task1);

		thread1.start();
		thread2.start();
	}

}

Output

Connection object created : com.sample.app.DataSource$Connection@5b917b6c
Connection object created : com.sample.app.DataSource$Connection@4199cf4a

From the output, you can confirm that two connection objects created. We can solve this problem by synchronizing the singleton object creation.

public static Connection getConnection() {
	if(connection == null) {
		synchronized(DataSource.class){ // a thread gets lock only when the lock is available or other thread release
			sleep(2);
			if(connection == null) {
				connection = new Connection();
			}
		}
		
	}
	
	return connection;
}

 

You can read my below posts for more details on synchronization.

Synchronization

Synchronized methods

Synchronized blocks

Static synchronized methods


b. Read-modify-update pattern

Let me explain it with a simple incremental operation. For example, to increment a variable ‘count’ value, you need to perform 3 steps.

a.   Read the current value of count.

b.   Update the value by 1

c.    Write the new value to the count memory location.

 

private static int counter = 0;

private static void increaseCounter() {
	counter++;
}

In the above snippet, race condition might occur like below.




Since both the threads started working parallelly, there is a chance of data corruption. In the above scenario, counter value becomes 1 (but ideally it should be 2). Let’s confirm the same by below example.

 

ReadModifyUpdateDemo.java


package com.sample.app;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ReadModifyUpdateDemo {

	private static int counter = 0;

	private static void increaseCounter() {
		counter++;
	}

	public static void main(String[] args) throws InterruptedException {

		Runnable task1 = () -> {
			for (int i = 0; i < 1000; i++) {
				increaseCounter();
			}
		};

		ExecutorService executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<Runnable>());

		for (int i = 0; i < 5; i++) {
			executor.execute(task1);
		}

		TimeUnit.SECONDS.sleep(5);
		System.out.println("counter : " + counter);

		System.exit(0);

	}

}

Output on run 1

count : 4410

 

Output on run 2

count : 5000

 

Output on run 3

count : 4904

 

Output on run 3

count : 3913

 

Ideally I should get the counter value as 5000, because of race condition, I am getting corrupted data.

 

You can solve this problem by synchronizing the method ‘increaseCounter’ like below.

private static synchronized void increaseCounter() {
	counter++;
}

Above snippet makes sure that, at any point of time only one thread can work with the shared variable counter.

 

ReadModifyUpdateDemo.java

package com.sample.app;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ReadModifyUpdateDemo {

	private static int counter = 0;

	private static synchronized void increaseCounter() {
		counter++;
	}

	public static void main(String[] args) throws InterruptedException {

		Runnable task1 = () -> {
			for (int i = 0; i < 1000; i++) {
				increaseCounter();
			}
		};

		ExecutorService executor = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
				new LinkedBlockingQueue<Runnable>());

		for (int i = 0; i < 5; i++) {
			executor.execute(task1);
		}

		TimeUnit.SECONDS.sleep(5);
		System.out.println("counter : " + counter);

		System.exit(0);

	}

}

Output

counter : 5000

 

 

Solutions to avoid race condition

a.   Avoid shared values whenever possible.

b.   Synchronize the operation, when there is a chance of multiple threads work on the task.


You may like

Interview Questions

How to solve UnsupportedClassVersionError in Java?

How to find the Java major and minor versions from a .class file

Can I run a class file that is compiled in lower environment?

How to solve the Error: Could not find or load main?

float vs double in Java