Motr  M0
tasks.py
Go to the documentation of this file.
1 #
2 # Copyright (c) 2020 Seagate Technology LLC and/or its Affiliates
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 # http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # For any questions about this software or licensing,
17 # please email opensource@seagate.com or cortx-questions@seagate.com.
18 #
19 
20 import config
21 from config import huey
22 from datetime import datetime
23 import os
24 import plumbum
25 
26 def get_overrides(overrides):
27  return " ".join([f"{x}={y}" for (x,y) in overrides.items()])
28 
29 def parse_options(conf, result_dir):
30  options = []
31 
32  options.append('-p')
33  options.append(result_dir)
34 
35  options.append('-c')
36  options.append(config.config_dir)
37 
38  if hasattr(config, 'fio_test_dir'):
39  options.append('-f')
40  options.append(config.fio_test_dir)
41 
42  if 'timeout' in conf['common']:
43  options.append('--timeout')
44  options.append(conf['common']['timeout'])
45 
46  options.append("--srv-cli-map")
47  nodes_map = " ".join([f"{n['srv']}={n['cli']}" for n in conf['common']['nodes']])
48  options.append(nodes_map.replace('None', ''))
49 
50  for w in conf['workload']:
51  options.append('-w')
52  options.append(w['app'])
53  if 'cli_options' in w['param']:
54  options.append(w['param']['cli_options'])
55  elif 'config_overrides' in w['param']:
56  options.append(get_overrides(w['param']['config_overrides']))
57 
58  # Motr parameters
59  options.append('--motr-git-commit')
60  options.append(conf['motr']['git'])
61  if conf['motr']['build_options']:
62  options.append('--motr-build-options')
63  options.append(conf['motr']['build_options'])
64  if conf['motr']['config_overrides']:
65  options.append('--motr_config')
66  options.append(get_overrides(conf['motr']['config_overrides']))
67 
68  # HA parameters
69  if 'hare' in conf['ha']:
70  options.append('--hare')
71 
72  # S3 server stuff
73  if conf['common']['type'] == 's3client' or conf['common']['type'] == 's3corrupt':
74 
75  options.append('--s3-git-commit')
76  options.append(conf['s3server']['git'])
77 
78  options.append('--s3-multiplicity')
79  options.append(conf['s3server']['num_instances'])
80 
81  if conf['s3server']['cmd_line_options']:
82  options.append('--s3srv-opts')
83  options.append(conf['s3server']['cmd_line_options'])
84 
85  if conf['s3server']['config_overrides']:
86  options.append('--s3srv_config')
87  options.append(get_overrides(conf['s3server']['config_overrides']))
88 
89  # Execution options:
90  if conf['execution_options']['no_motr_trace']:
91  options.append("--no-motr-trace")
92  if conf['execution_options']['no_m0trace_files']:
93  options.append("--no-m0trace-files")
94  if conf['execution_options']['no_m0trace_dumps']:
95  options.append("--no-m0trace-dumps")
96  if conf['execution_options']['no_addb_stobs']:
97  options.append("--no-addb-stobs")
98  if conf['execution_options']['no_addb_dumps']:
99  options.append("--no-addb-dumps")
100  if conf['execution_options']['no_m0play_db']:
101  options.append("--no-m0play-db")
102 
103  print(options)
104  return options
105 
106 def run_cmds(cmds, path):
107  # TODO: Implement me!
108  return
109 
110 def send_mail(to, status, tid):
111  nl="\n"
112  msg = f"Subject: Cluster task queue{nl}Task {tid} {status}"
113  sendmail = plumbum.local["sendmail"]
114  echo = plumbum.local["echo"]
115  chain = echo[msg] | sendmail[to]
116  try:
117  chain()
118  except:
119  print(f"Couldn't send email to {to}")
120 
121 def pack_artifacts(path):
122  tar = plumbum.local["tar"]
123  parent_dir = '/'.join(path.split('/')[:-1])
124  archive_dir = path.split('/')[-1]
125  tar[f"-cJvf {parent_dir}/{archive_dir}.tar.xz -C {parent_dir} {archive_dir}".split(" ")] & plumbum.FG
126  print(f"Rm path: {path}")
127  rm = plumbum.local["rm"]
128  # rm[f"-rf {path}".split(" ")]()
129 
130 @huey.task(context=True)
131 def io_workload_task(conf_opt, task):
132  conf,opt = conf_opt
133  current_task = {
134  'task_id': task.id,
135  'pid' : os.getpid(),
136  'args' : conf_opt,
137  }
138  huey.put('current_task', current_task)
139 
140  result = {
141  'conf' : conf,
142  'start_time' : str(datetime.now()),
143  'path' : f"{config.artifacts_dir}",
144  'artifacts_dir' : f"{config.artifacts_dir}/result_{task.id}",
145  }
146  result.update(opt)
147 
148  if config.pack_artifacts:
149  result['archive_name'] = f"result_{task.id}.tar.xz"
150 
151  if conf['common']['send_email']:
152  send_mail(conf['common']['user'], "Task started", task.id)
153 
154  if 'pre_exec_cmds' in conf:
155  run_cmds(conf['pre_exec_cmds'], result['artifacts_dir'])
156 
157  with plumbum.local.env(MOTR_SRC_DIR = config.motr_src_dir):
158  if conf['common']['type'] == 's3client':
159  run_workload = plumbum.local["../run_s3_task"]
160  elif conf['common']['type'] == 'm0crate':
161  run_workload = plumbum.local["../run_task"]
162  elif conf['common']['type'] == 's3corrupt':
163  run_workload = plumbum.local["../run_s3_corruption"]
164  elif conf['common']['type'] == 'm0corrupt':
165  run_workload = plumbum.local["../run_corruption_task"]
166  else:
167  print('ERROR: Unknown workload type')
168  result['status']='FAILED'
169  return result
170 
171  try:
172  tee = plumbum.local['tee']
173  options = parse_options(conf, result["artifacts_dir"])
174  (run_workload[options] | tee['/tmp/workload.log']) & plumbum.FG
175  result['status']='SUCCESS'
176  except plumbum.commands.processes.ProcessExecutionError:
177  result['status']='FAILED'
178 
179  mv = plumbum.local['mv']
180  mv['/tmp/workload.log', result["artifacts_dir"]] & plumbum.FG
181 
182  result['finish_time'] = str(datetime.now())
183 
184  if 'post_exec_cmds' in conf:
185  run_cmds(conf['post_exec_cmds'], result['artifacts_dir'])
186 
187  if conf['common']['send_email']:
188  send_mail(conf['common']['user'], f"finished, status {result['status']}",
189  task.id)
190 
191  if config.pack_artifacts:
192  pack_artifacts(result["artifacts_dir"])
193 
194  return result
195 
196 @huey.post_execute()
197 def post_execute_hook(task, task_value, exc):
198  if exc is not None:
199  print(f'Task "{task.id}" failed with error: {exc}')
200  # Current task finished - do cleanup
201  huey.get('current_task')
static void split(m0_bindex_t offset, int nr, bool commit)
Definition: extmap.c:230
def post_execute_hook(task, task_value, exc)
Definition: tasks.py:197
def get_overrides(overrides)
Definition: tasks.py:26
def parse_options(conf, result_dir)
Definition: tasks.py:29
static M0_UNUSED void print(struct m0_be_list *list)
Definition: list.c:186
def io_workload_task(conf_opt, task)
Definition: tasks.py:131
def pack_artifacts(path)
Definition: tasks.py:121
def run_cmds(cmds, path)
Definition: tasks.py:106
static void chain(void)
Definition: sm.c:595
def send_mail(to, status, tid)
Definition: tasks.py:110