Motr  M0
ctg_store.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2016-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CAS
24 
25 #include "lib/trace.h"
26 #include "lib/memory.h"
27 #include "lib/finject.h"
28 #include "lib/assert.h"
29 #include "lib/errno.h" /* ENOMEM, EPROTO */
30 #include "lib/ext.h" /* m0_ext */
31 #include "be/domain.h" /* m0_be_domain_seg_first */
32 #include "be/op.h"
33 #include "module/instance.h"
34 #include "fop/fom_long_lock.h" /* m0_long_lock */
35 
36 #include "cas/ctg_store.h"
37 #include "cas/index_gc.h"
38 #include "dix/fid_convert.h" /* m0_dix_fid_convert_cctg2dix */
39 #include "motr/setup.h"
40 
41 
42 struct m0_ctg_store {
45 
48 
51 
57 
64 
69  struct m0_ref cs_ref;
70 
75 
80 };
81 
82 /* Data schema for CAS catalogues { */
83 
/*
 * Packed key header: an explicit byte count followed directly by the key
 * bytes themselves.
 */
struct generic_key {
	/* Number of bytes stored in gk_data. */
	uint64_t gk_length;
	/*
	 * Key payload. Declared zero-length so that sizeof(struct generic_key)
	 * covers the header only and the struct can be embedded at the start
	 * of fixed-size keys.
	 */
	uint8_t  gk_data[0];
};
91 
93 struct generic_value {
94  /* Actual length of gv_data array. */
95  uint64_t gv_length;
97  uint8_t gv_data[0];
98 };
100 M0_BASSERT(sizeof(struct m0_crv) == 8);
101 
103 struct fid_key {
105  struct m0_fid fk_fid;
106 };
107 M0_BASSERT(sizeof(struct generic_key) + sizeof(struct m0_fid) ==
108  sizeof(struct fid_key));
109 
110 /* The value type used in meta catalogue */
111 struct meta_value {
114 };
115 M0_BASSERT(sizeof(struct generic_value) + sizeof(struct m0_cas_ctg *) ==
116  sizeof(struct meta_value));
117 
118 /* The value type used in ctidx catalogue */
119 struct layout_value {
122 };
123 M0_BASSERT(sizeof(struct generic_value) + sizeof(struct m0_dix_layout) ==
124  sizeof(struct layout_value));
125 
126 /* } end of schema. */
127 
129  CPH_NONE = 0,
133 };
134 
135 static struct m0_be_seg *cas_seg(struct m0_be_domain *dom);
136 
137 static bool ctg_op_is_versioned(const struct m0_ctg_op *op);
138 static int ctg_berc (struct m0_ctg_op *ctg_op);
139 static int ctg_vbuf_unpack (struct m0_buf *buf, struct m0_crv *crv);
140 static int ctg_vbuf_as_ctg (const struct m0_buf *val,
141  struct m0_cas_ctg **ctg);
142 static int ctg_kbuf_unpack (struct m0_buf *buf);
143 static int ctg_kbuf_get (struct m0_buf *dst, const struct m0_buf *src,
144  bool enabled_fi);
145 static void ctg_init (struct m0_cas_ctg *ctg, struct m0_be_seg *seg);
146 static void ctg_fini (struct m0_cas_ctg *ctg);
147 static void ctg_destroy (struct m0_cas_ctg *ctg, struct m0_be_tx *tx);
148 static int ctg_meta_selfadd (struct m0_be_btree *meta,
149  struct m0_be_tx *tx);
150 static void ctg_meta_delete (struct m0_be_btree *meta,
151  const struct m0_fid *fid,
152  struct m0_be_tx *tx);
153 static void ctg_meta_selfrm (struct m0_be_btree *meta, struct m0_be_tx *tx);
154 
155 static void ctg_meta_insert_credit (struct m0_be_btree *bt,
156  m0_bcount_t nr,
157  struct m0_be_tx_credit *accum);
158 static void ctg_meta_delete_credit (struct m0_be_btree *bt,
159  m0_bcount_t nr,
160  struct m0_be_tx_credit *accum);
161 static void ctg_store_init_creds_calc(struct m0_be_seg *seg,
162  struct m0_cas_state *state,
163  struct m0_cas_ctg *ctidx,
164  struct m0_be_tx_credit *cred);
165 
166 static int ctg_op_tick_ret (struct m0_ctg_op *ctg_op, int next_state);
167 static int ctg_op_exec (struct m0_ctg_op *ctg_op, int next_phase);
168 static int ctg_meta_exec (struct m0_ctg_op *ctg_op,
169  const struct m0_fid *fid,
170  int next_phase);
171 static int ctg_exec (struct m0_ctg_op *ctg_op,
172  struct m0_cas_ctg *ctg,
173  const struct m0_buf *key,
174  int next_phase);
175 static void ctg_store_release(struct m0_ref *ref);
176 
177 static m0_bcount_t ctg_ksize (const void *key);
178 static m0_bcount_t ctg_vsize (const void *val);
179 static int ctg_cmp (const void *key0, const void *key1);
180 
181 static int versioned_put_sync (struct m0_ctg_op *ctg_op);
182 static int versioned_get_sync (struct m0_ctg_op *op);
183 static int versioned_cursor_next_sync(struct m0_ctg_op *op, bool alive_only);
184 static int versioned_cursor_get_sync (struct m0_ctg_op *op, bool alive_only);
185 
190 
195 static const struct m0_be_btree_kv_ops cas_btree_ops;
196 static struct m0_ctg_store ctg_store = {};
197 static const char cas_state_key[] = "cas-state-nr";
198 
199 static struct m0_be_seg *cas_seg(struct m0_be_domain *dom)
200 {
202 
203  if (seg == NULL && cas_in_ut())
205  return seg;
206 }
207 
208 static struct m0_be_op *ctg_beop(struct m0_ctg_op *ctg_op)
209 {
210  return ctg_op->co_opcode == CO_CUR ?
211  &ctg_op->co_cur.bc_op : &ctg_op->co_beop;
212 }
213 
214 static int ctg_berc(struct m0_ctg_op *ctg_op)
215 {
216  return ctg_beop(ctg_op)->bo_u.u_btree.t_rc;
217 }
218 
224 {
225  return sizeof(struct generic_value) + value->b_nob;
226 }
227 
235 static void ctg_vbuf_pack(struct m0_buf *dst,
236  const struct m0_buf *src,
237  const struct m0_crv *crv)
238 {
239  struct generic_value *value = dst->b_addr;
240  M0_PRE(dst->b_nob >= ctg_vbuf_packed_size(src));
241 
242  value->gv_length = src->b_nob;
243  value->gv_version = *crv;
244  memcpy(&value->gv_data, src->b_addr, src->b_nob);
245 }
246 
251 static int ctg_kbuf_get(struct m0_buf *dst, const struct m0_buf *src,
252  bool enabled_fi)
253 {
254  struct generic_key *key;
255 
256  M0_ENTRY();
257 
258  if (enabled_fi && M0_FI_ENABLED("cas_alloc_fail"))
259  return M0_ERR(-ENOMEM);
260 
261  key = m0_alloc(src->b_nob + sizeof(*key));
262  if (key != NULL) {
263  key->gk_length = src->b_nob;
264  memcpy(key->gk_data, src->b_addr, src->b_nob);
265  *dst = M0_BUF_INIT(src->b_nob + sizeof(*key), key);
266  return M0_RC(0);
267  } else
268  return M0_ERR(-ENOMEM);
269 }
270 
279 static int ctg_vbuf_unpack(struct m0_buf *buf, struct m0_crv *crv)
280 {
281  struct generic_value *value;
282 
283  M0_ENTRY();
284 
285  value = buf->b_addr;
286 
287  if (buf->b_nob < sizeof(*value))
288  return M0_ERR_INFO(-EPROTO, "%" PRIu64 " < %" PRIu64,
289  buf->b_nob, sizeof(*value));
290  if (value->gv_length != buf->b_nob - sizeof(*value))
291  return M0_ERR(-EPROTO);
292 
293  if (crv != NULL)
294  *crv = value->gv_version;
295 
296  buf->b_nob = value->gv_length;
297  buf->b_addr = &value->gv_data[0];
298 
299  return M0_RC(0);
300 }
301 
306 static int ctg_vbuf_as_ctg(const struct m0_buf *buf, struct m0_cas_ctg **ctg)
307 {
308  struct generic_value *gv = M0_AMB(gv, buf->b_addr, gv_data);
309  struct meta_value *mv = M0_AMB(mv, gv, mv_gval);
310 
311  M0_ENTRY();
312 
313  if (buf->b_nob == sizeof(struct m0_cas_ctg *)) {
314  *ctg = mv->mv_ctg;
315  return M0_RC(0);
316  } else
317  return M0_ERR(-EPROTO);
318 }
319 
323 static int ctg_vbuf_as_layout(const struct m0_buf *buf,
324  struct m0_dix_layout **layout)
325 {
326  struct generic_value *gv = M0_AMB(gv, buf->b_addr, gv_data);
327  struct layout_value *lv = M0_AMB(lv, gv, lv_gval);
328 
329  M0_ENTRY();
330 
331  if (buf->b_nob == sizeof(struct m0_dix_layout)) {
332  *layout = &lv->lv_layout;
333  return M0_RC(0);
334  } else
335  return M0_ERR(-EPROTO);
336 }
337 
342 static int ctg_kbuf_unpack(struct m0_buf *buf)
343 {
344  struct generic_key *key;
345 
346  M0_ENTRY();
347 
348  key = buf->b_addr;
349 
350  if (buf->b_nob < sizeof(*key))
351  return M0_ERR(-EPROTO);
352  if (key->gk_length != buf->b_nob - sizeof(*key))
353  return M0_ERR(-EPROTO);
354 
355  buf->b_nob = key->gk_length;
356  buf->b_addr = &key->gk_data[0];
357 
358  return M0_RC(0);
359 }
360 
/*
 * Compound-literal initialisers for the fixed-size key/value records.
 *
 * Every macro argument is parenthesised in the expansion, and parameter
 * names avoid the double-underscore prefix that C reserves for the
 * implementation.
 */

/* struct fid_key holding a copy of *fid_; gk_length is the size of the fid. */
#define FID_KEY_INIT(fid_) (struct fid_key) {			\
	.fk_gkey = {						\
		.gk_length = sizeof(*(fid_)),			\
	},							\
	.fk_fid = *(fid_),					\
}

/* struct generic_value header for a payload of size_ bytes, with no version. */
#define GENERIC_VALUE_INIT(size_) (struct generic_value) {	\
	.gv_length  = (size_),					\
	.gv_version = M0_CRV_INIT_NONE,				\
}

/* struct meta_value whose payload is the catalogue pointer ctg_ptr_. */
#define META_VALUE_INIT(ctg_ptr_) (struct meta_value) {		\
	.mv_gval = GENERIC_VALUE_INIT(sizeof(ctg_ptr_)),	\
	.mv_ctg  = (ctg_ptr_),					\
}

