import argparse
import logging
import multiprocessing
import re
import sys
import time
import numpy
import yaml
from typing import List
from math import ceil
from itertools import zip_longest
from collections import defaultdict
from functools import partial
from os.path import basename, splitext
from datetime import datetime
from dateutil.parser import parse as dtparse
from plumbum.cmd import wc
from peewee import (SqliteDatabase, Model, IntegerField, TextField,
                    FloatField, OperationalError, chunked)
from tqdm import tqdm
DB = SqliteDatabase(None)

# processing defaults; overridden from the command line in __main__
BLOCK   = 16 << 10
PROC_NR = 48
DBBATCH = 95
PID     = 0
def die(what):
    print(what, file=sys.stderr)
    sys.exit(1)
class BaseModel(Model):
    class Meta:
        database    = DB
        primary_key = False

class fom_to_tx(BaseModel):
    pid    = IntegerField()
    fom_id = IntegerField()
    tx_id  = IntegerField()

class cas_fom_to_crow_fom(BaseModel):
    pid         = IntegerField()
    fom_id      = IntegerField()
    crow_fom_id = IntegerField()

class fom_desc(BaseModel):
    time            = IntegerField()
    pid             = IntegerField()
    service         = TextField()
    sender          = TextField()
    req_opcode      = TextField()
    rep_opcode      = TextField()
    local           = TextField()
    rpc_sm_id       = IntegerField()
    fom_sm_id       = IntegerField()
    fom_state_sm_id = IntegerField()
class rpc_to_sxid(BaseModel):
    time       = IntegerField()
    pid        = IntegerField()
    opcode     = IntegerField()
    xid        = IntegerField()
    session_id = IntegerField()
    id         = IntegerField()

class sxid_to_rpc(BaseModel):
    time       = IntegerField()
    pid        = IntegerField()
    opcode     = IntegerField()
    xid        = IntegerField()
    session_id = IntegerField()
    id         = IntegerField()
class fom_req(BaseModel):
    time  = IntegerField()
    pid   = IntegerField()
    state = TextField()
    id    = IntegerField()

class be_tx(BaseModel):
    time  = IntegerField()
    pid   = IntegerField()
    state = TextField()
    id    = IntegerField()

class tx_to_gr(BaseModel):
    pid   = IntegerField()
    tx_id = IntegerField()
    gr_id = IntegerField()

class fom_req_state(BaseModel):
    time  = IntegerField()
    pid   = IntegerField()
    state = TextField()
    id    = IntegerField()

class rpc_req(BaseModel):
    time  = IntegerField()
    pid   = IntegerField()
    state = TextField()
    id    = IntegerField()

class cas_req(BaseModel):
    time  = IntegerField()
    pid   = IntegerField()
    state = TextField()
    id    = IntegerField()

class dix_req(BaseModel):
    time  = IntegerField()
    pid   = IntegerField()
    state = TextField()
    id    = IntegerField()

class client_req(BaseModel):
    time  = IntegerField()
    pid   = IntegerField()
    state = TextField()
    id    = IntegerField()
class cas_to_rpc(BaseModel):
    pid    = IntegerField()
    cas_id = IntegerField()
    rpc_id = IntegerField()

class dix_to_cas(BaseModel):
    pid    = IntegerField()
    dix_id = IntegerField()
    cas_id = IntegerField()

class dix_to_mdix(BaseModel):
    pid     = IntegerField()
    dix_id  = IntegerField()
    mdix_id = IntegerField()

class client_to_dix(BaseModel):
    pid       = IntegerField()
    client_id = IntegerField()
    dix_id    = IntegerField()

class client_to_cob(BaseModel):
    pid       = IntegerField()
    client_id = IntegerField()
    cob_id    = IntegerField()

class cob_to_rpc(BaseModel):
    pid    = IntegerField()
    cob_id = IntegerField()
    rpc_id = IntegerField()

class client_to_ioo(BaseModel):
    pid       = IntegerField()
    client_id = IntegerField()
    ioo_id    = IntegerField()

class ioo_to_rpc(BaseModel):
    pid    = IntegerField()
    ioo_id = IntegerField()
    rpc_id = IntegerField()
class cob_req(BaseModel):
    time      = IntegerField()
    pid       = IntegerField()
    cob_id    = IntegerField()
    cob_state = TextField()

class ioo_req(BaseModel):
    time  = IntegerField()
    pid   = IntegerField()
    state = TextField()
    id    = IntegerField()
class fom_to_stio(BaseModel):
    pid     = IntegerField()
    fom_id  = IntegerField()
    stio_id = IntegerField()

class stio_req(BaseModel):
    time       = IntegerField()
    pid        = IntegerField()
    stio_id    = IntegerField()
    stio_state = TextField()

class attr(BaseModel):
    pid       = IntegerField()
    entity_id = IntegerField()
    name      = TextField()
    val       = TextField()

class bulk_to_rpc(BaseModel):
    pid     = IntegerField()
    bulk_id = IntegerField()
    rpc_id  = IntegerField()
class queues(BaseModel):
    pid      = IntegerField()
    locality = IntegerField()
    time     = IntegerField()
    nr       = IntegerField()
    min      = IntegerField()
    max      = IntegerField()
    avg      = FloatField()
    dev      = FloatField()
class s3_request_to_client(BaseModel):
    pid           = IntegerField()
    s3_request_id = IntegerField()
    client_id     = IntegerField()

class s3_request_uid(BaseModel):
    pid  = IntegerField()
    id   = IntegerField()
    uuid = TextField()

class s3_request_state(BaseModel):
    time          = IntegerField()
    pid           = IntegerField()
    state         = TextField()
    s3_request_id = IntegerField()

class s3_measurement(BaseModel):
    time  = IntegerField()
    pid   = IntegerField()
    name  = TextField()
    val1  = IntegerField(null=False)
    val2  = IntegerField(null=True)
    val3  = IntegerField(null=True)
    val4  = IntegerField(null=True)
    val5  = IntegerField(null=True)
    val6  = IntegerField(null=True)
    val7  = IntegerField(null=True)
    val8  = IntegerField(null=True)
    val9  = IntegerField(null=True)
    val10 = IntegerField(null=True)
    val11 = IntegerField(null=True)
    val12 = IntegerField(null=True)
    val13 = IntegerField(null=True)
db_create_delete_tables = [client_to_dix, dix_to_mdix, dix_to_cas, cas_to_rpc,
                           cas_req, dix_req, client_req, rpc_req, rpc_to_sxid,
                           sxid_to_rpc, fom_desc, fom_to_tx, fom_req, be_tx,
                           client_to_cob, cob_to_rpc, client_to_ioo, ioo_to_rpc,
                           cob_req, ioo_req, fom_req_state, queues, tx_to_gr,
                           fom_to_stio, stio_req, attr, bulk_to_rpc,
                           cas_fom_to_crow_fom, s3_request_to_client,
                           s3_request_uid, s3_request_state, s3_measurement]
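# Every model above maps 1:1 onto a database table; db_create()/db_drop()
# below operate on this list as a unit, and every inserted row additionally
# carries the pid stamped on it by _add_pid() in ADDB2PP.consume_record(),
# so rows coming from different dump files can be told apart.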
def db_create():
    with DB:
        DB.create_tables(db_create_delete_tables)

def db_drop():
    with DB:
        DB.drop_tables(db_create_delete_tables)
def db_init(path):
    DB.init(path, pragmas={
        'journal_mode': 'off',
        'cache_size': -1024*1024*256,
        'synchronous': 'off',
    })
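# journal_mode=off and synchronous=off disable the rollback journal and
# fsync() entirely: the fastest, least durable SQLite configuration, which
# is acceptable for a database that is rebuilt from the dumps on every run.
# A negative cache_size is interpreted by SQLite as a size in KiB rather
# than as a page count.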
class profiler:
    def __init__(self, what):
        self.what = what

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, exp_type, exp_val, traceback):
        delta = time.time() - self.start
        logging.info(f"{self.what}: {delta}s")
class ADDB2PP:
    @staticmethod
    def clean_yaml(yml):
        return yml.translate(str.maketrans("><-", "''_"))
    @staticmethod
    def to_unix(motr_time):
        # "YYYY-MM-DD-hh:mm:ss.ns" -> "YYYY-MM-DDThh:mm:ss.ns"
        mt = list(motr_time)
        mt[10] = 'T'
        np_time = numpy.datetime64("".join(mt))
        return np_time.item()
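    # With the mt[10] = 'T' rewrite above, an addb2 timestamp converts to
    # nanoseconds since the epoch:
    #   >>> ADDB2PP.to_unix("2020-01-26-17:14:57.134585699")
    #   1580058897134585699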
    # state-machine transition records: time, sm id and the new state
    @staticmethod
    def p_sm_req(measurement, labels, table):
        name  = measurement[2]
        time  = measurement[1]
        state = measurement[-1]
        sm_id = measurement[4]
        return (table, {'time': ADDB2PP.to_unix(time),
                        'state': state,
                        'id': int(sm_id)})
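    # The indices above imply a token layout such as (illustrative record):
    #   ['*', '2020-01-26-17:14:57.134585699', 'fom-phase',
    #    'sm_id:', '38', '-->', 'M0_FOPH_TXN_OPEN']
    # measurement[1] is the timestamp, [2] the measurement name, [4] the
    # state-machine id and [-1] the new state.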
    # relation records ("fom-to-tx" etc.): the tail is already "k: v, k: v"
    @staticmethod
    def p_1_to_2(measurement, labels, table):
        ret = yaml.safe_load("{" + " ".join(measurement[3:]) + "}")
        return (table, ret)
    @staticmethod
    def p_yaml_translate(translate_dict, measurement, labels, table):
        name = measurement[2]
        time = measurement[1]
        # the rpc-item-id records are already valid YAML; everything else
        # needs clean_yaml() to quote the "<...>" identifiers first
        clean = ((lambda x: x) if name in ["rpc-item-id-fetch",
                                           "rpc-item-id-assign"]
                 else ADDB2PP.clean_yaml)
        for i, m in enumerate(measurement[3:]):
            measurement[i+3] = m.replace("::", "_")
        ret = yaml.safe_load(clean("{" + " ".join(measurement[3:]) + "}"))
        ret['time'] = ADDB2PP.to_unix(time)
        # rename the parsed keys to the table's column names
        for k, v in translate_dict.items():
            ret[v] = ret.pop(k)
        return (table, ret)
    @staticmethod
    def p_queue(measurement, labels, table):
        name = measurement[2]
        time = measurement[1]
        stat = measurement[3:13]
        ret = dict(zip([s[:-1] for s in stat[::2]], stat[1::2]))
        ret['time'] = ADDB2PP.to_unix(time)
        ret.update({"locality":
                    labels.get("locality") or
                    labels.get("stob-ioq-thread") or
                    die(f" {measurement} / {labels} : Label not found!")})
        return (table, ret)
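    # The five "key: value" token pairs of a queue/histogram line become the
    # dict keys; e.g. tokens "nr: 464 min: 0 max: 2 avg: 0.1 dev: 0.3"
    # (illustrative values) yield {'nr': '464', ..., 'dev': '0.3'}.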
    @staticmethod
    def p_attr(measurement, labels, table):
        name      = measurement[2]
        entity_id = measurement[4][:-1]
        attr_name = measurement[5][:-1]
        attr_val  = str(measurement[6])
        ret = {'entity_id': entity_id, 'name': attr_name, 'val': attr_val}
        return (table, ret)
    @staticmethod
    def s3req_uid(measurement, labels, table):
        def s3req_bytes_swap(hexstr):
            # "0x..." -> int -> fixed-width hex -> byte-reversed hex string
            t = int(hexstr.strip(" ,"), 16)
            t = f"{t:016x}"
            s = []
            for i in range(int(len(t) / 2)):
                s.append(int(t[2*i:2*i+2], 16))
            s.reverse()
            return "".join(map(lambda a: f"{a:02x}", s))

        ret = {}
        ret['id'] = int(measurement[4][:-1])
        first = s3req_bytes_swap(measurement[6])
        last  = s3req_bytes_swap(measurement[8])
        ret['uuid'] = f"{first[:8]}-{first[8:12]}-{first[12:16]}-{last[:4]}-{last[4:]}"
        return (table, ret)

    @staticmethod
    def p_s3_msrm(measurement, labels, table):
        time = measurement[1]
        msrm = measurement[3].strip(',')
        ret = dict(zip_longest(
            [f"val{i}" for i in range(1, 14)],
            map(lambda x: int(x.strip(',')), measurement[4:])))
        ret["time"] = ADDB2PP.to_unix(time)
        ret["name"] = msrm
        return (table, ret)
466 "runq" : (ADDB2PP.p_queue,
"queues"),
467 "wail" : (ADDB2PP.p_queue,
"queues"),
468 "fom-active" : (ADDB2PP.p_queue,
"queues"),
469 "loc-forq-hist" : (ADDB2PP.p_queue,
"queues"),
470 "loc-wait-hist" : (ADDB2PP.p_queue,
"queues"),
471 "loc-cb-hist" : (ADDB2PP.p_queue,
"queues"),
472 "loc-queue-hist" : (ADDB2PP.p_queue,
"queues"),
473 "stob-ioq-inflight" : (ADDB2PP.p_queue,
"queues"),
474 "stob-ioq-queued" : (ADDB2PP.p_queue,
"queues"),
475 "stob-ioq-got" : (ADDB2PP.p_queue,
"queues"),
476 "rpc-item-id-fetch" : (partial(ADDB2PP.p_yaml_translate, {}),
"sxid_to_rpc"),
477 "fom-descr" : (partial(ADDB2PP.p_yaml_translate, {}),
"fom_desc"),
478 "tx-state" : (ADDB2PP.p_sm_req,
"be_tx"),
479 "fom-phase" : (ADDB2PP.p_sm_req,
"fom_req"),
480 "fom-state" : (ADDB2PP.p_sm_req,
"fom_req_state"),
481 "fom-to-tx" : (ADDB2PP.p_1_to_2,
"fom_to_tx"),
482 "tx-to-gr" : (ADDB2PP.p_1_to_2,
"tx_to_gr"),
483 "cas-to-rpc" : (ADDB2PP.p_1_to_2,
"cas_to_rpc"),
484 "dix-to-cas" : (ADDB2PP.p_1_to_2,
"dix_to_cas"),
485 "dix-to-mdix" : (ADDB2PP.p_1_to_2,
"dix_to_mdix"),
486 "client-to-dix" : (ADDB2PP.p_1_to_2,
"client_to_dix"),
487 "rpc-item-id-assign": (partial(ADDB2PP.p_yaml_translate, {}),
"rpc_to_sxid"),
488 "rpc-out-phase" : (ADDB2PP.p_sm_req,
"rpc_req"),
489 "rpc-in-phase" : (ADDB2PP.p_sm_req,
"rpc_req"),
490 "cas-req-state" : (ADDB2PP.p_sm_req,
"cas_req"),
491 "dix-req-state" : (ADDB2PP.p_sm_req,
"dix_req"),
492 "op-state" : (ADDB2PP.p_sm_req,
"client_req"),
493 "client-to-cob" : (ADDB2PP.p_1_to_2,
"client_to_cob"),
494 "cob-to-rpc" : (ADDB2PP.p_1_to_2,
"cob_to_rpc"),
495 "client-to-ioo" : (ADDB2PP.p_1_to_2,
"client_to_ioo"),
496 "ioo-to-rpc" : (ADDB2PP.p_1_to_2,
"ioo_to_rpc"),
497 "ioo-req-state" : (ADDB2PP.p_sm_req,
"ioo_req"),
498 "cob-req-state" : (partial(ADDB2PP.p_yaml_translate, {
"id":
"cob_id",
"state":
"cob_state"}),
"cob_req"),
499 "stio-req-state" : (partial(ADDB2PP.p_yaml_translate, {
"id":
"stio_id",
"state":
"stio_state"}),
"stio_req"),
500 "fom-to-stio" : (ADDB2PP.p_1_to_2,
"fom_to_stio"),
501 "attr" : (ADDB2PP.p_attr,
"attr"),
502 "bulk-to-rpc" : (ADDB2PP.p_1_to_2,
"bulk_to_rpc"),
503 "cas-fom-to-crow-fom" : (ADDB2PP.p_1_to_2,
"cas_fom_to_crow_fom"),
505 "s3-request-to-client" : (ADDB2PP.p_1_to_2,
"s3_request_to_client"),
506 "s3-request-uid" : (ADDB2PP.s3req_uid,
"s3_request_uid"),
507 "s3-request-state" : (partial(ADDB2PP.p_yaml_translate, {
"id":
"s3_request_id"}),
"s3_request_state"),
508 "s3-measurement" : (ADDB2PP.p_s3_msrm,
"s3_measurement"),
    def consume_record(self, rec):
        def _add_pid(table, ret):
            ret.update({"pid": PID})
            return (table, ret)

        # a record is "measurement | label | label | ..."
        mnl = rec.split("|")
        measurement = mnl[0].split()
        if measurement == []:
            return
        measurement_name = measurement[2]

        labels = dict([kvf for kvf in [kv.strip().split() for kv in mnl[1:]]
                       if kvf and len(kvf) == 2])

        for pname, (parser, table) in self.parsers.items():
            if pname == measurement_name:
                return _add_pid(*parser(measurement, labels, table))
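# Example routing: a "fom-phase" record is matched against the parsers table,
# handled by p_sm_req(), and lands in the fom_req table with the dump file's
# pid attached by _add_pid().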
APP = ADDB2PP()

def fd_consume_record(rec):
    return APP.consume_record(rec) if rec else None

def grouper(n, iterable, padvalue=None):
    return zip_longest(*[iter(iterable)]*n, fillvalue=padvalue)
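# grouper() is the classic zip_longest batching recipe; the last chunk is
# padded with None, which fd_consume_record() maps to None results:
#   >>> list(grouper(3, "ABCDE"))
#   [('A', 'B', 'C'), ('D', 'E', None)]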
def fd_consume_data(file, pool):
    results = []
    _wc = int(wc["-l", file]().split()[0])
    _wc = ceil(_wc/BLOCK)*BLOCK
    with tqdm(total=_wc, desc=f"Read file: {file}") as t:
        with open(file) as fd:
            for chunk in grouper(BLOCK, fd):
                results.extend(pool.map(fd_consume_record, chunk))
                t.update(len(chunk))
    return results
def fd_id_get(f):
    f_name = splitext(basename(f))[0]
    fid = f_name.split("_")[-1]
    return int(fid) if fid.isnumeric() else int(fid, base=16)
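# The dump file's trailing "_<id>" token becomes PID for all of its records;
# decimal ids are taken as-is, anything else is parsed as hex, e.g.
# (illustrative names):
#   fd_id_get("dump_42.txt")   -> 42
#   fd_id_get("dump_1a2b.txt") -> 6699    # int("1a2b", 16)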
def pool_init(pid):
    global PID
    PID = pid

def db_consume_data(files: List[str]):
    rows = []
    tables = defaultdict(list)

    with profiler(f"Read files: {files}"):
        for f in files:
            # a fresh pool per file so every worker inherits this file's PID
            pool = multiprocessing.Pool(PROC_NR, pool_init, (fd_id_get(f),))
            rows.extend(r for r in fd_consume_data(f, pool) if r)
        for k, v in rows:
            tables[k].append(v)

    with tqdm(total=len(rows), desc="Insert records") as t:
        for k in tables.keys():
            batching = DBBATCH
            while True:
                if batching < 1:
                    raise Exception("Cannot insert records with zero dbbatch size")
                try:
                    with DB.atomic() as dbatomic:
                        with tqdm(total=len(tables[k]), desc=f"into {k}") as tbl:
                            with profiler(f" {k}/{len(tables[k])}"):
                                for batch in chunked(tables[k], batching):
                                    globals()[k].insert_many(batch).execute()
                                    tbl.update(len(batch))
                    break
                except OperationalError as ex:
                    if "too many" in str(ex):
                        logging.warning(f"insert recs into {k} err {ex}")
                        batching = batching // 2
                    else:
                        raise
            t.update(len(tables[k]))
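# SQLite caps the number of bound parameters per statement, so a bulk
# insert_many() can fail with OperationalError "too many SQL variables";
# the loop above halves the batch size and retries the table inside a fresh
# transaction until the insert fits.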
def db_setup_loggers():
    format = '%(asctime)s %(name)s %(levelname)s %(message)s'
    level = logging.INFO
    level_sh = logging.WARN
    logging.basicConfig(filename='logfile.txt',
                        filemode='w',
                        level=level,
                        format=format)

    sh = logging.StreamHandler()
    sh.setFormatter(logging.Formatter(format))
    sh.setLevel(level_sh)
    log = logging.getLogger()
    log.addHandler(sh)
auth_srv_format = re.compile(r"\A([0-9\- :,]{23,29})\s+\w+\s+\[ReqId:([0-9a-fA-F\-]{36})\]\s+(\S+).*")

def parse_app_record(res):
    ret = {}
    mm = auth_srv_format.match(res)
    if mm:
        dt = dtparse(mm.group(1))
        tdelt = int((dt - datetime.utcfromtimestamp(0)).total_seconds())
        ret["time"]  = tdelt * 1000000000 + dt.microsecond * 1000
        ret["id"]    = mm.group(2)
        ret["state"] = mm.group(3)
    return ret
def parse_app_data(file, pool):
    results = {}
    try:
        with open(file) as f:
            cont = f.readlines()
    except OSError:
        print(f"Error app reading file {file}")
        return results

    # imap_unordered() requires a chunksize of at least 1
    blk = max(1, min(BLOCK, len(cont) // PROC_NR))
    with tqdm(total=len(cont), desc=f"Process app file: {file}") as t:
        for r in pool.imap_unordered(parse_app_record, cont, blk):
            if r:
                results.setdefault(r["id"], []).append(r)
            t.update(1)
    return results
def db_consume_app_data(applogs: List[str], pool):
    req_states = []
    states = {}
    with profiler(f"Read application files: {applogs}"):
        for app in applogs:
            states.update(parse_app_data(app, pool))

    # join parsed log states with the addb2 requests via the request UUID
    for req in s3_request_uid.select():
        if req.uuid in states:
            for st in states[req.uuid]:
                req_states.append({"pid": req.pid,
                                   "s3_request_id": req.id,
                                   "time": st["time"],
                                   "state": st["state"]})

    batching = DBBATCH
    while req_states:
        if batching < 1:
            raise Exception("Cannot insert app records with zero dbbatch size")
        try:
            with DB.atomic() as dbatomic:
                with tqdm(total=len(req_states), desc="Insert app records") as t:
                    for batch in chunked(req_states, batching):
                        s3_request_state.insert_many(batch).execute()
                        t.update(len(batch))
            break
        except OperationalError as ex:
            if "too many" in str(ex):
                logging.warning(f"app insert recs err {ex}")
                batching = batching // 2
            else:
                raise
def args_parse():
    parser = argparse.ArgumentParser(description="""
addb2db.py: creates an SQL database containing performance samples
""")
    parser.add_argument('--dumps', nargs='+', type=str, required=False,
                        help="""
A bunch of addb2dump.txts can be passed here for processing:
python3 addb2db.py --dumps dump1.txt dump2.txt ...
""")
    parser.add_argument('--db', type=str, required=False,
                        help="Output database file")
    parser.add_argument('--procs', type=int, required=False,
                        help="Number of processes to parse dump files")
    parser.add_argument('--block', type=int, required=False,
                        help="Block of data from dump files processed at once")
    parser.add_argument('--batch', type=int, required=False,
                        help="Number of samples committed at once")
    parser.add_argument('--app', nargs='+', type=str, required=False,
                        help="Application logs path, e.g. AuthServer")

    return parser.parse_args()
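# Typical invocation (file names are illustrative):
#   python3 addb2db.py --dumps dump1.txt dump2.txt --db m0play.db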
if __name__ == '__main__':
    args = args_parse()

    # honour command-line overrides before any pool or DB is created
    PROC_NR = args.procs or PROC_NR
    BLOCK   = args.block or BLOCK
    DBBATCH = args.batch or DBBATCH

    db_setup_loggers()
    db_init(args.db)
    db_create()
    db_consume_data(args.dumps or [])

    with multiprocessing.Pool(PROC_NR) as gpool:
        db_consume_app_data(args.app or [], gpool)