3

Creating a multi-threaded application which creates multiple threads and queries same table of the database

Given the input xml file with format:

<transliteration>
<element>
    <source>about us</source>
</element>
</transliteration>

Application reads multiple files and creates multiple threads one for each xml file and output will be another xml file with format

<transliteration>
<element>
    <source>about us</source>
        <target/>
</element>
</transliteration>

below is the run method of the thread

public void run() {

        MultipleDatabaseThread th = new MultipleDatabaseThread();
        Map<String,String> map = new HashMap<String,String>();

        try
        {
            Document doc = loadXmlContentToMemory(this.inputString);

            XPathExpression expr = null;
            XPathFactory xFactory = XPathFactory.newInstance();
            XPath xPath = xFactory.newXPath();
            expr = xPath.compile("/transliteration/element//source");
            Object result = expr.evaluate(doc, XPathConstants.NODESET);
            NodeList nodes = (NodeList) result;
            String sourceString = "";
            if(nodes.getLength() > 0)
            {
                for (int i=0; i<nodes.getLength();i++)
                {
                    //System.out.println("Name: "+nodes.item(i).getNodeName() +" Local Name: "+nodes.item(i).getLocalName() +" Value: "+nodes.item(i).getTextContent());
                    sourceString = nodes.item(i).getTextContent();
                    map = th.getCompleteStringTransliterate(sourceString, this.language);

                    if(map.get(sourceString) == null || map.get(sourceString).equals("") || map.get(sourceString).equals(sourceString))
                    {
                        map.clear();
                        map = th.getRecordsFromDatabase(sourceString, language);

                        Element abc = doc.createElement("target");

                        String targetString = "";

                        String[] tokens = sourceString.trim().split("\\s+");

                        for(int itr=0; itr < tokens.length; itr++)
                        {
                            targetString = targetString+" "+map.get(tokens[itr]);
                        }

                        abc.setTextContent(targetString.trim());
                        nodes.item(i).getParentNode().appendChild(abc);
                    }
                    else
                    {
                       Element abc = doc.createElement("target");
                       abc.setTextContent(map.get(sourceString));
                       nodes.item(i).getParentNode().appendChild(abc);
                    }
                }
            }

            try
            {

                expr = xPath.compile("/transliteration/element//target");
                result = expr.evaluate(doc, XPathConstants.NODESET);
            }catch(XPathExpressionException ex)
            {   }

            NodeList nodesList = (NodeList) result;

            for(int i =0;i<nodesList.getLength();i++)
            {
                System.out.println("Node Name: "+nodesList.item(i).getNodeName()+" Node Value: "+nodesList.item(i).getTextContent());
            }

            try
            {
                Transformer transformer = TransformerFactory.newInstance().newTransformer();
                StreamResult strResult = new StreamResult(new File(this.inputString+"_out.xml"));
                if(doc != null && strResult != null)
                {
                    DOMSource source = new  DOMSource(doc);
                    transformer.transform(source, strResult);
                }
            }
            catch(TransformerException ex)
            {
               System.out.println(""+ex);
            }
            catch(TransformerFactoryConfigurationError ex)
            {
               System.out.println(""+ex);
            }

        }catch(IOException ex)
        {
            ex.printStackTrace(System.out);
        }
        catch(DOMException ex)
        {
            ex.printStackTrace(System.out);
        } 
        catch(ParserConfigurationException ex)
        {
            ex.printStackTrace(System.out);
        }
        catch(SAXException ex)
        {
            ex.printStackTrace(System.out);
        }
        catch(XPathExpressionException ex)
        {
            ex.printStackTrace(System.out);
        }
        catch(InterruptedException ex)
        {
            ex.printStackTrace(System.out);
        }

    }
  • loadXmlContentToMemory** function takes filename as a input and loads xml content in the Document.

  • getCompleteStringTransliterate** is a function of MulltipleDatabaseThread class which returns a map variable which contain source and its traget string in it.

  • getRecordsFromDatabase** is anoher function in same class which splits source string and gets its target string again returns map variable

    public class MultipleDatabaseThread {

    public Map<String,String> getCompleteStringTranslate(String inputString, String language) throws InterruptedException
    {
        Map<String,String> map = new HashMap<String,String>();
    
        synchronized(OutputRecords.getMap())
        {
           //long startTime = System.currentTimeMillis();
    
           OutputRecords.clearOutputStream(); 
           Thread thCompleteString = new DatabaseThread(inputString, language);
           thCompleteString.start();
           thCompleteString.join();
    
           map = OutputRecords.getRecords();
           //System.out.println("Complete String Time Taken:: "+(System.currentTimeMillis()-startTime));
           return map;
        }
    }
    
    
    
    public Map<String,String> getRecordsFromDatabase(String inputString, String language) throws  InterruptedException
    {
        String[] tokens = inputString.split("\\s+");
    
        Map<String,String> map = new HashMap<String,String>();
    
        Thread[] databaseThreads = new Thread[tokens.length];
    
        synchronized(OutputRecords.getMap())
        {
            //long startTime = System.currentTimeMillis();
    
            OutputRecords.clearOutputStream();
            for(int index=0; index < tokens.length; index++)
            {
                databaseThreads[index] = new DatabaseThread(tokens[index],language);
                databaseThreads[index].start();
            }
            for(int index = 0 ; index < tokens.length; index++)
            {
                    databaseThreads[index].join();
            }
    
            map = OutputRecords.getRecords();
            //System.out.println("Tokens Time Taken:: "+(System.currentTimeMillis()-startTime));
    
            return map;
    
        }
    }
    

    }

