Motr  M0
composite_layout.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CLIENT
23 #include "lib/trace.h"
24 #include "lib/finject.h"
25 #include "lib/errno.h"
26 #include "lib/vec.h"
27 #include "lib/byteorder.h" /* m0_byteorder_{cpu_to_be64|be64_to_cpu}. */
28 #include "fid/fid.h" /* m0_fid */
29 
30 #include "motr/client.h"
31 #include "motr/client_internal.h"
32 #include "motr/layout.h"
33 
34 #include "dix/meta.h"
35 #include "dix/layout.h"
36 
38  struct m0_sm_ast dr_ast;
45 };
46 
47 M0_TL_DESCR_DEFINE(clayer, "composite layout layers",
48  static, struct m0_composite_layer,
49  ccr_tlink, ccr_tlink_magic,
51 M0_TL_DEFINE(clayer, static, struct m0_composite_layer);
52 
53 M0_TL_DESCR_DEFINE(cext, "composite layout extents",
54  static, struct m0_composite_extent,
55  ce_tlink, ce_tlink_magic,
57 M0_TL_DEFINE(cext, static, struct m0_composite_extent);
58 
59 static int composite_layout_io_build(struct m0_io_args *args,
60  struct m0_op **op);
61 static int
63  struct m0_client_composite_layout *clayout);
64 
65 static struct layout_dix_req*
67  m0_chan_cb_t cb, struct m0_op_layout *ol)
68 {
69  struct layout_dix_req *req;
70 
72  if (req == NULL)
73  return NULL;
74 
75  /* Only support dix for now. */
76  m0_dix_meta_req_init(&req->dr_mreq, cli, grp);
77  m0_clink_init(&req->dr_clink, cb);
78  req->dr_ol = ol;
79  return req;
80 }
81 
83 {
84  m0_clink_fini(&req->dr_clink);
85  m0_dix_meta_req_fini(&req->dr_mreq);
86 }
87 
89 {
90  m0_free(req);
91 }
92 
/*
 * Moves a layout op through EXECUTED to STABLE under its own state-machine
 * group lock and notifies the application via m0_op_stable().
 */
static void layout_op_completed(struct m0_op *op)
{
	struct m0_sm_group *op_grp;

	M0_ENTRY();
	M0_PRE(op != NULL);

	op_grp = &op->op_sm_group;
	m0_sm_group_lock(op_grp);
	m0_sm_move(&op->op_sm, 0, M0_OS_EXECUTED);
	/* NOTE(review): a line is elided in this extract between the two
	 * state moves -- presumably m0_op_executed(op); confirm against the
	 * full source. */
	m0_sm_move(&op->op_sm, 0, M0_OS_STABLE);
	m0_op_stable(op);
	m0_sm_group_unlock(op_grp);

	M0_LEAVE();
}
110 
111 static void layout_op_failed(struct m0_op *op, int rc)
112 {
113  struct m0_sm_group *op_grp;
114 
115  M0_ENTRY();
116  M0_PRE(rc != 0);
117  M0_PRE(op != NULL);
118 
119  op_grp = &op->op_sm_group;
120  m0_sm_group_lock(op_grp);
121  m0_sm_fail(&op->op_sm, M0_OS_FAILED, rc);
122  m0_op_failed(op);
123  m0_sm_group_unlock(op_grp);
124 
125  M0_LEAVE();
126 }
127 
/*
 * AST that completes a layout GET/SET op in the op's sm group. On
 * LAYOUT_SET, if the new layout type differs from the one encoded in the
 * object's layout id, the cob attributes are updated via
 * m0__obj_layout_send().
 */
static void layout_dix_req_ast(struct m0_sm_group *grp,
			       struct m0_sm_ast *ast)
{
	int                    ltype;
	uint64_t               lid;
	struct m0_op          *op;
	struct m0_obj         *obj;
	struct layout_dix_req *req = ast->sa_datum;
	struct m0_op_layout   *ol;
	int                    rc;

	M0_ENTRY();

	ol = req->dr_ol;
	M0_ASSERT(ol != NULL && ol->ol_entity != NULL);
	op = &ol->ol_oc.oc_op;
	rc = ol->ol_ar.ar_rc;
	if (rc != 0) {
		/* NOTE(review): failure-path statements are elided in this
		 * extract -- presumably layout_op_failed(op, rc) plus request
		 * cleanup; confirm against the full source. */
		return;
	}

	/*
	 * Do we need to change cob's attribute to reflect the
	 * layout type change?
	 */
	if (ol->ol_oc.oc_op.op_code == M0_EO_LAYOUT_SET) {
		ltype = ol->ol_layout->ml_type;
		/* NOTE(review): 'obj' is read below but its assignment is
		 * elided in this extract -- presumably derived from
		 * ol->ol_entity; confirm. */
		lid = obj->ob_attr.oa_layout_id;
		if (ltype != M0_OBJ_LAYOUT_TYPE(lid)) {
			lid = M0_OBJ_LAYOUT_MAKE_LID(lid, ltype);
			obj->ob_attr.oa_layout_id = lid;
			rc = m0__obj_layout_send(obj, ol);
			if (rc != 0)
				/* How to undo the changes in layout index? */
		} else
			//M0_ASSERT(lid <= 32);
			/* NOTE(review): leftover developer debug log ("YJC")
			 * -- candidate for removal. */
			M0_LOG(M0_DEBUG, "YJC: layout if = %"PRIu64, obj->ob_attr.oa_layout_id);
	} else

	M0_LEAVE();
}
178 
/*
 * Clink callback invoked when the DIX meta request completes. Runs in the
 * channel's context, so it only records the result (copying the fetched
 * layout out for GET) and posts layout_dix_req_ast() to the op's sm group
 * for the actual op completion.
 */
