Motr  M0
link.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2016-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
67 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_HA
68 #include "lib/trace.h"
69 
70 #include "ha/link.h"
71 
72 #include "lib/memory.h" /* M0_ALLOC_PTR */
73 #include "lib/errno.h" /* ENOMEM */
74 #include "lib/tlist.h" /* M0_TL_DESCR_DEFINE */
75 #include "lib/types.h" /* m0_uint128 */
76 #include "lib/misc.h" /* container_of */
77 #include "lib/time.h" /* m0_time_from_now */
78 
79 #include "sm/sm.h" /* m0_sm_state_descr */
80 #include "rpc/rpc.h" /* m0_rpc_reply_post */
81 #include "rpc/rpc_opcodes.h" /* M0_HA_LINK_OUTGOING_OPCODE */
82 
83 #include "fop/fom_generic.h" /* M0_FOPH_FINISH */
84 
85 #include "ha/link_fops.h" /* m0_ha_link_msg_fopt */
86 #include "ha/link_service.h" /* m0_ha_link_service_register */
87 
88 
89 enum {
116 };
117 
121  .sd_name = "M0_HA_LINK_STATE_INIT",
122  .sd_allowed = M0_BITS(M0_HA_LINK_STATE_FINI,
124  },
126  .sd_flags = M0_SDF_TERMINAL,
127  .sd_name = "M0_HA_LINK_STATE_FINI",
128  .sd_allowed = 0,
129  },
131  .sd_flags = 0,
132  .sd_name = "M0_HA_LINK_STATE_START",
133  .sd_allowed = M0_BITS(M0_HA_LINK_STATE_STOP,
135  },
137  .sd_flags = 0,
138  .sd_name = "M0_HA_LINK_STATE_STOP",
139  .sd_allowed = M0_BITS(M0_HA_LINK_STATE_FINI),
140  },
142  .sd_flags = 0,
143  .sd_name = "M0_HA_LINK_STATE_IDLE",
144  .sd_allowed = M0_BITS(M0_HA_LINK_STATE_STOP,
151  },
153  .sd_flags = 0,
154  .sd_name = "M0_HA_LINK_STATE_RECV",
155  .sd_allowed = M0_BITS(M0_HA_LINK_STATE_IDLE),
156  },
158  .sd_flags = 0,
159  .sd_name = "M0_HA_LINK_STATE_DELIVERY",
160  .sd_allowed = M0_BITS(M0_HA_LINK_STATE_IDLE),
161  },
163  .sd_flags = 0,
164  .sd_name = "M0_HA_LINK_STATE_RPC_FAILED",
165  .sd_allowed = M0_BITS(M0_HA_LINK_STATE_IDLE),
166  },
168  .sd_flags = 0,
169  .sd_name = "M0_HA_LINK_STATE_LINK_FAILED",
170  .sd_allowed = M0_BITS(M0_HA_LINK_STATE_IDLE),
171  },
173  .sd_flags = 0,
174  .sd_name = "M0_HA_LINK_STATE_LINK_REUSED",
175  .sd_allowed = M0_BITS(M0_HA_LINK_STATE_IDLE),
176  },
178  .sd_flags = 0,
179  .sd_name = "M0_HA_LINK_STATE_DISCONNECTING",
180  .sd_allowed = M0_BITS(M0_HA_LINK_STATE_IDLE),
181  },
182 };
183 
185 
186 static struct m0_sm_conf ha_link_sm_conf = {
187  .scf_name = "m0_ha_link::hln_sm",
188  .scf_nr_states = ARRAY_SIZE(ha_link_sm_states),
189  .scf_state = ha_link_sm_states,
190 };
191 
193 extern const struct m0_fom_ops ha_link_outgoing_fom_ops;
194 
195 static void ha_link_outgoing_fom_wakeup(struct m0_ha_link *hl);
196 
197 static bool ha_link_rpc_wait_cb(struct m0_clink *clink)
198 {
199  struct m0_ha_link *hl;
200 
201  M0_ENTRY();
202  hl = container_of(clink, struct m0_ha_link, hln_rpc_wait);
203  m0_mutex_lock(&hl->hln_lock);
204  hl->hln_rpc_event_occurred = true;
207  M0_LEAVE();
208  return true;
209 }
210 
212 {
213  struct m0_ha_link *hl;
214 
215  M0_ENTRY();
217  m0_mutex_lock(&hl->hln_lock);
218  hl->hln_quiesced = true;
221  M0_LEAVE();
222  return true;
223 }
224 
225 M0_INTERNAL int m0_ha_link_init(struct m0_ha_link *hl,
226  struct m0_ha_link_cfg *hl_cfg)
227 {
228  int rc;
229 
230  M0_PRE(M0_IS0(hl));
231 
232  M0_ENTRY("hl=%p hlc_reqh=%p hlc_reqh_service=%p hlc_rpc_machine=%p",
233  hl, hl_cfg->hlc_reqh, hl_cfg->hlc_reqh_service,
234  hl_cfg->hlc_rpc_machine);
235  hl->hln_cfg = *hl_cfg;
236  m0_mutex_init(&hl->hln_lock);
241  hl->hln_cfg.hlc_reqh);
243  M0_ASSERT(rc == 0);
248  &hl->hln_sm_group);
255  hl->hln_rpc_wait.cl_is_oneshot = true;
256  hl->hln_reconnect = false;
257  hl->hln_reconnect_wait = false;
258  hl->hln_reconnect_cfg_is_set = false;
259  hl->hln_waking_up = false;
260  hl->hln_fom_is_stopping = false;
261  hl->hln_fom_enable_wakeup = true;
262  hl->hln_no_new_delivered = false;
263  hl->hln_req_fop_seq = 0;
264  hl->hln_backoff_rc = 0;
265  hl->hln_backoff_nr = 0;
266  return M0_RC(0);
267 }
268 
269 M0_INTERNAL void m0_ha_link_fini(struct m0_ha_link *hl)
270 {
271  M0_ENTRY("hl=%p", hl);
272  if (hl->hln_backoff_rc != 0) {
273  M0_LOG(M0_WARN, "hln_backoff_rc=%d hln_backoff_nr=%"PRIu64,
274  hl->hln_backoff_rc, hl->hln_backoff_nr);
275  }
284  m0_sm_fini(&hl->hln_sm);
292  m0_ha_lq_fini(&hl->hln_q_out);
293  m0_ha_lq_fini(&hl->hln_q_in);
294  m0_mutex_fini(&hl->hln_lock);
295  M0_LEAVE();
296 }
297 
299  const struct m0_ha_link_conn_cfg *src)
300 {
301  char *ep = m0_strdup(src->hlcc_rpc_endpoint);
302 
303  if (ep == NULL)
304  return M0_ERR_INFO(-ENOMEM, "%s", src->hlcc_rpc_endpoint);
305  *dst = *src;
306  dst->hlcc_rpc_endpoint = ep;
307  return M0_RC(0);
308 }
309 
310 static void ha_link_conn_cfg_free(struct m0_ha_link_conn_cfg *hl_conn_cfg)
311 {
312  /* This value is allocated in ha_link_conn_cfg_copy() */
313  m0_free((char *)hl_conn_cfg->hlcc_rpc_endpoint);
314 }
315 
316 static void ha_link_tags_apply(struct m0_ha_link *hl,
317  const struct m0_ha_link_params *lp)
318 {
319  const struct m0_ha_link_tags *tags_in_new = &lp->hlp_tags_remote;
320  const struct m0_ha_link_tags *tags_out_new = &lp->hlp_tags_local;
321  struct m0_ha_link_tags tags_in;
322  struct m0_ha_link_tags tags_out;
323  bool done;
324 
326 
327  m0_ha_lq_tags_get(&hl->hln_q_out, &tags_out);
328  m0_ha_lq_tags_get(&hl->hln_q_in, &tags_in);
329  M0_LOG(M0_DEBUG, "hl=%p out="HLTAGS_F, hl, HLTAGS_P(&tags_out));
330  M0_LOG(M0_DEBUG, "hl=%p new out="HLTAGS_F, hl, HLTAGS_P(tags_out_new));
331  M0_LOG(M0_DEBUG, "hl=%p in="HLTAGS_F, hl, HLTAGS_P(&tags_in));
332  M0_LOG(M0_DEBUG, "hl=%p new in="HLTAGS_F, hl, HLTAGS_P(tags_in_new));
333 
334  /* Move 'next' tag to 'delivered'. Leave all other tags intact */
335  while (m0_ha_lq_tag_next(&hl->hln_q_out) >
338  M0_ASSERT(done);
339  }
342  "m0_ha_lq_tag_next(&hl->hln_q_out)=%" PRIu64 " "
343  "m0_ha_lq_tag_delivered(&hl->hln_q_out)=%"PRIu64,
346 
347  m0_ha_lq_tags_get(&hl->hln_q_out, &tags_out);
349  tags_out_new->hlt_confirmed,
350  "m0_ha_lq_tag_delivered(&hl->hln_q_out)=%" PRIu64 " "
351  "tags_out_new->hlt_confirmed=%"PRIu64,
353  tags_out_new->hlt_confirmed);
355  tags_out_new->hlt_delivered,
356  "m0_ha_lq_tag_delivered(&hl->hln_q_out)=%" PRIu64 " "
357  "tags_out_new->hlt_delivered=%"PRIu64,
359  tags_out_new->hlt_delivered);
361  tags_out_new->hlt_next,
362  "m0_ha_lq_tag_next(&hl->hln_q_out)=%" PRIu64 " "
363  "tags_out_new->hlt_next=%"PRIu64,
365  tags_out_new->hlt_next);
367  tags_out_new->hlt_assign,
368  "m0_ha_lq_tag_assign(&hl->hln_q_out)=%" PRIu64 " "
369  "tags_out_new->hlt_assign=%"PRIu64,
371  tags_out_new->hlt_assign);
372  M0_ASSERT_INFO(m0_ha_link_tags_eq(&tags_in, tags_in_new),
373  "tags_in="HLTAGS_F" tags_in_new="HLTAGS_F,
374  HLTAGS_P(&tags_in), HLTAGS_P(tags_in_new));
375 
376  m0_ha_lq_tags_get(&hl->hln_q_out, &tags_out);
377  m0_ha_lq_tags_get(&hl->hln_q_in, &tags_in);
378  M0_LEAVE("hl=%p out="HLTAGS_F" in="HLTAGS_F,
379  hl, HLTAGS_P(&tags_out), HLTAGS_P(&tags_in));
380 }
381 
382 M0_INTERNAL void m0_ha_link_start(struct m0_ha_link *hl,
383  struct m0_ha_link_conn_cfg *hl_conn_cfg)
384 {
385  struct m0_ha_link_params *lp;
386  int rc;
387 
388  M0_ENTRY("hl=%p hlp_id_local="U128X_F" hlp_id_remote="U128X_F" "
389  "hlp_id_connection="U128X_F, hl,
390  U128_P(&hl_conn_cfg->hlcc_params.hlp_id_local),
391  U128_P(&hl_conn_cfg->hlcc_params.hlp_id_remote),
392  U128_P(&hl_conn_cfg->hlcc_params.hlp_id_connection));
393  M0_LOG(M0_DEBUG, "hlcc_rpc_service_fid="FID_F" "
394  "hlcc_rpc_endpoint=%s hlcc_max_rpcs_in_flight=%"PRIu64,
395  FID_P(&hl_conn_cfg->hlcc_rpc_service_fid),
396  (const char *)hl_conn_cfg->hlcc_rpc_endpoint,
397  hl_conn_cfg->hlcc_max_rpcs_in_flight);
398  rc = ha_link_conn_cfg_copy(&hl->hln_conn_cfg, hl_conn_cfg);
399  M0_ASSERT(rc == 0); /* XXX */
400  m0_mutex_lock(&hl->hln_lock);
401  lp = &hl->hln_conn_cfg.hlcc_params;
404  hl->hln_cb_disconnecting = false;
405  hl->hln_cb_reused = false;
407  m0_fom_queue(&hl->hln_fom);
409  M0_LEAVE();
410 }
411 
412 M0_INTERNAL void m0_ha_link_stop(struct m0_ha_link *hl, struct m0_clink *clink)
413 {
414  M0_ENTRY("hl=%p", hl);
419  M0_LEAVE();
420 }
421 
422 M0_INTERNAL void m0_ha_link_reconnect_begin(struct m0_ha_link *hl,
423  struct m0_ha_link_params *lp)
424 {
425  m0_mutex_lock(&hl->hln_lock);
426  *lp = hl->hln_conn_cfg.hlcc_params;
430  /* TODO refactor it somehow */
431  M0_LOG(M0_DEBUG, "hl=%p id_local="U128X_F" id_remote="U128X_F" "
432  "id_connection="U128X_F, hl, U128_P(&lp->hlp_id_local),
434  M0_LOG(M0_DEBUG, "hl=%p hlp_tags_local="HLTAGS_F,
435  hl, HLTAGS_P(&lp->hlp_tags_local));
436  M0_LOG(M0_DEBUG, "hl=%p hlp_tags_remote="HLTAGS_F,
437  hl, HLTAGS_P(&lp->hlp_tags_remote));
438 }
439 
440 M0_INTERNAL void
442  const struct m0_ha_link_conn_cfg *hl_conn_cfg)
443 {
444  const struct m0_ha_link_params *lp = &hl_conn_cfg->hlcc_params;
445  int rc;
446 
447  /* TODO refactor the M0_LOG of hl_conn_cfg */
448  M0_ENTRY("hl=%p hlcc_rpc_service_fid="FID_F" hlcc_rpc_endpoint=%s",
449  hl, FID_P(&hl_conn_cfg->hlcc_rpc_service_fid),
450  hl_conn_cfg->hlcc_rpc_endpoint);
451  M0_ENTRY("hl=%p hlcc_max_rpcs_in_flight=%" PRIu64 " "
452  "hlcc_connect_timeout=%" PRIu64 " "
453  "hlcc_disconnect_timeout=%" PRIu64 " "
454  "hlcc_resend_interval=%" PRIu64 " "
455  "hlcc_nr_sent_max=%"PRIu64,
456  hl, hl_conn_cfg->hlcc_max_rpcs_in_flight,
457  hl_conn_cfg->hlcc_connect_timeout,
458  hl_conn_cfg->hlcc_disconnect_timeout,
459  hl_conn_cfg->hlcc_resend_interval,
460  hl_conn_cfg->hlcc_nr_sent_max);
461  M0_LOG(M0_DEBUG, "hl=%p id_local="U128X_F" id_remote="U128X_F" "
462  "id_connection="U128X_F, hl, U128_P(&lp->hlp_id_local),
464  M0_LOG(M0_DEBUG, "hl=%p hlp_tags_local="HLTAGS_F,
465  hl, HLTAGS_P(&lp->hlp_tags_local));
466  M0_LOG(M0_DEBUG, "hl=%p hlp_tags_remote="HLTAGS_F,
467  hl, HLTAGS_P(&lp->hlp_tags_remote));
468  m0_mutex_lock(&hl->hln_lock);
469  if (hl->hln_reconnect_cfg_is_set)
471  rc = ha_link_conn_cfg_copy(&hl->hln_conn_reconnect_cfg, hl_conn_cfg);
472  M0_ASSERT(rc == 0); /* XXX */
473  hl->hln_reconnect_cfg_is_set = true;
474  hl->hln_reconnect = true;
476  M0_LEAVE("hl=%p", hl);
477 }
478 
479 M0_INTERNAL void m0_ha_link_reconnect_cancel(struct m0_ha_link *hl)
480 {
481  M0_ENTRY("hl=%p", hl);
482  M0_LEAVE();
483 }
484 
485 M0_INTERNAL void
487  struct m0_ha_link_params *lp_alive_new,
488  struct m0_ha_link_params *lp_dead_new,
489  const struct m0_uint128 *id_alive,
490  const struct m0_uint128 *id_dead,
491  const struct m0_uint128 *id_connection)
492 {
493  const struct m0_ha_link_tags *tags_local = &lp_alive->hlp_tags_local;
494  const struct m0_ha_link_tags *tags_remote = &lp_alive->hlp_tags_remote;
495 
496  *lp_alive_new = (struct m0_ha_link_params){
497  .hlp_id_local = *id_alive,
498  .hlp_id_remote = *id_dead,
499  .hlp_id_connection = *id_connection,
500  .hlp_tags_local = {
501  .hlt_confirmed = tags_local->hlt_confirmed,
502  .hlt_delivered = tags_local->hlt_delivered,
503  .hlt_next = tags_local->hlt_delivered,
504  .hlt_assign = tags_local->hlt_assign,
505  },
506  .hlp_tags_remote = {
507  .hlt_confirmed = tags_remote->hlt_confirmed,
508  .hlt_delivered = tags_remote->hlt_delivered,
509  .hlt_next = tags_remote->hlt_next,
510  .hlt_assign = tags_remote->hlt_assign,
511  },
512  };
513  *lp_dead_new = (struct m0_ha_link_params){
514  .hlp_id_local = *id_dead,
515  .hlp_id_remote = *id_alive,
516  .hlp_id_connection = *id_connection,
517  .hlp_tags_local = {
518  .hlt_confirmed = tags_remote->hlt_delivered,
519  .hlt_delivered = tags_remote->hlt_delivered,
520  .hlt_next = tags_remote->hlt_next,
521  .hlt_assign = tags_remote->hlt_next,
522  },
523  .hlp_tags_remote = {
524  .hlt_confirmed = tags_local->hlt_delivered,
525  .hlt_delivered = tags_local->hlt_delivered,
526  .hlt_next = tags_local->hlt_delivered,
527  .hlt_assign = tags_local->hlt_delivered,
528  },
529  };
530 }
531 
532 M0_INTERNAL struct m0_chan *m0_ha_link_chan(struct m0_ha_link *hl)
533 {
534  return &hl->hln_sm.sm_chan;
535 }
536 
538 {
540  return hl->hln_sm.sm_state;
541 }
542 
543 M0_INTERNAL const char *m0_ha_link_state_name(enum m0_ha_link_state state)
544 {
545  M0_PRE(_0C(state >= M0_HA_LINK_STATE_INIT) &&
546  _0C(state < M0_HA_LINK_STATE_NR));
547  return ha_link_sm_states[state].sd_name;
548 }
549 
554 };
555 
556 M0_INTERNAL void m0_ha_link_send(struct m0_ha_link *hl,
557  const struct m0_ha_msg *msg,
558  uint64_t *tag)
559 {
560  M0_ENTRY("hl=%p msg=%p", hl, msg);
561  m0_mutex_lock(&hl->hln_lock);
562  *tag = m0_ha_lq_enqueue(&hl->hln_q_out, msg);
565  M0_LEAVE("hl=%p msg=%p tag=%"PRIu64, hl, msg, *tag);
566 }
567 
568 M0_INTERNAL struct m0_ha_msg *m0_ha_link_recv(struct m0_ha_link *hl,
569  uint64_t *tag)
570 {
571  struct m0_ha_msg *msg;
572 
573  m0_mutex_lock(&hl->hln_lock);
574  msg = m0_ha_lq_next(&hl->hln_q_in);
575  if (msg != NULL)
576  *tag = m0_ha_msg_tag(msg);
578 
579  M0_LOG(M0_DEBUG, "hl=%p msg=%p tag=%"PRIu64,
580  hl, msg, msg == NULL ? M0_HA_MSG_TAG_UNKNOWN : *tag);
581  return msg;
582 }
583 
584 M0_INTERNAL void m0_ha_link_delivered(struct m0_ha_link *hl,
585  struct m0_ha_msg *msg)
586 {
587  M0_ENTRY("hl=%p msg=%p tag=%"PRIu64, hl, msg, m0_ha_msg_tag(msg));
588  m0_mutex_lock(&hl->hln_lock);
592  M0_LEAVE("hl=%p msg=%p tag=%"PRIu64, hl, msg, m0_ha_msg_tag(msg));
593 }
594 
595 M0_INTERNAL bool m0_ha_link_msg_is_delivered(struct m0_ha_link *hl,
596  uint64_t tag)
597 {
598  bool delivered;
599 
600  m0_mutex_lock(&hl->hln_lock);
601  delivered = m0_ha_lq_is_delivered(&hl->hln_q_out, tag);
603  return delivered;
604 }
605 
606 M0_INTERNAL uint64_t m0_ha_link_delivered_consume(struct m0_ha_link *hl)
607 {
608  uint64_t tag;
609 
610  m0_mutex_lock(&hl->hln_lock);
614  M0_LOG(M0_DEBUG, "hl=%p tag=%"PRIu64, hl, tag);
615  return tag;
616 }
617 
618 M0_INTERNAL uint64_t m0_ha_link_not_delivered_consume(struct m0_ha_link *hl)
619 {
620  uint64_t tag;
621 
622  m0_mutex_lock(&hl->hln_lock);
626  M0_LOG(M0_DEBUG, "hl=%p tag=%"PRIu64, hl, tag);
627  return tag;
628 }
629 
632  uint64_t hwc_tag;
636 };
637 
638 static void ha_link_wait(struct ha_link_wait_ctx *wait_ctx,
639  bool (*check)(struct m0_clink *clink))
640 {
641  int rc;
642 
643  m0_clink_init(&wait_ctx->hwc_clink, check);
644  rc = m0_semaphore_init(&wait_ctx->hwc_sem, 0);
645  M0_ASSERT(rc == 0); /* XXX */
647  &wait_ctx->hwc_clink);
648  check(&wait_ctx->hwc_clink);
649  m0_semaphore_down(&wait_ctx->hwc_sem);
650  m0_clink_del_lock(&wait_ctx->hwc_clink);
651  m0_semaphore_fini(&wait_ctx->hwc_sem);
652  m0_clink_fini(&wait_ctx->hwc_clink);
653 }
654 
656 {
657  struct ha_link_wait_ctx *wait_ctx;
658 
659  /* XXX bob_of */
660  wait_ctx = container_of(clink, struct ha_link_wait_ctx, hwc_clink);
661  if (!wait_ctx->hwc_check_disable &&
662  m0_ha_link_msg_is_delivered(wait_ctx->hwc_hl, wait_ctx->hwc_tag)) {
663  wait_ctx->hwc_check_disable = true;
664  m0_semaphore_up(&wait_ctx->hwc_sem);
665  }
666  return false;
667 }
668 
669 M0_INTERNAL void m0_ha_link_wait_delivery(struct m0_ha_link *hl, uint64_t tag)
670 {
671  bool delivered;
672  struct ha_link_wait_ctx wait_ctx = {
673  .hwc_hl = hl,
674  .hwc_tag = tag,
675  .hwc_check_disable = false,
676  };
677 
678  M0_ENTRY("hl=%p tag=%"PRIu64, hl, tag);
680  delivered = m0_ha_link_msg_is_delivered(hl, tag);
681  M0_ASSERT(delivered);
682  M0_LEAVE("hl=%p tag=%"PRIu64, hl, tag);
683 }
684 
686 {
687  struct ha_link_wait_ctx *wait_ctx;
688  bool arrived;
689  struct m0_ha_link *hl;
690 
691  /* XXX bob_of */
692  wait_ctx = container_of(clink, struct ha_link_wait_ctx, hwc_clink);
693  hl = wait_ctx->hwc_hl;
694  M0_ENTRY("hl=%p", hl);
695  if (!wait_ctx->hwc_check_disable) {
696  m0_mutex_lock(&hl->hln_lock);
697  arrived = m0_ha_lq_has_next(&hl->hln_q_in);
699  if (arrived) {
700  wait_ctx->hwc_check_disable = true;
701  m0_semaphore_up(&wait_ctx->hwc_sem);
702  }
703  M0_LOG(M0_DEBUG, "hl=%p arrived=%d", hl, !!arrived);
704  }
705  M0_LEAVE("hl=%p", hl);
706  return false;
707 }
708 
709 M0_INTERNAL void m0_ha_link_wait_arrival(struct m0_ha_link *hl)
710 {
711  bool arrived;
712  struct ha_link_wait_ctx wait_ctx = {
713  .hwc_hl = hl,
714  .hwc_check_disable = false,
715  };
716 
717  M0_ENTRY("hl=%p", hl);
719  m0_mutex_lock(&hl->hln_lock);
720  arrived = m0_ha_lq_has_next(&hl->hln_q_in);
722  M0_ASSERT(arrived);
723  M0_LEAVE("hl=%p", hl);
724 }
725 
726 
728 {
729  struct ha_link_wait_ctx *wait_ctx;
730  bool delivered_notified;
731  struct m0_ha_link *hl;
732 
733  /* XXX bob_of */
734  wait_ctx = container_of(clink, struct ha_link_wait_ctx, hwc_clink);
735  hl = wait_ctx->hwc_hl;
736  M0_ENTRY("hl=%p", hl);
737  if (!wait_ctx->hwc_check_disable) {
738  m0_mutex_lock(&hl->hln_lock);
739  delivered_notified = wait_ctx->hwc_tag <
742  if (delivered_notified) {
743  wait_ctx->hwc_check_disable = true;
744  m0_semaphore_up(&wait_ctx->hwc_sem);
745  }
746  M0_LOG(M0_DEBUG, "hl=%p delivered_notified=%d",
747  hl, !!delivered_notified);
748  }
749  M0_LEAVE("hl=%p", hl);
750  return false;
751 }
752 
753 M0_INTERNAL void m0_ha_link_wait_confirmation(struct m0_ha_link *hl,
754  uint64_t tag)
755 {
756  struct ha_link_wait_ctx wait_ctx = {
757  .hwc_hl = hl,
758  .hwc_tag = tag,
759  .hwc_check_disable = false,
760  };
761 
762  M0_ENTRY("hl=%p tag=%"PRIu64, hl, tag);
764  M0_LEAVE("hl=%p", hl);
765 }
766 
767 M0_INTERNAL void m0_ha_link_flush(struct m0_ha_link *hl)
768 {
769  uint64_t tag_out_assign;
770  uint64_t tag_in_assign;
771 
772  M0_ENTRY("hl=%p", hl);
773 
774  m0_mutex_lock(&hl->hln_lock);
775  tag_out_assign = m0_ha_lq_tag_assign(&hl->hln_q_out);
776  tag_in_assign = m0_ha_lq_tag_assign(&hl->hln_q_in);
778 
779  if (!M0_IN(tag_out_assign, (1, 2)))
780  m0_ha_link_wait_delivery(hl, tag_out_assign - 2);
781  if (!M0_IN(tag_in_assign, (1, 2)))
782  m0_ha_link_wait_confirmation(hl, tag_in_assign - 2);
783  M0_LEAVE("hl=%p tag_out_assign=%" PRIu64 " tag_in_assign=%"PRIu64,
784  hl, tag_out_assign, tag_in_assign);
785 }
786 
787 M0_INTERNAL void m0_ha_link_cb_disconnecting(struct m0_ha_link *hl)
788 {
789  M0_ENTRY("hl=%p", hl);
790  m0_mutex_lock(&hl->hln_lock);
792  hl->hln_cb_disconnecting = true;
795  M0_LEAVE("hl=%p", hl);
796 }
797 
798 M0_INTERNAL void m0_ha_link_cb_reused(struct m0_ha_link *hl)
799 {
800  M0_ENTRY("hl=%p", hl);
801  m0_mutex_lock(&hl->hln_lock);
802  M0_ASSERT(!hl->hln_cb_reused);
803  hl->hln_cb_reused = true;
806  M0_LEAVE("hl=%p", hl);
807 }
808 
809 static void ha_link_tags_update(struct m0_ha_link *hl,
810  uint64_t out_next,
811  uint64_t in_delivered)
812 {
813  struct m0_ha_link_tags tags_out;
814  struct m0_ha_link_tags tags_in;
815  uint64_t delivered;
816 
817  M0_ENTRY("hl=%p out_next=%" PRIu64 " in_delivered=%"PRIu64,
818  hl, out_next, in_delivered);
820  m0_ha_lq_tags_get(&hl->hln_q_out, &tags_out);
821  m0_ha_lq_tags_get(&hl->hln_q_in, &tags_in);
822  M0_LOG(M0_DEBUG, "hl=%p out="HLTAGS_F, hl, HLTAGS_P(&tags_out));
823  M0_LOG(M0_DEBUG, "hl=%p in="HLTAGS_F, hl, HLTAGS_P(&tags_in));
824 
825  while (m0_ha_lq_tag_next(&hl->hln_q_out) < in_delivered)
826  (void)m0_ha_lq_next(&hl->hln_q_out);
827 
828  delivered = m0_ha_lq_tag_delivered(&hl->hln_q_out);
829  while (delivered < in_delivered) {
830  m0_ha_lq_mark_delivered(&hl->hln_q_out, delivered);
831  delivered += 2;
832  }
833 
834  m0_ha_lq_tags_get(&hl->hln_q_out, &tags_out);
835  m0_ha_lq_tags_get(&hl->hln_q_in, &tags_in);
836  M0_LOG(M0_DEBUG, "hl=%p out="HLTAGS_F, hl, HLTAGS_P(&tags_out));
837  M0_LOG(M0_DEBUG, "hl=%p in="HLTAGS_F, hl, HLTAGS_P(&tags_in));
838 }
839 
840 static void ha_link_tags_in_out(struct m0_ha_link *hl,
841  uint64_t *out_next,
842  uint64_t *in_delivered)
843 {
845 
846  *out_next = m0_ha_lq_tag_next(&hl->hln_q_out);
847  *in_delivered = m0_ha_lq_tag_delivered(&hl->hln_q_in);
848  M0_LOG(M0_DEBUG, "out_next=%" PRIu64 " in_delivered=%"PRIu64,
849  *out_next, *in_delivered);
850 }
851 
852 static void ha_link_msg_received(struct m0_ha_link *hl,
853  const struct m0_ha_msg *msg)
854 {
855  M0_ENTRY("hl=%p tag=%" PRIu64 " type=%d",
856  hl, m0_ha_msg_tag(msg), m0_ha_msg_type_get(msg));
858  if (!M0_IN(m0_ha_msg_tag(msg), (M0_HA_MSG_TAG_UNKNOWN,
859  m0_ha_lq_tag_assign(&hl->hln_q_in)))) {
860  M0_LOG(M0_WARN, "dropping out-of-order message: hl=%p "
861  "tag=%" PRIu64 " hed_type=%d",
862  hl, m0_ha_msg_tag(msg), m0_ha_msg_type_get(msg));
863  } else {
864  m0_ha_lq_enqueue(&hl->hln_q_in, msg);
865  }
866 }
867 
869 {
870  uint64_t in_next;
871  uint64_t out_confirmed;
872  uint64_t tag_recv;
873  uint64_t tag_delivery;
874 
875  m0_mutex_lock(&hl->hln_lock);
876  in_next = m0_ha_lq_tag_next(&hl->hln_q_in);
877  tag_recv = in_next > 2 ? in_next :
878  in_next == m0_ha_lq_tag_assign(&hl->hln_q_in) ? 0 : in_next;
879  out_confirmed = m0_ha_lq_tag_confirmed(&hl->hln_q_out);
880  tag_delivery = out_confirmed > 2 ? out_confirmed :
881  out_confirmed == m0_ha_lq_tag_delivered(&hl->hln_q_out) ?
882  0 : out_confirmed;
884 
886  M0_LOG(M0_DEBUG, "hln_tag_broadcast_recv=%" PRIu64 " tag_recv=%"PRIu64,
887  hl->hln_tag_broadcast_recv, tag_recv);
888  M0_LOG(M0_DEBUG, "hln_tag_broadcast_delivery=%" PRIu64 " "
889  "tag_delivery=%"PRIu64,
890  hl->hln_tag_broadcast_delivery, tag_delivery);
891  if (hl->hln_tag_broadcast_recv <= tag_recv) {
894  hl->hln_tag_broadcast_recv = tag_recv;
895  }
896  if (hl->hln_tag_broadcast_delivery <= tag_delivery) {
899  hl->hln_tag_broadcast_delivery = tag_delivery;
900  }
902 }
903 
906  struct m0_fom hli_fom;
907 };
908 
909 static struct ha_link_incoming_fom *
911 {
912  /* TODO bob_of */
913  return container_of(fom, struct ha_link_incoming_fom, hli_fom);
914 }
915 
917 {
918  struct m0_ha_link_msg_fop *req_fop;
920  struct ha_link_incoming_fom *hli;
921  struct m0_ha_link *hl;
922  struct m0_uint128 id_connection;
923  struct m0_ha_msg *msg;
924  const char *ep;
925 
926  req_fop = m0_fop_data(fom->fo_fop);
927  rep_fop = m0_fop_data(fom->fo_rep_fop);
931  M0_ENTRY("fom=%p req_fop=%p rep_fop=%p ep=%s",
932  fom, req_fop, rep_fop, ep);
933  M0_LOG(M0_DEBUG, "ep=%p lmf_msg_nr=%" PRIu64 " lmf_id_remote="U128X_F" "
934  "lmf_id_local="U128X_F" lmf_id_connection="U128X_F,
935  ep, req_fop->lmf_msg_nr, U128_P(&req_fop->lmf_id_remote),
936  U128_P(&req_fop->lmf_id_local),
937  U128_P(&req_fop->lmf_id_connection));
938 
939  hl = m0_ha_link_service_find_get(fom->fo_service,
940  &req_fop->lmf_id_remote,
941  &id_connection);
942  hli->hli_hl = hl;
943  M0_LOG(M0_DEBUG, "fom=%p hl=%p", fom, hl);
944  if (req_fop->lmf_msg_nr != 0) {
945  msg = &req_fop->lmf_msg;
946  M0_LOG(M0_DEBUG, "ep=%s lmf_id_remote="U128X_F" "
947  "hm_fid="FID_F" hed_type=%d tag=%"PRIu64,
948  ep, U128_P(&req_fop->lmf_id_remote), FID_P(&msg->hm_fid),
949  m0_ha_msg_type_get(msg), m0_ha_msg_tag(msg));
950  }
951  if (hl == NULL) {
952  /*
953  * The first M0_LOG() parameter must be a compile-time
954  * constant.
955  */
956  if (req_fop->lmf_seq <= HA_LINK_SUPPRESS_START_NR) {
957  M0_LOG(M0_DEBUG, "no such link: ep=%s "
958  "lmf_id_remote="U128X_F" lmf_seq=%"PRIu64,
959  ep, U128_P(&req_fop->lmf_id_remote),
960  req_fop->lmf_seq);
961  } else {
962  M0_LOG(M0_WARN, "no such link: ep=%s "
963  "lmf_id_remote="U128X_F" lmf_seq=%"PRIu64,
964  ep, U128_P(&req_fop->lmf_id_remote),
965  req_fop->lmf_seq);
966  }
967  rep_fop->lmr_rc = -ENOLINK;
968  } else if (!m0_uint128_eq(&id_connection,
969  &req_fop->lmf_id_connection)) {
970  /* link exists but connection ID doesn't match */
971  M0_LOG(M0_WARN, "connection ID doesn't match: "
972  "id_connection="U128X_F" lmf_id_connection="U128X_F,
973  U128_P(&id_connection),
974  U128_P(&req_fop->lmf_id_connection));
975  rep_fop->lmr_rc = -EBADSLT;
976  } else {
977  m0_mutex_lock(&hl->hln_lock);
978  if (req_fop->lmf_msg_nr != 0)
979  ha_link_msg_received(hl, &req_fop->lmf_msg);
980  if (!hl->hln_no_new_delivered) {
981  ha_link_tags_update(hl, req_fop->lmf_out_next,
982  req_fop->lmf_in_delivered);
983  }
984  ha_link_tags_in_out(hl, &rep_fop->lmr_out_next,
985  &rep_fop->lmr_in_delivered);
988  rep_fop->lmr_rc = 0;
989  }
990  M0_LOG(M0_DEBUG, "hl=%p lmr_rc=%"PRIu32, hl, rep_fop->lmr_rc);
991 
993  m0_fop_to_rpc_item(fom->fo_rep_fop));
995  return M0_FSO_WAIT;
996 }
997 
999 {
1001 
1002  m0_fom_fini(fom);
1003  if (hli->hli_hl != NULL)
1005  m0_free(hli);
1006 }
1007 
1008 static size_t ha_link_incoming_fom_locality(const struct m0_fom *fom)
1009 {
1010  return 0;
1011 }
1012 
1015  .fo_fini = &ha_link_incoming_fom_fini,
1016  .fo_home_locality = &ha_link_incoming_fom_locality,
1017 };
1018 
1020  struct m0_fom **m,
1021  struct m0_reqh *reqh)
1022 {
1023  struct ha_link_incoming_fom *hli;
1024  struct m0_fom *fom;
1025  struct m0_ha_link_msg_rep_fop *reply;
1026 
1027  M0_PRE(fop != NULL);
1028  M0_PRE(m != NULL);
1029 
1030  M0_ALLOC_PTR(hli);
1031  if (hli == NULL)
1032  return M0_ERR(-ENOMEM);
1033  fom = &hli->hli_fom;
1034 
1036  if (reply == NULL) {
1037  m0_free(hli);
1038  return M0_ERR(-ENOMEM);
1039  }
1040 
1041  fom->fo_rep_fop = m0_fop_alloc(&m0_ha_link_msg_rep_fopt,
1043  if (fom->fo_rep_fop == NULL) {
1044  m0_free(reply);
1045  m0_free(hli);
1046  return M0_ERR(-ENOMEM);
1047  }
1048 
1050  fop, fom->fo_rep_fop, reqh);
1051 
1052  *m = fom;
1053  return M0_RC(0);
1054 }
1055 
1058 };
1059 
1079 };
1080 
1081 static struct m0_sm_state_descr
1083 #define _ST(name, flags, allowed) \
1084  [name] = { \
1085  .sd_flags = flags, \
1086  .sd_name = #name, \
1087  .sd_allowed = allowed \
1088  }
1124 #undef _ST
1125 };
1126 
1127 const static struct m0_sm_conf ha_link_outgoing_fom_conf = {
1128  .scf_name = "ha_link_outgoing_fom",
1129  .scf_nr_states = ARRAY_SIZE(ha_link_outgoing_fom_states),
1130  .scf_state = ha_link_outgoing_fom_states,
1131 };
1132 
1134 {
1135  struct m0_ha_link *hl;
1136 
1137  /* XXX bob_of */
1138  hl = container_of(container_of(item, struct m0_fop, f_item),
1139  struct m0_ha_link, hln_outgoing_fop);
1140  M0_ENTRY("hl=%p item=%p", hl, item);
1141  M0_LEAVE();
1142 }
1143 
1145 {
1146  struct m0_ha_link *hl;
1147  int rc;
1148 
1149  /* XXX bob_of */
1150  hl = container_of(container_of(item, struct m0_fop, f_item),
1151  struct m0_ha_link, hln_outgoing_fop);
1152  M0_ENTRY("hl=%p item=%p ri_error=%"PRIi32, hl, item, item->ri_error);
1153  m0_mutex_lock(&hl->hln_lock);
1154  M0_ASSERT(!hl->hln_replied);
1155  hl->hln_replied = true;
1156  hl->hln_rpc_rc = item->ri_error ?:
1158  rc = hl->hln_rpc_rc;
1159  m0_mutex_unlock(&hl->hln_lock);
1161  M0_LEAVE("hl=%p item=%p rc=%d", hl, item, rc);
1162 }
1163 
1166  .rio_replied = &ha_link_outgoing_item_replied,
1167 };
1168 
1169 static void ha_link_outgoing_fop_release(struct m0_ref *ref)
1170 {
1171  struct m0_ha_link *hl;
1172  struct m0_fop *fop = container_of(ref, struct m0_fop, f_ref);
1173 
1174  /* XXX bob_of */
1175  hl = container_of(fop, struct m0_ha_link, hln_outgoing_fop);
1176  M0_ENTRY("hl=%p fop=%p", hl, fop);
1177  fop->f_data.fd_data = NULL;
1178  m0_fop_fini(fop);
1179  m0_mutex_lock(&hl->hln_lock);
1180  M0_ASSERT(!hl->hln_released);
1181  hl->hln_released = true;
1182  m0_mutex_unlock(&hl->hln_lock);
1184  M0_LEAVE();
1185 }
1186 
1188 {
1189  struct m0_ha_link_msg_fop *req_fop = &hl->hln_req_fop_data;
1190  struct m0_ha_link_params *params;
1191  struct m0_rpc_item *item;
1192 
1193  M0_ENTRY("hl=%p", hl);
1194  M0_SET0(&hl->hln_outgoing_fop);
1195  M0_SET0(req_fop);
1197  req_fop, &ha_link_outgoing_fop_release);
1198 
1199  if (hl->hln_msg_to_send == NULL) {
1200  req_fop->lmf_msg_nr = 0;
1201  } else {
1202  req_fop->lmf_msg_nr = 1;
1203  req_fop->lmf_msg = *hl->hln_msg_to_send;
1204  }
1205  m0_mutex_lock(&hl->hln_lock);
1206  /* TODO use designated initialiser after m0_ha_msg become small */
1208  req_fop->lmf_id_local = params->hlp_id_local;
1209  req_fop->lmf_id_remote = params->hlp_id_remote;
1210  req_fop->lmf_id_connection = params->hlp_id_connection;
1211  req_fop->lmf_seq = ++hl->hln_req_fop_seq;
1212  ha_link_tags_in_out(hl, &req_fop->lmf_out_next,
1213  &req_fop->lmf_in_delivered);
1214  M0_LOG(M0_DEBUG, "lmf_id_remote="U128X_F" lmf_id_local="U128X_F" "
1215  "lmf_id_connection="U128X_F" lmf_msg_nr=%" PRIu64 " tag=%" PRIu64 " "
1216  "lmf_seq=%"PRIu64,
1217  U128_P(&req_fop->lmf_id_remote),
1218  U128_P(&req_fop->lmf_id_local),
1219  U128_P(&req_fop->lmf_id_connection),
1220  req_fop->lmf_msg_nr,
1221  hl->hln_msg_to_send == NULL ?
1223  req_fop->lmf_seq);
1224  m0_mutex_unlock(&hl->hln_lock);
1232  m0_rpc_post(item);
1233  M0_LEAVE("hl=%p", hl);
1234  return 0;
1235 }
1236 
1237 static bool ha_link_backoff_check(struct m0_ha_link *hl,
1238  int rc,
1239  uint64_t *nr,
1240  int *old_rc,
1241  uint64_t *old_nr)
1242 {
1243  *old_nr = hl->hln_backoff_nr;
1244  *old_rc = hl->hln_backoff_rc;
1245  if (hl->hln_backoff_rc == rc || hl->hln_backoff_nr == 0) {
1246  *nr = ++hl->hln_backoff_nr;
1247  return rc != 0 && m0_is_po2(hl->hln_backoff_nr);
1248  }
1249  hl->hln_backoff_nr = 1;
1250  hl->hln_backoff_rc = rc;
1251  *nr = 1;
1252  return true;
1253 }
1254 
1256 {
1258  struct m0_ha_link_tags tags;
1259  struct m0_rpc_item *req_item;
1260  uint64_t old_nr;
1261  uint64_t nr;
1262  int old_rc;
1263  int rc;
1264 
1266 
1267  rc = hl->hln_rpc_rc;
1268  if (rc == 0) {
1269  req_item = m0_fop_to_rpc_item(&hl->hln_outgoing_fop);
1271  rc = rep_fop->lmr_rc;
1272  M0_LOG(M0_DEBUG, "lmr_rc=%"PRIi32, rep_fop->lmr_rc);
1273  if (rc == 0)
1274  ha_link_tags_update(hl, rep_fop->lmr_out_next,
1275  rep_fop->lmr_in_delivered);
1276  }
1277 
1278  if (rc != 0)
1280 
1281  if (ha_link_backoff_check(hl, rc, &nr, &old_rc, &old_nr)) {
1282  m0_ha_lq_tags_get(&hl->hln_q_out, &tags);
1283  M0_LOG(M0_WARN, "rc=%d nr=%" PRIu64 " hl=%p ep=%s "
1284  "lq_tags="HLTAGS_F, rc, nr, hl,
1286  if (rc != old_rc) {
1287  M0_LOG(M0_WARN, "old_rc=%d old_nr=%" PRIu64 " "
1288  "hl=%p ep=%s", old_rc, old_nr, hl,
1290  }
1291  }
1292 
1293  return M0_RC(rc);
1294 }
1295 
1296 static bool ha_link_q_in_confirm_all(struct m0_ha_link *hl)
1297 {
1298  bool confirmed_updated = false;
1299 
1302  confirmed_updated = true;
1303  return confirmed_updated;
1304 }
1305 
1307 {
1308  bool cb_disconnecting;
1309  bool cb_reused;
1310 
1311  M0_ENTRY("hl=%p", hl);
1312  m0_mutex_lock(&hl->hln_lock);
1313  cb_disconnecting = hl->hln_cb_disconnecting;
1314  cb_reused = hl->hln_cb_reused;
1315  hl->hln_cb_disconnecting = false;
1316  hl->hln_cb_reused = false;
1317  m0_mutex_unlock(&hl->hln_lock);
1318  if (cb_disconnecting) {
1323  }
1324  if (cb_reused) {
1329  }
1330  M0_LEAVE("hl=%p cb_disconnecting=%d cb_reused=%d",
1331  hl, !!cb_disconnecting, !!cb_reused);
1332 }
1333 
1335 {
1336  struct m0_ha_link *hl = container_of(timer, struct m0_ha_link,
1338 
1339  m0_mutex_lock(&hl->hln_lock);
1340  hl->hln_reconnect_wait = false;
1341  m0_mutex_unlock(&hl->hln_lock);
1342 
1344 }
1345 
1347 {
1348  enum ha_link_outgoing_fom_state phase;
1349  struct m0_ha_link *hl;
1350  m0_time_t abs_timeout;
1351  bool replied;
1352  bool released;
1353  bool stopping;
1354  bool rpc_event_occurred;
1355  bool reconnect;
1356  bool reconnect_wait;
1357  bool quiesced;
1358  int reply_rc;
1359  int rc;
1360 
1361  hl = container_of(fom, struct m0_ha_link, hln_fom); /* XXX bob_of */
1362  phase = m0_fom_phase(&hl->hln_fom);
1363  M0_ENTRY("hl=%p phase=%s", hl, m0_fom_phase_name(&hl->hln_fom, phase));
1364 
1365  switch (phase) {
1367  hl->hln_msg_to_send = NULL;
1368  hl->hln_confirmed_update = false;
1369  hl->hln_rpc_rc = 0;
1370  hl->hln_reply_rc = 0;
1372  return M0_RC(M0_FSO_AGAIN);
1375  hl->hln_tag_broadcast_recv = 0;
1385  M0_ASSERT(rc == 0); /* XXX handle it */
1387  return M0_RC(M0_FSO_AGAIN);
1393  return M0_RC(M0_FSO_WAIT);
1396  m0_mutex_lock(&hl->hln_lock);
1397  stopping = hl->hln_fom_is_stopping;
1398  m0_mutex_unlock(&hl->hln_lock);
1399  if (!stopping) {
1401  return M0_RC(M0_FSO_AGAIN);
1402  } else {
1403  m0_mutex_lock(&hl->hln_lock);
1404  hl->hln_fom_enable_wakeup = false;
1405  hl->hln_no_new_delivered = true;
1406  while (m0_ha_lq_next(&hl->hln_q_out) != NULL)
1407  ;
1408  while (m0_ha_lq_tag_delivered(&hl->hln_q_out) <
1409  m0_ha_lq_tag_next(&hl->hln_q_out)) {
1412  }
1413  (void)ha_link_q_in_confirm_all(hl);
1414  m0_mutex_unlock(&hl->hln_lock);
1419  m0_mutex_lock(&hl->hln_lock);
1420  if (hl->hln_reconnect_cfg_is_set) {
1422  &hl->hln_conn_reconnect_cfg);
1423  }
1425  m0_mutex_unlock(&hl->hln_lock);
1427  &hl->hln_waking_ast);
1430  return M0_RC(M0_FSO_AGAIN);
1431  }
1433  m0_mutex_lock(&hl->hln_lock);
1434  hl->hln_rpc_event_occurred = false;
1435  m0_mutex_unlock(&hl->hln_lock);
1437  abs_timeout = m0_time_add(m0_time_now(),
1439  m0_rpc_link_connect_async(&hl->hln_rpc_link, abs_timeout,
1440  &hl->hln_rpc_wait);
1442  return M0_RC(M0_FSO_AGAIN);
1444  m0_mutex_lock(&hl->hln_lock);
1445  if (hl->hln_reconnect) {
1446  hl->hln_reconnect = false;
1450  &hl->hln_conn_reconnect_cfg);
1451  M0_ASSERT(rc == 0); /* XXX */
1453  hl->hln_reconnect_cfg_is_set = false;
1455  }
1459  m0_mutex_unlock(&hl->hln_lock);
1461  return M0_RC(M0_FSO_AGAIN);
1463  m0_mutex_lock(&hl->hln_lock);
1464  rpc_event_occurred = hl->hln_rpc_event_occurred;
1465  m0_mutex_unlock(&hl->hln_lock);
1466  if (rpc_event_occurred) {
1468  "rlk_rc=%d", hl->hln_rpc_link.rlk_rc);
1470  return M0_RC(M0_FSO_AGAIN);
1471  }
1472  return M0_RC(M0_FSO_WAIT);
1474  m0_mutex_lock(&hl->hln_lock);
1475  hl->hln_rpc_event_occurred = false;
1476  m0_mutex_unlock(&hl->hln_lock);
1477  abs_timeout = m0_time_add(m0_time_now(),
1479  m0_rpc_link_disconnect_async(&hl->hln_rpc_link, abs_timeout,
1480  &hl->hln_rpc_wait);
1482  return M0_RC(M0_FSO_AGAIN);
1484  m0_mutex_lock(&hl->hln_lock);
1485  hl->hln_quiesced = false;
1486  m0_mutex_unlock(&hl->hln_lock);
1488  &hl->hln_quiesce_chan);
1491  return M0_RC(M0_FSO_AGAIN);
1493  m0_mutex_lock(&hl->hln_lock);
1494  quiesced = hl->hln_quiesced;
1495  m0_mutex_unlock(&hl->hln_lock);
1496  if (quiesced) {
1499  return M0_RC(M0_FSO_AGAIN);
1500  }
1501  return M0_RC(M0_FSO_WAIT);
1505  return M0_RC(M0_FSO_AGAIN);
1507  m0_mutex_lock(&hl->hln_lock);
1508  rpc_event_occurred = hl->hln_rpc_event_occurred;
1509  m0_mutex_unlock(&hl->hln_lock);
1510  if (rpc_event_occurred) {
1511  if (hl->hln_rpc_link.rlk_rc != 0) {
1512  M0_LOG(M0_WARN, "rlk_rc=%d endpoint=%s",
1513  hl->hln_rpc_link.rlk_rc,
1515  }
1518  return M0_RC(M0_FSO_AGAIN);
1519  }
1520  return M0_RC(M0_FSO_WAIT);
1522  M0_ASSERT(hl->hln_msg_to_send == NULL);
1523  hl->hln_replied = false;
1524  hl->hln_released = false;
1526  if (m0_semaphore_trydown(&hl->hln_stop_cond)) {
1527  M0_LOG(M0_DEBUG, "stop case");
1528  m0_mutex_lock(&hl->hln_lock);
1529  hl->hln_fom_is_stopping = true;
1530  m0_mutex_unlock(&hl->hln_lock);
1534  return M0_RC(M0_FSO_AGAIN);
1535  }
1536  reply_rc = hl->hln_reply_rc;
1537  if (reply_rc != 0) {
1538  M0_LOG(M0_DEBUG, "link failed, ha_link reconnect case. "
1539  "reply_rc=%d", reply_rc);
1540  hl->hln_reply_rc = 0;
1542  m0_sm_state_set(&hl->hln_sm,
1546 
1547  m0_mutex_lock(&hl->hln_lock);
1548  hl->hln_reconnect_wait = true;
1549  m0_mutex_unlock(&hl->hln_lock);
1550 
1551  return M0_RC(M0_FSO_AGAIN);
1552  }
1553  m0_mutex_lock(&hl->hln_lock);
1554  reconnect_wait = hl->hln_reconnect_wait;
1555  m0_mutex_unlock(&hl->hln_lock);
1556  if (reconnect_wait) {
1557  time_t rtime = hl->hln_conn_cfg.hlcc_reconnect_interval;
1558  M0_LOG(M0_DEBUG, "link failed, reconnect wait case");
1560  return M0_RC(M0_FSO_WAIT);
1561 
1562  M0_LOG(M0_DEBUG, "link failed, reconnect wait case "
1563  "armed");
1567  hl->hln_fom_locality->lo_grp,
1569  m0_time_add(m0_time_now(), rtime));
1570  M0_ASSERT_INFO(rc == 0, "hl->hln_reconnect_wait_timer "
1571  "failed to start, rc=%d", rc);
1572  return M0_RC(M0_FSO_WAIT);
1573  }
1574  m0_mutex_lock(&hl->hln_lock);
1575  reconnect = hl->hln_reconnect;
1576  m0_mutex_unlock(&hl->hln_lock);
1577  if (reconnect) {
1578  M0_LOG(M0_DEBUG, "link reconnect case");
1581  return M0_RC(M0_FSO_AGAIN);
1582  }
1583  m0_mutex_lock(&hl->hln_lock);
1586  m0_mutex_unlock(&hl->hln_lock);
1587  if (hl->hln_msg_to_send != NULL || hl->hln_confirmed_update) {
1589  return M0_RC(M0_FSO_AGAIN);
1590  }
1591  return M0_RC(M0_FSO_WAIT);
1594  M0_ASSERT(rc == 0); /* XXX handle it */
1596  return M0_RC(M0_FSO_AGAIN);
1598  m0_mutex_lock(&hl->hln_lock);
1599  replied = hl->hln_replied;
1600  m0_mutex_unlock(&hl->hln_lock);
1601  if (replied) {
1602  m0_mutex_lock(&hl->hln_lock);
1604  hl->hln_msg_to_send = NULL;
1605  m0_mutex_unlock(&hl->hln_lock);
1606  if (hl->hln_reply_rc == 0)
1607  hl->hln_confirmed_update = false;
1612  return M0_RC(M0_FSO_AGAIN);
1613  }
1614  return M0_FSO_WAIT;
1616  m0_mutex_lock(&hl->hln_lock);
1617  released = hl->hln_released;
1618  m0_mutex_unlock(&hl->hln_lock);
1619  if (released) {
1621  return M0_RC(M0_FSO_AGAIN);
1622  }
1623  return M0_FSO_WAIT;
1626  M0_IMPOSSIBLE("");
1627  }
1628  return M0_RC(M0_FSO_WAIT);
1629 }
1630 
1632  struct m0_sm_ast *ast)
1633 {
1634  struct m0_ha_link *hl = ast->sa_datum;
1635 
1636  M0_ENTRY("hl=%p", hl);
1637  m0_mutex_lock(&hl->hln_lock);
1638  hl->hln_waking_up = false;
1639  m0_mutex_unlock(&hl->hln_lock);
1640  if (m0_fom_is_waiting(&hl->hln_fom)) {
1641  M0_LOG(M0_DEBUG, "waking up");
1642  m0_fom_ready(&hl->hln_fom);
1643  }
1644  M0_LEAVE();
1645 }
1646 
1648 {
1649  M0_ENTRY("hl=%p", hl);
1650  m0_mutex_lock(&hl->hln_lock);
1651  if (!hl->hln_waking_up && hl->hln_fom_enable_wakeup) {
1652  M0_LOG(M0_DEBUG, "posting ast");
1653  hl->hln_waking_up = true;
1654  hl->hln_waking_ast = (struct m0_sm_ast){
1656  .sa_datum = hl,
1657  };
1659  &hl->hln_waking_ast);
1660  }
1661  m0_mutex_unlock(&hl->hln_lock);
1662  M0_LEAVE();
1663 }
1664 
1666 {
1667  struct m0_ha_link *hl;
1668 
1669  hl = container_of(fom, struct m0_ha_link, hln_fom); /* XXX bob_of */
1670  M0_ENTRY("fom=%p hl=%p", fom, hl);
1671  m0_fom_fini(fom);
1673  M0_LEAVE();
1674 }
1675 
1676 static size_t ha_link_outgoing_fom_locality(const struct m0_fom *fom)
1677 {
1678  return 0;
1679 }
1680 
1683  .fo_fini = &ha_link_outgoing_fom_fini,
1684  .fo_home_locality = &ha_link_outgoing_fom_locality,
1685 };
1686 
1688  struct m0_fom **m,
1689  struct m0_reqh *reqh)
1690 {
1691  struct m0_fom *fom;
1692 
1693  M0_PRE(fop != NULL);
1694  M0_PRE(m != NULL);
1695 
1696  M0_ALLOC_PTR(fom);
1697  if (fom == NULL)
1698  return M0_ERR(-ENOMEM);
1699 
1701  fop, fom->fo_rep_fop, reqh);
1702 
1703  *m = fom;
1704  return M0_RC(0);
1705 }
1706 
1709 };
1710 
1711 M0_INTERNAL struct m0_rpc_session *m0_ha_link_rpc_session(struct m0_ha_link *hl)
1712 {
1713  return &hl->hln_rpc_link.rlk_sess;
1714 }
1715 
1716 M0_INTERNAL void m0_ha_link_rpc_endpoint(struct m0_ha_link *hl,
1717  char *buf,
1718  m0_bcount_t buf_len)
1719 {
1720  m0_mutex_lock(&hl->hln_lock);
1721  strncpy(buf, hl->hln_conn_cfg.hlcc_rpc_endpoint, buf_len - 1);
1722  buf[buf_len - 1] = 0;
1723  m0_mutex_unlock(&hl->hln_lock);
1724 }
1725 
1726 M0_INTERNAL int m0_ha_link_mod_init(void)
1727 {
1728  int rc;
1729 
1731  M0_ASSERT(rc == 0);
1735  return 0;
1736 }
1737 
1738 M0_INTERNAL void m0_ha_link_mod_fini(void)
1739 {
1741 }
1742 
1743 #undef M0_TRACE_SUBSYSTEM
1744 
1747 /*
1748  * Local variables:
1749  * c-indentation-style: "K&R"
1750  * c-basic-offset: 8
1751  * tab-width: 8
1752  * fill-column: 80
1753  * scroll-step: 1
1754  * End:
1755  */
1756 /*
1757  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
1758  */
void * fd_data
Definition: fop.h:75
M0_INTERNAL bool m0_ha_lq_try_unnext(struct m0_ha_lq *lq)
Definition: lq.c:216
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
static size_t nr
Definition: dump.c:1505
M0_INTERNAL bool m0_ha_link_msg_is_delivered(struct m0_ha_link *hl, uint64_t tag)
Definition: link.c:595
m0_time_t ri_resend_interval
Definition: item.h:144
M0_INTERNAL void m0_ha_link_service_put(struct m0_reqh_service *service, struct m0_ha_link *hl)
Definition: link_service.c:176
#define M0_PRE(cond)
M0_INTERNAL uint64_t m0_ha_lq_enqueue(struct m0_ha_lq *lq, const struct m0_ha_msg *msg)
Definition: lq.c:180
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static bool ha_link_rpc_wait_cb(struct m0_clink *clink)
Definition: link.c:197
struct m0_fid hm_fid
Definition: msg.h:117
#define m0_strdup(s)
Definition: string.h:43
M0_INTERNAL void m0_ha_lq_fini(struct m0_ha_lq *lq)
Definition: lq.c:67
M0_INTERNAL void m0_chan_broadcast_lock(struct m0_chan *chan)
Definition: chan.c:178
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
M0_BASSERT(ARRAY_SIZE(ha_link_sm_states)==M0_HA_LINK_STATE_NR)
static bool ha_link_q_in_confirm_all(struct m0_ha_link *hl)
Definition: link.c:1296
m0_ha_link_state
Definition: link.h:143
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
static struct m0_bufvec dst
Definition: xform.c:61
static struct m0_addb2_mach * m
Definition: consumer.c:38
const struct m0_fom_type_ops m0_ha_link_outgoing_fom_type_ops
Definition: link.c:1707
M0_INTERNAL struct m0_ha_msg * m0_ha_lq_next(struct m0_ha_lq *lq)
Definition: lq.c:203
M0_INTERNAL void m0_ha_link_stop(struct m0_ha_link *hl, struct m0_clink *clink)
Definition: link.c:412
M0_INTERNAL void m0_ha_lq_tags_set(struct m0_ha_lq *lq, const struct m0_ha_link_tags *tags)
Definition: lq.c:87
static void ha_link_msg_received(struct m0_ha_link *hl, const struct m0_ha_msg *msg)
Definition: link.c:852
M0_INTERNAL enum m0_ha_msg_type m0_ha_msg_type_get(const struct m0_ha_msg *msg)
Definition: msg.c:41
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
M0_INTERNAL bool m0_semaphore_trydown(struct m0_semaphore *semaphore)
Definition: semaphore.c:60
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
static void ha_link_outgoing_fom_wakeup(struct m0_ha_link *hl)
Definition: link.c:1647
const struct m0_fom_type_ops m0_ha_link_incoming_fom_type_ops
Definition: link.c:1056
Definition: sm.h:350
int(* fo_tick)(struct m0_fom *fom)
Definition: fom.h:663
M0_INTERNAL void m0_ha_link_wait_arrival(struct m0_ha_link *hl)
Definition: link.c:709
M0_INTERNAL bool m0_uint128_eq(const struct m0_uint128 *u0, const struct m0_uint128 *u1)
Definition: misc.c:39
static uint64_t tag(uint8_t code, uint64_t id)
Definition: addb2.c:1047
static bool ha_link_backoff_check(struct m0_ha_link *hl, int rc, uint64_t *nr, int *old_rc, uint64_t *old_nr)
Definition: link.c:1237
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:79
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
M0_INTERNAL struct m0_ha_msg * m0_ha_link_recv(struct m0_ha_link *hl, uint64_t *tag)
Definition: link.c:568
static void ha_link_outgoing_fop_release(struct m0_ref *ref)
Definition: link.c:1169
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
M0_INTERNAL uint64_t m0_ha_lq_tag_delivered(const struct m0_ha_lq *lq)
Definition: lq.c:168
static void ha_link_outgoing_fom_fini(struct m0_fom *fom)
Definition: link.c:1665
int(* fto_create)(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: fom.h:650
static void ha_link_outgoing_fom_wakeup_ast(struct m0_sm_group *gr, struct m0_sm_ast *ast)
Definition: link.c:1631
M0_INTERNAL const char * m0_ha_link_state_name(enum m0_ha_link_state state)
Definition: link.c:543
static bool m0_is_po2(uint64_t val)
Definition: arith.h:153
M0_INTERNAL struct m0_ha_link * m0_ha_link_service_find_get(struct m0_reqh_service *service, const struct m0_uint128 *link_id, struct m0_uint128 *connection_id)
Definition: link_service.c:157
static bool ha_link_wait_confirmation_check(struct m0_clink *clink)
Definition: link.c:727
int32_t ri_error
Definition: item.h:161
M0_INTERNAL uint64_t m0_ha_lq_dequeue(struct m0_ha_lq *lq)
Definition: lq.c:256
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
#define M0_BITS(...)
Definition: misc.h:236
uint64_t m0_bcount_t
Definition: types.h:77
Definition: sm.h:504
M0_INTERNAL void m0_ha_link_fops_fini(void)
Definition: link_fops.c:98
#define container_of(ptr, type, member)
Definition: misc.h:33
ha_link_send_type
Definition: link.c:550
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_INTERNAL struct m0_chan * m0_ha_link_chan(struct m0_ha_link *hl)
Definition: link.c:532
M0_INTERNAL void m0_ha_lq_tags_get(const struct m0_ha_lq *lq, struct m0_ha_link_tags *tags)
Definition: lq.c:80
static struct m0_rpc_item * item
Definition: item.c:56
M0_INTERNAL bool m0_ha_lq_has_next(const struct m0_ha_lq *lq)
Definition: lq.c:141
M0_INTERNAL void m0_ha_link_flush(struct m0_ha_link *hl)
Definition: link.c:767
M0_INTERNAL void m0_sm_group_fini(struct m0_sm_group *grp)
Definition: sm.c:65
m0_fom_phase
Definition: fom.h:372
static int ha_link_outgoing_fop_send(struct m0_ha_link *hl)
Definition: link.c:1187
Definition: sock.c:887
M0_INTERNAL void m0_ha_link_cb_reused(struct m0_ha_link *hl)
Definition: link.c:798
M0_INTERNAL void m0_ha_link_start(struct m0_ha_link *hl, struct m0_ha_link_conn_cfg *hl_conn_cfg)
Definition: link.c:382
struct m0_fom_type ft_fom_type
Definition: fop.h:232
static bool ha_link_wait_delivery_check(struct m0_clink *clink)
Definition: link.c:655
M0_INTERNAL enum m0_ha_link_state m0_ha_link_state_get(struct m0_ha_link *hl)
Definition: link.c:537
M0_INTERNAL void m0_ha_link_send(struct m0_ha_link *hl, const struct m0_ha_msg *msg, uint64_t *tag)
Definition: link.c:556
return M0_RC(rc)
Definition: sock.c:754
M0_INTERNAL const char * m0_rpc_conn_addr(const struct m0_rpc_conn *conn)
Definition: conn.c:1306
#define M0_ENTRY(...)
Definition: trace.h:170
static bool ha_link_wait_arrival_check(struct m0_clink *clink)
Definition: link.c:685
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
static struct ha_link_incoming_fom * ha_link_incoming_fom_container(struct m0_fom *fom)
Definition: link.c:910
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
Definition: fom_generic.c:81
M0_INTERNAL bool m0_ha_lq_is_delivered(const struct m0_ha_lq *lq, uint64_t tag)
Definition: lq.c:148
#define HLTAGS_P(_tags)
Definition: link_fops.h:77
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
struct m0_locality fl_locality
Definition: fom.h:302
M0_INTERNAL bool m0_fom_is_waiting(const struct m0_fom *fom)
Definition: fom.c:1732
static size_t ha_link_outgoing_fom_locality(const struct m0_fom *fom)
Definition: link.c:1676
struct m0_fop_type * f_type
Definition: fop.h:81
const struct m0_fom_ops ha_link_incoming_fom_ops
Definition: link.c:1013
#define PRIu64
Definition: types.h:58
static struct nlx_ping_client_params * params
static size_t ha_link_incoming_fom_locality(const struct m0_fom *fom)
Definition: link.c:1008
static void ha_link_msg_recv_or_delivery_broadcast(struct m0_ha_link *hl)
Definition: link.c:868
M0_INTERNAL void m0_ha_lq_mark_delivered(struct m0_ha_lq *lq, uint64_t tag)
Definition: lq.c:228
M0_INTERNAL bool m0_sm_timer_is_armed(const struct m0_sm_timer *timer)
Definition: sm.c:628
struct m0_rpc_machine * m0_fop_rpc_machine(const struct m0_fop *fop)
Definition: fop.c:360
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
return M0_ERR(-EOPNOTSUPP)
static void ha_link_outgoing_item_sent(struct m0_rpc_item *item)
Definition: link.c:1133
void * sa_datum
Definition: sm.h:508
static int ha_link_outgoing_fom_create(struct m0_fop *fop, struct m0_fom **m, struct m0_reqh *reqh)
Definition: link.c:1687
struct m0_fop_type m0_ha_link_msg_rep_fopt
Definition: link_fops.c:42
M0_INTERNAL void m0_fom_ready(struct m0_fom *fom)
Definition: fom.c:429
Definition: refs.h:34
M0_INTERNAL void m0_ha_link_cb_disconnecting(struct m0_ha_link *hl)
Definition: link.c:787
M0_INTERNAL int m0_ha_link_fops_init(void)
Definition: link_fops.c:79
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
M0_INTERNAL void m0_ha_link_reconnect_cancel(struct m0_ha_link *hl)
Definition: link.c:479
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
Definition: chan.c:96
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_INTERNAL uint64_t m0_ha_lq_tag_confirmed(const struct m0_ha_lq *lq)
Definition: lq.c:174
M0_INTERNAL void m0_sm_group_init(struct m0_sm_group *grp)
Definition: sm.c:53
static void ha_link_incoming_fom_fini(struct m0_fom *fom)
Definition: link.c:998
#define U128_P(x)
Definition: types.h:45
m0_time_t m0_time_now(void)
Definition: time.c:134
void m0_sm_state_set(struct m0_sm *mach, int state)
Definition: sm.c:478
static const struct m0_rpc_item_ops ha_link_outgoing_item_ops
Definition: link.c:1164
M0_INTERNAL void m0_fom_type_init(struct m0_fom_type *type, uint64_t id, const struct m0_fom_type_ops *ops, const struct m0_reqh_service_type *svc_type, const struct m0_sm_conf *sm)
Definition: fom.c:1596
static void ha_link_cb_disconnecting_reused(struct m0_ha_link *hl)
Definition: link.c:1306
static const struct m0_sm_conf ha_link_outgoing_fom_conf
Definition: link.c:1127
struct m0_rpc_item * ri_reply
Definition: item.h:163
static struct m0_fom_type ha_link_outgoing_fom_type
Definition: link.c:192
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
static void ha_link_wait(struct ha_link_wait_ctx *wait_ctx, bool(*check)(struct m0_clink *clink))
Definition: link.c:638
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
M0_INTERNAL int m0_ha_link_mod_init(void)
Definition: link.c:1726
#define PRIi32
Definition: types.h:67
Definition: msg.h:115
uint64_t ri_nr_sent_max
Definition: item.h:146
M0_INTERNAL void m0_ha_link_reconnect_begin(struct m0_ha_link *hl, struct m0_ha_link_params *lp)
Definition: link.c:422
ha_link_outgoing_fom_state
Definition: link.c:1060
Definition: reqh.h:94
struct m0_sm_group * lo_grp
Definition: locality.h:67
static struct m0_sm_state_descr ha_link_outgoing_fom_states[HA_LINK_OUTGOING_STATE_NR]
Definition: link.c:1082
Definition: dump.c:103
Definition: chan.h:229
M0_INTERNAL uint64_t m0_ha_msg_tag(const struct m0_ha_msg *msg)
Definition: msg.c:36
M0_INTERNAL void m0_ha_link_reconnect_end(struct m0_ha_link *hl, const struct m0_ha_link_conn_cfg *hl_conn_cfg)
Definition: link.c:441
void m0_rpc_reply_post(struct m0_rpc_item *request, struct m0_rpc_item *reply)
Definition: rpc.c:135
M0_INTERNAL void m0_fop_fini(struct m0_fop *fop)
Definition: fop.c:136
M0_INTERNAL void m0_ha_lq_init(struct m0_ha_lq *lq, const struct m0_ha_lq_cfg *lq_cfg)
Definition: lq.c:58
static struct m0_clink clink[RDWR_REQUEST_MAX]
void(* rio_sent)(struct m0_rpc_item *item)
Definition: item.h:267
static struct m0_sm_conf ha_link_sm_conf
Definition: link.c:186
M0_INTERNAL uint64_t m0_ha_lq_tag_assign(const struct m0_ha_lq *lq)
Definition: lq.c:156
m0_time_t m0_time_add(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:47
#define FID_P(f)
Definition: fid.h:77
M0_INTERNAL uint64_t m0_ha_link_delivered_consume(struct m0_ha_link *hl)
Definition: link.c:606
M0_INTERNAL void m0_ha_link_delivered(struct m0_ha_link *hl, struct m0_ha_msg *msg)
Definition: link.c:584
M0_INTERNAL void m0_ha_link_wait_confirmation(struct m0_ha_link *hl, uint64_t tag)
Definition: link.c:753
static void ha_link_tags_apply(struct m0_ha_link *hl, const struct m0_ha_link_params *lp)
Definition: link.c:316
#define HLTAGS_F
Definition: link_fops.h:75
static int ha_link_conn_cfg_copy(struct m0_ha_link_conn_cfg *dst, const struct m0_ha_link_conn_cfg *src)
Definition: link.c:298
#define U128X_F
Definition: types.h:42
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
uint32_t sd_flags
Definition: sm.h:378
Definition: fom.h:481
struct m0_fop_data f_data
Definition: fop.h:82
#define PRIu32
Definition: types.h:66
static void ha_link_tags_in_out(struct m0_ha_link *hl, uint64_t *out_next, uint64_t *in_delivered)
Definition: link.c:840
M0_INTERNAL void m0_ha_link_service_register(struct m0_reqh_service *service, struct m0_ha_link *hl, const struct m0_uint128 *link_id, const struct m0_uint128 *connection_id)
Definition: link_service.c:215
struct m0_reqh reqh
Definition: rm_foms.c:48
const char * sd_name
Definition: sm.h:383
M0_INTERNAL void m0_ha_link_mod_fini(void)
Definition: link.c:1738
char * ep
Definition: sw.h:132
static void ha_link_conn_cfg_free(struct m0_ha_link_conn_cfg *hl_conn_cfg)
Definition: link.c:310
struct m0_reqh_service_type m0_ha_link_service_type
Definition: link_service.c:263
static void ha_link_outgoing_reconnect_timeout(struct m0_sm_timer *timer)
Definition: link.c:1334
struct m0_ref f_ref
Definition: fop.h:80
M0_INTERNAL void m0_sm_timer_cancel(struct m0_sm_timer *timer)
Definition: sm.c:610
M0_INTERNAL void m0_ha_link_wait_delivery(struct m0_ha_link *hl, uint64_t tag)
Definition: link.c:669
M0_INTERNAL void m0_ha_link_service_quiesce(struct m0_reqh_service *service, struct m0_ha_link *hl, struct m0_chan *chan)
Definition: link_service.c:197
struct m0_fom_locality * fo_loc
Definition: fom.h:483
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
Definition: sm.c:313
#define M0_IS0(obj)
Definition: misc.h:70
static void ha_link_outgoing_item_replied(struct m0_rpc_item *item)
Definition: link.c:1144
static int ha_link_incoming_fom_tick(struct m0_fom *fom)
Definition: link.c:916
struct m0_reqh_service * fo_service
Definition: fom.h:505
static int ha_link_outgoing_fom_tick(struct m0_fom *fom)
Definition: link.c:1346
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
struct m0_chan sm_chan
Definition: sm.h:331
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
static bool ha_link_quiesce_wait_cb(struct m0_clink *clink)
Definition: link.c:211
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
M0_INTERNAL void m0_ha_link_rpc_endpoint(struct m0_ha_link *hl, char *buf, m0_bcount_t buf_len)
Definition: link.c:1716
static struct m0_sm_state_descr ha_link_sm_states[]
Definition: link.c:118
struct m0_rpc_session * ri_session
Definition: item.h:147
struct m0_rpc_item * m0_fop_to_rpc_item(const struct m0_fop *fop)
Definition: fop.c:338
Definition: common.h:34
M0_INTERNAL bool m0_ha_link_tags_eq(const struct m0_ha_link_tags *tags1, const struct m0_ha_link_tags *tags2)
Definition: link_fops.c:57
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
static struct m0_fop * fop
Definition: item.c:57
static int ha_link_incoming_fom_create(struct m0_fop *fop, struct m0_fom **m, struct m0_reqh *reqh)
Definition: link.c:1019
static void ha_link_tags_update(struct m0_ha_link *hl, uint64_t out_next, uint64_t in_delivered)
Definition: link.c:809
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
M0_INTERNAL void m0_fom_queue(struct m0_fom *fom)
Definition: fom.c:624
void check(struct workload *w)
M0_INTERNAL void m0_chan_fini(struct m0_chan *chan)
Definition: chan.c:104
#define M0_ASSERT_INFO(cond, fmt,...)
M0_INTERNAL void m0_ha_link_reconnect_params(const struct m0_ha_link_params *lp_alive, struct m0_ha_link_params *lp_alive_new, struct m0_ha_link_params *lp_dead_new, const struct m0_uint128 *id_alive, const struct m0_uint128 *id_dead, const struct m0_uint128 *id_connection)
Definition: link.c:486
static unsigned done
Definition: storage.c:91
M0_INTERNAL void m0_sm_ast_cancel(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:183
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
M0_INTERNAL void m0_sm_timer_fini(struct m0_sm_timer *timer)
Definition: sm.c:566
M0_INTERNAL int m0_ha_link_init(struct m0_ha_link *hl, struct m0_ha_link_cfg *hl_cfg)
Definition: link.c:225
M0_INTERNAL void m0_sm_timer_init(struct m0_sm_timer *timer)
Definition: sm.c:559
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
Definition: chan.c:112
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL void m0_ha_link_fini(struct m0_ha_link *hl)
Definition: link.c:269
M0_INTERNAL void m0_ha_link_service_deregister(struct m0_reqh_service *service, struct m0_ha_link *hl)
Definition: link_service.c:235
uint32_t sm_state
Definition: sm.h:307
M0_INTERNAL uint64_t m0_ha_link_not_delivered_consume(struct m0_ha_link *hl)
Definition: link.c:618
M0_INTERNAL uint64_t m0_ha_lq_tag_next(const struct m0_ha_lq *lq)
Definition: lq.c:162
struct m0_pdclust_src_addr src
Definition: fd.c:108
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
const struct m0_fom_ops ha_link_outgoing_fom_ops
Definition: link.c:1681
M0_INTERNAL bool m0_sm_group_is_locked(const struct m0_sm_group *grp)
Definition: sm.c:107
M0_INTERNAL int m0_sm_timer_start(struct m0_sm_timer *timer, struct m0_sm_group *group, void(*cb)(struct m0_sm_timer *), m0_time_t deadline)
Definition: sm.c:577
struct m0_rpc_conn * s_conn
Definition: session.h:312
M0_INTERNAL struct m0_rpc_session * m0_ha_link_rpc_session(struct m0_ha_link *hl)
Definition: link.c:1711
Definition: fop.h:79
static int ha_link_outgoing_fop_replied(struct m0_ha_link *hl)
Definition: link.c:1255
#define FID_F
Definition: fid.h:75
Definition: trace.h:478
struct m0_fop * rep_fop
Definition: dir.c:334
struct m0_fop_type m0_ha_link_msg_fopt
Definition: link_fops.c:41
m0_time_t ri_deadline
Definition: item.h:141
M0_INTERNAL const char * m0_fom_phase_name(const struct m0_fom *fom, int phase)
Definition: fom.c:1722
struct m0_fop * m0_fop_alloc(struct m0_fop_type *fopt, void *data, struct m0_rpc_machine *mach)
Definition: fop.c:96
#define M0_IMPOSSIBLE(fmt,...)
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
Definition: sm.c:331