I am trying to implement an asynchronous return for a time-consuming task. The task data is inserted into a queue and the call returns immediately. A pool of 20 threads takes tasks from that queue. But the data arrives as null in the worker thread, and I wonder why.

Code segments (only the relevant parts of the code)

Definition

private static LinkedList<Object[]> cacheQ = new LinkedList<Object[]>();
private static ArrayList<Thread> cacheThreads = null;

One time initialization

if (cacheThreads == null) {
    cacheThreads = new ArrayList<Thread>();
    for (i = 0; i < 20; i++) {
        CacheThread cacheThread = readWriteOps.new CacheThread();
        Thread thread = new Thread(cacheThread);
        cacheThreads.add(i, thread);
        thread.start();
    }
    System.out.println("Cache Threads Started!");
}

Adding a new task to the Q at the beginning (addFirst)

public void arrayizeToCache(String key, CacheData value) {
    synchronized (cacheThreads) {
        Object[] tuple = new Object[SIZE] ;
        tuple[KEY] = key ;
        tuple[VALUE] = value ;
        // ---- NOT NULL at this point ----
        log.debug("CacheThread arrayizeToCache k" + key + "v" + value);
        cacheQ.addFirst((Object[])tuple);
        cacheThreads.notify();
    }
}

The actual work of the thread

public class CacheThread implements Runnable {
    @Override
    public void run() {
        System.out.println("CachedThread running!");
        CacheData cacheStore = null;
        while (true) {
            try {
                String key;
                synchronized (cacheThreads) {
                    if (cacheQ.isEmpty()) {
                        log.debug("Cache Q waiting");
                        cacheThreads.wait();
                    }
                    if (cacheQ.isEmpty()) {
                        log.error("Cache List empty nothing to cache");
                        continue;
                    }
                    Object[] cacheData = (Object[]) cacheQ.removeLast();
                    key = (String) cacheData[KEY] ;
                    cacheStore = (CacheData) cacheData[VALUE];
                    // ----- HERE -----
                    // More code, but irrelevant for this question.
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // log.debug(((Runnable) this).toString() +
            // "Server : flush SUCCESS");
        }
    }
}

At -----HERE----- I am getting a null from cacheStore.getValue(). What am I missing here? It must be something basic.

I have used a LinkedList for the queue, so structurally it should work. The LinkedList holds the data I need to process. Each entry is an Object array because I have more than one data element.

Edit :

private static final int KEY = 0 ;
private static final int VALUE = 1 ;
private static final int SIZE = 2 ;

Edit Again : This is how I am calling arrayize

ReadWriteOperations.getInstance(this).arrayizeToCache(key,
            new CacheData(subscription, SuperCache.NEVER_EXPIRE));

My CacheData object

public class CacheData {
    private Object value;
    private Integer expiry = 0;

    public CacheData(Object newValue) {
        value = newValue;
    }

    public CacheData(Object newValue, Integer newExpiry) {
        // THIS LINE WAS MISSING: value = newValue;
        expiry = newExpiry;
    }

    public Object getValue() {
        return value;
    }

    public Integer getExpiry() {
        return expiry;
    }
}

Answer: I was not initializing value at all in the two-argument constructor. I had assumed the cause would be something more complex. And it's 2am here :) Thanks to @Gray.
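
For reference, one way to guard against this kind of omission is sketched below. It is not the code from the question: it assumes the class can be changed so that the fields are final and the one-argument constructor delegates to the two-argument one, and it drops the public modifiers only to keep the snippet self-contained. With final fields, the compiler rejects any constructor that leaves value unassigned.

```java
// Sketch only: a variant of CacheData in which `value` is assigned in
// exactly one place. The final fields and the constructor delegation are
// assumptions, not part of the original class.
class CacheData {
    private final Object value;
    private final Integer expiry;

    CacheData(Object newValue) {
        this(newValue, 0); // 0 mirrors the original default expiry
    }

    CacheData(Object newValue, Integer newExpiry) {
        this.value = newValue;   // the assignment that was missing
        this.expiry = newExpiry;
    }

    Object getValue() {
        return value;
    }

    Integer getExpiry() {
        return expiry;
    }
}
```

Because both constructors funnel through the same assignment, new CacheData(subscription, SuperCache.NEVER_EXPIRE) and new CacheData(subscription) would return the same object from getValue().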

Comments

  • You should consider using an ExecutorService for this. By forking your own threads and then reading from a job queue, you are in effect rewriting it. See: docs.oracle.com/javase/tutorial/essential/concurrency/… Commented Jul 22, 2013 at 19:51
  • If you don't move to an ExecutorService, you should switch your cacheQ queue to be a BlockingQueue, which takes care of the synchronized, notify/wait, etc. Commented Jul 22, 2013 at 19:54
  • I changed the LinkedList to ConcurrentLinked and to your LinkedBlockingQueue; neither worked. Same result. Something else is the matter here. Commented Jul 22, 2013 at 20:04
  • Where is cacheStore's value updated? Is it updated after the threads are started? Is it updated inside of a synchronized (cacheThreads) { block? Commented Jul 22, 2013 at 20:07
  • @Gray good hints, thanks. I fixed it. Will post the root cause shortly. In the meantime, please post an answer, so that I can return the favor with an upvote and accept. Thanks again. Commented Jul 22, 2013 at 20:19

1 Answer

Your code looks fine to me. I would be interested to know:

  • Where is cacheStore's value updated?
  • Is it updated after the threads are started? This may be the problem.
  • If so, it needs to be updated inside of a synchronized (cacheThreads) { block for the changes to be visible to the running threads.

Here are some additional comments about the code:

  • You are forking your own threads and then having a synchronized LinkedList for the jobs. I would encourage you to look into the ExecutorService patterns, which do both of those for you. They are recommended for most threading tasks.

  • If you don't move to an ExecutorService, you should switch your cacheQ queue to be a BlockingQueue which takes care of the synchronized, notify()/wait(), etc. for you.
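
As a rough illustration of the ExecutorService suggestion, here is a minimal sketch. The class name AsyncCacheWriter, the ConcurrentHashMap used as a stand-in for the real cache, and the shutdownAndWait helper are all hypothetical and not from the question. A fixed pool of 20 threads takes the place of both the hand-started threads and the synchronized LinkedList, since the pool keeps its own internal work queue.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the question's cache writer.
class AsyncCacheWriter {
    // 20 worker threads, matching the thread count in the question.
    private final ExecutorService pool = Executors.newFixedThreadPool(20);

    // Assumed placeholder for the real cache; ConcurrentHashMap rejects
    // null keys and null values.
    private final Map<String, Object> store = new ConcurrentHashMap<>();

    // Returns immediately; a pool thread performs the write later.
    void arrayizeToCache(String key, Object value) {
        pool.execute(() -> store.put(key, value));
    }

    // Stops accepting new tasks and waits for the queued ones to finish.
    // Returns false on timeout or if the waiting thread is interrupted.
    boolean shutdownAndWait(long timeoutSeconds) {
        pool.shutdown();
        try {
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    Object get(String key) {
        return store.get(key);
    }
}
```

With this shape there is no explicit wait()/notify() or synchronized block in user code; the task handoff and its memory visibility are handled by the executor.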

1 Comment

Here you go, thanks again. I can sleep now, and fix the client issue tomorrow.
