Motr  M0
cm.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CM
24 
25 #include "lib/trace.h" /* M0_LOG */
26 #include "lib/bob.h" /* M0_BOB_DEFINE */
27 #include "lib/misc.h" /* M0_SET0 */
28 #include "lib/assert.h" /* M0_PRE, M0_POST */
29 #include "lib/errno.h"
30 #include "lib/finject.h"
31 #include "lib/memory.h"
32 
33 #include "module/instance.h" /* m0_get */
34 #include "motr/magic.h"
35 #include "motr/setup.h"
36 #include "net/buffer_pool.h"
37 #include "rpc/rpclib.h"
38 #include "rpc/rpc_machine.h"
39 #include "reqh/reqh.h"
40 #include "fop/fop.h"
41 #include "pool/pool.h" /* pools_common_svc */
42 #include "conf/obj_ops.h" /* m0_conf_obj_find_lock */
43 
44 #include "ha/ha.h" /* m0_ha_send */
45 
46 #include "cm/cm.h"
47 #include "cm/ag.h"
48 #include "cm/cp.h"
49 #include "cm/proxy.h"
50 #include "cm/sw.h"
51 #include "cm/sw_xc.h"
52 #include "cm/cp_onwire_xc.h"
53 #include "cm/ag_xc.h"
54 
432 static struct m0_tl cmtypes;
433 
435 static struct m0_mutex cmtypes_mutex;
436 
437 M0_TL_DESCR_DEFINE(cmtypes, "copy machine types", static,
438  struct m0_cm_type, ct_linkage, ct_magix,
440 
441 M0_TL_DEFINE(cmtypes, static, struct m0_cm_type);
442 
443 static struct m0_bob_type cmtypes_bob;
445 
447  [M0_CMS_INIT] = {
449  .sd_name = "cm_init",
450  .sd_allowed = M0_BITS(M0_CMS_IDLE, M0_CMS_FINI)
451  },
452  [M0_CMS_IDLE] = {
453  .sd_flags = 0,
454  .sd_name = "cm_idle",
455  .sd_allowed = M0_BITS(M0_CMS_FAIL, M0_CMS_PREPARE,
456  M0_CMS_FINI)
457  },
458  [M0_CMS_PREPARE] = {
459  .sd_flags = 0,
460  .sd_name = "cm_prepare",
461  .sd_allowed = M0_BITS(M0_CMS_READY, M0_CMS_FAIL)
462  },
463  [M0_CMS_READY] = {
464  .sd_flags = 0,
465  .sd_name = "cm_ready",
466  .sd_allowed = M0_BITS(M0_CMS_ACTIVE, M0_CMS_FAIL)
467  },
468  [M0_CMS_ACTIVE] = {
469  .sd_flags = 0,
470  .sd_name = "cm_active",
471  .sd_allowed = M0_BITS(M0_CMS_STOP, M0_CMS_FAIL)
472  },
473  [M0_CMS_FAIL] = {
474  .sd_flags = M0_SDF_FAILURE,
475  .sd_name = "cm_fail",
476  .sd_allowed = M0_BITS(M0_CMS_PREPARE, M0_CMS_FINI)
477  },
478  [M0_CMS_STOP] = {
479  .sd_flags = 0,
480  .sd_name = "cm_stop",
481  .sd_allowed = M0_BITS(M0_CMS_IDLE, M0_CMS_FINI,
483  },
484  [M0_CMS_FINI] = {
485  .sd_flags = M0_SDF_TERMINAL,
486  .sd_name = "cm_fini",
487  .sd_allowed = 0
488  },
489 };
490 
491 static const struct m0_sm_conf cm_sm_conf = {
492  .scf_name = "sm:cm conf",
493  .scf_nr_states = M0_CMS_NR,
494  .scf_state = cm_state_descr
495 };
496 
497 M0_INTERNAL struct m0_cm *m0_cmsvc2cm(struct m0_reqh_service *cmsvc)
498 {
499  return container_of(cmsvc, struct m0_cm, cm_service);
500 }
501 
502 M0_INTERNAL int m0_ha_cm_err_send(struct m0_cm *cm, int rc)
503 {
504  struct m0_ha_msg *msg;
505  uint64_t tag;
506 
507  M0_ENTRY("cm: %p rc: %d", cm, rc);
508 
509  M0_PRE(cm != NULL);
510  M0_PRE(rc < 0);
511 
512  M0_LOG(M0_DEBUG, "Notify HA about cm failure, state=%d rc=%d",
513  m0_cm_state_get(cm), rc);
514 
515  M0_ALLOC_PTR(msg);
516  if (msg == NULL)
517  return M0_ERR(-ENOMEM);
518 
519  cm->cm_ops->cmo_ha_msg(cm, msg, rc);
520  m0_ha_send(m0_get()->i_ha, m0_get()->i_ha_link, msg, &tag);
521  m0_free(msg);
522 
523  M0_LEAVE();
524  return M0_RC(0);
525 }
526 
527 M0_INTERNAL void m0_cm_fail(struct m0_cm *cm, int rc)
528 {
529  M0_ENTRY("cm: %p rc: %d", cm, rc);
530 
531  M0_PRE(cm != NULL);
532  M0_PRE(rc < 0);
533 
534  /* If copy machine is already marked failed, do nothing. */
536  return;
537 
538  M0_LOG(M0_ERROR, "copy machine failure, state=%d rc=%d",
539  m0_cm_state_get(cm), rc);
542  M0_LEAVE();
543 }
544 
545 M0_INTERNAL void m0_cm_lock(struct m0_cm *cm)
546 {
548 }
549 
550 M0_INTERNAL void m0_cm_unlock(struct m0_cm *cm)
551 {
553 }
554 
555 M0_INTERNAL int m0_cm_trylock(struct m0_cm *cm)
556 {
558 }
559 
560 M0_INTERNAL bool m0_cm_is_locked(const struct m0_cm *cm)
561 {
563 }
564 
565 M0_INTERNAL enum m0_cm_state m0_cm_state_get(const struct m0_cm *cm)
566 {
567  return (enum m0_cm_state)cm->cm_mach.sm_state;
568 }
569 
570 M0_INTERNAL void m0_cm_state_set(struct m0_cm *cm, enum m0_cm_state state)
571 {
573 
574  m0_sm_state_set(&cm->cm_mach, state);
575  M0_LOG(M0_INFO, "CM:%s%" PRId64 ": %i",
576  (char *)cm->cm_type->ct_stype.rst_name,
578 }
579 
580 static int cm_rc(struct m0_cm *cm)
581 {
582  return cm->cm_mach.sm_rc;
583 }
584 
585 M0_INTERNAL bool m0_cm_invariant(const struct m0_cm *cm)
586 {
587  return cm != NULL && cm->cm_ops != NULL && cm->cm_type != NULL &&
591  M0_CMS_STOP)),
593 }
594 
595 M0_INTERNAL int m0_cm_setup(struct m0_cm *cm)
596 {
597  int rc;
598 
599  M0_ENTRY("cm: %p", cm);
600  M0_PRE(cm != NULL);
601  M0_PRE(cm->cm_type != NULL);
602 
603  if (M0_FI_ENABLED("setup_failure"))
604  return M0_ERR(-EINVAL);
605 
606  m0_cm_lock(cm);
609 
610  rc = cm->cm_ops->cmo_setup(cm);
611  if (M0_FI_ENABLED("setup_failure_2"))
612  rc = -EINVAL;
613  if (rc == 0)
615 
617  m0_cm_unlock(cm);
618 
619  M0_LEAVE("rc: %d", rc);
620  return M0_RC(rc);
621 }
622 
623 /*
624  * Establishes RPC connections with the remote replicas and allocates a local
625  * struct m0_cm_proxy for each of them.
626  */
627 static int cm_replicas_connect(struct m0_cm *cm, struct m0_rpc_machine *rmach,
628  struct m0_reqh *reqh)
629 {
630  struct m0_cm_ag_id ag_id0;
631  const char *lep;
632  int rc = -ENOENT;
633  struct m0_pools_common *pc;
634  struct m0_reqh_service_ctx *ctx;
635  struct m0_conf_obj *svc_obj;
636  uint32_t proxy_cnt = 0;
637 
638  M0_PRE(cm != NULL && rmach != NULL && reqh != NULL);
640 
641  pc = reqh->rh_pools;
642  M0_SET0(&ag_id0);
643  lep = m0_rpc_machine_ep(rmach);
644  m0_tl_for(pools_common_svc_ctx, &pc->pc_svc_ctxs, ctx) {
645  struct m0_cm_proxy *pxy;
646  const char *dep = m0_rpc_link_end_point(&ctx->sc_rlink);
647 
648  M0_LOG(M0_DEBUG, "Connect %s dep %s type %d", lep, dep,
649  ctx->sc_type);
650  if ((strcmp(lep, dep) == 0 ||
651  !cm->cm_ops->cmo_is_peer(cm, ctx)))
652  continue;
654  &ctx->sc_fid, &svc_obj);
655  if (rc != 0)
656  return M0_ERR(rc);
657  M0_ALLOC_PTR(pxy);
658  if (pxy == NULL)
659  return M0_ERR(-ENOMEM);
660  rc = m0_cm_proxy_init(pxy, proxy_cnt, &ag_id0, &ag_id0, dep);
661  if (rc == 0) {
662  pxy->px_conn = &ctx->sc_rlink.rlk_conn;
663  pxy->px_session = &ctx->sc_rlink.rlk_sess;
664  m0_cm_proxy_add(cm, pxy);
665  m0_cm_proxy_event_handle_register(pxy, svc_obj);
666  M0_CNT_INC(proxy_cnt);
667  if (M0_IN(svc_obj->co_ha_state, (M0_NC_FAILED,
668  M0_NC_TRANSIENT))) {
669  m0_cm_proxy_lock(pxy);
670  pxy->px_status = M0_PX_FAILED;
671  pxy->px_is_done = true;
672  proxy_fail_tlist_add_tail(
673  &pxy->px_cm->cm_failed_proxies, pxy);
674  m0_cm_proxy_unlock(pxy);
675  }
676  M0_LOG(M0_DEBUG, "Connected to %s", dep);
677  }
678  } m0_tl_endfor;
679 
681 
682  M0_LOG(M0_DEBUG, "Connected to its proxies from local ep %s", lep);
683  return M0_RC(rc);
684 }
685 
686 static void cm_replicas_destroy(struct m0_cm *cm)
687 {
688  struct m0_cm_proxy *pxy;
689 
691 
692  m0_tl_for(proxy, &cm->cm_proxies, pxy) {
693  m0_cm_proxy_del(cm, pxy);
694  m0_cm_proxy_fini(pxy);
695  m0_free(pxy);
696  } m0_tl_endfor;
697 }
698 
/**
 * Completes and stops the copy machine before it is started again.
 *
 * The copy machine lock is released around the m0_cm_stop() call, because
 * m0_cm_stop() acquires that lock itself, and is re-acquired afterwards.
 * NOTE(review): this implies the caller holds the copy machine lock on
 * entry -- confirm against the callers.
 *
 * @param cm Copy machine to clean up.
 * @return -EAGAIN if m0_cm_complete() reports that completion is still
 *         pending, 0 otherwise. Any other m0_cm_complete() result and the
 *         result of m0_cm_stop() are not propagated.
 */