/* struct layout_value whose payload is a copy of *layout_. */
#define LAYOUT_VALUE_INIT(layout_) (struct layout_value) {	\
	.lv_gval   = GENERIC_VALUE_INIT(sizeof(*(layout_))),	\
	.lv_layout = *(layout_),				\
}
382 
383 static m0_bcount_t ctg_ksize(const void *opaque_key)
384 {
385  const struct generic_key *key = opaque_key;
386  return sizeof(*key) + key->gk_length;
387 }
388 
389 static m0_bcount_t ctg_vsize(const void *opaque_val)
390 {
391  const struct generic_value *val = opaque_val;
392  return sizeof(*val) + val->gv_length;
393 }
394 
395 static int ctg_cmp(const void *opaque_key_left, const void *opaque_key_right)
396 {
397  const struct generic_key *left = opaque_key_left;
398  const struct generic_key *right = opaque_key_right;
399 
400  /*
401  * XXX: Origianally, there was an assertion to ensure on-disk data
402  * is correct, and it looked like that:
403  * u64 len = *(u64 *)key;
404  * u64 knob = 8 + len;
405  * assert(knob >= 8);
406  * The problem here is that "len" is always >= 0 (because it is a u64).
407  * Therefore, the assertion knob >= 8 is always true as well.
408  */
409 
410  return memcmp(left->gk_data, right->gk_data,
411  min_check(left->gk_length, right->gk_length)) ?:
412  M0_3WAY(left->gk_length, right->gk_length);
413 }
414 
415 static void ctg_init(struct m0_cas_ctg *ctg, struct m0_be_seg *seg)
416 {
418  .ot_version = M0_CAS_CTG_FORMAT_VERSION,
419  .ot_type = M0_FORMAT_TYPE_CAS_CTG,
420  .ot_footer_offset = offsetof(struct m0_cas_ctg, cc_foot)
421  });
422  M0_ENTRY();
427  ctg->cc_inited = true;
429 }
430 
431 static void ctg_fini(struct m0_cas_ctg *ctg)
432 {
433  M0_ENTRY("ctg=%p", ctg);
434  ctg->cc_inited = false;
435  m0_be_btree_fini(&ctg->cc_tree);
439 }
440 
441 int m0_ctg_create(struct m0_be_seg *seg, struct m0_be_tx *tx,
442  struct m0_cas_ctg **out,
443  const struct m0_fid *cas_fid)
444 {
445  struct m0_cas_ctg *ctg;
446  int rc;
447 
448  if (M0_FI_ENABLED("ctg_create_failure"))
449  return M0_ERR(-EFAULT);
450 
451  M0_BE_ALLOC_PTR_SYNC(ctg, seg, tx);
452  if (ctg == NULL)
453  return M0_ERR(-ENOMEM);
454 
455  ctg_init(ctg, seg);
457  m0_be_btree_create(&ctg->cc_tree, tx, &op,
458  &M0_FID_TINIT('b',
460  cas_fid->f_key)),
461  bo_u.u_btree.t_rc);
462  if (rc != 0) {
463  ctg_fini(ctg);
464  M0_BE_FREE_PTR_SYNC(ctg, seg, tx);
465  }
466  else
467  *out = ctg;
468  return M0_RC(rc);
469 }
470 
471 static void ctg_destroy(struct m0_cas_ctg *ctg, struct m0_be_tx *tx)
472 {
473  M0_ENTRY();
475  ctg_fini(ctg);
477 }
478 
479 M0_INTERNAL void m0_ctg_fini(struct m0_fom *fom,
480  struct m0_cas_ctg *ctg)
481 {
482  if (ctg != NULL && ctg->cc_inited) {
483  struct m0_dtx *dtx = &fom->fo_tx;
484 
485  ctg_fini(ctg);
486  /*
487  * TODO: implement asynchronous free after memory free API
488  * added into ctg_exec in the scope of asynchronous ctidx
489  * operations task.
490  */
491  if (dtx->tx_state != M0_DTX_INVALID) {
492  struct m0_be_tx *tx = &fom->fo_tx.tx_betx;
493  if (m0_be_tx_state(tx) == M0_BTS_ACTIVE)
495  cas_seg(tx->t_engine->eng_domain), tx);
496  }
497  }
498 }
499 
503 M0_INTERNAL int m0_ctg_meta_find_ctg(struct m0_cas_ctg *meta,
504  const struct m0_fid *ctg_fid,
505  struct m0_cas_ctg **ctg)
506 {
507  struct fid_key key_data = FID_KEY_INIT(ctg_fid);
508  struct m0_buf key = M0_BUF_INIT(sizeof(key_data),
509  &key_data);
510  struct m0_be_btree_anchor anchor;
511  int rc;
512 
513  *ctg = NULL;
514 
517  &op,
518  &key,
519  &anchor),
520  bo_u.u_btree.t_rc) ?:
521  ctg_vbuf_unpack(&anchor.ba_value, NULL) ?:
522  ctg_vbuf_as_ctg(&anchor.ba_value, ctg);
523 
524  m0_be_btree_release(NULL, &anchor);
525  return M0_RC(rc);
526 }
527 
528 M0_INTERNAL int m0_ctg__meta_insert(struct m0_be_btree *meta,
529  const struct m0_fid *fid,
530  struct m0_cas_ctg *ctg,
531  struct m0_be_tx *tx)
532 {
533  struct fid_key key_data = FID_KEY_INIT(fid);
534  struct meta_value val_data = META_VALUE_INIT(ctg);
535  struct m0_buf key = M0_BUF_INIT_PTR(&key_data);
536  struct m0_buf value = M0_BUF_INIT_PTR(&val_data);
537  struct m0_be_btree_anchor anchor = {};
538  int rc;
539 
540  /*
541  * NOTE: ctg may be equal to NULL.
542  * Meta-catalogue is a catalogue. Also, it keeps a list of all
543  * catalogues. In order to avoid paradoxes, we allow this function to
544  * accept an empty pointer when the meta-catalogue is trying to
545  * insert itself into the list (see ::ctg_meta_selfadd).
546  * It also helps to generalise listing of catalogues
547  * (see ::m0_ctg_try_init).
548  */
550 
551  anchor.ba_value.b_nob = value.b_nob;
553  m0_be_btree_insert_inplace(meta, tx, &op,
554  &key, &anchor,
556  bo_u.u_btree.t_rc);
557  if (rc == 0)
558  m0_buf_memcpy(&anchor.ba_value, &value);
559 
560  m0_be_btree_release(tx, &anchor);
561  return M0_RC(rc);
562 }
563 
564 static int ctg_meta_selfadd(struct m0_be_btree *meta,
565  struct m0_be_tx *tx)
566 {
567  return m0_ctg__meta_insert(meta, &m0_cas_meta_fid, NULL, tx);
568 }
569 
570 static void ctg_meta_delete(struct m0_be_btree *meta,
571  const struct m0_fid *fid,
572  struct m0_be_tx *tx)
573 {
574  struct fid_key key_data = FID_KEY_INIT(fid);
575  struct m0_buf key = M0_BUF_INIT_PTR(&key_data);
576 
577  M0_ENTRY();
578  M0_BE_OP_SYNC(op, m0_be_btree_delete(meta, tx, &op, &key));
579 }
580 
581 static void ctg_meta_selfrm(struct m0_be_btree *meta, struct m0_be_tx *tx)
582 {
583  return ctg_meta_delete(meta, &m0_cas_meta_fid, tx);
584 }
585 
587  m0_bcount_t nr,
588  struct m0_be_tx_credit *accum)
589 {
590  struct m0_be_seg *seg = bt->bb_seg;
591  struct m0_cas_ctg *ctg;
592 
594  sizeof(struct fid_key),
595  sizeof(struct meta_value),
596  accum);
597  /*
598  * Will allocate space for cas_ctg body then put ptr to it into btree.
599  */
600  M0_BE_ALLOC_CREDIT_PTR(ctg, seg, accum);
601 }
602 
604  m0_bcount_t nr,
605  struct m0_be_tx_credit *accum)
606 {
607  struct m0_be_seg *seg = bt->bb_seg;
608  struct m0_cas_ctg *ctg;
609 
611  sizeof(struct fid_key),
612  sizeof(struct meta_value),
613  accum);
614  M0_BE_FREE_CREDIT_PTR(ctg, seg, accum);
615 }
616 
618  struct m0_cas_state *state,
619  struct m0_cas_ctg *ctidx,
620  struct m0_be_tx_credit *cred)
621 {
622  struct m0_be_btree dummy = {};
623 
624  dummy.bb_seg = seg;
625 
627  M0_BE_ALLOC_CREDIT_PTR(state, seg, cred);
628  M0_BE_ALLOC_CREDIT_PTR(ctidx, seg, cred);
629  /*
630  * Credits for dead_index catalogue descriptor.
631  */
632  M0_BE_ALLOC_CREDIT_PTR(ctidx, seg, cred);
633  /*
634  * Credits for 3 trees: meta, ctidx, dead_index.
635  */
636  m0_be_btree_create_credit(&dummy, 3, cred);
637  ctg_meta_insert_credit(&dummy, 3, cred);
638  /* Error case: tree destruction and freeing. */
639  ctg_meta_delete_credit(&dummy, 3, cred);
641  M0_BE_FREE_CREDIT_PTR(state, seg, cred);
642  M0_BE_FREE_CREDIT_PTR(ctidx, seg, cred);
643  /*
644  * Free for dead_index.
645  */
646  M0_BE_FREE_CREDIT_PTR(ctidx, seg, cred);
647 }
648 
649 static int ctg_state_create(struct m0_be_seg *seg,
650  struct m0_be_tx *tx,
651  struct m0_cas_state **state)
652 {
653  struct m0_cas_state *out;
654  struct m0_be_btree *bt;
655  int rc;
656 
657  M0_ENTRY();
658  *state = NULL;
660  if (out == NULL)
661  return M0_ERR(-ENOSPC);
662  m0_format_header_pack(&out->cs_header, &(struct m0_format_tag){
663  .ot_version = M0_CAS_STATE_FORMAT_VERSION,
664  .ot_type = M0_FORMAT_TYPE_CAS_STATE,
665  .ot_footer_offset = offsetof(struct m0_cas_state, cs_footer)
666  });
667  rc = m0_ctg_create(seg, tx, &out->cs_meta, &m0_cas_meta_fid);
668  if (rc == 0) {
669  bt = &out->cs_meta->cc_tree;
670  rc = ctg_meta_selfadd(bt, tx);
671  if (rc != 0)
672  ctg_destroy(out->cs_meta, tx);
673  }
674  if (rc != 0)
676  else
677  *state = out;
678  return M0_RC(rc);
679 }
680 
681 static void ctg_state_destroy(struct m0_cas_state *state,
682  struct m0_be_tx *tx)
683 {
684  struct m0_cas_ctg *meta = state->cs_meta;
685  struct m0_be_seg *seg = meta->cc_tree.bb_seg;
686 
687  ctg_meta_selfrm(&meta->cc_tree, tx);
688  ctg_destroy(meta, tx);
689  M0_BE_FREE_PTR_SYNC(state, seg, tx);
690 }
691 
695 static int ctg_store__init(struct m0_be_seg *seg, struct m0_cas_state *state)
696 {
697  int rc;
698 
699  M0_ENTRY();
700 
701  ctg_store.cs_state = state;
703  ctg_init(state->cs_meta, seg);
704 
705  /* Searching for catalogue-index catalogue. */
707  &ctg_store.cs_ctidx) ?:
710  if (rc == 0) {
713  } else {
716  }
717  return M0_RC(rc);
718 }
719 
725 static int ctg_store_create(struct m0_be_seg *seg)
726 {
727  /*
728  * Currently catalog store has dictionary consisting of 3 catalogues:
729  * meta itself, ctidx and dead_index.
730  */
731  struct m0_cas_state *state = NULL;
732  struct m0_cas_ctg *ctidx = NULL;
733  struct m0_cas_ctg *dead_index = NULL;
734  struct m0_be_tx tx = {};
735  struct m0_be_tx_credit cred = M0_BE_TX_CREDIT(0, 0);
736  struct m0_sm_group *grp = m0_locality0_get()->lo_grp;
737  int rc;
738 
739  M0_ENTRY();
741  m0_be_tx_init(&tx, 0, seg->bs_domain, grp, NULL, NULL, NULL, NULL);
742 
743  ctg_store_init_creds_calc(seg, state, ctidx, &cred);
744 
745  m0_be_tx_prep(&tx, &cred);
747  if (rc != 0) {
748  m0_be_tx_fini(&tx);
749  return M0_ERR(rc);
750  }
751 
752  rc = ctg_state_create(seg, &tx, &state);
753  if (rc != 0)
754  goto end;
756 
757  /* Create catalog-index catalogue. */
758  rc = m0_ctg_create(seg, &tx, &ctidx, &m0_cas_ctidx_fid);
759  if (rc != 0)
760  goto state_destroy;
761  /*
762  * Insert catalogue-index catalogue into meta-catalogue.
763  */
765  ctidx, &tx);
766  if (rc != 0)
767  goto ctidx_destroy;
768 
769  /*
770  * Create place for records deleted from meta (actually moved there).
771  */
772  rc = m0_ctg_create(seg, &tx, &dead_index, &m0_cas_dead_index_fid);
773  if (rc != 0)
774  goto ctidx_destroy;
775  /*
776  * Insert "dead index" catalogue into meta-catalogue.
777  */
779  &m0_cas_dead_index_fid, dead_index, &tx);
780  if (rc != 0)
781  goto dead_index_destroy;
782 
783  rc = m0_be_seg_dict_insert(seg, &tx, cas_state_key, state);
784  if (rc != 0)
785  goto dead_index_delete;
786  M0_BE_TX_CAPTURE_PTR(seg, &tx, ctidx);
787  M0_BE_TX_CAPTURE_PTR(seg, &tx, dead_index);
789  M0_BE_TX_CAPTURE_PTR(seg, &tx, state);
790  ctg_store.cs_state = state;
791  ctg_store.cs_ctidx = ctidx;
792  ctg_store.cs_dead_index = dead_index;
793  goto end;
794 dead_index_delete:
797  &tx);
798 dead_index_destroy:
799  ctg_destroy(dead_index, &tx);
802  &tx);
803 ctidx_destroy:
804  ctg_destroy(ctidx, &tx);
805 state_destroy:
806  ctg_state_destroy(state, &tx);
807 end:
808  m0_be_tx_close_sync(&tx);
809  m0_be_tx_fini(&tx);
811  return M0_RC(rc);
812 }
813 
814 M0_INTERNAL int m0_ctg_store_init(struct m0_be_domain *dom)
815 {
816  struct m0_be_seg *seg = cas_seg(dom);
817  struct m0_cas_state *state = NULL;
818  int result;
819 
820  M0_ENTRY();
824  result = 0;
825  goto end;
826  }
827 
831  result = m0_be_seg_dict_lookup(seg, cas_state_key, (void **)&state);
832  if (result == 0) {
836  M0_ASSERT(state != NULL);
838  "cas_state from storage: cs_meta %p, cs_rec_nr %"PRIx64,
839  state->cs_meta, state->cs_rec_nr);
840  result = ctg_store__init(seg, state);
841  } else if (result == -ENOENT) {
842  M0_LOG(M0_DEBUG, "Ctg store state wasn't found on a disk.");
843  result = ctg_store_create(seg);
844  }
845 
846  if (result == 0) {
851  ctg_store.cs_initialised = true;
852  }
853 end:
855  return M0_RC(result);
856 }
857 
858 static void ctg_store_release(struct m0_ref *ref)
859 {
860  struct m0_ctg_store *ctg_store = M0_AMB(ctg_store, ref, cs_ref);
861 
862  M0_ENTRY();
867  ctg_store->cs_initialised = false;
868 }
869 
870 M0_INTERNAL void m0_ctg_store_fini(void)
871 {
872  M0_ENTRY();
874 }
875 
876 static void ctg_state_counter_add(uint64_t *counter, uint64_t val)
877 {
878  if (*counter != ~0ULL) {
879  if (*counter + val < *counter) {
880  M0_LOG(M0_WARN,
881  "Ctg store counter overflow: counter %"
882  PRIx64" addendum %"PRIx64, *counter, val);
883  *counter = ~0ULL;
884  } else {
885  *counter += val;
886  }
887  }
888 }
889 
/*
 * Subtracts @val from *@counter.
 *
 * A counter equal to ~0ULL (the overflow marker) is left untouched.
 * Otherwise the subtraction must not wrap below zero; this is asserted.
 */
