Motr  M0
ioq.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2011-2020 Seagate Technology LLC and/or its Affiliates
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * For any questions about this software or licensing,
17  * please email opensource@seagate.com or cortx-questions@seagate.com.
18  *
19  */
20 
21 
22 #include "stob/ioq.h"
23 
24 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_STOB
25 #include "lib/trace.h"
26 
27 #include <limits.h> /* IOV_MAX */
28 #include <sys/uio.h> /* iovec */
29 #include <libaio.h> /* io_getevents */
30 
31 #include "ha/ha.h" /* m0_ha_send */
32 #include "ha/msg.h" /* m0_ha_msg */
33 
34 #include "lib/misc.h" /* M0_SET0 */
35 #include "lib/errno.h" /* ENOMEM */
36 #include "lib/finject.h" /* M0_FI_ENABLED */
37 #include "lib/locality.h"
38 #include "lib/memory.h" /* M0_ALLOC_PTR */
39 
40 #include "module/instance.h" /* m0_get() */
41 #include "reqh/reqh.h" /* m0_reqh */
42 #include "rpc/session.h" /* m0_rpc_session */
43 #include "addb2/addb2.h"
44 
45 #include "stob/addb2.h"
46 #include "stob/linux.h" /* m0_stob_linux_container */
47 #include "stob/io.h" /* m0_stob_io */
48 #include "stob/ioq_error.h" /* m0_stob_ioq_error */
49 
106 /* ---------------------------------------------------------------------- */
107 
/**
 * An element of the AIO submission queue: one iocb covering one
 * contiguous chunk of a stob I/O.
 *
 * NOTE(review): this view elides original lines 116-120; the full
 * definition also carries members referenced elsewhere in this file
 * (at least iq_nbytes, iq_offset and iq_linkage) — confirm against
 * the complete source.
 */
struct ioq_qev {
	/** AIO control block handed to io_submit(). */
	struct iocb        iq_iocb;
	/** Back-pointer to the stob I/O this fragment belongs to. */
	struct m0_stob_io *iq_io;
};
123 
	/* NOTE(review): the opening of this struct (stob_linux_io, per the
	 * uses of si_nr/si_qev below) and several members (original lines
	 * 124-134, incl. the si_done/si_bdone atomics used later) are
	 * elided from this view — confirm against the complete source. */
	/** Number of fragments (entries in ::si_qev). */
	uint32_t        si_nr;
	/** Array of si_nr queued AIO fragments for this I/O. */
	struct ioq_qev *si_qev;
};
139 
140 static struct ioq_qev *ioq_queue_get (struct m0_stob_ioq *ioq);
141 static void ioq_queue_put (struct m0_stob_ioq *ioq,
142  struct ioq_qev *qev);
143 static void ioq_queue_submit(struct m0_stob_ioq *ioq);
144 static void ioq_queue_lock (struct m0_stob_ioq *ioq);
145 static void ioq_queue_unlock(struct m0_stob_ioq *ioq);
146 
147 static const struct m0_stob_io_op stob_linux_io_op;
148 
enum {
	/*
	 * Alignment for direct-IO.
	 *
	 * According to open(2) manpage: "Under Linux 2.6, alignment to
	 * 512-byte boundaries suffices".
	 */
	STOB_IOQ_BSHIFT = 12, /* pow(2, 12) == 4096 */
	/* NOTE(review): companion constants (original lines 157-158,
	 * presumably STOB_IOQ_BSIZE and STOB_IOQ_BMASK, used by the
	 * m0_stob_ioq_bsize()/bmask() accessors below) are elided from
	 * this view. */
};
160 
/**
 * Initialises the Linux-AIO private part of a stob I/O operation.
 *
 * Allocates a stob_linux_io and attaches it to @io, pointing it at the
 * per-domain I/O queue of @stob's linux domain.
 *
 * @pre io->si_state == SIS_IDLE
 * @retval 0        on success
 * @retval -ENOMEM  if the private structure cannot be allocated
 */
M0_INTERNAL int m0_stob_linux_io_init(struct m0_stob *stob,
				      struct m0_stob_io *io)
{
	struct m0_stob_linux *lstob = m0_stob_linux_container(stob);
	struct stob_linux_io *lio;
	int                   result;

	M0_PRE(io->si_state == SIS_IDLE);

	M0_ALLOC_PTR(lio);
	if (lio != NULL) {
		io->si_stob_private = lio;
		/* NOTE(review): original line 173 is elided from this view —
		 * presumably "io->si_op = &stob_linux_io_op;"; confirm
		 * against the complete source. */
		lio->si_ioq = &lstob->sl_dom->sld_ioq;
		result = 0;
	} else
		result = M0_ERR(-ENOMEM);
	return result;
}
180 
181 static void stob_linux_io_release(struct stob_linux_io *lio)
182 {
183  if (lio->si_qev != NULL)
184  m0_free(lio->si_qev->iq_iocb.u.c.buf);
185  m0_free0(&lio->si_qev);
186 }
187 
/**
 * Finalises a stob I/O's Linux-AIO private state and frees it.
 */
static void stob_linux_io_fini(struct m0_stob_io *io)
{
	struct stob_linux_io *lio = io->si_stob_private;

	/* NOTE(review): original line 192 is elided from this view —
	 * presumably "stob_linux_io_release(lio);" so the qev/iovec
	 * arrays are freed before lio itself; confirm. */
	m0_free(lio);
}
195 
/**
 * Launches a stob I/O through Linux AIO.
 *
 * Walks the user-buffer vector ("src") and the stob extent vector
 * ("dst") in lock-step, counts the overlap fragments, then packs the
 * fragments into per-chunk iocbs (one qev per contiguous stob chunk,
 * at most IOV_MAX iovecs each) and queues them for submission.
 *
 * NOTE(review): this doxygen view elides a number of original lines
 * (207, 226-228, 231-233, 236, 244-245, 271, 291, 308, 336); hedged
 * notes are placed at each gap below. In particular "lstob" is used
 * but its declaration (line 207) is not visible here.
 *
 * @return 0 on success, -ENOMEM / -EOVERFLOW on failure.
 */
