Skip to content

Commit 8c16b0c

Browse files
Benedikt Volkelchiarazampolli
authored andcommitted
[AnalysisQC] Enable common args per analysis
* use for instance --add-common-args EMCAL-shm-segment-size 2500000000 which will be forwarded as a common argument and appended to the analysis pipeline of EMCAL analysis as --shm-segment-size 2500000000 * Use also e.g. ALL-readers <value> to influence set --readers <values> for all analyses * <analysis>-<key> <value> supersedes ALL-<key> <value>
1 parent 7520493 commit 8c16b0c

3 files changed

Lines changed: 69 additions & 13 deletions

File tree

MC/analysis_testing/analysis_test.sh

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ NTF=$(find ./ -name "tf*" -type d | wc | awk '//{print $1}')
2424
include_disabled=
2525
testanalysis=
2626
aod=
27+
add_common_args=
2728

2829
if [[ "${#}" == "1" ]] ; then
2930
# make it backward-compatible
@@ -48,6 +49,12 @@ else
4849
shift
4950
shift
5051
;;
52+
--add-common-args)
53+
add_common_args=" ${2} ${3} "
54+
shift
55+
shift
56+
shift
57+
;;
5158
*)
5259
echo "ERROR: Unknown argument ${1}"
5360
exit 1
@@ -59,6 +66,7 @@ fi
5966
# basic checks
6067
[[ "${testanalysis}" == "" ]] && { echo "ERROR: No analysis specified to be run" ; exit 1 ; }
6168
[[ "${aod}" == "" ]] && { echo "ERROR: No AOD found to be analysed" ; exit 1 ; }
69+
[[ "${add_common_args}" != "" ]] && add_common_args="--add-common-args ${add_common_args}"
6270

