I have an app with two threads, 1 that writes to a queue and the second one that read async from it. I need to create a third one that generate 20 more. the newly created threads will run till explicitly stopped. those 20 threads should get "live" data in order to analyze it. each of the 20 has a unique ID/name. I need to send the relevant data (that the READ thread collect) to the correct thread (of the 20 threads). e.g. if the data include a string with id (in it) of 2 --> I need to send it to thread with the ID =2. my question is: how should I hold a "pointer" to each of the 20 threads and send it the relevant data? (I can search the id in a runnable list (that will hold the threads)--> but then I need to call to a method "NewData(string)" in order to send the data to the running thread). How should I do it? TIA Paz
-
@OldCurmudgeon has a good answer. One thing you may want to be careful of is the concept of "sending data" to a thread. Have a quick read on the Java Concurrency tutorial, which may help you better understand the concepts: docs.oracle.com/javase/tutorial/essential/concurrencyMeesh– Meesh2013-10-07 15:35:18 +00:00Commented Oct 7, 2013 at 15:35
-
Thanks, this on I already read--> could not find there an answer to my needs, cheersuser2319608– user23196082013-10-07 18:29:45 +00:00Commented Oct 7, 2013 at 18:29
Add a comment
|
1 Answer
You would probably be better to use a Queue to communicate with your threads. You could then put all of the queues in a map for easy access. I would recommend a BlockingQueue.
public class Test {
// Special stop message to tell the worker to stop.
public static final Message Stop = new Message("Stop!");
static class Message {
final String msg;
// A message to a worker.
public Message(String msg) {
this.msg = msg;
}
public String toString() {
return msg;
}
}
class Worker implements Runnable {
private volatile boolean stop = false;
private final BlockingQueue<Message> workQueue;
public Worker(BlockingQueue<Message> workQueue) {
this.workQueue = workQueue;
}
@Override
public void run() {
while (!stop) {
try {
Message msg = workQueue.poll(10, TimeUnit.SECONDS);
// Handle the message ...
System.out.println("Worker " + Thread.currentThread().getName() + " got message " + msg);
// Is it my special stop message.
if (msg == Stop) {
stop = true;
}
} catch (InterruptedException ex) {
// Just stop on interrupt.
stop = true;
}
}
}
}
Map<Integer, BlockingQueue<Message>> queues = new HashMap<>();
public void test() throws InterruptedException {
// Keep track of my threads.
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 20; i++) {
// Make the queue for it.
BlockingQueue<Message> queue = new ArrayBlockingQueue(10);
// Build its thread, handing it the queue to use.
Thread thread = new Thread(new Worker(queue), "Worker-" + i);
threads.add(thread);
// Store the queue in the map.
queues.put(i, queue);
// Start the process.
thread.start();
}
// Test one.
queues.get(5).put(new Message("Hello"));
// Close down.
for (BlockingQueue<Message> q : queues.values()) {
// Stop each queue.
q.put(Stop);
}
// Join all threads to wait for them to finish.
for (Thread t : threads) {
t.join();
}
}
public static void main(String args[]) {
try {
new Test().test();
} catch (Throwable t) {
t.printStackTrace(System.err);
}
}
}
5 Comments
user2319608
many Thanks, I already use BlockQueue for the 2 threads that "write" and "Read" the initial data into this queue. but in your implementation --> each worker (and I am going to have 20 of those) will have it's own Queue --> so I will have 20 queues and I need 40 --> I will have 40 in my main classs --> is this efficient?
user2319608
Also: why do I need all the threads to wait ? (the t.join) --> every thread is a stand alone thread that has to deal with its own messages (which might be empty sometimes and sometimes not)
OldCurmudgeon
@user2319608 - I use the join to allow all threads to finish at the end of my test. You may not need to do that. I don't understand your need for 40 threads. The Queues will take little space, so long as you limit them in size. I have limited each queue to 10 items, you could use less if you wish.
user2319608
thanks, I use the threads to analyze market data. The market data (financials) starts Sunday and stop only Friday --> each item/asset has its own data so I do the analysis per asset in a separate thread and it is live from the start to end of the week. BTW: do u think that I should use executer instead of thread list? also: I think about using FORK/Join to boost the system --> do you think that it is relevant here?
OldCurmudgeon
@user2319608 - Use an executor if there is a queue of actions that can be handed to any free thread - in the case you describe this is not the case, each message must be targeted to a specific thread. Fork/Join pools are for algorithms that split easily into multiple parts - your description suggests that this would also not be appropriate.