static bool layout_dix_req_clink_cb(struct m0_clink *cl)
{
	int                     rc;
	struct m0_op           *op;
	struct m0_op_layout    *ol;
	struct layout_dix_req  *req = M0_AMB(req, cl, dr_clink);
	struct m0_dix_meta_req *mreq = &req->dr_mreq;
	struct m0_dix_layout    dlayout;

	m0_clink_del(cl);
	ol = req->dr_ol;
	op = &ol->ol_oc.oc_op;
	/* NOTE(review): the M0_IN() opcode list continues on a line elided in
	 * this extract (presumably M0_EO_LAYOUT_GET/M0_EO_LAYOUT_SET). */
	M0_ASSERT(M0_IN(op->op_code,
	rc = m0_dix_meta_generic_rc(mreq);
	if (rc != 0)
		goto ast;

	/* For GET, hand the fetched DIX layout back to the application. */
	if (op->op_code == M0_EO_LAYOUT_GET) {
		m0_dix_layout_rep_get(mreq, 0, &dlayout);
		ol->ol_ops->olo_copy_to_app(ol->ol_layout, (void *)&dlayout);
	}
ast:
	/* Post an AST to handle layout op. */
	ol->ol_ar.ar_rc = rc;
	req->dr_ast.sa_cb = layout_dix_req_ast;
	req->dr_ast.sa_datum = req;
	m0_sm_ast_post(ol->ol_sm_grp, &req->dr_ast);

	return false;
}
210 
/*
 * AST that issues the asynchronous DIX layout GET. The entity id doubles
 * as the layout fid. The clink is armed before sending so the completion
 * callback cannot be missed.
 */
static void layout_dix_get_ast(struct m0_sm_group *grp,
			       struct m0_sm_ast *ast)
{
	int                    rc;
	struct layout_dix_req *req = ast->sa_datum;
	struct m0_op_layout   *ol = req->dr_ol;
	struct m0_fid         *layout_fid;

	M0_ENTRY();

	layout_fid = (struct m0_fid *)&ol->ol_entity->en_id;
	m0_clink_add_lock(&req->dr_mreq.dmr_chan, &req->dr_clink);
	rc = m0_dix_layout_get(&req->dr_mreq, layout_fid, 1);
	if (rc != 0) {
		m0_clink_del_lock(&req->dr_clink);
		/* NOTE(review): further failure handling (failing the op and
		 * releasing the request) is elided in this extract --
		 * confirm against the full source. */
	}

	M0_LEAVE();
	return;
}
234 
/*
 * AST that issues the asynchronous DIX layout PUT (overwrite) for a
 * LAYOUT_SET op. The application-side layout is first converted to a
 * m0_dix_layout via the per-type olo_copy_from_app() callback.
 */
static void layout_dix_put_ast(struct m0_sm_group *grp,
			       struct m0_sm_ast *ast)
{
	int                    rc;
	struct m0_fid         *layout_fid;
	struct layout_dix_req *req = ast->sa_datum;
	struct m0_op_layout   *ol = req->dr_ol;
	struct m0_dix_layout  *dix_layout;

	M0_ENTRY();
	M0_PRE(ol != NULL);

	/* Allocate and set m0_dix_layout. */
	M0_ALLOC_PTR(dix_layout);
	if (dix_layout == NULL) {
		rc = -ENOMEM;
		goto error;
	}
	rc = ol->ol_ops->olo_copy_from_app(ol->ol_layout, dix_layout);
	if (rc != 0)
		goto error;

	/*
	 * Set callback argument for updating sync record as this is an UPDATE
	 * op to layout index.
	 */
	req->dr_mreq.dmr_req.dr_sync_datum = &ol->ol_oc.oc_op;

	/* Send request to dix. */
	m0_clink_add_lock(&req->dr_mreq.dmr_chan, &req->dr_clink);
	layout_fid = (struct m0_fid *)&ol->ol_entity->en_id;
	rc = m0_dix_layout_put(&req->dr_mreq, layout_fid, dix_layout, 1,
			       COF_OVERWRITE);
	if (rc != 0) {
		m0_clink_del_lock(&req->dr_clink);
		goto error;
	}
	M0_LEAVE();
	return;

error:
	M0_ASSERT(rc != 0);
	M0_ERR(rc);
	/* NOTE(review): statements failing the op and freeing the request are
	 * elided in this extract before the free below -- confirm against the
	 * full source. */
	m0_free(dix_layout);
	M0_LEAVE();
	return;
}
285 
287  void (*exec_fn)(struct m0_sm_group *grp,
288  struct m0_sm_ast *ast))
289 {
290  M0_ENTRY();
291  req->dr_ast.sa_cb = exec_fn;
292  req->dr_ast.sa_datum = req;
293  m0_sm_ast_post(req->dr_ol->ol_sm_grp, &req->dr_ast);
294  M0_LEAVE();
295 }
296 
/*
 * Launches a layout GET/SET op: builds a layout_dix_req and posts the
 * appropriate AST (get/put) to drive the DIX request asynchronously.
 *
 * Returns 0 on success, -ENOMEM if the request cannot be allocated.
 */
M0_INTERNAL int m0_layout_op_launch(struct m0_op_layout *ol)
{
	int                    opcode;
	struct layout_dix_req *req;

	M0_ENTRY();
	M0_PRE(ol != NULL);
	opcode = ol->ol_oc.oc_op.op_code;
	/* NOTE(review): M0_IN() opcode list continues on an elided line. */
	M0_PRE(M0_IN(opcode,

	/* Construct request to dix. */
	/* NOTE(review): the request-allocation call is elided in this
	 * extract; 'req' is presumably assigned here. */
	if (req == NULL)
		return M0_ERR(-ENOMEM);

	/* NOTE(review): the per-case AST-posting calls (get/put) are elided
	 * in this extract. */
	switch(opcode) {
	case M0_EO_LAYOUT_GET:
		break;
	case M0_EO_LAYOUT_SET:
		break;
	default:
		M0_IMPOSSIBLE("Wrong layout opcode.");
	}

	return M0_RC(0);
}
327 
/*
 * Synchronously fetches the DIX layout of 'obj' into 'dlayout', using
 * locality 0's sm group and blocking until the DIX request completes.
 *
 * Returns 0 on success, -ENOMEM on allocation failure, or the DIX request
 * error code.
 */
M0_INTERNAL int m0__dix_layout_get_sync(struct m0_obj *obj,
					struct m0_dix_layout *dlayout)
{
	int                     rc;
	struct m0_fid          *layout_fid;
	struct m0_dix_req      *dreq;
	struct m0_dix_meta_req *dmreq;
	struct layout_dix_req  *req;
	struct m0_sm_group     *grp = m0_locality0_get()->lo_grp;

	M0_ENTRY();
	M0_PRE(obj != NULL && obj->ob_layout != NULL);

	/* NOTE(review): the first line of the request-allocation call is
	 * elided in this extract; only its trailing arguments remain. */
		grp, ent_dixc(&obj->ob_entity), NULL, NULL);
	if (req == NULL)
		return M0_ERR(-ENOMEM);

	/* Send out the dix meta request. */
	dmreq = &req->dr_mreq;
	dreq = &dmreq->dmr_req;

	m0_dix_req_lock(dreq);
	layout_fid = (struct m0_fid *)&obj->ob_entity.en_id;
	/* NOTE(review): the wait-state argument list of m0_dix_req_wait() is
	 * partly elided in this extract. */
	rc = m0_dix_layout_get(dmreq, layout_fid, 1)? :
	     m0_dix_req_wait(dreq,
	     m0_dix_req_rc(dreq);
	if (rc != 0)
		goto exit;

	/* Parse the layout. */
	m0_dix_layout_rep_get(dmreq, 0, dlayout);

exit:
	/* m0_dix_meta_req_fini() requires lock. */
	m0_dix_req_unlock(dreq);
	return M0_RC(rc);
}
369 
374 static int composite_layout_copy_to_app(struct m0_client_layout *to, void *from)
375 {
376  int rc;
377  int i = 0;
378  struct m0_dix_layout *dix_layout;
379  struct m0_dix_composite_ldesc *dix_cldesc;
380  struct m0_client_composite_layout *clayout;
381  struct m0_composite_layer *layer;
382 
383  M0_ENTRY();
384  M0_PRE(to != NULL);
385  M0_PRE(from != NULL);
386 
387  clayout = M0_AMB(clayout, to, ccl_layout);
388  dix_layout = (struct m0_dix_layout *)from;
389  dix_cldesc = &dix_layout->u.dl_comp_desc;
390 
391  for (i = 0; i < dix_cldesc->cld_nr_layers; i++) {
392  M0_ALLOC_PTR(layer);
393  if (layer == NULL) {
394  rc = M0_ERR(-ENOMEM);
395  goto error;
396  }
397  layer->ccr_subobj = dix_cldesc->cld_layers[i].cr_subobj;
398  layer->ccr_lid = dix_cldesc->cld_layers[i].cr_lid;
399  layer->ccr_priority = dix_cldesc->cld_layers[i].cr_priority;
400  cext_tlist_init(&layer->ccr_rd_exts);
401  cext_tlist_init(&layer->ccr_wr_exts);
402  clayer_tlink_init_at_tail(layer, &clayout->ccl_layers);
403  }
404  clayout->ccl_nr_layers = dix_cldesc->cld_nr_layers;
405  return M0_RC(0);
406 
407 error:
408  m0_tl_teardown(clayer, &clayout->ccl_layers, layer)
409  m0_free(layer);
410  return M0_RC(rc);
411 }
412 
					void *to)
{
	/* Converts the client-side composite layout ('from', continued from
	 * the signature line elided in this extract) into an on-wire DIX
	 * composite descriptor 'to'. The DIX layer array is allocated here
	 * and ownership passes to the caller with 'to'. */
	int                                i = 0;
	struct m0_dix_layout              *dix_layout;
	struct m0_dix_composite_layer     *dix_clayers;
	struct m0_dix_composite_ldesc     *dix_cldesc;
	struct m0_client_composite_layout *clayout;
	struct m0_composite_layer         *layer;

	M0_ENTRY();
	M0_PRE(from != NULL);
	M0_PRE(to != NULL);

	clayout = M0_AMB(clayout, from, ccl_layout);
	M0_ALLOC_ARR(dix_clayers, clayout->ccl_nr_layers);
	if (dix_clayers == NULL)
		return M0_ERR(-ENOMEM);

	dix_layout = (struct m0_dix_layout *)to;
	dix_layout->dl_type = DIX_LTYPE_COMPOSITE_DESCR;
	dix_cldesc = &dix_layout->u.dl_comp_desc;
	dix_cldesc->cld_nr_layers = clayout->ccl_nr_layers;
	dix_cldesc->cld_layers = dix_clayers;
	/* Layers are emitted in list (priority) order. */
	m0_tl_for(clayer, &clayout->ccl_layers, layer) {
		dix_cldesc->cld_layers[i].cr_subobj = layer->ccr_subobj;
		dix_cldesc->cld_layers[i].cr_lid = layer->ccr_lid;
		dix_cldesc->cld_layers[i].cr_priority = layer->ccr_priority;
		i++;
	} m0_tl_endfor;

	return M0_RC(0);
}
446 
449  .olo_copy_to_app = composite_layout_copy_to_app,
450  .olo_copy_from_app = composite_layout_copy_from_app,
451 };
452 
{
	/* Synchronously fetches the object's DIX layout and converts it into
	 * the client composite layout attached to the object. (The function
	 * name line is elided in this extract.) */
	int                  rc;
	struct m0_dix_layout dlayout;

	M0_ENTRY();
	/* NOTE(review): the return value of m0__dix_layout_get_sync() is
	 * ignored; on failure 'dlayout' is used uninitialised below. The
	 * result should be checked and propagated. */
	m0__dix_layout_get_sync(obj, &dlayout);
	rc = composite_layout_copy_to_app(obj->ob_layout, &dlayout);
	return M0_RC(rc);
}
463 
/*
 * lo_get() callback for composite layouts: initialises the layout's lock
 * and layer list, then retrieves layout and extent data for the object.
 */
static int composite_layout_get(struct m0_client_layout *layout)
{
	int                                rc = 0;
	struct m0_obj                     *obj;
	struct m0_client_composite_layout *comp;

	M0_ENTRY();

	comp = M0_AMB(comp, layout, ccl_layout);
	m0_mutex_init(&comp->ccl_lock);
	clayer_tlist_init(&comp->ccl_layers);

	/*
	 * Retrieve layout and extents for the object this layout
	 * associated with.
	 */
	obj = layout->ml_obj;
	/* NOTE(review): the retrieval call that assigns 'rc' is elided in
	 * this extract (presumably the sync layout/extent fetch). */
	return M0_RC(rc);
}
485 
/*
 * lo_put() callback for composite layouts: releases every layer together
 * with its read/write extent lists, then the lock.
 *
 * NOTE(review): the mutex is finalised before the lists are torn down;
 * callers must guarantee exclusive access at this point -- confirm.
 */
static void composite_layout_put(struct m0_client_layout *layout)
{
	struct m0_client_composite_layout *comp;
	struct m0_composite_layer         *layer;
	struct m0_composite_extent        *ext;

	comp = M0_AMB(comp, layout, ccl_layout);
	m0_mutex_fini(&comp->ccl_lock);

	/* Teardown extent lists and layer list. */
	m0_tl_teardown(clayer, &comp->ccl_layers, layer) {
		m0_tl_teardown(cext, &layer->ccr_rd_exts, ext)
			m0_free(ext);
		m0_tl_teardown(cext, &layer->ccr_wr_exts, ext)
			m0_free(ext);
		m0_free0(&layer);
	}
}
504 
{
	/* Allocates and initialises an empty composite layout and returns it
	 * via '*out' (NULL on failure). The function signature line is
	 * elided in this extract. */
	struct m0_client_layout           *layout;
	struct m0_client_composite_layout *comp;

	M0_ENTRY();

	*out = NULL;
	M0_ALLOC_PTR(comp);
	if (comp == NULL)
		return M0_ERR(-ENOMEM);

	layout = &comp->ccl_layout;
	layout->ml_type = M0_LT_COMPOSITE;
	layout->ml_ops = &layout_composite_ops;
	/* Initialise layer list. */
	m0_mutex_init(&comp->ccl_lock);
	clayer_tlist_init(&comp->ccl_layers);
	*out = layout;

	return M0_RC(0);
}
527 
530  .lo_get = composite_layout_get,
531  .lo_put = composite_layout_put,
532  .lo_io_build = composite_layout_io_build,
533 };
534 
			   struct m0_obj *sub_obj, int priority)
{
	/* Adds 'sub_obj' as a new layer of the composite 'layout' with the
	 * given priority. Adding the same sub-object twice is a silent
	 * no-op (returns 0). (Signature first line elided in this extract;
	 * the name is m0_composite_layer_add per M0_EXPORTED below.) */
	int                                rc;
	struct m0_client_composite_layout *clayout;
	struct m0_composite_layer         *layer;
	struct m0_composite_layer         *found;
	struct m0_composite_layer         *anchor;