static int stob_linux_io_launch(struct m0_stob_io *io)
{
	struct stob_linux_io *lio = io->si_stob_private;
	struct m0_stob_ioq   *ioq = lio->si_ioq;
	struct ioq_qev       *qev;
	struct iovec         *iov;
	struct m0_vec_cursor  src;
	struct m0_vec_cursor  dst;
	uint32_t              frags = 0;
	uint32_t              chunks; /* contiguous stob chunks */
	m0_bcount_t           frag_size;
	int                   result = 0;
	int                   i;
	bool                  eosrc;
	bool                  eodst;
	int                   opcode;

	M0_PRE(M0_IN(io->si_opcode, (SIO_READ, SIO_WRITE)));
	/* prefix fragments execution mode is not yet supported */
	M0_ASSERT((io->si_flags & SIF_PREFIX) == 0);
	/* NOTE(review): lines 226-233 elided — likely further
	 * preconditions plus m0_vec_cursor_init() for src/dst; as shown
	 * the cursors would be used uninitialised. Confirm. */

	chunks = io->si_stob.iv_vec.v_nr;

	/* First pass: count the (src, dst) overlap fragments. */
	do {
		frag_size = min_check(m0_vec_cursor_step(&src),
		/* NOTE(review): second min_check() argument (line 236)
		 * elided — presumably m0_vec_cursor_step(&dst). */
		M0_ASSERT(frag_size > 0);
		frags++;
		eosrc = m0_vec_cursor_move(&src, frag_size);
		eodst = m0_vec_cursor_move(&dst, frag_size);
		M0_ASSERT(eosrc == eodst);
	} while (!eosrc);

	/* NOTE(review): lines 244-245 elided — likely cursor re-init
	 * before the packing pass below. */

	lio->si_nr = max_check(frags / IOV_MAX + 1, chunks);
	M0_LOG(M0_DEBUG, "chunks=%d frags=%d si_nr=%d",
	       chunks, frags, lio->si_nr);
	m0_atomic64_set(&lio->si_done, 0);
	m0_atomic64_set(&lio->si_bdone, 0);
	M0_ALLOC_ARR(lio->si_qev, lio->si_nr);
	M0_ALLOC_ARR(iov, frags);
	qev = lio->si_qev;
	if (qev == NULL || iov == NULL) {
		m0_free(iov);
		result = M0_ERR(-ENOMEM);
		goto out;
	}
	opcode = io->si_opcode == SIO_READ ? IO_CMD_PREADV : IO_CMD_PWRITEV;

	/* Second pass: pack fragments into one iocb per contiguous
	 * stob chunk (or per IOV_MAX iovecs, whichever is hit first). */
	ioq_queue_lock(ioq);
	while (result == 0) {
		struct iocb *iocb = &qev->iq_iocb;
		m0_bindex_t  off = io->si_stob.iv_index[dst.vc_seg] +
				   dst.vc_offset;
		m0_bindex_t  prev_off = ~0;
		m0_bcount_t  chunk_size = 0;

		qev->iq_io = io;
		/* NOTE(review): line 271 elided — likely
		 * m0_queue_link_init(&qev->iq_linkage). */

		iocb->u.v.vec = iov;
		iocb->aio_fildes = lstob->sl_fd;
		iocb->u.v.nr = min32u(frags, IOV_MAX);
		iocb->u.v.offset = off << m0_stob_ioq_bshift(ioq);
		iocb->aio_lio_opcode = opcode;

		for (i = 0; i < iocb->u.v.nr; ++i) {
			void       *buf;
			m0_bindex_t off;

			buf = io->si_user.ov_buf[src.vc_seg] + src.vc_offset;
			off = io->si_stob.iv_index[dst.vc_seg] + dst.vc_offset;

			/* Stop this iocb at a discontinuity in the
			 * stob offsets. */
			if (prev_off != ~0 && prev_off + frag_size != off)
				break;
			prev_off = off;

			frag_size = min_check(m0_vec_cursor_step(&src),
			/* NOTE(review): second argument (line 291)
			 * elided — presumably m0_vec_cursor_step(&dst). */
			if (frag_size > (size_t)~0ULL) {
				/* NOTE(review): on LP64 this comparison is
				 * always false (m0_bcount_t is 64-bit);
				 * presumably a guard for 32-bit size_t. */
				result = M0_ERR(-EOVERFLOW);
				break;
			}

			iov->iov_base = m0_stob_addr_open(buf,
						m0_stob_ioq_bshift(ioq));
			iov->iov_len = frag_size << m0_stob_ioq_bshift(ioq);
			chunk_size += frag_size;

			m0_vec_cursor_move(&src, frag_size);
			m0_vec_cursor_move(&dst, frag_size);
			++iov;
		}
		M0_LOG(M0_DEBUG, FID_F"(%p) %2d: frags=%d op=%d off=%lx sz=%lx"
		       ": rc = %d",
		       /* NOTE(review): line 308 elided — likely the
		        * FID_P(...) and io pointer arguments. */
		       (int)(qev - lio->si_qev), i, io->si_opcode,
		       (unsigned long)off, (unsigned long)chunk_size, result);
		if (result == 0) {
			iocb->u.v.nr = i;
			qev->iq_nbytes = chunk_size << m0_stob_ioq_bshift(ioq);
			qev->iq_offset = off << m0_stob_ioq_bshift(ioq);

			ioq_queue_put(ioq, qev);

			frags -= i;
			if (frags == 0)
				break;

			++qev;
			M0_ASSERT(qev - lio->si_qev < lio->si_nr);
		}
	}
	/* Shrink si_nr to the number of qevs actually used. */
	lio->si_nr = ++qev - lio->si_qev;
	/* The lock should be held until all 'qev's are pushed into queue and
	 * the lio->si_nr is correctly updated. When this lock is released,
	 * these 'qev's may be submitted.
	 */
	ioq_queue_unlock(ioq);
out:
	if (result != 0) {
		M0_LOG(M0_ERROR, "Launch op=%d io=%p failed: rc=%d",
		       io->si_opcode, io, result);
		/* NOTE(review): line 336 elided — likely
		 * stob_linux_io_release(lio) to undo the allocations. */
	} else
		ioq_queue_submit(ioq);

	return result;
}
342 
/** Stob I/O operation vector for the Linux-AIO backend. */
static const struct m0_stob_io_op stob_linux_io_op = {
	/* NOTE(review): the .sio_launch initialiser (original line 344,
	 * presumably stob_linux_io_launch) is elided from this view. */
	.sio_fini = stob_linux_io_fini
};
347 
/**
 * Pops the next pending fragment from the admission queue.
 *
 * Caller must hold ioq->ioq_lock (the unlocked ioq_queued decrement
 * below is only safe under it).
 */
static struct ioq_qev *ioq_queue_get(struct m0_stob_ioq *ioq)
{
	struct m0_queue_link *head;

	/* NOTE(review): lines 355-357 elided — likely preconditions
	 * (lock held, queue non-empty); confirm. */
	head = m0_queue_get(&ioq->ioq_queue);
	ioq->ioq_queued--;
	/* NOTE(review): line 360 elided — likely a postcondition
	 * assertion on ioq_queued vs. queue length. */
	return container_of(head, struct ioq_qev, iq_linkage);
}
363 
/**
 * Appends a fragment to the admission queue.
 *
 * Caller must hold ioq->ioq_lock; ioq_queued mirrors the queue length.
 */
static void ioq_queue_put(struct m0_stob_ioq *ioq,
			  struct ioq_qev *qev)
{
	/* NOTE(review): lines 370-371 elided — likely preconditions
	 * (lock held, link not already queued); confirm. */
	// M0_ASSERT(qev->iq_io->si_obj->so_domain == &ioq->sdl_base);

	m0_queue_put(&ioq->ioq_queue, &qev->iq_linkage);
	ioq->ioq_queued++;
	/* NOTE(review): line 376 elided — likely a postcondition
	 * assertion on ioq_queued vs. queue length. */
}
378 
/** Takes the admission-queue mutex. */
static void ioq_queue_lock(struct m0_stob_ioq *ioq)
{
	m0_mutex_lock(&ioq->ioq_lock);
}
383 
/** Releases the admission-queue mutex. */
static void ioq_queue_unlock(struct m0_stob_ioq *ioq)
{
	m0_mutex_unlock(&ioq->ioq_lock);
}
388 
393 static void ioq_queue_submit(struct m0_stob_ioq *ioq)
394 {
395  int got;
396  int put;
397  int avail;
398  int i;
399 
400  struct ioq_qev *qev[M0_STOB_IOQ_BATCH_IN_SIZE];
401  struct iocb *evin[M0_STOB_IOQ_BATCH_IN_SIZE];
402 
403  do {
404  ioq_queue_lock(ioq);
405  avail = m0_atomic64_get(&ioq->ioq_avail);
406  got = min32(ioq->ioq_queued, min32(avail, ARRAY_SIZE(evin)));
407  m0_atomic64_sub(&ioq->ioq_avail, got);
408  for (i = 0; i < got; ++i) {
409  qev[i] = ioq_queue_get(ioq);
410  evin[i] = &qev[i]->iq_iocb;
411  }
412  ioq_queue_unlock(ioq);
413 
414  if (got > 0) {
415  put = io_submit(ioq->ioq_ctx, got, evin);
416  if (put < 0)
417  M0_LOG(M0_ERROR, "got=%d put=%d", got, put);
418  if (put < 0)
419  put = 0;
420  ioq_queue_lock(ioq);
421  for (i = put; i < got; ++i)
422  ioq_queue_put(ioq, qev[i]);
423  ioq_queue_unlock(ioq);
424 
425  if (got > put)
426  m0_atomic64_add(&ioq->ioq_avail, got - put);
427  }
428  } while (got > 0);
429 }
430 
/**
 * Reports a stob I/O error to HA.
 *
 * Builds an M0_HA_MSG_STOB_IOQ message describing the failed fragment
 * (device fid, stob id, fd, opcode, rc, size, offset) and sends it via
 * m0_ha_send(). Silently returns if HA is not initialised or the
 * message cannot be allocated.
 */
static void ioq_io_error(struct m0_stob_ioq *ioq, struct ioq_qev *qev)
{
	struct m0_stob_io *io = qev->iq_io;
	/* NOTE(review): original line 437 is elided from this view —
	 * presumably the declaration of "lstob"
	 * (struct m0_stob_linux *, via m0_stob_linux_container()),
	 * which is used throughout below; confirm. */
	struct m0_ha_msg  *msg;
	uint64_t           tag;

	M0_ENTRY();
	M0_LOG(M0_WARN, "IO error: stob_id=" STOB_ID_F " conf_sdev=" FID_F,
	       STOB_ID_P(&lstob->sl_stob.so_id), FID_P(&lstob->sl_conf_sdev));

	if (m0_get()->i_ha == NULL || m0_get()->i_ha_link == NULL) {
		/*
		 * HA is not initialised. It may happen when I/O error occurs
		 * in UT or some subsystem performs I/O after HA finalisation.
		 */
		M0_LOG(M0_DEBUG, "IO error rc=%d is not sent to HA", io->si_rc);
		M0_LEAVE();
		return;
	}

	M0_ALLOC_PTR(msg);
	if (msg == NULL) {
		M0_LOG(M0_ERROR, "Can't allocate memory for msg");
		M0_LEAVE();
		return;
	}
	*msg = (struct m0_ha_msg){
		.hm_fid  = lstob->sl_conf_sdev,
		.hm_time = m0_time_now(),
		.hm_data = {
			.hed_type = M0_HA_MSG_STOB_IOQ,
			.u.hed_stob_ioq = {
				/* stob info */
				.sie_conf_sdev = lstob->sl_conf_sdev,
				.sie_stob_id   = lstob->sl_stob.so_id,
				.sie_fd        = lstob->sl_fd,
				/* IO info */
				.sie_opcode = io->si_opcode,
				.sie_rc     = io->si_rc,
				.sie_bshift = m0_stob_ioq_bshift(ioq),
				.sie_size   = qev->iq_nbytes,
				.sie_offset = qev->iq_offset,
			},
		},
	};
	m0_ha_send(m0_get()->i_ha, m0_get()->i_ha_link, msg, &tag);
	m0_free(msg);

	M0_LEAVE("tag=%"PRIu64, tag);
}
485 
486 /* Note: it is not the number of emulated errors, see below. */
488 
/**
 * Completion handler for one AIO fragment.
 *
 * Called from the ioq worker thread for every io_getevents() result:
 * handles short reads (zero-fills the tail), accumulates completed
 * bytes, records the first error in io->si_rc, optionally emulates
 * disk errors (fault injection via emulate_disk_errors_nr), and —
 * when the last fragment of the I/O completes — finalises si_count
 * and moves the I/O back to SIS_IDLE.
 *
 * NOTE(review): this doxygen view elides original lines 508-509, 549,
 * 581-584 and 586-587; hedged notes are placed at each gap.
 */
static void ioq_complete(struct m0_stob_ioq *ioq, struct ioq_qev *qev,
			 long res, long res2)
{
	struct m0_stob_io    *io = qev->iq_io;
	struct stob_linux_io *lio = io->si_stob_private;
	struct iocb          *iocb = &qev->iq_iocb;
	const struct m0_fid  *fid = m0_stob_fid_get(io->si_obj);

	static bool emulate_disk_error_found = false;

	M0_LOG(M0_DEBUG, "io=%p iocb=%p res=%lx nbytes=%lx",
	       io, iocb, (unsigned long)res, (unsigned long)qev->iq_nbytes);

	/* NOTE(review): lines 508-509 elided — likely sanity assertions
	 * (e.g. link not queued, res2 == 0). */
	M0_ASSERT(m0_atomic64_get(&lio->si_done) < lio->si_nr);

	/* short read. */
	if (io->si_opcode == SIO_READ && res >= 0 && res < qev->iq_nbytes) {
		/* fill the rest of the user buffer with zeroes. */
		const struct iovec *iov = iocb->u.v.vec;
		int i;

		/* NOTE(review): "iov" is never advanced in this loop as
		 * shown, so every iteration inspects the first iovec
		 * only — looks like a missing "++iov"; confirm against
		 * the full source before relying on this path. */
		for (i = 0; i < iocb->u.v.nr; ++i) {
			if (iov->iov_len < res) {
				res -= iov->iov_len;
			} else {
				memset(iov->iov_base + res, 0,
				       iov->iov_len - res);
				res = 0;
			}
		}
		/* Report the full expected size after zero-filling. */
		res = qev->iq_nbytes;
	}

	if (res > 0) {
		/* Partial-block transfers are errors under direct I/O. */
		if ((res & m0_stob_ioq_bmask(ioq)) != 0)
			res = M0_ERR(-EIO);
		else
			m0_atomic64_add(&lio->si_bdone, res);
	}
	/* Keep only the first error for the whole I/O. */
	if (res < 0 && io->si_rc == 0)
		io->si_rc = res;

	if (emulate_disk_errors_nr > 0) {
		static struct m0_fid disk = {};
		/*
		 * Yes, this is not the actual number of errors,
		 * but rather the number of checks. Otherwise,
		 * after the disks for which errors were emulated
		 * will be detached - emulate_disk_errors_nr will
		 * be never decreased and we will get stuck in
		 * these emulation checks.
		 */
		/* NOTE(review): line 549 elided — it opens an inner block
		 * (presumably "if (--emulate_disk_errors_nr <= 0) {");
		 * without it the braces below do not balance in this
		 * view. Confirm against the full source. */
		if (!emulate_disk_error_found)
			disk = M0_FID0;
		emulate_disk_error_found = false;
	}

	if (!m0_fid_is_set(&disk) && emulate_disk_errors_nr > 0)
		disk = *fid;

	if (m0_fid_eq(&disk, fid)) {
		io->si_rc = M0_ERR(-EIO);
		emulate_disk_error_found = true;
	}
	}

	if (io->si_rc != 0) {
		ioq_io_error(ioq, qev);
		/* Emulated errors are reported to HA but must not fail
		 * the actual I/O. */
		if (emulate_disk_error_found)
			io->si_rc = 0;
	}

	/*
	 * The position of this operation is critical:
	 * all threads must complete the above code until
	 * some of them finds here out that all frags are done.
	 */
	if (m0_atomic64_add_return(&lio->si_done, 1) == lio->si_nr) {
		m0_bcount_t bdone = m0_atomic64_get(&lio->si_bdone);

		M0_LOG(M0_DEBUG, FID_F" nr=%d sz=%lx si_rc=%d", FID_P(fid),
		       lio->si_nr, (unsigned long)bdone, (int)io->si_rc);
		io->si_count = bdone >> m0_stob_ioq_bshift(ioq);
		/* NOTE(review): lines 581-582 elided — the dangling
		 * argument list below belongs to an elided ADDB2
		 * telemetry call; confirm. */
		io->si_rc, io->si_count, lio->si_nr);
		/* NOTE(review): line 584 elided — likely
		 * stob_linux_io_release(lio). */
		io->si_state = SIS_IDLE;
		/* NOTE(review): lines 586-587 elided — likely a memory
		 * barrier plus m0_chan_broadcast_lock(&io->si_wait) to
		 * wake waiters. */
	}
}
590 
/**
 * Default io_getevents() timeout: wake each worker at least once a
 * second so the stop semaphore is re-checked promptly.
 */
