Motr  M0
service.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2015-2021 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CAS
24 #include "be/op.h"
25 #include "be/tx_credit.h"
26 #include "be/dtm0_log.h" /* m0_be_dtm0_log API */
27 #include "dtm0/fop.h" /* DTM0 msg and tx_desc */
28 #include "dtm0/service.h" /* m0_dtm0_service API */
29 #include "lib/trace.h"
30 #include "lib/memory.h"
31 #include "lib/finject.h"
32 #include "lib/assert.h"
33 #include "lib/arith.h" /* min_check, M0_3WAY */
34 #include "lib/misc.h" /* M0_IN */
35 #include "lib/errno.h" /* ENOMEM, EPROTO */
36 #include "lib/ext.h"
37 #include "fop/fom_long_lock.h"
38 #include "fop/fom_generic.h"
39 #include "fop/fom_interpose.h"
40 #include "reqh/reqh_service.h"
41 #include "reqh/reqh.h" /* m0_reqh */
42 #include "rpc/rpc_opcodes.h"
43 #include "rpc/rpc_machine.h" /* m0_rpc_machine */
44 #include "conf/schema.h" /* M0_CST_CAS */
45 #include "module/instance.h"
46 #include "addb2/addb2.h"
47 #include "cas/ctg_store.h"
48 #include "pool/pool.h" /* m0_pool_nd_state */
49 
50 #include "dix/cm/cm.h"
51 #include "dix/fid_convert.h" /* m0_dix_fid_cctg_device_id */
52 
53 #include "cas/cas_addb2.h" /* M0_AVI_CAS_KV_SIZES */
54 #include "cas/cas.h"
55 #include "cas/cas_xc.h"
56 #include "cas/index_gc.h"
57 #include "motr/setup.h" /* m0_reqh_context */
58 
292 enum stats_kv {
296 };
297 
302 };
303 
304 enum {
309 };
310 
311 struct cas_service {
314 };
315 
/**
 * Key/value buffers of one incoming record.
 *
 * cas_incoming_kv_setup() fills both buffers from the AT buffers of the
 * request and cas_incoming_kv() copies them out by record position.
 */
316 struct cas_kv {
/** Key of the record. */
317  struct m0_buf ckv_key;
/** Value of the record; M0_BUF_INIT0 when the request has no value set. */
318  struct m0_buf ckv_val;
319 };
320 
/**
 * CAS fom: the per-request state machine context of the CAS service.
 *
 * NOTE(review): only a subset of the members is visible here; the comments
 * below describe how the visible members are used elsewhere in this file.
 */
321 struct cas_fom {
/** Generic fom; cas_fom_tick() maps it back to the cas_fom with M0_AMB(). */
322  struct m0_fom cf_fom;
/** Position of the input record being processed (index for cas_at()). */
323  uint64_t cf_ipos;
/** Position of the reply record being filled (index for cas_out_at()). */
324  uint64_t cf_opos;
/*
 * cas_key_need_to_send() treats cf_curpos == 0 as "start of a CO_CUR
 * request" -- presumably the position within the current cursor request;
 * TODO confirm.
 */
337  uint64_t cf_curpos;
/** Incoming key/value buffers, allocated in cas_fom_create(). */
344  struct cas_kv *cf_ikv;
/** Number of elements in cf_ikv (cas_in_nr() of the request fop). */
346  uint64_t cf_ikv_nr;
/* Presumably the number of elements in cf_in_cids -- TODO confirm. */
354  uint64_t cf_in_cids_nr;
361  /* ADDB2 structures to collect long-lock contention metrics. */
367  /* AT helper fields. */
370  /* Statistical counters. */
371  uint64_t cf_kv_stats[STATS_KV_NR]
374 };
375 
390 
397 
416 };
417 
423 
/** Address of the ld_imask member of the descriptor embedded in layout @a l. */
424 #define LAYOUT_IMASK_PTR(l) (&(l)->u.dl_desc.ld_imask)
/** LAYOUT_IMASK_PTR() applied to the ci_layout member of CAS id @a cid. */
425 #define CID_IMASK_PTR(cid) LAYOUT_IMASK_PTR(&(cid)->ci_layout)
426 
427 static int cas_service_start (struct m0_reqh_service *service);
428 static void cas_service_stop (struct m0_reqh_service *service);
429 static void cas_service_fini (struct m0_reqh_service *service);
430 static size_t cas_fom_home_locality (const struct m0_fom *fom);
432  const struct m0_reqh_service_type *st);
433 
434 static struct m0_cas_op *cas_op (const struct m0_fom *fom);
435 static const struct m0_fid *cas_fid (const struct m0_fom *fom);
436 static enum m0_cas_type cas_type (const struct m0_fom *fom);
437 static struct m0_cas_rec *cas_at (struct m0_cas_op *op, int idx);
438 static struct m0_cas_rec *cas_out_at (const struct m0_cas_rep *rep, int idx);
439 static bool cas_is_ro (enum m0_cas_opcode opc);
440 static enum m0_cas_opcode m0_cas_opcode (const struct m0_fop *fop);
441 static uint64_t cas_in_nr (const struct m0_fop *fop);
442 static uint64_t cas_out_nr (const struct m0_fop *fop);
443 static void cas_prep (struct cas_fom *fom,
444  enum m0_cas_opcode opc,
445  enum m0_cas_type ct,
446  struct m0_cas_ctg *ctg,
447  uint64_t rec_pos,
448  struct m0_be_tx_credit *accum);
449 static int cas_exec (struct cas_fom *fom,
450  enum m0_cas_opcode opc,
451  enum m0_cas_type ct,
452  struct m0_cas_ctg *ctg,
453  uint64_t rec_pos, int next);
454 
455 static int cas_done(struct cas_fom *fom, struct m0_cas_op *op,
456  struct m0_cas_rep *rep, enum m0_cas_opcode opc);
457 
458 static bool cas_ctidx_op_needed(struct cas_fom *fom, enum m0_cas_opcode opc,
459  enum m0_cas_type ct, uint64_t rec_pos);
460 
461 static bool cas_key_need_to_send(struct cas_fom *fom, enum m0_cas_opcode opc,
462  enum m0_cas_type ct, struct m0_cas_op *op,
463  uint64_t rec_pos);
464 
465 static int cas_prep_send(struct cas_fom *fom, enum m0_cas_opcode opc,
466  enum m0_cas_type ct);
467 
468 static bool cas_is_valid (struct cas_fom *fom, enum m0_cas_opcode opc,
469  enum m0_cas_type ct, const struct m0_cas_rec *rec,
470  uint64_t rec_pos);
471 static int cas_op_recs_check(struct cas_fom *fom, enum m0_cas_opcode opc,
472  enum m0_cas_type ct, struct m0_cas_op *op);
473 
474 static bool cas_fom_invariant(const struct cas_fom *fom);
475 
476 static int cas_buf_cid_decode(struct m0_buf *enc_buf,
477  struct m0_cas_id *cid);
478 static bool cas_fid_is_cctg(const struct m0_fid *fid);
479 static int cas_id_check(const struct m0_cas_id *cid);
480 static int cas_device_check(const struct cas_fom *fom,
481  const struct m0_cas_id *cid);
482 static int cas_op_check(struct m0_cas_op *op,
483  struct cas_fom *fom,
484  bool is_index_drop);
485 static void cas_incoming_kv(const struct cas_fom *fom,
486  uint64_t rec_pos,
487  struct m0_buf *key,
488  struct m0_buf *val);
489 static int cas_incoming_kv_setup(struct cas_fom *fom,
490  const struct m0_cas_op *op);
491 
492 static int cas_kv_load_done(struct cas_fom *fom, enum m0_cas_opcode opc,
493  const struct m0_cas_op *op, int phase);
494 
495 static int cas_ctg_crow_handle(struct cas_fom *fom,
496  const struct m0_cas_id *cid);
497 
498 static int cas_ctidx_mem_place(struct cas_fom *fom,
499  const struct m0_cas_id *in_cid, int next);
500 static int cas_ctidx_mem_free(struct cas_fom *fom, int next);
501 static int cas_ctidx_insert(struct cas_fom *fom, const struct m0_cas_id *in_cid,
502  int next);
503 static int cas_ctidx_lookup(struct cas_fom *fom, const struct m0_cas_id *in_cid,
504  int next);
505 static int cas_ctidx_delete(struct cas_fom *fom, const struct m0_cas_id *in_cid,
506  int next);
507 
510 static const struct m0_fom_ops cas_fom_ops;
511 static const struct m0_fom_type_ops cas_fom_type_ops;
512 static struct m0_sm_conf cas_sm_conf;
514 
/**
 * Module-level initialisation of the CAS service.
 *
 * The last step shown is the initialisation of the index garbage collector
 * via m0_cas_gc_init().
 * NOTE(review): earlier steps are not visible here -- confirm against the
 * full source.
 */
515 M0_INTERNAL void m0_cas_svc_init(void)
516 {
526  m0_cas_gc_init();
527 }
528 
/**
 * Module-level finalisation of the CAS service.
 *
 * Starts with m0_cas_gc_fini(), the counterpart of the m0_cas_gc_init() call
 * in m0_cas_svc_init().
 * NOTE(review): the remaining steps are not visible here -- confirm against
 * the full source.
 */
529 M0_INTERNAL void m0_cas_svc_fini(void)
530 {
531  m0_cas_gc_fini();
534 }
535 
536 M0_INTERNAL void m0_cas_svc_fop_args(struct m0_sm_conf **sm_conf,
537  const struct m0_fom_type_ops **fom_ops,
538  struct m0_reqh_service_type **svctype)
539 {
540  *sm_conf = &cas_sm_conf;
541  *fom_ops = &cas_fom_type_ops;
542  *svctype = &m0_cas_service_type;
543 }
544 
/**
 * Unit-test helper: records BE domain @a dom in the CAS service, in its
 * c_be_domain member.
 *
 * NOTE(review): the declaration that derives `service' from @a svc is not
 * visible here.
 */
545 M0_INTERNAL void m0_cas__ut_svc_be_set(struct m0_reqh_service *svc,
546  struct m0_be_domain *dom)
547 {
549  service->c_be_domain = dom;
550 }
551 
552 M0_INTERNAL struct m0_be_domain *
554 {
556  return service->c_be_domain;
557 }
558 
559 
561 {
562  int rc;
564  struct m0_be_domain *ut_dom;
565 
567  ut_dom = m0_cas__ut_svc_be_get(svc);
568  /* XXX It's a workaround. It's needed until we have a better way. */
569  service->c_be_domain = ut_dom != NULL ?
570  ut_dom : svc->rs_reqh_ctx->rc_beseg->bs_domain;
571  rc = m0_ctg_store_init(service->c_be_domain);
572  if (rc == 0) {
573  /*
574  * Start deleted index garbage collector at boot to continue
575  * index drop possibly interrupted by system restart.
576  * If no pending index drop, it finishes soon.
577  */
579  }
580  return rc;
581 }
582 
584 {
585  /* Wait until garbage collector destroys all dead indices. */
587 }
588 
590 {
593 }
594 
596 {
598 
601  m0_free(service);
602 }
603 
605  const struct m0_reqh_service_type *stype)
606 {
607  struct cas_service *service;
608 
610  if (service != NULL) {
611  *svc = &service->c_service;
612  (*svc)->rs_type = stype;
613  (*svc)->rs_ops = &cas_service_ops;
614  return M0_RC(0);
615  } else
616  return M0_ERR(-ENOMEM);
617 }
618 
/**
 * Predicate used by cas_fom_create() before a fom is created for @a fop.
 *
 * Judging by the name it reports whether the CAS service is started in
 * @a reqh -- NOTE(review): the lookups of the service type and service and
 * the return expression are not visible here; confirm against the full
 * source. Both looked-up objects are asserted to be non-NULL.
 */
619 static bool cas_service_started(struct m0_fop *fop,
620  struct m0_reqh *reqh)
621 {
622  struct m0_reqh_service *svc;
623  const struct m0_reqh_service_type *stype;
624 
626  M0_ASSERT(stype != NULL);
627 
629  M0_ASSERT(svc != NULL);
630 
632 }
633 
/**
 * Creates a CAS fom, together with its reply fop, for request @a fop.
 *
 * Allocates the fom, one cas_kv per input record (cas_in_nr()), the reply
 * fop and one reply record per expected output (cas_out_nr()); on success
 * wires them together and initialises the long-lock links of the fom.
 *
 * @param fop   incoming request fop
 * @param out   on success, set to the generic fom embedded in the new fom
 * @param reqh  request handler, forwarded to the fom initialisation call
 *
 * @return 0 on success, -ENOMEM when any allocation failed. An -EAGAIN
 *         return is also present; its guarding condition is not visible
 *         here.
 */
634 static int cas_fom_create(struct m0_fop *fop,
635  struct m0_fom **out, struct m0_reqh *reqh)
636 {
637  struct cas_fom *fom;
638  struct m0_fom *fom0;
639  struct m0_fop *repfop;
640  struct m0_cas_rep *repdata;
641  struct m0_cas_rec *repv;
642  struct cas_kv *ikv;
643  uint64_t in_nr;
644  uint64_t out_nr;
645 
647  return M0_ERR(-EAGAIN);
648 
649  M0_ALLOC_PTR(fom);
/* A zero-sized array is represented by NULL rather than an allocation. */
655  in_nr = cas_in_nr(fop);
656  if (in_nr != 0)
657  M0_ALLOC_ARR(ikv, in_nr);
658  else
659  ikv = NULL;
660 
661  out_nr = cas_out_nr(fop);
662  repfop = m0_fop_reply_alloc(fop, &cas_rep_fopt);
670  if (out_nr != 0)
671  M0_ALLOC_ARR(repv, out_nr);
672  else
673  repv = NULL;
/*
 * All allocations are attempted first and checked together; a NULL array
 * is an error only when a non-zero element count was requested.
 */
674  if (fom != NULL && repfop != NULL &&
675  (in_nr == 0 || ikv != NULL) &&
676  (out_nr == 0 || repv != NULL)) {
677  *out = fom0 = &fom->cf_fom;
678  fom->cf_ikv = ikv;
679  fom->cf_ikv_nr = in_nr;
680  repdata = m0_fop_data(repfop);
681  repdata->cgr_rep.cr_nr = out_nr;
682  repdata->cgr_rep.cr_rec = repv;
684  &cas_fom_ops, fop, repfop, reqh);
685  m0_long_lock_link_init(&fom->cf_lock, fom0,
686  &fom->cf_lock_addb2);
687  m0_long_lock_link_init(&fom->cf_meta, fom0,
688  &fom->cf_meta_addb2);
689  m0_long_lock_link_init(&fom->cf_ctidx, fom0,
690  &fom->cf_ctidx_addb2);
691  m0_long_lock_link_init(&fom->cf_dead_index, fom0,
692  &fom->cf_dead_index_addb2);
693  m0_long_lock_link_init(&fom->cf_del_lock, fom0,
694  &fom->cf_del_lock_addb2);
695  return M0_RC(0);
696  } else {
/*
 * NOTE(review): repfop was obtained from m0_fop_reply_alloc(); confirm that
 * plain m0_free() is the intended way to release it (as opposed to dropping
 * a fop reference).
 */
697  m0_free(ikv);
698  m0_free(repfop);
699  m0_free(repv);
700  m0_free(fom);
701  return M0_ERR(-ENOMEM);
702  }
703 }
704 
705 static int cas_at_load(struct m0_rpc_at_buf *ab, struct m0_fom *fom,
706  int next_phase)
707 {
708  if (cas_in_ut()) {
709  m0_fom_phase_set(fom, next_phase);
710  return M0_FSO_AGAIN;
711  }
712 
713  return m0_rpc_at_load(ab, fom, next_phase);
714 }
715 
716 static int cas_at_reply(struct m0_rpc_at_buf *in,
717  struct m0_rpc_at_buf *out,
718  struct m0_buf *repbuf,
719  struct m0_fom *fom,
720  int next_phase)
721 {
722  if (cas_in_ut()) {
723  m0_fom_phase_set(fom, next_phase);
724  out->ab_type = M0_RPC_AT_INLINE;
725  out->u.ab_buf = *repbuf;
726  return M0_FSO_AGAIN;
727  }
728 
729  return m0_rpc_at_reply(in, out, repbuf, fom, next_phase);
730 }
731 
732 static void cas_at_fini(struct m0_rpc_at_buf *ab)
733 {
734  if (cas_in_ut()) {
735  ab->ab_type = M0_RPC_AT_EMPTY;
736  return;
737  }
738 
739  m0_rpc_at_fini(ab);
740 }
741 
742 static void cas_incoming_kv(const struct cas_fom *fom,
743  uint64_t rec_pos,
744  struct m0_buf *key,
745  struct m0_buf *val)
746 {
747  *key = fom->cf_ikv[rec_pos].ckv_key;
748  *val = fom->cf_ikv[rec_pos].ckv_val;
749 }
750 
/**
 * Accounts one transferred key or value in the per-fom statistics.
 *
 * @param fom    fom whose cf_kv_stats counters are updated
 * @param ab     AT buffer of the transfer; its type selects the counter
 * @param nob    size of the transferred buffer
 * @param kv     first index into cf_kv_stats
 * @param kv_io  second index into cf_kv_stats
 */
