Motr  M0
cp.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CM
24 #include "lib/trace.h"
25 
26 #include "lib/misc.h" /* m0_forall */
27 #include "lib/memory.h"
28 #include "lib/errno.h"
29 
30 #include "motr/magic.h"
31 #include "reqh/reqh.h"
32 #include "net/buffer_pool.h"
33 #include "rpc/rpc_opcodes.h" /* M0_CM_CP_OPCODE */
34 #include "rpc/bulk.h"
35 
36 #include "cm/cp.h"
37 #include "cm/ag.h"
38 #include "cm/proxy.h"
39 #include "cm/cm.h"
40 
41 #include "fop/fop.h"
42 #include "fop/fom.h"
43 
392 M0_TL_DESCR_DEFINE(cp_data_buf, "copy packet data buffers", M0_INTERNAL,
393  struct m0_net_buffer, nb_extern_linkage, nb_magic,
395 M0_TL_DEFINE(cp_data_buf, M0_INTERNAL, struct m0_net_buffer);
396 
397 static const struct m0_bob_type cp_bob = {
398  .bt_name = "copy packet",
399  .bt_magix_offset = M0_MAGIX_OFFSET(struct m0_cm_cp, c_magix),
400  .bt_magix = CM_CP_MAGIX,
401  .bt_check = NULL
402 };
403 
404 M0_TL_DESCR_DEFINE(proxy_cps, "copy packets in proxy", M0_INTERNAL,
405  struct m0_cm_cp, c_cm_proxy_linkage, c_magix,
407 M0_TL_DEFINE(proxy_cps, M0_INTERNAL, struct m0_cm_cp);
408 
409 
/*
 * Generates the file-local m0_cm_cp_bob_init(), m0_cm_cp_bob_fini() and
 * m0_cm_cp_bob_check() helpers used in this file, bound to cp_bob.
 */
410 M0_BOB_DEFINE(static, &cp_bob, m0_cm_cp);
411 
412 M0_INTERNAL void m0_cm_cp_fom_fini(struct m0_fom *fom)
413 {
414  struct m0_cm_cp *cp = bob_of(fom, struct m0_cm_cp, c_fom,
415  &cp_bob);
416  M0_ENTRY();
417 
418  m0_cm_cp_fini(cp);
419  cp->c_ops->co_free(cp);
420 
421  M0_LEAVE();
422 }
423 
424 static uint64_t cp_fom_locality(const struct m0_fom *fom)
425 {
426  struct m0_cm_cp *cp = bob_of(fom, struct m0_cm_cp, c_fom, &cp_bob);
427 
428  return cp->c_ops->co_home_loc_helper(cp);
429 }
430 
431 static int cp_fom_tick(struct m0_fom *fom)
432 {
433  struct m0_cm_cp *cp = bob_of(fom, struct m0_cm_cp, c_fom, &cp_bob);
434  int phase = m0_fom_phase(fom);
435  int rc;
436 
437  M0_PRE(phase < cp->c_ops->co_action_nr);
438  M0_LOG(M0_DEBUG, "fom phase = %d", phase);
439 
440  rc = cp->c_ops->co_action[phase](cp);
442  if (m0_fom_phase(fom) == M0_CCP_FINI)
443  cp->c_ops->co_action[M0_CCP_FINI](cp);
444  return M0_RC(rc);
445 }
446 
448 static const struct m0_fom_ops cp_fom_ops = {
450  .fo_tick = cp_fom_tick,
451  .fo_home_locality = cp_fom_locality
452 };
453 
454 M0_INTERNAL int m0_cm_cp_fom_create(struct m0_fop *fop, struct m0_fop *r_fop,
455  struct m0_fom **m, struct m0_reqh *reqh)
456 {
457  struct m0_cm_cp *cp;
458  struct m0_cm *cm;
459  struct m0_reqh_service *service;
460 
461  M0_PRE(fop != NULL);
462  M0_PRE(m != NULL);
463 
465  reqh);
466  M0_PRE(service != NULL);
467  cm = container_of(service, struct m0_cm, cm_service);
468  M0_PRE(cm != NULL);
469  cp = cm->cm_ops->cmo_cp_alloc(cm);
470  if (cp == NULL)
471  return M0_ERR(-ENOMEM);
472 
473  m0_cm_cp_fom_init(cm, cp, fop, r_fop);
474  *m = &cp->c_fom;
475  return 0;
476 }
477 
486  [M0_CCP_INIT] = {
488  .sd_name = "Init",
489  .sd_allowed = M0_BITS(M0_CCP_READ, M0_CCP_WRITE,
492  M0_CCP_FINI)
493  },
494  [M0_CCP_READ] = {
495  .sd_flags = 0,
496  .sd_name = "Read",
497  .sd_allowed = M0_BITS(M0_CCP_IO_WAIT, M0_CCP_FAIL,
498  M0_CCP_FINI)
499  },
500  [M0_CCP_WRITE_PRE] = {
501  .sd_flags = 0,
502  .sd_name = "Write-pre",
503  .sd_allowed = M0_BITS(M0_CCP_TX_OPEN, M0_CCP_WRITE,
504  M0_CCP_FAIL)
505  },
506  [M0_CCP_TX_OPEN] = {
507  .sd_flags = 0,
508  .sd_name = "TX Open",
509  .sd_allowed = M0_BITS(M0_CCP_WRITE, M0_CCP_FAIL)
510  },
511  [M0_CCP_WRITE] = {
512  .sd_flags = 0,
513  .sd_name = "Write",
514  .sd_allowed = M0_BITS(M0_CCP_TX_DONE, M0_CCP_IO_WAIT,
516  },
517  [M0_CCP_TX_DONE] = {
518  .sd_flags = 0,
519  .sd_name = "TX Done",
520  .sd_allowed = M0_BITS(M0_CCP_IO_WAIT, M0_CCP_FAIL)
521  },
522  [M0_CCP_IO_WAIT] = {
523  .sd_flags = 0,
524  .sd_name = "IO Wait",
525  .sd_allowed = M0_BITS(M0_CCP_XFORM, M0_CCP_SEND,
527  },
528  [M0_CCP_XFORM] = {
529  .sd_flags = 0,
530  .sd_name = "Xform",
531  .sd_allowed = M0_BITS(M0_CCP_FAIL, M0_CCP_FINI,
534  },
535  [M0_CCP_SW_CHECK] = {
536  .sd_flags = 0,
537  .sd_name = "Sliding window check",
538  .sd_allowed = M0_BITS(M0_CCP_SEND, M0_CCP_FINI, M0_CCP_FAIL)
539  },
540  [M0_CCP_SEND] = {
541  .sd_flags = 0,
542  .sd_name = "Send",
543  .sd_allowed = M0_BITS(M0_CCP_FINI, M0_CCP_RECV_INIT,
545  },
546  [M0_CCP_SEND_WAIT] = {
547  .sd_flags = 0,
548  .sd_name = "Send Wait",
549  .sd_allowed = M0_BITS(M0_CCP_FINI, M0_CCP_FAIL)
550  },
551  [M0_CCP_RECV_INIT] = {
552  .sd_flags = 0,
553  .sd_name = "Recv Init",
554  .sd_allowed = M0_BITS(M0_CCP_RECV_WAIT, M0_CCP_FINI)
555  },
556  [M0_CCP_RECV_WAIT] = {
557  .sd_flags = 0,
558  .sd_name = "Recv Wait",
559  .sd_allowed = M0_BITS(M0_CCP_XFORM, M0_CCP_RECV_INIT,
560  M0_CCP_FAIL)
561  },
562  [M0_CCP_FAIL] = {
563  .sd_flags = M0_SDF_FAILURE,
564  .sd_name = "Failure",
565  .sd_allowed = M0_BITS(M0_CCP_FINI)
566  },
567  [M0_CCP_FINI] = {
568  .sd_flags = M0_SDF_TERMINAL,
569  .sd_name = "Fini",
570  .sd_allowed = 0
571  },
572 };
573 
574 static const struct m0_sm_conf m0_cm_cp_sm_conf = {
575  .scf_name = "sm:cp conf",
576  .scf_nr_states = ARRAY_SIZE(m0_cm_cp_state_descr),
577  .scf_state = m0_cm_cp_state_descr
578 };
579 
580 M0_INTERNAL void m0_cm_cp_init(struct m0_cm_type *cmtype,
581  const struct m0_fom_type_ops *ft_ops)
582 {
583  m0_fom_type_init(&cmtype->ct_fomt, cmtype->ct_fom_id, ft_ops,
584  &cmtype->ct_stype, &m0_cm_cp_sm_conf);
585 }
586 
587 M0_INTERNAL bool m0_cm_cp_invariant(const struct m0_cm_cp *cp)
588 {
589  const struct m0_cm_cp_ops *ops = cp->c_ops;
590 
591  return m0_cm_cp_bob_check(cp) && ops != NULL &&
592  cp->c_ag != NULL &&
593  m0_fom_phase(&cp->c_fom) < ops->co_action_nr &&
594  cp->c_ops->co_invariant(cp);
595 }
596 
/*
 * Initialises the non-fom part of copy packet @cp: its bob magic, the data
 * buffer list, the rpc bulk descriptor and the proxy list link. The packet
 * is then stamped with the epoch of the owning copy machine @cm.
 *
 * NOTE(review): the embedded numbering jumps from 601 to 604, so two source
 * lines are absent from this listing -- presumably further initialisation
 * steps; confirm against the original file before relying on this text.
 */
597 M0_INTERNAL void m0_cm_cp_only_init(struct m0_cm *cm, struct m0_cm_cp *cp)
598 {
599  m0_cm_cp_bob_init(cp);
600  cp_data_buf_tlist_init(&cp->c_buffers);
601  m0_rpc_bulk_init(&cp->c_bulk);
604  proxy_cp_tlink_init(cp);
605 
606  /* copy packet epoch is derived from its cm */
607  cp->c_epoch = cm->cm_epoch;
608 }
609 
610 M0_INTERNAL void m0_cm_cp_fom_init(struct m0_cm *cm, struct m0_cm_cp *cp,
611  struct m0_fop *fop, struct m0_fop *r_fop)
612 {
613  struct m0_reqh_service *service;
614 
615  M0_PRE(cm != NULL && cp != NULL && cp->c_ops != NULL);
616 
617  m0_cm_cp_only_init(cm, cp);
618  service = &cm->cm_service;
619  m0_fom_init(&cp->c_fom, &cm->cm_type->ct_fomt, &cp_fom_ops, fop, r_fop,
620  service->rs_reqh);
621 }
622 
/*
 * Declarations of the "rpcbulk" typed list of struct m0_rpc_bulk_buf, needed
 * by m0_cm_cp_only_fini() to inspect m0_rpc_bulk::rb_buflist.
 * NOTE(review): the list itself is presumably defined by the rpc bulk code --
 * confirm.
 */
623 M0_TL_DECLARE(rpcbulk, M0_INTERNAL, struct m0_rpc_bulk_buf);
624 M0_TL_DESCR_DECLARE(rpcbulk, M0_EXTERN);
625 
/*
 * Finalises the non-fom part of copy packet @cp.
 *
 * Visible steps: under rb_mutex the rpc bulk buffer list is checked; if it
 * is not empty, a clink is added to rb_chan, the not-yet-queued buffers are
 * removed with m0_rpc_bulk_store_del_unqueued() and, after dropping the
 * mutex, m0_rpc_bulk_store_del() is called for the rest. Finally the rpc
 * bulk object, the proxy list link and the bob magic are finalised.
 *
 * NOTE(review): this listing is incomplete. The embedded numbering skips
 * 635-637 (the body of the bitmap "if"), 651 (before m0_clink_add()),
 * 667 (the body of "if (buf_nr > non_queued_buf_nr)"), 669-670, 674 and 676.
 * As printed, the "if" on line 634 would govern the assignment on line 644
 * and the clink is never initialised, waited on or removed. Restore the
 * dropped lines from the original file before compiling or reasoning about
 * ordering.
 */
626 M0_INTERNAL void m0_cm_cp_only_fini(struct m0_cm_cp *cp)
627 {
628  struct m0_rpc_bulk *rbulk;
629  /*
630  * If the copy packet is not an accumulator copy packet, it will get
631  * finalised after transformation. For such copy packets, finalise the
632  * bitmap.
633  */
634  if (cp->c_xform_cp_indices.b_nr > 0)
638 
639  /*
640  * Release the net buffers if rpc bulk object is still dirty.
641  * And wait on channel till all net buffers are deleted from
642  * transfer machine.
643  */
644  rbulk = &cp->c_bulk;
645  m0_mutex_lock(&rbulk->rb_mutex);
646  if (!rpcbulk_tlist_is_empty(&rbulk->rb_buflist)) {
647  struct m0_clink clink;
648  size_t buf_nr;
649  size_t non_queued_buf_nr;
650 
652  m0_clink_add(&rbulk->rb_chan, &clink);
653  buf_nr = rpcbulk_tlist_length(&rbulk->rb_buflist);
654  non_queued_buf_nr = m0_rpc_bulk_store_del_unqueued(rbulk);
655  m0_mutex_unlock(&rbulk->rb_mutex);
656 
657  m0_rpc_bulk_store_del(rbulk);
658  M0_LOG(M0_DEBUG, "bulk %p, buf_nr %llu, non_queued_buf_nr %llu",
659  rbulk,
660  (unsigned long long)buf_nr,
661  (unsigned long long)non_queued_buf_nr);
662  /*
663  * If there were some queued net bufs which had to be deleted,
664  * then it is required to wait for their callbacks.
665  */
666  if (buf_nr > non_queued_buf_nr) {
668  }
671  } else {
672  m0_mutex_unlock(&rbulk->rb_mutex);
673  }
675 
677  m0_rpc_bulk_fini(&cp->c_bulk);
678  proxy_cp_tlink_fini(cp);
679  m0_cm_cp_bob_fini(cp);
680 }
681 
682 M0_INTERNAL void m0_cm_cp_fini(struct m0_cm_cp *cp)
683 {
684  m0_cm_cp_only_fini(cp);
685  m0_fom_fini(&cp->c_fom);
686 }
687 
/*
 * Submits the fom of copy packet @cp to the request handler of copy machine
 * @cm with m0_fom_queue().
 *
 * Returns 0 once the fom is queued, or -ESHUTDOWN when queueing is refused.
 *
 * NOTE(review): the embedded numbering skips 694 and 696, so the statement
 * after the M0_PRE() and the condition guarding the -ESHUTDOWN return are
 * absent from this listing. As printed the function would always fail and
 * never reach m0_fom_queue(). Restore the dropped lines from the original
 * file.
 */
688 M0_INTERNAL int m0_cm_cp_enqueue(struct m0_cm *cm, struct m0_cm_cp *cp)
689 {
690  struct m0_fom *fom = &cp->c_fom;
691  struct m0_reqh *reqh = cm->cm_service.rs_reqh;
692 
693  M0_PRE(reqh != NULL);
695 
697  return M0_ERR(-ESHUTDOWN);
698 
699  m0_fom_queue(fom);
700  return M0_RC(0);
701 }
702 
703 M0_INTERNAL void m0_cm_cp_buf_add(struct m0_cm_cp *cp, struct m0_net_buffer *nb)
704 {
705  M0_PRE(cp != NULL);
706  M0_PRE(nb != NULL);
707 
708  cp_data_buf_tlink_init(nb);
709  cp_data_buf_tlist_add_tail(&cp->c_buffers, nb);
710  M0_CNT_INC(cp->c_buf_nr);
711 }
712 
/*
 * Detaches every network buffer from copy packet @cp and gives it back with
 * m0_cm_buffer_put(), decrementing cp->c_buf_nr for each one; the buffer
 * list is finalised afterwards. The pool is taken from the head buffer's
 * nb_pool and asserted to be non-NULL; nothing is released when the list is
 * empty.
 *
 * NOTE(review): the embedded numbering skips 724, 727 and 732-733. As
 * printed, the colour expression on line 726 has no right-hand operand for
 * "%", and whatever surrounded the loop on lines 724 and 732-733 is not
 * visible. Restore the dropped lines from the original file.
 */
713 M0_INTERNAL void m0_cm_cp_buf_release(struct m0_cm_cp *cp)
714 {
715  struct m0_net_buffer_pool *nbp;
716  struct m0_net_buffer *nbuf;
717  struct m0_net_buffer *nbuf_head;
718  uint64_t colour;
719 
720  nbuf_head = cp_data_buf_tlist_head(&cp->c_buffers);
721  if (nbuf_head != NULL) {
722  nbp = nbuf_head->nb_pool;
723  M0_ASSERT(nbp != NULL);
725  m0_tl_for(cp_data_buf, &cp->c_buffers, nbuf) {
726  colour = cp->c_ops->co_home_loc_helper(cp) %
728  cp_data_buf_tlink_del_fini(nbuf);
729  m0_cm_buffer_put(nbp, nbuf, colour);
730  M0_CNT_DEC(cp->c_buf_nr);
731  } m0_tl_endfor;
734  }
735  cp_data_buf_tlist_fini(&cp->c_buffers);
736 }
737 
738 M0_INTERNAL uint64_t m0_cm_cp_nr(struct m0_cm_cp *cp)
739 {
740  struct m0_bitmap *bm = &cp->c_xform_cp_indices;
741  int i;
742  uint64_t cnt = 0;
743 
744  for (i = 0; i < bm->b_nr; ++i) {
745  if (m0_bitmap_get(bm, i))
746  M0_CNT_INC(cnt);
747  }
748 
749  return cnt;
750 }
751 
752 M0_INTERNAL int m0_cm_cp_bufvec_merge(struct m0_cm_cp *cp)
753 {
754  struct m0_net_buffer *nbuf;
755  struct m0_net_buffer *nbuf_head;
756  int rc;
757 
758  nbuf_head = cp_data_buf_tlist_head(&cp->c_buffers);
759  nbuf = nbuf_head;
760  while ((nbuf = cp_data_buf_tlist_next(&cp->c_buffers, nbuf)) != NULL) {
761  rc = m0_bufvec_merge(&nbuf_head->nb_buffer, &nbuf->nb_buffer);
762  if (rc != 0)
763  return M0_RC(rc);
764  }
765  return 0;
766 }
767 
768 M0_INTERNAL int m0_cm_cp_bufvec_split(struct m0_cm_cp *cp)
769 {
770  struct m0_net_buffer *nbuf_head;
771  struct m0_bufvec *bufvec;
772  uint32_t new_v_nr;
773  m0_bcount_t *new_v_count;
774  uint32_t i;
775 
776  nbuf_head = cp_data_buf_tlist_head(&cp->c_buffers);
777  new_v_nr = nbuf_head->nb_pool->nbp_seg_nr;
778  M0_ALLOC_ARR(new_v_count, new_v_nr);
779  if (new_v_count == NULL)
780  return M0_ERR(-ENOMEM);
781 
782  bufvec = &nbuf_head->nb_buffer;
783  for (i = 0; i < new_v_nr; ++i)
784  new_v_count[i] = bufvec->ov_vec.v_count[i];
785 
786  m0_free(bufvec->ov_vec.v_count);
787  bufvec->ov_vec.v_nr = new_v_nr;
788  bufvec->ov_vec.v_count = new_v_count;
789 
790  return 0;
791 }
792 
793 M0_INTERNAL void m0_cm_cp_buf_move(struct m0_cm_cp *src, struct m0_cm_cp *dest)
794 {
795  struct m0_net_buffer *nbuf;
796  M0_PRE(src->c_data_seg_nr == dest->c_data_seg_nr);
797 
798  m0_tl_for(cp_data_buf, &src->c_buffers, nbuf) {
799  cp_data_buf_tlink_del_fini(nbuf);
800  M0_CNT_DEC(src->c_buf_nr);
801  m0_cm_cp_buf_add(dest, nbuf);
802  } m0_tl_endfor;
803 }
804 
805 M0_INTERNAL int m0_cm_cp_dup(struct m0_cm_cp *src, struct m0_cm_cp **dest)
806 {
807  struct m0_cm *cm;
808  struct m0_cm_cp *cp;
809 
810  cm = src->c_ag->cag_cm;
811  cp = cm->cm_ops->cmo_cp_alloc(cm);
812  if (cp == NULL)
813  return -ENOMEM;
814  m0_cm_ag_cp_add_locked(src->c_ag, cp);
815  cp->c_ag_cp_idx = src->c_ag_cp_idx;
816  cp->c_data_seg_nr = src->c_data_seg_nr;
817  m0_cm_cp_fom_init(cm, cp, NULL, NULL);
819  src->c_xform_cp_indices.b_nr);
820  m0_cm_cp_buf_move(src, cp);
821  if (src->c_xform_cp_indices.b_nr > 0)
822  m0_bitmap_copy(&cp->c_xform_cp_indices, &src->c_xform_cp_indices);
823  *dest = cp;
824 
825  return M0_RC(0);
826 }
827 
828 M0_INTERNAL void m0_cm_cp_data_copy(struct m0_cm_cp *src, struct m0_cm_cp *dst)
829 {
830  struct m0_net_buffer *src_nbuf;
831  struct m0_net_buffer *dst_nbuf;
832  struct m0_net_buffer_pool *nbp;
833  m0_bcount_t bytes_copied = 0;
834  uint64_t buf_size = 0;
835  uint64_t total_data_seg_nr;
836 
837  M0_PRE(!cp_data_buf_tlist_is_empty(&src->c_buffers));
838  M0_PRE(!cp_data_buf_tlist_is_empty(&dst->c_buffers));
839  M0_PRE(src->c_buf_nr == dst->c_buf_nr);
840 
841  total_data_seg_nr = src->c_data_seg_nr;
842  for (src_nbuf = cp_data_buf_tlist_head(&src->c_buffers),
843  dst_nbuf = cp_data_buf_tlist_head(&dst->c_buffers);
844  src_nbuf != NULL && dst_nbuf != NULL;
845  src_nbuf = cp_data_buf_tlist_next(&src->c_buffers, src_nbuf),
846  dst_nbuf = cp_data_buf_tlist_next(&dst->c_buffers, dst_nbuf)) {
847  nbp = src_nbuf->nb_pool;
848  if (total_data_seg_nr < nbp->nbp_seg_nr)
849  buf_size = total_data_seg_nr * nbp->nbp_seg_size;
850  else {
851  total_data_seg_nr -= nbp->nbp_seg_nr;
853  }
854  bytes_copied = m0_bufvec_copy(&dst_nbuf->nb_buffer,
855  &src_nbuf->nb_buffer, buf_size);
856  M0_ASSERT(bytes_copied == buf_size);
857  }
858 }
859 
862 /*
863  * Local variables:
864  * c-indentation-style: "K&R"
865  * c-basic-offset: 8
866  * tab-width: 8
867  * fill-column: 80
868  * scroll-step: 1
869  * End:
870  */
M0_INTERNAL m0_bcount_t m0_bufvec_copy(struct m0_bufvec *dst, struct m0_bufvec *src, m0_bcount_t num_bytes)
Definition: vec.c:983
M0_INTERNAL void m0_cm_cp_buf_release(struct m0_cm_cp *cp)
Definition: cp.c:713
bool(* co_invariant)(const struct m0_cm_cp *cp)
Definition: cp.h:235
M0_INTERNAL void m0_cm_cp_only_fini(struct m0_cm_cp *cp)
Definition: cp.c:626
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
const struct m0_cm_type * cm_type
Definition: cm.h:194
M0_INTERNAL int m0_bitmap_init(struct m0_bitmap *map, size_t nr)
Definition: bitmap.c:86
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
struct m0_net_buffer_pool * nb_pool
Definition: net.h:1508
M0_INTERNAL int m0_cm_cp_fom_create(struct m0_fop *fop, struct m0_fop *r_fop, struct m0_fom **m, struct m0_reqh *reqh)
Definition: cp.c:454
struct m0_chan c_reply_wait
Definition: cp.h:215
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_bitmap_fini(struct m0_bitmap *map)
Definition: bitmap.c:97
static struct m0_bufvec dst
Definition: xform.c:61
static struct m0_addb2_mach * m
Definition: consumer.c:38
struct m0_bitmap c_xform_cp_indices
Definition: cp.h:181
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
struct m0_bufvec nb_buffer
Definition: net.h:1322
m0_bcount_t nbp_seg_size
Definition: buffer_pool.h:255
const struct m0_cm_ops * cm_ops
Definition: cm.h:188
Definition: sm.h:350
#define M0_LOG(level,...)
Definition: trace.h:167
Definition: cp.h:160
M0_LEAVE()
static int cp_fom_tick(struct m0_fom *fom)
Definition: cp.c:431
M0_INTERNAL void m0_net_buffer_pool_unlock(struct m0_net_buffer_pool *pool)
Definition: buffer_pool.c:203
struct m0_mutex c_reply_wait_mutex
Definition: cp.h:216
M0_INTERNAL int m0_bufvec_merge(struct m0_bufvec *dst_bufvec, struct m0_bufvec *src_bufvec)
Definition: vec.c:280
const struct m0_net_buffer_pool_ops * nbp_ops
Definition: buffer_pool.h:263
struct m0_vec ov_vec
Definition: vec.h:147
struct m0_chan rb_chan
Definition: bulk.h:258
M0_INTERNAL void m0_rpc_bulk_fini(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:263
static const struct m0_sm_conf m0_cm_cp_sm_conf
Definition: cp.c:574
M0_INTERNAL void m0_cm_cp_fom_init(struct m0_cm *cm, struct m0_cm_cp *cp, struct m0_fop *fop, struct m0_fop *r_fop)
Definition: cp.c:610
M0_INTERNAL int m0_cm_cp_bufvec_split(struct m0_cm_cp *cp)
Definition: cp.c:768
#define M0_BITS(...)
Definition: misc.h:236
uint64_t m0_bcount_t
Definition: types.h:77
uint64_t c_ag_cp_idx
Definition: cp.h:175
#define container_of(ptr, type, member)
Definition: misc.h:33
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_INTERNAL void m0_cm_cp_only_init(struct m0_cm *cm, struct m0_cm_cp *cp)
Definition: cp.c:597
m0_fom_phase
Definition: fom.h:372
const char * bt_name
Definition: bob.h:73
M0_INTERNAL void m0_rpc_bulk_buflist_empty(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:279
struct m0_fom_type ft_fom_type
Definition: fop.h:232
M0_INTERNAL void m0_cm_cp_fom_fini(struct m0_fom *fom)
Definition: cp.c:412
#define m0_tl_endfor
Definition: tlist.h:700
return M0_RC(rc)
static struct m0_cm * cm
Definition: cm.c:63
M0_INTERNAL int m0_cm_cp_dup(struct m0_cm_cp *src, struct m0_cm_cp **dest)
Definition: cp.c:805
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_net_buffer_pool nbp
Definition: cp.c:37
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
M0_INTERNAL void m0_cm_cp_data_copy(struct m0_cm_cp *src, struct m0_cm_cp *dst)
Definition: cp.c:828
int i
Definition: dir.c:1033
struct m0_fop_type * f_type
Definition: fop.h:81
struct m0_fom_type ct_fomt
Definition: cm.h:150
return M0_ERR(-EOPNOTSUPP)
Definition: cnt.h:36
struct m0_rpc_bulk c_bulk
Definition: cp.h:193
static uint64_t cp_fom_locality(const struct m0_fom *fom)
Definition: cp.c:424
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
Definition: chan.c:96
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
M0_INTERNAL void m0_cm_cp_buf_move(struct m0_cm_cp *src, struct m0_cm_cp *dest)
Definition: cp.c:793
static const struct m0_bob_type cp_bob
Definition: cp.c:397
M0_INTERNAL void m0_rpc_bulk_init(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:247
uint32_t c_data_seg_nr
Definition: cp.h:190
uint64_t ct_fom_id
Definition: cm.h:148
#define bob_of(ptr, type, field, bt)
Definition: bob.h:140
M0_INTERNAL void m0_cm_cp_fini(struct m0_cm_cp *cp)
Definition: cp.c:682
M0_INTERNAL void m0_fom_type_init(struct m0_fom_type *type, uint64_t id, const struct m0_fom_type_ops *ops, const struct m0_reqh_service_type *svc_type, const struct m0_sm_conf *sm)
Definition: fom.c:1596
const struct m0_cm_cp_ops * c_ops
Definition: cp.h:169
void(* nbpo_not_empty)(struct m0_net_buffer_pool *)
Definition: buffer_pool.h:150
M0_INTERNAL void m0_cm_cp_buf_add(struct m0_cm_cp *cp, struct m0_net_buffer *nb)
Definition: cp.c:703
M0_INTERNAL void m0_net_buffer_pool_lock(struct m0_net_buffer_pool *pool)
Definition: buffer_pool.c:186
int(* co_action[])(struct m0_cm_cp *cp)
Definition: cp.h:259
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
Definition: reqh.h:94
uint32_t v_nr
Definition: vec.h:51
Definition: dump.c:103
struct m0_cm_aggr_group * c_ag
Definition: cp.h:172
uint32_t nbp_colours_nr
Definition: buffer_pool.h:265
uint32_t co_action_nr
Definition: cp.h:253
M0_TL_DESCR_DECLARE(rpcbulk, M0_EXTERN)
m0_bcount_t * v_count
Definition: vec.h:53
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL int m0_cm_cp_bufvec_merge(struct m0_cm_cp *cp)
Definition: cp.c:752
m0_time_t c_epoch
Definition: cp.h:166
M0_INTERNAL void m0_cm_buffer_put(struct m0_net_buffer_pool *bp, struct m0_net_buffer *buf, uint64_t colour)
Definition: cm.c:1074
M0_INTERNAL void m0_bitmap_copy(struct m0_bitmap *dst, const struct m0_bitmap *src)
Definition: bitmap.c:158
M0_INTERNAL int m0_reqh_state_get(struct m0_reqh *reqh)
Definition: reqh.c:398
uint32_t sd_flags
Definition: sm.h:378
struct m0_cm_cp *(* cmo_cp_alloc)(struct m0_cm *cm)
Definition: cm.h:310
Definition: fom.h:481
Definition: cm.h:143
M0_INTERNAL size_t m0_rpc_bulk_store_del_unqueued(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:190
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_find(const struct m0_reqh_service_type *st, const struct m0_reqh *reqh)
Definition: reqh_service.c:538
M0_INTERNAL int m0_cm_cp_enqueue(struct m0_cm *cm, struct m0_cm_cp *cp)
Definition: cp.c:688
uint32_t c_buf_nr
Definition: cp.h:187
struct m0_reqh reqh
Definition: rm_foms.c:48
#define M0_MAGIX_OFFSET(type, field)
Definition: misc.h:356
#define M0_CNT_INC(cnt)
Definition: arith.h:226
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:228
M0_BOB_DEFINE(static, &cp_bob, m0_cm_cp)
static const struct m0_fom_ops cp_fom_ops
Definition: cp.c:448
struct m0_reqh_service cm_service
Definition: cm.h:191
M0_TL_DECLARE(rpcbulk, M0_INTERNAL, struct m0_rpc_bulk_buf)
Definition: cm.h:166
M0_INTERNAL bool m0_bitmap_get(const struct m0_bitmap *map, size_t idx)
Definition: bitmap.c:105
M0_TL_DESCR_DEFINE(cp_data_buf, "copy packet data buffers", M0_INTERNAL, struct m0_net_buffer, nb_extern_linkage, nb_magic, M0_NET_BUFFER_LINK_MAGIC, CM_CP_DATA_BUF_HEAD_MAGIX)
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
M0_INTERNAL bool m0_rpc_bulk_is_empty(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:539
static struct m0_fop * fop
Definition: item.c:57
struct m0_reqh_service_type ct_stype
Definition: cm.h:145
void(* co_free)(struct m0_cm_cp *cp)
Definition: cp.h:250
M0_INTERNAL void m0_fom_queue(struct m0_fom *fom)
Definition: fom.c:624
uint64_t(* co_home_loc_helper)(const struct m0_cm_cp *cp)
Definition: cp.h:241
struct m0_tl c_buffers
Definition: cp.h:184
M0_INTERNAL void m0_cm_cp_init(struct m0_cm_type *cmtype, const struct m0_fom_type_ops *ft_ops)
Definition: cp.c:580
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
void(* fo_fini)(struct m0_fom *fom)
Definition: fom.h:657
struct m0_tl rb_buflist
Definition: bulk.h:256
static uint32_t buf_size
Definition: ad.c:75
M0_INTERNAL void m0_rpc_bulk_store_del(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:215
size_t b_nr
Definition: bitmap.h:44
struct m0_fom c_fom
Definition: cp.h:161
M0_INTERNAL uint64_t m0_cm_cp_nr(struct m0_cm_cp *cp)
Definition: cp.c:738
struct m0_fom_ops ops
Definition: io_foms.c:623
struct m0_reqh * rs_reqh
Definition: reqh_service.h:259
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
Definition: chan.c:112
void m0_free(void *data)
Definition: memory.c:146
const struct m0_reqh_service_type * ft_rstype
Definition: fom.h:617
M0_INTERNAL void m0_cm_ag_cp_add_locked(struct m0_cm_aggr_group *ag, struct m0_cm_cp *cp)
Definition: ag.c:502
M0_INTERNAL bool m0_cm_cp_invariant(const struct m0_cm_cp *cp)
Definition: cp.c:587
static struct m0_reqh_service * service[REQH_IN_UT_MAX]
Definition: long_lock_ut.c:46
struct m0_pdclust_src_addr src
Definition: fd.c:108
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_TL_DEFINE(cp_data_buf, M0_INTERNAL, struct m0_net_buffer)
static struct m0_sm_state_descr m0_cm_cp_state_descr[]
Definition: cp.c:485
m0_time_t cm_epoch
Definition: cm.h:177
Definition: fop.h:79
struct m0_mutex rb_mutex
Definition: bulk.h:251
Definition: vec.h:145