Motr  M0
net.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_SNSCM
24 #include "lib/trace.h"
25 
26 #include "lib/memory.h"
27 
28 #include "cm/proxy.h"
29 #include "sns/cm/cm.h"
30 #include "sns/cm/cp.h"
31 #include "sns/cm/sns_cp_onwire.h"
32 #include "sns/cm/cm_utils.h"
33 #include "sns/cm/file.h"
34 
35 #include "fop/fop.h"
36 #include "fop/fom.h"
37 #include "net/net.h"
38 #include "rpc/item.h"
39 #include "rpc/rpclib.h"
40 #include "rpc/session.h"
41 #include "rpc/conn.h"
42 #include "rpc/rpc_machine.h" /* m0_rpc_machine */
43 #include "ioservice/fid_convert.h" /* m0_fid_convert_stob2cob */
44 
50 static void cp_reply_received(struct m0_rpc_item *item);
51 
52 /*
53  * Over-ridden rpc item ops, required to send notification to the copy packet
54  * send phase that reply has been received and the copy packet can be finalised.
55  */
56 static const struct m0_rpc_item_ops cp_item_ops = {
58 };
59 
60 /* Creates indexvec structure based on number of segments and segment size. */
61 static int indexvec_prepare(struct m0_io_indexvec *iv, m0_bindex_t idx,
62  uint32_t seg_nr, size_t seg_size)
63 {
64  int i;
65 
66  M0_PRE(iv != NULL);
67 
69  if (iv->ci_iosegs == NULL) {
70  m0_free(iv);
71  return M0_ERR(-ENOMEM);
72  }
73 
74  iv->ci_nr = seg_nr;
75  for (i = 0; i < seg_nr; ++i) {
76  iv->ci_iosegs[i].ci_index = idx;
78  idx += seg_size;
79  }
80  return 0;
81 }
82 
83 /* Converts in-memory copy packet structure to onwire copy packet structure. */
84 static int snscp_to_snscpx(const struct m0_sns_cm_cp *sns_cp,
85  struct m0_sns_cpx *sns_cpx)
86 {
87  struct m0_net_buffer *nbuf;
88  const struct m0_cm_cp *cp;
89  uint32_t nbuf_seg_nr;
90  uint32_t tmp_seg_nr;
91  uint32_t nb_idx = 0;
92  uint32_t nb_cnt;
93  uint64_t offset;
94  int rc;
95  int i;
96 
97  M0_PRE(sns_cp != NULL);
98  M0_PRE(sns_cpx != NULL);
99 
100  cp = &sns_cp->sc_base;
101 
102  sns_cpx->scx_stob_id = sns_cp->sc_stob_id;
103  sns_cpx->scx_failed_idx = sns_cp->sc_failed_idx;
104  sns_cpx->scx_cp.cpx_prio = cp->c_prio;
105  sns_cpx->scx_phase = M0_CCP_SEND;
106  m0_cm_ag_id_copy(&sns_cpx->scx_cp.cpx_ag_id, &cp->c_ag->cag_id);
107  sns_cpx->scx_cp.cpx_ag_cp_idx = cp->c_ag_cp_idx;
109  cp->c_ag->cag_cp_global_nr);
111  sns_cpx->scx_cp.cpx_epoch = cp->c_epoch;
112 
113  offset = sns_cp->sc_index;
114  nb_cnt = cp->c_buf_nr;
115  M0_ALLOC_ARR(sns_cpx->scx_ivecs.cis_ivecs, nb_cnt);
116  if (sns_cpx->scx_ivecs.cis_ivecs == NULL) {
117  rc = M0_ERR(-ENOMEM);
118  goto out;
119  }
120 
121  tmp_seg_nr = cp->c_data_seg_nr;
122  m0_tl_for(cp_data_buf, &cp->c_buffers, nbuf) {
123  nbuf_seg_nr = min32(nbuf->nb_pool->nbp_seg_nr, tmp_seg_nr);
124  tmp_seg_nr -= nbuf_seg_nr;
125  rc = indexvec_prepare(&sns_cpx->scx_ivecs.
126  cis_ivecs[nb_idx],
127  offset,
128  nbuf_seg_nr,
129  nbuf->nb_pool->nbp_seg_size);
130  if (rc != 0 )
131  goto cleanup;
132 
133  offset += nbuf_seg_nr * nbuf->nb_pool->nbp_seg_size;
134  M0_CNT_INC(nb_idx);
135  } m0_tl_endfor;
136  sns_cpx->scx_ivecs.cis_nr = nb_idx;
137  sns_cpx->scx_cp.cpx_desc.id_nr = nb_idx;
138 
140  sns_cpx->scx_cp.cpx_desc.id_nr);
141  if (sns_cpx->scx_cp.cpx_desc.id_descs == NULL) {
142  rc = M0_ERR(-ENOMEM);
143  goto cleanup;
144  }
145 
146  goto out;
147 
148 cleanup:
149  for (i = 0; i < nb_idx; ++i)
150  m0_free(&sns_cpx->scx_ivecs.cis_ivecs[nb_idx]);
151  m0_free(sns_cpx->scx_ivecs.cis_ivecs);
153 out:
154  return M0_RC(rc);
155 }
156 
157 static void cp_reply_received(struct m0_rpc_item *req_item)
158 {
159  struct m0_fop *req_fop;
160  struct m0_sns_cm_cp *scp;
161  struct m0_rpc_item *rep_item;
162  struct m0_cm_cp_fop *cp_fop;
163  struct m0_fop *rep_fop;
164  struct m0_sns_cpx_reply *sns_cpx_rep;
165 
166  req_fop = m0_rpc_item_to_fop(req_item);
167  cp_fop = container_of(req_fop, struct m0_cm_cp_fop, cf_fop);
168  scp = cp2snscp(cp_fop->cf_cp);
169  if (req_item->ri_error == 0) {
170  rep_item = req_item->ri_reply;
171  if (m0_rpc_item_is_generic_reply_fop(rep_item))
173  else {
174  rep_fop = m0_rpc_item_to_fop(rep_item);
175  sns_cpx_rep = m0_fop_data(rep_fop);
176  scp->sc_base.c_rc = sns_cpx_rep->scr_cp_rep.cr_rc;
177  }
178  } else
179  scp->sc_base.c_rc = req_item->ri_error;
180 
182 }
183 
184 static void cp_fop_release(struct m0_ref *ref)
185 {
186  struct m0_cm_cp_fop *cp_fop;
187  struct m0_fop *fop = container_of(ref, struct m0_fop, f_ref);
188 
189  cp_fop = container_of(fop, struct m0_cm_cp_fop, cf_fop);
190  M0_ASSERT(cp_fop != NULL);
191  m0_fop_fini(fop);
192  m0_free(cp_fop);
193 }
194 
195 M0_INTERNAL int m0_sns_cm_cp_send(struct m0_cm_cp *cp, struct m0_fop_type *ft)
196 {
197  struct m0_sns_cm_cp *sns_cp;
198  struct m0_sns_cpx *sns_cpx;
199  struct m0_rpc_bulk *rbulk = NULL;
200  struct m0_rpc_bulk_buf *rbuf;
201  struct m0_net_domain *ndom;
202  struct m0_net_buffer *nbuf;
203  struct m0_rpc_session *session;
204  struct m0_cm_cp_fop *cp_fop;
205  struct m0_fop *fop;
206  struct m0_rpc_item *item;
207  uint32_t nbuf_seg_nr;
208  uint32_t tmp_seg_nr;
209  uint64_t offset;
210  int rc;
211  int i;
212 
213  M0_PRE(cp != NULL && m0_fom_phase(&cp->c_fom) == M0_CCP_SEND);
214  M0_PRE(cp->c_cm_proxy != NULL);
215 
216  M0_ALLOC_PTR(cp_fop);
217  if (cp_fop == NULL) {
218  rc = M0_ERR(-ENOMEM);
219  goto out;
220  }
221  sns_cp = cp2snscp(cp);
222  fop = &cp_fop->cf_fop;
225  if (rc != 0) {
226  m0_fop_fini(fop);
227  m0_free(cp_fop);
228  goto out;
229  }
230 
231  sns_cpx = m0_fop_data(fop);
232  M0_PRE(sns_cpx != NULL);
233  cp_fop->cf_cp = cp;
234  rc = snscp_to_snscpx(sns_cp, sns_cpx);
235  if (rc != 0)
236  goto out;
237 
238  rbulk = &cp->c_bulk;
243 
244  offset = sns_cp->sc_index;
245  tmp_seg_nr = cp->c_data_seg_nr;
246  m0_tl_for(cp_data_buf, &cp->c_buffers, nbuf) {
247  nbuf_seg_nr = min32(nbuf->nb_pool->nbp_seg_nr, tmp_seg_nr);
248  tmp_seg_nr -= nbuf_seg_nr;
249  rc = m0_rpc_bulk_buf_add(rbulk, nbuf_seg_nr, 0,
250  ndom, NULL, &rbuf);
251  if (rc != 0 || rbuf == NULL)
252  goto out;
253 
254  for (i = 0; i < nbuf_seg_nr; ++i) {
256  nbuf->nb_buffer.ov_buf[i],
257  nbuf->nb_buffer.ov_vec.v_count[i],
258  offset, ndom);
259  offset += nbuf->nb_buffer.ov_vec.v_count[i];
260  if (rc != 0)
261  goto out;
262  }
263  } m0_tl_endfor;
264 
265  m0_mutex_lock(&rbulk->rb_mutex);
267  m0_mutex_unlock(&rbulk->rb_mutex);
268 
270  sns_cpx->scx_cp.cpx_desc.id_descs,
272  if (rc != 0)
273  goto out;
274 
276  item->ri_ops = &cp_item_ops;
279  item->ri_deadline = 0;
280 
281  m0_rpc_post(item);
283 out:
284  if (rc != 0) {
285  M0_LOG(M0_ERROR, "rc=%d", rc);
287  return M0_FSO_AGAIN;
288  }
289 
291  return M0_FSO_WAIT;
292 }
293 
294 M0_TL_DESCR_DECLARE(rpcbulk, M0_EXTERN);
295 M0_TL_DECLARE(rpcbulk, M0_INTERNAL, struct m0_rpc_bulk_buf);
296 
297 M0_INTERNAL int m0_sns_cm_cp_send_wait(struct m0_cm_cp *cp)
298 {
299  struct m0_rpc_bulk *rbulk = &cp->c_bulk;
300  int c_rc;
301 
302  M0_PRE(cp != NULL);
303 
304  c_rc = cp->c_rc;
305  if (c_rc != 0 && c_rc != -ENOENT) {
306  M0_LOG(M0_ERROR, "rc=%d", c_rc);
307  /* Cleanup rpc bulk in m0_cm_cp_only_fini().*/
309  return M0_FSO_AGAIN;
310  }
311 
312  /*
313  * Wait on channel till all net buffers are deleted from
314  * transfer machine.
315  */
316  if (c_rc == 0) {
317  m0_mutex_lock(&rbulk->rb_mutex);
318  if (!rpcbulk_tlist_is_empty(&rbulk->rb_buflist)) {
319  m0_fom_wait_on(&cp->c_fom, &rbulk->rb_chan, &cp->c_fom.fo_cb);
320  m0_mutex_unlock(&rbulk->rb_mutex);
321  return M0_FSO_WAIT;
322  }
323  m0_mutex_unlock(&rbulk->rb_mutex);
324  }
325 
326  return cp->c_ops->co_phase_next(cp);
327 }
328 
329 static void cp_buf_acquire(struct m0_cm_cp *cp)
330 {
331  struct m0_sns_cm *sns_cm = cm2sns(cp->c_ag->cag_cm);
332  int rc;
333 
334  rc = m0_sns_cm_buf_attach(&sns_cm->sc_ibp.sb_bp, cp);
335  M0_ASSERT(rc == 0);
336 }
337 
338 static void cp_reply_post(struct m0_cm_cp *cp)
339 {
340  struct m0_fop *r_fop = cp->c_fom.fo_rep_fop;
341  struct m0_sns_cpx_reply *sns_cpx_rep;
342 
343  sns_cpx_rep = m0_fop_data(r_fop);
344  sns_cpx_rep->scr_cp_rep.cr_rc = cp->c_rc;
345  m0_rpc_reply_post(&cp->c_fom.fo_fop->f_item, &r_fop->f_item);
346 }
347 
348 static uint32_t seg_nr_get(const struct m0_sns_cpx *sns_cpx, uint32_t ivec_nr)
349 {
350  int i;
351  uint32_t seg_nr = 0;
352 
353  M0_PRE(sns_cpx != NULL);
354 
355  for (i = 0; i < ivec_nr; ++i)
356  seg_nr += sns_cpx->scx_ivecs.cis_ivecs[i].ci_nr;
357 
358  return seg_nr;
359 }
360 
362  struct m0_sns_cm_cp *scp)
363 {
364  struct m0_sns_cm_ag *sag = ag2snsag(ag);
365  struct m0_cm_proxy *proxy = scp->sc_base.c_cm_proxy;
366  struct m0_cm_proxy_in_count *pcount;
367 
369 
370  pcount = &sag->sag_proxy_in_count;
371  if (ag->cag_is_frozen && pcount->p_count[proxy->px_id] == 0)
372  return -ENOENT;
373  M0_CNT_DEC(pcount->p_count[proxy->px_id]);
375 
376  return 0;
377 }
378 
379 /* Converts onwire copy packet structure to in-memory copy packet structure. */
380 static int snscpx_to_snscp(const struct m0_sns_cpx *sns_cpx,
381  struct m0_sns_cm_cp *sns_cp)
382 {
383  struct m0_cm_ag_id ag_id;
384  struct m0_cm *cm;
385  struct m0_cm_aggr_group *ag;
386  int rc;
387 
388  M0_PRE(sns_cp != NULL);
389  M0_PRE(sns_cpx != NULL);
390 
391  sns_cp->sc_stob_id = sns_cpx->scx_stob_id;
392  m0_fid_convert_stob2cob(&sns_cpx->scx_stob_id, &sns_cp->sc_cobfid);
393  sns_cp->sc_failed_idx = sns_cpx->scx_failed_idx;
394 
395  sns_cp->sc_index =
396  sns_cpx->scx_ivecs.cis_ivecs[0].ci_iosegs[0].ci_index;
397 
398  sns_cp->sc_base.c_prio = sns_cpx->scx_cp.cpx_prio;
399  sns_cp->sc_base.c_epoch = sns_cpx->scx_cp.cpx_epoch;
400 
401  m0_cm_ag_id_copy(&ag_id, &sns_cpx->scx_cp.cpx_ag_id);
402 
403  cm = cpfom2cm(&sns_cp->sc_base.c_fom);
404  m0_cm_lock(cm);
405  ag = m0_cm_aggr_group_locate(cm, &ag_id, true);
406  m0_cm_unlock(cm);
407  if (ag == NULL) {
408  M0_LOG(M0_WARN, "ag="M0_AG_F" not found", M0_AG_P(&ag_id));
409  return -ENOENT;
410  }
411 
412  m0_cm_ag_lock(ag);
413  rc = ag_cp_recvd_from_proxy(ag, sns_cp);
414  m0_cm_ag_unlock(ag);
415 
416  if (rc != 0)
417  return M0_ERR(rc);
418 
419  sns_cp->sc_base.c_ag_cp_idx = sns_cpx->scx_cp.cpx_ag_cp_idx;
421  ag->cag_cp_global_nr);
422  m0_bitmap_load(&sns_cpx->scx_cp.cpx_bm,
423  &sns_cp->sc_base.c_xform_cp_indices);
424 
425  sns_cp->sc_base.c_buf_nr = 0;
426  sns_cp->sc_base.c_data_seg_nr = seg_nr_get(sns_cpx,
427  sns_cpx->scx_ivecs.cis_nr);
428 
429  return 0;
430 }
431 
432 M0_INTERNAL int m0_sns_cm_cp_recv_init(struct m0_cm_cp *cp)
433 {
434  struct m0_rpc_bulk_buf *rbuf;
435  struct m0_net_domain *ndom;
436  struct m0_net_buffer *nbuf;
437  struct m0_rpc_bulk *rbulk;
438  struct m0_rpc_session *session;
439  struct m0_cm_proxy *cm_proxy;
440  struct m0_fop *fop = cp->c_fom.fo_fop;
441  struct m0_sns_cpx *sns_cpx;
442  struct m0_sns_cm_cp *sns_cp = cp2snscp(cp);
443  uint32_t nbuf_idx = 0;
444  int rc;
445 
446  M0_PRE(cp != NULL && m0_fom_phase(&cp->c_fom) == M0_CCP_RECV_INIT);
447 
448  cm_proxy = cp->c_cm_proxy;
449  sns_cpx = m0_fop_data(fop);
450  M0_PRE(sns_cpx != NULL);
451  if (sns_cpx->scx_cp.cpx_epoch != cm_proxy->px_epoch) {
452  M0_LOG(M0_WARN, "delayed/stale cp epoch:%llx "
453  "proxy epoch=%llx (%s)",
454  (unsigned long long)cp->c_epoch,
455  (unsigned long long)cm_proxy->px_epoch,
456  cm_proxy->px_endpoint);
457  cp->c_rc = -EPERM;
458  cp_reply_post(cp);
460  return M0_FSO_WAIT;
461  }
462 
463  rc = snscpx_to_snscp(sns_cpx, sns_cp);
464  if (rc != 0) {
465  cp->c_rc = rc;
466  cp_reply_post(cp);
468  return M0_FSO_WAIT;
469  }
470 
471  cp_buf_acquire(cp);
474  rbulk = &cp->c_bulk;
475 
476  m0_tl_for(cp_data_buf, &cp->c_buffers, nbuf) {
477  nbuf->nb_buffer.ov_vec.v_nr =
478  sns_cpx->scx_ivecs.cis_ivecs[nbuf_idx].ci_nr;
479  rc = m0_rpc_bulk_buf_add(rbulk,
480  sns_cpx->scx_ivecs.cis_ivecs[nbuf_idx].ci_nr,
481  0, ndom, nbuf, &rbuf);
482  if (rc != 0 || rbuf == NULL)
483  goto out;
484  M0_CNT_INC(nbuf_idx);
485  } m0_tl_endfor;
486 
487  m0_mutex_lock(&rbulk->rb_mutex);
489  m0_fom_wait_on(&cp->c_fom, &rbulk->rb_chan, &cp->c_fom.fo_cb);
490  m0_mutex_unlock(&rbulk->rb_mutex);
491 
492  rc = m0_rpc_bulk_load(rbulk, session->s_conn,
493  sns_cpx->scx_cp.cpx_desc.id_descs,
495  if (rc != 0) {
496  m0_mutex_lock(&rbulk->rb_mutex);
498  m0_mutex_unlock(&rbulk->rb_mutex);
500  }
501 
502 out:
503  if (rc != 0) {
504  M0_LOG(M0_ERROR, "recv init failure: %d", rc);
505  cp->c_rc = rc;
507  return M0_FSO_AGAIN;
508  }
509  return cp->c_ops->co_phase_next(cp);
510 }
511 
512 M0_INTERNAL int m0_sns_cm_cp_recv_wait(struct m0_cm_cp *cp)
513 {
514  struct m0_rpc_bulk *rbulk;
515  int rc;
516 
517  M0_PRE(cp != NULL && m0_fom_phase(&cp->c_fom) == M0_CCP_RECV_WAIT);
518 
519  rc = cp->c_rc;
520  if (rc == 0) {
521  rbulk = &cp->c_bulk;
522  m0_mutex_lock(&rbulk->rb_mutex);
523  rc = rbulk->rb_rc;
524  m0_mutex_unlock(&rbulk->rb_mutex);
525  if (rc != 0 && rc != -ENODATA) {
526  M0_LOG(M0_ERROR, "Bulk recv failed with rc=%d", rc);
527  cp->c_rc = rc;
528  }
529  }
530 
531  cp_reply_post(cp);
532 
533  if (rc != 0) {
534  M0_LOG(M0_ERROR, "recv wait failure: %d", rc);
536  return M0_FSO_AGAIN;
537  }
538  return cp->c_ops->co_phase_next(cp);
539 }
540 
541 M0_INTERNAL int m0_sns_cm_cp_sw_check(struct m0_cm_cp *cp)
542 {
543  struct m0_sns_cm_cp *scp = cp2snscp(cp);
544  struct m0_fid cob_fid;
545  struct m0_cm *cm = cpfom2cm(&cp->c_fom);
546  struct m0_cm_proxy *cm_proxy;
547  struct m0_conf_obj *svc;
548  enum m0_cm_state cm_state;
549  const char *remote_rep;
550  int rc;
551  struct m0_pool_version *pv;
552 
553  M0_PRE(cp != NULL && m0_fom_phase(&cp->c_fom) == M0_CCP_SW_CHECK);
554 
557  if (cp->c_cm_proxy == NULL) {
558  remote_rep = m0_sns_cm_tgt_ep(cm, pv, &cob_fid, &svc);
559  m0_cm_lock(cm);
560  cm_proxy = m0_cm_proxy_locate(cm, remote_rep);
561  M0_ASSERT(cm_proxy != NULL);
562  cp->c_cm_proxy = cm_proxy;
563  m0_cm_unlock(cm);
565  } else
566  cm_proxy = cp->c_cm_proxy;
567 
568  m0_cm_lock(cm);
569  cm_state = m0_cm_state_get(cm);
570  m0_cm_unlock(cm);
571  /* abort in case of cm failure */
572  if (cm_state == M0_CMS_FAIL) {
574  return M0_FSO_WAIT;
575  }
576  m0_cm_proxy_lock(cm_proxy);
577  if (m0_cm_ag_id_cmp(&cp->c_ag->cag_id, &cm_proxy->px_sw.sw_lo) >= 0 &&
578  m0_cm_ag_id_cmp(&cp->c_ag->cag_id, &cm_proxy->px_sw.sw_hi) <= 0) {
579  rc = cp->c_ops->co_phase_next(cp);
580  } else {
581  /*
582  * If remote replica has already stopped due to some reason,
583  * all the pending copy packets addressed to that copy machine
584  * must be finalised.
585  */
586  if (M0_IN(cm_proxy->px_status, (M0_PX_COMPLETE, M0_PX_STOP, M0_PX_FAILED)) ||
587  m0_cm_ag_id_cmp(&cp->c_ag->cag_id, &cm_proxy->px_sw.sw_lo) < 0) {
588  m0_fom_phase_move(&cp->c_fom, -ENOENT, M0_CCP_FAIL);
589  rc = M0_FSO_AGAIN;
590  } else {
591  m0_cm_proxy_cp_add(cm_proxy, cp);
592  rc = M0_FSO_WAIT;
593  }
594  }
595  m0_cm_proxy_unlock(cm_proxy);
596 
597  return M0_RC(rc);
598 }
599 
601 #undef M0_TRACE_SUBSYSTEM
602 
603 /*
604  * Local variables:
605  * c-indentation-style: "K&R"
606  * c-basic-offset: 8
607  * tab-width: 8
608  * fill-column: 80
609  * scroll-step: 1
610  * End:
611  */
static m0_bcount_t seg_size
Definition: net.c:118
M0_INTERNAL void m0_cm_ag_id_copy(struct m0_cm_ag_id *dst, const struct m0_cm_ag_id *src)
Definition: ag.c:83
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
Definition: cm.h:205
uint64_t ci_count
Definition: vec.h:627
M0_INTERNAL void m0_cm_ag_unlock(struct m0_cm_aggr_group *ag)
Definition: ag.c:63
M0_INTERNAL void m0_cm_lock(struct m0_cm *cm)
Definition: cm.c:545
M0_INTERNAL void m0_fom_wakeup(struct m0_fom *fom)
Definition: fom.c:532
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
M0_INTERNAL int m0_sns_cm_cp_send(struct m0_cm_cp *cp, struct m0_fop_type *ft)
Definition: net.c:195
struct m0_mutex px_mutex
Definition: proxy.h:112
M0_INTERNAL int m0_bitmap_init(struct m0_bitmap *map, size_t nr)
Definition: bitmap.c:86
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
struct m0_net_buffer_pool * nb_pool
Definition: net.h:1508
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
static uint32_t seg_nr
Definition: net.c:119
struct m0_fop * fo_fop
Definition: fom.h:490
uint32_t cpx_prio
Definition: cp_onwire.h:38
#define NULL
Definition: misc.h:38
#define M0_AG_P(ag)
Definition: ag.h:55
M0_INTERNAL int m0_sns_cm_cp_send_wait(struct m0_cm_cp *cp)
Definition: net.c:297
M0_INTERNAL const char * m0_sns_cm_tgt_ep(const struct m0_cm *cm, const struct m0_pool_version *pv, const struct m0_fid *cob_fid, struct m0_conf_obj **hostage)
Definition: cm_utils.c:434
struct m0_stob_id scx_stob_id
Definition: sns_cp_onwire.h:47
struct m0_bitmap c_xform_cp_indices
Definition: cp.h:181
M0_INTERNAL int m0_rpc_bulk_store(struct m0_rpc_bulk *rbulk, const struct m0_rpc_conn *conn, struct m0_net_buf_desc_data *to_desc, const struct m0_net_buffer_callbacks *bulk_cb)
Definition: bulk.c:520
struct m0_stob_id sc_stob_id
Definition: cp.h:45
M0_INTERNAL void m0_cm_proxy_lock(struct m0_cm_proxy *pxy)
Definition: proxy.c:738
struct m0_bufvec nb_buffer
Definition: net.h:1322
M0_INTERNAL struct m0_sns_cm * cm2sns(struct m0_cm *cm)
Definition: cm.c:389
static void cp_fop_release(struct m0_ref *ref)
Definition: net.c:184
bool m0_rpc_item_is_generic_reply_fop(const struct m0_rpc_item *item)
Definition: fom_generic.c:75
struct m0_io_indexvec * cis_ivecs
Definition: vec.h:647
m0_bcount_t nbp_seg_size
Definition: buffer_pool.h:255
struct m0_cm_ag_id sw_hi
Definition: sw.h:47
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:79
struct m0_pool_version * pv
Definition: dir.c:629
M0_INTERNAL int m0_rpc_bulk_buf_databuf_add(struct m0_rpc_bulk_buf *rbuf, void *buf, m0_bcount_t count, m0_bindex_t index, struct m0_net_domain *netdom)
Definition: bulk.c:331
#define M0_LOG(level,...)
Definition: trace.h:167
Definition: cp.h:160
uint32_t cis_nr
Definition: vec.h:646
M0_INTERNAL int m0_sns_cm_cp_recv_wait(struct m0_cm_cp *cp)
Definition: net.c:512
M0_INTERNAL void m0_cm_proxy_unlock(struct m0_cm_proxy *pxy)
Definition: proxy.c:743
struct m0_vec ov_vec
Definition: vec.h:147
struct m0_chan rb_chan
Definition: bulk.h:258
struct m0_cm_ag_id cag_id
Definition: ag.h:72
struct m0_net_domain * ntm_dom
Definition: net.h:853
int32_t ri_error
Definition: item.h:161
struct m0_net_buf_desc_data * id_descs
Definition: io_fops.h:313
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
uint64_t m0_bindex_t
Definition: types.h:80
M0_INTERNAL void m0_fom_wait_on(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1490
uint64_t c_ag_cp_idx
Definition: cp.h:175
struct m0_sns_cm_buf_pool sc_ibp
Definition: cm.h:227
#define container_of(ptr, type, member)
Definition: misc.h:33
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
uint32_t ci_nr
Definition: vec.h:635
M0_INTERNAL int m0_sns_cm_buf_attach(struct m0_net_buffer_pool *bp, struct m0_cm_cp *cp)
Definition: cm.c:810
uint32_t * p_count
Definition: proxy.h:154
static struct m0_rpc_item * item
Definition: item.c:56
void ** ov_buf
Definition: vec.h:149
m0_fom_phase
Definition: fom.h:372
struct m0_rpc_session * px_session
Definition: proxy.h:116
static void cp_reply_received(struct m0_rpc_item *item)
Definition: net.c:157
M0_INTERNAL void m0_rpc_bulk_buflist_empty(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:279
M0_TL_DECLARE(rpcbulk, M0_INTERNAL, struct m0_rpc_bulk_buf)
int32_t cr_rc
Definition: cp_onwire.h:64
uint64_t cag_cp_global_nr
Definition: ag.h:86
#define m0_tl_endfor
Definition: tlist.h:700
return M0_RC(rc)
static struct m0_cm * cm
Definition: cm.c:63
M0_INTERNAL void m0_cm_unlock(struct m0_cm *cm)
Definition: cm.c:550
M0_INTERNAL void m0_bitmap_store(const struct m0_bitmap *im_map, struct m0_bitmap_onwire *ow_map)
Definition: bitmap.c:200
uint64_t ci_index
Definition: vec.h:626
bool cag_is_frozen
Definition: ag.h:106
struct m0_fid sc_cobfid
Definition: cp.h:42
m0_time_t cpx_epoch
Definition: cp_onwire.h:59
#define M0_AG_F
Definition: ag.h:54
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
Definition: fom_generic.c:81
static struct m0_sns_cm_ag * sag
Definition: cm.c:66
M0_INTERNAL int m0_rpc_bulk_load(struct m0_rpc_bulk *rbulk, const struct m0_rpc_conn *conn, struct m0_net_buf_desc_data *from_desc, const struct m0_net_buffer_callbacks *bulk_cb)
Definition: bulk.c:530
int i
Definition: dir.c:1033
static struct m0_cm_ag_id ag_id
Definition: net.c:121
struct m0_cm_cp sc_base
Definition: cp.h:39
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
M0_TL_DESCR_DECLARE(rpcbulk, M0_EXTERN)
M0_INTERNAL void m0_cm_proxy_cp_add(struct m0_cm_proxy *pxy, struct m0_cm_cp *cp)
Definition: proxy.c:139
M0_INTERNAL void m0_bitmap_onwire_fini(struct m0_bitmap_onwire *ow_map)
Definition: bitmap.c:192
m0_cm_state
Definition: cm.h:125
return M0_ERR(-EOPNOTSUPP)
struct m0_io_descs cpx_desc
Definition: cp_onwire.h:53
M0_INTERNAL int m0_sns_cm_cp_recv_init(struct m0_cm_cp *cp)
Definition: net.c:432
static int ag_cp_recvd_from_proxy(struct m0_cm_aggr_group *ag, struct m0_sns_cm_cp *scp)
Definition: net.c:361
M0_INTERNAL bool m0_cm_ag_is_locked(struct m0_cm_aggr_group *ag)
Definition: ag.c:68
M0_INTERNAL void m0_cm_ag_lock(struct m0_cm_aggr_group *ag)
Definition: ag.c:58
struct m0_cm_proxy_in_count sag_proxy_in_count
Definition: ag.h:57
struct m0_rpc_bulk c_bulk
Definition: cp.h:193
Definition: refs.h:34
enum m0_proxy_state px_status
Definition: proxy.h:91
static struct m0_rpc_session session
Definition: net.c:113
const char * px_endpoint
Definition: proxy.h:118
M0_INTERNAL int m0_cm_ag_id_cmp(const struct m0_cm_ag_id *id0, const struct m0_cm_ag_id *id1)
Definition: ag.c:73
M0_INTERNAL int m0_bitmap_onwire_init(struct m0_bitmap_onwire *ow_map, size_t nr)
Definition: bitmap.c:182
struct m0_net_transfer_mc rm_tm
Definition: rpc_machine.h:88
int32_t rb_rc
Definition: bulk.h:266
#define M0_ASSERT(cond)
M0_INTERNAL struct m0_pool_version * m0_sns_cm_pool_version_get(struct m0_sns_cm_file_ctx *fctx)
Definition: file.c:686
int c_rc
Definition: cp.h:218
static const struct m0_rpc_item_ops cp_item_ops
Definition: net.c:56
uint32_t c_data_seg_nr
Definition: cp.h:190
void m0_fom_phase_move(struct m0_fom *fom, int32_t rc, int phase)
Definition: fom.c:1699
struct m0_cm * cag_cm
Definition: ag.h:70
m0_bindex_t sc_index
Definition: cp.h:60
struct m0_cm_sw px_sw
Definition: proxy.h:68
const struct m0_cm_cp_ops * c_ops
Definition: cp.h:169
static struct m0_fid cob_fid
Definition: net.c:116
M0_INTERNAL struct m0_cm_aggr_group * m0_cm_aggr_group_locate(struct m0_cm *cm, const struct m0_cm_ag_id *id, bool has_incoming)
Definition: ag.c:262
struct m0_cm_ag_id sw_lo
Definition: sw.h:46
struct m0_rpc_item * ri_reply
Definition: item.h:163
M0_INTERNAL struct m0_cm * cpfom2cm(struct m0_fom *fom)
Definition: cp.c:72
struct m0_cpx scx_cp
Definition: sns_cp_onwire.h:39
uint32_t v_nr
Definition: vec.h:51
struct m0_cm_aggr_group * c_ag
Definition: cp.h:172
static m0_bindex_t offset
Definition: dump.c:173
M0_INTERNAL int m0_fop_data_alloc(struct m0_fop *fop)
Definition: fop.c:71
uint64_t sc_failed_idx
Definition: cp.h:55
void m0_rpc_reply_post(struct m0_rpc_item *request, struct m0_rpc_item *reply)
Definition: rpc.c:135
m0_bcount_t * v_count
Definition: vec.h:53
M0_INTERNAL void m0_fop_fini(struct m0_fop *fop)
Definition: fop.c:136
static int snscpx_to_snscp(const struct m0_sns_cpx *sns_cpx, struct m0_sns_cm_cp *sns_cp)
Definition: net.c:380
M0_INTERNAL struct m0_sns_cm_cp * cp2snscp(const struct m0_cm_cp *cp)
Definition: cp.c:57
uint64_t px_id
Definition: proxy.h:63
m0_time_t c_epoch
Definition: cp.h:166
static int indexvec_prepare(struct m0_io_indexvec *iv, m0_bindex_t idx, uint32_t seg_nr, size_t seg_size)
Definition: net.c:61
void(* rio_replied)(struct m0_rpc_item *item)
Definition: item.h:300
uint64_t scx_failed_idx
Definition: sns_cp_onwire.h:49
struct m0_sns_cm_file_ctx * sag_fctx
Definition: ag.h:48
uint32_t c_buf_nr
Definition: cp.h:187
struct m0_cm_proxy * c_cm_proxy
Definition: cp.h:208
struct m0_net_buffer_pool sb_bp
Definition: cm.h:135
#define M0_CNT_INC(cnt)
Definition: arith.h:226
struct m0_cm_cp * cf_cp
Definition: cp.h:156
struct m0_ref f_ref
Definition: fop.h:80
Definition: fid.h:38
struct m0_bitmap_onwire cpx_bm
Definition: cp_onwire.h:50
static struct m0_sns_cm_cp scp
Definition: cm.c:65
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
uint64_t cpx_ag_cp_idx
Definition: cp_onwire.h:47
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
static void cp_reply_post(struct m0_cm_cp *cp)
Definition: net.c:338
uint32_t id_nr
Definition: io_fops.h:312
struct m0_rpc_session * ri_session
Definition: item.h:147
static struct m0_net_test_service svc
Definition: service.c:34
Definition: cm.h:166
struct m0_rpc_item * m0_fop_to_rpc_item(const struct m0_fop *fop)
Definition: fop.c:338
struct m0_io_indexvec * cis_ivecs
Definition: vec.h:619
M0_INTERNAL struct m0_cm_proxy * m0_cm_proxy_locate(struct m0_cm *cm, const char *addr)
Definition: proxy.c:161
M0_INTERNAL enum m0_cm_state m0_cm_state_get(const struct m0_cm *cm)
Definition: cm.c:565
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
static struct m0_fop * fop
Definition: item.c:57
int(* co_phase_next)(struct m0_cm_cp *cp)
Definition: cp.h:232
M0_INTERNAL void m0_fom_callback_cancel(struct m0_fom_callback *cb)
Definition: fom.c:1514
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
static int snscp_to_snscpx(const struct m0_sns_cm_cp *sns_cp, struct m0_sns_cpx *sns_cpx)
Definition: net.c:84
struct m0_tl c_buffers
Definition: cp.h:184
M0_INTERNAL void m0_confc_close(struct m0_conf_obj *obj)
Definition: confc.c:921
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
M0_INTERNAL void m0_rpc_bulk_qtype(struct m0_rpc_bulk *rbulk, enum m0_net_queue_type q)
Definition: bulk.c:372
struct m0_fop * fo_rep_fop
Definition: fom.h:492
struct m0_tl rb_buflist
Definition: bulk.h:256
struct m0_io_indexvec_seq scx_ivecs
Definition: sns_cp_onwire.h:44
static struct m0_fop_type * ft[]
Definition: service_ut.c:856
struct m0_fop cf_fop
Definition: cp.h:155
int32_t c_rc
Definition: io_fops.h:280
enum m0_cm_cp_priority c_prio
Definition: cp.h:163
static uint32_t seg_nr_get(const struct m0_sns_cpx *sns_cpx, uint32_t ivec_nr)
Definition: net.c:348
#define out(...)
Definition: gen.c:41
M0_INTERNAL int m0_sns_cm_cp_sw_check(struct m0_cm_cp *cp)
Definition: net.c:541
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
const struct m0_net_buffer_callbacks m0_rpc__buf_bulk_cb
Definition: bulk.c:238
struct m0_fom c_fom
Definition: cp.h:161
static int32_t min32(int32_t a, int32_t b)
Definition: arith.h:36
M0_INTERNAL int m0_rpc_bulk_buf_add(struct m0_rpc_bulk *rbulk, uint32_t segs_nr, m0_bcount_t length, struct m0_net_domain *netdom, struct m0_net_buffer *nb, struct m0_rpc_bulk_buf **out)
Definition: bulk.c:291
M0_INTERNAL void m0_bitmap_load(const struct m0_bitmap_onwire *ow_map, struct m0_bitmap *im_map)
Definition: bitmap.c:213
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
m0_time_t px_epoch
Definition: proxy.h:65
uint32_t scx_phase
Definition: sns_cp_onwire.h:52
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL void m0_fid_convert_stob2cob(const struct m0_stob_id *stob_id, struct m0_fid *cob_fid)
Definition: fid_convert.c:152
struct m0_cpx_reply scr_cp_rep
Definition: sns_cp_onwire.h:59
struct m0_rpc_item f_item
Definition: fop.h:83
M0_INTERNAL void m0_cm_ag_cp_add_locked(struct m0_cm_aggr_group *ag, struct m0_cm_cp *cp)
Definition: ag.c:502
struct m0_ioseg * ci_iosegs
Definition: vec.h:636
M0_INTERNAL struct m0_sns_cm_ag * ag2snsag(const struct m0_cm_aggr_group *ag)
Definition: ag.c:391
int32_t rc
Definition: trigger_fop.h:47
struct m0_fom_callback fo_cb
Definition: fom.h:488
struct m0_rpc_conn * s_conn
Definition: session.h:312
Definition: ag.h:49
Definition: fop.h:79
struct m0_mutex rb_mutex
Definition: bulk.h:251
Definition: trace.h:478
struct m0_fop * rep_fop
Definition: dir.c:334
struct m0_cm_ag_id cpx_ag_id
Definition: cp_onwire.h:44
m0_time_t ri_deadline
Definition: item.h:141
static void cp_buf_acquire(struct m0_cm_cp *cp)
Definition: net.c:329