Motr  M0
task_queue.py
Go to the documentation of this file.
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2020 Seagate Technology LLC and/or its Affiliates
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 # For any questions about this software or licensing,
18 # please email opensource@seagate.com or cortx-questions@seagate.com.
19 #
20 
21 
22 from huey import exceptions
23 from config import huey
24 from tasks import io_workload_task
25 import yaml
26 import json
27 import sys
28 import argparse
29 from argparse import RawTextHelpFormatter
30 import signal
31 import os
32 from datetime import datetime
33 import validator as vr
34 from pprint import pprint
35 
def print_info(tid, state, info=None):
    """Print a one-line JSON report about a task to stdout.

    The report is a JSON list of single-key objects: ``task_id``, ``state``
    and, only when *info* is truthy, ``info``.

    Args:
        tid: Task identifier placed under the ``task_id`` key.
        state: State label placed under the ``state`` key.
        info: Optional extra payload; omitted from the report when falsy
            (``None``, empty string, empty container, ...).
    """
    extra = [{'info': info}] if info else []
    report = [{'task_id': tid}, {'state': state}] + extra
    print(json.dumps(report))
41 
42 
def validation_failed(errors):
    """Report configuration validation problems on stdout.

    Prints a fixed header line followed by a pretty-printed dump of *errors*.

    Args:
        errors: Any object describing the problems; it is rendered with
            ``pprint`` (so a plain string is shown in its quoted repr form).
    """
    header = 'Validation failed with the following errors:'
    print(header)
    pprint(errors)
46 
def task_add():
    """Read a YAML task configuration from stdin, validate it and enqueue it.

    On any validation problem the errors are printed through
    ``validation_failed`` and nothing is enqueued. On success the task is
    submitted via ``io_workload_task`` and an ``ENQUEUED`` report carrying the
    new task id is printed.
    """
    config = yaml.safe_load(sys.stdin.read())
    errors = vr.validate_config(config)

    # NOTE(review): the rejection rule is kept exactly as written -- the
    # config is rejected when every value found in the error dicts is truthy.
    # The return shape of validate_config is not visible here; confirm that
    # this matches its contract.
    if all(v for e in errors for v in e.values()):
        validation_failed(errors)
        return

    common = config['common']
    nodes = common['nodes']

    # Additional check for cli/srv map
    if not all(n['srv'] for n in nodes):
        validation_failed("Specify all server nodes : ['common']['nodes']['srv']")
        return

    if not any(n['cli'] for n in nodes):
        validation_failed("Specify at least one client node : ['common']['nodes']['cli']")
        return

    # The tool's help text promises priority "1" when the configuration does
    # not specify one; a plain subscript would raise KeyError in that case.
    priority = common.get('priority', 1)

    opt = {'enqueue_time': str(datetime.now())}
    task = io_workload_task((config, opt), priority=priority)
    print_info(task.id, 'ENQUEUED')
67 
def task_del(tid):
    """Cancel the task *tid* and print the resulting state.

    The task id is always revoked first. Then:

    * if it is the task recorded under ``current_task``, its process is
      signalled -- ``SIGTERM`` to the pid read from ``/tmp/taskq_active_pid``
      for task types containing ``'corrupt'`` (reported as ``TERMINATING``),
      otherwise ``SIGKILL`` to the recorded task pid, after which the stored
      entry for that task id is removed (reported as ``ABORTED``);
    * otherwise ``FINISHED`` is reported when a stored entry exists for the
      id, and ``REVOKED`` when it does not.

    Args:
        tid: Identifier of the task to cancel.
    """
    huey.revoke_by_id(tid)
    running = huey.get('current_task', peek=True)

    if not (running and running['task_id'] == tid):
        # Not the running task: it either already has a stored entry or was
        # only waiting in the queue.
        state = 'FINISHED' if huey.get(tid, peek=True) else 'REVOKED'
        print_info(tid, state)
        return

    # TODO: use pgrep or ps aux and search $script_name's PID and kill
    task_type = running['args'][0]['common']['type']
    if 'corrupt' in task_type:
        with open('/tmp/taskq_active_pid', 'r') as f:
            pid = yaml.safe_load(f)
        os.kill(pid['active_pid'], signal.SIGTERM)
        # TODO: Handle the case when deletion is called twice
        print_info(tid, 'TERMINATING')
        return

    os.kill(running['pid'], signal.SIGKILL)
    huey.get(running['task_id'])  # get w/o peek == delete
    print_info(tid, 'ABORTED')
