Motr  M0
io.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_BE
24 #include "lib/trace.h"
25 
26 #include "be/io.h"
27 
28 #include <unistd.h> /* fdatasync */
29 
30 #include "lib/memory.h" /* m0_alloc */
31 #include "lib/errno.h" /* ENOMEM */
32 #include "lib/ext.h" /* m0_ext_are_overlapping */
33 #include "lib/locality.h" /* m0_locality0_get */
34 
35 #include "stob/io.h" /* m0_stob_iovec_sort */
36 #include "stob/stob.h" /* m0_stob_fd */
37 
38 #include "be/op.h" /* m0_be_op_active */
39 #include "be/ha.h" /* m0_be_io_err_send */
40 
47 static bool be_io_cb(struct m0_clink *link);
48 
50 {
51  return (m0_bindex_t)m0_stob_addr_pack((void *)offset, bshift);
52 }
53 
54 static m0_bcount_t be_io_size_pack(m0_bcount_t size, uint32_t bshift)
55 {
56  return (m0_bcount_t)m0_stob_addr_pack((void *)size, bshift);
57 }
58 
60 {
61  return (m0_bcount_t)m0_stob_addr_open((void *)size, bshift);
62 }
63 
64 static void be_io_free(struct m0_be_io *bio)
65 {
70  m0_free(bio->bio_part);
71 }
72 
73 static void be_io_part_init(struct m0_be_io_part *bip,
74  struct m0_be_io *bio)
75 {
76  bip->bip_bio = bio;
77  m0_stob_io_init(&bip->bip_sio);
80 }
81 
82 static void be_io_part_fini(struct m0_be_io_part *bip)
83 {
85  m0_clink_fini(&bip->bip_clink);
86  m0_stob_io_fini(&bip->bip_sio);
87 }
88 
89 static bool be_io_part_invariant(struct m0_be_io_part *bip)
90 {
91  struct m0_stob_io *sio = &bip->bip_sio;
92 
93  return _0C(bip->bip_bio != NULL) &&
94  _0C(sio->si_user.ov_vec.v_nr == sio->si_stob.iv_vec.v_nr) &&
95  _0C(m0_vec_count(&sio->si_user.ov_vec) ==
96  m0_vec_count(&sio->si_stob.iv_vec));
97 }
98 
99 static int be_io_part_launch(struct m0_be_io_part *bip)
100 {
101  struct m0_stob_io *sio = &bip->bip_sio;
102  int rc;
103 
104  M0_ENTRY("sio=%p stob=%p stob_fid="FID_F" "
105  "si_user=(count=%"PRIu32" size=%llu) "
106  "si_stob=(count=%"PRIu32" size=%llu)",
107  sio, bip->bip_stob, FID_P(m0_stob_fid_get(bip->bip_stob)),
108  sio->si_user.ov_vec.v_nr,
109  (1ULL << bip->bip_bshift) *
110  (unsigned long long)m0_vec_count(&sio->si_user.ov_vec),
111  sio->si_stob.iv_vec.v_nr,
112  (1ULL << bip->bip_bshift) *
113  (unsigned long long)m0_vec_count(&sio->si_stob.iv_vec));
114 
116  return M0_RC(rc);
117 }
118 
119 static void be_io_part_reset(struct m0_be_io_part *bip)
120 {
121  struct m0_stob_io *sio = &bip->bip_sio;
122 
123  sio->si_user.ov_vec.v_nr = 0;
124  sio->si_stob.iv_vec.v_nr = 0;
125  sio->si_obj = NULL;
126 
127  bip->bip_stob = NULL;
128  bip->bip_bshift = 0;
129  bip->bip_rc = 0;
130 }
131 
132 static bool be_io_part_add(struct m0_be_io_part *bip,
133  void *ptr_user,
134  m0_bindex_t offset_stob,
136 {
137  struct m0_stob_io *sio = &bip->bip_sio;
138  uint32_t bshift = bip->bip_bshift;
139  void **u_buf = sio->si_user.ov_buf;
140  struct m0_vec *u_vec = &sio->si_user.ov_vec;
141  m0_bindex_t *s_offs = sio->si_stob.iv_index;
142  struct m0_vec *s_vec = &sio->si_stob.iv_vec;
143  uint32_t nr = u_vec->v_nr;
144  void *ptr_user_packed;
145  m0_bindex_t offset_stob_packed;
146  m0_bcount_t size_packed;
147  bool added;
148 
149  M0_PRE(u_vec->v_nr == s_vec->v_nr);
150 
151  ptr_user_packed = m0_stob_addr_pack(ptr_user, bshift);
152  offset_stob_packed = be_io_stob_offset_pack(offset_stob, bshift);
153  size_packed = be_io_size_pack(size, bshift);
154 
155  if (nr > 0 &&
156  u_buf [nr - 1] + u_vec->v_count[nr - 1] == ptr_user_packed &&
157  s_offs[nr - 1] + s_vec->v_count[nr - 1] == offset_stob_packed) {
158  /* optimization for sequential regions */
159  u_vec->v_count[nr - 1] += size_packed;
160  s_vec->v_count[nr - 1] += size_packed;
161  added = false;
162  } else {
163  u_buf[nr] = ptr_user_packed;
164  u_vec->v_count[nr] = size_packed;
165  s_offs[nr] = offset_stob_packed;
166  s_vec->v_count[nr] = size_packed;
167 
168  ++u_vec->v_nr;
169  ++s_vec->v_nr;
170  added = true;
171  }
172  return added;
173 }
174 
175 M0_INTERNAL int m0_be_io_init(struct m0_be_io *bio)
176 {
177  return 0;
178 }
179 
180 M0_INTERNAL int m0_be_io_allocate(struct m0_be_io *bio,
181  struct m0_be_io_credit *iocred)
182 {
183  struct m0_bufvec *bv = &bio->bio_bv_user;
184  struct m0_indexvec *iv = &bio->bio_iv_stob;
185  int rc;
186  size_t i;
187 
188  M0_ENTRY("bio=%p iocred="BE_IOCRED_F, bio, BE_IOCRED_P(iocred));
189 
190  bio->bio_iocred = *iocred;
191 
197 
198  if (bv->ov_vec.v_count == NULL ||
199  bv->ov_buf == NULL ||
200  iv->iv_vec.v_count == NULL ||
201  iv->iv_index == NULL ||
202  bio->bio_part == NULL) {
203  be_io_free(bio);
204  rc = -ENOMEM;
205  } else {
206  for (i = 0; i < bio->bio_iocred.bic_part_nr; ++i)
207  be_io_part_init(&bio->bio_part[i], bio);
208  m0_be_io_reset(bio);
209  rc = 0;
210  }
211  M0_POST(ergo(rc == 0, m0_be_io__invariant(bio)));
212  return M0_RC(rc);
213 }
214 
215 M0_INTERNAL void m0_be_io_deallocate(struct m0_be_io *bio)
216 {
217  unsigned i;
218 
219  for (i = 0; i < bio->bio_iocred.bic_part_nr; ++i)
220  be_io_part_fini(&bio->bio_part[i]);
221  be_io_free(bio);
222 }
223 
224 M0_INTERNAL void m0_be_io_fini(struct m0_be_io *bio)
225 {
226 }
227 
228 M0_INTERNAL bool m0_be_io__invariant(struct m0_be_io *bio)
229 {
230  struct m0_be_io_part *bip;
231  struct m0_stob_io *sio;
232  struct m0_bufvec *bv = &bio->bio_bv_user;
233  struct m0_indexvec *iv = &bio->bio_iv_stob;
234  m0_bcount_t count_total = 0;
235  uint32_t nr_total = 0;
236  unsigned i;
237  uint32_t pos = 0;
238 
239  for (i = 0; i < bio->bio_stob_nr; ++i) {
240  bip = &bio->bio_part[i];
241  sio = &bip->bip_sio;
242  nr_total += sio->si_user.ov_vec.v_nr;
243  count_total += m0_vec_count(&sio->si_user.ov_vec);
244  if (be_io_part_invariant(bip) &&
245  _0C(sio->si_user.ov_vec.v_count ==
246  &bv->ov_vec.v_count[pos]) &&
247  _0C(sio->si_user.ov_buf == &bv->ov_buf[pos]) &&
248  _0C(sio->si_stob.iv_vec.v_count ==
249  &iv->iv_vec.v_count[pos]) &&
250  _0C(sio->si_stob.iv_index == &iv->iv_index[pos])) {
251 
252  pos = nr_total;
253  continue;
254  }
255  return false;
256  }
257 
258  return _0C(bio != NULL) &&
259  _0C(m0_be_io_credit_le(&bio->bio_used, &bio->bio_iocred)) &&
261  bio->bio_stob_nr) &&
262  _0C(nr_total <= bio->bio_used.bic_reg_nr) &&
263  _0C(count_total <= bio->bio_used.bic_reg_size);
264 }
265 
266 static void be_io_vec_cut(struct m0_be_io *bio,
267  struct m0_be_io_part *bip)
268 {
269  struct m0_stob_io *sio = &bip->bip_sio;
270  struct m0_bufvec *bv = &bio->bio_bv_user;
271  struct m0_indexvec *iv = &bio->bio_iv_stob;
272  uint32_t pos = bio->bio_vec_pos;
273 
274  sio->si_user.ov_vec.v_count = &bv->ov_vec.v_count[pos];
275  sio->si_user.ov_buf = &bv->ov_buf[pos];
276  sio->si_stob.iv_vec.v_count = &iv->iv_vec.v_count[pos];
277  sio->si_stob.iv_index = &iv->iv_index[pos];
278 }
279 
280 M0_INTERNAL void m0_be_io_add(struct m0_be_io *bio,
281  struct m0_stob *stob,
282  void *ptr_user,
283  m0_bindex_t offset_stob,
285 {
286  struct m0_be_io_part *bip;
287  bool added;
288 
290  M0_PRE(bio->bio_used.bic_reg_size + size <=
291  bio->bio_iocred.bic_reg_size);
292  M0_PRE(bio->bio_used.bic_reg_nr + 1 <= bio->bio_iocred.bic_reg_nr);
293 
294  if (bio->bio_stob_nr == 0 ||
295  bio->bio_part[bio->bio_stob_nr - 1].bip_stob != stob) {
297  ++bio->bio_stob_nr;
298  bip = &bio->bio_part[bio->bio_stob_nr - 1];
299  bip->bip_stob = stob;
300  bip->bip_bshift = stob == NULL ? 0 : m0_stob_block_shift(stob);
301  be_io_vec_cut(bio, bip);
302  m0_be_io_credit_add(&bio->bio_used, &M0_BE_IO_CREDIT(0, 0, 1));
303  }
304  bip = &bio->bio_part[bio->bio_stob_nr - 1];
305  added = be_io_part_add(bip, ptr_user, offset_stob, size);
306  /* m0_be_io::bio_used is calculated for the worst-case */
308  bio->bio_vec_pos += added;
309 
311 }
312 
313 M0_INTERNAL void m0_be_io_add_nostob(struct m0_be_io *bio,
314  void *ptr_user,
315  m0_bindex_t offset_stob,
317 {
318  m0_be_io_add(bio, NULL, ptr_user, offset_stob, size);
319 }
320 
321 /*
322  * Inserts new element into m0_be_io's vectors at index+1 position.
323  * Space must be preallocated. Caller must fill new element with proper values.
324  */
325 static void be_io_vec_fork(struct m0_be_io *bio, unsigned index)
326 {
327  void **ov = bio->bio_bv_user.ov_buf;
328  m0_bindex_t *iv = bio->bio_iv_stob.iv_index;
329  m0_bcount_t *ov_count = bio->bio_bv_user.ov_vec.v_count;
330  m0_bcount_t *iv_count = bio->bio_iv_stob.iv_vec.v_count;
331  m0_bindex_t *index_ptr;
332  struct m0_be_io_part *bip;
333  uint32_t nr;
334  unsigned i;
335 
336  M0_PRE(index < bio->bio_vec_pos);
337 
338  m0_be_io_credit_add(&bio->bio_used, &M0_BE_IO_CREDIT(1, 0, 0));
340  index_ptr = &iv[index];
341 
342  /* Shift elements to the right by 1 position to make a window for new
343  * element.
344  */
345  for (i = bio->bio_vec_pos; i > index; --i) {
346  ov[i] = ov[i - 1];
347  iv[i] = iv[i - 1];
348  ov_count[i] = ov_count[i - 1];
349  iv_count[i] = iv_count[i - 1];
350  }
351 
352  /* Individual m0_be_io_part uses parts of m0_be_io's vectors. So shift
353  * pointers inside m0_be_io_part if respective parts are shifted
354  * previously.
355  * Also, we need to increase number of elements in the m0_be_io_part
356  * where new element is added.
357  */
358  for (i = 0; i < bio->bio_stob_nr; ++i) {
359  bip = &bio->bio_part[i];
360  nr = bip->bip_sio.si_stob.iv_vec.v_nr;
361  if (nr > 0 && index_ptr >= bip->bip_sio.si_stob.iv_index &&
362  index_ptr <= &bip->bip_sio.si_stob.iv_index[nr - 1]) {
363  ++bip->bip_sio.si_user.ov_vec.v_nr;
364  ++bip->bip_sio.si_stob.iv_vec.v_nr;
365  }
366  if (bip->bip_sio.si_stob.iv_index > index_ptr) {
367  ++bip->bip_sio.si_user.ov_buf;
368  ++bip->bip_sio.si_user.ov_vec.v_count;
369  ++bip->bip_sio.si_stob.iv_index;
370  ++bip->bip_sio.si_stob.iv_vec.v_count;
371  }
372  }
373  ++bio->bio_vec_pos;
374 }
375 
376 static unsigned be_io_vec_index_by_ptr(struct m0_be_io *bio,
377  m0_bindex_t *iv_ptr)
378 {
379  m0_bindex_t *iv = bio->bio_iv_stob.iv_index;
380  unsigned i;
381 
382  for (i = 0; i < bio->bio_vec_pos; ++i)
383  if (&iv[i] == iv_ptr)
384  break;
385  M0_ASSERT(i < bio->bio_vec_pos);
386  return i;
387 }
388 
389 M0_INTERNAL void m0_be_io_stob_assign(struct m0_be_io *bio,
390  struct m0_stob *stob,
393 {
394  struct m0_be_io_part *bip;
395  m0_bcount_t total = 0;
396  int i;
397 
398  /*
399  * Current implementation supports only assignment of 1 stob for
400  * the whole m0_be_io. Must be fixed when multiple stobs support
401  * is added to BE log.
402  */
403  M0_PRE(_0C(bio->bio_stob_nr == 1) &&
404  _0C(bio->bio_part[0].bip_stob == NULL) &&
405  _0C(bio->bio_part[0].bip_sio.si_stob.iv_vec.v_nr > 0) &&
406  _0C(bio->bio_part[0].bip_sio.si_stob.iv_index[0] == offset));
407  for (i = 0; i < bio->bio_part[0].bip_sio.si_stob.iv_vec.v_nr; ++i)
409  M0_PRE(total == size);
410 
411  bip = &bio->bio_part[0];
412  bip->bip_stob = stob;
414 }
415 
416 M0_INTERNAL void m0_be_io_stob_move(struct m0_be_io *bio,
417  struct m0_stob *stob,
419  m0_bindex_t win_start,
420  m0_bcount_t win_size)
421 {
422  struct m0_be_io_part *bip;
423  m0_bindex_t *iv;
424  void **ov;
425  m0_bcount_t *iv_count;
426  m0_bcount_t *ov_count;
427  m0_bindex_t win_end = win_start + win_size;
428  m0_bindex_t end;
429  uint32_t nr;
430  unsigned i;
431 
432  M0_PRE(win_start < win_end && offset >= win_start &&
433  offset < win_end);
434 
435  for (i = 0; i < bio->bio_stob_nr; ++i)
436  if (bio->bio_part[i].bip_stob == stob)
437  break;
438  M0_ASSERT(i != bio->bio_stob_nr);
439  bip = &bio->bio_part[i];
440  nr = bip->bip_sio.si_stob.iv_vec.v_nr;
441  iv = bip->bip_sio.si_stob.iv_index;
442  ov = bip->bip_sio.si_user.ov_buf;
443  iv_count = bip->bip_sio.si_stob.iv_vec.v_count;
444  ov_count = bip->bip_sio.si_user.ov_vec.v_count;
445 
446  M0_ASSERT(ergo(nr > 0, iv[0] == 0));
447 
448  i = 0;
449  while (i < nr) {
450  iv[i] = offset;
451  end = iv[i] + iv_count[i];
452  if (end > win_end) {
453  be_io_vec_fork(bio,
454  be_io_vec_index_by_ptr(bio, &iv[i]));
455  iv_count[i] -= end - win_end;
456  ov_count[i] = iv_count[i];
457  iv_count[i + 1] = end - win_end;
458  ov_count[i + 1] = iv_count[i + 1];
459  iv[i + 1] = win_start;
460  ov[i + 1] = (char*)ov[i] + ov_count[i];
461  ++i;
462  ++nr;
463  }
464  offset = end >= win_end ? end - win_end + win_start : end;
465  ++i;
466  }
468 }
469 
470 M0_INTERNAL void m0_be_io_vec_pack(struct m0_be_io *bio)
471 {
472  struct m0_be_io_part *bip;
473  m0_bcount_t *iv_count;
474  m0_bcount_t *ov_count;
475  m0_bindex_t *iv;
476  void **ov;
477  uint32_t nr;
478  uint32_t bshift;
479  int part;
480  int i;
481 
482  for (part = 0; part < bio->bio_stob_nr; ++part) {
483  bip = &bio->bio_part[part];
484  M0_ASSERT(bip->bip_stob != NULL);
485  nr = bip->bip_sio.si_stob.iv_vec.v_nr;
486  iv = bip->bip_sio.si_stob.iv_index;
487  ov = bip->bip_sio.si_user.ov_buf;
488  iv_count = bip->bip_sio.si_stob.iv_vec.v_count;
489  ov_count = bip->bip_sio.si_user.ov_vec.v_count;
490  bshift = bip->bip_bshift;
491  for (i = 0; i < nr; ++i) {
492  ov[i] = m0_stob_addr_pack(ov[i], bshift);
493  iv[i] = be_io_stob_offset_pack(iv[i], bshift);
494  iv_count[i] = be_io_size_pack(iv_count[i], bshift);
495  ov_count[i] = be_io_size_pack(ov_count[i], bshift);
496  }
497  }
498 }
499 
500 M0_INTERNAL m0_bcount_t m0_be_io_size(struct m0_be_io *bio)
501 {
502  struct m0_be_io_part *bip;
504  m0_bcount_t size_total = 0;
505  int i;
506 
507  for (i = 0; i < bio->bio_stob_nr; ++i) {
508  bip = &bio->bio_part[i];
511  size_total += size;
512  }
513  return size_total;
514 }
515 
516 M0_INTERNAL void m0_be_io_configure(struct m0_be_io *bio,
518 {
519  struct m0_stob_io *sio;
520  unsigned i;
521 
522  bio->bio_opcode = opcode;
523  for (i = 0; i < bio->bio_stob_nr; ++i) {
524  sio = &bio->bio_part[i].bip_sio;
525  sio->si_opcode = opcode;
526  }
527 }
528 
529 static void be_io_finished(struct m0_be_io *bio)
530 {
531  struct m0_be_op *op = bio->bio_op;
532  uint64_t finished_nr;
533  unsigned i;
534  int rc;
535 
536  finished_nr = bio->bio_stob_nr == 0 ? 0 :
538  /*
539  * Next `if' body will be executed only in the last finished stob I/O
540  * callback.
541  */
542  if (finished_nr == bio->bio_stob_nr) {
543  rc = 0;
544  for (i = 0; i < bio->bio_stob_nr; ++i) {
545  rc = bio->bio_part[i].bip_rc ?: rc;
546  if (rc != 0) {
547  M0_LOG(M0_INFO,
548  "failed I/O part number: %u, rc = %d",
549  i, rc);
550  break;
551  }
552  }
553  M0_ASSERT_INFO(rc == 0, "m0_be_op can't fail, rc = %d", rc);
555  m0_be_op_done(op);
556  }
557 }
558 
559 static bool be_io_cb(struct m0_clink *link)
560 {
561  struct m0_be_io_part *bip = container_of(link,
562  struct m0_be_io_part,
563  bip_clink);
564  struct m0_be_io *bio = bip->bip_bio;
565  struct m0_stob_io *sio = &bip->bip_sio;
566  int fd;
567  int rc;
568 
569  /* XXX Temporary workaround. I/O error should be handled gracefully. */
570  M0_ASSERT_INFO(sio->si_rc == 0, "stob I/O operation failed: "
571  "bio = %p, sio = %p, sio->si_rc = %d",
572  bio, sio, sio->si_rc);
573  rc = sio->si_rc;
574 
575  /* XXX temporary workaround:
576  * - sync() should be implemented on stob level or at least
577  * - sync() shoudn't be called from linux stob worker thread as it is
578  * now.
579  */
580  if (rc == 0 && bio->bio_sync) {
581  fd = m0_stob_fd(bip->bip_sio.si_obj);
582  rc = fdatasync(fd);
583  rc = rc == 0 ? 0 : M0_ERR_INFO(-errno, "fd=%d", fd);
584  M0_ASSERT_INFO(rc == 0, "fdatasync() failed: rc=%d", rc);
585  }
586  bip->bip_rc = rc;
587  be_io_finished(bio);
588  return rc == 0;
589 }
590 
591 static void be_io_empty_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
592 {
593  struct m0_be_io *bio = ast->sa_datum;
594 
596  be_io_finished(bio);
597 }
598 
599 M0_INTERNAL void m0_be_io_launch(struct m0_be_io *bio, struct m0_be_op *op)
600 {
601  unsigned i;
602  int rc;
603 
604  M0_ENTRY("bio=%p op=%p sync=%d opcode=%d "
605  "m0_be_io_size(bio)=%"PRIu64,
606  bio, op, !!bio->bio_sync, bio->bio_opcode, m0_be_io_size(bio));
607 
609 
610  bio->bio_op = op;
612 
613  if (m0_be_io_is_empty(bio)) {
614  bio->bio_ast.sa_cb = &be_io_empty_ast;
615  bio->bio_ast.sa_datum = bio;
616  m0_sm_ast_post(m0_locality0_get()->lo_grp, &bio->bio_ast);
617  M0_LEAVE();
618  return;
619  }
620 
621  rc = 0;
622  for (i = 0; i < bio->bio_stob_nr; ++i) {
623  if (rc == 0)
624  rc = be_io_part_launch(&bio->bio_part[i]);
625  if (rc != 0)
626  be_io_finished(bio);
627  }
628  M0_LEAVE("rc=%d", rc);
629 }
630 
631 M0_INTERNAL void m0_be_io_sync_enable(struct m0_be_io *bio)
632 {
633  bio->bio_sync = true;
634 }
635 
636 M0_INTERNAL bool m0_be_io_sync_is_enabled(struct m0_be_io *bio)
637 {
638  return bio->bio_sync;
639 }
640 
641 M0_INTERNAL enum m0_stob_io_opcode m0_be_io_opcode(struct m0_be_io *io)
642 {
643  return io->bio_opcode;
644 }
645 
646 M0_INTERNAL bool m0_be_io_is_empty(struct m0_be_io *bio)
647 {
648  return bio->bio_stob_nr == 0;
649 }
650 
651 M0_INTERNAL void m0_be_io_reset(struct m0_be_io *bio)
652 {
653  unsigned i;
654 
655  for (i = 0; i < bio->bio_stob_nr; ++i)
656  be_io_part_reset(&bio->bio_part[i]);
658  bio->bio_vec_pos = 0;
659  bio->bio_used = M0_BE_IO_CREDIT(0, 0, 0);
660  bio->bio_stob_nr = 0;
661  bio->bio_sync = false;
662 }
663 
664 M0_INTERNAL void m0_be_io_sort(struct m0_be_io *bio)
665 {
666  unsigned i;
667 
668  for (i = 0; i < bio->bio_stob_nr; ++i)
670 }
671 
672 M0_INTERNAL void m0_be_io_user_data_set(struct m0_be_io *bio, void *data)
673 {
674  bio->bio_user_data = data;
675 }
676 
677 M0_INTERNAL void *m0_be_io_user_data(struct m0_be_io *bio)
678 {
679  return bio->bio_user_data;
680 }
681 
682 M0_INTERNAL int m0_be_io_single(struct m0_stob *stob,
684  void *ptr_user,
685  m0_bindex_t offset_stob,
687 {
688  struct m0_be_io bio = {};
689  int rc;
690 
691  rc = m0_be_io_init(&bio);
692  if (rc != 0)
693  goto out;
694  rc = m0_be_io_allocate(&bio, &M0_BE_IO_CREDIT(1, size, 1));
695  if (rc == 0) {
696  m0_be_io_add(&bio, stob, ptr_user, offset_stob, size);
697  m0_be_io_configure(&bio, opcode);
699  m0_be_io_deallocate(&bio);
700  }
701  m0_be_io_fini(&bio);
702 out:
703  if (rc != 0)
705  return M0_RC(rc);
706 }
707 
708 static bool be_io_part_intersect(const struct m0_be_io_part *bip1,
709  const struct m0_be_io_part *bip2)
710 {
711  const struct m0_indexvec *iv1 = &bip1->bip_sio.si_stob;
712  const struct m0_indexvec *iv2 = &bip2->bip_sio.si_stob;
713  struct m0_ext e1;
714  struct m0_ext e2;
715  uint32_t i;
716  uint32_t j;
717 
718 #define EXT(start, end) (struct m0_ext){ .e_start = (start), .e_end = (end) }
719  for (i = 0; i < iv1->iv_vec.v_nr; ++i)
720  for (j = 0; j < iv2->iv_vec.v_nr; ++j) {
721  e1 = EXT(iv1->iv_index[i],
722  iv1->iv_index[i] + iv1->iv_vec.v_count[i]);
723  e2 = EXT(iv2->iv_index[j],
724  iv2->iv_index[j] + iv2->iv_vec.v_count[j]);
725  if (m0_ext_are_overlapping(&e1, &e2))
726  return true;
727  }
728 #undef EXT
729  return false;
730 }
731 
732 M0_INTERNAL bool m0_be_io_intersect(const struct m0_be_io *bio1,
733  const struct m0_be_io *bio2)
734 {
735  int i;
736  int j;
737 
738  for (i = 0; i < bio1->bio_stob_nr; ++i)
739  for (j = 0; j < bio2->bio_stob_nr; ++j) {
740  if (be_io_part_intersect(&bio1->bio_part[i],
741  &bio2->bio_part[j]))
742  return true;
743  }
744  return false;
745 }
746 
748 M0_INTERNAL bool m0_be_io_ptr_user_is_eq(const struct m0_be_io *bio1,
749  const struct m0_be_io *bio2)
750 {
751  struct m0_stob_io *sio1 = &bio1->bio_part[0].bip_sio;
752  struct m0_stob_io *sio2 = &bio2->bio_part[0].bip_sio;
753  struct m0_bufvec *bv1 = &sio1->si_user;
754  struct m0_bufvec *bv2 = &sio2->si_user;
755 
756  return bio1->bio_stob_nr == bio2->bio_stob_nr &&
757  bio2->bio_stob_nr == 1 && /* XXX */
758  bv1->ov_vec.v_nr == bv2->ov_vec.v_nr &&
759  bv2->ov_vec.v_nr == 1 && /* XXX */
760  bv1->ov_buf[0] == bv2->ov_buf[0];
761 }
762 
764 M0_INTERNAL bool m0_be_io_offset_stob_is_eq(const struct m0_be_io *bio1,
765  const struct m0_be_io *bio2)
766 {
767  struct m0_stob_io *sio1 = &bio1->bio_part[0].bip_sio;
768  struct m0_stob_io *sio2 = &bio2->bio_part[0].bip_sio;
769  struct m0_indexvec *iv1 = &sio1->si_stob;
770  struct m0_indexvec *iv2 = &sio2->si_stob;
771 
772  return bio1->bio_stob_nr == bio2->bio_stob_nr &&
773  bio2->bio_stob_nr == 1 && /* XXX */
774  iv1->iv_vec.v_nr == iv2->iv_vec.v_nr &&
775  iv2->iv_vec.v_nr == 1 && /* XXX */
776  iv1->iv_index[0] == iv2->iv_index[0];
777 }
778 
779 M0_INTERNAL void m0_be_io_credit_add(struct m0_be_io_credit *iocred0,
780  const struct m0_be_io_credit *iocred1)
781 {
782  iocred0->bic_reg_nr += iocred1->bic_reg_nr;
783  iocred0->bic_reg_size += iocred1->bic_reg_size;
784  iocred0->bic_part_nr += iocred1->bic_part_nr;
785 }
786 
787 M0_INTERNAL bool m0_be_io_credit_le(const struct m0_be_io_credit *iocred0,
788  const struct m0_be_io_credit *iocred1)
789 {
790  return iocred0->bic_reg_nr <= iocred1->bic_reg_nr &&
791  iocred0->bic_reg_size <= iocred1->bic_reg_size &&
792  iocred0->bic_part_nr <= iocred1->bic_part_nr;
793 }
794 
798 /*
799  * Local variables:
800  * c-indentation-style: "K&R"
801  * c-basic-offset: 8
802  * tab-width: 8
803  * fill-column: 80
804  * scroll-step: 1
805  * End:
806  */
807 /*
808  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
809  */
static bool be_io_part_add(struct m0_be_io_part *bip, void *ptr_user, m0_bindex_t offset_stob, m0_bcount_t size)
Definition: io.c:132
struct m0_be_io * bip_bio
Definition: io.h:63
struct m0_be_io_part * bio_part
Definition: io.h:89
static size_t nr
Definition: dump.c:1505
#define M0_PRE(cond)
struct m0_stob_io bip_sio
Definition: io.h:58
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
M0_INTERNAL m0_bcount_t m0_be_io_size(struct m0_be_io *bio)
Definition: io.c:500
M0_INTERNAL void m0_be_io_vec_pack(struct m0_be_io *bio)
Definition: io.c:470
struct m0_clink bip_clink
Definition: io.h:60
M0_INTERNAL void m0_stob_io_fini(struct m0_stob_io *io)
Definition: io.c:122
M0_INTERNAL void m0_be_io_sync_enable(struct m0_be_io *bio)
Definition: io.c:631
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
#define ergo(a, b)
Definition: misc.h:293
static bool be_io_part_intersect(const struct m0_be_io_part *bip1, const struct m0_be_io_part *bip2)
Definition: io.c:708
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
bool bio_sync
Definition: io.h:110
static void be_io_vec_cut(struct m0_be_io *bio, struct m0_be_io_part *bip)
Definition: io.c:266
static struct m0_sm_group * grp
Definition: bytecount.c:38
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
unsigned bio_stob_nr
Definition: io.h:97
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
struct m0_be_io_credit bio_used
Definition: io.h:95
struct m0_vec ov_vec
Definition: vec.h:147
uint32_t bio_vec_pos
Definition: io.h:93
struct m0_bufvec data
Definition: di.c:40
Definition: vec.h:49
uint64_t m0_bindex_t
Definition: types.h:80
struct m0_chan si_wait
Definition: io.h:318
uint64_t m0_bcount_t
Definition: types.h:77
Definition: sm.h:504
M0_INTERNAL const struct m0_fid * m0_stob_fid_get(struct m0_stob *stob)
Definition: stob.c:255
#define container_of(ptr, type, member)
Definition: misc.h:33
#define BE_IOCRED_P(iocred)
Definition: io.h:84
M0_INTERNAL void m0_be_op_rc_set(struct m0_be_op *op, int rc)
Definition: op.c:314
void ** ov_buf
Definition: vec.h:149
static m0_bcount_t be_io_size_pack(m0_bcount_t size, uint32_t bshift)
Definition: io.c:54
struct m0_sm_ast bio_ast
Definition: io.h:112
static void be_io_part_reset(struct m0_be_io_part *bip)
Definition: io.c:119
struct m0_vec iv_vec
Definition: vec.h:139
return M0_RC(rc)
op
Definition: libdemo.c:64
M0_INTERNAL uint32_t m0_stob_block_shift(struct m0_stob *stob)
Definition: stob.c:270
struct m0_bufvec si_user
Definition: io.h:300
m0_bcount_t bic_reg_size
Definition: io.h:72
#define M0_ENTRY(...)
Definition: trace.h:170
uint64_t bic_reg_nr
Definition: io.h:71
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
M0_INTERNAL void m0_be_io_configure(struct m0_be_io *bio, enum m0_stob_io_opcode opcode)
Definition: io.c:516
M0_INTERNAL void m0_be_io_err_send(uint32_t errcode, uint8_t location, uint8_t io_opcode)
Definition: ha.c:38
m0_bindex_t * iv_index
Definition: vec.h:141
int opcode
Definition: crate.c:301
int i
Definition: dir.c:1033
M0_INTERNAL void m0_be_io_add_nostob(struct m0_be_io *bio, void *ptr_user, m0_bindex_t offset_stob, m0_bcount_t size)
Definition: io.c:313
#define PRIu64
Definition: types.h:58
M0_INTERNAL void m0_be_io_fini(struct m0_be_io *bio)
Definition: io.c:224
uint64_t bic_part_nr
Definition: io.h:73
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
void * sa_datum
Definition: sm.h:508
M0_INTERNAL void * m0_be_io_user_data(struct m0_be_io *bio)
Definition: io.c:677
Definition: trace.h:482
struct m0_atomic64 bio_stob_io_finished_nr
Definition: io.h:103
static void be_io_finished(struct m0_be_io *bio)
Definition: io.c:529
M0_INTERNAL int m0_stob_io_prepare_and_launch(struct m0_stob_io *io, struct m0_stob *obj, struct m0_dtx *tx, struct m0_io_scope *scope)
Definition: io.c:219
Definition: stob.h:163
struct m0_indexvec si_stob
Definition: io.h:311
int32_t si_rc
Definition: io.h:334
static struct m0_stob * stob
Definition: storage.c:39
#define M0_ASSERT(cond)
m0_stob_io_opcode
Definition: io.h:227
M0_INTERNAL int m0_be_io_single(struct m0_stob *stob, enum m0_stob_io_opcode opcode, void *ptr_user, m0_bindex_t offset_stob, m0_bcount_t size)
Definition: io.c:682
M0_INTERNAL void m0_be_io_credit_add(struct m0_be_io_credit *iocred0, const struct m0_be_io_credit *iocred1)
Definition: io.c:779
static void be_io_empty_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: io.c:591
static m0_bindex_t be_io_stob_offset_pack(m0_bindex_t offset, uint32_t bshift)
Definition: io.c:49
M0_INTERNAL void m0_be_io_stob_move(struct m0_be_io *bio, struct m0_stob *stob, m0_bindex_t offset, m0_bindex_t win_start, m0_bcount_t win_size)
Definition: io.c:416
static void be_io_part_init(struct m0_be_io_part *bip, struct m0_be_io *bio)
Definition: io.c:73
uint32_t bip_bshift
Definition: io.h:62
struct m0_bufvec bio_bv_user
Definition: io.h:90
M0_INTERNAL bool m0_be_io_offset_stob_is_eq(const struct m0_be_io *bio1, const struct m0_be_io *bio2)
Definition: io.c:764
M0_INTERNAL void * m0_stob_addr_pack(const void *buf, uint32_t shift)
Definition: io.c:293
M0_INTERNAL int m0_be_io_init(struct m0_be_io *bio)
Definition: io.c:175
M0_INTERNAL bool m0_be_io_credit_le(const struct m0_be_io_credit *iocred0, const struct m0_be_io_credit *iocred1)
Definition: io.c:787
static void be_io_free(struct m0_be_io *bio)
Definition: io.c:64
#define M0_POST(cond)
#define BE_IOCRED_F
Definition: io.h:82
M0_INTERNAL bool m0_be_io_sync_is_enabled(struct m0_be_io *bio)
Definition: io.c:636
M0_INTERNAL bool m0_be_io_intersect(const struct m0_be_io *bio1, const struct m0_be_io *bio2)
Definition: io.c:732
uint32_t v_nr
Definition: vec.h:51
M0_INTERNAL void m0_be_op_done(struct m0_be_op *op)
Definition: stubs.c:104
static m0_bindex_t offset
Definition: dump.c:173
Definition: io.h:285
m0_bcount_t * v_count
Definition: vec.h:53
static int added
Definition: base.c:300
static bool be_io_part_invariant(struct m0_be_io_part *bip)
Definition: io.c:89
struct m0_stob * bip_stob
Definition: io.h:61
#define FID_P(f)
Definition: fid.h:77
#define M0_BE_IO_CREDIT(reg_nr, reg_size, part_nr)
Definition: io.h:76
static struct m0_stob_io io
Definition: ad.c:59
enum m0_stob_io_opcode bio_opcode
Definition: io.h:111
M0_INTERNAL void m0_be_io_user_data_set(struct m0_be_io *bio, void *data)
Definition: io.c:672
struct m0_be_op * bio_op
Definition: io.h:108
M0_INTERNAL m0_bcount_t m0_vec_count(const struct m0_vec *vec)
Definition: vec.c:53
struct m0_stob * si_obj
Definition: io.h:326
#define M0_BE_OP_SYNC_RC(op_obj, action)
Definition: op.h:196
M0_INTERNAL bool m0_be_io_is_empty(struct m0_be_io *bio)
Definition: io.c:646
static int64_t m0_atomic64_get(const struct m0_atomic64 *a)
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
#define PRIu32
Definition: types.h:66
M0_INTERNAL void m0_be_op_active(struct m0_be_op *op)
Definition: stubs.c:100
M0_INTERNAL bool m0_be_io_ptr_user_is_eq(const struct m0_be_io *bio1, const struct m0_be_io *bio2)
Definition: io.c:748
M0_INTERNAL struct m0_locality * m0_locality0_get(void)
Definition: locality.c:169
int bip_rc
Definition: io.h:64
Definition: ext.h:37
M0_INTERNAL int m0_be_io_allocate(struct m0_be_io *bio, struct m0_be_io_credit *iocred)
Definition: io.c:180
M0_INTERNAL void * m0_stob_addr_open(const void *buf, uint32_t shift)
Definition: io.c:302
M0_INTERNAL int m0_stob_fd(struct m0_stob *stob)
Definition: stob.c:360
M0_INTERNAL void m0_stob_iovec_sort(struct m0_stob_io *stob)
Definition: io.c:311
M0_INTERNAL void m0_be_io_deallocate(struct m0_be_io *bio)
Definition: io.c:215
M0_INTERNAL enum m0_stob_io_opcode m0_be_io_opcode(struct m0_be_io *io)
Definition: io.c:641
static m0_bcount_t be_io_size_unpack(m0_bcount_t size, uint32_t bshift)
Definition: io.c:59
m0_bcount_t size
Definition: di.c:39
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
M0_INTERNAL bool m0_be_io__invariant(struct m0_be_io *bio)
Definition: io.c:228
M0_INTERNAL bool m0_ext_are_overlapping(const struct m0_ext *e0, const struct m0_ext *e1)
Definition: ext.c:53
static void be_io_vec_fork(struct m0_be_io *bio, unsigned index)
Definition: io.c:325
struct m0_indexvec bio_iv_stob
Definition: io.h:91
struct m0t1fs_filedata * fd
Definition: dir.c:1030
M0_INTERNAL void m0_stob_io_init(struct m0_stob_io *io)
Definition: io.c:111
Definition: io.h:87
#define M0_ASSERT_INFO(cond, fmt,...)
#define EXT(start, end)
static int total
Definition: base.c:302
static int be_io_part_launch(struct m0_be_io_part *bip)
Definition: io.c:99
#define out(...)
Definition: gen.c:41
struct m0_be_io_credit bio_iocred
Definition: io.h:94
static unsigned be_io_vec_index_by_ptr(struct m0_be_io *bio, m0_bindex_t *iv_ptr)
Definition: io.c:376
Definition: op.h:74
#define M0_PRE_EX(cond)
void * bio_user_data
Definition: io.h:114
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL void m0_be_io_add(struct m0_be_io *bio, struct m0_stob *stob, void *ptr_user, m0_bindex_t offset_stob, m0_bcount_t size)
Definition: io.c:280
M0_INTERNAL void m0_be_io_stob_assign(struct m0_be_io *bio, struct m0_stob *stob, m0_bindex_t offset, m0_bcount_t size)
Definition: io.c:389
M0_INTERNAL void m0_be_io_launch(struct m0_be_io *bio, struct m0_be_op *op)
Definition: io.c:599
M0_INTERNAL void m0_be_io_reset(struct m0_be_io *bio)
Definition: io.c:651
int32_t rc
Definition: trigger_fop.h:47
#define M0_POST_EX(cond)
static int64_t m0_atomic64_add_return(struct m0_atomic64 *a, int64_t d)
static void be_io_part_fini(struct m0_be_io_part *bip)
Definition: io.c:82
#define FID_F
Definition: fid.h:75
M0_INTERNAL void m0_be_io_sort(struct m0_be_io *bio)
Definition: io.c:664
Definition: vec.h:145
static void m0_atomic64_set(struct m0_atomic64 *a, int64_t num)
static bool be_io_cb(struct m0_clink *link)
Definition: io.c:559
enum m0_stob_io_opcode si_opcode
Definition: io.h:286