Motr  M0
client.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2016-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CAS
24 
25 #include "lib/trace.h"
26 #include "lib/vec.h"
27 #include "lib/misc.h" /* M0_IN */
28 #include "lib/memory.h"
29 #include "sm/sm.h"
30 #include "fid/fid.h" /* m0_fid */
31 #include "rpc/item.h"
32 #include "rpc/rpc.h" /* m0_rpc_post */
33 #include "rpc/session.h" /* m0_rpc_session */
34 #include "rpc/conn.h" /* m0_rpc_conn */
35 #include "fop/fop.h"
36 #include "fop/fom_generic.h"
37 #include "cas/cas.h"
38 #include "cas/cas_xc.h"
39 #include "cas/client.h"
40 #include "lib/finject.h"
41 #include "cas/cas_addb2.h"
42 #include "dtm0/dtx.h" /* struct m0_dtm0_dtx */
59 struct creq_niter {
63 
67  uint64_t cni_req_i;
68  uint64_t cni_rep_i;
69  uint64_t cni_kpos;
70 };
71 
72 #define CASREQ_FOP_DATA(fop) ((struct m0_cas_op *)m0_fop_data(fop))
73 
74 static void cas_req_replied_cb(struct m0_rpc_item *item);
75 
76 static const struct m0_rpc_item_ops cas_item_ops = {
77  .rio_sent = NULL,
78  .rio_replied = cas_req_replied_cb
79 };
80 
81 static void creq_asmbl_replied_cb(struct m0_rpc_item *item);
82 
83 static const struct m0_rpc_item_ops asmbl_item_ops = {
84  .rio_sent = NULL,
85  .rio_replied = creq_asmbl_replied_cb
86 };
87 
88 static struct m0_sm_state_descr cas_req_states[] = {
89  [CASREQ_INIT] = {
91  .sd_name = "init",
92  .sd_allowed = M0_BITS(CASREQ_SENT, CASREQ_FRAGM_SENT)
93  },
94  [CASREQ_SENT] = {
95  .sd_name = "request-sent",
96  .sd_allowed = M0_BITS(CASREQ_FINAL, CASREQ_FAILURE,
98  },
99  [CASREQ_FRAGM_SENT] = {
100  .sd_name = "request-fragment-sent",
101  .sd_allowed = M0_BITS(CASREQ_FINAL, CASREQ_FAILURE,
103  },
104  [CASREQ_ASSEMBLY] = {
105  .sd_name = "assembly",
106  .sd_allowed = M0_BITS(CASREQ_FRAGM_SENT, CASREQ_FINAL,
108  },
109  [CASREQ_FINAL] = {
110  .sd_name = "final",
111  .sd_flags = M0_SDF_TERMINAL,
112  },
113  [CASREQ_FAILURE] = {
114  .sd_name = "failure",
115  .sd_flags = M0_SDF_TERMINAL | M0_SDF_FAILURE
116  }
117 };
118 
119 static struct m0_sm_trans_descr cas_req_trans[] = {
120  { "send-over-rpc", CASREQ_INIT, CASREQ_SENT },
121  { "send-fragm-over-rpc", CASREQ_INIT, CASREQ_FRAGM_SENT },
122  { "rpc-failure", CASREQ_SENT, CASREQ_FAILURE },
123  { "assembly", CASREQ_SENT, CASREQ_ASSEMBLY },
124  { "req-processed", CASREQ_SENT, CASREQ_FINAL },
125  { "rpc-failure", CASREQ_FRAGM_SENT, CASREQ_FAILURE },
126  { "fragm-assembly", CASREQ_FRAGM_SENT, CASREQ_ASSEMBLY },
127  { "req-processed", CASREQ_FRAGM_SENT, CASREQ_FINAL },
128  { "assembly-fail", CASREQ_ASSEMBLY, CASREQ_FAILURE },
129  { "assembly-done", CASREQ_ASSEMBLY, CASREQ_FINAL },
130  { "assembly-fragm", CASREQ_ASSEMBLY, CASREQ_FRAGM_SENT },
131 };
132 
134  .scf_name = "cas_req",
135  .scf_nr_states = ARRAY_SIZE(cas_req_states),
136  .scf_state = cas_req_states,
137  .scf_trans_nr = ARRAY_SIZE(cas_req_trans),
138  .scf_trans = cas_req_trans
139 };
140 
141 static int cas_req_fragmentation(struct m0_cas_req *req);
142 static int cas_req_fragment_continue(struct m0_cas_req *req,
143  struct m0_cas_op *op);
144 static void creq_recv_fini(struct m0_cas_recv *recv, bool op_is_meta);
145 
146 static void cas_to_rpc_map(const struct m0_cas_req *creq,
147  const struct m0_rpc_item *item)
148 {
149  uint64_t cid = m0_sm_id_get(&creq->ccr_sm);
150  uint64_t iid = m0_sm_id_get(&item->ri_sm);
151  M0_ADDB2_ADD(M0_AVI_CAS_TO_RPC, cid, iid);
152 }
153 
154 
155 static bool fid_is_meta(struct m0_fid *fid)
156 {
157  M0_PRE(fid != NULL);
158  return m0_fid_eq(fid, &m0_cas_meta_fid);
159 }
160 
161 static int creq_op_alloc(uint64_t recs_nr,
162  struct m0_cas_op **out)
163 {
164  struct m0_cas_op *op;
165  struct m0_cas_rec *rec;
166 
167  M0_PRE(recs_nr > 0);
168  if (M0_FI_ENABLED("cas_alloc_fail"))
169  return M0_ERR(-ENOMEM);
170 
171  M0_ALLOC_PTR(op);
172  M0_ALLOC_ARR(rec, recs_nr);
173  if (op == NULL || rec == NULL) {
174  m0_free(op);
175  m0_free(rec);
176  return M0_ERR(-ENOMEM);
177  } else {
178  op->cg_rec.cr_nr = recs_nr;
179  op->cg_rec.cr_rec = rec;
180  m0_dtm0_tx_desc_init_none(&op->cg_txd);
181  *out = op;
182  }
183  return M0_RC(0);
184 }
185 
186 static void creq_op_free(struct m0_cas_op *op)
187 {
188  if (op != NULL) {
189  m0_dtm0_tx_desc_fini(&op->cg_txd);
190  m0_free(op->cg_rec.cr_rec);
191  m0_free(op);
192  }
193 }
194 
195 M0_INTERNAL void m0_cas_req_init(struct m0_cas_req *req,
196  struct m0_rpc_session *sess,
197  struct m0_sm_group *grp)
198 {
199  M0_ENTRY();
200  M0_PRE(sess != NULL);
201  M0_PRE(M0_IS0(req));
202  req->ccr_sess = sess;
204  m0_sm_addb2_counter_init(&req->ccr_sm);
205  M0_LEAVE();
206 }
207 
208 static struct m0_rpc_conn *creq_rpc_conn(const struct m0_cas_req *req)
209 {
210  return req->ccr_sess->s_conn;
211 }
212 
213 static struct m0_rpc_machine *creq_rpc_mach(const struct m0_cas_req *req)
214 {
216 }
217 
218 static struct m0_sm_group *cas_req_smgrp(const struct m0_cas_req *req)
219 {
220  return req->ccr_sm.sm_grp;
221 }
222 
223 M0_INTERNAL void m0_cas_req_lock(struct m0_cas_req *req)
224 {
225  M0_ENTRY();
227 }
228 
229 M0_INTERNAL void m0_cas_req_unlock(struct m0_cas_req *req)
230 {
231  M0_ENTRY();
233 }
234 
235 M0_INTERNAL bool m0_cas_req_is_locked(const struct m0_cas_req *req)
236 {
238 }
239 
240 static void cas_req_state_set(struct m0_cas_req *req,
241  enum m0_cas_req_state state)
242 {
243  M0_LOG(M0_DEBUG, "CAS req: %p, state change:[%s -> %s]\n",
244  req, m0_sm_state_name(&req->ccr_sm, req->ccr_sm.sm_state),
245  m0_sm_state_name(&req->ccr_sm, state));
246  m0_sm_state_set(&req->ccr_sm, state);
247 }
248 
249 static void cas_req_reply_fini(struct m0_cas_req *req)
250 {
251  struct m0_cas_recv *recv = &req->ccr_reply.cgr_rep;
252  uint64_t i;
253 
254  for (i = 0; i < recv->cr_nr; i++) {
255  m0_rpc_at_fini(&recv->cr_rec[i].cr_key);
256  m0_rpc_at_fini(&recv->cr_rec[i].cr_val);
257  }
258  m0_free(req->ccr_reply.cgr_rep.cr_rec);
259 }
260 
261 static void cas_req_fini(struct m0_cas_req *req)
262 {
263  uint32_t cur_state = req->ccr_sm.sm_state;
264 
265  M0_ENTRY();
267  M0_PRE(M0_IN(cur_state, (CASREQ_INIT, CASREQ_FINAL, CASREQ_FAILURE)));
268  if (cur_state == CASREQ_FAILURE) {
269  if (req->ccr_reply_item != NULL)
270  m0_rpc_item_put_lock(req->ccr_reply_item);
271  if (req->ccr_fop != NULL) {
272  req->ccr_fop->f_data.fd_data = NULL;
273  m0_fop_put_lock(req->ccr_fop);
274  }
275  }
276  if (req->ccr_req_op != NULL) {
277  /* Restore records vector for proper freeing. */
278  req->ccr_req_op->cg_rec = req->ccr_rec_orig;
279  creq_recv_fini(&req->ccr_rec_orig, req->ccr_is_meta);
280  creq_op_free(req->ccr_req_op);
281  }
283  m0_free(req->ccr_asmbl_ikeys);
284  m0_sm_fini(&req->ccr_sm);
285  M0_LEAVE();
286 }
287 
288 M0_INTERNAL void m0_cas_req_fini(struct m0_cas_req *req)
289 {
291  cas_req_fini(req);
292  M0_SET0(req);
293 }
294 
295 M0_INTERNAL void m0_cas_req_fini_lock(struct m0_cas_req *req)
296 {
297  M0_ENTRY();
299  cas_req_fini(req);
301  M0_SET0(req);
302  M0_LEAVE();
303 }
304 
310 static void creq_kv_hold_down(struct m0_cas_rec *rec)
311 {
312  struct m0_rpc_at_buf *key = &rec->cr_key;
313  struct m0_rpc_at_buf *val = &rec->cr_val;
314 
315  M0_ENTRY();
317  M0_IN(key->ab_type, (M0_RPC_AT_INLINE, M0_RPC_AT_BULK_SEND)));
318  if (m0_rpc_at_is_set(key))
320  if (m0_rpc_at_is_set(val))
322  M0_LEAVE();
323 }
324 
329 static void creq_recv_fini(struct m0_cas_recv *recv, bool op_is_meta)
330 {
331  struct m0_cas_rec *rec;
332  struct m0_cas_kv *kv;
333  uint64_t i;
334  uint64_t k;
335 
336  for (i = 0; i < recv->cr_nr; i++) {
337  rec = &recv->cr_rec[i];
338  /*
339  * CAS client does not copy keys/values provided by user if
340  * it works with non-meta index, otherwise it encodes keys
341  * and places them in buffers allocated by itself.
342  * Save keys/values in memory in the first case, free in
343  * the second.
344  */
345  if (!op_is_meta)
346  creq_kv_hold_down(rec);
347  m0_rpc_at_fini(&rec->cr_key);
348  m0_rpc_at_fini(&rec->cr_val);
349  for (k = 0; k < rec->cr_kv_bufs.cv_nr; k++) {
350  kv = &rec->cr_kv_bufs.cv_rec[k];
351  m0_rpc_at_fini(&kv->ck_key);
352  m0_rpc_at_fini(&kv->ck_val);
353  }
354  }
355 
356 }
357 
358 static void creq_fop_destroy(struct m0_cas_req *req)
359 {
360  struct m0_fop *fop = req->ccr_fop;
361 
362  fop->f_data.fd_data = NULL;
363  m0_fop_fini(fop);
364  m0_free(fop);
365  req->ccr_fop = NULL;
366 }
367 
368 static void creq_fop_release(struct m0_ref *ref)
369 {
370  struct m0_fop *fop;
371 
372  M0_ENTRY();
373  M0_PRE(ref != NULL);
374  fop = container_of(ref, struct m0_fop, f_ref);
375  m0_fop_fini(fop);
376  m0_free(fop);
377  M0_LEAVE();
378 }
379 
380 static void creq_asmbl_fop_release(struct m0_ref *ref)
381 {
382  struct m0_fop *fop;
383  struct m0_cas_op *op;
384 
385  M0_ENTRY();
386  M0_PRE(ref != NULL);
387  fop = container_of(ref, struct m0_fop, f_ref);
389  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
390  m0_fop_fini(fop);
391  M0_LEAVE();
392 }
393 
394 static int creq_fop_create(struct m0_cas_req *req,
395  struct m0_fop_type *ftype,
396  struct m0_cas_op *op)
397 {
398  struct m0_fop *fop;
399  M0_ENTRY();
400 
401  M0_ALLOC_PTR(fop);
402  if (fop == NULL)
403  return M0_ERR(-ENOMEM);
404 
405  m0_fop_init(fop, ftype, (void *)op, creq_fop_release);
406  fop->f_opaque = req;
407  req->ccr_fop = fop;
408  req->ccr_ftype = ftype;
409 
410  M0_LEAVE("cas_req=%p fop=%p", req, fop);
411  return 0;
412 }
413 
415  struct m0_fop_type *ftype,
416  struct m0_cas_op *op,
417  enum m0_cas_req_state *next_state)
418 {
419  int rc;
420  M0_ENTRY();
421 
422  rc = creq_fop_create(req, ftype, op);
423  if (rc == 0) {
424  *next_state = CASREQ_SENT;
425  /*
426  * Check whether original fop payload does not exceed
427  * max rpc item payload.
428  */
430  &req->ccr_fop->f_item,
431  req->ccr_sess)) {
432  *next_state = CASREQ_FRAGM_SENT;
434  }
435  if (rc != 0)
437  }
438  return M0_RC(rc);
439 }
440 
442 {
443  struct m0_fop *fop = M0_AMB(fop, item, f_item);
444  return (struct m0_cas_req *)fop->f_opaque;
445 }
446 
447 static struct m0_rpc_item *cas_req_to_item(const struct m0_cas_req *req)
448 {
449  return &req->ccr_fop->f_item;
450 }
451 
452 static struct m0_cas_rep *cas_rep(struct m0_rpc_item *reply)
453 {
455 }
456 
457 M0_INTERNAL int m0_cas_req_generic_rc(const struct m0_cas_req *req)
458 {
459  struct m0_fop *req_fop = req->ccr_fop;
460  struct m0_rpc_item *reply;
461  int rc;
462 
463  M0_PRE(M0_IN(req->ccr_sm.sm_state, (CASREQ_FINAL, CASREQ_FAILURE)));
464 
465  reply = req_fop != NULL ? cas_req_to_item(req)->ri_reply : NULL;
466 
467  rc = req->ccr_sm.sm_rc;
468  if (rc == 0 && reply != NULL)
470  if (rc == 0)
471  rc = req->ccr_reply.cgr_rc;
472 
473  return M0_RC(rc);
474 }
475 
477  struct m0_fid *idx_fid)
478 {
479  return m0_rpc_at_is_set(val) == !fid_is_meta(idx_fid);
480 }
481 
482 static int cas_rep__validate(const struct m0_fop_type *ftype,
483  struct m0_cas_op *op,
484  struct m0_cas_rep *rep)
485 {
486  struct m0_cas_rec *rec;
487  uint64_t sum;
488  uint64_t i;
489 
490  if (ftype == &cas_cur_fopt) {
491  sum = 0;
492  for (i = 0; i < op->cg_rec.cr_nr; i++)
493  sum += op->cg_rec.cr_rec[i].cr_rc;
494  if (rep->cgr_rep.cr_nr > sum)
495  return M0_ERR(-EPROTO);
496  for (i = 0; i < rep->cgr_rep.cr_nr; i++) {
497  rec = &rep->cgr_rep.cr_rec[i];
498  if ((int32_t)rec->cr_rc > 0 &&
499  (!m0_rpc_at_is_set(&rec->cr_key) ||
501  &op->cg_id.ci_fid)))
502  rec->cr_rc = M0_ERR(-EPROTO);
503  }
504  } else {
505  M0_ASSERT(M0_IN(ftype, (&cas_get_fopt, &cas_put_fopt,
506  &cas_del_fopt)));
507  /*
508  * CAS service guarantees equal number of records in request and
509  * response for GET,PUT, DEL operations. Otherwise, it's not
510  * possible to match requested records with the ones in reply,
511  * because keys in reply are absent.
512  */
513  if (op->cg_rec.cr_nr != rep->cgr_rep.cr_nr)
514  return M0_ERR(-EPROTO);
515  /*
516  * Successful GET reply for ordinary index should always contain
517  * non-empty value.
518  */
519  if (ftype == &cas_get_fopt)
520  for (i = 0; i < rep->cgr_rep.cr_nr; i++) {
521  rec = &rep->cgr_rep.cr_rec[i];
522  if (rec->cr_rc == 0 &&
524  &op->cg_id.ci_fid))
525  rec->cr_rc = M0_ERR(-EPROTO);
526  }
527  }
528  return M0_RC(0);
529 }
530 
531 static int cas_rep_validate(const struct m0_cas_req *req)
532 {
533  const struct m0_fop *rfop = req->ccr_fop;
534  struct m0_cas_op *op = m0_fop_data(rfop);
535  struct m0_cas_rep *rep = cas_rep(cas_req_to_item(req)->ri_reply);
536 
537  return rep->cgr_rc ?: cas_rep__validate(rfop->f_type, op, rep);
538 }
539 
540 static void cas_req_failure(struct m0_cas_req *req, int32_t rc)
541 {
542  M0_PRE(rc != 0);
543  m0_sm_fail(&req->ccr_sm, CASREQ_FAILURE, rc);
544 }
545 
546 static void cas_req_failure_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
547 {
548  struct m0_cas_req *req = container_of(ast, struct m0_cas_req,
550  int32_t rc = (long)ast->sa_datum;
551 
552  M0_PRE(rc != 0);
554 }
555 
556 static void cas_req_failure_ast_post(struct m0_cas_req *req, int32_t rc)
557 {
558  M0_ENTRY();
559  req->ccr_failure_ast.sa_cb = cas_req_failure_ast;
560  req->ccr_failure_ast.sa_datum = (void *)(long)rc;
561  m0_sm_ast_post(cas_req_smgrp(req), &req->ccr_failure_ast);
562  M0_LEAVE();
563 }
564 
565 static void creq_item_prepare(const struct m0_cas_req *req,
566  struct m0_rpc_item *item,
567  const struct m0_rpc_item_ops *ops)
568 {
570  item->ri_ops = ops;
571  item->ri_session = req->ccr_sess;
574 }
575 
576 static void cas_fop_send(struct m0_cas_req *req)
577 {
578  struct m0_cas_op *op = m0_fop_data(req->ccr_fop);
579  struct m0_rpc_item *item;
580  int rc;
581 
582  M0_ENTRY();
584  req->ccr_sent_recs_nr += op->cg_rec.cr_nr;
587  rc = m0_rpc_post(item);
589  M0_LOG(M0_NOTICE, "RPC post returned %d", rc);
590 }
591 
592 static int creq_kv_buf_add(const struct m0_cas_req *req,
593  const struct m0_bufvec *kv,
594  uint32_t idx,
595  struct m0_rpc_at_buf *buf)
596 {
597  M0_PRE(req != NULL);
598  M0_PRE(kv != NULL);
599  M0_PRE(buf != NULL);
600  M0_PRE(idx < kv->ov_vec.v_nr);
601 
602  return m0_rpc_at_add(buf, &M0_BUF_INIT(kv->ov_vec.v_count[idx],
603  kv->ov_buf[idx]),
604  creq_rpc_conn(req));
605 }
606 
607 static void creq_asmbl_fop_init(struct m0_cas_req *req,
608  struct m0_fop_type *ftype,
609  struct m0_cas_op *op)
610 {
611  m0_fop_init(&req->ccr_asmbl_fop, ftype, (void *)op,
613 }
614 
623 static int greq_asmbl_add(struct m0_cas_req *req,
624  struct m0_cas_rec *rec,
625  uint64_t idx,
626  uint64_t orig_idx,
627  uint64_t vlen)
628 {
629  int rc;
630 
631  m0_rpc_at_init(&rec->cr_key);
632  m0_rpc_at_init(&rec->cr_val);
633  rc = creq_kv_buf_add(req, req->ccr_keys, orig_idx, &rec->cr_key) ?:
634  m0_rpc_at_recv(&rec->cr_val, creq_rpc_conn(req), vlen, true);
635  if (rc == 0) {
636  req->ccr_asmbl_ikeys[idx] = orig_idx;
637  } else {
638  m0_rpc_at_detach(&rec->cr_key);
639  m0_rpc_at_fini(&rec->cr_key);
640  m0_rpc_at_fini(&rec->cr_val);
641  }
642  return M0_RC(rc);
643 }
644 
648 static uint64_t greq_asmbl_count(const struct m0_cas_req *req)
649 {
650  const struct m0_fop *rfop = req->ccr_fop;
651  struct m0_cas_op *req_op = m0_fop_data(rfop);
652  struct m0_cas_rep *rep = cas_rep(cas_req_to_item(req)->ri_reply);
653  struct m0_cas_rec *rcvd;
654  struct m0_cas_rec *sent;
655  struct m0_buf buf;
656  uint64_t len;
657  uint64_t ret = 0;
658  uint64_t i;
659  int rc;
660 
661  M0_PRE(rfop->f_type == &cas_get_fopt);
662 
663  for (i = 0; i < rep->cgr_rep.cr_nr; i++) {
664  rcvd = &rep->cgr_rep.cr_rec[i];
665  sent = &req_op->cg_rec.cr_rec[i];
666  rc = m0_rpc_at_rep_get(&sent->cr_val, &rcvd->cr_val, &buf);
667  if (rc != 0 && m0_rpc_at_rep_is_bulk(&rcvd->cr_val, &len))
668  ret++;
669  }
670  return ret;
671 }
672 
673 static int greq_asmbl_fill(struct m0_cas_req *req, struct m0_cas_op *op)
674 {
675  const struct m0_fop *rfop = req->ccr_fop;
676  struct m0_cas_op *req_op = m0_fop_data(rfop);
677  struct m0_cas_rep *rep = cas_rep(cas_req_to_item(req)->ri_reply);
678  struct m0_cas_rec *rcvd;
679  struct m0_cas_rec *sent;
680  struct m0_buf buf;
681  struct m0_cas_recv *recv;
682  uint64_t len;
683  uint64_t i;
684  uint64_t k = 0;
685  int rc = 0;
686 
687  M0_PRE(op != NULL);
688  M0_PRE(rfop->f_type == &cas_get_fopt);
689 
690  recv = &op->cg_rec;
691  op->cg_id = req_op->cg_id;
692 
693  for (i = 0; i < rep->cgr_rep.cr_nr; i++) {
694  rcvd = &rep->cgr_rep.cr_rec[i];
695  sent = &req_op->cg_rec.cr_rec[i];
696  rc = m0_rpc_at_rep_get(&sent->cr_val, &rcvd->cr_val, &buf);
697  if (rc != 0 && m0_rpc_at_rep_is_bulk(&rcvd->cr_val, &len)) {
698  M0_PRE(k < recv->cr_nr);
699  rc = greq_asmbl_add(req, &recv->cr_rec[k], k, i, len);
700  if (rc != 0)
701  goto err;
702  k++;
703  }
704  }
705 
706 err:
707  if (rc != 0) {
708  /* Finalise all already initialised records. */
709  recv->cr_nr = k;
710  creq_recv_fini(recv, fid_is_meta(&op->cg_id.ci_fid));
711  }
712  return M0_RC(rc);
713 }
714 
715 static bool greq_asmbl_post(struct m0_cas_req *req)
716 {
717  struct m0_cas_op *op = NULL;
718  struct m0_rpc_item *item = &req->ccr_asmbl_fop.f_item;
719  uint64_t asmbl_count;
720  bool ret = false;
721  int rc = 0;
722 
723  asmbl_count = greq_asmbl_count(req);
724  if (asmbl_count > 0) {
725  M0_ALLOC_ARR(req->ccr_asmbl_ikeys, asmbl_count);
726  if (req->ccr_asmbl_ikeys == NULL) {
727  rc = M0_ERR(-ENOMEM);
728  goto err;
729  }
730  rc = creq_op_alloc(asmbl_count, &op) ?:
732  if (rc == 0) {
735  rc = m0_rpc_post(item);
737  if (rc != 0)
738  m0_fop_put_lock(&req->ccr_asmbl_fop);
739  else
740  ret = true;
741  }
742  }
743 err:
744  if (rc != 0) {
745  m0_free(req->ccr_asmbl_ikeys);
746  creq_op_free(op);
747  }
748  return ret;
749 }
750 
751 static bool creq_niter_invariant(struct creq_niter *it)
752 {
753  return it->cni_req_i <= it->cni_reqv->cr_nr &&
754  it->cni_rep_i <= it->cni_repv->cr_nr;
755 }
756 
757 static void creq_niter_init(struct creq_niter *it,
758  struct m0_cas_op *op,
759  struct m0_cas_rep *rep)
760 {
761  it->cni_reqv = &op->cg_rec;
762  it->cni_repv = &rep->cgr_rep;
763  it->cni_req_i = 0;
764  it->cni_rep_i = 0;
765  it->cni_kpos = -1;
766  it->cni_req = NULL;
767  it->cni_rep = NULL;
768 }
769 
770 static int creq_niter_next(struct creq_niter *it)
771 {
772  int rc = 0;
773 
775  if (it->cni_rep_i == it->cni_repv->cr_nr)
776  rc = -ENOENT;
777 
778  if (rc == 0) {
779  it->cni_rep = &it->cni_repv->cr_rec[it->cni_rep_i++];
780  it->cni_kpos++;
781  if (it->cni_rep->cr_rc == 1 ||
782  ((int32_t)it->cni_rep->cr_rc <= 0 && it->cni_req_i == 0) ||
783  it->cni_kpos == it->cni_req->cr_rc) {
785  M0_PRE(it->cni_req_i < it->cni_reqv->cr_nr);
786  it->cni_req = &it->cni_reqv->cr_rec[it->cni_req_i];
787  it->cni_req_i++;
788  it->cni_kpos = 0;
789  }
790  }
791 
793  return M0_RC(rc);
794 }
795 
796 static void creq_niter_fini(struct creq_niter *it)
797 {
798  M0_SET0(it);
799 }
800 
805 static int nreq_asmbl_prep(struct m0_cas_req *req, struct m0_cas_op *op)
806 {
807  struct m0_cas_op *orig = m0_fop_data(req->ccr_fop);
808  struct m0_cas_rec *rec;
809  uint64_t i;
810  int rc = 0;
811 
812  M0_PRE(op->cg_rec.cr_nr == orig->cg_rec.cr_nr);
813  op->cg_id = orig->cg_id;
814  for (i = 0; i < orig->cg_rec.cr_nr; i++) {
815  rec = &op->cg_rec.cr_rec[i];
816  M0_ASSERT(M0_IS0(rec));
817  rec->cr_rc = orig->cg_rec.cr_rec[i].cr_rc;
818  m0_rpc_at_init(&rec->cr_key);
819  rc = creq_kv_buf_add(req, req->ccr_keys, i, &rec->cr_key);
820  if (rc != 0)
821  goto err;
822  M0_ALLOC_ARR(rec->cr_kv_bufs.cv_rec, rec->cr_rc);
823  if (op->cg_rec.cr_rec[i].cr_kv_bufs.cv_rec == NULL) {
824  rc = M0_ERR(-ENOMEM);
825  goto err;
826  }
827  }
828 err:
829  if (rc != 0) {
830  /* Finalise all already initialised records. */
831  op->cg_rec.cr_nr = i + 1;
832  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
833  }
834 
835  return M0_RC(rc);
836 }
837 
838 static int nreq_asmbl_fill(struct m0_cas_req *req, struct m0_cas_op *op)
839 {
840  const struct m0_fop *rfop = req->ccr_fop;
841  struct m0_cas_rep *reply = cas_rep(cas_req_to_item(req)->ri_reply);
842  struct m0_cas_rec *rep;
843  struct m0_cas_rec *rec;
844  struct m0_rpc_at_buf *key;
845  struct m0_rpc_at_buf *val;
846  struct creq_niter iter;
847  uint64_t klen;
848  uint64_t vlen;
849  bool bulk_key;
850  bool bulk_val;
851  int rc = 0;
852  uint64_t i;
853 
854  M0_PRE(rfop->f_type == &cas_cur_fopt);
855 
856  rc = nreq_asmbl_prep(req, op);
857  if (rc != 0)
858  return M0_ERR(rc);
859  /*
860  * 'op' is a copy of original request relative to starting keys,
861  * so iterator will iterate over 'op' the same way as with original
862  * request.
863  */
864  creq_niter_init(&iter, op, reply);
865  while (creq_niter_next(&iter) != -ENOENT) {
866  rep = iter.cni_rep;
867  rec = iter.cni_req;
868  i = rec->cr_kv_bufs.cv_nr;
869  bulk_key = m0_rpc_at_rep_is_bulk(&rep->cr_key, &klen);
870  bulk_val = m0_rpc_at_rep_is_bulk(&rep->cr_val, &vlen);
871  key = &rec->cr_kv_bufs.cv_rec[i].ck_key;
872  val = &rec->cr_kv_bufs.cv_rec[i].ck_val;
875  rc = m0_rpc_at_recv(val, creq_rpc_conn(req), vlen, bulk_val) ?:
876  m0_rpc_at_recv(key, creq_rpc_conn(req), klen, bulk_key);
877  if (rc == 0) {
878  rec->cr_kv_bufs.cv_nr++;
879  } else {
882  break;
883  }
884  }
885  creq_niter_fini(&iter);
886 
887  if (rc != 0)
888  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
889  return M0_RC(rc);
890 }
891 
892 static bool nreq_asmbl_post(struct m0_cas_req *req)
893 {
894  const struct m0_fop *rfop = req->ccr_fop;
895  struct m0_cas_op *req_op = m0_fop_data(rfop);
896  struct m0_cas_rep *rep = cas_rep(cas_req_to_item(req)->ri_reply);
897  struct m0_rpc_item *item = &req->ccr_asmbl_fop.f_item;
898  struct m0_cas_rec *rcvd;
899  struct m0_cas_rec *sent;
900  bool bulk = false;
901  struct m0_buf buf;
902  struct m0_cas_op *op;
903  int rc;
904  uint64_t len;
905  struct creq_niter iter;
906 
907  creq_niter_init(&iter, req_op, rep);
908  while ((rc = creq_niter_next(&iter)) != -ENOENT) {
909  rcvd = iter.cni_rep;
910  sent = iter.cni_req;
911  M0_ASSERT(sent->cr_kv_bufs.cv_nr == 0);
912  rc = m0_rpc_at_rep_get(NULL, &rcvd->cr_key, &buf) ?:
913  m0_rpc_at_rep_get(NULL, &rcvd->cr_val, &buf);
914  if (rc != 0 && !bulk)
915  bulk = m0_rpc_at_rep_is_bulk(&rcvd->cr_key, &len) ||
916  m0_rpc_at_rep_is_bulk(&rcvd->cr_val, &len);
917  }
918  creq_niter_fini(&iter);
919 
920  /*
921  * If at least one key/value requires bulk transmission, then
922  * resend the whole request, requesting bulk transmission as necessary.
923  */
924  if (bulk) {
925  rc = creq_op_alloc(req_op->cg_rec.cr_nr, &op);
926  if (rc == 0) {
927  rc = nreq_asmbl_fill(req, op);
928  if (rc == 0) {
931  rc = m0_rpc_post(item);
933  if (rc != 0)
934  m0_fop_put_lock(&req->ccr_asmbl_fop);
935  } else {
936  creq_op_free(op);
937  bulk = false;
938  }
939  }
940  }
941  return bulk;
942 }
943 
944 static void creq_rep_override(struct m0_cas_rec *orig,
945  struct m0_cas_rec *new)
946 {
947  M0_ENTRY();
948  m0_rpc_at_fini(&orig->cr_key);
949  m0_rpc_at_fini(&orig->cr_val);
950  *orig = *new;
951  /*
952  * Key/value data buffers are now attached to both records.
953  * Detach buffers from 'new' record to avoid double free.
954  */
955  m0_rpc_at_detach(&new->cr_key);
956  m0_rpc_at_detach(&new->cr_val);
957  M0_LEAVE();
958 }
959 
960 static void nreq_asmbl_accept(struct m0_cas_req *req)
961 {
962  struct m0_fop *fop = &req->ccr_asmbl_fop;
963  struct m0_rpc_item *item = &fop->f_item;
964  struct m0_cas_rep *crep = cas_rep(cas_req_to_item(req)->ri_reply);
966  item->ri_reply));
967  struct m0_cas_rec *rcvd;
968  struct m0_cas_rec *sent;
969  struct m0_cas_op *op = m0_fop_data(fop);
970  struct m0_cas_kv *kv;
971  struct creq_niter iter;
972  uint64_t i;
973  int rc;
974 
975  i = 0;
976  creq_niter_init(&iter, op, rep);
977  while (creq_niter_next(&iter) != -ENOENT) {
978  rcvd = iter.cni_rep;
979  sent = iter.cni_req;
980  if ((int32_t)rcvd->cr_rc > 0) {
982  M0_PRE(rcvd->cr_rc <= sent->cr_kv_bufs.cv_nr);
983  kv = &sent->cr_kv_bufs.cv_rec[rcvd->cr_rc - 1];
984  rc = m0_rpc_at_rep2inline(&kv->ck_key, &rcvd->cr_key) ?:
985  m0_rpc_at_rep2inline(&kv->ck_val, &rcvd->cr_val);
986  if (rc == 0)
988  rcvd);
989  }
990  i++;
991  }
992  creq_niter_fini(&iter);
993 }
994 
995 static void greq_asmbl_accept(struct m0_cas_req *req)
996 {
997  struct m0_fop *fop = &req->ccr_asmbl_fop;
998  struct m0_rpc_item *item = &fop->f_item;
999  struct m0_cas_rep *crep = cas_rep(cas_req_to_item(req)->ri_reply);
1001  item->ri_reply));
1002  struct m0_cas_rec *rec;
1003  struct m0_cas_op *op = m0_fop_data(fop);
1004  uint64_t i;
1005  uint64_t orig_i;
1006  int rc;
1007 
1008  for (i = 0; i < rep->cgr_rep.cr_nr; i++) {
1009  rec = &rep->cgr_rep.cr_rec[i];
1010  if (rec->cr_rc == 0) {
1011  rc = m0_rpc_at_rep2inline(&op->cg_rec.cr_rec[i].cr_val,
1012  &rec->cr_val);
1013  if (rc == 0) {
1014  orig_i = req->ccr_asmbl_ikeys[i];
1015  creq_rep_override(&crep->cgr_rep.cr_rec[orig_i],
1016  rec);
1017  }
1018  }
1019  }
1020 }
1021 
1027 {
1028  struct m0_cas_rep *rep;
1029 
1030  M0_PRE(req != NULL);
1031  M0_PRE(req->ccr_fop != NULL);
1032 
1033  rep = cas_rep(cas_req_to_item(req)->ri_reply);
1034  M0_ASSERT(rep != NULL);
1035 
1036  req->ccr_remid = rep->cgr_mod_rep.fmr_remid;
1037 }
1038 
1040  bool *fragm_continue)
1041 {
1042  struct m0_cas_rep *reply = &req->ccr_reply;
1043  struct m0_cas_rep *rcvd_reply = cas_rep(req->ccr_reply_item);
1044  struct m0_fop *req_fop = req->ccr_fop;
1045  struct m0_cas_op *op = m0_fop_data(req_fop);
1046  uint64_t i;
1047  uint64_t reply_seed;
1048  int rc = 0;
1049 
1050  M0_ASSERT(req_fop->f_type == req->ccr_ftype);
1051  M0_ASSERT(req->ccr_sent_recs_nr <= req->ccr_rec_orig.cr_nr);
1052  M0_ASSERT(reply->cgr_rep.cr_nr + rcvd_reply->cgr_rep.cr_nr <=
1053  req->ccr_max_replies_nr);
1054  *fragm_continue = false;
1055 
1056  /* Copy tx remid before fop and rpc item in `req` are set to NULL*/
1058  /*
1059  * Place reply buffers locally (without copying of actual data), zero
1060  * them in reply fop to avoid their freeing during reply fop destroying.
1061  */
1062  reply_seed = reply->cgr_rep.cr_nr;
1063  for (i = 0; i < rcvd_reply->cgr_rep.cr_nr; i++) {
1064  reply->cgr_rep.cr_rec[reply_seed + i] =
1065  rcvd_reply->cgr_rep.cr_rec[i];
1066  /* Detach buffers to avoid double-freeing of received data. */
1067  creq_kv_hold_down(&rcvd_reply->cgr_rep.cr_rec[i]);
1068  }
1069  reply->cgr_rep.cr_nr += rcvd_reply->cgr_rep.cr_nr;
1070  /* Roger, reply item is not needed anymore. */
1071  m0_rpc_item_put_lock(req->ccr_reply_item);
1072  req->ccr_reply_item = NULL;
1073  /*
1074  * Null fop data pointer to avoid cas_op freeing during fop finalisation
1075  * as cas_op is going to be reused for the rest fragments sending.
1076  */
1077  req_fop->f_data.fd_data = NULL;
1078  m0_fop_put_lock(req_fop);
1079  req->ccr_fop = NULL;
1080  if (req->ccr_sent_recs_nr < req->ccr_rec_orig.cr_nr) {
1081  /* Continue fragmentation. */
1083  if (rc == 0)
1084  *fragm_continue = true;
1085  }
1086  return rc;
1087 }
1088 
1090  struct m0_sm_ast *ast)
1091 {
1092  struct m0_cas_req *req = container_of(ast, struct m0_cas_req,
1093  ccr_replied_ast);
1094  struct m0_fop *fop = &req->ccr_asmbl_fop;
1095  struct m0_rpc_item *item = &fop->f_item;
1096  struct m0_rpc_item *reply = item->ri_reply;
1097  bool fragm_continue;
1098  int rc;
1099 
1100  rc = m0_rpc_item_error(item) ?:
1101  cas_rep(reply)->cgr_rc ?:
1103  if (rc == 0) {
1104  M0_ASSERT(M0_IN(fop->f_type, (&cas_get_fopt, &cas_cur_fopt)));
1105  if (fop->f_type == &cas_get_fopt)
1107  else
1109  }
1110  /*
1111  * On assembly request error, just continue request processing.
1112  * All records that were requested via assembly request already
1113  * have error status code.
1114  */
1115  rc = cas_req_reply_handle(req, &fragm_continue);
1116  if (rc == 0)
1117  cas_req_state_set(req, !fragm_continue ?
1119  else
1121 
1123 }
1124 
1126 {
1128  struct m0_cas_req,
1129  ccr_asmbl_fop);
1130 
1131  M0_ENTRY();
1132  req->ccr_replied_ast.sa_cb = creq_asmbl_replied_ast;
1133  m0_sm_ast_post(cas_req_smgrp(req), &req->ccr_replied_ast);
1134  M0_LEAVE();
1135 }
1136 
1137 static void cas_req_replied_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
1138 {
1139  struct m0_cas_req *req = container_of(ast, struct m0_cas_req,
1140  ccr_replied_ast);
1141  struct m0_fop_type *req_fop_type = req->ccr_fop->f_type;
1142  bool assembly_wait = false;
1143  bool suppress_err_msg;
1144  bool fragm_continue;
1145  int rc;
1146 
1147  req->ccr_reply.cgr_rc = cas_rep(req->ccr_reply_item)->cgr_rc;
1148  rc = cas_rep_validate(req);
1149  if (rc == 0) {
1150  if (M0_IN(req_fop_type, (&cas_cur_fopt, &cas_get_fopt)) &&
1151  !req->ccr_is_meta) {
1152  assembly_wait = (req_fop_type == &cas_get_fopt) ?
1154  if (assembly_wait)
1156  }
1157  if (!assembly_wait) {
1158  rc = cas_req_reply_handle(req, &fragm_continue);
1159  if (rc == 0 && !fragm_continue)
1161  }
1162  }
1163  if (rc != 0) {
1164  /*
1165  * For now CROW flag is commonly used for PUT operations. In
1166  * this case indices are physically created only on the nodes
1167  * where keys are presented. But NEXT queries are sent to all
1168  * the nodes, so some of them may return -ENOENT (index does
1169  * not exist). It is not critical, suppress these errors not
1170  * to irritate the user.
1171  */
1172  suppress_err_msg = !req->ccr_is_meta &&
1173  req_fop_type == &cas_cur_fopt && rc == -ENOENT;
1174  cas_req_failure(req, suppress_err_msg ? rc : M0_ERR(rc));
1175  }
1176 }
1177 
1179 {
1180  struct m0_cas_req *req = item_to_cas_req(item);
1181 
1182  M0_ENTRY();
1183  if (M0_FI_ENABLED("send-failure"))
1184  item->ri_error = -ENOTCONN;
1185  if (item->ri_error == 0) {
1186  M0_ASSERT(item->ri_reply != NULL);
1187  req->ccr_reply_item = item->ri_reply;
1188  /*
1189  * Get additional reference to reply item to copy reply buffers
1190  * in replied ast call.
1191  */
1193  req->ccr_replied_ast.sa_cb = cas_req_replied_ast;
1194  m0_sm_ast_post(cas_req_smgrp(req), &req->ccr_replied_ast);
1195  } else
1197  M0_LEAVE();
1198 }
1199 
1200 static int cas_index_op_prepare(const struct m0_cas_req *req,
1201  const struct m0_cas_id *cids,
1202  uint64_t cids_nr,
1203  bool recv_val,
1204  uint32_t flags,
1205  struct m0_cas_op **out)
1206 {
1207  struct m0_cas_op *op;
1208  struct m0_cas_rec *rec;
1209  int rc;
1210  uint64_t i;
1211 
1212  M0_ENTRY();
1213 
1214  rc = creq_op_alloc(cids_nr, &op);
1215  if (rc != 0)
1216  return M0_ERR(rc);
1217  op->cg_id.ci_fid = m0_cas_meta_fid;
1218  op->cg_flags = flags;
1219  rec = op->cg_rec.cr_rec;
1220  for (i = 0; i < cids_nr; i++) {
1221  struct m0_buf buf;
1222 
1223  m0_rpc_at_init(&rec[i].cr_key);
1224  /* Xcode the key to get continuous buffer for sending. */
1226  &M0_XCODE_OBJ(m0_cas_id_xc,
1227  /*
1228  * Cast to avoid 'discard const' compile
1229  * error, in fact cas id element is not
1230  * changed during encoding.
1231  */
1232  (struct m0_cas_id *)&cids[i]),
1233  &buf.b_addr, &buf.b_nob);
1234  if (rc == 0) {
1235  rc = m0_rpc_at_add(&rec[i].cr_key,
1236  &buf,
1237  creq_rpc_conn(req));
1238  if (rc != 0)
1239  m0_buf_free(&buf);
1240  else if (recv_val) {
1241  m0_rpc_at_init(&rec[i].cr_val);
1242  rc = m0_rpc_at_recv(&rec[i].cr_val,
1243  creq_rpc_conn(req),
1244  sizeof(struct m0_fid),
1245  false);
1246  }
1247  }
1248  if (rc != 0)
1249  break;
1250  }
1251 
1252  if (rc != 0) {
1253  op->cg_rec.cr_nr = i + 1;
1254  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
1255  creq_op_free(op);
1256  return M0_ERR(rc);
1257  }
1258  *out = op;
1259  return M0_RC(rc);
1260 }
1261 
1262 static void addb2_add_cas_req_attrs(const struct m0_cas_req *req)
1263 {
1264  uint64_t sm_id = m0_sm_id_get(&req->ccr_sm);
1265 
1266  M0_ADDB2_ADD(M0_AVI_ATTR, sm_id,
1267  M0_AVI_CAS_REQ_ATTR_IS_META, !!req->ccr_is_meta);
1268  if (req->ccr_req_op != NULL)
1270  req->ccr_req_op->cg_rec.cr_nr);
1271 }
1272 
1274  const struct m0_cas_id *cids,
1275  uint64_t cids_nr,
1276  uint64_t max_replies_nr,
1277  bool recv_val,
1278  uint32_t flags,
1279  struct m0_cas_op **op)
1280 {
1281  struct m0_cas_recv *reply_recv;
1282  int rc;
1283 
1284  reply_recv = &req->ccr_reply.cgr_rep;
1285  M0_ALLOC_ARR(reply_recv->cr_rec, max_replies_nr);
1286  if (reply_recv->cr_rec == NULL)
1287  return M0_ERR(-ENOMEM);
1288  /* Set to 0 initially, will be increased when reply is received. */
1289  reply_recv->cr_nr = 0;
1290  req->ccr_max_replies_nr = max_replies_nr;
1291  req->ccr_is_meta = true;
1292  rc = cas_index_op_prepare(req, cids, cids_nr, recv_val, flags, op);
1293  if (rc == 0) {
1294  req->ccr_rec_orig = (*op)->cg_rec;
1295  req->ccr_req_op = *op;
1297  }
1298  if (rc != 0) {
1299  m0_free(reply_recv->cr_rec);
1300  reply_recv->cr_nr = 0;
1301  reply_recv->cr_rec = NULL;
1302  }
1303 
1304  return M0_RC(rc);
1305 
1306 }
1307 
1308 M0_INTERNAL uint64_t m0_cas_req_nr(const struct m0_cas_req *req)
1309 {
1310  return req->ccr_reply.cgr_rep.cr_nr;
1311 }
1312 
1313 M0_INTERNAL int m0_cas_req_wait(struct m0_cas_req *req, uint64_t states,
1314  m0_time_t to)
1315 {
1316  M0_ENTRY();
1318  return M0_RC(m0_sm_timedwait(&req->ccr_sm, states, to));
1319 }
1320 
1321 M0_INTERNAL int m0_cas_index_create(struct m0_cas_req *req,
1322  const struct m0_cas_id *cids,
1323  uint64_t cids_nr,
1324  struct m0_dtx *dtx)
1325 {
1326  struct m0_cas_op *op;
1327  enum m0_cas_req_state next_state;
1328  int rc;
1329 
1330  M0_ENTRY();
1331  M0_PRE(req->ccr_sess != NULL);
1333  M0_PRE(m0_forall(i, cids_nr, m0_cas_id_invariant(&cids[i])));
1334  (void)dtx;
1335  rc = cas_index_req_prepare(req, cids, cids_nr, cids_nr, false, 0, &op);
1336  if (rc != 0)
1337  return M0_ERR(rc);
1338  rc = creq_fop_create_and_prepare(req, &cas_put_fopt, op, &next_state);
1339  if (rc == 0) {
1340  cas_fop_send(req);
1341  cas_req_state_set(req, next_state);
1342  }
1343  return M0_RC(rc);
1344 }
1345 
1346 static void cas_rep_copy(const struct m0_cas_req *req,
1347  uint64_t idx,
1348  struct m0_cas_rec_reply *rep)
1349 {
1350  const struct m0_cas_recv *recv = &req->ccr_reply.cgr_rep;
1351 
1352  M0_ASSERT(idx < m0_cas_req_nr(req));
1353  rep->crr_rc = recv->cr_rec[idx].cr_rc;
1354  rep->crr_hint = recv->cr_rec[idx].cr_hint;
1355 }
1356 
1357 M0_INTERNAL void m0_cas_index_create_rep(const struct m0_cas_req *req,
1358  uint64_t idx,
1359  struct m0_cas_rec_reply *rep)
1360 {
1361  M0_ENTRY();
1362  cas_rep_copy(req, idx, rep);
1363  M0_LEAVE();
1364 }
1365 
1366 M0_INTERNAL int m0_cas_index_delete(struct m0_cas_req *req,
1367  const struct m0_cas_id *cids,
1368  uint64_t cids_nr,
1369  struct m0_dtx *dtx,
1370  uint32_t flags)
1371 {
1372  struct m0_cas_op *op;
1373  enum m0_cas_req_state next_state;
1374  int rc;
1375 
1376  M0_ENTRY();
1377  M0_PRE(req->ccr_sess != NULL);
1379  M0_PRE(m0_forall(i, cids_nr, m0_cas_id_invariant(&cids[i])));
1381  (void)dtx;
1382  rc = cas_index_req_prepare(req, cids, cids_nr, cids_nr, false, flags,
1383  &op);
1384  if (rc != 0)
1385  return M0_ERR(rc);
1386  rc = creq_fop_create_and_prepare(req, &cas_del_fopt, op, &next_state);
1387  if (rc == 0) {
1388  cas_fop_send(req);
1390  }
1391  return M0_RC(rc);
1392 }
1393 
1394 M0_INTERNAL void m0_cas_index_delete_rep(const struct m0_cas_req *req,
1395  uint64_t idx,
1396  struct m0_cas_rec_reply *rep)
1397 {
1398  M0_ENTRY();
1399  cas_rep_copy(req, idx, rep);
1400  M0_LEAVE();
1401 }
1402 
1403 M0_INTERNAL int m0_cas_index_lookup(struct m0_cas_req *req,
1404  const struct m0_cas_id *cids,
1405  uint64_t cids_nr)
1406 {
1407  struct m0_cas_op *op;
1408  enum m0_cas_req_state next_state;
1409  int rc;
1410 
1411  M0_ENTRY();
1412  M0_PRE(req->ccr_sess != NULL);
1414  M0_PRE(m0_forall(i, cids_nr, m0_cas_id_invariant(&cids[i])));
1415  rc = cas_index_req_prepare(req, cids, cids_nr, cids_nr, true, 0, &op);
1416  if (rc != 0)
1417  return M0_ERR(rc);
1418  rc = creq_fop_create_and_prepare(req, &cas_get_fopt, op, &next_state);
1419  if (rc == 0) {
1420  cas_fop_send(req);
1421  cas_req_state_set(req, next_state);
1422  }
1423  return M0_RC(rc);
1424 }
1425 
1426 M0_INTERNAL void m0_cas_index_lookup_rep(const struct m0_cas_req *req,
1427  uint64_t idx,
1428  struct m0_cas_rec_reply *rep)
1429 {
1430  M0_ENTRY();
1431  cas_rep_copy(req, idx, rep);
1432  M0_LEAVE();
1433 }
1434 
1435 M0_INTERNAL int m0_cas_index_list(struct m0_cas_req *req,
1436  const struct m0_fid *start_fid,
1437  uint32_t indices_nr,
1438  uint32_t flags)
1439 {
1440  struct m0_cas_op *op;
1441  struct m0_cas_id cid = { .ci_fid = *start_fid };
1442  enum m0_cas_req_state next_state;
1443  int rc;
1444 
1445  M0_ENTRY();
1446  M0_PRE(start_fid != NULL);
1447  M0_PRE(req->ccr_sess != NULL);
1450 
1451  rc = cas_index_req_prepare(req, &cid, 1, indices_nr, false, flags, &op);
1452  if (rc != 0)
1453  return M0_ERR(rc);
1454  op->cg_rec.cr_rec[0].cr_rc = indices_nr;
1455  rc = creq_fop_create_and_prepare(req, &cas_cur_fopt, op, &next_state);
1456  if (rc == 0) {
1457  cas_fop_send(req);
1458  cas_req_state_set(req, next_state);
1459  }
1460  return M0_RC(rc);
1461 }
1462 
1463 static int cas_next_rc(int64_t service_rc)
1464 {
1465  int rc;
1466 
1467  /*
1468  * Zero return code means some error on service side.
1469  * Service place sequence number of record starting from 1 in cr_rc on
1470  * success.
1471  */
1472  if (service_rc == 0)
1473  /*
1474  * Don't use M0_ERR() here to not pollute trace log.
1475  * Service places zero return code in all records following the
1476  * record having negative return code. It can happen in a
1477  * totally valid case when client requests more records than
1478  * available in a catalogue.
1479  */
1480  rc = -EPROTO;
1481  else if (service_rc < 0)
1482  rc = service_rc;
1483  else
1484  rc = 0;
1485  return M0_RC(rc);
1486 }
1487 
1488 M0_INTERNAL void m0_cas_index_list_rep(struct m0_cas_req *req,
1489  uint32_t idx,
1490  struct m0_cas_ilist_reply *rep)
1491 {
1492  struct m0_cas_recv *recv = &req->ccr_reply.cgr_rep;
1493  struct m0_cas_rec *rec;
1494  struct m0_buf fid;
1495 
1496  M0_ENTRY();
1497  M0_PRE(idx < m0_cas_req_nr(req));
1498 
1499  rec = &recv->cr_rec[idx];
1500  rep->clr_rc = cas_next_rc(rec->cr_rc) ?:
1501  m0_rpc_at_rep_get(NULL, &rec->cr_key, &fid);
1502  if (rep->clr_rc == 0) {
1503  rep->clr_fid = *(struct m0_fid *)fid.b_addr;
1504  rep->clr_hint = recv->cr_rec[idx].cr_hint;
1505  }
1506  M0_LEAVE();
1507 }
1508 
1510 {
1511  struct m0_cas_op *op = m0_fop_data(req->ccr_fop);
1512  int rc = 0;
1513 
1514  if (M0_FI_ENABLED("fragm_error"))
1515  return -E2BIG;
1516 
1517  M0_PRE(req->ccr_rec_orig.cr_nr != 0 &&
1518  req->ccr_rec_orig.cr_rec != NULL);
1519  M0_PRE(req->ccr_sent_recs_nr < req->ccr_rec_orig.cr_nr);
1520  /*
1521  * Flush records counter and reset records pointer to the first record
1522  * to be sent.
1523  */
1524  op->cg_rec.cr_nr = 0;
1525  op->cg_rec.cr_rec = &req->ccr_rec_orig.cr_rec[req->ccr_sent_recs_nr];
1526  do {
1527  op->cg_rec.cr_nr++;
1528  if (m0_rpc_item_max_payload_exceeded(&req->ccr_fop->f_item,
1529  req->ccr_sess)) {
1530  /*
1531  * Found the number of records when item payload exceeds
1532  * max rpc item payload, chose previous number of
1533  * records (current - 1) for sending.
1534  */
1535  op->cg_rec.cr_nr--;
1536  break;
1537  }
1538  } while (req->ccr_sent_recs_nr + op->cg_rec.cr_nr <
1539  req->ccr_rec_orig.cr_nr);
1540  if (op->cg_rec.cr_nr == 0)
1541  rc = -E2BIG; /* Almost impossible case. */
1542  if (rc != 0)
1543  /*
1544  * Restore original records vector in case of error for proper
1545  * finalisation of data structures.
1546  */
1547  op->cg_rec = req->ccr_rec_orig;
1548 
1549  return M0_RC(rc);
1550 }
1551 
1553  struct m0_cas_op *op)
1554 {
1555  int rc;
1556 
1557  rc = creq_fop_create(req, req->ccr_ftype, op);
1558  if (rc != 0)
1559  return M0_ERR(rc);
1561  if (rc == 0)
1562  cas_fop_send(req);
1563  else
1565 
1566  return M0_RC(rc);
1567 }
1568 
1569 static int cas_records_op_prepare(const struct m0_cas_req *req,
1570  const struct m0_cas_id *index,
1571  const struct m0_bufvec *keys,
1572  const struct m0_bufvec *values,
1573  uint32_t flags,
1574  struct m0_cas_op **out)
1575 {
1576  struct m0_cas_op *op;
1577  struct m0_cas_rec *rec;
1578  uint32_t keys_nr = keys->ov_vec.v_nr;
1579  uint64_t i;
1580  int rc;
1581 
1582  M0_ENTRY();
1583  rc = creq_op_alloc(keys_nr, &op);
1584  if (rc != 0)
1585  return M0_ERR(rc);
1586  op->cg_id = *index;
1587  rec = op->cg_rec.cr_rec;
1588  for (i = 0; i < keys_nr; i++) {
1589  m0_rpc_at_init(&rec[i].cr_key);
1590  rc = creq_kv_buf_add(req, keys, i, &rec[i].cr_key);
1591  if (rc == 0 && values != NULL) {
1592  m0_rpc_at_init(&rec[i].cr_val);
1593  rc = creq_kv_buf_add(req, values, i, &rec[i].cr_val);
1594  }
1595  if (rc != 0)
1596  break;
1597  }
1598  if (rc != 0) {
1599  op->cg_rec.cr_nr = i + 1;
1600  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
1601  creq_op_free(op);
1602  return M0_ERR(rc);
1603  }
1604  op->cg_flags = flags;
1605  *out = op;
1606  return M0_RC(rc);
1607 }
1608 
1609 static int cas_req_prep(struct m0_cas_req *req,
1610  const struct m0_cas_id *index,
1611  const struct m0_bufvec *keys,
1612  const struct m0_bufvec *values,
1613  uint64_t max_replies_nr,
1614  uint32_t flags,
1615  struct m0_cas_op **op)
1616 {
1617  struct m0_cas_recv *reply_recv;
1618  int rc;
1619  M0_ENTRY();
1620 
1621  reply_recv = &req->ccr_reply.cgr_rep;
1622  M0_ALLOC_ARR(reply_recv->cr_rec, max_replies_nr);
1623  if (reply_recv->cr_rec == NULL)
1624  return M0_ERR(-ENOMEM);
1625  /* Set to 0 initially, will be increased when reply is received. */
1626  reply_recv->cr_nr = 0;
1627  req->ccr_max_replies_nr = max_replies_nr;
1628  req->ccr_is_meta = false;
1629  rc = cas_records_op_prepare(req, index, keys, values, flags, op);
1630  if (rc == 0) {
1631  req->ccr_rec_orig = (*op)->cg_rec;
1632  req->ccr_req_op = *op;
1634  }
1635  if (rc != 0) {
1636  m0_free(reply_recv->cr_rec);
1637  reply_recv->cr_nr = 0;
1638  reply_recv->cr_rec = NULL;
1639  }
1640  return M0_RC(rc);
1641 }
1642 
1643 M0_INTERNAL int m0_cas_put(struct m0_cas_req *req,
1644  struct m0_cas_id *index,
1645  const struct m0_bufvec *keys,
1646  const struct m0_bufvec *values,
1647  struct m0_dtx *dtx,
1648  uint32_t flags)
1649 {
1650  struct m0_cas_op *op;
1651  enum m0_cas_req_state next_state;
1652  int rc;
1653 
1654  M0_ENTRY();
1655  M0_PRE(keys != NULL);
1656  M0_PRE(values != NULL);
1657  M0_PRE(keys->ov_vec.v_nr == values->ov_vec.v_nr);
1659  /* Create and overwrite flags can't be specified together. */
1660  M0_PRE(!(flags & COF_CREATE) || !(flags & COF_OVERWRITE));
1661  /*
1662  * Only create, overwrite, crow, sync_wait, and skip_layout flags
1663  * are allowed.
1664  */
1665  M0_PRE((flags &
1667  COF_SKIP_LAYOUT | COF_VERSIONED)) == 0);
1669 
1670  rc = cas_req_prep(req, index, keys, values, keys->ov_vec.v_nr, flags,
1671  &op);
1672  if (rc != 0)
1673  return M0_ERR(rc);
1674  rc = m0_dtx0_txd_copy(dtx, &op->cg_txd);
1675  if (rc != 0)
1676  return M0_ERR(rc);
1677  rc = creq_fop_create_and_prepare(req, &cas_put_fopt, op, &next_state);
1678  if (rc == 0) {
1679  cas_fop_send(req);
1680  cas_req_state_set(req, next_state);
1681  }
1682  return M0_RC(rc);
1683 }
1684 
1685 M0_INTERNAL void m0_cas_put_rep(struct m0_cas_req *req,
1686  uint64_t idx,
1687  struct m0_cas_rec_reply *rep)
1688 {
1689  M0_ENTRY();
1690  M0_PRE(req->ccr_ftype == &cas_put_fopt);
1691  cas_rep_copy(req, idx, rep);
1692  M0_LEAVE();
1693 }
1694 
1695 static int m0_cas__get(struct m0_cas_req *req,
1696  struct m0_cas_id *index,
1697  const struct m0_bufvec *keys,
1698  int flags)
1699 {
1700  struct m0_cas_op *op;
1701  int rc;
1702  struct m0_rpc_at_buf *ab;
1703  enum m0_cas_req_state next_state;
1704  uint32_t i;
1705 
1706  M0_ENTRY();
1707  M0_PRE(keys != NULL);
1710 
1711  rc = cas_req_prep(req, index, keys, NULL, keys->ov_vec.v_nr, flags,
1712  &op);
1713  if (rc != 0)
1714  return M0_ERR(rc);
1715  for (i = 0; i < keys->ov_vec.v_nr; i++) {
1716  ab = &op->cg_rec.cr_rec[i].cr_val;
1717  m0_rpc_at_init(ab);
1719  M0_RPC_AT_UNKNOWN_LEN, false);
1720  if (rc != 0) {
1721  m0_rpc_at_fini(ab);
1722  break;
1723  }
1724  }
1725  if (rc != 0) {
1726  op->cg_rec.cr_nr = i;
1727  creq_recv_fini(&op->cg_rec, fid_is_meta(&op->cg_id.ci_fid));
1728  creq_op_free(op);
1729  } else {
1730  req->ccr_keys = keys;
1732  &next_state);
1733  if (rc == 0) {
1734  cas_fop_send(req);
1735  cas_req_state_set(req, next_state);
1736  }
1737  }
1738  return M0_RC(rc);
1739 }
1740 
1741 M0_INTERNAL int m0_cas_get(struct m0_cas_req *req,
1742  struct m0_cas_id *index,
1743  const struct m0_bufvec *keys)
1744 {
1745  return m0_cas__get(req, index, keys, 0);
1746 }
1747 
1748 M0_INTERNAL int m0_cas_versioned_get(struct m0_cas_req *req,
1749  struct m0_cas_id *index,
1750  const struct m0_bufvec *keys)
1751 {
1752  return m0_cas__get(req, index, keys, COF_VERSIONED);
1753 }
1754 
1755 M0_INTERNAL void m0_cas_get_rep(const struct m0_cas_req *req,
1756  uint64_t idx,
1757  struct m0_cas_get_reply *rep)
1758 {
1759  const struct m0_cas_rep *cas_rep = &req->ccr_reply;
1760  struct m0_cas_rec *sent;
1761  struct m0_cas_rec *rcvd;
1762 
1763  M0_ENTRY();
1764  M0_PRE(idx < m0_cas_req_nr(req));
1765  M0_PRE(req->ccr_ftype == &cas_get_fopt);
1766  rcvd = &cas_rep->cgr_rep.cr_rec[idx];
1767  sent = &req->ccr_rec_orig.cr_rec[idx];
1768  rep->cge_rc = rcvd->cr_rc;
1769  rep->cge_ver = rcvd->cr_ver;
1770  if (rep->cge_rc == 0)
1771  m0_rpc_at_rep_get(&sent->cr_val, &rcvd->cr_val, &rep->cge_val);
1772  M0_LEAVE();
1773 }
1774 
1775 M0_INTERNAL int m0_cas_next(struct m0_cas_req *req,
1776  struct m0_cas_id *index,
1777  struct m0_bufvec *start_keys,
1778  uint32_t *recs_nr,
1779  uint32_t flags)
1780 {
1781  struct m0_cas_op *op;
1782  enum m0_cas_req_state next_state;
1783  uint64_t max_replies_nr = 0;
1784  uint32_t i;
1785  int rc;
1786 
1787  M0_ENTRY();
1788  M0_PRE(start_keys != NULL);
1791  /* Only slant, exclude start key, and versioned flags are allowed. */
1793  COF_VERSIONED | COF_SHOW_DEAD)) == 0);
1794  /* COF_SHOW_DEAD cannot be used without COF_VERSIONED */
1795  M0_PRE(ergo((flags & COF_SHOW_DEAD) != 0,
1796  (flags & COF_VERSIONED) != 0));
1797 
1798  for (i = 0; i < start_keys->ov_vec.v_nr; i++)
1799  max_replies_nr += recs_nr[i];
1800  rc = cas_req_prep(req, index, start_keys, NULL, max_replies_nr, flags,
1801  &op);
1802  if (rc != 0)
1803  return M0_ERR(rc);
1804  for (i = 0; i < start_keys->ov_vec.v_nr; i++)
1805  op->cg_rec.cr_rec[i].cr_rc = recs_nr[i];
1806  req->ccr_keys = start_keys;
1808  &next_state);
1809  if (rc == 0) {
1810  cas_fop_send(req);
1811  cas_req_state_set(req, next_state);
1812  }
1813  return M0_RC(rc);
1814 }
1815 
1816 M0_INTERNAL void m0_cas_rep_mlock(const struct m0_cas_req *req,
1817  uint64_t idx)
1818 {
1819  const struct m0_cas_rep *cas_rep = &req->ccr_reply;
1820 
1821  M0_PRE(M0_IN(req->ccr_ftype, (&cas_get_fopt, &cas_cur_fopt)));
1823 }
1824 
1825 M0_INTERNAL void m0_cas_next_rep(const struct m0_cas_req *req,
1826  uint32_t idx,
1827  struct m0_cas_next_reply *rep)
1828 {
1829  const struct m0_cas_rep *cas_rep = &req->ccr_reply;
1830  struct m0_cas_rec *rcvd;
1831 
1832  M0_ENTRY();
1833  M0_PRE(idx < m0_cas_req_nr(req));
1834  M0_PRE(req->ccr_ftype == &cas_cur_fopt);
1835  rcvd = &cas_rep->cgr_rep.cr_rec[idx];
1836  rep->cnp_ver = rcvd->cr_ver;
1837  rep->cnp_rc = cas_next_rc(rcvd->cr_rc) ?:
1838  m0_rpc_at_rep_get(NULL, &rcvd->cr_key, &rep->cnp_key) ?:
1839  m0_rpc_at_rep_get(NULL, &rcvd->cr_val, &rep->cnp_val);
1840 }
1841 
1842 M0_INTERNAL int m0_cas_del(struct m0_cas_req *req,
1843  struct m0_cas_id *index,
1844  struct m0_bufvec *keys,
1845  struct m0_dtx *dtx,
1846  uint32_t flags)
1847 {
1848  struct m0_cas_op *op;
1849  enum m0_cas_req_state next_state;
1850  int rc;
1851 
1852  M0_ENTRY();
1853  M0_PRE(keys != NULL);
1857 
1858  rc = cas_req_prep(req, index, keys, NULL, keys->ov_vec.v_nr, flags,
1859  &op);
1860  if (rc != 0)
1861  return M0_ERR(rc);
1862  rc = m0_dtx0_txd_copy(dtx, &op->cg_txd);
1863  if (rc != 0)
1864  return M0_ERR(rc);
1865  rc = creq_fop_create_and_prepare(req, &cas_del_fopt, op, &next_state);
1866  if (rc == 0) {
1867  cas_fop_send(req);
1868  cas_req_state_set(req, next_state);
1869  }
1870  return M0_RC(rc);
1871 }
1872 
1873 M0_INTERNAL void m0_cas_del_rep(struct m0_cas_req *req,
1874  uint64_t idx,
1875  struct m0_cas_rec_reply *rep)
1876 {
1877  M0_ENTRY();
1878  M0_PRE(req->ccr_ftype == &cas_del_fopt);
1879  cas_rep_copy(req, idx, rep);
1880  M0_LEAVE();
1881 }
1882 
1883 M0_INTERNAL int m0_cas_sm_conf_init(void)
1884 {
1889 }
1890 
1891 M0_INTERNAL void m0_cas_sm_conf_fini(void)
1892 {
1895 }
1896 
1897 #undef M0_TRACE_SUBSYSTEM
1898 
1901 /*
1902  * Local variables:
1903  * c-indentation-style: "K&R"
1904  * c-basic-offset: 8
1905  * tab-width: 8
1906  * fill-column: 80
1907  * scroll-step: 1
1908  * End:
1909  */
1910 /*
1911  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
1912  */
M0_INTERNAL void m0_cas_req_fini(struct m0_cas_req *req)
Definition: client.c:288
void * fd_data
Definition: fop.h:75
static void creq_kv_hold_down(struct m0_cas_rec *rec)
Definition: client.c:310
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
struct m0_rpc_at_buf cr_val
Definition: cas.h:300
#define M0_PRE(cond)
static int cas_req_prep(struct m0_cas_req *req, const struct m0_cas_id *index, const struct m0_bufvec *keys, const struct m0_bufvec *values, uint64_t max_replies_nr, uint32_t flags, struct m0_cas_op **op)
Definition: client.c:1609
M0_INTERNAL void m0_sm_conf_init(struct m0_sm_conf *conf)
Definition: sm.c:340
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
Definition: dtm.h:554
struct m0_cas_rec * cni_rep
Definition: client.c:62
M0_INTERNAL void m0_sm_fail(struct m0_sm *mach, int fail_state, int32_t rc)
Definition: sm.c:468
static int greq_asmbl_fill(struct m0_cas_req *req, struct m0_cas_op *op)
Definition: client.c:673
static struct m0_cas_req * item_to_cas_req(struct m0_rpc_item *item)
Definition: client.c:441
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
static void cas_to_rpc_map(const struct m0_cas_req *creq, const struct m0_rpc_item *item)
Definition: client.c:146
struct m0_rpc_at_buf ck_key
Definition: cas.h:127
static int cas_req_fragmentation(struct m0_cas_req *req)
Definition: client.c:1509
int const char const void size_t int flags
Definition: dir.c:328
static void creq_fop_destroy(struct m0_cas_req *req)
Definition: client.c:358
M0_INTERNAL int m0_sm_addb2_init(struct m0_sm_conf *conf, uint64_t id, uint64_t counter)
Definition: sm.c:846
static int creq_fop_create(struct m0_cas_req *req, struct m0_fop_type *ftype, struct m0_cas_op *op)
Definition: client.c:394
#define NULL
Definition: misc.h:38
struct m0_cas_recv * cni_repv
Definition: client.c:66
static const struct m0_rpc_item_ops cas_item_ops
Definition: client.c:76
Definition: idx_mock.c:52
M0_INTERNAL int m0_cas_index_list(struct m0_cas_req *req, const struct m0_fid *start_fid, uint32_t indices_nr, uint32_t flags)
Definition: client.c:1435
#define ergo(a, b)
Definition: misc.h:293
Definition: sm.h:350
static int cas_records_op_prepare(const struct m0_cas_req *req, const struct m0_cas_id *index, const struct m0_bufvec *keys, const struct m0_bufvec *values, uint32_t flags, struct m0_cas_op **out)
Definition: client.c:1569
static struct io_request req
Definition: file.c:100
uint64_t cv_nr
Definition: cas.h:135
static struct m0_sm_group * grp
Definition: bytecount.c:38
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:79
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
static struct m0_sm_trans_descr cas_req_trans[]
Definition: client.c:119
M0_LEAVE()
M0_INTERNAL void m0_rpc_at_init(struct m0_rpc_at_buf *ab)
Definition: at.c:433
static void creq_niter_init(struct creq_niter *it, struct m0_cas_op *op, struct m0_cas_rep *rep)
Definition: client.c:757
static void cas_req_state_set(struct m0_cas_req *req, enum m0_cas_req_state state)
Definition: client.c:240
M0_INTERNAL bool m0_cas_req_is_locked(const struct m0_cas_req *req)
Definition: client.c:235
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
M0_INTERNAL bool m0_rpc_at_is_set(const struct m0_rpc_at_buf *ab)
Definition: at.c:492
static void creq_asmbl_fop_release(struct m0_ref *ref)
Definition: client.c:380
Definition: cas.h:247
struct m0_vec ov_vec
Definition: vec.h:147
void m0_rpc_item_get(struct m0_rpc_item *item)
Definition: item.c:434
M0_INTERNAL int m0_dtx0_txd_copy(const struct m0_dtx *dtx, struct m0_dtm0_tx_desc *dst)
Definition: dtx.c:510
M0_INTERNAL void m0_cas_get_rep(const struct m0_cas_req *req, uint64_t idx, struct m0_cas_get_reply *rep)
Definition: client.c:1755
struct m0_sm ri_sm
Definition: item.h:181
M0_INTERNAL const struct m0_fid m0_cas_meta_fid
Definition: cas.c:146
static bool nreq_asmbl_post(struct m0_cas_req *req)
Definition: client.c:892
static int sum
Definition: rwlock.c:53
int32_t ri_error
Definition: item.h:161
static void cas_rep_copy(const struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1346
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
static struct m0_be_emap_cursor it
Definition: extmap.c:46
M0_INTERNAL const char * m0_sm_state_name(const struct m0_sm *mach, int state)
Definition: sm.c:781
struct m0_cas_recv cgr_rep
Definition: cas.h:427
M0_INTERNAL void m0_cas_index_create_rep(const struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1357
M0_INTERNAL int m0_rpc_at_add(struct m0_rpc_at_buf *ab, const struct m0_buf *buf, const struct m0_rpc_conn *conn)
Definition: at.c:462
#define M0_BITS(...)
Definition: misc.h:236
Definition: sm.h:504
static void cas_fop_send(struct m0_cas_req *req)
Definition: client.c:576
struct m0_fop ccr_asmbl_fop
Definition: client.h:137
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
#define CASREQ_FOP_DATA(fop)
Definition: client.c:72
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
M0_INTERNAL bool m0_sm_addb2_counter_init(struct m0_sm *sm)
Definition: sm.c:891
static struct m0_rpc_item * item
Definition: item.c:56
struct m0_fop_getxattr_rep * rep
Definition: dir.c:455
void ** ov_buf
Definition: vec.h:149
M0_INTERNAL int m0_cas_get(struct m0_cas_req *req, struct m0_cas_id *index, const struct m0_bufvec *keys)
Definition: client.c:1741
Definition: sock.c:887
struct m0_rpc_at_buf cr_val
Definition: cas.h:182
M0_INTERNAL void m0_dtm0_tx_desc_init_none(struct m0_dtm0_tx_desc *td)
Definition: tx_desc.c:39
static void cas_req_failure_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: client.c:546
static void creq_rep_override(struct m0_cas_rec *orig, struct m0_cas_rec *new)
Definition: client.c:944
M0_INTERNAL int m0_rpc_at_rep_get(struct m0_rpc_at_buf *sent, struct m0_rpc_at_buf *rcvd, struct m0_buf *out)
Definition: at.c:606
static void creq_asmbl_fop_init(struct m0_cas_req *req, struct m0_fop_type *ftype, struct m0_cas_op *op)
Definition: client.c:607
M0_INTERNAL void m0_cas_req_unlock(struct m0_cas_req *req)
Definition: client.c:229
M0_INTERNAL bool m0_cas_id_invariant(const struct m0_cas_id *cid)
Definition: cas.c:207
uint64_t cni_req_i
Definition: client.c:67
M0_INTERNAL int m0_sm_timedwait(struct m0_sm *mach, uint64_t states, m0_time_t deadline)
Definition: sm.c:387
struct m0_fid fid
Definition: di.c:46
M0_INTERNAL void m0_cas_req_lock(struct m0_cas_req *req)
Definition: client.c:223
static struct m0_sm_group * cas_req_smgrp(const struct m0_cas_req *req)
Definition: client.c:218
return M0_RC(rc)
op
Definition: libdemo.c:64
struct m0_sm ccr_sm
Definition: client.h:125
#define M0_ENTRY(...)
Definition: trace.h:170
static int cas_req_fragment_continue(struct m0_cas_req *req, struct m0_cas_op *op)
Definition: client.c:1552
Definition: buf.h:37
struct m0_cas_rec * cr_rec
Definition: cas.h:236
static void nreq_asmbl_accept(struct m0_cas_req *req)
Definition: client.c:960
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
static void cas_req_replied_cb(struct m0_rpc_item *item)
Definition: client.c:1178
struct m0_rpc_at_buf ck_val
Definition: cas.h:128
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
Definition: fom_generic.c:81
struct m0_rpc_at_buf cr_key
Definition: cas.h:172
static int cas_index_op_prepare(const struct m0_cas_req *req, const struct m0_cas_id *cids, uint64_t cids_nr, bool recv_val, uint32_t flags, struct m0_cas_op **out)
Definition: client.c:1200
int i
Definition: dir.c:1033
struct m0_fop_type * f_type
Definition: fop.h:81
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
uint64_t cr_nr
Definition: cas.h:235
m0_cas_req_state
Definition: client.h:109
return M0_ERR(-EOPNOTSUPP)
void * sa_datum
Definition: sm.h:508
M0_INTERNAL int m0_cas_req_generic_rc(const struct m0_cas_req *req)
Definition: client.c:457
static const struct m0_rpc_item_ops asmbl_item_ops
Definition: client.c:83
Definition: cas.h:126
M0_INTERNAL bool m0_rpc_at_rep_is_bulk(const struct m0_rpc_at_buf *rcvd, uint64_t *len)
Definition: at.c:651
M0_INTERNAL void m0_cas_req_init(struct m0_cas_req *req, struct m0_rpc_session *sess, struct m0_sm_group *grp)
Definition: client.c:195
static int key
Definition: locality.c:283
static void cas_req_replied_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: client.c:1137
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
Definition: refs.h:34
if(value==NULL)
Definition: dir.c:350
static void cas_req_failure_ast_post(struct m0_cas_req *req, int32_t rc)
Definition: client.c:556
M0_INTERNAL void m0_cas_put_rep(struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1685
Definition: cas.h:264
M0_INTERNAL int m0_cas_next(struct m0_cas_req *req, struct m0_cas_id *index, struct m0_bufvec *start_keys, uint32_t *recs_nr, uint32_t flags)
Definition: client.c:1775
M0_INTERNAL int m0_cas_versioned_get(struct m0_cas_req *req, struct m0_cas_id *index, const struct m0_bufvec *keys)
Definition: client.c:1748
static struct m0_cas_rep * cas_rep(struct m0_rpc_item *reply)
Definition: client.c:452
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_INTERNAL void m0_cas_next_rep(const struct m0_cas_req *req, uint32_t idx, struct m0_cas_next_reply *rep)
Definition: client.c:1825
static int nreq_asmbl_fill(struct m0_cas_req *req, struct m0_cas_op *op)
Definition: client.c:838
void m0_sm_state_set(struct m0_sm *mach, int state)
Definition: sm.c:478
uint64_t cr_nr
Definition: cas.h:283
static struct m0_rpc_machine * creq_rpc_mach(const struct m0_cas_req *req)
Definition: client.c:213
M0_INTERNAL int m0_cas_sm_conf_init(void)
Definition: client.c:1883
struct m0_crv cr_ver
Definition: cas.h:228
M0_INTERNAL bool m0_rpc_item_max_payload_exceeded(struct m0_rpc_item *item, struct m0_rpc_session *session)
Definition: item.c:490
static int greq_asmbl_add(struct m0_cas_req *req, struct m0_cas_rec *rec, uint64_t idx, uint64_t orig_idx, uint64_t vlen)
Definition: client.c:623
uint64_t cni_kpos
Definition: client.c:69
M0_INTERNAL int m0_xcode_obj_enc_to_buf(struct m0_xcode_obj *obj, void **buf, m0_bcount_t *len)
Definition: xcode.c:832
M0_INTERNAL struct m0_fop_type cas_get_fopt
Definition: cas.c:47
static struct m0_sm_state_descr cas_req_states[]
Definition: client.c:88
M0_INTERNAL int m0_cas_del(struct m0_cas_req *req, struct m0_cas_id *index, struct m0_bufvec *keys, struct m0_dtx *dtx, uint32_t flags)
Definition: client.c:1842
static int nreq_asmbl_prep(struct m0_cas_req *req, struct m0_cas_op *op)
Definition: client.c:805
struct m0_cas_kv * cv_rec
Definition: cas.h:136
struct m0_rpc_item * ri_reply
Definition: item.h:163
struct m0_cas_rec * cni_req
Definition: client.c:61
void * f_opaque
Definition: fop.h:84
#define M0_POST(cond)
M0_INTERNAL struct m0_fop_type cas_cur_fopt
Definition: cas.c:50
M0_INTERNAL void m0_sm_addb2_fini(struct m0_sm_conf *conf)
Definition: sm.c:870
static int cas_rep_validate(const struct m0_cas_req *req)
Definition: client.c:531
struct m0_sm_conf cas_req_sm_conf
Definition: client.c:133
uint32_t v_nr
Definition: vec.h:51
static void creq_recv_fini(struct m0_cas_recv *recv, bool op_is_meta)
Definition: client.c:329
struct m0_fid ci_fid
Definition: cas.h:113
M0_INTERNAL int m0_cas_index_create(struct m0_cas_req *req, const struct m0_cas_id *cids, uint64_t cids_nr, struct m0_dtx *dtx)
Definition: client.c:1321
M0_INTERNAL void m0_cas_index_list_rep(struct m0_cas_req *req, uint32_t idx, struct m0_cas_ilist_reply *rep)
Definition: client.c:1488
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
Definition: buf.c:55
m0_bcount_t * v_count
Definition: vec.h:53
M0_INTERNAL void m0_fop_fini(struct m0_fop *fop)
Definition: fop.c:136
struct m0_mutex s_lock
Definition: sm.h:514
void(* rio_sent)(struct m0_rpc_item *item)
Definition: item.h:267
struct m0_cas_kv_vec cr_kv_bufs
Definition: cas.h:195
struct m0_sm_ast ccr_failure_ast
Definition: client.h:157
struct m0_cas_recv cg_rec
Definition: cas.h:384
struct m0_cas_hint cr_hint
Definition: cas.h:200
Definition: cas.h:372
static int creq_fop_create_and_prepare(struct m0_cas_req *req, struct m0_fop_type *ftype, struct m0_cas_op *op, enum m0_cas_req_state *next_state)
Definition: client.c:414
M0_INTERNAL int m0_cas_index_delete(struct m0_cas_req *req, const struct m0_cas_id *cids, uint64_t cids_nr, struct m0_dtx *dtx, uint32_t flags)
Definition: client.c:1366
static uint64_t greq_asmbl_count(const struct m0_cas_req *req)
Definition: client.c:648
static void creq_op_free(struct m0_cas_op *op)
Definition: client.c:186
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
Definition: fid.c:164
#define m0_forall(var, nr,...)
Definition: misc.h:112
uint32_t sd_flags
Definition: sm.h:378
M0_INTERNAL struct m0_fop_type cas_del_fopt
Definition: cas.c:49
uint64_t cni_rep_i
Definition: client.c:68
static int creq_niter_next(struct creq_niter *it)
Definition: client.c:770
M0_INTERNAL void m0_cas_sm_conf_fini(void)
Definition: client.c:1891
static struct m0_rpc_conn * creq_rpc_conn(const struct m0_cas_req *req)
Definition: client.c:208
struct m0_fop_data f_data
Definition: fop.h:82
int32_t m0_rpc_item_error(const struct m0_rpc_item *item)
Definition: item.c:973
M0_INTERNAL void m0_cas_index_delete_rep(const struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1394
M0_INTERNAL void m0_cas_req_fini_lock(struct m0_cas_req *req)
Definition: client.c:295
M0_INTERNAL void m0_cas_del_rep(struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1873
M0_INTERNAL uint64_t m0_cas_req_nr(const struct m0_cas_req *req)
Definition: client.c:1308
void m0_rpc_item_put_lock(struct m0_rpc_item *item)
Definition: item.c:454
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
struct m0_ref f_ref
Definition: fop.h:80
Definition: fid.h:38
static void creq_asmbl_replied_cb(struct m0_rpc_item *item)
Definition: client.c:1125
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
Definition: sm.c:313
static bool greq_asmbl_post(struct m0_cas_req *req)
Definition: client.c:715
#define M0_IS0(obj)
Definition: misc.h:70
M0_INTERNAL int m0_cas_index_lookup(struct m0_cas_req *req, const struct m0_cas_id *cids, uint64_t cids_nr)
Definition: client.c:1403
static int creq_op_alloc(uint64_t recs_nr, struct m0_cas_op **out)
Definition: client.c:161
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
static int cas_next_rc(int64_t service_rc)
Definition: client.c:1463
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
static void creq_item_prepare(const struct m0_cas_req *req, struct m0_rpc_item *item, const struct m0_rpc_item_ops *ops)
Definition: client.c:565
static void addb2_add_cas_req_attrs(const struct m0_cas_req *req)
Definition: client.c:1262
static void creq_niter_fini(struct creq_niter *it)
Definition: client.c:796
M0_INTERNAL int m0_cas_put(struct m0_cas_req *req, struct m0_cas_id *index, const struct m0_bufvec *keys, const struct m0_bufvec *values, struct m0_dtx *dtx, uint32_t flags)
Definition: client.c:1643
struct m0_sm_ast ccr_replied_ast
Definition: client.h:155
struct m0_rpc_session * ri_session
Definition: item.h:147
static struct m0_rpc_item * cas_req_to_item(const struct m0_cas_req *req)
Definition: client.c:447
static int m0_cas__get(struct m0_cas_req *req, struct m0_cas_id *index, const struct m0_bufvec *keys, int flags)
Definition: client.c:1695
M0_INTERNAL int m0_cas_req_wait(struct m0_cas_req *req, uint64_t states, m0_time_t to)
Definition: client.c:1313
int32_t cgr_rc
Definition: cas.h:416
M0_INTERNAL void m0_rpc_at_fini(struct m0_rpc_at_buf *ab)
Definition: at.c:441
struct m0_rpc_at_buf cr_key
Definition: cas.h:290
static bool creq_niter_invariant(struct creq_niter *it)
Definition: client.c:751
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
static int creq_kv_buf_add(const struct m0_cas_req *req, const struct m0_bufvec *kv, uint32_t idx, struct m0_rpc_at_buf *buf)
Definition: client.c:592
static struct m0_fop * fop
Definition: item.c:57
static void greq_asmbl_accept(struct m0_cas_req *req)
Definition: client.c:995
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
static void cas_req_reply_fini(struct m0_cas_req *req)
Definition: client.c:249
#define M0_XCODE_OBJ(type, ptr)
Definition: xcode.h:962
M0_INTERNAL struct m0_fop_type cas_put_fopt
Definition: cas.c:48
M0_INTERNAL void m0_cas_rep_mlock(const struct m0_cas_req *req, uint64_t idx)
Definition: client.c:1816
static void cas_req_fini(struct m0_cas_req *req)
Definition: client.c:261
static int cas_req_reply_handle(struct m0_cas_req *req, bool *fragm_continue)
Definition: client.c:1039
#define out(...)
Definition: gen.c:41
static void cas_req_fsync_remid_copy(struct m0_cas_req *req)
Definition: client.c:1026
M0_INTERNAL void m0_sm_conf_fini(struct m0_sm_conf *conf)
Definition: sm.c:376
static int cas_rep__validate(const struct m0_fop_type *ftype, struct m0_cas_op *op, struct m0_cas_rep *rep)
Definition: client.c:482
static void creq_asmbl_replied_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: client.c:1089
struct m0_cas_recv * cni_reqv
Definition: client.c:65
struct m0_fom_ops ops
Definition: io_foms.c:623
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
static void cas_req_failure(struct m0_cas_req *req, int32_t rc)
Definition: client.c:540
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
M0_INTERNAL uint64_t m0_sm_id_get(const struct m0_sm *sm)
Definition: sm.c:1021
M0_INTERNAL int m0_rpc_at_rep2inline(struct m0_rpc_at_buf *sent, struct m0_rpc_at_buf *rcvd)
Definition: at.c:663
void m0_free(void *data)
Definition: memory.c:146
uint64_t cr_rc
Definition: cas.h:221
struct m0_rpc_item f_item
Definition: fop.h:83
Definition: cas.h:107
#define M0_BUF_INIT(size, data)
Definition: buf.h:64
M0_INTERNAL void m0_dtm0_tx_desc_fini(struct m0_dtm0_tx_desc *td)
Definition: tx_desc.c:111
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_INTERNAL void m0_cas_index_lookup_rep(const struct m0_cas_req *req, uint64_t idx, struct m0_cas_rec_reply *rep)
Definition: client.c:1426
static struct m0_sm_state_descr states[C_NR]
Definition: sm.c:512
static int cas_index_req_prepare(struct m0_cas_req *req, const struct m0_cas_id *cids, uint64_t cids_nr, uint64_t max_replies_nr, bool recv_val, uint32_t flags, struct m0_cas_op **op)
Definition: client.c:1273
const m0_time_t M0_TIME_IMMEDIATELY
Definition: time.c:107
Definition: fop.h:79
static void creq_fop_release(struct m0_ref *ref)
Definition: client.c:368
struct m0_cas_id cg_id
Definition: cas.h:374
static bool fid_is_meta(struct m0_fid *fid)
Definition: client.c:155
Definition: vec.h:145
M0_INTERNAL void m0_rpc_at_detach(struct m0_rpc_at_buf *ab)
Definition: at.c:694
M0_INTERNAL int m0_rpc_at_recv(struct m0_rpc_at_buf *ab, const struct m0_rpc_conn *conn, uint32_t len, bool force_bulk)
Definition: at.c:508
static bool cas_rep_val_is_valid(struct m0_rpc_at_buf *val, struct m0_fid *idx_fid)
Definition: client.c:476
m0_time_t ri_deadline
Definition: item.h:141
Definition: idx_mock.c:47
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
Definition: sm.c:331