#!/usr/bin/env python3
#
# Copyright (c) 2020 Seagate Technology LLC and/or its Affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# For any questions about this software or licensing,
# please email opensource@seagate.com or cortx-questions@seagate.com.
#

# METADATA PATH DB SCHEMA DIAGRAM
# ===============================
# |-----------------------------------CLIENT-SIDE------------------------------------|-----------------------SERVER-SIDE----------------------|
#
#                                                            (rpc_to_sxid)
#                                                                 |  ^
#                                                                 V  |
#                                                            (sxid_to_rpc)
#  client_to_dix    dix_to_mdix     dix_to_cas    cas_to_rpc      |                                  fom_to_tx
#       |               | (meta_dix)    |             |           |                                      |
# client_req:id --> dix_req:id --> dix_req:id -----> cas_req:id --> rpc_req:id ------------> fom_desc:{rpc_sm_id, fom_sm_id} -----> be_tx:id
#                       \               \...
#                        \               +---------> cas_req:id --> rpc_req:id ------------> fom_desc:{rpc_sm_id, fom_sm_id} -----> be_tx:id
#                         \
#                          +-----------------------> cas_req:id --> rpc_req:id ------------> fom_desc:{rpc_sm_id, fom_sm_id} -----> be_tx:id
#                           \ ...
#                            \
#                             +---------------------> cas_req:id --> rpc_req:id ------------> fom_desc:{rpc_sm_id, fom_sm_id} -----> be_tx:id
#
# I/O PATH DB SCHEMA DIAGRAM
# ==========================
#                                              (rpc_to_sxid)
#                                                   |  ^
#                                                   V  |
#                                              (sxid_to_rpc)
#  client_to_ioo      ioo_to_rpc                    |                                  fom_to_tx
#       |                 |                         |                                      |
# client_req:id ----> ioo_req:id --------------> rpc_req:id --------------> fom_desc:{rpc_sm_id, fom_sm_id} ------> be_tx:id
#       \ ...
#        \       client_to_cob      cob_to_rpc      |                                  fom_to_tx
#         \           |                 |           |                                      |
#          +----> cob_req:id ------> rpc_req:id --------------> fom_desc:{rpc_sm_id, fom_sm_id} ------> be_tx:id
#           \
#            +--> bulk_req:id

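# For orientation, a hedged sketch (not part of this tool) of how the link
# tables above chain together once the database is built: starting from a
# client request id, the dix requests it fanned out to can be pulled via the
# client_to_dix table, e.g. with peewee:
#
#   for link in client_to_dix.select().where(client_to_dix.client_id == 1170):
#       for d in dix_req.select().where(dix_req.id == link.dix_id):
#           print(d.time, d.state)
#
# (1170/1171 are the illustrative ids from the sample measurements below.)
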
import argparse
import logging
import yaml
import numpy
import time
from peewee import *
from typing import List
import multiprocessing
from itertools import zip_longest
from collections import defaultdict
from tqdm import tqdm
from plumbum.cmd import wc
from math import ceil
import sys
from functools import partial
from os.path import basename, splitext
import re
from dateutil.parser import parse as dtparse
from datetime import datetime


DB = SqliteDatabase(None)
BLOCK = 16<<10
PROC_NR = 48
DBBATCH = 95
PID = 0

def die(what: str):
    print(what, file=sys.stderr)
    sys.exit(1)

# ======================================================================

class BaseModel(Model):
    class Meta:
        database = DB

class fom_to_tx(BaseModel):
    pid = IntegerField()
    fom_id = IntegerField()
    tx_id = IntegerField()

class cas_fom_to_crow_fom(BaseModel):
    pid = IntegerField()
    fom_id = IntegerField()
    crow_fom_id = IntegerField()

class fom_desc(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    service = TextField()
    sender = TextField()
    req_opcode = TextField()
    rep_opcode = TextField()
    local = TextField()
    rpc_sm_id = IntegerField()
    fom_sm_id = IntegerField()
    fom_state_sm_id = IntegerField()

class sxid_to_rpc(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    opcode = IntegerField()
    xid = IntegerField()
    session_id = IntegerField()
    id = IntegerField()

class rpc_to_sxid(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    opcode = IntegerField()
    xid = IntegerField()
    session_id = IntegerField()
    id = IntegerField()

class rpc_req(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    id = IntegerField()
    state = TextField()

class fom_req(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    id = IntegerField()
    state = TextField()

class tx_to_gr(BaseModel):
    pid = IntegerField()
    tx_id = IntegerField()
    gr_id = IntegerField()

class fom_req_state(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    id = IntegerField()
    state = TextField()

class be_tx(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    id = IntegerField()
    state = TextField()

class dix_req(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    id = IntegerField()
    state = TextField()

class cas_req(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    id = IntegerField()
    state = TextField()

class client_req(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    id = IntegerField()
    state = TextField()

class cas_to_rpc(BaseModel):
    pid = IntegerField()
    cas_id = IntegerField()
    rpc_id = IntegerField()

class dix_to_cas(BaseModel):
    pid = IntegerField()
    dix_id = IntegerField()
    cas_id = IntegerField()

class dix_to_mdix(BaseModel):
    pid = IntegerField()
    dix_id = IntegerField()
    mdix_id = IntegerField()

class client_to_dix(BaseModel):
    pid = IntegerField()
    client_id = IntegerField()
    dix_id = IntegerField()

class client_to_cob(BaseModel):
    pid = IntegerField()
    client_id = IntegerField()
    cob_id = IntegerField()

class cob_to_rpc(BaseModel):
    pid = IntegerField()
    cob_id = IntegerField()
    rpc_id = IntegerField()

class client_to_ioo(BaseModel):
    pid = IntegerField()
    client_id = IntegerField()
    ioo_id = IntegerField()

class ioo_to_rpc(BaseModel):
    pid = IntegerField()
    ioo_id = IntegerField()
    rpc_id = IntegerField()

class cob_req(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    id = IntegerField()
    state = TextField()

class ioo_req(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    id = IntegerField()
    state = TextField()

class fom_to_stio(BaseModel):
    pid = IntegerField()
    fom_id = IntegerField()
    stio_id = IntegerField()

class stio_req(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    id = IntegerField()
    state = TextField()

class attr(BaseModel):
    entity_id = IntegerField()
    pid = IntegerField()
    name = TextField()
    val = TextField()

class bulk_to_rpc(BaseModel):
    bulk_id = IntegerField()
    rpc_id = IntegerField()
    pid = IntegerField()

class queues(BaseModel):
    pid = IntegerField()
    type = TextField()
    locality = IntegerField()
    time = IntegerField()
    nr = IntegerField()
    min = IntegerField()
    max = IntegerField()
    avg = FloatField()
    dev = FloatField()

class s3_request_to_client(BaseModel):
    pid = IntegerField()
    s3_request_id = IntegerField()
    client_id = IntegerField()

class s3_request_uid(BaseModel):
    pid = IntegerField()
    id = IntegerField()
    uuid = TextField()

class s3_request_state(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    id = IntegerField()
    state = TextField()

class s3_measurement(BaseModel):
    time = IntegerField()
    pid = IntegerField()
    name = TextField()
    val1 = IntegerField(null=False)
    val2 = IntegerField(null=True)
    val3 = IntegerField(null=True)
    val4 = IntegerField(null=True)
    val5 = IntegerField(null=True)
    val6 = IntegerField(null=True)
    val7 = IntegerField(null=True)
    val8 = IntegerField(null=True)
    val9 = IntegerField(null=True)
    val10 = IntegerField(null=True)
    val11 = IntegerField(null=True)
    val12 = IntegerField(null=True)
    val13 = IntegerField(null=True)

db_create_delete_tables = [client_to_dix, dix_to_mdix, dix_to_cas, cas_to_rpc,
                           cas_req, dix_req, client_req, rpc_req, rpc_to_sxid,
                           sxid_to_rpc, fom_desc, fom_to_tx, fom_req, be_tx,
                           client_to_cob, cob_to_rpc, client_to_ioo, ioo_to_rpc,
                           cob_req, ioo_req, fom_req_state, queues, tx_to_gr,
                           fom_to_stio, stio_req, attr, bulk_to_rpc,
                           cas_fom_to_crow_fom, s3_request_to_client,
                           s3_request_uid, s3_request_state, s3_measurement]

def db_create_tables():
    with DB:
        DB.create_tables(db_create_delete_tables)

def db_drop_tables():
    with DB:
        DB.drop_tables(db_create_delete_tables)

def db_init(path):
    DB.init(path, pragmas={
        'journal_mode': 'off',
        'cache_size': -1024*1024*256,
        'synchronous': 'off',
    })

def db_connect():
    DB.connect()

def db_close():
    DB.close()

# ======================================================================

class profiler:
    def __init__(self, what):
        self.what = what

    def __enter__(self):
        self.start = time.time()

    def __exit__(self, exp_type, exp_val, traceback):
        delta = time.time() - self.start
        logging.info(f"{self.what}: {delta}s")

# ======================================================================

class ADDB2PP:
    @staticmethod
    def clean_yaml(yml):
        return yml.translate(str.maketrans("><-","''_"))

    @staticmethod
    def to_unix(motr_time):
        mt = list(motr_time)
        mt[10] = 'T'
        np_time = numpy.datetime64("".join(mt))
        return np_time.item()

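    # Worked example: to_unix('2019-09-18-19:08:50.975943665') swaps the
    # date/time separator to get '2019-09-18T19:08:50.975943665'; .item() on
    # the nanosecond-precision datetime64 then returns the integer number of
    # nanoseconds since the Unix epoch, 1568833730975943665.
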
    # ['*', '2019-09-18-19:08:50.975943665', 'fom-phase',
    #  'sm_id:', '38', '-->', 'HA_LINK_OUTGOING_STATE_WAIT_REPLY']
    def p_sm_req(measurement, labels, table):
        name = measurement[2]
        time = measurement[1]
        state = measurement[-1]
        sm_id = measurement[4]
        return((table, { 'time': ADDB2PP.to_unix(time), 'state': state, 'id': int(sm_id) }))

    # ['*', '2019-08-29-12:16:54.279414683',
    #  'client-to-dix', 'client_id:', '1170,', 'dix_id:', '1171']
    def p_1_to_2(measurement, labels, table):
        ret = yaml.safe_load("{"+" ".join(measurement[3:])+"}")
        return((table, ret))

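    # For the sample above, " ".join(measurement[3:]) gives
    # "client_id: 1170, dix_id: 1171", so yaml.safe_load() of
    # "{client_id: 1170, dix_id: 1171}" yields
    # {'client_id': 1170, 'dix_id': 1171}, which is inserted into the
    # client_to_dix table as-is.
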
    # ['*', '2019-08-29-12:08:23.766071289', 'fom-descr', 'service:', '<0:0>,', 'sender:', '0,', 'req-opcode:', 'none,', 'rep-opcode:', 'none,', 'local:', 'false,', 'rpc_sm_id:', '0,', 'fom_sm_id:', '0']
    # ['*', '2019-08-29-12:16:48.097420953', 'rpc-item-id-assign', 'id:', '19,', 'opcode:', '117,', 'xid:', '1,', 'session_id:', '98789222400000038']
    # [* 2020-03-03-21:55:21.632535498 stio-req-state stio_id: 1345, stio_state: M0_AVI_LIO_ENDIO]
    # [* 2020-03-03-21:55:19.141584520 s3-request-state s3_request_id: 3, state: START]
    # [* 2019-09-07-09:57:43.936545770 cob-req-state cob_id: 1310, cob_state: 2]
    # Generalisation of:
    #   def p_cob_req(measurement, labels, table):
    #   def p_stio_req(measurement, mnl, param):
    #   def p_rpc_item_id(measurement, labels, table):
    #   def p_yaml_req(measurement, labels, table):
    def p_yaml_translate(translate_dict, measurement, labels, table):
        # cob_req:     {id: cob_id, state: cob_state}
        # stio_req:    {id: stio_id, state: stio_state}
        # s3_req:      {id: s3_request_id}
        # rpc_item_id: {}
        # yaml_req:    {}
        name = measurement[2]
        time = measurement[1]
        # XXX: This is a hot fix. Sergey, Dmitry please find out better solution.
        # XXX: test case: {'id': 19, 'opcode': 33, 'xid': '_1', 'session_id': 0, 'time': 1586878694440410275, 'pid': 30},
        # XXX: xid = "_1" should be "-1"
        clean = (lambda x: x) if name in ["rpc-item-id-fetch",
                                          "rpc-item-id-assign"] else ADDB2PP.clean_yaml
        for i,m in enumerate(measurement[3:]):
            measurement[i+3] = m.replace("::", "_")
        ret = yaml.safe_load(clean("{"+" ".join(measurement[3:])+"}"))
        ret['time'] = ADDB2PP.to_unix(time)
        for k,v in translate_dict.items():
            ret[k] = ret.pop(v)
        return((table, ret))

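    # Example of the renaming (values from the cob-req-state sample above):
    # the raw yaml parse gives {'cob_id': 1310, 'cob_state': 2}; with
    # translate_dict = {"id": "cob_id", "state": "cob_state"} the keys are
    # renamed to match the cob_req columns: {'id': 1310, 'state': 2, 'time': ...}.
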
    # [* 2019-11-01-20:27:37.467306782 wail nr: 992 min: 1 max: 4 avg: 2.719758 dev: 0.461787]
    # [..                              | ..         locality    0]
    def p_queue(measurement, labels, table):
        name = measurement[2]
        time = measurement[1]
        stat = measurement[3:13]
        ret = dict(zip([s[:-1] for s in stat[::2]], stat[1::2]))
        ret['time'] = ADDB2PP.to_unix(time)
        ret['type'] = name
        ret.update({"locality":
                    labels.get("locality") or
                    labels.get("stob-ioq-thread") or
                    die(f" {measurement} / {labels} : Label not found!")})
        return((table, ret))

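    # For the "wail" sample above, stat is
    #   ['nr:', '992', 'min:', '1', 'max:', '4', 'avg:', '2.719758', 'dev:', '0.461787'],
    # so stripping the trailing ':' from the even slots and zipping with the
    # odd slots gives
    #   {'nr': '992', 'min': '1', 'max': '4', 'avg': '2.719758', 'dev': '0.461787'}.
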
    # ['*'
    #  '2019-11-21-11:32:38.717028449',
    #  'attr',
    #  'entity_id:', '1150,', 'M0_AVI_ATTR__RPC_OPCODE:', 'M0_IOSERVICE_READV_OPCODE']
    def p_attr(measurement, labels, table):
        name = measurement[2]
        entity_id = measurement[4][:-1]
        attr_name = measurement[5][:-1]
        attr_val = str(measurement[6])
        ret = { 'entity_id': entity_id, 'name': attr_name, 'val': attr_val }
        return((table, ret))

    # ['*',
    #  '2020-01-26-17:14:57.134583699'
    #  's3-request-uid'
    #  's3_request_id:'
    #  '3,',
    #  'uid_first_64_bits:'
    #  '0x9d4251f41ddb76f0,',
    #  'uid_last_64_bits:',
    #  '0xbe11ec28e6e52a80']
    # uid form: f076db1d-f451-429d-802a-e5e628ec11be
    def s3req_uid(measurement, labels, table):
        def s3req_bytes_swap(hexstr):
            t = int(hexstr.strip(" ,"), 16)
            t = f"{t:016x}"
            s = []
            for i in range(int(len(t) / 2)):
                s.append(int(t[2*i:2*i+2], 16))
            s.reverse()
            return "".join(map(lambda a: f"{a:02x}", s))

        ret = {}
        ret['id'] = int(measurement[4][:-1])
        first = s3req_bytes_swap(measurement[6])
        last = s3req_bytes_swap(measurement[8])
        ret['uuid'] = f"{first[:8]}-{first[8:12]}-{first[12:16]}-{last[:4]}-{last[4:]}"
        return((table, ret))

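    # Tracing the sample above: each 64-bit half is stored byte-swapped, so
    # s3req_bytes_swap reverses the bytes:
    #   0x9d4251f41ddb76f0 -> 'f076db1df451429d'
    #   0xbe11ec28e6e52a80 -> '802ae5e628ec11be'
    # and the f-string reassembles the canonical 8-4-4-4-12 form:
    #   f076db1d-f451-429d-802a-e5e628ec11be
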
    # ['*', '2020-05-20-01:41:07.988231709', 's3-measurement', 'TRACE_POINT,', '1']
    # ['*', '2020-05-20-01:41:08.060563989', 's3-measurement', 'TRACE_POINT_2,', '2,', '3']
    def p_s3_msrm(measurement, labels, table):
        time = measurement[1]
        msrm = measurement[3].strip(',')
        ret = dict(zip_longest(
            [f"val{i}" for i in range(1,14)],
            map(lambda x: int(x.strip(',')), measurement[4:])))
        ret["time"] = ADDB2PP.to_unix(time)
        ret["name"] = msrm
        return((table, ret))

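    # zip_longest() pads the missing positions with None, so the
    # TRACE_POINT_2 sample yields val1=2, val2=3 and val3..val13 = None,
    # which are stored as NULLs in the nullable s3_measurement columns.
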
    def __init__(self):
        self.parsers = {
            "runq"              : (ADDB2PP.p_queue,  "queues"),
            "wail"              : (ADDB2PP.p_queue,  "queues"),
            "fom-active"        : (ADDB2PP.p_queue,  "queues"),
            "loc-forq-hist"     : (ADDB2PP.p_queue,  "queues"),
            "loc-wait-hist"     : (ADDB2PP.p_queue,  "queues"),
            "loc-cb-hist"       : (ADDB2PP.p_queue,  "queues"),
            "loc-queue-hist"    : (ADDB2PP.p_queue,  "queues"),
            "stob-ioq-inflight" : (ADDB2PP.p_queue,  "queues"),
            "stob-ioq-queued"   : (ADDB2PP.p_queue,  "queues"),
            "stob-ioq-got"      : (ADDB2PP.p_queue,  "queues"),
            "rpc-item-id-fetch" : (partial(ADDB2PP.p_yaml_translate, {}), "sxid_to_rpc"),
            "fom-descr"         : (partial(ADDB2PP.p_yaml_translate, {}), "fom_desc"),
            "tx-state"          : (ADDB2PP.p_sm_req, "be_tx"),
            "fom-phase"         : (ADDB2PP.p_sm_req, "fom_req"),
            "fom-state"         : (ADDB2PP.p_sm_req, "fom_req_state"),
            "fom-to-tx"         : (ADDB2PP.p_1_to_2, "fom_to_tx"),
            "tx-to-gr"          : (ADDB2PP.p_1_to_2, "tx_to_gr"),
            "cas-to-rpc"        : (ADDB2PP.p_1_to_2, "cas_to_rpc"),
            "dix-to-cas"        : (ADDB2PP.p_1_to_2, "dix_to_cas"),
            "dix-to-mdix"       : (ADDB2PP.p_1_to_2, "dix_to_mdix"),
            "client-to-dix"     : (ADDB2PP.p_1_to_2, "client_to_dix"),
            "rpc-item-id-assign": (partial(ADDB2PP.p_yaml_translate, {}), "rpc_to_sxid"),
            "rpc-out-phase"     : (ADDB2PP.p_sm_req, "rpc_req"),
            "rpc-in-phase"      : (ADDB2PP.p_sm_req, "rpc_req"),
            "cas-req-state"     : (ADDB2PP.p_sm_req, "cas_req"),
            "dix-req-state"     : (ADDB2PP.p_sm_req, "dix_req"),
            "op-state"          : (ADDB2PP.p_sm_req, "client_req"),
            "client-to-cob"     : (ADDB2PP.p_1_to_2, "client_to_cob"),
            "cob-to-rpc"        : (ADDB2PP.p_1_to_2, "cob_to_rpc"),
            "client-to-ioo"     : (ADDB2PP.p_1_to_2, "client_to_ioo"),
            "ioo-to-rpc"        : (ADDB2PP.p_1_to_2, "ioo_to_rpc"),
            "ioo-req-state"     : (ADDB2PP.p_sm_req, "ioo_req"),
            "cob-req-state"     : (partial(ADDB2PP.p_yaml_translate, {"id": "cob_id", "state": "cob_state"}), "cob_req"),
            "stio-req-state"    : (partial(ADDB2PP.p_yaml_translate, {"id": "stio_id", "state": "stio_state"}), "stio_req"),
            "fom-to-stio"       : (ADDB2PP.p_1_to_2, "fom_to_stio"),
            "attr"              : (ADDB2PP.p_attr,   "attr"),
            "bulk-to-rpc"       : (ADDB2PP.p_1_to_2, "bulk_to_rpc"),
            "cas-fom-to-crow-fom" : (ADDB2PP.p_1_to_2, "cas_fom_to_crow_fom"),

            "s3-request-to-client" : (ADDB2PP.p_1_to_2, "s3_request_to_client"),
            "s3-request-uid"       : (ADDB2PP.s3req_uid, "s3_request_uid"),
            "s3-request-state"     : (partial(ADDB2PP.p_yaml_translate, {"id": "s3_request_id"}), "s3_request_state"),
            "s3-measurement"       : (ADDB2PP.p_s3_msrm, "s3_measurement"),
        }

    def consume_record(self, rec):
        def _add_pid(_, ret):
            ret.update({"pid": PID})
            return ((_, ret))

        # measurement[0] and labels[1..] (mnl)
        mnl = rec.split("|")
        measurement = mnl[0].split()
        if measurement == []:
            return
        measurement_name = measurement[2]

        labels = dict([kvf for kvf in [kv.strip().split() for kv in mnl[1:]]
                       if kvf and len(kvf) == 2])

        for pname, (parser, table) in self.parsers.items():
            if pname == measurement_name:
                return _add_pid(*parser(measurement, labels, table))
        return None

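# A dump record is a single line: measurement fields, then optional labels
# separated by "|", e.g. (taken from the samples above):
#   * 2019-11-01-20:27:37.467306782 wail nr: 992 min: 1 max: 4 avg: 2.719758 dev: 0.461787 | locality 0
# consume_record() splits off the labels, dispatches on the measurement name
# (third field, "wail" here) and returns a ("table_name", row_dict) pair.
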
APP = ADDB2PP()
def fd_consume_record(rec):
    return APP.consume_record(rec) if rec else None

def fd_consume_data(file, pool):
    def grouper(n, iterable, padvalue=None):
        return zip_longest(*[iter(iterable)]*n,
                           fillvalue=padvalue)
    results = []
    _wc = int(wc["-l", file]().split()[0])
    _wc = ceil(_wc/BLOCK)*BLOCK

    with tqdm(total=_wc, desc=f"Read file: {file}") as t:
        with open(file) as fd:
            for chunk in grouper(BLOCK, fd):
                results.extend(pool.map(fd_consume_record, chunk))
                t.update(BLOCK)

    return results

def fd_id_get(f):
    f_name = splitext((basename(f)))[0]
    fid = f_name.split("_")[-1]
    return int(fid) if fid.isnumeric() else int(fid, base=16)

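# The trailing "_"-separated token of the dump file name is taken to be the
# emitting process id, e.g. (hypothetical names) "dump_12.txt" -> 12, while
# a non-decimal token as in "dump_1a2b.txt" is parsed as hex -> 6699.
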
def db_consume_data(files: List[str]):
    rows = []
    tables = defaultdict(list)

    if len(files) == 0:
        return

    db_drop_tables()
    db_create_tables()

    with profiler(f"Read files: {files}"):
        for f in files:
            def pool_init(pid):
                global PID; PID=pid
            # Ugly reinitialisation of the pool due to PID value propagation
            pool = multiprocessing.Pool(PROC_NR, pool_init, (fd_id_get(f),))
            rows.extend(filter(None, fd_consume_data(f, pool)))
        for k,v in rows:
            tables[k].append(v)

    with tqdm(total=len(rows), desc="Insert records") as t:
        with profiler("Write to db"):
            for k in tables.keys():
                batching = DBBATCH
                while batching >= 0:
                    if batching == 0:
                        raise Exception("Cannot insert records with zero dbbatch size")
                    with DB.atomic() as dbatomic:
                        with tqdm(total=len(tables[k]), desc=f"into {k}") as tbl:
                            with profiler(f" {k}/{len(tables[k])}"):
                                try:
                                    for batch in chunked(tables[k], batching):
                                        globals()[k].insert_many(batch).execute()
                                        tbl.update(len(batch))
                                    break
                                except OperationalError as ex:
                                    if "too many" in str(ex):
                                        logging.warning(f"insert recs into {k} err {ex}")
                                        dbatomic.rollback()
                                        batching = batching // 2
                                    else:
                                        raise ex
                t.update(len(tables[k]))

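# Note: the halving retry above exists because SQLite caps the number of
# bound variables per statement (SQLITE_MAX_VARIABLE_NUMBER, historically
# 999). E.g. a 95-row batch into the 16-column s3_measurement table binds
# 95*16 = 1520 variables and trips "too many SQL variables" on such builds;
# halving to 47 rows (752 variables) fits.
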
def db_setup_loggers():
    format='%(asctime)s %(name)s %(levelname)s %(message)s'
    level=logging.INFO
    level_sh=logging.WARN
    logging.basicConfig(filename='logfile.txt',
                        filemode='w',
                        level=level,
                        format=format)

    sh = logging.StreamHandler()
    sh.setFormatter(logging.Formatter(format))
    sh.setLevel(level_sh)
    log = logging.getLogger()
    log.addHandler(sh)


auth_srv_format = re.compile(r"\A([0-9\- :,]{23,29})\s+\w+\s+\[ReqId:([0-9a-fA-F\-]{36})\]\s+(\S+).*")

def parse_app_record(res):
    mm = auth_srv_format.match(res)
    ret = {}
    if mm:
        dt = dtparse(mm.group(1))
        tdelt = int((dt - datetime.utcfromtimestamp(0)).total_seconds())
        ret["time"] = tdelt * 1000000000 + dt.microsecond * 1000
        ret["id"] = mm.group(2)
        ret["state"] = mm.group(3)
    return ret

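# A hedged sketch of the AuthServer log line this regex expects (shape
# inferred from the pattern itself, not from a captured log):
#   2020-01-26 17:14:57,134 INFO [ReqId:f076db1d-f451-429d-802a-e5e628ec11be] START ...
# group(1) is the timestamp, group(2) the 36-character request uuid (joined
# later against s3_request_uid.uuid), and group(3) the state token.
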
def parse_app_data(file, pool):
    results = {}
    cont = []

    try:
        with open(file) as f:
            cont = f.readlines()
    except:
        print(f"Error reading app file {file}")
        return results

    # chunksize for imap_unordered must be at least 1, even for files
    # shorter than PROC_NR lines
    blk = max(1, min(BLOCK, len(cont) // PROC_NR))

    with tqdm(total=len(cont), desc=f"Process app file: {file}") as t:
        for r in pool.imap_unordered(parse_app_record, cont, blk):
            if "id" in r:
                results.setdefault(r["id"], []).append(r)
            t.update(1)

    return results

def parse_app_logs(applogs: List[str], pool):
    states = {}

    with profiler(f"Read application files: {applogs}"):
        for app in applogs:
            states.update(parse_app_data(app, pool))

    req_states = []
    for req in s3_request_uid.select():
        if req.uuid in states:
            for st in states[req.uuid]:
                req_states.append(
                    {"time":  st["time"],
                     "pid":   req.pid,
                     "id":    req.id,
                     "state": st["state"]})

    with profiler("Write app to db"):
        batching = DBBATCH
        while batching >= 0:
            if batching == 0:
                raise Exception("Cannot insert app records with zero dbbatch size")
            with DB.atomic() as dbatomic:
                try:
                    with tqdm(total=len(req_states), desc="Insert app records") as t:
                        for batch in chunked(req_states, batching):
                            s3_request_state.insert_many(batch).execute()
                            t.update(len(batch))
                        break
                except OperationalError as ex:
                    if "too many" in str(ex):
                        logging.warning(f"app insert recs err {ex}")
                        dbatomic.rollback()
                        batching = batching // 2
                    else:
                        raise ex

def db_parse_args():
    parser = argparse.ArgumentParser(description="""
addb2db.py: creates an SQL database containing performance samples
    """)
    parser.add_argument('--dumps', nargs='+', type=str, required=False,
                        default=[],
                        help="""
A bunch of addb2dump.txts can be passed here for processing:
python3 addb2db.py --dumps dump1.txt dump2.txt ...
""")
    parser.add_argument('--db', type=str, required=False,
                        default="m0play.db",
                        help="Output database file")
    parser.add_argument('--procs', type=int, required=False,
                        default=PROC_NR,
                        help="Number of processes to parse dump files")
    parser.add_argument('--block', type=int, required=False,
                        default=BLOCK,
                        help="Block of data from dump files processed at once")
    parser.add_argument('--batch', type=int, required=False,
                        default=DBBATCH,
                        help="Number of samples committed at once")
    parser.add_argument('--app', nargs='+', type=str, required=False,
                        default=[],
                        help="Application logs path, e.g. AuthServer")

    return parser.parse_args()

if __name__ == '__main__':
    args = db_parse_args()
    BLOCK = args.block
    PROC_NR = args.procs
    DBBATCH = args.batch

    db_init(args.db)
    db_setup_loggers()
    db_connect()

    db_consume_data(args.dumps)

    with multiprocessing.Pool(PROC_NR) as gpool:
        parse_app_logs(args.app, gpool)

    db_close()
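
# Typical invocation (dump file names hypothetical), followed by a quick
# sanity check of the produced database with the sqlite3 shell:
#
#   python3 addb2db.py --dumps dumpc.txt dumps.txt --db m0play.db
#   sqlite3 m0play.db 'SELECT state, COUNT(*) FROM fom_req GROUP BY state;'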