Motr  M0
reqh_service.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RPC
24 #include "lib/trace.h"
25 
26 #include "lib/errno.h"
27 #include "lib/finject.h" /* M0_FI_ENABLED */
28 #include "lib/locality.h" /* m0_locality_get */
29 #include "lib/lockers.h"
30 #include "lib/memory.h"
31 #include "lib/misc.h" /* M0_SET0 */
32 #include "lib/rwlock.h"
33 #include "lib/string.h" /* m0_strcaseeq */
34 #include "lib/time.h"
35 #include "fop/fom.h"
36 #include "rpc/conn.h"
37 #include "rpc/rpc_machine_internal.h" /* m0_rpc_chan */
38 #include "rpc/rpclib.h"
39 #include "reqh/reqh.h"
40 #include "reqh/reqh_service.h"
42 #include "rpc/rpc.h" /* m0_rpc__down_timeout */
43 #include "rpc/rpc_machine.h" /* m0_rpc_machine_ep */
44 #include "motr/magic.h"
45 #include "conf/obj_ops.h" /* m0_conf_obj_get */
46 #include "conf/helpers.h" /* m0_conf_obj2reqh */
47 #include "pool/pool.h" /* m0_pools_common_service_ctx_find */
48 #include "fid/fid.h" /* m0_fid_eq */
49 #include "module/instance.h" /* m0_get */
50 #include "conf/ha.h" /* m0_conf_ha_service_event_post */
51 
64 static struct m0_tl rstypes;
65 
66 enum {
74 };
75 
76 M0_TL_DESCR_DECLARE(abandoned_svc_ctxs, M0_EXTERN);
77 M0_TL_DECLARE(abandoned_svc_ctxs, M0_EXTERN, struct m0_reqh_service_ctx);
78 
80 static struct m0_rwlock rstypes_rwlock;
81 
82 M0_TL_DESCR_DEFINE(rstypes, "reqh service types", static,
83  struct m0_reqh_service_type, rst_linkage, rst_magix,
85 
87 
88 static struct m0_bob_type rstypes_bob;
90 
91 static const struct m0_bob_type reqh_svc_ctx = {
92  .bt_name = "m0_reqh_service_ctx",
93  .bt_magix_offset = M0_MAGIX_OFFSET(struct m0_reqh_service_ctx,
94  sc_magic),
95  .bt_magix = M0_REQH_SVC_CTX_MAGIC,
96  .bt_check = NULL
97 };
99 
101  [M0_RST_INITIALISING] = {
103  .sd_name = "Initializing",
104  .sd_allowed = M0_BITS(M0_RST_INITIALISED)
105  },
106  [M0_RST_INITIALISED] = {
107  .sd_name = "Initialized",
108  .sd_allowed = M0_BITS(M0_RST_STARTING, M0_RST_FAILED)
109  },
110  [M0_RST_STARTING] = {
111  .sd_name = "Starting",
112  .sd_allowed = M0_BITS(M0_RST_STARTED, M0_RST_FAILED)
113  },
114  [M0_RST_STARTED] = {
115  .sd_name = "Started",
116  .sd_allowed = M0_BITS(M0_RST_STOPPING)
117  },
118  [M0_RST_STOPPING] = {
119  .sd_name = "Stopping",
120  .sd_allowed = M0_BITS(M0_RST_STOPPED)
121  },
122  [M0_RST_STOPPED] = {
123  .sd_flags = M0_SDF_FINAL,
124  .sd_name = "Stopped",
125  .sd_allowed = M0_BITS(M0_RST_STARTING)
126  },
127  [M0_RST_FAILED] = {
128  .sd_flags = M0_SDF_TERMINAL,
129  .sd_name = "Failed",
130  },
131 };
132 
134  .scf_name = "Service states",
135  .scf_nr_states = ARRAY_SIZE(service_states),
136  .scf_state = service_states
137 };
138 
139 static void
141 
142 
143 M0_INTERNAL bool m0_reqh_service_invariant(const struct m0_reqh_service *svc)
144 {
145  return _0C(m0_reqh_service_bob_check(svc)) &&
146  _0C(M0_IN(svc->rs_sm.sm_state, (M0_RST_INITIALISING, M0_RST_INITIALISED,
149  M0_RST_FAILED))) &&
150  _0C(svc->rs_type != NULL && svc->rs_ops != NULL &&
151  (svc->rs_ops->rso_start_async != NULL ||
152  svc->rs_ops->rso_start != NULL)) &&
153  _0C(ergo(M0_IN(svc->rs_sm.sm_state, (M0_RST_INITIALISED,
156  svc->rs_reqh != NULL)) &&
157  _0C(ergo(M0_IN(svc->rs_sm.sm_state, (M0_RST_STARTED, M0_RST_STOPPING,
159  m0_reqh_svc_tlist_contains(&svc->rs_reqh->rh_services, svc))) &&
160  _0C(ergo(svc->rs_reqh != NULL,
161  M0_IN(m0_reqh_lockers_get(svc->rs_reqh, svc->rs_type->rst_key),
162  (NULL, svc)))) &&
163  _0C(svc->rs_level > M0_RS_LEVEL_UNKNOWN);
164 }
165 M0_EXPORTED(m0_reqh_service_invariant);
166 
167 M0_INTERNAL struct m0_reqh_service_type *
168 m0_reqh_service_type_find(const char *sname)
169 {
170  struct m0_reqh_service_type *t;
171 
172  M0_PRE(sname != NULL);
173 
175 
176  t = m0_tl_find(rstypes, t, &rstypes, m0_streq(t->rst_name, sname));
177  if (t != NULL)
178  M0_ASSERT(m0_reqh_service_type_bob_check(t));
179 
181  return t;
182 }
183 
184 M0_INTERNAL int
186  const struct m0_reqh_service_type *stype,
187  struct m0_reqh_context *rctx)
188 {
189  int rc;
190 
191  M0_ENTRY();
192  M0_PRE(out != NULL && stype != NULL);
193 
194  rc = stype->rst_ops->rsto_service_allocate(out, stype);
195  if (rc == 0) {
196  struct m0_reqh_service *service = *out;
197  service->rs_type = stype;
199  m0_reqh_service_bob_init(service);
201  service->rs_level = stype->rst_level;
203  }
204  return M0_RC(rc);
205 }
206 
208  enum m0_reqh_service_state state)
209 {
210  static const enum m0_conf_ha_service_event state2event[] = {
216  };
217 
218  if (!M0_IN(state, (M0_RST_STARTING, M0_RST_STARTED,
220  return;
221  if (m0_get()->i_ha == NULL || m0_get()->i_ha_link == NULL) {
222  M0_LOG(M0_DEBUG, "can't report service HA event=%d "
223  "service_fid="FID_F, state2event[state],
225  return;
226  }
227  m0_conf_ha_service_event_post(m0_get()->i_ha, m0_get()->i_ha_link,
231  m0_process(),
232  state2event[state],
234 }
235 
237  enum m0_reqh_service_state state)
238 {
240  m0_sm_state_set(&service->rs_sm, state);
243 }
244 
246  struct m0_reqh_service *service,
247  unsigned key)
248 {
250 
251  /*
252  * NOTE: The key is required to be set before 'rso_start'
253  * as some services can call m0_fom_init() directly in
254  * their service start, m0_fom_init() finds the service
255  * given reqh, using this key
256  */
257  M0_ASSERT(m0_reqh_lockers_is_empty(reqh, key));
258  m0_reqh_lockers_set(reqh, key, service);
259  M0_LOG(M0_DEBUG, "key init for reqh=%p, key=%d", reqh, key);
260 }
261 
263  struct m0_reqh_service *service,
264  unsigned key)
265 {
266  if (!m0_reqh_lockers_is_empty(reqh, key))
267  m0_reqh_lockers_clear(reqh, key);
269 }
270 
271 M0_INTERNAL int
273 {
274  int rc;
275  unsigned key;
276  struct m0_reqh *reqh;
277  struct m0_reqh_service *service;
278 
279  M0_PRE(asc != NULL && asc->sac_service != NULL && asc->sac_fom != NULL);
280  service = asc->sac_service;
281  M0_PRE(m0_reqh_service_bob_check(service));
282  reqh = service->rs_reqh;
284 
292 
293  rc = service->rs_ops->rso_start_async(asc);
294 
296  if (rc == 0)
298  else
301 
302  return M0_RC(rc);
303 }
304 
306  struct m0_reqh_service *service)
307 {
309 }
310 
312 {
313  struct m0_reqh *reqh;
314 
315  M0_PRE(m0_reqh_service_bob_check(service));
316  reqh = service->rs_reqh;
317 
324 }
325 
327 {
328  unsigned key;
329  struct m0_reqh *reqh;
330 
331  M0_PRE(m0_reqh_service_bob_check(service));
332  reqh = service->rs_reqh;
334 
341 }
342 
344 {
345  int rc;
346  unsigned key;
347  struct m0_reqh *reqh;
348 
349  M0_PRE(m0_reqh_service_bob_check(service));
350  reqh = service->rs_reqh;
352 
358  M0_POST(m0_reqh_lockers_get(reqh, key) == service);
360 
362 
364  if (rc == 0)
366  else
370 
371  return M0_RC(rc);
372 }
373 
374 M0_INTERNAL void
376 {
377  struct m0_reqh *reqh;
378  bool run_method = false;
379 
380  M0_PRE(m0_reqh_service_bob_check(service));
381  reqh = service->rs_reqh;
382 
383  M0_LOG(M0_DEBUG, "Preparing to stop %s [%d] (%d)",
386 
390  M0_RST_STOPPING)));
394  run_method = true;
395  }
397 
398  if (run_method && service->rs_ops->rso_prepare_to_stop != NULL)
400 }
401 
403 {
404  struct m0_reqh *reqh;
405  unsigned key;
406 
407  M0_PRE(m0_reqh_service_bob_check(service));
409  reqh = service->rs_reqh;
411 
418 
420  /*
421  * Wait again, in case ->rso_stop() launched more foms. E.g., rpcservice
422  * starts reverse connection disconnection at this point.
423  */
425  m0_reqh_lockers_clear(reqh, key);
426 }
427 
429  struct m0_reqh *reqh,
430  const struct m0_fid *fid)
431 {
432  M0_PRE(service != NULL && reqh != NULL &&
434  /* Currently fid may be NULL */
436 
438  &reqh->rh_sm_grp);
439 
440  if (fid != NULL)
442  service->rs_reqh = reqh;
445 
446  /*
447  * We want to track these services externally so add them to the list
448  * just as soon as they enter the M0_RST_INITIALISED state.
449  * They will be left on the list until they get fini'd.
450  */
451  m0_reqh_svc_tlink_init_at(service, &reqh->rh_services);
452  service->rs_fom_key = m0_locality_lockers_allot();
455 }
456 
458 {
459  M0_PRE(service != NULL && m0_reqh_service_bob_check(service));
460 
462  m0_locality_lockers_free(service->rs_fom_key);
463  m0_reqh_svc_tlink_del_fini(service);
464  m0_reqh_service_bob_fini(service);
471 }
472 
474 {
475  M0_PRE(rstype != NULL);
477 
478  if (M0_FI_ENABLED("fake_error"))
479  return M0_ERR(-EINVAL);
480 
481  m0_reqh_service_type_bob_init(rstype);
483  rstype->rst_key = m0_reqh_lockers_allot();
484  rstypes_tlink_init_at_tail(rstype, &rstypes);
486 
487  return 0;
488 }
489 
491 {
492  M0_PRE(rstype != NULL && m0_reqh_service_type_bob_check(rstype));
493 
494  rstypes_tlink_del_fini(rstype);
495  m0_reqh_lockers_free(rstype->rst_key);
496  m0_reqh_service_type_bob_fini(rstype);
497 }
498 
499 M0_INTERNAL int m0_reqh_service_types_length(void)
500 {
501  return rstypes_tlist_length(&rstypes);
502 }
503 
504 M0_INTERNAL void m0_reqh_service_list_print(void)
505 {
506  struct m0_reqh_service_type *stype;
507 
509  M0_ASSERT(m0_reqh_service_type_bob_check(stype));
510  m0_console_printf(" %s\n", stype->rst_name);
511  } m0_tl_endfor;
512 }
513 
514 M0_INTERNAL bool m0_reqh_service_is_registered(const char *sname)
515 {
516  return m0_tl_exists(rstypes, stype, &rstypes,
517  m0_strcaseeq(stype->rst_name, sname));
518 }
519 
520 M0_INTERNAL int m0_reqh_service_types_init(void)
521 {
522  rstypes_tlist_init(&rstypes);
523  m0_bob_type_tlist_init(&rstypes_bob, &rstypes_tl);
525 
526  return 0;
527 }
528 M0_EXPORTED(m0_reqh_service_types_init);
529 
530 M0_INTERNAL void m0_reqh_service_types_fini(void)
531 {
532  rstypes_tlist_fini(&rstypes);
534 }
535 M0_EXPORTED(m0_reqh_service_types_fini);
536 
537 M0_INTERNAL struct m0_reqh_service *
539  const struct m0_reqh *reqh)
540 {
541  struct m0_reqh_service *service;
542 
543  M0_PRE(st != NULL && reqh != NULL);
544  service = m0_reqh_lockers_get(reqh, st->rst_key);
545  M0_POST(ergo(service != NULL, service->rs_type == st));
546  return service;
547 }
548 M0_EXPORTED(m0_reqh_service_find);
549 
550 M0_INTERNAL struct m0_reqh_service *
551 m0_reqh_service_lookup(const struct m0_reqh *reqh, const struct m0_fid *fid)
552 {
553  M0_PRE(reqh != NULL);
554  M0_PRE(fid != NULL);
555 
556  return m0_tl_find(m0_reqh_svc, s, &reqh->rh_services,
557  m0_fid_eq(fid, &s->rs_service_fid));
558 }
559 
560 M0_INTERNAL int m0_reqh_service_state_get(const struct m0_reqh_service *s)
561 {
562  return s->rs_sm.sm_state;
563 }
564 
565 M0_INTERNAL int m0_reqh_service_setup(struct m0_reqh_service **out,
566  struct m0_reqh_service_type *stype,
567  struct m0_reqh *reqh,
568  struct m0_reqh_context *rctx,
569  const struct m0_fid *fid)
570 {
571  int result;
572 
574  M0_ENTRY();
575 
577  if (result == 0) {
578  struct m0_reqh_service *svc = *out;
579 
581  result = m0_reqh_service_start(svc);
582  if (result != 0)
584  }
585  return M0_RC(result);
586 }
587 
588 M0_INTERNAL void m0_reqh_service_quit(struct m0_reqh_service *svc)
589 {
590  if (svc != NULL && svc->rs_sm.sm_state == M0_RST_STARTED) {
592  svc->rs_reqh) == svc);
594  m0_reqh_idle_wait_for(svc->rs_reqh, svc);
597  }
598 }
599 
600 int
602 {
603  M0_ENTRY();
605 
606  asc->sac_rc = asc->sac_service->rs_ops->rso_start(asc->sac_service);
607  m0_fom_wakeup(asc->sac_fom);
608  return M0_RC(asc->sac_rc);
609 }
611 
613 {
614  return 0 < t && t < M0_CST_NR;
615 }
616 
655  [M0_RSC_OFFLINE] = {
657  .sd_name = "M0_RSC_OFFLINE",
658  .sd_allowed = M0_BITS(M0_RSC_CONNECTING)
659  },
660  [M0_RSC_ONLINE] = {
661  .sd_name = "M0_RSC_ONLINE",
662  .sd_allowed = M0_BITS(M0_RSC_DISCONNECTING,
664  },
665  [M0_RSC_CONNECTING] = {
666  .sd_name = "M0_RSC_CONNECTING",
667  .sd_allowed = M0_BITS(M0_RSC_ONLINE,
669  },
671  .sd_name = "M0_RSC_DISCONNECTING",
672  .sd_allowed = M0_BITS(M0_RSC_OFFLINE)
673  },
674  [M0_RSC_CANCELLED] = {
675  .sd_name = "M0_RSC_CANCELLED",
676  .sd_allowed = M0_BITS(M0_RSC_DISCONNECTING)
677  },
678 };
679 
680 static const struct m0_sm_conf service_ctx_states_conf = {
681  .scf_name = "Service ctx connection states",
682  .scf_nr_states = ARRAY_SIZE(service_ctx_states),
683  .scf_state = service_ctx_states
684 };
685 
687 {
688  return _0C(ctx != NULL) && _0C(m0_reqh_service_ctx_bob_check(ctx)) &&
689  _0C(m0_fid_is_set(&ctx->sc_fid)) &&
690  _0C(service_type_is_valid(ctx->sc_type));
691 }
692 
694 {
695  m0_conf_obj_get(ctx->sc_service);
696  m0_clink_add(&ctx->sc_service->co_ha_chan, &ctx->sc_svc_event);
697  m0_conf_obj_get(ctx->sc_process);
698  m0_clink_add(&ctx->sc_process->co_ha_chan, &ctx->sc_process_event);
699 }
700 
701 M0_INTERNAL void
703 {
704  if (ctx->sc_svc_event.cl_chan != NULL) {
705  m0_clink_cleanup(&ctx->sc_svc_event);
706  ctx->sc_svc_event.cl_chan = NULL;
707  m0_confc_close(ctx->sc_service);
708  ctx->sc_service = NULL;
709  }
710  if (ctx->sc_process_event.cl_chan != NULL) {
711  m0_clink_cleanup(&ctx->sc_process_event);
712  ctx->sc_process_event.cl_chan = NULL;
713  m0_confc_close(ctx->sc_process);
714  ctx->sc_process = NULL;
715  }
716 }
717 
719  m0_time_t deadline)
720 {
722  M0_RSC_CANCELLED)));
723 
724  m0_rpc_link_reset(&ctx->sc_rlink);
726  m0_rpc_link_connect_async(&ctx->sc_rlink, deadline,
727  &ctx->sc_rlink_wait);
728 }
729 
731 {
732  struct m0_conf_obj *obj = ctx->sc_service;
733 
734  M0_ENTRY("ctx=%p '%s' Connect to service '%s' type=%s", ctx,
735  m0_rpc_machine_ep(ctx->sc_rlink.rlk_conn.c_rpc_machine),
736  m0_rpc_link_end_point(&ctx->sc_rlink),
737  m0_conf_service_type2str(ctx->sc_type));
739  M0_PRE(m0_conf_cache_is_locked(obj->co_cache));
741 
745  /*
746  * Set M0_RSC_RLINK_CANCEL according to service conf object. This
747  * simulates behaviour of service_event_handler() for events that are
748  * lost before subscription.
749  */
750  if (obj->co_ha_state == M0_NC_FAILED)
756  M0_LEAVE();
757 }
758 
759 M0_INTERNAL bool
761 {
763 }
764 
765 static void
767 {
769 
770  /*
771  * Not required to wait for reply on session/connection termination
772  * if process is died otherwise wait for some timeout.
773  */
778  &ctx->sc_rlink_wait);
779 }
780 
782 {
783  M0_ENTRY("Disconnecting from service '%s'",
784  m0_rpc_link_end_point(&ctx->sc_rlink));
786 
791  /*
792  * 'ING states will be handled in reqh_service_ctx_ast_cb().
793  * Offline state does not require disconnection either.
794  */
798  M0_LEAVE();
799 }
800 
802  int state)
803 {
804  int rc;
805 
806  M0_PRE(M0_IN(state, (M0_RSC_ONLINE, M0_RSC_OFFLINE)));
807 
809  rc = m0_sm_timedwait(&ctx->sc_sm, M0_BITS(state), M0_TIME_NEVER);
810  M0_ASSERT_INFO(rc == 0, "rc = %d", rc);
811  M0_ASSERT(CTX_STATE(ctx) == state);
812  M0_ASSERT(ergo(state == M0_RSC_ONLINE,
813  m0_rpc_link_is_connected(&ctx->sc_rlink)));
814  M0_ASSERT(ergo(state == M0_RSC_OFFLINE,
815  !m0_rpc_link_is_connected(&ctx->sc_rlink)));
816  rc = ctx->sc_rlink.rlk_rc;
818 
819  return rc;
820 }
821 
823 {
824 
825  if (ctx->sc_service->co_ha_state == M0_NC_ONLINE) {
827  M0_ASSERT(rc == 0);
828  }
829 }
830 
832 {
834 }
835 
837  const char *addr)
838 {
839  M0_PRE(addr != NULL &&
840  strcmp(addr, m0_rpc_link_end_point(&ctx->sc_rlink)) == 0);
841 
842  if (M0_IN(CTX_STATE(ctx), (M0_RSC_DISCONNECTING,
843  M0_RSC_CONNECTING))) {
844  /* 'ING states reach ONLINE eventually */
846  return;
847  }
849 }
850 
851 M0_INTERNAL void
853 {
854  M0_ENTRY("Reconnecting to service '%s'",
855  m0_rpc_link_end_point(&ctx->sc_rlink));
860  if (CTX_STATE(ctx) == M0_RSC_ONLINE) {
861  m0_rpc_session_cancel(&ctx->sc_rlink.rlk_sess);
863  }
865  M0_LEAVE();
866 }
867 
869 {
875 
876  if (CTX_STATE(ctx) == M0_RSC_ONLINE) {
877  m0_rpc_session_cancel(&ctx->sc_rlink.rlk_sess);
879  } else {
881  }
882 }
883 
885 {
887 
890 }
891 
893 {
894  struct m0_reqh_service_ctx *ctx;
895 
896  M0_ENTRY();
897  /*
898  * Step 1: Flag every context for abortion.
899  *
900  * The idea is to make any reconnecting service context to encounter the
901  * flag and go offline before real context destruction occurs.
902  */
903  m0_tl_for(pools_common_svc_ctx, &reqh->rh_pools->pc_svc_ctxs, ctx) {
905  /*
906  * Need to prevent context from being activated by HA
907  * notification, service or process related.
908  */
911  /*
912  * Context is unconditionally flagged to abort. In case of
913  * connection broken due to network reasons, the context will be
914  * put offline and prevented from further connection.
915  *
916  * This way yet still incomplete context is commanded to go
917  * offline. Presently offline context does not require any
918  * special attention.
919  */
922  } m0_tl_endfor;
923 
924  /*
925  * Step 2: Spot a still connecting context and wait for fom activity.
926  *
927  * The idea is to let not a context with dormant fom go for destruction.
928  * The rpc link fom activity has to occur first to make sure abortion
929  * flag goes in effect. Besides the current one, any other reconnecting
930  * context may be offlined in background because of abortion flag
931  * already installed on previous step.
932  */
933  m0_tl_for(pools_common_svc_ctx, &reqh->rh_pools->pc_svc_ctxs, ctx) {
934  bool connecting;
935 
938  connecting = CTX_STATE(ctx) == M0_RSC_CONNECTING;
939  M0_LOG(M0_DEBUG, "[%d] %s (%d)", CTX_STATE(ctx),
940  m0_rpc_link_end_point(&ctx->sc_rlink), ctx->sc_type);
941  if (connecting) {
942  /*
943  * .rlk_rc must not be 0 to meet M0_RSC_OFFLINE
944  * conditions.
945  * */
946  ctx->sc_rlink.rlk_rc = M0_ERR(-EPERM);
948  /*
949  * From this moment we rely on rpc link timeout to wake
950  * up the link fom to be ultimately aborted by the flag
951  * specification.
952  */
953  }
955  /* Do waiting outside the sm lock. */
956  if (connecting) {
957  m0_clink_add_lock(&ctx->sc_rlink.rlk_wait,
958  &ctx->sc_rlink_abort);
959  /*
960  * While we were leaving the sm lock the fom activity
961  * might happen alright, thus timed waiting.
962  */
963  m0_chan_timedwait(&ctx->sc_rlink_abort,
965  m0_clink_del_lock(&ctx->sc_rlink_abort);
966  }
967  } m0_tl_endfor;
968  M0_LEAVE();
969 }
970 
972 {
973  M0_ENTRY();
974 
976 
978  m0_sm_fini(&ctx->sc_sm);
980  m0_sm_group_fini(&ctx->sc_sm_grp);
981  m0_clink_fini(&ctx->sc_rlink_wait);
982  m0_clink_fini(&ctx->sc_rlink_abort);
983  m0_clink_fini(&ctx->sc_process_event);
984  m0_clink_fini(&ctx->sc_svc_event);
985  m0_mutex_fini(&ctx->sc_max_pending_tx_lock);
986  m0_reqh_service_ctx_bob_fini(ctx);
988  m0_rpc_link_fini(&ctx->sc_rlink);
989 
990  M0_LEAVE();
991 }
992 
994  struct m0_sm_ast *ast)
995 {
997  bool connected;
998  bool disconnect;
999 
1001  M0_ENTRY("'%s' ctx of service '%s' state: %d",
1002  m0_rpc_machine_ep(ctx->sc_rlink.rlk_conn.c_rpc_machine),
1003  m0_rpc_link_end_point(&ctx->sc_rlink), CTX_STATE(ctx));
1006 
1007  connected = m0_rpc_link_is_connected(&ctx->sc_rlink);
1009 
1010  switch (CTX_STATE(ctx)) {
1011  case M0_RSC_CONNECTING:
1012  if (connected) {
1013  M0_ASSERT(ctx->sc_rlink.rlk_rc == 0);
1015  M0_LOG(M0_DEBUG, "connecting -> online: %s (%d)",
1016  m0_rpc_link_end_point(&ctx->sc_rlink),
1017  ctx->sc_type);
1018  } else {
1019  M0_ASSERT(ctx->sc_rlink.rlk_rc != 0);
1020  /* Reconnect later. */
1022  M0_LOG(M0_DEBUG, "connecting -> offline: %s (%d)",
1023  m0_rpc_link_end_point(&ctx->sc_rlink),
1024  ctx->sc_type);
1025  }
1026  break;
1027 
1028  case M0_RSC_DISCONNECTING:
1029  M0_ASSERT(!connected);
1032  M0_LOG(M0_DEBUG, "disconnecting -> offline: %s (%d)",
1033  m0_rpc_link_end_point(&ctx->sc_rlink), ctx->sc_type);
1034  break;
1035 
1036  default:
1037  M0_IMPOSSIBLE("Invalid state: %d", CTX_STATE(ctx));
1038  }
1039  M0_LOG(M0_DEBUG, "state: %d", CTX_STATE(ctx));
1040 
1041  /* Cancel only established session. */
1043  connected) {
1044  m0_rpc_session_cancel(&ctx->sc_rlink.rlk_sess);
1047  }
1048  /*
1049  * Reconnect every time `ctx' reaches OFFLINE state until
1050  * m0_reqh_service_disconnect() is called unless explicit abortion is
1051  * requested.
1052  */
1053  if (CTX_STATE(ctx) == M0_RSC_OFFLINE) {
1055  !disconnect)
1058  }
1059  /* m0_reqh_service_disconnect() was called during connecting. */
1060  if (CTX_STATE(ctx) == M0_RSC_ONLINE && disconnect)
1062 
1064 }
1065 
1067 {
1069  struct m0_locality *loc;
1070 
1071  /*
1072  * Go out from the chan lock.
1073  *
1074  * @note m0_reqh_service_connect_wait() and
1075  * m0_reqh_service_disconnect_wait() must not be called from a locality
1076  * thread without m0_fom_block_enter()/leave(). Otherwise, it can lead
1077  * to a deadlock.
1078  */
1079  loc = m0_locality_get(ctx->sc_fid.f_key);
1080  ctx->sc_rlink_ast.sa_cb = &reqh_service_ctx_ast_cb;
1081  m0_sm_ast_post(loc->lo_grp, &ctx->sc_rlink_ast);
1082 
1083  return true;
1084 }
1085 
1090 {
1092  struct m0_conf_obj *obj = M0_AMB(obj, clink->cl_chan,
1093  co_ha_chan);
1094  struct m0_conf_process *process;
1095 
1096  M0_ENTRY();
1098 
1099  process = M0_CONF_CAST(obj, m0_conf_process);
1102  /* Ignore notifications for offline services. */
1103  goto exit_unlock;
1104  }
1105 
1106  switch (obj->co_ha_state) {
1107  case M0_NC_FAILED:
1108  case M0_NC_TRANSIENT:
1109  /*
1110  * m0_rpc_post() needs valid session, so service context is not
1111  * finalised. Here making service context as inactive, which
1112  * will become active again after reconnection when process is
1113  * restarted.
1114  */
1117  break;
1118  case M0_NC_ONLINE:
1120  m0_conf_obj_grandparent(obj)->co_ha_state != M0_NC_ONLINE)
1121  break;
1122  /*
1123  * Process may become online prior to service object.
1124  *
1125  * Make sure respective service object is known online. In case
1126  * it is not, quit and let service_event_handler() do the job.
1127  *
1128  * Note: until service object gets known online, re-connection
1129  * is not possible due to assertions in RPC connection HA
1130  * subscription code.
1131  */
1132  if (ctx->sc_service == NULL)
1133  ctx->sc_service = m0_conf_cache_lookup(obj->co_cache,
1134  &ctx->sc_fid);
1135  M0_ASSERT(ctx->sc_service != NULL);
1136  if (ctx->sc_service->co_ha_state != M0_NC_ONLINE)
1137  break;
1138 
1140  break;
1141  default:
1142  ;
1143  }
1144 exit_unlock:
1146 
1147  M0_LEAVE();
1148  return true;
1149 }
1150 
1155 {
1157  struct m0_conf_obj *obj = M0_AMB(obj, clink->cl_chan,
1158  co_ha_chan);
1159  struct m0_conf_service *service;
1160  struct m0_reqh *reqh = m0_conf_obj2reqh(obj);
1161  bool result = true;
1162 
1163  M0_ENTRY();
1165 
1168  &obj->co_id,
1169  service->cs_type));
1172  /* Ignore notifications for offline services. */
1173  goto exit_unlock;
1174  }
1175 
1176  switch (obj->co_ha_state) {
1177  case M0_NC_TRANSIENT:
1178  /*
1179  * It seems important to do nothing here to let rpc item ha
1180  * timeout do its job. When HA really decides on service death,
1181  * it notifies with M0_NC_FAILED.
1182  */
1183  break;
1184  case M0_NC_FAILED:
1187  break;
1188  case M0_NC_ONLINE:
1189  /*
1190  * Note: Make no assumptions about process HA state, as service
1191  * state update may take a lead in the batch updates.
1192  */
1195  service->cs_endpoints[0]);
1196  }
1197  break;
1198  default:
1199  ;
1200  }
1201 exit_unlock:
1203 
1204  M0_LEAVE();
1205  return result;
1206 }
1207 
1209  struct m0_conf_obj *svc_obj,
1211  struct m0_rpc_machine *rmach,
1212  const char *addr,
1213  uint32_t max_rpc_nr_in_flight)
1214 {
1215  struct m0_conf_obj *proc_obj = m0_conf_obj_grandparent(svc_obj);
1216  int rc;
1217 
1218  M0_ENTRY();
1219  M0_LOG(M0_DEBUG, FID_F "%d", FID_P(&svc_obj->co_id), stype);
1220 
1221  M0_SET0(ctx);
1222  if (rmach != NULL) {
1223  rc = m0_rpc_link_init(&ctx->sc_rlink, rmach, &svc_obj->co_id,
1224  addr, max_rpc_nr_in_flight);
1225  if (rc != 0)
1226  return M0_ERR(rc);
1228  }
1229  ctx->sc_fid = svc_obj->co_id;
1230  ctx->sc_service = svc_obj;
1231  ctx->sc_process = proc_obj;
1232  ctx->sc_type = stype;
1233  ctx->sc_fid_process = proc_obj->co_id;
1234  m0_reqh_service_ctx_bob_init(ctx);
1235  m0_mutex_init(&ctx->sc_max_pending_tx_lock);
1236  m0_clink_init(&ctx->sc_svc_event, service_event_handler);
1237  m0_clink_init(&ctx->sc_process_event, process_event_handler);
1238  m0_clink_init(&ctx->sc_rlink_wait, reqh_service_ctx_rlink_cb);
1239  m0_clink_init(&ctx->sc_rlink_abort, NULL);
1240  m0_sm_group_init(&ctx->sc_sm_grp);
1242  M0_RSC_OFFLINE, &ctx->sc_sm_grp);
1243  ctx->sc_rlink_wait.cl_is_oneshot = true;
1244 
1246  return M0_RC(0);
1247 }
1248 
1249 M0_INTERNAL int m0_reqh_service_ctx_create(struct m0_conf_obj *svc_obj,
1251  struct m0_rpc_machine *rmach,
1252  const char *addr,
1253  uint32_t max_rpc_nr_in_flight,
1254  struct m0_reqh_service_ctx **out)
1255 {
1256  int rc;
1257 
1258  M0_PRE(m0_fid_is_set(&svc_obj->co_id));
1260 
1261  M0_ENTRY(FID_F "stype:%d", FID_P(&svc_obj->co_id), stype);
1262  M0_ALLOC_PTR(*out);
1263  if (*out == NULL)
1264  return M0_ERR(-ENOMEM);
1265  rc = m0_reqh_service_ctx_init(*out, svc_obj, stype, rmach, addr,
1266  max_rpc_nr_in_flight);
1267  if (rc != 0)
1268  m0_free(*out);
1269 
1270  return M0_RC(rc);
1271 }
1272 
1273 M0_INTERNAL void
1275 {
1277  m0_free(ctx);
1278 }
1279 
1280 M0_INTERNAL struct m0_reqh_service_ctx *
1282 {
1283  struct m0_reqh_service_ctx *ret;
1284  struct m0_rpc_link *rlink;
1285 
1286  M0_PRE(session != NULL);
1287 
1288  rlink = M0_AMB(rlink, session, rlk_sess);
1289  ret = M0_AMB(ret, rlink, sc_rlink);
1290 
1292 
1293  return ret;
1294 }
1295 
1297  struct m0_sm_ast *ast)
1298 {
1299  struct m0_reqh_service_ctx *ctx = ast->sa_datum;
1300 
1301  M0_ENTRY("ctx %p "FID_F, ctx, FID_P(&ctx->sc_fid));
1303  M0_LEAVE();
1304 }
1305 
1310 static void
1312 {
1313  struct m0_pools_common *pc = ctx->sc_pc;
1314  struct m0_sm_ast *ast = &ctx->sc_rlink_ast;
1315 
1316  M0_ENTRY("ctx %p", ctx);
1317  M0_PRE(M0_IN(ctx->sc_rlink.rlk_conn.c_sm.sm_state,
1319 
1320  if (pools_common_svc_ctx_tlist_contains(&pc->pc_abandoned_svc_ctxs,
1321  ctx)) {
1322  pools_common_svc_ctx_tlink_del_fini(ctx);
1323  /*
1324  * Escape from being under the context group lock.
1325  *
1326  * Hijacking sc_rlink_ast here must be surely safe for abandoned
1327  * context, as no connection related operation is going to be
1328  * done anymore on the context's rpc link.
1329  */
1330  ast->sa_datum = ctx;
1332  m0_sm_ast_post(m0_locality_get(ctx->sc_fid.f_key)->lo_grp, ast);
1333  }
1334  M0_LEAVE();
1335 }
1336 
1338 #undef M0_TRACE_SUBSYSTEM
1339 
1340 /*
1341  * Local variables:
1342  * c-indentation-style: "K&R"
1343  * c-basic-offset: 8
1344  * tab-width: 8
1345  * fill-column: 80
1346  * scroll-step: 1
1347  * End:
1348  */
const struct m0_conf_obj_type * m0_conf_obj_type(const struct m0_conf_obj *obj)
Definition: obj.c:363
struct m0_fid co_id
Definition: obj.h:208
M0_INTERNAL int m0_reqh_service_state_get(const struct m0_reqh_service *s)
Definition: reqh_service.c:560
static bool process_event_handler(struct m0_clink *clink)
static void reqh_service_ctx_state_move(struct m0_reqh_service_ctx *ctx, int state)
M0_INTERNAL uint64_t m0_process(void)
Definition: kthread.c:295
M0_INTERNAL int m0_reqh_service_start_async(struct m0_reqh_service_start_async_ctx *asc)
Definition: reqh_service.c:272
M0_INTERNAL void m0_reqh_service_cancel_reconnect(struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:852
const char * pc_endpoint
Definition: obj.h:590
static void abandoned_ctx_destroy_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL void m0_fom_wakeup(struct m0_fom *fom)
Definition: fom.c:532
#define M0_PRE(cond)
M0_INTERNAL bool m0_buf_is_set(const struct m0_buf *buf)
Definition: buf.c:127
M0_INTERNAL int m0_reqh_service_start(struct m0_reqh_service *service)
Definition: reqh_service.c:343
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_lookup(const struct m0_reqh *reqh, const struct m0_fid *fid)
Definition: reqh_service.c:551
M0_INTERNAL int m0_reqh_service_types_init(void)
Definition: reqh_service.c:520
int(* rso_start)(struct m0_reqh_service *service)
Definition: reqh_service.h:360
struct m0_sm rs_sm
Definition: reqh_service.h:244
M0_TL_DECLARE(abandoned_svc_ctxs, M0_EXTERN, struct m0_reqh_service_ctx)
M0_INTERNAL struct m0_locality * m0_locality_get(uint64_t value)
Definition: locality.c:156
static void reqh_service_ctx_flag_clear(struct m0_reqh_service_ctx *ctx, int flag)
M0_INTERNAL void m0_reqh_service_stop(struct m0_reqh_service *service)
Definition: reqh_service.c:402
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_conf_ha_service_event_post(struct m0_ha *ha, struct m0_ha_link *hl, const struct m0_fid *source_process_fid, const struct m0_fid *source_service_fid, const struct m0_fid *service_fid, uint64_t pid, enum m0_conf_ha_service_event event, enum m0_conf_service_type service_type)
Definition: ha.c:80
m0_conf_service_type
Definition: schema.h:194
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
#define ergo(a, b)
Definition: misc.h:293
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
enum m0_conf_service_type rst_typecode
Definition: reqh_service.h:471
Definition: sm.h:350
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
m0_conf_ha_service_event
Definition: ha.h:119
unsigned rs_level
Definition: reqh_service.h:236
static struct m0_sm_group * grp
Definition: bytecount.c:38
M0_INTERNAL struct m0_conf_obj * m0_conf_cache_lookup(const struct m0_conf_cache *cache, const struct m0_fid *id)
Definition: cache.c:106
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
M0_INTERNAL void m0_reqh_service_prepare_to_stop(struct m0_reqh_service *service)
Definition: reqh_service.c:375
const struct m0_conf_obj_type M0_CONF_SERVICE_TYPE
Definition: service.c:156
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
void m0_console_printf(const char *fmt,...)
Definition: trace.c:801
static void reqh_service_state_set(struct m0_reqh_service *service, enum m0_reqh_service_state state)
Definition: reqh_service.c:236
M0_INTERNAL void m0_reqh_service_ctx_subscribe(struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:693
void(* rso_fini)(struct m0_reqh_service *service)
Definition: reqh_service.h:399
static bool reqh_service_ctx_is_cancelled(struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:884
#define m0_strcaseeq(a, b)
Definition: string.h:35
M0_INTERNAL bool m0_reqh_service_ctx_is_connected(const struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:760
static void reqh_service_disconnect_locked(struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:766
M0_INTERNAL void m0_rwlock_write_lock(struct m0_rwlock *lock)
Definition: rwlock.c:42
static void reqh_service_connect_locked(struct m0_reqh_service_ctx *ctx, m0_time_t deadline)
Definition: reqh_service.c:718
M0_INTERNAL void m0_reqh_service_ctx_destroy(struct m0_reqh_service_ctx *ctx)
static bool reqh_service_ctx_sm_is_locked(const struct m0_reqh_service_ctx *ctx)
M0_TL_DESCR_DECLARE(abandoned_svc_ctxs, M0_EXTERN)
static void reqh_service_reconnect_locked(struct m0_reqh_service_ctx *ctx, const char *addr)
Definition: reqh_service.c:836
#define M0_BITS(...)
Definition: misc.h:236
static void reqh_service_ctx_sm_unlock(struct m0_reqh_service_ctx *ctx)
struct m0_tl rh_services
Definition: reqh.h:127
M0_INTERNAL struct m0 * m0_get(void)
Definition: instance.c:41
Definition: sm.h:504
M0_TL_DEFINE(rstypes, static, struct m0_reqh_service_type)
M0_INTERNAL m0_time_t m0_rpc__down_timeout(void)
Definition: rpc.c:322
static struct m0_rpc_session session
Definition: formation2.c:38
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_reqh_service_connect_wait(struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:822
M0_INTERNAL const char * m0_rpc_machine_ep(const struct m0_rpc_machine *rmach)
Definition: rpc_machine.c:603
struct m0_rwlock rh_rwlock
Definition: reqh.h:143
M0_INTERNAL bool m0_fid_is_set(const struct m0_fid *fid)
Definition: fid.c:106
static bool service_event_handler(struct m0_clink *clink)
M0_INTERNAL int m0_reqh_service_types_length(void)
Definition: reqh_service.c:499
M0_INTERNAL void m0_sm_group_fini(struct m0_sm_group *grp)
Definition: sm.c:65
int m0_reqh_service_type_register(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:473
static struct foo * obj
Definition: tlist.c:302
M0_TL_DESCR_DEFINE(rstypes, "reqh service types", static, struct m0_reqh_service_type, rst_linkage, rst_magix, M0_REQH_SVC_TYPE_MAGIC, M0_REQH_SVC_HEAD_MAGIC)
const char * bt_name
Definition: bob.h:73
struct m0_reqh_service * sac_service
Definition: reqh_service.h:293
M0_INTERNAL struct m0_reqh_service_ctx * m0_pools_common_service_ctx_find(const struct m0_pools_common *pc, const struct m0_fid *id, enum m0_conf_service_type type)
Definition: pool.c:1095
static struct m0_pools_common pc
Definition: iter_ut.c:59
M0_INTERNAL struct m0_reqh_service_type * m0_reqh_service_type_find(const char *sname)
Definition: reqh_service.c:168
M0_INTERNAL void m0_rwlock_init(struct m0_rwlock *lock)
Definition: rwlock.c:32
#define m0_tl_endfor
Definition: tlist.h:700
M0_INTERNAL int m0_sm_timedwait(struct m0_sm *mach, uint64_t states, m0_time_t deadline)
Definition: sm.c:387
struct m0_fid fid
Definition: di.c:46
return M0_RC(rc)
static void reqh_service_started_common(struct m0_reqh *reqh, struct m0_reqh_service *service)
Definition: reqh_service.c:305
struct m0_buf rs_ss_param
Definition: reqh_service.h:281
#define M0_ENTRY(...)
Definition: trace.h:170
struct m0_reqh_context rctx
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
M0_INTERNAL bool m0_reqh_service_is_registered(const char *sname)
Definition: reqh_service.c:514
static void reqh_service_ha_event(struct m0_reqh_service *service, enum m0_reqh_service_state state)
Definition: reqh_service.c:207
M0_INTERNAL void m0_conf_obj_get(struct m0_conf_obj *obj)
Definition: obj_ops.c:186
M0_INTERNAL int m0_reqh_service_disconnect_wait(struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:831
void(* rso_stop)(struct m0_reqh_service *service)
Definition: reqh_service.h:387
return M0_ERR(-EOPNOTSUPP)
void * sa_datum
Definition: sm.h:508
static struct m0_sm_state_descr service_states[]
Definition: reqh_service.c:100
static int key
Definition: locality.c:283
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
M0_INTERNAL bool m0_fom_domain_is_idle_for(const struct m0_reqh_service *svc)
Definition: fom.c:1281
static const struct socktype stype[]
Definition: sock.c:1156
static void reqh_service_starting_common(struct m0_reqh *reqh, struct m0_reqh_service *service, unsigned key)
Definition: reqh_service.c:245
M0_INTERNAL int m0_reqh_service_ctx_create(struct m0_conf_obj *svc_obj, enum m0_conf_service_type stype, struct m0_rpc_machine *rmach, const char *addr, uint32_t max_rpc_nr_in_flight, struct m0_reqh_service_ctx **out)
static void reqh_service_ctx_flag_set(struct m0_reqh_service_ctx *ctx, int flag)
#define M0_ASSERT(cond)
M0_INTERNAL void m0_reqh_service_connect(struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:730
const char * scf_name
Definition: sm.h:352
M0_INTERNAL void m0_reqh_service_quit(struct m0_reqh_service *svc)
Definition: reqh_service.c:588
M0_INTERNAL const char * m0_conf_service_type2str(enum m0_conf_service_type type)
Definition: service.c:169
M0_INTERNAL void m0_sm_group_init(struct m0_sm_group *grp)
Definition: sm.c:53
static bool reqh_service_ctx_flag_is_set(const struct m0_reqh_service_ctx *ctx, int flag)
void m0_sm_state_set(struct m0_sm *mach, int state)
Definition: sm.c:478
const char * rst_name
Definition: reqh_service.h:447
struct m0_conf_obj * m0_conf_obj_grandparent(const struct m0_conf_obj *obj)
Definition: obj.c:384
M0_INTERNAL void m0_reqh_service_fini(struct m0_reqh_service *service)
Definition: reqh_service.c:457
M0_INTERNAL void m0_rpc_session_cancel(struct m0_rpc_session *session)
Definition: session.c:850
struct m0_fid rs_service_fid
Definition: reqh_service.h:220
static struct m0_thread t[8]
Definition: service_ut.c:1230
Definition: tlist.h:251
#define m0_streq(a, b)
Definition: string.h:34
struct m0_reqh_context * rs_reqh_ctx
Definition: reqh_service.h:271
M0_INTERNAL bool m0_reqh_service_invariant(const struct m0_reqh_service *svc)
Definition: reqh_service.c:143
M0_INTERNAL struct m0_reqh * m0_conf_obj2reqh(const struct m0_conf_obj *obj)
Definition: helpers.c:351
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
M0_INTERNAL bool m0_conf_cache_is_locked(const struct m0_conf_cache *cache)
Definition: cache.c:60
static struct m0_sm_state_descr service_ctx_states[]
Definition: reqh_service.c:654
#define M0_POST(cond)
Definition: xcode.h:73
m0_reqh_service_state
Definition: reqh_service.h:156
struct m0_chan co_ha_chan
Definition: obj.h:248
M0_INTERNAL int m0_reqh_service_allocate(struct m0_reqh_service **out, const struct m0_reqh_service_type *stype, struct m0_reqh_context *rctx)
Definition: reqh_service.c:185
Definition: reqh.h:94
M0_INTERNAL void m0_clink_cleanup(struct m0_clink *link)
Definition: chan.c:310
M0_INTERNAL int m0_reqh_service_ctx_init(struct m0_reqh_service_ctx *ctx, struct m0_conf_obj *svc_obj, enum m0_conf_service_type stype, struct m0_rpc_machine *rmach, const char *addr, uint32_t max_rpc_nr_in_flight)
struct m0_sm_group * lo_grp
Definition: locality.h:67
M0_INTERNAL void m0_reqh_service_ctx_unsubscribe(struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:702
static bool reqh_service_context_invariant(const struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:686
static struct m0_tl rstypes
Definition: reqh_service.c:64
static bool reqh_service_ctx_rlink_cb(struct m0_clink *clink)
M0_INTERNAL void m0_rwlock_write_unlock(struct m0_rwlock *lock)
Definition: rwlock.c:47
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
Definition: buf.c:55
M0_INTERNAL void m0_reqh_service_init(struct m0_reqh_service *service, struct m0_reqh *reqh, const struct m0_fid *fid)
Definition: reqh_service.c:428
#define M0_CONF_CAST(ptr, type)
Definition: obj.h:780
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL int m0_reqh_service_setup(struct m0_reqh_service **out, struct m0_reqh_service_type *stype, struct m0_reqh *reqh, struct m0_reqh_context *rctx, const struct m0_fid *fid)
Definition: reqh_service.c:565
static void reqh_service_failed_common(struct m0_reqh *reqh, struct m0_reqh_service *service, unsigned key)
Definition: reqh_service.c:262
struct m0_tl pc_svc_ctxs
Definition: pool.h:172
static struct fdmi_ctx ctx
Definition: main.c:80
#define FID_P(f)
Definition: fid.h:77
static const struct m0_sm_conf service_ctx_states_conf
Definition: reqh_service.c:680
static const struct m0_bob_type reqh_svc_ctx
Definition: reqh_service.c:91
int m0_reqh_service_async_start_simple(struct m0_reqh_service_start_async_ctx *asc)
Definition: reqh_service.c:601
struct m0_mutex rs_mutex
Definition: reqh_service.h:249
static uint32_t timeout
Definition: console.c:52
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
Definition: fid.c:164
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
uint32_t sd_flags
Definition: sm.h:378
struct m0_clink sc_rlink_wait
Definition: reqh_service.h:761
struct m0_tl pc_abandoned_svc_ctxs
Definition: pool.h:228
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_find(const struct m0_reqh_service_type *st, const struct m0_reqh *reqh)
Definition: reqh_service.c:538
struct m0_reqh reqh
Definition: rm_foms.c:48
#define M0_MAGIX_OFFSET(type, field)
Definition: misc.h:356
static void reqh_service_ctx_sm_lock(struct m0_reqh_service_ctx *ctx)
M0_INTERNAL void m0_reqh_service_types_fini(void)
Definition: reqh_service.c:530
struct m0_clink sc_process_event
Definition: reqh_service.h:783
static int reqh_service_ctx_state_wait(struct m0_reqh_service_ctx *ctx, int state)
Definition: reqh_service.c:801
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
Definition: fid.h:38
const struct m0_reqh_service_type * rs_type
Definition: reqh_service.h:227
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
Definition: sm.c:313
M0_BOB_DEFINE(static, &rstypes_bob, m0_reqh_service_type)
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:228
M0_INTERNAL bool m0_chan_timedwait(struct m0_clink *link, const m0_time_t abs_timeout)
Definition: chan.c:349
void(* rso_prepare_to_stop)(struct m0_reqh_service *service)
Definition: reqh_service.h:371
struct m0_sm_group rh_sm_grp
Definition: reqh.h:107
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
const struct m0_sm_conf service_states_conf
Definition: reqh_service.c:133
M0_INTERNAL void m0_reqh_service_list_print(void)
Definition: reqh_service.c:504
static struct m0_net_test_service svc
Definition: service.c:34
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
struct m0_pools_common * rh_pools
Definition: reqh.h:118
struct m0_sm_ast sc_rlink_ast
Definition: reqh_service.h:762
M0_INTERNAL void m0_rwlock_read_lock(struct m0_rwlock *lock)
Definition: rwlock.c:52
struct m0_clink sc_svc_event
Definition: reqh_service.h:781
M0_INTERNAL void m0_rwlock_fini(struct m0_rwlock *lock)
Definition: rwlock.c:37
M0_INTERNAL void m0_reqh_idle_wait_for(struct m0_reqh *reqh, struct m0_reqh_service *service)
Definition: reqh.c:591
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
static void reqh_service_ctx_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: reqh_service.c:993
struct m0_fid rh_fid
Definition: reqh.h:177
M0_INTERNAL void m0_confc_close(struct m0_conf_obj *obj)
Definition: confc.c:921
static struct m0_bob_type rstypes_bob
Definition: reqh_service.c:88
#define M0_ASSERT_INFO(cond, fmt,...)
M0_INTERNAL void m0_reqh_service_started(struct m0_reqh_service *service)
Definition: reqh_service.c:311
M0_INTERNAL void m0_rwlock_read_unlock(struct m0_rwlock *lock)
Definition: rwlock.c:57
const struct m0_conf_obj_type M0_CONF_PROCESS_TYPE
Definition: process.c:161
Definition: nucleus.c:42
#define out(...)
Definition: gen.c:41
M0_INTERNAL void m0_reqh_service_disconnect(struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:781
M0_INTERNAL bool m0_fid_is_valid(const struct m0_fid *fid)
Definition: fid.c:96
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
M0_INTERNAL void m0_reqh_service_ctx_fini(struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:971
struct m0_reqh * rs_reqh
Definition: reqh_service.h:259
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
static struct m0_addb2_source * s
Definition: consumer.c:39
void m0_reqh_service_type_unregister(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:490
uint32_t sm_state
Definition: sm.h:307
static struct m0_reqh_service * service[REQH_IN_UT_MAX]
Definition: long_lock_ut.c:46
M0_INTERNAL void m0_bob_type_tlist_init(struct m0_bob_type *bt, const struct m0_tl_descr *td)
Definition: bob.c:41
static bool service_type_is_valid(enum m0_conf_service_type t)
Definition: reqh_service.c:612
M0_INTERNAL void m0_reqh_service_ctxs_shutdown_prepare(struct m0_reqh *reqh)
Definition: reqh_service.c:892
int32_t rc
Definition: trigger_fop.h:47
int(* rso_start_async)(struct m0_reqh_service_start_async_ctx *asc)
Definition: reqh_service.h:341
static struct m0_rwlock rstypes_rwlock
Definition: reqh_service.c:80
#define ARRAY_SIZE(a)
Definition: misc.h:45
#define m0_tl_exists(name, var, head,...)
Definition: tlist.h:774
const m0_time_t M0_TIME_IMMEDIATELY
Definition: time.c:107
#define FID_F
Definition: fid.h:75
#define CTX_STATE(ctx)
static void reqh_service_ctx_destroy_if_abandoned(struct m0_reqh_service_ctx *ctx)
M0_INTERNAL void m0_reqh_service_failed(struct m0_reqh_service *service)
Definition: reqh_service.c:326
Definition: idx_mock.c:47
const struct m0_reqh_service_ops * rs_ops
Definition: reqh_service.h:254
static void reqh_service_session_cancel(struct m0_reqh_service_ctx *ctx)
Definition: reqh_service.c:868
#define M0_IMPOSSIBLE(fmt,...)
M0_INTERNAL struct m0_reqh_service_ctx * m0_reqh_service_ctx_from_session(struct m0_rpc_session *session)
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
Definition: sm.c:331