	M0_ENTRY();
	M0_PRE(layout != NULL);
	clayout = M0_AMB(clayout, layout, ccl_layout);

	/* The same sub-object is not allowed to add twice. */
	/* NOTE(review): the lock is dropped between this duplicate check and
	 * the insertion below, so two concurrent adds of the same sub-object
	 * can race -- confirm callers serialise layout updates. */
	m0_mutex_lock(&clayout->ccl_lock);
	found = m0_tl_find(clayer, found, &clayout->ccl_layers,
			   m0_uint128_eq(&found->ccr_subobj,
					 &sub_obj->ob_entity.en_id));
	m0_mutex_unlock(&clayout->ccl_lock);
	if (found != NULL)
		return 0;

	/* Prepare for a new layer. */
	M0_ALLOC_PTR(layer);
	if (layer == NULL)
		return M0_ERR(-ENOMEM);
	cext_tlist_init(&layer->ccr_rd_exts);
	cext_tlist_init(&layer->ccr_wr_exts);
	layer->ccr_priority = priority;

	/* Fetch the sub-object's attributes (layout id in particular). */
	rc = m0__obj_attr_get_sync(sub_obj);
	if (rc != 0) {
		m0_free(layer);
		return M0_ERR(rc);
	}
	layer->ccr_subobj = sub_obj->ob_entity.en_id;
	layer->ccr_lid = m0__obj_lid(sub_obj);
	clayer_tlink_init(layer);

	/* Sort layers by priority (smaller value, higher priority). */
	m0_mutex_lock(&clayout->ccl_lock);
	anchor = m0_tl_find(clayer, anchor, &clayout->ccl_layers,
			    anchor->ccr_priority > priority);
	if (anchor != NULL)
		clayer_tlist_add_before(anchor, layer);
	else
		clayer_tlist_add_tail(&clayout->ccl_layers, layer);
	clayout->ccl_nr_layers++;
	m0_mutex_unlock(&clayout->ccl_lock);

	return M0_RC(0);
}
587 M0_EXPORTED(m0_composite_layer_add);
588 
590  struct m0_uint128 subobj_id)
591 {
592  struct m0_client_composite_layout *clayout;
593  struct m0_composite_layer *layer;
594  struct m0_composite_extent *ext;
595 
596  M0_ENTRY();
597  M0_PRE(layout != NULL);
598 
599  clayout = M0_AMB(clayout, layout, ccl_layout);
600  m0_mutex_lock(&clayout->ccl_lock);
601  layer = m0_tl_find(clayer, layer, &clayout->ccl_layers,
602  m0_uint128_eq(&layer->ccr_subobj, &subobj_id));
603  if (layer == NULL) {
604  M0_LEAVE();
605  return;
606  }
607  m0_tl_teardown(cext, &layer->ccr_rd_exts, ext)
608  m0_free(ext);
609  m0_tl_teardown(cext, &layer->ccr_wr_exts, ext)
610  m0_free(ext);
611  clayer_tlist_del(layer);
612  clayout->ccl_nr_layers--;
613  m0_mutex_unlock(&clayout->ccl_lock);
614 
615  M0_LEAVE();
616 }
617 M0_EXPORTED(m0_composite_layer_del);
618 
626  void *sie_buf;
627 
628  uint64_t sie_tlink_magic;
630 };
631 
634  uint64_t si_lid;
636  struct m0_tl si_exts;
637 };
638 
639 M0_TL_DESCR_DEFINE(sio_ext, "composite layout subobj extent list",
640  static, struct composite_sub_io_ext,
641  sie_tlink, sie_tlink_magic,
643 M0_TL_DEFINE(sio_ext, static, struct composite_sub_io_ext);
644 
645 static const struct m0_bob_type oci_bobtype;
647 static const struct m0_bob_type oci_bobtype = {
648  .bt_name = "oci_bobtype",
649  .bt_magix_offset = offsetof(struct m0_op_composite_io, oci_magic),
650  .bt_magix = M0_OCI_MAGIC,
651  .bt_check = NULL,
652 };
653 
/*
 * Invariant for a composite IO op (function name line elided in this
 * extract).
 */
static bool
{
	/* NOTE(review): 'op_size < sizeof *oci' contradicts the
	 * 'op_size >= sizeof *oci' preconditions used elsewhere in this
	 * file (e.g. the cb_fini/cb_free/cb_launch callbacks); this looks
	 * like an inverted comparison -- confirm against the full source. */
	return M0_RC(oci != NULL &&
		     m0_op_composite_io_bob_check(oci) &&
		     oci->oci_oo.oo_oc.oc_op.op_size < sizeof *oci &&
		     M0_IN(oci->oci_oo.oo_oc.oc_op.op_code,
			   (M0_OC_READ, M0_OC_WRITE)) &&
		     m0_fid_is_valid(&oci->oci_oo.oo_fid));
}
664 
665 static void composite_sub_io_destroy(struct composite_sub_io *sio_arr,
666  int nr_subobjs)
667 {
668  int i;
669  struct composite_sub_io *io;
670  struct composite_sub_io_ext *io_ext;
671 
672  for (i = 0; i < nr_subobjs; i++) {
673  io = &sio_arr[i];
674  m0_tl_teardown(sio_ext, &io->si_exts, io_ext)
675  m0_free(io_ext);
676  }
677  m0_free(sio_arr);
678 }
679 
680 /* Compute the step to advance. */
681 static m0_bindex_t get_next_off(struct m0_composite_extent *cexts[], int n)
682 {
683  int i;
684  m0_bindex_t next_off;
685 
686  next_off = cexts[n]->ce_off + cexts[n]->ce_len;
687  for (i = n - 1; i >= 0; i--) {
688  if (cexts[i] == NULL)
689  continue;
690  if (cexts[i]->ce_off <= next_off )
691  break;
692  }
693  if (i >= 0)
694  next_off = cexts[i]->ce_off;
695 
696  return next_off;
697 }
698 
699 /* Advance each layer's extent cursor. */
700 static void advance_layers_cursor(struct m0_tl *cext_tlists[],
701  struct m0_composite_extent *cexts[], int n,
702  m0_bindex_t off)
703 {
704  int i;
705 
706  for (i = 0; i < n; i++) {
707  if (cexts[i] == NULL ||
708  cexts[i]->ce_off + cexts[i]->ce_len > off)
709  continue;
710  cexts[i] = cext_tlist_next(cext_tlists[i], cexts[i]);
711  while (cexts[i] != NULL &&
712  cexts[i]->ce_off + cexts[i]->ce_len <= off)
713  cexts[i] = cext_tlist_next(cext_tlists[i], cexts[i]);
714  }
715 }
716 
717 /*
718  * Divide original IO index vector and buffers according to sub-objects.
719  */
				enum m0_obj_opcode opcode,
				struct m0_indexvec *ext, struct m0_bufvec *data,
				struct composite_sub_io **out,
				int *out_nr_sios)
{
	/* Splits the original IO (index vector + buffers) into per-sub-object
	 * IO descriptions according to layer priority and extent coverage.
	 * (Signature first line elided in this extract.) On success '*out'
	 * and '*out_nr_sios' own the resulting array. */
	int rc = 0;
	int i;
	int nr_subobjs;
	int valid_subobj_cnt = 0;
	m0_bindex_t off;
	m0_bcount_t len = 0;
	m0_bindex_t next_off;
	struct m0_ivec_cursor icursor;
	struct m0_bufvec_cursor bcursor;
	struct composite_sub_io *sio_arr;
	struct composite_sub_io_ext *sio_ext;
	struct m0_composite_layer *layer = NULL;
	struct m0_composite_extent **cexts;
	struct m0_tl **cext_tlists;
	struct m0_tl *tl;

	nr_subobjs = clayout->ccl_nr_layers;
	M0_ASSERT(nr_subobjs != 0);
	M0_ASSERT(!clayer_tlist_is_empty(&clayout->ccl_layers));
	M0_ALLOC_ARR(sio_arr, nr_subobjs);
	M0_ALLOC_ARR(cext_tlists, nr_subobjs);
	M0_ALLOC_ARR(cexts, nr_subobjs);
	if (sio_arr == NULL || cext_tlists == NULL || cexts == NULL) {
		m0_free(sio_arr);
		m0_free(cext_tlists);
		m0_free(cexts);
		return M0_ERR(-ENOMEM);
	}

	/* Collect the layers that actually have extents for this opcode. */
	for (i = 0; i < nr_subobjs; i++) {
		layer = (i == 0) ?
			clayer_tlist_head(&clayout->ccl_layers) :
			clayer_tlist_next(&clayout->ccl_layers, layer);
		tl = (opcode == M0_OC_READ)?
		     &layer->ccr_rd_exts: &layer->ccr_wr_exts;

		/* Only those layers with extents are considered valid. */
		if (cext_tlist_is_empty(tl))
			continue;

		/* Initialise subobj IO. */
		cext_tlists[valid_subobj_cnt] = tl;
		cexts[valid_subobj_cnt] = cext_tlist_head(tl);
		sio_ext_tlist_init(&sio_arr[valid_subobj_cnt].si_exts);
		sio_arr[valid_subobj_cnt].si_id = layer->ccr_subobj;
		sio_arr[valid_subobj_cnt].si_lid = layer->ccr_lid;
		valid_subobj_cnt++;
	}
	M0_ASSERT(valid_subobj_cnt != 0);

	m0_ivec_cursor_init(&icursor, ext);
	m0_bufvec_cursor_init(&bcursor, data);

	/*
	 * Skip the first few extents whose end is less than the offset
	 * of IO range as they are certainly not in the range.
	 */
	off = m0_ivec_cursor_index(&icursor);
	advance_layers_cursor(cext_tlists, cexts, valid_subobj_cnt, off);

	/* Walk the IO range, carving [off, next_off) pieces and assigning
	 * each to the highest-priority layer covering 'off'. */
	while (!m0_ivec_cursor_move(&icursor, len) &&
	       !m0_bufvec_cursor_move(&bcursor, len)) {
		off = m0_ivec_cursor_index(&icursor);
		len = m0_ivec_cursor_step(&icursor);

		/*
		 * Naive search for an extent covering the offset. Note that
		 * layers are sorted in priority order and extents are sorted
		 * in offset order in a layer. It is considered an assert if
		 * there is no sub-object covering the offset.
		 */
		for (i = 0; i < valid_subobj_cnt; i++)
			if (cexts[i] != NULL && off >= cexts[i]->ce_off)
				break;
		M0_ASSERT(i < valid_subobj_cnt);

		next_off = get_next_off(cexts, i);
		if (next_off > off + len)
			next_off = off + len;
		len = next_off - off;

		/* Create a new IO extent. */
		M0_ALLOC_PTR(sio_ext);
		if (sio_ext == NULL) {
			rc = M0_ERR(-ENOMEM);
			goto err;
		}
		sio_ext->sie_off = off;
		sio_ext->sie_len = len;
		sio_ext->sie_buf = m0_bufvec_cursor_addr(&bcursor);
		sio_arr[i].si_nr_exts++;
		sio_ext_tlink_init_at(sio_ext, &sio_arr[i].si_exts);

		advance_layers_cursor(cext_tlists, cexts, valid_subobj_cnt,
				      next_off);
	}
	*out = sio_arr;
	*out_nr_sios = valid_subobj_cnt;
 err:
	m0_free(cext_tlists);
	m0_free(cexts);
	if (rc != 0)
		composite_sub_io_destroy(sio_arr, nr_subobjs);

	return M0_RC(rc);
}
832 
/*
 * Completes the parent composite IO op once all child ops have replied:
 * STABLE if every child succeeded, FAILED with the first child's error
 * otherwise.
 */