static const struct timespec ioq_timeout_default = {
	.tv_sec  = 1,
	.tv_nsec = 0
};
595 
/**
 * Stop-timer callback.
 *
 * @param data  the per-thread stop semaphore, smuggled through the
 *              timer's unsigned-long payload.
 *
 * Ups the semaphore so the worker's main loop (which trydown()s it on
 * every iteration) notices the shutdown request. Always returns 0.
 */
static unsigned long stob_ioq_timer_cb(unsigned long data)
{
	struct m0_semaphore *sem = (struct m0_semaphore *)data;

	m0_semaphore_up(sem);
	return 0;
}
603 
604 static int stob_ioq_thread_init(struct m0_stob_ioq *ioq)
605 {
606  struct m0_timer_locality *timer_loc;
607  int thread_index;
608  int rc;
609 
610  thread_index = m0_thread_self() - ioq->ioq_thread;
611  timer_loc = &ioq->ioq_stop_timer_loc[thread_index];
612  m0_timer_locality_init(timer_loc);
613  rc = m0_timer_thread_attach(timer_loc);
614  if (rc != 0) {
615  m0_timer_locality_fini(timer_loc);
616  return M0_ERR(rc);
617  }
618  rc = m0_timer_init(&ioq->ioq_stop_timer[thread_index], M0_TIMER_HARD,
619  timer_loc, &stob_ioq_timer_cb,
620  (unsigned long)&ioq->ioq_stop_sem[thread_index]);
621  if (rc != 0) {
622  m0_timer_thread_detach(timer_loc);
623  m0_timer_locality_fini(timer_loc);
624  return M0_ERR(rc);
625  }
626  m0_semaphore_init(&ioq->ioq_stop_sem[thread_index], 0);
627  return M0_RC(rc);
628 }
629 
/**
 * Main loop of an ioq worker thread.
 *
 * Until the per-thread stop semaphore fires: reaps AIO completions
 * with io_getevents(), returns their ring slots to ioq_avail, calls
 * ioq_complete() for each event, re-submits queued fragments, and
 * updates ADDB2 histograms. On exit, tears down the thread's stop
 * semaphore, timer and timer locality.
 *
 * NOTE(review): this doxygen view elides original lines 651-652, 655,
 * 660, 668, 674 and 678; hedged notes are placed at each gap.
 */
