motr_mini_prov.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (c) 2021 Seagate Technology LLC and/or its Affiliates
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17 # For any questions about this software or licensing,
18 # please email opensource@seagate.com or cortx-questions@seagate.com.
19 #
20 import sys
21 import errno
22 import os
23 import re
24 import subprocess
25 import logging
26 import glob
27 import time
28 import yaml
29 import psutil
30 from typing import List, Dict, Any
31 from cortx.utils.conf_store import Conf
32 from cortx.utils.cortx import Const
33 
34 MOTR_SERVER_SCRIPT_PATH = "/usr/libexec/cortx-motr/motr-start"
35 MOTR_MKFS_SCRIPT_PATH = "/usr/libexec/cortx-motr/motr-mkfs"
36 MOTR_FSM_SCRIPT_PATH = "/usr/libexec/cortx-motr/motr-free-space-monitor"
37 MOTR_CONFIG_SCRIPT = "/opt/seagate/cortx/motr/libexec/motr_cfg.sh"
38 LNET_CONF_FILE = "/etc/modprobe.d/lnet.conf"
39 LIBFAB_CONF_FILE = "/etc/libfab.conf"
40 SYS_CLASS_NET_DIR = "/sys/class/net/"
41 MOTR_SYS_CFG = "/etc/sysconfig/motr"
42 MOTR_WORKLOAD_DIR = "/opt/seagate/cortx/motr/workload"
43 FSTAB = "/etc/fstab"
44 LOGFILE = "/var/log/seagate/motr/mini_provisioner"
45 LOGDIR = "/var/log/seagate/motr"
46 LOGGER = "mini_provisioner"
47 IVT_DIR = "/var/log/seagate/motr/ivt"
48 MOTR_LOG_DIR = "/var/motr"
49 TIMEOUT_SECS = 120
50 MACHINE_ID_LEN = 32
51 MOTR_LOG_DIRS = [LOGDIR, MOTR_LOG_DIR]
52 BE_LOG_SZ = 4*1024*1024*1024 #4G
53 BE_SEG0_SZ = 128 * 1024 *1024 #128M
54 ALLIGN_SIZE = 4096
55 MACHINE_ID_FILE = "/etc/machine-id"
56 TEMP_FID_FILE= "/opt/seagate/cortx/motr/conf/service_fid.yaml"
57 CMD_RETRY_COUNT = 5
58 MEM_THRESHOLD = 4*1024*1024*1024
59 CVG_COUNT_KEY = "num_cvg"
60 
61 class MotrError(Exception):
62  """ Generic Exception with error code and output """
63 
64  def __init__(self, rc, message, *args):
65  self._rc = rc
66  self._desc = message % (args)
67 
68  def __str__(self):
69  return f"error[{self._rc}]: {self._desc}"
70 
71 def execute_command_without_log(cmd, timeout_secs = TIMEOUT_SECS,
72  verbose = False, retries = 1, stdin = None, logging=False):
73  ps = subprocess.Popen(cmd, stdin=subprocess.PIPE,
74  stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
75  shell=True)
76  if stdin:
77  ps.stdin.write(stdin.encode())
78  stdout, stderr = ps.communicate(timeout=timeout_secs);
79  stdout = str(stdout, 'utf-8')
80 
81  time.sleep(1)
82  if ps.returncode != 0:
83  raise MotrError(ps.returncode, f"\"{cmd}\" command execution failed")
84 
85 #TODO: config_logger() takes only self as an argument, so the logger is not configurable.
86 # Make the logger configurable (formatter, handlers, etc.) and remove the duplicate
87 # code below.
88 def execute_command_console(self, command):
89  logger = logging.getLogger("console")
90  if not os.path.exists(LOGDIR):
91  try:
92  os.makedirs(LOGDIR, exist_ok=True)
93  with open(f'{self.logfile}', 'w'): pass
94  except:
95  raise MotrError(errno.EINVAL, f"{self.logfile} creation failed\n")
96  else:
97  if not os.path.exists(self.logfile):
98  try:
99  with open(f'{self.logfile}', 'w'): pass
100  except:
101  raise MotrError(errno.EINVAL, f"{self.logfile} creation failed\n")
102  logger.setLevel(logging.DEBUG)
103  # create file handler which logs debug message in log file
104  fh = logging.FileHandler(self.logfile)
105  fh.setLevel(logging.DEBUG)
106  # create console handler to log messages ERROR and above
107  ch = logging.StreamHandler()
108  ch.setLevel(logging.INFO)
109  formatter = logging.Formatter('%(asctime)s - %(message)s')
110  fh.setFormatter(formatter)
111  ch.setFormatter(formatter)
112  logger.addHandler(fh)
113  logger.addHandler(ch)
114  logger.info(f"executing command {command}")
115  try:
116  process = subprocess.Popen(command, stdin=subprocess.PIPE,
117  stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
118  shell=True)
119  except Exception as e:
120  logger.error("ERROR {} when running {} with exception {}".format(sys.exc_info()[1],
121  command, e))
122  return None
123  while True:
124  stdout = process.stdout.readline()
125  if process.poll() is not None:
126  break
127  if stdout:
128  logger.info(stdout.strip().decode())
129  rc = process.poll()
130  return rc
131 
132 
133 def execute_command(self, cmd, timeout_secs = TIMEOUT_SECS, verbose = False,
134  retries = 1, stdin = None, logging=True):
135  # logging flag is set False when we execute any command
136  # before logging is configured.
137  # If logging is False, we use print instead of logger
138  # verbose(True) and logging(False) can not be set simultaneously.
139 
140  for i in range(retries):
141  if logging == True:
142  self.logger.info(f"Retry: {i}. Executing cmd: '{cmd}'")
143 
144  ps = subprocess.Popen(cmd, stdin=subprocess.PIPE,
145  stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
146  shell=True)
147  if stdin:
148  ps.stdin.write(stdin.encode())
149  stdout, stderr = ps.communicate(timeout=timeout_secs);
150  stdout = str(stdout, 'utf-8')
151 
152  if logging == True:
153  self.logger.info(f"ret={ps.returncode}\n")
154 
155  if (self._debug or verbose) and (logging == True):
156  self.logger.debug(f"[CMD] {cmd}\n")
157  self.logger.debug(f"[OUT]\n{stdout}\n")
158  self.logger.debug(f"[RET] {ps.returncode}\n")
159  if ps.returncode == 0:
160  break
161  time.sleep(1)
162  if ps.returncode != 0:
163  raise MotrError(ps.returncode, f"\"{cmd}\" command execution failed")
164  return stdout, ps.returncode
165 
166 # For a normal command, we execute it up to CMD_RETRY_COUNT (default 5) times, with a timeout of TIMEOUT_SECS (default 120s) per attempt.
167 # For a daemon (e.g. m0d services), set_timeout=False makes retry_count 1 and the timeout None, so the daemon command is executed only once, without a timeout.
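# Illustrative calls, mirroring usages later in this file (the parameters shown are examples):
#   execute_command_verbose(self, "lvs -o lv_path")                         # retried, with a per-attempt timeout
#   execute_command_verbose(self, MOTR_FSM_SCRIPT_PATH, set_timeout=False)  # daemon: single attempt, no timeout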
168 def execute_command_verbose(self, cmd, timeout_secs = TIMEOUT_SECS, verbose = False, set_timeout=True, retry_count = CMD_RETRY_COUNT):
169  self.logger.info(f"Executing cmd : '{cmd}' \n")
170  # For commands without timeout
171  if set_timeout == False:
172  timeout_secs = None
173  retry_count = 1
174  cmd_retry_delay = 1
175  for cmd_retry_count in range(retry_count):
176  ps = subprocess.run(cmd, stdin=subprocess.PIPE,
177  stdout=subprocess.PIPE, timeout=timeout_secs,
178  stderr=subprocess.PIPE, shell=True)
179  self.logger.info(f"ret={ps.returncode}")
180  self.logger.debug(f"Executing {cmd_retry_count} time")
181  stdout = ps.stdout.decode('utf-8')
182  self.logger.debug(f"[OUT]{stdout}")
183  self.logger.debug(f"[ERR]{ps.stderr.decode('utf-8')}")
184  self.logger.debug(f"[RET] {ps.returncode}")
185  if ps.returncode != 0:
186  time.sleep(cmd_retry_delay)
187  continue
188  return stdout, ps.returncode
189  return
190 
191 def execute_command_without_exception(self, cmd, timeout_secs = TIMEOUT_SECS, retries = 1):
192  for i in range(retries):
193  self.logger.info(f"Retry: {i}. Executing cmd : '{cmd}'\n")
194  ps = subprocess.run(list(cmd.split(' ')), timeout=timeout_secs)
195  self.logger.info(f"ret={ps.returncode}\n")
196  if ps.returncode == 0:
197  break
198  time.sleep(1)
199  return ps.returncode
200 
201 def check_type(var, vtype, msg):
202  if not isinstance(var, vtype):
203  raise MotrError(errno.EINVAL, f"Invalid {msg} type. Expected: {vtype}")
204  if not bool(var):
205  raise MotrError(errno.EINVAL, f"Empty {msg}.")
206 
207 def configure_machine_id(self, phase):
208  if Conf.machine_id:
209  self.machine_id = Conf.machine_id
210  if not os.path.exists(f"{MACHINE_ID_FILE}"):
211  if phase == "start":
212  with open(f"{MACHINE_ID_FILE}", "w") as fp:
213  fp.write(f"{self.machine_id}\n")
214  else:
215  op = execute_command(self, f"cat {MACHINE_ID_FILE}", logging=False)[0].strip("\n")
216  if op != self.machine_id:
217  raise MotrError(errno.EINVAL, "machine id does not match")
218  else:
219  raise MotrError(errno.ENOENT, "machine id not available in conf")
220 
221 def get_server_node(self):
222  """Get current node name using machine-id."""
223  try:
224  machine_id = self.machine_id;
225  server_node = Conf.get(self._index, 'node')[machine_id]
226  except:
227  raise MotrError(errno.EINVAL, f"MACHINE_ID {machine_id} does not exist in ConfStore")
228 
229  check_type(server_node, dict, "server_node")
230  return server_node
231 
232 def calc_size(self, sz):
233  ret = -1
234  suffixes = ['K', 'Ki', 'Kib', 'M', 'Mi', 'Mib', 'G', 'Gi', 'Gib']
235  sz_map = {
236  "K": 1024, "M": 1024*1024, "G": 1024*1024*1024,
237  "Ki": 1024, "Mi": 1024*1024, "Gi": 1024*1024*1024,
238  "Kib": 1024, "Mib": 1024*1024, "Gib": 1024*1024*1024 }
239 
240  # Check if sz ends with proper suffixes. It matches only one suffix.
241  temp = list(filter(sz.endswith, suffixes))
242  if len(temp) > 0:
243  suffix = temp[0]
244  num_sz = re.sub(r'[^0-9]', '', sz) # Ex: If sz is 128Mib then num_sz=128
245  map_val = sz_map[suffix] # Ex: If sz is 128Mib then map_val = 1024*1024
246  ret = int(num_sz) * int(map_val)
247  return ret
248  else:
249  self.logger.error(f"Invalid format of mem limit: {sz}\n")
250  self.logger.error("Please use valid format Ex: 1024, 1Ki, 1Mi, 1Gi etc..\n")
251  return ret
252 
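# A few illustrative results of calc_size(), derived from sz_map above:
#   calc_size(self, "128Mi") -> 128 * 1024 * 1024 = 134217728
#   calc_size(self, "4Gi")   -> 4 * 1024 * 1024 * 1024 = 4294967296
#   calc_size(self, "128MB") -> -1 (unsupported suffix; an error is logged)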
253 def set_setup_size(self, service):
254  ret = False
255  services_limits = Conf.get(self._index, 'cortx>motr>limits')['services']
256 
257  # Default self.setup_size is "small"
258  self.setup_size = "small"
259 
260  # For services other than ioservice and confd, return True.
261  # This keeps the default setup size, i.e. small.
262  if service not in ["ioservice", "ios", "io", "all", "confd"]:
263  self.setup_size = "small"
264  self.logger.info(f"service is {service}. So setting setup size to {self.setup_size}\n")
265  return True
266 
267  #Provisioner passes io as parameter to motr_setup.
268  #Ex: /opt/seagate/cortx/motr/bin/motr_setup config --config yaml:///etc/cortx/cluster.conf --services io
269  #But in /etc/cortx/cluster.conf io is represented by ios. So first get the service names right
270  if service in ["io", "ioservice"]:
271  svc = "ios"
272  else:
273  svc = service
274  for arr_elem in services_limits:
275  # For ios, confd we check for setup size according to mem size
276  if arr_elem['name'] == svc:
277  min_mem = arr_elem['memory']['min']
278 
279  if min_mem.isnumeric():
280  sz = int(min_mem)
281  else:
282  sz = calc_size(self, min_mem)
283 
284  self.logger.info(f"mem limit in config is {min_mem} i.e. {sz}\n")
285 
286  # Invalid min mem format
287  if sz < 0:
288  ret = False
289  break
290  # If mem limit in ios > 4G then it is large setup size
291  elif sz > MEM_THRESHOLD:
292  self.setup_size = "large"
293  self.logger.info(f"setup_size set to {self.setup_size}\n")
294  ret = True
295  break
296  else:
297  self.setup_size = "small"
298  self.logger.info(f"setup_size set to {self.setup_size}\n")
299  ret = True
300  break
301  if ret == False:
302  raise MotrError(errno.EINVAL, f"Setup size is not set properly for service {service}."
303  f"Please update valid mem limits for {service}")
304  else:
305  self.logger.info(f"service={service} and setup_size={self.setup_size}\n")
306  return ret
307 
308 def get_value(self, key, key_type):
309  """Get data."""
310  try:
311  val = Conf.get(self._index, key)
312  except:
313  raise MotrError(errno.EINVAL, f"{key} does not exist in ConfStore")
314 
315  check_type(val, key_type, key)
316  return val
317 
318 def get_logical_node_class(self):
319  """Get logical_node_class."""
320  try:
321  logical_node_class = self.cluster['logical_node_class']
322  except:
323  raise MotrError(errno.EINVAL, "logical_node_class does not exist in ConfStore")
324  check_type(logical_node_class, list, "logical_node_class")
325  return logical_node_class
326 
327 def get_storage(self):
328  storage = self.node['storage']
329  check_type(storage, dict, "storage")
330  return storage
331 
332 def restart_services(self, services):
333  for service in services:
334  self.logger.info(f"Restarting {service} service\n")
335  cmd = f"systemctl stop {service}"
336  execute_command(self, cmd)
337  cmd = f"systemctl start {service}"
338  execute_command(self, cmd)
339  cmd = f"systemctl status {service}"
340  execute_command(self, cmd)
341 
342 def validate_file(file):
343  if not os.path.exists(file):
344  raise MotrError(errno.ENOENT, f"{file} does not exist")
345 
346 # Check if file paths are valid
347 def validate_files(files):
348  for file in files:
349  if not os.path.exists(file):
350  raise MotrError(errno.ENOENT, f"{file} does not exist")
351 
352 # Create directories
353 def create_dirs(self, dirs):
354  for entry in dirs:
355  if not os.path.exists(entry):
356  cmd = f"mkdir -p {entry}"
357  execute_command(self, cmd, logging=False)
358 
359 def is_hw_node(self):
360  try:
361  node_type = self.server_node['type']
362  except:
363  raise MotrError(errno.EINVAL, "node_type not found")
364 
365  check_type(node_type, str, "node type")
366  if node_type == "HW":
367  return True
368  else:
369  return False
370 
371 def validate_motr_rpm(self):
372  '''
373  1. check m0tr.ko exists in current kernel modules
374  2. check /etc/sysconfig/motr
375  '''
376  cmd = "uname -r"
377  cmd_res = execute_command(self, cmd)
378  op = cmd_res[0]
379  kernel_ver = op.replace('\n', '')
380  check_type(kernel_ver, str, "kernel version")
381 
382  kernel_module = f"/lib/modules/{kernel_ver}/kernel/fs/motr/m0tr.ko"
383  self.logger.info(f"Checking for {kernel_module}\n")
384  validate_file(kernel_module)
385 
386  self.logger.info(f"Checking for {MOTR_SYS_CFG}\n")
387  validate_file(MOTR_SYS_CFG)
388 
389 def update_config_file(self, fname, kv_list):
390  lines = []
391  # Get all lines of file in buffer
392  with open(fname, "r") as fp:
393  for line in fp:
394  lines.append(line)
395  num_lines = len(lines)
396  self.logger.info(f"Before update, in file {fname}, num_lines={num_lines}\n")
397 
398  #Check for keys in file
399  for (k, v) in kv_list:
400  found = False
401  for lno in range(num_lines):
402  # If found, update inline.
403  if lines[lno].startswith(f"{k}="):
404  lines[lno] = f"{k}={v}\n"
405  found = True
406  break
407  # If not found, append
408  if not found:
409  lines.append(f"{k}={v}\n")
410  found = False
411 
412  num_lines = len(lines)
413  self.logger.info(f"After update, in file {fname}, num_lines={num_lines}\n")
414 
415  # Write buffer to file
416  with open(fname, "w+") as fp:
417  for line in lines:
418  fp.write(f"{line}")
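# Illustrative use (the value shown is an example only): after
#   update_config_file(self, MOTR_SYS_CFG, [("MOTR_M0D_TRACE_DIR", "/etc/cortx/log/motr/trace")])
# the MOTR_M0D_TRACE_DIR= line of /etc/sysconfig/motr is rewritten in place, or appended if absent.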
419 
420 def update_copy_motr_config_file(self):
421  local_path = self.local_path
422  log_path = self.log_path
423  machine_id = self.machine_id
424  validate_files([MOTR_SYS_CFG, local_path, log_path])
425  MOTR_M0D_DATA_DIR = f"{local_path}/motr"
426  if not os.path.exists(MOTR_M0D_DATA_DIR):
427  create_dirs(self, [f"{MOTR_M0D_DATA_DIR}"])
428  MOTR_LOCAL_SYSCONFIG_DIR = f"{MOTR_M0D_DATA_DIR}/sysconfig"
429  if not os.path.exists(MOTR_LOCAL_SYSCONFIG_DIR):
430  create_dirs(self, [f"{MOTR_LOCAL_SYSCONFIG_DIR}"])
431 
432  MOTR_M0D_CONF_DIR = f"{MOTR_LOCAL_SYSCONFIG_DIR}/{machine_id}"
433  MOTR_M0D_CONF_XC = f"{MOTR_M0D_CONF_DIR}/confd.xc"
434  MOTR_M0D_ADDB_STOB_DIR = f"{log_path}/motr/{machine_id}/addb"
435  MOTR_M0D_TRACE_DIR = f"{log_path}/motr/{machine_id}/trace"
436  # Skip MOTR_CONF_XC
437  dirs = [MOTR_M0D_DATA_DIR, MOTR_M0D_ADDB_STOB_DIR, MOTR_M0D_TRACE_DIR, MOTR_M0D_CONF_DIR]
438  create_dirs(self, dirs)
439 
440  # Update new config keys to config file /etc/sysconfig/motr
441  config_kvs = [("MOTR_M0D_CONF_DIR", f"{MOTR_M0D_CONF_DIR}"),
442  ("MOTR_M0D_DATA_DIR", f"{MOTR_M0D_DATA_DIR}"),
443  ("MOTR_M0D_CONF_XC", f"{MOTR_M0D_CONF_XC}"),
444  ("MOTR_M0D_ADDB_STOB_DIR", f"{MOTR_M0D_ADDB_STOB_DIR}"),
445  ("MOTR_M0D_TRACE_DIR", f"{MOTR_M0D_TRACE_DIR}")]
446 
447  update_config_file(self, f"{MOTR_SYS_CFG}", config_kvs)
448 
449  # Copy config file to new path
450  cmd = f"cp {MOTR_SYS_CFG} {MOTR_M0D_CONF_DIR}"
451  execute_command(self, cmd)
452 
453 # Get lists of metadata disks from Confstore of all cvgs
454 # Input: node_info
455 # Output: [['/dev/sdc'], ['/dev/sdf']]
456 # where ['/dev/sdc'] is list of metadata disks of cvg[0]
457 # ['/dev/sdf'] is list of metadata disks of cvg[1]
458 def get_md_disks_lists(self, node_info):
459  md_disks_lists = []
460  cvg_count = node_info['storage'][CVG_COUNT_KEY]
461  cvg = node_info['storage']['cvg']
462  for i in range(cvg_count):
463  temp_cvg = cvg[i]
464  if temp_cvg['devices']['metadata']:
465  md_disks_lists.append(temp_cvg['devices']['metadata'])
466  self.logger.info(f"md_disks lists on node = {md_disks_lists}\n")
467  return md_disks_lists
468 
469 # Get metadata disks from list of lists of metadata disks of
470 # different cvgs of node
471 # Input: [['/dev/sdc'], ['/dev/sdf']]
472 # where ['/dev/sdc'] is list of metadata disks of cvg[0]
473 # ['/dev/sdf'] is list of metadata disks of cvg[1]
474 # Output: ['/dev/sdc', '/dev/sdf']
475 def get_mdisks_from_list(self, md_lists):
476  md_disks = []
477  md_len_outer = len(md_lists)
478  for i in range(md_len_outer):
479  md_len_inner = len(md_lists[i])
480  for j in range(md_len_inner):
481  md_disks.append(md_lists[i][j])
482  self.logger.info(f"md_disks on node = {md_disks}\n")
483  return md_disks
484 
485 # Update metadata disk entries to motr-hare confstore
486 def update_to_file(self, index, url, machine_id, md_disks):
487  ncvgs = len(md_disks)
488  for i in range(ncvgs):
489  md = md_disks[i]
490  len_md = len(md)
491  for j in range(len_md):
492  md_disk = md[j]
493  self.logger.info(f"setting key server>{machine_id}>cvg[{i}]>m0d[{j}]>md_seg1"
494  f" with value {md_disk} in {url}")
495  Conf.set(index, f"server>{machine_id}>cvg[{i}]>m0d[{j}]>md_seg1",f"{md_disk}")
496  Conf.save(index)
497 
498 # populate self.storage_nodes with machine_id for all storage_nodes
499 def get_data_nodes(self):
500  machines: Dict[str,Any] = self.nodes
501  storage_nodes: List[str] = []
502  services = Conf.search(self._index, 'node', 'services', Const.SERVICE_MOTR_IO.value)
503  for machine_id in machines.keys():
504  result = [svc for svc in services if machine_id in svc]
505  # skip control, HA and server pods
506  if result:
507  storage_nodes.append(machine_id)
508  return storage_nodes
509 
510 def update_motr_hare_keys(self, nodes):
511  # key = machine_id value = node_info
512  for machine_id in self.storage_nodes:
513  node_info = nodes.get(machine_id)
514  md_disks_lists = get_md_disks_lists(self, node_info)
515  update_to_file(self, self._index_motr_hare, self._url_motr_hare, machine_id, md_disks_lists)
516 
517 def motr_config_k8(self):
518  if not verify_libfabric(self):
519  raise MotrError(errno.EINVAL, "libfabric is not up.")
520 
521  if self.machine_id not in self.storage_nodes:
522  # Modify motr config file
523  update_copy_motr_config_file(self)
524  return
525 
526  # If setup_size is large i.e.HW, read the (key,val)
527  # from /opt/seagate/cortx/motr/conf/motr.conf and
528  # update to /etc/sysconfig/motr
529  if self.setup_size == "large":
530  cmd = "{} {}".format(MOTR_CONFIG_SCRIPT, " -c")
531  execute_command(self, cmd, verbose = True)
532 
533  update_motr_hare_keys(self, self.nodes)
534  execute_command(self, MOTR_CONFIG_SCRIPT, verbose = True)
535 
536  # Update be_seg size only for storage node
537  update_bseg_size(self)
538 
539  # Modify motr config file
540  update_copy_motr_config_file(self)
541  return
542 
543 def motr_config(self):
544  # Just to check if lnet is working properly
545  try:
546  transport_type = self.server_node['network']['data']['transport_type']
547  except:
548  raise MotrError(errno.EINVAL, "transport_type not found")
549 
550  check_type(transport_type, str, "transport_type")
551 
552  if transport_type == "lnet":
553  if not verify_lnet(self):
554  raise MotrError(errno.EINVAL, "lnet is not up.")
555  elif transport_type == "libfabric":
556  if not verify_libfabric(self):
557  raise MotrError(errno.EINVAL, "libfabric is not up.")
558 
559  is_hw = is_hw_node(self)
560  if is_hw:
561  self.logger.info(f"Executing {MOTR_CONFIG_SCRIPT}")
562  execute_command(self, MOTR_CONFIG_SCRIPT, verbose = True)
563 
564 def configure_net(self):
565  """Wrapper function to detect lnet/libfabric transport."""
566  try:
567  transport_type = Conf.get(self._index, 'cortx>motr>transport_type')
568  except:
569  raise MotrError(errno.EINVAL, "transport_type not found")
570 
571  check_type(transport_type, str, "transport_type")
572 
573  if transport_type == "lnet":
574  configure_lnet(self)
575  elif transport_type == "libfab":
576  configure_libfabric(self)
577  else:
578  raise MotrError(errno.EINVAL, "Unknown data transport type\n")
579 
580 def configure_lnet(self):
581  '''
582  Get iface and /etc/modprobe.d/lnet.conf params from
583  conf store. Configure lnet. Start lnet service
584  '''
585  try:
586  iface = self.server_node['network']['data']['private_interfaces'][0]
587  except:
588  raise MotrError(errno.EINVAL, "private_interfaces[0] not found\n")
589 
590  self.logger.info(f"Validate private_interfaces[0]: {iface}\n")
591  cmd = f"ip addr show {iface}"
592  execute_command(self, cmd)
593 
594  try:
595  iface_type = self.server_node['network']['data']['interface_type']
596  except:
597  raise MotrError(errno.EINVAL, "interface_type not found\n")
598 
599  lnet_config = (f"options lnet networks={iface_type}({iface}) "
600  f"config_on_load=1 lnet_peer_discovery_disabled=1\n")
601  self.logger.info(f"lnet config: {lnet_config}")
602 
603  with open(LNET_CONF_FILE, "w") as fp:
604  fp.write(lnet_config)
605 
606  execute_command(self, "systemctl enable lnet")
607  restart_services(self, ["lnet"])
608  time.sleep(2)
609  # Ping to nid
610  self.logger.info("Doing ping to nids\n")
611  ret = lnet_self_ping(self)
612  if not ret:
613  raise MotrError(errno.EINVAL, "lnet self ping failed\n")
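# Example of the lnet config line written above (interface type and name are illustrative):
#   options lnet networks=tcp(eth1) config_on_load=1 lnet_peer_discovery_disabled=1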
614 
615 def configure_libfabric(self):
616  cmd = "fi_info"
617  execute_command(self, cmd, verbose=True)
618 
619 def verify_libfabric(self):
620  cmd = "fi_info"
621  execute_command(self, cmd)
622  return True
623 
624 def swap_on(self):
625  cmd = "swapon -a"
626  execute_command(self, cmd)
627 
628 def swap_off(self):
629  cmd = "swapoff -a"
630  execute_command(self, cmd, retries=3)
631 
632 def add_swap_fstab(self, dev_name):
633  '''
634  1. check swap entry found in /etc/fstab
635  2. if found, do nothing
636  3. if not found, add swap entry in /etc/fstab
637  '''
638  swap_entry = f"{dev_name} swap swap defaults 0 0\n"
639  swap_found = False
640  swap_off(self)
641 
642  try:
643  with open(FSTAB, "r") as fp:
644  lines = fp.readlines()
645  for line in lines:
646  ret = line.find(dev_name)
647  if ret == 0:
648  swap_found = True
649  self.logger.info(f"Swap entry found: {swap_entry}\n")
650  except:
651  swap_on(self)
652  raise MotrError(errno.EINVAL, f"Cannot read {FSTAB}\n")
653 
654  try:
655  if not swap_found:
656  with open(FSTAB, "a") as fp:
657  fp.write(swap_entry)
658  self.logger.info(f"Swap entry added: {swap_entry}\n")
659  except:
660  raise MotrError(errno.EINVAL, f"Cannot append to {FSTAB}\n")
661  finally:
662  swap_on(self)
663 
664 def del_swap_fstab_by_vg_name(self, vg_name):
665  swap_off(self)
666 
667  cmd = f"sed -i '/{vg_name}/d' {FSTAB}"
668  execute_command(self, cmd)
669 
670  swap_on(self)
671 
672 def create_swap(self, swap_dev):
673  self.logger.info(f"Make swap of {swap_dev}\n")
674  cmd = f"mkswap -f {swap_dev}"
675  execute_command(self, cmd)
676 
677  self.logger.info(f"Test {swap_dev} swap device\n")
678  cmd = f"test -e {swap_dev}"
679  execute_command(self, cmd)
680 
681  self.logger.info(f"Adding {swap_dev} swap device to {FSTAB}\n")
682  add_swap_fstab(self, swap_dev)
683 
684 
685 def create_lvm(self, index, metadata_dev):
686  '''
687  1. validate /etc/fstab
688  2. validate metadata device file
689  3. check whether the requested volume group exists
690  4. if it exists, remove the volume group and the swap associated with it,
691  because the user may request the same volume group with a different device.
692  5. if it does not exist, create the volume group and lvm
693  6. create swap from lvm
694  '''
695  try:
696  cmd = f"fdisk -l {metadata_dev}2"
697  execute_command(self, cmd)
698  except MotrError:
699  pass
700  else:
701  metadata_dev = f"{metadata_dev}2"
702 
703  try:
704  cmd = f"pvdisplay {metadata_dev}"
705  out = execute_command(self, cmd)
706  except MotrError:
707  pass
708  else:
709  self.logger.warning(f"Volumes are already created on {metadata_dev}\n{out[0]}\n")
710  return False
711 
712  index = index + 1
713  node_name = self.server_node['name']
714  vg_name = f"vg_{node_name}_md{index}"
715  lv_swap_name = f"lv_main_swap{index}"
716  lv_md_name = f"lv_raw_md{index}"
717  swap_dev = f"/dev/{vg_name}/{lv_swap_name}"
718 
719  self.logger.info(f"metadata device: {metadata_dev}\n")
720 
721  self.logger.info(f"Checking for {FSTAB}\n")
722  validate_file(FSTAB)
723 
724  self.logger.info(f"Checking for {metadata_dev}\n")
725  validate_file(metadata_dev)
726 
727  cmd = f"fdisk -l {metadata_dev}"
728  execute_command(self, cmd)
729 
730  try:
731  cmd = f"vgs {vg_name}"
732  execute_command(self, cmd)
733  except MotrError:
734  pass
735  else:
736  self.logger.info(f"Removing {vg_name} volume group\n")
737 
738  del_swap_fstab_by_vg_name(self, vg_name)
739 
740  cmd = f"vgchange -an {vg_name}"
741  execute_command(self, cmd)
742 
743  cmd = f"vgremove {vg_name} -ff"
744  execute_command(self, cmd)
745 
746  self.logger.info(f"Creating physical volume from {metadata_dev}\n")
747  cmd = f"pvcreate {metadata_dev} --yes"
748  execute_command(self, cmd)
749 
750  self.logger.info(f"Creating {vg_name} volume group from {metadata_dev}\n")
751  cmd = f"vgcreate {vg_name} {metadata_dev}"
752  execute_command(self, cmd)
753 
754  self.logger.info(f"Adding {node_name} tag to {vg_name} volume group\n")
755  cmd = f"vgchange --addtag {node_name} {vg_name}"
756  execute_command(self, cmd)
757 
758  self.logger.info("Scanning volume group\n")
759  cmd = "vgscan --cache"
760  execute_command(self, cmd)
761 
762  self.logger.info(f"Creating {lv_swap_name} lvm from {vg_name}\n")
763  cmd = f"lvcreate -n {lv_swap_name} {vg_name} -l 51%VG --yes"
764  execute_command(self, cmd)
765 
766  self.logger.info(f"Creating {lv_md_name} lvm from {vg_name}\n")
767  cmd = f"lvcreate -n {lv_md_name} {vg_name} -l 100%FREE --yes"
768  execute_command(self, cmd)
769 
770  swap_check_cmd = "free -m | grep Swap | awk '{print $2}'"
771  free_swap_op = execute_command(self, swap_check_cmd)
772  allocated_swap_size_before = int(float(free_swap_op[0].strip(' \n')))
773  create_swap(self, swap_dev)
774  allocated_swap_op = execute_command(self, swap_check_cmd)
775  allocated_swap_size_after = int(float(allocated_swap_op[0].strip(' \n')))
776  if allocated_swap_size_before >= allocated_swap_size_after:
777  raise MotrError(errno.EINVAL, f"swap size before allocation"
778  f"({allocated_swap_size_before}M) must be less than "
779  f"swap size after allocation({allocated_swap_size_after}M)\n")
780  else:
781  self.logger.info(f"swap size before allocation ={allocated_swap_size_before}M\n")
782  self.logger.info(f"swap_size after allocation ={allocated_swap_size_after}M\n")
783  return True
784 
785 def calc_lvm_min_size(self, lv_path, lvm_min_size):
786  cmd = f"lsblk --noheadings --bytes {lv_path} | " "awk '{print $4}'"
787  res = execute_command(self, cmd)
788  lv_size = res[0].rstrip("\n")
789  lv_size = int(lv_size)
790  self.logger.info(f"{lv_path} size = {lv_size} \n")
791  if lvm_min_size is None:
792  lvm_min_size = lv_size
793  return lvm_min_size
794  lvm_min_size = min(lv_size, lvm_min_size)
795  return lvm_min_size
796 
797 def get_cvg_cnt_and_cvg(self):
798  try:
799  cvg_cnt = self.server_node['storage'][CVG_COUNT_KEY]
800  except:
801  raise MotrError(errno.EINVAL, "cvg_cnt not found\n")
802 
803  check_type(cvg_cnt, str, CVG_COUNT_KEY)
804 
805  try:
806  cvg = self.server_node['storage']['cvg']
807  except:
808  raise MotrError(errno.EINVAL, "cvg not found\n")
809 
810  # Check if cvg type is list
811  check_type(cvg, list, "cvg")
812 
813  # Check if cvg is non empty
814  if not cvg:
815  raise MotrError(errno.EINVAL, "cvg is empty\n")
816  return cvg_cnt, cvg
817 
818 def validate_storage_schema(storage):
819  check_type(storage, list, "storage")
820  for elem in storage:
821  check_type(elem, dict, "storage element")
822  for key, val in elem.items():
823  if key=="name":
824  val_type=str
825  check_type(val, val_type, key)
826  if key=="type":
827  val_type=str
828  check_type(val, val_type, key)
829  if key=="metadata_devices":
830  val_type=list
831  check_type(val, val_type, key)
832  sz = len(val)
833  for i in range(sz):
834  check_type(val[i], str, f"metadata_devices[{i}]")
835  if key=="data_devices":
836  val_type=list
837  check_type(val, val_type, key)
838  sz = len(val)
839  for i in range(sz):
840  check_type(val[i], str, f"data_devices[{i}]")
841 
842 def get_cvg_cnt_and_cvg_k8(self):
843  try:
844  cvg = self.storage['cvg']
845  cvg_cnt = len(cvg)
846  except:
847  raise MotrError(errno.EINVAL, "cvg not found\n")
848  # Check if cvg type is list
849  check_type(cvg, list, "cvg")
850  return cvg_cnt, cvg
851 
852 def align_val(val, size):
853  return (int(val/size) * size)
854 
855 def update_bseg_size(self):
856  dev_count = 0
857  lvm_min_size = None
858 
859  md_disks_list = get_md_disks_lists(self, self.node)
860  md_disks = get_mdisks_from_list(self, md_disks_list)
861  md_len = len(md_disks)
862  for i in range(md_len):
863  lvm_min_size = calc_lvm_min_size(self, md_disks[i], lvm_min_size)
864  if lvm_min_size:
865  lvm_min_size = align_val(lvm_min_size, ALLIGN_SIZE)
866  self.logger.info(f"setting MOTR_M0D_IOS_BESEG_SIZE to {lvm_min_size}\n")
867  cmd = f'sed -i "/MOTR_M0D_IOS_BESEG_SIZE/s/.*/MOTR_M0D_IOS_BESEG_SIZE={lvm_min_size}/" {MOTR_SYS_CFG}'
868  execute_command(self, cmd)
869  return
870 
871 def config_lvm(self):
872  dev_count = 0
873  lvm_min_size = None
874  lvm_min_size = None
875 
876  cvg_cnt, cvg = get_cvg_cnt_and_cvg(self)
877  for i in range(int(cvg_cnt)):
878  cvg_item = cvg[i]
879  try:
880  metadata_devices = cvg_item["metadata_devices"]
881  except:
882  raise MotrError(errno.EINVAL, "metadata devices not found\n")
883  check_type(metadata_devices, list, "metadata_devices")
884  self.logger.info(f"\nlvm metadata_devices: {metadata_devices}\n\n")
885 
886  for device in metadata_devices:
887  ret = create_lvm(self, dev_count, device)
888  if ret == False:
889  continue
890  dev_count += 1
891  lv_md_name = f"lv_raw_md{dev_count}"
892  cmd = f"lvs -o lv_path | grep {lv_md_name}"
893  res = execute_command(self, cmd)
894  lv_path = res[0].rstrip("\n")
895  lvm_min_size = calc_lvm_min_size(self, lv_path, lvm_min_size)
896  if lvm_min_size:
897  self.logger.info(f"setting MOTR_M0D_IOS_BESEG_SIZE to {lvm_min_size}\n")
898  cmd = f'sed -i "/MOTR_M0D_IOS_BESEG_SIZE/s/.*/MOTR_M0D_IOS_BESEG_SIZE={lvm_min_size}/" {MOTR_SYS_CFG}'
899  execute_command(self, cmd)
900 
901 def get_lnet_xface() -> str:
902  """Get lnet interface."""
903  lnet_xface = None
904  try:
905  with open(LNET_CONF_FILE, 'r') as f:
906  # Obtain interface name
907  for line in f.readlines():
908  if len(line.strip()) <= 0: continue
909  tokens = re.split(r'\W+', line)
910  if len(tokens) > 4:
911  lnet_xface = tokens[4]
912  break
913  except:
914  raise MotrError(errno.EINVAL, f"Cant parse {LNET_CONF_FILE}")
915 
916  if lnet_xface == None:
917  raise MotrError(errno.EINVAL,
918  f"Cant obtain iface details from {LNET_CONF_FILE}")
919  if lnet_xface not in os.listdir(SYS_CLASS_NET_DIR):
920  raise MotrError(errno.EINVAL,
921  f"Invalid iface {lnet_xface} in lnet.conf")
922  return lnet_xface
923 
924 def check_pkgs(self, pkgs):
925  """Check rpm packages."""
926  for pkg in pkgs:
927  ret = 1
928  cmd = f"rpm -q {pkg}"
929 
930  try:
931  cmd_res = execute_command(self, cmd)
932  ret = cmd_res[1]
933  except MotrError:
934  pass
935 
936  if ret == 0:
937  self.logger.info(f"rpm found: {pkg}\n")
938  else:
939  raise MotrError(errno.ENOENT, f"Missing rpm: {pkg}")
940 
941 def get_nids(self, nodes):
942  """Get lnet nids of all available nodes in cluster."""
943  nids = []
944  myhostname = self.server_node["hostname"]
945 
946  for node in nodes:
947  if (myhostname == node):
948  cmd = "lctl list_nids"
949  else:
950  cmd = (f"ssh {node}"
951  " lctl list_nids")
952 
953  op = execute_command(self, cmd)
954  nids.append(op[0].rstrip("\n"))
955 
956  return nids
957 
958 def get_nodes(self):
959  nodes_info = Conf.get(self._index, 'server_node')
960  nodes= []
961  for value in nodes_info.values():
962  nodes.append(value["hostname"])
963  return nodes
964 
965 def lnet_ping(self):
966  """Lnet lctl ping on all available nodes in cluster."""
967  nodes = get_nodes(self)
968  # nodes is a list of hostnames
969  nids = get_nids(self, nodes)
970  self.logger.info("lnet pinging on all nodes in cluster\n")
971  for nid in nids:
972  cmd = f"lctl ping {nid}"
973  self.logger.info(f"lctl ping on: {nid}\n")
974  execute_command(self, cmd)
975 
976 def test_lnet(self):
977  '''
978  1. check lustre rpm
979  2. validate lnet interface which was configured in init
980  3. ping on lnet interface
981  4. lctl ping on all nodes in cluster. motr_setup post_install and prepare
982  MUST be performed on all nodes before executing this step.
983  '''
984  self.logger.info("post_install and prepare phases MUST be performed "
985  "on all nodes before executing test phase\n")
986  search_lnet_pkgs = ["kmod-lustre-client", "lustre-client"]
987  check_pkgs(self, search_lnet_pkgs)
988 
989  lnet_xface = get_lnet_xface()
990  self.logger.info(f"lnet interface found: {lnet_xface}\n")
991 
992  cmd = f"ip addr show {lnet_xface}"
993  cmd_res = execute_command(self, cmd)
994  ip_addr = cmd_res[0]
995 
996  try:
997  ip_addr = ip_addr.split("inet ")[1].split("/")[0]
998  self.logger.info(f"lnet interface ip: {ip_addr}\n")
999  except:
1000  raise MotrError(errno.EINVAL, f"Cant parse {lnet_xface} ip addr")
1001 
1002  self.logger.info(f"ping on: {ip_addr}\n")
1003  cmd = f"ping -c 3 {ip_addr}"
1004  execute_command(self, cmd)
1005 
1006  lnet_ping(self)
1007 
1008 def test_libfabric(self):
1009  search_libfabric_pkgs = ["libfabric"]
1010  check_pkgs(self, search_libfabric_pkgs)
1011 
1012 def get_metadata_disks_count(self):
1013  dev_count = 0
1014  cvg_cnt, cvg = get_cvg_cnt_and_cvg(self)
1015  for i in range(int(cvg_cnt)):
1016  cvg_item = cvg[i]
1017  try:
1018  metadata_devices = cvg_item["metadata_devices"]
1019  except:
1020  raise MotrError(errno.EINVAL, "metadata devices not found\n")
1021  check_type(metadata_devices, list, "metadata_devices")
1022  self.logger.info(f"\nlvm metadata_devices: {metadata_devices}\n\n")
1023 
1024  for device in metadata_devices:
1025  dev_count += 1
1026  return dev_count
1027 
1028 def lvm_exist(self):
1029  metadata_disks_count = get_metadata_disks_count(self)
1030  node_name = self.server_node['name']
1031 
1032  # Fetch lvm paths of existing lvm's e.g. /dev/vg_srvnode-1_md1/lv_raw_md1
1033  lv_list = execute_command(self, "lvdisplay | grep \"LV Path\" | awk \'{ print $3 }\'")[0].split('\n')
1034  lv_list = lv_list[0:len(lv_list)-1]
1035 
1036  # Check if motr lvms are already created.
1037  # If all are already created, return
1038  for i in range(1, metadata_disks_count+1):
1039  md_lv_path = f'/dev/vg_{node_name}_md{i}/lv_raw_md{i}'
1040  swap_lv_path = f'/dev/vg_{node_name}_md{i}/lv_main_swap{i}'
1041 
1042  if md_lv_path in lv_list:
1043  if swap_lv_path in lv_list:
1044  continue
1045  else:
1046  self.logger.warning(f"{swap_lv_path} does not exist. Need to create lvm\n")
1047  return False
1048  else:
1049  self.logger.warning(f"{md_lv_path} does not exist. Need to create lvm\n")
1050  return False
1051  return True
1052 
1053 def cluster_up(self):
1054  cmd = '/usr/bin/hctl status'
1055  self.logger.info(f"Executing cmd : '{cmd}'\n")
1056  ret = execute_command_without_exception(self, cmd)
1057  if ret == 0:
1058  return True
1059  else:
1060  return False
1061 
1062 def pkg_installed(self, pkg):
1063  cmd = f'/usr/bin/yum list installed {pkg}'
1064  ret = execute_command_without_exception(self, cmd)
1065  if ret == 0:
1066  self.logger.info(f"{pkg} is installed\n")
1067  return True
1068  else:
1069  self.logger.info(f"{pkg} is not installed\n")
1070  return False
1071 
1072 def test_io(self):
1073  mix_workload_path = f"{MOTR_WORKLOAD_DIR}/mix_workload.yaml"
1074  m0workload_path = f"{MOTR_WORKLOAD_DIR}/m0workload"
1075  m0crate_path = f"{MOTR_WORKLOAD_DIR}/m0crate_workload_batch_1_file1.yaml"
1076  if (
1077  os.path.isfile(m0workload_path) and
1078  os.path.isfile(mix_workload_path) and
1079  os.path.isfile(m0crate_path)
1080  ):
1081  cmd = f"{m0workload_path} -t {mix_workload_path}"
1082  out = execute_command(self, cmd, timeout_secs=1000)
1083  self.logger.info(f"{out[0]}\n")
1084  else:
1085  self.logger.error("workload files are missing\n")
1086 
1087  # Configure motr mini provisioner logger.
1088  # File to log motr mini prov logs: /var/log/seagate/motr/mini_provisioner.
1089  # Currently we log to both console and /var/log/seagate/motr/mini_provisioner.
1090  # First check if /var/log/seagate/motr exists. If not, create it.
1091 
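# Sketch of the expected usage (the call site lives outside this file):
#   self.logger = config_logger(self)
#   self.logger.info("...")   # written to self.logfile (DEBUG and above) and echoed to the console for ERROR and above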
1092 def config_logger(self):
1093  logger = logging.getLogger(LOGGER)
1094  if not os.path.exists(LOGDIR):
1095  try:
1096  os.makedirs(LOGDIR, exist_ok=True)
1097  with open(f'{self.logfile}', 'w'): pass
1098  except:
1099  raise MotrError(errno.EINVAL, f"{self.logfile} creation failed\n")
1100  else:
1101  if not os.path.exists(self.logfile):
1102  try:
1103  with open(f'{self.logfile}', 'w'): pass
1104  except:
1105  raise MotrError(errno.EINVAL, f"{self.logfile} creation failed\n")
1106  logger.setLevel(logging.DEBUG)
1107  # create file handler which logs debug message in log file
1108  fh = logging.FileHandler(self.logfile)
1109  fh.setLevel(logging.DEBUG)
1110  # create console handler to log messages ERROR and above
1111  ch = logging.StreamHandler()
1112  ch.setLevel(logging.ERROR)
1113  formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
1114  fh.setFormatter(formatter)
1115  ch.setFormatter(formatter)
1116  logger.addHandler(fh)
1117  logger.addHandler(ch)
1118  return logger
1119 
1120 def remove_dirs(self, log_dir, patterns):
1121  if not os.path.exists(os.path.dirname(log_dir)):
1122  self.logger.warning(f"{log_dir} does not exist")
1123  return
1124 
1125  if len(patterns) == 0:
1126  self.logger.info(f"Removing {log_dir}")
1127  execute_command(self, f"rm -rf {log_dir}")
1128  return
1129 
1130  for pattern in patterns:
1131  removed_dirs = []
1132 
1133  # Search directories for files/dirs with pattern in their names and remove it.
1134  # e.g. removes addb* dirs from /var/motr
1135  # search_pat=/var/motr/addb*
1136 
1137  search_pat = "{}/{}*".format(log_dir, pattern)
1138  for dname in glob.glob(search_pat, recursive=True):
1139  removed_dirs.append(dname)
1140  execute_command(self, f"rm -rf {dname}")
1141  if len(removed_dirs) > 0:
1142  self.logger.info(f"Removed below directories of pattern {pattern} from {log_dir}.\n{removed_dirs}")
1143 
1144 def remove_logs(self, patterns):
1145  for log_dir in MOTR_LOG_DIRS:
1146  if os.path.exists(log_dir):
1147  remove_dirs(self, log_dir, patterns)
1148  else:
1149  self.logger.warning(f"{log_dir} does not exist")
1150  if os.path.exists(IVT_DIR):
1151  self.logger.info(f"Removing {IVT_DIR}")
1152  execute_command(self, f"rm -rf {IVT_DIR}")
1153 
1154 def check_services(self, services):
1155  for service in services:
1156  self.logger.info(f"Checking status of {service} service\n")
1157  cmd = f"systemctl status {service}"
1158  execute_command(self, cmd)
1159  ret = execute_command_without_exception(self, cmd)
1160  if ret != 0:
1161  return False
1162  return True
1163 
1164 def verify_lnet(self):
1165  self.logger.info("Doing ping to nids.\n")
1166  ret = lnet_self_ping(self)
1167  if not ret:
1168  # Check if lnet is up. If lnet is not up, restart lnet and try ping nid.
1169  # Else, ping nid after some delay since lnet is already up.
1170  if not check_services(self, ["lnet"]):
1171  self.logger.info("lnet is not up. Restarting lnet.\n")
1172  restart_services(self, ["lnet"])
1173  self.logger.info("Doing ping to nids after 5 seconds.\n")
1174  else:
1175  self.logger.warning("lnet is up. Doing ping to nids after 5 seconds.\n")
1176  execute_command_without_exception(self, "sleep 5")
1177  ret = lnet_self_ping(self)
1178  return ret
1179 
1180 def lnet_self_ping(self):
1181  nids = []
1182 
1183  op = execute_command(self, "lctl list_nids")
1184  nids.append(op[0].strip("\n"))
1185  self.logger.info(f"nids= {nids}\n")
1186  for nid in nids:
1187  cmd = f"lctl ping {nid}"
1188  self.logger.info(f"lctl ping on: {nid}\n")
1189  ret = execute_command_without_exception(self, cmd)
1190  if ret != 0:
1191  return False
1192  return True
1193 
1194 def update_motr_hare_keys_for_all_nodes(self):
1195  hostname = self.server_node["hostname"]
1196  nodes_info = Conf.get(self._index, 'server_node')
1197  retry_count = 60
1198  retry_delay = 2
1199  for value in nodes_info.values():
1200  host = value["hostname"]
1201  cvg_count = value["storage"][CVG_COUNT_KEY]
1202  name = value["name"]
1203  self.logger.info(f"update_motr_hare_keys for {host}\n")
1204  for i in range(int(cvg_count)):
1205  lv_path = None
1206  lv_md_name = f"lv_raw_md{i + 1}"
1207 
1208  if (hostname == value["hostname"]):
1209  cmd = ("lvs -o lv_path")
1210  res = execute_command_verbose(self, cmd)
1211  r = re.compile(f".*{lv_md_name}")
1212  try:
1213  lvm_find = list(filter(r.match,res[0].split()))
1214  lv_path = lvm_find[0].strip()
1215  except Exception as e:
1216  self.logger.info(f"exception pass {e}\n")
1217  else:
1218  cmd = (f"ssh {host}"
1219  f" \"lvs -o lv_path\"")
1220  for retry in range(1, retry_count):
1221  self.logger.info(f"Getting LVM data for {host}, attempt: {retry}\n")
1222  res = execute_command_verbose(self, cmd)
1223  r = re.compile(f".*{lv_md_name}")
1224  try:
1225  lvm_find = list(filter(r.match,res[0].split()))
1226  lv_path = lvm_find[0].strip()
1227  except Exception as e:
1228  self.logger.info(f"exception pass {e}\n")
1229  if lv_path:
1230  self.logger.info(f"found lvm {lv_path} after {retry} count")
1231  break
1232  else:
1233  time.sleep(retry_delay)
1234  if not lv_path:
1235  raise MotrError(res[1], f"[ERR] {lv_md_name} not found on {host}\n")
1236  self.logger.info(f"setting key server>{name}>cvg[{i}]>m0d[0]>md_seg1"
1237  f" with value {lv_path} in {self._motr_hare_conf}")
1238  Conf.set(self._index_motr_hare,f"server>{name}>cvg[{i}]>m0d[0]>md_seg1",f"{lv_path.strip()}")
1239  Conf.save(self._index_motr_hare)
1240 
1241  for value in nodes_info.values():
1242  if (hostname == value["hostname"]):
1243  continue
1244  else:
1245  host = value["hostname"]
1246  cmd = (f"scp {self._motr_hare_conf}"
1247  f" {host}:{self._motr_hare_conf}")
1248  execute_command(self, cmd)
1249 
1250 # Get volume groups created on metadata devices mentioned in config file
1251 def get_vol_grps(self):
1252  cvg_cnt, cvg = get_cvg_cnt_and_cvg(self)
1253 
1254  vol_grps = []
1255  for i in range(int(cvg_cnt)):
1256  cvg_item = cvg[i]
1257  try:
1258  metadata_devices = cvg_item["metadata_devices"]
1259  except:
1260  raise MotrError(errno.EINVAL, "metadata devices not found\n")
1261  check_type(metadata_devices, list, "metadata_devices")
1262  self.logger.info(f"lvm metadata_devices: {metadata_devices}")
1263 
1264  for device in metadata_devices:
1265  cmd = f"pvs | grep {device} " "| awk '{print $2}'"
1266  ret = execute_command(self, cmd)
1267  if ret[0]:
1268  vol_grps.append(ret[0].strip())
1269  return vol_grps
1270 
1271 def lvm_clean(self):
1272  self.logger.info("Removing cortx lvms")
1273  vol_grps = get_vol_grps(self)
1274  if (len(vol_grps) == 0):
1275  self.logger.info("No cortx volume groups (e.g. vg_srvnode-1_md1) are found \n")
1276  return
1277  self.logger.info(f"Volume groups found: {vol_grps}")
1278  self.logger.info("Executing swapoff -a")
1279  swap_off(self)
1280  self.logger.info(f"Removing cortx LVM entries from {FSTAB}")
1281  execute_command(self, f"sed -i.bak '/vg_srvnode/d' {FSTAB}")
1282  for vg in vol_grps:
1283  cmd = f"pvs|grep {vg} |" "awk '{print $1}'"
1284  pv_names = execute_command(self, cmd)[0].split('\n')[0:-1]
1285  cmd = f"lvs|grep {vg} |" "awk '{print $1}'"
1286  lv_names = execute_command(self, cmd)[0].split('\n')[0:-1]
1287 
1288  # Removing logical volumes
1289  for lv in lv_names:
1290  lv_path = f"/dev/{vg}/{lv}"
1291  self.logger.info(f"Executing lvchange -an {lv_path}")
1292  execute_command(self, f"lvchange -an {lv_path}")
1293  self.logger.info(f"Executing lvremove {lv_path}")
1294  execute_command(self, f"lvremove {lv_path}")
1295  if os.path.exists(lv_path):
1296  self.logger.info("Removing dmsetup entries using cmd "
1297  f"\'dmsetup remove {lv_path}\'")
1298  execute_command(self, f"dmsetup remove {lv_path}")
1299 
1300  # Removing volume groups
1301  self.logger.info(f"Executing vgchange -an {vg}")
1302  execute_command(self, f"vgchange -an {vg}")
1303  self.logger.info(f"Executing vgremove {vg}")
1304  execute_command(self, f"vgremove {vg}")
1305 
1306  # Removing physical volumes
1307  for pv in pv_names:
1308  self.logger.info(f"Executing pvremove {pv}")
1309  execute_command(self, f"pvremove {pv}")
1310  self.logger.info(f"Executing wipefs -a {pv}")
1311  execute_command(self, f"wipefs -a {pv}")
1312 
1313  # In some cases, dm entries may remain even though all VGs, LVs and PVs
1314  # have been removed. This is observed on HW setups where lvms are not cleaned up.
1315  remove_dm_entries(self)
1316 
1317 def remove_dm_entries(self):
1318  cmd = "ls -l /dev/vg_srvnode*/* | awk '{print $9}'"
1319  lv_paths = execute_command(self, cmd)[0].split('\n')
1320  for lv_path in lv_paths:
1321  if lv_path == '':
1322  continue
1323  else:
1324  if os.path.exists(lv_path):
1325  self.logger.info(f"dmsetup remove {lv_path}")
1326  execute_command(self, f"dmsetup remove {lv_path}")
1327 
1328 def get_disk_size(self, device):
1329  cmd = f"fdisk -l {device} |" f"grep {device}:" "| awk '{print $5}'"
1330  ret = execute_command(self, cmd)
1331  return ret[0].strip()
1332 
1333 def read_config(file):
1334  fp = open(file, "r")
1335  file_data = fp.read()
1336  config_dict = {}
1337  for line in file_data.splitlines():
1338  if line.startswith('#') or (len(line.strip()) == 0):
1339  continue
1340  entry = line.split('=',1)
1341  config_dict[entry[0]] = entry[1]
1342  return config_dict
1343 
1344 def part_clean(self):
1345  cvg_cnt, cvg = get_cvg_cnt_and_cvg(self)
1346  dev_count = 1
1347  ret = 0
1348  for i in range(int(cvg_cnt)):
1349  cvg_item = cvg[i]
1350  try:
1351  metadata_devices = cvg_item["metadata_devices"]
1352  except:
1353  raise MotrError(errno.EINVAL, "metadata devices not found\n")
1354  check_type(metadata_devices, list, "metadata_devices")
1355  self.logger.info(f"\nlvm metadata_devices: {metadata_devices}\n\n")
1356  for device in metadata_devices:
1357  ret = delete_parts(self, dev_count, device)
1358  #if ret != 0:
1359  # return ret
1360  dev_count += 1
1361  return ret
1362 
1363 # lsblk lists the device itself as well as its partitions, so the output count is
1364 # the number of partitions + 1; subtract 1 to get the number of partitions.
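# Illustrative example (device name is hypothetical): for /dev/sdb with two partitions,
# "lsblk -o name | grep -c sdb" prints 3 (sdb, sdb1, sdb2), so get_part_count() returns 2.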
1365 def get_part_count(self, device):
1366  fname = os.path.basename(device)
1367  cmd = f"lsblk -o name | grep -c {fname}"
1368  ret = int(execute_command(self, cmd, verbose=True)[0]) - 1
1369  return ret
1370 
1371 def delete_parts(self, dev_count, device):
1372  # Delete 2 partitions be_log, raw_md
1373  total_parts = get_part_count(self, device)
1374  if total_parts == 0:
1375  self.logger.info(f"No partitions found on {device}")
1376  return
1377  self.logger.info(f"No. of partitions={total_parts} on {device}")
1378  for i in range(int(total_parts)):
1379  part_num = i + 1
1380  ret = delete_part(self, device, part_num)
1381  if ret != 0:
1382  self.logger.error(f"Deletion of partition({part_num}) failed on {device}")
1383  return ret
1384  time.sleep(2)
1385 
1386 def delete_part(self, device, part_num):
1387  cmd = f"fdisk {device}"
1388  stdin_str = str("d\n"+f"{part_num}"+"\n" + "w\n")
1389  ret = execute_command(self, cmd, stdin=stdin_str, verbose=True)
1390  return ret[1]
1391 
1392 def get_fid(self, fids, service, idx):
1393  fids_list = []
1394  len_fids_list = len(fids)
1395 
1396  # Prepare list of all fids of matching service
1397  for i in range(len_fids_list):
1398  if fids[i]["name"] == service:
1399  fids_list.append(fids[i]["fid"])
1400 
1401  num_fids = len(fids_list)
1402 
1403  # The --idx argument starts at 1; the fetch-fids list is 0-indexed, so shift by 1.
1404  idx = int(idx) - 1
1405 
1406  if num_fids > 0:
1407  if idx < num_fids:
1408  return fids_list[idx]
1409  else:
1410  self.logger.error(f"Invalid index({idx}) of service({service}). "
1411  f"Valid index should be in range [0-{num_fids-1}]. "
1412  "Returning -1.")
1413  return -1
1414  else:
1415  self.logger.error(f"No fids for service({service}). Returning -1.")
1416  return -1
1417 
1418 # Fetch fid of service using command 'hctl fetch-fids'
1419 # First populate a yaml file with the output of command 'hctl fetch-fids'
1420 # Use this yaml file to get proper fid of required service.
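# Illustrative shape of the parsed 'hctl fetch-fids' output (fid values are made up):
#   - name: hax
#     fid: 0x7200000000000001:0x6
#   - name: ioservice
#     fid: 0x7200000000000001:0x9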
1421 def fetch_fid(self, service, idx):
1422  hare_lib_path = f"{self.local_path}/hare/config/{self.machine_id}"
1423  cmd = f"hctl fetch-fids --conf-dir {hare_lib_path}"
1424  out = execute_command(self, cmd)
1425  self.logger.info(f"Available fids:\n{out[0]}\n")
1426  fp = open(TEMP_FID_FILE, "w")
1427  fp.write(out[0])
1428  fp.close()
1429  fp = open(TEMP_FID_FILE, "r")
1430  fids = yaml.safe_load(fp)
1431  if len(fids) == 0:
1432  self.logger.error("No fids returned by 'hctl fetch-fids'. Returning -1.\n")
1433  return -1
1434  fid = get_fid(self, fids, service, idx)
1435  return fid
1436 
1437 def getListOfm0dProcess():
1438  '''
1439  Get list of running m0d process
1440  '''
1441  listOfProc = []
1442  # Iterate over the list
1443  for proc in psutil.process_iter():
1444  try:
1445  # Fetch process details as dict
1446  pinfo = proc.as_dict(attrs=['pid', 'name', 'username'])
1447  if pinfo.get('name') == "m0d":
1448  # Append dict to list
1449  listOfProc.append(pinfo);
1450  except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
1451  pass
1452  return listOfProc
1453 
1454 def receiveSigTerm(signalNumber, frame):
1455  for proc in getListOfm0dProcess():
1456  cmd=f"KILL -SIGTERM {proc.get('pid')}"
1457  execute_command_without_log(cmd)
1458  return
1459 
1460 # If service is one of [ios, confd, hax], we expect a fid and start the service
1461 # using motr-mkfs and motr-server.
1462 # For [fsm, client, motr_client], the free-space-monitor script is run instead, without a fid.
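# Illustrative call (service name and index are examples): start_service(self, "ios", 1)
# fetches the first matching fid via fetch_fid() and runs
# "{MOTR_SERVER_SCRIPT_PATH} m0d-<fid>" through execute_command_console().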
1463 def start_service(self, service, idx):
1464  self.logger.info(f"service={service}\nidx={idx}\n")
1465 
1466  if service in ["fsm", "client", "motr_client"]:
1467  cmd = f"{MOTR_FSM_SCRIPT_PATH}"
1468  execute_command_verbose(self, cmd, set_timeout=False)
1469  return
1470 
1471  # Copy confd_path to /etc/sysconfig
1472  # confd_path = MOTR_M0D_CONF_DIR/confd.xc
1473  confd_path = f"{self.local_path}/motr/sysconfig/{self.machine_id}/confd.xc"
1474  create_dirs(self, ["/etc/motr"])
1475 
1476  cmd = f"cp -f {confd_path} /etc/motr/"
1477  execute_command(self, cmd)
1478 
1479  cmd = f"cp -v {self.local_path}/motr/sysconfig/{self.machine_id}/motr /etc/sysconfig/"
1480  execute_command(self, cmd)
1481 
1482  fid = fetch_fid(self, service, idx)
1483  if fid == -1:
1484  return -1
1485  #Run log rotate in background to avoid delay in startup
1486  cmd = "/opt/seagate/cortx/motr/libexec/m0trace_logrotate.sh &"
1487  execute_command(self, cmd)
1488  cmd = "/opt/seagate/cortx/motr/libexec/m0addb_logrotate.sh &"
1489  execute_command(self, cmd)
1490 
1491  #Start motr services
1492  cmd = f"{MOTR_SERVER_SCRIPT_PATH} m0d-{fid}"
1493  execute_command_console(self, cmd)
1494  return