static void ctg_state_counter_sub(uint64_t *counter, uint64_t val)
{
	if (*counter == ~0ULL)
		return;

	M0_ASSERT(*counter - val <= *counter);
	*counter -= val;
}
897 
898 static uint64_t ctg_state_update(struct m0_be_tx *tx, uint64_t size,
899  bool is_inc)
900 {
901  uint64_t *recs_nr = &ctg_store.cs_state->cs_rec_nr;
902  uint64_t *rec_size = &ctg_store.cs_state->cs_rec_size;
903  struct m0_be_seg *seg = cas_seg(tx->t_engine->eng_domain);
904 
905  if (M0_FI_ENABLED("test_overflow"))
906  size = ~0ULL - 1;
907 
909  /*
910  * recs_nr and rec_size counters update is done having possible overflow
911  * in mind. A counter value sticks at ~0ULL in the case of overflow
912  * and further counter updates are ignored.
913  */
914  if (is_inc) {
915  /*
916  * Overflow is unlikely. If it happens, then calculation of DIX
917  * repair/re-balance progress may be incorrect.
918  */
919  ctg_state_counter_add(recs_nr, 1);
920  /*
921  * Overflow is possible, because total size is not decremented
922  * on record deletion.
923  */
924  ctg_state_counter_add(rec_size, size);
925  } else {
926  M0_ASSERT(*recs_nr != 0);
927  ctg_state_counter_sub(recs_nr, 1);
928  ctg_state_counter_sub(rec_size, size);
929  }
932 
933  M0_LOG(M0_DEBUG, "ctg_state_update: rec_nr %"PRIx64 " rec_size %"PRIx64,
937  return *recs_nr;
938 }
939 
940 M0_INTERNAL void m0_ctg_state_inc_update(struct m0_be_tx *tx, uint64_t size)
941 {
942  (void)ctg_state_update(tx, size, true);
943 }
944 
/*
 * Accounts for one removed record of @size bytes in the catalogue store
 * state. The record count returned by ctg_state_update() is discarded.
 */
static void ctg_state_dec_update(struct m0_be_tx *tx, uint64_t size)
{
	const bool increment = false;

	(void)ctg_state_update(tx, size, increment);
}
949 
953 M0_INTERNAL void m0_ctg_try_init(struct m0_cas_ctg *ctg)
954 {
955  M0_ENTRY();
957  /*
958  * ctg is null if this is entry for Meta: it is not filled.
959  */
960  if (ctg != NULL && !ctg->cc_inited) {
961  M0_LOG(M0_DEBUG, "ctg_init %p", ctg);
963  } else
964  M0_LOG(M0_DEBUG, "ctg %p zero or inited", ctg);
966 }
967 
/*
 * Tells whether @ctg is a regular catalogue, i.e. none of the three special
 * ones: dead index, catalogue-index (ctidx) or meta.
 */
static bool ctg_is_ordinary(const struct m0_cas_ctg *ctg)
{
	return ctg != m0_ctg_dead_index() &&
	       ctg != m0_ctg_ctidx() &&
	       ctg != m0_ctg_meta();
}
974 
975 static bool ctg_op_cb(struct m0_clink *clink)
976 {
977  struct m0_ctg_op *ctg_op = M0_AMB(ctg_op, clink, co_clink);
978  struct m0_be_op *op = ctg_beop(ctg_op);
979  int opc = ctg_op->co_opcode;
980  int ct = ctg_op->co_ct;
981  struct m0_be_tx *tx = &ctg_op->co_fom->fo_tx.tx_betx;
982  struct m0_chan *ctg_chan = &ctg_op->co_ctg->cc_chan.bch_chan;
983  int rc;
984  struct m0_cas_ctg *ctg;
985  struct fid_key *fk;
986  struct meta_value *mv;
987 
988  if (op->bo_sm.sm_state != M0_BOS_DONE)
989  return true;
990 
991  /* Versioned API is implemented with synchronous btree operations. */
992  if (ctg_op->co_is_versioned)
993  return true;
994 
995  rc = ctg_berc(ctg_op);
996  if (rc == 0) {
997  switch (CTG_OP_COMBINE(opc, ct)) {
1000  ctg_op->co_out_val = ctg_op->co_anchor.ba_value;
1001  rc = ctg_vbuf_unpack(&ctg_op->co_out_val, NULL);
1002  if (rc == 0 && ct == CT_META) {
1003  rc = ctg_vbuf_as_ctg(&ctg_op->co_out_val, &ctg);
1004  if (rc == 0)
1005  m0_ctg_try_init(ctg);
1006  }
1007  break;
1009  if (ctg_is_ordinary(ctg_op->co_ctg))
1010  ctg_state_dec_update(tx, 0);
1011  /* Fall through. */
1012  case CTG_OP_COMBINE(CO_DEL, CT_META):
1016  case CTG_OP_COMBINE(CO_GC, CT_META):
1017  m0_chan_broadcast_lock(ctg_chan);
1018  break;
1020  rc = ctg_kbuf_unpack(&ctg_op->co_out_key);
1021  break;
1022  case CTG_OP_COMBINE(CO_CUR, CT_META):
1025  &ctg_op->co_out_key,
1026  &ctg_op->co_out_val);
1027  rc = ctg_kbuf_unpack(&ctg_op->co_out_key) ?:
1028  ctg_vbuf_unpack(&ctg_op->co_out_val, NULL);
1029  if (rc == 0 && ct == CT_META) {
1030  rc = ctg_vbuf_as_ctg(&ctg_op->co_out_val, &ctg);
1031  if (rc == 0)
1032  m0_ctg_try_init(ctg);
1033  }
1034  break;
1036  ctg_vbuf_pack(&ctg_op->co_anchor.ba_value,
1037  &ctg_op->co_val, &M0_CRV_INIT_NONE);
1038  if (ctg_is_ordinary(ctg_op->co_ctg))
1040  ctg_op->co_key.b_nob -
1041  sizeof(struct generic_key) +
1042  ctg_op->co_val.b_nob);
1043  else
1044  m0_chan_broadcast_lock(ctg_chan);
1045  break;
1046  case CTG_OP_COMBINE(CO_PUT, CT_META):
1047  fk = ctg_op->co_key.b_addr;
1048  mv = ctg_op->co_anchor.ba_value.b_addr;
1050  tx, &ctg, &fk->fk_fid);
1051  if (rc == 0) {
1052  *mv = META_VALUE_INIT(ctg);
1053  m0_chan_broadcast_lock(ctg_chan);
1054  }
1055  break;
1057  /*
1058  * Copy user provided buffer to the buffer allocated in
1059  * BE and capture it.
1060  */
1061  m0_buf_memcpy(&ctg_op->co_mem_buf, &ctg_op->co_val);
1063  &ctg_op->co_fom->fo_tx.tx_betx,
1064  &ctg_op->co_mem_buf);
1065  break;
1067  /* Nothing to do. */
1068  break;
1069  }
1070  }
1071 
1072  if (opc == CO_CUR) {
1073  /* Always finalise BE operation of the cursor. */
1074  m0_be_op_fini(&ctg_op->co_cur.bc_op);
1075  M0_SET0(&ctg_op->co_cur.bc_op);
1076  }
1077 
1078  if (opc == CO_PUT &&
1079  ctg_op->co_flags & COF_CREATE &&
1080  rc == -EEXIST)
1081  rc = 0;
1082 
1083  ctg_op->co_rc = M0_RC(rc);
1085  /*
1086  * This callback may be called directly from ctg_op_tick_ret()
1087  * without adding of clink to the channel.
1088  */
1089  if (m0_clink_is_armed(clink))
1091 
1092  return true;
1093 }
1094 
1095 static int ctg_op_tick_ret(struct m0_ctg_op *ctg_op,
1096  int next_state)
1097 {
1098  struct m0_chan *chan = &ctg_op->co_channel;
1099  struct m0_fom *fom = ctg_op->co_fom;
1100  struct m0_clink *clink = &ctg_op->co_clink;
1101  struct m0_be_op *op = ctg_beop(ctg_op);
1102  int ret = M0_FSO_AGAIN;
1103  bool op_is_active;
1104 
1105  m0_be_op_lock(op);
1106  M0_PRE(M0_IN(op->bo_sm.sm_state, (M0_BOS_ACTIVE, M0_BOS_DONE)));
1107 
1108  op_is_active = op->bo_sm.sm_state == M0_BOS_ACTIVE;
1109  if (op_is_active) {
1110  ret = M0_FSO_WAIT;
1111  m0_clink_add(&op->bo_sm.sm_chan, clink);
1112 
1113  m0_chan_lock(chan);
1114  m0_fom_wait_on(fom, chan, &fom->fo_cb);
1116  }
1118 
1119  if (!op_is_active)
1120  clink->cl_cb(clink);
1121  /*
1122  * In some cases we don't want to set the next phase
1123  * if this function is used as helper for something
1124  * else. We use it for lookup on "del" op to populate
1125  * the deleted value into the FDMI record.
1126  */
1127  if (next_state >= 0)
1128  m0_fom_phase_set(fom, next_state);
1129 
1130  return ret;
1131 }
1132 
1133 /*
1134  * Returns a bitmap of zones that could be used within the given operation.
1135  * See ::m0_be_alloc_zone_type.
1136  */
1137 static uint32_t ctg_op_zones(const struct m0_ctg_op *ctg_op)
1138 {
1139  return M0_BITS(M0_BAP_NORMAL) |
1140  ((ctg_op->co_flags & COF_RESERVE) ? M0_BITS(M0_BAP_REPAIR) : 0);
1141 }
1142 
1143 static int ctg_op_exec_normal(struct m0_ctg_op *ctg_op, int next_phase)
1144 {
1145  struct m0_buf *key = &ctg_op->co_key;
1146  struct m0_be_btree *btree = &ctg_op->co_ctg->cc_tree;
1147  struct m0_be_btree_anchor *anchor = &ctg_op->co_anchor;
1148  struct m0_be_btree_cursor *cur = &ctg_op->co_cur;
1149  struct m0_be_tx *tx = &ctg_op->co_fom->fo_tx.tx_betx;
1150  struct m0_be_op *beop = ctg_beop(ctg_op);
1151  int opc = ctg_op->co_opcode;
1152  int ct = ctg_op->co_ct;
1153  uint64_t zones = ctg_op_zones(ctg_op);
1154  M0_ENTRY("opc=%d ct=%d", opc, ct);
1155 
1156  switch (CTG_OP_COMBINE(opc, ct)) {
1158  anchor->ba_value.b_nob = sizeof(struct generic_value) +
1159  ctg_op->co_val.b_nob;
1160  m0_be_btree_save_inplace(btree, tx, beop, key, anchor,
1161  !!(ctg_op->co_flags & COF_OVERWRITE),
1162  zones);
1163  break;
1164  case CTG_OP_COMBINE(CO_PUT, CT_META):
1165  M0_ASSERT(!(ctg_op->co_flags & COF_OVERWRITE));
1166  anchor->ba_value.b_nob = sizeof(struct meta_value);
1167  m0_be_btree_insert_inplace(btree, tx, beop, key, anchor, zones);
1168  break;
1170  /*
1171  * No need a value in dead index, but, seems, must put something
1172  * there. Do not fill anything in the callback after
1173  * m0_be_btree_insert_inplace() have 0 there.
1174  */
1175  anchor->ba_value.b_nob = sizeof(struct generic_value);
1176  m0_be_btree_insert_inplace(btree, tx, beop, key, anchor, zones);
1177  break;
1179  case CTG_OP_COMBINE(CO_GET, CT_META):
1180  m0_be_btree_lookup_inplace(btree, beop, key, anchor);
1181  break;
1183  m0_be_btree_minkey(btree, beop, &ctg_op->co_out_key);
1184  break;
1186  m0_be_btree_truncate(btree, tx, beop, ctg_op->co_cnt);
1187  break;
1189  m0_be_btree_destroy(btree, tx, beop);
1190  break;
1192  m0_be_btree_delete(btree, tx, beop, key);
1193  break;
1194  case CTG_OP_COMBINE(CO_DEL, CT_META):
1195  m0_be_btree_delete(btree, tx, beop, key);
1196  break;
1197  case CTG_OP_COMBINE(CO_GC, CT_META):
1198  m0_cas_gc_wait_async(beop);
1199  break;
1201  case CTG_OP_COMBINE(CO_CUR, CT_META):
1202  M0_ASSERT(M0_IN(ctg_op->co_cur_phase, (CPH_GET, CPH_NEXT)));
1203  if (ctg_op->co_cur_phase == CPH_GET)
1205  !!(ctg_op->co_flags & COF_SLANT));
1206  else
1208  break;
1209  }
1210 
1211  return M0_RC(ctg_op_tick_ret(ctg_op, next_phase));
1212 }
1213 
1214 static int ctg_op_exec_versioned(struct m0_ctg_op *ctg_op, int next_phase)
1215 {
1216  int rc;
1217  int opc = ctg_op->co_opcode;
1218  int ct = ctg_op->co_ct;
1219  struct m0_be_op *beop = ctg_beop(ctg_op);
1220  bool alive_only = !(ctg_op->co_flags & COF_SHOW_DEAD);
1221  M0_ENTRY();
1222 
1223  M0_PRE(ctg_is_ordinary(ctg_op->co_ctg));
1224  M0_PRE(ct == CT_BTREE);
1225  M0_PRE(ergo(opc == CO_CUR,
1226  M0_IN(ctg_op->co_cur_phase, (CPH_GET, CPH_NEXT))));
1227 
1228  switch (CTG_OP_COMBINE(opc, ct)) {
1231  rc = versioned_put_sync(ctg_op);
1232  break;
1234  rc = versioned_get_sync(ctg_op);
1235  break;
1237  if (ctg_op->co_cur_phase == CPH_GET)
1238  rc = versioned_cursor_get_sync(ctg_op, alive_only);
1239  else
1240  rc = versioned_cursor_next_sync(ctg_op, alive_only);
1241  break;
1242  default:
1243  M0_IMPOSSIBLE("The other operations are not allowed here.");
1244  }
1245 
1246  ctg_op->co_rc = rc;
1247 
1248  /* It is either untouched or DONE right away. */
1249  M0_ASSERT(M0_IN(beop->bo_sm.sm_state, (M0_BOS_INIT, M0_BOS_DONE)));
1250  /* Only cursor operation may reach DONE state. */
1251  M0_ASSERT(ergo(beop->bo_sm.sm_state == M0_BOS_DONE, opc == CO_CUR));
1252 
1253  if (opc == CO_CUR) {
1254  /*
1255  * BE cursor has its own BE operation. It gets initialised
1256  * everytime in m0_ctg_cursor_* and m0_ctg_meta_cursor_*.
1257  * Since a ctg op may be re-used (for example, as a part
1258  * of GET+NEXT sequence of calls), we need to make it
1259  * ready to be initialised again.
1260  */
1261  m0_be_op_fini(beop);
1262  M0_SET0(beop);
1263  } else {
1264  /*
1265  * Non-CO_CUR operations are completely ignoring the BE op
1266  * embedded into ctg_op. They are operating in synchronous
1267  * manner on local ops. Thus, we need to advance the state
1268  * of the embedded op to let the other parts function
1269  * properly (for example, ::m0_ctg_lookup_result).
1270  */
1271  m0_be_op_active(beop);
1272  m0_be_op_done(beop);
1273  }
1274 
1275  if (next_phase >= 0)
1276  m0_fom_phase_set(ctg_op->co_fom, next_phase);
1277  return M0_RC(M0_FSO_AGAIN);
1278 }
1279 
1280 static int ctg_op_exec(struct m0_ctg_op *ctg_op, int next_phase)
1281 {
1282  M0_ENTRY();
1283  ctg_op->co_is_versioned = ctg_op_is_versioned(ctg_op);
1284 
1285  return ctg_op->co_is_versioned ?
1286  ctg_op_exec_versioned(ctg_op, next_phase) :
1287  ctg_op_exec_normal(ctg_op, next_phase);
1288 }
1289 
1290 static int ctg_mem_op_exec(struct m0_ctg_op *ctg_op, int next_phase)
1291 {
1292  struct m0_be_tx *tx = &ctg_op->co_fom->fo_tx.tx_betx;
1293  struct m0_be_op *beop = ctg_beop(ctg_op);
1294  int opc = ctg_op->co_opcode;
1295  int ct = ctg_op->co_ct;
1296 
1297  switch (CTG_OP_COMBINE(opc, ct)) {
1299  M0_BE_ALLOC_BUF(&ctg_op->co_mem_buf,
1300  cas_seg(tx->t_engine->eng_domain), tx, beop);
1301  break;
1304  cas_seg(tx->t_engine->eng_domain), tx, beop);
1305  break;
1306  }
1307 
1308  return ctg_op_tick_ret(ctg_op, next_phase);
1309 }
1310 
1311 static int ctg_meta_exec(struct m0_ctg_op *ctg_op,
1312  const struct m0_fid *fid,
1313  int next_phase)
1314 {
1315  int ret;
1316  M0_ENTRY();
1317 
1318  ctg_op->co_ctg = ctg_store.cs_state->cs_meta;
1319  ctg_op->co_ct = CT_META;
1320 
1321  if (ctg_op->co_opcode != CO_GC &&
1322  (ctg_op->co_opcode != CO_CUR ||
1323  ctg_op->co_cur_phase != CPH_NEXT))
1324  ctg_op->co_rc = ctg_kbuf_get(&ctg_op->co_key,
1325  &M0_BUF_INIT_PTR_CONST(fid), true);
1326  if (ctg_op->co_rc != 0) {
1327  ret = M0_FSO_AGAIN;
1328  m0_fom_phase_set(ctg_op->co_fom, next_phase);
1329  } else
1330  ret = ctg_op_exec(ctg_op, next_phase);
1331 
1332  return M0_RC(ret);
1333 }
1334 
1335 
1336 M0_INTERNAL int m0_ctg_meta_insert(struct m0_ctg_op *ctg_op,
1337  const struct m0_fid *fid,
1338  int next_phase)
1339 {
1340  M0_PRE(ctg_op != NULL);
1341  M0_PRE(fid != NULL);
1342  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_INIT);
1343 
1344  ctg_op->co_opcode = CO_PUT;
1345 
1346  return ctg_meta_exec(ctg_op, fid, next_phase);
1347 }
1348 
1349 M0_INTERNAL int m0_ctg_gc_wait(struct m0_ctg_op *ctg_op,
1350  int next_phase)
1351 {
1352  M0_ENTRY();
1353 
1354  M0_PRE(ctg_op != NULL);
1355  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_INIT);
1356 
1357  ctg_op->co_opcode = CO_GC;
1358 
1359  return ctg_meta_exec(ctg_op, 0, next_phase);
1360 }
1361 
1362 M0_INTERNAL int m0_ctg_meta_lookup(struct m0_ctg_op *ctg_op,
1363  const struct m0_fid *fid,
1364  int next_phase)
1365 {
1366  M0_PRE(ctg_op != NULL);
1367  M0_PRE(fid != NULL);
1368  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_INIT);
1369  M0_ENTRY();
1370 
1371  ctg_op->co_opcode = CO_GET;
1372 
1373  return ctg_meta_exec(ctg_op, fid, next_phase);
1374 }
1375 
1376 M0_INTERNAL
1378 {
1379  struct m0_cas_ctg *ctg = NULL;
1380 
1381  M0_PRE(ctg_op != NULL);
1382  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_DONE);
1383  M0_PRE(ctg_op->co_opcode == CO_GET);
1384  M0_PRE(ctg_op->co_ct == CT_META);
1385 
1386  ctg_op->co_rc = ctg_op->co_rc ?:
1387  ctg_vbuf_as_ctg(&ctg_op->co_out_val, &ctg);
1388 
1389  return ctg;
1390 }
1391 
1392 M0_INTERNAL int m0_ctg_meta_delete(struct m0_ctg_op *ctg_op,
1393  const struct m0_fid *fid,
1394  int next_phase)
1395 {
1396  M0_PRE(ctg_op != NULL);
1397  M0_PRE(fid != NULL);
1398  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_INIT);
1399 
1400  ctg_op->co_opcode = CO_DEL;
1401 
1402  return ctg_meta_exec(ctg_op, fid, next_phase);
1403 }
1404 
1405 M0_INTERNAL int m0_ctg_dead_index_insert(struct m0_ctg_op *ctg_op,
1406  struct m0_cas_ctg *ctg,
1407  int next_phase)
1408 {
1409  ctg_op->co_ctg = m0_ctg_dead_index();
1410  ctg_op->co_ct = CT_DEAD_INDEX;
1411  ctg_op->co_opcode = CO_PUT;
1412  /* Dead index value is empty */
1413  ctg_op->co_val = M0_BUF_INIT0;
1414  /* Dead index key is a pointer to a catalogue */
1415  return ctg_exec(ctg_op, ctg, &M0_BUF_INIT_PTR(&ctg), next_phase);
1416 }
1417 
1418 static int ctg_exec(struct m0_ctg_op *ctg_op,
1419  struct m0_cas_ctg *ctg,
1420  const struct m0_buf *key,
1421  int next_phase)
1422 {
1423  int ret = M0_FSO_AGAIN;
1424 
1425  ctg_op->co_ctg = ctg;
1426  ctg_op->co_ct = CT_BTREE;
1427 
1428  if (!M0_IN(ctg_op->co_opcode, (CO_MIN, CO_TRUNC, CO_DROP)) &&
1429  (ctg_op->co_opcode != CO_CUR ||
1430  ctg_op->co_cur_phase != CPH_NEXT))
1431  ctg_op->co_rc = ctg_kbuf_get(&ctg_op->co_key, key, true);
1432 
1433  if (ctg_op->co_rc != 0)
1434  m0_fom_phase_set(ctg_op->co_fom, next_phase);
1435  else
1436  ret = ctg_op_exec(ctg_op, next_phase);
1437 
1438  return ret;
1439 }
1440 
1441 static int ctg_mem_exec(struct m0_ctg_op *ctg_op,
1442  int next_phase)
1443 {
1444  ctg_op->co_ct = CT_MEM;
1445  return ctg_mem_op_exec(ctg_op, next_phase);
1446 }
1447 
1448 M0_INTERNAL int m0_ctg_insert(struct m0_ctg_op *ctg_op,
1449  struct m0_cas_ctg *ctg,
1450  const struct m0_buf *key,
1451  const struct m0_buf *val,
1452  int next_phase)
1453 {
1454  M0_PRE(ctg_op != NULL);
1455  M0_PRE(ctg != NULL);
1456  M0_PRE(key != NULL);
1457  M0_PRE(val != NULL);
1458  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_INIT);
1459 
1460  ctg_op->co_opcode = CO_PUT;
1461  ctg_op->co_val = *val;
1462  return ctg_exec(ctg_op, ctg, key, next_phase);
1463 }
1464 
1465 M0_INTERNAL int m0_ctg_lookup_delete(struct m0_ctg_op *ctg_op,
1466  struct m0_cas_ctg *ctg,
1467  const struct m0_buf *key,
1468  struct m0_buf *val,
1469  int flags,
1470  int next_phase)
1471 {
1472  struct m0_fom *fom0;
1473  int ret = M0_FSO_AGAIN;
1474  int rc;
1475  M0_ENTRY();
1476 
1477  M0_PRE(ctg_op != NULL);
1478  M0_PRE(ctg != NULL);
1479  M0_PRE(key != NULL);
1480  M0_PRE(val != NULL);
1481  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_INIT);
1482 
1483  fom0 = ctg_op->co_fom;
1484 
1485  /* Lookup the value first. */
1486  ctg_op->co_ctg = ctg;
1487  ctg_op->co_ct = CT_BTREE;
1488  ctg_op->co_opcode = CO_GET;
1489 
1490  /* Here pass true to allow failures injection. */
1491  ctg_op->co_rc = ctg_kbuf_get(&ctg_op->co_key, key, true);
1492 
1493  /* Pass -1 as the next_phase to indicate we don't set it now. */
1494  if (ctg_op->co_rc == 0)
1495  ctg_op_exec(ctg_op, -1);
1496 
1497  /* It's fine if NOT found (e.g. for DEAD record)*/
1498  if (ctg_op->co_rc != 0 && ctg_op->co_rc != -ENOENT) {
1499  m0_fom_phase_set(fom0, next_phase);
1500  return ret;
1501  }
1502 
1503  /*
1504  * Copy value with allocation because it refers the memory
1505  * chunk that will not be avilable after the delete op.
1506  */
1507  rc = m0_buf_copy(val, &ctg_op->co_out_val);
1508  M0_ASSERT(rc == 0);
1509  m0_ctg_op_fini(ctg_op);
1510 
1511  /* Now delete the value by key. */
1512  m0_ctg_op_init(ctg_op, fom0, flags);
1513 
1514  ctg_op->co_ctg = ctg;
1515  ctg_op->co_ct = CT_BTREE;
1516  ctg_op->co_opcode = CO_DEL;
1517 
1518  /*
1519  * Here pass false to disallow failures injecations. Some
1520  * UT are tailored with idea in mind that only one injection
1521  * of particular type is touched at a run. ctg_buf_get() has
1522  * the falure injection that we need to avoid here to make
1523  * cas UT happy.
1524  */
1525  ctg_op->co_rc = ctg_kbuf_get(&ctg_op->co_key, key, false);
1526 
1527  if (ctg_op->co_rc != 0) {
1528  m0_buf_free(val);
1529  m0_fom_phase_set(fom0, next_phase);
1530  return ret;
1531  }
1532 
1533  ret = ctg_op_exec(ctg_op, next_phase);
1534  if (ctg_op->co_rc != 0)
1535  m0_buf_free(val);
1536 
1537  return ret;
1538 }
1539 
1540 M0_INTERNAL int m0_ctg_delete(struct m0_ctg_op *ctg_op,
1541  struct m0_cas_ctg *ctg,
1542  const struct m0_buf *key,
1543  int next_phase)
1544 {
1545  M0_PRE(ctg_op != NULL);
1546  M0_PRE(ctg != NULL);
1547  M0_PRE(key != NULL);
1548  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_INIT);
1549 
1550  ctg_op->co_opcode = CO_DEL;
1551 
1552  return ctg_exec(ctg_op, ctg, key, next_phase);
1553 }
1554 
1555 M0_INTERNAL int m0_ctg_lookup(struct m0_ctg_op *ctg_op,
1556  struct m0_cas_ctg *ctg,
1557  const struct m0_buf *key,
1558  int next_phase)
1559 {
1560  M0_PRE(ctg_op != NULL);
1561  M0_PRE(ctg != NULL);
1562  M0_PRE(key != NULL);
1563  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_INIT);
1564 
1565  ctg_op->co_opcode = CO_GET;
1566 
1567  return ctg_exec(ctg_op, ctg, key, next_phase);
1568 }
1569 
1570 M0_INTERNAL void m0_ctg_lookup_result(struct m0_ctg_op *ctg_op,
1571  struct m0_buf *buf)
1572 {
1573  M0_PRE(ctg_op != NULL);
1574  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_DONE);
1575  M0_PRE(ctg_op->co_opcode == CO_GET);
1576  M0_PRE(ctg_op->co_ct == CT_BTREE);
1577  M0_PRE(ctg_op->co_rc == 0);
1578 
1579  *buf = ctg_op->co_out_val;
1580 }
1581 
1582 M0_INTERNAL void m0_ctg_op_get_ver(struct m0_ctg_op *ctg_op,
1583  struct m0_crv *out)
1584 {
1585  M0_PRE(ctg_op != NULL);
1586  M0_PRE(out != NULL);
1587  M0_PRE(ergo(!m0_crv_is_none(&ctg_op->co_out_ver),
1588  ctg_op->co_is_versioned));
1589 
1590  *out = ctg_op->co_out_ver;
1591 }
1592 
1593 M0_INTERNAL int m0_ctg_minkey(struct m0_ctg_op *ctg_op,
1594  struct m0_cas_ctg *ctg,
1595  int next_phase)
1596 {
1597  M0_PRE(ctg_op != NULL);
1598  M0_PRE(ctg != NULL);
1599 
1600  ctg_op->co_opcode = CO_MIN;
1601 
1602  return ctg_exec(ctg_op, ctg, NULL, next_phase);
1603 }
1604 
1605 
1606 M0_INTERNAL int m0_ctg_truncate(struct m0_ctg_op *ctg_op,
1607  struct m0_cas_ctg *ctg,
1608  m0_bcount_t limit,
1609  int next_phase)
1610 {
1611  M0_PRE(ctg_op != NULL);
1612  M0_PRE(ctg != NULL);
1613 
1614  ctg_op->co_opcode = CO_TRUNC;
1615  ctg_op->co_cnt = limit;
1616 
1617  return ctg_exec(ctg_op, ctg, NULL, next_phase);
1618 }
1619 
1620 M0_INTERNAL int m0_ctg_drop(struct m0_ctg_op *ctg_op,
1621  struct m0_cas_ctg *ctg,
1622  int next_phase)
1623 {
1624  M0_PRE(ctg_op != NULL);
1625  M0_PRE(ctg != NULL);
1626 
1627  ctg_op->co_opcode = CO_DROP;
1628 
1629  return ctg_exec(ctg_op, ctg, NULL, next_phase);
1630 }
1631 
1632 
1633 M0_INTERNAL bool m0_ctg_cursor_is_initialised(struct m0_ctg_op *ctg_op)
1634 {
1635  M0_PRE(ctg_op != NULL);
1636 
1637  return ctg_op->co_cur_initialised;
1638 }
1639 
1640 M0_INTERNAL void m0_ctg_cursor_init(struct m0_ctg_op *ctg_op,
1641  struct m0_cas_ctg *ctg)
1642 {
1643  M0_PRE(ctg_op != NULL);
1644  M0_PRE(ctg != NULL);
1645 
1646  if (ctg_op->co_cur_initialised)
1647  return;
1648 
1649  ctg_op->co_ctg = ctg;
1650  ctg_op->co_opcode = CO_CUR;
1651  ctg_op->co_cur_phase = CPH_INIT;
1652  M0_SET0(&ctg_op->co_cur.bc_op);
1653 
1655  &ctg_op->co_ctg->cc_tree);
1656 
1657  ctg_op->co_cur_initialised = true;
1658 }
1659 
1660 M0_INTERNAL int m0_ctg_cursor_get(struct m0_ctg_op *ctg_op,
1661  const struct m0_buf *key,
1662  int next_phase)
1663 {
1664  M0_PRE(ctg_op != NULL);
1665  M0_PRE(key != NULL);
1666 
1667  ctg_op->co_cur_phase = CPH_GET;
1668 
1669  /*
1670  * Key may be set if cursor get is called twice between ctg operation
1671  * init()/fini(), free it to avoid memory leak.
1672  */
1673  m0_buf_free(&ctg_op->co_key);
1674  /* Cursor has its own beop. */
1675  m0_be_op_init(&ctg_op->co_cur.bc_op);
1676 
1677  return ctg_exec(ctg_op, ctg_op->co_ctg, key, next_phase);
1678 }
1679 
1680 M0_INTERNAL int m0_ctg_cursor_next(struct m0_ctg_op *ctg_op,
1681  int next_phase)
1682 {
1683  M0_PRE(ctg_op != NULL);
1684  M0_PRE(ctg_op->co_cur_phase != CPH_INIT);
1685 
1686  ctg_op->co_cur_phase = CPH_NEXT;
1687 
1688  /* BE operation must be initialised each time when next is called. */
1689  m0_be_op_init(&ctg_op->co_cur.bc_op);
1690 
1691  return ctg_exec(ctg_op, ctg_op->co_ctg, NULL, next_phase);
1692 }
1693 
1694 M0_INTERNAL int m0_ctg_meta_cursor_next(struct m0_ctg_op *ctg_op,
1695  int next_phase)
1696 {
1697  M0_PRE(ctg_op != NULL);
1698  M0_PRE(ctg_op->co_cur_phase != CPH_INIT);
1699 
1700  ctg_op->co_cur_phase = CPH_NEXT;
1701 
1702  /* BE operation must be initialised each time when next called. */
1703  m0_be_op_init(&ctg_op->co_cur.bc_op);
1704 
1705  return ctg_op_exec(ctg_op, next_phase);
1706 }
1707 
1708 M0_INTERNAL void m0_ctg_cursor_kv_get(struct m0_ctg_op *ctg_op,
1709  struct m0_buf *key,
1710  struct m0_buf *val)
1711 {
1712  M0_PRE(ctg_op != NULL);
1713  M0_PRE(key != NULL);
1714  M0_PRE(val != NULL);
1715  M0_PRE(ctg_op->co_opcode == CO_CUR);
1716  M0_PRE(ctg_op->co_rc == 0);
1717 
1718  *key = ctg_op->co_out_key;
1719  *val = ctg_op->co_out_val;
1720 }
1721 
1722 M0_INTERNAL void m0_ctg_meta_cursor_init(struct m0_ctg_op *ctg_op)
1723 {
1724  M0_PRE(ctg_op != NULL);
1725 
1727 }
1728 
1729 M0_INTERNAL int m0_ctg_meta_cursor_get(struct m0_ctg_op *ctg_op,
1730  const struct m0_fid *fid,
1731  int next_phase)
1732 {
1733  M0_PRE(ctg_op != NULL);
1734  M0_PRE(fid != NULL);
1735 
1736  ctg_op->co_cur_phase = CPH_GET;
1737 
1738  /* Cursor has its own beop. */
1739  m0_be_op_init(&ctg_op->co_cur.bc_op);
1740 
1741  return ctg_meta_exec(ctg_op, fid, next_phase);
1742 }
1743 
1744 M0_INTERNAL void m0_ctg_cursor_put(struct m0_ctg_op *ctg_op)
1745 {
1746  m0_be_btree_cursor_put(&ctg_op->co_cur);
1747 }
1748 
1749 M0_INTERNAL void m0_ctg_cursor_fini(struct m0_ctg_op *ctg_op)
1750 {
1751  M0_PRE(ctg_op != NULL);
1752 
1753  m0_be_btree_cursor_fini(&ctg_op->co_cur);
1754  M0_SET0(&ctg_op->co_cur);
1755  ctg_op->co_cur_initialised = false;
1756  ctg_op->co_cur_phase = CPH_NONE;
1757 }
1758 
1759 M0_INTERNAL void m0_ctg_op_init(struct m0_ctg_op *ctg_op,
1760  struct m0_fom *fom,
1761  uint32_t flags)
1762 {
1763  M0_ENTRY("ctg_op=%p flags=0x%x", ctg_op, flags);
1764  M0_PRE(ctg_op != NULL);
1765  M0_PRE(fom != NULL);
1766 
1767  M0_SET0(ctg_op);
1768 
1769  m0_be_op_init(&ctg_op->co_beop);
1770  m0_mutex_init(&ctg_op->co_channel_lock);
1771  m0_chan_init(&ctg_op->co_channel, &ctg_op->co_channel_lock);
1772  m0_clink_init(&ctg_op->co_clink, ctg_op_cb);
1773  ctg_op->co_fom = fom;
1774  ctg_op->co_flags = flags;
1775  ctg_op->co_cur_phase = CPH_NONE;
1776  ctg_op->co_is_versioned = false;
1777 }
1778 
1779 M0_INTERNAL int m0_ctg_op_rc(struct m0_ctg_op *ctg_op)
1780 {
1781  M0_PRE(ctg_op != NULL);
1782 
1783  if (M0_FI_ENABLED("be-failure"))
1784  return M0_ERR(-ENOMEM);
1785  return ctg_op->co_rc;
1786 }
1787 
1788 M0_INTERNAL void m0_ctg_op_fini(struct m0_ctg_op *ctg_op)
1789 {
1790  M0_ENTRY("ctg_op=%p", ctg_op);
1791  M0_PRE(ctg_op != NULL);
1792  M0_PRE(ctg_op->co_cur_initialised == false);
1793 
1795  &ctg_op->co_anchor);
1796  m0_buf_free(&ctg_op->co_key);
1797  m0_chan_fini_lock(&ctg_op->co_channel);
1798  m0_mutex_fini(&ctg_op->co_channel_lock);
1799  m0_clink_fini(&ctg_op->co_clink);
1800  m0_be_op_fini(&ctg_op->co_beop);
1801  M0_SET0(ctg_op);
1802  M0_LEAVE();
1803 }
1804 
1805 M0_INTERNAL void m0_ctg_mark_deleted_credit(struct m0_be_tx_credit *accum)
1806 {
1807  m0_bcount_t knob;
1808  m0_bcount_t vnob;
1810  struct m0_be_btree *mbtree = &m0_ctg_dead_index()->cc_tree;
1811  struct m0_cas_ctg *ctg;
1812 
1813  knob = sizeof(struct fid_key);
1814  vnob = sizeof(struct meta_value);
1815  /* Delete from meta. */
1816  m0_be_btree_delete_credit(btree, 1, knob, vnob, accum);
1817  knob = sizeof(struct generic_key) + sizeof(ctg);
1818  vnob = sizeof(struct generic_value);
1819  /* Insert into dead index. */
1820  m0_be_btree_insert_credit2(mbtree, 1, knob, vnob, accum);
1821 }
1822 
1823 M0_INTERNAL void m0_ctg_create_credit(struct m0_be_tx_credit *accum)
1824 {
1825  m0_bcount_t knob;
1826  m0_bcount_t vnob;
1828  struct m0_cas_ctg *ctg;
1829 
1830  m0_be_btree_create_credit(btree, 1, accum);
1831 
1832  knob = sizeof(struct fid_key);
1833  vnob = sizeof(struct meta_value);
1834  m0_be_btree_insert_credit2(btree, 1, knob, vnob, accum);
1835  /*
1836  * That are credits for cas_ctg body.
1837  */
1838  M0_BE_ALLOC_CREDIT_PTR(ctg, cas_seg(btree->bb_seg->bs_domain), accum);
1839 }
1840 
1841 M0_INTERNAL void m0_ctg_drop_credit(struct m0_fom *fom,
1842  struct m0_be_tx_credit *accum,
1843  struct m0_cas_ctg *ctg,
1844  m0_bcount_t *limit)
1845 {
1846  m0_bcount_t records_nr;
1847  m0_bcount_t records_ok;
1848  struct m0_be_tx_credit record_cred;
1849  struct m0_be_tx_credit nodes_cred = {};
1850 
1851  m0_be_btree_clear_credit(&ctg->cc_tree, &nodes_cred, &record_cred,
1852  &records_nr);
1853  records_nr = records_nr ?: 1;
1854  for (records_ok = 0;
1861  !m0_be_should_break_half(m0_fom_tx(fom)->t_engine, accum,
1862  &record_cred) && records_ok < records_nr;
1863  records_ok++)
1864  m0_be_tx_credit_add(accum, &record_cred);
1865 
1876  if (records_ok > 0 && records_nr >= records_ok) {
1877  nodes_cred.tc_reg_nr =
1878  nodes_cred.tc_reg_nr * records_ok / records_nr;
1879  nodes_cred.tc_reg_size =
1880  nodes_cred.tc_reg_size * records_ok / records_nr;
1881  }
1882  m0_be_tx_credit_add(accum, &nodes_cred);
1883 
1884  *limit = records_ok;
1885 }
1886 
1887 M0_INTERNAL void m0_ctg_dead_clean_credit(struct m0_be_tx_credit *accum)
1888 {
1889  struct m0_cas_ctg *ctg;
1890  m0_bcount_t knob;
1891  m0_bcount_t vnob;
1893  /*
1894  * Define credits for delete from dead index.
1895  */
1896  knob = sizeof(struct generic_key) + sizeof(ctg);
1897  vnob = sizeof(struct generic_value);
1898  m0_be_btree_delete_credit(btree, 1, knob, vnob, accum);
1899  /*
1900  * Credits for ctg free.
1901  */
1902  M0_BE_FREE_CREDIT_PTR(ctg, cas_seg(btree->bb_seg->bs_domain), accum);
1903 }
1904 
1905 M0_INTERNAL void m0_ctg_insert_credit(struct m0_cas_ctg *ctg,
1906  m0_bcount_t knob,
1907  m0_bcount_t vnob,
1908  struct m0_be_tx_credit *accum)
1909 {
1910  m0_be_btree_insert_credit2(&ctg->cc_tree, 1, knob, vnob, accum);
1911 }
1912 
1913 M0_INTERNAL void m0_ctg_delete_credit(struct m0_cas_ctg *ctg,
1914  m0_bcount_t knob,
1915  m0_bcount_t vnob,
1916  struct m0_be_tx_credit *accum)
1917 {
1918  m0_be_btree_delete_credit(&ctg->cc_tree, 1, knob, vnob, accum);
1919 
1920  /* XXX: performance.
1921  * At this moment we do not know for sure if the CAS operation should
1922  * have version-aware behavior. So that we are doing a pessimistic
1923  * estimate here: reserving credits for both delete and insert.
1924  */
1925  if (ctg_is_ordinary(ctg)) {
1926  m0_be_btree_insert_credit2(&ctg->cc_tree, 1, knob, vnob, accum);
1927  }
1928 }
1929 
1930 static void ctg_ctidx_op_credits(struct m0_cas_id *cid,
1931  bool insert,
1932  struct m0_be_tx_credit *accum)
1933 {
1934  const struct m0_dix_imask *imask;
1936  struct m0_be_seg *seg = cas_seg(btree->bb_seg->bs_domain);
1937  m0_bcount_t knob;
1938  m0_bcount_t vnob;
1939 
1940  knob = sizeof(struct fid_key);
1941  vnob = sizeof(struct layout_value);
1942  if (insert)
1943  m0_be_btree_insert_credit2(btree, 1, knob, vnob, accum);
1944  else
1945  m0_be_btree_delete_credit(btree, 1, knob, vnob, accum);
1946 
1947  imask = &cid->ci_layout.u.dl_desc.ld_imask;
1948  if (!m0_dix_imask_is_empty(imask)) {
1949  if (insert)
1950  M0_BE_ALLOC_CREDIT_ARR(imask->im_range,
1951  imask->im_nr,
1952  seg,
1953  accum);
1954  else
1955  M0_BE_FREE_CREDIT_ARR(imask->im_range,
1956  imask->im_nr,
1957  seg,
1958  accum);
1959  }
1960 }
1961 
1962 M0_INTERNAL void m0_ctg_ctidx_insert_credits(struct m0_cas_id *cid,
1963  struct m0_be_tx_credit *accum)
1964 {
1965  M0_PRE(cid != NULL);
1966  M0_PRE(accum != NULL);
1967 
1968  ctg_ctidx_op_credits(cid, true, accum);
1969 }
1970 
1971 M0_INTERNAL void m0_ctg_ctidx_delete_credits(struct m0_cas_id *cid,
1972  struct m0_be_tx_credit *accum)
1973 {
1974  M0_PRE(cid != NULL);
1975  M0_PRE(accum != NULL);
1976 
1977  ctg_ctidx_op_credits(cid, false, accum);
1978 }
1979 
1980 M0_INTERNAL int m0_ctg_ctidx_lookup_sync(const struct m0_fid *fid,
1981  struct m0_dix_layout **layout)
1982 {
1983  struct fid_key key_data = FID_KEY_INIT(fid);
1984  struct m0_buf key = M0_BUF_INIT_PTR(&key_data);
1985  struct m0_be_btree_anchor anchor;
1986  struct m0_cas_ctg *ctidx = m0_ctg_ctidx();
1987  int rc;
1988 
1989  M0_PRE(ctidx != NULL);
1990  M0_PRE(fid != NULL);
1991  M0_PRE(layout != NULL);
1992 
1993  *layout = NULL;
1994 
1998  &op,
1999  &key,
2000  &anchor),
2001  bo_u.u_btree.t_rc) ?:
2002  ctg_vbuf_unpack(&anchor.ba_value, NULL) ?:
2003  ctg_vbuf_as_layout(&anchor.ba_value, layout);
2004 
2005  m0_be_btree_release(NULL, &anchor);
2006 
2007  return M0_RC(rc);
2008 }
2009 
/*
 * Synchronously inserts the layout of cid into the catalogue-index
 * catalogue within transaction tx. The key is the component catalogue FID.
 *
 * On successful insertion the value is copied into the space reserved in
 * the tree; if the layout has a non-empty identity mask, its range array is
 * copied into memory referenced by im_range and the stored layout is
 * pointed at that copy.
 *
 * NOTE(review): several lines of this function are absent from this
 * excerpt (the initialiser of value_data, the head of the expression that
 * assigns rc, the allocation of im_range, the head of the call that
 * consumes the region arguments, and one line just before the end of the
 * "rc == 0" branch). Confirm against the full file before editing.
 */