static void stob_ioq_thread(struct m0_stob_ioq *ioq)
{
	int                  got;
	int                  avail;
	int                  i;
	struct io_event      evout[M0_STOB_IOQ_BATCH_OUT_SIZE];
	struct timespec      timeout;
	struct m0_addb2_hist inflight = {};
	struct m0_addb2_hist queued = {};
	struct m0_addb2_hist gotten = {};
	int                  thread_index;

	thread_index = m0_thread_self() - ioq->ioq_thread;
	M0_ADDB2_PUSH(M0_AVI_STOB_IOQ, thread_index);
	/* NOTE(review): lines 651-652 elided — likely
	 * m0_addb2_hist_add_auto() registrations for "inflight" and
	 * "queued". */
	m0_addb2_hist_add_auto(&gotten, 1000, M0_AVI_STOB_IOQ_GOT, -1);
	while (!m0_semaphore_trydown(&ioq->ioq_stop_sem[thread_index])) {
		/* NOTE(review): line 655 elided — presumably
		 * "timeout = ioq_timeout_default;"; as shown, "timeout"
		 * would be passed uninitialised. Confirm. */
		got = io_getevents(ioq->ioq_ctx, 1, ARRAY_SIZE(evout),
				   evout, &timeout);
		if (got > 0) {
			avail = m0_atomic64_add_return(&ioq->ioq_avail, got);
			/* NOTE(review): line 660 elided — likely an
			 * assertion/hist update consuming "avail". */
		}
		for (i = 0; i < got; ++i) {
			struct ioq_qev  *qev;
			struct io_event *iev;

			iev = &evout[i];
			qev = container_of(iev->obj, struct ioq_qev, iq_iocb);
			/* NOTE(review): line 668 elided — likely an
			 * assertion that qev is not still queued. */
			ioq_complete(ioq, qev, iev->res, iev->res2);
		}
		ioq_queue_submit(ioq);
		m0_addb2_hist_mod(&gotten, got);
		m0_addb2_hist_mod(&queued, ioq->ioq_queued);
		/* NOTE(review): line 674 elided — the dangling argument
		 * below belongs to an elided
		 * m0_addb2_hist_mod(&inflight, ...) call. */
			m0_atomic64_get(&ioq->ioq_avail));
		m0_addb2_force(M0_MKTIME(5, 0));
	}
	/* NOTE(review): line 678 elided — likely
	 * m0_addb2_pop(M0_AVI_STOB_IOQ) balancing the PUSH above. */
	m0_semaphore_fini(&ioq->ioq_stop_sem[thread_index]);
	m0_timer_stop(&ioq->ioq_stop_timer[thread_index]);
	m0_timer_fini(&ioq->ioq_stop_timer[thread_index]);
	m0_timer_thread_detach(&ioq->ioq_stop_timer_loc[thread_index]);
	m0_timer_locality_fini(&ioq->ioq_stop_timer_loc[thread_index]);
}
685 
/**
 * Initialises the per-domain AIO I/O queue.
 *
 * Sets up the admission queue and its lock, creates the kernel AIO
 * context, and spawns the worker threads. On any failure the partial
 * state is torn down via m0_stob_ioq_fini() and the error is returned.
 */
