21 from config
import huey
22 from datetime
import datetime
27 return " ".join([f
"{x}={y}" for (x,y)
in overrides.items()])
33 options.append(result_dir)
36 options.append(config.config_dir)
38 if hasattr(config,
'fio_test_dir'):
40 options.append(config.fio_test_dir)
42 if 'timeout' in conf[
'common']:
43 options.append(
'--timeout')
44 options.append(conf[
'common'][
'timeout'])
46 options.append(
"--srv-cli-map")
47 nodes_map =
" ".join([f
"{n['srv']}={n['cli']}" for n
in conf[
'common'][
'nodes']])
48 options.append(nodes_map.replace(
'None',
''))
50 for w
in conf[
'workload']:
52 options.append(w[
'app'])
53 if 'cli_options' in w[
'param']:
54 options.append(w[
'param'][
'cli_options'])
55 elif 'config_overrides' in w[
'param']:
59 options.append(
'--motr-git-commit')
60 options.append(conf[
'motr'][
'git'])
61 if conf[
'motr'][
'build_options']:
62 options.append(
'--motr-build-options')
63 options.append(conf[
'motr'][
'build_options'])
64 if conf[
'motr'][
'config_overrides']:
65 options.append(
'--motr_config')
66 options.append(
get_overrides(conf[
'motr'][
'config_overrides']))
69 if 'hare' in conf[
'ha']:
70 options.append(
'--hare')
73 if conf[
'common'][
'type'] ==
's3client' or conf[
'common'][
'type'] ==
's3corrupt':
75 options.append(
'--s3-git-commit')
76 options.append(conf[
's3server'][
'git'])
78 options.append(
'--s3-multiplicity')
79 options.append(conf[
's3server'][
'num_instances'])
81 if conf[
's3server'][
'cmd_line_options']:
82 options.append(
'--s3srv-opts')
83 options.append(conf[
's3server'][
'cmd_line_options'])
85 if conf[
's3server'][
'config_overrides']:
86 options.append(
'--s3srv_config')
87 options.append(
get_overrides(conf[
's3server'][
'config_overrides']))
90 if conf[
'execution_options'][
'no_motr_trace']:
91 options.append(
"--no-motr-trace")
92 if conf[
'execution_options'][
'no_m0trace_files']:
93 options.append(
"--no-m0trace-files")
94 if conf[
'execution_options'][
'no_m0trace_dumps']:
95 options.append(
"--no-m0trace-dumps")
96 if conf[
'execution_options'][
'no_addb_stobs']:
97 options.append(
"--no-addb-stobs")
98 if conf[
'execution_options'][
'no_addb_dumps']:
99 options.append(
"--no-addb-dumps")
100 if conf[
'execution_options'][
'no_m0play_db']:
101 options.append(
"--no-m0play-db")
112 msg = f
"Subject: Cluster task queue{nl}Task {tid} {status}" 113 sendmail = plumbum.local[
"sendmail"]
114 echo = plumbum.local[
"echo"]
115 chain = echo[msg] | sendmail[to]
119 print(f
"Couldn't send email to {to}")
122 tar = plumbum.local[
"tar"]
123 parent_dir =
'/'.join(path.split(
'/')[:-1])
124 archive_dir = path.split(
'/')[-1]
125 tar[f
"-cJvf {parent_dir}/{archive_dir}.tar.xz -C {parent_dir} {archive_dir}".
split(
" ")] & plumbum.FG
126 print(f
"Rm path: {path}")
127 rm = plumbum.local[
"rm"]
130 @huey.task(context=
True)
138 huey.put(
'current_task', current_task)
142 'start_time' : str(datetime.now()),
143 'path' : f
"{config.artifacts_dir}",
144 'artifacts_dir' : f
"{config.artifacts_dir}/result_{task.id}",
148 if config.pack_artifacts:
149 result[
'archive_name'] = f
"result_{task.id}.tar.xz" 151 if conf[
'common'][
'send_email']:
152 send_mail(conf[
'common'][
'user'],
"Task started", task.id)
154 if 'pre_exec_cmds' in conf:
155 run_cmds(conf[
'pre_exec_cmds'], result[
'artifacts_dir'])
157 with plumbum.local.env(MOTR_SRC_DIR = config.motr_src_dir):
158 if conf[
'common'][
'type'] ==
's3client':
159 run_workload = plumbum.local[
"../run_s3_task"]
160 elif conf[
'common'][
'type'] ==
'm0crate':
161 run_workload = plumbum.local[
"../run_task"]
162 elif conf[
'common'][
'type'] ==
's3corrupt':
163 run_workload = plumbum.local[
"../run_s3_corruption"]
164 elif conf[
'common'][
'type'] ==
'm0corrupt':
165 run_workload = plumbum.local[
"../run_corruption_task"]
167 print(
'ERROR: Unknown workload type')
168 result[
'status']=
'FAILED' 172 tee = plumbum.local[
'tee']
174 (run_workload[options] | tee[
'/tmp/workload.log']) & plumbum.FG
175 result[
'status']=
'SUCCESS' 176 except plumbum.commands.processes.ProcessExecutionError:
177 result[
'status']=
'FAILED' 179 mv = plumbum.local[
'mv']
180 mv[
'/tmp/workload.log', result[
"artifacts_dir"]] & plumbum.FG
182 result[
'finish_time'] = str(datetime.now())
184 if 'post_exec_cmds' in conf:
185 run_cmds(conf[
'post_exec_cmds'], result[
'artifacts_dir'])
187 if conf[
'common'][
'send_email']:
188 send_mail(conf[
'common'][
'user'], f
"finished, status {result['status']}",
191 if config.pack_artifacts:
199 print(f
'Task "{task.id}" failed with error: {exc}')
201 huey.get(
'current_task')
static void split(m0_bindex_t offset, int nr, bool commit)
def post_execute_hook(task, task_value, exc)
def get_overrides(overrides)
def parse_options(conf, result_dir)
static M0_UNUSED void print(struct m0_be_list *list)
def io_workload_task(conf_opt, task)
def send_mail(to, status, tid)