static void composite_io_op_done(struct m0_op_composite_io *oci)
{
	int                 i;
	struct m0_op       *child_op = NULL;
	struct m0_op       *op;
	struct m0_sm_group *op_grp;

	M0_ENTRY();
	M0_PRE(oci != NULL);
	M0_PRE(oci->oci_nr_sub_ops > 0);

	op = &oci->oci_oo.oo_oc.oc_op;
	op_grp = &op->op_sm_group;

	/* Find the first failed child op, if any. */
	for (i = 0; i < oci->oci_nr_sub_ops; i++) {
		child_op = oci->oci_sub_ops[i];
		if (child_op->op_rc != 0)
			break;
	}

	m0_sm_group_lock(op_grp);
	if (i == oci->oci_nr_sub_ops) {
		/* IO is completed successfully. */
		m0_sm_move(&op->op_sm, 0, M0_OS_EXECUTED);
		/* NOTE(review): a line is elided here in this extract --
		 * presumably m0_op_executed(op). */
		m0_sm_move(&op->op_sm, 0, M0_OS_STABLE);
		m0_op_stable(op);
	} else {
		/* IO fails. */
		m0_sm_move(&op->op_sm, 0, M0_OS_EXECUTED);
		/* NOTE(review): a line is elided here in this extract --
		 * presumably m0_op_executed(op). */
		m0_sm_fail(&op->op_sm, M0_OS_FAILED, child_op->op_rc);
		m0_op_failed(op);
		op->op_rc = child_op->op_rc;
	}
	m0_sm_group_unlock(op_grp);
	M0_LEAVE();
}
871 
/*
 * Per-child-op AST: counts replies on the parent composite IO op and,
 * when every child has replied, completes the parent.
 */
static void composite_io_op_ast(struct m0_sm_group *grp,
				struct m0_sm_ast *ast)
{
	struct m0_op              *child_op;
	struct m0_op              *op;
	struct m0_op_common       *oc;
	struct m0_op_obj          *oo;
	struct m0_op_composite_io *oci;

	M0_ENTRY();

	M0_PRE(grp != NULL);
	M0_PRE(ast != NULL);

	/* Walk from the child's AST up to the parent composite-io op. */
	child_op =
		bob_of(ast, struct m0_op, op_parent_ast, &op_bobtype);
	op = child_op->op_parent;
	oc = bob_of(op, struct m0_op_common, oc_op, &oc_bobtype);
	oo = bob_of(oc, struct m0_op_obj, oo_oc, &oo_bobtype);
	oci = bob_of(oo, struct m0_op_composite_io,
		     oci_oo, &oci_bobtype);
	oci->oci_nr_replied++;
	/* NOTE(review): the statement under this 'if' is elided in this
	 * extract -- presumably composite_io_op_done(oci). */
	if (oci->oci_nr_replied == oci->oci_nr_sub_ops)

	M0_LEAVE();
}
901 
/*
 * Builds one child IO op for a single sub-object: converts the sub-IO
 * extent list into index/buffer vectors, opens the sub-object and issues
 * m0_obj_op() for it. (The function name line is elided in this extract;
 * call sites use composite_sub_io_op_build().)
 */
static int
		struct m0_op *cop,
		struct composite_sub_io *sio,
		struct m0_op **out)
{
	int i = 0;
	int rc;
	struct m0_obj *obj = NULL;
	struct m0_op *op = NULL;
	struct m0_indexvec *ext = NULL;
	struct m0_bufvec *data = NULL;
	struct m0_bufvec *attr = NULL;
	struct composite_sub_io_ext *sio_ext;

	M0_ALLOC_PTR(ext);
	/* NOTE(review): the M0_ALLOC_PTR(data)/M0_ALLOC_PTR(attr) lines are
	 * elided in this extract; the check below implies they exist. */
	if (ext == NULL || data == NULL || attr == NULL) {
		rc = M0_ERR(-ENOMEM);
		goto error;
	}

	/* NOTE(review): the m0_bufvec_empty_alloc(data, ...) line is elided
	 * in this extract between these two allocations. */
	if (m0_indexvec_alloc(ext, sio->si_nr_exts) ||
	    m0_bufvec_alloc(attr, sio->si_nr_exts, 1)) {
		rc = M0_ERR(-ENOMEM);
		goto error;
	}
	/* Flatten the extent list into parallel index/buffer vectors. */
	m0_tl_for(sio_ext, &sio->si_exts, sio_ext) {
		ext->iv_vec.v_count[i] = sio_ext->sie_len;
		ext->iv_index[i] = sio_ext->sie_off;
		data->ov_buf[i] = sio_ext->sie_buf;
		data->ov_vec.v_count[i] = sio_ext->sie_len;
		i++;
	} m0_tl_endfor;

	/* Initialise the object for IO. */
	M0_ALLOC_PTR(obj);
	if (obj == NULL) {
		rc = M0_ERR(-ENOMEM);
		goto error;
	}
	/* NOTE(review): the object init/open call for this line's trailing
	 * arguments is elided in this extract. */
		&sio->si_id, sio->si_lid);
	if (rc != 0)
		goto error;

	m0_obj_op(obj, cop->op_code, ext, data, NULL, 0, 0, &op);
	if (op == NULL)
		goto error;
	op->op_parent = cop;
	*out = op;
	return M0_RC(0);

error:
	m0_indexvec_free(ext);
	/* NOTE(review): bufvec free calls are elided in this extract before
	 * the raw frees below. */
	m0_free(ext);
	m0_free(data);
	m0_free(attr);
	m0_free(obj);
	m0_free(op);
	return M0_RC(rc);
}
974 
/*
 * Builds child IO ops for all sub-objects that have extents to do, and
 * records them in the parent composite op's oci_sub_ops array. (The
 * function name line is elided in this extract; call sites use
 * composite_sub_io_ops_build().)
 */
static int
		struct m0_op *cop,
		struct composite_sub_io *sio_arr,
		int nr_subobjs)
{
	int i;
	int rc = 0;
	struct m0_op *op = NULL;
	struct m0_op_obj *oo;
	struct m0_op_common *oc;
	struct m0_op_composite_io *oci;
	struct composite_sub_io *sio;

	M0_ENTRY();
	oc = M0_AMB(oc, cop, oc_op);
	oo = M0_AMB(oo, oc, oo_oc);
	oci = M0_AMB(oci, oo, oci_oo);
	M0_ALLOC_ARR(oci->oci_sub_ops, nr_subobjs);
	if (oci->oci_sub_ops == NULL)
		return M0_ERR(-ENOMEM);

	for (i = 0; i < nr_subobjs; i++) {
		sio = &sio_arr[i];
		/* Layers with no extents contribute no child op. */
		if (sio->si_nr_exts == 0)
			continue;
		rc = composite_sub_io_op_build(cobj, cop, sio, &op);
		if (rc != 0)
			goto error;
		oci->oci_sub_ops[oci->oci_nr_sub_ops] = op;
		oci->oci_nr_sub_ops++;
	}
	M0_ASSERT(rc == 0);
	return M0_RC(rc);
error:
	/* NOTE(review): already-built child ops are not finalised here --
	 * confirm whether op cb_fini/cb_free cleans them up. */
	m0_free(oci->oci_sub_ops);
	return M0_ERR(rc);
}
1013 
/*
 * oc_cb_fini callback for a composite IO op: finalises every child op and
 * its entity, then the op's bobs.
 */