M0_INTERNAL int m0_stob_ioq_init(struct m0_stob_ioq *ioq)
{
	int result;
	int i;

	ioq->ioq_ctx = NULL;
	/* NOTE(review): original line 692 elided — presumably
	 * m0_atomic64_set(&ioq->ioq_avail, M0_STOB_IOQ_RING_SIZE);
	 * confirm. */
	ioq->ioq_queued = 0;

	m0_queue_init(&ioq->ioq_queue);
	m0_mutex_init(&ioq->ioq_lock);

	result = io_setup(M0_STOB_IOQ_RING_SIZE, &ioq->ioq_ctx);
	if (result == 0) {
		for (i = 0; i < ARRAY_SIZE(ioq->ioq_thread); ++i) {
			result = M0_THREAD_INIT(&ioq->ioq_thread[i],
						struct m0_stob_ioq *,
						/* NOTE(review): line 703
						 * elided — likely the
						 * &stob_ioq_thread_init
						 * argument. */
						&stob_ioq_thread, ioq,
						"ioq_thread%d", i);
			if (result != 0)
				break;
			/* NOTE(review): calling this once per thread is
			 * redundant (it just sets a flag) — placement
			 * inside the loop looks accidental; confirm. */
			m0_stob_ioq_directio_setup(ioq, false);
		}
	}
	if (result != 0)
		m0_stob_ioq_fini(ioq);
	return result;
}
715 
/**
 * Finalises the I/O queue: stops and joins the worker threads,
 * destroys the AIO context and the admission queue/lock. Safe to call
 * on a partially initialised ioq (from the m0_stob_ioq_init() error
 * path).
 */
