I am trying to implement an asynchronous return for a time-consuming task: the task data is inserted into a queue and the call returns immediately. A pool of 20 threads takes tasks from that queue. But the data arrives as null in the worker thread, and I wonder why.
Code segments (only the relevant parts)
Definition
private static LinkedList<Object[]> cacheQ = new LinkedList<Object[]>();
private static ArrayList<Thread> cacheThreads = null;
One time initialization
if (cacheThreads == null) {
cacheThreads = new ArrayList<Thread>();
for (i = 0; i < 20; i++) {
CacheThread cacheThread = readWriteOps.new CacheThread();
Thread thread = new Thread(cacheThread);
cacheThreads.add(i, thread);
thread.start();
}
System.out.println("CAche Threads Started!");
}
Adding a new task to the Q at the beginning (addFirst)
public void arrayizeToCache(String key, CacheData value) {
synchronized (cacheThreads) {
Object[] tuple = new Object[SIZE] ;
tuple[KEY] = key ;
tuple[VALUE] = value ;
log.debug("CacheThread arrayizeToCache k"+key+"v"+value) ; // value is NOT NULL here
cacheQ.addFirst((Object[])tuple);
cacheThreads.notify();
}
}
The actual work of the thread
public class CacheThread implements Runnable {
@Override
public void run() {
System.out.println("CachedThread running!");
CacheData cacheStore = null;
while (true) {
try {
String key;
synchronized (cacheThreads) {
if (cacheQ.isEmpty()) {
log.debug("Cache Q waiting");
cacheThreads.wait();
}
if (cacheQ.isEmpty()) {
log.error("Cache List empty nothing to cache");
continue;
}
Object[] cacheData = (Object[]) cacheQ.removeLast();
key = (String) cacheData[KEY] ;
cacheStore = (CacheData) cacheData[VALUE];
// ----- HERE -----
//More code, but irrelevant for this question.
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// log.debug(((Runnable) this).toString() +
// "Server : flush SUCCESS");
}
}
}
At ----- HERE ----- I am getting null from cacheStore.getValue(). What am I missing here? It is probably something basic.
I have used a LinkedList for the queue, so structurally it should work. The LinkedList holds the data I need to process. Each entry is an Object array because I have more than one data element per task.
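For comparison, here is a minimal sketch of the same hand-off built on a java.util.concurrent.BlockingQueue instead of a LinkedList with manual wait/notify. The class BlockingQueueSketch, the typed CacheTask holder (standing in for the Object[] tuple), and the method names submit and next are hypothetical names chosen for illustration, not part of the code above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BlockingQueueSketch {
    // Hypothetical typed holder replacing the Object[] tuple (KEY, VALUE).
    static final class CacheTask {
        final String key;
        final Object value;

        CacheTask(String key, Object value) {
            this.key = key;
            this.value = value;
        }
    }

    // Unbounded FIFO queue; it does its own locking and signalling.
    private final BlockingQueue<CacheTask> queue = new LinkedBlockingQueue<>();

    // Producer side: enqueue and return immediately.
    void submit(String key, Object value) {
        queue.add(new CacheTask(key, value));
    }

    // Consumer side: wait up to timeoutMillis for a task; null means nothing arrived.
    CacheTask next(long timeoutMillis) throws InterruptedException {
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueueSketch sketch = new BlockingQueueSketch();
        Thread worker = new Thread(() -> {
            try {
                CacheTask task = sketch.next(1000);
                System.out.println(task == null ? "timed out" : task.key + " -> " + task.value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
            }
        });
        worker.start();
        sketch.submit("user:1", "subscription");
        worker.join();
    }
}
```

A long-running worker would more likely call queue.take() in a loop; the timed poll is used here only so the demo always terminates.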
Edit :
private static final int KEY = 0 ;
private static final int VALUE = 1 ;
private static final int SIZE = 2 ;
Edit Again : This is how I am calling arrayize
ReadWriteOperations.getInstance(this).arrayizeToCache(key,
new CacheData(subscription, SuperCache.NEVER_EXPIRE));
My CacheData object
public class CacheData {
private Object value ;
private Integer expiry = 0 ;
public CacheData(Object newValue) {
value = newValue ;
}
public CacheData (Object newValue, Integer newExpiry) {
value = newValue ; // THIS LINE WAS MISSING
expiry = newExpiry ;
}
public Object getValue() {
return value ;
}
public Integer getExpiry () {
return expiry ;
}
}
Answer : I was not initializing value at all in the two-argument constructor. I had assumed the cause would be something more complex. And it's 2am here :) Thanks to @Gray.
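One way to make this kind of omission harder to repeat, sketched here as a suggestion rather than as the code actually used: declare the fields final and let the one-argument constructor delegate to the two-argument one. With final fields the compiler rejects any constructor that forgets an assignment. The CacheDataDemo class and the sample values are made up for illustration.

```java
// Sketch of CacheData with final fields and constructor chaining.
final class CacheData {
    private final Object value;
    private final Integer expiry;

    public CacheData(Object newValue) {
        this(newValue, 0); // delegate so both fields are assigned in one place
    }

    public CacheData(Object newValue, Integer newExpiry) {
        // Omitting either assignment is now a compile error because the fields are final.
        this.value = newValue;
        this.expiry = newExpiry;
    }

    public Object getValue() {
        return value;
    }

    public Integer getExpiry() {
        return expiry;
    }
}

class CacheDataDemo {
    public static void main(String[] args) {
        CacheData data = new CacheData("subscription", 60);
        System.out.println(data.getValue() + " " + data.getExpiry());
    }
}
```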
… ExecutorService for this. By forking your own threads and then reading from a job queue, you are in effect rewriting them. See: docs.oracle.com/javase/tutorial/essential/concurrency/…
… ExecutorService, you should switch your cacheQ queue to be a BlockingQueue, which takes care of the synchronized, notify/wait, etc.
… synchronized (cacheThreads) { block?
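A rough sketch of what the ExecutorService suggestion could look like, assuming a fixed pool of 20 threads to match the question. The class ExecutorCacheSketch, the ConcurrentHashMap used as a stand-in for the real cache write, and the helper methods get and shutdownAndWait are hypothetical; only the java.util.concurrent calls are real API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class ExecutorCacheSketch {
    // The pool owns the worker threads and the internal job queue.
    private final ExecutorService pool = Executors.newFixedThreadPool(20);

    // Stand-in for the real cache write; note ConcurrentHashMap rejects null keys and values.
    private final Map<String, Object> store = new ConcurrentHashMap<>();

    // Queues the work and returns immediately; the Future can be ignored or awaited.
    Future<?> arrayizeToCache(String key, Object value) {
        return pool.submit(() -> {
            store.put(key, value);
        });
    }

    Object get(String key) {
        return store.get(key);
    }

    // Stops accepting new tasks, then waits for queued ones; true if all finished in time.
    boolean shutdownAndWait(long timeoutMillis) throws InterruptedException {
        pool.shutdown();
        return pool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorCacheSketch cache = new ExecutorCacheSketch();
        cache.arrayizeToCache("user:1", "subscription");
        boolean finished = cache.shutdownAndWait(5000);
        System.out.println("finished=" + finished + ", value=" + cache.get("user:1"));
    }
}
```

The pool threads are not daemon threads, so the shutdown call matters: without it the JVM would keep running after main returns.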