6371
# check if enabled
6472
enabled=$($O2DPG_ROOT/MC/analysis_testing/o2dpg_analysis_test_config.py check -t ${testanalysis} --status)
@@ -68,7 +76,9 @@ enabled=$($O2DPG_ROOT/MC/analysis_testing/o2dpg_analysis_test_config.py check -t
6876
mkdir Analysis 2>/dev/null
6977
include_disabled=${include_disabled:+--include-disabled}
7078
workflow_path="Analysis/workflow_analysis_test_${testanalysis}.json"
71-
$O2DPG_ROOT/MC/analysis_testing/o2dpg_analysis_test_workflow.py --is-mc -f ${aod} -o ${workflow_path} --only-analyses ${testanalysis} ${include_disabled}
79+
rm ${workflow_path} 2>/dev/null
80+
$O2DPG_ROOT/MC/analysis_testing/o2dpg_analysis_test_workflow.py --is-mc -f ${aod} -o ${workflow_path} --only-analyses ${testanalysis} ${include_disabled} ${add_common_args}
81+
[[ ! -f "${workflow_path}" ]] && { echo "Could not construct workflow for analysis ${testanalysis}" ; exit 1 ; }
7282
$O2DPG_ROOT/MC/bin/o2_dpg_workflow_runner.py -f ${workflow_path} -tt Analysis_${testanalysis}$ --rerun-from Analysis_${testanalysis}$
7383

7484
RC=$?

MC/analysis_testing/o2dpg_analysis_test_utils.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,3 +67,50 @@ def get_collision_system(collision_system=None):
6767
def full_ana_name(raw_ana_name):
6868
"""Make the standard name of the analysis how it should appear in the workflow"""
6969
return f"{ANALYSIS_LABEL}_{raw_ana_name}"
70+
71+
72+
def get_common_args_as_string(analysis_name, all_common_args):
73+
"""
74+
all_common_args is of the form
75+
[<ana_name1>-shm-segment-size <value>, <ana_name2>-readers <value>, ...]
76+
77+
Find common arguments for this specific analysis
78+
"""
79+
80+
def make_args_string(args_map_in):
81+
out_string = ""
82+
for key, value in args_map_in.items():
83+
out_string += f" --{key} {value}"
84+
return out_string
85+
86+
# default arguments for all analyses
87+
args_map = {"shm-segment-size": 2000000000,
88+
"readers": 1,
89+
"aod-memory-rate-limit": 500000000}
90+
91+
# arguments dedicated for this analysis
92+
args_map_overwrite = {}
93+
94+
if not all_common_args:
95+
return make_args_string(args_map)
96+
97+
if len(all_common_args) % 2:
98+
print("ERROR: Cannot digest common args.")
99+
return None
100+
101+
for i in range(0, len(all_common_args), 2):
102+
tokens = all_common_args[i].split("-")
103+
key = "-".join(tokens[1:])
104+
if tokens[0] == analysis_name:
105+
# for this analysis, add to dedicated dict
106+
args_map_overwrite[key] = all_common_args[i+1]
107+
continue
108+
if tokens[0] == "ALL":
109+
# otherwise add to default dict
110+
args_map[key] = all_common_args[i+1]
111+
112+
# overwrite default dict with dedicated arguments
113+
for key, value in args_map_overwrite.items():
114+
args_map[key] = value
115+
116+
return make_args_string(args_map)

MC/analysis_testing/o2dpg_analysis_test_workflow.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,7 @@
9090
from o2dpg_analysis_test_utils import *
9191

9292

93-
def create_ana_task(name, cmd, output_dir, *, needs=None, shmsegmentsize="--shm-segment-size 2000000000",
94-
aodmemoryratelimit="--aod-memory-rate-limit 500000000",
95-
readers="--readers 1", extraarguments="-b", is_mc=False):
93+
def create_ana_task(name, cmd, output_dir, *, needs=None, extraarguments="-b", is_mc=False):
9694
"""Quick helper to create analysis task
9795
9896
This creates an analysis task from various arguments
@@ -107,12 +105,6 @@ def create_ana_task(name, cmd, output_dir, *, needs=None, shmsegmentsize="--shm-
107105
Keyword args (optional):
108106
needs: tuple, list
109107
list of other tasks to be run before
110-
shmsegmentsize: str
111-
O2/DPL argument string for shared mem size
112-
aodmemoryratelimit: str
113-
O2/DPL argument string for AOD memory rate limit
114-
readers: O2/DPL argument string
115-
number of readers
116108
extraarguments: str
117109
O2/DPL argument string for any other desired arguments to be added to the executed cmd
118110
Return:
@@ -125,7 +117,7 @@ def create_ana_task(name, cmd, output_dir, *, needs=None, shmsegmentsize="--shm-
125117
task = createTask(name=full_ana_name(name), cwd=join(output_dir, name), lab=[ANALYSIS_LABEL, name], cpu=1, mem='2000', needs=needs)
126118
if is_mc:
127119
task["labels"].append(ANALYSIS_LABEL_ON_MC)
128-
task['cmd'] = f"{cmd} {shmsegmentsize} {aodmemoryratelimit} {readers} {extraarguments}"
120+
task['cmd'] = f"{cmd} {extraarguments}"
129121
return task
130122

131123

@@ -220,7 +212,7 @@ def get_additional_workflows(input_aod):
220212
return additional_workflows
221213

222214

223-
def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis", *, analyses_only=None, is_mc=True, collision_system=None, needs=None, autoset_converters=False, include_disabled_analyses=False, timeout=None):
215+
def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis", *, analyses_only=None, is_mc=True, collision_system=None, needs=None, autoset_converters=False, include_disabled_analyses=False, timeout=None, add_common_args=None):
224216
"""Add default analyses to user workflow
225217
226218
Args:
@@ -265,12 +257,18 @@ def add_analysis_tasks(workflow, input_aod="./AO2D.root", output_dir="./Analysis
265257
continue
266258
print(f"INFO: Analysis {ana['name']} uses configuration {configuration}")
267259

260+
add_common_args_ana = get_common_args_as_string(ana["name"], add_common_args)
261+
if not add_common_args_ana:
262+
print(f"ERROR: Cannot parse common args for analysis {ana['name']}")
263+
continue
264+
268265
for i in additional_workflows:
269266
if i not in ana["tasks"]:
270267
# print("Appending extra task", i, "to analysis", ana["name"], "as it is not there yet and needed for conversion")
271268
ana["tasks"].append(i)
272269
piped_analysis = f" --configuration {configuration} | ".join(ana["tasks"])
273270
piped_analysis += f" --configuration {configuration} --aod-file {input_aod}"
271+
piped_analysis += add_common_args_ana
274272
if timeout is not None:
275273
piped_analysis += f" --time-limit {timeout}"
276274
workflow.append(create_ana_task(ana["name"], piped_analysis, output_dir, needs=needs, is_mc=is_mc))
@@ -330,7 +328,7 @@ def run(args):
330328
return 1
331329

332330
workflow = []
333-
add_analysis_tasks(workflow, args.input_file, expanduser(args.analysis_dir), is_mc=args.is_mc, analyses_only=args.only_analyses, autoset_converters=args.autoset_converters, include_disabled_analyses=args.include_disabled, timeout=args.timeout, collision_system=args.collision_system)
331+
add_analysis_tasks(workflow, args.input_file, expanduser(args.analysis_dir), is_mc=args.is_mc, analyses_only=args.only_analyses, autoset_converters=args.autoset_converters, include_disabled_analyses=args.include_disabled, timeout=args.timeout, collision_system=args.collision_system, add_common_args=args.add_common_args)
334332
if args.with_qc_upload:
335333
add_analysis_qc_upload_tasks(workflow, args.period_name, args.run_number, args.pass_name)
336334
if not workflow:
@@ -356,6 +354,7 @@ def main():
356354
parser.add_argument("--autoset-converters", dest="autoset_converters", action="store_true", help="Compatibility mode to automatically set the converters for the analysis")
357355
parser.add_argument("--timeout", type=int, default=None, help="Timeout for analysis tasks in seconds.")
358356
parser.add_argument("--collision-system", dest="collision_system", help="Set the collision system. If not set, tried to be derived from ALIEN_JDL_LPMInterationType. Fallback to pp")
357+
parser.add_argument("--add-common-args", dest="add_common_args", nargs="*", help="Pass additional common arguments per analysis, for instance --add-common-args EMCAL-shm-segment-size 2500000000 will add --shm-segment-size 2500000000 to the EMCAL analysis")
359358
parser.set_defaults(func=run)
360359
args = parser.parse_args()
361360
return(args.func(args))

0 commit comments

Comments
 (0)