M0_INTERNAL void m0_stob_ioq_fini(struct m0_stob_ioq *ioq)
{
	int i;

	for (i = 0; i < ARRAY_SIZE(ioq->ioq_stop_timer); ++i)
		/* NOTE(review): original line 721 — this loop's body — is
		 * elided from this view; presumably
		 * m0_timer_start(&ioq->ioq_stop_timer[i],
		 * M0_TIME_IMMEDIATELY) to wake each worker. As the text
		 * stands, the next for-statement would become this
		 * loop's body — confirm against the full source. */
	for (i = 0; i < ARRAY_SIZE(ioq->ioq_thread); ++i) {
		if (ioq->ioq_thread[i].t_func != NULL)
			m0_thread_join(&ioq->ioq_thread[i]);
	}
	if (ioq->ioq_ctx != NULL)
		io_destroy(ioq->ioq_ctx);
	m0_queue_fini(&ioq->ioq_queue);
	m0_mutex_fini(&ioq->ioq_lock);
}
731 
732 M0_INTERNAL uint32_t m0_stob_ioq_bshift(struct m0_stob_ioq *ioq)
733 {
734  return ioq->ioq_use_directio ? STOB_IOQ_BSHIFT : 0;
735 }
736 
737 M0_INTERNAL m0_bcount_t m0_stob_ioq_bsize(struct m0_stob_ioq *ioq)
738 {
739  return ioq->ioq_use_directio ? STOB_IOQ_BSIZE : 0;
740 }
741 
742 M0_INTERNAL m0_bcount_t m0_stob_ioq_bmask(struct m0_stob_ioq *ioq)
743 {
744  return ioq->ioq_use_directio ? STOB_IOQ_BMASK : 0;
745 }
746 
/** Reports whether direct I/O is enabled for this queue. */
M0_INTERNAL bool m0_stob_ioq_directio(struct m0_stob_ioq *ioq)
{
	return ioq->ioq_use_directio;
}
751 
/**
 * Enables or disables direct I/O for this queue. Affects the block
 * shift/size/mask accessors above; callers must not change this while
 * I/O is in flight (no synchronisation is done here).
 */