751 static void cas_update_kv_stats(struct cas_fom *fom,
752  const struct m0_rpc_at_buf *ab,
753  m0_bcount_t nob,
754  enum stats_kv kv,
755  enum stats_kv_io kv_io)
756 {
/* Count the transfer as inline or bulk, depending on the AT buffer type. */
757  if (ab->ab_type == M0_RPC_AT_INLINE)
758  fom->cf_kv_stats[kv][kv_io][STATS_NR_INLINE]++;
759  else if (M0_IN(ab->ab_type, (M0_RPC_AT_BULK_SEND,
762  fom->cf_kv_stats[kv][kv_io][STATS_NR_BULK]++;
/* The size is accumulated whatever the buffer type is. */
763  fom->cf_kv_stats[kv][kv_io][STATS_SIZE] += nob;
764 }
765 
/**
 * Fills fom->cf_ikv[] with the key and value buffers of every record of
 * request @a op and accounts them in the statistics.
 *
 * Processing stops at the first record for which m0_rpc_at_get() fails.
 *
 * @return 0 on success, otherwise the error returned by m0_rpc_at_get().
 */
766 static int cas_incoming_kv_setup(struct cas_fom *fom,
767  const struct m0_cas_op *op)
768 {
769  uint64_t i;
770  struct m0_cas_rec *rec;
771  struct m0_buf *key;
772  struct m0_buf *val;
773  int rc = 0;
774 
775  for (i = 0; i < op->cg_rec.cr_nr && rc == 0; i++) {
776  rec = &op->cg_rec.cr_rec[i];
777  key = &fom->cf_ikv[i].ckv_key;
778  val = &fom->cf_ikv[i].ckv_val;
779 
781  rc = m0_rpc_at_get(&rec->cr_key, key);
782  if (rc == 0) {
783  cas_update_kv_stats(fom, &rec->cr_key, key->b_nob,
/* The value is optional: a record without one gets an empty buffer. */
785  if (m0_rpc_at_is_set(&rec->cr_val)) {
786  rc = m0_rpc_at_get(&rec->cr_val, val);
787  if (rc == 0)
789  &rec->cr_val,
790  val->b_nob,
791  STATS_VAL,
792  STATS_KV_IN);
793  } else
794  *val = M0_BUF_INIT0;
795  }
796  }
797 
798  return M0_RC(rc);
799 }
800 
806  const struct m0_cas_op *op,
807  bool key,
808  size_t opos)
809 {
810  struct m0_rpc_at_buf *ret;
811  struct m0_cas_rec *rec = NULL;
812  uint64_t i = 0;
813 
814  switch (opc) {
815  case CO_PUT:
816  case CO_DEL:
817  case CO_GC:
818  ret = NULL;
819  break;
820  case CO_GET:
821  ret = key ? NULL : &op->cg_rec.cr_rec[opos].cr_val;
822  break;
823  case CO_CUR:
824  while (opos >= op->cg_rec.cr_rec[i].cr_rc) {
825  opos -= op->cg_rec.cr_rec[i].cr_rc;
826  i++;
827  }
828  if (i < op->cg_rec.cr_nr)
829  rec = &op->cg_rec.cr_rec[i];
830  if (rec != NULL && rec->cr_kv_bufs.cv_nr != 0) {
831  struct m0_cas_kv_vec *kv;
832 
833  kv = &rec->cr_kv_bufs;
834  if (opos < kv->cv_nr)
835  ret = key ? &kv->cv_rec[opos].ck_key :
836  &kv->cv_rec[opos].ck_val;
837  else
838  ret = NULL;
839  } else
840  ret = NULL;
841  break;
842  default:
843  M0_IMPOSSIBLE("Invalid opcode.");
844  }
845  return ret;
846 }
847 
/**
 * Puts the outgoing key (fom->cf_out_key) into the key AT buffer of the
 * reply record at position fom->cf_opos and moves the fom to @a next_phase.
 *
 * The complementary request buffer, if any, is found with
 * cas_out_complementary() and handed to cas_at_reply(). The sent key is then
 * accounted in the statistics.
 *
 * @return the result of cas_at_reply().
 */
858 static int cas_key_send(struct cas_fom *fom,
859  const struct m0_cas_op *op,
860  enum m0_cas_opcode opc,
861  const struct m0_cas_rep *rep,
862  enum cas_fom_phase next_phase)
863 {
864  struct m0_cas_rec *orec = cas_out_at(rep, fom->cf_opos);
865  struct m0_rpc_at_buf *in;
866  int result;
867 
868  M0_ENTRY("fom %p, opc %d, opos %"PRIu64, fom, opc, fom->cf_opos);
869  m0_rpc_at_init(&orec->cr_key);
870  in = cas_out_complementary(opc, op, true, fom->cf_opos);
871  result = cas_at_reply(in,
872  &orec->cr_key,
873  &fom->cf_out_key,
874  &fom->cf_fom,
875  next_phase);
876  cas_update_kv_stats(fom, &orec->cr_key, fom->cf_out_key.b_nob,
878  return result;
879 }
880 
/**
 * Puts the outgoing value (fom->cf_out_val) into the value AT buffer of the
 * reply record at position fom->cf_opos and moves the fom to @a next_phase.
 *
 * Mirrors cas_key_send(), operating on the value instead of the key.
 *
 * @return the result of cas_at_reply().
 */
884 static int cas_val_send(struct cas_fom *fom,
885  const struct m0_cas_op *op,
886  enum m0_cas_opcode opc,
887  const struct m0_cas_rep *rep,
888  enum cas_fom_phase next_phase)
889 {
890  struct m0_cas_rec *orec = cas_out_at(rep, fom->cf_opos);
891  struct m0_rpc_at_buf *in;
892  int result;
893 
894  M0_ENTRY("fom %p, opc %d, opos %"PRIu64, fom, opc, fom->cf_opos);
895  m0_rpc_at_init(&orec->cr_val);
896  in = cas_out_complementary(opc, op, false, fom->cf_opos);
897  result = cas_at_reply(in,
898  &orec->cr_val,
899  &fom->cf_out_val,
900  &fom->cf_fom,
901  next_phase);
902  cas_update_kv_stats(fom, &orec->cr_val, fom->cf_out_val.b_nob,
904  return result;
905 }
906 
907 static int cas_op_recs_check(struct cas_fom *fom,
908  enum m0_cas_opcode opc,
909  enum m0_cas_type ct,
910  struct m0_cas_op *op)
911 {
912  /*
913  * Tricky: cas_is_valid() has side effects when working
914  * with meta: it fills ->cf_in_cids.
915  */
916  return op->cg_rec.cr_nr != 0 && op->cg_rec.cr_rec != NULL &&
917  m0_forall(i, op->cg_rec.cr_nr,
918  cas_is_valid(fom, opc, ct, cas_at(op, i), i)) ?
919  M0_RC(0) : M0_ERR(-EPROTO);
920 }
921 
923 {
924  struct m0_fom *fom0 = &fom->cf_fom;
925  bool payload_exceeded = false;
926 
927  /*
928  * For some unit tests it is ok when item session is NULL, we can't call
929  * m0_rpc_item_max_payload_exceeded() in this case.
930  */
931  if (!cas_in_ut() || fom0->fo_fop->f_item.ri_session != NULL)
932  payload_exceeded =
934  &fom0->fo_rep_fop->f_item,
935  fom0->fo_fop->f_item.ri_session);
936  return payload_exceeded;
937 }
938 
939 static bool cas_key_need_to_send(struct cas_fom *fom, enum m0_cas_opcode opc,
940  enum m0_cas_type ct, struct m0_cas_op *op,
941  uint64_t rec_pos)
942 {
943  struct m0_buf in_key;
944  struct m0_buf in_val;
945  struct m0_buf key;
946  struct m0_buf val;
947  struct m0_cas_id *cid;
948  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
949  bool key_send = true;
950 
951  if (opc == CO_CUR && fom->cf_curpos == 0) {
952  if (op->cg_flags & COF_EXCLUDE_START_KEY) {
953  if (op->cg_flags & COF_SLANT) {
954  m0_ctg_cursor_kv_get(ctg_op, &key, &val);
955  if (ct != CT_META) {
956  cas_incoming_kv(fom, rec_pos, &in_key,
957  &in_val);
958  if (m0_buf_eq(&key, &in_key))
959  key_send = false;
960  } else {
961  cid = &fom->cf_in_cids[rec_pos];
962  if (m0_fid_eq(&cid->ci_fid, key.b_addr))
963  key_send = false;
964  }
965  } else
966  key_send = false;
967  }
968 
969  if (!key_send)
970  fom->cf_startkey_excluded = true;
971  }
972 
973  return key_send;
974 }
975 
976 static void cas_fom_cleanup(struct cas_fom *fom, bool ctg_op_fini)
977 {
978  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
979  struct m0_cas_ctg *meta = m0_ctg_meta();
980  struct m0_cas_ctg *ctidx = m0_ctg_ctidx();
981  struct m0_cas_ctg *dead_index = m0_ctg_dead_index();
982 
983  m0_long_unlock(m0_ctg_lock(meta), &fom->cf_meta);
984  m0_long_unlock(m0_ctg_lock(ctidx), &fom->cf_ctidx);
985  m0_long_unlock(m0_ctg_lock(dead_index), &fom->cf_dead_index);
986  m0_long_unlock(m0_ctg_del_lock(), &fom->cf_del_lock);
987  if (fom->cf_ctg != NULL)
988  m0_long_unlock(m0_ctg_lock(fom->cf_ctg), &fom->cf_lock);
989  if (ctg_op_fini) {
990  if (m0_ctg_cursor_is_initialised(ctg_op))
991  m0_ctg_cursor_fini(ctg_op);
992  m0_ctg_op_fini(ctg_op);
993  }
994 }
995 
/**
 * Handles a failure of the whole request.
 *
 * Walks the reply records (the per-record clean-up statements are not
 * visible here), frees the reply record array, resets it to empty and calls
 * cas_fom_cleanup().
 *
 * @param fom          failed CAS fom
 * @param rc           error code, must be negative
 * @param ctg_op_fini  forwarded to cas_fom_cleanup()
 *
 * NOTE(review): the final statement, which presumably records @a rc and
 * moves the fom to its failure phase, is not visible here.
 */
996 static void cas_fom_failure(struct cas_fom *fom, int rc, bool ctg_op_fini)
997 {
998  struct m0_cas_rep *repdata;
999  struct m0_cas_rec *repv;
1000  uint64_t i;
1001 
1002  M0_PRE(rc < 0);
1003 
1004  /*
1005  * Some generic error happened, no input record can be processed.
1006  * Clean already filled items in reply operation results.
1007  * CAS client shouldn't access cgr_rep array if cgr_rc is non-zero.
1008  */
1009  repdata = m0_fop_data(fom->cf_fom.fo_rep_fop);
1010  for (i = 0; i < repdata->cgr_rep.cr_nr; i++) {
1011  repv = &repdata->cgr_rep.cr_rec[i];
1014  }
1015  m0_free(repdata->cgr_rep.cr_rec);
1016  repdata->cgr_rep.cr_rec = NULL;
1017  repdata->cgr_rep.cr_nr = 0;
1018 
1019  cas_fom_cleanup(fom, ctg_op_fini);
1021 }
1022 
/**
 * Adds the BE transaction credit needed for the DTM0 log record of the
 * request to the credit of the fom's transaction.
 *
 * The first (partly visible) call works on the xcode object of the request
 * and yields a positive byte count in rc -- presumably its encoded length.
 * That count is passed as the payload size (b_nob) to the second (partly
 * visible) call, which fills `cred'.
 *
 * @return 0 on success, the negative result of the first call on failure.
 */
1023 static int cas_dtm0_logrec_credit_add(struct m0_fom *fom0)
1024 {
1025  struct m0_be_tx_credit cred = {};
1026  struct m0_xcode_ctx ctx = {};
1027  int rc;
1028 
1029  M0_ENTRY();
1030 
1032  &M0_XCODE_OBJ(m0_cas_op_xc, cas_op(fom0)));
1033  if (rc < 0)
1034  return M0_ERR(rc);
1035 
1036  M0_ASSERT(rc > 0);
1037 
1039  &cas_op(fom0)->cg_txd,
1040  &((struct m0_buf) { .b_nob = rc }),
1041  m0_fom_reqh(fom0)->rh_beseg,
1042  NULL, &cred);
1043  m0_be_tx_credit_add(&fom0->fo_tx.tx_betx_cred, &cred);
1044 
1045  return M0_RC(0);
1046 }
1047 
/**
 * Records the request in the DTM0 log within the fom's BE transaction.
 *
 * The participant of the transaction descriptor whose fid equals the fid of
 * the DTM0 service gets its state set to @a state; the request is then
 * encoded into a temporary buffer and passed to m0_dtm0_logrec_update().
 *
 * @param fom0   generic fom of the CAS request
 * @param state  state to record for this participant
 *
 * @return 0 on success, otherwise the error of the encoding or of the log
 *         update.
 */
1048 static int cas_dtm0_logrec_add(struct m0_fom *fom0,
1049  enum m0_dtm0_tx_pa_state state)
1050 {
1051  /* log the dtm0 logrec before completing the cas op */
1052  struct m0_dtm0_service *dtms =
1054  struct m0_dtm0_tx_desc *msg = &cas_op(fom0)->cg_txd;
1055  struct m0_buf buf = {};
1056  int i;
1057  int rc;
1058 
/* Only the first participant matching the service fid is updated. */
1059  for (i = 0; i < msg->dtd_ps.dtp_nr; ++i) {
1060  if (m0_fid_eq(&msg->dtd_ps.dtp_pa[i].p_fid,
1061  &dtms->dos_generic.rs_service_fid)) {
1062  msg->dtd_ps.dtp_pa[i].p_state = state;
1063  break;
1064  }
1065  }
/* "?:" chains the calls: the log is updated only if encoding returned 0. */
1066  rc = m0_xcode_obj_enc_to_buf(&M0_XCODE_OBJ(m0_cas_op_xc, cas_op(fom0)),
1067  &buf.b_addr, &buf.b_nob) ?:
1068  m0_dtm0_logrec_update(dtms->dos_log, &fom0->fo_tx.tx_betx, msg,
1069  &buf);
/* The encoded copy is released on both the success and the error path. */
1070  m0_buf_free(&buf);
1071 
1072  return rc;
1073 }
1074 
/**
 * Completes a successfully processed request.
 *
 * Calls cas_fom_cleanup(); the catalogue operation is finalised there only
 * for CO_CUR.
 * NOTE(review): the final statement, which presumably moves the fom to its
 * success phase, is not visible here.
 */
1075 static void cas_fom_success(struct cas_fom *fom, enum m0_cas_opcode opc)
1076 {
1077  cas_fom_cleanup(fom, opc == CO_CUR);
1079 }
1080 
/**
 * Posts the key/value statistics of the fom as ADDB2 attributes.
 *
 * For direction @a kv_io, every counter fom->cf_kv_stats[i][kv_io][j] is
 * posted as an M0_AVI_ATTR record tagged with the state machine id of the
 * fom and with the label found at the same indices of kv_stats_labels.
 *
 * NOTE(review): the entries of the label table are not visible here.
 */
1081 static void addb2_add_kv_attrs(const struct cas_fom *fom, enum stats_kv_io kv_io)
1082 {
1083  int i;
1084  int j;
1085  uint64_t sm_id = m0_sm_id_get(&fom->cf_fom.fo_sm_phase);
1086  static int kv_stats_labels[STATS_KV_NR][STATS_KV_IO_NR][STATS_NR] =
1087  {
1088  {
1089  {
1093  },
1094  {
1098  },
1099  },
1100  {
1101  {
1105  },
1106  {
1110  },
1111  }
1112  };
1113 
1114  for (i = 0; i < STATS_KV_NR; i++) {
1115  for (j = 0; j < STATS_NR; j++)
1116  M0_ADDB2_ADD(M0_AVI_ATTR, sm_id,
1117  kv_stats_labels[i][kv_io][j],
1118  fom->cf_kv_stats[i][kv_io][j]);
1119  }
1120 }
1121 
1129 static bool op_is_index_drop(enum m0_cas_opcode opc,
1130  enum m0_cas_type ct)
1131 {
1132  return CTG_OP_COMBINE(CO_DEL, CT_META) == CTG_OP_COMBINE(opc, ct);
1133 }
1134 
1135 /*
1136  * Wait for the BE transaction to be persisted.
1137  */
1138 static int op_sync_wait(struct m0_fom *fom)
1139 {
1140  struct m0_be_tx *tx;
1141 
1142  tx = m0_fom_tx(fom);
1143  if (fom->fo_tx.tx_state != M0_DTX_INVALID) {
1144  /*
1145  * The above checking of 'fom tx' state must go first
1146  * before any other access or checking on this fom tx,
1147  * because if the fom tx is not initialized, any access
1148  * of this fom tx is not safe or useful.
1149  */
1150 
1151  if (m0_be_tx_state(tx) < M0_BTS_LOGGED) {
1152  M0_LOG(M0_DEBUG, "fom wait for tx to be logged");
1153  m0_fom_wait_on(fom, &tx->t_sm.sm_chan, &fom->fo_cb);
1154  return M0_FSO_WAIT;
1155  }
1156  }
1157  return M0_FSO_AGAIN;
1158 }
1159 
1160 static int cas_fom_tick(struct m0_fom *fom0)
1161 {
1162  uint64_t i;
1163  int rc;
1164  int result = M0_FSO_AGAIN;
1165  struct cas_fom *fom = M0_AMB(fom, fom0, cf_fom);
1166  int phase = m0_fom_phase(fom0);
1167  struct m0_cas_op *op = cas_op(fom0);
1168  struct m0_cas_rep *rep = m0_fop_data(fom0->fo_rep_fop);
1169  enum m0_cas_opcode opc = m0_cas_opcode(fom0->fo_fop);
1170  enum m0_cas_type ct = cas_type(fom0);
1171  bool is_meta = ct == CT_META;
1172  struct m0_cas_ctg *ctg = fom->cf_ctg;
1173  size_t ipos = fom->cf_ipos;
1174  struct m0_cas_id *icid = &fom->cf_in_cids[ipos];
1175  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
1176  struct cas_service *service = M0_AMB(service,
1177  fom0->fo_service, c_service);
1178  struct m0_cas_ctg *meta = m0_ctg_meta();
1179  struct m0_cas_ctg *ctidx = m0_ctg_ctidx();
1180  struct m0_cas_rec *rec = NULL;
1181  bool is_dtm0_used = ENABLE_DTM0 &&
1182  !m0_dtm0_tx_desc_is_none(&op->cg_txd);
1183  bool is_index_drop;
1184  bool do_ctidx;
1185  int next_phase;
1186 
1187  M0_ENTRY("fom %p phase %d op_flag=0x%x", fom, phase, op->cg_flags);
1188  M0_PRE(ctidx != NULL);
1190  M0_PRE(ergo(is_dtm0_used, m0_dtm0_tx_desc__invariant(&op->cg_txd)));
1191 
1192  if (!M0_IS0(&op->cg_txd) && phase == M0_FOPH_INIT)
1193  M0_LOG(M0_DEBUG, "Got CAS with txid: " DTID0_F,
1194  DTID0_P(&op->cg_txd.dtd_id));
1195 
1196  is_index_drop = op_is_index_drop(opc, ct);
1197 
1198  switch (phase) {
1199  case M0_FOPH_INIT ... M0_FOPH_NR - 1:
1200 
1201  if (phase == M0_FOPH_INIT && !fom->cf_op_checked) {
1203  break;
1204  }
1205  if (phase == M0_FOPH_TXN_COMMIT) {
1206  /* Piggyback some information about the transaction */
1207  if (M0_IN(opc, (CO_PUT, CO_DEL)))
1208  m0_fom_mod_rep_fill(&rep->cgr_mod_rep, fom0);
1209  }
1210  if (phase == M0_FOPH_FAILURE) {
1211  /*
1212  * Cleanup locks, etc. in case a failure happened in a
1213  * generic phase. When a failure happens in our phase,
1214  * cas_fom_failure() is called explicitly.
1215  */
1216  cas_fom_cleanup(fom, false);
1217  }
1218  result = m0_fom_tick_generic(fom0);
1219  if (m0_fom_phase(fom0) == M0_FOPH_TXN_OPEN) {
1220  M0_ASSERT(phase == M0_FOPH_TXN_INIT);
1221  m0_fom_phase_set(fom0, CAS_START);
1222  }
1223  if (phase == M0_FOPH_TXN_COMMIT) {
1224  if (op->cg_flags & COF_SYNC_WAIT)
1225  result = op_sync_wait(fom0);
1226  }
1227  if (cas_in_ut() && m0_fom_phase(fom0) == M0_FOPH_QUEUE_REPLY) {
1229  }
1230 
1231  /*
1232  * Once the transition TXN_DONE_WAIT->FINISH is completed,
1233  * we need to send out P-msg if DTM0 is in use.
1234  */
1235  if (phase == M0_FOPH_TXN_DONE_WAIT &&
1236  m0_fom_phase(fom0) == M0_FOPH_FINISH && is_dtm0_used) {
1237  rc = m0_dtm0_on_committed(fom0, &cas_op(fom0)->cg_txd.dtd_id);
1238  if (rc != 0)
1239  M0_LOG(M0_WARN, "Could not send PERSISTENT "
1240  "messages out");
1241  }
1242  break;
1243  case CAS_CHECK_PRE:
1244  rc = cas_id_check(&op->cg_id);
1245  if (rc == 0) {
1246  if (cas_fid_is_cctg(&op->cg_id.ci_fid))
1247  result = cas_ctidx_lookup(fom, &op->cg_id,
1248  CAS_CHECK);
1249  else
1250  m0_fom_phase_set(fom0, CAS_CHECK);
1251  } else
1253  break;
1254  case CAS_CHECK:
1255  rc = cas_op_check(op, fom, is_index_drop);
1256  if (rc == 0) {
1257  fom->cf_op_checked = true;
1259  m0_sm_id_get(&fom0->fo_sm_phase),
1261  fom->cf_ikv_nr);
1263  } else
1265  break;
1266  case CAS_START:
1267  if (is_meta) {
1268  /*
1269  * Assign meta to ctg to re-use CAS_LOCK state.
1270  * If this is write op on meta, will need W lock.
1271  */
1272  fom->cf_ctg = meta;
1274  } else
1276  break;
1277  case CAS_META_LOCK:
1278  result = m0_long_read_lock(m0_ctg_lock(meta),
1279  &fom->cf_meta,
1280  CAS_META_LOOKUP);
1281  result = M0_FOM_LONG_LOCK_RETURN(result);
1282  break;
1283  case CAS_META_LOOKUP:
1284  m0_ctg_op_init(&fom->cf_ctg_op, fom0, 0);
1285  result = m0_ctg_meta_lookup(ctg_op,
1286  &op->cg_id.ci_fid,
1288  break;
1289  case CAS_META_LOOKUP_DONE:
1290  rc = m0_ctg_op_rc(ctg_op);
1291  if (rc == 0) {
1292  fom->cf_ctg = m0_ctg_meta_lookup_result(ctg_op);
1293  M0_ASSERT(fom->cf_ctg != NULL);
1295  } else if (rc == -ENOENT && opc == CO_PUT &&
1296  (op->cg_flags & COF_CROW)) {
1297  m0_long_unlock(m0_ctg_lock(meta), &fom->cf_meta);
1298  rc = cas_ctg_crow_handle(fom, &op->cg_id);
1299  if (rc == 0) {
1301  result = M0_FSO_WAIT;
1302  }
1303  }
1304  m0_ctg_op_fini(ctg_op);
1305  if (rc != 0)
1306  cas_fom_failure(fom, rc, false);
1307  break;
1308  case CAS_CTG_CROW_DONE:
1309  /*
1310  * EEXIST is treated as success as desired index may be created
1311  * by some other fom after the index lookup resulted ENOENT and
1312  * before the index creation attempt initiated by current fom
1313  * when CROW is used.
1314  *
1315  * Example: simultaneous put of keys key1 and key2 into the same
1316  * index index1, index1 is empty on this node and not yet
1317  * physically created as CROW is used for distributed indices,
1318  * keys are sent in separate fops fop1 and fop2:
1319  *
1320  * 1.1. put fop1 received -> fom1
1321  * 1.2. put fop2 received -> fom2
1322  * 2.1. fom1 -> lookup index1 -> ENOENT
1323  * 2.2. fom2 -> lookup index1 -> ENOENT
1324  * 3.1. fom1 -> run fom1.1 to create index1
1325  * 3.2. fom2 -> run fom2.1 to create index1
1326  * 4.1. fom1.1 -> create index1 -> SUCCESS
1327  * 4.2. fom2.1 -> create index1 -> EEXIST
1328  * 5.1. fom1.1 -> SUCCESS -> fom1
1329  * 5.2. fom2.1 -> EEXIST -> fom2
1330  * 6.1. fom1 -> lookup index1 -> SUCCESS
1331  * 6.2. fom2 -> send EEXIST
1332  * 7. fom1 -> insert key1 into index1 -> send SUCCESS
1333  */
1334  if (fom->cf_thrall_rc == 0 || fom->cf_thrall_rc == -EEXIST)
1335  m0_fom_phase_set(fom0, CAS_START);
1336  else
1337  cas_fom_failure(fom, fom->cf_thrall_rc, false);
1338  break;
1339  case CAS_LOAD_KEY:
1340  result = cas_at_load(&cas_at(op, ipos)->cr_key, fom0,
1341  CAS_LOAD_VAL);
1342  break;
1343  case CAS_LOAD_VAL:
1344  /*
1345  * Don't check result of key loading here, result codes for all
1346  * keys/values are checked in cas_load_check().
1347  */
1348  result = cas_at_load(&cas_at(op, ipos)->cr_val, fom0,
1349  CAS_LOAD_DONE);
1350  break;
1351  case CAS_LOAD_DONE:
1352  /* Record key/value are loaded. */
1353  fom->cf_ipos++;
1354  M0_ASSERT(fom->cf_ipos <= op->cg_rec.cr_nr);
1355  /* Do we need to load other keys and values from op? */
1356  if (fom->cf_ipos == op->cg_rec.cr_nr) {
1357  fom->cf_ipos = 0;
1359  if (rc != 0)
1360  cas_fom_failure(fom, M0_ERR(rc), false);
1361  else {
1363  result = cas_kv_load_done(fom, opc, op,
1364  is_index_drop ?
1366  CAS_LOCK);
1367  }
1368  break;
1369  }
1370  /* Load next key/value. */
1372  break;
1373  case CAS_LOCK:
1374  M0_ASSERT(ctg != NULL);
1375  /*
1376  * In case of index drop use cf_meta lock: we need cf_lock to
1377  * lock index.
1378  */
1379  result = m0_long_lock(m0_ctg_lock(ctg),
1380  !cas_is_ro(opc),
1381  is_index_drop ? &fom->cf_meta :
1382  &fom->cf_lock,
1383  is_meta ? CAS_CTIDX_LOCK : CAS_PREP);
1384  result = M0_FOM_LONG_LOCK_RETURN(result);
1385  fom->cf_ipos = 0;
1386  break;
1387  case CAS_CTIDX_LOCK:
1388  result = m0_long_lock(m0_ctg_lock(m0_ctg_ctidx()), !cas_is_ro(opc),
1389  &fom->cf_ctidx, CAS_PREP);
1390  result = M0_FOM_LONG_LOCK_RETURN(result);
1391  break;
1392  case CAS_DEAD_INDEX_LOCK:
1394  &fom->cf_dead_index, CAS_LOCK);
1395  result = M0_FOM_LONG_LOCK_RETURN(result);
1396  break;
1397  case CAS_PREP:
1398  M0_ASSERT(m0_forall(i, rep->cgr_rep.cr_nr,
1399  rep->cgr_rep.cr_rec[i].cr_rc == 0));
1400 
1401  rc = is_meta ? 0 : cas_op_recs_check(fom, opc, ct, op);
1402  if (rc != 0) {
1403  cas_fom_failure(fom, M0_ERR(rc), false);
1404  break;
1405  }
1406 
1407  for (i = 0; i < op->cg_rec.cr_nr; i++)
1408  cas_prep(fom, opc, ct, ctg, i,
1409  &fom0->fo_tx.tx_betx_cred);
1410  fom->cf_ipos = 0;
1411  fom->cf_opos = 0;
1412  if (opc == CO_CUR) {
1413  /*
1414  * There is only one catalogue operation context for
1415  * cursor operation.
1416  */
1417  m0_ctg_op_init(&fom->cf_ctg_op, fom0,
1418  cas_op(fom0)->cg_flags);
1419  }
1420 
1421  /*
1422  * If dtm0 is used we need to calculate credits for creating
1423  * a dtm0 log record.
1424  */
1425  if (is_dtm0_used) {
1427  if (rc != 0) {
1428  cas_fom_failure(fom, M0_ERR(rc), false);
1429  break;
1430  }
1431  }
1432 
1434  /*
1435  * @todo waiting for transaction open with btree (which can be
1436  * the meta-catalogue) locked, because tree height has to be
1437  * fixed for the correct credit calculation.
1438  */
1439  break;
1440  case CAS_TXN_OPENED:
1441  m0_fom_phase_set(fom0, is_meta ? CAS_LOOP : CAS_META_UNLOCK);
1442  break;
1443  case CAS_META_UNLOCK:
1444  M0_ASSERT(!is_meta);
1445  m0_long_read_unlock(m0_ctg_lock(meta), &fom->cf_meta);
1446  m0_fom_phase_set(fom0, CAS_LOOP);
1447  break;
1448  case CAS_LOOP:
1449  /* Skip empty CUR requests. */
1450  while (opc == CO_CUR && ipos < op->cg_rec.cr_nr &&
1451  cas_at(op, ipos)->cr_rc == 0)
1452  fom->cf_ipos = ++ipos;
1453  /* If all input has been processed... */
1454  if (ipos == op->cg_rec.cr_nr ||
1455  /* ... or all output has been generated. */
1456  fom->cf_opos == rep->cgr_rep.cr_nr) {
1457  /*
1458  * Check reply payload size against max RPC item payload
1459  * size.
1460  */
1462  cas_fom_failure(fom, M0_ERR(-E2BIG),
1463  opc == CO_CUR);
1464  else {
1465  if (is_dtm0_used)
1466  m0_fom_phase_set(fom0, CAS_DTM0);
1467  else
1468  cas_fom_success(fom, opc);
1469  }
1471  } else {
1472  do_ctidx = cas_ctidx_op_needed(fom, opc, ct, ipos);
1473  result = cas_exec(fom, opc, ct, ctg, ipos,
1474  is_index_drop ?
1476  do_ctidx ? CAS_CTIDX :
1478  }
1479  break;
1480  /*
1481  * Here are states specific to catalogue-index.
1482  */
1483  case CAS_CTIDX:
1484  M0_ASSERT(fom->cf_opos < rep->cgr_rep.cr_nr);
1485  rec = cas_out_at(rep, fom->cf_opos);
1486  M0_ASSERT(rec != NULL);
1487  if (rec->cr_rc == 0 && m0_ctg_op_rc(ctg_op) == 0) {
1488  m0_ctg_op_fini(ctg_op);
1489  m0_fom_phase_set(fom0,
1490  opc == CO_PUT ? CAS_CTIDX_MEM_PLACE :
1492  } else
1494  break;
1495  case CAS_CTIDX_MEM_PLACE:
1496  result = cas_ctidx_mem_place(fom, icid, CAS_CTIDX_INSERT);
1497  break;
1498  case CAS_CTIDX_INSERT:
1499  result = cas_ctidx_insert(fom, icid, is_index_drop ?
1500  CAS_IDROP_LOOP :
1502  break;
1503  case CAS_CTIDX_LOOKUP:
1504  result = cas_ctidx_lookup(fom, icid, CAS_CTIDX_MEM_FREE);
1505  break;
1506  case CAS_CTIDX_MEM_FREE:
1508  break;
1509  case CAS_CTIDX_DELETE:
1510  result = cas_ctidx_delete(fom, icid,
1511  is_index_drop ? CAS_IDROP_LOOP : CAS_PREPARE_SEND);
1512  break;
1513 
1514  /*
1515  * Here are states specific to index drop.
1516  */
1517  case CAS_INSERT_TO_DEAD:
1518  /*
1519  * We are here if this is index drop.
1520  * Now insert it into dead_index to process actual tree delete
1521  * by background garbage collector.
1522  */
1523  rc = m0_ctg_op_rc(ctg_op);
1524  if (rc == 0) {
1525  /*
1526  * Just completed meta lookup for this fid.
1527  */
1528  fom->cf_ctg = m0_ctg_meta_lookup_result(ctg_op);
1529  /*
1530  * Meta stores pointers to catalogues.
1531  * We will need catalogue later to lock it.
1532  */
1533  fom->cf_moved_ctgs[ipos] = fom->cf_ctg;
1534  /*
1535  * Insert to dead index, then can remove from meta.
1536  */
1537  m0_ctg_op_fini(ctg_op);
1538  m0_ctg_op_init(&fom->cf_ctg_op, fom0, 0);
1539  M0_LOG(M0_DEBUG, "Insert to dead ctg %p", fom->cf_ctg);
1540  m0_ctg_dead_index_insert(ctg_op,
1541  fom->cf_ctg,
1543  /*
1544  * The catalogue (cf_ctg) could be finalized anytime
1545  * after this point by the garbage collector (cgc fom).
1546  * Setting it to NULL below prevents any invalid memory
1547  * access during cas fom cleanup later.
1548  */
1549  m0_long_unlock(m0_ctg_lock(fom->cf_ctg),
1550  &fom->cf_lock);
1551  fom->cf_ctg = NULL;
1552  } else {
1553  M0_LOG(M0_DEBUG, "lookup in meta failed");
1554  fom->cf_moved_ctgs[ipos] = NULL;
1556  }
1557  break;
1558  case CAS_DELETE_FROM_META:
1559  rc = m0_ctg_op_rc(ctg_op);
1560  if (rc == 0) {
1561  m0_ctg_op_fini(ctg_op);
1562  m0_ctg_op_init(&fom->cf_ctg_op, fom0, 0);
1563  do_ctidx = cas_ctidx_op_needed(fom, opc, ct, ipos);
1564  m0_ctg_meta_delete(ctg_op,
1565  &fom->cf_in_cids[ipos].ci_fid,
1566  do_ctidx ? CAS_CTIDX :
1567  CAS_IDROP_LOOP);
1568  } else {
1569  M0_LOG(M0_DEBUG, "insert to meta failed");
1571  }
1572  break;
1573  case CAS_IDROP_LOOP:
1574  rc = m0_ctg_op_rc(ctg_op);
1575  fom->cf_ipos = ++ipos;
1576  if (ipos < op->cg_rec.cr_nr)
1577  m0_fom_phase_set(fom0, CAS_LOOP);
1578  else {
1579  m0_ctg_op_fini(ctg_op);
1580  /*
1581  * Moved all records from meta to
1582  * dead_index. Now can unlock meta catalogues and
1583  * wait until dropping indices are not used.
1584  */
1585  m0_long_unlock(m0_ctg_lock(meta), &fom->cf_meta);
1586  m0_long_unlock(m0_ctg_lock(ctidx), &fom->cf_ctidx);
1587  fom->cf_ipos = 0;
1588  m0_fom_phase_set(fom0, rc == 0 ? CAS_IDROP_LOCK_LOOP :
1590  }
1591  break;
1592  case CAS_IDROP_LOCK_LOOP:
1593  if (ipos == op->cg_rec.cr_nr) {
1594  /*
1595  * Unlock dead index, so index garbage collector (if
1596  * running) can safely proceed with newly added indices
1597  * in "dead index" catalogue.
1598  */
1600  &fom->cf_dead_index);
1602  } else if (fom->cf_moved_ctgs[ipos] != NULL) {
1603  /*
1604  * Actually we do not need this ctg_op, but in CAS_DONE
1605  * state we free it. Let's do not change it.
1606  */
1607  m0_ctg_op_init(&fom->cf_ctg_op, fom0, 0);
1608  /*
1609  * Lock dropping index to be sure nobody keep
1610  * using it.
1611  */
1612  result = m0_long_write_lock(
1613  m0_ctg_lock(fom->cf_moved_ctgs[ipos]),
1614  &fom->cf_lock,
1616  result = M0_FOM_LONG_LOCK_RETURN(result);
1617  } else
1618  m0_fom_phase_set(fom0, CAS_LOOP);
1619  break;
1620  case CAS_IDROP_LOCKED:
1621  /*
1622  * Now can unlock: index is not visible any more and nobody use
1623  * it for sure.
1624  */
1625  m0_long_unlock(m0_ctg_lock(fom->cf_moved_ctgs[ipos]),
1626  &fom->cf_lock);
1628  break;
1629  case CAS_IDROP_START_GC:
1630  /* Start garbage collector, if it is not already running. */
1631  m0_cas_gc_start(&service->c_service);
1632  cas_fom_success(fom, opc);
1633  break;
1634  /*
1635  * End of states specific to index drop.
1636  */
1637 
1638  case CAS_PREPARE_SEND:
1639  next_phase = CAS_DONE;
1640  M0_ASSERT(fom->cf_opos < rep->cgr_rep.cr_nr);
1641  rec = cas_out_at(rep, fom->cf_opos);
1642  M0_ASSERT(rec != NULL);
1643  m0_ctg_op_get_ver(ctg_op,
1644  &cas_out_at(rep, fom->cf_opos)->cr_ver);
1645  if (rec->cr_rc == 0) {
1646  rec->cr_rc = m0_ctg_op_rc(ctg_op);
1647  if (rec->cr_rc == 0) {
1648  if (cas_key_need_to_send(fom, opc, ct, op,
1649  ipos)){
1650  rec->cr_rc =
1651  cas_prep_send(fom, opc, ct);
1652  if (rec->cr_rc == 0)
1653  next_phase = CAS_SEND_KEY;
1654  } else {
1655  if (opc == CO_CUR)
1656  fom->cf_curpos++;
1657  next_phase = CAS_LOOP;
1658  }
1659  }
1660  }
1661  m0_fom_phase_set(fom0, next_phase);
1662  break;
1663  case CAS_SEND_KEY:
1664  if (opc == CO_CUR)
1665  result = cas_key_send(fom, op, opc, rep, CAS_KEY_SENT);
1666  else
1668  break;
1669  case CAS_KEY_SENT:
1670  rec = cas_out_at(rep, fom->cf_opos);
1671  rec->cr_rc = m0_rpc_at_reply_rc(&rec->cr_key);
1672  /*
1673  * Try to send value even if a key is not sent successfully.
1674  * It's necessary to return proper reply in case if bulk
1675  * transmission is required, but the user sent empty AT buffer.
1676  */
1678  break;
1679  case CAS_SEND_VAL:
1680  if (ct == CT_BTREE && M0_IN(opc, (CO_GET, CO_CUR)))
1681  result = cas_val_send(fom, op, opc, rep, CAS_VAL_SENT);
1682  else
1683  m0_fom_phase_set(fom0, CAS_DONE);
1684  break;
1685  case CAS_VAL_SENT:
1686  rec = cas_out_at(rep, fom->cf_opos);
1687  rec->cr_rc = m0_rpc_at_reply_rc(&rec->cr_val);
1688  m0_fom_phase_set(fom0, CAS_DONE);
1689  break;
1690  case CAS_DONE:
1691  if (cas_done(fom, op, rep, opc) == 0 && is_index_drop)
1693  else
1694  m0_fom_phase_set(fom0, CAS_LOOP);
1695  break;
1696  case CAS_DTM0:
1697  rc = cas_dtm0_logrec_add(&fom->cf_fom,
1699  if (rc != 0)
1700  cas_fom_failure(fom, M0_ERR(rc), opc == CO_CUR);
1701  else
1702  cas_fom_success(fom, opc);
1703  break;
1704  default:
1705  M0_IMPOSSIBLE("Invalid phase");
1706  }
1707 
1709  return M0_RC(result);
1710 }
1711 
/*
 * Optional unit-test hooks used by cas_fom_fini(). Each one is invoked only
 * when cas_in_ut() is true and the pointer is not NULL: cas__ut_cb_done is
 * called before the fom resources are released, cas__ut_cb_fini after the
 * fom has been freed.
 */
1712 M0_INTERNAL void (*cas__ut_cb_done)(struct m0_fom *fom);
1713 M0_INTERNAL void (*cas__ut_cb_fini)(struct m0_fom *fom);
1714 
1715 static void cas_fom_fini(struct m0_fom *fom0)
1716 {
1717  struct m0_cas_rec *rec;
1718  struct m0_cas_op *op = cas_op(fom0);
1719  struct cas_fom *fom = M0_AMB(fom, fom0, cf_fom);
1720  uint64_t i;
1721 
1722  for (i = 0; i < op->cg_rec.cr_nr; i++) {
1723  rec = cas_at(op, i);
1724 
1725  /* Finalise input AT buffers. */
1726  cas_at_fini(&rec->cr_key);
1727  cas_at_fini(&rec->cr_val);
1728  }
1729 
1730  if (cas_in_ut() && cas__ut_cb_done != NULL)
1731  cas__ut_cb_done(fom0);
1732 
1733  for (i = 0; i < fom->cf_in_cids_nr; i++)
1734  m0_cas_id_fini(&fom->cf_in_cids[i]);
1735  m0_free(fom->cf_in_cids);
1736  m0_free(fom->cf_moved_ctgs);
1737  m0_free(fom->cf_ikv);
1738  m0_long_lock_link_fini(&fom->cf_meta);
1739  m0_long_lock_link_fini(&fom->cf_lock);
1740  m0_long_lock_link_fini(&fom->cf_ctidx);
1741  m0_long_lock_link_fini(&fom->cf_dead_index);
1742  m0_long_lock_link_fini(&fom->cf_del_lock);
1743  m0_fom_fini(fom0);
1744  m0_free(fom);
1745  if (cas_in_ut() && cas__ut_cb_fini != NULL)
1746  cas__ut_cb_fini(fom0);
1747 }
1748 
1749 static const struct m0_fid *cas_fid(const struct m0_fom *fom)
1750 {
1751  return &cas_op(fom)->cg_id.ci_fid;
1752 }
1753 
/**
 * Picks the home locality for a new fom from a function-local counter that
 * grows by one on every call; the fom argument is not consulted.
 *
 * NOTE(review): the counter is updated without any synchronisation visible
 * here -- confirm that concurrent callers are acceptable.
 */
static size_t cas_fom_home_locality(const struct m0_fom *fom)
{
	static uint64_t loc = 0;
	uint64_t        picked = loc;

	loc = picked + 1;
	return picked;
}
1760 
1761 static struct m0_cas_op *cas_op(const struct m0_fom *fom)
1762 {
1763  return m0_fop_data(fom->fo_fop);
1764 }
1765 
/*
 * Returns the CAS opcode corresponding to @fop and asserts that it lies in
 * the range [0, CO_NR).
 *
 * NOTE(review): listing line 1770 is absent from this extract; it presumably
 * assigns 'opcode' from the fop. As shown, 'opcode' has no visible
 * assignment.
 */
1766 static enum m0_cas_opcode m0_cas_opcode(const struct m0_fop *fop)
1767 {
1768  enum m0_cas_opcode opcode;
1769 
1771  M0_ASSERT(0 <= opcode && opcode < CO_NR);
1772  return opcode;
1773 }
1774 
1775 static int cas_sdev_state(struct m0_poolmach *pm,
1776  uint32_t sdev_idx,
1777  enum m0_pool_nd_state *state_out)
1778 {
1779  int i;
1780 
1781  if (M0_FI_ENABLED("sdev_fail")) {
1782  *state_out = M0_PNDS_FAILED;
1783  return 0;
1784  }
1785  for (i = 0; i < pm->pm_state->pst_nr_devices; i++) {
1786  struct m0_pooldev *sdev = &pm->pm_state->pst_devices_array[i];
1787 
1788  if (sdev->pd_sdev_idx == sdev_idx) {
1789  *state_out = sdev->pd_state;
1790  return 0;
1791  }
1792  }
1793  return M0_ERR(-EINVAL);
1794 }
1795 
/*
 * Checks the state of the storage device behind a component catalogue.
 *
 * Nothing is checked (0 is returned) unless cid's fid is a component
 * catalogue fid. For such a fid the device id is derived from the fid, the
 * pool version is looked up and the device state is queried through
 * cas_sdev_state().
 *
 * Returns 0 when the check passes or is skipped, -EINVAL when the pool
 * version is not found, -EBADFD when the device state is not in the accepted
 * set, or the error returned by cas_sdev_state().
 *
 * NOTE(review): listing lines 1821 (the assignment of 'pver') and 1827 (the
 * rest of the accepted state list) are absent from this extract.
 */
1803 static int cas_device_check(const struct cas_fom *fom,
1804  const struct m0_cas_id *cid)
1805 {
1806  uint32_t device_id;
1807  enum m0_pool_nd_state state;
1808  struct m0_pool_version *pver;
1809  struct m0_poolmach *pm;
1810  struct cas_service *svc = M0_AMB(svc, fom->cf_fom.fo_service,
1811  c_service);
1812  struct m0_pools_common *pc = svc->c_service.rs_reqh->rh_pools;
1813  int rc = 0;
1814 
1815  /*
1816  * For some unit tests it's ok when pools is NULL, skip checking of
1817  * device in this case.
1818  */
1819  if (cas_fid_is_cctg(&cid->ci_fid) && (!cas_in_ut() || pc != NULL)) {
1820  device_id = m0_dix_fid_cctg_device_id(&cid->ci_fid);
1822  &cid->ci_layout.u.dl_desc.ld_pver);
1823  if (pver != NULL) {
1824  pm = &pver->pv_mach;
1825  rc = cas_sdev_state(pm, device_id, &state);
1826  if (rc == 0 && !M0_IN(state, (M0_PNDS_ONLINE,
1828  rc = M0_ERR(-EBADFD);
1829  } else
1830  rc = M0_ERR(-EINVAL);
1831  }
1832  return M0_RC(rc);
1833 }
1834 
/*
 * Validates a catalogue identifier received from the client.
 *
 * Returns -EPROTO when the fid check fails, or when the fid is a component
 * catalogue fid whose layout type is not DIX_LTYPE_DESCR; 0 otherwise.
 *
 * NOTE(review): listing line 1841 (the beginning of the second half of the
 * fid condition) is absent from this extract.
 */
1835 static int cas_id_check(const struct m0_cas_id *cid)
1836 {
1837  const struct m0_dix_layout *layout;
1838  int rc = 0;
1839 
1840  if (!m0_fid_is_valid(&cid->ci_fid) ||
1842  &m0_cctg_fid_type)))
1843  rc = M0_ERR(-EPROTO);
1844 
 /* Component catalogues must come with a layout descriptor. */
1845  if (rc == 0 && cas_fid_is_cctg(&cid->ci_fid)) {
1846  layout = &cid->ci_layout;
1847  if (layout->dl_type != DIX_LTYPE_DESCR)
1848  rc = M0_ERR(-EPROTO);
1849  }
1850  return rc;
1851 }
1852 
1853 static int cas_op_check(struct m0_cas_op *op,
1854  struct cas_fom *fom,
1855  bool is_index_drop)
1856 {
1857  struct m0_fom *fom0 = &fom->cf_fom;
1858  enum m0_cas_opcode opc = m0_cas_opcode(fom0->fo_fop);
1859  enum m0_cas_type ct = cas_type(fom0);
1860  bool is_meta = ct == CT_META;
1861  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
1862  const struct m0_cas_id *cid = &op->cg_id;
1863  uint32_t flags = cas_op(fom0)->cg_flags;
1864  struct m0_buf buf;
1865  const struct m0_dix_layout *layout;
1866  struct m0_dix_layout *stored_layout;
1867  int rc = 0;
1868 
1869  if (cas_fid_is_cctg(&cid->ci_fid)) {
1870  rc = m0_ctg_op_rc(ctg_op);
1871  if (rc == 0) {
1872  m0_ctg_lookup_result(ctg_op, &buf);
1873  layout = &cid->ci_layout;
1874  stored_layout = (struct m0_dix_layout *)buf.b_addr;
1875  /* Match stored and received layouts. */
1876  if (!m0_dix_layout_eq(layout, stored_layout))
1877  rc = M0_ERR(-EKEYEXPIRED);
1878  } else if (rc == -ENOENT && (flags & COF_CROW))
1879  /*
1880  * It's OK to not find layout with CROW flag set,
1881  * because the catalogue may be not created yet.
1882  */
1883  rc = 0;
1884  m0_ctg_op_fini(ctg_op);
1885  }
1886 
1887  if (rc == 0) {
1888  rc = cas_device_check(fom, &op->cg_id);
1889  if (rc == 0 && is_meta && fom->cf_ikv_nr != 0) {
1890  M0_ALLOC_ARR(fom->cf_in_cids, fom->cf_ikv_nr);
1891  if (fom->cf_in_cids == NULL)
1892  rc = -ENOMEM;
1893  if (is_index_drop) {
1894  /*
1895  * Store here pointers to records in dead_index
1896  * to be able to lock dropped indices. Need
1897  * to store fom->cf_ikv_nr records (== number of
1898  * keys to delete).
1899  */
1900  M0_ALLOC_ARR(fom->cf_moved_ctgs,
1901  fom->cf_ikv_nr);
1902  if (fom->cf_moved_ctgs == NULL)
1903  rc = -ENOMEM;
1904  }
1905  }
1906  }
1907  if (rc == 0)
1908  /*
1909  * Note: fill cf_in_cids there.
1910  */
1911  rc = cas_op_recs_check(fom, opc, ct, op);
1912 
1913  return M0_RC(rc);
1914 }
1915 
/*
 * Validates one input record of a CAS request.
 *
 * The presence of key/value AT buffers and the record's cr_rc are checked
 * against what the opcode allows. For a meta record that passed this check
 * and has a key, the key is additionally decoded into a m0_cas_id, whose fid
 * and identity mask are validated (for component catalogues the device is
 * checked as well). A valid identifier is stored in
 * fom->cf_in_cids[rec_pos] and cf_in_cids_nr is incremented.
 *
 * Returns true when the record is valid.
 *
 * NOTE(review): listing line 1968 (the first fid type of the M0_IN list) is
 * absent from this extract.
 * NOTE(review): when decoding succeeds but validation fails, 'cid' is dropped
 * without being finalised in this function -- confirm whether the decoder
 * allocates memory that would need releasing.
 */
1916 static bool cas_is_valid(struct cas_fom *fom, enum m0_cas_opcode opc,
1917  enum m0_cas_type ct, const struct m0_cas_rec *rec,
1918  uint64_t rec_pos)
1919 {
1920  bool result;
1921  bool gotkey;
1922  bool gotval;
1923  bool meta = ct == CT_META;
1924 
1925  gotkey = m0_rpc_at_is_set(&rec->cr_key);
1926  gotval = m0_rpc_at_is_set(&rec->cr_val);
 /* Per-opcode rules on which buffers must (not) be present. */
1927  switch (opc) {
1928  case CO_GET:
1929  case CO_DEL:
1930  result = gotkey && !gotval && rec->cr_rc == 0;
1931  break;
1932  case CO_PUT:
 /* A value is required for non-meta PUT and forbidden for meta PUT. */
1933  result = gotkey && (gotval == !meta) && rec->cr_rc == 0;
1934  break;
1935  case CO_CUR:
1936  result = gotkey && !gotval;
1937  break;
1938  case CO_REP:
1939  result = !gotval == (((int64_t)rec->cr_rc) < 0 || meta);
1940  break;
1941  case CO_GC:
1942  case CO_MIN:
1943  case CO_TRUNC:
1944  case CO_DROP:
1945  result = true;
1946  break;
1947  default:
1948  M0_IMPOSSIBLE("Wrong opcode.");
1949  }
1950  if (meta && gotkey && result) {
1951  int rc;
1952  struct m0_cas_id cid = {};
1953  struct m0_buf key;
1954  const struct m0_dix_imask *imask;
1955 
1956  /*
1957  * Valid key is sent inline always, so result
1958  * should be 0.
1959  * Key is encoded m0_cas_id in meta case.
1960  */
1961  rc = m0_rpc_at_get(&rec->cr_key, &key) ?:
1962  cas_buf_cid_decode(&key, &cid);
1963  if (rc == 0) {
1964  imask = &cid.ci_layout.u.dl_desc.ld_imask;
1965 
1966  result = m0_fid_is_valid(&cid.ci_fid) &&
1967  M0_IN(m0_fid_type_getfid(&cid.ci_fid),
1969  &m0_cctg_fid_type));
1970  if (result) {
 /*
  * Component catalogue: range pointer and range count must be
  * consistent and the device must pass the check. Any other fid:
  * the identity mask must be empty.
  */
1971  if (cas_fid_is_cctg(&cid.ci_fid))
1972  result = (imask->im_range == NULL) ==
1973  (imask->im_nr == 0) &&
1974  cas_device_check(fom, &cid) == 0;
1975  else
1976  result = (imask->im_range == NULL &&
1977  imask->im_nr == 0);
1978  }
1979  } else
1980  result = false;
1981 
1982  if (result) {
1983  fom->cf_in_cids[rec_pos] = cid;
1984  fom->cf_in_cids_nr++;
1985  }
1986  }
1987  return M0_RC(result);
1988 }
1989 
1990 static bool cas_is_ro(enum m0_cas_opcode opc)
1991 {
1992  return M0_IN(opc, (CO_GET, CO_CUR, CO_REP));
1993 }
1994 
/*
 * Returns the catalogue type targeted by the fom: CT_META or CT_BTREE.
 *
 * NOTE(review): listing line 1997, which holds the 'if' condition selecting
 * CT_META, is absent from this extract.
 */
1995 static enum m0_cas_type cas_type(const struct m0_fom *fom)
1996 {
1998  return CT_META;
1999  else
2000  return CT_BTREE;
2001 }
2002 
2003 static uint64_t cas_in_nr(const struct m0_fop *fop)
2004 {
2005  const struct m0_cas_op *op = m0_fop_data(fop);
2006 
2007  return op->cg_rec.cr_nr;
2008 }
2009 
2010 static uint64_t cas_out_nr(const struct m0_fop *fop)
2011 {
2012  const struct m0_cas_op *op = m0_fop_data(fop);
2013  uint64_t nr;
2014 
2015  nr = op->cg_rec.cr_nr;
2016  if (m0_cas_opcode(fop) == CO_CUR)
2017  nr = m0_reduce(i, nr, 0, + op->cg_rec.cr_rec[i].cr_rc);
2018  return nr;
2019 }
2020 
/*
 * Decodes a m0_cas_id from the encoded buffer @enc_buf into @cid.
 *
 * Both pointers must be non-NULL and *cid must be zero-filled on entry.
 *
 * NOTE(review): listing line 2029, holding the 'return' and the name of the
 * decoding call whose arguments follow, is absent from this extract.
 */
2021 static int cas_buf_cid_decode(struct m0_buf *enc_buf,
2022  struct m0_cas_id *cid)
2023 {
2024  M0_PRE(enc_buf != NULL);
2025  M0_PRE(cid != NULL);
2026 
2027  M0_PRE(M0_IS0(cid));
2028 
2030  &M0_XCODE_OBJ(m0_cas_id_xc, cid),
2031  enc_buf->b_addr, enc_buf->b_nob);
2032 }
2033 
/*
 * Tells whether @fid (must not be NULL) identifies a component catalogue.
 *
 * NOTE(review): listing line 2037 with the 'return' statement is absent from
 * this extract, so the exact test is not visible here.
 */
2034 static bool cas_fid_is_cctg(const struct m0_fid *fid)
2035 {
2036  M0_PRE(fid != NULL);
2038 }
2039 
2040 static int cas_place(struct m0_buf *dst, struct m0_buf *src, m0_bcount_t cutoff)
2041 {
2042  int result = 0;
2043 
2044  if (M0_FI_ENABLED("place_fail"))
2045  return M0_ERR(-ENOMEM);
2046 
2047  if (src->b_nob >= cutoff) {
2048  dst->b_addr = m0_alloc_aligned(src->b_nob, m0_pageshift_get());
2049  if (dst->b_addr == NULL)
2050  return M0_ERR(-ENOMEM);
2051  dst->b_nob = src->b_nob;
2052  memcpy(dst->b_addr, src->b_addr, src->b_nob);
2053  } else {
2054  result = m0_buf_copy(dst, src);
2055  }
2056  return M0_RC(result);
2057 }
2058 
2066 static m0_bcount_t cas_kv_nob(const struct m0_buf *inbuf)
2067 {
2068  return inbuf->b_nob + sizeof(uint64_t);
2069 }
2070 
/*
 * Accumulates into @accum the transaction credits needed to process input
 * record @rec_pos.
 *
 * Meta PUT/DEL: credits for the catalogue-index update (component catalogues
 * only) plus catalogue creation credits for PUT. Other visible branch:
 * insert and/or delete credits sized by the key and value lengths returned
 * by cas_kv_nob().
 *
 * NOTE(review): listing lines 2100 (the 'else' body of the meta branch) and
 * 2102-2103 (the case labels of the second branch) are absent from this
 * extract.
 */
2071 static void cas_prep(struct cas_fom *fom, enum m0_cas_opcode opc,
2072  enum m0_cas_type ct, struct m0_cas_ctg *ctg,
2073  uint64_t rec_pos, struct m0_be_tx_credit *accum)
2074 {
2075  struct m0_fom *fom0 = &fom->cf_fom;
2076  uint32_t flags = cas_op(fom0)->cg_flags;
2077  struct m0_cas_id *cid;
2078  struct m0_buf key;
2079  struct m0_buf val;
2080  m0_bcount_t knob;
2081  m0_bcount_t vnob;
2082 
2083  cas_incoming_kv(fom, rec_pos, &key, &val);
2084  knob = cas_kv_nob(&key);
2085  vnob = cas_kv_nob(&val);
2086  switch (CTG_OP_COMBINE(opc, ct)) {
2087  case CTG_OP_COMBINE(CO_PUT, CT_META):
2088  case CTG_OP_COMBINE(CO_DEL, CT_META):
 /* Meta records carry no value: only the cas_kv_nob() overhead remains. */
2089  M0_ASSERT(vnob == sizeof(uint64_t));
2090  cid = &fom->cf_in_cids[rec_pos];
2091  if (cas_fid_is_cctg(&cid->ci_fid)) {
2092  if (opc == CO_PUT)
2093  m0_ctg_ctidx_insert_credits(cid, accum);
2094  else
2095  m0_ctg_ctidx_delete_credits(cid, accum);
2096  }
2097  if (opc == CO_PUT)
2098  m0_ctg_create_credit(accum);
2099  else
2101  break;
2104  if (opc == CO_PUT) {
2105  if (flags & COF_OVERWRITE)
2106  /*
2107  * Consider credits for possible deletion of
2108  * existing key/value to be overwritten, size of
2109  * new value should be enough as current
2110  * key/value are deleted if size of existing
2111  * value is not enough to place new value.
2112  */
2113  m0_ctg_delete_credit(ctg, knob, vnob, accum);
2114  m0_ctg_insert_credit(ctg, knob, vnob, accum);
2115  } else
2116  m0_ctg_delete_credit(ctg, knob, vnob, accum);
2117  break;
2118  }
2119 }
2120 
2121 static struct m0_cas_rec *cas_at(struct m0_cas_op *op, int idx)
2122 {
2123  M0_PRE(0 <= idx && idx < op->cg_rec.cr_nr);
2124  return &op->cg_rec.cr_rec[idx];
2125 }
2126 
2127 static struct m0_cas_rec *cas_out_at(const struct m0_cas_rep *rep, int idx)
2128 {
2129  M0_PRE(0 <= idx && idx < rep->cgr_rep.cr_nr);
2130  return &rep->cgr_rep.cr_rec[idx];
2131 }
2132 
/*
 * Finishes loading of the input keys/values and moves the fom to @phase.
 *
 * For a CO_DEL request with COF_DEL_LOCK set the "del" lock is involved
 * first; otherwise the phase is set directly and M0_FSO_AGAIN is returned.
 *
 * NOTE(review): listing line 2143, which starts the statement taking the
 * lock (and presumably returns its result), is absent from this extract.
 */
2133 static int cas_kv_load_done(struct cas_fom *fom, enum m0_cas_opcode opc,
2134  const struct m0_cas_op *op, int phase)
2135 
2136 {
2137  if (opc == CO_DEL && (op->cg_flags & COF_DEL_LOCK)) {
2138  /*
2139  * Repair or re-balance may be running, take the lock
2140  * to protect current component record/catalogue that is under
2141  * repair/re-balance from concurrent deletion by the client.
2142  */
2144  m0_ctg_del_lock(),
2145  &fom->cf_del_lock,
2146  phase)));
2147  }
2148 
2149  m0_fom_phase_set(&fom->cf_fom, phase);
2150  return M0_RC(M0_FSO_AGAIN);
2151 }
2152 
/*
 * Launches the catalogue-store operation for input record @rec_pos.
 *
 * The operation is selected by the (opc, ct) combination; @next is the fom
 * phase handed to the catalogue-store call. For every opcode except CO_CUR
 * the fom's catalogue operation is (re)initialised first. Returns 'ret', as
 * set by the selected branch.
 *
 * NOTE(review): several listing lines are absent from this extract: 2165
 * (presumably the declaration of 'ret'), the case labels at 2183, 2186, 2189
 * and 2219, and line 2199 inside the lookup-delete branch. The cursor branch
 * visibly does not assign 'ret'.
 */
2153 static int cas_exec(struct cas_fom *fom, enum m0_cas_opcode opc,
2154  enum m0_cas_type ct, struct m0_cas_ctg *ctg,
2155  uint64_t rec_pos, int next)
2156 {
2157  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
2158  struct m0_fom *fom0 = &fom->cf_fom;
2159  uint32_t flags = cas_op(fom0)->cg_flags;
2160  struct m0_buf lbuf = M0_BUF_INIT0;
2161  struct m0_buf kbuf;
2162  struct m0_buf vbuf;
2163  struct m0_cas_id *cid;
2164  struct m0_cas_rec *rec;
2166  M0_ENTRY("opc=%d ct=%d", opc, ct);
2167 
2168  cas_incoming_kv(fom, rec_pos, &kbuf, &vbuf);
2169  if (ct == CT_META)
2170  cid = &fom->cf_in_cids[rec_pos];
2171  else
2172  /*
2173  * Initialise cid to NULL to suppress strange compiler warning.
2174  * Cid is used only for meta operations, but compiler complains
2175  * that it may be uninitialised.
2176  */
2177  cid = NULL;
2178 
 /* Cursor operations do not get a fresh operation initialised here. */
2179  if (opc != CO_CUR)
2180  m0_ctg_op_init(&fom->cf_ctg_op, fom0, flags);
2181 
2182  switch (CTG_OP_COMBINE(opc, ct)) {
2184  ret = m0_ctg_lookup(ctg_op, ctg, &kbuf, next);
2185  break;
2187  ret = m0_ctg_insert(ctg_op, ctg, &kbuf, &vbuf, next);
2188  break;
2190  ret = m0_ctg_lookup_delete(ctg_op, ctg, &kbuf, &lbuf, flags, next);
2191  if (ctg_op->co_rc == 0) {
2192  rec = cas_at(cas_op(fom0), rec_pos);
2193 
2194  /*
2195  * Here @lbuf is allocated in m0_ctg_lookup_delete()
2196  * and released in cas_fom_fini().
2197  */
2198  m0_rpc_at_init(&rec->cr_val);
2200  rec->cr_val.u.ab_buf = lbuf;
2201  }
2202  break;
2203  case CTG_OP_COMBINE(CO_DEL, CT_META):
2204  /*
2205  * This is index drop. Move record from meta to dead_index.
2206  * First load record from meta, then insert into dead_index,
2207  * then delete from meta.
2208  * So current step is to fetch pointer to ctg from meta.
2209  */
2210  case CTG_OP_COMBINE(CO_GET, CT_META):
2211  ret = m0_ctg_meta_lookup(ctg_op, &cid->ci_fid, next);
2212  break;
2213  case CTG_OP_COMBINE(CO_PUT, CT_META):
2214  ret = m0_ctg_meta_insert(ctg_op, &cid->ci_fid, next);
2215  break;
2216  case CTG_OP_COMBINE(CO_GC, CT_META):
2217  ret = m0_ctg_gc_wait(ctg_op, next);
2218  break;
2220  case CTG_OP_COMBINE(CO_CUR, CT_META):
 /*
  * First step of an iteration (cf_curpos == 0): initialise the cursor
  * if needed and position it at the start key. Later steps advance it.
  */
2221  if (fom->cf_curpos == 0) {
2222  if (!m0_ctg_cursor_is_initialised(ctg_op)) {
2223  if (ct == CT_META)
2224  m0_ctg_meta_cursor_init(ctg_op);
2225  else
2226  m0_ctg_cursor_init(ctg_op, ctg);
2227  }
2228  if (ct == CT_META)
2229  m0_ctg_meta_cursor_get(ctg_op, &cid->ci_fid,
2230  next);
2231  else
2232  m0_ctg_cursor_get(ctg_op, &kbuf, next);
2233  } else if (ct == CT_META)
2234  m0_ctg_meta_cursor_next(ctg_op, next);
2235  else
2236  m0_ctg_cursor_next(ctg_op, next);
2237  break;
2238  }
2239 
2240  return ret;
2241 }
2242 
2243 static bool cas_ctidx_op_needed(struct cas_fom *fom, enum m0_cas_opcode opc,
2244  enum m0_cas_type ct, uint64_t rec_pos)
2245 {
2246  struct m0_cas_id *cid;
2247  bool is_needed = false;
2248 
2249  switch (CTG_OP_COMBINE(opc, ct)) {
2250  case CTG_OP_COMBINE(CO_PUT, CT_META):
2251  case CTG_OP_COMBINE(CO_DEL, CT_META):
2252  cid = &fom->cf_in_cids[rec_pos];
2253  if (cas_fid_is_cctg(&cid->ci_fid))
2254  is_needed = true;
2255  break;
2256  default:
2257  break;
2258  }
2259 
2260  return is_needed;
2261 }
2262 
/*
 * Starts placement of the identity mask ranges of @in_cid through
 * m0_ctg_mem_place(), with @next as the follow-up phase.
 *
 * When the mask is empty nothing is placed and the fom is moved to @next
 * directly.
 *
 * NOTE(review): listing line 2269, presumably declaring 'ret' with its
 * initial value, is absent from this extract.
 */
2263 static int cas_ctidx_mem_place(struct cas_fom *fom,
2264  const struct m0_cas_id *in_cid, int next)
2265 {
2266  const struct m0_dix_imask *imask;
2267  struct m0_fom *fom0 = &fom->cf_fom;
2268  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
2270  struct m0_buf buf;
2271 
2272  imask = CID_IMASK_PTR(in_cid);
2273 
2274  if (!m0_dix_imask_is_empty(imask)) {
2275  m0_ctg_op_init(ctg_op, fom0, 0);
 /* The buffer covers the whole array of im_nr ranges. */
2276  buf.b_nob = imask->im_nr * sizeof(imask->im_range[0]);
2277  buf.b_addr = imask->im_range;
2278  ret = m0_ctg_mem_place(ctg_op, &buf, next);
2279  } else
2280  m0_fom_phase_set(fom0, next);
2281 
2282  return ret;
2283 }
2284 
/*
 * Frees the identity mask ranges referenced by the layout found by the
 * preceding catalogue lookup.
 *
 * If the lookup succeeded and the stored layout has a non-empty mask, the
 * operation is re-initialised and m0_ctg_mem_free() is started. If the
 * lookup failed or there is nothing to free, the fom is moved to @next
 * directly.
 *
 * NOTE(review): listing lines 2291 (presumably the declaration of 'ret') and
 * 2306 (the remaining arguments of m0_ctg_mem_free()) are absent from this
 * extract.
 */
2285 static int cas_ctidx_mem_free(struct cas_fom *fom, int next)
2286 {
2287  struct m0_dix_layout *layout;
2288  struct m0_dix_imask *imask;
2289  struct m0_fom *fom0 = &fom->cf_fom;
2290  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
2292  bool free_called = false;
2293  struct m0_buf buf;
2294  int rc;
2295 
2296  rc = m0_ctg_op_rc(ctg_op);
2297  if (rc == 0) {
2298  m0_ctg_lookup_result(ctg_op, &buf);
2299  M0_ASSERT(buf.b_nob == sizeof(struct m0_dix_layout));
2300  layout = (struct m0_dix_layout *)buf.b_addr;
2301  imask = LAYOUT_IMASK_PTR(layout);
2302  if (!m0_dix_imask_is_empty(imask)) {
2303  m0_ctg_op_fini(ctg_op);
2304  m0_ctg_op_init(ctg_op, fom0, 0);
2305  ret = m0_ctg_mem_free(ctg_op, imask->im_range,
2307  free_called = true;
2308  }
2309  }
 /* No free operation was started: advance the phase here. */
2310  if (rc != 0 || !free_called)
2311  m0_fom_phase_set(fom0, next);
2312  return ret;
2313 }
2314 
/*
 * Deletes the catalogue-index record keyed by the fid of @in_cid.
 *
 * The deletion is started only when the previous catalogue operation
 * succeeded; otherwise the fom is moved to @next directly and the previous
 * operation is left untouched.
 *
 * NOTE(review): listing line 2320, presumably declaring 'ret' with its
 * initial value, is absent from this extract.
 */
2315 static int cas_ctidx_delete(struct cas_fom *fom, const struct m0_cas_id *in_cid,
2316  int next)
2317 {
2318  struct m0_fom *fom0 = &fom->cf_fom;
2319  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
2321  struct m0_buf kbuf;
2322 
2323  if (m0_ctg_op_rc(ctg_op) == 0) {
2324  m0_ctg_op_fini(ctg_op);
2325  m0_ctg_op_init(ctg_op, fom0, 0);
2326  /* The key is a component catalogue FID. */
2327  kbuf = M0_BUF_INIT_PTR_CONST(&in_cid->ci_fid);
2328  ret = m0_ctg_delete(ctg_op, m0_ctg_ctidx(), &kbuf, next);
2329  } else
2330  m0_fom_phase_set(fom0, next);
2331  return ret;
2332 }
2333 
2334 static int cas_ctidx_lookup(struct cas_fom *fom, const struct m0_cas_id *in_cid,
2335  int next)
2336 {
2337  struct m0_fom *fom0 = &fom->cf_fom;
2338  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
2339  struct m0_buf kbuf;
2340 
2341  m0_ctg_op_init(ctg_op, fom0, 0);
2342  /* The key is a component catalogue FID. */
2343  kbuf = M0_BUF_INIT_PTR_CONST(&in_cid->ci_fid);
2344  return m0_ctg_lookup(ctg_op, m0_ctg_ctidx(), &kbuf, next);
2345 }
2346 
2347 static int cas_ctidx_insert(struct cas_fom *fom, const struct m0_cas_id *in_cid,
2348  int next)
2349 {
2350  struct m0_fom *fom0 = &fom->cf_fom;
2351  struct m0_cas_id cid_copy = *in_cid;
2352  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
2353  bool ctg_op_ok = true;
2354  int ret = M0_FSO_AGAIN;
2355  struct m0_dix_imask *imask;
2356  struct m0_buf kbuf;
2357  struct m0_buf vbuf;
2358  struct m0_buf mbuf;
2359 
2360  imask = CID_IMASK_PTR(&cid_copy);
2361  if (!m0_dix_imask_is_empty(imask)) {
2362  if (m0_ctg_op_rc(ctg_op) == 0) {
2363  m0_ctg_mem_place_get(ctg_op, &mbuf);
2364  m0_ctg_op_fini(ctg_op);
2365  M0_ASSERT(imask->im_nr ==
2366  mbuf.b_nob / sizeof(imask->im_range[0]));
2367  /*
2368  * Substitude imask range array with its BE copy before
2369  * insertion into ctidx tree.
2370  */
2371  imask->im_range = (struct m0_ext *)mbuf.b_addr;
2372  } else
2373  ctg_op_ok = false;
2374  }
2375 
2376  if (ctg_op_ok == true) {
2377  m0_ctg_op_init(ctg_op, fom0, 0);
2378  /* The key is a component catalogue FID. */
2379  kbuf = M0_BUF_INIT_PTR(&cid_copy.ci_fid);
2380  vbuf = M0_BUF_INIT_PTR(&cid_copy.ci_layout);
2381  ret = m0_ctg_insert(ctg_op, m0_ctg_ctidx(), &kbuf, &vbuf, next);
2382  }
2383 
2384  return ret;
2385 }
2386 
2387 static m0_bcount_t cas_rpc_cutoff(const struct cas_fom *fom)
2388 {
2389  return cas_in_ut() ? m0_pagesize_get() :
2390  m0_fop_rpc_machine(fom->cf_fom.fo_fop)->rm_bulk_cutoff;
2391 }
2392 
/*
 * Prepares the reply buffers for the record just processed.
 *
 * Depending on (opc, ct) the looked-up value, or the key and (for CT_BTREE)
 * the value under the cursor, are copied into fom->cf_out_key/cf_out_val
 * with cas_place(), using cas_rpc_cutoff() as the size threshold. The meta
 * cases shown carry nothing but a return code.
 *
 * Returns 0 on success or the error reported by cas_place().
 *
 * NOTE(review): listing lines 2403, 2409-2410 and 2414 (case labels) are
 * absent from this extract.
 * NOTE(review): 'rc' is declared uint64_t although it stores the int result
 * of cas_place() and is returned as int -- a plain int looks intended.
 */
2393 static int cas_prep_send(struct cas_fom *fom, enum m0_cas_opcode opc,
2394  enum m0_cas_type ct)
2395 {
2396  uint64_t rc = 0;
2397  struct m0_buf key;
2398  struct m0_buf val;
2399  struct m0_ctg_op *ctg_op = &fom->cf_ctg_op;
2400  m0_bcount_t rpc_cutoff = cas_rpc_cutoff(fom);
2401 
2402  switch (CTG_OP_COMBINE(opc, ct)) {
2404  m0_ctg_lookup_result(ctg_op, &val);
2405  rc = cas_place(&fom->cf_out_val, &val, rpc_cutoff);
2406  break;
2407  case CTG_OP_COMBINE(CO_GET, CT_META):
2408  case CTG_OP_COMBINE(CO_DEL, CT_META):
2411  case CTG_OP_COMBINE(CO_PUT, CT_META):
2412  /* Nothing to do: return code is all the user gets. */
2413  break;
2415  case CTG_OP_COMBINE(CO_CUR, CT_META):
2416  m0_ctg_cursor_kv_get(ctg_op, &key, &val);
2417  rc = cas_place(&fom->cf_out_key, &key, rpc_cutoff);
 /* Only non-meta cursors return the value along with the key. */
2418  if (ct == CT_BTREE && rc == 0)
2419  rc = cas_place(&fom->cf_out_val, &val,
2420  rpc_cutoff);
2421  break;
2422  }
2423 
2424  return rc;
2425 }
2426 
/*
 * Completes processing of the current record: computes its final return
 * code, stores it in the output record and advances the input/output
 * positions.
 *
 * For CO_CUR the cursor position is advanced and the output rc becomes the
 * 1-based position of the record within the iteration (adjusted when the
 * start key is excluded). The same input record is re-used (cf_ipos is kept)
 * until the number of records requested in its cr_rc has been produced or
 * the cursor fails, at which point the cursor is released and the iteration
 * state is reset. For the other opcodes the catalogue operation is
 * finalised.
 *
 * Ownership of the output key/value buffers is dropped here by resetting
 * cf_out_key/cf_out_val.
 *
 * Returns the rc stored in the output record.
 */
2427 static int cas_done(struct cas_fom *fom, struct m0_cas_op *op,
2428  struct m0_cas_rep *rep, enum m0_cas_opcode opc)
2429 {
2430  struct m0_cas_rec *rec_out;
2431  struct m0_cas_rec *rec;
2432  int ctg_rc = m0_ctg_op_rc(&fom->cf_ctg_op);
2433  int rc;
2434 
2435  M0_ASSERT(fom->cf_ipos < op->cg_rec.cr_nr);
2436  rec_out = cas_out_at(rep, fom->cf_opos);
2437  rec = cas_at(op, fom->cf_ipos);
2438 
2439  rc = rec_out->cr_rc;
2440  if (opc == CO_CUR) {
2441  fom->cf_curpos++;
2442  if (rc == 0 && ctg_rc == 0)
2443  rc = fom->cf_startkey_excluded ?
2444  fom->cf_curpos - 1 : fom->cf_curpos;
2445  if (ctg_rc == 0 &&
2446  ((fom->cf_curpos < rec->cr_rc) ||
2447  (fom->cf_startkey_excluded &&
2448  (fom->cf_curpos < rec->cr_rc + 1)))) {
2449  /* Continue with the same iteration. */
 /* Compensates for the unconditional ++fom->cf_ipos below. */
2450  --fom->cf_ipos;
2451  } else {
2452  /*
2453  * End the iteration on ctg cursor error because it
2454  * doesn't make sense to continue with broken iterator.
2455  */
2456  m0_ctg_cursor_put(&fom->cf_ctg_op);
2457  fom->cf_curpos = 0;
2458  fom->cf_startkey_excluded = false;
2459  }
2460  } else
2461  m0_ctg_op_fini(&fom->cf_ctg_op);
2462 
2463  ++fom->cf_ipos;
2464  ++fom->cf_opos;
2465  /*
2466  * Overwrite return code of put operation if key is already exist and
2467  * COF_CREATE is set or overwrite return code of del operation if key
2468  * is not found and COF_CROW is set.
2469  */
2470  if ((opc == CO_PUT && rc == -EEXIST && (op->cg_flags & COF_CREATE)) ||
2471  (opc == CO_DEL && rc == -ENOENT && (op->cg_flags & COF_CROW)))
2472  rc = 0;
2473 
2474  M0_LOG(M0_DEBUG, "pos: %" PRId64 " rc: %d", fom->cf_opos, rc);
2475  rec_out->cr_rc = rc;
2476 
2477  /*
2478  * Out buffers are passed to RPC AT layer. They will be deallocated
2479  * automatically as part of a reply FOP.
2480  */
2481  fom->cf_out_key = M0_BUF_INIT0;
2482  fom->cf_out_val = M0_BUF_INIT0;
2483 
2484  return rec_out->cr_rc;
2485 }
2486 
2487 static int cas_ctg_crow_fop_buf_prepare(const struct m0_cas_id *cid,
2488  struct m0_rpc_at_buf *at_buf)
2489 {
2490  struct m0_buf buf;
2491  int rc;
2492 
2493  M0_PRE(cid != NULL);
2494  M0_PRE(at_buf != NULL);
2495 
2496  m0_rpc_at_init(at_buf);
2497  rc = m0_xcode_obj_enc_to_buf(&M0_XCODE_OBJ(m0_cas_id_xc,
2498  (struct m0_cas_id *)cid),
2499  &buf.b_addr, &buf.b_nob);
2500  if (rc == 0) {
2501  at_buf->ab_type = M0_RPC_AT_INLINE;
2502  at_buf->u.ab_buf = buf;
2503  }
2504  return rc;
2505 }
2506 
/*
 * Builds a fop carrying a single-record CAS operation on the meta catalogue
 * whose key is the encoded identifier @cid.
 *
 * On success *out points to the new fop; on failure *out is NULL, all three
 * allocations are released and a negative errno is returned (-ENOMEM when an
 * allocation fails, or the encoding error).
 *
 * NOTE(review): listing line 2528, presumably initialising the fop with
 * 'op' as its data, is absent from this extract.
 */
2507 static int cas_ctg_crow_fop_create(const struct m0_cas_id *cid,
2508  struct m0_fop **out)
2509 {
2510  struct m0_cas_op *op;
2511  struct m0_cas_rec *rec;
2512  struct m0_fop *fop;
2513  int rc = 0;
2514 
2515  *out = NULL;
2516 
 /* Allocate everything first, then check once. */
2517  M0_ALLOC_PTR(op);
2518  M0_ALLOC_PTR(rec);
2519  M0_ALLOC_PTR(fop);
2520  if (op == NULL || rec == NULL || fop == NULL)
2521  rc = -ENOMEM;
2522  if (rc == 0) {
2523  rc = cas_ctg_crow_fop_buf_prepare(cid, &rec->cr_key);
2524  if (rc == 0) {
2525  op->cg_id.ci_fid = m0_cas_meta_fid;
2526  op->cg_rec.cr_nr = 1;
2527  op->cg_rec.cr_rec = rec;
2529  *out = fop;
2530  }
2531  }
2532  if (rc != 0) {
2533  m0_free(op);
2534  m0_free(rec);
2535  m0_free(fop);
2536  }
2537  return rc;
2538 }
2539 
2541  struct m0_fom *serf)
2542 {
 /*
  * Completion callback of a spawned ("serf") fom: stores the overall
  * outcome of the serf in the leader's cf_thrall_rc. The outcome is the
  * first non-zero value among the fom rc, the reply-wide cgr_rc and the rc
  * of the single reply record.
  *
  * NOTE(review): listing line 2540 with the start of the function header
  * (return type, name and the 'thrall' parameter) is absent from this
  * extract.
  */
2543  struct cas_fom *leader = M0_AMB(leader, thrall, cf_thrall);
2544  struct m0_cas_rep *rep;
2545  int rc;
2546 
2547  rc = m0_fom_rc(serf);
2548  if (rc == 0) {
2549  M0_ASSERT(serf->fo_rep_fop != NULL);
2550  rep = (struct m0_cas_rep *)m0_fop_data(serf->fo_rep_fop);
2551  M0_ASSERT(rep != NULL);
2552  rc = rep->cgr_rc;
2553  if (rc == 0) {
2554  M0_ASSERT(rep->cgr_rep.cr_nr == 1);
2555  rc = rep->cgr_rep.cr_rec[0].cr_rc;
2556  }
2557  }
2558  leader->cf_thrall_rc = rc;
2559 }
2560 
2561 static void cas_addb2_fom_to_crow_fom(const struct m0_fom *fom0,
2562  const struct m0_fom *crow_fom0)
2563 {
2564  uint64_t fom0_sm_id = m0_sm_id_get(&fom0->fo_sm_phase);
2565  uint64_t crow_fom0_sm_id = m0_sm_id_get(&crow_fom0->fo_sm_phase);
2566 
2567  M0_ADDB2_ADD(M0_AVI_CAS_FOM_TO_CROW_FOM, fom0_sm_id, crow_fom0_sm_id);
2568 }
2569 
2570 M0_INTERNAL int m0_cas_fom_spawn(
2571  struct m0_fom *lead,
2572  struct m0_fom_thralldom *thrall,
2573  struct m0_fop *cas_fop,
2574  void (*on_fom_complete)(struct m0_fom_thralldom *,
2575  struct m0_fom *))
2576 {
2577  struct m0_fom *new_fom0;
2578  struct m0_reqh *reqh;
2579  struct m0_rpc_machine *mach;
2580  int rc;
2581 
2582  reqh = lead->fo_service->rs_reqh;
2583  mach = m0_reqh_rpc_mach_tlist_head(&reqh->rh_rpc_machines);
2584  m0_fop_rpc_machine_set(cas_fop, mach);
2585  cas_fop->f_item.ri_session = lead->fo_fop->f_item.ri_session;
2586  rc = cas_fom_create(cas_fop, &new_fom0, reqh);
2587  if (rc == 0) {
2588  new_fom0->fo_local = true;
2589  m0_fom_enthrall(lead,
2590  new_fom0,
2591  thrall,
2592  on_fom_complete);
2593  m0_fom_queue(new_fom0);
2594  cas_addb2_fom_to_crow_fom(lead, new_fom0);
2595  }
2596  /*
2597  * New FOM got reference to FOP, release ref counter as this
2598  * FOP is not needed here.
2599  */
2600  m0_fop_put_lock(cas_fop);
2601 
2602  return M0_RC(rc);
2603 }
2604 
/*
 * Creates the fop for creation of component catalogue @cid and, if that
 * succeeded, spawns a CAS fom for it as a thrall of @fom.
 *
 * Returns 0 on success or the first error encountered.
 *
 * NOTE(review): listing line 2618 with the remaining arguments of
 * m0_cas_fom_spawn() is absent from this extract.
 */
2605 static int cas_ctg_crow_handle(struct cas_fom *fom, const struct m0_cas_id *cid)
2606 {
2607  struct m0_fop *fop;
2608  int rc;
2609 
2610  /*
2611  * Create fop to create component catalogue. For a new CAS FOM this FOP
2612  * will appear as arrived from the network. FOP will be deallocated by a
2613  * new CAS FOM.
2614  */
2615  rc = cas_ctg_crow_fop_create(cid, &fop) ?:
2616  m0_cas_fom_spawn(&fom->cf_fom,
2617  &fom->cf_thrall,
2619  return rc;
2620 }
2621 
2622 static bool cas_fom_invariant(const struct cas_fom *fom)
2623 {
2624  const struct m0_fom *fom0 = &fom->cf_fom;
2625  int phase = m0_fom_phase(fom0);
2626  struct m0_cas_op *op = cas_op(fom0);
2627  struct cas_service *service = M0_AMB(service,
2628  fom0->fo_service, c_service);
2629 
2630  return _0C(ergo(phase > M0_FOPH_INIT && phase != M0_FOPH_FAILURE,
2631  fom->cf_ipos <= op->cg_rec.cr_nr)) &&
2632  _0C(phase <= CAS_NR);
2633 }
2634 
2635 static void cas_fom_addb2_descr(struct m0_fom *fom)
2636 {
2637  struct m0_cas_op *op = cas_op(fom);
2638  struct m0_cas_rec *rec;
2639  int i;
2640 
2641  for (i = 0; i < op->cg_rec.cr_nr; i++) {
2642  rec = cas_at(op, i);
2643  M0_ADDB2_ADD(M0_AVI_CAS_KV_SIZES, FID_P(&op->cg_id.ci_fid),
2644  m0_rpc_at_len(&rec->cr_key),
2645  m0_rpc_at_len(&rec->cr_val));
2646  }
2647 }
2648 
2649 static const struct m0_fom_ops cas_fom_ops = {
2650  .fo_tick = &cas_fom_tick,
2651  .fo_home_locality = &cas_fom_home_locality,
2652  .fo_fini = &cas_fom_fini,
2653  .fo_addb2_descr = &cas_fom_addb2_descr
2654 };
2655 
/*
 * Type operations of CAS foms.
 *
 * NOTE(review): listing line 2657, holding the only initialiser of this
 * structure, is absent from this extract.
 */
2656 static const struct m0_fom_type_ops cas_fom_type_ops = {
2658 };
2659 
2660 static struct m0_sm_state_descr cas_fom_phases[] = {
2661  [CAS_CHECK_PRE] = {
2662  .sd_name = "cas-op-check-prepare",
2663  .sd_allowed = M0_BITS(CAS_CHECK, M0_FOPH_FAILURE)
2664  },
2665  [CAS_CHECK] = {
2666  .sd_name = "cas-op-check",
2667  .sd_allowed = M0_BITS(M0_FOPH_INIT, M0_FOPH_FAILURE)
2668  },
2669  [CAS_START] = {
2670  .sd_name = "start",
2671  .sd_allowed = M0_BITS(CAS_META_LOCK, CAS_LOAD_KEY)
2672  },
2673  [CAS_META_LOCK] = {
2674  .sd_name = "meta-lock",
2675  .sd_allowed = M0_BITS(CAS_META_LOOKUP)
2676  },
2677  [CAS_META_LOOKUP] = {
2678  .sd_name = "meta-lookup",
2680  },
2681  [CAS_META_LOOKUP_DONE] = {
2682  .sd_name = "meta-lookup-done",
2683  .sd_allowed = M0_BITS(CAS_CTG_CROW_DONE, CAS_LOAD_KEY,
2685  },
2686  [CAS_CTG_CROW_DONE] = {
2687  .sd_name = "ctg-crow-done",
2688  .sd_allowed = M0_BITS(CAS_START, M0_FOPH_FAILURE)
2689  },
2690  [CAS_LOCK] = {
2691  .sd_name = "lock",
2692  .sd_allowed = M0_BITS(CAS_CTIDX_LOCK, CAS_PREP)
2693  },
2694  [CAS_CTIDX_LOCK] = {
2695  .sd_name = "ctidx_lock",
2696  .sd_allowed = M0_BITS(CAS_PREP)
2697  },
2698  [CAS_LOAD_KEY] = {
2699  .sd_name = "load-key",
2700  .sd_allowed = M0_BITS(CAS_LOAD_VAL)
2701  },
2702  [CAS_LOAD_VAL] = {
2703  .sd_name = "load-value",
2704  .sd_allowed = M0_BITS(CAS_LOAD_DONE)
2705  },
2706  [CAS_LOAD_DONE] = {
2707  .sd_name = "load-done",
2708  .sd_allowed = M0_BITS(CAS_LOAD_KEY, CAS_LOCK,
2710  },
2711  [CAS_PREP] = {
2712  .sd_name = "prep",
2713  .sd_allowed = M0_BITS(M0_FOPH_TXN_OPEN, M0_FOPH_FAILURE)
2714  },
2715  [CAS_TXN_OPENED] = {
2716  .sd_name = "txn-opened",
2717  .sd_allowed = M0_BITS(CAS_META_UNLOCK, CAS_LOOP)
2718  },
2719  [CAS_META_UNLOCK] = {
2720  .sd_name = "meta-unlock",
2721  .sd_allowed = M0_BITS(CAS_LOOP)
2722  },
2723  [CAS_LOOP] = {
2724  .sd_name = "loop",
2725  .sd_allowed = M0_BITS(CAS_CTIDX, CAS_DTM0, CAS_INSERT_TO_DEAD,
2728  },
2729 
2730 
2731  [CAS_CTIDX] = {
2732  .sd_name = "ctidx",
2735  },
2736  [CAS_CTIDX_MEM_PLACE] = {
2737  .sd_name = "ctidx-im-range-alloc",
2738  .sd_allowed = M0_BITS(CAS_CTIDX_INSERT)
2739  },
2740  [CAS_CTIDX_INSERT] = {
2741  .sd_name = "ctidx-insert",
2742  .sd_allowed = M0_BITS(CAS_PREPARE_SEND, CAS_IDROP_LOOP)
2743  },
2744 
2745  [CAS_CTIDX_LOOKUP] = {
2746  .sd_name = "ctidx-lookup",
2747  .sd_allowed = M0_BITS(CAS_CTIDX_MEM_FREE)
2748  },
2749  [CAS_CTIDX_MEM_FREE] = {
2750  .sd_name = "ctidx-im-range-free",
2751  .sd_allowed = M0_BITS(CAS_PREPARE_SEND, CAS_CTIDX_DELETE)
2752  },
2753  [CAS_CTIDX_DELETE] = {
2754  .sd_name = "ctidx-delete",
2755  .sd_allowed = M0_BITS(CAS_PREPARE_SEND, CAS_IDROP_LOOP)
2756  },
2757 
2758 
2759 
2760  [CAS_DONE] = {
2761  .sd_name = "done",
2762  .sd_allowed = M0_BITS(CAS_LOOP, CAS_IDROP_LOCK_LOOP)
2763  },
2764  [CAS_PREPARE_SEND] = {
2765  .sd_name = "prep-send",
2766  .sd_allowed = M0_BITS(CAS_SEND_KEY, CAS_DONE, CAS_LOOP)
2767  },
2768  [CAS_SEND_KEY] = {
2769  .sd_name = "send-key",
2770  .sd_allowed = M0_BITS(CAS_KEY_SENT, CAS_SEND_VAL)
2771  },
2772  [CAS_KEY_SENT] = {
2773  .sd_name = "key-sent",
2774  .sd_allowed = M0_BITS(CAS_SEND_VAL)
2775  },
2776  [CAS_SEND_VAL] = {
2777  .sd_name = "send-val",
2778  .sd_allowed = M0_BITS(CAS_VAL_SENT, CAS_DONE)
2779  },
2780  [CAS_VAL_SENT] = {
2781  .sd_name = "val-sent",
2782  .sd_allowed = M0_BITS(CAS_DONE)
2783  },
2784  [CAS_DEAD_INDEX_LOCK] = {
2785  .sd_name = "dead-index-lock",
2786  .sd_allowed = M0_BITS(CAS_LOCK)
2787  },
2788  [CAS_INSERT_TO_DEAD] = {
2789  .sd_name = "insert-dead-index",
2791  },
2792  [CAS_DELETE_FROM_META] = {
2793  .sd_name = "detele-from-meta",
2794  .sd_allowed = M0_BITS(CAS_CTIDX, CAS_IDROP_LOOP,
2796  },
2797  [CAS_IDROP_LOOP] = {
2798  .sd_name = "index-drop-loop",
2799  .sd_allowed = M0_BITS(CAS_LOOP, CAS_IDROP_LOCK_LOOP,
2801  },
2802  [CAS_IDROP_LOCK_LOOP] = {
2803  .sd_name = "index-drop-lock-loop",
2805  CAS_LOOP)
2806  },
2807  [CAS_IDROP_LOCKED] = {
2808  .sd_name = "index-drop-locked",
2809  .sd_allowed = M0_BITS(CAS_PREPARE_SEND)
2810  },
2811  [CAS_IDROP_START_GC] = {
2812  .sd_name = "index-drop-start-gc",
2813  .sd_allowed = M0_BITS(M0_FOPH_SUCCESS)
2814  },
2815  [CAS_DTM0] = {
2816  .sd_name = "dtm0",
2817  .sd_allowed = M0_BITS(M0_FOPH_SUCCESS, M0_FOPH_FAILURE)
2818  },
2819 };
2820 
/*
 * CAS FOM transition descriptors. Each entry reads as
 * { transition name, source phase, target phase }.
 *
 * NOTE(review): the targets listed for a source phase appear to mirror the
 * .sd_allowed mask of that phase in cas_fom_phases[]; keep both tables in
 * sync when adding a phase or a transition.
 */
2823  { "cas-op-check-prepare", M0_FOPH_INIT, CAS_CHECK_PRE },
2824  { "cas-op-check", CAS_CHECK_PRE, CAS_CHECK },
2825  { "cas-op-check_pre_failed", CAS_CHECK_PRE, M0_FOPH_FAILURE },
2826  { "cas-op-checked", CAS_CHECK, M0_FOPH_INIT },
2827  { "cas-op-check-failed", CAS_CHECK, M0_FOPH_FAILURE },
2828  { "tx-initialised", M0_FOPH_TXN_OPEN, CAS_START },
2829  { "ctg-op?", CAS_START, CAS_META_LOCK },
2830  { "meta-op?", CAS_START, CAS_LOAD_KEY },
2831  { "meta-locked", CAS_META_LOCK, CAS_META_LOOKUP },
2832  { "meta-lookup-launched", CAS_META_LOOKUP, CAS_META_LOOKUP_DONE },
2833  { "key-alloc-failure", CAS_META_LOOKUP, M0_FOPH_FAILURE },
2834  { "meta-lookup-done", CAS_META_LOOKUP_DONE, CAS_LOAD_KEY },
2835  { "meta-lookup-fail", CAS_META_LOOKUP_DONE, M0_FOPH_FAILURE },
2836  { "ctg-crow-done", CAS_META_LOOKUP_DONE, CAS_CTG_CROW_DONE },
2837  { "ctg-crow-success", CAS_CTG_CROW_DONE, CAS_START },
2838  { "ctg-crow-fail", CAS_CTG_CROW_DONE, M0_FOPH_FAILURE },
2839  { "index-locked", CAS_LOCK, CAS_PREP },
2840  { "key-loaded", CAS_LOAD_KEY, CAS_LOAD_VAL },
2841  { "val-loaded", CAS_LOAD_VAL, CAS_LOAD_DONE },
2842  { "more-kv-to-load", CAS_LOAD_DONE, CAS_LOAD_KEY },
2843  { "meta-locked", CAS_LOCK, CAS_CTIDX_LOCK },
2844  { "ctidx-locked", CAS_CTIDX_LOCK, CAS_PREP },
2845  { "load-finished", CAS_LOAD_DONE, CAS_LOCK },
2846  { "load-finished-idrop", CAS_LOAD_DONE, CAS_DEAD_INDEX_LOCK },
2847  { "kv-setup-failure", CAS_LOAD_DONE, M0_FOPH_FAILURE },
/*
 * NOTE(review): the next two entries repeat the CAS_LOCK -> CAS_PREP and
 * CAS_LOCK -> CAS_CTIDX_LOCK descriptors already listed above. Presumably
 * harmless; confirm before removing, as removal changes the array size.
 */
2848  { "index-locked", CAS_LOCK, CAS_PREP },
2849  { "meta-locked", CAS_LOCK, CAS_CTIDX_LOCK },
2850  { "tx-credit-calculated", CAS_PREP, M0_FOPH_TXN_OPEN },
2851  { "keys-vals-invalid", CAS_PREP, M0_FOPH_FAILURE },
2852  { "txn-opened-ctg-op?", CAS_TXN_OPENED, CAS_META_UNLOCK },
2853  { "txn-opened-meta-op?", CAS_TXN_OPENED, CAS_LOOP },
2854  { "meta-unlocked", CAS_META_UNLOCK, CAS_LOOP },
2855  { "all-done?", CAS_LOOP, M0_FOPH_SUCCESS },
2856  { "reply-too_large", CAS_LOOP, M0_FOPH_FAILURE },
2857  { "do-ctidx-op", CAS_LOOP, CAS_CTIDX },
2858  { "op-launched", CAS_LOOP, CAS_PREPARE_SEND },
2859  { "do-dtm0-op", CAS_LOOP, CAS_DTM0 },
2860  { "ready-to-send", CAS_PREPARE_SEND, CAS_SEND_KEY },
2861  { "next-key", CAS_PREPARE_SEND, CAS_LOOP },
2862  { "prep-error", CAS_PREPARE_SEND, CAS_DONE },
2863  { "key-sent", CAS_SEND_KEY, CAS_KEY_SENT },
2864  { "skip-key-sending", CAS_SEND_KEY, CAS_SEND_VAL },
2865  { "send-val", CAS_KEY_SENT, CAS_SEND_VAL },
2866  { "val-sent", CAS_SEND_VAL, CAS_VAL_SENT },
2867  { "skip-val-sending", CAS_SEND_VAL, CAS_DONE },
2868  { "processing-done", CAS_VAL_SENT, CAS_DONE },
2869  { "goto-next-rec", CAS_DONE, CAS_LOOP },
2870  { "ctidx-insert", CAS_CTIDX, CAS_CTIDX_MEM_PLACE },
2871  { "ctidx-delete", CAS_CTIDX, CAS_CTIDX_LOOKUP },
2872  { "op-failed", CAS_CTIDX, CAS_PREPARE_SEND },
2873 
2874  { "ctidx-mem-place", CAS_CTIDX_MEM_PLACE, CAS_CTIDX_INSERT },
2875  { "ctidx-do-insert", CAS_CTIDX_INSERT, CAS_PREPARE_SEND },
2876  { "ctidx-ins-idx-drop", CAS_CTIDX_INSERT, CAS_IDROP_LOOP },
2877 
2878  { "ctidx-lookup", CAS_CTIDX_LOOKUP, CAS_CTIDX_MEM_FREE },
2879  { "ctidx-mem-free", CAS_CTIDX_MEM_FREE, CAS_CTIDX_DELETE },
2880  { "ctidx-lookup-failed", CAS_CTIDX_MEM_FREE, CAS_PREPARE_SEND },
2881  { "ctidx-do-delete", CAS_CTIDX_DELETE, CAS_PREPARE_SEND },
2882  { "ctidx-del-idx-drop", CAS_CTIDX_DELETE, CAS_IDROP_LOOP },
/*
 * NOTE(review): "key-add-reply" has the same source and target phases as
 * "goto-next-rec" above (CAS_DONE -> CAS_LOOP).
 */
2883  { "key-add-reply", CAS_DONE, CAS_LOOP },
2884  { "op-launched", CAS_LOOP, CAS_INSERT_TO_DEAD },
2885  { "idx-drop-reply-sent", CAS_DONE, CAS_IDROP_LOCK_LOOP },
2886  { "dead-index-locked", CAS_DEAD_INDEX_LOCK, CAS_LOCK },
2887  { "dead-index-inserted", CAS_INSERT_TO_DEAD, CAS_DELETE_FROM_META },
2888  { "meta-lookup-fail", CAS_INSERT_TO_DEAD, CAS_PREPARE_SEND },
2889  { "meta-deleted", CAS_DELETE_FROM_META, CAS_IDROP_LOOP },
2890  { "meta-deleted-ctidx", CAS_DELETE_FROM_META, CAS_CTIDX },
2891  { "dead-index-ins-fail", CAS_DELETE_FROM_META, CAS_PREPARE_SEND },
2892  { "idx-drop-start-lock", CAS_IDROP_LOOP, CAS_IDROP_LOCK_LOOP },
2893  { "idx-drop-next", CAS_IDROP_LOOP, CAS_LOOP },
2894  { "idx-lock-failed", CAS_IDROP_LOOP, CAS_PREPARE_SEND },
2895  { "idx-drop-locking", CAS_IDROP_LOCK_LOOP, CAS_IDROP_LOCKED },
2896  { "idx-drop-locked", CAS_IDROP_LOCK_LOOP, CAS_IDROP_START_GC },
2897  { "idx-drop-skip-lock", CAS_IDROP_LOCK_LOOP, CAS_LOOP },
2898  { "idx-dropped-ok", CAS_IDROP_LOCKED, CAS_PREPARE_SEND },
2899  { "idx-drop-all-done", CAS_IDROP_START_GC, M0_FOPH_SUCCESS },
2900 
2901  { "dtm0-op-done", CAS_DTM0, M0_FOPH_SUCCESS },
2902  { "dtm0-op-fail", CAS_DTM0, M0_FOPH_FAILURE },
2903 
/* NOTE(review): judging by its name, this transition exists for unit tests only. */
2904  { "ut-short-cut", M0_FOPH_QUEUE_REPLY, M0_FOPH_TXN_LOGGED_WAIT }
2905 };
2906 
2907 static struct m0_sm_conf cas_sm_conf = {
2908  .scf_name = "cas-fom",
2909  .scf_nr_states = ARRAY_SIZE(cas_fom_phases),
2910  .scf_state = cas_fom_phases,
2911  .scf_trans_nr = ARRAY_SIZE(cas_fom_trans),
2912  .scf_trans = cas_fom_trans
2913 };
2914 
2915 static const struct m0_reqh_service_type_ops cas_service_type_ops = {
2917 };
2918 
2919 static const struct m0_reqh_service_ops cas_service_ops = {
2921  .rso_start = &cas_service_start,
2922  .rso_prepare_to_stop = &cas_service_prepare_to_stop,
2923  .rso_stop = &cas_service_stop,
2924  .rso_fini = &cas_service_fini
2925 };
2926 
2928  .rst_name = "M0_CST_CAS",
2929  .rst_ops = &cas_service_type_ops,
2930  .rst_level = M0_RS_LEVEL_NORMAL,
2931  .rst_typecode = M0_CST_CAS
2932 };
2933 
2934 #undef M0_TRACE_SUBSYSTEM
2935 
2938 /*
2939  * Local variables:
2940  * c-indentation-style: "K&R"
2941  * c-basic-offset: 8
2942  * tab-width: 8
2943  * fill-column: 80
2944  * scroll-step: 1
2945  * End:
2946  */
2947 /*
2948  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
2949  */
static int cas_key_send(struct cas_fom *fom, const struct m0_cas_op *op, enum m0_cas_opcode opc, const struct m0_cas_rep *rep, enum cas_fom_phase next_phase)
Definition: service.c:858
uint64_t cr_rc
Definition: cas.h:339
Definition: cas.h:352
M0_INTERNAL int m0_reqh_service_state_get(const struct m0_reqh_service *s)
Definition: reqh_service.c:560
struct m0_poolmach_state * pm_state
Definition: pool_machine.h:169
static uint64_t cas_in_nr(const struct m0_fop *fop)
Definition: service.c:2003
M0_INTERNAL void m0_long_lock_link_init(struct m0_long_lock_link *link, struct m0_fom *fom, struct m0_long_lock_addb2 *addb2)
Definition: fom_long_lock.c:66
cas_fom_phase
Definition: service.c:376
static void cas_ctg_crow_done_cb(struct m0_fom_thralldom *thrall, struct m0_fom *serf)
Definition: service.c:2540
uint32_t rit_opcode
Definition: item.h:474
static size_t nr
Definition: dump.c:1505
M0_INTERNAL void m0_ctg_ctidx_insert_credits(struct m0_cas_id *cid, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1962
M0_INTERNAL void m0_ctg_delete_credit(struct m0_cas_ctg *ctg, m0_bcount_t knob, m0_bcount_t vnob, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1913
struct m0_rpc_at_buf cr_val
Definition: cas.h:300
Definition: cas.h:349
struct m0_dtm0_tx_desc cg_txd
Definition: cas.h:396
#define M0_PRE(cond)
M0_INTERNAL void m0_sm_conf_init(struct m0_sm_conf *conf)
Definition: sm.c:340
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
static int cas_op_check(struct m0_cas_op *op, struct cas_fom *fom, bool is_index_drop)
Definition: service.c:1853
M0_INTERNAL int m0_ctg_meta_insert(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1336
Definition: cas.h:351
#define M0_BUF_INIT_PTR_CONST(p)
Definition: buf.h:73
uint64_t cf_curpos
Definition: service.c:337
static int cas_place(struct m0_buf *dst, struct m0_buf *src, m0_bcount_t cutoff)
Definition: service.c:2040
stats_kv_io
Definition: service.c:298
M0_INTERNAL int m0_ctg_dead_index_insert(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, int next_phase)
Definition: ctg_store.c:1405
struct m0_rpc_at_buf ck_key
Definition: cas.h:127
struct m0_fop * fo_fop
Definition: fom.h:490
int const char const void size_t int flags
Definition: dir.c:328
struct m0_dtm0_tx_desc cg_txd
Definition: cas.h:306
static int cas_prep_send(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct)
Definition: service.c:2393
#define M0_FOM_LONG_LOCK_RETURN(rc)
#define NULL
Definition: misc.h:38
uint32_t pst_nr_devices
Definition: pool_machine.h:108
struct m0_long_lock_addb2 cf_dead_index_addb2
Definition: service.c:365
M0_INTERNAL struct m0_dtm0_service * m0_dtm0_service_find(const struct m0_reqh *reqh)
Definition: service.c:406
static struct m0_bufvec dst
Definition: xform.c:61
M0_INTERNAL int m0_ctg_delete(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, int next_phase)
Definition: ctg_store.c:1540
struct m0_long_lock_addb2 cf_del_lock_addb2
Definition: service.c:366
M0_INTERNAL int m0_ctg_lookup_delete(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, struct m0_buf *val, int flags, int next_phase)
Definition: ctg_store.c:1465
struct m0_reqh_service c_service
Definition: service.c:312
Definition: idx_mock.c:52
#define ergo(a, b)
Definition: misc.h:293
uint64_t cf_kv_stats[STATS_KV_NR][STATS_KV_IO_NR][STATS_NR]
Definition: service.c:373
Definition: cas.h:346
m0_be_tx_state
Definition: tx.h:214
Definition: sm.h:350
Definition: cas.h:347
void * b_addr
Definition: buf.h:39
M0_INTERNAL struct m0_pool_version * m0_pool_version_find(struct m0_pools_common *pc, const struct m0_fid *id)
Definition: pool.c:586
M0_INTERNAL void m0_ctg_lookup_result(struct m0_ctg_op *ctg_op, struct m0_buf *buf)
Definition: ctg_store.c:1570
int(* fo_tick)(struct m0_fom *fom)
Definition: fom.h:663
M0_INTERNAL bool m0_buf_eq(const struct m0_buf *x, const struct m0_buf *y)
Definition: buf.c:90
M0_INTERNAL void m0_cas_svc_init(void)
Definition: service.c:515
M0_INTERNAL int m0_ctg_gc_wait(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1349
M0_INTERNAL struct m0_long_lock * m0_ctg_lock(struct m0_cas_ctg *ctg)
Definition: ctg_store.c:2162
M0_INTERNAL void m0_ctg_store_fini(void)
Definition: ctg_store.c:870
M0_INTERNAL void m0_ctg_mem_place_get(struct m0_ctg_op *ctg_op, struct m0_buf *buf)
Definition: ctg_store.c:2105
uint64_t cv_nr
Definition: cas.h:135
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:79
M0_INTERNAL int m0_ctg_meta_cursor_next(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1694
#define M0_LOG(level,...)
Definition: trace.h:167
M0_INTERNAL void m0_rpc_at_init(struct m0_rpc_at_buf *ab)
Definition: at.c:433
static void cas_fom_success(struct cas_fom *fom, enum m0_cas_opcode opc)
Definition: service.c:1075
Definition: cas.h:360
M0_INTERNAL bool m0_rpc_at_is_set(const struct m0_rpc_at_buf *ab)
Definition: at.c:492
static void cas_fom_cleanup(struct cas_fom *fom, bool ctg_op_fini)
Definition: service.c:976
static int cas_dtm0_logrec_add(struct m0_fom *fom0, enum m0_dtm0_tx_pa_state state)
Definition: service.c:1048
static void cas_service_fini(struct m0_reqh_service *service)
Definition: service.c:595
struct m0_cas_id * cf_in_cids
Definition: service.c:352
Definition: cas.h:247
int(* fto_create)(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: fom.h:650
struct m0_long_lock_link cf_del_lock
Definition: service.c:335
struct cas_kv * cf_ikv
Definition: service.c:344
struct m0_buf ckv_key
Definition: service.c:317
M0_INTERNAL void m0_cas_id_fini(struct m0_cas_id *cid)
Definition: cas.c:198
struct m0_long_lock_link cf_dead_index
Definition: service.c:330
uint64_t sd_allowed
Definition: sm.h:422
M0_INTERNAL int m0_ctg_lookup(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, int next_phase)
Definition: ctg_store.c:1555
static bool cas_fom_invariant(const struct cas_fom *fom)
Definition: service.c:2622
M0_INTERNAL const struct m0_fid m0_cas_meta_fid
Definition: cas.c:146
M0_INTERNAL void m0_cas__ut_svc_be_set(struct m0_reqh_service *svc, struct m0_be_domain *dom)
Definition: service.c:545
struct m0_dix_layout ci_layout
Definition: cas.h:120
static bool cas_max_reply_payload_exceeded(struct cas_fom *fom)
Definition: service.c:922
M0_INTERNAL const struct m0_fid_type m0_cas_index_fid_type
Definition: cas.c:158
static enum m0_cas_type cas_type(const struct m0_fom *fom)
Definition: service.c:1995
static void cas_update_kv_stats(struct cas_fom *fom, const struct m0_rpc_at_buf *ab, m0_bcount_t nob, enum stats_kv kv, enum stats_kv_io kv_io)
Definition: service.c:751
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
static struct m0_cas_rec * cas_at(struct m0_cas_op *op, int idx)
Definition: service.c:2121
M0_INTERNAL void m0_sm_conf_trans_extend(const struct m0_sm_conf *base, struct m0_sm_conf *sub)
Definition: sm.c:726
uint64_t cf_opos
Definition: service.c:324
M0_INTERNAL void m0_sm_conf_extend(const struct m0_sm_state_descr *base, struct m0_sm_state_descr *sub, uint32_t nr)
Definition: sm.c:763
struct m0_cas_recv cgr_rep
Definition: cas.h:427
struct m0_dtx fo_tx
Definition: fom.h:498
static struct m0_addb2_mach * mach
Definition: storage.c:42
M0_INTERNAL void m0_fom_wait_on(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1490
#define M0_BITS(...)
Definition: misc.h:236
uint64_t m0_bcount_t
Definition: types.h:77
M0_INTERNAL int m0_pageshift_get(void)
Definition: memory.c:238
static void cas_addb2_fom_to_crow_fom(const struct m0_fom *fom0, const struct m0_fom *crow_fom0)
Definition: service.c:2561
M0_INTERNAL bool m0_dtm0_tx_desc__invariant(const struct m0_dtm0_tx_desc *td)
Definition: tx_desc.c:49
static int void * buf
Definition: dir.c:1019
M0_INTERNAL int m0_cas_fom_spawn(struct m0_fom *lead, struct m0_fom_thralldom *thrall, struct m0_fop *cas_fop, void(*on_fom_complete)(struct m0_fom_thralldom *, struct m0_fom *))
Definition: service.c:2570
M0_INTERNAL struct m0_cas_ctg * m0_ctg_dead_index(void)
Definition: ctg_store.c:2140
M0_INTERNAL int m0_rpc_at_load(struct m0_rpc_at_buf *ab, struct m0_fom *fom, int next_phase)
Definition: at.c:414
M0_INTERNAL int m0_ctg_cursor_get(struct m0_ctg_op *ctg_op, const struct m0_buf *key, int next_phase)
Definition: ctg_store.c:1660
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
struct m0_fom_thralldom cf_thrall
Definition: service.c:359
static void cas_at_fini(struct m0_rpc_at_buf *ab)
Definition: service.c:732
static void cas_fom_addb2_descr(struct m0_fom *fom)
Definition: service.c:2635
#define LAYOUT_IMASK_PTR(l)
Definition: service.c:424
const struct m0_sm_conf m0_generic_conf
Definition: fom_generic.c:838
struct m0_fop_getxattr_rep * rep
Definition: dir.c:455
M0_INTERNAL void m0_ctg_insert_credit(struct m0_cas_ctg *ctg, m0_bcount_t knob, m0_bcount_t vnob, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1905
int m0_reqh_service_type_register(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:473
m0_fom_phase
Definition: fom.h:372
static struct m0_be_tx * m0_fom_tx(struct m0_fom *fom)
Definition: fom.h:537
Definition: sock.c:887
static bool cas_is_ro(enum m0_cas_opcode opc)
Definition: service.c:1990
static const struct m0_fid * cas_fid(const struct m0_fom *fom)
Definition: service.c:1749
M0_INTERNAL int m0_dtm0_on_committed(struct m0_fom *fom, const struct m0_dtm0_tid *id)
Definition: fop.c:300
static struct m0_pools_common pc
Definition: iter_ut.c:59
M0_INTERNAL int m0_rpc_at_reply(struct m0_rpc_at_buf *in, struct m0_rpc_at_buf *out, struct m0_buf *repbuf, struct m0_fom *fom, int next_phase)
Definition: at.c:528
struct m0_fom_type ft_fom_type
Definition: fop.h:232
struct m0_rpc_at_buf cr_val
Definition: cas.h:182
struct m0_pooldev * pst_devices_array
Definition: pool_machine.h:111
struct m0_fid fid
Definition: di.c:46
M0_INTERNAL bool m0_ctg_cursor_is_initialised(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1633
return M0_RC(rc)
struct m0_be_dtm0_log * dos_log
Definition: service.h:50
op
Definition: libdemo.c:64
M0_INTERNAL void m0_cas_gc_init(void)
Definition: index_gc.c:426
struct m0_dtm0_tid dtd_id
Definition: tx_desc.h:121
M0_INTERNAL int m0_ctg_mem_free(struct m0_ctg_op *ctg_op, void *area, int next_phase)
Definition: ctg_store.c:2117
#define M0_ENTRY(...)
Definition: trace.h:170
M0_INTERNAL int m0_pagesize_get(void)
Definition: memory.c:233
Definition: buf.h:37
struct m0_cas_rec * cr_rec
Definition: cas.h:236
static int cas_buf_cid_decode(struct m0_buf *enc_buf, struct m0_cas_id *cid)
Definition: service.c:2021
M0_INTERNAL int m0_ctg_insert(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, const struct m0_buf *val, int next_phase)
Definition: ctg_store.c:1448
struct m0_rpc_at_buf ck_val
Definition: cas.h:128
M0_INTERNAL int m0_ctg_cursor_next(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1680
struct m0_rpc_at_buf cr_key
Definition: cas.h:172
M0_INTERNAL int m0_ctg_meta_lookup(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1362
int opcode
Definition: crate.c:301
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
M0_INTERNAL bool m0_long_write_lock(struct m0_long_lock *lk, struct m0_long_lock_link *link, int next_phase)
M0_INTERNAL void m0_ctg_cursor_fini(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1749
int i
Definition: dir.c:1033
void m0_fop_rpc_machine_set(struct m0_fop *fop, struct m0_rpc_machine *mach)
Definition: fop.c:352
Definition: cas.h:355
struct m0_fop_type * f_type
Definition: fop.h:81
#define PRIu64
Definition: types.h:58
uint64_t cr_nr
Definition: cas.h:235
struct m0_fom cf_fom
Definition: service.c:322
M0_INTERNAL bool m0_long_lock(struct m0_long_lock *lock, bool write, struct m0_long_lock_link *link, int next_phase)
static m0_bcount_t cas_kv_nob(const struct m0_buf *inbuf)
Definition: service.c:2066
struct m0_rpc_machine * m0_fop_rpc_machine(const struct m0_fop *fop)
Definition: fop.c:360
static const struct m0_reqh_service_type_ops cas_service_type_ops
Definition: service.c:509
static int cas_service_type_allocate(struct m0_reqh_service **service, const struct m0_reqh_service_type *st)
Definition: service.c:604
return M0_ERR(-EOPNOTSUPP)
m0_bcount_t rm_bulk_cutoff
Definition: rpc_machine.h:157
struct m0_long_lock_link cf_ctidx
Definition: service.c:329
M0_INTERNAL bool m0_dix_layout_eq(const struct m0_dix_layout *layout1, const struct m0_dix_layout *layout2)
Definition: layout.c:322
static int cas_ctidx_mem_free(struct cas_fom *fom, int next)
Definition: service.c:2285
static int key
Definition: locality.c:283
Definition: cas.h:344
M0_INTERNAL void m0_ctg_cursor_put(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1744
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
struct m0_fom_thralldom thrall
Definition: ms_fom_ut.c:110
M0_INTERNAL int m0_ctg_mem_place(struct m0_ctg_op *ctg_op, const struct m0_buf *buf, int next_phase)
Definition: ctg_store.c:2090
static const struct socktype stype[]
Definition: sock.c:1156
if(value==NULL)
Definition: dir.c:350
Definition: cas.h:264
#define CTG_OP_COMBINE(opc, ct)
Definition: ctg_store.h:241
M0_INTERNAL struct m0_cas_ctg * m0_ctg_meta(void)
Definition: ctg_store.c:2130
#define ENABLE_DTM0
Definition: config.h:36
int m0_fom_tick_generic(struct m0_fom *fom)
Definition: fom_generic.c:848
static int cas_ctg_crow_fop_create(const struct m0_cas_id *cid, struct m0_fop **out)
Definition: service.c:2507
M0_INTERNAL void m0_long_unlock(struct m0_long_lock *lock, struct m0_long_lock_link *link)
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
m0_bcount_t b_nob
Definition: buf.h:38
#define M0_ASSERT(cond)
M0_INTERNAL void m0_cas_svc_fini(void)
Definition: service.c:529
struct m0_buf ckv_val
Definition: service.c:318
struct m0_long_lock_addb2 cf_ctidx_addb2
Definition: service.c:364
const char * scf_name
Definition: sm.h:352
struct m0_buf cf_out_val
Definition: service.c:369
uint64_t cf_in_cids_nr
Definition: service.c:354
struct m0_cas_ctg * cf_ctg
Definition: service.c:326
struct m0_fid pver
Definition: idx_dix.c:74
bool fo_local
Definition: fom.h:503
const char * rst_name
Definition: reqh_service.h:447
struct m0_be_domain * c_be_domain
Definition: service.c:313
m0_pool_nd_state
Definition: pool_machine.h:57
void m0_fom_phase_move(struct m0_fom *fom, int32_t rc, int phase)
Definition: fom.c:1699
static int op_sync_wait(struct m0_fom *fom)
Definition: service.c:1138
M0_INTERNAL void m0_long_lock_link_fini(struct m0_long_lock_link *link)
Definition: fom_long_lock.c:76
M0_INTERNAL bool cas_in_ut(void)
Definition: cas.c:220
struct m0_fid rs_service_fid
Definition: reqh_service.h:220
uint32_t scf_nr_states
Definition: sm.h:354
struct m0_crv cr_ver
Definition: cas.h:228
M0_INTERNAL void m0_be_tx_credit_add(struct m0_be_tx_credit *c0, const struct m0_be_tx_credit *c1)
Definition: tx_credit.c:44
M0_INTERNAL bool m0_rpc_item_max_payload_exceeded(struct m0_rpc_item *item, struct m0_rpc_session *session)
Definition: item.c:490
static int cas_ctidx_delete(struct cas_fom *fom, const struct m0_cas_id *in_cid, int next)
Definition: service.c:2315
static struct m0_rpc_at_buf * cas_out_complementary(enum m0_cas_opcode opc, const struct m0_cas_op *op, bool key, size_t opos)
Definition: service.c:805
M0_INTERNAL int m0_ctg_store_init(struct m0_be_domain *dom)
Definition: ctg_store.c:814
uint32_t cg_flags
Definition: cas.h:391
struct m0_tl rh_rpc_machines
Definition: reqh.h:135
M0_BASSERT(M0_CAS_GET_FOP_OPCODE==CO_GET+M0_CAS_GET_FOP_OPCODE)
M0_INTERNAL void m0_fom_enthrall(struct m0_fom *leader, struct m0_fom *serf, struct m0_fom_thralldom *thrall, void(*end)(struct m0_fom_thralldom *thrall, struct m0_fom *serf))
static struct m0_stob_domain * dom
Definition: storage.c:38
M0_INTERNAL void m0_ctg_mark_deleted_credit(struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1805
static int next[]
Definition: cp.c:248
#define M0_BUF_INIT0
Definition: buf.h:71
static int cas_ctidx_insert(struct cas_fom *fom, const struct m0_cas_id *in_cid, int next)
Definition: service.c:2347
M0_INTERNAL int m0_xcode_obj_enc_to_buf(struct m0_xcode_obj *obj, void **buf, m0_bcount_t *len)
Definition: xcode.c:832
M0_INTERNAL void m0_fom_mod_rep_fill(struct m0_fop_mod_rep *rep, struct m0_fom *fom)
Definition: fom_generic.c:68
M0_INTERNAL int m0_rpc_at_get(const struct m0_rpc_at_buf *ab, struct m0_buf *buf)
Definition: at.c:399
Definition: cas.h:345
uint64_t cf_ikv_nr
Definition: service.c:346
static struct m0_sm_conf cas_sm_conf
Definition: service.c:512
struct m0_be_tx_credit tx_betx_cred
Definition: dtm.h:560
M0_INTERNAL const struct m0_fid_type m0_cctg_fid_type
Definition: cas.c:163
int co_rc
Definition: ctg_store.h:227
struct m0_cas_kv * cv_rec
Definition: cas.h:136
const struct m0_rpc_item_type * ri_type
Definition: item.h:200
M0_INTERNAL const struct m0_fid_type * m0_fid_type_getfid(const struct m0_fid *fid)
Definition: fid.c:76
struct m0_ctg_op cf_ctg_op
Definition: service.c:325
M0_INTERNAL int m0_rpc_at_reply_rc(struct m0_rpc_at_buf *out)
Definition: at.c:583
static bool cas_service_started(struct m0_fop *fop, struct m0_reqh *reqh)
Definition: service.c:619
M0_INTERNAL void m0_cas_gc_fini(void)
Definition: index_gc.c:455
M0_INTERNAL struct m0_be_domain * m0_cas__ut_svc_be_get(struct m0_reqh_service *svc)
Definition: service.c:553
static int cas_at_reply(struct m0_rpc_at_buf *in, struct m0_rpc_at_buf *out, struct m0_buf *repbuf, struct m0_fom *fom, int next_phase)
Definition: service.c:716
uint32_t pd_sdev_idx
Definition: pool.h:437
#define M0_POST(cond)
M0_INTERNAL void m0_ctg_create_credit(struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1823
static const struct m0_fom_type_ops cas_fom_type_ops
Definition: service.c:511
Definition: reqh.h:94
union m0_rpc_at_buf::@447 u
uint32_t dl_type
Definition: layout.h:100
struct m0_cas_recv cg_rec
Definition: cas.h:294
M0_INTERNAL void m0_ctg_meta_cursor_init(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1722
Definition: dump.c:103
struct m0_fid ci_fid
Definition: cas.h:113
M0_INTERNAL void m0_cas_gc_wait_sync(void)
Definition: index_gc.c:591
static struct m0_cas_rec * cas_out_at(const struct m0_cas_rep *rep, int idx)
Definition: service.c:2127
M0_INTERNAL uint32_t m0_dix_fid_cctg_device_id(const struct m0_fid *cctg_fid)
Definition: fid_convert.c:81
static const struct m0_reqh_service_ops cas_service_ops
Definition: service.c:508
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
Definition: buf.c:55
M0_INTERNAL void m0_ctg_cursor_init(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg)
Definition: ctg_store.c:1640
struct m0_fid p_fid
Definition: tx_desc.h:110
M0_INTERNAL void m0_ctg_op_get_ver(struct m0_ctg_op *ctg_op, struct m0_crv *out)
Definition: ctg_store.c:1582
static int cas_incoming_kv_setup(struct cas_fom *fom, const struct m0_cas_op *op)
Definition: service.c:766
static int cas_id_check(const struct m0_cas_id *cid)
Definition: service.c:1835
struct m0_cas_kv_vec cr_kv_bufs
Definition: cas.h:195
union m0_dix_layout::@145 u
struct m0_sm t_sm
Definition: tx.h:281
M0_INTERNAL int m0_buf_copy(struct m0_buf *dest, const struct m0_buf *src)
Definition: buf.c:104
static int cas_fom_create(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: service.c:634
#define FID_P(f)
Definition: fid.h:77
#define PRId64
Definition: types.h:57
m0_cas_opcode
Definition: cas.h:343
void imask(void)
Definition: client_ut.c:141
Definition: cas.h:372
M0_INTERNAL bool m0_dix_imask_is_empty(const struct m0_dix_imask *mask)
Definition: imask.c:130
M0_INTERNAL void m0_ctg_op_init(struct m0_ctg_op *ctg_op, struct m0_fom *fom, uint32_t flags)
Definition: ctg_store.c:1759
M0_INTERNAL int m0_xcode_data_size(struct m0_xcode_ctx *ctx, const struct m0_xcode_obj *obj)
Definition: xcode.c:437
M0_INTERNAL void m0_cas_svc_fop_args(struct m0_sm_conf **sm_conf, const struct m0_fom_type_ops **fom_ops, struct m0_reqh_service_type **svctype)
Definition: service.c:536
int m0_reqh_service_async_start_simple(struct m0_reqh_service_start_async_ctx *asc)
Definition: reqh_service.c:601
struct m0_fop * m0_fop_reply_alloc(struct m0_fop *req, struct m0_fop_type *rept)
Definition: fop.c:129
static enum m0_cas_opcode m0_cas_opcode(const struct m0_fop *fop)
Definition: service.c:1766
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
Definition: fid.c:164
#define m0_forall(var, nr,...)
Definition: misc.h:112
struct m0_long_lock_addb2 cf_lock_addb2
Definition: service.c:362
Definition: fom.h:481
M0_INTERNAL struct m0_reqh_service_type m0_cas_service_type
Definition: service.c:2927
m0_cas_type
Definition: cas.h:358
static void cas_fom_fini(struct m0_fom *fom0)
Definition: service.c:1715
struct m0_dtm0_tx_pa * dtp_pa
Definition: tx_desc.h:117
static bool op_is_index_drop(enum m0_cas_opcode opc, enum m0_cas_type ct)
Definition: service.c:1129
static uint64_t cas_out_nr(const struct m0_fop *fop)
Definition: service.c:2010
M0_INTERNAL bool m0_dtm0_tx_desc_is_none(const struct m0_dtm0_tx_desc *td)
Definition: tx_desc.c:44
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_find(const struct m0_reqh_service_type *st, const struct m0_reqh *reqh)
Definition: reqh_service.c:538
Definition: cas.h:350
struct m0_reqh reqh
Definition: rm_foms.c:48
const char * sd_name
Definition: sm.h:383
static int cas_done(struct cas_fom *fom, struct m0_cas_op *op, struct m0_cas_rep *rep, enum m0_cas_opcode opc)
Definition: service.c:2427
#define CID_IMASK_PTR(cid)
Definition: service.c:425
M0_INTERNAL void m0_ctg_cursor_kv_get(struct m0_ctg_op *ctg_op, struct m0_buf *key, struct m0_buf *val)
Definition: ctg_store.c:1708
static int cas_ctidx_lookup(struct cas_fom *fom, const struct m0_cas_id *in_cid, int next)
Definition: service.c:2334
int(* rsto_service_allocate)(struct m0_reqh_service **service, const struct m0_reqh_service_type *stype)
Definition: reqh_service.h:435
#define M0_BUF_INIT_PTR(p)
Definition: buf.h:69
static int cas_kv_load_done(struct cas_fom *fom, enum m0_cas_opcode opc, const struct m0_cas_op *op, int phase)
Definition: service.c:2133
struct m0_cas_ctg ** cf_moved_ctgs
Definition: service.c:358
static int cas_fom_tick(struct m0_fom *fom0)
Definition: service.c:1160
struct m0_sm_state_descr * scf_state
Definition: sm.h:356
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
Definition: ext.h:37
Definition: fid.h:38
m0_fom_phase_outcome
Definition: fom.h:625
const struct m0_reqh_service_type * rs_type
Definition: reqh_service.h:227
struct m0_sm_trans_descr cas_fom_trans[]
Definition: service.c:2821
M0_INTERNAL void(* cas__ut_cb_done)(struct m0_fom *fom)
Definition: service.c:1712
static int cas_sdev_state(struct m0_poolmach *pm, uint32_t sdev_idx, enum m0_pool_nd_state *state_out)
Definition: service.c:1775
struct m0_be_tx tx_betx
Definition: dtm.h:559
#define M0_IS0(obj)
Definition: misc.h:70
struct m0_reqh_service * fo_service
Definition: fom.h:505
M0_INTERNAL void m0_fop_release(struct m0_ref *ref)
Definition: fop.c:148
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
struct m0_chan sm_chan
Definition: sm.h:331
static void cas_service_stop(struct m0_reqh_service *service)
Definition: service.c:589
M0_DTPS_PERSISTENT
Definition: tx_desc.h:164
Definition: cas.h:348
m0_dtm0_tx_pa_state
Definition: tx_desc.h:101
M0_INTERNAL void m0_be_dtm0_log_credit(enum m0_be_dtm0_log_credit_op op, struct m0_dtm0_tx_desc *txd, struct m0_buf *payload, struct m0_be_seg *seg, struct m0_dtm0_log_rec *rec, struct m0_be_tx_credit *accum)
Definition: dtm0_log.c:202
M0_INTERNAL int m0_xcode_obj_dec_from_buf(struct m0_xcode_obj *obj, void *buf, m0_bcount_t len)
Definition: xcode.c:850
M0_INTERNAL uint64_t m0_rpc_at_len(const struct m0_rpc_at_buf *ab)
Definition: at.c:709
struct m0_rpc_session * ri_session
Definition: item.h:147
static int cas_at_load(struct m0_rpc_at_buf *ab, struct m0_fom *fom, int next_phase)
Definition: service.c:705
static struct m0_net_test_service svc
Definition: service.c:34
M0_INTERNAL int m0_fom_rc(const struct m0_fom *fom)
Definition: fom.c:1727
M0_INTERNAL void m0_ctg_ctidx_delete_credits(struct m0_cas_id *cid, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1971
struct m0_sm_trans_descr m0_generic_phases_trans[]
Definition: fom_generic.c:765
M0_INTERNAL int m0_ctg_meta_delete(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1392
#define _0C(exp)
Definition: assert.h:311
int cf_thrall_rc
Definition: service.c:360
static void cas_prep(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, struct m0_cas_ctg *ctg, uint64_t rec_pos, struct m0_be_tx_credit *accum)
Definition: service.c:2071
M0_INTERNAL void m0_rpc_at_fini(struct m0_rpc_at_buf *ab)
Definition: at.c:441
struct m0_rpc_at_buf cr_key
Definition: cas.h:290
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
static const struct m0_fom_ops cas_fom_ops
Definition: service.c:510
static struct m0_fop * fop
Definition: item.c:57
struct m0_buf cf_out_key
Definition: service.c:368
struct m0_long_lock_link cf_lock
Definition: service.c:327
static int cas_op_recs_check(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, struct m0_cas_op *op)
Definition: service.c:907
static bool cas_key_need_to_send(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, struct m0_cas_op *op, uint64_t rec_pos)
Definition: service.c:939
M0_INTERNAL void m0_fom_queue(struct m0_fom *fom)
Definition: fom.c:624
struct m0_dtm0_tx_participants dtd_ps
Definition: tx_desc.h:122
static int cas_exec(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, struct m0_cas_ctg *ctg, uint64_t rec_pos, int next)
Definition: service.c:2153
M0_INTERNAL struct m0_cas_ctg * m0_ctg_ctidx(void)
Definition: ctg_store.c:2135
#define DTID0_P(__tid)
Definition: tx_desc.h:99
struct m0_sm fo_sm_phase
Definition: fom.h:522
static bool cas_is_valid(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, const struct m0_cas_rec *rec, uint64_t rec_pos)
Definition: service.c:1916
struct m0_fop * fo_rep_fop
Definition: fom.h:492
stats_kv
Definition: service.c:292
#define M0_XCODE_OBJ(type, ptr)
Definition: xcode.h:962
M0_INTERNAL struct m0_fop_type cas_put_fopt
Definition: cas.c:48
static int cas_device_check(const struct cas_fom *fom, const struct m0_cas_id *cid)
Definition: service.c:1803
M0_INTERNAL void m0_ctg_op_fini(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1788
static int cas_ctg_crow_fop_buf_prepare(const struct m0_cas_id *cid, struct m0_rpc_at_buf *at_buf)
Definition: service.c:2487
static int cas_dtm0_logrec_credit_add(struct m0_fom *fom0)
Definition: service.c:1023
static struct m0_cas_rec repv[N]
Definition: service_ut.c:65
static m0_bcount_t cas_rpc_cutoff(const struct cas_fom *fom)
Definition: service.c:2387
M0_INTERNAL void m0_cas_gc_start(struct m0_reqh_service *service)
Definition: index_gc.c:549
uint64_t cv_nr
Definition: cas.h:283
static struct m0_cas_op * cas_op(const struct m0_fom *fom)
Definition: service.c:1761
M0_INTERNAL void m0_long_read_unlock(struct m0_long_lock *lock, struct m0_long_lock_link *link)
Definition: nucleus.c:42
#define out(...)
Definition: gen.c:41
M0_INTERNAL bool m0_long_read_lock(struct m0_long_lock *lk, struct m0_long_lock_link *link, int next_phase)
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
bool cf_startkey_excluded
Definition: service.c:338
static bool cas_fid_is_cctg(const struct m0_fid *fid)
Definition: service.c:2034
M0_INTERNAL void m0_sm_conf_fini(struct m0_sm_conf *conf)
Definition: sm.c:376
M0_INTERNAL void * m0_alloc_aligned(size_t size, unsigned shift)
Definition: memory.c:168
static bool cas_ctidx_op_needed(struct cas_fom *fom, enum m0_cas_opcode opc, enum m0_cas_type ct, uint64_t rec_pos)
Definition: service.c:2243
M0_INTERNAL int m0_dtm0_logrec_update(struct m0_be_dtm0_log *log, struct m0_be_tx *tx, struct m0_dtm0_tx_desc *txd, struct m0_buf *payload)
Definition: fop.c:284
M0_INTERNAL bool m0_fid_is_valid(const struct m0_fid *fid)
Definition: fid.c:96
#define DTID0_F
Definition: tx_desc.h:98
struct m0_long_lock_addb2 cf_meta_addb2
Definition: service.c:363
static void cas_service_prepare_to_stop(struct m0_reqh_service *svc)
Definition: service.c:583
bool cf_op_checked
Definition: service.c:336
struct m0_cas_recv cgr_rep
Definition: cas.h:303
M0_INTERNAL uint64_t m0_sm_id_get(const struct m0_sm *sm)
Definition: sm.c:1021
struct m0_reqh * rs_reqh
Definition: reqh_service.h:259
static int cas_service_start(struct m0_reqh_service *service)
Definition: service.c:560
void m0_free(void *data)
Definition: memory.c:146
static void addb2_add_kv_attrs(const struct cas_fom *fom, enum stats_kv_io kv_io)
Definition: service.c:1081
uint64_t cr_rc
Definition: cas.h:221
struct m0_long_lock_link cf_meta
Definition: service.c:328
struct m0_rpc_item f_item
Definition: fop.h:83
const struct m0_reqh_service_type * ft_rstype
Definition: fom.h:617
Definition: cas.h:107
void m0_reqh_service_type_unregister(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:490
static struct m0_reqh_service * service[REQH_IN_UT_MAX]
Definition: long_lock_ut.c:46
static void cas_incoming_kv(const struct cas_fom *fom, uint64_t rec_pos, struct m0_buf *key, struct m0_buf *val)
Definition: service.c:742
struct m0_pdclust_src_addr src
Definition: fd.c:108
uint32_t cg_flags
Definition: cas.h:301
struct m0_reqh_service dos_generic
Definition: service.h:45
int32_t rc
Definition: trigger_fop.h:47
M0_INTERNAL int m0_ctg_meta_cursor_get(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1729
int(* rso_start_async)(struct m0_reqh_service_start_async_ctx *asc)
Definition: reqh_service.h:341
#define ARRAY_SIZE(a)
Definition: misc.h:45
static int cas_ctg_crow_handle(struct cas_fom *fom, const struct m0_cas_id *cid)
Definition: service.c:2605
Definition: cas.h:359
M0_INTERNAL void(* cas__ut_cb_fini)(struct m0_fom *fom)
Definition: service.c:1713
Definition: fop.h:79
M0_INTERNAL int m0_ctg_op_rc(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1779
static int cas_ctidx_mem_place(struct cas_fom *fom, const struct m0_cas_id *in_cid, int next)
Definition: service.c:2263
struct m0_cas_id cg_id
Definition: cas.h:374
M0_INTERNAL struct m0_long_lock * m0_ctg_del_lock(void)
Definition: ctg_store.c:2157
M0_INTERNAL struct m0_reqh * m0_fom_reqh(const struct m0_fom *fom)
Definition: fom.c:283
Definition: trace.h:478
uint64_t cf_ipos
Definition: service.c:323
uint32_t ab_type
Definition: at.h:251
M0_INTERNAL struct m0_fop_type cas_rep_fopt
Definition: cas.c:51
Definition: tx.h:280
Definition: idx_mock.c:47
static size_t cas_fom_home_locality(const struct m0_fom *fom)
Definition: service.c:1754
M0_INTERNAL struct m0_cas_ctg * m0_ctg_meta_lookup_result(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1377
static void cas_fom_failure(struct cas_fom *fom, int rc, bool ctg_op_fini)
Definition: service.c:996
#define M0_IMPOSSIBLE(fmt,...)
static int cas_val_send(struct cas_fom *fom, const struct m0_cas_op *op, enum m0_cas_opcode opc, const struct m0_cas_rep *rep, enum cas_fom_phase next_phase)
Definition: service.c:884
static struct m0_sm_state_descr cas_fom_phases[]
Definition: service.c:513