2010 M0_INTERNAL int m0_ctg_ctidx_insert_sync(const struct m0_cas_id *cid,
2011  struct m0_be_tx *tx)
2012 {
2013  /* The key is a component catalogue FID. */
2014  struct fid_key key_data = FID_KEY_INIT(&cid->ci_fid);
2015  struct m0_buf key = M0_BUF_INIT_PTR(&key_data);
2016  struct layout_value value_data =
2018  struct m0_buf value = M0_BUF_INIT_PTR(&value_data);
2019  struct m0_cas_ctg *ctidx = m0_ctg_ctidx();
2020  struct m0_be_btree_anchor anchor = {};
2021  struct m0_dix_layout *layout;
2022  struct m0_ext *im_range;
2023  const struct m0_dix_imask *imask = &cid->ci_layout.u.dl_desc.ld_imask;
2024  m0_bcount_t size;
2025  int rc;
2026 
 /* Ask the tree to reserve room for the whole value. */
2027  anchor.ba_value.b_nob = value.b_nob;
2030  m0_be_btree_insert_inplace(&ctidx->cc_tree, tx, &op, &key,
2031  &anchor, M0_BITS(M0_BAP_NORMAL)),
2032  bo_u.u_btree.t_rc);
2033  if (rc == 0) {
2034  m0_buf_memcpy(&anchor.ba_value, &value);
 /* From here on "layout" refers to the copy stored in the tree. */
2035  layout = &((struct layout_value *)
2036  anchor.ba_value.b_addr)->lv_layout;
2037  if (!m0_dix_imask_is_empty(imask)) {
2038  /*
2039  * Alloc memory in BE segment for imask ranges
2040  * and copy them.
2041  */
2044  cas_seg(tx->t_engine->eng_domain),
2045  tx);
2046  size = imask->im_nr * sizeof(struct m0_ext);
2047  memcpy(im_range, imask->im_range, size);
2049  cas_seg(tx->t_engine->eng_domain),
2050  size, im_range));
2051  /* Assign newly allocated imask ranges. */
2052  layout->u.dl_desc.ld_imask.im_range = im_range;
2053  }
2055  }
2056  m0_be_btree_release(tx, &anchor);
2057  return M0_RC(rc);
2058 }
2059 
/*
 * Synchronously removes the layout record of cid from the catalogue-index
 * catalogue within transaction tx.
 *
 * The stored layout is looked up first; if it has a non-empty identity
 * mask, the range array is freed and the mask reset. Then the record is
 * deleted from the tree.
 *
 * Returns the lookup error if the record cannot be found.
 * NOTE(review): the result of the delete itself is not captured here, so
 * the value returned at the end is the (zero) result of the lookup.
 * NOTE(review): a few lines of this function are absent from this excerpt;
 * confirm against the full file before editing.
 */
