
This is actually a design question / problem, and I am not sure whether writing and reading a file is an ideal solution here. Nonetheless, I will outline what I am trying to do below. I have the following static method; once the reqStreamingData method of cs is called, it starts retrieving data from the client server constantly, at a rate of one line every 150 milliseconds.

    public static void streamingDataOperations(ClientSocket cs) throws InterruptedException, IOException {
        // This call retrieves streaming data from the client server constantly
        // and writes a line to the CSV file every 150 milliseconds,
        // using a BufferedWriter wrapped in a PrintWriter (print method).
        // Note that the flush method of the BufferedWriter is never called,
        // so I assume the data sits in the in-memory buffer
        // rather than in the actual file.
        cs.reqStreamingData(output_file); // <- this method comes from the client's API.

        // I would like another thread (the data processing thread) which repeats every 15 minutes.
        // I am aware I can do that by creating a class that extends TimerTask and fixing a schedule.
        // When this thread runs, I want it to:
        // 1. flush the last 15 minutes of data to output_file (note that no synchronized
        //    methods or statements are used here, hence no object is being locked),
        // 2. process the data in R,
        // 3. wait for the output from R to come back,
        // 4. clear the file contents, so that the file only ever stores data from the last 15 minutes.
    }
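To check the assumption that unflushed data sits in the BufferedWriter's in-memory buffer rather than in the file, here is a minimal, self-contained sketch (the file name and line content are illustrative):

```java
import java.io.BufferedWriter;
import java.nio.file.Files;
import java.nio.file.Path;

public class FlushDemo {
    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("stream", ".csv");
        BufferedWriter writer = Files.newBufferedWriter(file);

        writer.write("line1\n");
        // the line is far shorter than the default 8 KiB buffer,
        // so nothing has reached the file on disk yet
        System.out.println("before flush: " + Files.size(file)); // before flush: 0

        writer.flush();
        // now the buffered characters are written through to the file
        System.out.println("after flush: " + Files.size(file));  // after flush: 6

        writer.close();
    }
}
```

So until flush (or close) is called, the 150 ms lines only accumulate in memory, which is exactly why the processing thread cannot simply read the file and expect to see the latest data.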

Now, I am not well versed in multithreading. My concerns are:

  1. The request data thread and the data processing thread read and write the file simultaneously, but at different rates. I am not sure whether the data processing thread would delay the request data thread by a significant amount, since it has more computationally heavy work to carry out. Given that they are two separate threads, would any error or exception occur here?
  2. I am not fond of the idea of writing and reading the same file at the same time, but because I have to use R to process and store the data in an R data frame in real time, I really cannot think of another way to approach this. Are there any better alternatives?
  3. Is there a better design to tackle this problem?

I understand that this is a lengthy problem. Please let me know if you need more information.

  • Any reason you can't read from one file and write to another? Commented Jan 6, 2019 at 17:38
  • It doesn't have to be one file, but wouldn't two files complicate the process, because I would then have to copy the data from one file to another? Let me clarify a bit: the code makes a call to the client server, which returns a line of data every 150 ms. The data is then written to the CSV file. This process repeats for 15 minutes, and then the data processing thread flushes the data and reads from the CSV file. Commented Jan 6, 2019 at 17:59
  • Sorry if I am asking questions that you already resolved, but why would you not use something that has threading already resolved and has good support in Java (SQLite, H2, ...)? Commented Jan 6, 2019 at 18:16
  • I think the approach with two files is proper. You write lines to a temporary file. When processing picks up, you instantly replace the output stream to write to a new file, and that's the only synchronization you'd need to care about. Meanwhile, processing flushes the old stream, renames the file (no copy), and spawns the R application to consume it. Commented Jan 6, 2019 at 23:36
  • @yegodm Do you mind writing some sample code so I would have a basic understanding of how the synchronisation is done? Thanks. Commented Jan 7, 2019 at 0:26

1 Answer

The lines (CSV, or any other text) can be written to a temporary file. When processing is ready to pick up, the only synchronization needed is around the moment the current temporary file is swapped for a new one. This guarantees that the producer never writes to the file the consumer is processing at the same time.

Once the swap is done, the producer continues to add lines to the newer file, while the consumer flushes and closes the old one and then moves it to the location expected by your R application.

To further clarify the approach, here is a sample implementation:

import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public static void main(String[] args) throws IOException {
    // in this sample these dirs are supposed to exist
    final String workingDirectory = "./data/tmp";
    final String outputDirectory = "./data/csv";

    final String outputFilename = "r.out";
    final int addIntervalSeconds = 1;
    final int drainIntervalSeconds = 5;

    final FileBasedTextBatch batch = new FileBasedTextBatch(Paths.get(workingDirectory));
    final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);

    final ScheduledFuture<?> producer = executor.scheduleAtFixedRate(
        () -> batch.add(
            // adding formatted date/time to imitate another CSV line
            LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME)
        ),
        0, addIntervalSeconds, TimeUnit.SECONDS);

    final ScheduledFuture<?> consumer = executor.scheduleAtFixedRate(
        () -> batch.drainTo(Paths.get(outputDirectory, outputFilename)),
        0, drainIntervalSeconds, TimeUnit.SECONDS);

    try {
        // awaiting for a limited time, for demonstration purposes only
        producer.get(30, TimeUnit.SECONDS);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    catch (ExecutionException e) {
        System.err.println("Producer failed: " + e);
    }
    catch (TimeoutException e) {
        System.out.println("Finishing producer/consumer...");
        producer.cancel(true);
        consumer.cancel(true);
    }
    executor.shutdown();
}

static class FileBasedTextBatch {
    private final Object lock = new Object();
    private final Path workingDir;
    private Output output;

    public FileBasedTextBatch(Path workingDir) throws IOException {
        this.workingDir = workingDir;
        output = new Output(this.workingDir);
    }

    /**
     * Adds another line of text to the batch.
     */
    public void add(String textLine) {
        synchronized (lock) {
            output.writer.println(textLine);
        }
    }

    /**
     * Moves currently collected batch to the file at the specified path.
     * The file will be overwritten if exists.
     */
    public void drainTo(Path targetPath) {
        try {
            final long startNanos = System.nanoTime();
            final Output output = getAndSwapOutput();
            final long elapsedMillis =
                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
            System.out.printf("Replaced the output in %d millis%n", elapsedMillis);
            output.close();
            Files.move(
                output.file,
                targetPath,
                StandardCopyOption.ATOMIC_MOVE,
                StandardCopyOption.REPLACE_EXISTING
            );
        }
        catch (IOException e) {
            System.err.println("Failed to drain: " + e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Replaces the current output with the new one, returning the old one.
     * The method is supposed to execute very quickly to avoid delaying the producer thread.
     */
    private Output getAndSwapOutput() throws IOException {
        synchronized (lock) {
            final Output prev = this.output;
            this.output = new Output(this.workingDir);
            return prev;
        }
    }
}

static class Output {
    final Path file;
    final PrintWriter writer;

    Output(Path workingDir) throws IOException {
        // performs very well on local filesystems when working directory is empty;
        // if too slow, maybe replaced with UUID based name generation
        this.file = Files.createTempFile(workingDir, "csv", ".tmp");
        this.writer = new PrintWriter(Files.newBufferedWriter(this.file));
    }

    void close() {
        if (this.writer != null) {
            this.writer.flush();
            this.writer.close();
        }
    }
}
}

9 Comments

Thank you very much for your answer. I will study it in depth. Thanks once again.
I am wondering: if I remove static from class FileBasedTextBatch and class Output, what would the implication be?
They are static because I placed them within another class. Without static, an instance of the outer class would be required to create an instance of either of these two. If you move each to a separate .java file in the same package and remove static, it will work just fine.
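To illustrate that difference between static nested and inner classes, here is a small hypothetical example (the class names are made up for demonstration):

```java
public class NestingDemo {
    // static nested class: can be instantiated without an outer instance
    static class Nested { int v = 1; }

    // inner (non-static) class: requires an enclosing NestingDemo instance
    class Inner { int v = 2; }

    public static void main(String[] args) {
        Nested n = new Nested();                 // no outer instance needed
        Inner i = new NestingDemo().new Inner(); // outer instance required
        System.out.println(n.v + " " + i.v);     // prints "1 2"
    }
}
```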
I don't fully understand the use of outputFilename = "r.out";. It seems like nothing is ever written to it.
Files.move(..., StandardCopyOption.REPLACE_EXISTING) creates the target file when it does not exist, and otherwise overwrites it. batch.drainTo(Paths.get(outputDirectory, outputFilename)) is there to give the output file a well-defined name at a well-defined location.
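As a quick check of that Files.move behavior, this sketch (directory and file names are illustrative) moves a temporary file onto a target that does not exist yet, and then onto one that does:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class MoveDemo {
    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("movedemo");
        Path target = dir.resolve("r.out");

        // first drain: the target does not exist yet, so move creates it
        Path src1 = Files.createTempFile(dir, "csv", ".tmp");
        Files.writeString(src1, "batch-1");
        Files.move(src1, target, StandardCopyOption.REPLACE_EXISTING);
        System.out.println(Files.readString(target)); // batch-1

        // second drain: the target exists, so REPLACE_EXISTING overwrites it
        Path src2 = Files.createTempFile(dir, "csv", ".tmp");
        Files.writeString(src2, "batch-2");
        Files.move(src2, target, StandardCopyOption.REPLACE_EXISTING);
        System.out.println(Files.readString(target)); // batch-2
        System.out.println(Files.exists(src2));       // false
    }
}
```

Each drain therefore leaves exactly one file, always named r.out, containing only the most recent batch.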
