Skip to content

Commit ce3e745

Browse files
committed
jobutils improvements
version 2 fixing some performance issues: * instead of actively monitoring logs every N-s, we now use an asynchronous background task using the "tail -f | grep" pattern Committed as a separate script for the moment in order to test this in parallel to existing solution. A complete switch-over will be made once tests are successful.
1 parent ff03ce9 commit ce3e745

1 file changed

Lines changed: 397 additions & 0 deletions

File tree

Utilities/Tools/jobutils2.sh

Lines changed: 397 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,397 @@
1+
# Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
# See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
# All rights not expressly granted are reserved.
4+
#
5+
# This software is distributed under the terms of the GNU General Public
6+
# License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
#
8+
# In applying this license CERN does not waive the privileges and immunities
9+
# granted to it by virtue of its status as an Intergovernmental Organization
10+
# or submit itself to any jurisdiction.
11+
#
12+
# author: Sandro Wenzel
13+
14+
# This file contains a couple of utility functions for reuse
15+
# in shell job scripts (such as on the GRID).
16+
# In order to use these functions in scripts, this file needs to be
17+
# simply sourced into the target script. The script needs bash versions > 4
18+
19+
# TODOs:
20+
# -harmonize use of bc/awk for calculations
21+
# -harmonize coding style for variables
22+
23+
o2_cleanup_shm_files() {
24+
if [ "${JOBUTILS_INTERNAL_DPL_SESSION}" ]; then
25+
# echo "cleaning up session ${JOBUTILS_INTERNAL_DPL_SESSION}"
26+
fairmq-shmmonitor -s ${JOBUTILS_INTERNAL_DPL_SESSION} -c &> /dev/null
27+
fi
28+
}
29+
30+
# Function to find out all the (recursive) child processes starting from a parent PID.
31+
# The output includes the parent
32+
childprocs() {
33+
local parent=$1
34+
if [ ! "$2" ]; then
35+
child_pid_list=""
36+
fi
37+
if [ "$parent" ] ; then
38+
child_pid_list="$child_pid_list $parent"
39+
for childpid in $(pgrep -P ${parent}); do
40+
childprocs $childpid "nottoplevel"
41+
done;
42+
fi
43+
# return via a string list (only if toplevel)
44+
if [ ! "$2" ]; then
45+
echo "${child_pid_list}"
46+
fi
47+
}
48+
49+
taskwrapper_cleanup() {
50+
MOTHERPID=$1
51+
SIGNAL=${2:-SIGTERM}
52+
for p in $(childprocs ${MOTHERPID}); do
53+
echo "killing child $p"
54+
kill -s ${SIGNAL} $p 2> /dev/null
55+
done
56+
sleep 2
57+
# remove leftover shm files
58+
o2_cleanup_shm_files
59+
unset JOBUTILS_INTERNAL_DPL_SESSION
60+
}
61+
62+
taskwrapper_cleanup_handler() {
63+
PID=$1
64+
SIGNAL=$2
65+
echo "CLEANUP HANDLER FOR PROCESS ${PID} AND SIGNAL ${SIGNAL}"
66+
taskwrapper_cleanup ${PID} ${SIGNAL}
67+
# I prefer to exit the current job completely
68+
return 1 2>/dev/null || exit 1
69+
}
70+
71+
# Function monitoring DPL output for signs of failure
72+
monitorlog() {
73+
# We need to grep on multitude of things:
74+
# - all sorts of exceptions (may need to fine-tune)
75+
# - segmentation violation
76+
# - there was a crash
77+
# - bus error (often occuring with shared mem)
78+
pattern="-e \"\<[Ee]xception\" \
79+
-e \"segmentation violation\" \
80+
-e \"error while setting up workflow\" \
81+
-e \"bus error\" \
82+
-e \"Assertion.*failed\" \
83+
-e \"Fatal in\" \
84+
-e \"libc++abi.*terminating\" \
85+
-e \"There was a crash.\" \
86+
-e \"arrow.*Check failed\" \
87+
-e \"terminate called after\" \
88+
-e \"terminate called without an active\" \
89+
-e \"\[FATAL\]\" \
90+
-e \"TASK-EXIT-CODE\" \
91+
-e \"\*\*\* Error in\"" # <--- LIBC fatal error messages
92+
93+
# arguments to function:
94+
logfile=$1
95+
96+
# runs resource-friendly until trigger match found
97+
# only invokes tail + grep once
98+
tmpfile=$(mktemp /tmp/taskwrapper-tail.XXXXXX)
99+
command="grep ${pattern} -m 1 &> /dev/null"
100+
( tail -f ${logfile} & echo $! >${tmpfile} ) | eval "${command}"
101+
kill -s SIGKILL $(<${tmpfile})
102+
rm ${tmpfile}
103+
104+
tail -n 10 ${logfile} | grep "TASK-EXIT-CODE" &> /dev/null
105+
RC=$? # this will be one whenever TASK-EXIT-CODE could not be found
106+
# echo "check for task exit code yielded ${RC}"
107+
108+
if [ "$RC" = "1" ]; then
109+
echo "Detected critical problem in logfile $logfile"
110+
if [ "${JOBUTILS_PRINT_ON_ERROR}" ]; then
111+
grepcommand="grep -a -H -A 2 -B 2 ${pattern} $logfile ${JOBUTILS_JOB_SUPERVISEDFILES}"
112+
eval ${grepcommand}
113+
fi
114+
115+
# this gives some possibility to customize the wrapper
116+
# and do some special task at the start. The hook takes 2 arguments:
117+
# The original command and the logfile
118+
if [ "${JOBUTILS_JOB_FAILUREHOOK}" ]; then
119+
hook="${JOBUTILS_JOB_FAILUREHOOK} '$command' $logfile"
120+
eval "${hook}"
121+
fi
122+
123+
exit 1
124+
fi
125+
exit 0
126+
} # end of function monitorlog
127+
128+
# Function wrapping some process and asyncronously supervises and controls it.
129+
# Main features provided at the moment are:
130+
# - optional recording of walltime and memory consumption (time evolution)
131+
# - optional recording of CPU utilization
132+
# - Some job control and error detection (in particular for DPL workflows).
133+
# If exceptions are found, all participating processes will be sent a termination signal.
134+
# The rational behind this function is to be able to determine failing
135+
# conditions early and prevent longtime hanging executables
136+
# (until DPL offers signal handling and automatic shutdown)
137+
# - possibility to provide user hooks for "start" and "failure"
138+
# - possibility to skip (jump over) job alltogether
139+
# - possibility to define timeout
140+
# - possibility to control/limit the CPU load
141+
taskwrapper() {
142+
unset JOBUTILS_INTERNAL_DPL_SESSION
143+
# nested helper to parse DPL session ID
144+
_parse_DPL_session ()
145+
{
146+
childpids=$(childprocs ${1})
147+
for p in ${childpids}; do
148+
command=$(ps -o command ${p} | grep -v "COMMAND" | grep "session")
149+
if [ "$?" = "0" ]; then
150+
# echo "parsing from ${command}"
151+
session=`echo ${command} | sed 's/.*--session//g' | awk '//{print $1}'`
152+
if [ "${session}" ]; then
153+
# echo "found ${session}"
154+
break
155+
fi
156+
fi
157+
done
158+
echo "${session:-""}"
159+
}
160+
161+
local logfile=$1
162+
shift 1
163+
local command="$*"
164+
165+
STARTTIME=$SECONDS
166+
167+
# launch the actual command in the background
168+
echo "Launching task: ${command} &> $logfile &"
169+
# the command might be a complex block: For the timing measurement below
170+
# it is better to execute this as a script
171+
SCRIPTNAME="${logfile}_tmp.sh"
172+
echo "export LIBC_FATAL_STDERR_=1" > ${SCRIPTNAME} # <--- needed ... otherwise the LIBC fatal messages appear on a different tty
173+
echo "${command};" >> ${SCRIPTNAME}
174+
echo 'RC=$?; echo "TASK-EXIT-CODE: ${RC}"; exit ${RC}' >> ${SCRIPTNAME}
175+
chmod +x ${SCRIPTNAME}
176+
177+
# this gives some possibility to customize the wrapper
178+
# and do some special task at the start. The hook takes 2 arguments:
179+
# The original command and the logfile
180+
if [ "${JOBUTILS_JOB_STARTHOOK}" ]; then
181+
hook="${JOBUTILS_JOB_STARTHOOK} '$command' $logfile"
182+
eval "${hook}"
183+
fi
184+
185+
# We offer the possibility to jump this stage/task when a "done" file is present.
186+
# (this is mainly interesting for debugging in order to avoid going through all pipeline stages again)
187+
# The feature should be used with care! To make this nice, a proper dependency chain and a checksum mechanism
188+
# needs to be put into place.
189+
if [ "${JOBUTILS_SKIPDONE}" ]; then
190+
if [ -f "${logfile}_done" ]; then
191+
echo "Skipping task since file ${logfile}_done found";
192+
[ ! "${JOBUTILS_KEEPJOBSCRIPT}" ] && rm ${SCRIPTNAME} 2> /dev/null
193+
return 0
194+
fi
195+
fi
196+
[ -f "${logfile}_done" ] && rm "${logfile}"_done
197+
198+
199+
200+
# the time command is non-standard on MacOS
201+
if [ "$(uname)" == "Darwin" ]; then
202+
GTIME=$(which gtime)
203+
TIMECOMMAND=${GTIME:+"${GTIME} --output=${logfile}_time"}
204+
else
205+
TIMECOMMAND="/usr/bin/time --output=${logfile}_time"
206+
fi
207+
208+
# with or without memory monitoring ?
209+
finalcommand="TIME=\"#walltime %e\" ${TIMECOMMAND} ./${SCRIPTNAME}"
210+
if [[ "$(uname)" != "Darwin" && "${JOBUTILS_MONITORMEM}" ]]; then
211+
finalcommand="TIME=\"#walltime %e\" ${O2_ROOT}/share/scripts/monitor-mem.sh ${TIMECOMMAND} './${SCRIPTNAME}'"
212+
fi
213+
echo "Running: ${finalcommand}" > ${logfile}
214+
215+
# launch task to monitoring log (in background)
216+
monitorlog ${logfile} &
217+
MONITORLOGPID=$!
218+
219+
eval ${finalcommand} >> ${logfile} 2>&1 & #cannot disown here since we want to retrieve exit status later on
220+
221+
# THE NEXT PART IS THE SUPERVISION PART
222+
# get the PID
223+
PID=$!
224+
# register signal handlers
225+
trap "taskwrapper_cleanup_handler ${PID} SIGINT" SIGINT
226+
trap "taskwrapper_cleanup_handler ${PID} SIGTERM" SIGTERM
227+
228+
cpucounter=1
229+
inactivitycounter=0 # used to detect periods of inactivity
230+
NLOGICALCPUS=$(getNumberOfLogicalCPUCores)
231+
232+
control_iteration=1
233+
while [ "${CONTROLLOOP}" ]; do
234+
# check if command returned which may bring us out of the loop
235+
ps -p $PID > /dev/null
236+
[ $? == 1 ] && break
237+
238+
if [ "${JOBUTILS_MONITORMEM}" ]; then
239+
if [ "${JOBUTILS_INTERNAL_DPL_SESSION}" ]; then
240+
MAX_FMQ_SHM=${MAX_FMQ_SHM:-0}
241+
text=$(fairmq-shmmonitor -v -s ${JOBUTILS_INTERNAL_DPL_SESSION})
242+
line=$(echo ${text} | tr '[' '\n[' | grep "^0" | tail -n1)
243+
CURRENT_FMQ_SHM=$(echo ${line} | sed 's/.*used://g')
244+
# echo "current shm ${CURRENT_FMQ_SHM}"
245+
MAX_FMQ_SHM=$(awk -v "t=${CURRENT_FMQ_SHM}" -v "s=${MAX_FMQ_SHM}" 'BEGIN { if(t>=s) { print t; } else { print s; } }')
246+
fi
247+
fi
248+
249+
if [ "${JOBUTILS_MONITORCPU}" ]; then
250+
# NOTE: The following section is "a bit" compute intensive and currently not optimized
251+
# A careful evaluation of awk vs bc or other tools might be needed -- or a move to a more
252+
# system oriented language/tool
253+
254+
# get some CPU usage statistics per process --> actual usage can be calculated thereafter
255+
total=`awk 'BEGIN{s=0}/cpu /{for (i=1;i<=NF;i++) s+=$i;} END {print s}' /proc/stat`
256+
previous_total=${current_total}
257+
current_total=${total}
258+
# quickly fetch the data
259+
childpids=$(childprocs ${PID})
260+
261+
for p in $childpids; do
262+
while read -r name utime stime; do
263+
echo "${cpucounter} ${p} ${total} ${utime} ${stime} ${name}" >> ${logfile}_cpuusage
264+
previous[$p]=${current[$p]}
265+
current[$p]=${utime}
266+
name[$p]=${name}
267+
done <<<$(awk '//{print $2" "$14" "$15}' /proc/${p}/stat 2>/dev/null)
268+
done
269+
fi
270+
271+
# a good moment to check for jobs timeout (or other resources)
272+
if [ "$JOBUTILS_JOB_TIMEOUT" ]; then
273+
$(awk -v S="${SECONDS}" -v T="${JOBUTILS_JOB_TIMEOUT}" -v START="${STARTTIME}" 'BEGIN {if((S-START)>T){exit 1;} exit 0;}')
274+
if [ "$?" = "1" ]; then
275+
echo "task timeout reached .. killing all processes";
276+
taskwrapper_cleanup $PID SIGKILL
277+
# call a more specialized hook for this??
278+
if [ "${JOBUTILS_JOB_FAILUREHOOK}" ]; then
279+
hook="${JOBUTILS_JOB_FAILUREHOOK} '$command' $logfile"
280+
eval "${hook}"
281+
fi
282+
[ "${JOBUTILS_PRINT_ON_ERROR}" ] && echo ----- Last log: ----- && pwd && cat ${logfile} && echo ----- End of log -----
283+
[[ ! "${JOBUTILS_NOEXIT_ON_ERROR}" ]] && [[ ! $- == *i* ]] && exit 1
284+
return 1
285+
fi
286+
fi
287+
288+
# Try to find out DPL session ID
289+
# if [ -z "${JOBUTILS_INTERNAL_DPL_SESSION}" ]; then
290+
JOBUTILS_INTERNAL_DPL_SESSION=$(_parse_DPL_session ${PID})
291+
# echo "got session ${JOBUTILS_INTERNAL_DPL_SESSION}"
292+
# fi
293+
294+
# sleep for some time (can be customized for power user)
295+
sleep ${JOBUTILS_WRAPPER_SLEEP:-1}
296+
297+
# power feature: we allow to call a user hook at each i-th control
298+
# iteration
299+
if [ "${JOBUTILS_JOB_PERIODICCONTROLHOOK}" ]; then
300+
if [ "${control_iteration}" = "${JOBUTILS_JOB_CONTROLITERS:-10}" ]; then
301+
hook="${JOBUTILS_JOB_PERIODICCONTROLHOOK} '$command' $logfile"
302+
eval "${hook}"
303+
control_iteration=0
304+
fi
305+
fi
306+
307+
let control_iteration=control_iteration+1
308+
done
309+
310+
wait ${MONITORLOGPID}
311+
MRC=$?
312+
if [ "${MRC}" = "1" ]; then
313+
echo "Abnormal problem detected; Bringing down workflow after 2 seconds"
314+
sleep 2
315+
[ ! "${JOBUTILS_DEBUGMODE}" ] && taskwrapper_cleanup ${PID} SIGKILL
316+
fi
317+
318+
# wait for man task PID and fetch return code
319+
# ?? should directly exit here?
320+
wait $PID || QUERY_RC_FROM_LOG="ON"
321+
322+
# query return code from log (seems to be safer as sometimes the wait issues "PID" not a child of this shell)
323+
RC=$(awk '/TASK-EXIT-CODE:/{print $2}' ${logfile})
324+
if [ ! "${RC}" ]; then
325+
RC=1
326+
fi
327+
RC_ACUM=$((RC_ACUM+RC))
328+
if [ "${RC}" -eq "0" ]; then
329+
if [ ! "${JOBUTILS_JOB_SKIPCREATEDONE}" ]; then
330+
# if return code 0 we mark this task as done
331+
echo "Command \"${command}\" successfully finished." > "${logfile}"_done
332+
echo "The presence of this file can be used to skip this command in future runs" >> "${logfile}"_done
333+
echo "of the pipeline by setting the JOBUTILS_SKIPDONE environment variable." >> "${logfile}"_done
334+
fi
335+
else
336+
echo "command ${command} had nonzero exit code ${RC}"
337+
[ "${JOBUTILS_PRINT_ON_ERROR}" ] && echo ----- Last log: ----- && pwd && cat ${logfile} && echo ----- End of log -----
338+
fi
339+
[ ! "${JOBUTILS_KEEPJOBSCRIPT}" ] && rm ${SCRIPTNAME} 2> /dev/null
340+
341+
# deregister signal handlers
342+
trap '' SIGINT
343+
trap '' SIGTERM
344+
345+
o2_cleanup_shm_files #--> better to register a general trap at EXIT
346+
347+
# this gives some possibility to customize the wrapper
348+
# and do some special task at the ordinary exit. The hook takes 3 arguments:
349+
# - The original command
350+
# - the logfile
351+
# - the return code from the execution
352+
if [ "${JOBUTILS_JOB_ENDHOOK}" ]; then
353+
hook="${JOBUTILS_JOB_ENDHOOK} '$command' $logfile ${RC}"
354+
eval "${hook}"
355+
fi
356+
357+
if [ ! "${RC}" -eq "0" ]; then
358+
if [ ! "${JOBUTILS_NOEXIT_ON_ERROR}" ]; then
359+
# in case of incorrect termination, we usually like to stop the whole outer script (== we are in non-interactive mode)
360+
[[ ! $- == *i* ]] && exit ${RC}
361+
fi
362+
fi
363+
if [ "${JOBUTILS_MONITORMEM}" ]; then
364+
# convert bytes in MB
365+
MAX_FMQ_SHM=${MAX_FMQ_SHM:-0}
366+
MAX_FMQ_SHM=$(awk -v "s=${MAX_FMQ_SHM}" 'BEGIN { print s/(1024.*1024) }')
367+
echo "PROCESS MAX FMQ_SHM = ${MAX_FMQ_SHM}" >> ${logfile}
368+
fi
369+
unset JOBUTILS_INTERNAL_DPL_SESSION
370+
return ${RC}
371+
}
372+
373+
getNumberOfPhysicalCPUCores() {
374+
if [ "$(uname)" == "Darwin" ]; then
375+
CORESPERSOCKET=`system_profiler SPHardwareDataType | grep "Total Number of Cores:" | awk '{print $5}'`
376+
if [ "$(uname -m)" == "arm64" ]; then
377+
SOCKETS=1
378+
else
379+
SOCKETS=`system_profiler SPHardwareDataType | grep "Number of Processors:" | awk '{print $4}'`
380+
fi
381+
else
382+
# Do something under GNU/Linux platform
383+
CORESPERSOCKET=`lscpu | grep "Core(s) per socket" | awk '{print $4}'`
384+
SOCKETS=`lscpu | grep "Socket(s)" | awk '{print $2}'`
385+
fi
386+
N=`bc <<< "${CORESPERSOCKET}*${SOCKETS}"`
387+
echo "${N}"
388+
}
389+
390+
getNumberOfLogicalCPUCores() {
391+
if [ "$(uname)" == "Darwin" ]; then
392+
echo $(sysctl -n hw.logicalcpu)
393+
else
394+
# Do something under GNU/Linux platform
395+
echo $(grep "processor" /proc/cpuinfo | wc -l)
396+
fi
397+
}

0 commit comments

Comments
 (0)