Using named pipes with bash - Problem with data loss
Did some search online, found simple 'tutorials' to use named pipes. However when I do anything with background jobs I seem to lose a lot of data.

[[Edit: found a much simpler solution, see reply to post. So the question I put forward is now academic - in case one might want a job server]]

Using Ubuntu 10.04 with Linux 2.6.32-25-generic #45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU/Linux

GNU bash, version 4.1.5(1)-release (x86_64-pc-linux-gnu).

My bash function is:

function jqs
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read txt <"$pipe"
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
        break
      fi
    fi
  done
}

I run this in the background:

> jqs&
[1] 5336

And now I feed it:

for i in 1 2 3 4 5 6 7 8
do
  (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done

The output is inconsistent: I frequently don't get all the "success" echoes, and I get at most as many "new text" echoes as "success" echoes, sometimes fewer.

If I remove the '&' from the feed loop, it seems to work, but then I am blocked until the output is read. Hence my wish to let the sub-processes block, but not the main process.

The aim is to write a simple job-control script so I can run, say, at most 10 jobs in parallel and queue the rest for later processing, while reliably knowing that they do run.

Full job manager below:

function jq_manage
{
  export __gn__="$1"

  pipe=/tmp/__job_control_manager_"$__gn__"__
  trap "rm -f $pipe"    EXIT
  trap "break"      SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    date
    jobs
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__))
    then
      echo "Waiting for new job"
      if read new_job <"$pipe"
      then
    echo "new job is [[$new_job]]"

    if [[ "$new_job" == 'quit' ]]
    then
      break
    fi

    echo "In group $__gn__, starting job $new_job"
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &"
      fi
    else
      sleep 3
    fi
  done
}

function jq
{
  # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs)
  # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently

  export __gn__="$1"
  shift
  export __jN__="$1"
  shift

  export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l)
  if (($__jq__ '<' 1))
  then
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &"
  fi

  pipe=/tmp/__job_control_manager_"$__gn__"__

  echo $@ >$pipe
}

Calling

jq <name> <max processes> <command>
jq abc 2 sleep 20

will start one process. That part works fine. Start a second one, fine. Starting them one by one by hand seems to work fine, but starting 10 in a loop seems to make the system lose data, as in the simpler example above.

Any hints as to what I can do to solve this apparent loss of IPC data would be greatly appreciated.

Regards, Alain.

Metaprotein answered 27/11, 2010 at 8:8 Comment(1)
Have a look at the second 2018 edit of How to set a variable to the output from a command, or on GitHub.com: Connector-bash, where I connect subprocess-holding tools to my current shell session.Psalter

Your problem is the if statement below:

while true
do
    if read txt <"$pipe"
    ....
done

What is happening is that your job queue server is opening and closing the pipe each time around the loop. This means that some of the clients are getting a "broken pipe" error when they try to write to the pipe - that is, the reader of the pipe goes away after the writer opens it.

To fix this, change the loop in the server so that it opens the pipe once for the entire loop:

while true
do
    if read txt
    ....
done < "$pipe"

Done this way, the pipe is opened once and kept open.

You will need to be careful of what you run inside the loop, as all processing inside the loop will have stdin attached to the named pipe. You will want to make sure you redirect stdin of all your processes inside the loop from somewhere else, otherwise they may consume the data from the pipe.
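For example (a minimal sketch; process_job is a hypothetical placeholder for whatever you actually run per line), redirecting each job's stdin keeps it from swallowing queued lines:

while true
do
  if read txt
  then
    # stdin of the whole loop is the fifo, so point the job's stdin elsewhere
    process_job "$txt" < /dev/null &
  fi
done < "$pipe"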

Edit: With the problem now being that you are getting EOF on your reads when the last client closes the pipe, you can use jilles' method of duping the file descriptors, or you can just make sure you are a client too and keep the write side of the pipe open:

while true
do
    if read txt
    ....
done < "$pipe" 3> "$pipe"

This will hold the write side of the pipe open on fd 3. The same caveat applies to this file descriptor as to stdin: you will need to close it so any child processes don't inherit it. It probably matters less than with stdin, but it would be cleaner.
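Extending the earlier sketch, a hedged example of launching work with that extra descriptor closed (process_job again being a placeholder):

while true
do
  if read txt
  then
    # close the inherited write-side fd 3 in the child so it does not
    # keep the fifo's write end open after the server itself exits
    process_job "$txt" < /dev/null 3>&- &
  fi
done < "$pipe" 3> "$pipe"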

Etruria answered 27/11, 2010 at 12:7 Comment(4)
Wow, great answer. Makes sense. Thanks. Will give it a try immediately.Metaprotein
Ok, now you solved the key issue, I have another one: how do you get the read to wait for input? I'll post a reply to myself further below with sample code.Metaprotein
@asoundmove: I've updated the answer with solution to EOF on read.Etruria
This method avoids depending on non-POSIX behaviour but there might be a reason why it is not good in some cases.Erased

As said in other answers you need to keep the fifo open at all times to avoid losing data.

However, once all writers have gone away after the fifo has been opened (so there has been a writer), reads return immediately (and poll() returns POLLHUP). The only way to clear this state is to reopen the fifo.

POSIX does not provide a solution to this but at least Linux and FreeBSD do: if reads start failing, open the fifo again while keeping the original descriptor open. This works because in Linux and FreeBSD the "hangup" state is local to a particular open file description, while in POSIX it is global to the fifo.

This can be done in a shell script like this:

while :; do
    exec 3< /tmp/testfifo   # (re)open the fifo for reading on fd 3
    exec 4<&-               # fd 3 is fresh again, so drop the old descriptor kept on fd 4
    while read x; do
        echo "input: $x"
    done <&3
    exec 4<&3               # reads hit EOF: keep the original descriptor open on fd 4 ...
    exec 3<&-               # ... while fd 3 is closed and reopened at the top of the loop
done
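One way to exercise this loop (a quick sketch, assuming the fifo has been created first):

mkfifo /tmp/testfifo 2>/dev/null    # create the fifo if it does not already exist

for i in 1 2 3; do
    # each writer opens, writes one line and closes; the reader loop above
    # sees EOF, reopens the fifo and keeps going
    ( echo "message $i" > /tmp/testfifo & )
done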
Erased answered 27/11, 2010 at 21:17 Comment(6)
In Bash, rather than { ... read ... } <&3, you can use read -u 3 to read from a specified file descriptor number instead of 0.Siberson
@Siberson What advantage does read -u 3 x provide above read x <&3?Erased
Wow, this works! Can you explain why I can't use fd 1 instead of 3? It works first time round, but then fails. I'll post a separate comment to show the latest script in full.Metaprotein
@jilles: As I understand read x <&3 closes the pipe when complete, whereas read -u 3 x leaves the pipe open for the next read.Metaprotein
@jilles: The advantage is that it's obvious (without looking for the done matching the while ...; do) that read is reading from FD 3, and that the rest of the do ...; done body does not have FD 0 redirected.Siberson
@ephemient: The same result can be obtained with while read x <&3; do ...; done so why does read -u exist?Erased

Just for those who might be interested: [[re-edited]] following comments by camh and jilles, here are two new versions of the test server script.

Both versions now work exactly as hoped.

camh's version for pipe management:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
        break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    fi
  done 3< "$pipe" 4> "$pipe"    # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF
}

jilles' version for pipe management:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  exec 3< "$pipe"
  exec 4<&-

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
        break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    else
      # Close the pipe and reconnect it so that the next read does not end up returning EOF
      exec 4<&3
      exec 3<&-
      exec 3< "$pipe"
      exec 4<&-
    fi
  done
}
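Either version can be exercised just like the original test (a usage sketch):

jqs &                                       # start the queue manager in the background

for i in 1 2 3 4 5 6 7 8
do
  (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done

echo quit > /tmp/__job_control_manager__    # shut the manager down when finished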

Thanks to all for your help.

Metaprotein answered 27/11, 2010 at 22:39 Comment(2)
You cannot catch SIGKILL. No point trying. Also, see my last edit for a simpler approach that does not need duping file descriptors.Etruria
Ok. Tested and of course you are right. Thanks for the pointers camh.Metaprotein

As camh & Dennis Williamson say, don't break the pipe.

Now I have smaller examples, typed directly on the command line:

Server:

(
  for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9};
  do
    if read s; then
      echo ">>$i--$s//";
    else
      echo "<<$i";
    fi;
  done < tst-fifo
)&

Client:

(
  for i in {%a,#b}{1,2}{0,1};
  do
    echo "Test-$i" > tst-fifo;
  done
)&

The key line can be replaced with:

    (echo "Test-$i" > tst-fifo&);

All client data sent to the pipe gets read, though with option two of the client one may need to start the server a couple of times before all data is read.

But although the read waits for data in the pipe to start with, once data has been pushed, it reads the empty string forever.

Any way to stop this?

Thanks for any insights again.

Metaprotein answered 27/11, 2010 at 17:38 Comment(0)

On the one hand, the problem is worse than I thought: there now seems to be a case in my more complex example (jq_manage) where the same data is read over and over again from the pipe (even though no new data is being written to it).

On the other hand, I found a simple solution (edited following Dennis' comment):

function jqn    # compute the number of jobs running in that group
{
  __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l)
}

function jq
{
  __groupn__="$1";  shift   # job group name (the pool within which to allocate $__jmax__ jobs)
  __jmax__="$1";    shift   # maximum of job numbers to run concurrently

  jqn
  while (($__jqty__ '>=' $__jmax__))
  do
    sleep 1
    jqn
  done

  eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; $@) &"
}

Works like a charm. No socket or pipe involved. Simple.
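Usage stays the same as before, e.g. (a sketch):

jq abc 2 sleep 20
jq abc 2 sleep 20
jq abc 2 sleep 20    # this third call waits until one of the first two jobs finishes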

Metaprotein answered 27/11, 2010 at 8:37 Comment(3)
There's no reason to export __jqty__ (or any of the exports in the original). Why do you echo something directly to /dev/null? Why use eval? Why not just do $@&? It's not necessary to quote >=. I agree with camh's answer.Outwork
It all boils down to reading and filtering the output of ps. I echo to /dev/null because I don't actually want the output; I just want the right strings in the output of 'ps'. The same goes for eval: otherwise ps shows the variable names, not the expanded variables, and eval does the expansion. I had never used ((...)) before, so thanks for pointing out that I don't need the quotes; I was just going off an example I read somewhere. And thanks also about the export: it is a leftover from the previous, more complicated scripts that had sub-processes and required the export.Metaprotein
Sorry I meant 'jobs', not 'ps'Metaprotein

run say 10 jobs in parallel at most and queue the rest for later processing, but reliably know that they do run

You can do this with GNU Parallel. You will not need all this scripting.

http://www.gnu.org/software/parallel/man.html#options

You can set --max-procs ("Number of jobslots. Run up to N jobs in parallel."). There is also an option to set the number of CPU cores you want to use, and you can save the list of executed jobs to a log file, though that is a beta feature.
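For instance (a minimal sketch, assuming a hypothetical jobs.txt with one shell command per line):

# run at most 10 jobs at a time and log what was executed
parallel --max-procs 10 --joblog /tmp/parallel.log < jobs.txt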

Sigfrid answered 2/5, 2013 at 16:38 Comment(0)
