Motr M0
m0hsm_api.c
1 /*
2  * Copyright (c) 2018-2020 Seagate Technology LLC and/or its Affiliates
3  * COPYRIGHT 2017-2018 CEA[1] and SAGE partners
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  * [1]Commissariat a l'energie atomique et aux energies alternatives
21  *
22  * Original author: Thomas Leibovici <thomas.leibovici@cea.fr>
23  */
24 
25 /* HSM invariants:
26  * - There is always a writable layer with the highest priority
27  * to prevent object copies (source or target) from being modified while they are moved.
28  * - Applications always read the last written data from composite objects,
29  * wherever its extents are located.
30  */
31 
32 #include <stdlib.h>
33 #include <unistd.h>
34 #include <errno.h>
35 #include <fcntl.h>
36 #include <sys/time.h>
37 #include <getopt.h>
38 #include <stdarg.h>
39 
40 #include "lib/trace.h"
41 #include "conf/obj.h"
42 #include "fid/fid.h"
43 #include "motr/idx.h"
44 #include "motr/layout.h"
45 
46 #include "m0hsm_api.h"
47 
48 #define MAX_BLOCK_COUNT (200)
49 
53 #define HSM_EXTENT_SCAN_BATCH 8
54 
55 /* global variables (set by m0hsm_init()) */
56 struct m0hsm_options options = {
57  .trace_level = LOG_INFO,
58  .op_timeout = 10,
59  .log_stream = NULL, /* default will be set by m0hsm_init() */
60 };
61 
62 static struct m0_client *m0_instance;
63 static struct m0_realm *m0_uber_realm;
64 
65 /* logging macros */
66 #define ERROR(_fmt, ...) if (options.trace_level >= LOG_ERROR) \
67  fprintf(options.log_stream, _fmt, ##__VA_ARGS__)
68 #define INFO(_fmt, ...) if (options.trace_level >= LOG_INFO) \
69  fprintf(options.log_stream, _fmt, ##__VA_ARGS__)
70 #define VERB(_fmt, ...) if (options.trace_level >= LOG_VERB) \
71  fprintf(options.log_stream, _fmt, ##__VA_ARGS__)
72 #define DBG(_fmt, ...) if (options.trace_level >= LOG_DEBUG) \
73  fprintf(options.log_stream, _fmt, ##__VA_ARGS__)
74 
75 #define ENTRY DBG("> ENTERING %s()\n", __func__)
76 #define RETURN(_rc) do { DBG("< LEAVING %s() line %d, rc=%d\n", \
77  __func__, __LINE__, (_rc)); \
78  return (_rc); } while(0)
79 
81 struct extent {
82  off_t off;
83  size_t len;
84 };
85 
87 static const struct extent EXT_FULLRANGE = {
88  .off = 0,
89  .len = M0_BCOUNT_MAX,
90 };
91 
92 enum {
93  MAX_LEN=128,
94  MAX_POOLS = 16,
95 };
96 
97 struct param {
98  char name[MAX_LEN];
99  char value[MAX_LEN];
100 };
101 
102 static struct param hsm_rc_params[128];
103 static struct m0_fid hsm_pools[MAX_POOLS] = {};
104 
105 static int read_params(FILE *in, struct param *p, int max_params)
106 {
107  int ln, n=0;
108  char s[MAX_LEN];
109 
110  for (ln=1; max_params > 0 && fgets(s, MAX_LEN, in); ln++) {
111  if (sscanf(s, " %[#\n\r]", p->name))
112  continue; /* skip empty line or comment */
113  if (sscanf(s, " %[a-z_A-Z0-9] = %[^#\n\r]",
114  p->name, p->value) < 2) {
115  ERROR("m0hsm: %s: error at line %d: %s\n", __func__,
116  ln, s);
117  return -1;
118  }
119  DBG("%s: %d: name='%s' value='%s'\n", __func__,
120  ln, p->name, p->value);
121  p++, max_params--, n++;
122  }
123 
124  RETURN(n);
125 }
126 
127 static int hsm_pool_fid_set(struct param *p)
128 {
129  int i;
130  char pname[32];
131 
132  for (i = 0; i < MAX_POOLS; i++) {
133  sprintf(pname, "M0_POOL_TIER%d", i + 1);
134  if (strcmp(p->name, pname) == 0) {
135  if (m0_fid_sscanf(p->value, hsm_pools + i) != 0) {
136  ERROR("%s: failed to parse FID of %s\n",
137  __func__, pname);
138  return -1;
139  }
140  return 1;
141  }
142  }
143 
144  return 0;
145 }
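/*
 * Illustration (editorial note, not part of the original source): read_params()
 * expects one "name = value" pair per line in the HSM rc file, and
 * hsm_pool_fid_set() recognizes the pool tiers by name, e.g.:
 *
 *   # comments and empty lines are skipped
 *   M0_POOL_TIER1 = 0x6f00000000000001:0x9
 *   M0_POOL_TIER2 = 0x6f00000000000001:0xa
 *
 * The FID values above are hypothetical; M0_POOL_TIER<i> is stored in
 * hsm_pools[i - 1].
 */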
146 
147 static int hsm_pools_fids_set(struct param p[], int n)
148 {
149  int i, rc;
150 
151  for (i = 0; n > 0; n--, p++, i += rc) {
152  rc = hsm_pool_fid_set(p);
153  DBG("%s: rc=%d\n", __func__, rc);
154  if (rc < 0)
155  return rc;
156  }
157 
158  if (i < 1) {
159  ERROR("m0hsm: no pools configured\n");
160  return -1;
161  }
162 
163  return 0;
164 }
165 
166 int m0hsm_init(struct m0_client *instance, struct m0_realm *uber_realm,
167  const struct m0hsm_options *in_options)
168 {
169  int rc;
170 
171  options.log_stream = stderr; /* set default */
172 
173  /* set options */
174  if (in_options)
175  options = *in_options;
176 
177  if (!instance || !uber_realm) {
178  ERROR("Missing instance or realm argument to %s()\n", __func__);
179  return -EINVAL;
180  }
181 
182  m0_instance = instance;
183  m0_uber_realm = uber_realm;
184 
185  if ((rc = read_params(options.rcfile, hsm_rc_params,
186  ARRAY_SIZE(hsm_rc_params))) < 0) {
187  ERROR("%s: failed to read parameters\n", __func__);
188  return -EINVAL;
189  }
190 
191  DBG("%s: read %d params\n", __func__, rc);
192 
193  if (hsm_pools_fids_set(hsm_rc_params, rc) < 0) {
194  ERROR("%s: failed to configure pools\n", __func__);
195  return -EINVAL;
196  }
197 
198  return 0;
199 }
200 
202 #define HSM_ANY_TIER UINT8_MAX
203 
204 /* Keep high 32 bits reserved for motr. Use 2^31 for HSM */
205 #define HSM_ID_MASK_HI (1LL<<31)
206 static bool is_hsm_reserved(struct m0_uint128 id)
207 {
208  return (id.u_hi & HSM_ID_MASK_HI);
209 }
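/*
 * Editorial example: an id with bit 31 of u_hi set (e.g. u_hi = 0x80000000)
 * is considered HSM-reserved, so user-visible composite objects must not use
 * that bit; the public m0hsm_*() entry points reject such ids with -EINVAL.
 */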
210 
211 /* HSM priority is composed of:
212  * 24 bits for generation, 8 bits for tier priority
213  * Priority is in descending order (the smaller the value,
214  * the higher priority), so we must reverse generation
215  * numbering (from the higher value down to 0).
216  */
217 
221 static uint32_t hsm_prio(uint32_t generation, uint8_t tier_idx)
222 {
223  /* generation prio is the opposite of generation */
224  uint32_t gen_prio;
225 
226  /* generation must fit on 24bits */
227  M0_ASSERT(generation <= 0x00FFFFFF);
228 
229  gen_prio = 0x00FFFFFF - generation;
230 
231  return (gen_prio << 8) | tier_idx;
232 }
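/*
 * Worked example (editorial note): hsm_prio(2, 3) = ((0xFFFFFF - 2) << 8) | 3
 * = 0xFFFFFD03. Increasing the generation or decreasing the tier index makes
 * the value smaller, i.e. gives the layer a higher priority in the composite
 * layout.
 */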
233 
237 static struct m0_uint128 hsm_subobj_id(struct m0_uint128 id, uint32_t gen,
238  uint8_t tier)
239 {
240  struct m0_uint128 newid = id;
241 
242  newid.u_hi <<= 32;
243  newid.u_hi |= HSM_ID_MASK_HI;
244  newid.u_hi |= hsm_prio(gen, tier);
245 
246  return newid;
247 }
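/*
 * Editorial example: for a parent id <u_hi:u_lo> = <0x42:0x1000>, gen=1 and
 * tier=2, hsm_subobj_id() returns u_hi = (0x42 << 32) | HSM_ID_MASK_HI |
 * hsm_prio(1, 2) = 0x42FFFFFE02 and u_lo = 0x1000, so the generation and
 * tier can be decoded back from the low 32 bits of u_hi.
 */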
248 
252 static uint32_t hsm_prio2gen(uint32_t priority)
253 {
254  return 0x00FFFFFF - (priority >> 8);
255 }
256 
260 static uint8_t hsm_prio2tier(uint32_t priority)
261 {
262  return priority & 0xFF;
263 }
264 
265 
269 static struct m0_fid *hsm_tier2pool(uint8_t tier_idx)
270 {
271  if (tier_idx < 1 || tier_idx > MAX_POOLS)
272  return NULL;
273  return hsm_pools + tier_idx - 1;
274 }
275 
277 static int open_entity(struct m0_entity *entity)
278 {
279  struct m0_op *ops[1] = {NULL};
280  int rc;
281  ENTRY;
282 
283  m0_entity_open(entity, &ops[0]);
284  m0_op_launch(ops, 1);
285  rc = m0_op_wait(ops[0], M0_BITS(M0_OS_FAILED,
286  M0_OS_STABLE),
287  M0_TIME_NEVER) ?:
288  m0_rc(ops[0]);
289  m0_op_fini(ops[0]);
290  m0_op_free(ops[0]);
291 
292  RETURN(rc);
293 }
294 
295 
301 static int create_obj(struct m0_uint128 id, struct m0_obj *obj,
302  bool close_entity, uint8_t tier_idx)
303 {
304  struct m0_op *ops[1] = {NULL};
305  struct m0_fid *pool = NULL;
306  int rc;
307  ENTRY;
308 
309  DBG("creating id=%" PRIx64 ":%" PRIx64 "\n", id.u_hi, id.u_lo);
310 
311  /* first create the main object with a default layout */
312  m0_obj_init(obj, m0_uber_realm, &id, 9 /* XXX: 1MB */);
313  /* m0_client_layout_id(m0_instance)); */
314 
315  rc = open_entity(&obj->ob_entity);
316  if (rc == 0) {
317  ERROR("Object %" PRIx64 ":%" PRIx64 " already exists\n", id.u_hi,
318  id.u_lo);
319  RETURN(-EEXIST);
320  } else if (rc != -ENOENT) {
321  ERROR("Failed to create object %" PRIx64 ":%" PRIx64 ": rc=%d\n",
322  id.u_hi, id.u_lo, rc);
323  RETURN(rc);
324  }
325 
326  if (tier_idx != HSM_ANY_TIER) {
327  pool = hsm_tier2pool(tier_idx);
328  DBG("%s: got pool "FID_F"\n", __func__, FID_P(pool));
329  if (pool == NULL || !m0_fid_is_set(pool)) {
330  ERROR("m0hsm: pool index %d is not configured\n", tier_idx);
331  return -EINVAL;
332  }
333  }
334  m0_entity_create(pool, &obj->ob_entity, &ops[0]);
335 
336  m0_op_launch(ops, 1);
337  rc = m0_op_wait(
338  ops[0], M0_BITS(M0_OS_FAILED, M0_OS_STABLE),
339  M0_TIME_NEVER) ?: m0_rc(ops[0]);
340 
341  m0_op_fini(ops[0]);
342  m0_op_free(ops[0]);
343 
344  if (close_entity)
345  m0_entity_fini(&obj->ob_entity);
346 
347  RETURN(rc);
348 }
349 
355 static int delete_obj(struct m0_uint128 id) __attribute__((unused));
356 static int delete_obj(struct m0_uint128 id)
357 {
358  struct m0_op *ops[1] = {NULL};
359  struct m0_obj obj;
360  int rc;
361  ENTRY;
362 
363  memset(&obj, 0, sizeof(struct m0_obj));
364 
365  DBG("deleting id=%" PRIx64 ":%" PRIx64 "\n", id.u_hi, id.u_lo);
366 
367  m0_obj_init(&obj, m0_uber_realm, &id, m0_client_layout_id(m0_instance));
368  m0_entity_delete(&obj.ob_entity, &ops[0]);
369 
370  m0_op_launch(ops, 1);
371  rc = m0_op_wait(
372  ops[0], M0_BITS(M0_OS_FAILED, M0_OS_STABLE),
373  M0_TIME_NEVER) ?: m0_rc(ops[0]);
374 
375  m0_op_fini(ops[0]);
376  m0_op_free(ops[0]);
377 
378  m0_entity_fini(&obj.ob_entity);
379 
380  RETURN(rc);
381 }
382 
386 static int create_obj_with_layout(struct m0_uint128 id,
387  struct m0_obj *obj,
388  struct m0_client_layout *layout,
389  bool close_entity)
390 {
391  struct m0_op *ops[2] = {NULL};
392  int rc;
393  ENTRY;
394 
395  DBG("creating id=%" PRIx64 ":%" PRIx64 "\n", id.u_hi, id.u_lo);
396 
397  /* first create the main object with a default layout */
398  m0_obj_init(obj, m0_uber_realm, &id, m0_client_layout_id(m0_instance));
399 
400  /* set first operation of batch */
401  m0_entity_create(NULL, &obj->ob_entity, &ops[0]);
402 
403  /* set second operation of batch */
404  rc = m0_client_layout_op(obj, M0_EO_LAYOUT_SET, layout, &ops[1]);
405  if (rc) {
406  m0_op_free(ops[0]);
407  RETURN(rc);
408  }
409 
410  /* launch them both */
411  m0_op_launch(ops, 2);
412  /* wait for first to complete */
413  rc = m0_op_wait(
414  ops[0], M0_BITS(M0_OS_FAILED, M0_OS_STABLE),
415  M0_TIME_NEVER) ?: m0_rc(ops[0]);
416  m0_op_fini(ops[0]);
417  m0_op_free(ops[0]);
418 
419  /* wait for second to complete */
420  rc = rc ?: m0_op_wait(
421  ops[1], M0_BITS(M0_OS_FAILED, M0_OS_STABLE),
422  M0_TIME_NEVER) ?: m0_rc(ops[1]);
423  m0_op_fini(ops[1]);
424  m0_op_free(ops[1]);
425 
426  if (close_entity)
427  m0_entity_fini(&obj->ob_entity);
428 
429  RETURN(rc);
430 }
431 
433 static int delete_obj_set_parent_layout(struct m0_uint128 id,
434  struct m0_uint128 parent_id,
435  struct m0_client_layout *parent_layout)
436 {
437  struct m0_op *ops[2] = {NULL};
438  struct m0_obj parent_obj;
439  struct m0_obj obj;
440  int rc;
441 
442  ENTRY;
443 
444  DBG("deleting id=%" PRIx64 ":%" PRIx64 "\n", id.u_hi, id.u_lo);
445 
446  memset(&obj, 0, sizeof(struct m0_obj));
447  memset(&parent_obj, 0, sizeof(struct m0_obj));
448 
449  m0_obj_init(&obj, m0_uber_realm, &id, m0_client_layout_id(m0_instance));
450  m0_obj_init(&parent_obj, m0_uber_realm, &parent_id,
451  m0_client_layout_id(m0_instance));
452 
453  /* open the entities */
454  rc = open_entity(&obj.ob_entity);
455  if (rc)
456  RETURN(rc);
457  rc = open_entity(&parent_obj.ob_entity);
458  if (rc)
459  RETURN(rc);
460 
461  /* set first operation of batch */
462  rc = m0_entity_delete(&obj.ob_entity, &ops[0]);
463  if (rc)
464  RETURN(rc);
465 
466  /* set second operation of batch */
467  rc = m0_client_layout_op(&parent_obj, M0_EO_LAYOUT_SET,
468  parent_layout, &ops[1]);
469  if (rc) {
470  m0_op_free(ops[0]);
471  RETURN(rc);
472  }
473 
474  /* launch them both */
475  m0_op_launch(ops, 2);
476  /* wait for first op to complete */
477  rc = m0_op_wait(
478  ops[0], M0_BITS(M0_OS_FAILED, M0_OS_STABLE),
479  M0_TIME_NEVER) ?: m0_rc(ops[0]);
480  m0_op_fini(ops[0]);
481  m0_op_free(ops[0]);
482 
483  /* wait for second op to complete */
484  rc = rc ?: m0_op_wait(
485  ops[1], M0_BITS(M0_OS_FAILED, M0_OS_STABLE),
486  M0_TIME_NEVER) ?: m0_rc(ops[1]);
487  m0_op_fini(ops[1]);
488  m0_op_free(ops[1]);
489 
490  /* close entities */
491  m0_entity_fini(&obj.ob_entity);
492  m0_entity_fini(&parent_obj.ob_entity);
493 
494  RETURN(rc);
495 }
496 
500 static int obj_layout_get(struct m0_obj *obj,
501  struct m0_client_layout **layout)
502 {
503  struct m0_op *ops[1] = {NULL};
504  int rc;
505  ENTRY;
506 
507  *layout = m0_client_layout_alloc(M0_LT_COMPOSITE);
508  if (*layout == NULL)
509  RETURN(-ENOMEM);
510 
511  m0_client_layout_op(obj, M0_EO_LAYOUT_GET, *layout, &ops[0]);
512 
513  m0_op_launch(ops, 1);
514 
515  rc = m0_op_wait(
516  ops[0], M0_BITS(M0_OS_FAILED, M0_OS_STABLE),
517  M0_TIME_NEVER) ?: m0_rc(ops[0]);
518  m0_op_fini(ops[0]);
519  m0_op_free(ops[0]);
520 
521  RETURN(rc);
522 }
523 
527 static int layout_get(struct m0_uint128 id, struct m0_client_layout **layout)
528 {
529  struct m0_obj obj;
530  int rc;
531  ENTRY;
532 
533  /* instantiate the object to be handled */
534  M0_SET0(&obj);
535  m0_obj_init(&obj, m0_uber_realm, &id, m0_client_layout_id(m0_instance));
536 
537  rc = open_entity(&obj.ob_entity);
538  if (rc)
539  RETURN(rc);
540 
541  rc = obj_layout_get(&obj, layout);
542 
543  /* close it */
544  m0_entity_fini(&obj.ob_entity);
545  RETURN(rc);
546 }
547 
549 static int obj_layout_set(struct m0_obj *obj,
550  struct m0_client_layout *layout)
551 {
552  struct m0_op *ops[1] = {NULL};
553  int rc;
554  ENTRY;
555 
556  rc = m0_client_layout_op(obj, M0_EO_LAYOUT_SET, layout, &ops[0]);
557  if (rc)
558  RETURN(rc);
559 
560  m0_op_launch(ops, 1);
561 
562  rc = m0_op_wait(
563  ops[0], M0_BITS(M0_OS_FAILED, M0_OS_STABLE),
564  M0_TIME_NEVER) ?: m0_rc(ops[0]);
565 
566  m0_op_fini(ops[0]);
567  m0_op_free(ops[0]);
568 
569  RETURN(rc);
570 }
571 
573 static int layout_set(struct m0_uint128 id, struct m0_client_layout *layout)
574 {
575  struct m0_obj obj;
576  int rc;
577  ENTRY;
578 
579  /* open the object */
580  memset(&obj, 0, sizeof(obj));
581  m0_obj_init(&obj, m0_uber_realm, &id, m0_client_layout_id(m0_instance));
582 
583  rc = open_entity(&obj.ob_entity);
584  if (rc)
585  RETURN(rc);
586 
587  /* update its layout */
588  rc = obj_layout_set(&obj, layout);
589 
590  /* close it */
591  m0_entity_fini(&obj.ob_entity);
592  RETURN(rc);
593 }
594 
595 /* The definitions above allow iterating on layers and extent lists */
596 #include "motr/magic.h"
597 M0_TL_DESCR_DEFINE(clayer, "composite layout layers",
598  static, struct m0_composite_layer,
599  ccr_tlink, ccr_tlink_magic,
601 M0_TL_DEFINE(clayer, static, struct m0_composite_layer);
602 
603 M0_TL_DESCR_DEFINE(cext, "composite layout extents",
604  static, struct m0_composite_extent,
605  ce_tlink, ce_tlink_magic,
607 M0_TL_DEFINE(cext, static, struct m0_composite_extent);
608 
609 
613 static int get_next_extents(struct m0_idx *idx,
614  struct m0_bufvec *keys,
615  struct m0_bufvec *vals,
616  int *rc_list, int32_t flags)
617 {
618  int rc;
619  struct m0_op *ops[1] = {NULL};
620  ENTRY;
621 
622  m0_idx_op(idx, M0_IC_NEXT,
623  keys, vals, rc_list, flags, &ops[0]);
624  m0_op_launch(ops, 1);
625  rc = m0_op_wait(ops[0],
626  M0_BITS(M0_OS_FAILED,
627  M0_OS_STABLE),
628  M0_TIME_NEVER);
629  rc = rc ? rc : ops[0]->op_sm.sm_rc;
630 
631  /* fini and release */
632  m0_op_fini(ops[0]);
633  m0_op_free(ops[0]);
634  RETURN(rc);
635 }
636 
637 
642 static int read_extent_keys(struct m0_uint128 subobjid,
643  struct m0_bufvec *keys,
644  struct m0_bufvec *vals,
645  int *rc_list,
646  struct m0_tl *ext_list,
647  struct m0_composite_layer_idx_key *last_key)
648 {
651  struct m0_composite_extent *ext;
652  int i;
653  ENTRY;
654 
655  for (i = 0; i < keys->ov_vec.v_nr; i++) {
656  /* Reach the end of index. */
657  if (keys->ov_buf[i] == NULL ||
658  vals->ov_buf[i] == NULL || rc_list[i] != 0)
659  break;
660 
661  /* Have retrieved all kv pairs for a layer. */
663  &key, keys->ov_buf[i]);
664  if (!m0_uint128_eq(&key.cek_layer_id, &subobjid))
665  break;
666 
668  &val, vals->ov_buf[i]);
669 
670  /* Add a new extent. */
671  M0_ALLOC_PTR(ext);
672  if (ext == NULL)
673  RETURN(-ENOMEM);
674  ext->ce_id = key.cek_layer_id;
675  ext->ce_off = key.cek_off;
676  ext->ce_len = val.cev_len;
677 
678  DBG("%s: extent %#" PRIx64 ":%#"PRIx64
679  " [%#" PRIx64 "-%#" PRIx64 "]\n", __func__,
680  ext->ce_id.u_hi, ext->ce_id.u_lo, key.cek_off,
681  key.cek_off + ext->ce_len - 1);
682 
683  /* The extents are in increasing order of offset. */
684  cext_tlink_init_at_tail(ext, ext_list);
685  }
686 
687  ext = cext_tlist_tail(ext_list);
688  if (ext != NULL) {
689  last_key->cek_layer_id = ext->ce_id;
690  last_key->cek_off = ext->ce_off;
691  }
692  RETURN(i);
693 }
694 
696 static void extent_list_free(struct m0_tl *ext_list)
697 {
698  struct m0_composite_extent *ext;
699 
700  m0_tl_teardown(cext, ext_list, ext)
701  m0_free(ext);
702 }
703 
705 static void reset_bufvec(struct m0_bufvec *keys, int count)
706 {
707  int i;
708 
709  for (i = 0; i < count; i++) {
710  m0_free(keys->ov_buf[i]);
711  keys->ov_buf[i] = NULL;
712  keys->ov_vec.v_count[i] = 0;
713  }
714 }
715 
723 static int layer_load_extent_list(struct m0_uint128 subobjid, bool write,
724  struct m0_tl *ext_list)
725 {
726  struct m0_idx idx = {{0}};
727  struct m0_composite_layer_idx_key curr_key;
728  struct m0_bufvec keys;
729  struct m0_bufvec vals;
730  int *rc_list = NULL;
731  int32_t flags = 0;
732  int rc, nb;
733  ENTRY;
734 
735  if (!cext_tlist_is_empty(ext_list))
736  /* already loaded */
737  RETURN(0);
738 
739  /* Allocate argument parameters */
740  rc = m0_bufvec_empty_alloc(&keys, HSM_EXTENT_SCAN_BATCH);
741  if (rc)
742  return rc;
743  rc = m0_bufvec_empty_alloc(&vals, HSM_EXTENT_SCAN_BATCH);
744  if (rc)
745  goto out_free;
746 
747  M0_ALLOC_ARR(rc_list, HSM_EXTENT_SCAN_BATCH);
748  if (rc_list == NULL) {
749  rc = -ENOMEM;
750  goto out_free;
751  }
752 
753  curr_key.cek_layer_id = subobjid;
754  curr_key.cek_off = 0;
755 
756  m0_composite_layer_idx(subobjid, write, &idx);
757 
758  while (true) {
759  /* convert current key to idx buffer */
760  rc = m0_composite_layer_idx_key_to_buf(
761  &curr_key, &keys.ov_buf[0], &keys.ov_vec.v_count[0]);
762  if (rc)
763  goto out_free;
764 
765  rc = get_next_extents(&idx, &keys, &vals, rc_list, flags);
766  if (rc)
767  goto out_free;
768 
769  nb = read_extent_keys(subobjid, &keys, &vals, rc_list, ext_list,
770  &curr_key);
771  if (nb < HSM_EXTENT_SCAN_BATCH)
772  break;
773 
774  /* Reset keys and vals. */
775  reset_bufvec(&keys, HSM_EXTENT_SCAN_BATCH);
776  reset_bufvec(&vals, HSM_EXTENT_SCAN_BATCH);
777 
778  flags = M0_OIF_EXCLUDE_START_KEY;
779  }
780 
781 out_free:
782  m0_idx_fini(&idx);
783  m0_bufvec_free(&keys);
784  m0_bufvec_free(&vals);
785  m0_free0(&rc_list);
786  RETURN(rc);
787 }
788 
789 static void print_extents(FILE *stream, const struct m0_tl *ext_list,
790  bool details)
791 {
792  struct m0_composite_extent *ext;
793  bool is_first = true;
794 
795  m0_tl_for(cext, ext_list, ext) {
796  if (details)
797  fprintf(stream, "%s<%#" PRIx64 ":%#" PRIx64 ">:"
798  "[%#" PRIx64 "->%#" PRIx64 "]",
799  is_first ? "" : " ",
800  ext->ce_id.u_hi, ext->ce_id.u_lo,
801  ext->ce_off, ext->ce_off + ext->ce_len - 1);
802  else
803  fprintf(stream, "%s[%#" PRIx64 "->%#" PRIx64 "]",
804  is_first ? "" : " ",
805  ext->ce_off, ext->ce_off + ext->ce_len - 1);
806 
807  is_first = false;
808  } m0_tl_endfor;
809  if (details)
810  fprintf(stream, "\n");
811 }
812 
813 static void print_layer(FILE *stream, struct m0_composite_layer *layer,
814  bool details)
815 {
816  if (details) {
817  fprintf(stream, "subobj=<%#" PRIx64 ":%#" PRIx64 ">\n",
818  layer->ccr_subobj.u_hi, layer->ccr_subobj.u_lo);
819  fprintf(stream, "lid=%" PRIu64 "\n", layer->ccr_lid);
820  fprintf(stream, "priority=%#x (gen=%u, tier=%hhu)\n",
821  layer->ccr_priority,
822  hsm_prio2gen(layer->ccr_priority),
823  hsm_prio2tier(layer->ccr_priority));
824  } else {
825  fprintf(stream, "gen %u, tier %hhu, extents: ",
826  hsm_prio2gen(layer->ccr_priority),
827  hsm_prio2tier(layer->ccr_priority));
828  }
829 
830  /* load read extents for this layer */
831  layer_load_extent_list(layer->ccr_subobj, false, &layer->ccr_rd_exts);
832  /* load write extents for this layer */
833  layer_load_extent_list(layer->ccr_subobj, true,
834  &layer->ccr_wr_exts);
835  if (details)
836  fprintf(stream, "R extents:\n");
837  print_extents(stream, &layer->ccr_rd_exts, details);
838 
839  if (details) {
840  fprintf(stream, "W extents:\n");
841  print_extents(stream, &layer->ccr_wr_exts, details);
842  } else {
843  if (!cext_tlist_is_empty(&layer->ccr_wr_exts))
844  fprintf(stream, " (writable)\n");
845  else
846  fprintf(stream, "\n");
847  }
848 }
849 
850 static void print_layout(FILE *stream, const struct m0_client_layout *layout,
851  bool details)
852 {
853  struct m0_client_composite_layout *clayout;
854  struct m0_composite_layer *layer;
855  int i;
856 
857  clayout = M0_AMB(clayout, layout, ccl_layout);
858  M0_ASSERT(clayout != NULL);
859 
860  if (details)
861  fprintf(stream, "%" PRIu64 " layers:\n", clayout->ccl_nr_layers);
862 
863  /* iterate on all layers and display their extents */
864  i = 0;
865  m0_tl_for(clayer, &clayout->ccl_layers, layer) {
866  if (details)
867  fprintf(stream, "==== layer #%d ====\n", i);
868  else
869  fprintf(stream, " - ");
870  print_layer(stream, layer, details);
871  i++;
872  } m0_tl_endfor;
873 }
874 
875 int m0hsm_dump(FILE *stream, struct m0_uint128 id, bool details)
876 {
877  struct m0_client_layout *layout = NULL;
878  int rc;
879  ENTRY;
880 
881  if (is_hsm_reserved(id))
882  RETURN(-EINVAL);
883 
884  rc = layout_get(id, &layout);
885  if (rc)
886  RETURN(rc);
887  M0_ASSERT(layout != NULL);
888 
889  print_layout(stream, layout, details);
890  RETURN(0);
891 }
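/*
 * Editorial usage sketch (hypothetical output): for a composite object
 * created with m0hsm_create(), m0hsm_dump(stdout, id, false) prints one line
 * per layer such as:
 *
 *   - gen 1, tier 2, extents: [0x0->0xffffffffffffffff] (writable)
 *
 * while details=true additionally prints the sub-object id, layout id and
 * the write extents of each layer.
 */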
892 
893 
894 
904 static int layer_extent_add(struct m0_uint128 subobjid,
905  const struct extent *ext,
906  bool write, bool overwrite)
907 {
908  struct m0_bufvec keys;
909  struct m0_bufvec vals;
910  struct m0_composite_layer_idx_key key;
911  struct m0_composite_layer_idx_val val;
912  int *rcs = NULL;
913  struct m0_op *ops[1] = {NULL};
914  struct m0_idx idx;
915  int rc;
916  ENTRY;
917 
918  rc = m0_bufvec_empty_alloc(&keys, 1);
919  if (rc)
920  RETURN(rc);
921  rc = m0_bufvec_empty_alloc(&vals, 1);
922  if (rc)
923  goto free_keys;
924 
925  /* Set key and value. */
926  key.cek_layer_id = subobjid;
927  key.cek_off = ext->off;
928  val.cev_len = ext->len;
929  rc = m0_composite_layer_idx_key_to_buf(
930  &key, &keys.ov_buf[0], &keys.ov_vec.v_count[0]);
931  if (rc)
932  goto free_vals;
933 
934  rc = m0_composite_layer_idx_val_to_buf(
935  &val, &vals.ov_buf[0], &vals.ov_vec.v_count[0]);
936  if (rc)
937  return rc;
938 
939  /* now set the key/value */
940  memset(&idx, 0, sizeof idx);
941  M0_ALLOC_ARR(rcs, 1);
942  if (rcs == NULL) {
943  rc = -ENOMEM;
944  goto out_free;
945  }
946 
947  DBG("%s %s extent for <%" PRIx64 ":%" PRIx64 ">: "
948  "[%#" PRIx64 "-%#" PRIx64 "]\n",
949  overwrite ? "Changing" : "Adding",
950  write ? "write" : "read",
951  subobjid.u_hi, subobjid.u_lo, ext->off,
952  ext->off + ext->len - 1);
953 
954  ops[0] = NULL;
955  m0_composite_layer_idx(subobjid, write, &idx);
956  m0_idx_op(&idx, M0_IC_PUT, &keys, &vals, rcs,
957  overwrite ? M0_OIF_OVERWRITE : 0, &ops[0]);
958  m0_op_launch(ops, 1);
959  rc = m0_op_wait(ops[0],
960  M0_BITS(M0_OS_FAILED, M0_OS_STABLE),
961  M0_TIME_NEVER);
962  if (rc)
963  goto out_free;
964  rc = ops[0]->op_sm.sm_rc;
965 
966  /* fini and release */
967  m0_op_fini(ops[0]);
968  m0_op_free(ops[0]);
969  m0_entity_fini(&idx.in_entity);
970 
971 out_free:
972  m0_free0(&rcs);
973 free_vals:
974  m0_bufvec_free(&vals);
975 free_keys:
976  m0_bufvec_free(&keys);
977  RETURN(rc);
978 }
979 
980 static int layer_extent_del(struct m0_uint128 subobjid, off_t off, bool write)
981 {
982  struct m0_bufvec keys;
983  struct m0_composite_layer_idx_key key;
984  int *rcs = NULL;
985  struct m0_op *ops[1] = {NULL};
986  struct m0_idx idx;
987  int rc;
988  ENTRY;
989 
990  rc = m0_bufvec_empty_alloc(&keys, 1);
991  if (rc)
992  RETURN(rc);
993 
994  /* Set key and value. */
995  key.cek_layer_id = subobjid;
996  key.cek_off = off;
997  rc = m0_composite_layer_idx_key_to_buf(
998  &key, &keys.ov_buf[0], &keys.ov_vec.v_count[0]);
999  if (rc)
1000  goto free_keys;
1001 
1002  /* now set the key/value */
1003  memset(&idx, 0, sizeof idx);
1004  M0_ALLOC_ARR(rcs, 1);
1005  if (rcs == NULL) {
1006  rc = -ENOMEM;
1007  goto out_free;
1008  }
1009 
1010  DBG("Dropping %s extent for <%" PRIx64 ":%" PRIx64 "> at offset %#"PRIx64
1011  " (if it exists)\n", write ? "write" : "read",
1012  subobjid.u_hi, subobjid.u_lo, off);
1013 
1014  ops[0] = NULL;
1015  m0_composite_layer_idx(subobjid, write, &idx);
1016  m0_idx_op(&idx, M0_IC_DEL, &keys, NULL, rcs, 0, &ops[0]);
1017  m0_op_launch(ops, 1);
1018  rc = m0_op_wait(ops[0],
1019  M0_BITS(M0_OS_FAILED, M0_OS_STABLE),
1020  M0_TIME_NEVER);
1021  if (rc)
1022  goto out_free;
1023  rc = ops[0]->op_sm.sm_rc;
1024 
1025  /* fini and release */
1026  m0_op_fini(ops[0]);
1027  m0_op_free(ops[0]);
1028  m0_entity_fini(&idx.in_entity);
1029 
1030 out_free:
1031  m0_free0(&rcs);
1032 free_keys:
1033  m0_bufvec_free(&keys);
1034  RETURN(rc);
1035 }
1036 
1037 
1042 static void layout_top_prio(struct m0_client_layout *layout, int32_t *max_gen,
1043  struct m0_uint128 *max_gen_id, uint8_t *top_tier)
1044 {
1045  struct m0_client_composite_layout *clayout;
1046  struct m0_composite_layer *layer;
1047 
1048  *max_gen = -1;
1049  *top_tier = UINT8_MAX;
1050 
1051  /* get the max generation and create the new layer */
1052  clayout = M0_AMB(clayout, layout, ccl_layout);
1053  M0_ASSERT(clayout != NULL);
1054 
1055  /* iterate on all layers to get the max gen, top tier... */
1056  m0_tl_for(clayer, &clayout->ccl_layers, layer) {
1057  int32_t gen;
1058  uint8_t tier;
1059 
1060  gen = hsm_prio2gen(layer->ccr_priority);
1061  tier = hsm_prio2tier(layer->ccr_priority);
1062 
1063  if (gen > *max_gen) {
1064  *max_gen = gen;
1065  *max_gen_id = layer->ccr_subobj;
1066  }
1067  if (top_tier != NULL && tier < *top_tier)
1068  *top_tier = tier;
1069  } m0_tl_endfor;
1070 }
1071 
1077 static int layer_clean(struct m0_uint128 parent_id,
1078  struct m0_client_layout *layout,
1079  struct m0_composite_layer *layer)
1080 {
1081  int rc;
1082 
1083  ENTRY;
1084 
1085  /* remove the subobj from the layout */
1086  m0_composite_layer_del(layout, layer->ccr_subobj);
1087 
1088  /* delete the subobject and update the parent layout */
1089  rc = delete_obj_set_parent_layout(layer->ccr_subobj, parent_id, layout);
1090 
1091  RETURN(rc);
1092 }
1093 
1100 static int layer_check_clean(struct m0_uint128 parent_id,
1101  struct m0_client_layout *layout,
1102  struct m0_composite_layer *layer)
1103 {
1104  int rc;
1105 
1106  ENTRY;
1107 
1108  /* load read extents for this layer */
1109  rc = layer_load_extent_list(layer->ccr_subobj, false,
1110  &layer->ccr_rd_exts);
1111  if (rc)
1112  RETURN(rc);
1113 
1114  /* does the subobject still have extents? */
1115  if (!cext_tlist_is_empty(&layer->ccr_rd_exts)) {
1116  DBG("Subobj %" PRIx64 ":%" PRIx64 " still has read extents\n",
1117  layer->ccr_subobj.u_hi, layer->ccr_subobj.u_lo);
1118  RETURN(-ENOTEMPTY);
1119  }
1120 
1121  rc = layer_clean(parent_id, layout, layer);
1122 
1123  RETURN(rc);
1124 }
1125 
1133 static int layout_layer_clean(struct m0_uint128 parent_id,
1134  struct m0_client_layout *layout,
1135  struct m0_uint128 subobj_id)
1136 {
1137  struct m0_client_composite_layout *clayout;
1138  struct m0_composite_layer *layer;
1139 
1140  clayout = M0_AMB(clayout, layout, ccl_layout);
1141  M0_ASSERT(clayout != NULL);
1142 
1143  /* look for the given subobj id */
1144  m0_tl_for(clayer, &clayout->ccl_layers, layer) {
1145  if (m0_uint128_cmp(&layer->ccr_subobj, &subobj_id) != 0)
1146  continue;
1147 
1148  return layer_check_clean(parent_id, layout, layer);
1149  } m0_tl_endfor;
1150 
1151  /* subobj not found */
1152  return -ENOENT;
1153 }
1154 
1155 static int layout_add_top_layer(struct m0_uint128 id,
1156  struct m0_client_layout *layout,
1157  uint8_t tier)
1158 {
1159  struct m0_uint128 old_id;
1160  struct m0_obj subobj = {};
1161  int32_t gen;
1162  uint8_t top_tier;
1163  int rc;
1164  ENTRY;
1165 
1166  DBG("Adding new layer in tier %u to collect new writes\n", tier);
1167 
1168  layout_top_prio(layout, &gen, &old_id, &top_tier);
1169  if (gen == -1) {
1170  ERROR("No layers in composite object\n");
1171  RETURN(-EINVAL);
1172  }
1173 
1174  gen++;
1175  VERB("Creating new layer: gen=%d, tier=%hhu\n", gen, tier);
1176 
1177  rc = create_obj(hsm_subobj_id(id, gen, tier), &subobj, false, tier);
1178  if (rc != 0)
1179  RETURN(rc);
1180 
1181  /* add the new top layer */
1182  m0_composite_layer_add(layout, &subobj, hsm_prio(gen, tier));
1183 
1184  /* add write extent for the new layer */
1185  rc = layer_extent_add(subobj.ob_entity.en_id, &EXT_FULLRANGE, true,
1186  false);
1187  if (rc)
1188  RETURN(rc);
1189 
1190  /* close the new sub-object */
1191  m0_entity_fini(&subobj.ob_entity);
1192 
1193  /* drop the previous writable layer */
1194  rc = layer_extent_del(old_id, 0, true);
1195  if (rc)
1196  RETURN(rc);
1197 
1198  /* Check if the layer where we just dropped the write extent
1199  * still has a read extent: if not, the layer can be fully dropped
1200  * (subobject deleted and removed from the composite layout). */
1201  rc = layout_layer_clean(id, layout, old_id);
1202  if (rc != 0 && rc != -ENOENT && rc != -ENOTEMPTY)
1203  RETURN(rc);
1204 
1205  /* set the layout in case it was not set above */
1206  if (rc != 0)
1207  rc = layout_set(id, layout);
1208 
1209  RETURN(rc);
1210 }
1211 
1212 static struct m0_composite_layer *
1213 top_layer_get(struct m0_client_layout *layout)
1214 {
1215  struct m0_client_composite_layout *clayout;
1216 
1217  clayout = M0_AMB(clayout, layout, ccl_layout);
1218  if (clayout == NULL)
1219  return NULL;
1220 
1221  return clayer_tlist_head(&clayout->ccl_layers);
1222 }
1223 
1224 int m0hsm_set_write_tier(struct m0_uint128 id, uint8_t tier_idx)
1225 {
1226  struct m0_composite_layer *layer;
1227  struct m0_client_layout *layout = NULL;
1228  int rc;
1229  ENTRY;
1230 
1231  if (is_hsm_reserved(id))
1232  RETURN(-EINVAL);
1233 
1234  rc = layout_get(id, &layout);
1235  if (rc)
1236  RETURN(rc);
1237  M0_ASSERT(layout != NULL);
1238 
1239  layer = top_layer_get(layout);
1240  if (layer != NULL && hsm_prio2tier(layer->ccr_priority) == tier_idx) {
1241  INFO("Writable layer is already in tier %u\n", tier_idx);
1242  RETURN(0);
1243  }
1244 
1245  /* If not, create one */
1246  rc = layout_add_top_layer(id, layout, tier_idx);
1247  RETURN(rc);
1248 }
1249 
1250 enum ext_match_code {
1251  EM_ERROR = -1, /* Match error */
1252  EM_NONE = 0, /* Extent has no match. */
1253  EM_PARTIAL = (1 << 0), /* Extent partially matches (or contiguous,
1254  * with merge mode). */
1255  EM_FULL = (1 << 2), /* Extent is fully included in existing
1256  * extents. */
1257 };
1258 
1259 enum ext_match_type {
1260  EMT_INTERSECT, /* compute the intersection with existing extents */
1261  EMT_MERGE, /* merge with overlapping or contiguous extents */
1262 };
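/*
 * Editorial illustration of the two matching modes: given an existing extent
 * [0x1000-0x1fff] and an input extent [0x1800-0x27ff], EMT_INTERSECT yields
 * the overlap [0x1800-0x1fff] (EM_PARTIAL), whereas EMT_MERGE yields the
 * union [0x1000-0x27ff]; in merge mode contiguous extents are merged too.
 */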
1263 
1264 #define MAX(_x, _y) ((_x) > (_y) ? (_x) : (_y))
1265 #define MIN(_x, _y) ((_x) < (_y) ? (_x) : (_y))
1266 
1267 static enum ext_match_code ext_match(struct m0_uint128 layer_id,
1268  struct m0_tl *ext_list,
1269  const struct extent *ext_in,
1270  struct extent *match,
1271  enum ext_match_type mode,
1272  bool is_write,
1273  bool del_prev)
1274 {
1275  struct m0_composite_extent *ext;
1276  off_t end = ext_in->off + ext_in->len - 1;
1277  bool is_merged = false;
1278 
1279  m0_tl_for(cext, ext_list, ext) {
1280  off_t ext_end = ext->ce_off + ext->ce_len - 1;
1281 
1282  // XXX assuming that READ extents are aggregated, so a single
1283  // extent should match
1284 
1285  /* extent is fully included in another */
1286  if (ext_in->off >= ext->ce_off &&
1287  (ext_in->off + ext_in->len) <= ext->ce_off + ext->ce_len) {
1288  if (mode == EMT_INTERSECT) {
1289  /* match->off = max(in_offset, ext->ce_off)
1290  * = in_offset */
1291  *match = *ext_in;
1292  } else { /* merge */
1293  /* match_off = min(offset, ext->ce_off) */
1294  match->off = ext->ce_off;
1295  /* extent is fully included: no change in
1296  * previous length */
1297  match->len = ext->ce_len;
1298  /* No previous extent to be overwritten */
1299  }
1300  return EM_FULL;
1301  }
1302 
1303  /* Check partial match */
1304  if ((ext->ce_off >= ext_in->off && ext->ce_off <= end) ||
1305  (ext_end >= ext_in->off && ext_end <= end)) {
1306  off_t m_end;
1307  if (mode == EMT_INTERSECT) {
1308  match->off = MAX(ext_in->off, ext->ce_off);
1309  m_end = MIN(ext_end, end);
1310  match->len = (m_end - match->off) + 1;
1311  return EM_PARTIAL;
1312  } else { /* merge */
1313 
1314  /* Check if it was already merged with previous extents. */
1315  /* If so, keep the value in match_off
1316  * (assuming extents are ordered by offset). */
1317  if (!is_merged)
1318  match->off = MIN(ext_in->off, ext->ce_off);
1319 
1320  m_end = MAX(ext_end, end);
1321  match->len = (m_end - match->off) + 1;
1322 
1323  /* Drop merged extent. */
1324  if (del_prev && ext->ce_off != match->off) {
1325  if (layer_extent_del(layer_id,
1326  ext->ce_off, is_write) != 0)
1327  return EM_ERROR;
1328  cext_tlist_del(ext);
1329  }
1330 
1331  is_merged = true;
1332  /* next overlapping extents should be merged too */
1333  continue;
1334  }
1335  }
1336 
1337  /* Check consecutive extent (merge mode only). */
1338  if (mode == EMT_MERGE) {
1339  if (ext->ce_off + ext->ce_len == ext_in->off) {
1340  /* contiguous extent before */
1341  match->off = ext->ce_off;
1342  match->len = ext->ce_len + ext_in->len;
1343  is_merged = true;
1344 
1345  DBG("Merge with previous extent "
1346  "starting at %#" PRIx64 "\n", ext->ce_off);
1347 
1348  /* Merged extent keeps the same offset
1349  * (don't delete it from index) */
1350 
1351  } else if (ext_in->off + ext_in->len == ext->ce_off) {
1352  /* contiguous extent after */
1353  if (!is_merged) {
1354  match->off = ext_in->off;
1355  match->len = ext->ce_len +
1356  ext_in->len;
1357  is_merged = true;
1358  } else {
1359  /* Already merged with a previous extent,
1360  * just extend size. */
1361  match->len += ext->ce_len;
1362  }
1363 
1364  DBG("Merge with next extent starting "
1365  "at %#" PRIx64 "\n", ext->ce_off);
1366 
1367  /* Merged extent offset is different.
1368  * Delete it from index. */
1369  if (del_prev) {
1370  if (layer_extent_del(layer_id,
1371  ext->ce_off, is_write) != 0)
1372  return EM_ERROR;
1373  cext_tlist_del(ext);
1374  }
1375  }
1376  /* continue, as the next extent can overlap */
1377  }
1378 
1379  } m0_tl_endfor;
1380 
1381  return is_merged ? EM_PARTIAL : EM_NONE;
1382 }
1383 
1390 static int ext_subtract(struct m0_uint128 layer_id,
1391  struct m0_tl *ext_list,
1392  const struct extent *ext_in,
1393  bool is_write,
1394  bool *layer_empty)
1395 {
1396  /* possible cases:
1397   *   orig ext: [---------]
1398   *           -       [XXX]
1399   *           = [-----]
1400   *           -    [XXX]
1401   *           = [--]    [--]
1402   *           -: [XXX]
1403   *           =:     [-----]
1404   */
1405  struct m0_composite_extent *ext;
1406  int rc;
1407 
1408  /* counter to determine if the layer is finally empty */
1409  int remaining_extent = 0;
1410 
1411  m0_tl_for(cext, ext_list, ext) {
1412  struct extent new_ext;
1413  off_t ext_end = ext->ce_off + ext->ce_len - 1;
1414 
1415  remaining_extent ++;
1416 
1417  // XXX assuming that READ extents are aggregated, so a single
1418  // extent should match
1419 
1420  /* extent is expected to be fully included in another */
1421  if (ext_in->off < ext->ce_off ||
1422  (ext_in->off + ext_in->len) > ext->ce_off + ext->ce_len)
1423  continue;
1424 
1425  /* If the extent start exactly matches, drop it
1426  * and create the remaining extent (except if the
1427  * subtracted region ends >= extent end) */
1428  if (ext->ce_off == ext_in->off) {
1429  rc = layer_extent_del(layer_id, ext->ce_off,
1430  is_write);
1431  if (rc)
1432  return rc;
1433  cext_tlist_del(ext);
1434 
1435  remaining_extent --;
1436 
1437  /* create the remaining region except if it
1438  * was fully covered */
1439  if (ext_in->off + ext_in->len <
1440  ext->ce_off + ext->ce_len) {
1441  new_ext.off = ext_in->off + ext_in->len;
1442  new_ext.len = ext->ce_len - ext_in->len;
1443  /* don't overwrite, the extent is not supposed
1444  * to exist already */
1445  rc = layer_extent_add(layer_id, &new_ext,
1446  is_write, false);
1447  if (rc)
1448  return rc;
1449  remaining_extent ++;
1450  }
1451  /* continue iterating to count remaining extents */
1452  /* next extents are not supposed to match */
1453  continue;
1454  }
1455 
1456  /* Else, shorten and/or split the original extent */
1457  /* 1) extent before */
1458  new_ext.off = ext->ce_off;
1459  new_ext.len = ext_in->off - ext->ce_off;
1460  rc = layer_extent_add(layer_id, &new_ext, is_write,
1461  true);
1462  if (rc)
1463  return rc;
1464  remaining_extent ++;
1465 
1466  /* 2) extent after, if any */
1467  if (ext_in->off + ext_in->len <
1468  ext->ce_off + ext->ce_len) {
1469  new_ext.off = ext_in->off + ext_in->len;
1470  new_ext.len = ext_end - new_ext.off + 1;
1471  /* don't overwrite, the extent is not
1472  * supposed to exist already */
1473  rc = layer_extent_add(layer_id,
1474  &new_ext,
1475  is_write, false);
1476  if (rc)
1477  return rc;
1478  remaining_extent ++;
1479  }
1480  } m0_tl_endfor;
1481 
1482  *layer_empty = (remaining_extent == 0);
1483  return 0;
1484 }
1485 
1487 static int add_merge_read_extent(struct m0_composite_layer *layer,
1488  const struct extent *ext_in)
1489 {
1490  struct m0_uint128 subobj_id = layer->ccr_subobj;
1491  struct extent ext_merge = {.off = -1LL, .len = 0};
1492  enum ext_match_code m;
1493  int rc;
1494  ENTRY;
1495 
1496  /* load previous extents to see if we can merge them */
1497  layer_load_extent_list(layer->ccr_subobj, false, &layer->ccr_rd_exts);
1498 
1499  m = ext_match(layer->ccr_subobj, &layer->ccr_rd_exts, ext_in,
1500  &ext_merge, EMT_MERGE, false, true);
1501 
1502  switch (m) {
1503  case EM_ERROR:
1504  ERROR("Failed to match/merge extent\n");
1505  rc = -EIO;
1506  break;
1507 
1508  case EM_NONE:
1509  /* no match, add read extent to the layer */
1510  rc = layer_extent_add(subobj_id, ext_in, false, false);
1511  break;
1512 
1513  case EM_FULL:
1514  DBG("Extent included to another. Nothing to do\n");
1515  rc = 0;
1516  break;
1517 
1518  case EM_PARTIAL:
1519  DBG("Extent overlap detected: must merge\n");
1520 
1521  /* Add the extent in overwrite mode. Other merged extents
1522  * have been dropped by ext_match previously. */
1523  rc = layer_extent_add(subobj_id, &ext_merge, false, true);
1524  break;
1525  default:
1526  ERROR("ERROR: unexpected match type\n");
1527  rc = -EINVAL;
1528  }
1529  RETURN(rc);
1530 }
1531 
1532 static int m0hsm_release_maxgen(struct m0_uint128 id, uint8_t tier, int max_gen,
1533  off_t offset, size_t len,
1534  enum hsm_rls_flags flags,
1535  bool user_display);
1536 
1537 static int top_layer_add_read_extent(struct m0_obj *obj,
1538  const struct extent *ext)
1539 {
1540  struct m0_client_layout *layout = NULL;
1541  struct m0_composite_layer *layer;
1542  int rc;
1543  int gen;
1544  ENTRY;
1545 
1546  rc = obj_layout_get(obj, &layout);
1547  if (rc)
1548  RETURN(rc);
1549  M0_ASSERT(layout != NULL);
1550 
1551  layer = top_layer_get(layout);
1552  if (layer == NULL)
1553  /* TODO free layout */
1554  RETURN(-ENODATA);
1555 
1556  rc = add_merge_read_extent(layer, ext);
1557  if (rc)
1558  RETURN(rc);
1559 
1560  gen = hsm_prio2gen(layer->ccr_priority);
1561  if (gen > 1) {
1562  /* clean older versions of data in the write tier */
1563  rc = m0hsm_release_maxgen(obj->ob_entity.en_id,
1564  hsm_prio2tier(layer->ccr_priority),
1565  gen -1, ext->off, ext->len, 0, false);
1566  }
1567  RETURN(rc);
1568 }
1569 
1570 
1571 int m0hsm_create(struct m0_uint128 id, struct m0_obj *obj,
1572  uint8_t tier_idx, bool keep_open)
1573 {
1574  /* create a composite layout that will be assigned to the object */
1575  int rc;
1576  struct m0_client_layout *layout;
1577  struct m0_obj subobj = {};
1578  ENTRY;
1579 
1580  if (is_hsm_reserved(id))
1581  RETURN(-EINVAL);
1582 
1583  /* Create a sub-object that will be the first layer.
1584  * This initial subobj is generation 0 */
1585  rc = create_obj(hsm_subobj_id(id, 0, tier_idx), &subobj, false, tier_idx);
1586  if (rc != 0)
1587  RETURN(rc);
1588 
1589  /* allocate composite layout */
1590  layout = m0_client_layout_alloc(M0_LT_COMPOSITE);
1591  if (layout == NULL) {
1592  rc = M0_ERR(-ENOMEM);
1593  goto err;
1594  }
1595 
1596  /* make the subobject a single-level layout */
1597  rc = m0_composite_layer_add(layout, &subobj, hsm_prio(0, tier_idx));
1598  if (rc != 0) {
1599  rc = M0_ERR(rc);
1600  goto err;
1601  }
1602 
1603  /* create an extent to enable write operations anywhere in this subobject */
1604  rc = layer_extent_add(subobj.ob_entity.en_id, &EXT_FULLRANGE, true,
1605  false);
1606  err:
1607  if (rc != 0 && layout != NULL) {
1608  m0_composite_layer_del(layout, hsm_subobj_id(id, 0, tier_idx));
1609  m0_client_layout_free(layout);
1610  }
1611 
1612  m0_entity_fini(&subobj.ob_entity);
1613 
1614  /* then create the main object */
1615  if (0) {
1616  rc = create_obj_with_layout(id, obj, layout, false);
1617  if (rc)
1618  RETURN(rc);
1619  } else if (rc == 0) {
1620  rc = create_obj(id, obj, false, HSM_ANY_TIER);
1621  if (rc)
1622  RETURN(rc);
1623 
1624  rc = obj_layout_set(obj, layout);
1625  if (rc)
1626  RETURN(rc);
1627  }
1628  if (rc == 0 && !keep_open)
1629  m0_entity_fini(&obj->ob_entity);
1630 
1631  if (rc == 0)
1632  INFO("Composite object successfully created with "
1633  "id=%#" PRIx64 ":%#" PRIx64 "\n", id.u_hi, id.u_lo);
1634 
1635  RETURN(rc);
1636 }
1637 
1639 struct io_ctx {
1640  int curr_blocks;
1641  size_t curr_bsize;
1642  struct m0_indexvec ext;
1643  struct m0_bufvec data;
1644  struct m0_bufvec attr;
1645 };
1646 
1648 static int do_io_op(struct m0_obj *obj, enum m0_obj_opcode opcode,
1649  struct io_ctx *ctx)
1650 {
1651  struct m0_op *ops[1] = {NULL};
1652  int rc;
1653  ENTRY;
1654 
1655  /* Create the I/O request */
1656  m0_obj_op(obj, opcode, &ctx->ext, &ctx->data, &ctx->attr,
1657  0, 0, &ops[0]);
1658 
1659  /* Launch the I/O request */
1660  m0_op_launch(ops, 1);
1661 
1662  /* wait for completion */
1663  rc = m0_op_wait(ops[0], M0_BITS(M0_OS_FAILED,
1664  M0_OS_STABLE),
1665  M0_TIME_NEVER) ?: m0_rc(ops[0]);
1666 
1667  /* finalize and release */
1668  m0_op_fini(ops[0]);
1669  m0_op_free(ops[0]);
1670 
1671  RETURN(rc);
1672 }
1673 
1675 static int write_blocks(struct m0_obj *obj, struct io_ctx *ctx)
1676 {
1677  return do_io_op(obj, M0_OC_WRITE, ctx);
1678 }
1679 
1681 static int read_blocks(struct m0_obj *obj, struct io_ctx *ctx)
1682 {
1683  return do_io_op(obj, M0_OC_READ, ctx);
1684 }
1685 
1687 static void gen_data(struct m0_bufvec *data, size_t bsize, int seed)
1688 {
1689  int i;
1690 
1691  for (i = 0; i < data->ov_vec.v_nr; i++)
1692  memset(data->ov_buf[i], seed, bsize);
1693 }
1694 
1695 static void dump_data(struct m0_bufvec *data, size_t bsize)
1696 {
1697  int i, j;
1698 
1699  for (i = 0; i < data->ov_vec.v_nr; i++)
1700  for (j = 0; j < bsize; j++)
1701  putchar(((char *)data->ov_buf[i])[j]);
1702 }
1703 
1709 static int prepare_io_ctx(struct io_ctx *ctx, int blocks, size_t block_size,
1710  bool alloc_io_buff)
1711 {
1712  int rc;
1713 
1714  /* allocate new I/O vec? */
1715  if (blocks != ctx->curr_blocks || block_size != ctx->curr_bsize) {
1716  if (alloc_io_buff) {
1717  if (ctx->curr_blocks != 0)
1718  m0_bufvec_free(&ctx->data);
1719 
1720  /* Allocate block_count * block_size */
1721  rc = m0_bufvec_alloc(&ctx->data, blocks, block_size);
1722  if (rc != 0)
1723  return rc;
1724  } else {
1725  if (ctx->curr_blocks != 0)
1726  m0_bufvec_free2(&ctx->data);
1727 
1728  rc = m0_bufvec_empty_alloc(&ctx->data, blocks);
1729  if (rc != 0)
1730  return rc;
1731  }
1732  }
1733 
1734  /* allocate extents and attrs, if they are not already */
1735  if (blocks != ctx->curr_blocks) {
1736 
1737  /* free previous vectors */
1738  if (ctx->curr_blocks > 0) {
1739  m0_bufvec_free(&ctx->attr);
1740  m0_indexvec_free(&ctx->ext);
1741  }
1742 
1743  /* Allocate the attr and extent vectors */
1744  rc = m0_bufvec_alloc(&ctx->attr, blocks, 1);
1745  if(rc != 0)
1746  return rc;
1747 
1748  rc = m0_indexvec_alloc(&ctx->ext, blocks);
1749  if (rc != 0)
1750  return rc;
1751  }
1752 
1753  ctx->curr_blocks = blocks;
1754  ctx->curr_bsize = block_size;
1755  return 0;
1756 }
1757 
1758 static inline int check_vec(struct m0_bufvec *vec, int blocks)
1759 {
1760  /* arrays not allocated? */
1761  if (!vec->ov_vec.v_count || !vec->ov_buf)
1762  return -EFAULT;
1763 
1764  /* invalid slot count? */
1765  if (vec->ov_vec.v_nr != blocks)
1766  return -EINVAL;
1767 
1768  return 0;
1769 }
1770 
1775 static int map_io_ctx(struct io_ctx *ctx, int blocks, size_t b_size,
1776  off_t offset, char *buff)
1777 {
1778  int i;
1779 
1780  if (!ctx || blocks == 0)
1781  return -EINVAL;
1782 
1783  M0_ASSERT(check_vec(&ctx->data, blocks) == 0);
1784  M0_ASSERT(check_vec(&ctx->attr, blocks) == 0);
1785 
1786  for (i = 0; i < blocks; i++) {
1787  ctx->ext.iv_index[i] = offset + i * b_size;
1788  ctx->ext.iv_vec.v_count[i] = b_size;
1789 
1790  VERB("IO block: offset=%#" PRIx64 ", length=%#" PRIx64 "\n",
1791  ctx->ext.iv_index[i], ctx->ext.iv_vec.v_count[i]);
1792 
1793  /* we don't want any attributes */
1794  ctx->attr.ov_vec.v_count[i] = 0;
1795 
1796  if (ctx->data.ov_vec.v_count[i] == 0)
1797  ctx->data.ov_vec.v_count[i] = b_size;
1798  /* check the allocated buffer has the right size */
1799  else if (ctx->data.ov_vec.v_count[i] != b_size)
1800  return -EINVAL;
1801 
1802  /* map the user-provided I/O buffer */
1803  if (buff != NULL)
1804  ctx->data.ov_buf[i] = buff + i * b_size;
1805  }
1806 
1807  return 0;
1808 }
1809 
1811 static void free_io_ctx(struct io_ctx *ctx, bool alloc_io_buff)
1812 {
1813  if (alloc_io_buff)
1814  m0_bufvec_free(&ctx->data);
1815  else
1816  m0_bufvec_free2(&ctx->data);
1817  m0_bufvec_free(&ctx->attr);
1818  m0_indexvec_free(&ctx->ext);
1819 }
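/*
 * Editorial note: the typical calling sequence for the I/O helpers above,
 * as used by m0hsm_test_write()/m0hsm_pwrite() below, is:
 *
 *   prepare_io_ctx(&ctx, blocks, io_size, true);   (allocate the vectors)
 *   map_io_ctx(&ctx, blocks, io_size, off, NULL);  (describe the extents)
 *   write_blocks(&obj, &ctx);                      (or read_blocks())
 *   free_io_ctx(&ctx, true);                       (release the vectors)
 */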
1820 
1821 static uint64_t roundup_power2(uint64_t x)
1822 {
1823  uint64_t power = 1;
1824 
1825  while (power < x)
1826  power *= 2;
1827 
1828  return power;
1829 }
1830 
1834 uint64_t get_optimal_bs(struct m0_obj *obj, uint64_t obj_sz)
1835 {
1836  unsigned long usz; /* unit size */
1837  unsigned long gsz; /* data units in parity group */
1838  uint64_t max_bs;
1839  unsigned lid;
1840  struct m0_pool_version *pver;
1841  struct m0_pdclust_attr *pa;
1842 
1843  if (obj_sz > MAX_M0_BUFSZ)
1844  obj_sz = MAX_M0_BUFSZ;
1845 
1846  lid = obj->ob_attr.oa_layout_id;
1847  pver = m0_pool_version_find(&m0_instance->m0c_pools_common,
1848  &obj->ob_attr.oa_pver);
1849  if (pver == NULL) {
1850  ERROR("Cannot find the object's pool version\n");
1851  return 0;
1852  }
1853  usz = m0_obj_layout_id_to_unit_size(lid);
1854  pa = &pver->pv_attr;
1855  gsz = usz * pa->pa_N;
1856  /* max 2-times pool-width deep, otherwise we may get -E2BIG */
1857  max_bs = usz * 2 * pa->pa_P * pa->pa_N / (pa->pa_N + pa->pa_K + pa->pa_S);
1858 
1859  VERB("usz=%lu pool="FID_F" (N,K,S,P)=(%u,%u,%u,%u) max_bs=%" PRId64 "\n", usz,
1860  FID_P(&pver->pv_pool->po_id), pa->pa_N, pa->pa_K, pa->pa_S, pa->pa_P, max_bs);
1861 
1862  if (obj_sz >= max_bs)
1863  return max_bs;
1864  else if (obj_sz <= gsz)
1865  return gsz;
1866  else
1867  return roundup_power2(obj_sz);
1868 }
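/*
 * Worked example (editorial note, hypothetical pool): with a 1 MiB unit size
 * and (N,K,S,P) = (4,2,0,10), gsz = 4 MiB and max_bs = 1 MiB * 2 * 10 * 4 /
 * (4 + 2 + 0) ~= 13.3 MiB; a 3 MiB object is raised to gsz (4 MiB), a 5 MiB
 * object is rounded up to 8 MiB, and a 100 MiB object uses max_bs.
 */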
1869 
1875 int m0hsm_test_write(struct m0_uint128 id, off_t offset, size_t len, int seed)
1876 {
1877  int rc;
1878  size_t block_size;
1879  size_t rest = len;
1880  off_t off = offset;
1881  struct m0_composite_layer *layer;
1882  struct m0_client_layout *layout = NULL;
1883  struct extent wext = {.off = off, .len = len};
1884  struct m0_obj obj = {}, subobj = {};
1885  struct io_ctx ctx = {};
1886  ENTRY;
1887 
1888  /* Set the object entity we want to write */
1889  m0_obj_init(&obj, m0_uber_realm, &id, m0_client_layout_id(m0_instance));
1890  rc = open_entity(&obj.ob_entity);
1891  if (rc)
1892  RETURN(rc);
1893 
1894  rc = obj_layout_get(&obj, &layout);
1895  if (rc) {
1896  ERROR("Could not get object's layout: rc=%d\n", rc);
1897  return rc;
1898  }
1899 
1900  layer = top_layer_get(layout);
1901  if (layer == NULL) {
1902  ERROR("Could not get the object's top layer\n");
1903  rc = -EINVAL;
1904  goto fini;
1905  }
1906 
1907  m0_obj_init(&subobj, m0_uber_realm, &layer->ccr_subobj,
1908  m0_client_layout_id(m0_instance));
1909  rc = open_entity(&subobj.ob_entity);
1910  if (rc)
1911  RETURN(rc);
1912 
1913  block_size = get_optimal_bs(&subobj, rest);
1914  if (block_size == 0) {
1915  ERROR("Could not get the optimal block size for the object\n");
1916  rc = -EINVAL;
1917  goto fini;
1918  }
1919  m0_entity_fini(&subobj.ob_entity);
1920 
1921  VERB("Using I/O block size of %zu bytes\n", block_size);
1922 
1923  for (; rest > 0; rest -= block_size, off += block_size) {
1924  if (rest < block_size)
1925  /* non full block */
1926  block_size = rest;
1927 
1928  rc = prepare_io_ctx(&ctx, 1, block_size, true);
1929  if (rc) {
1930  ERROR("prepare_io_ctx() failed: rc=%d\n", rc);
1931  goto fini;
1932  }
1933 
1934  rc = map_io_ctx(&ctx, 1, block_size, off, NULL);
1935  if (rc) {
1936  ERROR("map_io_ctx() failed: rc=%d\n", rc);
1937  break;
1938  }
1939 
1940  /* fill buffers with generated data */
1941  gen_data(&ctx.data, block_size, seed);
1942 
1943  /* write them */
1944  rc = write_blocks(&obj, &ctx);
1945  if (rc) {
1946  ERROR("write_blocks() failed: rc=%d\n", rc);
1947  break;
1948  }
1949  }
1950 
1951  /* Free bufvec's and indexvec's */
1952  free_io_ctx(&ctx, true);
1953 
1954  /* Data is always written to the top layer.
1955  * Add corresponding read extent */
1956  rc = rc ?: top_layer_add_read_extent(&obj, &wext);
1957  fini:
1958  m0_entity_fini(&obj.ob_entity);
1959 
1960  if (rc == 0)
1961  INFO("%zu bytes successfully written at offset %#" PRIx64 " "
1962  "(object id=%#" PRIx64 ":%#" PRIx64 ")\n", len, offset,
1963  id.u_hi, id.u_lo);
1964 
1965  RETURN(rc);
1966 }
1967 
1968 int m0hsm_pwrite(struct m0_obj *obj, void *buffer, size_t length, off_t offset)
1969 {
1970  int blocks;
1971  int rc = 0;
1972  size_t io_size;
1973  ssize_t rest = length;
1974  off_t off = offset;
1975  char *buf = buffer;
1976  char *pad_buf = NULL;
1977  struct extent wext;
1978  struct io_ctx ctx = {};
1979  ENTRY;
1980 
1981  wext.off = offset;
1982  wext.len = length;
1983 
1986  M0_ASSERT(io_size > 0);
1987  VERB("using io_size of %zu bytes\n", io_size);
1988 
1989  while (rest > 0) {
1990  /* count remaining blocks */
1991  blocks = rest / io_size;
1992  if (blocks == 0) {
1993  /* last non full block */
1994  blocks = 1;
1995  /* prepare a padded block */
1996  INFO("Padding last block of unaligned size %zd up to "
1997  "%zu\n", rest, io_size);
1998  pad_buf = calloc(1, io_size);
1999  if (!pad_buf) {
2000  rc = -ENOMEM;
2001  break;
2002  }
2003  memcpy(pad_buf, buf, rest);
2004  buf = pad_buf;
2005  wext.len += io_size - rest;
2006  length += io_size - rest;
2007  }
2008  if (blocks > MAX_BLOCK_COUNT)
2009  blocks = MAX_BLOCK_COUNT;
2010 
2011  rc = prepare_io_ctx(&ctx, blocks, io_size, false) ?:
2012  map_io_ctx(&ctx, blocks, io_size, off, buf) ?:
2013  write_blocks(obj, &ctx);
2014  if (rc)
2015  break;
2016 
2017  buf += blocks * io_size;
2018  off += blocks * io_size;
2019  rest -= blocks * io_size;
2020  }
2021 
2022  if (pad_buf)
2023  free(pad_buf);
2024 
2025  /* Free bufvec's and indexvec's */
2026  free_io_ctx(&ctx, false);
2027 
2028  /* Data is always written to the top layer.
2029  * Add corresponding read extent */
2030  if (rc == 0) {
2031  rc = top_layer_add_read_extent(obj, &wext);
2032 
2033  struct m0_uint128 id = obj->ob_entity.en_id;
2034  INFO("%zu bytes successfully written at offset %#" PRIx64 " "
2035  "(object id=%#" PRIx64 ":%#" PRIx64 ")\n", length, offset,
2036  id.u_hi, id.u_lo);
2037  }
2038 
2039  RETURN(rc);
2040 }
2041 
2045 int m0hsm_test_read(struct m0_uint128 id, off_t offset, size_t len)
2046 {
2047  struct m0_obj obj;
2048  int blocks;
2049  size_t io_size;
2050  struct io_ctx ctx = {0};
2051  size_t rest;
2052  off_t start = offset;
2053  int rc;
2054  ENTRY;
2055 
2056  memset(&obj, 0, sizeof(struct m0_obj));
2057 
2058  /* Set the object entity we want to read */
2059  m0_obj_init(&obj, m0_uber_realm, &id, m0_client_layout_id(m0_instance));
2060 
2063  M0_ASSERT(io_size > 0);
2064  VERB("using io_size of %zu bytes\n", io_size);
2065 
2066  rc = open_entity(&obj.ob_entity);
2067  if (rc)
2068  RETURN(rc);
2069 
2070  for (rest = len; rest > 0; rest -= blocks * io_size) {
2071  /* count remaining blocks */
2072  blocks = rest / io_size;
2073  if (blocks == 0) {
2074  /* last non full block */
2075  blocks = 1;
2076  io_size = rest;
2077  }
2078  if (blocks > MAX_BLOCK_COUNT)
2079  blocks = MAX_BLOCK_COUNT;
2080 
2081  rc = prepare_io_ctx(&ctx, blocks, io_size, true) ?:
2082  map_io_ctx(&ctx, blocks, io_size, start, NULL) ?:
2083  read_blocks(&obj, &ctx);
2084  if (rc)
2085  break;
2086 
2087  /* dump them to stdout */
2088  dump_data(&ctx.data, io_size);
2089 
2090  start += blocks * io_size;
2091  }
2092 
2093  m0_entity_fini(&obj.ob_entity);
2094 
2095  /* Free bufvec's and indexvec's */
2096  free_io_ctx(&ctx, true);
2097 
2098  if (rc == 0)
2099  INFO("%zu bytes successfully read at offset %#" PRIx64 " "
2100  "(object id=%#" PRIx64 ":%#" PRIx64 ")\n", len, offset,
2101  id.u_hi, id.u_lo);
2102 
2103  RETURN(rc);
2104 }
2105 
2108  struct m0_uint128 tgt_id,
2109  const struct extent *range)
2110 {
2111  struct m0_obj src_obj = {};
2112  struct m0_obj tgt_obj = {};
2113  size_t block_size;
2114  struct io_ctx ctx = {};
2115  size_t rest = range->len;
2116  off_t start = range->off;
2117  int rc;
2118  ENTRY;
2119 
2120  m0_obj_init(&src_obj, m0_uber_realm, &src_id,
2121  m0_client_layout_id(m0_instance));
2122  m0_obj_init(&tgt_obj, m0_uber_realm, &tgt_id,
2123  m0_client_layout_id(m0_instance));
2124 
2125  /* open the entities */
2126  rc = open_entity(&src_obj.ob_entity);
2127  if (rc)
2128  RETURN(rc);
2129  rc = open_entity(&tgt_obj.ob_entity);
2130  if (rc)
2131  goto out_close_src;
2132 
2133  block_size = get_optimal_bs(&tgt_obj, rest);
2134  if (block_size == 0) {
2135  ERROR("Could not get the optimal block size for the object\n");
2136  rc = -EINVAL;
2137  goto fini;
2138  }
2139 
2140  VERB("Using I/O block size of %zu bytes\n", block_size);
2141 
2142  for (; rest > 0; rest -= block_size, start += block_size) {
2143  if (rest < block_size)
2144  /* non full block */
2145  block_size = rest;
2146 
2147  rc = prepare_io_ctx(&ctx, 1, block_size, true);
2148  if (rc) {
2149  ERROR("prepare_io_ctx() failed: rc=%d\n", rc);
2150  goto fini;
2151  }
2152 
2153  rc = map_io_ctx(&ctx, 1, block_size, start, NULL);
2154  if (rc) {
2155  ERROR("map_io_ctx() failed: rc=%d\n", rc);
2156  break;
2157  }
2158 
2159  /* read blocks */
2160  rc = read_blocks(&src_obj, &ctx);
2161  if (rc) {
2162  ERROR("read_blocks() failed: rc=%d\n", rc);
2163  break;
2164  }
2165 
2166  /* now write data to the target object */
2167  rc = write_blocks(&tgt_obj, &ctx);
2168  if (rc) {
2169  ERROR("write_blocks() failed: rc=%d\n", rc);
2170  break;
2171  }
2172  }
2173 
2174  /* Free bufvec's and indexvec's */
2175  free_io_ctx(&ctx, true);
2176  fini:
2177  m0_entity_fini(&tgt_obj.ob_entity);
2178  out_close_src:
2179  m0_entity_fini(&src_obj.ob_entity);
2180 
2181  if (rc == 0)
2182  INFO("%zu bytes successfully copied from subobj "
2183  "<%#" PRIx64 ":%#" PRIx64 "> to <%#" PRIx64 ":%#" PRIx64 ">"
2184  " at offset %#" PRIx64 "\n", range->len,
2185  src_id.u_hi, src_id.u_lo, tgt_id.u_hi, tgt_id.u_lo,
2186  range->off);
2187 
2188  RETURN(rc);
2189 }
2190 
2191 
2192 static int check_top_layer_writable(struct m0_client_layout *layout,
2193  int max_prio, int tier)
2194 {
2195  struct m0_composite_layer *layer;
2196 
2197  layer = top_layer_get(layout);
2198  if (!layer)
2199  return -ENOENT;
2200 
2201  if (layer->ccr_priority >= max_prio)
2202  return -ENOENT;
2203 
2204  if (tier != HSM_ANY_TIER && hsm_prio2tier(layer->ccr_priority) != tier)
2205  return -ENOENT;
2206 
2207  return 0;
2208 }
2209 
2218 typedef int (*match_layer_cb_t)(void *cb_arg,
2219  struct m0_client_layout *layout,
2220  struct m0_composite_layer *layer,
2221  const struct extent *match,
2222  bool *stop);
2223 
2228 static int match_layer_foreach(struct m0_client_layout *layout, uint8_t tier,
2229  const struct extent *ext,
2230  match_layer_cb_t cb, void *cb_arg,
2231  bool stop_on_error)
2232 {
2233  struct m0_client_composite_layout *clayout;
2234  struct m0_composite_layer *layer;
2235  struct extent match;
2236  uint8_t ltier;
2237  int rc;
2238  bool stop = false;
2239 
2240  clayout = M0_AMB(clayout, layout, ccl_layout);
2241  M0_ASSERT(clayout != NULL);
2242 
2243  m0_tl_for(clayer, &clayout->ccl_layers, layer) {
2244  /* If tier index matches, load its extents */
2245  ltier = hsm_prio2tier(layer->ccr_priority);
2246  if (tier != HSM_ANY_TIER && ltier != tier)
2247  continue;
2248 
2249  /* load readable extents for this layer */
2250  layer_load_extent_list(layer->ccr_subobj, false,
2251  &layer->ccr_rd_exts);
2252 
2253  enum ext_match_code m;
2254  struct extent ext_curr = *ext;
2255  /* multiple extents may match, so loop on them */
2256  do {
2257  memset(&match, 0, sizeof(match));
2258  m = ext_match(layer->ccr_subobj, &layer->ccr_rd_exts,
2259  &ext_curr, &match,
2260  EMT_INTERSECT, false, false);
2261  if (m == EM_NONE)
2262  continue;
2263 
2264  VERB("Found layer %s matching extent "
2265  "[%#" PRIx64 "-%#" PRIx64 "]: gen=%u, tier=%u, "
2266  "matching_region [%#" PRIx64 "-%#" PRIx64 "]\n",
2267  m == EM_FULL ? "fully" : "partially",
2268  ext->off, ext->off + ext->len - 1,
2269  hsm_prio2gen(layer->ccr_priority), ltier,
2270  match.off, match.off + match.len - 1);
2271 
2272  rc = cb(cb_arg, layout, layer, &match, &stop);
2273  if (rc && stop_on_error)
2274  return rc;
2275 
2276  /* look for next extents in the layer */
2277  if (m == EM_PARTIAL) {
2278  ext_curr.off = match.off + match.len;
2279  ext_curr.len -= match.len;
2280  }
2281 
2282  } while (!stop && m == EM_PARTIAL);
2283  if (stop)
2284  break;
2285 
2286  } m0_tl_endfor;
2287 
2288  /* none found, or 'stop' set to true */
2289  return 0;
2290 }
2291 
2293 static struct m0_composite_layer *
2294  layer_get_by_prio(struct m0_client_layout *layout, int prio)
2295 {
2296  struct m0_client_composite_layout *clayout;
2297  struct m0_composite_layer *layer;
2298 
2299  clayout = M0_AMB(clayout, layout, ccl_layout);
2300  M0_ASSERT(clayout != NULL);
2301 
2302  m0_tl_for(clayer, &clayout->ccl_layers, layer) {
2303  if (layer->ccr_priority == prio)
2304  return layer;
2305  } m0_tl_endfor;
2306 
2307  /* none found */
2308  return NULL;
2309 }
2310 
2311 
2312 struct min_gen_check_arg {
2313  int min_gen;
2314  struct m0_uint128 except_subobj;
2315  const struct extent *orig_extent;
2316  bool found;
2317 };
2318 
2319 static int min_gen_check_cb(void *cb_arg,
2320  struct m0_client_layout *layout,
2321  struct m0_composite_layer *layer,
2322  const struct extent *match,
2323  bool *stop)
2324 {
2325  struct min_gen_check_arg *arg = cb_arg;
2326  struct extent dummy;
2327 
2328  if (m0_uint128_cmp(&layer->ccr_subobj, &arg->except_subobj) == 0) {
2329  DBG("%s: skip self subobj %#" PRIx64 ":%#" PRIx64 "\n", __func__,
2330  arg->except_subobj.u_hi, arg->except_subobj.u_lo);
2331  /* skip this subobj */
2332  return 0;
2333  }
2334 
2335  if (hsm_prio2gen(layer->ccr_priority) < arg->min_gen) {
2336  DBG("%s: skip layer of lower generation %u\n", __func__,
2337  hsm_prio2gen(layer->ccr_priority));
2338  /* lower generation, skip */
2339  return 0;
2340  }
2341 
2342  /* if the extent fully covers it: OK and stop */
2343  if (ext_match(layer->ccr_subobj, &layer->ccr_rd_exts,
2344  arg->orig_extent, &dummy, EMT_INTERSECT,
2345  false, false) == EM_FULL) {
2346  arg->found = true;
2347  *stop = true;
2348  return 0;
2349  }
2350 
2351  /* partial match */
2352  DBG("%s: partial match\n", __func__);
2353  return 0;
2354 }
2355 
2358 static int check_min_gen_exists(struct m0_client_layout *layout,
2359  const struct extent *ext, int gen,
2360  struct m0_composite_layer *except_layer)
2361 {
2362  struct min_gen_check_arg cb_arg = {
2363  .min_gen = gen,
2364  .except_subobj = except_layer->ccr_subobj,
2365  .orig_extent = ext,
2366  .found = false,
2367  };
2368  int rc;
2369 
2370  rc = match_layer_foreach(layout, HSM_ANY_TIER, ext, min_gen_check_cb,
2371  &cb_arg, false);
2372  if (rc != 0 || !cb_arg.found)
2373  {
2374  ERROR("Found no extent matching [%#"PRIx64
2375  "-%#" PRIx64 "] with generation >= %d: "
2376  "can't release it from tier %u\n",
2377  ext->off, ext->off + ext->len - 1, gen,
2378  hsm_prio2tier(except_layer->ccr_priority));
2379  return -EPERM;
2380  }
2381  return 0;
2382 }
2383 
2384 struct release_cb_ctx {
2385  int found;
2386  struct m0_uint128 obj_id;
2387  uint8_t tier;
2388  int max_gen; /* max generation to release */
2389  int max_tier; /* max tier to release from */
2390 };
2391 
2392 static int release_cb(void *cb_arg, struct m0_client_layout *layout,
2393  struct m0_composite_layer *layer,
2394  const struct extent *match,
2395  bool *stop)
2396 {
2397  struct release_cb_ctx *ctx = cb_arg;
2398  bool layer_empty = false;
2399  int gen;
2400  int rc;
2401  int tier;
2402  ENTRY;
2403 
2404  gen = hsm_prio2gen(layer->ccr_priority);
2405  if (ctx->max_gen != -1) {
2406  /* over max gen, skip it */
2407  if (gen > ctx->max_gen)
2408  return 0;
2409  }
2410  tier = hsm_prio2tier(layer->ccr_priority);
2411  if (ctx->max_tier != -1) {
2412  /* over max tier, skip it */
2413  if (tier > ctx->max_tier)
2414  return 0;
2415  }
2416 
2417  ctx->found++;
2418 
2419  /* XXX The coverage may be provided by multiple extents
2420  * in multiple layers */
2421  /* An extent with an equal or higher generation must exist */
2422  rc = check_min_gen_exists(layout, match, gen, layer);
2423  if (rc)
2424  RETURN(rc);
2425 
2426  /* Check there is a writable layer different from current */
2427  rc = check_top_layer_writable(layout, layer->ccr_priority,
2428  HSM_ANY_TIER);
2429  if (rc == -ENOENT) {
2430  /* If not, create one */
2431  rc = layout_add_top_layer(ctx->obj_id, layout, ctx->tier);
2432  if (rc)
2433  RETURN(rc);
2434  }
2435 
2436  /* Subtract the matching extent from the extent it matches */
2437  rc = ext_subtract(layer->ccr_subobj, &layer->ccr_rd_exts, match,
2438  false, &layer_empty);
2439  if (rc)
2440  RETURN(rc);
2441  INFO("Extent [%#" PRIx64 "-%#" PRIx64 "] (gen %d) successfully released "
2442  "from tier %d\n", match->off, match->off + match->len - 1,
2443  gen, hsm_prio2tier(layer->ccr_priority));
2444 
2445  /* Drop the layer if it no longer has readable extents */
2446  if (layer_empty) {
2447  VERB("No remaining extent in layer: deleting "
2448  "subobject <%" PRIx64 ":%" PRIx64 ">\n",
2449  layer->ccr_subobj.u_hi, layer->ccr_subobj.u_lo);
2450 
2451  /* The list of extents has not been updated yet => no_check */
2452  rc = layer_clean(ctx->obj_id, layout, layer);
2453  if (rc)
2454  RETURN(rc);
2455  }
2456 
2458  RETURN(rc);
2459 }
2460 
2462 static int m0hsm_release_maxgen(struct m0_uint128 id, uint8_t tier, int max_gen,
2463  off_t offset, size_t len,
2464  enum hsm_rls_flags flags,
2465  bool user_display)
2466 {
2467  struct m0_client_layout *layout = NULL;
2468  struct release_cb_ctx ctx = {0};
2469  struct extent ext;
2470  int rc;
2471  ENTRY;
2472 
2473  /* get the layout once up front; it will be reused in the next steps */
2474  rc = layout_get(id, &layout);
2475  if (rc)
2476  RETURN(rc);
2477  M0_ASSERT(layout != NULL);
2478 
2479  ext.off = offset;
2480  ext.len = len;
2481 
2482  ctx.obj_id = id;
2483  ctx.tier = tier;
2484  ctx.max_gen = max_gen;
2485  ctx.max_tier = tier;
2486 
2487  rc = match_layer_foreach(layout, tier, &ext, release_cb, &ctx, false);
2488  if (rc == 0 && ctx.found == 0 && user_display)
2489  ERROR("No matching extent found\n");
2490  RETURN(rc);
2491 }
2492 
2493 int m0hsm_release(struct m0_uint128 id, uint8_t tier,
2494  off_t offset, size_t len, enum hsm_rls_flags flags)
2495 {
2496  return m0hsm_release_maxgen(id, tier, -1, offset, len, flags, true);
2497 }
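
/* Illustrative usage (not part of the original source; the object id and
 * tier index below are hypothetical): release the first 4 MiB of an object
 * from tier 1. A copy of that range is expected to exist in another tier
 * with an equal or higher generation, otherwise the extent is not released.
 *
 *   struct m0_uint128 id = { .u_hi = 0x1000000, .u_lo = 0x42 };
 *   int rc = m0hsm_release(id, 1, 0, 4 * 1024 * 1024, 0);
 *   if (rc != 0)
 *           fprintf(stderr, "m0hsm_release() failed: rc=%d\n", rc);
 */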
2498 
2499 int m0hsm_multi_release(struct m0_uint128 id, uint8_t max_tier,
2500  off_t offset, size_t length, enum hsm_rls_flags flags)
2501 {
2502  struct m0_client_layout *layout = NULL;
2503  struct release_cb_ctx ctx = {0};
2504  struct extent ext;
2505  int rc;
2506  ENTRY;
2507 
2508  /* get the layout once up front; it will be reused in the next steps */
2509  rc = layout_get(id, &layout);
2510  if (rc)
2511  RETURN(rc);
2512  M0_ASSERT(layout != NULL);
2513 
2514  ext.off = offset;
2515  ext.len = length;
2516 
2517  ctx.obj_id = id;
2518  ctx.tier = HSM_ANY_TIER;
2519  ctx.max_gen = -1;
2520  ctx.max_tier = max_tier;
2521 
2522  rc = match_layer_foreach(layout, HSM_ANY_TIER, &ext, release_cb, &ctx,
2523  false);
2524  if (rc == 0 && ctx.found == 0)
2525  ERROR("No matching extent found\n");
2526  RETURN(rc);
2527 }
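
/* Illustrative usage (not part of the original source; values are
 * hypothetical): release the same 4 MiB range from every tier whose index
 * is <= 2, regardless of which tier currently holds the data. 'id' is a
 * composite object created with m0hsm_create().
 *
 *   rc = m0hsm_multi_release(id, 2, 0, 4 * 1024 * 1024, 0);
 */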
2528 
2529 struct copy_cb_ctx {
2530  int found;
2531  struct m0_uint128 obj_id;
2532  uint8_t src_tier;
2533  uint8_t tgt_tier;
2534  enum hsm_cp_flags flags;
2535 };
2536 
2540 static int copy_cb(void *cb_arg, struct m0_client_layout *layout,
2541  struct m0_composite_layer *src_layer,
2542  const struct extent *match,
2543  bool *stop)
2544 {
2545  struct m0_composite_layer *tgt_layer;
2546  struct m0_obj subobj = {};
2547  struct m0_uint128 subobj_id;
2548  int tgt_prio, w_prio;
2549  uint8_t w_tier;
2550  int gen;
2551  int rc;
2552  struct copy_cb_ctx *ctx = cb_arg;
2553  ENTRY;
2554 
2555  ctx->found++;
2556 
2557  /* compute the generation of both source and target */
2558  gen = hsm_prio2gen(src_layer->ccr_priority);
2559  tgt_prio = hsm_prio(gen, ctx->tgt_tier);
2560  /* Highest priority of src and tgt (the lower the value, the higher the priority) */
2561  w_prio = MIN(src_layer->ccr_priority, tgt_prio);
2562 
2563  INFO("%s extent [%#" PRIx64 "-%#" PRIx64 "] (gen %u) from "
2564  "tier %u to tier %u\n",
2565  ctx->src_tier > ctx->tgt_tier ? "Staging" : "Archiving",
2566  match->off, match->off + match->len - 1,
2567  gen, ctx->src_tier, ctx->tgt_tier);
2568 
2569  /* Make sure there is a writable extent with a higher generation
2570  * (modifications of both the source and target copies must be prevented). */
2571  if (ctx->flags & HSM_WRITE_TO_DEST) {
2572  /* write must now be directed to the target tier */
2573  w_tier = ctx->tgt_tier;
2574  rc = check_top_layer_writable(layout, w_prio, w_tier);
2575  } else {
2576  /* write must still be directed to the original tier */
2577  rc = check_top_layer_writable(layout, w_prio, HSM_ANY_TIER);
2578  /* write to source tier if a new layer is to be created */
2579  w_tier = ctx->src_tier;
2580  }
2581 
2582  if (rc == -ENOENT) {
2583  /* If not, create one */
2584  rc = layout_add_top_layer(ctx->obj_id, layout, w_tier);
2585  if (rc)
2586  RETURN(rc);
2587  }
2588 
2589  /* 2: Check if the target subobject already exists */
2590  tgt_layer = layer_get_by_prio(layout, tgt_prio);
2591  if (!tgt_layer) {
2592  /* if not, create it */
2593  subobj_id = hsm_subobj_id(ctx->obj_id, gen, ctx->tgt_tier);
2594 
2595  /* create a sub-object that will be the first layer */
2596  rc = create_obj(subobj_id, &subobj, false, ctx->tgt_tier);
2597  if (rc)
2598  RETURN(rc);
2599  } else {
2600  struct extent already_ext;
2601 
2602  DBG("Target layer already exists\n");
2603  subobj_id = tgt_layer->ccr_subobj;
2604 
2605  /* check if the given segment is already present */
2606  layer_load_extent_list(tgt_layer->ccr_subobj, false,
2607  &tgt_layer->ccr_rd_exts);
2608 
2609  switch (ext_match(tgt_layer->ccr_subobj,
2610  &tgt_layer->ccr_rd_exts, match, &already_ext,
2611  EMT_INTERSECT, false, false))
2612  {
2613  case EM_ERROR:
2614  ERROR("Extent matching error\n");
2615  break;
2616  case EM_NONE:
2617  VERB("No previous copy of this extent\n");
2618  break;
2619  case EM_PARTIAL:
2620  VERB("Extent already been partially copied to target tier\n");
2621  /* TODO only copy missing parts */
2622  break;
2623  case EM_FULL:
2624  /* TODO implement 'force' copy? */
2625  VERB("Extent has already been "
2626  "copied to target tier: nothing to do\n");
2627  goto release;
2628  }
2629  }
2630 
2631  /* 3: copy data (from subobject to subobject) */
2632  rc = copy_extent_data(src_layer->ccr_subobj, subobj_id, match);
2633  if (rc) {
2634  ERROR("copy_extent_data() failed: rc=%d\n", rc);
2635  goto fini;
2636  }
2637 
2638  /* Add the target extent (readable) */
2639  if (tgt_layer == NULL) {
2640  /* no previous extent */
2641  rc = layer_extent_add(subobj_id, match, false, false);
2642  if (rc) {
2643  ERROR("layer_extent_add() failed: rc=%d\n", rc);
2644  goto fini;
2645  }
2646 
2647  /* Finally, add the subobject to the composite layout (if it was new).
2648  * XXX Is there a need to do this before the copy? perhaps to archive
2649  * multiple extents in parallel. */
2650  DBG("Adding subobj <%" PRIx64 ":%" PRIx64 "> as layer with prio %#x\n",
2651  subobj_id.u_hi, subobj_id.u_lo, tgt_prio);
2652  m0_composite_layer_add(layout, &subobj, tgt_prio);
2653 
2654  rc = layout_set(ctx->obj_id, layout);
2655  if (rc) {
2656  ERROR("layout_set() failed: rc=%d\n", rc);
2657  goto fini;
2658  }
2659  } else {
2660  /* else: merge with previous extents */
2661  rc = add_merge_read_extent(tgt_layer, match);
2662  if (rc) {
2663  ERROR("add_merge_read_extent() failed: rc=%d\n", rc);
2664  goto fini;
2665  }
2666  }
2667 
2668  release:
2669  /* 4: release source extent, if requested */
2670  if (ctx->flags & HSM_MOVE) {
2671  struct release_cb_ctx rls_ctx = {0};
2672  bool dummy;
2673 
2674  rls_ctx.obj_id = ctx->obj_id;
2675  rls_ctx.tier = ctx->src_tier;
2676  /* the max generation to release is this layer's generation */
2677  rls_ctx.max_gen = gen;
2678  rls_ctx.max_tier = ctx->src_tier;
2679 
2680  VERB("Releasing extent [%#" PRIx64 "-%#" PRIx64 "] in tier %d\n",
2681  match->off, match->off + match->len - 1,
2682  ctx->src_tier);
2683  /* Release needs to operate on an up-to-date target layer,
2684  * otherwise it would refuse to release the source extent. */
2685  if (tgt_layer != NULL)
2686  extent_list_free(&tgt_layer->ccr_rd_exts);
2687  rc = release_cb(&rls_ctx, layout, src_layer, match, &dummy);
2688  if (rc) {
2689  ERROR("release_cb() failed: rc=%d\n", rc);
2690  goto fini;
2691  }
2692  }
2693 
2694  /* Drop previous version on target tier.
2695  * If gen is 0, no previous version is supposed to exist. */
2696  if (!(ctx->flags & HSM_KEEP_OLD_VERS) && gen > 0) {
2697  VERB("Releasing extents [%#" PRIx64 "-%#" PRIx64 "] in tier %d, with gen <= %d\n",
2698  match->off, match->off + match->len - 1,
2699  ctx->tgt_tier, gen - 1);
2700  rc = m0hsm_release_maxgen(ctx->obj_id, ctx->tgt_tier,
2701  gen - 1, match->off, match->len, 0,
2702  false);
2703  if (rc) {
2704  ERROR("m0hsm_release_maxgen() failed: rc=%d\n", rc);
2705  goto fini;
2706  }
2707  }
2708  fini:
2709  if (tgt_layer == NULL)
2710  m0_entity_fini(&subobj.ob_entity);
2711 
2712  RETURN(rc);
2713 }
2714 
2715 int m0hsm_copy(struct m0_uint128 id, uint8_t src_tier, uint8_t tgt_tier,
2716  off_t offset, size_t len, enum hsm_cp_flags flags)
2717 {
2718  struct m0_client_layout *layout = NULL;
2719  struct copy_cb_ctx ctx = {0};
2720  struct extent ext;
2721  int rc;
2722 
2723  ENTRY;
2724 
2725  /* get the layout once up front; it will be reused in the next steps */
2726  rc = layout_get(id, &layout);
2727  if (rc)
2728  RETURN(rc);
2729  M0_ASSERT(layout != NULL);
2730 
2731  /* prepare context to be passed to callback functions */
2732  ctx.obj_id = id;
2733  ctx.src_tier = src_tier;
2734  ctx.tgt_tier = tgt_tier;
2735  ctx.flags = flags;
2736 
2737  /* Check there is matching data in the source tier and get
2738  * the corresponding layer */
2739  ext.off = offset;
2740  ext.len = len;
2741 
2742  rc = match_layer_foreach(layout, src_tier, &ext, copy_cb, &ctx, false);
2743  if (rc == 0 && ctx.found == 0)
2744  ERROR("No matching extent found\n");
2745  RETURN(rc);
2746 }
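
/* Illustrative usage (not part of the original source; tier indices are
 * hypothetical): move the first 1 MiB of an object from tier 1 to tier 2
 * and drop older copies of that range on the target tier. HSM_MOVE releases
 * the source extent after the copy; HSM_KEEP_OLD_VERS would instead keep
 * previous generations on the target tier.
 *
 *   rc = m0hsm_copy(id, 1, 2, 0, 1024 * 1024, HSM_MOVE);
 *   if (rc != 0)
 *           fprintf(stderr, "m0hsm_copy() failed: rc=%d\n", rc);
 */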
2747 
2751 static int stage_cb(void *cb_arg, struct m0_client_layout *layout,
2752  struct m0_composite_layer *src_layer,
2753  const struct extent *match,
2754  bool *stop)
2755 {
2756  struct copy_cb_ctx *ctx = cb_arg;
2757  uint8_t tier;
2758  int rc;
2759  ENTRY;
2760 
2761  /* check if the tier is > target tier */
2762  tier = hsm_prio2tier(src_layer->ccr_priority);
2763  if (tier <= ctx->tgt_tier)
2764  /* skip it */
2765  return 0;
2766 
2767  /* set source tier for copy_cb */
2768  ctx->src_tier = tier;
2769  rc = copy_cb(cb_arg, layout, src_layer, match, stop);
2770  RETURN(rc);
2771 }
2772 
2773 int m0hsm_stage(struct m0_uint128 id, uint8_t tgt_tier,
2774  off_t offset, size_t length, enum hsm_cp_flags flags)
2775 {
2776  struct m0_client_layout *layout = NULL;
2777  struct copy_cb_ctx ctx = {0};
2778  struct extent ext;
2779  int rc;
2780 
2781  /* prepare context to be passed to callback functions */
2782  ctx.obj_id = id;
2783  ctx.src_tier = HSM_ANY_TIER;
2784  ctx.tgt_tier = tgt_tier;
2785  ctx.flags = flags;
2786 
2787  ENTRY;
2788 
2789  /* get the layout once up front; it will be reused in the next steps */
2790  rc = layout_get(id, &layout);
2791  if (rc)
2792  RETURN(rc);
2793  M0_ASSERT(layout != NULL);
2794 
2795  ext.off = offset;
2796  ext.len = length;
2797 
2798  /* for each layer > target_tier, move the data to the target tier */
2799  rc = match_layer_foreach(layout, HSM_ANY_TIER, &ext, stage_cb,
2800  &ctx, false);
2801 
2802  if (rc == 0 && ctx.found == 0)
2803  ERROR("No matching extent found\n");
2804  RETURN(rc);
2805 }
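
/* Illustrative usage (not part of the original source; values are
 * hypothetical): stage the first 8 MiB of an object back to fast tier 1,
 * pulling the data from any slower tier (index > 1) where it currently
 * resides.
 *
 *   rc = m0hsm_stage(id, 1, 0, 8 * 1024 * 1024, 0);
 */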
2806 
2810 static int archive_cb(void *cb_arg, struct m0_client_layout *layout,
2811  struct m0_composite_layer *src_layer,
2812  const struct extent *match, bool *stop)
2813 {
2814  struct copy_cb_ctx *ctx = cb_arg;
2815  uint8_t tier;
2816  int rc;
2817  ENTRY;
2818 
2819  /* check if the tier is < target tier */
2820  tier = hsm_prio2tier(src_layer->ccr_priority);
2821  if (tier >= ctx->tgt_tier)
2822  /* skip it */
2823  return 0;
2824 
2825  /* set source tier for copy_cb */
2826  ctx->src_tier = tier;
2827  rc = copy_cb(cb_arg, layout, src_layer, match, stop);
2828  RETURN(rc);
2829 }
2830 
2831 int m0hsm_archive(struct m0_uint128 id, uint8_t tgt_tier,
2832  off_t offset, size_t length, enum hsm_cp_flags flags)
2833 {
2834  struct m0_client_layout *layout = NULL;
2835  struct copy_cb_ctx ctx = {0};
2836  struct extent ext;
2837  int rc;
2838 
2839  /* prepare context to be passed to callback functions */
2840  ctx.obj_id = id;
2841  ctx.src_tier = HSM_ANY_TIER;
2842  ctx.tgt_tier = tgt_tier;
2843  ctx.flags = flags;
2844 
2845  ENTRY;
2846 
2847  /* get the layout once up front; it will be reused in the next steps */
2848  rc = layout_get(id, &layout);
2849  if (rc)
2850  RETURN(rc);
2851  M0_ASSERT(layout != NULL);
2852 
2853  ext.off = offset;
2854  ext.len = length;
2855 
2856  /* for each layer in a tier faster than target_tier, copy its data to the target tier */
2857  rc = match_layer_foreach(layout, HSM_ANY_TIER, &ext, archive_cb,
2858  &ctx, false);
2859 
2860  if (rc == 0 && ctx.found == 0)
2861  ERROR("No matching extent found\n");
2862  RETURN(rc);
2863 }
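
/* Illustrative usage (not part of the original source; values are
 * hypothetical): archive the first 16 MiB of an object to slow tier 3,
 * copying from any faster tier (index < 3) and keeping the source copies
 * (no HSM_MOVE flag).
 *
 *   rc = m0hsm_archive(id, 3, 0, 16 * 1024 * 1024, 0);
 */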
2864 
2865 
2866 /*
2867  * Local variables:
2868  * c-indentation-style: "K&R"
2869  * c-basic-offset: 8
2870  * tab-width: 8
2871  * fill-column: 80
2872  * scroll-step: 1
2873  * End:
2874  */