static void composite_io_op_cb_fini(struct m0_op_common *oc)
{
	int i;
	struct m0_op *sop;
	struct m0_op_obj *oo;
	struct m0_op_composite_io *oci;

	M0_ENTRY();

	M0_PRE(oc != NULL);
	M0_PRE(M0_IN(oc->oc_op.op_code,
		     (M0_OC_WRITE, M0_OC_READ)));
	/* NOTE(review): the first part of the state list is elided in this
	 * extract (M0_OS_INITIALISED remains). */
	M0_PRE(M0_IN(oc->oc_op.op_sm.sm_state,
		     M0_OS_INITIALISED)));
	M0_PRE(oc->oc_op.op_size >= sizeof *oci);

	/* Finalise each sub IO op. */
	oo = bob_of(oc, struct m0_op_obj, oo_oc, &oo_bobtype);
	oci = bob_of(oo, struct m0_op_composite_io,
		     oci_oo, &oci_bobtype);
	for (i = 0; i < oci->oci_nr_sub_ops; i++) {
		sop = oci->oci_sub_ops[i];
		if (M0_FI_ENABLED("skip_fini_sub_io_op"))
			continue;

		M0_ASSERT(sop != NULL);
		m0_op_fini(sop);
		m0_entity_fini(sop->op_entity);
	}

	/* Finalise the bob type */
	m0_op_obj_bob_fini(oo);
	m0_op_composite_io_bob_fini(oci);
	M0_LEAVE();
}
1051 
/*
 * oc_cb_free callback for a composite IO op: releases each child op's
 * index/buffer vectors and the child op itself, then the sub-op array.
 */
static void composite_io_op_cb_free(struct m0_op_common *oc)
{
	int i;
	struct m0_op *sop;
	struct m0_op_common *soc;
	struct m0_op_obj *soo;
	struct m0_op_io *sioo;
	struct m0_op_obj *oo;
	struct m0_op_composite_io *oci;

	M0_ENTRY();
	M0_PRE(oc != NULL);
	M0_PRE((oc->oc_op.op_size >= sizeof *oci));

	/* Can't use bob_of here */
	oo = M0_AMB(oo, oc, oo_oc);
	oci = M0_AMB(oci, oo, oci_oo);
	for (i = 0; i < oci->oci_nr_sub_ops; i++) {
		sop = oci->oci_sub_ops[i];
		if (M0_FI_ENABLED("skip_free_sub_io_op"))
			continue;

		/* Recover the child's io op to free its vectors. */
		soc = M0_AMB(soc, sop, oc_op);
		soo = M0_AMB(soo, soc, oo_oc);
		sioo = M0_AMB(sioo, soo, ioo_oo);
		M0_ASSERT(sioo != NULL);
		m0_indexvec_free(&sioo->ioo_ext);
		m0_bufvec_free2(&sioo->ioo_data);
		m0_bufvec_free2(&sioo->ioo_attr);
		m0_op_free(&soc->oc_op);
	}

	m0_free(oci->oci_sub_ops);
	M0_LEAVE();
}
1087 
{
	/* oc_cb_launch callback: launches every child IO op, one at a time.
	 * (The function name line is elided in this extract.) */
	int i;
	struct m0_op *sop;
	struct m0_op *ops[1] = {NULL};
	struct m0_op_obj *oo;
	struct m0_op_composite_io *oci;

	M0_ENTRY();
	M0_PRE(oc != NULL);
	M0_PRE(oc->oc_op.op_entity != NULL);
	/* NOTE(review): the start of this precondition is elided in this
	 * extract; only the trailing comparison remains. */
	       &oc->oc_op.op_entity->en_id) < 0);
	M0_PRE(M0_IN(oc->oc_op.op_code,
		     (M0_OC_WRITE, M0_OC_READ)));
	M0_PRE(oc->oc_op.op_size >= sizeof *oci);

	oo = bob_of(oc, struct m0_op_obj, oo_oc, &oo_bobtype);
	oci = bob_of(oo, struct m0_op_composite_io,
		     oci_oo, &oci_bobtype);
	for (i = 0; i < oci->oci_nr_sub_ops; i++) {
		if (M0_FI_ENABLED("no_subobj_ops_launched"))
			continue;
		sop = oci->oci_sub_ops[i];
		M0_ASSERT(sop != NULL);
		ops[0] = sop;
		m0_op_launch(ops, 1);
	}

	M0_LEAVE();
}
1121 
/*
 * Initialises a composite IO op embedded in 'op': sets the opcode, runs
 * generic op init, wires up callbacks and locality, and initialises the
 * op's bobs.
 */
static int composite_io_op_init(struct m0_obj *obj,
				int opcode, struct m0_op *op)
{
	int rc;
	struct m0_client *cinst;
	struct m0_entity *entity;
	struct m0_locality *locality;
	struct m0_op_obj *oo;
	struct m0_op_common *oc;
	struct m0_op_composite_io *oci;

	M0_ENTRY();

	M0_PRE(obj != NULL);
	entity = &obj->ob_entity;
	cinst = m0__entity_instance(entity);

	M0_ASSERT(op->op_size >= sizeof(struct m0_op_composite_io));
	oc = M0_AMB(oc, op, oc_op);
	oo = M0_AMB(oo, oc, oo_oc);
	oci = M0_AMB(oci, oo, oci_oo);

	/* Initialise the operation */
	op->op_code = opcode;
	rc = m0_op_init(op, &m0_op_conf, entity);
	if (rc != 0)
		return M0_ERR(rc);

	/* Initialise the vtable. NOTE(review): the oc_cb_launch/fini/free
	 * assignment lines are elided in this extract. */

	/* Set locality for this op. NOTE(review): the line assigning
	 * 'locality' (presumably via m0__locality_pick(cinst)) is elided
	 * in this extract. */
	M0_ASSERT(locality != NULL);
	oo->oo_sm_grp = locality->lo_grp;

	/* Initialise op data structures as 'bob's. */
	m0_op_common_bob_init(oc);
	m0_op_obj_bob_init(oo);
	m0_op_composite_io_bob_init(oci);

	return M0_RC(0);
}
1167 
1168 /*
1169  * Note: current version simplifies constructing layer's IOs by retrieving
1170  * all layout details back from global layout index and extent indices. The
1171  * layout information is used to calculate which part of IO goes to which
1172  * layer(sub-object). This method may not be good in the case where there are
1173  * large number of extents and IO op only covers a small range of the object.
1174  * An alternative method is on-demand extent retrival. But this method may
1175  * have to re-retrieve extent information for every op even for back to back
1176  * ops if extents for a layer are not continuous.
1177  */
				     struct m0_op **op)
{
	/* lo_io_build callback: allocates and initialises the composite IO
	 * op, divides the IO among sub-objects and builds the child ops.
	 * (Signature first line elided in this extract.) */
	int rc;
	int nr_sios = 0;
	struct m0_obj *obj;
	struct m0_client_composite_layout *clayout;
	struct composite_sub_io *sio_arr = NULL;

	M0_ENTRY();
	M0_PRE(args != NULL);
	M0_PRE(args->ia_obj != NULL);

	obj = args->ia_obj;
	rc = m0_op_get(op, sizeof(struct m0_op_composite_io)) ?:
	     composite_io_op_init(obj, args->ia_opcode, *op);
	if (rc != 0)
		return M0_ERR(rc);