87 
def get_args(args, is_yaml):
    """Return *args* either untouched or rendered as a YAML text block.

    Args:
        args: Arbitrary task arguments/result object.
        is_yaml: When truthy, *args* is dumped with ``yaml.dump`` and prefixed
            with a newline; when falsy, *args* is returned as-is.

    Returns:
        The original object, or a string starting with ``"\\n"`` followed by
        the YAML dump.
    """
    if not is_yaml:
        return args
    return "\n" + yaml.dump(args)
93 
def list_queue(is_yaml):
    """Print the running task (if any) followed by all non-revoked pending tasks.

    Each task's info payload is its configuration under the ``conf`` key
    merged with the task's option dictionary, formatted through ``get_args``.

    Args:
        is_yaml: Forwarded to ``get_args`` to select YAML or raw output.
    """
    running = huey.get('current_task', peek=True)
    if running:
        conf, opt = running['args'][0], running['args'][1]
        print_info(running['task_id'], 'RUNNING',
                   get_args({'conf': conf, **opt}, is_yaml))

    for queued in huey.pending():
        if huey.is_revoked(queued):
            continue
        conf, opt = queued.args[0][0], queued.args[0][1]
        print_info(queued.id, 'QUEUED',
                   get_args({'conf': conf, **opt}, is_yaml))
101 
def list_results(is_yaml):
    """Print stored task results: failures first, then successes by finish time.

    Every key returned by ``huey.all_results()`` is considered, except keys
    containing ``'r:'`` and the ``'current_task'`` key. A key whose result
    raises ``TaskException`` is reported as ``FAILED (exception)``; the rest
    are reported as ``FINISHED`` in ascending order of the result's
    ``finish_time`` field.

    Args:
        is_yaml: Forwarded to ``get_args`` to select YAML or raw output.
    """
    finished = {}
    failed = []

    for key in huey.all_results():
        # presumably 'r:' keys are revocation markers and 'current_task' is
        # bookkeeping rather than task results -- verify against tasks.py
        if 'r:' in key or key == 'current_task':
            continue
        try:
            # Fetch each result once and keep it; it is needed again below
            # for both sorting and printing.
            finished[key] = huey.result(key, preserve=True)
        except exceptions.TaskException:
            failed.append(key)

    for key in failed:
        print_info(key, 'FAILED (exception)')

    for key in sorted(finished, key=lambda k: finished[k]['finish_time']):
        print_info(key, 'FINISHED', get_args(finished[key], is_yaml))
119 
120 
def task_set_prio(tid, prio):
    """Change the priority of a pending task by re-enqueueing it.

    The first pending, non-revoked task whose id equals *tid* is revoked
    (``REVOKED`` is printed for the old id) and submitted again with the new
    priority (``ENQUEUED`` is printed for the new id). When no such task
    exists, ``NOT_FOUND`` is printed.

    Args:
        tid: Identifier of the pending task.
        prio: New priority value.
    """
    matches = (t for t in huey.pending()
               if t.id == tid and not huey.is_revoked(t))
    target = next(matches, None)

    if not target:
        print_info(tid, 'NOT_FOUND')
        return

    call_args = target.args
    huey.revoke_by_id(tid)
    print_info(tid, 'REVOKED')

    # NOTE(review): this records the priority under the top-level 'priority'
    # key of the configuration, whereas task_add reads it from
    # ['common']['priority'] -- confirm which location is intended.
    call_args[0][0]['priority'] = prio
    replacement = io_workload_task(call_args[0], priority=prio)
    print_info(replacement.id, 'ENQUEUED')
