Motr  M0
sit.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2015-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
36 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_ADDB
37 
38 #include "lib/trace.h"
39 #include "lib/chan.h"
40 #include "lib/types.h"
41 #include "lib/assert.h"
42 #include "lib/arith.h" /* min_check */
43 #include "lib/memory.h"
44 #include "lib/errno.h" /* EPROTO, EAGAIN */
45 #include "stob/stob.h"
46 #include "stob/io.h"
47 #include "motr/magic.h"
48 #include "xcode/xcode.h" /* m0_xcode_encdec */
49 #include "addb2/addb2.h"
50 #include "addb2/consumer.h"
51 #include "addb2/internal.h"
52 #include "addb2/identifier.h" /* M0_AVI_SIT */
53 #include "addb2/storage.h"
54 #include "addb2/storage_xc.h"
55 
59 struct m0_addb2_sit {
60  struct m0_stob *s_stob;
66  unsigned s_bshift;
71  uint64_t s_payload[9];
75  char *s_buf;
80  uint64_t *s_trace_ptr;
85  bool s_fired;
86 };
87 
88 static bool header_is_valid(const struct m0_addb2_sit *it,
89  const struct m0_addb2_frame_header *h);
90 static int it_init(struct m0_addb2_sit *it,
92 static int it_alloc(struct m0_addb2_sit *it, struct m0_stob *stob);
93 static void it_free(struct m0_addb2_sit *it);
94 static int it_next(struct m0_addb2_sit *it, struct m0_addb2_record **out);
95 static void it_rec (struct m0_addb2_sit *it, struct m0_addb2_record **out);
96 static int it_load(struct m0_addb2_sit *it);
97 static int it_read(const struct m0_addb2_sit *it, void *buf,
99 static bool it_rounded(const struct m0_addb2_sit *it, m0_bindex_t index);
100 static bool it_is_in(const struct m0_addb2_sit *it, m0_bindex_t index);
101 static bool it_invariant(const struct m0_addb2_sit *it);
102 static void it_trace_set(struct m0_addb2_sit *it);
103 static int header_read(struct m0_addb2_sit *it, struct m0_addb2_frame_header *h,
105 static m0_bindex_t header_next(const struct m0_addb2_sit *it,
106  const struct m0_addb2_frame_header *h);
107 
109  struct m0_stob *stob, m0_bindex_t start)
110 {
111  struct m0_addb2_frame_header h;
112  struct m0_addb2_sit *it;
113  int result;
114 
115  M0_ALLOC_PTR(it);
116  if (it != NULL) {
117  result = it_alloc(it, stob);
118  if (result == 0) {
119  result = m0_addb2_storage_header(stob, &h);
120  if (result == 0) {
121  it->s_size = h.he_stob_size;
122  m0_addb2_source_init(&it->s_src);
123  result = it_init(it, &h, start);
124  if (result != 0)
126  }
127  } else
128  it_free(it);
129  } else {
130  m0_free(it);
131  result = M0_ERR(-ENOMEM);
132  }
133  if (result == 0)
134  *out = it;
135  M0_POST(ergo(result == 0, it_invariant(*out)));
136  return result;
137 }
138 
140 {
141  if (it->s_cursor.cu_trace != NULL)
142  m0_addb2_cursor_fini(&it->s_cursor);
143  m0_addb2_source_fini(&it->s_src);
144  it_free(it);
145  m0_free(it);
146 }
147 
149 {
150  int result;
151 
153 
154  if (!it->s_fired) {
155  it_rec(it, out);
156  it->s_fired = true;
157  return +1;
158  }
159  *out = NULL;
160  result = m0_addb2_cursor_next(&it->s_cursor);
161  if (result > 0) {
162  *out = &it->s_cursor.cu_rec;
163  } else if (result == 0) {
164  m0_addb2_cursor_fini(&it->s_cursor);
165  result = it_next(it, out);
166  }
167  if (*out != NULL)
168  m0_addb2_consume(&it->s_src, *out);
169  M0_POST(ergo(result >= 0, it_invariant(it)));
170  return M0_RC(result);
171 }
172 
174 {
176  return &it->s_src;
177 }
178 
179 M0_INTERNAL int m0_addb2_storage_header(struct m0_stob *stob,
180  struct m0_addb2_frame_header *h)
181 {
182  struct m0_addb2_sit it = {};
183  int result;
184 
185  result = it_alloc(&it, stob);
186  if (result == 0) {
187  result = header_read(&it, h, 0);
188  it_free(&it);
189  } else
190  result = M0_ERR(-ENOMEM);
191  return result;
192 }
193 
198 static int it_init(struct m0_addb2_sit *it,
200 {
202  uint64_t last_frame_end;
203  int result;
204 
205  if (start != 0) {
206  result = header_read(it, h, start);
207  } else {
208  /* Search forward for the last frame on the stob. */
209  while (1) {
210  result = header_read(it, &header, h->he_offset + h->he_size);
211  if (result != 0 || header.he_seqno != h->he_seqno + 1)
212  break;
213  *h = header;
214  }
215 
216  last_frame_end = h->he_offset + h->he_size;
217  /* Search backward */
218  while (1) {
219  result = header_read(it, &header, h->he_prev_offset);
220  if (result != 0 || header.he_seqno != h->he_seqno - 1)
221  break;
222  *h = header;
223  if (header.he_offset >= last_frame_end &&
224  header.he_prev_offset < last_frame_end)
225  break; /* Found the oldest frame. */
226  }
227  }
228  if (result == 0) {
229  it->s_current = *h;
230  result = it_load(it);
231  }
232  M0_POST(ergo(result >= 0, it_invariant(it)));
233  return M0_RC(result);
234 }
235 
236 static int it_alloc(struct m0_addb2_sit *it, struct m0_stob *stob)
237 {
238  void *buf;
239  int result;
240  uint32_t shift = m0_stob_block_shift(stob);
241 
243  if (buf != NULL) {
244  m0_stob_get(stob);
245  it->s_stob = stob;
246  it->s_bshift = shift;
247  it->s_bsize = M0_BITS(it->s_bshift);
248  it->s_buf = buf;
249  result = 0;
250  } else
251  result = M0_ERR(-ENOMEM);
252  return result;
253 }
254 
255 static void it_free(struct m0_addb2_sit *it)
256 {
257  m0_stob_put(it->s_stob);
258  m0_free_aligned(it->s_buf, FRAME_SIZE_MAX, it->s_bshift);
259 }
260 
264 static int header_read(struct m0_addb2_sit *it, struct m0_addb2_frame_header *h,
266 {
267  int result;
268 
269  M0_ASSERT(sizeof *h <= it->s_bsize);
270 
271  result = it_read(it, it->s_buf, offset, it->s_bsize);
272  if (result == 0) {
273  void *addr = it->s_buf;
275  &it->s_bsize);
276  struct m0_bufvec_cursor cur;
277 
278  M0_SET0(h);
280  result = m0_xcode_encdec(HEADER_XO(h), &cur, M0_XCODE_DECODE) ?:
281  header_is_valid(it, h) &&
282  (offset == 0 || h->he_offset == offset) ?
283  0 : -EPROTO;
284  }
285  return M0_RC(result);
286 }
287 
292 static int it_next(struct m0_addb2_sit *it, struct m0_addb2_record **out)
293 {
294  struct m0_addb2_frame_header *h = &it->s_current;
296  int result;
297 
299 
300  if (it->s_trace_idx + 1 < h->he_trace_nr) {
301  /*
302  * If still within the current frame, go to the next trace.
303  */
304  ++ it->s_trace_idx;
305  it->s_trace_ptr += it->s_trace.tr_nr + 1;
306  it_trace_set(it);
307  result = +1;
308  } else {
309  next.he_offset = header_next(it, h);
310  /*
311  * Go to the next frame.
312  */
313  result = header_read(it, &next, next.he_offset);
314  if (result == 0 && next.he_seqno == h->he_seqno + 1) {
315  *h = next;
316  result = it_load(it);
317  if (result == 0)
318  result = +1;
319  } else /* Cannot read the next header, stop iteration. */
320  result = 0;
321  }
322  if (result == +1)
323  it_rec(it, out);
324  M0_POST(ergo(result >= 0, it_invariant(it)));
325  return M0_RC(result);
326 }
327 
332 static void it_rec(struct m0_addb2_sit *it, struct m0_addb2_record **out)
333 {
334  struct m0_addb2_frame_header *h = &it->s_current;
335  int i = 0;
336 
337  it->s_payload[i++] = h->he_seqno;
338  it->s_payload[i++] = h->he_offset;
339  it->s_payload[i++] = h->he_prev_offset;
340  it->s_payload[i++] = header_next(it, h);
341  it->s_payload[i++] = h->he_size;
342  it->s_payload[i++] = it->s_trace_idx;
343  it->s_payload[i++] = h->he_trace_nr;
344  it->s_payload[i++] = h->he_fid.f_container;
345  it->s_payload[i++] = h->he_fid.f_key;
346  M0_ASSERT(i == ARRAY_SIZE(it->s_payload));
347  it->s_rec = (struct m0_addb2_record) {
348  .ar_val = {
349  .va_id = M0_AVI_SIT,
350  .va_time = h->he_time,
351  .va_nr = ARRAY_SIZE(it->s_payload),
352  .va_data = it->s_payload
353  },
354  .ar_label_nr = 0
355  };
356  *out = &it->s_rec;
357 }
358 
362 static int it_load(struct m0_addb2_sit *it)
363 {
364  struct m0_addb2_frame_header *h = &it->s_current;
365  int result;
366 
367  result = it_read(it, it->s_buf, h->he_offset, h->he_size);
368  if (result == 0) {
370 
371  /*
372  * Re-read the header to guard against concurrent writes by
373  * m0_addb2_storage.
374  */
375  result = header_read(it, &copy, h->he_offset) ?:
376  memcmp(h, &copy, sizeof *h) == 0 ? 0 : M0_ERR(-EAGAIN);
377  if (result == 0) {
378  it->s_trace_ptr = ((void *)it->s_buf) + sizeof *h;
379  it_trace_set(it);
380  it->s_trace_idx = 0;
381  }
382  }
383  M0_POST(ergo(result >= 0, it_invariant(it)));
384  return M0_RC(result);
385 }
386 
388 static m0_bindex_t header_next(const struct m0_addb2_sit *it,
389  const struct m0_addb2_frame_header *h)
390 {
392 
393  offset = h->he_offset + h->he_size;
394  if (it->s_size - offset < FRAME_SIZE_MAX)
395  offset = BSIZE;
396  return offset;
397 }
398 
399 static bool it_rounded(const struct m0_addb2_sit *it, m0_bindex_t index)
400 {
401  return m0_round_up(index, it->s_bsize) == index;
402 }
403 
404 static bool it_is_in(const struct m0_addb2_sit *it, m0_bindex_t index)
405 {
406  return ergo(it->s_size != 0, index < it->s_size);
407 }
408 
409 static bool header_is_valid(const struct m0_addb2_sit *it,
410  const struct m0_addb2_frame_header *h)
411 {
412  return it_rounded(it, h->he_offset) &&
413  it_rounded(it, h->he_size) &&
415  it_is_in(it, h->he_offset) &&
416  it_is_in(it, h->he_prev_offset) &&
417  it_is_in(it, h->he_offset + h->he_size - 1) &&
419  h->he_trace_nr > 0 &&
421  h->he_size > 0 &&
423 }
424 
425 static bool it_invariant(const struct m0_addb2_sit *it)
426 {
427  const struct m0_addb2_frame_header *h = &it->s_current;
428  char *body = (char *)it->s_trace.tr_body;
429 
430  return _0C(header_is_valid(it, h)) &&
431  _0C(it->s_trace_idx < h->he_trace_nr) &&
432  _0C(ergo(body != NULL,
433  it->s_buf <= body && body < it->s_buf + h->he_size));
434 }
435 
439 static int it_read(const struct m0_addb2_sit *it, void *buf,
441 {
442  int result;
443 
446  M0_PRE(it_rounded(it, (uint64_t)buf));
447 
448  buf = m0_stob_addr_pack(buf, it->s_bshift);
449  count >>= it->s_bshift;
450  offset >>= it->s_bshift;
451 
452  result = m0_stob_io_bufvec_launch(it->s_stob,
454  SIO_READ, offset);
455 
456  return M0_RC(result);
457 }
458 
462 static void it_trace_set(struct m0_addb2_sit *it)
463 {
464  M0_CASSERT(sizeof it->s_trace.tr_nr == sizeof *it->s_trace_ptr);
465 
466  it->s_trace.tr_nr = *it->s_trace_ptr;
467  it->s_trace.tr_body = it->s_trace_ptr + 1;
468  M0_SET0(&it->s_cursor);
469  m0_addb2_cursor_init(&it->s_cursor, &it->s_trace);
470 }
471 
472 #undef M0_TRACE_SUBSYSTEM
473 
476 /*
477  * Local variables:
478  * c-indentation-style: "K&R"
479  * c-basic-offset: 8
480  * tab-width: 8
481  * fill-column: 80
482  * scroll-step: 1
483  * End:
484  */
485 /*
486  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
487  */
uint8_t * s_buf
Definition: string.h:100
#define M0_BUFVEC_INIT_BUF(addr_ptr, count_ptr)
Definition: vec.h:165
M0_INTERNAL int m0_xcode_encdec(struct m0_xcode_obj *obj, struct m0_bufvec_cursor *cur, enum m0_xcode_what what)
Definition: xcode.c:416
#define M0_PRE(cond)
M0_INTERNAL int m0_stob_io_bufvec_launch(struct m0_stob *stob, struct m0_bufvec *bufvec, int op_code, m0_bindex_t offset)
Definition: io.c:248
uint64_t he_prev_offset
Definition: storage.h:135
#define NULL
Definition: misc.h:38
static struct buffer * cur(struct m0_addb2_mach *mach, m0_bcount_t space)
Definition: addb2.c:791
#define ergo(a, b)
Definition: misc.h:293
unsigned s_bshift
Definition: sit.c:66
uint64_t he_stob_size
Definition: storage.h:139
static void it_free(struct m0_addb2_sit *it)
Definition: sit.c:255
struct m0_fid he_fid
Definition: storage.h:141
static int it_alloc(struct m0_addb2_sit *it, struct m0_stob *stob)
Definition: sit.c:236
#define M0_CASSERT(cond)
Definition: internal.h:78
static struct m0t1fs_fsync_interactions copy
Definition: fsync.c:75
static m0_bindex_t header_next(const struct m0_addb2_sit *it, const struct m0_addb2_frame_header *h)
Definition: sit.c:388
int m0_addb2_cursor_next(struct m0_addb2_cursor *cur)
Definition: addb2.c:704
void m0_addb2_cursor_fini(struct m0_addb2_cursor *cur)
Definition: addb2.c:699
static struct m0_be_emap_cursor it
Definition: extmap.c:46
m0_bindex_t s_trace_idx
Definition: sit.c:84
M0_INTERNAL void m0_free_aligned(void *data, size_t size, unsigned shift)
Definition: memory.c:192
static bool it_is_in(const struct m0_addb2_sit *it, m0_bindex_t index)
Definition: sit.c:404
uint64_t m0_bindex_t
Definition: types.h:80
#define M0_BITS(...)
Definition: misc.h:236
uint64_t m0_bcount_t
Definition: types.h:77
static int void * buf
Definition: dir.c:1019
#define M0_SET0(obj)
Definition: misc.h:64
static int it_load(struct m0_addb2_sit *it)
Definition: sit.c:362
static int it_read(const struct m0_addb2_sit *it, void *buf, m0_bindex_t offset, m0_bcount_t count)
Definition: sit.c:439
static bool it_rounded(const struct m0_addb2_sit *it, m0_bindex_t index)
Definition: sit.c:399
Definition: sock.c:887
static m0_bcount_t count
Definition: xcode.c:167
M0_INTERNAL uint64_t m0_round_up(uint64_t val, uint64_t size)
Definition: misc.c:181
struct m0_fop_cob * body
Definition: dir.c:1436
return M0_RC(rc)
M0_INTERNAL uint32_t m0_stob_block_shift(struct m0_stob *stob)
Definition: stob.c:270
int i
Definition: dir.c:1033
return M0_ERR(-EOPNOTSUPP)
struct m0_addb2_frame_header s_current
Definition: sit.c:65
static void it_trace_set(struct m0_addb2_sit *it)
Definition: sit.c:462
struct m0_addb2_source * m0_addb2_sit_source(struct m0_addb2_sit *it)
Definition: sit.c:173
Definition: stob.h:163
static struct m0_stob * stob
Definition: storage.c:39
#define M0_ASSERT(cond)
uint64_t he_offset
Definition: storage.h:134
struct m0_addb2_source s_src
Definition: sit.c:69
static int next[]
Definition: cp.c:248
M0_INTERNAL void m0_bufvec_cursor_init(struct m0_bufvec_cursor *cur, const struct m0_bufvec *bvec)
Definition: vec.c:563
static int header_read(struct m0_addb2_sit *it, struct m0_addb2_frame_header *h, m0_bindex_t offset)
Definition: sit.c:264
M0_INTERNAL void * m0_stob_addr_pack(const void *buf, uint32_t shift)
Definition: io.c:293
m0_bcount_t s_size
Definition: sit.c:61
int m0_addb2_sit_init(struct m0_addb2_sit **out, struct m0_stob *stob, m0_bindex_t start)
Definition: sit.c:108
m0_bcount_t s_bsize
Definition: sit.c:67
void m0_addb2_consume(struct m0_addb2_source *src, const struct m0_addb2_record *rec)
Definition: consumer.c:143
static bool header_is_valid(const struct m0_addb2_sit *it, const struct m0_addb2_frame_header *h)
Definition: sit.c:409
uint64_t s_payload[9]
Definition: sit.c:71
uint64_t f_container
Definition: fid.h:39
#define M0_POST(cond)
Definition: xcode.h:73
static m0_bindex_t offset
Definition: dump.c:173
void m0_addb2_cursor_init(struct m0_addb2_cursor *cur, const struct m0_addb2_trace *trace)
Definition: addb2.c:690
static int it_init(struct m0_addb2_sit *it, struct m0_addb2_frame_header *h, m0_bindex_t start)
Definition: sit.c:198
uint32_t he_trace_nr
Definition: storage.h:136
static void it_rec(struct m0_addb2_sit *it, struct m0_addb2_record **out)
Definition: sit.c:332
bool s_fired
Definition: sit.c:85
uint64_t f_key
Definition: fid.h:40
int m0_addb2_sit_next(struct m0_addb2_sit *it, struct m0_addb2_record **out)
Definition: sit.c:148
struct m0_addb2_record s_rec
Definition: sit.c:70
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
struct m0_addb2_trace s_trace
Definition: sit.c:76
#define _0C(exp)
Definition: assert.h:311
static int start(struct m0_fom *fom)
Definition: trigger_fom.c:321
static bool it_invariant(const struct m0_addb2_sit *it)
Definition: sit.c:425
static int it_next(struct m0_addb2_sit *it, struct m0_addb2_record **out)
Definition: sit.c:292
char * s_buf
Definition: sit.c:75
struct m0_addb2_cursor s_cursor
Definition: sit.c:68
void m0_addb2_source_fini(struct m0_addb2_source *src)
Definition: consumer.c:60
#define out(...)
Definition: gen.c:41
M0_INTERNAL void * m0_alloc_aligned(size_t size, unsigned shift)
Definition: memory.c:168
Definition: io.h:229
#define HEADER_XO(h)
Definition: internal.h:34
uint64_t * s_trace_ptr
Definition: sit.c:80
void m0_free(void *data)
Definition: memory.c:146
struct m0_stob * s_stob
Definition: sit.c:60
void m0_addb2_source_init(struct m0_addb2_source *src)
Definition: consumer.c:55
M0_INTERNAL void m0_stob_get(struct m0_stob *stob)
Definition: stob.c:275
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_INTERNAL void m0_stob_put(struct m0_stob *stob)
Definition: stob.c:291
Definition: vec.h:145
M0_INTERNAL int m0_addb2_storage_header(struct m0_stob *stob, struct m0_addb2_frame_header *h)
Definition: sit.c:179
void m0_addb2_sit_fini(struct m0_addb2_sit *it)
Definition: sit.c:139