M0_INTERNAL void m0_stob_ioq_directio_setup(struct m0_stob_ioq *ioq,
					    bool use_directio)
{
	ioq->ioq_use_directio = use_directio;
}
757 
758 #undef M0_TRACE_SUBSYSTEM
759 
762 /*
763  * Local variables:
764  * c-indentation-style: "K&R"
765  * c-basic-offset: 8
766  * tab-width: 8
767  * fill-column: 80
768  * scroll-step: 1
769  * End:
770  */
M0_INTERNAL void m0_timer_locality_fini(struct m0_timer_locality *loc)
Definition: timer.c:84
M0_INTERNAL void m0_stob_ioq_fini(struct m0_stob_ioq *ioq)
Definition: ioq.c:716
static void ioq_queue_unlock(struct m0_stob_ioq *ioq)
Definition: ioq.c:384
M0_INTERNAL int m0_timer_thread_attach(struct m0_timer_locality *loc)
Definition: timer.c:127
struct m0_queue ioq_queue
Definition: ioq.h:88
void m0_addb2_force(m0_time_t delay)
Definition: addb2.c:589
struct m0_stob_io * iq_io
Definition: ioq.c:121
struct m0_timer ioq_stop_timer[M0_STOB_IOQ_NR_THREADS]
Definition: ioq.h:90
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
enum m0_stob_io_flags si_flags
Definition: io.h:290
struct m0_mutex ioq_lock
Definition: ioq.h:85
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static void stob_linux_io_fini(struct m0_stob_io *io)
Definition: ioq.c:188
struct m0_fid hm_fid
Definition: msg.h:117
M0_INTERNAL void m0_chan_broadcast_lock(struct m0_chan *chan)
Definition: chan.c:178
static struct m0_semaphore inflight
Definition: mdstore.c:63
#define NULL
Definition: misc.h:38
static struct m0_bufvec dst
Definition: xform.c:61
Definition: io.h:230
M0_INTERNAL bool m0_semaphore_trydown(struct m0_semaphore *semaphore)
Definition: semaphore.c:60
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
static uint64_t tag(uint8_t code, uint64_t id)
Definition: addb2.c:1047
M0_INTERNAL uint32_t m0_stob_ioq_bshift(struct m0_stob_ioq *ioq)
Definition: ioq.c:732
static void ioq_queue_lock(struct m0_stob_ioq *ioq)
Definition: ioq.c:379
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
#define min_check(a, b)
Definition: arith.h:88
struct iocb iq_iocb
Definition: ioq.c:115
struct m0_atomic64 ioq_avail
Definition: ioq.h:77
static void m0_atomic64_sub(struct m0_atomic64 *a, int64_t num)
struct ioq_qev * si_qev
Definition: ioq.c:135
struct m0_vec ov_vec
Definition: vec.h:147
#define max_check(a, b)
Definition: arith.h:95
Definition: io.h:274
m0_bindex_t iq_offset
Definition: ioq.c:117
#define M0_ADDB2_PUSH(id,...)
Definition: addb2.h:261
struct m0_semaphore ioq_stop_sem[M0_STOB_IOQ_NR_THREADS]
Definition: ioq.h:89
struct m0_queue_link iq_linkage
Definition: ioq.c:120
M0_INTERNAL bool m0_stob_ioq_directio(struct m0_stob_ioq *ioq)
Definition: ioq.c:747
struct m0_bufvec data
Definition: di.c:40
M0_INTERNAL void m0_ha_send(struct m0_ha *ha, struct m0_ha_link *hl, const struct m0_ha_msg *msg, uint64_t *tag)
Definition: ha.c:862
const struct m0_stob_io_op * si_op
Definition: io.h:328
uint64_t m0_bindex_t
Definition: types.h:80
struct m0_chan si_wait
Definition: io.h:318
M0_INTERNAL void m0_timer_thread_detach(struct m0_timer_locality *loc)
Definition: timer.c:153
uint64_t m0_bcount_t
Definition: types.h:77
static void ioq_queue_submit(struct m0_stob_ioq *ioq)
Definition: ioq.c:393
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
M0_INTERNAL int m0_timer_init(struct m0_timer *timer, enum m0_timer_type type, struct m0_timer_locality *loc, m0_timer_callback_t callback, unsigned long data)
Definition: timer.c:39
M0_INTERNAL struct m0 * m0_get(void)
Definition: instance.c:41
M0_INTERNAL const struct m0_fid * m0_stob_fid_get(struct m0_stob *stob)
Definition: stob.c:255
void m0_addb2_hist_mod(struct m0_addb2_hist *hist, int64_t val)
Definition: histogram.c:68
static int void * buf
Definition: dir.c:1019
#define container_of(ptr, type, member)
Definition: misc.h:33
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
M0_INTERNAL bool m0_fid_is_set(const struct m0_fid *fid)
Definition: fid.c:106
int(* sio_launch)(struct m0_stob_io *io)
Definition: io.h:439
void ** ov_buf
Definition: vec.h:149
struct m0_stob sl_stob
Definition: linux.h:57
Definition: sock.c:887
struct m0_fid sl_conf_sdev
Definition: linux.h:64
m0_bcount_t iq_nbytes
Definition: ioq.c:116
M0_INTERNAL bool m0_queue_is_empty(const struct m0_queue *q)
Definition: queue.c:65
struct m0_fid fid
Definition: di.c:46
struct m0_vec iv_vec
Definition: vec.h:139
bool ioq_use_directio
Definition: ioq.h:66
return M0_RC(rc)
static int head(struct m0_sm *mach)
Definition: sm.c:468
#define M0_ASSERT_EX(cond)
M0_INTERNAL void m0_timer_fini(struct m0_timer *timer)
Definition: timer.c:65
struct m0_bufvec si_user
Definition: io.h:300
M0_INTERNAL m0_bcount_t m0_stob_ioq_bmask(struct m0_stob_ioq *ioq)
Definition: ioq.c:742
#define M0_ENTRY(...)
Definition: trace.h:170
M0_INTERNAL struct m0_stob_linux * m0_stob_linux_container(struct m0_stob *stob)
Definition: linux.c:100
m0_bindex_t * iv_index
Definition: vec.h:141
int sl_fd
Definition: linux.h:60
int opcode
Definition: crate.c:301
int i
Definition: dir.c:1033
#define PRIu64
Definition: types.h:58
M0_INTERNAL bool m0_vec_is_empty(const struct m0_vec *vec)
Definition: vec.c:58
return M0_ERR(-EOPNOTSUPP)
static void ioq_queue_put(struct m0_stob_ioq *ioq, struct ioq_qev *qev)
Definition: ioq.c:367
struct m0_timer_locality ioq_stop_timer_loc[M0_STOB_IOQ_NR_THREADS]
Definition: ioq.h:91
M0_INTERNAL void m0_stob_ioq_directio_setup(struct m0_stob_ioq *ioq, bool use_directio)
Definition: ioq.c:752
static void put(void)
Definition: client_ut.c:2450
Definition: stob.h:163
M0_INTERNAL void m0_vec_cursor_init(struct m0_vec_cursor *cur, const struct m0_vec *vec)
Definition: vec.c:92
M0_INTERNAL void m0_timer_stop(struct m0_timer *timer)
Definition: timer.c:86
struct m0_indexvec si_stob
Definition: io.h:311
int32_t si_rc
Definition: io.h:334
static struct m0_stob * stob
Definition: storage.c:39
#define m0_free0(pptr)
Definition: memory.h:77
struct m0_atomic64 si_done
Definition: ioq.c:131
#define M0_ASSERT(cond)
static void stob_ioq_thread(struct m0_stob_ioq *ioq)
Definition: ioq.c:637
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
#define STOB_ID_P(si)
Definition: stob.h:109
M0_INTERNAL void m0_timer_start(struct m0_timer *timer, m0_time_t expire)
Definition: timer.c:75
m0_time_t m0_time_now(void)
Definition: time.c:134
static unsigned long stob_ioq_timer_cb(unsigned long data)
Definition: ioq.c:596
M0_INTERNAL void m0_queue_link_init(struct m0_queue_link *ql)
Definition: queue.c:71
m0_time_t si_start
Definition: io.h:405
Definition: io.h:244
uint32_t si_nr
Definition: ioq.c:129
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
static const struct timespec ioq_timeout_default
Definition: ioq.c:591
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
int64_t emulate_disk_errors_nr
Definition: ioq.c:487
Definition: msg.h:115
struct m0_atomic64 si_bdone
Definition: ioq.c:133
static void ioq_complete(struct m0_stob_ioq *ioq, struct ioq_qev *qev, long res, long res2)
Definition: ioq.c:495
uint64_t si_id
Definition: io.h:406
uint32_t v_nr
Definition: vec.h:51
enum m0_stob_io_state si_state
Definition: io.h:345
Definition: io.h:255
Definition: io.h:285
static void ioq_io_error(struct m0_stob_ioq *ioq, struct ioq_qev *qev)
Definition: ioq.c:434
static bool m0_atomic64_dec_and_test(struct m0_atomic64 *a)
#define FID_P(f)
Definition: fid.h:77
M0_INTERNAL int m0_stob_ioq_init(struct m0_stob_ioq *ioq)
Definition: ioq.c:686
static struct m0_stob_io io
Definition: ad.c:59
void m0_addb2_pop(uint64_t id)
Definition: addb2.c:440
m0_bcount_t si_count
Definition: io.h:340
void m0_addb2_hist_add_auto(struct m0_addb2_hist *hist, int skip, uint64_t label, int idx)
Definition: histogram.c:50
struct m0_stob * si_obj
Definition: io.h:326
static uint32_t timeout
Definition: console.c:52
static int64_t m0_atomic64_get(const struct m0_atomic64 *a)
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
Definition: fid.c:164
static int stob_linux_io_launch(struct m0_stob_io *io)
Definition: ioq.c:205
M0_INTERNAL struct m0_queue_link * m0_queue_get(struct m0_queue *q)
Definition: queue.c:112
static struct ioq_qev * ioq_queue_get(struct m0_stob_ioq *ioq)
Definition: ioq.c:351
struct m0_stob_ioq * si_ioq
Definition: ioq.c:137
M0_INTERNAL void m0_queue_put(struct m0_queue *q, struct m0_queue_link *ql)
Definition: queue.c:131
static uint32_t min32u(uint32_t a, uint32_t b)
Definition: arith.h:56
M0_INTERNAL m0_bcount_t m0_vec_cursor_step(const struct m0_vec_cursor *cur)
Definition: vec.c:125
static const struct m0_stob_io_op stob_linux_io_op
Definition: ioq.c:147
void * si_stob_private
Definition: io.h:367
Definition: ioq.c:114
Definition: fid.h:38
void(* t_func)(void *)
Definition: thread.h:114
M0_INTERNAL void * m0_stob_addr_open(const void *buf, uint32_t shift)
Definition: io.c:302
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
int ioq_queued
Definition: ioq.h:79
M0_INTERNAL bool m0_vec_cursor_move(struct m0_vec_cursor *cur, m0_bcount_t count)
Definition: vec.c:102
M0_INTERNAL struct m0_thread * m0_thread_self(void)
Definition: thread.c:122
m0_time_t m0_time_sub(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:65
M0_INTERNAL size_t m0_queue_length(const struct m0_queue *q)
Definition: queue.c:100
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_queue_init(struct m0_queue *q)
Definition: queue.c:53
M0_INTERNAL m0_bcount_t m0_stob_ioq_bsize(struct m0_stob_ioq *ioq)
Definition: ioq.c:737
M0_INTERNAL int m0_stob_linux_io_init(struct m0_stob *stob, struct m0_stob_io *io)
Definition: ioq.c:161
#define M0_MKTIME(secs, ns)
Definition: time.h:86
io_context_t ioq_ctx
Definition: ioq.h:75
#define STOB_ID_F
Definition: stob.h:108
M0_INTERNAL void m0_timer_locality_init(struct m0_timer_locality *loc)
Definition: timer.c:75
#define out(...)
Definition: gen.c:41
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
#define M0_FID0
Definition: fid.h:93
Definition: io.h:229
static int32_t min32(int32_t a, int32_t b)
Definition: arith.h:36
M0_INTERNAL void m0_queue_fini(struct m0_queue *q)
Definition: queue.c:59
static void m0_atomic64_add(struct m0_atomic64 *a, int64_t num)
void m0_free(void *data)
Definition: memory.c:146
struct m0_thread ioq_thread[M0_STOB_IOQ_NR_THREADS]
Definition: ioq.h:81
struct m0_stob_ioq sld_ioq
Definition: linux.h:49
static void stob_linux_io_release(struct stob_linux_io *lio)
Definition: ioq.c:181
struct m0_pdclust_src_addr src
Definition: fd.c:108
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
struct m0_stob_id so_id
Definition: stob.h:166
const m0_time_t M0_TIME_IMMEDIATELY
Definition: time.c:107
static int64_t m0_atomic64_add_return(struct m0_atomic64 *a, int64_t d)
M0_INTERNAL bool m0_queue_link_is_in(const struct m0_queue_link *ql)
Definition: queue.c:81
#define FID_F
Definition: fid.h:75
Definition: trace.h:478
static void m0_atomic64_set(struct m0_atomic64 *a, int64_t num)
static int stob_ioq_thread_init(struct m0_stob_ioq *ioq)
Definition: ioq.c:604
struct m0_stob_linux_domain * sl_dom
Definition: linux.h:58
enum m0_stob_io_opcode si_opcode
Definition: io.h:286