Motr: io.c
/* -*- C -*- */
/*
 * Copyright (c) 2020 Seagate Technology LLC and/or its Affiliates
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * For any questions about this software or licensing,
 * please email opensource@seagate.com or cortx-questions@seagate.com.
 *
 */


#include "motr/client.h"
#include "motr/client_internal.h"
#include "motr/layout.h"
#include "motr/addb.h"
#include "motr/pg.h"
#include "motr/io.h"

#include "lib/errno.h"             /* ENOMEM */
#include "fid/fid.h"               /* m0_fid */
#include "ioservice/fid_convert.h" /* m0_fid_convert_ */
#include "rm/rm_service.h"         /* m0_rm_svc_domain_get */

#define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CLIENT
#include "lib/trace.h"             /* M0_LOG */
#include "lib/finject.h"

#define DGMODE_IO
const struct m0_uint128 m0_rm_group = M0_UINT128(0, 1);

M0_INTERNAL struct m0_rm_domain *
rm_domain_get(struct m0_client *cinst)
{
        struct m0_reqh_service *svc;

        M0_ENTRY();

        M0_PRE(cinst != NULL);

        svc = m0_reqh_service_find(&m0_rms_type, &cinst->m0c_reqh);
        M0_ASSERT(svc != NULL);

        M0_LEAVE();
        return m0_rm_svc_domain_get(svc);
}

M0_INTERNAL struct m0_poolmach*
ioo_to_poolmach(struct m0_op_io *ioo)
{
        struct m0_pool_version *pv;
        struct m0_client       *cinst;

        cinst = m0__op_instance(m0__ioo_to_op(ioo));
        pv = m0_pool_version_find(&cinst->m0c_pools_common,
                                  &ioo->ioo_pver);
        return &pv->pv_mach;
}

/*
 * This is added to avoid the case of overlapping segments being passed
 * to parity group mapping due to a bug observed in EOS-4189.
 * S3 server does not send overlapping segments, hence the support for
 * them is temporarily removed.
 * TODO: Remove this after a patch for EOS-5083 lands.
 */
static bool indexvec_segments_overlap(struct m0_indexvec *ivec)
{
        uint32_t seg;
        bool     overlap = false;

        for (seg = 0; seg < SEG_NR(ivec) - 1; ++seg) {
                overlap = (INDEX(ivec, seg) + COUNT(ivec, seg)) >
                          INDEX(ivec, seg + 1);
                if (overlap)
                        break;
        }

        return overlap;
}

M0_UNUSED static inline void indexvec_dump(struct m0_indexvec *ivec)
{
        uint32_t seg;

        for (seg = 0; seg < SEG_NR(ivec); ++seg)
                M0_LOG(M0_DEBUG, "seg# %d: [pos, +len) = [%" PRIu64
                       ", +%" PRIu64 ")", seg, INDEX(ivec, seg),
                       COUNT(ivec, seg));
}

static void segments_sort(struct m0_indexvec *ivec, struct m0_bufvec *data,
                          struct m0_bufvec *attr)
{
        uint32_t i;
        uint32_t j;

        M0_ENTRY("indexvec = %p", ivec);

        M0_PRE(ivec != NULL);
        M0_PRE(!m0_vec_is_empty(&ivec->iv_vec));

        /*
         * TODO: this should be replaced by an efficient sorting algorithm,
         * something like heapsort, which is fairly inexpensive in kernel
         * mode and has the best worst-case behaviour.
         * The existing heapsort from the kernel code cannot be used due to
         * the apparent disconnect between an index vector and its associated
         * count vector for the same index.
         */
        for (i = 0; i < SEG_NR(ivec) && COUNT(ivec, i); ++i) {
                for (j = i + 1; j < SEG_NR(ivec) && COUNT(ivec, j); ++j) {
                        if (INDEX(ivec, i) > INDEX(ivec, j)) {
                                M0_SWAP(INDEX(ivec, i), INDEX(ivec, j));
                                M0_SWAP(COUNT(ivec, i), COUNT(ivec, j));
                                M0_SWAP(BUFVI(data, i), BUFVI(data, j));
                                M0_SWAP(BUFVC(data, i), BUFVC(data, j));
                                if (attr) {
                                        M0_SWAP(BUFVI(attr, i), BUFVI(attr, j));
                                        M0_SWAP(BUFVC(attr, i), BUFVC(attr, j));
                                }
                        }
                }
        }

        M0_LEAVE();
}
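
To make the two helpers above concrete, here is a small, self-contained sketch (plain C; the values are hypothetical and ordinary arrays stand in for m0_indexvec/m0_bufvec):

#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>

#define SWAP(a, b) do { uint64_t t_ = (a); (a) = (b); (b) = t_; } while (0)

int main(void)
{
        uint64_t index[] = { 40, 0, 16 };  /* segment start offsets */
        uint64_t count[] = {  8, 8,  8 };  /* segment lengths       */
        int      nr      = 3;
        bool     overlap = false;
        int      i, j;

        /* Same exchange-based ordering as segments_sort(): ascending start
         * offset; in the real code the data/attr buffers are swapped in
         * lockstep with the extents. */
        for (i = 0; i < nr; ++i)
                for (j = i + 1; j < nr; ++j)
                        if (index[i] > index[j]) {
                                SWAP(index[i], index[j]);
                                SWAP(count[i], count[j]);
                        }

        /* Same predicate as indexvec_segments_overlap(): a segment overlaps
         * its successor iff its end extends past the next start. */
        for (i = 0; i < nr - 1; ++i)
                overlap = overlap || index[i] + count[i] > index[i + 1];

        /* Prints: [0, +8) [16, +8) [40, +8) overlap=0 */
        for (i = 0; i < nr; ++i)
                printf("[%llu, +%llu) ", (unsigned long long)index[i],
                       (unsigned long long)count[i]);
        printf("overlap=%d\n", (int)overlap);
        return 0;
}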

M0_INTERNAL bool m0_op_io_invariant(const struct m0_op_io *ioo)
{
        unsigned int                  opcode = ioo->ioo_oo.oo_oc.oc_op.op_code;
        const struct nw_xfer_request *nxr = &ioo->ioo_nwxfer;
        uint32_t                      state = ioreq_sm_state(ioo);

        M0_ENTRY();

        return _0C(ioo != NULL) &&
               _0C(m0_op_io_bob_check(ioo)) &&
               /* ioo is big enough */
               _0C(ioo->ioo_oo.oo_oc.oc_op.op_size >= sizeof *ioo) &&
               /* is a supported type */
               _0C(M0_IN(opcode, (M0_OC_READ, M0_OC_WRITE,
                                  M0_OC_FREE))) &&
               /* read/write extent is for an area with a size */
               _0C(m0_vec_count(&ioo->ioo_ext.iv_vec) > 0) &&
               /* read/write extent is a multiple of block size */
               _0C(m0_vec_count(&ioo->ioo_ext.iv_vec) %
                   M0_BITS(ioo->ioo_obj->ob_attr.oa_bshift) == 0) &&
               /* memory area for read/write is the same size as the extents */
               _0C(ergo(M0_IN(opcode, (M0_OC_READ, M0_OC_WRITE)),
                        m0_vec_count(&ioo->ioo_ext.iv_vec) ==
                        m0_vec_count(&ioo->ioo_data.ov_vec))) &&
#ifdef BLOCK_ATTR_SUPPORTED /* Block attr not yet enabled. */
               /* memory area for attribute read/write is big enough */
               _0C(ergo(M0_IN(opcode, (M0_OC_READ, M0_OC_WRITE)),
                        m0_vec_count(&ioo->ioo_attr.ov_vec) ==
                        8 * m0_no_of_bits_set(ioo->ioo_attr_mask) *
                        (m0_vec_count(&ioo->ioo_ext.iv_vec) >>
                         ioo->ioo_obj->ob_attr.oa_bshift))) &&
#endif
               /* alloc/free don't have a memory area */
               _0C(ergo(M0_IN(opcode, (M0_OC_ALLOC, M0_OC_FREE)),
                        M0_IS0(&ioo->ioo_data) && M0_IS0(&ioo->ioo_attr) &&
                        ioo->ioo_attr_mask == 0)) &&
               _0C(m0_fid_is_valid(&ioo->ioo_oo.oo_fid)) &&
               /* a network transfer machine is registered for read/write */
               _0C(ergo(M0_IN(state, (IRS_READING, IRS_WRITING)),
                        !tioreqht_htable_is_empty(&nxr->nxr_tioreqs_hash))) &&
               /* if finished, there are no fops left waiting */
               _0C(ergo(M0_IN(state, (IRS_WRITE_COMPLETE, IRS_READ_COMPLETE)),
                        m0_atomic64_get(&nxr->nxr_iofop_nr) == 0 &&
                        m0_atomic64_get(&nxr->nxr_rdbulk_nr) == 0)) &&
               _0C(pargrp_iomap_invariant_nr(ioo)) &&
               _0C(nw_xfer_request_invariant(nxr));
}
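
A note on the idiom: the whole invariant is a single boolean conjunction, where ergo() expresses implication and _0C() names the first failing conjunct. Simplified stand-ins (illustrative only; the real macros in lib/misc.h and lib/assert.h hook into Motr's trace machinery) behave like this:

#include <stdio.h>
#include <stdbool.h>

/* Hypothetical, simplified stand-ins for the real Motr macros: */
#define ergo(a, b) (!(a) || (b))   /* logical implication: a implies b */
#define _0C(expr)  ((expr) || (fprintf(stderr, "failed: %s\n", #expr), 0))

int main(void)
{
        bool opcode_is_read = true, data_present = true;

        /* One conjunct per property, as in m0_op_io_invariant(); the
         * first false conjunct is reported by name on stderr. */
        bool ok = _0C(ergo(opcode_is_read, data_present)) &&
                  _0C(1 + 1 == 2);
        printf("invariant holds: %d\n", (int)ok);
        return 0;
}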

static void addb2_add_ioo_attrs(const struct m0_op_io *ioo, int rmw)
{
        uint64_t ioid = m0_sm_id_get(&ioo->ioo_sm);

        M0_ADDB2_ADD(M0_AVI_ATTR, ioid, M0_AVI_IOO_ATTR_BUFS_NR,
                     ioo->ioo_data.ov_vec.v_nr);
        M0_ADDB2_ADD(M0_AVI_ATTR, ioid, M0_AVI_IOO_ATTR_BUF_SIZE,
                     ioo->ioo_data.ov_vec.v_count[0]);
        M0_ADDB2_ADD(M0_AVI_ATTR, ioid, M0_AVI_IOO_ATTR_PAGE_SIZE,
                     m0__page_size(ioo));
        M0_ADDB2_ADD(M0_AVI_ATTR, ioid, M0_AVI_IOO_ATTR_BUFS_ALIGNED,
                     (int)addr_is_network_aligned(ioo->ioo_data.ov_buf[0]));
        M0_ADDB2_ADD(M0_AVI_ATTR, ioid, M0_AVI_IOO_ATTR_RMW, rmw);
}

static void obj_io_cb_launch(struct m0_op_common *oc)
{
        int               rc;
        struct m0_op_obj *oo;
        struct m0_op_io  *ioo;

        M0_ENTRY();

        M0_PRE(oc != NULL);
        M0_PRE(oc->oc_op.op_entity != NULL);
        M0_PRE(m0_uint128_cmp(&M0_ID_APP,
                              &oc->oc_op.op_entity->en_id) < 0);
        M0_PRE(M0_IN(oc->oc_op.op_code, (M0_OC_WRITE,
                                         M0_OC_READ,
                                         M0_OC_FREE)));
        M0_PRE(oc->oc_op.op_size >= sizeof *ioo);

        oo = bob_of(oc, struct m0_op_obj, oo_oc, &oo_bobtype);
        ioo = bob_of(oo, struct m0_op_io, ioo_oo, &ioo_bobtype);
        M0_PRE_EX(m0_op_io_invariant(ioo));

        rc = ioo->ioo_ops->iro_iomaps_prepare(ioo);
        if (rc != 0)
                goto end;

        rc = ioo->ioo_nwxfer.nxr_ops->nxo_distribute(&ioo->ioo_nwxfer);
        if (rc != 0) {
                ioo->ioo_ops->iro_iomaps_destroy(ioo);
                ioo->ioo_nwxfer.nxr_state = NXS_COMPLETE;
                goto end;
        }

        /*
         * This walks through the iomap looking for a READOLD/READREST slot,
         * updating ioo_map_idx to indicate where ioreq_iosm_handle_launch
         * should start. This behaviour is replicated from
         * m0t1fs:ioreq_iosm_handle.
         */
        for (ioo->ioo_map_idx = 0; ioo->ioo_map_idx < ioo->ioo_iomap_nr;
             ++ioo->ioo_map_idx) {
                if (M0_IN(ioo->ioo_iomaps[ioo->ioo_map_idx]->pi_rtype,
                          (PIR_READOLD, PIR_READREST)))
                        break;
        }

        if (M0_IN(oc->oc_op.op_code, (M0_OC_WRITE, M0_OC_READ)))
                addb2_add_ioo_attrs(ioo, ioo->ioo_map_idx != ioo->ioo_iomap_nr);

        ioo->ioo_ast.sa_cb = ioo->ioo_ops->iro_iosm_handle_launch;
        m0_sm_ast_post(ioo->ioo_oo.oo_sm_grp, &ioo->ioo_ast);
end:
        M0_LEAVE();
}

static void obj_io_cb_cancel(struct m0_op_common *oc)
{
        struct m0_op_obj    *oo;
        struct m0_op_io     *ioo;
        struct target_ioreq *ti;

        M0_ENTRY();
        M0_PRE(oc != NULL);
        M0_PRE(m0_sm_group_is_locked(&oc->oc_op.op_sm_group));
        M0_PRE(M0_IN(oc->oc_op.op_code,
                     (M0_OC_WRITE, M0_OC_READ, M0_OC_FREE)));

        oo = bob_of(oc, struct m0_op_obj, oo_oc, &oo_bobtype);
        ioo = bob_of(oo, struct m0_op_io, ioo_oo, &ioo_bobtype);

        /*
         * If nxr_state is NXS_COMPLETE, then wait for the IRS_REQ_COMPLETE
         * state of io_req.
         * NXS_COMPLETE state ensures no fops are in execution in the
         * current io_req phase, and IRS_REQ_COMPLETE state ensures that
         * io_req is completed and no more operations need to be executed.
         * Example:
         * In case of degraded-mode read/write, nxr_state can be NXS_COMPLETE
         * while ioreq_state is not IRS_REQ_COMPLETE, indicating the io
         * request is still in progress and more fops can be dispatched. If in
         * this case we did not wait for IRS_REQ_COMPLETE even when nxr_state
         * is NXS_COMPLETE, there is a possibility that the cancel operation
         * returns but new fops are dispatched for the same io_req later,
         * which can cause issues.
         */
        while (ioo->ioo_nwxfer.nxr_state == NXS_COMPLETE &&
               ioreq_sm_state(ioo) != IRS_REQ_COMPLETE)
                ;

        if (ioo->ioo_nwxfer.nxr_state == NXS_INFLIGHT) {
                M0_PRE_EX(m0_op_io_invariant(ioo));
                m0_htable_for(tioreqht, ti,
                              &ioo->ioo_nwxfer.nxr_tioreqs_hash) {
                        target_ioreq_cancel(ti);
                } m0_htable_endfor;
        }
        M0_LEAVE();
}

static void obj_io_ast_fini(struct m0_sm_group *grp,
                            struct m0_sm_ast   *ast)
{
        struct m0_op_io     *ioo;
        struct target_ioreq *ti;

        M0_ENTRY();

        M0_PRE(ast != NULL);
        M0_PRE(grp != NULL);
        M0_PRE(m0_sm_group_is_locked(grp));
        ioo = bob_of(ast, struct m0_op_io, ioo_ast, &ioo_bobtype);
        M0_PRE(ioo != NULL);
        /* XXX Shouldn't this be a controlled rc? */
        M0_PRE(M0_IN(ioo->ioo_sm.sm_state,
                     (IRS_REQ_COMPLETE, IRS_FAILED, IRS_INITIALIZED)));

        /* Cleanup the state machine */
        m0_sm_fini(&ioo->ioo_sm);

        /* Free all the iorequests */
        m0_htable_for(tioreqht, ti, &ioo->ioo_nwxfer.nxr_tioreqs_hash) {
                tioreqht_htable_del(&ioo->ioo_nwxfer.nxr_tioreqs_hash, ti);
                /*
                 * All ioreq_fop structures in list target_ioreq::ti_iofops
                 * are already finalized in nw_xfer_req_complete().
                 */
                target_ioreq_fini(ti);
        } m0_htable_endfor;

        /* Free memory used for io maps and buffers */
        if (ioo->ioo_iomaps != NULL)
                ioo->ioo_ops->iro_iomaps_destroy(ioo);

        nw_xfer_request_fini(&ioo->ioo_nwxfer);
        m0_layout_instance_fini(ioo->ioo_oo.oo_layout_instance);
        m0_free(ioo->ioo_failed_session);
        m0_free(ioo->ioo_failed_nodes);
        m0_chan_signal_lock(&ioo->ioo_completion);

        M0_LEAVE();
}

static void obj_io_cb_fini(struct m0_op_common *oc)
{
        struct m0_op_obj *oo;
        struct m0_op_io  *ioo;
        struct m0_clink   w;

        M0_ENTRY();

        M0_PRE(oc != NULL);
        M0_PRE(M0_IN(oc->oc_op.op_code,
                     (M0_OC_WRITE, M0_OC_READ, M0_OC_FREE)));
        M0_PRE(M0_IN(oc->oc_op.op_sm.sm_state,
                     (M0_OS_STABLE, M0_OS_FAILED, M0_OS_INITIALISED)));
        M0_PRE(oc->oc_op.op_size >= sizeof *ioo);

        oo = bob_of(oc, struct m0_op_obj, oo_oc, &oo_bobtype);
        ioo = bob_of(oo, struct m0_op_io, ioo_oo, &ioo_bobtype);
        M0_PRE_EX(m0_op_io_invariant(ioo));

        /*
         * Finalise the io state machine.
         * We do this by posting the fini callback AST, and waiting for it
         * to complete.
         */
        m0_clink_init(&w, NULL);
        m0_clink_add_lock(&ioo->ioo_completion, &w);

        ioo->ioo_ast.sa_cb = obj_io_ast_fini;
        m0_sm_ast_post(ioo->ioo_oo.oo_sm_grp, &ioo->ioo_ast);

        m0_chan_wait(&w);
        m0_clink_del_lock(&w);
        m0_clink_fini(&w);
        m0_chan_fini_lock(&ioo->ioo_completion);

        /* Finalise the bob type */
        m0_op_io_bob_fini(ioo);

        M0_LEAVE();
}
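
The fini path above is a recurring Motr idiom: the op cannot be torn down directly in the caller's context, because the io state machine must be finalised under its own locality group, so the caller posts an AST and blocks on a channel until obj_io_ast_fini() signals ioo_completion. Stripped of the Motr types, the handshake has roughly this shape (a sketch in POSIX threads, not Motr code):

#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  done = PTHREAD_COND_INITIALIZER;
static int             finalised;

/* Stands in for obj_io_ast_fini() running on the locality group's
 * thread after m0_sm_ast_post(). */
static void *ast_thread(void *arg)
{
        (void)arg;
        /* ... finalise the state machine, free resources ... */
        pthread_mutex_lock(&lock);
        finalised = 1;                  /* m0_chan_signal_lock() */
        pthread_cond_signal(&done);
        pthread_mutex_unlock(&lock);
        return NULL;
}

int main(void)
{
        pthread_t t;

        pthread_create(&t, NULL, ast_thread, NULL);  /* m0_sm_ast_post() */
        pthread_mutex_lock(&lock);
        while (!finalised)                           /* m0_chan_wait()   */
                pthread_cond_wait(&done, &lock);
        pthread_mutex_unlock(&lock);
        pthread_join(t, NULL);
        puts("op finalised");
        return 0;
}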

static void obj_io_cb_free(struct m0_op_common *oc)
{
        struct m0_op_obj *oo;
        struct m0_op_io  *ioo;

        M0_ENTRY();

        M0_PRE(oc != NULL);
        M0_PRE((oc->oc_op.op_size >= sizeof *ioo));

        /* Can't use bob_of here */
        oo = M0_AMB(oo, oc, oo_oc);
        ioo = M0_AMB(ioo, oo, ioo_oo);

        m0_free(ioo);

        M0_LEAVE();
}

static int obj_io_init(struct m0_obj      *obj,
                       enum m0_obj_opcode  opcode,
                       struct m0_indexvec *ext,
                       struct m0_bufvec   *data,
                       struct m0_bufvec   *attr,
                       uint64_t            mask,
                       uint32_t            flags,
                       struct m0_op       *op)
{
        int                  rc;
        int                  i;
        uint64_t             max_svc_failures;
        uint64_t             max_node_failures;
        struct m0_op_io     *ioo;
        struct m0_op_obj    *oo;
        struct m0_op_common *oc;
        struct m0_locality  *locality;
        struct m0_client    *cinst;

        M0_PRE(obj != NULL);
        cinst = m0__entity_instance(&obj->ob_entity);

        M0_ASSERT(op->op_size >= sizeof *ioo);
        oc = bob_of(op, struct m0_op_common, oc_op, &oc_bobtype);
        oo = bob_of(oc, struct m0_op_obj, oo_oc, &oo_bobtype);
        ioo = container_of(oo, struct m0_op_io, ioo_oo);
        m0_op_io_bob_init(ioo);
        ioo->ioo_obj = obj;
        ioo->ioo_ops = &ioo_ops;
        ioo->ioo_pver = oo->oo_pver;

        /* Initialise this operation as a network transfer */
        nw_xfer_request_init(&ioo->ioo_nwxfer);
        if (ioo->ioo_nwxfer.nxr_rc != 0) {
                rc = ioo->ioo_nwxfer.nxr_rc;
                M0_LOG(M0_ERROR, "nw_xfer_request_init failed with %d", rc);
                goto err;
        }

        /* Allocate and initialise failed sessions and nodes. */
        max_svc_failures = tolerance_of_level(ioo, M0_CONF_PVER_LVL_CTRLS);
        M0_ALLOC_ARR(ioo->ioo_failed_session, max_svc_failures + 1);
        if (ioo->ioo_failed_session == NULL) {
                rc = M0_ERR(-ENOMEM);
                goto err;
        }
        for (i = 0; i < max_svc_failures; ++i)
                ioo->ioo_failed_session[i] = ~(uint64_t)0;

        max_node_failures = tolerance_of_level(ioo, M0_CONF_PVER_LVL_ENCLS);
        M0_ALLOC_ARR(ioo->ioo_failed_nodes, max_node_failures + 1);

        if (ioo->ioo_failed_nodes == NULL) {
                rc = M0_ERR(-ENOMEM);
                goto err;
        }
        for (i = 0; i < max_node_failures; ++i)
                ioo->ioo_failed_nodes[i] = ~(uint64_t)0;

        /* Initialise the state machine */
        locality = m0__locality_pick(cinst);
        M0_ASSERT(locality != NULL);
        ioo->ioo_oo.oo_sm_grp = locality->lo_grp;
        m0_sm_init(&ioo->ioo_sm, &io_sm_conf, IRS_INITIALIZED,
                   locality->lo_grp);
        m0_sm_addb2_counter_init(&ioo->ioo_sm);

        /* This is used to wait for the ioo to be finalised */
        m0_chan_init(&ioo->ioo_completion, &cinst->m0c_sm_group.s_lock);

        /* Store the remaining parameters */
        ioo->ioo_iomap_nr = 0;
        ioo->ioo_sns_state = SRS_UNINITIALIZED;
        ioo->ioo_ext = *ext;
        ioo->ioo_flags = flags;
        ioo->ioo_flags |= M0_OOF_SYNC;
        if (M0_IN(opcode, (M0_OC_READ, M0_OC_WRITE))) {
                ioo->ioo_data = *data;
                ioo->ioo_attr_mask = mask;
                if (attr != NULL && attr->ov_vec.v_nr)
                        ioo->ioo_attr = *attr;
                else
                        M0_SET0(&ioo->ioo_attr);
        }

        return M0_RC(0);
err:
        return M0_ERR(rc);
}

static int obj_op_init(struct m0_obj      *obj,
                       enum m0_obj_opcode  opcode,
                       struct m0_op       *op)
{
        int                  rc;
        uint64_t             layout_id;
        struct m0_op_obj    *oo;
        struct m0_op_common *oc;
        struct m0_entity    *entity;
        struct m0_client    *cinst;

        M0_ENTRY();
        M0_PRE(obj != NULL);
        M0_PRE(op != NULL);

        entity = &obj->ob_entity;
        cinst = m0__entity_instance(entity);

        /*
         * Sanity test before proceeding.
         * Note: can't use bob_of at this point as oc/oo/ioo haven't been
         * initialised yet.
         */
        oc = container_of(op, struct m0_op_common, oc_op);
        oo = container_of(oc, struct m0_op_obj, oo_oc);

        /* Initialise the operation */
        op->op_code = opcode;
        rc = m0_op_init(op, &m0_op_conf, entity);
        if (rc != 0)
                return M0_ERR(rc);

        /* Initialise this object as a 'bob' */
        m0_op_common_bob_init(oc);
        m0_op_obj_bob_init(oo);

        /* Initialise the vtable */
        switch (opcode) {
        case M0_OC_READ:
        case M0_OC_WRITE:
        case M0_OC_FREE:
                /* General entries */
                oc->oc_cb_launch = obj_io_cb_launch;
                oc->oc_cb_cancel = obj_io_cb_cancel;
                oc->oc_cb_fini = obj_io_cb_fini;
                oc->oc_cb_free = obj_io_cb_free;
                break;
        default:
                M0_IMPOSSIBLE("Not implemented yet");
                break;
        }

        /* Convert the client:object-id to a motr:fid */
        m0_fid_gob_make(&oo->oo_fid, obj->ob_entity.en_id.u_hi,
                        obj->ob_entity.en_id.u_lo);
        M0_ASSERT(m0_pool_version_find(&cinst->m0c_pools_common,
                                       &obj->ob_attr.oa_pver) != NULL);
        oo->oo_pver = m0__obj_pver(obj);
        layout_id = m0_pool_version2layout_id(&oo->oo_pver,
                                              m0__obj_layout_id_get(oo));
        rc = m0__obj_layout_instance_build(cinst, layout_id, &oo->oo_fid,
                                           &oo->oo_layout_instance);
        if (rc != 0) {
                /*
                 * Set the callbacks to NULL so that m0_op_fini()
                 * and m0_op_free() don't fini and free m0_op_io,
                 * as it has not been initialised yet.
                 */
                oc->oc_cb_fini = NULL;
                oc->oc_cb_free = NULL;
                return M0_ERR(rc);
        } else
                return M0_RC(0);
}

static void obj_io_args_check(struct m0_obj      *obj,
                              enum m0_obj_opcode  opcode,
                              struct m0_indexvec *ext,
                              struct m0_bufvec   *data,
                              struct m0_bufvec   *attr,
                              uint64_t            mask)
{
        M0_ASSERT(M0_IN(opcode, (M0_OC_READ, M0_OC_WRITE,
                                 M0_OC_FREE)));
        M0_ASSERT(ext != NULL);
        M0_ASSERT(obj->ob_attr.oa_bshift >= M0_MIN_BUF_SHIFT);
        M0_ASSERT(m0_vec_count(&ext->iv_vec) %
                  (1ULL << obj->ob_attr.oa_bshift) == 0);
        M0_ASSERT(ergo(M0_IN(opcode, (M0_OC_READ, M0_OC_WRITE)),
                       data != NULL &&
                       m0_vec_count(&ext->iv_vec) ==
                       m0_vec_count(&data->ov_vec)));
#ifdef BLOCK_ATTR_SUPPORTED /* Block metadata is not yet supported */
        M0_ASSERT(m0_vec_count(&attr->ov_vec) ==
                  (8 * m0_no_of_bits_set(mask) *
                   (m0_vec_count(&ext->iv_vec) >> obj->ob_attr.oa_bshift)));
#endif
        M0_ASSERT(ergo(opcode == M0_OC_FREE,
                       data == NULL && attr == NULL && mask == 0));
        /* Block metadata is not yet supported */
        M0_ASSERT(mask == 0);
}

M0_INTERNAL bool m0__obj_is_parity_verify_mode(struct m0_client *instance)
{
        return instance->m0c_config->mc_is_read_verify;
}

M0_INTERNAL bool m0__obj_is_di_enabled(struct m0_op_io *ioo)
{
        return ioo->ioo_obj->ob_entity.en_flags & M0_ENF_DI;
}

M0_INTERNAL bool m0__obj_is_cksum_validation_allowed(struct m0_op_io *ioo)
{
        /*
         * Checksum validation is not allowed for degraded read and
         * for read-verify mode in parity.
         */
        return m0__obj_is_di_enabled(ioo) &&
               !ioo->ioo_dgmode_io_sent &&
               !m0__obj_is_parity_verify_mode(
                        m0__op_instance(m0__ioo_to_op(ioo)));
}

M0_INTERNAL int m0__obj_io_build(struct m0_io_args *args,
                                 struct m0_op **op)
{
        int rc = 0;

        M0_ENTRY();
        rc = m0_op_get(op, sizeof(struct m0_op_io)) ?:
             obj_op_init(args->ia_obj, args->ia_opcode, *op) ?:
             obj_io_init(args->ia_obj, args->ia_opcode,
                         args->ia_ext, args->ia_data, args->ia_attr,
                         args->ia_mask, args->ia_flags, *op);
        return M0_RC(rc);
}
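
A note on the chaining style above: the GNU "?:" extension evaluates its right operand only when the left one is zero, so each initialisation step runs only while the previous one returned 0, and the first failure short-circuits the chain. A tiny standalone illustration:

#include <stdio.h>

static int step1(void) { puts("step1"); return 0;  }
static int step2(void) { puts("step2"); return -5; }
static int step3(void) { puts("step3"); return 0;  }

int main(void)
{
        /* Prints "step1", "step2", then rc=-5; step3 never runs. */
        int rc = step1() ?: step2() ?: step3();

        printf("rc=%d\n", rc);
        return 0;
}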

M0_INTERNAL void m0__obj_op_done(struct m0_op *op)
{
        struct m0_op        *parent;
        struct m0_op_common *parent_oc;
        struct m0_op_obj    *parent_oo;

        M0_ENTRY();
        M0_PRE(op != NULL);

        parent = op->op_parent;
        if (parent == NULL) {
                M0_LEAVE();
                return;
        }
        parent_oc = bob_of(parent, struct m0_op_common,
                           oc_op, &oc_bobtype);
        parent_oo = bob_of(parent_oc, struct m0_op_obj,
                           oo_oc, &oo_bobtype);

        /* Inform its parent. */
        M0_ASSERT(op->op_parent_ast.sa_cb != NULL);
        m0_sm_ast_post(parent_oo->oo_sm_grp, &op->op_parent_ast);

        M0_LEAVE();
}

int m0_obj_op(struct m0_obj       *obj,
              enum m0_obj_opcode   opcode,
              struct m0_indexvec  *ext,
              struct m0_bufvec    *data,
              struct m0_bufvec    *attr,
              uint64_t             mask,
              uint32_t             flags,
              struct m0_op       **op)
{
        int                        rc;
        bool                       op_pre_allocated = true;
        bool                       layout_alloc = false;
        struct m0_io_args          io_args;
        enum m0_client_layout_type type;

        M0_ENTRY("obj_id: " U128X_F " opcode = %s",
                 U128_P(&obj->ob_entity.en_id),
                 opcode == M0_OC_READ ? "read" :
                 opcode == M0_OC_WRITE ? "write" :
                 opcode == M0_OC_FREE ? "free" : "");
        M0_PRE(obj != NULL);
        M0_PRE(op != NULL);
        M0_PRE(ergo(opcode == M0_OC_READ, M0_IN(flags, (0, M0_OOF_NOHOLE))));
        M0_PRE(ergo(opcode != M0_OC_READ, M0_IN(flags, (0, M0_OOF_SYNC))));
        if (M0_FI_ENABLED("fail_op"))
                return M0_ERR(-EINVAL);

        if (*op == NULL)
                op_pre_allocated = false;

        /*
         * Lazy retrieval of the layout.
         * TODO: this is a blocking implementation for composite layout.
         * Making it asynchronous requires constructing an execution plan
         * for ops; this will be considered later.
         */
        if (obj->ob_layout == NULL) {
                /* Allocate and initialise the layout according to its type. */
                type = M0_OBJ_LAYOUT_TYPE(obj->ob_attr.oa_layout_id);
                obj->ob_layout = m0_client_layout_alloc(type);
                if (obj->ob_layout == NULL) {
                        rc = -EINVAL;
                        goto exit;
                }
                obj->ob_layout->ml_obj = obj;
                rc = m0_client__layout_get(obj->ob_layout);
                if (rc != 0) {
                        m0_client_layout_free(obj->ob_layout);
                        goto exit;
                }
                layout_alloc = true;
        }

        /* Build the object's IO requests using its layout. */
        obj_io_args_check(obj, opcode, ext, data, attr, mask);
        segments_sort(ext, data, attr);
        M0_ASSERT_EX(!indexvec_segments_overlap(ext));
        io_args = (struct m0_io_args) {
                .ia_obj    = obj,
                .ia_opcode = opcode,
                .ia_ext    = ext,
                .ia_data   = data,
                .ia_attr   = attr,
                .ia_mask   = mask,
                .ia_flags  = flags
        };

        M0_ASSERT(obj->ob_layout->ml_ops->lo_io_build != NULL);
        rc = obj->ob_layout->ml_ops->lo_io_build(&io_args, op);
        if (rc != 0) {
                /*
                 * '*op' is set to NULL and freed if the operation was not
                 * pre-allocated by the application and errors were
                 * encountered during its initialisation.
                 */
                if (!op_pre_allocated) {
                        m0_op_fini(*op);
                        m0_op_free(*op);
                        *op = NULL;
                }

                if (layout_alloc)
                        m0_client_layout_free(obj->ob_layout);

                goto exit;
        }
        M0_POST(ergo(*op != NULL, (*op)->op_code == opcode &&
                     (*op)->op_sm.sm_state == M0_OS_INITIALISED));

        return M0_RC(0);
exit:
        return M0_ERR(rc);
}
M0_EXPORTED(m0_obj_op);
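
For context (this usage example is not part of io.c): a typical synchronous write through this entry point, assuming an initialised client and an object that has already been created/opened; the block size, offset and fill value are illustrative only, and error unwinding is trimmed for brevity:

#include <string.h>
#include "motr/client.h"

static int example_write_one_block(struct m0_obj *obj)
{
        struct m0_op       *op = NULL;
        struct m0_indexvec  ext;
        struct m0_bufvec    data;
        int                 rc;

        rc = m0_indexvec_alloc(&ext, 1) ?: m0_bufvec_alloc(&data, 1, 4096);
        if (rc != 0)
                return rc;
        ext.iv_index[0]       = 0;      /* one 4KiB block at offset 0 */
        ext.iv_vec.v_count[0] = 4096;
        memset(data.ov_buf[0], 'A', 4096);

        rc = m0_obj_op(obj, M0_OC_WRITE, &ext, &data,
                       NULL /* attr */, 0 /* mask */, 0 /* flags */, &op);
        if (rc == 0) {
                m0_op_launch(&op, 1);
                rc = m0_op_wait(op, M0_BITS(M0_OS_STABLE, M0_OS_FAILED),
                                M0_TIME_NEVER) ?: m0_rc(op);
                m0_op_fini(op);
                m0_op_free(op);
        }
        m0_bufvec_free(&data);
        m0_indexvec_free(&ext);
        return rc;
}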

M0_INTERNAL void m0_client_init_io_op(void)
{
        M0_ASSERT(tioreq_bobtype.bt_magix == M0_TIOREQ_MAGIC);
        m0_bob_type_tlist_init(&iofop_bobtype, &iofops_tl);
        M0_ASSERT(iofop_bobtype.bt_magix == M0_IOFOP_MAGIC);
}

#undef M0_TRACE_SUBSYSTEM

/*
 *  Local variables:
 *  c-indentation-style: "K&R"
 *  c-basic-offset: 8
 *  tab-width: 8
 *  fill-column: 80
 *  scroll-step: 1
 *  End:
 */
/*
 *  vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
 */