133 
134 
def args_parse():
    """Build the command-line parser and parse ``sys.argv``.

    Exactly one of the action options (add / delete / set priority / list
    queue / list results) is required; ``-v`` may accompany any of them.
    When the script is started without any arguments, the full help text is
    printed and the process exits.

    Returns:
        The ``argparse.Namespace`` with ``add_task``, ``del_task``,
        ``set_priority``, ``list_queue``, ``list_results`` and ``verbose``.
    """
    description = """
    task_queue.py: front-end module for cluster usage

    task_queue.py implements persistent queue and enqueues
    performance tests requests on the hardware cluster.
    It takes YAML configuration file as input and supplies
    io_workload.sh script with configuration overrides,
    which includes Motr and Halon building options and
    configuration, m0crate workload configuration, Halon
    facts, and other configuration data.

    Task is added by calling task_queue with '-a'/'--add-task'
    parameter. Tasks are enqueued and executed in recevied
    order. List of pending and running tasks can be retrieved
    by invoking '-l'/'--list-queue' parameter. When execution
    completes, the task results will be put into persistent
    storage and could be fetched any time by calling the script
    with '-r'/'--list-results'.

    Any pending or running task can be cancelled and removed
    from pending queue by invoking '-d'/'--del-task' parameter.

    task_queue.py supports task priorities. If priority is given
    in YAML configuration file, task_queue will enqueue task
    with that priority, otherwise priority "1" will be used.
    User can change task priority by calling '-p'/'--set-prio'
    parameter. Priority can't be changed on the fly, instead
    the old task is revoked and new one with updated priority
    is created and enqueued. task_queue returns TID of the
    newly created task.

    Task lifecycle diagram:
    Non-existent -> QUEUED -> RUNNING -> FINISHED

    Task cancelling diagram:
    QUEUED/RUNNING -> REVOKED

    Priority change diagram:
    QUEUED(TID1) -> REVOKED ... non-existent -> QUEUED(TID2)"""

    # RawTextHelpFormatter keeps the line breaks of the description above.
    parser = argparse.ArgumentParser(description=description,
                                     formatter_class=RawTextHelpFormatter)

    # The actions are mutually exclusive and one of them must be given.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('-a', '--add-task',
                       help="Add task to the task queue",
                       action="store_true")
    group.add_argument('-d', '--del-task',
                       metavar='task_id',
                       help="Delete task from the task queue")
    group.add_argument('-p', '--set-priority',
                       nargs=2,
                       metavar=("task_id", "priority"),
                       help="Set task priority")
    group.add_argument('-l', '--list-queue',
                       help="List pending queue",
                       action="store_true")
    group.add_argument('-r', '--list-results',
                       help="List results",
                       action="store_true")
    parser.add_argument('-v', '--verbose', action='store_true',
                        help='Print task arguments in YAML representation')

    # With no arguments at all, show the full help instead of the terse
    # "one of the arguments ... is required" usage error.
    if not sys.argv[1:]:
        parser.print_help()
        parser.exit()
    return parser.parse_args()
201 
def main():
    """Parse the command line and run the single requested action.

    The actions are checked in a fixed order: add task, delete task, set
    priority, list queue, list results. The priority given on the command
    line is converted to ``int`` before being passed on.
    """
    opts = args_parse()

    if opts.add_task:
        task_add()
        return

    if opts.del_task:
        task_del(opts.del_task)
        return

    if opts.set_priority:
        tid, prio = opts.set_priority[0], opts.set_priority[1]
        task_set_prio(tid, int(prio))
        return

    if opts.list_queue:
        list_queue(opts.verbose)
        return

    if opts.list_results:
        list_results(opts.verbose)


# Run the command-line front end only when executed as a script.
if __name__ == '__main__':
    main()
def get_args(args, is_yaml)
Definition: task_queue.py:88
static M0_UNUSED void print(struct m0_be_list *list)
Definition: list.c:186
def task_add()
Definition: task_queue.py:47
def args_parse()
Definition: task_queue.py:135
def main()
Definition: task_queue.py:202
static int next[]
Definition: cp.c:248
def validation_failed(errors)
Definition: task_queue.py:43
def list_results(is_yaml)
Definition: task_queue.py:102
def io_workload_task(conf_opt, task)
Definition: tasks.py:131
Definition: main.py:1
def list_queue(is_yaml)
Definition: task_queue.py:94
def task_set_prio(tid, prio)
Definition: task_queue.py:121
def print_info(tid, state, info=None)
Definition: task_queue.py:36
def task_del(tid)
Definition: task_queue.py:68