static int cm_pre_start_cleanup(struct m0_cm *cm)
{
	if (m0_cm_complete(cm) == -EAGAIN)
		return -EAGAIN;

	m0_cm_unlock(cm);
	(void)m0_cm_stop(cm);
	m0_cm_lock(cm);

	return 0;
}
711 
712 M0_INTERNAL struct m0_rpc_machine *m0_cm_rpc_machine_find(struct m0_reqh *reqh)
713 {
714  return m0_reqh_rpc_mach_tlist_head(&reqh->rh_rpc_machines);
715 }
716 
717 M0_INTERNAL int m0_cm_prepare(struct m0_cm *cm)
718 {
719  struct m0_rpc_machine *rmach;
720  struct m0_reqh *reqh = cm->cm_service.rs_reqh;
721  int rc;
722 
723  m0_cm_lock(cm);
725  M0_CMS_FAIL)));
726 
728  cm->cm_done = false;
729  cm->cm_nr_proxy_updated = 0;
730  cm->cm_quiesce = false;
731  cm->cm_abort = false;
732  cm->cm_epoch = m0_time_now();
733  rmach = m0_cm_rpc_machine_find(reqh);
734  rc = cm_replicas_connect(cm, rmach, reqh);
735  if (rc == -ENOENT)
736  rc = 0;
737  if (rc == 0) {
738  if (M0_FI_ENABLED("prepare_failure")) {
739  //m0_cm_unlock(cm);
740  rc = -EINVAL;
741  } else
742  rc = cm->cm_ops->cmo_prepare(cm);
743  }
744  if (rc == 0) {
747  }
748  if (rc != 0) {
749  m0_cm_fail(cm, rc);
752  cm->cm_done = true;
753  }
754  m0_cm_unlock(cm);
755 
756  return M0_RC(rc);
757 }
758 
759 M0_INTERNAL int m0_cm_ready(struct m0_cm *cm)
760 {
761  int rc;
762 
763  M0_ENTRY("cm: %p", cm);
764  M0_PRE(cm != NULL);
765  M0_PRE(cm->cm_type != NULL);
766 
767  m0_cm_lock(cm);
770 
771  rc = cm_rc(cm);
772  if (M0_FI_ENABLED("ready_failure"))
773  rc = -EINVAL;
774 
775  if (rc == 0)
777 
778  if (rc == 0) {
781  }
782 
783  if (rc != 0) {
784  m0_cm_abort(cm, rc);
786  }
787  m0_cm_unlock(cm);
788 
789  M0_LEAVE("rc: %d", rc);
790  return M0_RC(rc);
791 }
792 
793 M0_INTERNAL bool m0_cm_is_ready(struct m0_cm *cm)
794 {
796  return m0_cm_state_get(cm) == M0_CMS_READY;
797 }
798 
799 M0_INTERNAL bool m0_cm_is_active(struct m0_cm *cm)
800 {
802  return m0_cm_state_get(cm) == M0_CMS_ACTIVE;
803 }
804 
805 M0_INTERNAL int m0_cm_start(struct m0_cm *cm)
806 {
807  int rc = cm_rc(cm);
808 
809  M0_ENTRY("cm: %p", cm);
810  M0_PRE(cm != NULL);
811  M0_PRE(cm->cm_type != NULL);
812 
813  m0_cm_lock(cm);
816 
818  if (rc == 0)
819  rc = cm->cm_ops->cmo_start(cm);
820  if (M0_FI_ENABLED("start_failure"))
821  rc = -EINVAL;
822  /* Start pump FOM to create copy packets. */
823  if (rc == 0) {
826  }
827 
828  if (rc != 0) {
829  m0_cm_abort(cm, rc);
831  }
832 
834  m0_cm_unlock(cm);
835 
836  M0_LEAVE("rc: %d", rc);
837  return M0_RC(rc);
838 }
839 
840 M0_INTERNAL int m0_cm_proxies_fini(struct m0_cm *cm)
841 {
842  struct m0_cm_proxy *pxy;
843  int rc = 0;
844 
845  M0_ENTRY();
847 
848  m0_tl_for(proxy, &cm->cm_proxies, pxy) {
849  /* Check if proxy has completed. */
850  M0_LOG(M0_DEBUG, "pxy %p (to %s), is_done %d",
851  pxy, pxy->px_endpoint, !!pxy->px_is_done);
852  if (!m0_cm_proxy_is_done(pxy)) {
853  rc = -EAGAIN;
854  M0_LOG(M0_DEBUG, "pxy %p is still active", pxy);
855  continue;
856  }
857  M0_LOG(M0_DEBUG, "Stop proxy. cm %p, pxy %p",cm, pxy);
858  m0_cm_proxy_del(cm, pxy);
859  m0_cm_proxy_fini(pxy);
860  m0_free(pxy);
861  } m0_tl_endfor;
862 
863  return M0_RC(rc);
864 }
865 
866 M0_INTERNAL int m0_cm_stop(struct m0_cm *cm)
867 {
868  int rc = cm->cm_mach.sm_rc;
869 
870  M0_ENTRY("cm: %p", cm);
871  M0_PRE(cm != NULL);
872 
873  m0_cm_lock(cm);
877 
878  cm->cm_ops->cmo_stop(cm);
879  /* In-case of failure (rc != 0) keep copy machine in failed state. */
880  if (rc == 0)
884  m0_cm_unlock(cm);
885 
886  if (cm->cm_asts_run.car_run)
888 
889  cm->cm_reset = false;
890 
891  M0_LEAVE("rc: %d", rc);
892  return M0_RC(rc);
893 }
894 
895 M0_INTERNAL int m0_cm_module_init(void)
896 {
897 
898  M0_ENTRY();
899  cmtypes_tlist_init(&cmtypes);
900  m0_bob_type_tlist_init(&cmtypes_bob, &cmtypes_tl);
902 
903  M0_LEAVE();
904  return 0;
905 }
906 
907 M0_INTERNAL void m0_cm_module_fini(void)
908 {
909  M0_ENTRY();
910  cmtypes_tlist_fini(&cmtypes);
912  M0_LEAVE();
913 }
914 
/**
 * Generates a copy machine identifier.
 *
 * Identifiers come from a function-local static counter: the first call
 * returns 1 and every subsequent call returns the previous value plus one.
 * The counter is a plain (non-atomic) variable.
 */