both these functions uses a static/shared map variable in OutputRecord class and generates multiple threads which actually gives call to database and populates shared map variable and return that variable

but on execution of this program its giving

Exception in thread "Thread-0" java.lang.NullPointerException
    at transliterationthreading.ExecuteOuterThread.run(ExecuteOuterThread.java:66)

on line

if(map.get(sourceString) == null || map.get(sourceString).equals("") || map.get(sourceString).equals(sourceString))

So one thread gets terminated and then other gets executed completely and generates output file. I am not getting the problem can anyone gve some suggestion in solving this problem.

Thanks

6
  • 1
    I suspect map to be null when that thread is running. map = OutputRecords.getRecords(); cuasing it to be NULL? Commented Jun 8, 2013 at 10:10
  • OutputRecords.getRecords(); return static map variable which gets initialized when class loads,it may be empty but i dont know when i could be null Commented Jun 8, 2013 at 10:20
  • Have you tried to declare map variable as volatile? Commented Jun 8, 2013 at 10:35
  • If it's a shared static map, then you shouldn't read and write to it concurrently from multiple threads. Commented Jun 8, 2013 at 10:38
  • @ JB Nizet then how can i return values from the run method of thread Commented Jun 8, 2013 at 10:41

2 Answers 2

0

I woud suspect that the map content changes during evaluation of the line

if(map.get(sourceString) == null || map.get(sourceString).equals("") || map.get(sourceString).equals(sourceString))

This way your null checks succeed but the new value you get from the map could be null. Map is not synchronized!

Change this line to

String sourceStringValue = map.get(sourceString);
if(sourceStringValue == null || sourceStringValue.equals("") || map.get(sourceString).equals(sourceString))
Sign up to request clarification or add additional context in comments.

1 Comment

I tried that too but that didn't work for me. However I solved the problem by removing the shared map approach and using ExecutorService and Callable Interface and by returning the value of each thread using call() method. Thanks all for your efforts
0

Thanks all for your Efforts

I tried solving this problem using a different approach by not using the static shared map and using ExecutorService and Callable Interface approach

here is my code