	/* Construct child IO ops. */
	clayout = M0_AMB(clayout, obj->ob_layout, ccl_layout);
	/* NOTE(review): the first line of this chained call (rc = the
	 * IO-division helper) is elided in this extract. */
		 clayout, args->ia_opcode, args->ia_ext,
		 args->ia_data, &sio_arr, &nr_sios)?:
	     composite_sub_io_ops_build(obj, *op, sio_arr, nr_sios);
	/* The sub-IO description is only an intermediate product. */
	composite_sub_io_destroy(sio_arr, nr_sios);
	return (rc == 0)?M0_RC(0):M0_ERR(rc);
}
1206 
1227 
1228 /* Container used by Client internally. */
1230 
1231 enum {
1233 };
1234 
				void **out_kbuf, m0_bcount_t *out_klen)
{
	/* Serialises a layer-index key into a freshly allocated big-endian
	 * on-wire buffer; the caller owns '*out_kbuf'. (Signature first
	 * lines elided in this extract.) */
	struct m0_composite_layer_idx_key *be_key;

	M0_ENTRY();
	M0_PRE(out_kbuf != NULL);
	M0_PRE(out_klen != NULL);

	M0_ALLOC_PTR(be_key);
	if (be_key == NULL)
		return M0_ERR(-ENOMEM);
	/* Store all fields big-endian so keys sort correctly in the index. */
	be_key->cek_layer_id.u_hi =
		m0_byteorder_cpu_to_be64(key->cek_layer_id.u_hi);
	be_key->cek_layer_id.u_lo =
		m0_byteorder_cpu_to_be64(key->cek_layer_id.u_lo);
	be_key->cek_off =
		m0_byteorder_cpu_to_be64(key->cek_off);
	*out_kbuf = be_key;
	*out_klen = sizeof *be_key;
	return M0_RC(0);
}
1259 
1262  void *kbuf)
1263 {
1264  struct m0_composite_layer_idx_key *be_key;
1265 
1266  M0_PRE(key != NULL);
1267  M0_PRE(kbuf != NULL);
1268  be_key = (struct m0_composite_layer_idx_key *)kbuf;
1269  key->cek_layer_id.u_hi =
1271  key->cek_layer_id.u_lo =
1273  key->cek_off =
1275 }
1277 
1280  void **out_vbuf, m0_bcount_t *out_vlen)
1281 {
1282  struct m0_composite_layer_idx_val *be_val;
1283 
1284  M0_ENTRY();
1285  M0_PRE(out_vbuf != NULL);
1286  M0_PRE(out_vlen != NULL);
1287 
        /* Allocate a buffer copy of the value for the caller (who must
         * m0_free() it).  cev_len is copied as-is: unlike the key path
         * there is no byte-order conversion here. */
1288  M0_ALLOC_PTR(be_val);
1289  if (be_val == NULL)
1290  return M0_ERR(-ENOMEM);
1291  be_val->cev_len = val->cev_len;
1292  *out_vbuf = be_val;
1293  *out_vlen = sizeof *be_val;
1294  return M0_RC(0);
1295 }
1297 
1300  void *vbuf)
1301 {
1302  struct m0_composite_layer_idx_val *be_val;
1303 
1304  M0_PRE(val != NULL);
1305  M0_PRE(vbuf != NULL);
1306 
        /* Inverse of m0_composite_layer_idx_val_to_buf(): copy the
         * length straight out of the buffer, matching the encode side
         * (no endianness conversion). */
1307  be_val = (struct m0_composite_layer_idx_val *)vbuf;
1308  val->cev_len = be_val->cev_len;
1309 }
1311 
1313  bool write, struct m0_idx *idx)
1314 {
1315  struct m0_fid fid;
1316 
1317  M0_ENTRY();
1318  M0_PRE(idx != NULL);
1319 
1320  /* Initialise in-memory index data structure. */
        /* Select the write- or read-extent index fid for this layer.
         * NOTE(review): the tail of the fid selection and the
         * m0_idx_init() call head (originals 1322-1323) are missing
         * from this rendering -- confirm against the repository
         * source. */
1321  fid = (write == true)?
1324  (struct m0_uint128 *)&fid);
1325 
1326  return M0_RC(0);
1327 }
1328 M0_EXPORTED(m0_composite_layer_idx);
1329 
1331 {
1332  int rc = 0;
1333 
        /* Initialise the client-internal composite container inside the
         * uber realm.  NOTE(review): the m0_container_init() call head
         * and the line that derives rc (originals 1334/1336) are
         * missing from this rendering -- presumably rc comes from the
         * realm entity's sm state; confirm against the repository
         * source. */
1335  NULL, &M0_UBER_REALM, cinst);
1337  if (rc != 0)
1338  return M0_ERR(rc);
1339  else
1340  return M0_RC(rc);
1341 }
1342 
1344  struct m0_bufvec *keys,
1345  struct m0_bufvec *vals,
1346  int *rcs, uint32_t flags)
1347 {
1348  int rc;
1349  struct m0_op *ops[1] = {NULL};
1350 
        /* Issue a synchronous NEXT query against the layer index:
         * create one index op, launch it, wait for it to settle, then
         * harvest the op's rc. */
1351  m0_idx_op(idx, M0_IC_NEXT, keys, vals, rcs, flags,
1352  &ops[0]);
        /* NOTE(review): the return value of m0_idx_op() is not checked;
         * if op creation can fail, ops[0] would remain NULL and the
         * calls below would dereference it -- confirm whether
         * m0_idx_op() can fail here. */
1353  m0_op_launch(ops, 1);
1354  rc = m0_op_wait(ops[0],
        /* NOTE(review): the first line of the wait-state mask (original
         * 1355, presumably `M0_BITS(M0_OS_FAILED,`) is missing from
         * this rendering. */
1356  M0_OS_STABLE),
1357  M0_TIME_NEVER)?:
1358  ops[0]->op_sm.sm_rc;
1359 
1360  /* fini and release */
1361  m0_op_fini(ops[0]);
1362  m0_op_free(ops[0]);
1363  return M0_RC(rc);
1364 }
1365 
1366 static int
1368  struct m0_composite_layer *layer,
1369  struct m0_bufvec *keys,
1370  struct m0_bufvec *vals, int *rcs,
1371  struct m0_tl *ext_list,
1372  struct m0_composite_layer_idx_key *max_key)
1373 {
1374  int i;
1375  struct m0_composite_extent *ext;
1377 
        /*
         * Walk the kv pairs returned by a NEXT query, converting each
         * pair that still belongs to `layer` into an extent appended to
         * ext_list.  Returns the number of pairs consumed (>= 0) or
         * -ENOMEM; on exit max_key holds the largest key seen so the
         * caller can resume scanning from it.
         */
1378  for (i = 0; i < keys->ov_vec.v_nr; i++) {
1379  /* Reach the end of index. */
1380  if (keys->ov_buf[i] == NULL ||
1381  vals->ov_buf[i] == NULL || rcs[i] != 0)
1382  break;
1383 
1384  /* Have retrieved all kv pairs for a layer. */
        /* NOTE(review): the head of the key-decoding call (original
         * 1385, presumably `m0_composite_layer_idx_key_from_buf(`) is
         * missing from this rendering, as is the declaration of the
         * local `key` (original 1376). */
1386  &key, keys->ov_buf[i]);
1387  if (!m0_uint128_eq(&key.cek_layer_id, &layer->ccr_subobj))
1388  break;
1389 
1390  /* Add a new extent. */
1391  M0_ALLOC_PTR(ext);
1392  if (ext == NULL)
1393  return M0_ERR(-ENOMEM);
1394  ext->ce_id = key.cek_layer_id;
1395  ext->ce_off = key.cek_off;
1396  ext->ce_len = *(m0_bcount_t *)vals->ov_buf[i];
1397 
1398  /* The extents are in increasing order for `offset`. */
1399  cext_tlink_init_at_tail(ext, ext_list);
1400  }
1401 
        /* The tail of the list is the largest extent added so far; use
         * its key as the resume point for the next NEXT query. */
1402  ext = cext_tlist_tail(ext_list);
1403  if (ext != NULL) {
1404  max_key->cek_layer_id = ext->ce_id;
1405  max_key->cek_off = ext->ce_off;
1406  }
1407  return M0_RC(i);
1408 }
1409 
1411  bool is_wr_list)
1412 {
1413  int i;
1414  int rc;
1415  int nr_kvp;
1416  int nr_exts;
1417  int *rcs = NULL;
1418  uint32_t flags = 0;
1419  struct m0_bufvec *keys;
1420  struct m0_bufvec *vals;
1421  struct m0_idx idx;
1422  struct m0_composite_extent *ext;
1423  struct m0_tl *ext_list = NULL;
1424  struct m0_composite_layer_idx_key start_key;
1425  struct m0_composite_layer_idx_key max_key;
1426 
1427  M0_ENTRY();
1428 
        /*
         * Scan one extent index (write or read, per is_wr_list) of
         * `layer` with repeated NEXT queries, accumulating the extents
         * into the layer's corresponding list.
         * NOTE(review): on the early error paths below, `idx` has not
         * yet been initialised by m0_composite_layer_idx(), yet the
         * exit path calls m0_idx_fini(&idx) unconditionally -- looks
         * like a use of an uninitialised object; confirm against the
         * repository source.
         */
1429  M0_ALLOC_PTR(keys);
1430  M0_ALLOC_PTR(vals);
1431  if (keys == NULL || vals == NULL) {
1432  rc = M0_ERR(-ENOMEM);
1433  goto exit;
1434  }
1435 
1436  /* Allocate bufvec's for keys and vals. */
        /* NOTE(review): the line setting nr_kvp (original 1437) is
         * missing from this rendering. */
1438  rc = m0_bufvec_empty_alloc(keys, nr_kvp)?:
1439  m0_bufvec_empty_alloc(vals, nr_kvp);
1440  if (rc != 0) {
1441  rc = M0_ERR(-ENOMEM);
1442  goto exit;
1443  }
1444  M0_ALLOC_ARR(rcs, nr_kvp);
1445  if (rcs == NULL) {
1446  rc = M0_ERR(-ENOMEM);
1447  goto exit;
1448  }
1449 
1450  /* Initialise the layer index to be queried. */
1451  rc = m0_composite_layer_idx(layer->ccr_subobj, is_wr_list, &idx);
1452  if (rc != 0)
1453  goto exit;
1454  ext_list = (is_wr_list == true)?&layer->ccr_wr_exts:&layer->ccr_rd_exts;
1455 
1456  /* Use NEXT op to scan the layer index. */
1457  start_key.cek_layer_id = layer->ccr_subobj;
1458  start_key.cek_off = 0;
1459  while(true) {
1460  /* Set key and then launch NEXT query. */
        /* NOTE(review): the heads of the key_to_buf and next_query
         * calls (originals 1461 and 1464) are missing from this
         * rendering. */
1462  &start_key, &keys->ov_buf[0],
1463  &keys->ov_vec.v_count[0])?:
1465  &idx, keys, vals, rcs, flags);
1466  if (rc != 0) {
1467  M0_ERR(rc);
1468  goto exit;
1469  }
1470 
1471  /* Extract extents and add them to the list. */
        /* NOTE(review): the head of the extents-extract call (original
         * 1472) is missing from this rendering. */