static uint64_t cm_id_generate(void)
{
	static uint64_t last_id;

	last_id += 1;
	return last_id;
}
924 
925 M0_INTERNAL int m0_cm_init(struct m0_cm *cm, struct m0_cm_type *cm_type,
926  const struct m0_cm_ops *cm_ops)
927 {
928  M0_ENTRY("cm_type: %p cm: %p", cm_type, cm);
929  M0_PRE(cm != NULL && cm_type != NULL && cm_ops != NULL &&
930  cmtypes_tlist_contains(&cmtypes, cm_type));
931 
932  if (M0_FI_ENABLED("init_failure"))
933  return M0_ERR(-EINVAL);
934 
935  cm->cm_type = cm_type;
936  cm->cm_ops = cm_ops;
937  cm->cm_id = cm_id_generate();
940  &cm->cm_sm_group);
941  /*
942  * We lock the copy machine here just to satisfy the
943  * pre-condition of m0_cm_state_get and not to control
944  * concurrency to m0_cm_init.
945  */
946  m0_cm_lock(cm);
948  aggr_grps_in_tlist_init(&cm->cm_aggr_grps_in);
949  aggr_grps_out_tlist_init(&cm->cm_aggr_grps_out);
950  proxy_tlist_init(&cm->cm_proxies);
951  proxy_fail_tlist_init(&cm->cm_failed_proxies);
956 
958  m0_cm_unlock(cm);
959 
960  M0_LEAVE();
961  return 0;
962 }
963 
964 M0_INTERNAL void m0_cm_fini(struct m0_cm *cm)
965 {
966  M0_ENTRY("cm: %p", cm);
967  M0_PRE(cm != NULL);
968 
969  m0_cm_lock(cm);
973 
974  cm->cm_ops->cmo_fini(cm);
975  M0_LOG(M0_INFO, "CM: %s:%" PRId64 ": %i",
976  (char *)cm->cm_type->ct_stype.rst_name,
977  cm->cm_id, cm->cm_mach.sm_state);
979  aggr_grps_in_tlist_fini(&cm->cm_aggr_grps_in);
980  aggr_grps_out_tlist_fini(&cm->cm_aggr_grps_out);
981  proxy_tlist_fini(&cm->cm_proxies);
982  proxy_fail_tlist_fini(&cm->cm_failed_proxies);
983  m0_sm_fini(&cm->cm_mach);
988  m0_cm_unlock(cm);
989 
991 
992  M0_LEAVE();
993 }
994 
995 M0_INTERNAL int m0_cm_type_register(struct m0_cm_type *cmtype)
996 {
997  int rc;
998 
999  M0_ENTRY("cmtype: %p", cmtype);
1000  M0_PRE(cmtype != NULL);
1002 
1004  if (rc == 0) {
1005  m0_cm_type_bob_init(cmtype);
1006  m0_cm_sw_update_init(cmtype);
1007  m0_cm_ag_store_init(cmtype);
1008  m0_cm_cp_pump_init(cmtype);
1010  cmtypes_tlink_init_at_tail(cmtype, &cmtypes);
1012  M0_ASSERT(cmtypes_tlink_is_in(cmtype));
1013  }
1014 
1015  M0_LEAVE("rc: %d", rc);
1016  return M0_RC(rc);
1017 }
1018 
1019 M0_INTERNAL void m0_cm_type_deregister(struct m0_cm_type *cmtype)
1020 {
1021  M0_ENTRY("cmtype: %p", cmtype);
1022  M0_PRE(cmtype != NULL && m0_cm_type_bob_check(cmtype));
1023  M0_PRE(cmtypes_tlist_contains(&cmtypes, cmtype));
1024 
1026  cmtypes_tlink_del_fini(cmtype);
1028  m0_cm_type_bob_fini(cmtype);
1030 
1031  M0_LEAVE();
1032 }
1033 
1034 M0_INTERNAL int m0_cm_data_next(struct m0_cm *cm, struct m0_cm_cp *cp)
1035 {
1036  int rc;
1037 
1038  M0_ENTRY("cm: %p cp: %p", cm, cp);
1041  M0_PRE(cp != NULL);
1042 
1043  rc = cm->cm_ops->cmo_data_next(cm, cp);
1044 
1045  M0_POST(ergo(rc == 0,
1046  cp_data_buf_tlist_length(&cp->c_buffers) == cp->c_buf_nr));
1047 
1048  M0_LEAVE("rc: %d", rc);
1049  return M0_RC(rc);
1050 }
1051 
1052 M0_INTERNAL bool m0_cm_has_more_data(const struct m0_cm *cm)
1053 {
1055 }
1056 
1058  *bp, uint64_t colour)
1059 {
1060  struct m0_net_buffer *buf;
1061  int i;
1062 
1064  buf = m0_net_buffer_pool_get(bp, colour);
1065 
1066  if (buf != NULL) {
1067  for (i = 0; i < bp->nbp_seg_nr; ++i)
1068  memset(buf->nb_buffer.ov_buf[i], 0, bp->nbp_seg_size);
1069  }
1070 
1071  return buf;
1072 }
1073 
1074 M0_INTERNAL void m0_cm_buffer_put(struct m0_net_buffer_pool *bp,
1075  struct m0_net_buffer *buf,
1076  uint64_t colour)
1077 {
1078  m0_net_buffer_pool_put(bp, buf, colour);
1079 }
1080 
1081 M0_INTERNAL void m0_cm_notify(struct m0_cm *cm)
1082 {
1084 }
1085 
1086 M0_INTERNAL void m0_cm_wait(struct m0_cm *cm, struct m0_fom *fom)
1087 {
1089  m0_fom_wait_on(fom, &cm->cm_wait, &fom->fo_cb);
1091 }
1092 
1093 M0_INTERNAL void m0_cm_wait_cancel(struct m0_cm *cm, struct m0_fom *fom)
1094 {
1096  m0_fom_callback_cancel(&fom->fo_cb);
1098 }
1099 
1100 M0_INTERNAL int m0_cm_complete(struct m0_cm *cm)
1101 {
1102  int rc;
1103 
1104  M0_ENTRY("cm %p, done %d", cm, (int)cm->cm_done);
1106 
1107  if (!cm->cm_done) {
1108  if (cm->cm_quiesce || cm->cm_abort)
1110  else
1112 
1113  cm->cm_done = true;
1115  }
1116  /*
1117  * Finalising proxies is a blocking operation because we wait until
1118  * the remote replica, corresponding to the copy machine proxy is
1119  * complete. Thus we check for -EAGAIN if proxy is not yet ready to
1120  * be finalised.
1121  */
1123  if (rc == -EAGAIN)
1124  return M0_RC(rc);
1126  return M0_RC(-EAGAIN);
1127 
1128  m0_cm_notify(cm);
1129 
1130  return M0_RC(rc);
1131 }
1132 
1133 M0_INTERNAL void m0_cm_complete_notify(struct m0_cm *cm)
1134 {
1136  M0_ENTRY("Notifying cm %p id=%"PRIu64, cm, cm->cm_id);
1137 
1139  M0_LEAVE();
1140 }
1141 
1142 M0_INTERNAL void m0_cm_proxies_init_wait(struct m0_cm *cm, struct m0_fom *fom)
1143 {
1146 }
1147 
1148 M0_INTERNAL void m0_cm_frozen_ag_cleanup(struct m0_cm *cm,
1149  struct m0_cm_proxy *proxy)
1150 {
1151  struct m0_cm_aggr_group *ag = NULL;
1152  bool cleanup;
1153  M0_ENTRY();
1154 
1156 
1157  m0_tlist_for(&aggr_grps_in_tl, &cm->cm_aggr_grps_in, ag) {
1158  m0_cm_ag_lock(ag);
1159  cleanup = ag->cag_ops->cago_is_frozen_on(ag, proxy) &&
1160  m0_cm_ag_can_fini(ag);
1161  m0_cm_ag_unlock(ag);
1162  ID_LOG("cm_aggr_grps_in", &ag->cag_id);
1163  if (cleanup) {
1165  M0_LOG(M0_DEBUG, "finalizing frozen aggregation group ["M0_AG_F"]",
1166  M0_AG_P(&ag->cag_id));
1167  ag->cag_ops->cago_fini(ag);
1168  }
1169  } m0_tlist_endfor;
1170 }
1171 
1172 M0_INTERNAL void m0_cm_proxy_failed_cleanup(struct m0_cm *cm)
1173 {
1174  struct m0_cm_proxy *pxy;
1175 
1176  m0_tl_for(proxy_fail, &cm->cm_failed_proxies, pxy) {
1178  } m0_tl_endfor;
1179 }
1180 
1181 M0_INTERNAL void m0_cm_abort(struct m0_cm *cm, int rc)
1182 {
1185  M0_CMS_ACTIVE))) {
1186  cm->cm_abort = true;
1187  if (rc < 0)
1188  m0_cm_fail(cm, rc);
1189  }
1190  m0_cm_notify(cm);
1191 }
1192 
1193 M0_INTERNAL bool m0_cm_is_dirty(struct m0_cm *cm)
1194 {
1195  return cm->cm_abort || cm->cm_quiesce || m0_cm_state_get(cm) == M0_CMS_FAIL;
1196 }
1197 
1198 M0_INTERNAL bool m0_cm_proxies_updated(struct m0_cm *cm)
1199 {
1201 
1202  return cm->cm_nr_proxy_updated == cm->cm_proxy_nr;
1203 }
1204 
1205 static void cm_ast_run_thread(struct m0_cm *cm)
1206 {
1207  while (cm->cm_asts_run.car_run) {
1212  }
1213 }
1214 
1215 M0_INTERNAL int m0_cm_ast_run_thread_init(struct m0_cm *cm)
1216 {
1217  M0_SET0(&cm->cm_asts_run);
1218  cm->cm_asts_run.car_run = true;
1219  return M0_THREAD_INIT(&cm->cm_asts_run.car_th, struct m0_cm *,
1220  NULL, &cm_ast_run_thread, cm, "cm_ast_run_thr");
1221 }
1222 
1223 M0_INTERNAL void m0_cm_ast_run_thread_fini(struct m0_cm *cm)
1224 {
1225  cm->cm_asts_run.car_run = false;
1228 }
1229 
1230 
1231 #undef M0_TRACE_SUBSYSTEM
1232 
1235 /*
1236  * Local variables:
1237  * c-indentation-style: "K&R"
1238  * c-basic-offset: 8
1239  * tab-width: 8
1240  * fill-column: 80
1241  * scroll-step: 1
1242  * End:
1243  */
static struct m0_bob_type cmtypes_bob
Definition: cm.c:443
uint64_t id
Definition: cob.h:2380
M0_INTERNAL int m0_mutex_trylock(struct m0_mutex *mutex)
Definition: mutex.c:84
M0_INTERNAL void m0_cm_ag_unlock(struct m0_cm_aggr_group *ag)
Definition: ag.c:63
M0_INTERNAL int m0_cm_ast_run_thread_init(struct m0_cm *cm)
Definition: cm.c:1215
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
M0_INTERNAL bool m0_cm_proxy_is_done(const struct m0_cm_proxy *pxy)
Definition: proxy.c:615
M0_INTERNAL void m0_cm_frozen_ag_cleanup(struct m0_cm *cm, struct m0_cm_proxy *proxy)
Definition: cm.c:1148
M0_INTERNAL void m0_cm_lock(struct m0_cm *cm)
Definition: cm.c:545
static struct m0_sm_state_descr cm_state_descr[M0_CMS_NR]
Definition: cm.c:446
#define M0_PRE(cond)
const struct m0_cm_type * cm_type
Definition: cm.h:194
static uint64_t cm_id_generate(void)
Definition: cm.c:919
M0_INTERNAL void m0_sm_fail(struct m0_sm *mach, int fail_state, int32_t rc)
Definition: sm.c:468
M0_INTERNAL void m0_cm_fail(struct m0_cm *cm, int rc)
Definition: cm.c:527
M0_INTERNAL int m0_bitmap_init(struct m0_bitmap *map, size_t nr)
Definition: bitmap.c:86
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
M0_INTERNAL struct m0_rpc_machine * m0_cm_rpc_machine_find(struct m0_reqh *reqh)
Definition: cm.c:712
M0_INTERNAL bool m0_cm_is_dirty(struct m0_cm *cm)
Definition: cm.c:1193
bool(* cago_is_frozen_on)(struct m0_cm_aggr_group *ag, struct m0_cm_proxy *proxy)
Definition: ag.h:157
#define NULL
Definition: misc.h:38
#define M0_AG_P(ag)
Definition: ag.h:55
M0_INTERNAL bool m0_cm_ag_can_fini(struct m0_cm_aggr_group *ag)
Definition: ag.c:538
M0_INTERNAL void m0_bitmap_fini(struct m0_bitmap *map)
Definition: bitmap.c:97
#define ID_LOG(prefix, id)
Definition: ag.h:57
struct m0_bitmap cm_proxy_update_map
Definition: cm.h:252
M0_INTERNAL bool m0_cm_is_ready(struct m0_cm *cm)
Definition: cm.c:793
M0_INTERNAL void m0_cm_proxy_lock(struct m0_cm_proxy *pxy)
Definition: proxy.c:738
#define ergo(a, b)
Definition: misc.h:293
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
M0_INTERNAL bool m0_cm_is_active(struct m0_cm *cm)
Definition: cm.c:799
bool cm_quiesce
Definition: cm.h:277
const struct m0_cm_ops * cm_ops
Definition: cm.h:188
Definition: sm.h:350
static void cm_replicas_destroy(struct m0_cm *cm)
Definition: cm.c:686
static uint64_t tag(uint8_t code, uint64_t id)
Definition: addb2.c:1047
M0_INTERNAL void m0_cm_type_deregister(struct m0_cm_type *cmtype)
Definition: cm.c:1019
#define M0_LOG(level,...)
Definition: trace.h:167
Definition: cp.h:160
M0_LEAVE()
M0_INTERNAL struct m0_net_buffer * m0_net_buffer_pool_get(struct m0_net_buffer_pool *pool, uint32_t colour)
Definition: buffer_pool.c:215
void(* cago_fini)(struct m0_cm_aggr_group *ag)
Definition: ag.h:140
M0_INTERNAL int m0_cm_stop(struct m0_cm *cm)
Definition: cm.c:866
void(* cmo_stop)(struct m0_cm *cm)
Definition: cm.h:304
uint64_t cm_id
Definition: cm.h:174
M0_INTERNAL bool m0_cm_has_more_data(const struct m0_cm *cm)
Definition: cm.c:1052
M0_INTERNAL int m0_cm_trylock(struct m0_cm *cm)
Definition: cm.c:555
M0_INTERNAL void m0_cm_proxy_unlock(struct m0_cm_proxy *pxy)
Definition: proxy.c:743
bool cm_done
Definition: cm.h:265
M0_INTERNAL void m0_cm_wait_cancel(struct m0_cm *cm, struct m0_fom *fom)
Definition: cm.c:1093
M0_INTERNAL int m0_ha_cm_err_send(struct m0_cm *cm, int rc)
Definition: cm.c:502
struct m0_clink s_clink
Definition: sm.h:516
M0_BOB_DEFINE(static, &cmtypes_bob, m0_cm_type)
M0_INTERNAL void m0_ha_send(struct m0_ha *ha, struct m0_ha_link *hl, const struct m0_ha_msg *msg, uint64_t *tag)
Definition: ha.c:862
struct m0_cm_ag_id cag_id
Definition: ag.h:72
M0_INTERNAL void m0_cm_sw_update_complete(struct m0_cm *cm)
static int cm_rc(struct m0_cm *cm)
Definition: cm.c:580
M0_INTERNAL void m0_cm_ast_run_thread_fini(struct m0_cm *cm)
Definition: cm.c:1223
M0_INTERNAL void m0_fom_wait_on(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1490
#define M0_BITS(...)
Definition: misc.h:236
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
M0_INTERNAL struct m0 * m0_get(void)
Definition: instance.c:41
M0_INTERNAL int m0_cm_start(struct m0_cm *cm)
Definition: cm.c:805
static int void * buf
Definition: dir.c:1019
#define container_of(ptr, type, member)
Definition: misc.h:33
M0_INTERNAL int m0_cm_module_init(void)
Definition: cm.c:895
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_cm_fini(struct m0_cm *cm)
Definition: cm.c:964
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_INTERNAL const char * m0_rpc_machine_ep(const struct m0_rpc_machine *rmach)
Definition: rpc_machine.c:603
M0_INTERNAL void m0_cm_state_set(struct m0_cm *cm, enum m0_cm_state state)
Definition: cm.c:570
M0_INTERNAL void m0_sm_group_fini(struct m0_sm_group *grp)
Definition: sm.c:65
int m0_reqh_service_type_register(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:473
struct m0_rpc_session * px_session
Definition: proxy.h:116
Definition: sock.c:887
int(* cmo_data_next)(struct m0_cm *cm, struct m0_cm_cp *cp)
Definition: cm.h:318
static struct m0_pools_common pc
Definition: iter_ut.c:59
M0_INTERNAL struct m0_reqh_service_type * m0_reqh_service_type_find(const char *sname)
Definition: reqh_service.c:168
M0_INTERNAL void m0_cm_cp_pump_init(struct m0_cm_type *cmtype)
Definition: pump.c:426
#define m0_tl_endfor
Definition: tlist.h:700
M0_INTERNAL int m0_cm_init(struct m0_cm *cm, struct m0_cm_type *cm_type, const struct m0_cm_ops *cm_ops)
Definition: cm.c:925
M0_INTERNAL void m0_cm_ag_store_fini(struct m0_cm_ag_store *store)
Definition: ag_store.c:524
return M0_RC(rc)
static struct m0_cm * cm
Definition: cm.c:63
M0_INTERNAL int m0_cm_data_next(struct m0_cm *cm, struct m0_cm_cp *cp)
Definition: cm.c:1034
M0_INTERNAL void m0_cm_unlock(struct m0_cm *cm)
Definition: cm.c:550
struct m0_tl cm_proxies
Definition: cm.h:246
uint64_t cm_nr_proxy_updated
Definition: cm.h:253
#define M0_ENTRY(...)
Definition: trace.h:170
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
#define M0_AG_F
Definition: ag.h:54
M0_INTERNAL void m0_cm_complete_notify(struct m0_cm *cm)
Definition: cm.c:1133
M0_INTERNAL void m0_chan_signal(struct m0_chan *chan)
Definition: chan.c:159
M0_INTERNAL bool m0_cm_proxies_updated(struct m0_cm *cm)
Definition: cm.c:1198
bool cm_reset
Definition: cm.h:271
int i
Definition: dir.c:1033
#define PRIu64
Definition: types.h:58
m0_cm_state
Definition: cm.h:125
M0_INTERNAL void m0_cm_cp_pump_prepare(struct m0_cm *cm)
Definition: pump.c:433
return M0_ERR(-EOPNOTSUPP)
struct m0_mutex cm_wait_mutex
Definition: cm.h:236
M0_INTERNAL void m0_cm_proxies_sent_reset(struct m0_cm *cm)
Definition: proxy.c:780
M0_INTERNAL void m0_cm_proxy_event_handle_register(struct m0_cm_proxy *pxy, struct m0_conf_obj *svc_obj)
Definition: proxy.c:726
M0_INTERNAL struct m0_net_buffer * m0_cm_buffer_get(struct m0_net_buffer_pool *bp, uint64_t colour)
Definition: cm.c:1057
Definition: trace.h:482
M0_INTERNAL void m0_cm_ag_lock(struct m0_cm_aggr_group *ag)
Definition: ag.c:58
enum m0_proxy_state px_status
Definition: proxy.h:91
const char * px_endpoint
Definition: proxy.h:118
int(* cmo_prepare)(struct m0_cm *cm)
Definition: cm.h:294
M0_INTERNAL bool m0_net_buffer_pool_invariant(const struct m0_net_buffer_pool *pool)
Definition: buffer_pool.c:50
M0_TL_DEFINE(cmtypes, static, struct m0_cm_type)
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
Definition: chan.c:96
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_INTERNAL void m0_cm_wait(struct m0_cm *cm, struct m0_fom *fom)
Definition: cm.c:1086
M0_INTERNAL void m0_sm_group_init(struct m0_sm_group *grp)
Definition: sm.c:53
struct m0_rpc_conn * px_conn
Definition: proxy.h:114
m0_time_t m0_time_now(void)
Definition: time.c:134
void m0_sm_state_set(struct m0_sm *mach, int state)
Definition: sm.c:478
const char * rst_name
Definition: reqh_service.h:447
enum m0_ha_obj_state co_ha_state
Definition: obj.h:241
Definition: tlist.h:251
struct m0_tl rh_rpc_machines
Definition: reqh.h:135
M0_INTERNAL bool m0_reqh_service_invariant(const struct m0_reqh_service *svc)
Definition: reqh_service.c:143
struct m0_conf_cache cc_cache
Definition: confc.h:394
M0_INTERNAL void m0_cm_module_fini(void)
Definition: cm.c:907
M0_INTERNAL void m0_cm_cp_pump_start(struct m0_cm *cm)
Definition: pump.c:479
M0_INTERNAL void m0_clink_signal(struct m0_clink *clink)
Definition: chan.c:326
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
static void cm_ast_run_thread(struct m0_cm *cm)
Definition: cm.c:1205
Definition: msg.h:115
M0_INTERNAL int m0_conf_obj_find_lock(struct m0_conf_cache *cache, const struct m0_fid *id, struct m0_conf_obj **out)
Definition: obj_ops.c:154
#define M0_POST(cond)
M0_INTERNAL void m0_cm_proxy_del(struct m0_cm *cm, struct m0_cm_proxy *pxy)
Definition: proxy.c:124
struct m0_chan cm_wait
Definition: cm.h:235
Definition: reqh.h:94
bool px_is_done
Definition: proxy.h:93
static int cm_replicas_connect(struct m0_cm *cm, struct m0_rpc_machine *rmach, struct m0_reqh *reqh)
Definition: cm.c:627
int32_t sm_rc
Definition: sm.h:336
Definition: dump.c:103
struct m0_sm_ast * sa_next
Definition: sm.h:509
Definition: cm.h:286
struct m0_confc * pc_confc
Definition: pool.h:164
struct m0_mutex s_lock
Definition: sm.h:514
struct m0_tl pc_svc_ctxs
Definition: pool.h:172
struct m0_sm_ast cag_fini_ast
Definition: ag.h:78
static struct fdmi_ctx ctx
Definition: main.c:80
struct m0_chan cm_complete
Definition: cm.h:238
M0_INTERNAL int m0_cm_setup(struct m0_cm *cm)
Definition: cm.c:595
M0_INTERNAL int m0_cm_ready(struct m0_cm *cm)
Definition: cm.c:759
void(* cmo_ha_msg)(struct m0_cm *cm, struct m0_ha_msg *msg, int rc)
Definition: cm.h:356
struct m0_chan cm_proxy_init_wait
Definition: cm.h:240
#define PRId64
Definition: types.h:57
M0_INTERNAL void m0_chan_signal_lock(struct m0_chan *chan)
Definition: chan.c:165
M0_INTERNAL void m0_cm_buffer_put(struct m0_net_buffer_pool *bp, struct m0_net_buffer *buf, uint64_t colour)
Definition: cm.c:1074
static struct m0_mutex cmtypes_mutex
Definition: cm.c:435
M0_INTERNAL struct m0_cm * m0_cmsvc2cm(struct m0_reqh_service *cmsvc)
Definition: cm.c:497
struct m0_sm_group cm_sm_group
Definition: cm.h:185
uint32_t sd_flags
Definition: sm.h:378
bool car_run
Definition: cm.h:162
Definition: fom.h:481
Definition: cm.h:143
void(* cmo_fini)(struct m0_cm *cm)
Definition: cm.h:360
struct m0_tl cm_aggr_grps_out
Definition: cm.h:231
uint64_t cm_proxy_nr
Definition: cm.h:250
M0_INTERNAL int m0_cm_prepare(struct m0_cm *cm)
Definition: cm.c:717
uint32_t c_buf_nr
Definition: cp.h:187
bool m0_cm_cp_pump_is_complete(const struct m0_cm_cp_pump *cp_pump)
Definition: pump.c:420
struct m0_reqh reqh
Definition: rm_foms.c:48
M0_INTERNAL void m0_cm_ag_store_init(struct m0_cm_type *cmtype)
Definition: ag_store.c:512
#define M0_CNT_INC(cnt)
Definition: arith.h:226
M0_INTERNAL void m0_cm_notify(struct m0_cm *cm)
Definition: cm.c:1081
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
Definition: sm.c:313
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
Definition: cm.h:134
M0_TL_DESCR_DEFINE(cmtypes, "copy machine types", static, struct m0_cm_type, ct_linkage, ct_magix, CM_TYPE_LINK_MAGIX, CM_TYPE_HEAD_MAGIX)
const struct m0_cm_aggr_group_ops * cag_ops
Definition: ag.h:74
M0_INTERNAL void m0_cm_proxy_fini(struct m0_cm_proxy *pxy)
Definition: proxy.c:627
M0_INTERNAL int m0_cm_sw_remote_update(struct m0_cm *cm)
Definition: sw.c:130
static struct m0_tl cmtypes
Definition: cm.c:432
static int cm_pre_start_cleanup(struct m0_cm *cm)
Definition: cm.c:699
struct m0_reqh_service cm_service
Definition: cm.h:191
M0_INTERNAL int m0_cm_proxy_init(struct m0_cm_proxy *proxy, uint64_t px_id, struct m0_cm_ag_id *lo, struct m0_cm_ag_id *hi, const char *endpoint)
Definition: proxy.c:90
M0_INTERNAL void m0_cm_proxies_init_wait(struct m0_cm *cm, struct m0_fom *fom)
Definition: cm.c:1142
Definition: cm.h:166
M0_INTERNAL bool m0_cm_ag_store_is_complete(struct m0_cm_ag_store *store)
Definition: ag_store.c:529
static const struct m0_sm_conf cm_sm_conf
Definition: cm.c:491
M0_INTERNAL enum m0_cm_state m0_cm_state_get(const struct m0_cm *cm)
Definition: cm.c:565
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
int(* cmo_setup)(struct m0_cm *cm)
Definition: cm.h:292
struct m0_pools_common * rh_pools
Definition: reqh.h:118
#define m0_tlist_endfor
Definition: tlist.h:448
M0_INTERNAL void m0_fom_callback_cancel(struct m0_fom_callback *cb)
Definition: fom.c:1514
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
struct m0_reqh_service_type ct_stype
Definition: cm.h:145
struct m0_thread car_th
Definition: cm.h:161
struct m0_tl c_buffers
Definition: cp.h:184
M0_INTERNAL void m0_cm_sw_update_init(struct m0_cm_type *cmtype)
M0_INTERNAL void m0_chan_fini(struct m0_chan *chan)
Definition: chan.c:104
M0_INTERNAL void m0_net_buffer_pool_put(struct m0_net_buffer_pool *pool, struct m0_net_buffer *buf, uint32_t colour)
Definition: buffer_pool.c:243
struct m0_cm_cp_pump cm_cp_pump
Definition: cm.h:257
bool(* cmo_is_peer)(struct m0_cm *cm, struct m0_reqh_service_ctx *ctx)
Definition: cm.h:353
#define m0_tlist_for(descr, head, obj)
Definition: tlist.h:435
static struct bulkio_params * bp
Definition: bulkio_ut.c:44
Definition: nucleus.c:42
int(* cmo_start)(struct m0_cm *cm)
Definition: cm.h:301
struct m0_sm cm_mach
Definition: cm.h:167
M0_INTERNAL void m0_cm_abort(struct m0_cm *cm, int rc)
Definition: cm.c:1181
M0_INTERNAL void m0_cm_ag_store_complete(struct m0_cm_ag_store *store)
Definition: ag_store.c:519
M0_INTERNAL bool m0_cm_invariant(const struct m0_cm *cm)
Definition: cm.c:585
struct m0_tl cm_aggr_grps_in
Definition: cm.h:203
M0_INTERNAL bool m0_cm_is_locked(const struct m0_cm *cm)
Definition: cm.c:560
M0_INTERNAL void m0_sm_asts_run(struct m0_sm_group *grp)
Definition: sm.c:150
struct m0_tl cm_failed_proxies
Definition: cm.h:248
struct m0_reqh * rs_reqh
Definition: reqh_service.h:259
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
Definition: chan.c:112
void m0_free(void *data)
Definition: memory.c:146
Definition: mutex.h:47
M0_INTERNAL int m0_cm_complete(struct m0_cm *cm)
Definition: cm.c:1100
M0_INTERNAL void m0_cm_ag_store_fom_start(struct m0_cm *cm)
Definition: ag_store.c:534
void m0_reqh_service_type_unregister(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:490
uint32_t sm_state
Definition: sm.h:307
struct m0_cm_ast_run cm_asts_run
Definition: cm.h:263
M0_INTERNAL void m0_cm_proxy_add(struct m0_cm *cm, struct m0_cm_proxy *pxy)
Definition: proxy.c:110
M0_INTERNAL void m0_bob_type_tlist_init(struct m0_bob_type *bt, const struct m0_tl_descr *td)
Definition: bob.c:41
int32_t rc
Definition: trigger_fop.h:47
bool cm_abort
Definition: cm.h:282
M0_INTERNAL int m0_cm_proxies_fini(struct m0_cm *cm)
Definition: cm.c:840
m0_time_t cm_epoch
Definition: cm.h:177
Definition: ag.h:49
M0_INTERNAL void m0_cm_proxy_failed_cleanup(struct m0_cm *cm)
Definition: cm.c:1172
struct m0_cm_ag_store cm_ag_store
Definition: cm.h:261
struct m0_cm * px_cm
Definition: proxy.h:103
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
Definition: sm.c:331
M0_INTERNAL int m0_cm_type_register(struct m0_cm_type *cmtype)
Definition: cm.c:995
M0_INTERNAL bool m0_sm_invariant(const struct m0_sm *mach)
Definition: sm.c:267