try
        {
            doc = loadXmlContentToMemory(this.inputString);
            expr = xPath.compile("/transliteration/element//source");
            result = expr.evaluate(doc, XPathConstants.NODESET);

        }catch(ParserConfigurationException ex)
        {
            System.out.println("loadXmlError: "+ex.toString());
        }
        catch(IOException ex)
        {
            System.out.println("loadXmlError: "+ex.toString());
        }
        catch(SAXException ex)
        {
            System.out.println("loadXmlError: "+ex.toString());
        }
        catch(XPathExpressionException ex)
        {
            System.out.println("loadXmlError: "+ex.toString());
        }

        NodeList nodes = (NodeList) result;
        String sourceString = "";

        if(nodes.getLength() >0)
        {
            Map<String,String> fileMap = new HashMap<String,String>(); 
            ExecutorService executor = Executors.newFixedThreadPool(NTHREADS);

            for(int index =0; index <nodes.getLength(); index++)
            {
                sourceString = nodes.item(index).getTextContent();
                Callable<Map<String,String>> worker = new MultipleDatabaseCallable(sourceString,language);
                Future<Map<String,String>> map = executor.submit(worker);

                try
                {
                    //System.out.println(""+Thread.currentThread().getName()+"SourceString:: "+sourceString+"Map: "+map.get().get(sourceString));
                      fileMap.putAll(map.get());
                }catch(InterruptedException ex)
                {
                    System.out.println("future read: "+ex.toString());
                }
                catch(ExecutionException ex)
                {
                    System.out.println("future read: "+ex.toString());
                }
            }

            executor.shutdown();
            // Wait until all threads are finish
            while (!executor.isTerminated()) {

            }
            ExecutorService tokenExecutor = Executors.newFixedThreadPool(NTHREADS);
            for(int i =0 ;i<nodes.getLength();i++)
            {
                sourceString = nodes.item(i).getTextContent();
                if(fileMap.get(sourceString) == null || fileMap.get(sourceString).equals("") || fileMap.get(sourceString).equals(sourceString))
                {
                    fileMap.remove(sourceString);
                    Callable<Map<String,String>> worker = new MultipleTokenCallable(sourceString,language);
                    Future<Map<String,String>> map = tokenExecutor.submit(worker);

                    try
                    {
                        fileMap.putAll(map.get());
                    }
                    catch(InterruptedException ex)
                    {
                        System.out.println("Tokenized put Interupted exception: "+ex.toString());
                    }
                    catch(ExecutionException ex)
                    {
                        System.out.println("Tokenized put Execution exception: "+ex.toString());
                        ex.printStackTrace(System.out);
                    }

                    Element targetElement = doc.createElement("target");
                    String targetString = "";

                    String[] tokens = sourceString.trim().split("\\s+");

                    for(int itr=0; itr < tokens.length; itr++)
                    {
                        targetString = targetString+" "+fileMap.get(tokens[itr]);
                    }
                    targetElement.setTextContent(targetString.trim());
                    nodes.item(i).getParentNode().appendChild(targetElement);
                    //System.out.println(""+Thread.currentThread().getName()+" Target:  "+targetString+" Source:  "+sourceString);
                }
                else
                {
                    Element abc = doc.createElement("target");
                    abc.setTextContent(fileMap.get(sourceString));
                    nodes.item(i).getParentNode().appendChild(abc);
                }
            }

            tokenExecutor.shutdown();
            // Wait until all threads are finish
            while (!tokenExecutor.isTerminated()) {

            }
            try
            {
                Transformer transformer = TransformerFactory.newInstance().newTransformer();
                StreamResult strResult = new StreamResult(new File(this.inputString+"_out.xml"));
                if(doc != null && strResult != null)
                {
                    DOMSource source = new  DOMSource(doc);
                    transformer.transform(source, strResult);
                }
            }
            catch(TransformerException ex)
            {
               System.out.println(""+ex);
            }
            catch(TransformerFactoryConfigurationError ex)
            {
               System.out.println(""+ex);
            }

        }

since multiple threads gets generated using this and all threads try to connect to the database at same time, this might give too many connections error, if number of concurrent threads increases. So you need to maintain a Connection Pool to overcome that problem

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.