22 from huey
import exceptions
23 from config
import huey
24 from tasks
import io_workload_task
29 from argparse
import RawTextHelpFormatter
32 from datetime
import datetime
33 import validator
as vr
34 from pprint
import pprint
37 output = [{
'task_id': tid}, {
'state': state}]
39 output.append({
'info': info})
40 print(json.dumps(output))
44 print(
'Validation failed with the following errors:')
48 config = yaml.safe_load(sys.stdin.read())
49 errors = vr.validate_config(config)
51 if all([v
for e
in errors
for v
in e.values()]):
56 if not all([n[
'srv']
for n
in config[
'common'][
'nodes']]):
60 if not any([n[
'cli']
for n
in config[
'common'][
'nodes']]):
61 validation_failed(
"Specify at least one client node : ['common']['nodes']['cli']")
64 opt = {
'enqueue_time': str(datetime.now()) }
65 task =
io_workload_task((config, opt), priority=config[
'common'][
'priority'])
69 huey.revoke_by_id(tid)
70 current_task = huey.get(
'current_task', peek=
True)
71 if current_task
and current_task[
'task_id'] == tid:
73 if 'corrupt' in current_task[
'args'][0][
'common'][
'type']:
74 with open(
'/tmp/taskq_active_pid',
'r') as f: 75 pid = yaml.safe_load(f) 76 os.kill(pid['active_pid'], signal.SIGTERM)
80 os.kill(current_task[
'pid'], signal.SIGKILL)
81 huey.get(current_task[
'task_id'])
83 elif huey.get(tid, peek=
True):
90 return "\n" + yaml.dump(args)
95 task = huey.get(
'current_task', peek=
True)
97 print_info(task[
'task_id'],
'RUNNING',
get_args({**{
'conf': task[
'args'][0]}, **task[
'args'][1]}, is_yaml))
98 for task
in huey.pending():
99 if not huey.is_revoked(task):
100 print_info(task.id,
'QUEUED',
get_args({**{
'conf': task.args[0][0]}, **task.args[0][1]}, is_yaml))
106 for r
in huey.all_results():
107 if not 'r:' in r
and r !=
'current_task':
109 dummy = huey.result(r, preserve=
True)
111 except exceptions.TaskException:
117 for r
in sorted(passed, key =
lambda t: huey.result(t, preserve=
True)[
'finish_time']):
122 pending =
next((t
for t
in huey.pending()
123 if t.id == tid
and not huey.is_revoked(t)),
None)
125 params = pending.args
126 huey.revoke_by_id(tid)
128 params[0][0][
'priority'] = prio
137 task_queue.py: front-end module for cluster usage 139 task_queue.py implements persistent queue and enqueues 140 performance tests requests on the hardware cluster. 141 It takes YAML configuration file as input and supplies 142 io_workload.sh script with configuration overrides, 143 which includes Motr and Halon building options and 144 configuration, m0crate workload configuration, Halon 145 facts, and other configuration data. 147 Task is added by calling task_queue with '-a'/'--add-task' 148 parameter. Tasks are enqueued and executed in received 149 order. List of pending and running tasks can be retrieved 150 by invoking '-l'/'--list-queue' parameter. When execution 151 completes, the task results will be put into persistent 152 storage and could be fetched any time by calling the script 153 with '-r'/'--list-results'. 155 Any pending or running task can be cancelled and removed 156 from pending queue by invoking '-d'/'--del-task' parameter. 158 task_queue.py supports task priorities. If priority is given 159 in YAML configuration file, task_queue will enqueue task 160 with that priority, otherwise priority "1" will be used. 161 User can change task priority by calling '-p'/'--set-priority' 162 parameter. Priority can't be changed on the fly, instead 163 the old task is revoked and new one with updated priority 164 is created and enqueued. task_queue returns TID of the 167 Task lifecycle diagram: 168 Non-existent -> QUEUED -> RUNNING -> FINISHED 170 Task cancelling diagram: 171 QUEUED/RUNNING -> REVOKED 173 Priority change diagram: 174 QUEUED(TID1) -> REVOKED ... non-existent -> QUEUED(TID2)""" 176 parser = argparse.ArgumentParser(description=description,
177 formatter_class=RawTextHelpFormatter)
178 group = parser.add_mutually_exclusive_group(required=
True)
179 group.add_argument(
'-a',
'--add-task',
180 help=
"Add task to the task queue",
182 group.add_argument(
'-d',
'--del-task',
184 help=
"Delete task from the task queue")
185 group.add_argument(
'-p',
'--set-priority',
187 metavar=(
"task_id",
"priority"),
188 help=
"Set task priority")
189 group.add_argument(
'-l',
'--list-queue',
190 help=
"List pending queue",
192 group.add_argument(
'-r',
'--list-results',
195 parser.add_argument(
'-v',
'--verbose', action=
'store_true',
196 help=
'Print task arguments in YAML representation')
197 if len(sys.argv[1:]) == 0:
200 return parser.parse_args()
208 elif args.set_priority:
210 int(args.set_priority[1]))
211 elif args.list_queue:
213 elif args.list_results:
216 if __name__ ==
'__main__':
def get_args(args, is_yaml)
static M0_UNUSED void print(struct m0_be_list *list)
def validation_failed(errors)
def list_results(is_yaml)
def io_workload_task(conf_opt, task)
def task_set_prio(tid, prio)
def print_info(tid, state, info=None)