2060 M0_INTERNAL int m0_ctg_ctidx_delete_sync(const struct m0_cas_id *cid,
2061  struct m0_be_tx *tx)
2062 {
2063  struct m0_dix_layout *layout;
2064  struct m0_dix_imask *imask;
2065  /* The key is a component catalogue FID. */
2066  struct fid_key key_data = FID_KEY_INIT(&cid->ci_fid);
2067  struct m0_cas_ctg *ctidx = m0_ctg_ctidx();
2068  struct m0_buf key = M0_BUF_INIT_PTR(&key_data);
2069  int rc;
2070 
2071  /* Firstly we should free buffer allocated for imask ranges array. */
2072  rc = m0_ctg_ctidx_lookup_sync(&cid->ci_fid, &layout);
2073  if (rc != 0)
2074  return rc;
2075  imask = &layout->u.dl_desc.ld_imask;
2076  if (!m0_dix_imask_is_empty(imask)) {
2078  M0_BE_FREE_PTR_SYNC(imask->im_range,
2079  cas_seg(tx->t_engine->eng_domain),
2080  tx);
 /* Leave no dangling reference to the freed array in the record. */
2081  imask->im_range = NULL;
2082  imask->im_nr = 0;
2083  }
2085  M0_BE_OP_SYNC(op, m0_be_btree_delete(&ctidx->cc_tree, tx, &op, &key));
2087  return rc;
2088 }
2089 
2090 M0_INTERNAL int m0_ctg_mem_place(struct m0_ctg_op *ctg_op,
2091  const struct m0_buf *buf,
2092  int next_phase)
2093 {
2094  M0_PRE(ctg_op != NULL);
2095  M0_PRE(buf->b_nob != 0);
2096  M0_PRE(buf->b_addr != NULL);
2097  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_INIT);
2098 
2099  ctg_op->co_opcode = CO_MEM_PLACE;
2100  ctg_op->co_val = *buf;
2101  ctg_op->co_mem_buf.b_nob = buf->b_nob;
2102  return ctg_mem_exec(ctg_op, next_phase);
2103 }
2104 
2105 M0_INTERNAL void m0_ctg_mem_place_get(struct m0_ctg_op *ctg_op,
2106  struct m0_buf *buf)
2107 {
2108  M0_PRE(ctg_op != NULL);
2109  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_DONE);
2110  M0_PRE(ctg_op->co_opcode == CO_MEM_PLACE);
2111  M0_PRE(ctg_op->co_ct == CT_MEM);
2112  M0_PRE(ctg_op->co_rc == 0);
2113 
2114  *buf = ctg_op->co_mem_buf;
2115 }
2116 
2117 M0_INTERNAL int m0_ctg_mem_free(struct m0_ctg_op *ctg_op,
2118  void *area,
2119  int next_phase)
2120 {
2121  M0_PRE(ctg_op != NULL);
2122  M0_PRE(area != NULL);
2123  M0_PRE(ctg_op->co_beop.bo_sm.sm_state == M0_BOS_INIT);
2124 
2125  ctg_op->co_opcode = CO_MEM_FREE;
2126  ctg_op->co_mem_buf.b_addr = area;
2127  return ctg_mem_exec(ctg_op, next_phase);
2128 }
2129 
2130 M0_INTERNAL struct m0_cas_ctg *m0_ctg_meta(void)
2131 {
2132  return ctg_store.cs_state->cs_meta;
2133 }
2134 
2135 M0_INTERNAL struct m0_cas_ctg *m0_ctg_ctidx(void)
2136 {
2137  return ctg_store.cs_ctidx;
2138 }
2139 
2140 M0_INTERNAL struct m0_cas_ctg *m0_ctg_dead_index(void)
2141 {
2142  return ctg_store.cs_dead_index;
2143 }
2144 
2145 M0_INTERNAL uint64_t m0_ctg_rec_nr(void)
2146 {
2148  return ctg_store.cs_state->cs_rec_nr;
2149 }
2150 
2151 M0_INTERNAL uint64_t m0_ctg_rec_size(void)
2152 {
2154  return ctg_store.cs_state->cs_rec_size;
2155 }
2156 
2157 M0_INTERNAL struct m0_long_lock *m0_ctg_del_lock(void)
2158 {
2159  return &ctg_store.cs_del_lock;
2160 }
2161 
2162 M0_INTERNAL struct m0_long_lock *m0_ctg_lock(struct m0_cas_ctg *ctg)
2163 {
2164  return &ctg->cc_lock.bll_u.llock;
2165 }
2166 
2167 M0_INTERNAL const struct m0_be_btree_kv_ops *m0_ctg_btree_ops(void)
2168 {
2169  return &cas_btree_ops;
2170 }
2171 
/*
 * Key/value operations table for catalogue btrees: callbacks for the key
 * size, the value size and key comparison.
 * NOTE(review): one line between the opening brace and .ko_ksize is absent
 * from this excerpt, so a further initialiser may exist; confirm against
 * the full file.
 */
2172 static const struct m0_be_btree_kv_ops cas_btree_ops = {
2174  .ko_ksize = &ctg_ksize,
2175  .ko_vsize = &ctg_vsize,
2176  .ko_compare = &ctg_cmp
2177 };
2178 
2179 M0_INTERNAL void
2181  struct m0_buf* val, bool dump_in_hex)
2182 {
2183  int i;
2184  int key_header_len = M0_CAS_CTG_KEY_HDR_SIZE;
2185  int val_header_len = M0_CAS_CTG_VAL_HDR_SIZE;
2186  char *kbuf, *vbuf;
2187 
2188  if (!dump_in_hex) {
2189  m0_console_printf("{key: %.*s}, {val: %.*s}\n",
2190  (int)key->b_nob - key_header_len,
2191  (const char*)(key->b_addr + key_header_len),
2192  (int)val->b_nob - val_header_len,
2193  (const char*)(val->b_addr + val_header_len));
2194  return;
2195  }
2196 
2197  M0_ALLOC_ARR(kbuf, (key->b_nob - key_header_len) * 2 + 1);
2198  for (i = 0; i < key->b_nob - key_header_len; i++)
2199  sprintf(kbuf + 2 * i, "%2x",
2200  *(uint8_t*)(key->b_addr + key_header_len + i));
2201  m0_console_printf("{key: %s}, ", kbuf);
2202  m0_free(kbuf);
2203 
2204  M0_ALLOC_ARR(vbuf, (val->b_nob - val_header_len) * 2 + 1);
2205  for (i = 0; i < val->b_nob - val_header_len; i++)
2206  sprintf(vbuf + 2 * i, "%2x",
2207  *(uint8_t*)(val->b_addr + val_header_len + i));
2208  m0_console_printf("{val: %s} \n", vbuf);
2209  m0_free(vbuf);
2210 }
2211 
2212 M0_INTERNAL int ctg_index_btree_dump(struct m0_motr *motr_ctx,
2213  struct m0_cas_ctg *ctg,
2214  bool dump_in_hex)
2215 {
2216  struct m0_buf key;
2217  struct m0_buf val;
2218  struct m0_be_btree_cursor cursor;
2219  int rc;
2220 
2221  ctg_init(ctg, cas_seg(&motr_ctx->cc_reqh_ctx.rc_be.but_dom));
2222 
2223  m0_be_btree_cursor_init(&cursor, &ctg->cc_tree);
2224  for (rc = m0_be_btree_cursor_first_sync(&cursor); rc == 0;
2225  rc = m0_be_btree_cursor_next_sync(&cursor)) {
2226  m0_be_btree_cursor_kv_get(&cursor, &key, &val);
2227  ctg_index_btree_dump_one_rec(&key, &val, dump_in_hex);
2228  }
2229  m0_be_btree_cursor_fini(&cursor);
2230  ctg_fini(ctg);
2231 
2232  return 0;
2233 }
2234 
/*
 * Dumps to the console the catalogue whose DIX fid equals the fid parsed
 * from fidstr. Records are dumped in hex if dump_in_hex_str is "hex".
 *
 * The cursor walks records whose keys are fid keys; component catalogue
 * fids are converted to DIX fids and compared with the requested one. The
 * first match is unpacked, dumped with ctg_index_btree_dump() and ends the
 * walk.
 *
 * Returns the fid parsing error, otherwise the code that ended the walk if
 * it is non-zero, otherwise 0 if a catalogue was dumped and -ENOENT if not.
 * NOTE(review): the walk normally ends with a non-zero code from the
 * cursor when the records run out, so that code is what gets returned when
 * nothing matches -- confirm the intent.
 * NOTE(review): some lines are absent from this excerpt: the statement(s)
 * after fid parsing, the assignment checked by M0_ASSERT(rc == 0), and the
 * initialisation of the cursor. Confirm against the full file.
 */
