What is the best practice for limiting the number of operations per unit of time? I have a multithreaded application with an arbitrary number of threads, and I want to perform at most N operations per second across all of them. I have tried several techniques, but none of them works reliably. The snippet below should make the problem clearer.

To be more precise: I want to send at most 100 messages within one second (1000 ms). For example, if the threads manage to send them within 450 ms, I want to force all threads to wait 550 ms and then repeat the same cycle again and again. The threads call speedLimitMeter.getWaitTime(); if it returns X > 0, the calling thread waits X ms.

Any hint will be helpful.

public class SpeedLimitMeter {
    private int speedLimit;

    private volatile int messageCounter = 0;

    private volatile long firstTime = 0;
    private volatile long lastTime = 0;

    private volatile long waitTime = 0;
    private volatile long waitUntil = 0;

    public SpeedLimitMeter(int speedLimit) {
        this.speedLimit = speedLimit;
    }

    public synchronized long getWaitTime() {
        long currTime = System.currentTimeMillis();

        if (messageCounter == speedLimit) {
            if (waitTime == 0) {
                long elapsedTime = currTime - firstTime;
                if (elapsedTime < 1000) {
                    waitTime = 1000 - elapsedTime;
                    waitUntil = currTime + waitTime;

                    return waitTime;
                }

                reset();
            } else if (currTime < waitUntil) {
                return waitTime;
            } else {
                reset();
            }

        }

        if (messageCounter == 0) firstTime = currTime;
        lastTime = currTime;

        messageCounter++;

        return 0;
    }

    private synchronized void reset() {
        firstTime = 0;
        lastTime = 0;
        waitTime = 0;
        waitUntil = 0;
        messageCounter = 0;
    }

}
  • What is the purpose of wait time? Commented Mar 8, 2021 at 12:04
  • I don't see any reference to "N", the number of operations per second that you want to achieve. I don't see anything in your code that is trying to limit operations either. You state that you want to limit to N operations per second, but you "don't have 100% success". What does this code do instead? Please describe your problem more clearly and make sure that the code that you post reproduces your stated problem. Commented Mar 8, 2021 at 12:12
  • Thanks for your response. I copied this code from my project, which is why the names are different. speedLimit is the maximum number of operations that I want to perform in 1 second (1000 millis). E.g. if I performed 100 operations within 400 millis, I want to force those threads to wait 600 millis and then do it again. Commented Mar 8, 2021 at 13:27

2 Answers

2

I recommend taking a look at the functionality provided by ScheduledThreadPoolExecutor (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledThreadPoolExecutor.html). Whatever exactly you are trying to do, it can probably be solved properly that way.

ScheduledThreadPoolExecutor allows you to schedule the periodic execution of a job. You could, for example, use this to release a semaphore if (and only if) that semaphore is depleted. This way, the threads you are trying to regulate draw a lease instead of requesting a wait time. It is much cleaner this way (imho).

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class ThreadLeaser {
    
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private final Semaphore semaphore;
    private final long nanosecondsPause;
    
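    public ThreadLeaser(float leasesPerSecond, boolean fair) {
        // Divide in double precision: Math.round(float) returns an int and
        // would cap the pause at Integer.MAX_VALUE nanoseconds (about 2.1 s).
        this.nanosecondsPause = Math.round(1_000_000_000d / leasesPerSecond);
        this.semaphore = new Semaphore(0, fair);
        
        // Offer at most one permit at a time, so unused permits never pile up.
        Runnable semRelease = () -> {
            if (this.semaphore.availablePermits() == 0) {
                this.semaphore.release();
            }
        };
        // Run the release job once per pause interval, starting immediately.
        executor.scheduleAtFixedRate(semRelease, 0, nanosecondsPause, TimeUnit.NANOSECONDS);
    }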
    
    public void drawNextAvailableLease() throws InterruptedException {
        semaphore.acquire();
    }
}

Be aware that this solution is not perfectly precise: if a thread acquires a lease just before the next release happens, the next thread can acquire right away. It does not guarantee a minimum 'time distance' between two leases, but rather a roughly constant frequency, provided the threads try to acquire regularly enough.
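If a minimum 'time distance' between two operations matters more than a steady average, one option is to hand out time slots instead of permits. The following is only a minimal sketch under that assumption and not part of the solution above; the class SpacedLimiter and its methods reserve() and acquire() are hypothetical names.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of ThreadLeaser): hands out time slots that are
// at least one interval apart, so two callers never run back to back.
final class SpacedLimiter {
    private final long intervalNanos;
    private long nextFreeSlot;   // System.nanoTime() value of the next unreserved slot
    private boolean started;

    SpacedLimiter(long interval, TimeUnit unit) {
        this.intervalNanos = unit.toNanos(interval);
    }

    // Reserves the next slot and returns how many nanoseconds the caller still
    // has to wait before using it (0 if the slot is already due).
    synchronized long reserve(long nowNanos) {
        // After an idle phase the schedule restarts at "now" instead of
        // letting callers burn through a backlog of missed slots.
        if (!started || nowNanos - nextFreeSlot > 0) {
            nextFreeSlot = nowNanos;
            started = true;
        }
        long wait = nextFreeSlot - nowNanos;
        nextFreeSlot += intervalNanos;
        return wait;
    }

    // Blocks the calling thread until its reserved slot is due.
    void acquire() throws InterruptedException {
        long wait = reserve(System.nanoTime());
        if (wait > 0) {
            TimeUnit.NANOSECONDS.sleep(wait);
        }
    }
}
```

Each worker thread would call acquire() before sending a message. The sleep happens outside the synchronized method, so waiting threads do not block each other while reserving their slots.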

Also, the thing needs some fleshing out (a proper way to terminate and so on), but I leave this to you.
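One possible way to handle termination is to make the leaser closeable and shut the scheduler down in close(). This is a sketch only; the class name ClosableThreadLeaser and the methods close() and isClosed() are assumptions added for illustration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Sketch of a closeable variant of ThreadLeaser (name and close() are assumptions).
final class ClosableThreadLeaser implements AutoCloseable {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final Semaphore semaphore;

    ClosableThreadLeaser(double leasesPerSecond, boolean fair) {
        long nanosecondsPause = Math.round(1_000_000_000d / leasesPerSecond);
        this.semaphore = new Semaphore(0, fair);
        // Same release job as before: offer one permit when none is available.
        executor.scheduleAtFixedRate(() -> {
            if (semaphore.availablePermits() == 0) {
                semaphore.release();
            }
        }, 0, nanosecondsPause, TimeUnit.NANOSECONDS);
    }

    void drawNextAvailableLease() throws InterruptedException {
        semaphore.acquire();
    }

    boolean isClosed() {
        return executor.isShutdown();
    }

    // Stops the periodic release job so the scheduler thread can exit.
    @Override
    public void close() {
        executor.shutdownNow();
    }
}
```

Used in a try-with-resources block, the scheduler thread is stopped when the block exits. Threads that are still blocked in drawNextAvailableLease() are not woken by close() in this sketch; they would have to be interrupted, or acquire with a timeout instead.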

This test case shows the behavior (and roughly the precision) for a large number of waiting threads, started in order with a short pause in between.

import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;

import org.junit.Test;

import thread.ThreadLeaser;

public class ThreadLeaseTester {
    @Test
    public void test_tpl() {
        final ThreadLeaser leaser = new ThreadLeaser(5f, true);
        
        List<Thread> toJoin = new ArrayList<>();
        
        long start = System.nanoTime();
        
        for (int i = 0; i < 50; ++i) {
            final int d = i;
            
            try {
                Thread.sleep(5);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            
            Thread thread = new Thread() {
                public void run() {
                    try {
                        leaser.drawNextAvailableLease();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    
                    System.out.println("Thread: " + d + " Time: " + ((System.nanoTime() - start) / 1_000_000) + "ms");
                }
            };
            thread.start();
            toJoin.add(thread);
        }
        
        toJoin.forEach(t -> {
            try {
                t.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        
        assertTrue(toJoin.size() == 50); // 50 threads were started above
    }
}

1 Comment

Thanks for your response. I will try
0

If you want to perform N operations within a given period of time without knowing how long a single operation takes, you have to start each operation in its own thread. As long as more than N operations may be active at any given time, you do not need to limit the number of threads (although this might overwhelm your system if the execution time is long compared with the given period).

The shortest time you can handle is more or less a millisecond for a single operation, but only theoretically, because of thread creation and ramp up times. Nevertheless, the pattern remains the same …

final var maxOperationsPerPeriod = …; // your N
final var lengthOfPeriod = 1000L; // the period of time in ms
final var waitTime = lengthOfPeriod / maxOperationsPerPeriod;

Thread thread;
while( (thread = getNextOperation()) != null )
{
    thread.start();
    Thread.sleep( waitTime ); // static call: pauses this dispatching loop, not the started thread
}

I omitted the mandatory exception handling because of personal laziness … apart from that, the code should do the job, when getNextOperation() returns the prepared thread for the next operation, or null when there is no more work.

And each thread must terminate automatically after the operation is done.

Possible optimisations are to use a thread pool and/or more sophisticated concurrent APIs (like Futures as the return value of getNextOperation() instead of bare Threads).
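As a rough illustration of the thread pool variant, the sketch below replaces the sleep loop with a scheduler that hands one operation per tick to a worker pool. It assumes the operations are available as a queue of Runnable objects; the class PacedDispatcher and the method runPaced are hypothetical names, not an existing API.

```java
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical dispatcher: starts one queued operation per tick on a worker pool.
final class PacedDispatcher {
    static void runPaced(Queue<Runnable> operations,
                         int maxOperationsPerPeriod,
                         long lengthOfPeriodMillis) throws InterruptedException {
        // Computing the delay in nanoseconds loses less to integer division
        // than computing it in milliseconds.
        long waitNanos = TimeUnit.MILLISECONDS.toNanos(lengthOfPeriodMillis)
                / maxOperationsPerPeriod;

        ExecutorService workers = Executors.newCachedThreadPool();
        ScheduledExecutorService pacer = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch queueDrained = new CountDownLatch(1);

        pacer.scheduleAtFixedRate(() -> {
            Runnable next = operations.poll();
            if (next == null) {
                queueDrained.countDown();   // no more work: let the caller continue
            } else {
                workers.execute(next);      // the operation runs on a pooled thread
            }
        }, 0, waitNanos, TimeUnit.NANOSECONDS);

        queueDrained.await();
        pacer.shutdown();                   // also cancels the periodic job
        workers.shutdown();                 // already started operations still finish
        workers.awaitTermination(1, TimeUnit.MINUTES);
    }
}
```

The cached pool reuses idle threads, which avoids paying the thread creation cost for every operation. The one-minute limit in awaitTermination is an arbitrary choice for this sketch.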

And "100%" can only be achieved if N divides 1000 without a remainder, because the wait time is computed with integer division. Even then it will not hold in practice: the timing of the JVM is always "unreliable" at such high frequencies.
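The effect of the remainder can be checked with a little arithmetic. The helper below is purely illustrative (PacingMath and its methods are made-up names) and ignores scheduling jitter entirely.

```java
// Illustrative arithmetic only: shows what the integer division in the loop does.
final class PacingMath {
    // Delay between two starts, computed the same way as waitTime in the loop.
    static long waitTimeMillis(long lengthOfPeriodMillis, int maxOperationsPerPeriod) {
        return lengthOfPeriodMillis / maxOperationsPerPeriod;
    }

    // Number of starts that fall into the window [0, period) when one start
    // happens every waitTimeMillis, beginning at time 0.
    static long startsPerPeriod(long lengthOfPeriodMillis, long waitTimeMillis) {
        if (waitTimeMillis == 0) {
            return Long.MAX_VALUE; // N > period in ms: the loop does not pace at all
        }
        return (lengthOfPeriodMillis + waitTimeMillis - 1) / waitTimeMillis;
    }
}
```

For N = 100 the wait time is 10 ms and exactly 100 operations start per second. For N = 300 the division truncates to 3 ms, so 334 operations start within one second, which exceeds the intended limit.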

Going to 100 operations within an hour would work with that 100% exactness ...