1473  layer, keys, vals, rcs, ext_list, &max_key);
1474  if (nr_exts < 0)
1475  break;
1476  else if (nr_exts == 0)
1477  /*
1478  * This can happen when we just reach the
1479  * end of an index. Reset the keys and vals
1480  * to avoid m0_bufvec_free() to free `start_key`
1481  * variable.
1482  */
1483  break;
1484  else if (nr_exts < nr_kvp)
1485  break;
1486 
1487  /* Reset keys and vals. */
1488  for (i = 0; i < nr_kvp; i++) {
1489  m0_free(keys->ov_buf[i]);
1490  keys->ov_buf[i] = NULL;
1491  keys->ov_vec.v_count[i] = 0;
1492  m0_free(vals->ov_buf[i]);
1493  vals->ov_buf[i] = NULL;
1494  vals->ov_vec.v_count[i] = 0;
1495  }
1496 
1497  /* Next round. */
1498  start_key = max_key;
1500  }
1501 
1502 exit:
1503  m0_idx_fini(&idx);
1504  m0_bufvec_free(keys);
1505  m0_bufvec_free(vals);
1506  m0_free0(&rcs);
1507  m0_free(keys);
1508  m0_free(vals);
1509  if (rc != 0) {
1510  /* Cleanup the list if an error happens. */
1511  if (ext_list != NULL) {
1512  m0_tl_teardown(cext, ext_list, ext)
1513  m0_free(ext);
1514  }
1515  }
1516  return M0_RC(rc);
1517 }
1518 
1519 static int
1521  struct m0_client_composite_layout *clayout)
1522 {
1523  int rc = 0;
1524  struct m0_composite_layer *layer;
1525 
        /*
         * Synchronously load the write and read extent lists of every
         * layer of the composite layout by scanning the layer indices.
         * On the first failure the layout's reference is released and
         * the error is returned.
         */
1526  m0_tl_for(clayer, &clayout->ccl_layers, layer) {
1527  /* Query and update write extent list. */
1528  rc = composite_layer_idx_scan(layer, true);
1529  if (rc != 0) {
1530  composite_layout_put(&clayout->ccl_layout);
1531  return M0_ERR(rc);
1532  }
1533 
1534  /* Query and update read extent list. */
1535  rc = composite_layer_idx_scan(layer, false);
1536  if (rc != 0) {
1537  composite_layout_put(&clayout->ccl_layout);
1538  return M0_ERR(rc);
1539  }
1540  } m0_tl_endfor;
1541 
1542  M0_ASSERT(rc == 0);
1543  return M0_RC(rc);
1544 }
1545 
1546 #undef M0_TRACE_SUBSYSTEM
1547 
1548 /*
1549  * Local variables:
1550  * c-indentation-style: "K&R"
1551 
1552  * c-basic-offset: 8
1553  * tab-width: 8
1554  * fill-column: 80
1555  * scroll-step: 1
1556  * End:
1557  */
1558 /*
1559  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
1560  */
const struct m0_client_layout_ops layout_composite_ops
M0_INTERNAL int m0_uint128_cmp(const struct m0_uint128 *u0, const struct m0_uint128 *u1)
Definition: misc.c:45
struct m0_entity * ol_entity
Definition: layout.h:183
struct m0_mutex ccl_lock
Definition: layout.h:160
M0_INTERNAL void m0_ivec_cursor_init(struct m0_ivec_cursor *cur, const struct m0_indexvec *ivec)
Definition: vec.c:707
M0_INTERNAL int m0_dix_layout_put(struct m0_dix_meta_req *req, const struct m0_fid *fid, const struct m0_dix_layout *dlay, uint32_t nr, uint32_t flags)
Definition: meta.c:574
static void composite_io_op_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
const struct m0_bob_type oo_bobtype
Definition: client.c:45
M0_INTERNAL int m0_dix_layout_get(struct m0_dix_meta_req *req, const struct m0_fid *fid, uint32_t nr)
Definition: meta.c:626
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
struct m0_op_layout * dr_ol
M0_BOB_DEFINE(static, &oci_bobtype, m0_op_composite_io)
M0_INTERNAL void m0_sm_fail(struct m0_sm *mach, int fail_state, int32_t rc)
Definition: sm.c:468
Definition: client.h:835
void m0_entity_fini(struct m0_entity *entity)
Definition: client.c:438
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
M0_INTERNAL int m0__obj_layout_send(struct m0_obj *obj, struct m0_op_layout *ol)
Definition: cob.c:2043
M0_INTERNAL int m0_op_get(struct m0_op **op, size_t size)
Definition: client.c:568
Definition: client.h:788
static int composite_io_divide(struct m0_client_composite_layout *clayout, enum m0_obj_opcode opcode, struct m0_indexvec *ext, struct m0_bufvec *data, struct composite_sub_io **out, int *out_nr_sios)
M0_INTERNAL int m0_indexvec_alloc(struct m0_indexvec *ivec, uint32_t len)
Definition: vec.c:532
int const char const void size_t int flags
Definition: dir.c:328
struct m0_sm_group * ol_sm_grp
Definition: layout.h:178
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL int m0_layout_op_launch(struct m0_op_layout *ol)
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
Definition: chan.c:267
M0_INTERNAL m0_bcount_t m0_ivec_cursor_step(const struct m0_ivec_cursor *cur)
Definition: vec.c:726
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
static size_t locality(const struct m0_fom *fom)
Definition: rm_foms.c:269
Definition: idx_mock.c:52
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
const struct m0_bob_type op_bobtype
Definition: client.c:46
void m0_op_fini(struct m0_op *op)
Definition: client.c:847
static struct io_request req
Definition: file.c:100
M0_INTERNAL bool m0_uint128_eq(const struct m0_uint128 *u0, const struct m0_uint128 *u1)
Definition: misc.c:39
static struct m0_container composite_container
static struct m0_sm_group * grp
Definition: bytecount.c:38
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
static void layout_dix_get_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
struct m0_sm_group * oo_sm_grp
#define M0_OBJ_LAYOUT_TYPE(lid)
Definition: layout.h:108
struct m0_op * op_parent
Definition: client.h:673
static int composite_layer_idx_next_query(struct m0_idx *idx, struct m0_bufvec *keys, struct m0_bufvec *vals, int *rcs, uint32_t flags)
struct m0_vec ov_vec
Definition: vec.h:147
M0_INTERNAL void m0_dix_meta_req_init(struct m0_dix_meta_req *req, struct m0_dix_cli *cli, struct m0_sm_group *grp)
Definition: meta.c:259
M0_INTERNAL int m0_dix_req_rc(const struct m0_dix_req *req)
Definition: req.c:2489
struct m0_bufvec data
Definition: di.c:40
struct m0_op oc_op
M0_INTERNAL int m0_op_init(struct m0_op *op, const struct m0_sm_conf *conf, struct m0_entity *entity)
Definition: client.c:806
M0_INTERNAL struct m0_dix_cli * ent_dixc(const struct m0_entity *ent)
Definition: idx_dix.c:191
M0_INTERNAL void m0_indexvec_free(struct m0_indexvec *ivec)
Definition: vec.c:553
static int error
Definition: mdstore.c:64
static int composite_sub_io_op_build(struct m0_obj *cobj, struct m0_op *cop, struct composite_sub_io *sio, struct m0_op **out)
uint64_t m0_bindex_t
Definition: types.h:80
#define M0_BITS(...)
Definition: misc.h:236
M0_INTERNAL void * m0_bufvec_cursor_addr(struct m0_bufvec_cursor *cur)
Definition: vec.c:597
uint64_t m0_bcount_t
Definition: types.h:77
void m0_idx_fini(struct m0_idx *idx)
Definition: idx.c:643
Definition: sm.h:504
struct m0_fid composite_extent_rd_idx_fid
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
Definition: ub.c:49
struct m0_dix_req dmr_req
Definition: meta.h:94
const struct m0_uint128 M0_UBER_REALM
Definition: client.c:85
static void composite_io_op_cb_fini(struct m0_op_common *oc)
void ** ov_buf
Definition: vec.h:149
void m0_composite_layer_idx_val_from_buf(struct m0_composite_layer_idx_val *val, void *vbuf)
static struct foo * obj
Definition: tlist.c:302
const char * bt_name
Definition: bob.h:73
M0_INTERNAL int m0__obj_attr_get_sync(struct m0_obj *obj)
Definition: cob.c:1974
M0_INTERNAL uint64_t m0__obj_lid(struct m0_obj *obj)
Definition: obj.c:126
const struct m0_uint128 M0_ID_APP
Definition: client.c:92
#define m0_tl_endfor
Definition: tlist.h:700
static void layout_dix_req_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
struct m0_fid fid
Definition: di.c:46
M0_INTERNAL int m0_bufvec_alloc(struct m0_bufvec *bufvec, uint32_t num_segs, m0_bcount_t seg_size)
Definition: vec.c:220
int32_t m0_op_wait(struct m0_op *op, uint64_t bits, m0_time_t to)
Definition: client.c:739
struct m0_vec iv_vec
Definition: vec.h:139
int m0_idx_op(struct m0_idx *idx, enum m0_idx_opcode opcode, struct m0_bufvec *keys, struct m0_bufvec *vals, int32_t *rcs, uint32_t flags, struct m0_op **op)
Definition: idx.c:554
return M0_RC(rc)
op
Definition: libdemo.c:64
static void composite_sub_io_destroy(struct composite_sub_io *sio_arr, int nr_subobjs)
unsigned int op_code
Definition: client.h:650
int m0_obj_op(struct m0_obj *obj, enum m0_obj_opcode opcode, struct m0_indexvec *ext, struct m0_bufvec *data, struct m0_bufvec *attr, uint64_t mask, uint32_t flags, struct m0_op **op)
Definition: io.c:717
int(* olo_launch)(struct m0_op_layout *ol)
Definition: layout.h:167
M0_INTERNAL struct m0_obj * m0__obj_entity(struct m0_entity *entity)
Definition: obj.c:51
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
M0_INTERNAL void m0_bufvec_free(struct m0_bufvec *bufvec)
Definition: vec.c:395
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
m0_bindex_t * iv_index
Definition: vec.h:141
M0_INTERNAL bool m0_bufvec_cursor_move(struct m0_bufvec_cursor *cur, m0_bcount_t count)
Definition: vec.c:574
int opcode
Definition: crate.c:301
enum m0_client_layout_type ml_type
Definition: client.h:798
M0_INTERNAL int m0__dix_layout_get_sync(struct m0_obj *obj, struct m0_dix_layout *dlayout)
int i
Definition: dir.c:1033
size_t op_size
Definition: client.h:664
static void composite_layout_put(struct m0_client_layout *layout)
#define PRIu64
Definition: types.h:58
struct m0_realm co_realm
Definition: client.h:881
M0_INTERNAL int m0_dix_layout_rep_get(struct m0_dix_meta_req *req, uint64_t idx, struct m0_dix_layout *dlay)
Definition: meta.c:650
struct m0_sm_ast op_parent_ast
Definition: client.h:674
Definition: client.h:641
static void layout_dix_req_free(struct layout_dix_req *req)
const struct m0_bob_type oc_bobtype
Definition: client.c:44
void(* oc_cb_free)(struct m0_op_common *oc)
return M0_ERR(-EOPNOTSUPP)
void * sa_datum
Definition: sm.h:508
struct m0_sm op_sm
Definition: client.h:656
M0_INTERNAL int m0_op_stable(struct m0_op *op)
Definition: client.c:520
struct m0_indexvec ioo_ext
#define M0_FID_TINIT(type, container, key)
Definition: fid.h:90
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
static void attr(struct m0_addb2__context *ctx, const uint64_t *v, char *buf)
Definition: dump.c:949
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
if(value==NULL)
Definition: dir.c:350
void m0_composite_layer_idx_key_from_buf(struct m0_composite_layer_idx_key *key, void *kbuf)
struct m0_entity re_entity
Definition: client.h:871
#define m0_free0(pptr)
Definition: memory.h:77
static void layout_dix_put_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
#define M0_ASSERT(cond)
int m0_composite_layer_idx(struct m0_uint128 layer_id, bool write, struct m0_idx *idx)
static void layout_dix_req_launch(struct layout_dix_req *req, void(*exec_fn)(struct m0_sm_group *grp, struct m0_sm_ast *ast))
static int composite_layout_get(struct m0_client_layout *layout)
int32_t op_rc
Definition: client.h:652
static struct layout_dix_req * layout_dix_req_alloc(struct m0_sm_group *grp, struct m0_dix_cli *cli, m0_chan_cb_t cb, struct m0_op_layout *ol)
int m0_composite_layer_idx_key_to_buf(struct m0_composite_layer_idx_key *key, void **out_kbuf, m0_bcount_t *out_klen)
static int composite_sub_io_ops_build(struct m0_obj *cobj, struct m0_op *cop, struct composite_sub_io *sio_arr, int nr_subobjs)
struct m0_sm_ast dr_ast
Definition: tlist.h:251
static void layout_op_completed(struct m0_op *op)
struct m0_client_layout * ol_layout
Definition: layout.h:181
static int composite_layout_alloc(struct m0_client_layout **out)
#define bob_of(ptr, type, field, bt)
Definition: bob.h:140
struct m0_bufvec ioo_data
static const struct m0_bob_type oci_bobtype
M0_TL_DESCR_DEFINE(clayer, "composite layout layers", static, struct m0_composite_layer, ccr_tlink, ccr_tlink_magic, M0_CLAYER_TL_MAGIC, M0_CLAYER_TL_MAGIC)
struct m0_client_layout ccl_layout
Definition: layout.h:157
M0_INTERNAL void m0_dix_req_lock(struct m0_dix_req *req)
Definition: req.c:184
void m0_op_launch(struct m0_op **op, uint32_t nr)
Definition: client.c:725
M0_INTERNAL int m0_dix_req_wait(struct m0_dix_req *req, uint64_t states, m0_time_t to)
Definition: req.c:201
M0_INTERNAL void m0_bufvec_cursor_init(struct m0_bufvec_cursor *cur, const struct m0_bufvec *bvec)
Definition: vec.c:563
struct m0_op_common ol_oc
Definition: layout.h:175
struct m0_uint128 cr_subobj
Definition: layout.h:89
static bool layout_dix_req_clink_cb(struct m0_clink *cl)
uint64_t u_hi
Definition: types.h:36
void m0_composite_layer_del(struct m0_client_layout *layout, struct m0_uint128 subobj_id)
M0_INTERNAL void m0_dix_meta_req_fini(struct m0_dix_meta_req *req)
Definition: meta.c:285
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
static int composite_layout_io_build(struct m0_io_args *args, struct m0_op **op)
static bool m0_op_composite_io_invariant(const struct m0_op_composite_io *oci)
struct m0_fid oo_fid
struct m0_uint128 ce_id
Definition: layout.h:132
uint32_t dl_type
Definition: layout.h:100
uint32_t v_nr
Definition: vec.h:51
struct m0_sm_group * lo_grp
Definition: locality.h:67
int(* lo_alloc)(struct m0_client_layout **)
int32_t sm_rc
Definition: sm.h:336
static void composite_io_op_done(struct m0_op_composite_io *oci)
void(* oc_cb_fini)(struct m0_op_common *oc)
m0_bcount_t * v_count
Definition: vec.h:53
uint64_t ccr_lid
Definition: layout.h:142
M0_INTERNAL bool m0_ivec_cursor_move(struct m0_ivec_cursor *cur, m0_bcount_t count)
Definition: vec.c:718
struct m0_uint128 en_id
Definition: client.h:708
struct m0_op_common oo_oc
union m0_dix_layout::@145 u
static struct m0_stob_io io
Definition: ad.c:59
M0_INTERNAL int m0_op_executed(struct m0_op *op)
Definition: client.c:500
static int composite_extents_scan_sync(struct m0_obj *obj, struct m0_client_composite_layout *clayout)
bool(* m0_chan_cb_t)(struct m0_clink *link)
Definition: chan.h:176
static int composite_layer_idx_extents_extract(struct m0_composite_layer *layer, struct m0_bufvec *keys, struct m0_bufvec *vals, int *rcs, struct m0_tl *ext_list, struct m0_composite_layer_idx_key *max_key)
struct m0_ast_rc ol_ar
Definition: layout.h:179
struct m0_uint128 cek_layer_id
Definition: client.h:842
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
static uint64_t m0_byteorder_be64_to_cpu(uint64_t big_endian_64bits)
Definition: byteorder.h:99
static struct m0_client cinst
Definition: sync.c:84
struct m0_dix_composite_layer * cld_layers
Definition: layout.h:96
struct m0_tl ccl_layers
Definition: layout.h:159
struct m0_obj * ml_obj
Definition: client.h:800
static void advance_layers_cursor(struct m0_tl *cext_tlists[], struct m0_composite_extent *cexts[], int n, m0_bindex_t off)
struct m0_uint128 ccr_subobj
Definition: layout.h:141
uint64_t n
Definition: fops.h:107
int(* olo_copy_to_app)(struct m0_client_layout *to, void *data)
Definition: layout.h:169
struct m0_realm * en_realm
Definition: client.h:710
struct m0_fid composite_extent_wr_idx_fid
struct m0_clink dr_clink
const struct m0_op_layout_ops m0_op_layout_composite_ops
M0_INTERNAL struct m0_locality * m0_locality0_get(void)
Definition: locality.c:169
static void layout_dix_req_fini(struct layout_dix_req *req)
struct m0_op ** oci_sub_ops
Definition: layout.h:195
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
M0_INTERNAL struct m0_dix_cli * ol_dixc(const struct m0_op_layout *ol)
Definition: idx_dix.c:196
Definition: fid.h:38
M0_INTERNAL int m0_op_failed(struct m0_op *op)
Definition: client.c:548
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
struct m0_entity * op_entity
Definition: client.h:660
M0_INTERNAL struct m0_locality * m0__locality_pick(struct m0_client *cinst)
Definition: client.c:290
int m0_composite_layer_add(struct m0_client_layout *layout, struct m0_obj *sub_obj, int priority)
M0_TL_DEFINE(clayer, static, struct m0_composite_layer)
M0_INTERNAL int m0_dix_meta_generic_rc(const struct m0_dix_meta_req *req)
Definition: meta.c:309
static int composite_layout_get_sync(struct m0_obj *obj)
struct m0_entity ob_entity
Definition: client.h:789
const struct m0_op_layout_ops * ol_ops
Definition: layout.h:185
M0_INTERNAL m0_bindex_t m0_ivec_cursor_index(const struct m0_ivec_cursor *cur)
Definition: vec.c:733
M0_INTERNAL void m0_sm_move(struct m0_sm *mach, int32_t rc, int state)
Definition: sm.c:485
static void layout_op_failed(struct m0_op *op, int rc)
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
static m0_bindex_t get_next_off(struct m0_composite_extent *cexts[], int n)
m0_obj_opcode
Definition: client.h:537
const struct m0_client_layout_ops * ml_ops
Definition: client.h:801
m0_bcount_t ce_len
Definition: layout.h:134
struct m0_tl ccr_wr_exts
Definition: layout.h:146
M0_INTERNAL void m0_dix_req_unlock(struct m0_dix_req *req)
Definition: req.c:190
void m0_obj_init(struct m0_obj *obj, struct m0_realm *parent, const struct m0_uint128 *id, uint64_t layout_id)
Definition: client.c:403
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
#define M0_OBJ_LAYOUT_MAKE_LID(lid, type)
Definition: layout.h:109
m0_bindex_t ce_off
Definition: layout.h:133
static uint64_t m0_byteorder_cpu_to_be64(uint64_t cpu_64bits)
Definition: byteorder.h:89
int(* olo_copy_from_app)(struct m0_client_layout *from, void *data)
Definition: layout.h:171
struct m0_tl ccr_rd_exts
Definition: layout.h:145
struct m0_tl si_exts
void m0_container_init(struct m0_container *con, struct m0_realm *parent, const struct m0_uint128 *id, struct m0_client *instance)
Definition: realm.c:31
static uint64_t found
Definition: base.c:376
static int composite_layout_copy_from_app(struct m0_client_layout *from, void *to)
#define out(...)
Definition: gen.c:41
void m0_op_free(struct m0_op *op)
Definition: client.c:885
static int composite_io_op_init(struct m0_obj *obj, int opcode, struct m0_op *op)
struct m0_sm en_sm
Definition: client.h:732
static void composite_io_op_cb_launch(struct m0_op_common *oc)
static int composite_layout_copy_to_app(struct m0_client_layout *to, void *from)
struct m0_uint128 si_id
M0_INTERNAL bool m0_fid_is_valid(const struct m0_fid *fid)
Definition: fid.c:96
void(* oc_cb_launch)(struct m0_op_common *oc)
struct m0_fom_ops ops
Definition: io_foms.c:623
uint64_t u_lo
Definition: types.h:37
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
uint32_t sm_state
Definition: sm.h:307
struct m0_bufvec ioo_attr
M0_INTERNAL void m0_bufvec_free2(struct m0_bufvec *bufvec)
Definition: vec.c:401
int32_t rc
Definition: trigger_fop.h:47
static int composite_layer_idx_scan(struct m0_composite_layer *layer, bool is_wr_list)
int m0_composite_layer_idx_val_to_buf(struct m0_composite_layer_idx_val *val, void **out_vbuf, m0_bcount_t *out_vlen)
struct m0_tlink sie_tlink
#define offsetof(typ, memb)
Definition: misc.h:29
struct m0_dix_meta_req dr_mreq
M0_INTERNAL bool m0_sm_group_is_locked(const struct m0_sm_group *grp)
Definition: sm.c:107
void m0_idx_init(struct m0_idx *idx, struct m0_realm *parent, const struct m0_uint128 *id)
Definition: idx.c:626
struct m0_sm_conf m0_op_conf
Definition: client.c:145
M0_INTERNAL int m0__composite_container_init(struct m0_client *cinst)
Definition: vec.h:145
M0_INTERNAL int m0_bufvec_empty_alloc(struct m0_bufvec *bufvec, uint32_t num_segs)
Definition: vec.c:213
Definition: idx_mock.c:47
#define M0_IMPOSSIBLE(fmt,...)
M0_INTERNAL struct m0_client * m0__entity_instance(const struct m0_entity *entity)
Definition: client.c:226
static void composite_io_op_cb_free(struct m0_op_common *oc)
struct m0_op_obj oci_oo
Definition: layout.h:191