
I'm writing a multithreaded program in Java, with a single writer thread running alongside the worker threads. Once a worker thread has processed a chunk of data, it adds the result to a LinkedBlockingQueue held by the writer, via the writer's synchronized writeToFile method.

The idea is that once the queue reaches a certain size, the worker threads are blocked from appending to it while the data is written out to the file. I am processing large amounts of data (20-50GB at a time), and this helps to reduce the RAM used. (If there's a better way to do this, I'm open to suggestions!)

The problem I'm having is that despite making the writeToFile method synchronized, and calling emptyQueues inside a synchronized block when writing to the file, the worker threads are still appending to the queue while the writer thread is writing to the file.

@Component("writer")
public class WriterImpl implements Writer {

private boolean isRunning;
private PrintWriter fastQWriter1, fastQWriter2;
private final Queue<FastQRecord> fastQQueue1 = new LinkedBlockingQueue<>();
private final Queue<FastQRecord> fastQQueue2 = new LinkedBlockingQueue<>();
private final int MAX_QUEUE_SIZE = 5000;

@Override
public void setOutputFiles(File fastQ1, File fastQ2) {
    try{
        fastQWriter1 = new PrintWriter(new FileOutputStream(fastQ1));
        fastQWriter2 = new PrintWriter(new FileOutputStream(fastQ2));
    }catch (IOException ioe){
        System.out.println(ioe.getMessage());
    }
}

@Override
public synchronized void writeToFile(FastQRecord one, FastQRecord two) {
    fastQQueue1.add(one);
    fastQQueue2.add(two);
}

@Override
public void close() {
    isRunning = false;

    emptyQueues();

    fastQWriter1.flush();
    fastQWriter1.close();
    fastQWriter2.flush();
    fastQWriter2.close();
}

@Override
public void run() {
    isRunning = true;

    while(isRunning){
        //do stuff
        if(fastQQueue1.size() > MAX_QUEUE_SIZE){ //empty queues - 5000 record pairs at a time

            synchronized (fastQQueue1){
                synchronized (fastQQueue2){
                    emptyQueues();
                }
            }
        }
    }
}

private void emptyQueues() {
    while(fastQQueue1.size() > 0){
        FastQRecord one = fastQQueue1.poll();

        fastQWriter1.println(one.getId());
        fastQWriter1.println(one.getRawSequence());
        fastQWriter1.println(one.getPlus());
        fastQWriter1.println(one.getQualityString());
    }

    while(fastQQueue2.size() > 0){

        FastQRecord two = fastQQueue2.poll();
        fastQWriter2.println(two.getId());
        fastQWriter2.println(two.getRawSequence());
        fastQWriter2.println(two.getPlus());
        fastQWriter2.println(two.getQualityString());

    }
}
}  

The FastQRecord is just a simple POJO that holds the data I need to write to the file:

public class FastQRecord {

private String id;
private String rawSequence;
private char plus;
private String qualityString;

public FastQRecord(String id, String rawSequence, char plus, String qualityString) {
    this.id = id;
    this.rawSequence = rawSequence;
    this.plus = plus;
    this.qualityString = qualityString;
}

public String getId() {
    return id;
}

public void setId(String id) {
    this.id = id;
}

public String getRawSequence() {
    return rawSequence;
}

public void setRawSequence(String rawSequence) {
    this.rawSequence = rawSequence;
}

public char getPlus() {
    return plus;
}

public void setPlus(char plus) {
    this.plus = plus;
}

public String getQualityString() {
    return qualityString;
}

public void setQualityString(String qualityString) {
    this.qualityString = qualityString;
}

@Override
public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;

    FastQRecord that = (FastQRecord) o;

    return id.equals(that.id);
}

@Override
public int hashCode() {
    return id.hashCode();
}

@Override
public String toString() {
    return "FastQRecord{" +
            "id=" + id + '\n' +
            ", rawSequence=" + rawSequence + '\n' +
            ", plus=" + plus + '\n' +
            ", qualityString=" + qualityString + '\n' +
            '}';
}
}
  • Why wait for the queue to fill up before you start writing to the file? Commented May 29, 2018 at 7:56
  • Because the writing to the file blocks the threads from appending to the queue Commented May 29, 2018 at 8:02
  • No it doesn't. The appending is only blocked if the queue is full. Ah, you just seem to be using BlockingQueue in a completely wrong way. Commented May 29, 2018 at 8:04

1 Answer

You can take advantage of the BlockingQueue interface (i.e. blocking a thread when there is no space in the queue) by using the put() method instead of add(). The add() method is inherited from Collection and does not block: on a full bounded queue it throws an IllegalStateException.

But for put() to make a thread wait, the queue has to know its maximum size, so declare it as new LinkedBlockingQueue<>(MAX_QUEUE_SIZE). If you don't specify a capacity, it defaults to Integer.MAX_VALUE, so the queue is effectively unbounded. Note that put() is declared on BlockingQueue, not Queue, so the fields also need to be typed as BlockingQueue<FastQRecord>.
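To make the difference concrete, here is a minimal sketch of how a bounded LinkedBlockingQueue reacts once it is full. The class name BoundedQueueDemo and the describe helper are made up for illustration, and a timed offer() stands in for put() so that the demo returns instead of blocking forever.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original code.
class BoundedQueueDemo {

    // Fills a bounded queue, then reports how each insertion method reacts.
    static String describe(int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) {
            queue.offer("record-" + i); // succeeds while there is space
        }

        StringBuilder report = new StringBuilder();
        report.append("remaining=").append(queue.remainingCapacity());

        // offer() never blocks: it returns false on a full queue.
        report.append(" offer=").append(queue.offer("extra"));

        // add() never blocks either: it throws on a full queue.
        try {
            queue.add("extra");
            report.append(" add=ok");
        } catch (IllegalStateException e) {
            report.append(" add=IllegalStateException");
        }

        // put() would wait indefinitely here because nothing is consuming.
        // The timed offer() waits the same way but gives up after 50 ms.
        boolean accepted;
        try {
            accepted = queue.offer("extra", 50, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            accepted = false;
        }
        report.append(" timedOffer=").append(accepted);
        return report.toString();
    }

    public static void main(String[] args) {
        // prints: remaining=0 offer=false add=IllegalStateException timedOffer=false
        System.out.println(describe(2));
    }
}
```

With the no-argument constructor, the same calls would all succeed, which is why the producers in the question never appear to be blocked.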

I also suggest synchronizing access to the queue before checking its size (or whether it is full). Your run() method would then look something like this:

@Override
public void run() {
    isRunning = true;

    while(isRunning){
        //do stuff
        synchronized(fastQQueue1){
            if(fastQQueue1.remainingCapacity() == 0){ //queue is full: empty queues - 5000 record pairs at a time

                //fastQQueue1 is already locked above, so only fastQQueue2 needs locking here
                synchronized (fastQQueue2){
                    emptyQueues();
                }
            }
        }
    }
}

A similar change would apply to your emptyQueues() method.
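The answer does not spell out what the change to emptyQueues() would look like. One possible reading, offered only as a sketch, is to stop looping on size() and instead move everything currently queued into a local batch with BlockingQueue.drainTo(), then write the batch. The class DrainSketch and the method drainAndWrite are hypothetical names, and plain String records stand in for FastQRecord.

```java
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper, not part of the original code.
class DrainSketch {

    // Removes every element currently in the queue and writes one per line.
    // Returns the number of records written.
    static int drainAndWrite(BlockingQueue<String> queue, PrintWriter out) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);      // removes all currently available elements
        for (String record : batch) {
            out.println(record);   // the slow I/O happens on the local batch
        }
        out.flush();
        return batch.size();
    }
}
```

Because the queue is emptied before any file I/O happens, producers blocked in put() can resume while the batch is still being written.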


3 Comments

Hi Emanuele, thanks for the help. I've implemented your changes, and all seems to work well.
I'm glad it helped, but take Kayaman's words into account: waiting for the queue to fill up before writing to the file makes little sense. The writer thread only needs to block when the queue is empty, which take() (or poll() with a timeout) does for you; plain poll() just returns null. Take a look at the producer/consumer pattern; it might give you a faster program.
I have made the changes, and the run method now writes whenever the remaining capacity does not equal the maximum queue size, i.e. as long as there is data in the queue.
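The producer/consumer pattern mentioned in these comments can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the poster's final code: the class ProducerConsumerSketch, the run method, and the POISON_PILL sentinel are all made up for the example, String records stand in for FastQRecord, and an in-memory list stands in for the output file.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch, not part of the original code.
class ProducerConsumerSketch {

    // Sentinel telling the consumer that no more records will arrive.
    private static final String POISON_PILL = "__END_OF_INPUT__";

    static List<String> run(int producers, int recordsPerProducer, int capacity) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        List<String> written = new ArrayList<>(); // stands in for the file

        // Consumer: take() blocks while the queue is empty, so there is
        // no busy loop and no need to check the queue size.
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    String record = queue.take();
                    if (POISON_PILL.equals(record)) {
                        break;
                    }
                    written.add(record);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // Producers: put() blocks while the queue is full, which bounds memory.
        List<Thread> workers = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            Thread worker = new Thread(() -> {
                try {
                    for (int i = 0; i < recordsPerProducer; i++) {
                        queue.put("p" + id + "-r" + i);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers.add(worker);
            worker.start();
        }

        try {
            for (Thread worker : workers) {
                worker.join();
            }
            queue.put(POISON_PILL); // all producers are done: stop the consumer
            consumer.join();        // join() makes the consumer's writes visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return written;
    }
}
```

No synchronized blocks are needed in this arrangement, because LinkedBlockingQueue does its own locking. For the paired output in the question, one option would be a single queue whose elements hold both records of a pair, so the two files cannot drift out of step.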
