Motr  M0
proxy.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CM
24 
25 #include "lib/memory.h"
26 #include "lib/errno.h"
27 #include "lib/trace.h"
28 #include "lib/time.h"
29 #include "lib/misc.h"
30 #include "lib/locality.h"
31 
32 #include "rpc/rpc.h"
33 #include "rpc/session.h"
34 #include "motr/magic.h"
35 #include "motr/setup.h" /* CS_MAX_EP_ADDR_LEN */
36 #include "fop/fom.h"
37 
38 #include "cm/cm.h"
39 #include "cm/cp.h"
40 #include "cm/proxy.h"
41 #include "cm/ag.h"
42 
49 enum {
51 };
52 
53 M0_TL_DESCR_DEFINE(proxy, "copy machine proxy", M0_INTERNAL,
54  struct m0_cm_proxy, px_linkage, px_magic,
56 
57 M0_TL_DEFINE(proxy, M0_INTERNAL, struct m0_cm_proxy);
58 
59 M0_TL_DESCR_DEFINE(proxy_fail, "copy machine proxy", M0_INTERNAL,
60  struct m0_cm_proxy, px_fail_linkage, px_magic,
62 
63 M0_TL_DEFINE(proxy_fail, M0_INTERNAL, struct m0_cm_proxy);
64 
65 M0_TL_DESCR_DEFINE(proxy_cp, "pending copy packets", M0_INTERNAL,
66  struct m0_cm_cp, c_cm_proxy_linkage, c_magix,
68 
69 M0_TL_DEFINE(proxy_cp, M0_INTERNAL, struct m0_cm_cp);
70 
71 static const struct m0_bob_type proxy_bob = {
72  .bt_name = "cm proxy",
73  .bt_magix_offset = M0_MAGIX_OFFSET(struct m0_cm_proxy, px_magic),
74  .bt_magix = CM_PROXY_LINK_MAGIC,
75  .bt_check = NULL
76 };
77 
79 
80 static bool cm_proxy_invariant(const struct m0_cm_proxy *pxy)
81 {
85  return _0C(pxy != NULL) && _0C(m0_cm_proxy_bob_check(pxy)) &&
86  _0C(m0_cm_is_locked(pxy->px_cm)) &&
87  _0C(pxy->px_endpoint != NULL);
88 }
89 
90 M0_INTERNAL int m0_cm_proxy_init(struct m0_cm_proxy *proxy, uint64_t px_id,
91  struct m0_cm_ag_id *lo, struct m0_cm_ag_id *hi,
92  const char *endpoint)
93 {
94  M0_PRE(proxy != NULL && lo != NULL && hi != NULL && endpoint != NULL);
95 
96  m0_cm_proxy_bob_init(proxy);
97  proxy_tlink_init(proxy);
98  proxy_fail_tlink_init(proxy);
99  m0_mutex_init(&proxy->px_mutex);
100  proxy_cp_tlist_init(&proxy->px_pending_cps);
101  proxy->px_id = px_id;
102  proxy->px_sw.sw_lo = *lo;
103  proxy->px_sw.sw_hi = *hi;
104  proxy->px_endpoint = endpoint;
105  proxy->px_is_done = false;
106  proxy->px_epoch = 0;
107  return 0;
108 }
109 
110 M0_INTERNAL void m0_cm_proxy_add(struct m0_cm *cm, struct m0_cm_proxy *pxy)
111 {
112  M0_ENTRY("cm: %p proxy: %p", cm, pxy);
114  M0_PRE(!proxy_tlink_is_in(pxy));
115  pxy->px_cm = cm;
116  proxy_tlist_add_tail(&cm->cm_proxies, pxy);
119  M0_ASSERT(proxy_tlink_is_in(pxy));
121  M0_LEAVE();
122 }
123 
124 M0_INTERNAL void m0_cm_proxy_del(struct m0_cm *cm, struct m0_cm_proxy *pxy)
125 {
126  M0_ENTRY("cm: %p proxy: %p", cm, pxy);
128  M0_PRE(proxy_tlink_is_in(pxy));
129  if (proxy_fail_tlink_is_in(pxy))
130  proxy_fail_tlist_del(pxy);
131  proxy_fail_tlink_fini(pxy);
132  proxy_tlink_del_fini(pxy);
134  M0_ASSERT(!proxy_tlink_is_in(pxy));
136  M0_LEAVE();
137 }
138 
139 M0_INTERNAL void m0_cm_proxy_cp_add(struct m0_cm_proxy *pxy,
140  struct m0_cm_cp *cp)
141 {
142  M0_ENTRY("proxy: %p cp: %p ep: %s", pxy, cp, pxy->px_endpoint);
144  M0_PRE(!proxy_cp_tlink_is_in(cp));
145 
146  proxy_cp_tlist_add_tail(&pxy->px_pending_cps, cp);
147  ID_LOG("proxy ag_id", &cp->c_ag->cag_id);
148  M0_POST(proxy_cp_tlink_is_in(cp));
149  M0_LEAVE();
150 }
151 
152 static void cm_proxy_cp_del(struct m0_cm_proxy *pxy,
153  struct m0_cm_cp *cp)
154 {
156  M0_PRE(proxy_cp_tlink_is_in(cp));
157  proxy_cp_tlist_del(cp);
158  M0_POST(!proxy_cp_tlink_is_in(cp));
159 }
160 
161 M0_INTERNAL struct m0_cm_proxy *m0_cm_proxy_locate(struct m0_cm *cm,
162  const char *addr)
163 {
164  struct m0_net_transfer_mc *tm;
165  struct m0_net_end_point *ep;
166  struct m0_cm_proxy *pxy;
167  /*
168  * Proxy address string (pxy->px_endpoint) cannot be directly compared
169  * with supplied address string, because the same end-point might have
170  * different address strings.
171  *
172  * Instantiate the endpoints and compare them directly.
173  */
174  tm = &m0_reqh_rpc_mach_tlist_head
175  (&cm->cm_service.rs_reqh->rh_rpc_machines)->rm_tm;
176  if (tm == NULL || m0_net_end_point_create(&ep, tm, addr) != 0)
177  return NULL;
178 
179  m0_tl_for(proxy, &cm->cm_proxies, pxy) {
180  struct m0_net_end_point *scan;
181  if (m0_net_end_point_create(&scan, tm, pxy->px_endpoint) != 0)
182  continue;
183  m0_net_end_point_put(scan); /* OK to put before comparison. */
184  if (scan == ep)
185  break;
186  } m0_tl_endfor;
188  return pxy;
189 }
190 
191 static void __wake_up_pending_cps(struct m0_cm_proxy *pxy)
192 {
193  struct m0_cm_cp *cp;
194 
195  m0_tl_for(proxy_cp, &pxy->px_pending_cps, cp) {
196  cm_proxy_cp_del(pxy, cp);
197  /* wakeup pending copy packet foms */
198  m0_fom_wakeup(&cp->c_fom);
199  } m0_tl_endfor;
200 
201 }
202 
203 static bool epoch_check(struct m0_cm_proxy *pxy, m0_time_t px_epoch)
204 {
205  if (px_epoch != pxy->px_epoch) {
206  M0_LOG(M0_WARN, "Mismatch Epoch,"
207  "current: %llu" "received: %llu",
208  (unsigned long long)pxy->px_epoch,
209  (unsigned long long)px_epoch);
210 
211  return false;
212  }
213 
214  return true;
215 }
216 
217 static void proxy_done(struct m0_cm_proxy *proxy)
218 {
219  struct m0_cm *cm = proxy->px_cm;
220  M0_ENTRY("pxy=%p id=%" PRIu64 ", to %s",
221  proxy, proxy->px_id, proxy->px_endpoint);
222 
223  if (!proxy->px_is_done) {
224  proxy->px_is_done = true;
225  m0_cm_notify(cm);
226  }
228  M0_LEAVE();
229 }
230 
236 static void _sw_update(struct m0_cm_proxy *pxy, struct m0_cm_sw *in_interval,
237  struct m0_cm_sw *out_interval, uint32_t px_status)
238 {
239  ID_LOG("proxy lo", &pxy->px_sw.sw_lo);
240  ID_LOG("proxy hi", &pxy->px_sw.sw_hi);
241 
242  if (m0_cm_sw_cmp(in_interval, &pxy->px_sw) > 0)
243  m0_cm_sw_copy(&pxy->px_sw, in_interval);
244  if (m0_cm_sw_cmp(out_interval, &pxy->px_out_interval) > 0)
245  m0_cm_sw_copy(&pxy->px_out_interval, out_interval);
246  pxy->px_status = px_status;
249 }
250 
251 static int px_ready(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval,
252  struct m0_cm_sw *out_interval, m0_time_t px_epoch,
253  uint32_t px_status)
254 {
255  struct m0_cm *cm = p->px_cm;
256  struct m0_cm_ag_id hi;
257  int rc = 0;
258  M0_ENTRY("pxy=%p id=%" PRIu64 ", to %s", p, p->px_id, p->px_endpoint);
259 
260  if (p->px_epoch == 0 && m0_cm_state_get(cm) == M0_CMS_READY) {
261  p->px_epoch = px_epoch;
262  p->px_status = px_status;
263  hi = in_interval->sw_hi;
264  /*
265  * Here we select the minimum of the sliding window
266  * starting point provided by each remote copy machine,
267  * from which this copy machine will start in-order to
268  * keep all the copy machines in sync.
269  */
272  }
274  rc = 0;
275  } else if (m0_cm_state_get(cm) < M0_CMS_READY)
276  rc = -EINVAL;
277 
278  return M0_RC(rc);
279 }
280 
281 static int px_active(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval,
282  struct m0_cm_sw *out_interval, m0_time_t px_epoch,
283  uint32_t px_status)
284 {
285  M0_ENTRY("pxy=%p id=%" PRIu64 ", to %s", p, p->px_id, p->px_endpoint);
286  _sw_update(p, in_interval, out_interval, px_status);
287  /* TODO This is expensive during M0_CMS_CTIVE phase but needed to
288  * handle cleanup in case of copy machine failures during active
289  * phase. Try to find another alternative.
290  */
291  m0_cm_frozen_ag_cleanup(p->px_cm, p);
292  return M0_RC(0);
293 }
294 
295 static int px_complete(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval,
296  struct m0_cm_sw *out_interval, m0_time_t px_epoch,
297  uint32_t px_status)
298 {
299  M0_ENTRY("pxy=%p id=%" PRIu64 ", to %s", p, p->px_id, p->px_endpoint);
300  _sw_update(p, in_interval, out_interval, px_status);
301  m0_cm_frozen_ag_cleanup(p->px_cm, p);
302  return M0_RC(0);
303 }
304 
305 static int px_stop_fail(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval,
306  struct m0_cm_sw *out_interval, m0_time_t px_epoch,
307  uint32_t px_status)
308 {
309  M0_ENTRY("pxy=%p id=%" PRIu64 ", to %s state=%u",
310  p, p->px_id, p->px_endpoint, px_status);
311  _sw_update(p, in_interval, out_interval, px_status);
312  m0_cm_frozen_ag_cleanup(p->px_cm, p);
313  proxy_done(p);
314  return M0_RC(0);
315 }
316 
317 static int (*px_action[])(struct m0_cm_proxy *px, struct m0_cm_sw *in_interval,
318  struct m0_cm_sw *out_interval, m0_time_t px_epoch,
319  uint32_t px_status) = {
320  [M0_PX_READY] = px_ready,
325 };
326 
327 M0_INTERNAL int m0_cm_proxy_update(struct m0_cm_proxy *pxy,
328  struct m0_cm_sw *in_interval,
329  struct m0_cm_sw *out_interval,
330  uint32_t px_status,
331  m0_time_t px_epoch)
332 {
333  struct m0_cm *cm;
334  int rc;
335 
336  M0_ENTRY("proxy: %p ep: %s", pxy, pxy->px_endpoint);
337  M0_PRE(pxy != NULL && in_interval != NULL && out_interval != NULL);
338 
339  m0_cm_proxy_lock(pxy);
340  cm = pxy->px_cm;
342  M0_LOG(M0_DEBUG, "Recvd from :%s status: %u curr_status: %u "
343  "nr_updates: %u", pxy->px_endpoint, px_status,
344  pxy->px_status, (unsigned)cm->cm_nr_proxy_updated);
345 
346  if (px_status < pxy->px_status) {
347  m0_cm_proxy_unlock(pxy);
348  return M0_RC(-EINVAL);
349  }
350 
351  if (pxy->px_status != M0_PX_INIT && !epoch_check(pxy, px_epoch)) {
352  m0_cm_proxy_unlock(pxy);
353  return M0_RC(-EINVAL);
354  }
355 
356  if (px_status >= M0_PX_COMPLETE &&
357  pxy->px_status < M0_PX_COMPLETE) {
358  /*
359  * Got a fresh "complete(fail,stop)" state - need to
360  * decrease counter
361  */
362  M0_LOG(M0_DEBUG, "Decrease proxy_nr (current nr %"
363  PRIu64") cm %p, pxy %p",
364  cm->cm_proxy_active_nr, cm, pxy);
366  } else if (pxy->px_status >= M0_PX_COMPLETE &&
367  px_status < M0_PX_COMPLETE) {
368  M0_LOG(M0_DEBUG, "Increase proxy_nr (current nr %"
369  PRIu64") cm %p, pxy %p",
370  cm->cm_proxy_active_nr, cm, pxy);
372  }
373 
374  rc = px_action[px_status](pxy, in_interval, out_interval, px_epoch, px_status);
377  /*
378  * All proxies finished processing, i.e. all proxies are in
379  * COMPLETE/STOP/FAIL state.
380  */
381  if (cm->cm_proxy_active_nr == 0) {
382  M0_LOG(M0_DEBUG, "No more active proxies in cm %p", cm);
383  m0_cm_notify(cm);
384  }
385  m0_cm_proxy_unlock(pxy);
386 
387  return M0_RC(rc);
388 }
389 
390 M0_INTERNAL bool m0_cm_proxy_is_updated(struct m0_cm_proxy *proxy,
391  struct m0_cm_sw *in_interval)
392 {
393  return m0_cm_ag_id_cmp(&in_interval->sw_hi,
394  &proxy->px_last_sw_onwire_sent.sw_hi) <= 0;
395 }
396 
398  struct m0_sm_ast *ast)
399 {
400  struct m0_cm_proxy *proxy = container_of(ast, struct m0_cm_proxy,
402  struct m0_cm *cm = proxy->px_cm;
403  struct m0_cm_sw in_interval;
404  struct m0_cm_sw out_interval;
405  M0_ENTRY();
406 
408 
409  m0_cm_ag_in_interval(cm, &in_interval);
411  m0_cm_ag_id_copy(&in_interval.sw_hi,
413  m0_cm_ag_out_interval(cm, &out_interval);
414  M0_LOG(M0_DEBUG, "proxy ep: %s, cm->cm_aggr_grps_in_nr %"PRIu64
415  " pending updates: %u posted: %" PRIu64 " state=%u"
416  " px_update_rc=%d px_send_final_update=%d",
417  proxy->px_endpoint,
419  proxy->px_updates_pending,
420  proxy->px_nr_updates_posted,
421  proxy->px_status,
422  proxy->px_update_rc,
423  !!proxy->px_send_final_update);
424  ID_LOG("proxy last updated hi", &proxy->px_last_sw_onwire_sent.sw_hi);
425 
426  /*
427  * We check if updates posted are greater than 0 and decrement as
428  * there could be a case of update resend while a reply is already
429  * on wire and a proxy may receive multiple replies for an update.
430  */
431  if (proxy->px_nr_updates_posted > 0)
433 
434  if (proxy->px_update_rc != 0 || proxy->px_send_final_update ||
435  (proxy->px_updates_pending > 0 &&
436  (!m0_cm_proxy_is_updated(proxy, &in_interval) || cm->cm_abort ||
437  cm->cm_quiesce))) {
438  if (proxy->px_update_rc == -ECANCELED ||
439  (M0_IN(proxy->px_status, (M0_PX_FAILED, M0_PX_STOP)) &&
440  !proxy->px_send_final_update))
441  proxy->px_updates_pending = 0;
442  else
443  m0_cm_proxy_remote_update(proxy, &in_interval, &out_interval);
444  }
445 
446  if (m0_cm_state_get(cm) == M0_CMS_READY &&
448  proxy->px_update_rc == 0) {
450  m0_bitmap_set(&cm->cm_proxy_update_map, proxy->px_id, true);
451  }
452 
453  /* Initial handshake complete, signal waiters to continue further.*/
456 
457  /*
458  * Handle service/node failure during sns-repair/rebalance.
459  * Cannot send updates to dead proxy, all the aggregation groups,
460  * frozen on that proxy must be destroyed.
461  */
462  if (proxy->px_status == M0_PX_FAILED || m0_cm_state_get(cm) == M0_CMS_FAIL ||
463  cm->cm_quiesce || cm->cm_abort) {
464  m0_cm_proxy_lock(proxy);
465  __wake_up_pending_cps(proxy);
466  m0_cm_proxy_unlock(proxy);
467  /* Here we have already received notification from HA about
468  * the proxy failure and might receive explicit abort command as well.
469  * So no need to transition cm to FAILED state, just aborting the
470  * operation would suffice.
471  */
472  m0_cm_abort(cm, 0);
473  m0_cm_frozen_ag_cleanup(cm, proxy);
474  }
475  if (cm->cm_done || proxy->px_status == M0_PX_FAILED ||
477  /* Wake up anyone waiting to handle further process (cleanup/completion). */
479  }
480  M0_LEAVE();
481 }
482 
483 static void proxy_sw_onwire_item_replied_cb(struct m0_rpc_item *req_item)
484 {
485  struct m0_cm_proxy_sw_onwire *swu_fop;
486  struct m0_cm_sw_onwire_rep *sw_rep;
487  struct m0_rpc_item *rep_item;
488  struct m0_cm_proxy *proxy;
489  struct m0_fop *rep_fop;
490 
491  M0_ENTRY("%p", req_item);
492 
493  swu_fop = M0_AMB(swu_fop, m0_rpc_item_to_fop(req_item), pso_fop);
494  proxy = swu_fop->pso_proxy;
495  M0_ASSERT(m0_cm_proxy_bob_check(proxy));
496 
497  if (req_item->ri_error == 0) {
498  rep_item = req_item->ri_reply;
499  if (m0_rpc_item_is_generic_reply_fop(rep_item))
500  proxy->px_update_rc = m0_rpc_item_generic_reply_rc(rep_item);
501  else {
502  rep_fop = m0_rpc_item_to_fop(rep_item);
503  sw_rep = m0_fop_data(rep_fop);
504  proxy->px_update_rc = sw_rep->swr_rc;
505  }
506  } else
507  proxy->px_update_rc = req_item->ri_error;
508 
511 
512  M0_LEAVE();
513 }
514 
517 };
518 
519 static void cm_proxy_sw_onwire_post(struct m0_cm_proxy *proxy,
520  struct m0_fop *fop,
521  const struct m0_rpc_conn *conn)
522 {
523  struct m0_rpc_item *item;
524 
525  M0_ENTRY("fop: %p conn: %p to pxy %p (%s)",
526  fop, conn, proxy, proxy->px_endpoint);
527  M0_PRE(fop != NULL && conn != NULL);
528 
532  item->ri_session = proxy->px_session;
533  item->ri_deadline = 0;
534 
536  m0_rpc_post(item);
538  M0_LEAVE();
539 }
540 
541 static void proxy_sw_onwire_release(struct m0_ref *ref)
542 {
544  struct m0_fop *fop;
545 
546  fop = container_of(ref, struct m0_fop, f_ref);
547  pso_fop = container_of(fop, struct m0_cm_proxy_sw_onwire, pso_fop);
548  M0_ASSERT(pso_fop != NULL);
549  m0_fop_fini(fop);
550  m0_free(pso_fop);
551 }
552 
553 M0_INTERNAL int m0_cm_proxy_remote_update(struct m0_cm_proxy *proxy,
554  struct m0_cm_sw *in_interval,
555  struct m0_cm_sw *out_interval)
556 {
557  struct m0_cm *cm;
558  struct m0_rpc_machine *rmach;
559  struct m0_rpc_conn *conn;
560  struct m0_cm_proxy_sw_onwire *sw_fop;
561  struct m0_fop *fop;
562  const char *ep;
563  int rc;
564 
565  M0_PRE(proxy != NULL);
566  M0_ENTRY("proxy: %p (%s)", proxy, proxy->px_endpoint);
567  cm = proxy->px_cm;
569 
570  if (proxy->px_nr_updates_posted > 0) {
572  return M0_RC(0);
573  }
574  M0_ALLOC_PTR(sw_fop);
575  if (sw_fop == NULL)
576  return M0_ERR(-ENOMEM);
577  fop = &sw_fop->pso_fop;
578  rmach = proxy->px_conn->c_rpc_machine;
579  ep = rmach->rm_tm.ntm_ep->nep_addr;
580  conn = proxy->px_conn;
583  proxy->px_id, ep, in_interval,
584  out_interval);
585  if (rc != 0) {
587  m0_free(sw_fop);
588  return M0_ERR(rc);
589  }
590 
591  if (proxy->px_send_final_update) {
592  struct m0_cm_sw_onwire *swo;
593  proxy->px_send_final_update = false;
594 
595  /* This is the final update. No more SW update will be sent.
596  * CM status must be set to STOP, so the proxy on remote node
597  * can be finalized.
598  */
599  swo = m0_fop_data(fop);
600  swo->swo_cm_status = M0_PX_STOP;
601  }
602 
603  sw_fop->pso_proxy = proxy;
604  ID_LOG("proxy last updated hi", &proxy->px_last_sw_onwire_sent.sw_hi);
605 
607  m0_cm_sw_copy(&proxy->px_last_sw_onwire_sent, in_interval);
608 
609  M0_LOG(M0_DEBUG, "Sending to %s hi: ["M0_AG_F"]",
610  proxy->px_endpoint, M0_AG_P(&in_interval->sw_hi));
611 
612  return M0_RC(0);
613 }
614 
615 M0_INTERNAL bool m0_cm_proxy_is_done(const struct m0_cm_proxy *pxy)
616 {
617  M0_LOG(M0_DEBUG, "proxy %p (to %s) state: is_done %d "
618  "px_nr_updates_posted %" PRIu64 " onwire_ast.sa_next %p",
619  pxy, pxy->px_endpoint,
620  pxy->px_is_done, pxy->px_nr_updates_posted,
622 
623  return pxy->px_is_done && pxy->px_nr_updates_posted == 0 &&
624  pxy->px_sw_onwire_ast.sa_next == NULL;
625 }
626 
627 M0_INTERNAL void m0_cm_proxy_fini(struct m0_cm_proxy *pxy)
628 {
629  M0_ENTRY("%p", pxy);
630  M0_PRE(pxy != NULL);
631  M0_PRE(proxy_cp_tlist_is_empty(&pxy->px_pending_cps));
632 
633  proxy_cp_tlist_fini(&pxy->px_pending_cps);
634  m0_cm_proxy_bob_fini(pxy);
635  if (m0_clink_is_armed(&pxy->px_ha_link)) {
637  m0_clink_fini(&pxy->px_ha_link);
638  }
639  m0_mutex_fini(&pxy->px_mutex);
640  M0_LEAVE();
641 }
642 
643 M0_INTERNAL uint64_t m0_cm_proxy_nr(struct m0_cm *cm)
644 {
646 
647  return proxy_tlist_length(&cm->cm_proxies);
648 }
649 
650 M0_INTERNAL bool m0_cm_proxy_agid_is_in_sw(struct m0_cm_proxy *pxy,
651  struct m0_cm_ag_id *id)
652 {
653  bool result;
654 
655  m0_cm_proxy_lock(pxy);
656  result = m0_cm_ag_id_cmp(id, &pxy->px_sw.sw_lo) >= 0 &&
657  m0_cm_ag_id_cmp(id, &pxy->px_sw.sw_hi) <= 0;
658  m0_cm_proxy_unlock(pxy);
659 
660  return result;
661 }
662 
663 M0_INTERNAL void m0_cm_proxy_pending_cps_wakeup(struct m0_cm *cm)
664 {
665  struct m0_cm_proxy *pxy;
666 
667  m0_tl_for(proxy, &cm->cm_proxies, pxy) {
669  } m0_tl_endfor;
670 }
671 
672 static void px_fail_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
673 {
674  struct m0_cm_proxy *pxy = container_of(ast, struct m0_cm_proxy,
675  px_fail_ast);
676  M0_ENTRY("pxy %p %s failed", pxy, pxy->px_endpoint);
677 
678  m0_cm_proxy_lock(pxy);
679  pxy->px_status = M0_PX_FAILED;
680  pxy->px_is_done = true;
681  if (!proxy_fail_tlink_is_in(pxy))
682  proxy_fail_tlist_add_tail(&pxy->px_cm->cm_failed_proxies, pxy);
684  m0_cm_proxy_unlock(pxy);
685  m0_cm_abort(pxy->px_cm, 0);
686  m0_cm_frozen_ag_cleanup(pxy->px_cm, pxy);
688  M0_LEAVE();
689 }
690 
691 static void px_online_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
692 {
693  struct m0_cm_proxy *pxy = container_of(ast, struct m0_cm_proxy,
694  px_online_ast);
695 
696  /*
697  * Do nothing for now, ongoing sns operation must cleanup and complete
698  * and sns repair/rebalance must be restarted.
699  */
700  M0_LOG(M0_DEBUG, "proxy %s is online", pxy->px_endpoint);
701 }
702 
703 static bool proxy_clink_cb(struct m0_clink *clink)
704 {
705  struct m0_cm_proxy *pxy = M0_AMB(pxy, clink, px_ha_link);
706  struct m0_conf_obj *svc_obj = container_of(clink->cl_chan,
707  struct m0_conf_obj,
708  co_ha_chan);
709  struct m0_sm_ast *ast = NULL;
710 
712 
713  if (M0_IN(svc_obj->co_ha_state, (M0_NC_FAILED, M0_NC_TRANSIENT))) {
715  ast = &pxy->px_fail_ast;
716  } else if (svc_obj->co_ha_state == M0_NC_ONLINE &&
717  pxy->px_status == M0_PX_FAILED) {
719  ast = &pxy->px_online_ast;
720  }
722 
723  return true;
724 }
725 
726 M0_INTERNAL void m0_cm_proxy_event_handle_register(struct m0_cm_proxy *pxy,
727  struct m0_conf_obj *svc_obj)
728 {
730  m0_clink_add_lock(&svc_obj->co_ha_chan, &pxy->px_ha_link);
731 }
732 
733 M0_INTERNAL bool m0_cm_proxy_is_locked(struct m0_cm_proxy *pxy)
734 {
735  return m0_mutex_is_locked(&pxy->px_mutex);
736 }
737 
738 M0_INTERNAL void m0_cm_proxy_lock(struct m0_cm_proxy *pxy)
739 {
740  m0_mutex_lock(&pxy->px_mutex);
741 }
742 
743 M0_INTERNAL void m0_cm_proxy_unlock(struct m0_cm_proxy *pxy)
744 {
745  m0_mutex_unlock(&pxy->px_mutex);
746 }
747 
748 M0_INTERNAL bool m0_cm_proxies_ready(const struct m0_cm *cm)
749 {
750  uint32_t nr_failed_proxies;
751 
753 
754  nr_failed_proxies = proxy_fail_tlist_length(&cm->cm_failed_proxies);
755  return cm->cm_nr_proxy_updated == (cm->cm_proxy_nr - nr_failed_proxies) * 2;
756 }
757 
758 M0_INTERNAL int m0_cm_proxy_in_count_alloc(struct m0_cm_proxy_in_count *pcount,
759  uint32_t nr_proxies)
760 {
761  M0_PRE(nr_proxies > 0);
762 
763  M0_ALLOC_ARR(pcount->p_count, nr_proxies);
764  if (pcount->p_count == NULL)
765  return -ENOMEM;
766  pcount->p_nr = nr_proxies;
767 
768  return 0;
769 }
770 
771 M0_INTERNAL void m0_cm_proxy_in_count_free(struct m0_cm_proxy_in_count *pcount)
772 {
773  M0_PRE(pcount != NULL);
774 
775  m0_free(pcount->p_count);
776  pcount->p_count = NULL;
777  pcount->p_nr = 0;
778 }
779 
780 M0_INTERNAL void m0_cm_proxies_sent_reset(struct m0_cm *cm)
781 {
782  struct m0_cm_proxy *pxy;
783 
784  m0_tl_for(proxy, &cm->cm_proxies, pxy) {
786  } m0_tl_endfor;
787 }
788 
789 #undef M0_TRACE_SUBSYSTEM
790 
792 /*
793  * Local variables:
794  * c-indentation-style: "K&R"
795  * c-basic-offset: 8
796  * tab-width: 8
797  * fill-column: 80
798  * scroll-step: 1
799  * End:
800  */
M0_INTERNAL void m0_cm_ag_id_copy(struct m0_cm_ag_id *dst, const struct m0_cm_ag_id *src)
Definition: ag.c:83
const struct m0_conf_obj_type * m0_conf_obj_type(const struct m0_conf_obj *obj)
Definition: obj.c:363
uint64_t cm_aggr_grps_in_nr
Definition: cm.h:205
static void proxy_sw_onwire_release(struct m0_ref *ref)
Definition: proxy.c:541
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
static const struct m0_bob_type proxy_bob
Definition: proxy.c:71
static struct m0_addb2_philter p
Definition: consumer.c:40
M0_INTERNAL bool m0_cm_proxy_is_done(const struct m0_cm_proxy *pxy)
Definition: proxy.c:615
M0_INTERNAL void m0_cm_frozen_ag_cleanup(struct m0_cm *cm, struct m0_cm_proxy *proxy)
Definition: cm.c:1148
static void px_fail_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: proxy.c:672
M0_INTERNAL void m0_fom_wakeup(struct m0_fom *fom)
Definition: fom.c:532
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
struct m0_mutex px_mutex
Definition: proxy.h:112
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
const struct m0_rpc_item_ops proxy_sw_onwire_item_ops
Definition: proxy.c:515
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
Definition: sw.h:45
#define NULL
Definition: misc.h:38
#define M0_AG_P(ag)
Definition: ag.h:55
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
#define ID_LOG(prefix, id)
Definition: ag.h:57
struct m0_bitmap cm_proxy_update_map
Definition: cm.h:252
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
M0_INTERNAL void m0_cm_proxy_lock(struct m0_cm_proxy *pxy)
Definition: proxy.c:738
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
bool m0_rpc_item_is_generic_reply_fop(const struct m0_rpc_item *item)
Definition: fom_generic.c:75
bool cm_quiesce
Definition: cm.h:277
const struct m0_cm_ops * cm_ops
Definition: cm.h:188
static void proxy_sw_onwire_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: proxy.c:397
struct m0_cm_ag_id sw_hi
Definition: sw.h:47
static struct m0_sm_group * grp
Definition: bytecount.c:38
M0_TL_DEFINE(proxy, M0_INTERNAL, struct m0_cm_proxy)
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
Definition: cp.h:160
M0_LEAVE()
struct m0_cm_sw px_last_sw_onwire_sent
Definition: proxy.h:71
const struct m0_conf_obj_type M0_CONF_SERVICE_TYPE
Definition: service.c:156
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
M0_INTERNAL void m0_cm_proxy_unlock(struct m0_cm_proxy *pxy)
Definition: proxy.c:743
bool cm_done
Definition: cm.h:265
struct m0_tl px_pending_cps
Definition: proxy.h:110
static bool epoch_check(struct m0_cm_proxy *pxy, m0_time_t px_epoch)
Definition: proxy.c:203
M0_INTERNAL bool m0_cm_proxies_ready(const struct m0_cm *cm)
Definition: proxy.c:748
struct m0_cm_ag_id cag_id
Definition: ag.h:72
int32_t ri_error
Definition: item.h:161
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
M0_INTERNAL bool m0_clink_is_armed(const struct m0_clink *link)
Definition: chan.c:303
M0_INTERNAL void m0_cm_ag_in_interval(const struct m0_cm *cm, struct m0_cm_sw *in_interval)
Definition: ag.c:436
Definition: sm.h:504
const char * nep_addr
Definition: net.h:503
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_INTERNAL bool m0_cm_proxy_is_updated(struct m0_cm_proxy *proxy, struct m0_cm_sw *in_interval)
Definition: proxy.c:390
int px_update_rc
Definition: proxy.h:98
uint32_t * p_count
Definition: proxy.h:154
int(* cmo_sw_onwire_fop_setup)(struct m0_cm *cm, struct m0_fop *fop, void(*fop_release)(struct m0_ref *), uint64_t proxy_id, const char *local_ep, const struct m0_cm_sw *sw, const struct m0_cm_sw *out_interval)
Definition: cm.h:342
M0_INTERNAL bool m0_cm_ag_id_is_set(const struct m0_cm_ag_id *id)
Definition: ag.c:95
static struct m0_rpc_item * item
Definition: item.c:56
static int px_ready(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, m0_time_t px_epoch, uint32_t px_status)
Definition: proxy.c:251
const char * bt_name
Definition: bob.h:73
struct m0_rpc_session * px_session
Definition: proxy.h:116
#define m0_tl_endfor
Definition: tlist.h:700
return M0_RC(rc)
static struct m0_cm * cm
Definition: cm.c:63
Definition: sock.c:754
struct m0_tl cm_proxies
Definition: cm.h:246
uint64_t cm_nr_proxy_updated
Definition: cm.h:253
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
#define M0_AG_F
Definition: ag.h:54
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
Definition: fom_generic.c:81
M0_INTERNAL void m0_cm_complete_notify(struct m0_cm *cm)
Definition: cm.c:1133
M0_INTERNAL void m0_chan_signal(struct m0_chan *chan)
Definition: chan.c:159
M0_INTERNAL bool m0_cm_proxy_agid_is_in_sw(struct m0_cm_proxy *pxy, struct m0_cm_ag_id *id)
Definition: proxy.c:650
#define PRIu64
Definition: types.h:58
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
static void proxy_done(struct m0_cm_proxy *proxy)
Definition: proxy.c:217
M0_INTERNAL void m0_cm_proxy_cp_add(struct m0_cm_proxy *pxy, struct m0_cm_cp *cp)
Definition: proxy.c:139
return M0_ERR(-EOPNOTSUPP)
struct m0_clink px_ha_link
Definition: proxy.h:134
M0_INTERNAL void m0_cm_proxies_sent_reset(struct m0_cm *cm)
Definition: proxy.c:780
M0_INTERNAL void m0_cm_proxy_event_handle_register(struct m0_cm_proxy *pxy, struct m0_conf_obj *svc_obj)
Definition: proxy.c:726
M0_INTERNAL void m0_cm_sw_copy(struct m0_cm_sw *dst, const struct m0_cm_sw *src)
Definition: sw.c:67
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
Definition: refs.h:34
enum m0_proxy_state px_status
Definition: proxy.h:91
struct m0_sm_ast px_online_ast
Definition: proxy.h:89
const char * px_endpoint
Definition: proxy.h:118
static int px_active(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, m0_time_t px_epoch, uint32_t px_status)
Definition: proxy.c:281
static int px_complete(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, m0_time_t px_epoch, uint32_t px_status)
Definition: proxy.c:295
M0_INTERNAL int m0_cm_ag_id_cmp(const struct m0_cm_ag_id *id0, const struct m0_cm_ag_id *id1)
Definition: ag.c:73
struct m0_net_transfer_mc rm_tm
Definition: rpc_machine.h:88
#define M0_ASSERT(cond)
static void px_online_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: proxy.c:691
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_TL_DESCR_DEFINE(proxy, "copy machine proxy", M0_INTERNAL, struct m0_cm_proxy, px_linkage, px_magic, CM_PROXY_LINK_MAGIC, CM_PROXY_HEAD_MAGIC)
uint64_t px_nr_updates_posted
Definition: proxy.h:95
struct m0_rpc_conn * px_conn
Definition: proxy.h:114
enum m0_ha_obj_state co_ha_state
Definition: obj.h:241
struct m0_cm_sw px_sw
Definition: proxy.h:68
struct m0_tl rh_rpc_machines
Definition: reqh.h:135
static bool cm_proxy_invariant(const struct m0_cm_proxy *pxy)
Definition: proxy.c:80
struct m0_cm_ag_id sw_lo
Definition: sw.h:46
struct m0_rpc_item * ri_reply
Definition: item.h:163
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
#define M0_POST(cond)
Definition: xcode.h:73
M0_INTERNAL void m0_cm_proxy_del(struct m0_cm *cm, struct m0_cm_proxy *pxy)
Definition: proxy.c:124
struct m0_chan co_ha_chan
Definition: obj.h:248
M0_INTERNAL void m0_bitmap_set(struct m0_bitmap *map, size_t idx, bool val)
Definition: bitmap.c:139
bool px_is_done
Definition: proxy.h:93
M0_INTERNAL bool m0_cm_proxy_is_locked(struct m0_cm_proxy *pxy)
Definition: proxy.c:733
struct m0_cm_aggr_group * c_ag
Definition: cp.h:172
static bool proxy_clink_cb(struct m0_clink *clink)
Definition: proxy.c:703
struct m0_sm_ast * sa_next
Definition: sm.h:509
M0_INTERNAL int m0_cm_proxy_update(struct m0_cm_proxy *pxy, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, uint32_t px_status, m0_time_t px_epoch)
Definition: proxy.c:327
struct m0_rpc_conn conn
Definition: fsync.c:96
static void proxy_sw_onwire_item_replied_cb(struct m0_rpc_item *req_item)
Definition: proxy.c:483
M0_INTERNAL void m0_fop_fini(struct m0_fop *fop)
Definition: fop.c:136
struct m0_sm_ast px_fail_ast
Definition: proxy.h:87
static struct m0_clink clink[RDWR_REQUEST_MAX]
uint64_t px_id
Definition: proxy.h:63
M0_INTERNAL uint64_t m0_cm_proxy_nr(struct m0_cm *cm)
Definition: proxy.c:643
struct m0_chan cm_proxy_init_wait
Definition: cm.h:240
struct m0_cm_proxy * pso_proxy
Definition: proxy.h:167
static int px_stop_fail(struct m0_cm_proxy *p, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, m0_time_t px_epoch, uint32_t px_status)
Definition: proxy.c:305
static void __wake_up_pending_cps(struct m0_cm_proxy *pxy)
Definition: proxy.c:191
bool px_send_final_update
Definition: proxy.h:142
M0_INTERNAL void m0_cm_proxy_in_count_free(struct m0_cm_proxy_in_count *pcount)
Definition: proxy.c:771
uint32_t p_nr
Definition: proxy.h:152
struct m0_sm_group cm_sm_group
Definition: cm.h:185
void(* rio_replied)(struct m0_rpc_item *item)
Definition: item.h:300
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
void m0_net_end_point_put(struct m0_net_end_point *ep)
Definition: ep.c:98
struct m0_cm_sw px_out_interval
Definition: proxy.h:77
uint64_t cm_proxy_nr
Definition: cm.h:250
#define M0_MAGIX_OFFSET(type, field)
Definition: misc.h:356
char * ep
Definition: sw.h:132
#define M0_CNT_INC(cnt)
Definition: arith.h:226
M0_INTERNAL void m0_cm_notify(struct m0_cm *cm)
Definition: cm.c:1081
struct m0_ref f_ref
Definition: fop.h:80
struct m0_net_end_point * ntm_ep
Definition: net.h:868
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
M0_INTERNAL void m0_cm_ag_out_interval(const struct m0_cm *cm, struct m0_cm_sw *out_interval)
Definition: ag.c:454
M0_INTERNAL void m0_cm_proxy_fini(struct m0_cm_proxy *pxy)
Definition: proxy.c:627
struct m0_reqh_service cm_service
Definition: cm.h:191
static void cm_proxy_sw_onwire_post(struct m0_cm_proxy *proxy, struct m0_fop *fop, const struct m0_rpc_conn *conn)
Definition: proxy.c:519
uint64_t cm_proxy_active_nr
Definition: cm.h:254
M0_INTERNAL int m0_cm_proxy_init(struct m0_cm_proxy *proxy, uint64_t px_id, struct m0_cm_ag_id *lo, struct m0_cm_ag_id *hi, const char *endpoint)
Definition: proxy.c:90
M0_INTERNAL int m0_cm_proxy_in_count_alloc(struct m0_cm_proxy_in_count *pcount, uint32_t nr_proxies)
Definition: proxy.c:758
struct m0_rpc_session * ri_session
Definition: item.h:147
Definition: cm.h:166
struct m0_rpc_item * m0_fop_to_rpc_item(const struct m0_fop *fop)
Definition: fop.c:338
M0_INTERNAL bool m0_bitmap_get(const struct m0_bitmap *map, size_t idx)
Definition: bitmap.c:105
M0_BOB_DEFINE(static, &proxy_bob, m0_cm_proxy)
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL struct m0_cm_proxy * m0_cm_proxy_locate(struct m0_cm *cm, const char *addr)
Definition: proxy.c:161
M0_INTERNAL enum m0_cm_state m0_cm_state_get(const struct m0_cm *cm)
Definition: cm.c:565
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
static void cm_proxy_cp_del(struct m0_cm_proxy *pxy, struct m0_cm_cp *cp)
Definition: proxy.c:152
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
struct m0_fop pso_fop
Definition: proxy.h:162
static struct m0_fop * fop
Definition: item.c:57
M0_INTERNAL void m0_cm_proxy_pending_cps_wakeup(struct m0_cm *cm)
Definition: proxy.c:663
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
struct m0_fom c_fom
Definition: cp.h:161
uint32_t swo_cm_status
Definition: sw.h:75
static void _sw_update(struct m0_cm_proxy *pxy, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, uint32_t px_status)
Definition: proxy.c:236
M0_INTERNAL void m0_cm_abort(struct m0_cm *cm, int rc)
Definition: cm.c:1181
M0_INTERNAL bool m0_cm_is_locked(const struct m0_cm *cm)
Definition: cm.c:560
static int scan(struct scanner *s)
Definition: beck.c:963
struct m0_tl cm_failed_proxies
Definition: cm.h:248
struct m0_reqh * rs_reqh
Definition: reqh_service.h:259
struct m0_sm_ast px_sw_onwire_ast
Definition: proxy.h:85
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
m0_time_t px_epoch
Definition: proxy.h:65
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL bool m0_cm_sw_cmp(const struct m0_cm_sw *sw0, const struct m0_cm_sw *sw1)
Definition: sw.c:51
M0_INTERNAL int m0_cm_proxy_remote_update(struct m0_cm_proxy *proxy, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval)
Definition: proxy.c:553
uint32_t px_updates_pending
Definition: proxy.h:100
M0_INTERNAL void m0_cm_proxy_add(struct m0_cm *cm, struct m0_cm_proxy *pxy)
Definition: proxy.c:110
M0_INTERNAL void m0_chan_broadcast(struct m0_chan *chan)
Definition: chan.c:172
int32_t rc
Definition: trigger_fop.h:47
M0_INTERNAL int m0_net_end_point_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const char *addr)
Definition: ep.c:56
bool cm_abort
Definition: cm.h:282
static void hi(void)
Definition: nucleus.c:93
Definition: ag.h:49
Definition: fop.h:79
Definition: trace.h:478
struct m0_cm_ag_id cm_sw_last_updated_hi
Definition: cm.h:220
struct m0_fop * rep_fop
Definition: dir.c:334
struct m0_cm * px_cm
Definition: proxy.h:103
m0_time_t ri_deadline
Definition: item.h:141
static int(* px_action[])(struct m0_cm_proxy *px, struct m0_cm_sw *in_interval, struct m0_cm_sw *out_interval, m0_time_t px_epoch, uint32_t px_status)
Definition: proxy.c:317