Parallelize stream processing using bash
(Full bash script using sed at the end of this post!)
Using sed for stream filtering is a bit like using parallel, but instead of sending blocks of lines to processes, it uses a field to send specific lines to specific processes.
For this use case (filtering a stream to distribute it to many subtasks), sed should be the quickest way: sed is a lot lighter than perl, and parallel is a perl script. Using sed will be noticeably quicker and lighter, and will consume fewer resources! See the comparison at the end of this post!
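As a minimal, hand-written sketch of the technique (assuming GNU sed, bash process substitution and a /dev/fd filesystem; the `zero:`/`one:` labels are only for illustration):

```shell
#!/bin/bash
# Route lines to different subprocesses according to their first field,
# using sed's `w` command and numbered process substitutions.
printf '%s\n' '0 a' '1 b' '0 c' |
  sed -n -e '/^0/{s/^. //;w/dev/fd/4' \
         -e '};/^1/{s/^. //;w/dev/fd/5' \
         -e '}' \
      4> >(sed 's/^/zero: /') \
      5> >(sed 's/^/one: /')
```

The two inner sed commands stand in for arbitrary per-tag workers; their outputs may interleave in any order.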
First, prepare the sed command line:
printf -v sedcmd ' -e \47};/^%d/{s/^. //;w/dev/fd/%d\47 %d> >(exec \
awk \47{c++;s+=$1;ss+=$1*$1}END{a=s/c;print %d,sqrt((ss-a*a)/c)}\47) ' $(
for ((i=0;i<10;i++)) { echo $i $((i+4)) $((i+4)) $i ; })
Then the command is:

eval sed -n "${sedcmd/\};} -e '};'" <testfile

or

eval sed -n "${sedcmd/\};} -e '};'" <testfile | cat

where $sedcmd looks like:
$ echo -- "$sedcmd"
-- -e '};/^0/{s/^. //;w/dev/fd/4' 4> >(exec \
awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 0,sqrt((ss-a*a)/c)}') -e '};/^1/{s/^. //;w/dev/fd/5' 5> >(exec \
awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 1,sqrt((ss-a*a)/c)}') -e '};/^2/{s/^. //;w/dev/fd/6' 6> >(exec \
awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 2,sqrt((ss-a*a)/c)}') -e '};/^3/{s/^. //;w/dev/fd/7' 7> >(exec \
awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 3,sqrt((ss-a*a)/c)}') -e '};/^4/{s/^. //;w/dev/fd/8' 8> >(exec \
awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 4,sqrt((ss-a*a)/c)}') -e '};/^5/{s/^. //;w/dev/fd/9' 9> >(exec \
awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 5,sqrt((ss-a*a)/c)}') -e '};/^6/{s/^. //;w/dev/fd/10' 10> >(exec \
awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 6,sqrt((ss-a*a)/c)}') -e '};/^7/{s/^. //;w/dev/fd/11' 11> >(exec \
awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 7,sqrt((ss-a*a)/c)}') -e '};/^8/{s/^. //;w/dev/fd/12' 12> >(exec \
awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 8,sqrt((ss-a*a)/c)}') -e '};/^9/{s/^. //;w/dev/fd/13' 13> >(exec \
awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print 9,sqrt((ss-a*a)/c)}')
where

4> >(exec awk ...)

tells bash to open file descriptor number 4, connected to the stdin of awk, and

-e "/^0/{s/^. //;w/dev/fd/4" -e "}"

tells sed to strip the leading tag from lines which begin with 0 and write them to /dev/fd/4.
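This `N> >(cmd)` idiom can be tried in isolation (a minimal sketch, with `tr` standing in for the awk worker; `$!` holds the process substitution's PID in bash >= 4.4):

```shell
#!/bin/bash
exec 4> >(tr a-z A-Z)      # fd 4 now feeds tr's stdin
psub=$!                    # PID of the process substitution
echo hello >&4             # written to tr, not directly to stdout
exec 4>&-                  # close fd 4 so tr sees EOF
wait "$psub" 2>/dev/null || sleep 0.5   # let tr flush; tr prints HELLO
```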
parallel.sh, a full bash script (draft)
Here is a full parallel-filtering bash script using sed:
#!/bin/bash
# parallel.sh - bash script for filtering/parallelising using sed.
# (C) 2023 Felix Hauri - [email protected]
# Licensed under terms of GPL v3. www.gnu.org
prog=${0##*/}
usage() {
cat <<-EOUsage
Usage: $prog -t <tags> [-b <re>] [-a <re>] command args
-h show this
-t <tags> comma separated list of tags to send to separate tasks
or single tag, '-t' option may be submitted multiple times
-b <re> sed regex to match before tags
-a <re> sed regex to match after tags
command Any command to be run once for each tag.
Special string "<RE>" will be replaced by current tag.
EOUsage
}
die() {
echo >&2 "ERROR $prog: $*"
exit 1
}
while getopts "ht:a:b:" opt; do
case $opt in
h ) usage; exit ;;
t ) IFS=, read -a crttags <<<"$OPTARG"
tags+=("$crttags");;
b ) before=$OPTARG ;;
a ) after=$OPTARG ;;
*) die Wrong argument. ;;
esac
done
shift $((OPTIND-1))
[[ -v tags ]] || die "No tags submitted"
(( $# )) || die "No command submitted"
sedcmd='' paren=''
declare -i crtFd=4
for re in "${tags[@]}";do
printf -v crtcmd '%q ' "${@//\<RE\>/$re}"
printf -v crtcmd ' -e \47%s/%s/{s/%s//;w/dev/fd/%d\47 %d> >(exec %s) ' \
"$paren" "$before$re$after"{,} $crtFd $crtFd "$crtcmd"
paren='};'
sedcmd+="$crtcmd" crtFd+=1
done
sedcmd+=" -e '$paren'"
eval sed -n "$sedcmd"
Usage: parallel.sh -t <tags> [-b <re>] [-a <re>] command args
-h show this
-t <tags> comma separated list of tags to send to separate tasks
or single tag, '-t' option may be submitted multiple times
-b <re> sed regex to match before tags
-a <re> sed regex to match after tags
command Any command to be run once for each tag.
Special string "<RE>" will be replaced by current tag.
This script can be found here: parallel.sh.
Tested with your use case:
./parallel.sh -t{0..9} -b ^ awk '{c++;s+=$1;ss+=$1*$1}END{a=s/c;print <RE>,sqrt((ss-a*a)/c)}' <testfile
Notice the only change from your command line: print <RE>,sqrt..., where <RE> will be replaced by each tag (-t) in its respective subtask.
9 55.6751
8 58.0447
7 55.6755
6 58.3663
5 58.696
4 58.2724
3 54.9797
2 57.5355
1 54.6131
0 57.1334
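The tag substitution relies on `"${@//\<RE\>/$re}"` inside the script's loop; here is a small round-trip sketch (the awk body is a hypothetical example, any command works):

```shell
#!/bin/bash
set -- awk '{print <RE>, $1}'   # positional parameters as typed by the user
re=7
printf -v crtcmd '%q ' "${@//\<RE\>/$re}"   # substitute tag, quote for eval
eval "set -- $crtcmd"           # round-trip through eval, as parallel.sh does
printf '%s\n' "$2"              # -> {print 7, $1}
```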
Comparison with GNU parallel
Of course, this is about line-buffered filtering, not suitable for block-buffered distribution!!
I've tested with a simple 1000-line random file:
for ((i=1000;i--;)){ echo $((RANDOM%10)) $((RANDOM%100));} >testfile
then using parallel:
sd() {
awk '{s+=$'$1';ss+=$'$1'*$'$1'}END{a=s/NR;print sqrt((ss-a*a)/NR)}'
}
export -f sd
time parallel -j10 --colsep ' ' --bin 1 --pipe \
--tagstring {%} sd 2 <testfile |sort
10 58.3703
1 50.7911
2 56.9009
3 55.0832
4 52.5365
5 65.0864
6 61.4079
7 55.5353
8 62.337
9 51.2512
real 0m0.488s
user 0m1.158s
sys 0m0.272s
and using sed + bash:
time ./parallel.sh -t{0..9} -b ^ awk \
'{c++;s+=$1;ss+=$1*$1}END{a=s/c;print <RE>,sqrt((ss-a*a)/c)}' <testfile |
sort
0 58.3703
1 50.7911
2 56.9009
3 55.0832
4 52.5365
5 65.0864
6 61.4079
7 55.5353
8 62.337
9 51.2512
real 0m0.010s
user 0m0.009s
sys 0m0.000s
Fortunately, the computed results are the same! (The parallel version outputs 10 instead of 0.)
The bash + sed version:
- uses tags instead of job numbers
- uses a lot fewer system resources
- is noticeably quicker
Test with bigger and smaller files:
                 Number of lines     Real      User    System
parallel.sh          100'000'000   73.117    72.598     0.416
parallel (perl)      100'000'000  129.264   383.701    36.319
parallel.sh            1'000'000    0.744     0.728     0.013
parallel (perl)        1'000'000    1.798     5.571     0.613
parallel.sh               10'000    0.018     0.007     0.009
parallel (perl)           10'000    0.523     1.148     0.269
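A sketch of how such test files can be regenerated at any size (awk's rand() is much faster than a bash RANDOM loop at 10^8 lines; the column ranges match the testfile above):

```shell
#!/bin/bash
n=1000000    # number of lines to generate
awk -v n="$n" 'BEGIN{
  srand()
  for (i = 0; i < n; i++)        # tag 0-9, value 0-99, like testfile
    print int(10 * rand()), int(100 * rand())
}' > testfile
```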
Here is the output of ps --tty pts/4 fw while parallel.sh was running in pts/4:
5352 pts/4 Ss 0:00 -bash
5983 pts/4 S+ 0:00 \_ /bin/bash ./parallel.sh -t0 -t1 -t2..
5985 pts/4 R+ 0:13 | \_ sed -n -e /^0/{s/^0//;w/dev/fd/..
5986 pts/4 S+ 0:00 | \_ awk {c++;s+=$1;ss+=$1*$1}EN..
5987 pts/4 S+ 0:00 | \_ awk {c++;s+=$1;ss+=$1*$1}EN..
5988 pts/4 S+ 0:00 | \_ awk {c++;s+=$1;ss+=$1*$1}EN..
5989 pts/4 S+ 0:00 | \_ awk {c++;s+=$1;ss+=$1*$1}EN..
5990 pts/4 S+ 0:00 | \_ awk {c++;s+=$1;ss+=$1*$1}EN..
5991 pts/4 S+ 0:00 | \_ awk {c++;s+=$1;ss+=$1*$1}EN..
5992 pts/4 S+ 0:00 | \_ awk {c++;s+=$1;ss+=$1*$1}EN..
5993 pts/4 S+ 0:00 | \_ awk {c++;s+=$1;ss+=$1*$1}EN..
5994 pts/4 S+ 0:00 | \_ awk {c++;s+=$1;ss+=$1*$1}EN..
5995 pts/4 S+ 0:00 | \_ awk {c++;s+=$1;ss+=$1*$1}EN..
5984 pts/4 S+ 0:00 \_ sort
Here bash executes sed, which runs 10x awk, piped to sort. Looks OK!
Here is the output of ps --tty pts/4 fw while parallel (perl) was running:
5352 pts/4 Ss 0:00 -bash
5777 pts/4 S+ 0:00 \_ /usr/bin/perl /usr/bin/parallel -j1..
5780 pts/4 R+ 0:17 | \_ /usr/bin/perl /usr/bin/parallel..
5956 pts/4 R 0:16 | | \_ perl -e use B; my $sep = s..
5957 pts/4 R 0:16 | | \_ perl -e use B; my $sep = s..
snip 7 lines
5965 pts/4 R 0:16 | | \_ perl -e use B; my $sep = s..
5793 pts/4 S 0:00 | \_ /usr/bin/bash -c perl -e '{use ..
5794 pts/4 S 0:00 | | \_ perl -e {use POSIX qw(:errn..
5795 pts/4 S 0:00 | | \_ /usr/bin/bash -c perl -e '{..
5796 pts/4 S 0:01 | | \_ awk {s+=$2;ss+=$2*$2}EN..
snip 33 lines
5852 pts/4 S 0:00 | \_ /usr/bin/bash -c perl -e '{use ..
5867 pts/4 S 0:00 | \_ perl -e {use POSIX qw(:errn..
5868 pts/4 S 0:00 | \_ /usr/bin/bash -c perl -e '{..
5870 pts/4 S 0:01 | \_ awk {s+=$2;ss+=$2*$2}EN..
5778 pts/4 S+ 0:00 \_ sort
Well!! 52 processes are executed just to fork one stream to 10 subprocesses!! Each subprocess requires 5 subtasks?!
Use case:
Quick demo on a log file:
{ tags=($(cut -d\[ -f 1 | cut -d\  -f 5 | sort -u)) ;} <daemon.log
./parallel.sh "${tags[@]/#/-t}" -b \\b -a \\[ bash -c \
$'printf \47 - %-20s %8d %8d %8d\\n\47 "$1" $(wc)' -- "<RE>" <daemon.log |
sort
This will run as many tasks as there are tags, then output wc counts for each substream.
Note: the syntax ${tags[@]/#/-t} will be expanded to -tdhclient -tdnsmasq -tsystemd ....
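This `/#/` expansion (replace the empty match at the start of each element, i.e. prefix every element) can be checked on its own:

```shell
#!/bin/bash
tags=(dhclient dnsmasq systemd)
printf '%s\n' "${tags[@]/#/-t}"
# -tdhclient
# -tdnsmasq
# -tsystemd
```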
- accton 14 154 1165
- dbus-daemon 80 1273 13731
- dhclient 6480 79920 542160
- dnsmasq 6480 49680 401760
- systemd 154608 1474418 10664639
...
But you could create different filters for different targets:
tags=( dhclient dnsmasq systemd )
./parallel.sh ${tags[@]/#/-t} -b \\b -a \\[ \
"./filter-<RE>.sh" <daemon.log
This will run 3 different tasks, ./filter-dhclient.sh, ./filter-dnsmasq.sh and ./filter-systemd.sh, then parse the log file to send the watched lines to their specific tasks.
Remark about parallel output:
Regarding Ole Tange's comment, this seems clear to me: if you ask many tasks to speak together on the same unique STDOUT, you may observe strange mixed lines!
If your filters have to output continuously, you have to drive their output in a proper way!
./parallel.sh -t{0..9} -b ^ -a ' ' <testfile sh \
-c 'sed "s/^/group <RE> /" >/tmp/file-<RE>.txt'
cat /tmp/file-?.txt
Note: I've finally made a successful test in the spirit of the comments: parallel.sh ... | sort | uniq -c ....
For this to work, the script has to be modified by adding FIFOs, merged by parcat at the end of the script, something like:
+++ parallel.sh 2023-03-07 19:17:08.976802098 +0100
@@ -40,5 +40,8 @@
for re in "${tags[@]}";do
+ pasteFd=/tmp/fifo-pp-r$crtFd
+ mkfifo $pasteFd
+ pasteFds+=($pasteFd)
printf -v crtcmd '%q ' "${@//\<RE\>/$re}"
- printf -v crtcmd ' -e \47%s/%s/{s/%s//;w/dev/fd/%d\47 %d> >(exec %s) ' \
- "$paren" "$before$re$after"{,} $crtFd $crtFd "$crtcmd"
+ printf -v crtcmd ' -e \47%s/%s/{s/%s//;w/dev/fd/%d\47 %d> >(exec %s >%s) '\
+ "$paren" "$before$re$after"{,} $crtFd $crtFd "$crtcmd" $pasteFd
paren='};'
@@ -47,3 +50,4 @@
sedcmd+=" -e '$paren'"
-
-eval sed -n "$sedcmd"
+eval sed -n "$sedcmd" &
+parcat ${pasteFds[@]}
+rm ${pasteFds[@]}
I will probably add some options for doing this properly in my final script (on my website).
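The FIFO-merge idea can be sketched without parcat (which ships with GNU parallel): here plain cat drains the FIFOs sequentially, whereas parcat interleaves lines as they arrive.

```shell
#!/bin/bash
d=$(mktemp -d)
mkfifo "$d/f1" "$d/f2"             # one named pipe per subtask
( echo from-task-1 > "$d/f1" ) &   # writers block until a reader opens
( echo from-task-2 > "$d/f2" ) &
cat "$d/f1" "$d/f2"                # merge: reads f1 to EOF, then f2
wait
rm -r "$d"
```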