Motr  M0
fol.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "lib/arith.h" /* M0_3WAY */
24 #include "lib/memory.h"
25 #include "lib/errno.h"
26 #include "lib/misc.h" /* M0_SET0 */
27 #include "lib/vec.h"
28 #include "motr/magic.h"
29 #include "rpc/rpc_opcodes.h"
30 #include "fol/fol.h"
31 #include "fol/fol_private.h"
32 #include "fol/fol_xc.h" /* m0_xc_fol_init */
33 #include "fop/fop.h" /* m0_fop_fol_frag_type */
34 #include "fop/fop_xc.h" /* m0_fop_fol_frag_xc */
35 
36 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_FOL
37 #include "lib/trace.h"
38 
59 M0_TL_DESCR_DEFINE(m0_rec_frag, "fol record fragment", M0_INTERNAL,
60  struct m0_fol_frag, rp_link, rp_magic,
62 M0_TL_DEFINE(m0_rec_frag, M0_INTERNAL, struct m0_fol_frag);
63 
64 #define FRAG_HEADER_XCODE_OBJ(ptr) \
65  M0_XCODE_OBJ(m0_fol_frag_header_xc, ptr)
66 
67 #define FRAG_XCODE_OBJ(r) (struct m0_xcode_obj) { \
68  .xo_type = frag->rp_ops != NULL ? \
69  frag->rp_ops->rpo_type->rpt_xt : \
70  m0_fop_fol_frag_type.rpt_xt, \
71  .xo_ptr = r->rp_data \
72 }
73 
74 /* ------------------------------------------------------------------
75  * LSN
76  * ------------------------------------------------------------------ */
77 
78 static size_t fol_rec_header_pack_size(struct m0_fol_rec_header *h);
79 
80 /* ------------------------------------------------------------------
81  * m0_fol operations
82  *------------------------------------------------------------------*/
83 
84 M0_INTERNAL void m0_fol_init(struct m0_fol *fol)
85 {
86  m0_mutex_init(&fol->f_lock);
87 }
88 
89 M0_INTERNAL void m0_fol_fini(struct m0_fol *fol)
90 {
91  m0_mutex_fini(&fol->f_lock);
92 }
93 
94 /*------------------------------------------------------------------
95  * FOL records and their fragments
96  *------------------------------------------------------------------*/
97 
98 M0_INTERNAL void m0_fol_rec_init(struct m0_fol_rec *rec, struct m0_fol *fol)
99 {
100  m0_rec_frag_tlist_init(&rec->fr_frags);
101  rec->fr_fol = fol;
102 }
103 
104 M0_INTERNAL void m0_fol_rec_fini(struct m0_fol_rec *rec)
105 {
106  struct m0_fol_frag *frag;
107 
108  m0_tl_for(m0_rec_frag, &rec->fr_frags, frag) {
109  m0_fol_frag_fini(frag);
110  } m0_tl_endfor;
111  m0_rec_frag_tlist_fini(&rec->fr_frags);
112 }
113 
114 M0_INTERNAL bool m0_fol_rec_invariant(const struct m0_fol_rec *rec)
115 {
116  const struct m0_fol_rec_header *h = &rec->fr_header;
117 
118  return h->rh_magic == M0_FOL_REC_MAGIC;
119 }
120 
121 M0_INTERNAL void m0_fol_frag_init(struct m0_fol_frag *frag, void *data,
122  const struct m0_fol_frag_type *type)
123 {
124  M0_PRE(frag != NULL);
125  M0_PRE(type != NULL && type->rpt_ops != NULL);
126 
127  frag->rp_data = data;
128  type->rpt_ops->rpto_rec_frag_init(frag);
129  m0_rec_frag_tlink_init(frag);
130 }
131 
132 M0_INTERNAL void m0_fol_frag_fini(struct m0_fol_frag *frag)
133 {
134  M0_PRE(frag != NULL);
135  M0_PRE(frag->rp_ops != NULL);
136  M0_PRE(frag->rp_data != NULL);
137 
138  if (m0_rec_frag_tlink_is_in(frag))
139  m0_rec_frag_tlist_del(frag);
140  m0_rec_frag_tlink_fini(frag);
141 
142  if (frag->rp_flag == M0_XCODE_DECODE) {
144  m0_free(frag);
145  } else {
146  if (frag->rp_ops->rpo_type == &m0_fop_fol_frag_type) {
147  m0_free(frag->rp_data);
148  m0_free(frag);
149  } else
151  }
152 }
153 
154 /* ------------------------------------------------------------------
155  * Record fragment types
156  * ------------------------------------------------------------------ */
157 
158 enum {
161 };
162 
164 static struct m0_mutex rptypes_lock;
165 
166 M0_INTERNAL int m0_fols_init(void)
167 {
169  return 0;
170 }
171 
172 M0_INTERNAL void m0_fols_fini(void)
173 {
175 }
176 
177 M0_INTERNAL int
179 {
180  int result;
181  static uint32_t index = PART_TYPE_START_INDEX;
182 
183  M0_PRE(type != NULL);
184  M0_PRE(type->rpt_xt != NULL && type->rpt_ops != NULL);
185  M0_PRE(type->rpt_index == 0);
186 
188  if (IS_IN_ARRAY(index, rptypes)) {
190  rptypes[index] = type;
191  type->rpt_index = index;
192  ++index;
193  result = 0;
194  } else
195  result = -EFBIG;
197  return result;
198 }
199 
200 M0_INTERNAL void
202 {
203  M0_PRE(type != NULL);
204 
206  M0_PRE(IS_IN_ARRAY(type->rpt_index, rptypes));
207  M0_PRE(rptypes[type->rpt_index] == type ||
208  rptypes[type->rpt_index] == NULL);
209 
210  rptypes[type->rpt_index] = NULL;
212  type->rpt_index = 0;
213  type->rpt_xt = NULL;
214  type->rpt_ops = NULL;
215 }
216 
217 static const struct m0_fol_frag_type *
219 {
221  return rptypes[index];
222 }
223 
224 /* ------------------------------------------------------------------
225  * Record encoding/decoding
226  * ------------------------------------------------------------------ */
227 
229 {
230  struct m0_xcode_ctx ctx;
232 
233  M0_POST(len > 0);
234  return len;
235 }
236 
237 static size_t fol_record_pack_size(struct m0_fol_rec *rec)
238 {
239  struct m0_fol_rec_header *h = &rec->fr_header;
240  const struct m0_fol_frag *frag;
241  struct m0_fol_frag_header rph;
242  struct m0_xcode_ctx ctx;
243  size_t len;
244 
245  len = fol_rec_header_pack_size(h) +
246  h->rh_frags_nr *
248 
249  m0_tl_for(m0_rec_frag, &rec->fr_frags, frag) {
250  len += m0_xcode_data_size(&ctx, &FRAG_XCODE_OBJ(frag));
251  } m0_tl_endfor;
252 
253  len = m0_align(len, 8);
254  M0_POST(len <= FOL_REC_MAXSIZE);
255  return len;
256 }
257 
258 static int fol_rec_encdec(struct m0_fol_rec *rec,
259  struct m0_bufvec_cursor *cur,
260  enum m0_xcode_what what)
261 {
262  struct m0_fol_rec_header *h = &rec->fr_header;
263  int rc;
264 
266 
268  if (rc != 0)
269  return M0_RC(rc);
270 
272  return 0;
273 }
274 
275 static int fol_record_pack(struct m0_fol_rec *rec, struct m0_buf *buf)
276 {
277  struct m0_fol_frag *frag;
278  m0_bcount_t len = buf->b_nob;
279  struct m0_bufvec bvec = M0_BUFVEC_INIT_BUF(&buf->b_addr, &len);
280  struct m0_bufvec_cursor cur;
281  int rc;
282 
284 
286  if (rc != 0)
287  return M0_RC(rc);
288 
289  m0_tl_for(m0_rec_frag, &rec->fr_frags, frag) {
290  struct m0_fol_frag_header rph;
291  uint32_t index;
292 
293  index = frag->rp_ops != NULL ?
294  frag->rp_ops->rpo_type->rpt_index :
296 
297  rph = (struct m0_fol_frag_header) {
298  .rph_index = index,
299  .rph_magic = M0_FOL_FRAG_MAGIC
300  };
301 
303  &cur, M0_XCODE_ENCODE) ?:
305  &cur, M0_XCODE_ENCODE);
306  if (rc != 0)
307  return M0_RC(rc);
308  } m0_tl_endfor;
309  buf->b_nob = m0_bufvec_cursor_addr(&cur) - buf->b_addr;
310 
311  return M0_RC(rc);
312 }
313 
314 M0_INTERNAL int m0_fol_rec_encode(struct m0_fol_rec *rec, struct m0_buf *at)
315 {
316  struct m0_fol_rec_header *h = &rec->fr_header;
317  size_t size;
318 
320  h->rh_frags_nr = m0_rec_frag_tlist_length(&rec->fr_frags);
321 
322  size = fol_record_pack_size(rec);
324  M0_ASSERT(size <= at->b_nob);
325 
326  h->rh_data_len = size;
327 
328  return fol_record_pack(rec, at);
329 }
330 
331 M0_INTERNAL int m0_fol_rec_decode(struct m0_fol_rec *rec, struct m0_buf *at)
332 {
333  struct m0_bufvec bvec = M0_BUFVEC_INIT_BUF(&at->b_addr,
334  &at->b_nob);
335  struct m0_bufvec_cursor cur;
336  uint32_t i;
337  int rc;
338 
340 
342  if (rc != 0)
343  return M0_RC(rc);
344 
345  for (i = 0; rc == 0 && i < rec->fr_header.rh_frags_nr; ++i) {
346  struct m0_fol_frag *frag;
347  const struct m0_fol_frag_type *frag_type;
348  struct m0_fol_frag_header ph;
349 
352  if (rc == 0) {
353  void *rp_data;
354 
355  frag_type = fol_frag_type_lookup(ph.rph_index);
356 
357  M0_ALLOC_PTR(frag);
358  if (frag == NULL)
359  return M0_ERR(-ENOMEM);
360 
361  rp_data = m0_alloc(frag_type->rpt_xt->xct_sizeof);
362  if (rp_data == NULL) {
363  m0_free(frag);
364  return M0_ERR(-ENOMEM);
365  }
366 
367  frag->rp_flag = M0_XCODE_DECODE;
368 
369  m0_fol_frag_init(frag, rp_data, frag_type);
372  if (rc == 0)
373  m0_fol_frag_add(rec, frag);
374  }
375  }
376  return M0_RC(rc);
377 }
378 
379 /* @todo Return number of bytes that needed to print everything */
380 int m0_fol_rec_to_str(struct m0_fol_rec *rec, char *str, int str_len)
381 {
382  int ret;
383  struct m0_fol_rec_header *h = &rec->fr_header;
384 
385  ret = snprintf(str, str_len, "rec->fr_header->rh_frags_nr: %u\n"
386  "rec->fr_header->rh_data_len: %u\n"
387  "rec->fr_header->rh_self->ui_node: %u\n"
388  "rec->fr_header->rh_self->ui_update: %" PRIu64 "\n"
389  "rec->fr_epoch: %p\n"
390  "rec->fr_sibling: %p\n",
392  h->rh_self.ui_update, rec->fr_epoch, rec->fr_sibling);
393 
394  if (ret >= str_len) {
395  return -ENOMEM;
396  }
397  else {
398  struct m0_fol_frag *frag;
399  str_len -= ret;
400  str += ret;
401 
402  m0_tl_for(m0_rec_frag, &rec->fr_frags, frag) {
403  uint32_t index;
404 
405  index = frag->rp_ops != NULL ?
406  frag->rp_ops->rpo_type->rpt_index :
408 
409  ret = snprintf(str, str_len,
410  "frag->rp_ops: %p\n"
411  "frag->rp_ops->rpo_type: %p\n"
412  "frag->rp_ops->rpo_type->rpt_index: %" PRIu32
413  "\n"
414  "frag->rp_ops->rpo_type->rpt_name: %s\n"
415  "frag->rp_data: %p\n",
416  frag->rp_ops, frag->rp_ops->rpo_type, index,
417  frag->rp_ops ?
418  frag->rp_ops->rpo_type->rpt_name : "(nil)",
419  frag->rp_data);
420 
421  if (ret >= str_len) {
422  return -ENOMEM;
423  }
424  else {
425  str_len -= ret;
426  str += ret;
427  }
428 
429  if (frag->rp_ops &&
430  frag->rp_ops->rpo_type == &m0_fop_fol_frag_type) {
431  struct m0_fop_fol_frag *rp = frag->rp_data;
432  struct m0_xcode_obj obj = {
433  .xo_type = m0_fop_fol_frag_xc,
434  .xo_ptr = rp
435  };
436 
437  ret = snprintf(str, str_len,
438  "rp->ffrp_fop_code: %u\n"
439  "rp->ffrp_rep_code: %u\n",
440  rp->ffrp_fop_code,
441  rp->ffrp_rep_code);
442 
443  if (ret >= str_len) {
444  return -ENOMEM;
445  }
446  else {
447  str_len -= ret;
448  str += ret;
449  }
450 
451  ret = m0_xcode_print(&obj, str, str_len);
452 
453  if (ret < 0) {
454  return ret;
455  } else if (ret >= str_len) {
456  return -ENOMEM;
457  } else {
458  str_len -= ret;
459  str += ret;
460  }
461  }
462  } m0_tl_endfor;
463  }
464 
465  return 0;
466 }
467 
468 M0_INTERNAL void m0_fol_frag_add(struct m0_fol_rec *rec,
469  struct m0_fol_frag *frag)
470 {
471  M0_PRE(rec != NULL && frag != NULL);
472  m0_rec_frag_tlist_add_tail(&rec->fr_frags, frag);
473 }
474 
475 #undef M0_TRACE_SUBSYSTEM
476 
479 /*
480  * Local variables:
481  * c-indentation-style: "K&R"
482  * c-basic-offset: 8
483  * tab-width: 8
484  * fill-column: 80
485  * scroll-step: 1
486  * End:
487  */
#define M0_BUFVEC_INIT_BUF(addr_ptr, count_ptr)
Definition: vec.h:165
uint32_t ffrp_rep_code
Definition: fop.h:356
struct m0_fol * fr_fol
Definition: fol.h:190
M0_INTERNAL int m0_xcode_encdec(struct m0_xcode_obj *obj, struct m0_bufvec_cursor *cur, enum m0_xcode_what what)
Definition: xcode.c:416
Definition: fol.h:140
M0_INTERNAL int m0_xcode_print(const struct m0_xcode_obj *obj, char *str, int nr)
Definition: string.c:278
uint32_t ui_node
Definition: dtm_update.h:33
#define M0_REC_HEADER_XCODE_OBJ(ptr)
Definition: fol_private.h:28
#define M0_PRE(cond)
M0_INTERNAL void m0_fol_fini(struct m0_fol *fol)
Definition: fol.c:89
struct m0_fol_rec_header fr_header
Definition: fol.h:192
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
#define NULL
Definition: misc.h:38
static const struct m0_fol_frag_type * rptypes[FOL_FRAG_TYPE_MAX]
Definition: fol.c:163
#define FRAG_HEADER_XCODE_OBJ(ptr)
Definition: fol.c:64
M0_INTERNAL void m0_fol_rec_fini(struct m0_fol_rec *rec)
Definition: fol.c:104
static struct buffer * cur(struct m0_addb2_mach *mach, m0_bcount_t space)
Definition: addb2.c:791
#define ergo(a, b)
Definition: misc.h:293
const struct m0_fol_frag_ops * rp_ops
Definition: fol.h:244
M0_INTERNAL int m0_fol_frag_type_register(struct m0_fol_frag_type *type)
Definition: fol.c:178
M0_INTERNAL void m0_fol_frag_init(struct m0_fol_frag *frag, void *data, const struct m0_fol_frag_type *type)
Definition: fol.c:121
static int fol_record_pack(struct m0_fol_rec *rec, struct m0_buf *buf)
Definition: fol.c:275
struct m0_bufvec data
Definition: di.c:40
struct m0_mutex f_lock
Definition: fol.h:142
struct m0_tl fr_frags
Definition: fol.h:201
struct m0_update_id rh_self
Definition: fol.h:170
M0_INTERNAL void m0_xcode_free_obj(struct m0_xcode_obj *obj)
Definition: xcode.c:248
#define FRAG_XCODE_OBJ(r)
Definition: fol.c:67
uint32_t rpt_index
Definition: fol.h:262
M0_INTERNAL void * m0_bufvec_cursor_addr(struct m0_bufvec_cursor *cur)
Definition: vec.c:597
uint64_t m0_bcount_t
Definition: types.h:77
void * rp_data
Definition: fol.h:249
m0_xcode_what
Definition: xcode.h:647
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
const char * rpt_name
Definition: fol.h:263
static struct foo * obj
Definition: tlist.c:302
struct m0_fol_frag_type m0_fop_fol_frag_type
Definition: sock.c:887
const struct m0_fol_frag_type * rpo_type
Definition: fol.h:283
M0_INTERNAL void m0_fols_fini(void)
Definition: fol.c:172
#define m0_tl_endfor
Definition: tlist.h:700
return M0_RC(rc)
Definition: buf.h:37
M0_INTERNAL void m0_fol_rec_init(struct m0_fol_rec *rec, struct m0_fol *fol)
Definition: fol.c:98
int i
Definition: dir.c:1033
#define PRIu64
Definition: types.h:58
return M0_ERR(-EOPNOTSUPP)
int m0_fol_rec_to_str(struct m0_fol_rec *rec, char *str, int str_len)
Definition: fol.c:380
uint64_t ui_update
Definition: dtm_update.h:34
const struct m0_xcode_type * rpt_xt
Definition: fol.h:269
#define M0_ASSERT(cond)
M0_TL_DEFINE(m0_rec_frag, M0_INTERNAL, struct m0_fol_frag)
uint64_t rh_magic
Definition: fol.h:178
static int fol_rec_encdec(struct m0_fol_rec *rec, struct m0_bufvec_cursor *cur, enum m0_xcode_what what)
Definition: fol.c:258
static struct m0_bufvec bvec
Definition: xcode.c:169
M0_INTERNAL void m0_fol_init(struct m0_fol *fol)
Definition: fol.c:84
static size_t fol_record_pack_size(struct m0_fol_rec *rec)
Definition: fol.c:237
M0_INTERNAL void m0_bufvec_cursor_init(struct m0_bufvec_cursor *cur, const struct m0_bufvec *bvec)
Definition: vec.c:563
void * m0_alloc(size_t size)
Definition: memory.c:126
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
#define M0_POST(cond)
struct m0_epoch_id * fr_epoch
Definition: fol.h:194
M0_INTERNAL bool m0_fol_rec_invariant(const struct m0_fol_rec *rec)
Definition: fol.c:114
M0_INTERNAL int m0_xcode_data_size(struct m0_xcode_ctx *ctx, const struct m0_xcode_obj *obj)
Definition: xcode.c:437
static bool at(struct ff2c_context *ctx, char c)
Definition: lex.c:77
struct m0_fol_update_ref * fr_sibling
Definition: fol.h:196
#define PRIu32
Definition: types.h:66
M0_INTERNAL void m0_fol_frag_fini(struct m0_fol_frag *frag)
Definition: fol.c:132
static struct m0_mutex rptypes_lock
Definition: fol.c:164
M0_INTERNAL int m0_fols_init(void)
Definition: fol.c:166
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
uint32_t rh_frags_nr
Definition: fol.h:158
enum m0_xcode_what rp_flag
Definition: fol.h:258
uint32_t rph_index
Definition: fol.h:293
M0_INTERNAL void m0_fol_frag_add(struct m0_fol_rec *rec, struct m0_fol_frag *frag)
Definition: fol.c:468
static const struct m0_fol_frag_type * fol_frag_type_lookup(uint32_t index)
Definition: fol.c:218
m0_bcount_t size
Definition: di.c:39
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
#define IS_IN_ARRAY(idx, array)
Definition: misc.h:311
uint32_t rh_data_len
Definition: fol.h:164
size_t xct_sizeof
Definition: xcode.h:341
M0_INTERNAL void m0_fol_frag_type_deregister(struct m0_fol_frag_type *type)
Definition: fol.c:201
Definition: nucleus.c:42
#define M0_IS_8ALIGNED(val)
Definition: arith.h:190
int type
Definition: dir.c:1031
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
Definition: mutex.h:47
int32_t rc
Definition: trigger_fop.h:47
static uint64_t m0_align(uint64_t val, uint64_t alignment)
Definition: arith.h:170
M0_TL_DESCR_DEFINE(m0_rec_frag, "fol record fragment", M0_INTERNAL, struct m0_fol_frag, rp_link, rp_magic, M0_FOL_FRAG_LINK_MAGIC, M0_FOL_FRAG_HEAD_MAGIC)
Definition: vec.h:145
M0_INTERNAL int m0_fol_rec_encode(struct m0_fol_rec *rec, struct m0_buf *at)
Definition: fol.c:314
static size_t fol_rec_header_pack_size(struct m0_fol_rec_header *h)
Definition: fol.c:228
m0_bcount_t b_nob
Definition: buf.h:230
uint32_t ffrp_fop_code
Definition: fop.h:354
M0_INTERNAL int m0_fol_rec_decode(struct m0_fol_rec *rec, struct m0_buf *at)
Definition: fol.c:331