2235 int ctgdump(struct m0_motr *motr_ctx, char *fidstr, char *dump_in_hex_str)
2236 {
2237  int rc;
2238  struct m0_fid dfid;
2239  struct m0_fid out_fid = { 0, 0};
2240  struct m0_fid gfid = { 0, 0};
2241  struct m0_buf key;
2242  struct m0_buf val;
2243  struct m0_be_btree_cursor cursor;
2244  struct m0_cas_ctg *ctg = NULL;
2245  bool dumped = false;
2246  bool dump_in_hex = false;
2247  struct fid_key *fkey;
2248 
2249  if (m0_streq("hex", dump_in_hex_str))
2250  dump_in_hex = true;
2251 
2252  rc = m0_fid_sscanf(fidstr, &dfid);
2253  if (rc < 0)
2254  return rc;
2256 
2258  M0_ASSERT(rc == 0);
2259 
2261  for (rc = m0_be_btree_cursor_first_sync(&cursor); rc == 0;
2262  rc = m0_be_btree_cursor_next_sync(&cursor)) {
2263  m0_be_btree_cursor_kv_get(&cursor, &key, &val);
2264  fkey = key.b_addr;
2265  out_fid = fkey->fk_fid;
 /* Only component catalogue fids can be converted to DIX fids. */
2266  if (!m0_dix_fid_validate_cctg(&out_fid))
2267  continue;
2268  m0_dix_fid_convert_cctg2dix(&out_fid, &gfid);
2269  M0_LOG(M0_DEBUG, "Found cfid="FID_F" gfid=:"FID_F" dfid="FID_F,
2270  FID_P(&out_fid), FID_P(&gfid), FID_P(&dfid));
2271  if (m0_fid_eq(&gfid, &dfid)) {
2272  rc = ctg_vbuf_unpack(&val, NULL) ?:
2273  ctg_vbuf_as_ctg(&val, &ctg);
2274  if (rc != 0) {
2275  M0_LOG(M0_ERROR, "val unpack error:%d", rc);
2276  break;
2277  }
2278 
2279  M0_LOG(M0_DEBUG, "Dumping dix fid="FID_F" ctg=%p",
2280  FID_P(&dfid), ctg);
2281  ctg_index_btree_dump(motr_ctx, ctg, dump_in_hex);
2282  dumped = true;
2283  break;
2284  }
2285  }
2286  m0_be_btree_cursor_fini(&cursor);
2287 
2288  return rc? : dumped? 0 : -ENOENT;
2289 }
2290 
2291 static bool ctg_op_is_versioned(const struct m0_ctg_op *ctg_op)
2292 {
2293  const struct m0_cas_op *cas_op;
2294  bool has_txd;
2295 
2296  /*
2297  * Since the versioned behavior is optional, it may get turned off
2298  * for a variety of resons. These reasons are listed here
2299  * as M0_RC_INFO messages to aid debugging of the cases
2300  * where the behavior should have not been turned off.
2301  */
2302 
2303  M0_ENTRY("ctg_op=%p", ctg_op);
2304 
2305  if (!ctg_is_ordinary(ctg_op->co_ctg))
2306  return M0_RC_INFO(false, "Non-ordinary catalogue.");
2307 
2308  if (ctg_op->co_fom == NULL)
2309  return M0_RC_INFO(false, "No fom, no versions.");
2310 
2311  if (ctg_op->co_fom->fo_fop == NULL)
2312  return M0_RC_INFO(false, "No fop, no versions.");
2313 
2314  cas_op = m0_fop_data(ctg_op->co_fom->fo_fop);
2315  if (cas_op == NULL)
2316  return M0_RC_INFO(false, "No cas op, no versions.");
2317 
2318  if ((ctg_op->co_flags & COF_VERSIONED) == 0)
2319  return M0_RC_INFO(false, "CAS request is not versioned.");
2320 
2321  has_txd = !m0_dtm0_tx_desc_is_none(&cas_op->cg_txd);
2322 
2323  switch (CTG_OP_COMBINE(ctg_op->co_opcode, ctg_op->co_ct)) {
2325  if ((ctg_op->co_flags & COF_OVERWRITE) == 0)
2326  return M0_RC_INFO(false,
2327  "PUT request without OVERWRITE is "
2328  "not versioned.");
2330  if (has_txd)
2331  return M0_RC(true);
2332  else
2333  return M0_RC_INFO(false,
2334  "%s request is has an empty txd.",
2335  ctg_op->co_opcode == CO_PUT ?
2336  "PUT" : "DEL");
2339  return M0_RC(true);
2340  default:
2341  break;
2342  }
2343 
2344  return M0_RC_INFO(false, "Non-versioned operation.");
2345 }
2346 
2347 static void ctg_op_version_get(const struct m0_ctg_op *ctg_op,
2348  struct m0_crv *out)
2349 {
2350  const struct m0_dtm0_tid *tid;
2351  const struct m0_cas_op *cas_op;
2352  bool tbs;
2353 
2354  M0_ENTRY();
2355  M0_PRE(ctg_op->co_is_versioned);
2356 
2357  if (M0_IN(ctg_op->co_opcode, (CO_PUT, CO_DEL))) {
2358  M0_ASSERT_INFO(ctg_op->co_fom != NULL,
2359  "Versioned op without FOM?");
2360  cas_op = m0_fop_data(ctg_op->co_fom->fo_fop);
2361  M0_ASSERT_INFO(cas_op != NULL, "Versioned op without cas_op?");
2362  tbs = ctg_op->co_opcode == CO_DEL;
2363  tid = &cas_op->cg_txd.dtd_id;
2364  m0_crv_init(out, &tid->dti_ts, tbs);
2365 
2366  M0_LEAVE("ctg_op=%p, cas_op=%p, txid=" DTID0_F ", ver=%" PRIu64
2367  ", ver_enc=%" PRIu64, ctg_op,
2368  cas_op, DTID0_P(tid), tid->dti_ts.dts_phys,
2369  out->crv_encoded);
2370  } else {
2371  *out = M0_CRV_INIT_NONE;
2372  M0_LEAVE("ctg_op=%p, no version", ctg_op);
2373  }
2374 }
2375 
2376 /*
2377  * Synchronously inserts/updates a key-value record considering the version
2378  * and the tombstone that were specified in the operation ("new_version")
2379  * and the version+tombstone that were stored in btree ("old_version").
2380  */
2381 static int versioned_put_sync(struct m0_ctg_op *ctg_op)
2382 {
2383  struct m0_buf *key = &ctg_op->co_key;
2384  struct m0_be_btree *btree = &ctg_op->co_ctg->cc_tree;
2385  struct m0_be_btree_anchor *anchor = &ctg_op->co_anchor;
2386  struct m0_be_tx *tx = &ctg_op->co_fom->fo_tx.tx_betx;
2387  struct m0_crv new_version = M0_CRV_INIT_NONE;
2388  struct m0_crv old_version = M0_CRV_INIT_NONE;
2389  int rc;
2390 
2391  M0_PRE(ctg_op->co_is_versioned);
2392  M0_ENTRY();
2393 
2394  ctg_op_version_get(ctg_op, &new_version);
2395  M0_ASSERT_INFO(!m0_crv_is_none(&new_version),
2396  "Versioned PUT or DEL without a valid version?");
2397 
2398  /*
2399  * TODO: Performance.
2400  * This lookup call can be avoided if the version comparison
2401  * was shifted into btree. However, at this moment the cost of
2402  * such a lookup is much lower than the cost of re-working btree API.
2403  * See the be_btree_search() call inside ::btree_save. It is the
2404  * point where btree_save does its own "lookup". Versions could
2405  * be compared right in there. Alternatively, btree_save could
2406  * use btree_cursor as a "hint".
2407  */
2410  &op,
2411  key,
2412  anchor),
2413  bo_u.u_btree.t_rc) ?:
2414  ctg_vbuf_unpack(&anchor->ba_value, &old_version);
2415 
2416  /* The tree is long-locked anyway. */
2417  m0_be_btree_release(NULL, anchor);
2418 
2419  if (!M0_IN(rc, (0, -ENOENT)))
2420  return M0_ERR(rc);
2421 
2422  /*
2423  * Skip overwrite op if the existing record is "newer" (version-wise)
2424  * than the record to be inserted. Note, <= 0 means that we filter out
2425  * the operations with the exact same version and tombstone.
2426  */
2427  if (!m0_crv_is_none(&old_version) &&
2428  m0_crv_cmp(&new_version, &old_version) <= 0)
2429  return M0_RC(0);
2430 
2431  M0_LOG(M0_DEBUG, "Overwriting " CRV_F " with " CRV_F ".",
2432  CRV_P(&old_version), CRV_P(&new_version));
2433 
2434  anchor->ba_value.b_nob = ctg_vbuf_packed_size(&ctg_op->co_val);
2437  key, anchor, true,
2438  ctg_op_zones(ctg_op)),
2439  bo_u.u_btree.t_rc);
2440 
2441  if (rc == 0) {
2442  ctg_vbuf_pack(&anchor->ba_value,
2443  &ctg_op->co_val,
2444  &new_version);
2445 
2447  key->b_nob -
2448  sizeof(struct generic_key) +
2449  ctg_op->co_val.b_nob);
2450  }
2451 
2452  return M0_RC(rc);
2453 }
2454 
2455 /*
2456  * Gets an alive record (without tombstone set) in btree.
2457  * Returns -ENOENT if tombstone is set.
2458  * Always sets co_out_ver if the record physically exists in the tree.
2459  */
2460 static int versioned_get_sync(struct m0_ctg_op *ctg_op)
2461 {
2462  struct m0_be_btree *btree = &ctg_op->co_ctg->cc_tree;
2463  struct m0_be_btree_anchor *anchor = &ctg_op->co_anchor;
2464  int rc;
2465  M0_ENTRY();
2466 
2467  M0_PRE(ctg_op->co_is_versioned);
2468 
2471  &ctg_op->co_key,
2472  anchor),
2473  bo_u.u_btree.t_rc) ?:
2474  ctg_vbuf_unpack(&anchor->ba_value, &ctg_op->co_out_ver);
2475 
2476  if (rc == 0) {
2477  if (m0_crv_tbs(&ctg_op->co_out_ver))
2478  rc = M0_ERR(-ENOENT);
2479  else
2480  ctg_op->co_out_val = anchor->ba_value;
2481  }
2482 
2483  return M0_RC(rc);
2484 }
2485 
2486 /*
2487  * Synchronously advances the cursor until it reaches an alive
2488  * key-value pair (alive_only=true), next key-value pair (alive_only=false),
2489  * or the end of the tree.
2490  */
2491 static int versioned_cursor_next_sync(struct m0_ctg_op *ctg_op, bool alive_only)
2492 {
2493  struct m0_be_op *beop = ctg_beop(ctg_op);
2494  int rc;
2495 
2496  do {
2497  m0_be_btree_cursor_next(&ctg_op->co_cur);
2498 
2499  rc = ctg_berc(ctg_op);
2500  if (rc != 0)
2501  break;
2502 
2504  &ctg_op->co_out_key,
2505  &ctg_op->co_out_val);
2506  rc = ctg_kbuf_unpack(&ctg_op->co_out_key) ?:
2507  ctg_vbuf_unpack(&ctg_op->co_out_val,
2508  &ctg_op->co_out_ver);
2509  if (rc != 0)
2510  break;
2511 
2512  m0_be_op_reset(beop);
2513 
2514  } while (m0_crv_tbs(&ctg_op->co_out_ver) && alive_only);
2515 
2516  /* It should never return dead values. */
2517  if (m0_crv_tbs(&ctg_op->co_out_ver))
2518  ctg_op->co_out_val = M0_BUF_INIT0;
2519 
2520  return M0_RC(rc);
2521 }
2522 
2523 /*
2524  * Positions the cursor at the alive record that corresponds to the
2525  * specified key or at the alive record next to the specified key
2526  * depending on the slant flag. If alive_only=false then it treats
2527  * all records as alive.
2528  *
2529  * The relations between tombstones and slant flag:
2530  * Non-slant:
2531  * record_at(key) is alive: returns the record.
2532  * record_at(key) has tombstone: returns -ENOENT.
2533  * Slant:
2534  * record_at(key) is alive: returns the record.
2535  * record_at(key) has tombstone: finds next alive record.
2536  */
2537 static int versioned_cursor_get_sync(struct m0_ctg_op *ctg_op, bool alive_only)
2538 {
2539  struct m0_be_op *beop = ctg_beop(ctg_op);
2540  struct m0_buf value = M0_BUF_INIT0;
2541  bool slant = (ctg_op->co_flags & COF_SLANT) != 0;
2542  int rc;
2543 
2544  M0_PRE(ctg_op->co_is_versioned);
2545 
2546  m0_be_btree_cursor_get(&ctg_op->co_cur, &ctg_op->co_key, slant);
2547  rc = ctg_berc(ctg_op);
2548 
2549  if (rc == 0) {
2551  &ctg_op->co_out_key,
2552  &value);
2553  rc = ctg_kbuf_unpack(&ctg_op->co_out_key) ?:
2554  ctg_vbuf_unpack(&value, &ctg_op->co_out_ver);
2555  if (rc == 0) {
2556  if (m0_crv_tbs(&ctg_op->co_out_ver))
2557  rc = alive_only ?
2558  (slant ? -EAGAIN : -ENOENT) : 0;
2559  else
2560  ctg_op->co_out_val = value;
2561  }
2562  } else
2563  M0_ASSERT_INFO(rc != EAGAIN,
2564  "btree cursor op returned EAGAIN?");
2565 
2566  m0_be_op_reset(beop);
2567 
2568  if (rc == -EAGAIN)
2569  rc = versioned_cursor_next_sync(ctg_op, alive_only);
2570 
2571  return M0_RC(rc);
2572 }
2573 
2574 #undef M0_TRACE_SUBSYSTEM
2575 
2576 /*
2577  * Local variables:
2578  * c-indentation-style: "K&R"
2579  * c-basic-offset: 8
2580  * tab-width: 8
2581  * fill-column: 80
2582  * scroll-step: 1
2583  * End:
2584  */
2585 /*
2586  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
2587  */
static int versioned_cursor_next_sync(struct m0_ctg_op *op, bool alive_only)
Definition: ctg_store.c:2491
struct generic_value lv_gval
Definition: ctg_store.c:120
Definition: cas.h:352
M0_INTERNAL void m0_long_lock_fini(struct m0_long_lock *lock)
struct m0_be_long_lock cc_lock
Definition: ctg_store.h:102
M0_INTERNAL void m0_be_btree_insert_inplace(struct m0_be_btree *tree, struct m0_be_tx *tx, struct m0_be_op *op, const struct m0_buf *key, struct m0_be_btree_anchor *anchor, uint64_t zonemask)
Definition: btree.c:2126
#define M0_BE_ALLOC_CREDIT_PTR(ptr, seg, accum)
Definition: alloc.h:355
M0_INTERNAL void m0_be_btree_cursor_next(struct m0_be_btree_cursor *cur)
Definition: btree.c:2358
static size_t nr
Definition: dump.c:1505
M0_INTERNAL void m0_ctg_ctidx_insert_credits(struct m0_cas_id *cid, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1962
M0_INTERNAL void m0_ctg_delete_credit(struct m0_cas_ctg *ctg, m0_bcount_t knob, m0_bcount_t vnob, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1913
struct m0_be_domain * bs_domain
Definition: seg.h:82
struct m0_be_op::@39::m0_be_op__btree u_btree
Definition: cas.h:349
struct m0_dtm0_tx_desc cg_txd
Definition: cas.h:396
struct m0_be_mutex cs_ctg_init_mutex
Definition: ctg_store.h:178
#define M0_PRE(cond)
static struct m0_be_active_record_domain dummy
Definition: active_record.c:35
#define M0_BE_ALLOC_PTR_SYNC(ptr, seg, tx)
Definition: alloc.h:339
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
M0_INTERNAL int m0_ctg_meta_insert(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1336
Definition: dtm.h:554
M0_INTERNAL void m0_be_btree_delete_credit(const struct m0_be_btree *tree, m0_bcount_t nr, m0_bcount_t ksize, m0_bcount_t vsize, struct m0_be_tx_credit *accum)
Definition: btree.c:1740
struct m0_buf co_out_val
Definition: ctg_store.h:216
Definition: cas.h:351
#define M0_BUF_INIT_PTR_CONST(p)
Definition: buf.h:73
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
M0_INTERNAL void m0_dix_fid_convert_cctg2dix(const struct m0_fid *cctg_fid, struct m0_fid *dix_fid)
Definition: fid_convert.c:69
M0_INTERNAL int m0_be_seg_dict_insert(struct m0_be_seg *seg, struct m0_be_tx *tx, const char *name, void *value)
Definition: seg_dict.c:255
struct m0_cas_ctg * cs_dead_index
Definition: ctg_store.c:56
M0_INTERNAL int m0_ctg_dead_index_insert(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, int next_phase)
Definition: ctg_store.c:1405
M0_INTERNAL void m0_chan_broadcast_lock(struct m0_chan *chan)
Definition: chan.c:178
uint64_t ko_type
Definition: btree.h:97
M0_INTERNAL void m0_format_header_pack(struct m0_format_header *dest, const struct m0_format_tag *src)
Definition: format.c:40
struct m0_fop * fo_fop
Definition: fom.h:490
int const char const void size_t int flags
Definition: dir.c:328
static struct m0_be_seg * cas_seg(struct m0_be_domain *dom)
Definition: ctg_store.c:199
struct generic_key fk_gkey
Definition: ctg_store.c:104
#define CRV_P(__crv)
Definition: cas.h:509
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
static struct m0_bufvec dst
Definition: xform.c:61
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
Definition: chan.c:267
M0_INTERNAL int m0_ctg_delete(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, int next_phase)
Definition: ctg_store.c:1540
M0_INTERNAL int m0_ctg_lookup_delete(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, struct m0_buf *val, int flags, int next_phase)
Definition: ctg_store.c:1465
static struct buffer * cur(struct m0_addb2_mach *mach, m0_bcount_t space)
Definition: addb2.c:791
Definition: idx_mock.c:52
#define ergo(a, b)
Definition: misc.h:293
#define M0_3WAY(v0, v1)
Definition: arith.h:199
Definition: cas.h:346
M0_INTERNAL void m0_be_btree_clear_credit(struct m0_be_btree *tree, struct m0_be_tx_credit *fixed_part, struct m0_be_tx_credit *single_record, m0_bcount_t *records_nr)
Definition: btree.c:1862
struct m0_fid fk_fid
Definition: ctg_store.c:105
static struct m0_be_op * ctg_beop(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:208
m0_be_tx_state
Definition: tx.h:214
Definition: cas.h:347
void * b_addr
Definition: buf.h:39
M0_INTERNAL void m0_ctg_lookup_result(struct m0_ctg_op *ctg_op, struct m0_buf *buf)
Definition: ctg_store.c:1570
M0_INTERNAL int m0_ctg_gc_wait(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1349
uint64_t cs_rec_size
Definition: ctg_store.h:173
M0_INTERNAL struct m0_long_lock * m0_ctg_lock(struct m0_cas_ctg *ctg)
Definition: ctg_store.c:2162
static uint32_t ctg_op_zones(const struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1137
M0_INTERNAL void m0_ctg_store_fini(void)
Definition: ctg_store.c:870
M0_INTERNAL void m0_ctg_mem_place_get(struct m0_ctg_op *ctg_op, struct m0_buf *buf)
Definition: ctg_store.c:2105
static struct m0_sm_group * grp
Definition: bytecount.c:38
M0_INTERNAL int m0_ctg_meta_cursor_next(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1694
M0_INTERNAL void m0_be_btree_save_inplace(struct m0_be_btree *tree, struct m0_be_tx *tx, struct m0_be_op *op, const struct m0_buf *key, struct m0_be_btree_anchor *anchor, bool overwrite, uint64_t zonemask)
Definition: btree.c:2136
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
#define M0_BE_ALLOC_CREDIT_ARR(arr, nr, seg, accum)
Definition: alloc.h:363
#define min_check(a, b)
Definition: arith.h:88
cursor_phase
Definition: ctg_store.c:128
Definition: cas.h:360
void m0_console_printf(const char *fmt,...)
Definition: trace.c:801
struct m0_be_seg * bb_seg
Definition: btree.h:76
M0_INTERNAL void m0_be_tx_fini(struct m0_be_tx *tx)
Definition: stubs.c:163
M0_INTERNAL void m0_ctg_fini(struct m0_fom *fom, struct m0_cas_ctg *ctg)
Definition: ctg_store.c:479
static void ctg_store_release(struct m0_ref *ref)
Definition: ctg_store.c:858
static void insert(void)
Definition: service_ut.c:970
M0_INTERNAL uint64_t m0_ctg_rec_nr(void)
Definition: ctg_store.c:2145
Definition: cas.h:247
struct m0_cas_ctg * cs_meta
Definition: ctg_store.h:161
M0_INTERNAL void m0_ctg_dead_clean_credit(struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1887
#define M0_BE_OP_SYNC(op_obj, action)
Definition: op.h:190
#define M0_BE_ALLOC_BUF(buf, seg, tx, op)
Definition: alloc.h:348
M0_INTERNAL int m0_ctg_lookup(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, int next_phase)
Definition: ctg_store.c:1555
static bool ctg_is_ordinary(const struct m0_cas_ctg *ctg)
Definition: ctg_store.c:969
M0_INTERNAL const struct m0_fid m0_cas_meta_fid
Definition: cas.c:146
struct m0_cas_ctg * cs_ctidx
Definition: ctg_store.c:50
struct m0_fom * co_fom
Definition: ctg_store.h:184
struct m0_dtm0_ts dti_ts
Definition: tx_desc.h:94
struct m0_dix_layout ci_layout
Definition: cas.h:120
M0_INTERNAL int m0_ctg_ctidx_lookup_sync(const struct m0_fid *fid, struct m0_dix_layout **layout)
Definition: ctg_store.c:1980
int const char const void * value
Definition: dir.c:325
M0_INTERNAL struct m0_be_seg * m0_be_domain_seg0_get(struct m0_be_domain *dom)
Definition: domain.c:466
M0_INTERNAL bool m0_dix_fid_validate_cctg(const struct m0_fid *cctg_fid)
Definition: fid_convert.c:94
static int ctg_kbuf_get(struct m0_buf *dst, const struct m0_buf *src, bool enabled_fi)
Definition: ctg_store.c:251
M0_INTERNAL void m0_be_op_unlock(struct m0_be_op *op)
Definition: op.c:120
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
struct m0_ref cs_ref
Definition: ctg_store.c:69
M0_INTERNAL void m0_be_tx_prep(struct m0_be_tx *tx, const struct m0_be_tx_credit *credit)
Definition: stubs.c:175
#define M0_BE_TX_CAPTURE_PTR(seg, tx, ptr)
Definition: tx.h:505
static int ctg_mem_op_exec(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1290
M0_INTERNAL int m0_ctg_minkey(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, int next_phase)
Definition: ctg_store.c:1593
static bool ctg_op_cb(struct m0_clink *clink)
Definition: ctg_store.c:975
struct m0_dtx fo_tx
Definition: fom.h:498
M0_INTERNAL bool m0_clink_is_armed(const struct m0_clink *link)
Definition: chan.c:303
static m0_bcount_t ctg_vbuf_packed_size(const struct m0_buf *value)
Definition: ctg_store.c:223
static void ctg_meta_delete(struct m0_be_btree *meta, const struct m0_fid *fid, struct m0_be_tx *tx)
Definition: ctg_store.c:570
M0_INTERNAL void m0_fom_wait_on(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1490
#define M0_BITS(...)
Definition: misc.h:236
uint64_t m0_bcount_t
Definition: types.h:77
static int left
Definition: locality.c:280
static int versioned_cursor_get_sync(struct m0_ctg_op *op, bool alive_only)
Definition: ctg_store.c:2537
struct generic_value mv_gval
Definition: ctg_store.c:112
static int void * buf
Definition: dir.c:1019
M0_INTERNAL void m0_chan_lock(struct m0_chan *ch)
Definition: chan.c:68
M0_INTERNAL void m0_be_btree_create(struct m0_be_btree *tree, struct m0_be_tx *tx, struct m0_be_op *op, const struct m0_fid *btree_fid)
Definition: btree.c:1512
M0_INTERNAL struct m0_cas_ctg * m0_ctg_dead_index(void)
Definition: ctg_store.c:2140
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL int m0_ctg_cursor_get(struct m0_ctg_op *ctg_op, const struct m0_buf *key, int next_phase)
Definition: ctg_store.c:1660
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_INTERNAL void m0_cas_gc_wait_async(struct m0_be_op *beop)
Definition: index_gc.c:601
M0_INTERNAL void m0_be_btree_destroy_credit(struct m0_be_btree *tree, struct m0_be_tx_credit *accum)
Definition: btree.c:1831
m0_bcount_t co_cnt
Definition: ctg_store.h:232
struct m0_dix_layout lv_layout
Definition: ctg_store.c:121
#define M0_BE_REG(seg, size, addr)
Definition: seg.h:148
M0_INTERNAL void m0_ctg_insert_credit(struct m0_cas_ctg *ctg, m0_bcount_t knob, m0_bcount_t vnob, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1905
struct m0_be_btree cc_tree
Definition: ctg_store.h:101
static void ctg_meta_insert_credit(struct m0_be_btree *bt, m0_bcount_t nr, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:586
struct m0_cas_ctg * mv_ctg
Definition: ctg_store.c:113
#define PRIx64
Definition: types.h:61
M0_INTERNAL void m0_be_btree_cursor_put(struct m0_be_btree_cursor *cursor)
Definition: btree.c:2480
struct m0_be_ut_backend rc_be
Definition: setup.h:267
static struct m0_be_tx * m0_fom_tx(struct m0_fom *fom)
Definition: fom.h:537
Definition: sock.c:887
static const struct m0_fid * cas_fid(const struct m0_fom *fom)
Definition: service.c:1749
static int ctg_meta_selfadd(struct m0_be_btree *meta, struct m0_be_tx *tx)
Definition: ctg_store.c:564
M0_INTERNAL void m0_be_btree_delete(struct m0_be_btree *tree, struct m0_be_tx *tx, struct m0_be_op *op, const struct m0_buf *key)
Definition: btree.c:1958
M0_INTERNAL void m0_be_btree_cursor_fini(struct m0_be_btree_cursor *cursor)
Definition: btree.c:2290
M0_INTERNAL void m0_buf_memcpy(struct m0_buf *dst, const struct m0_buf *src)
Definition: buf.c:96
static void ctg_init(struct m0_cas_ctg *ctg, struct m0_be_seg *seg)
Definition: ctg_store.c:415
#define FID_KEY_INIT(__fid)
Definition: ctg_store.c:361
struct m0_fid fid
Definition: di.c:46
M0_INTERNAL bool m0_ctg_cursor_is_initialised(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1633
struct m0_long_lock llock
return M0_RC(rc)
op
Definition: libdemo.c:64
M0_INTERNAL int m0_be_btree_cursor_next_sync(struct m0_be_btree_cursor *cur)
Definition: btree.c:2464
static int btree(struct scanner *s, struct rectype *r, char *buf)
Definition: beck.c:1270
#define M0_BE_TX_CREDIT(nr, size)
Definition: tx_credit.h:94
struct m0_dtm0_tid dtd_id
Definition: tx_desc.h:121
M0_INTERNAL int m0_ctg_mem_free(struct m0_ctg_op *ctg_op, void *area, int next_phase)
Definition: ctg_store.c:2117
#define M0_ENTRY(...)
Definition: trace.h:170
Definition: buf.h:37
M0_INTERNAL int m0_ctg_insert(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, const struct m0_buf *val, int next_phase)
Definition: ctg_store.c:1448
struct m0_buf co_val
Definition: ctg_store.h:212
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
m0_time_t dts_phys
Definition: clk_src.h:93
static struct btype bt[]
Definition: beck.c:459
M0_INTERNAL int m0_ctg_cursor_next(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1680
M0_INTERNAL int m0_ctg_meta_lookup(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1362
static int ctg_op_exec_versioned(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1214
M0_INTERNAL void m0_ctg_cursor_fini(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1749
uint64_t gv_length
Definition: ctg_store.c:95
int i
Definition: dir.c:1033
union m0_be_mutex::@196 bm_u
#define M0_RC_INFO(rc, fmt,...)
Definition: trace.h:209
#define PRIu64
Definition: types.h:58
struct m0_crv co_out_ver
Definition: ctg_store.h:218
static struct m0_mutex cs_init_guard
Definition: ctg_store.c:189
M0_INTERNAL void m0_ref_put(struct m0_ref *ref)
Definition: refs.c:38
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
static int versioned_put_sync(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:2381
static void ctg_store_init_creds_calc(struct m0_be_seg *seg, struct m0_cas_state *state, struct m0_cas_ctg *ctidx, struct m0_be_tx_credit *cred)
Definition: ctg_store.c:617
void m0_ref_init(struct m0_ref *ref, int init_num, void(*release)(struct m0_ref *ref))
Definition: refs.c:24
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_ref_get(struct m0_ref *ref)
Definition: refs.c:32
#define M0_BE_OP_SYNC_RET(op_obj, action, member)
Definition: op.h:243
M0_INTERNAL void m0_be_btree_insert_credit2(const struct m0_be_btree *tree, m0_bcount_t nr, m0_bcount_t ksize, m0_bcount_t vsize, struct m0_be_tx_credit *accum)
Definition: btree.c:1730
M0_INTERNAL void m0_ctg_try_init(struct m0_cas_ctg *ctg)
Definition: ctg_store.c:953
static int ctg_op_exec_normal(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1143
static int key
Definition: locality.c:283
uint8_t gk_data[0]
Definition: ctg_store.c:88
struct m0_be_mutex cc_chan_guard
Definition: ctg_store.h:106
M0_INTERNAL void m0_be_btree_create_credit(const struct m0_be_btree *tree, m0_bcount_t nr, struct m0_be_tx_credit *accum)
Definition: btree.c:1783
int co_ct
Definition: ctg_store.h:223
Definition: cas.h:344
static void ctg_state_counter_add(uint64_t *counter, uint64_t val)
Definition: ctg_store.c:876
M0_INTERNAL const struct m0_be_btree_kv_ops * m0_ctg_btree_ops(void)
Definition: ctg_store.c:2167
#define M0_FID_TINIT(type, container, key)
Definition: fid.h:90
M0_INTERNAL void m0_ctg_cursor_put(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1744
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
M0_INTERNAL int m0_ctg_mem_place(struct m0_ctg_op *ctg_op, const struct m0_buf *buf, int next_phase)
Definition: ctg_store.c:2090
Definition: refs.h:34
struct m0_be_op co_beop
Definition: ctg_store.h:194
#define CTG_OP_COMBINE(opc, ct)
Definition: ctg_store.h:241
M0_INTERNAL struct m0_cas_ctg * m0_ctg_meta(void)
Definition: ctg_store.c:2130
M0_INTERNAL void m0_be_btree_lookup_inplace(struct m0_be_btree *tree, struct m0_be_op *op, const struct m0_buf *key, struct m0_be_btree_anchor *anchor)
Definition: btree.c:2152
Definition: cas.h:157
M0_INTERNAL const struct m0_fid_type m0_dix_fid_type
Definition: cas.c:168
#define M0_BE_TX_CAPTURE_BUF(seg, tx, buf)
Definition: tx.h:509
m0_bcount_t b_nob
Definition: buf.h:38
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
Definition: chan.c:96
static void ctg_destroy(struct m0_cas_ctg *ctg, struct m0_be_tx *tx)
Definition: ctg_store.c:471
#define M0_ASSERT(cond)
M0_INTERNAL void m0_be_btree_destroy(struct m0_be_btree *tree, struct m0_be_tx *tx, struct m0_be_op *op)
Definition: btree.c:1538
M0_BASSERT(sizeof(struct generic_key)==M0_CAS_CTG_KEY_HDR_SIZE)
bool cc_inited
Definition: ctg_store.h:112
M0_INTERNAL void m0_be_btree_release(struct m0_be_tx *tx, struct m0_be_btree_anchor *anchor)
Definition: btree.c:2180
struct m0_be_chan cc_chan
Definition: ctg_store.h:104
M0_INTERNAL bool cas_in_ut(void)
Definition: cas.c:220
static int counter
Definition: mutex.c:32
static const char cas_state_key[]
Definition: ctg_store.c:197
struct m0_chan bch_chan
Definition: format.h:225
M0_INTERNAL void m0_be_tx_credit_add(struct m0_be_tx_credit *c0, const struct m0_be_tx_credit *c1)
Definition: tx_credit.c:44
M0_INTERNAL int m0_crv_cmp(const struct m0_crv *left, const struct m0_crv *right)
Definition: cas.c:275
#define m0_streq(a, b)
Definition: string.h:34
struct m0_clink co_clink
Definition: ctg_store.h:204
M0_INTERNAL int m0_ctg_store_init(struct m0_be_domain *dom)
Definition: ctg_store.c:814
M0_INTERNAL int ctg_index_btree_dump(struct m0_motr *motr_ctx, struct m0_cas_ctg *ctg, bool dump_in_hex)
Definition: ctg_store.c:2212
static int key0
Definition: locality.c:282
static struct m0_stob_domain * dom
Definition: storage.c:38
union m0_be_long_lock::@190 bll_u
M0_INTERNAL void m0_ctg_mark_deleted_credit(struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1805
#define M0_BUF_INIT0
Definition: buf.h:71
#define META_VALUE_INIT(__ctg_ptr)
Definition: ctg_store.c:373
M0_INTERNAL int m0_ctg__meta_insert(struct m0_be_btree *meta, const struct m0_fid *fid, struct m0_cas_ctg *ctg, struct m0_be_tx *tx)
Definition: ctg_store.c:528
struct m0_be_domain * eng_domain
Definition: engine.h:118
Definition: cas.h:345
int co_rc
Definition: ctg_store.h:227
void * m0_alloc(size_t size)
Definition: memory.c:126
m0_bcount_t tc_reg_nr
Definition: tx_credit.h:84
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
static int ctg_store_create(struct m0_be_seg *seg)
Definition: ctg_store.c:725
M0_INTERNAL void m0_be_btree_fini(struct m0_be_btree *tree)
Definition: btree.c:1503
M0_INTERNAL int m0_be_tx_exclusive_open_sync(struct m0_be_tx *tx)
Definition: tx.c:594
struct m0_crv gv_version
Definition: ctg_store.c:96
uint64_t f_container
Definition: fid.h:39
M0_INTERNAL void m0_ctg_create_credit(struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1823
M0_INTERNAL void m0_be_btree_minkey(struct m0_be_btree *tree, struct m0_be_op *op, struct m0_buf *out)
Definition: btree.c:2070
struct m0_be_btree_anchor co_anchor
Definition: ctg_store.h:196
M0_INTERNAL uint64_t m0_ctg_rec_size(void)
Definition: ctg_store.c:2151
static void ctg_state_destroy(struct m0_cas_state *state, struct m0_be_tx *tx)
Definition: ctg_store.c:681
static int ctg_berc(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:214
M0_INTERNAL bool m0_crv_tbs(const struct m0_crv *crv)
Definition: cas.c:225
struct m0_sm_group * lo_grp
Definition: locality.h:67
static int ctg_state_create(struct m0_be_seg *seg, struct m0_be_tx *tx, struct m0_cas_state **state)
Definition: ctg_store.c:649
M0_INTERNAL void m0_be_op_done(struct m0_be_op *op)
Definition: stubs.c:104
M0_INTERNAL const struct m0_fid m0_cas_dead_index_fid
Definition: cas.c:156
M0_INTERNAL void m0_ctg_meta_cursor_init(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1722
Definition: dump.c:103
struct m0_fid ci_fid
Definition: cas.h:113
Definition: chan.h:229
static void ctg_vbuf_pack(struct m0_buf *dst, const struct m0_buf *src, const struct m0_crv *crv)
Definition: ctg_store.c:235
static void ctg_state_counter_sub(uint64_t *counter, uint64_t val)
Definition: ctg_store.c:890
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
Definition: buf.c:55
M0_INTERNAL int m0_ctg_meta_find_ctg(struct m0_cas_ctg *meta, const struct m0_fid *ctg_fid, struct m0_cas_ctg **ctg)
Definition: ctg_store.c:503
M0_INTERNAL void m0_ctg_cursor_init(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg)
Definition: ctg_store.c:1640
M0_INTERNAL bool m0_be_should_break_half(struct m0_be_engine *eng, const struct m0_be_tx_credit *accum, const struct m0_be_tx_credit *delta)
Definition: tx.c:721
M0_INTERNAL void m0_ctg_op_get_ver(struct m0_ctg_op *ctg_op, struct m0_crv *out)
Definition: ctg_store.c:1582
M0_INTERNAL int m0_be_btree_cursor_first_sync(struct m0_be_btree_cursor *cur)
Definition: btree.c:2348
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL int m0_fid_sscanf(const char *s, struct m0_fid *fid)
Definition: fid.c:227
struct m0_be_btree_cursor co_cur
Definition: ctg_store.h:198
static void ctg_fini(struct m0_cas_ctg *ctg)
Definition: ctg_store.c:431
int m0_ctg_create(struct m0_be_seg *seg, struct m0_be_tx *tx, struct m0_cas_ctg **out, const struct m0_fid *cas_fid)
Definition: ctg_store.c:441
static int ctg_exec(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, const struct m0_buf *key, int next_phase)
Definition: ctg_store.c:1418
union m0_dix_layout::@145 u
#define M0_BE_FREE_PTR_SYNC(ptr, seg, tx)
Definition: alloc.h:345
Definition: seg.h:66
M0_INTERNAL int m0_buf_copy(struct m0_buf *dest, const struct m0_buf *src)
Definition: buf.c:104
#define FID_P(f)
Definition: fid.h:77
struct m0_be_domain but_dom
Definition: helper.h:47
#define CRV_F
Definition: cas.h:508
struct m0_long_lock cs_del_lock
Definition: ctg_store.c:63
static int ctg_vbuf_unpack(struct m0_buf *buf, struct m0_crv *crv)
Definition: ctg_store.c:279
M0_INTERNAL void m0_chan_unlock(struct m0_chan *ch)
Definition: chan.c:73
static void ctg_ctidx_op_credits(struct m0_cas_id *cid, bool insert, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1930
void imask(void)
Definition: client_ut.c:141
static int ctg_mem_exec(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1441
Definition: cas.h:372
M0_INTERNAL bool m0_dix_imask_is_empty(const struct m0_dix_imask *mask)
Definition: imask.c:130
M0_INTERNAL void m0_ctg_op_init(struct m0_ctg_op *ctg_op, struct m0_fom *fom, uint32_t flags)
Definition: ctg_store.c:1759
M0_INTERNAL void m0_be_btree_cursor_kv_get(struct m0_be_btree_cursor *cur, struct m0_buf *key, struct m0_buf *val)
Definition: btree.c:2485
static void ctg_meta_selfrm(struct m0_be_btree *meta, struct m0_be_tx *tx)
Definition: ctg_store.c:581
static void ctg_state_dec_update(struct m0_be_tx *tx, uint64_t size)
Definition: ctg_store.c:945
#define M0_BE_FREE_PTR(ptr, seg, tx, op)
Definition: alloc.h:342
M0_INTERNAL void m0_be_tx_init(struct m0_be_tx *tx, uint64_t tid, struct m0_be_domain *dom, struct m0_sm_group *sm_group, m0_be_tx_cb_t persistent, m0_be_tx_cb_t discarded, void(*filler)(struct m0_be_tx *tx, void *payload), void *datum)
Definition: stubs.c:150
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
Definition: fid.c:164
static m0_bcount_t ctg_ksize(const void *key)
Definition: ctg_store.c:383
#define LAYOUT_VALUE_INIT(__layout)
Definition: ctg_store.c:378
M0_INTERNAL void m0_long_lock_init(struct m0_long_lock *lock)
static uint64_t ctg_state_update(struct m0_be_tx *tx, uint64_t size, bool is_inc)
Definition: ctg_store.c:898
struct m0_cas_ctg * co_ctg
Definition: ctg_store.h:186
Definition: fom.h:481
static int ctg_cmp(const void *key0, const void *key1)
Definition: ctg_store.c:395
M0_INTERNAL void m0_be_op_reset(struct m0_be_op *op)
Definition: op.c:152
M0_INTERNAL int m0_ctg_ctidx_delete_sync(const struct m0_cas_id *cid, struct m0_be_tx *tx)
Definition: ctg_store.c:2060
struct m0_buf co_out_key
Definition: ctg_store.h:214
M0_INTERNAL bool m0_crv_is_none(const struct m0_crv *crv)
Definition: cas.c:282
M0_INTERNAL bool m0_dtm0_tx_desc_is_none(const struct m0_dtm0_tx_desc *td)
Definition: tx_desc.c:44
Definition: cas.h:350
Definition: setup.h:354
struct m0_be_domain * cs_be_domain
Definition: ctg_store.c:74
M0_INTERNAL void m0_ctg_state_inc_update(struct m0_be_tx *tx, uint64_t size)
Definition: ctg_store.c:940
M0_INTERNAL void m0_be_op_active(struct m0_be_op *op)
Definition: stubs.c:100
static int ctg_store__init(struct m0_be_seg *seg, struct m0_cas_state *state)
Definition: ctg_store.c:695
static void ctg_meta_delete_credit(struct m0_be_btree *bt, m0_bcount_t nr, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:603
M0_INTERNAL void m0_ctg_cursor_kv_get(struct m0_ctg_op *ctg_op, struct m0_buf *key, struct m0_buf *val)
Definition: ctg_store.c:1708
#define M0_BUF_INIT_PTR(p)
Definition: buf.h:69
static int ctg_kbuf_unpack(struct m0_buf *buf)
Definition: ctg_store.c:342
uint64_t gk_length
Definition: ctg_store.c:87
M0_INTERNAL struct m0_locality * m0_locality0_get(void)
Definition: locality.c:169
static int ctg_op_exec(struct m0_ctg_op *ctg_op, int next_phase)
Definition: ctg_store.c:1280
struct m0_reqh_context cc_reqh_ctx
Definition: setup.h:361
static struct m0_chan chan[RDWR_REQUEST_MAX]
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
Definition: ext.h:37
Definition: fid.h:38
uint64_t f_key
Definition: fid.h:40
M0_INTERNAL void m0_format_footer_update(const void *buffer)
Definition: format.c:95
struct m0_be_tx tx_betx
Definition: dtm.h:559
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:228
struct m0_buf ba_value
Definition: btree.h:469
M0_INTERNAL void ctg_index_btree_dump_one_rec(struct m0_buf *key, struct m0_buf *val, bool dump_in_hex)
Definition: ctg_store.c:2180
M0_INTERNAL void m0_be_btree_cursor_get(struct m0_be_btree_cursor *cur, const struct m0_buf *key, bool slant)
Definition: btree.c:2295
M0_INTERNAL int m0_ctg_ctidx_insert_sync(const struct m0_cas_id *cid, struct m0_be_tx *tx)
Definition: ctg_store.c:2010
M0_INTERNAL void m0_crv_init(struct m0_crv *crv, const struct m0_dtm0_ts *ts, bool tbs)
Definition: cas.c:251
M0_INTERNAL void m0_ctg_ctidx_delete_credits(struct m0_cas_id *cid, struct m0_be_tx_credit *accum)
Definition: ctg_store.c:1971
m0_bcount_t size
Definition: di.c:39
M0_INTERNAL int m0_ctg_meta_delete(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1392
struct m0_be_op bc_op
Definition: btree.h:589
M0_INTERNAL void m0_be_op_fini(struct m0_be_op *op)
Definition: stubs.c:92
enum m0_dtx_state tx_state
Definition: dtm.h:558
uint8_t gv_data[0]
Definition: ctg_store.c:97
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
#define M0_BE_ALLOC_ARR_SYNC(arr, nr, seg, tx)
Definition: alloc.h:335
bool cs_initialised
Definition: ctg_store.c:79
int ctgdump(struct m0_motr *motr_ctx, char *fidstr, char *dump_in_hex_str)
Definition: ctg_store.c:2235
M0_INTERNAL void m0_be_btree_init(struct m0_be_btree *tree, struct m0_be_seg *seg, const struct m0_be_btree_kv_ops *ops)
Definition: btree.c:1487
#define M0_MUTEX_SINIT(m)
Definition: mutex.h:63
M0_INTERNAL void m0_be_btree_cursor_init(struct m0_be_btree_cursor *cur, struct m0_be_btree *btree)
Definition: btree.c:2281
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
struct m0_buf co_mem_buf
Definition: ctg_store.h:219
M0_INTERNAL struct m0_cas_ctg * m0_ctg_ctidx(void)
Definition: ctg_store.c:2135
struct m0_ext * im_range
Definition: imask.h:191
static struct m0_be_seg * seg
Definition: btree.c:40
static int ctg_vbuf_as_layout(const struct m0_buf *buf, struct m0_dix_layout **layout)
Definition: ctg_store.c:323
#define DTID0_P(__tid)
Definition: tx_desc.h:99
static int ctg_vbuf_as_ctg(const struct m0_buf *val, struct m0_cas_ctg **ctg)
Definition: ctg_store.c:306
#define M0_ASSERT_INFO(cond, fmt,...)
static int versioned_get_sync(struct m0_ctg_op *op)
Definition: ctg_store.c:2460
M0_INTERNAL struct m0_be_seg * m0_be_domain_seg_first(const struct m0_be_domain *dom)
Definition: domain.c:486
M0_INTERNAL void m0_be_tx_capture(struct m0_be_tx *tx, const struct m0_be_reg *reg)
Definition: stubs.c:190
M0_INTERNAL void m0_ctg_op_fini(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1788
union m0_be_op::@39 bo_u
struct m0_mutex cs_state_mutex
Definition: ctg_store.c:47
static int ctg_op_tick_ret(struct m0_ctg_op *ctg_op, int next_state)
Definition: ctg_store.c:1095
struct m0_mutex co_channel_lock
Definition: ctg_store.h:208
static struct m0_cas_op * cas_op(const struct m0_fom *fom)
Definition: service.c:1761
struct m0_cas_state * cs_state
Definition: ctg_store.c:44
bool co_cur_initialised
Definition: ctg_store.h:200
bool co_is_versioned
Definition: ctg_store.h:238
M0_INTERNAL const struct m0_fid m0_cas_ctidx_fid
Definition: cas.c:151
#define M0_CRV_INIT_NONE
Definition: cas.h:511
#define out(...)
Definition: gen.c:41
M0_INTERNAL int m0_ctg_drop(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, int next_phase)
Definition: ctg_store.c:1620
int co_cur_phase
Definition: ctg_store.h:202
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
struct m0_fid gfid
Definition: dir.c:626
struct m0_chan co_channel
Definition: ctg_store.h:206
#define DTID0_F
Definition: tx_desc.h:98
Definition: op.h:59
Definition: op.h:74
M0_INTERNAL void m0_be_seg_dict_insert_credit(struct m0_be_seg *seg, const char *name, struct m0_be_tx_credit *accum)
Definition: seg_dict.c:114
static struct m0_ctg_store ctg_store
Definition: ctg_store.c:196
struct m0_format_header cc_head
Definition: ctg_store.h:95
M0_INTERNAL void m0_be_op_init(struct m0_be_op *op)
Definition: stubs.c:87
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
Definition: chan.c:112
void m0_free(void *data)
Definition: memory.c:146
Definition: mutex.h:47
Definition: op.h:57
Definition: cas.h:107
M0_INTERNAL int m0_be_seg_dict_lookup(struct m0_be_seg *seg, const char *name, void **out)
Definition: seg_dict.c:180
#define M0_BUF_INIT(size, data)
Definition: buf.h:64
uint32_t sm_state
Definition: sm.h:307
struct m0_sm bo_sm
Definition: op.h:75
M0_INTERNAL void m0_be_btree_truncate(struct m0_be_btree *tree, struct m0_be_tx *tx, struct m0_be_op *op, m0_bcount_t limit)
Definition: btree.c:1564
struct m0_pdclust_src_addr src
Definition: fd.c:108
int32_t rc
Definition: trigger_fop.h:47
uint32_t co_flags
Definition: ctg_store.h:225
M0_INTERNAL int m0_ctg_meta_cursor_get(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1729
M0_INTERNAL void m0_be_op_lock(struct m0_be_op *op)
Definition: op.c:115
static int ctg_meta_exec(struct m0_ctg_op *ctg_op, const struct m0_fid *fid, int next_phase)
Definition: ctg_store.c:1311
static void ctg_op_version_get(const struct m0_ctg_op *ctg_op, struct m0_crv *out)
Definition: ctg_store.c:2347
static const struct m0_be_btree_kv_ops cas_btree_ops
Definition: ctg_store.c:195
Definition: cas.h:359
#define M0_BE_FREE_CREDIT_ARR(arr, nr, seg, accum)
Definition: alloc.h:367
Definition: cas.h:362
struct m0_mutex mutex
Definition: format.h:199
#define M0_BE_FREE_CREDIT_PTR(ptr, seg, accum)
Definition: alloc.h:359
M0_INTERNAL int m0_ctg_truncate(struct m0_ctg_op *ctg_op, struct m0_cas_ctg *ctg, m0_bcount_t limit, int next_phase)
Definition: ctg_store.c:1606
M0_INTERNAL int m0_ctg_op_rc(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1779
uint64_t cs_rec_nr
Definition: ctg_store.h:166
m0_bcount_t tc_reg_size
Definition: tx_credit.h:86
M0_INTERNAL struct m0_long_lock * m0_ctg_del_lock(void)
Definition: ctg_store.c:2157
static m0_bcount_t ctg_vsize(const void *val)
Definition: ctg_store.c:389
#define FID_F
Definition: fid.h:75
static bool ctg_op_is_versioned(const struct m0_ctg_op *op)
Definition: ctg_store.c:2291
Definition: trace.h:478
int co_opcode
Definition: ctg_store.h:221
struct m0_be_engine * t_engine
Definition: tx.h:285
m0_bcount_t b_nob
Definition: buf.h:230
M0_INTERNAL void m0_ctg_drop_credit(struct m0_fom *fom, struct m0_be_tx_credit *accum, struct m0_cas_ctg *ctg, m0_bcount_t *limit)
Definition: ctg_store.c:1841
M0_INTERNAL void m0_be_tx_close_sync(struct m0_be_tx *tx)
Definition: stubs.c:205
Definition: tx.h:280
Definition: idx_mock.c:47
M0_INTERNAL struct m0_cas_ctg * m0_ctg_meta_lookup_result(struct m0_ctg_op *ctg_op)
Definition: ctg_store.c:1377
M0_INTERNAL void m0_fid_tassume(struct m0_fid *fid, const struct m0_fid_type *ft)
Definition: fid.c:146
#define M0_IMPOSSIBLE(fmt,...)
struct m0_buf co_key
Definition: ctg_store.h:210