Java Process with concurrent Input/Output Streams
I am trying to create a sort of console/terminal that allows the user to input a string, which then gets made into a process and the results are printed out. Just like a normal console. But I am having trouble managing the input/output streams. I have looked into this thread, but that solution sadly doesn't apply to my problem.

Along with the standard commands like "ipconfig" and "cmd.exe", I need to be able to run a script and use the same inputstream to pass some arguments, if the script is asking for input.

For example, after running a script with "python pyScript.py", I should be able to pass further input to the script whenever it asks for it (for example via raw_input), while also printing the script's output. This is the basic behavior you would expect from a terminal.

What I've got so far:

import java.awt.BorderLayout;
import java.awt.Color;
import java.awt.Dimension;
import java.awt.event.KeyEvent;
import java.awt.event.KeyListener;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;

import javax.swing.JFrame;
import javax.swing.JPanel;
import javax.swing.JScrollPane;
import javax.swing.JTextPane;
import javax.swing.text.BadLocationException;
import javax.swing.text.Document;

public class Console extends JFrame{

    JTextPane inPane, outPane;
    InputStream inStream, inErrStream;
    OutputStream outStream;

    public Console(){
        super("Console");
        setPreferredSize(new Dimension(500, 600));
        setLocationByPlatform(true);
        setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);

        // GUI
        outPane = new JTextPane();
        outPane.setEditable(false);
        outPane.setBackground(new Color(20, 20, 20));
        outPane.setForeground(Color.white);
        inPane = new JTextPane();
        inPane.setBackground(new Color(40, 40, 40));
        inPane.setForeground(Color.white);
        inPane.setCaretColor(Color.white);

        JPanel panel = new JPanel(new BorderLayout());
        panel.add(outPane, BorderLayout.CENTER);
        panel.add(inPane, BorderLayout.SOUTH);

        JScrollPane scrollPanel = new JScrollPane(panel);

        getContentPane().add(scrollPanel);

        // LISTENER
        inPane.addKeyListener(new KeyListener(){
            @Override
            public void keyPressed(KeyEvent e){
              if(e.getKeyCode() == KeyEvent.VK_ENTER){
                    e.consume();
                    read(inPane.getText());
                }
            }
            @Override
            public void keyTyped(KeyEvent e) {}

            @Override
            public void keyReleased(KeyEvent e) {}
        });


        pack();
        setVisible(true);
    }

    private void read(String command){
        println(command);

        // Write to Process
        if (outStream != null) {
            System.out.println("Outstream again");
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outStream));
            try {
                writer.write(command);
                //writer.flush();
                //writer.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }

        // Execute Command
        try {
            exec(command);
        } catch (IOException e) {}

        inPane.setText("");
    }

    private void exec(String command) throws IOException{
        Process pro = Runtime.getRuntime().exec(command, null);

        inStream = pro.getInputStream();
        inErrStream = pro.getErrorStream();
        outStream = pro.getOutputStream();

        Thread t1 = new Thread(new Runnable() {
            public void run() {
                try {
                    String line = null;
                    while(true){
                        BufferedReader in = new BufferedReader(new InputStreamReader(inStream));
                        while ((line = in.readLine()) != null) {
                            println(line);
                        }
                        BufferedReader inErr = new BufferedReader(new InputStreamReader(inErrStream));
                        while ((line = inErr.readLine()) != null) {
                            println(line);
                        }
                        Thread.sleep(1000);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        t1.start();
    }

    public void println(String line) {
        Document doc = outPane.getDocument();
        try {
            doc.insertString(doc.getLength(), line + "\n", null);
        } catch (BadLocationException e) {}
    }

    public static void main(String[] args){
        new Console();
    }
}

I don't use the ProcessBuilder mentioned there, since I would like to distinguish between the error stream and the normal output stream.

UPDATE 29.08.2016

With the help of @ArcticLord we have achieved what was asked in the original question. Now it is just a matter of ironing out strange behavior such as non-terminating processes. The Console has a "stop" button that simply calls pro.destroy(), but for some reason this does not work for infinitely running processes that spam output.

Console: http://pastebin.com/vyxfPEXC

InputStreamLineBuffer: http://pastebin.com/TzFamwZ1

Example code that does not stop:

public class Infinity{
    public static void main(String[] args){ 
        while(true){
            System.out.println(".");
        }
    }
}

Example code that does stop:

import java.util.concurrent.TimeUnit;

public class InfinitySlow{
    public static void main(String[] args){ 
        while(true){
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(".");
        }
    }
}
Nerval answered 29/10, 2015 at 15:34 Comment(0)
M
12

You are on the right track with your code. There are only a few minor things you missed.
Let's start with your read method:

private void read(String command){
    [...]
    // Write to Process
    if (outStream != null) {
        [...]
        try {
            writer.write(command + "\n");  // add a newline so the process receives a complete line
            writer.flush();  // flush your input to your process
        } catch (IOException e1) {
            e1.printStackTrace();
        }
    }
    // ELSE!! - if no outputstream is available
    // Execute Command
    else {
        try {
            exec(command);
        } catch (IOException e) {
            // Handle the exception here. Mostly this means
            // that the command could not get executed
            // because command was not found.
            println("Command not found: " + command);
        }
    }
    inPane.setText("");
}
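
The newline-plus-flush step is the part that is easiest to get wrong, so here is a minimal sketch of it in isolation. The class name ProcessInput and the method sendLine are hypothetical (they are not part of the Console class above), and the UTF-8 charset is an assumption as well.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the original Console class.
class ProcessInput {
    // Sends one line to the process's stdin and flushes it right away.
    static void sendLine(OutputStream processStdin, String text) throws IOException {
        Writer writer = new BufferedWriter(
                new OutputStreamWriter(processStdin, StandardCharsets.UTF_8)); // charset is an assumption
        writer.write(text);
        writer.write('\n'); // a line-oriented child only sees the input once the line is complete
        writer.flush();     // push the buffered characters through to the pipe
        // Deliberately not closed: closing the writer would close the process's stdin.
    }
}
```

In the read method above this would be called as ProcessInput.sendLine(outStream, command) inside the existing try block.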

Now let's fix your exec method. You should use separate threads for reading the normal process output and the error output. Additionally, I introduce a third thread that waits for the process to end and then closes the outputStream, so that the next user input is treated as a new command rather than as input for the process.

private void exec(String command) throws IOException{
    Process pro = Runtime.getRuntime().exec(command, null);

    inStream = pro.getInputStream();
    inErrStream = pro.getErrorStream();
    outStream = pro.getOutputStream();

    // Thread that reads process output
    Thread outStreamReader = new Thread(new Runnable() {
        public void run() {
            try {
                String line = null;
                BufferedReader in = new BufferedReader(new InputStreamReader(inStream));                        
                while ((line = in.readLine()) != null) {
                    println(line);                       
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Exit reading process output");
        }
    });
    outStreamReader.start();

    // Thread that reads process error output
    Thread errStreamReader = new Thread(new Runnable() {
        public void run() {
            try {
                String line = null;           
                BufferedReader inErr = new BufferedReader(new InputStreamReader(inErrStream));
                while ((line = inErr.readLine()) != null) {
                    println(line);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Exit reading error stream");
        }
    });
    errStreamReader.start();

    // Thread that waits for process to end
    Thread exitWaiter = new Thread(new Runnable() {
        public void run() {
            try {
                int retValue = pro.waitFor();
                println("Command exit with return value " + retValue);
                // close outStream
                outStream.close();
                outStream = null;
            } catch (InterruptedException e) {
                e.printStackTrace(); 
            } catch (IOException e) {
                e.printStackTrace();
            } 
        }
    });
    exitWaiter.start();
}

Now this should work.
If you enter ipconfig, it prints the command output, closes the output stream, and is ready for a new command.
If you enter cmd, it prints the output and lets you enter further cmd commands such as dir or cd until you enter exit. Then it closes the output stream and is ready for a new command.
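
Since the two reader threads above differ only in the stream they read, the pattern can be folded into one helper. This is only a sketch under assumptions: the class StreamPump and its start method are hypothetical names, and making the thread a daemon is my choice, not something the code above does.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.util.function.Consumer;

// Hypothetical helper: one reader thread per stream, as in the exec method above.
class StreamPump {
    // Starts a thread that forwards every line of 'in' to 'sink'
    // and ends when the stream reaches end-of-file.
    static Thread start(InputStream in, Consumer<String> sink) {
        Thread t = new Thread(() -> {
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    sink.accept(line);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        t.setDaemon(true); // assumption: reader threads should not keep the JVM alive
        t.start();
        return t;
    }
}
```

Inside exec this could be used as StreamPump.start(pro.getInputStream(), this::println) and StreamPump.start(pro.getErrorStream(), this::println).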

You may run into problems when executing Python scripts, because Java cannot read a process's output until the process has flushed it into the system pipe.
See this example Python script:

print "Input something!"
str = raw_input()
print "Received input is : ", str

You can run this from your Java program and also enter the input, but you will not see the script's output until the script has finished.
The only fix I could find is to flush the output manually in the script.

import sys
print "Input something!"
sys.stdout.flush()
str = raw_input()
print "Received input is : ", str
sys.stdout.flush()

Running this script will behave as you expect.
You can read more about this problem at

EDIT: I have just found another very easy solution for the stdout.flush() problem with Python Scripts. Start them with python -u script.py and you don't need to flush manually. This should solve your problem.
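
If the console should apply this automatically instead of relying on the user to type -u, the command can be assembled in Java. A minimal sketch: the class PythonCommand is a hypothetical name, and it assumes an interpreter called python is on the PATH. The PYTHONUNBUFFERED environment variable is the interpreter's documented equivalent of -u and is set here only as a second safety net.

```java
// Hypothetical helper, not part of the original Console class.
class PythonCommand {
    // Builds (but does not start) an unbuffered Python invocation for the given script.
    static ProcessBuilder unbuffered(String scriptPath) {
        // assumption: the interpreter is reachable as "python"
        ProcessBuilder pb = new ProcessBuilder("python", "-u", scriptPath);
        // same effect as -u, inherited by the child through its environment
        pb.environment().put("PYTHONUNBUFFERED", "1");
        return pb;
    }
}
```

Calling PythonCommand.unbuffered("pyScript.py").start() then yields a Process whose streams can be read exactly like the ones returned by Runtime.exec.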

EDIT2: We discussed in the comments that with this solution the output and error streams get mixed up, since they are read in different threads. The problem is that we cannot tell whether the output stream has finished writing when the error stream thread starts printing; otherwise classic thread synchronization with locks could handle this. Instead we have a continuous stream until the process ends, whether or not data is flowing. So we need a mechanism that tracks how much time has elapsed since the last line was read from each stream.

For this I will introduce a class that gets an InputStream and starts a Thread for reading the incoming data. This Thread stores each line in a Queue and stops when end of stream arrives. Additionally it holds the time when last line was read and added to Queue.

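import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ConcurrentLinkedQueue;

public class InputStreamLineBuffer{
    private InputStream inputStream;
    private ConcurrentLinkedQueue<String> lines;
    // written by the reader thread, read by the consumer thread
    private volatile long lastTimeModified;
    private Thread inputCatcher;
    private volatile boolean isAlive;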

    public InputStreamLineBuffer(InputStream is){
        inputStream = is;
        lines = new ConcurrentLinkedQueue<String>();
        lastTimeModified = System.currentTimeMillis();
        isAlive = false;
        inputCatcher = new Thread(new Runnable(){
            @Override
            public void run() {
                StringBuilder sb = new StringBuilder(100);
                int b;
                try{
                    while ((b = inputStream.read()) != -1){  
                        // read one char
                        if((char)b == '\n'){
                            // new Line -> add to queue
                            lines.offer(sb.toString());
                            sb.setLength(0); // reset StringBuilder
                            lastTimeModified = System.currentTimeMillis();
                        }
                        else sb.append((char)b); // append char to stringbuilder
                    }
                } catch (IOException e){
                    e.printStackTrace();
                } finally {
                    isAlive = false;
                }
            }});
    }
    // is the input reader thread alive
    public boolean isAlive(){
        return isAlive;
    }
    // start the input reader thread
    public void start(){
        isAlive = true;
        inputCatcher.start();
    }
    // has Queue some lines
    public boolean hasNext(){
        return !lines.isEmpty(); // size() would walk the whole queue
    }
    // get next line from Queue
    public String getNext(){
        return lines.poll();
    }
    // how much time has elapsed since last line was read
    public long timeElapsed(){
        return (System.currentTimeMillis() - lastTimeModified);
    }
}
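
For comparison, here is a compact variant of the same idea built on BufferedReader.readLine(), which also strips the \r of Windows line endings and keeps a final line that has no trailing newline. This is a sketch, not a replacement for the class above: the name QuietLineBuffer and its method names are hypothetical, and like the original it only delivers complete lines, so a prompt without a newline stays invisible until the line ends.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical, simplified variant of the line buffer above.
class QuietLineBuffer {
    private final Queue<String> lines = new ConcurrentLinkedQueue<>();
    // both fields are shared between the reader thread and the consumer thread
    private volatile long lastLineMillis = System.currentTimeMillis();
    private volatile boolean alive = false;
    private final Thread reader;

    QuietLineBuffer(InputStream in) {
        reader = new Thread(() -> {
            try (BufferedReader br = new BufferedReader(new InputStreamReader(in))) {
                String line;
                while ((line = br.readLine()) != null) { // null means end of stream
                    lines.offer(line);
                    lastLineMillis = System.currentTimeMillis();
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                alive = false;
            }
        });
        reader.setDaemon(true);
    }

    void start() { alive = true; reader.start(); }
    boolean isAlive() { return alive; }
    boolean hasNext() { return !lines.isEmpty(); }
    String next() { return lines.poll(); }
    // milliseconds since the last complete line arrived
    long quietMillis() { return System.currentTimeMillis() - lastLineMillis; }
    void join() throws InterruptedException { reader.join(); }
}
```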

With InputStreamLineBuffer we can combine the output-reading and error-reading threads into one. That thread lives as long as the buffer's reader threads are alive or hold unconsumed data. On each pass it checks whether some time has passed since the last output line was read and, if so, prints all unprinted lines in one go. It then does the same for the error output, and finally sleeps for a few milliseconds to avoid wasting CPU time.

private void exec(String command) throws IOException{
    Process pro = Runtime.getRuntime().exec(command, null);

    inStream = pro.getInputStream();
    inErrStream = pro.getErrorStream();
    outStream = pro.getOutputStream();

    InputStreamLineBuffer outBuff = new InputStreamLineBuffer(inStream);
    InputStreamLineBuffer errBuff = new InputStreamLineBuffer(inErrStream);

    Thread streamReader = new Thread(new Runnable() {       
        public void run() {
            // start the input reader buffer threads
            outBuff.start();
            errBuff.start();

            // while an input reader buffer thread is alive
            // or there are unconsumed data left
            while(outBuff.isAlive() || outBuff.hasNext() ||
                errBuff.isAlive() || errBuff.hasNext()){

                // get the normal output if at least 50 millis have passed
                if(outBuff.timeElapsed() > 50)
                    while(outBuff.hasNext())
                        println(outBuff.getNext());
                // get the error output if at least 50 millis have passed
                if(errBuff.timeElapsed() > 50)
                    while(errBuff.hasNext())
                        println(errBuff.getNext());
                // sleep a bit before next run
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }                 
            }
            System.out.println("Finish reading error and output stream");
        }          
    });
    streamReader.start();

    // remove outStreamReader and errStreamReader Thread
    [...]
}

Maybe this is not a perfect solution but it should handle the situation here.


EDIT (31.8.2016)
We discussed in the comments that there is still a problem when implementing a stop button that kills the started process using Process#destroy(). A process that produces a lot of output, e.g. in an infinite loop, is destroyed immediately by calling destroy(). But since it has already produced a lot of output that still has to be consumed by our streamReader, the program cannot return to normal behaviour right away.
So we need some small changes here:

We will add a destroy() method to InputStreamLineBuffer that stops the output reading and clears the queue.
The changes look like this:

public class InputStreamLineBuffer{
    private volatile boolean emergencyBrake = false; // set by the UI thread, read by the reader thread
    [...]
    public InputStreamLineBuffer(InputStream is){
        [...]
                while ((b = inputStream.read()) != -1 && !emergencyBrake){
                    [...]
                }
    }
    [...]

    // exits immediately and clears line buffer
    public void destroy(){
        emergencyBrake = true;
        lines.clear();
    }
}
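
The brake is a flag written by one thread (the stop button's listener) and read by another (the reader), so it has to be visible across threads. The following sketch shows that pattern on its own. StoppableDrain is a hypothetical class, and declaring the flag volatile is my assumption about how to make the write visible. A read that is blocked on a silent stream will still only notice the flag once a byte arrives or the stream is closed.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical illustration of the emergency-brake flag, not the answer's class.
class StoppableDrain {
    // volatile so that a stop() from another thread is seen by the reading loop
    private volatile boolean stopped = false;

    void stop() {
        stopped = true;
    }

    // Reads bytes until end-of-stream or until stop() was called;
    // returns the number of bytes consumed.
    int drain(InputStream in) throws IOException {
        int count = 0;
        while (!stopped && in.read() != -1) {
            count++;
        }
        return count;
    }
}
```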

And some small changes in the main program:

public class ExeConsole extends JFrame{
    [...]
    // The line buffers must be declared outside the method,
    // and pro must be a field as well so the listener can reach it
    InputStreamLineBuffer outBuff, errBuff; 
    public ExeConsole(){
        [...]
        btnStop.addActionListener(new ActionListener() {
            public void actionPerformed(ActionEvent e) {
                 if(pro != null){
                      pro.destroy();
                      outBuff.destroy();
                      errBuff.destroy();
                 }
        }});
    }
    [...]
    private void exec(String command) throws IOException{
        [...]
        //InputStreamLineBuffer outBuff = new InputStreamLineBuffer(inStream);
        //InputStreamLineBuffer errBuff = new InputStreamLineBuffer(inErrStream);        
        outBuff = new InputStreamLineBuffer(inStream);
        errBuff = new InputStreamLineBuffer(inErrStream);    
        [...]
    }
}

Now it should be able to stop even processes that spam output.

Note: I found out that Process#destroy() is not able to destroy child processes. So if you start cmd on Windows and start a Java program from there, you will end up destroying the cmd process while the Java program keeps running. You can see it in the task manager. With the Process API used here this cannot be solved in Java alone; it needs OS-dependent external tools to get the PIDs of these processes and kill them manually.
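
On Java 9 and newer the ProcessHandle API exposes a process's descendants, which offers a way to reach those child processes without external tools. A minimal sketch, assuming a Java 9+ runtime; the class ProcessTree and both method names are hypothetical, and whether every descendant is visible depends on the operating system and permissions.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper; requires Java 9 or newer (ProcessHandle API).
class ProcessTree {
    // Kills all descendants of the process first, then the process itself.
    static void destroyTree(Process process) {
        // take the snapshot before killing anything, while the tree is still intact
        List<ProcessHandle> children = process.descendants().collect(Collectors.toList());
        for (ProcessHandle child : children) {
            child.destroyForcibly();
        }
        process.destroyForcibly();
    }

    // Number of direct and indirect children currently visible for a handle.
    static long descendantCount(ProcessHandle handle) {
        return handle.descendants().count();
    }
}
```

In the stop button's listener, ProcessTree.destroyTree(pro) would then take the place of pro.destroy().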

Murrell answered 28/12, 2015 at 10:33 Comment(16)
Thank you very much! I actually I found a similar solution yesterday. But there is a small problem with the error and out stream. Since they are in two separate threads, they often mix up their output. For example: "cmd", "dir", "foo", "bar". since foo in an invalid input, cmd returns 2 lines(command and a new line) with outstream and 2 lines (error description) with the errorstream. But the result is often: out,err, out, err.Nerval
Right, but you cannot find out if printing the ouput of dir command is finished when errorstream starts to print that foo is not a command. So I think you can only use ProcessBuilder with redirectErrorStream and ignore the second thread here. That would solve this problem.Murrell
I was experimenting around with redirectErrorStream, but it didn't seem to actually print out the error stream. Only the outStream. And also as soon as it would have printed an errorStream, it just ended the process. (PS: Also I like having separate streams, because I can mark the errorstream with red color). If only there was a way to bring them in the right order...Nerval
I have added a solution for the mixup problem to my answer. I hope this fill your needs :)Murrell
Thank you for your tremendous work! You have definitely earned yourself the reward! Sadly I couldn't get it to work on my end. Could you maybe send me the whole console code via pastebin or something? (I tried to merge your solution with my version, which is different to the op. This might have caused some issues...)Nerval
No Problem, its a very interesting project. Full Code @ pastebin.com/fd9b81SqMurrell
Please paste full code so that i am able to make threads to get input and output from single process builder and use a jtext component for communication with the processDorian
I had posted the full code at pastebin. Link is two comments above!Murrell
Dear @ArcticLord. I'm really sorry to be bothering you again, but I need your help once more. Your code is working great as is, but to integrate it into my project I need two key features, that I just can't get to work: Since there can be non terminating processes, I need to be able to manually destroy the process, which I thought would be as easy as calling pro.destroy, but this doesn't work on infinitely running processes. Also I want to wrap my exec output in some print statements but this is not possible, since the exec function is non blocking. I Would really appreciate your help. HaeriNerval
Hey @Haeri, the only thing java offers isProcess.destroy() and it should work on infinitely running processes. If not you can only use some OS dependent calls to kill a process. I recommend you to read this Thread. And maybe ask a new question here if you have a reproducible example of a process that can't be destroyed. I don't understand your second feature. You need to explain a bit more what you are trying to achieve and what do you mean with wraping the output.Murrell
@Murrell I have updated the op to exactly explain what the problem is. Would be great if you could take a look :DNerval
@Haeri. From a quick look I can tell you that pro.destroy() worked. The process is gone immediately. But since it has produced a lot of output the reader threads are busy for long time consuming all the data. We need some kind of emergency brake that signals the readers to stop and leave the rest unconsumed. I will have a look on this tomorrow.Murrell
@Murrell That would be amazing!Nerval
@Murrell you are amazing!Nerval
@Murrell Hello. I'ts me. Again :S. So now somehow the most basic part where streams should be consumed simultaneously broke again :( Java works fine for some reason, but gcc, g++ and python executables, that have input don't work. The streams are out of order... A simple "print("hello") \n input("your input")" results in a blank console waiting for the input and only after an input, spitting out the "hello"Nerval
@Nerval Hmm I cannot reproduce this problem. Still works for me with my last changes. Don't forget that it is still important to use -u with python scripts and fflush ( stdout ); in c code to get all output flushed to the java process streams.Murrell
H
2

Although @ArcticLord's solution is nice and neat, I recently faced the same kind of problem and came up with a solution that is conceptually equivalent but slightly different in its implementation.

The concept is the same, namely "bulk reads": when a reader thread gets its turn, it consumes everything available on the stream it handles, and hands over the turn only when it is done.
This guarantees the out/err print order.

But instead of timer-based turn assignment, I use a lock-based simulation of non-blocking reads:

// main method for testability: replace with private void exec(String command)
public static void main(String[] args) throws Exception
{
    // create a lock that will be shared between reader threads
    // the lock is fair to minimize starvation possibilities
    ReentrantLock lock = new ReentrantLock(true);

    // exec the command: I use nslookup for testing on windows 
    // because it is interactive and prints to stderr too
    Process p = Runtime.getRuntime().exec("nslookup");

    // create a thread to handle output from process (uses a test consumer)
    Thread outThread = createThread(p.getInputStream(), lock, System.out::print);
    outThread.setName("outThread");
    outThread.start();

    // create a thread to handle error from process (test consumer, again)
    Thread errThread = createThread(p.getErrorStream(), lock, System.err::print);
    errThread.setName("errThread");
    errThread.start();

    // create a thread to handle input to process (read from stdin for testing purpose)
    PrintWriter writer = new PrintWriter(p.getOutputStream());
    Thread inThread = createThread(System.in, null, str ->
    {
        writer.print(str);
        writer.flush();
    });
    inThread.setName("inThread");
    inThread.start();

    // create a thread to handle termination gracefully. Not really needed in this simple
    // scenario, but on a real application we don't want to block the UI until process dies
    Thread endThread = new Thread(() ->
    {
        try
        {
            // wait until process is done
            p.waitFor();
            logger.debug("process exit");

            // signal threads to exit
            outThread.interrupt();
            errThread.interrupt();
            inThread.interrupt();

            // close process streams
            p.getOutputStream().close();
            p.getInputStream().close();
            p.getErrorStream().close();

            // wait for threads to exit
            outThread.join();
            errThread.join();
            inThread.join();

            logger.debug("exit");
        }
        catch(Exception e)
        {
            throw new RuntimeException(e.getMessage(), e);
        }
    });
    endThread.setName("endThread");
    endThread.start();

    // wait for full termination (process and related threads by cascade joins)
    endThread.join();

    logger.debug("END");
}

// convenience method to create a specific reader thread with exclusion by lock behavior
private static Thread createThread(InputStream input, ReentrantLock lock, Consumer<String> consumer)
{
    return new Thread(() ->
    {
        // wrap input to be buffered (enables ready()) and to read chars
        // using explicit encoding may be relevant in some case
        BufferedReader reader = new BufferedReader(new InputStreamReader(input));

        // create a char buffer for reading
        char[] buffer = new char[8192];

        try
        {
            // repeat until EOF or interruption
            while(true)
            {
                try
                {
                    // wait for your turn to bulk read
                    if(lock != null && !lock.isHeldByCurrentThread())
                    {
                        lock.lockInterruptibly();
                    }

                    // when there's nothing to read, hand over the turn (bulk read ended)
                    if(!reader.ready())
                    {
                        if(lock != null)
                        {
                            lock.unlock();
                        }

                        // this enables a soft busy-waiting loop that simulates non-blocking reads
                        Thread.sleep(100);
                        continue;
                    }

                    // perform the read, as we are sure it will not block (input is "ready")
                    int len = reader.read(buffer);
                    if(len == -1)
                    {
                        return;
                    }

                    // transform to string and let the consumer consume it
                    String str = new String(buffer, 0, len);
                    consumer.accept(str);
                }
                catch(InterruptedException e)
                {
                    // catch interruptions while sleeping or while waiting for the lock,
                    // and restore the interrupted flag (not necessary in this case, but a best practice)
                    Thread.currentThread().interrupt();
                    return;
                }
                catch(IOException e)
                {
                    throw new RuntimeException(e.getMessage(), e);
                }
            }
        }
        finally
        {
            // protect the lock against unhandled exceptions
            if(lock != null && lock.isHeldByCurrentThread())
            {
                lock.unlock();
            }

            logger.debug("exit");
        }
    });
}

Note that both solutions, @ArcticLord's and mine, are not totally starvation-safe; the chance of starvation (very small) is inversely proportional to the consumers' speed.

Happy 2016! ;)

Honaker answered 1/1, 2016 at 13:44 Comment(1)
How do i make threads to get input and output from single process builder and use a jtext componentDorian
