Motr  M0
io_foms.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * For any questions about this software or licensing,
17  * please email opensource@seagate.com or cortx-questions@seagate.com.
18  *
19  */
20 
21 
22 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_IOSERVICE
23 #include "lib/trace.h"
24 #include "lib/errno.h"
25 #include "lib/memory.h"
26 #include "lib/tlist.h"
27 #include "lib/assert.h"
28 #include "lib/misc.h" /* M0_BITS */
29 #include "lib/finject.h"
30 #include "lib/cksum_utils.h"
31 #include "net/net_internal.h"
32 #include "net/buffer_pool.h"
33 #include "fop/fop.h"
34 #include "fop/fom_generic.h"
35 #include "stob/stob.h"
36 #include "stob/type.h" /* m0_stob_type_id_by_name */
37 #include "stob/domain.h" /* m0_stob_domain__dom_id */
38 #include "fid/fid.h"
39 #include "reqh/reqh_service.h"
40 #include "ioservice/io_foms.h"
41 #include "ioservice/io_service.h"
42 #include "ioservice/cob_foms.h" /* m0_cc_stob_create */
43 #include "ioservice/storage_dev.h" /* m0_storage_dev_stob_find */
44 #include "cob/cob.h" /* m0_cob_create */
45 #include "motr/magic.h"
46 #include "motr/setup.h" /* m0_cs_storage_devs_get */
47 #include "pool/pool.h"
48 #include "ioservice/io_addb2.h"
49 #include "sns/cm/cm.h" /* m0_sns_cm_fid_repair_done() */
50 #include "ioservice/fid_convert.h" /* m0_fid_convert_cob2stob */
51 #include "balloc/balloc.h" /* M0_BALLOC_NORMAL_ZONE */
52 #include "stob/addb2.h" /* M0_AVI_STOB_IO_REQ */
53 #include "layout/layout.h" /* m0_lid_to_unit_map */
54 
/**
 * Keeps only the low 48 bits of @lid (mask 0x0000ffffffffffff).
 *
 * The argument is parenthesised so that a compound expression such as
 * `a | b` is masked as a whole rather than having the mask bind to its
 * last operand only.
 */
#define M0_OBJ_LAYOUT_ID(lid) ((lid) & 0x0000ffffffffffff)
567 
572 M0_TL_DESCR_DEFINE(stobio, "STOB I/O", static, struct m0_stob_io_desc,
573  siod_linkage, siod_magic,
575 M0_TL_DEFINE(stobio, static, struct m0_stob_io_desc);
576 
577 M0_TL_DESCR_DEFINE(netbufs, "Aquired net buffers", static,
578  struct m0_net_buffer, nb_extern_linkage, nb_magic,
580 M0_TL_DEFINE(netbufs, static, struct m0_net_buffer);
581 
582 M0_TL_DESCR_DEFINE(rpcbulkbufs, "rpc bulk buffers", static,
583  struct m0_rpc_bulk_buf, bb_link, bb_magic,
585 M0_TL_DEFINE(rpcbulkbufs, static, struct m0_rpc_bulk_buf);
586 
587 M0_TL_DESCR_DECLARE(bufferpools, M0_EXTERN);
588 M0_TL_DECLARE(bufferpools, M0_EXTERN, struct m0_rios_buffer_pool);
589 
590 M0_INTERNAL bool m0_is_read_fop(const struct m0_fop *fop);
591 M0_INTERNAL bool m0_is_write_fop(const struct m0_fop *fop);
592 M0_INTERNAL bool m0_is_io_fop(const struct m0_fop *fop);
593 M0_INTERNAL struct m0_fop_cob_rw *io_rw_get(struct m0_fop *fop);
594 M0_INTERNAL struct m0_fop_cob_rw_reply *io_rw_rep_get(struct m0_fop *fop);
595 M0_INTERNAL bool m0_is_cob_create_fop(const struct m0_fop *fop);
596 M0_INTERNAL bool m0_is_cob_delete_fop(const struct m0_fop *fop);
597 
598 static int m0_io_fom_cob_rw_create(struct m0_fop *fop, struct m0_fom **out,
599  struct m0_reqh *reqh);
600 static int m0_io_fom_cob_rw_tick(struct m0_fom *fom);
601 static void m0_io_fom_cob_rw_fini(struct m0_fom *fom);
602 static size_t m0_io_fom_cob_rw_locality_get(const struct m0_fom *fom);
603 M0_INTERNAL const char *m0_io_fom_cob_rw_service_name(struct m0_fom *fom);
604 static bool m0_io_fom_cob_rw_invariant(const struct m0_io_fom_cob_rw *io);
605 
606 static int net_buffer_acquire(struct m0_fom *);
607 static int io_prepare(struct m0_fom *);
608 static int io_launch(struct m0_fom *);
609 static int io_finish(struct m0_fom *);
610 static int io_sync(struct m0_fom *fom);
611 static int zero_copy_initiate(struct m0_fom *);
612 static int zero_copy_finish(struct m0_fom *);
613 static int net_buffer_release(struct m0_fom *);
614 static int nbuf_release_done(struct m0_fom *fom, int still_required);
615 static int cob_bytecount_increment(struct m0_cob *cob, struct m0_cob_bckey *key,
616  uint64_t bytecount, struct m0_be_tx *tx);
617 
618 static void io_fom_addb2_descr(struct m0_fom *fom);
619 
/*
 * Operation vector of the I/O FOM. The initialisers visible here install the
 * tick handler, the home-locality selector and the addb2 descriptor callback.
 * NOTE(review): this listing appears to have lost at least one initialiser
 * line between the opening brace and .fo_tick; confirm against the full file.
 */
623 struct m0_fom_ops ops = {
625  .fo_tick = &m0_io_fom_cob_rw_tick,
626  .fo_home_locality = &m0_io_fom_cob_rw_locality_get,
627  .fo_addb2_descr = &io_fom_addb2_descr
628 };
629 
635 };
636 
644  M0_FOPH_IO_FOM_BUFFER_ACQUIRE, 0, "io-preparation", },
645 
648  M0_FOPH_IO_STOB_INIT, M0_FOPH_IO_FOM_BUFFER_WAIT, "network-buffer-acquire", },
649 
652  M0_FOPH_IO_STOB_INIT, M0_FOPH_IO_FOM_BUFFER_WAIT, "network-buffer-wait", },
653 
656  0, M0_FOPH_IO_STOB_WAIT, "stobio-launch", },
657 
660  M0_FOPH_IO_ZERO_COPY_INIT, 0, "stobio-finish", },
661 
664  0, M0_FOPH_IO_ZERO_COPY_WAIT, "zero-copy-initiate", },
665 
668  M0_FOPH_IO_BUFFER_RELEASE, 0, "zero-copy-finish", },
669 
672  M0_FOPH_IO_FOM_BUFFER_ACQUIRE, 0, "network-buffer-release", },
673 };
674 
682  M0_FOPH_IO_FOM_BUFFER_ACQUIRE, 0, "io-preparation", },
683 
687  "network-buffer-acquire", },
688 
692  "network-buffer-wait", },
693 
696  0, M0_FOPH_IO_ZERO_COPY_WAIT, "zero-copy-initiate", },
697 
700  M0_FOPH_IO_STOB_INIT, 0, "zero-copy-finish", },
701 
704  0, M0_FOPH_IO_STOB_WAIT, "stobio-launch", },
705 
708  M0_FOPH_IO_BUFFER_RELEASE, 0, "stobio-finish", },
709 
712  M0_FOPH_IO_FOM_BUFFER_ACQUIRE, 0, "network-buffer-release", },
713 
714 [M0_FOPH_IO_SYNC] =
716 
717 };
718 
721  .sd_name = "io-prepare",
725  },
727  .sd_name = "network-buffer-acquire",
728  .sd_allowed = M0_BITS(M0_FOPH_IO_STOB_INIT,
732  },
734  .sd_name = "network-buffer-wait",
735  .sd_allowed = M0_BITS(M0_FOPH_IO_STOB_INIT,
739  },
741  .sd_name = "stobio-launch",
742  .sd_allowed = M0_BITS(M0_FOPH_IO_STOB_WAIT,
744  },
746  .sd_name = "stobio-finish",
747  .sd_allowed = M0_BITS(M0_FOPH_IO_ZERO_COPY_INIT,
750  },
752  .sd_name = "zero-copy-initiate",
753  .sd_allowed = M0_BITS(M0_FOPH_IO_ZERO_COPY_WAIT,
755  },
757  .sd_name = "zero-copy-finish",
758  .sd_allowed = M0_BITS(M0_FOPH_IO_BUFFER_RELEASE,
762  },
764  .sd_name = "network-buffer-release",
767  },
768  [M0_FOPH_IO_SYNC] = {
769  .sd_name = "tx-sync",
771  },
772 };
773 
776  {"start-network-io", M0_FOPH_TXN_INIT, M0_FOPH_IO_FOM_PREPARE},
778  {"io-prepare-failed", M0_FOPH_IO_FOM_PREPARE, M0_FOPH_FAILURE},
779  {"transaction-opened", M0_FOPH_IO_FOM_PREPARE, M0_FOPH_IO_STOB_INIT},
780  {"network-buffer-acquired-stobio",
782  {"network-buffer-acquired-zerocopy",
784  {"network-buffer-wait",
786  {"network-buffer-acquire-failure",
788  {"network-buffer-wait-finished-stobio",
790  {"network-buffer-wait-finished-zerocopy",
792  {"network-buffer-not-available",
794  {"network-buffer-wait-failure",
796  {"stobio-launched", M0_FOPH_IO_STOB_INIT, M0_FOPH_IO_STOB_WAIT},
797  {"stobio-launch-failed", M0_FOPH_IO_STOB_INIT, M0_FOPH_FAILURE},
798  {"stobio-wait-finished-zerocopy",
800  {"stobio-wait-finished-buffer-release",
802  {"stobio-wait-failed", M0_FOPH_IO_STOB_WAIT, M0_FOPH_FAILURE},
803  {"zero-copy-initiated",
805  {"zero-copy-initiate-failed",
807  {"zero-copy-wait-finished-buffer-release",
809  {"zero-copy-wait-finished-stobio",
811  {"zero-copy-wait-finished-txn-open",
813  {"zero-copy-wait-failed", M0_FOPH_IO_ZERO_COPY_WAIT, M0_FOPH_FAILURE},
814  {"network-buffer-released",
816  {"tx-wait", M0_FOPH_QUEUE_REPLY, M0_FOPH_IO_SYNC },
817  {"tx-wait-more", M0_FOPH_IO_SYNC, M0_FOPH_IO_SYNC },
818  {"tx-wait-done", M0_FOPH_IO_SYNC, M0_FOPH_QUEUE_REPLY },
820 };
821 
/*
 * State machine configuration of the I/O FOM: it is named "io-fom" and is
 * built from the io_phases state table and the io_phases_trans transition
 * table, whose sizes are taken with ARRAY_SIZE().
 */
822 struct m0_sm_conf io_conf = {
823  .scf_name = "io-fom",
824  .scf_nr_states = ARRAY_SIZE(io_phases),
825  .scf_state = io_phases,
826  .scf_trans_nr = ARRAY_SIZE(io_phases_trans),
827  .scf_trans = io_phases_trans,
828 };
829 
831 {
832  struct m0_fop_cob_rw *rwfop = io_rw_get(io->fcrw_gen.fo_fop);
833 
834  return
835  _0C(io->fcrw_ndesc == rwfop->crw_desc.id_nr) &&
836  _0C(io->fcrw_curr_desc_index >= 0) &&
837  _0C(io->fcrw_curr_desc_index <= rwfop->crw_desc.id_nr) &&
838  _0C(M0_CHECK_EX(m0_tlist_invariant(&netbufs_tl,
839  &io->fcrw_netbuf_list))) &&
840  _0C(io->fcrw_batch_size ==
841  netbufs_tlist_length(&io->fcrw_netbuf_list)) &&
842  _0C(io->fcrw_req_count >= io->fcrw_count) &&
843  _0C(io->fcrw_curr_size <= io->fcrw_total_ioivec_cnt) &&
844  _0C(io->fcrw_num_stobio_launched <=
845  stobio_tlist_length(&io->fcrw_stio_list)) &&
846  _0C(ergo(io->fcrw_num_stobio_launched <
847  stobio_tlist_length(&io->fcrw_stio_list),
848  m0_fom_phase(&io->fcrw_gen) == M0_FOPH_IO_STOB_WAIT));
849 }
850 
851 static bool m0_stob_io_desc_invariant(const struct m0_stob_io_desc *stobio_desc)
852 {
853  return stobio_desc->siod_magic == M0_STOB_IO_DESC_LINK_MAGIC;
854 }
855 
/*
 * Completion callback for one stob I/O; io_launch() installs it as
 * siod_fcb.fc_bottom before arming the callback on the stob I/O wait channel.
 *
 * It recovers the stob I/O descriptor from the callback and the I/O FOM from
 * cb->fc_fom. For read fops it asserts that the checksum byte count in the
 * reply does not exceed the size of the reply checksum buffer. The FOM is
 * woken with m0_fom_ready() once fcrw_num_stobio_launched is zero.
 *
 * NOTE(review): several statements of this function are missing from this
 * listing (presumably the ones that update rwr_cksum_nob_read and the
 * launched/done bookkeeping); confirm against the full file.
 */
864 static void stobio_complete_cb(struct m0_fom_callback *cb)
865 {
866  struct m0_fom *fom = cb->fc_fom;
867  struct m0_io_fom_cob_rw *fom_obj;
868  struct m0_stob_io_desc *stio_desc;
869 
  /* The callback is embedded in the descriptor as siod_fcb. */
872  stio_desc = container_of(cb, struct m0_stob_io_desc, siod_fcb);
874 
875  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
877 
878  /* Update checksum count in reply fop for read only */
879  if (m0_is_read_fop(fom->fo_fop)) {
880  struct m0_fop_cob_rw_reply *rwrep = io_rw_rep_get(fom->fo_rep_fop);
881 
882  /* The si_cksum_nob_read will get updated in function stob_ad_read_prepare
883  * for every emap segment read with non-zero CS, this value gets updated.
884  */
886 
887  M0_ASSERT( rwrep->rwr_cksum_nob_read <=
888  rwrep->rwr_di_data_cksum.b_nob );
889  }
890 
  /* Wake the FOM only when no launched stob I/O remains outstanding. */
892  if (fom_obj->fcrw_num_stobio_launched == 0)
893  m0_fom_ready(fom);
894 }
895 
/*
 * NOTE(review): IO_FOM_STOB_KEY_MAX is not referenced in the visible part of
 * this file; presumably an upper bound on a stob key - confirm its user.
 */
896 enum {
900  IO_FOM_STOB_KEY_MAX = 0x10000000ULL,
901 };
902 
/*
 * Creates a cob for @fid in cob domain @cdom as part of transaction @tx.
 *
 * A cob object is allocated with m0_cob_alloc(), a name-space record is
 * filled with @fid, a link count of 1, @pver and @lid, and the cob is created
 * with m0_cob_create(). The cob reference is dropped with m0_cob_put() on
 * every path after allocation, so no cob is handed back to the caller.
 *
 * Returns 0 on success, otherwise the error from m0_cob_alloc(), from the
 * name-space key step or from m0_cob_create().
 *
 * NOTE(review): the statement that builds nskey (and presumably uses gfid) is
 * missing from this listing; confirm against the full file.
 */
903 M0_INTERNAL int m0_io_cob_create(struct m0_cob_domain *cdom,
904  struct m0_fid *fid,
905  struct m0_fid *pver,
906  uint64_t lid,
907  struct m0_be_tx *tx)
908 {
909  int rc;
910  struct m0_cob *cob;
911  struct m0_cob_nskey *nskey = NULL;
912  struct m0_cob_nsrec nsrec = {};
913  struct m0_fid gfid;
914 
915  M0_ENTRY("COB fid:"FID_F"pver:"FID_F, FID_P(fid), FID_P(pver));
916  rc = m0_cob_alloc(cdom, &cob);
917  if (rc != 0)
918  return M0_RC(rc);
919 
922  if (rc != 0) {
923  m0_cob_put(cob);
924  return M0_RC(rc);
925  }
926 
927  m0_cob_nsrec_init(&nsrec);
928  nsrec.cnr_fid = *fid;
929  nsrec.cnr_nlink = 1;
930  nsrec.cnr_pver = *pver;
931  nsrec.cnr_lid = lid;
932 
933  rc = m0_cob_create(cob, nskey, &nsrec, NULL, NULL, tx);
  /* nskey is freed here only when m0_cob_create() failed. */
934  if (rc != 0)
935  m0_free(nskey);
936  m0_cob_put(cob);
937 
938  return M0_RC(rc);
939 }
940 
941 static struct m0_cob_domain *fom_cdom(struct m0_fom *fom)
942 {
943  struct m0_reqh_io_service *ios;
944 
945  ios = container_of(fom->fo_service, struct m0_reqh_io_service,
946  rios_gen);
947 
948  return ios->rios_cdom;
949 }
950 
/*
 * Looks up the cob for @fid in @cdom and creates or re-creates it when needed.
 *
 * - If the cob is found and the first condition below holds (the log message
 *   indicates a pool version mismatch), the cob is marked for re-creation.
 * - If the lookup returns -ENOENT and @crow is set, the stob is created with
 *   m0_cc_stob_create() and the cob is marked for creation.
 * - When marked and rc is 0, any cob already held is released, a new cob is
 *   created with m0_io_cob_create() in the FOM transaction, and it is looked
 *   up again.
 *
 * On success the located cob is stored in *out; on failure *out is left
 * untouched and the error code is returned.
 *
 * NOTE(review): part of the first condition is missing from this listing.
 * NOTE(review): cob has no initialiser; the -ENOENT path later tests
 * cob != NULL, which relies on m0_cob_locate() setting its output on
 * failure - confirm.
 */
951 M0_INTERNAL int m0_io_cob_stob_create(struct m0_fom *fom,
952  struct m0_cob_domain *cdom,
953  struct m0_fid *fid,
954  struct m0_fid *pver,
955  uint64_t lid,
956  bool crow,
957  struct m0_cob **out)
958 {
959  struct m0_cob_oikey oikey;
960  struct m0_cob *cob;
961  struct m0_stob_id stob_id;
962  bool cob_recreate = false;
963  int rc;
964 
965  M0_ENTRY("COB:"FID_F"pver:"FID_F, FID_P(fid), FID_P(pver));
966  m0_cob_oikey_make(&oikey, fid, 0);
967  rc = m0_cob_locate(cdom, &oikey, 0, &cob);
968  if (rc == 0 && cob != NULL &&
972  cob_recreate = true;
973  M0_LOG(M0_DEBUG, "Cob pool version mismatch" FID_F FID_F,
975  } else if (rc == -ENOENT && crow) {
976  m0_fid_convert_cob2stob(fid, &stob_id);
977  rc = m0_cc_stob_create(fom, &stob_id);
978  cob_recreate = true;
979  M0_LOG(M0_INFO, "Create on write if cob doesn't exists");
980  }
981 
982  if (rc == 0 && cob_recreate) {
  /* Drop the reference on the old cob before creating the new one. */
983  if (cob != NULL)
984  m0_cob_put(cob);
  /* The lookup runs only if the creation returned 0. */
985  rc = m0_io_cob_create(cdom, fid, pver, lid, m0_fom_tx(fom)) ?:
986  m0_cob_locate(cdom, &oikey, 0, &cob);
987  }
988 
989  if (rc == 0)
990  *out = cob;
991 
992  return M0_RC(rc);
993 }
994 
/*
 * Resolves the file object backing the cob of an I/O FOM.
 *
 * The pool version id is taken from fom_obj->fcrw_pver and the
 * create-on-write flag from fcrw_flags (M0_IO_FLAG_CROW). On success the cob
 * is stored in fom_obj->fcrw_cob and *out points at its embedded co_file.
 *
 * Returns 0 on success or the error code of the cob lookup/creation step.
 *
 * NOTE(review): the head of the call that produces rc is missing from this
 * listing (only its last argument line is visible; presumably
 * m0_io_cob_stob_create()), and @fid is not used in any visible line.
 */
995 static int io_fom_cob2file(struct m0_fom *fom, struct m0_fid *fid,
996  struct m0_file **out)
997 {
998  int rc;
999  struct m0_io_fom_cob_rw *fom_obj;
1000  struct m0_fid *pver;
1001  struct m0_fop_cob_rw *rwfop;
1002  bool crow;
1003 
1004  M0_PRE(fom != NULL);
1006 
1007  rwfop = io_rw_get(fom->fo_fop);
1008  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
1009  pver = &fom_obj->fcrw_pver->pv_id;
1010  crow = fom_obj->fcrw_flags & M0_IO_FLAG_CROW;
1012  crow, &fom_obj->fcrw_cob);
1013  if (rc == 0)
1014  *out = &fom_obj->fcrw_cob->co_file;
1015 
1016  return M0_RC(rc);
1017 }
1018 
1035 static int align_bufvec(struct m0_fom *fom,
1036  struct m0_bufvec *obuf,
1037  struct m0_bufvec *ibuf,
1038  m0_bcount_t ivec_count,
1039  uint32_t bshift)
1040 {
1041  int rc;
1042  int i;
1043  m0_bcount_t blk;
1044  bool all4k = true;
1045 
1046  M0_PRE(fom != NULL);
1047  M0_PRE(obuf != NULL);
1048  M0_PRE(ibuf != NULL);
1049 
1050  for (i = 0, blk = 0; i < ibuf->ov_vec.v_nr; ++i) {
1051  blk += ibuf->ov_vec.v_count[i] >> bshift;
1052  if (blk >= ivec_count)
1053  break;
1054  }
1055  if (i == ibuf->ov_vec.v_nr)
1056  return M0_ERR(-EPROTO);
1057 
1058  rc = m0_bufvec_empty_alloc(obuf, i + 1);
1059  if (rc != 0)
1060  return M0_ERR(rc);
1061 
1062  /* Align bufvec before copying to bufvec from stob io */
1063  for (i = 0; i < obuf->ov_vec.v_nr; ++i) {
1064  blk = min64u(ibuf->ov_vec.v_count[i] >> bshift, ivec_count);
1065  all4k &= ibuf->ov_vec.v_count[i] == 4096;
1066  obuf->ov_vec.v_count[i] = blk;
1067  obuf->ov_buf[i] = m0_stob_addr_pack(ibuf->ov_buf[i],
1068  bshift);
1069  ivec_count -= blk;
1070  M0_ASSERT((ivec_count > 0) == (i < obuf->ov_vec.v_nr - 1));
1071  M0_ASSERT((ivec_count == 0) == (i == obuf->ov_vec.v_nr - 1));
1072  }
1073 
1074  /*
1075  * Performance: Compaction special case for block size of 1M (256 * 4k).
1076  * Leads to significant pressure reduction on BE layer by means of
1077  * BE allocations minimisation.
1078  */
1079  if (all4k && obuf->ov_vec.v_nr == 256) {
1080  m0_bufvec_pack(obuf);
1081 
1082  /* one block of 1M */
1083  M0_ASSERT(obuf->ov_vec.v_nr == 1);
1084  M0_ASSERT(obuf->ov_vec.v_count[0] == ((1 << 20) >> bshift));
1085  }
1086 
1087  return 0;
1088 }
1089 
1090 static int fom_cob_locate(struct m0_fom *fom)
1091 {
1092  struct m0_io_fom_cob_rw *fom_obj;
1093  struct m0_cob_oikey oikey;
1094  struct m0_fop_cob_rw *rwfop;
1095  int rc;
1096 
1097  rwfop = io_rw_get(fom->fo_fop);
1098  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
1099  m0_cob_oikey_make(&oikey, &rwfop->crw_fid, 0);
1100  rc = m0_cob_locate(fom_cdom(fom), &oikey, 0, &fom_obj->fcrw_cob);
1101 
1102  return M0_RC(rc);
1103 }
1104 
/*
 * Finds the storage object that backs the cob addressed by the I/O fop and
 * stores it in fom_obj->fcrw_stob.
 *
 * The cob fid from the fop is converted to a stob id and looked up among the
 * storage devices returned by m0_cs_storage_devs_get(). The FOM must not
 * hold a stob yet (precondition). On failure fcrw_stob is reset to NULL.
 *
 * Returns the value of m0_storage_dev_stob_find().
 *
 * NOTE(review): one statement after the preconditions is missing from this
 * listing; confirm against the full file.
 */
1108 static int stob_object_find(struct m0_fom *fom)
1109 {
1110  struct m0_storage_devs *devs = m0_cs_storage_devs_get();
1111  struct m0_io_fom_cob_rw *fom_obj;
1112  struct m0_stob_id stob_id;
1113  struct m0_fop_cob_rw *rwfop;
1114  int rc;
1115 
1116  M0_PRE(fom != NULL);
1117  M0_PRE(m0_is_io_fop(fom->fo_fop));
1118  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
1119  M0_PRE(fom_obj->fcrw_stob == NULL);
1120 
1122 
1123  rwfop = io_rw_get(fom->fo_fop);
1124 
1125  m0_fid_convert_cob2stob(&rwfop->crw_fid, &stob_id);
1126  rc = m0_storage_dev_stob_find(devs, &stob_id, &fom_obj->fcrw_stob);
  /* Do not leave a stale pointer behind when the lookup failed. */
1127  if (rc != 0)
1128  fom_obj->fcrw_stob = NULL;
1129 
1130  return rc;
1131 }
1132 
/*
 * Allocates and initialises an I/O FOM for the read/write fop @fop and
 * returns its generic part through @out.
 *
 * The FOM object is allocated with M0_ALLOC_PTR(). If that allocation fails,
 * or if rep_fop is NULL, -ENOMEM is returned (the FOM object is freed in the
 * latter case). The FOM bookkeeping is then reset: stob and cob pointers,
 * descriptor index, batch size, request/processed counters and the number of
 * launched stob I/Os. The descriptor count and the flags are copied from the
 * fop, the total index-vector count is computed with m0_io_count(), and the
 * net-buffer and stob I/O lists are initialised.
 *
 * NOTE(review): the statements that allocate rep_fop and initialise the
 * generic FOM (which presumably use @reqh) are missing from this listing;
 * @reqh is not used in any visible line.
 * NOTE(review): this definition is M0_INTERNAL while the forward declaration
 * earlier in the file is static - confirm which linkage is intended.
 */
1144 M0_INTERNAL int m0_io_fom_cob_rw_create(struct m0_fop *fop, struct m0_fom **out,
1145  struct m0_reqh *reqh)
1146 {
1147  int rc = 0;
1148  struct m0_fom *fom;
1149  struct m0_io_fom_cob_rw *fom_obj;
1150  struct m0_fop_cob_rw *rwfop;
1151  struct m0_fop *rep_fop;
1152 
1153  M0_PRE(fop != NULL);
1155  M0_PRE(out != NULL);
1156 
1157  M0_ENTRY("fop=%p", fop);
1158 
1159  M0_ALLOC_PTR(fom_obj);
1160  if (fom_obj == NULL)
1161  return M0_RC(-ENOMEM);
1162 
1166  if (rep_fop == NULL) {
1167  m0_free(fom_obj);
1168  return M0_RC(-ENOMEM);
1169  }
1170 
1171  fom = &fom_obj->fcrw_gen;
1172  rwfop = io_rw_get(fop);
1173  *out = fom;
1175 
  /* Reset the per-request bookkeeping. */
1176  fom_obj->fcrw_fom_start_time = m0_time_now();
1177  fom_obj->fcrw_stob = NULL;
1178  fom_obj->fcrw_cob = NULL;
1179  fom_obj->fcrw_cob_size = 0;
1180  fom_obj->fcrw_ndesc = rwfop->crw_desc.id_nr;
1181  fom_obj->fcrw_curr_desc_index = 0;
1182  fom_obj->fcrw_curr_size = 0;
1183  fom_obj->fcrw_batch_size = 0;
1184  fom_obj->fcrw_req_count = 0;
1185  fom_obj->fcrw_total_ioivec_cnt = m0_io_count(&rwfop->crw_ivec);
1186  fom_obj->fcrw_count = 0;
1187  fom_obj->fcrw_num_stobio_launched = 0;
1188  fom_obj->fcrw_bp = NULL;
1189  fom_obj->fcrw_flags = rwfop->crw_flags;
1190 
1191  netbufs_tlist_init(&fom_obj->fcrw_netbuf_list);
1192  stobio_tlist_init(&fom_obj->fcrw_stio_list);
1193  stobio_tlist_init(&fom_obj->fcrw_done_list);
1194 
1195  M0_LOG(M0_DEBUG, "fcrw_total_ioivec_cnt = %"PRIu64, fom_obj->fcrw_total_ioivec_cnt);
1196  M0_LOG(M0_DEBUG, "fom=%p : op=%s, desc=%d gfid"FID_F"cob fid"FID_F
1197  "pver"FID_F, fom, m0_is_read_fop(fop) ? "READ" : "WRITE",
1198  rwfop->crw_desc.id_nr, FID_P(&rwfop->crw_gfid),
1199  FID_P(&rwfop->crw_fid), FID_P(&rwfop->crw_pver));
1200 
1201  return M0_RC(rc);
1202 }
1203 
/*
 * Prepares an I/O FOM: resolves the pool version named in the fop and checks
 * the state of the target device.
 *
 * fcrw_pver must be NULL on entry. The pool version is looked up in the
 * pools-common structure of the motr context; if it is not found an error is
 * logged and rc is -EINVAL. Otherwise the device state for crw_index is
 * queried from the pool machine; a query failure is logged, and a state
 * other than M0_PNDS_ONLINE gives rc = -EIO.
 *
 * Every visible path returns M0_FSO_AGAIN; the error code is handled at the
 * "out" label.
 *
 * NOTE(review): several statements are missing from this listing, including
 * the head of the first pool-version lookup and the error handling after
 * "out:" (presumably a phase move to failure that also uses rwrep); confirm
 * against the full file.
 */
1204 static int io_prepare(struct m0_fom *fom)
1205 {
1206  struct m0_fop_cob_rw *rwfop;
1207  struct m0_io_fom_cob_rw *fom_obj;
1208  struct m0_poolmach *poolmach;
1209  struct m0_reqh *reqh;
1210  struct m0_motr *motr;
1211  struct m0_fop_cob_rw_reply *rwrep;
1212  enum m0_pool_nd_state device_state = 0;
1213  int rc;
1214  struct m0_pools_common *pc;
1215 
1216  M0_ENTRY("fom=%p", fom);
1217 
1218  reqh = m0_fom_reqh(fom);
1219  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
1221  rwfop = io_rw_get(fom->fo_fop);
1222  rwrep = io_rw_rep_get(fom->fo_rep_fop);
1223  M0_ASSERT(fom_obj->fcrw_pver == NULL);
1224  motr = m0_cs_ctx_get(reqh);
1225  pc = &motr->cc_pools_common;
1226  M0_LOG(M0_DEBUG, "Preparing %s IO index:%d @"FID_F"pver"FID_F,
1227  m0_is_read_fop(fom->fo_fop) ? "Read": "Write",
1228  (int)rwfop->crw_index,
1229  FID_P(&rwfop->crw_fid), FID_P(&rwfop->crw_pver));
1231  &rwfop->crw_pver);
1232  if (fom_obj->fcrw_pver == NULL) {
1233  M0_LOG(M0_ERROR, "pool version not found for %s IO index:%d @"
1234  FID_F"pver"FID_F,
1235  m0_is_read_fop(fom->fo_fop) ? "Read": "Write",
1236  (int)rwfop->crw_index,
1237  FID_P(&rwfop->crw_fid), FID_P(&rwfop->crw_pver));
1238  rc = M0_ERR(-EINVAL);
1239  goto out;
1240  }
1241 
1242  /*
1243  * Dumps the state of SNS repair with respect to global fid
1244  * from IO fop.
1245  * The IO request has already acquired file level lock on
1246  * given global fid.
1247  * Also due configuration update pool version may not be present in memory,
1248  * so check for in memory pool version.
1249  */
1251  fom_obj->fcrw_pver = m0_pool_version_lookup(pc, &rwfop->crw_pver);
1252  if (fom_obj->fcrw_pver == NULL) {
1254  M0_LOG(M0_ERROR, "pool version not found for %s IO index:%d @"
1255  FID_F"pver"FID_F,
1256  m0_is_read_fop(fom->fo_fop) ? "Read": "Write",
1257  (int)rwfop->crw_index,
1258  FID_P(&rwfop->crw_fid), FID_P(&rwfop->crw_pver));
1259  rc = M0_ERR(-EINVAL);
1260  goto out;
1261  }
1262  poolmach = &fom_obj->fcrw_pver->pv_mach;
1263  rc = m0_poolmach_device_state(poolmach,
1264  rwfop->crw_index,
1265  &device_state);
1267  if (rc != 0) {
1268  M0_LOG(M0_ERROR, "pm=(%p:%p device=%d state=%d)",
1269  poolmach, poolmach->pm_pver,
1270  rwfop->crw_index,
1271  device_state);
1272  goto out;
1273  }
  /* I/O is refused with -EIO unless the device is online. */
1274  if (device_state != M0_PNDS_ONLINE) {
1275  M0_LOG(M0_DEBUG, "IO @"FID_F" on failed device: %d "
1276  "(state = %d)",
1277  FID_P(&rwfop->crw_fid),
1278  rwfop->crw_index,
1279  device_state);
1280  rc = M0_RC(-EIO);
1281  }
1282 out:
1283  if (rc != 0)
1286  reqh, device_state);
1287  M0_LEAVE();
1288  return M0_FSO_AGAIN;
1289 }
/*
 * Acquires network buffers for the descriptors the FOM still has to process.
 *
 * The buffer pool is taken from fcrw_bp. On first use it is found among the
 * service's buffer pools by matching the network domain of the fop's
 * transfer machine, and cached in fcrw_bp. Buffers are then requested with
 * m0_net_buffer_pool_get() until fcrw_ndesc - fcrw_curr_desc_index buffers
 * are held on fcrw_netbuf_list:
 *  - if the pool is empty and nothing has been acquired, the FOM waits on
 *    the pool's rios_bp_wait channel and M0_FSO_WAIT is returned;
 *  - if the pool is empty but some buffers are held, the loop stops and the
 *    FOM continues with a smaller batch.
 * fcrw_batch_size is set to the number of buffers held and M0_FSO_AGAIN is
 * returned.
 *
 * NOTE(review): several statements are missing from this listing (presumably
 * the pool lock/unlock calls, phase transitions and the per-buffer queue
 * type set in the read/write branches); confirm against the full file.
 */
1305 static int net_buffer_acquire(struct m0_fom *fom)
1306 {
1307  uint32_t colour;
1308  int acquired_net_bufs;
1309  int required_net_bufs;
1310  struct m0_fop *fop;
1311  struct m0_io_fom_cob_rw *fom_obj;
1312  struct m0_net_transfer_mc *tm;
1313  struct m0_net_buffer_pool *pool;
1314 
1315  M0_PRE(fom != NULL);
1316  M0_PRE(m0_is_io_fop(fom->fo_fop));
1317  M0_PRE(fom->fo_service != NULL);
1320 
1321  M0_ENTRY("fom=%p", fom);
1322 
1323  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
1325 
1326  fop = fom->fo_fop;
1327  fom_obj->fcrw_phase_start_time = m0_time_now();
1328 
1329  tm = m0_fop_tm_get(fop);
1333  pool = fom_obj->fcrw_bp;
1334  if (pool == NULL) {
1335  struct m0_reqh_io_service *serv_obj;
1336  struct m0_rios_buffer_pool *bpdesc;
1337 
1338  serv_obj = container_of(fom->fo_service,
1339  struct m0_reqh_io_service, rios_gen);
1341 
1342  /* Get network buffer pool for network domain */
1343  bpdesc = m0_tl_find(bufferpools, bpd,
1344  &serv_obj->rios_buffer_pools,
1345  bpd->rios_ndom == tm->ntm_dom);
1346  M0_ASSERT(bpdesc != NULL);
1347  fom_obj->fcrw_bp = pool = &bpdesc->rios_bp;
1348  }
1349  colour = m0_net_tm_colour_get(tm);
1350 
1351  acquired_net_bufs = netbufs_tlist_length(&fom_obj->fcrw_netbuf_list);
1352  required_net_bufs = fom_obj->fcrw_ndesc - fom_obj->fcrw_curr_desc_index;
1353 
1354  /*
1355  * Acquire as many net buffers as to process all descriptors.
1356  * If FOM is able to acquire more buffers then it can change batch size
1357  * dynamically.
1358  */
1359  M0_ASSERT(acquired_net_bufs <= required_net_bufs);
1360  while (acquired_net_bufs < required_net_bufs) {
1361  struct m0_net_buffer *nb;
1362 
1364  nb = m0_net_buffer_pool_get(pool, colour);
1365 
1366  if (nb == NULL && acquired_net_bufs == 0) {
1367  struct m0_rios_buffer_pool *bpdesc;
1368 
1369  /*
1370  * Network buffer is not available. At least one
1371  * buffer is need for zero-copy. Registers FOM clink
1372  * with buffer pool wait channel to get buffer
1373  * pool non-empty signal.
1374  */
1375  bpdesc = container_of(pool, struct m0_rios_buffer_pool,
1376  rios_bp);
1377  m0_fom_wait_on(fom, &bpdesc->rios_bp_wait, &fom->fo_cb);
1380  M0_LEAVE();
1381  return M0_FSO_WAIT;
1382  } else if (nb == NULL) {
1384  /*
1385  * Some network buffers are available for zero copy
1386  * init. FOM can continue with available buffers.
1387  */
1388  break;
1389  }
1390  acquired_net_bufs++;
1391  /* Signal next possible waiter for buffers. */
1392  if (acquired_net_bufs == required_net_bufs && pool->nbp_free > 0)
1393  pool->nbp_ops->nbpo_not_empty(pool);
1395 
1396  if (m0_is_read_fop(fop))
1398  else
1400 
1401  M0_INVARIANT_EX(m0_tlist_invariant(&netbufs_tl,
1402  &fom_obj->fcrw_netbuf_list));
1403 
1404  netbufs_tlink_init(nb);
1405  netbufs_tlist_add(&fom_obj->fcrw_netbuf_list, nb);
1406  }
1407 
1408  fom_obj->fcrw_batch_size = acquired_net_bufs;
1409  M0_LOG(M0_DEBUG, "required=%d acquired=%d", required_net_bufs,
1410  acquired_net_bufs);
1411 
1412  M0_LEAVE();
1413  return M0_FSO_AGAIN;
1414 }
1415 
/*
 * Releases the network buffers the FOM no longer needs.
 *
 * The number of buffers still required is the number of descriptors not yet
 * processed (fcrw_ndesc - fcrw_curr_desc_index); nbuf_release_done() returns
 * the surplus to the pool. Always returns M0_FSO_AGAIN.
 *
 * NOTE(review): the statement executed when still_required == 0 is missing
 * from this listing (presumably a phase transition), as is one statement
 * before M0_ENTRY(); rc is not used in any visible line.
 */
1428 static int net_buffer_release(struct m0_fom *fom)
1429 {
1430  int still_required;
1431  int rc = 0;
1432  struct m0_io_fom_cob_rw *fom_obj;
1433 
1435 
1436  M0_ENTRY("fom=%p", fom);
1437 
1438  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
1439  still_required = fom_obj->fcrw_ndesc - fom_obj->fcrw_curr_desc_index;
1440 
1441  nbuf_release_done(fom, still_required);
1442 
1443  if (still_required == 0)
1445 
1446  M0_LEAVE();
1447  return M0_FSO_AGAIN;
1448 }
1449 
/*
 * Returns to the buffer pool every network buffer the FOM holds beyond
 * @still_required.
 *
 * Buffers are taken from the tail of fcrw_netbuf_list, put back into
 * fcrw_bp with the colour of the fop's transfer machine, and unlinked from
 * the list. fcrw_batch_size is then set to the number of buffers still
 * held. Always returns M0_FSO_AGAIN.
 *
 * NOTE(review): the pool is locked before the loop; the matching unlock is
 * not visible in this listing (one statement after the loop is missing).
 * NOTE(review): the debug message prints still_required as "batch_size",
 * whereas fcrw_batch_size is set to acquired - confirm which is intended.
 */
1450 static int nbuf_release_done(struct m0_fom *fom, int still_required)
1451 {
1452  uint32_t colour;
1453  int acquired;
1454  int released = 0;
1455  struct m0_fop *fop;
1456  struct m0_io_fom_cob_rw *fom_obj;
1457  struct m0_net_transfer_mc *tm;
1458 
1459  M0_PRE(fom != NULL);
1460  M0_PRE(m0_is_read_fop(fom->fo_fop) || m0_is_write_fop(fom->fo_fop));
1461  M0_PRE(fom->fo_service != NULL);
1462 
1463  M0_ENTRY("fom=%p", fom);
1464 
1465  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
1467  M0_ASSERT(fom_obj->fcrw_bp != NULL);
1468 
1469  fop = fom->fo_fop;
1470  tm = m0_fop_tm_get(fop);
1471  colour = m0_net_tm_colour_get(tm);
1472 
1473  M0_INVARIANT_EX(m0_tlist_invariant(&netbufs_tl,
1474  &fom_obj->fcrw_netbuf_list));
1475  acquired = netbufs_tlist_length(&fom_obj->fcrw_netbuf_list);
1476 
1477  m0_net_buffer_pool_lock(fom_obj->fcrw_bp);
  /* Give back surplus buffers, newest (list tail) first. */
1478  while (acquired > still_required) {
1479  struct m0_net_buffer *nb;
1480 
1481  nb = netbufs_tlist_tail(&fom_obj->fcrw_netbuf_list);
1482  M0_ASSERT(nb != NULL);
1483  m0_net_buffer_pool_put(fom_obj->fcrw_bp, nb, colour);
1484  netbufs_tlink_del_fini(nb);
1485  --acquired;
1486  ++released;
1487  }
1489 
1490  fom_obj->fcrw_batch_size = acquired;
1491  M0_LOG(M0_DEBUG, "Released %d network buffer(s), batch_size = %d.",
1492  released, still_required);
1493 
1494  M0_LEAVE();
1495  return M0_FSO_AGAIN;
1496 }
1497 
/*
 * Starts zero-copy (rpc bulk) transfer for the network buffers currently
 * held by the FOM.
 *
 * The rpc bulk object fcrw_bulk is initialised and one bulk buffer is added
 * per network buffer on fcrw_netbuf_list. The segment count of each is
 * derived from the bdd_used size of the current descriptor, rounded up to
 * whole segments of max_seg_size. fcrw_curr_desc_index advances by one per
 * buffer added. The FOM then registers to wait on rbulk->rb_chan and
 * M0_FSO_WAIT is returned.
 *
 * Error paths visible here return M0_FSO_AGAIN:
 *  - m0_rpc_bulk_buf_add() failure: the network buffers are released unless
 *    the "keep-net-buffers" fault-injection point is enabled;
 *  - failure of the load step: the FOM callback is cancelled, the bulk
 *    object is finalised and all network buffers are released.
 *
 * NOTE(review): several statements are missing from this listing: the
 * declaration of nbd_data, the assignments of dom and max_seg_size, the call
 * that actually starts the bulk transfer and produces the second rc, and
 * presumably the phase transitions on failure. Confirm against the full
 * file.
 */
1509 static int zero_copy_initiate(struct m0_fom *fom)
1510 {
1511  int rc;
1512  struct m0_fop *fop;
1513  struct m0_io_fom_cob_rw *fom_obj;
1514  struct m0_fop_cob_rw *rwfop;
1515  struct m0_rpc_bulk *rbulk;
1516  struct m0_net_buffer *nb;
1518  struct m0_net_domain *dom;
1519  uint32_t buffers_added = 0;
1520  m0_bcount_t max_seg_size;
1521 
1522  M0_PRE(fom != NULL);
1523  M0_PRE(m0_is_io_fop(fom->fo_fop));
1525 
1526  M0_ENTRY("fom=%p", fom);
1527 
1528  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
1530 
1531  fom_obj->fcrw_phase_start_time = m0_time_now();
1532 
1533  fop = fom->fo_fop;
1534  rwfop = io_rw_get(fop);
1535  rbulk = &fom_obj->fcrw_bulk;
1536  m0_rpc_bulk_init(rbulk);
1537 
1538  M0_INVARIANT_EX(m0_tlist_invariant(&netbufs_tl,
1539  &fom_obj->fcrw_netbuf_list));
1542  nbd_data = &rwfop->crw_desc.id_descs[fom_obj->
1543  fcrw_curr_desc_index];
1544 
1545  /* Create rpc bulk bufs list using available net buffers */
1546  m0_tl_for(netbufs, &fom_obj->fcrw_netbuf_list, nb) {
1547  uint32_t segs_nr = 0;
1548  struct m0_rpc_bulk_buf *rb_buf;
1549  m0_bcount_t used_size;
1550 
1551  used_size = rwfop->crw_desc.id_descs[fom_obj->
1552  fcrw_curr_desc_index].bdd_used;
1553 
  /* Round up to a whole number of segments. */
1554  segs_nr = (used_size + max_seg_size - 1) / max_seg_size;
1555 
1556  M0_LOG(M0_DEBUG, "segs_nr %d", segs_nr);
1557 
1558  /*
1559  * @todo : Since passing only number of segments, supports full
1560  * stripe I/Os. Should set exact count for last segment
1561  * of network buffer. Also need to reset last segment
1562  * count to original since buffers are reused by other
1563  * I/O requests.
1564  */
1565  rc = m0_rpc_bulk_buf_add(rbulk, segs_nr, used_size,
1566  dom, nb, &rb_buf);
1567  if (rc != 0) {
1568  if (!M0_FI_ENABLED("keep-net-buffers"))
1569  nbuf_release_done(fom, 0);
1571  M0_LEAVE();
1572  return M0_FSO_AGAIN;
1573  }
1574 
1575  fom_obj->fcrw_curr_desc_index++;
1576  buffers_added++;
1577  } m0_tl_endfor;
1578 
1579  M0_ASSERT(buffers_added == fom_obj->fcrw_batch_size);
1580 
1581  /*
1582  * On completion of zero-copy on all buffers rpc_bulk
1583  * sends signal on channel rbulk->rb_chan.
1584  */
1585  m0_mutex_lock(&rbulk->rb_mutex);
1586  m0_fom_wait_on(fom, &rbulk->rb_chan, &fom->fo_cb);
1587  m0_mutex_unlock(&rbulk->rb_mutex);
1588 
1590  rbulk->rb_id);
1591  /*
1592  * This function deletes m0_rpc_bulk_buf object one
1593  * by one as zero copy completes on respective buffer.
1594  */
1597  if (rc != 0) {
1598  m0_mutex_lock(&rbulk->rb_mutex);
1599  m0_fom_callback_cancel(&fom->fo_cb);
1600  m0_mutex_unlock(&rbulk->rb_mutex);
1602  m0_rpc_bulk_fini(rbulk);
1603  nbuf_release_done(fom, 0);
1605  M0_LEAVE();
1606  return M0_FSO_AGAIN;
1607  }
1608  M0_LOG(M0_DEBUG, "Zero-copy initiated. Added buffers %d",
1609  buffers_added);
1610 
1611  M0_LEAVE();
1612  return M0_FSO_WAIT;
1613 }
1614 
/*
 * Completes the zero-copy step after the rpc bulk transfer has finished.
 *
 * Under rb_mutex the bulk buffer list is asserted to be empty and the bulk
 * result rb_rc is checked:
 *  - on error, the bulk object is finalised and all network buffers are
 *    released unless the "keep-net-buffers" fault-injection point is
 *    enabled;
 *  - on success, the bulk object is finalised.
 * Both paths return M0_FSO_AGAIN.
 *
 * NOTE(review): a few statements are missing from this listing (presumably
 * the failure phase transition on the error path); confirm against the full
 * file.
 */
1624 static int zero_copy_finish(struct m0_fom *fom)
1625 {
1626  struct m0_io_fom_cob_rw *fom_obj;
1627  struct m0_rpc_bulk *rbulk;
1628 
1629  M0_ENTRY("fom=%p", fom);
1630 
1631  M0_PRE(fom != NULL);
1632  M0_PRE(m0_is_io_fop(fom->fo_fop));
1634 
1635  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
1637 
1638  rbulk = &fom_obj->fcrw_bulk;
1639 
1640  m0_mutex_lock(&rbulk->rb_mutex);
1641  M0_ASSERT(rpcbulkbufs_tlist_is_empty(&rbulk->rb_buflist));
1642  if (rbulk->rb_rc != 0){
1643  m0_mutex_unlock(&rbulk->rb_mutex);
1644 
1645  if (!M0_FI_ENABLED("keep-net-buffers")) {
1646  m0_rpc_bulk_fini(rbulk);
1647  nbuf_release_done(fom, 0);
1648  }
1650 
1651  M0_LEAVE();
1652  return M0_FSO_AGAIN;
1653  }
1654 
1655  m0_mutex_unlock(&rbulk->rb_mutex);
1656  m0_rpc_bulk_fini(rbulk);
1657  M0_LOG(M0_DEBUG, "Zero-copy finished.");
1658 
1659  M0_LEAVE();
1660  return M0_FSO_AGAIN;
1661 }
1662 
1663 static void stio_desc_fini(struct m0_stob_io_desc *stio_desc)
1664 {
1665  if (stobio_tlink_is_in(stio_desc))
1666  stobio_tlist_remove(stio_desc);
1667  m0_bufvec_free2(&stio_desc->siod_stob_io.si_user);
1668 }
1669 
1670 M0_INTERNAL uint64_t m0_io_size(struct m0_stob_io *sio, uint32_t bshift)
1671 {
1672  uint64_t last_io_off;
1673  uint64_t off;
1674 
1675  last_io_off = sio->si_stob.iv_vec.v_nr - 1;
1676  off = (sio->si_stob.iv_index[last_io_off] + 1) << bshift;
1677  return off;
1678 }
1679 
1696 static int io_launch(struct m0_fom *fom)
1697 {
1698  int rc;
1699  struct m0_fop *fop;
1700  struct m0_io_fom_cob_rw *fom_obj;
1701  struct m0_net_buffer *nb;
1702  struct m0_fop_cob_rw *rwfop;
1703  struct m0_file *file = NULL;
1704  uint32_t index;
1705 
1706  M0_PRE(fom != NULL);
1707  M0_PRE(m0_is_io_fop(fom->fo_fop));
1709 
1710  M0_ENTRY("fom=%p", fom);
1711 
1712  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
1714  M0_ASSERT(fom_obj->fcrw_num_stobio_launched == 0);
1715  M0_ASSERT(fom_obj->fcrw_io.si_stob.iv_vec.v_nr > 0);
1716 
1717  fom_obj->fcrw_phase_start_time = m0_time_now();
1718 
1719  fop = fom->fo_fop;
1720  rwfop = io_rw_get(fop);
1721 
1722  rc = io_fom_cob2file(fom, &rwfop->crw_fid, &file);
1723  if (rc != 0)
1724  goto out;
1725 
1726  /*
1727  Since the upper layer IO block size could differ with IO block size
1728  of storage object, the block alignment and mapping is necessary.
1729  */
1730  fom_obj->fcrw_bshift = m0_stob_block_shift(fom_obj->fcrw_stob);
1731 
1732  M0_INVARIANT_EX(m0_tlist_invariant(&netbufs_tl,
1733  &fom_obj->fcrw_netbuf_list));
1735  &fom_obj->fcrw_stio_list));
1736 
1737  index = fom_obj->fcrw_curr_desc_index;
1738  /*
1739  * During write, zero copy is performed before stob I/O is launched.
1740  * For zero copy processing fcrw_curr_desc_index is
1741  * incremented for each processed net buffer. Whereas for read, first
1742  * stob I/O is launched and then zero copy is performed. So index is
1743  * decremented by value of number of net buffers to process in case of
1744  * write operation.
1745  */
1746  index -= m0_is_write_fop(fop) ?
1747  netbufs_tlist_length(&fom_obj->fcrw_netbuf_list) : 0;
1748 
1749  m0_tl_for(netbufs, &fom_obj->fcrw_netbuf_list, nb) {
1750  struct m0_indexvec *mem_ivec;
1751  struct m0_stob_io_desc *stio_desc;
1752  struct m0_stob_io *stio;
1753  struct m0_stob *stob;
1754  m0_bcount_t ivec_count;
1755  struct m0_buf *di_buf;
1756  struct m0_bufvec cksum_data;
1757  stio_desc = &fom_obj->fcrw_stio[index++];
1758  stio = &stio_desc->siod_stob_io;
1759  stob = fom_obj->fcrw_stob;
1760  mem_ivec = &stio->si_stob;
1761  stobio_tlink_init(stio_desc);
1762 
1763  M0_ADDB2_ADD(M0_AVI_FOM_TO_STIO, fom->fo_sm_phase.sm_id,
1764  stio->si_id);
1765  /*
1766  * Copy aligned network buffer to stobio object.
1767  * Also trim network buffer as per I/O size.
1768  */
1769  ivec_count = m0_vec_count(&mem_ivec->iv_vec);
1770  rc = align_bufvec(fom, &stio->si_user, &nb->nb_buffer,
1771  ivec_count, fom_obj->fcrw_bshift);
1772  if (rc != 0) {
1773  /*
1774  * Since this stob io not added into list
1775  * yet, free it here.
1776  */
1777  fom_obj->fcrw_rc = rc;
1778  stio_desc_fini(stio_desc);
1779  break;
1780  }
1781 
1782  if (m0_is_write_fop(fop)) {
1783  uint32_t di_size = m0_di_size_get(file, ivec_count);
1784  uint32_t curr_pos = m0_di_size_get(file,
1785  fom_obj->fcrw_curr_size);
1786 
1787  di_buf = &rwfop->crw_di_data;
1788  if (di_buf != NULL) {
1789  struct m0_buf buf = M0_BUF_INIT(di_size,
1790  di_buf->b_addr + curr_pos);
1791  cksum_data = (struct m0_bufvec)
1792  M0_BUFVEC_INIT_BUF(&buf.b_addr,
1793  &buf.b_nob);
1795  mem_ivec, &nb->nb_buffer,
1796  &cksum_data));
1797  }
1798  }
1800 
1801  /*
1802  * The value is already bshifted during conversion of
1803  * m0_io_indexvec from on-wire to in-mem.
1804  * */
1805  fom_obj->fcrw_curr_size += ivec_count;
1806  stio_desc->siod_fcb.fc_bottom = stobio_complete_cb;
1807  m0_mutex_lock(&stio->si_mutex);
1808  m0_fom_callback_arm(fom, &stio->si_wait, &stio_desc->siod_fcb);
1809  m0_mutex_unlock(&stio->si_mutex);
1810 
1811  M0_LOG(M0_DEBUG, "launch fom: %p, start_time %" PRIi64 ", "
1812  "req_count: %" PRIx64 ", count: %" PRIx64 ", "
1813  "submitted: %" PRIx64 ", expect: %"PRIx64,
1814  fom, fom_obj->fcrw_fom_start_time,
1815  fom_obj->fcrw_req_count, fom_obj->fcrw_count,
1816  m0_vec_count(&stio->si_user.ov_vec), ivec_count);
1817  fom_obj->fcrw_io_launch_time = m0_time_now();
1818 
1819  rc = m0_stob_io_private_setup(stio, stob);
1820  if (rc != 0) {
1821  M0_LOG(M0_ERROR, "Can not setup adio for stob with"
1822  "id "FID_F" rc = %d",
1823  FID_P(&stob->so_id.si_fid), rc);
1824  break;
1825  }
1826  /*
1827  * XXX: @todo: This makes sense for oostore mode as
1828  * there is no degraded write. Eventually write fop
1829  * should have the info. about the zone to which
1830  * write goes
1831  * (spare or non-spare unit of a parity group).
1832  */
1833  if (m0_is_write_fop(fop) &&
1835  &m0_stob_ad_type))
1837  rc = m0_stob_io_prepare_and_launch(stio, fom_obj->fcrw_stob,
1838  &fom->fo_tx, NULL);
1839  if (rc != 0) {
1840  M0_LOG(M0_ERROR, "stob_io_launch failed: rc=%d", rc);
1841  m0_mutex_lock(&stio->si_mutex);
1842  m0_fom_callback_cancel(&stio_desc->siod_fcb);
1843  m0_mutex_unlock(&stio->si_mutex);
1844  /*
1845  * Since this stob io not added into list
1846  * yet, free it here.
1847  */
1848  fom_obj->fcrw_rc = rc;
1849  stio_desc_fini(stio_desc);
1850  break;
1851  }
1852 
1853  fom_obj->fcrw_req_count += ivec_count;
1854  M0_ASSERT(fom_obj->fcrw_req_count > 0);
1855  /* XXX Race condition here? what if the "stio_desc->siod_fcb"
1856  * is called before code reaches here?
1857  */
1859 
1860  stobio_tlist_add(&fom_obj->fcrw_stio_list, stio_desc);
1861  } m0_tl_endfor;
1862 
1863  m0_cob_put(container_of(file, struct m0_cob, co_file));
1864 
1865  M0_LOG(M0_DEBUG, "total fom: %" PRIi64 ", expect: %"PRIi64,
1866  fom_obj->fcrw_fom_start_time,
1867  fom_obj->fcrw_req_count - fom_obj->fcrw_count);
1868 
1869  if (fom_obj->fcrw_num_stobio_launched > 0) {
1870  M0_LOG(M0_DEBUG, "STOB I/O launched, io_descs = %d",
1871  fom_obj->fcrw_num_stobio_launched);
1872  M0_LEAVE();
1873  return M0_FSO_WAIT;
1874  }
1875 
1876 out:
1877  if (rc != 0) {
1878  if (!M0_FI_ENABLED("keep-net-buffers"))
1879  nbuf_release_done(fom, 0);
1881  } else {
1882  /* empty operation */
1883  M0_ASSERT(fom_obj->fcrw_num_stobio_launched == 0);
1884  M0_IMPOSSIBLE("Implement me.");
1885  }
1886  M0_LEAVE();
1887  return M0_FSO_AGAIN;
1888 }
1889 
/*
 * Collects the results of the stob I/O descriptors queued on fcrw_stio_list.
 *
 * Every descriptor is taken off fcrw_stio_list and appended to fcrw_done_list.
 * A descriptor with a non-zero si_rc overwrites the local rc; otherwise its
 * si_count is added to the local byte total and, for a write fop,
 * fcrw_cob_size is raised to m0_io_size() of that I/O. If rc is still 0 and
 * the fop is a write, the cob is located and its size is updated through
 * m0_cob_size_update(). Finally a non-zero fcrw_rc takes precedence over the
 * locally computed rc.
 *
 * Both the error path and the success path visible here return M0_FSO_AGAIN.
 *
 * NOTE(review): the embedded numbering of this listing has gaps (1909, 1917,
 * 1919, 1928, 1930, 1971), so some statements of the original function are
 * not visible here; the orphaned fragments below belong to those statements.
 */
1899 static int io_finish(struct m0_fom *fom)
1900 {
1901  struct m0_io_fom_cob_rw *fom_obj;
1902  struct m0_stob_io_desc *stio_desc;
1903  struct m0_cob *cob;
1904  int rc = 0;
1905  m0_bcount_t nob = 0;
1906 
1907  M0_PRE(fom != NULL);
1908  M0_PRE(m0_is_io_fop(fom->fo_fop));
1910 
1911  M0_ENTRY("fom=%p", fom);
1912 
      /* Fault-injection hook: start with rc == -EINVAL when enabled. */
1913  if (M0_FI_ENABLED("fake_error"))
1914  rc = -EINVAL;
1915 
1916  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
1918  M0_ASSERT(fom_obj->fcrw_num_stobio_launched == 0);
1920  &fom_obj->fcrw_stio_list));
1921  /*
1922  * Empty the list as all STOB I/O completed here.
1923  */
1924  m0_tl_teardown(stobio, &fom_obj->fcrw_stio_list, stio_desc) {
1925  struct m0_stob_io *stio;
1926 
1927  stio = &stio_desc->siod_stob_io;
1929  &m0_stob_ad_type))
1931  if (stio->si_rc != 0) {
1932  rc = stio->si_rc;
1933  } else {
1934  if (m0_is_write_fop(fom->fo_fop)) {
1935  fom_obj->fcrw_cob_size =
1936  max64u(fom_obj->fcrw_cob_size,
1937  m0_io_size(stio,
1938  fom_obj->fcrw_bshift));
1939  }
1940  nob += stio->si_count;
1941  M0_LOG(M0_DEBUG, "rw_count %" PRIi64 ", si_count %"PRIi64,
1942  fom_obj->fcrw_count, stio->si_count);
1943  }
1944  stobio_tlist_add(&fom_obj->fcrw_done_list, stio_desc);
1945  }
1946 
      /*
       * Successful write: never shrink the cob, take the larger of the size
       * reached by this request and the size already recorded in the cob.
       */
1947  if (rc == 0 && m0_is_write_fop(fom->fo_fop)) {
1948  rc = fom_cob_locate(fom);
1949  if (rc == 0) {
1950  cob = fom_obj->fcrw_cob;
1951  fom_obj->fcrw_cob_size =
1952  max64u(fom_obj->fcrw_cob_size,
1953  fom_obj->fcrw_cob->co_nsrec.cnr_size);
1954  rc = m0_cob_size_update(cob, fom_obj->fcrw_cob_size,
1955  m0_fom_tx(fom));
1956  m0_cob_put(cob);
1957  }
1958  }
1959 
1960  M0_LOG(M0_DEBUG, "got fom: %"PRIi64", req_count: %"PRIi64", "
1961  "count: %"PRIx64", nob: %"PRIx64"", fom_obj->fcrw_fom_start_time,
1962  fom_obj->fcrw_req_count, fom_obj->fcrw_count, nob);
1963  fom_obj->fcrw_count += nob;
1964  M0_ASSERT(ergo(rc == 0,
1965  fom_obj->fcrw_req_count == fom_obj->fcrw_count));
      /* An error already recorded in fcrw_rc wins over the local rc. */
1966  rc = fom_obj->fcrw_rc ?: rc;
1967  if (rc != 0) {
1968  M0_LOG(M0_ERROR, "rc=%d", rc);
1969  if (!M0_FI_ENABLED("keep-net-buffers"))
1970  nbuf_release_done(fom, 0);
1972  M0_LEAVE();
1973  return M0_FSO_AGAIN;
1974  }
1975 
1976  M0_LOG(M0_DEBUG, "STOB I/O finished.");
1977 
1978  M0_LEAVE();
1979  return M0_FSO_AGAIN;
1980 }
1981 
1985 static int io_sync(struct m0_fom *fom)
1986 {
1987  struct m0_io_fom_cob_rw *fobj = M0_AMB(fobj, fom, fcrw_gen);
1988  struct m0_be_tx *tx = m0_fom_tx(fom);
1989 
1990  if ((fobj->fcrw_flags & M0_IO_FLAG_SYNC) &&
1991  fom->fo_tx.tx_state != M0_DTX_INVALID &&
1992  m0_be_tx_state(tx) < M0_BTS_LOGGED) {
1993  M0_LOG(M0_DEBUG, "fom wait for tx to be logged");
1994  m0_fom_wait_on(fom, &tx->t_sm.sm_chan, &fom->fo_cb);
1995  return M0_FSO_WAIT;
1996  } else
1997  return M0_FSO_AGAIN;
1998 }
1999 
2000 /*
2001  * This function converts the on-wire m0_io_indexvec to in-mem m0_indexvec,
2002  * with index and count values appropriately block shifted.
2003  */
2004 static int indexvec_wire2mem(struct m0_fom *fom)
2005 {
2006  int rc;
2007  uint32_t max_frags_nr = 0;
2008  uint32_t bshift;
2009  struct m0_io_fom_cob_rw *fom_obj;
2010  struct m0_fop_cob_rw *rwfop;
2011 
2012  M0_PRE(fom != NULL);
2013 
2014  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
2015 
2016  rc = stob_object_find(fom);
2017  if (rc != 0)
2018  return M0_ERR(rc);
2019 
2020  bshift = m0_stob_block_shift(fom_obj->fcrw_stob);
2021  fom_obj->fcrw_bshift = bshift;
2022  rwfop = io_rw_get(fom->fo_fop);
2023  max_frags_nr = rwfop->crw_ivec.ci_nr;
2024  M0_LOG(M0_DEBUG, "max_frags_nr=%d bshift=%u", max_frags_nr, bshift);
2025 
2026  if (max_frags_nr > 0)
2027  rc = m0_indexvec_wire2mem(&rwfop->crw_ivec, max_frags_nr,
2028  bshift, &fom_obj->fcrw_io.si_stob);
2029  return M0_RC(rc);
2030 }
2031 
2032 /*
2033  * Allocate and populate m0_io_fom_cob_rw::fcrw_stio array.
 *
 * The array gets fcrw_ndesc elements, one per network buffer descriptor of
 * the fop. For a read fop with a non-zero crw_cksum_size the reply checksum
 * buffer rwr_di_data_cksum is sized from the extents of fcrw_io.si_stob and
 * allocated; otherwise it is reset to an empty buffer.
 *
 * Each element is then initialised: its stob index vector is cut out of
 * fcrw_io.si_stob with m0_indexvec_split() (bdd_used >> fcrw_bshift blocks,
 * starting where the previous element ended) and, when checksums are in use,
 * its si_cksum buffer is pointed at the matching slice of the reply buffer
 * (read) or of crw_di_data_cksum (write).
 *
 * Returns 0 on success, -ENOMEM on allocation failure, or the error returned
 * by m0_indexvec_split().
 *
 * NOTE(review): the embedded numbering of this listing has gaps (e.g. 2047,
 * 2060-2061, 2065-2067, 2076, 2098-2099, 2132, 2166), so some statements,
 * among them the declaration and part of the computation of unit_size, are
 * not visible here.
 * NOTE(review): both error paths call m0_free(fom_obj->fcrw_stio) without
 * resetting the pointer to NULL, while stob_io_destroy() does reset it.
 * Confirm no later code tests fcrw_stio != NULL after a failure here.
 * NOTE(review): the final cleanup is guarded by "rc != 0 && i > 0"; a failure
 * in the very first iteration (i == 0) appears to skip m0_free() as well.
 * Confirm.
 * NOTE(review): the checksum-total M0_ASSERT() runs before rc is examined, so
 * it is also evaluated when the loop broke out early. Confirm this is
 * intended.
2034  */
2035 static int stob_io_create(struct m0_fom *fom)
2036 {
2037  int rc = 0;
2038  int i;
2039  int j;
2040  m0_bcount_t todo;
2041  m0_bcount_t count = 0;
2042  struct m0_io_fom_cob_rw *fom_obj;
2043  struct m0_fop_cob_rw *rwfop;
2044  struct m0_stob_io_desc *siod;
2045  struct m0_stob_io *stio;
2046  struct m0_fop_cob_rw_reply *rw_replyfop;
2048  m0_bindex_t curr_cksum_nob = 0;
2049 
2050  M0_PRE(fom != NULL);
2051 
2052  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
2053 
2054  M0_ALLOC_ARR(fom_obj->fcrw_stio, fom_obj->fcrw_ndesc);
2055  if (fom_obj->fcrw_stio == NULL)
2056  return M0_ERR(-ENOMEM);
2057 
2058  rwfop = io_rw_get(fom->fo_fop);
2059  rw_replyfop = io_rw_rep_get(fom->fo_rep_fop);
2062  m0_stob_block_shift(fom_obj->fcrw_stob);
2063  if ((m0_is_read_fop(fom->fo_fop)) && rwfop->crw_cksum_size ) {
2064  /* CKSUM_TODO: Enable when cksum for parity is calculated */
2068  /* Init tracker variable, this gets updated in stobio_complete_cb */
2069  rw_replyfop->rwr_cksum_nob_read = 0;
2070 
2071  /* Compute nob based on the COB extents */
2072  rw_replyfop->rwr_di_data_cksum.b_nob = 0;
2073  for (i = 0; i < fom_obj->fcrw_io.si_stob.iv_vec.v_nr; i++)
2074  {
2075  rw_replyfop->rwr_di_data_cksum.b_nob +=
2077  fom_obj->fcrw_io.si_stob.iv_vec.v_count[i],
2078  unit_size,
2079  rwfop->crw_cksum_size);
2080  }
2081 
2082  // It is expected to receive at least one unit start in a fop
2083  M0_ASSERT(rw_replyfop->rwr_di_data_cksum.b_nob > 0);
2084 
2085  if (m0_buf_alloc( &rw_replyfop->rwr_di_data_cksum,
2086  rw_replyfop->rwr_di_data_cksum.b_nob) != 0) {
2087  m0_free(fom_obj->fcrw_stio);
2088  return M0_ERR(-ENOMEM);
2089  }
2090  }
2091  else {
2092  rw_replyfop->rwr_di_data_cksum.b_nob = 0;
2093  rw_replyfop->rwr_di_data_cksum.b_addr = NULL;
2094  }
2095 
2096  for (i = 0; i < fom_obj->fcrw_ndesc; ++i) {
2097  siod = &fom_obj->fcrw_stio[i];
2100 
2101  stio = &siod->siod_stob_io;
2102  m0_stob_io_init(stio);
2103  if (fom_obj->fcrw_flags & M0_IO_FLAG_NOHOLE)
2104  stio->si_flags |= SIF_NOHOLE;
2105  stio->si_fol_frag = &siod->siod_fol_frag;
2106 
      /* Number of blocks carried by descriptor i. */
2107  todo = rwfop->crw_desc.id_descs[i].bdd_used >>
2108  fom_obj->fcrw_bshift;
2109  M0_LOG(M0_DEBUG, "i=%d todo=%u, count=%"PRIu64, i, (unsigned)todo, count);
2110  rc = m0_indexvec_split(&fom_obj->fcrw_io.si_stob, count, todo,
2111  /* fom_obj->fcrw_bshift */ 0,
2112  &stio->si_stob);
2113  if (rc != 0)
2114  break;
2115  if ( rwfop->crw_cksum_size)
2116  {
2117  stio->si_cksum_sz = rwfop->crw_cksum_size;
2118  stio->si_unit_sz = unit_size;
2119 
2120  /* The function stob_ad_get_checksum_for_fragment updates this value.
2121  * It is the read checksum nob and it is compared against the expected
2122  * checksum nob (si_cksum.b_nob). The value is incremented as cksum is
2123  * put into the buffer.
2124  */
2125  stio->si_cksum_nob_read = 0;
2126 
2128  stio->si_cksum.b_nob = 0;
2129  for (j = 0; j < stio->si_stob.iv_vec.v_nr; j++)
2130  {
2131  stio->si_cksum.b_nob +=
2133  stio->si_stob.iv_vec.v_count[j],
2134  unit_size,
2135  stio->si_cksum_sz );
2136  }
2137 
2139  if (m0_is_read_fop(fom->fo_fop)) {
2140  stio->si_cksum.b_addr = rw_replyfop->rwr_di_data_cksum.b_addr +
2141  curr_cksum_nob;
2142  }
2143  else {
2144  stio->si_cksum.b_addr = rwfop->crw_di_data_cksum.b_addr +
2145  curr_cksum_nob;
2146  }
2147 
2149  curr_cksum_nob += stio->si_cksum.b_nob;
2150  }
2151  count += todo;
2152  }
2153 
2154  /* Verify that total checksum nob in FOP reply is equal to sum of
2155  * checksum-nob for all stobs
2156  */
2157  M0_ASSERT( m0_is_read_fop(fom->fo_fop) ?
2158  (curr_cksum_nob == rw_replyfop->rwr_di_data_cksum.b_nob) :
2159  (curr_cksum_nob == rwfop->crw_di_data_cksum.b_nob));
2160 
      /* Undo the elements set up before the failing one, newest first. */
2161  if (rc != 0 && i > 0) {
2162  while (--i >= 0) {
2163  siod = &fom_obj->fcrw_stio[i];
2164  stio = &siod->siod_stob_io;
2165  m0_stob_io_fini(stio);
2167  m0_indexvec_free(&stio->si_stob);
2168  }
2169  m0_free(fom_obj->fcrw_stio);
2170  }
2171 
2172  return M0_RC(rc);
2173 }
2174 
/*
 * Releases the fcrw_stio array of the fom.
 *
 * For each of the fcrw_ndesc elements the stob I/O is finalised with
 * m0_stob_io_fini() and its si_stob index vector is freed; then the array
 * itself is freed and fcrw_stio is reset to NULL.
 *
 * NOTE(review): embedded line 2190 is missing from this listing, so one
 * per-element statement of the original loop body is not visible here.
 */
2175 static void stob_io_destroy(struct m0_fom *fom)
2176 {
2177  int i;
2178  struct m0_io_fom_cob_rw *fom_obj;
2179  struct m0_stob_io_desc *siod;
2180  struct m0_stob_io *stio;
2181 
2182  M0_PRE(fom != NULL);
2183 
2184  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
2185 
2186  for (i = 0; i < fom_obj->fcrw_ndesc; ++i) {
2187  siod = &fom_obj->fcrw_stio[i];
2188  stio = &siod->siod_stob_io;
2189  m0_stob_io_fini(stio);
2191  m0_indexvec_free(&stio->si_stob);
2192  }
2193  m0_free(fom_obj->fcrw_stio);
2194  fom_obj->fcrw_stio = NULL;
2195 }
2196 
/*
 * Accumulates the transaction credit needed by the stob I/Os of the fom.
 *
 * Every element of fcrw_stio is marked as a write (si_opcode = SIO_WRITE) and
 * its credit is added, via m0_stob_io_credit(), to the credit accumulator
 * returned by m0_fom_tx_credit(). The stob domain is taken from fcrw_stob.
 *
 * When the "no_write_credit" fault-injection point is enabled the function
 * returns without touching anything.
 *
 * NOTE(review): embedded line 2209 is missing from this listing.
 */
2197 static void stob_be_credit(struct m0_fom *fom)
2198 {
2199  int i;
2200  struct m0_io_fom_cob_rw *fom_obj;
2201  struct m0_stob_domain *fom_stdom;
2202  struct m0_stob_io_desc *siod;
2203  struct m0_stob_io *stio;
2204 
2205  M0_PRE(fom != NULL);
2206  M0_PRE(m0_is_io_fop(fom->fo_fop));
2207 
2208  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
2210 
2211  if (M0_FI_ENABLED("no_write_credit"))
2212  return;
2213 
2214  fom_stdom = m0_stob_dom_get(fom_obj->fcrw_stob);
2215  M0_ASSERT(fom_stdom != NULL);
2216  for (i = 0; i < fom_obj->fcrw_ndesc; ++i) {
2217  siod = &fom_obj->fcrw_stio[i];
2218  stio = &siod->siod_stob_io;
2219  stio->si_opcode = SIO_WRITE;
2220  m0_stob_io_credit(stio, fom_stdom, m0_fom_tx_credit(fom));
2221  }
2222 }
2223 
2224 
/*
 * Tick function of the read/write I/O fom.
 *
 * What is visible in this listing:
 *  - byte_count (m0_io_count() of the fop's index vector) and pver
 *    (crw_pver) are captured up front;
 *  - in M0_FOPH_INIT the on-wire index vector is converted with
 *    indexvec_wire2mem(), and on failure the function returns M0_FSO_AGAIN;
 *  - for write fops in generic phases (phase < M0_FOPH_NR) extra work is done
 *    in the TXN_OPEN, AUTHORISATION, TXN_WAIT and TXN_COMMIT phases; in
 *    TXN_OPEN, with M0_IO_FLAG_CROW set, credits for stob creation and for
 *    cob create/update/delete/bytecount-set operations are accumulated;
 *  - for the remaining phases an entry "st" is selected depending on whether
 *    the fop is a read or a write, and its fcrw_st_state_function is called;
 *  - when a write reaches M0_FOPH_SUCCESS the byte count is added both to the
 *    byte-count record (cob_bytecount_increment()) and to cnr_bytecount of
 *    the cob's namespace record (m0_cob_update());
 *  - when the fom ends, rwr_rc and rwr_count of the reply fop are filled in.
 *
 * NOTE(review): this listing omits many original lines (the embedded
 * numbering has gaps throughout, e.g. 2241, 2263, 2265, 2274-2275, 2293-2295,
 * 2321, 2330-2331, 2350-2354), including the declaration of "st" and the call
 * that handles generic phases. Do not treat the text below as compilable.
 * NOTE(review): the value returned by cob_bytecount_increment() is not
 * examined at the call visible below. Confirm that ignoring it is intended.
 */
2233 static int m0_io_fom_cob_rw_tick(struct m0_fom *fom)
2234 {
2235  int rc;
2236  int phase = m0_fom_phase(fom);
2237  m0_bcount_t byte_count;
2238  struct m0_fid pver;
2239  struct m0_cob *cob;
2240  struct m0_io_fom_cob_rw *fom_obj;
2242  struct m0_fop_cob_rw *rwfop;
2243  struct m0_fop_cob_rw_reply *rwrep;
2244 
2245  M0_PRE(fom != NULL);
2246  M0_PRE(m0_is_io_fop(fom->fo_fop));
2247 
2248  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
2250 
2251  rwfop = io_rw_get(fom->fo_fop);
2252  byte_count = m0_io_count(&rwfop->crw_ivec);
2253  pver = rwfop->crw_pver;
2254 
2255  M0_ENTRY("fom %p, fop %p, item %p[%u] %s" FID_F, fom, fom->fo_fop,
2256  m0_fop_to_rpc_item(fom->fo_fop), m0_fop_opcode(fom->fo_fop),
2257  m0_fom_phase_name(fom, phase),
2258  FID_P(&rwfop->crw_fid));
2259 
2260  /* first handle generic phase */
2261  if (phase == M0_FOPH_INIT) {
2262  rc = indexvec_wire2mem(fom) ?:
2264  if (rc != 0) {
2266  return M0_RC(M0_FSO_AGAIN);
2267  }
2268  }
2269  if (phase < M0_FOPH_NR && m0_is_write_fop(fom->fo_fop)) {
2270  if (phase == M0_FOPH_TXN_OPEN) {
2271  struct m0_be_tx_credit *accum;
2272 
2273  accum = m0_fom_tx_credit(fom);
2276  if (fom_obj->fcrw_flags & M0_IO_FLAG_CROW) {
2277  struct m0_stob_id stob_id;
2279  &stob_id);
2280  m0_cc_stob_cr_credit(&stob_id, accum);
2282  M0_COB_OP_CREATE, accum);
2284  M0_COB_OP_UPDATE, accum);
2286  M0_COB_OP_DELETE, accum);
2288  M0_COB_OP_BYTECOUNT_SET, accum);
2291  }
2292  } else if (phase == M0_FOPH_AUTHORISATION) {
2296  return M0_RC(rc);
2297  } else if (phase == M0_FOPH_TXN_WAIT) {
2301  return M0_RC(rc);
2302  } else if (phase == M0_FOPH_TXN_COMMIT) {
2306  if (m0_is_write_fop(fom->fo_fop))
2308  return M0_RC(rc);
2309  }
2310  }
2311  if (phase < M0_FOPH_NR) {
2312  M0_LOG(M0_DEBUG, "fom=%p, stob=%p, rc=%d", fom,
2313  fom_obj->fcrw_stob, m0_fom_rc(fom));
2314 
2315  if (phase == M0_FOPH_FAILURE && fom_obj->fcrw_stob != NULL &&
2316  m0_fom_rc(fom) == -E2BIG) {
2318  fom_obj->fcrw_stob);
2319  }
2320 
2322 
2323  M0_LOG(M0_DEBUG, "fom = %p, stob = %p, rc = %d", fom,
2324  fom_obj->fcrw_stob, m0_fom_rc(fom));
2325 
2326  return M0_RC(rc);
2327  }
2328 
2329  st = m0_is_read_fop(fom->fo_fop) ?
2332 
2333  rc = (*st.fcrw_st_state_function)(fom);
2335 
2337 
      /*
       * A write that reached M0_FOPH_SUCCESS: account the written bytes, both
       * in the byte-count record keyed by (pver, M0_BYTECOUNT_USER_ID) and in
       * the cob's own namespace record.
       */
2338  if (m0_fom_phase(fom) == M0_FOPH_SUCCESS &&
2339  m0_is_write_fop(fom->fo_fop)) {
2340  int bc_rc;
2341  struct m0_cob_bckey key;
2342 
2343  key.cbk_pfid = pver;
2344  key.cbk_user_id = M0_BYTECOUNT_USER_ID;
2345  bc_rc = fom_cob_locate(fom);
2346  if (bc_rc == 0) {
2347  cob = fom_obj->fcrw_cob;
2348  cob_bytecount_increment(cob, &key, byte_count,
2349  m0_fom_tx(fom));
2355  cob->co_nsrec.cnr_bytecount += byte_count;
2356  bc_rc = m0_cob_update(cob, &cob->co_nsrec, NULL, NULL,
2357  m0_fom_tx(fom));
2358  if (bc_rc != 0)
2359  M0_ERR_INFO(bc_rc, "Failed to update cob_size");
2360  }
2361  }
2362  /* Set operation status in reply fop if FOM ends.*/
2363  if (m0_fom_phase(fom) == M0_FOPH_SUCCESS ||
2365  if (fom_obj->fcrw_stob != NULL)
2367  fom_obj->fcrw_stob);
2368  rwrep = io_rw_rep_get(fom->fo_rep_fop);
2369  rwrep->rwr_rc = m0_fom_rc(fom);
      /* fcrw_count is kept in blocks; the reply reports it shifted by fcrw_bshift. */
2370  rwrep->rwr_count = fom_obj->fcrw_count << fom_obj->fcrw_bshift;
2371  /* Information about the transaction for this update op. */
2373  return M0_RC(rc);
2374  }
2375 
2376  if (m0_is_write_fop(fom->fo_fop) &&
2378  fom->fo_tx.tx_state == M0_DTX_INVALID) {
2380  return M0_RC(M0_FSO_AGAIN);
2381  }
2385  return M0_RC(rc);
2386 }
2387 
/*
 * Finaliser of the read/write I/O fom.
 *
 * What is visible in this listing, in order:
 *  - a debug log line and several ADDB2 attribute records are emitted;
 *  - if the fom still references a buffer pool (fcrw_bp != NULL), every
 *    network buffer on fcrw_netbuf_list is returned to that pool with the
 *    colour of the fop's transfer machine and unlinked from the list, and the
 *    list is finalised;
 *  - every descriptor left on fcrw_done_list is finalised with
 *    stio_desc_fini(), and both stob I/O lists are finalised;
 *  - the fcrw_io.si_stob index vector is freed when it is not empty;
 *  - the generic fom is finalised and the fom object itself is freed.
 *
 * NOTE(review): this listing omits several original lines (embedded numbers
 * 2412, 2416, 2426, 2429, 2432, 2435, 2449, 2460), including the statement
 * that pairs with m0_net_buffer_pool_lock() and the body of the final
 * "if (fom_obj->fcrw_stio != NULL)". As printed, that "if" appears to govern
 * m0_fom_fini(), which is an artefact of the missing line.
 * NOTE(review): the "#if 0" block below is disabled code that would free the
 * reply checksum buffer for read fops. Confirm who owns and frees
 * rwr_di_data_cksum.
 */
2396 static void m0_io_fom_cob_rw_fini(struct m0_fom *fom)
2397 {
2398  uint32_t colour;
2399  struct m0_fop *fop;
2400  struct m0_io_fom_cob_rw *fom_obj;
2401  struct m0_reqh_io_service *serv_obj;
2402  struct m0_net_buffer *nb;
2403  struct m0_net_transfer_mc *tm;
2404  struct m0_stob_io_desc *stio_desc;
2405  struct m0_fop_cob_rw *rw;
2406 
2407 
2408  M0_PRE(fom != NULL);
2409  M0_PRE(fom->fo_fop != NULL);
2410 
2411  fom_obj = container_of(fom, struct m0_io_fom_cob_rw, fcrw_gen);
2413 
2414  serv_obj = container_of(fom->fo_service, struct m0_reqh_io_service,
2415  rios_gen);
2417 
2418  fop = fom->fo_fop;
2419  rw = io_rw_get(fop);
2420 
2421  M0_LOG(M0_DEBUG, "FOM fini: fom=%p op=%s@"FID_F", nbytes=%lu", fom,
2422  m0_is_read_fop(fop) ? "READ" : "WRITE", FID_P(&rw->crw_fid),
2423  (unsigned long)(fom_obj->fcrw_count << fom_obj->fcrw_bshift));
2424 
2425  M0_ADDB2_ADD(M0_AVI_ATTR, m0_sm_id_get(&fom->fo_sm_phase),
2427  fom_obj->fcrw_ndesc);
2428  M0_ADDB2_ADD(M0_AVI_ATTR, m0_sm_id_get(&fom->fo_sm_phase),
2430  fom_obj->fcrw_ndesc);
2431  M0_ADDB2_ADD(M0_AVI_ATTR, m0_sm_id_get(&fom->fo_sm_phase),
2433  fom_obj->fcrw_count);
2434  M0_ADDB2_ADD(M0_AVI_ATTR, m0_sm_id_get(&fom->fo_sm_phase),
2436  fom_obj->fcrw_count << fom_obj->fcrw_bshift);
2437 
2438  tm = m0_fop_tm_get(fop);
2439  colour = m0_net_tm_colour_get(tm);
2440 
      /* Hand any network buffers still held by the fom back to the pool. */
2441  if (fom_obj->fcrw_bp != NULL) {
2442  M0_INVARIANT_EX(m0_tlist_invariant(&netbufs_tl,
2443  &fom_obj->fcrw_netbuf_list));
2444  m0_net_buffer_pool_lock(fom_obj->fcrw_bp);
2445  m0_tl_for (netbufs, &fom_obj->fcrw_netbuf_list, nb) {
2446  m0_net_buffer_pool_put(fom_obj->fcrw_bp, nb, colour);
2447  netbufs_tlink_del_fini(nb);
2448  } m0_tl_endfor;
2450  netbufs_tlist_fini(&fom_obj->fcrw_netbuf_list);
2451  }
2452 
2453  m0_tl_teardown(stobio, &fom_obj->fcrw_done_list, stio_desc)
2454  stio_desc_fini(stio_desc);
2455  stobio_tlist_fini(&fom_obj->fcrw_done_list);
2456  stobio_tlist_fini(&fom_obj->fcrw_stio_list);
2457  if (fom_obj->fcrw_io.si_stob.iv_vec.v_nr > 0)
2458  m0_indexvec_free(&fom_obj->fcrw_io.si_stob);
2459  if (fom_obj->fcrw_stio != NULL)
2461 
2462 #if 0
2463  if (m0_is_read_fop(fop))
2464  {
2465  struct m0_fop_cob_rw_reply *rw_replyfop = io_rw_rep_get(fom->fo_rep_fop);
2466 
2467  if (rw_replyfop->rwr_di_data_cksum.b_addr)
2468  m0_buf_free(&rw_replyfop->rwr_di_data_cksum);
2469  }
2470 #endif
2471  m0_fom_fini(fom);
2472  m0_free(fom_obj);
2473 }
2474 
2478 static size_t m0_io_fom_cob_rw_locality_get(const struct m0_fom *fom)
2479 {
2480  uint64_t hash = m0_fid_hash(&io_rw_get(fom->fo_fop)->crw_fid);
2481 
2482  return m0_rnd(1 << 30, &hash) >> 1;
2483 }
2484 
2493 M0_INTERNAL const char *m0_io_fom_cob_rw_service_name(struct m0_fom *fom)
2494 {
2495  M0_PRE(fom != NULL);
2496  M0_PRE(fom->fo_fop != NULL);
2497 
2498  return "M0_CST_IOS";
2499 }
2500 
/*
 * Describes the I/O request of the fom for ADDB2.
 *
 * The values gathered below are: the global fid and the cob fid of the fop,
 * the number of segments in its index vector (ci_nr), the total count and the
 * first index of that vector (both reported as 0 when ci_iosegs is NULL), and
 * the number of buffer descriptors (id_nr).
 *
 * NOTE(review): embedded lines 2507 and 2513 are missing from this listing,
 * so the call that actually receives these arguments and its last argument
 * are not visible here.
 */
2501 static void io_fom_addb2_descr(struct m0_fom *fom)
2502 {
2503  struct m0_fop *fop = fom->fo_fop;
2504  struct m0_fop_cob_rw *rwfop = io_rw_get(fop);
2505  struct m0_io_indexvec *iv = &rwfop->crw_ivec;
2506 
2508  FID_P(&rwfop->crw_gfid), FID_P(&rwfop->crw_fid),
2509  iv->ci_nr,
2510  iv->ci_iosegs != NULL ? m0_io_count(iv) : 0,
2511  iv->ci_iosegs != NULL ? iv->ci_iosegs[0].ci_index : 0,
2512  rwfop->crw_desc.id_nr,
2514 }
2515 
2516 static int cob_bytecount_increment(struct m0_cob *cob, struct m0_cob_bckey *key,
2517  uint64_t bytecount, struct m0_be_tx *tx)
2518 {
2519  int rc;
2520  struct m0_rwlock *cdom_lock = &cob->co_dom->cd_lock.bl_u.rwlock;
2521  struct m0_cob_bcrec rec = {};
2522 
2523  M0_ENTRY("KEY: "FID_F"/%" PRIu64, FID_P(&key->cbk_pfid), key->cbk_user_id);
2524 
2525  m0_rwlock_write_lock(cdom_lock);
2526  rc = m0_cob_bc_lookup(cob, key, &rec);
2527  if (rc == -ENOENT) {
2528  rec.cbr_bytecount = bytecount;
2529  rec.cbr_cob_objects = 1;
2530  rc = m0_cob_bc_insert(cob, key, &rec, tx);
2531  m0_rwlock_write_unlock(cdom_lock);
2532  if (rc != 0)
2533  return M0_ERR(rc);
2534  M0_LOG(M0_DEBUG, "Bytecount inserted %" PRIu64, bytecount);
2535  } else if (rc == 0) {
2536  rec.cbr_bytecount += bytecount;
2537  rc = m0_cob_bc_update(cob, key, &rec, tx);
2538  m0_rwlock_write_unlock(cdom_lock);
2539  if (rc != 0)
2540  return M0_ERR(rc);
2541  M0_LOG(M0_DEBUG, "Bytecount increased by %" PRIu64
2542  " to %" PRIu64, bytecount,
2543  rec.cbr_bytecount);
2544  } else
2545  M0_ERR(rc);
2546 
2547  return M0_RC(rc);
2548 }
2549 
2550 #undef M0_TRACE_SUBSYSTEM
2551 
2554 /*
2555  * Local variables:
2556  * c-indentation-style: "K&R"
2557  * c-basic-offset: 8
2558  * tab-width: 8
2559  * fill-column: 80
2560  * scroll-step: 1
2561  * End:
2562  */
struct m0_cob_nsrec co_nsrec
Definition: cob.h:590
M0_INTERNAL m0_bcount_t m0_net_domain_get_max_buffer_segment_size(struct m0_net_domain *dom)
#define M0_BUFVEC_INIT_BUF(addr_ptr, count_ptr)
Definition: vec.h:165
struct m0_fol_frag siod_fol_frag
Definition: io_foms.h:165
uint32_t fcrw_ndesc
Definition: io_foms.h:178
struct m0_fop_type m0_fop_cob_readv_rep_fopt
Definition: io_fops.c:73
M0_INTERNAL struct m0_stob_domain * m0_stob_dom_get(struct m0_stob *stob)
Definition: stob.c:338
void m0_fom_phase_moveif(struct m0_fom *fom, int32_t rc, int phase0, int phase1)
Definition: fom.c:1710
uint32_t m0_fop_opcode(const struct m0_fop *fop)
Definition: fop.c:226
uint64_t crw_lid
Definition: io_fops.h:394
uint64_t rwr_count
Definition: io_fops.h:324
#define M0_PRE(cond)
uint32_t cnr_nlink
Definition: cob.h:426
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
enum m0_stob_io_flags si_flags
Definition: io.h:290
struct m0_tl fcrw_stio_list
Definition: io_foms.h:213
Definition: cob.h:581
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static int fom_cob_locate(struct m0_fom *fom)
Definition: io_foms.c:1090
M0_INTERNAL void m0_stob_io_fini(struct m0_stob_io *io)
Definition: io.c:122
m0_bindex_t si_unit_sz
Definition: io.h:414
static int stob_object_find(struct m0_fom *fom)
Definition: io_foms.c:1108
uint64_t cnr_size
Definition: cob.h:430
static int io_launch(struct m0_fom *)
Definition: io_foms.c:1696
M0_INTERNAL uint64_t m0_fid_hash(const struct m0_fid *fid)
Definition: fid.c:295
static void stobio_complete_cb(struct m0_fom_callback *cb)
Definition: io_foms.c:864
#define NULL
Definition: misc.h:38
Definition: io.h:230
struct m0_tl rios_buffer_pools
Definition: io_service.h:98
uint32_t crw_index
Definition: io_fops.h:388
struct m0_bufvec nb_buffer
Definition: net.h:1322
#define ergo(a, b)
Definition: misc.h:293
M0_INTERNAL void m0_stob_io_credit(const struct m0_stob_io *io, const struct m0_stob_domain *dom, struct m0_be_tx_credit *accum)
Definition: io.c:130
struct m0_fom_callback siod_fcb
Definition: io_foms.h:160
uint32_t rwr_repair_done
Definition: io_fops.h:333
struct m0_file co_file
Definition: cob.h:586
M0_INTERNAL int m0_storage_dev_stob_find(struct m0_storage_devs *devs, struct m0_stob_id *sid, struct m0_stob **stob)
Definition: storage_dev.c:868
union m0_be_rwlock::@198 bl_u
m0_be_tx_state
Definition: tx.h:214
Definition: sm.h:350
struct m0_pool_version * pm_pver
Definition: pool_machine.h:172
void * b_addr
Definition: buf.h:39
M0_INTERNAL void m0_storage_dev_stob_put(struct m0_storage_devs *devs, struct m0_stob *stob)
Definition: storage_dev.c:912
M0_INTERNAL struct m0_pool_version * m0_pool_version_find(struct m0_pools_common *pc, const struct m0_fid *id)
Definition: pool.c:586
struct m0_file file
Definition: di.c:36
struct m0_tl fcrw_netbuf_list
Definition: io_foms.h:223
struct m0_poolmach pv_mach
Definition: pool.h:133
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
static struct m0_be_tx_credit * m0_fom_tx_credit(struct m0_fom *fom)
Definition: fom.h:542
M0_INTERNAL struct m0_net_buffer * m0_net_buffer_pool_get(struct m0_net_buffer_pool *pool, uint32_t colour)
Definition: buffer_pool.c:215
static bool m0_stob_io_desc_invariant(const struct m0_stob_io_desc *stobio_desc)
Definition: io_foms.c:851
struct m0_cob_domain * co_dom
Definition: cob.h:582
M0_INTERNAL void m0_net_buffer_pool_unlock(struct m0_net_buffer_pool *pool)
Definition: buffer_pool.c:203
static int indexvec_wire2mem(struct m0_fom *fom)
Definition: io_foms.c:2004
M0_INTERNAL uint32_t m0_bufvec_pack(struct m0_bufvec *bv)
Definition: vec.c:480
uint64_t fcrw_flags
Definition: io_foms.h:234
struct m0_vec ov_vec
Definition: vec.h:147
struct m0_chan rb_chan
Definition: bulk.h:258
int(* fto_create)(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: fom.h:650
M0_INTERNAL void m0_fom_callback_init(struct m0_fom_callback *cb)
Definition: fom.c:1454
M0_INTERNAL void m0_rpc_bulk_fini(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:263
M0_INTERNAL void m0_rwlock_write_lock(struct m0_rwlock *lock)
Definition: rwlock.c:42
int fcrw_curr_desc_index
Definition: io_foms.h:180
M0_INTERNAL bool m0_stob_domain_is_of_type(const struct m0_stob_domain *dom, const struct m0_stob_type *dt)
Definition: domain.c:349
const struct m0_stob_type m0_stob_ad_type
Definition: ad.c:1003
uint8_t * nbd_data
m0_bcount_t fcrw_count
Definition: io_foms.h:190
struct m0_net_domain * ntm_dom
Definition: net.h:853
M0_INTERNAL void m0_cob_put(struct m0_cob *cob)
Definition: cob.c:1095
struct m0_stob_io_desc * fcrw_stio
Definition: io_foms.h:211
struct m0_rwlock rwlock
Definition: format.h:208
struct m0_net_buf_desc_data * id_descs
Definition: io_fops.h:313
M0_INTERNAL void m0_indexvec_free(struct m0_indexvec *ivec)
Definition: vec.c:553
M0_INTERNAL void m0_stob_ad_balloc_set(struct m0_stob_io *io, uint64_t flags)
Definition: ad.c:2204
struct m0_reqh_service rios_gen
Definition: io_service.h:96
uint64_t m0_bindex_t
Definition: types.h:80
M0_INTERNAL void m0_fom_wait_on(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1490
M0_INTERNAL int m0_cob_bc_insert(struct m0_cob *cob, struct m0_cob_bckey *bc_key, struct m0_cob_bcrec *bc_val, struct m0_be_tx *tx)
Definition: cob.c:1140
#define M0_BITS(...)
Definition: misc.h:236
struct m0_chan si_wait
Definition: io.h:318
uint64_t m0_bcount_t
Definition: types.h:77
struct m0_sm_conf io_conf
Definition: io_foms.c:822
M0_INTERNAL int m0_poolmach_device_state(struct m0_poolmach *pm, uint32_t device_index, enum m0_pool_nd_state *state_out)
Definition: pool_machine.c:816
struct m0_rpc_bulk fcrw_bulk
Definition: io_foms.h:217
struct m0_net_buffer_pool * fcrw_bp
Definition: io_foms.h:205
static int zero_copy_finish(struct m0_fom *)
Definition: io_foms.c:1624
#define container_of(ptr, type, member)
Definition: misc.h:33
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_INTERNAL m0_bcount_t m0_extent_get_checksum_nob(m0_bindex_t ext_start, m0_bindex_t ext_length, m0_bindex_t unit_sz, m0_bcount_t cs_size)
Definition: cksum_utils.c:85
uint32_t ci_nr
Definition: vec.h:635
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
struct m0_fid crw_pver
Definition: io_fops.h:391
static int io_fom_cob2file(struct m0_fom *fom, struct m0_fid *fid, struct m0_file **out)
Definition: io_foms.c:995
static int io_sync(struct m0_fom *fom)
Definition: io_foms.c:1985
m0_time_t fcrw_io_launch_time
Definition: io_foms.h:232
M0_INTERNAL bool m0_fid_is_set(const struct m0_fid *fid)
Definition: fid.c:106
M0_INTERNAL int m0_io_cob_stob_create(struct m0_fom *fom, struct m0_cob_domain *cdom, struct m0_fid *fid, struct m0_fid *pver, uint64_t lid, bool crow, struct m0_cob **out)
Definition: io_foms.c:951
static struct m0_cob_domain * cdom
Definition: xform.c:55
struct m0_sm_trans_descr io_phases_trans[]
Definition: io_foms.c:774
void ** ov_buf
Definition: vec.h:149
M0_INTERNAL int m0_cob_size_update(struct m0_cob *cob, uint64_t size, struct m0_be_tx *tx)
Definition: cob.c:2144
#define PRIx64
Definition: types.h:61
m0_fom_phase
Definition: fom.h:372
struct m0_chan rios_bp_wait
Definition: io_service.h:83
static struct m0_be_tx * m0_fom_tx(struct m0_fom *fom)
Definition: fom.h:537
#define M0_INVARIANT_EX(cond)
Definition: sock.c:887
static m0_bcount_t count
Definition: xcode.c:167
static struct m0_pools_common pc
Definition: iter_ut.c:59
M0_TL_DESCR_DECLARE(bufferpools, M0_EXTERN)
uint64_t fcrw_cob_size
Definition: io_foms.h:194
int m0_lid_to_unit_map[]
Definition: layout_pver.c:99
M0_INTERNAL void m0_rpc_bulk_buflist_empty(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:279
struct m0_fom_type ft_fom_type
Definition: fop.h:232
struct m0_stob_domain * so_domain
Definition: stob.h:165
struct m0_buf crw_di_data_cksum
Definition: io_fops.h:418
m0_bcount_t fcrw_total_ioivec_cnt
Definition: io_foms.h:182
#define m0_tl_endfor
Definition: tlist.h:700
struct m0_fid fid
Definition: di.c:46
struct m0_vec iv_vec
Definition: vec.h:139
#define M0_CHECK_EX(cond)
return M0_RC(rc)
M0_INTERNAL void m0_cob_tx_credit(struct m0_cob_domain *dom, enum m0_cob_op optype, struct m0_be_tx_credit *accum)
Definition: cob.c:2281
M0_INTERNAL uint32_t m0_stob_block_shift(struct m0_stob *stob)
Definition: stob.c:270
M0_INTERNAL bool m0_reqh_io_service_invariant(const struct m0_reqh_io_service *rios)
Definition: io_service.c:183
static uint32_t unit_size
Definition: layout.c:53
struct m0_bufvec si_user
Definition: io.h:300
uint64_t ci_index
Definition: vec.h:626
#define M0_ENTRY(...)
Definition: trace.h:170
Definition: buf.h:37
m0_bindex_t * iv_index
Definition: vec.h:141
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
M0_INTERNAL int m0_rpc_bulk_load(struct m0_rpc_bulk *rbulk, const struct m0_rpc_conn *conn, struct m0_net_buf_desc_data *from_desc, const struct m0_net_buffer_callbacks *bulk_cb)
Definition: bulk.c:530
int i
Definition: dir.c:1033
static void m0_io_fom_cob_rw_fini(struct m0_fom *fom)
Definition: io_foms.c:2396
struct m0_fop_type * f_type
Definition: fop.h:81
static void stob_be_credit(struct m0_fom *fom)
Definition: io_foms.c:2197
#define PRIu64
Definition: types.h:58
struct m0_fid crw_fid
Definition: io_fops.h:385
M0_INTERNAL enum sns_repair_state m0_sns_cm_fid_repair_done(struct m0_fid *gfid, struct m0_reqh *reqh, enum m0_pool_nd_state device_state)
Definition: cm.c:187
#define M0_OBJ_LAYOUT_ID(lid)
Definition: io_foms.c:566
struct m0_net_buffer_pool rios_bp
Definition: io_service.h:79
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
return M0_ERR(-EOPNOTSUPP)
static bool m0_io_fom_cob_rw_invariant(const struct m0_io_fom_cob_rw *io)
Definition: io_foms.c:830
M0_INTERNAL int m0_cob_bc_update(struct m0_cob *cob, struct m0_cob_bckey *bc_key, struct m0_cob_bcrec *bc_val, struct m0_be_tx *tx)
Definition: cob.c:1161
M0_INTERNAL struct m0_fop_cob_rw_reply * io_rw_rep_get(struct m0_fop *fop)
Definition: io_fops.c:1056
static int net_buffer_release(struct m0_fom *)
Definition: io_foms.c:1428
M0_INTERNAL uint64_t m0_rnd(uint64_t max, uint64_t *seed)
Definition: misc.c:115
Definition: trace.h:482
struct m0_cob_domain * rios_cdom
Definition: io_service.h:100
M0_INTERNAL void m0_fom_ready(struct m0_fom *fom)
Definition: fom.c:429
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
M0_INTERNAL void m0_fom_callback_fini(struct m0_fom_callback *cb)
Definition: fom.c:1497
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
struct m0_fid pv_id
Definition: pool.h:113
static int cob_bytecount_increment(struct m0_cob *cob, struct m0_cob_bckey *key, uint64_t bytecount, struct m0_be_tx *tx)
Definition: io_foms.c:2516
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
M0_INTERNAL int m0_stob_io_prepare_and_launch(struct m0_stob_io *io, struct m0_stob *obj, struct m0_dtx *tx, struct m0_io_scope *scope)
Definition: io.c:219
struct m0_buf si_cksum
Definition: io.h:412
Definition: stob.h:163
int m0_fom_tick_generic(struct m0_fom *fom)
Definition: fom_generic.c:848
struct m0_indexvec si_stob
Definition: io.h:311
int32_t si_rc
Definition: io.h:334
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
static struct m0_stob * stob
Definition: storage.c:39
struct m0_buf crw_di_data
Definition: io_fops.h:416
m0_bcount_t b_nob
Definition: buf.h:38
static void io_fom_addb2_descr(struct m0_fom *fom)
Definition: io_foms.c:2501
int32_t rb_rc
Definition: bulk.h:266
static struct m0_cob * cob
Definition: bytecount.c:40
struct m0_io_descs crw_desc
Definition: io_fops.h:400
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
M0_INTERNAL void m0_rpc_bulk_init(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:247
M0_INTERNAL const char * m0_io_fom_cob_rw_service_name(struct m0_fom *fom)
Definition: io_foms.c:2493
m0_time_t m0_time_now(void)
Definition: time.c:134
struct m0_fid pver
Definition: idx_dix.c:74
M0_INTERNAL bool m0_fom_group_is_locked(const struct m0_fom *fom)
Definition: fom.c:229
m0_pool_nd_state
Definition: pool_machine.h:57
M0_INTERNAL void m0_cob_nsrec_init(struct m0_cob_nsrec *nsrec)
Definition: cob.c:2058
void m0_fom_phase_move(struct m0_fom *fom, int32_t rc, int phase)
Definition: fom.c:1699
M0_TL_DECLARE(bufferpools, M0_EXTERN, struct m0_rios_buffer_pool)
M0_INTERNAL void m0_cob_oikey_make(struct m0_cob_oikey *oikey, const struct m0_fid *fid, int linkno)
Definition: cob.c:141
static struct m0_cob_domain * fom_cdom(struct m0_fom *fom)
Definition: io_foms.c:941
M0_INTERNAL void m0_fid_convert_cob2stob(const struct m0_fid *cob_fid, struct m0_stob_id *stob_id)
Definition: fid_convert.c:141
M0_INTERNAL int m0_indexvec_wire2mem(struct m0_io_indexvec *wire_ivec, int max_frags_nr, uint32_t bshift, struct m0_indexvec *mem_ivec)
Definition: vec.c:1058
static struct m0_stob_domain * dom
Definition: storage.c:38
M0_INTERNAL int m0_cob_create(struct m0_cob *cob, struct m0_cob_nskey *nskey, struct m0_cob_nsrec *nsrec, struct m0_cob_fabrec *fabrec, struct m0_cob_omgrec *omgrec, struct m0_be_tx *tx)
Definition: cob.c:1681
M0_INTERNAL void * m0_stob_addr_pack(const void *buf, uint32_t shift)
Definition: io.c:293
struct m0_be_rwlock cd_lock
Definition: cob.h:276
M0_INTERNAL void m0_net_buffer_pool_lock(struct m0_net_buffer_pool *pool)
Definition: buffer_pool.c:186
M0_INTERNAL void m0_fom_mod_rep_fill(struct m0_fop_mod_rep *rep, struct m0_fom *fom)
Definition: fom_generic.c:68
static int stob_io_create(struct m0_fom *fom)
Definition: io_foms.c:2035
struct m0_fid si_fid
Definition: stob.h:105
M0_INTERNAL int m0_stob_io_private_setup(struct m0_stob_io *io, struct m0_stob *obj)
Definition: io.c:49
M0_INTERNAL int m0_buf_alloc(struct m0_buf *buf, size_t size)
Definition: buf.c:43
static struct m0_io_fom_cob_rw_state_transition io_fom_read_st[]
Definition: io_foms.c:641
struct m0_sm_state_descr io_phases[]
Definition: io_foms.c:719
struct m0_fop_mod_rep rwr_mod_rep
Definition: io_fops.h:339
m0_bcount_t crw_cksum_size
Definition: io_fops.h:415
M0_INTERNAL uint32_t m0_fid_cob_device_id(const struct m0_fid *cob_fid)
Definition: fid_convert.c:81
struct m0_buf rwr_di_data_cksum
Definition: io_fops.h:342
struct m0_fol_frag * si_fol_frag
Definition: io.h:390
M0_INTERNAL int m0_cc_stob_create(struct m0_fom *fom, struct m0_stob_id *sid)
Definition: cob_foms.c:824
uint64_t si_id
Definition: io.h:406
Definition: reqh.h:94
uint32_t v_nr
Definition: vec.h:51
Definition: dump.c:103
Definition: io.h:285
M0_INTERNAL void m0_rwlock_write_unlock(struct m0_rwlock *lock)
Definition: rwlock.c:47
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
Definition: buf.c:55
static size_t m0_io_fom_cob_rw_locality_get(const struct m0_fom *fom)
Definition: io_foms.c:2478
m0_bcount_t * v_count
Definition: vec.h:53
m0_bcount_t si_cksum_sz
Definition: io.h:416
static uint64_t min64u(uint64_t a, uint64_t b)
Definition: arith.h:66
struct m0_sm t_sm
Definition: tx.h:281
#define FID_P(f)
Definition: fid.h:77
static struct m0_stob_io io
Definition: ad.c:59
M0_INTERNAL struct m0_storage_devs * m0_cs_storage_devs_get(void)
Definition: setup.c:1783
m0_bcount_t fcrw_curr_size
Definition: io_foms.h:184
m0_bcount_t si_count
Definition: io.h:340
int(* fcrw_st_state_function)(struct m0_fom *)
Definition: io_foms.h:260
uint64_t rb_id
Definition: bulk.h:267
M0_INTERNAL m0_bcount_t m0_vec_count(const struct m0_vec *vec)
Definition: vec.c:53
M0_INTERNAL bool m0_is_cob_create_fop(const struct m0_fop *fop)
Definition: io_fops.c:950
struct m0_stob * si_obj
Definition: io.h:326
M0_INTERNAL void m0_fom_callback_arm(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1460
uint32_t rwr_cksum_nob_read
Definition: io_fops.h:336
static int m0_io_fom_cob_rw_tick(struct m0_fom *fom)
Definition: io_foms.c:2233
static int io_finish(struct m0_fom *)
Definition: io_foms.c:1899
static struct m0_pool pool
Definition: iter_ut.c:58
struct m0_mutex si_mutex
Definition: io.h:319
static int io_prepare(struct m0_fom *)
Definition: io_foms.c:1204
struct m0_fop * m0_fop_reply_alloc(struct m0_fop *req, struct m0_fop_type *rept)
Definition: fop.c:129
static int align_bufvec(struct m0_fom *fom, struct m0_bufvec *obuf, struct m0_bufvec *ibuf, m0_bcount_t ivec_count, uint32_t bshift)
Definition: io_foms.c:1035
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
Definition: fid.c:164
static const struct m0_io_fom_cob_rw_state_transition io_fom_write_st[]
Definition: io_foms.c:679
Definition: fom.h:481
static int net_buffer_acquire(struct m0_fom *)
Definition: io_foms.c:1305
Definition: setup.h:354
bool(* do_check)(const struct m0_file *file, const struct m0_indexvec *io_info, const struct m0_bufvec *in, const struct m0_bufvec *out)
Definition: di.h:131
struct m0_reqh reqh
Definition: rm_foms.c:48
M0_TL_DEFINE(stobio, static, struct m0_stob_io_desc)
M0_INTERNAL int m0_indexvec_split(struct m0_indexvec *in, m0_bcount_t curr_pos, m0_bcount_t nb_len, uint32_t bshift, struct m0_indexvec *out)
Definition: vec.c:1039
const char * sd_name
Definition: sm.h:383
M0_INTERNAL m0_bcount_t m0_di_size_get(const struct m0_file *file, const m0_bcount_t size)
Definition: di.c:384
struct m0_stob_io fcrw_io
Definition: io_foms.h:200
struct m0_fid cnr_pver
Definition: cob.h:438
struct m0_cob * fcrw_cob
Definition: io_foms.h:209
Definition: io.h:279
M0_INTERNAL bool m0_is_io_fop(const struct m0_fop *fop)
Definition: io_fops.c:928
M0_INTERNAL void m0_stob_ad_balloc_clear(struct m0_stob_io *io)
Definition: ad.c:2213
#define M0_CNT_INC(cnt)
Definition: arith.h:226
M0_TL_DESCR_DEFINE(stobio, "STOB I/O", static, struct m0_stob_io_desc, siod_linkage, siod_magic, M0_STOB_IO_DESC_LINK_MAGIC, M0_STOB_IO_DESC_HEAD_MAGIC)
M0_INTERNAL uint64_t m0_io_size(struct m0_stob_io *sio, uint32_t bshift)
Definition: io_foms.c:1670
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
M0_INTERNAL bool m0_tlist_invariant(const struct m0_tl_descr *d, const struct m0_tl *list)
Definition: tlist.c:236
Definition: fid.h:38
#define M0_BYTECOUNT_USER_ID
Definition: io_service.h:72
M0_INTERNAL uint32_t m0_net_tm_colour_get(struct m0_net_transfer_mc *tm)
Definition: tm.c:448
uint64_t cbr_bytecount
Definition: cob.h:522
M0_INTERNAL int m0_cob_bc_lookup(struct m0_cob *cob, struct m0_cob_bckey *bc_key, struct m0_cob_bcrec *bc_rec)
Definition: cob.c:1120
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
struct m0_chan sm_chan
Definition: sm.h:331
struct m0_motr motr
#define PRIi64
Definition: types.h:59
struct m0_net_domain * m0_fop_domain_get(const struct m0_fop *fop)
Definition: fop.c:486
struct m0_fop_type m0_fop_cob_writev_rep_fopt
Definition: io_fops.c:74
m0_time_t fcrw_phase_start_time
Definition: io_foms.h:221
static int zero_copy_initiate(struct m0_fom *)
Definition: io_foms.c:1509
uint32_t id_nr
Definition: io_fops.h:312
struct m0_rpc_session * ri_session
Definition: item.h:147
static void stob_io_destroy(struct m0_fom *fom)
Definition: io_foms.c:2175
M0_INTERNAL m0_bcount_t m0_io_count(const struct m0_io_indexvec *io_info)
Definition: vec.c:999
M0_INTERNAL int m0_fom_rc(const struct m0_fom *fom)
Definition: fom.c:1727
struct m0_fom fcrw_gen
Definition: io_foms.h:174
struct m0_rpc_item * m0_fop_to_rpc_item(const struct m0_fop *fop)
Definition: fop.c:338
struct m0_fid crw_gfid
Definition: io_fops.h:382
M0_INTERNAL int m0_io_cob_create(struct m0_cob_domain *cdom, struct m0_fid *fid, struct m0_fid *pver, uint64_t lid, struct m0_be_tx *tx)
Definition: io_foms.c:903
struct m0_sm_trans_descr m0_generic_phases_trans[]
Definition: fom_generic.c:765
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL int m0_cc_stob_cr_credit(struct m0_stob_id *sid, struct m0_be_tx_credit *accum)
Definition: cob_foms.c:807
static struct m0_fop * fop
Definition: item.c:57
uint32_t fcrw_batch_size
Definition: io_foms.h:186
M0_INTERNAL int m0_cob_update(struct m0_cob *cob, struct m0_cob_nsrec *nsrec, struct m0_cob_fabrec *fabrec, struct m0_cob_omgrec *omgrec, struct m0_be_tx *tx)
Definition: cob.c:1860
uint32_t fcrw_bshift
Definition: io_foms.h:192
M0_INTERNAL void m0_fom_callback_cancel(struct m0_fom_callback *cb)
Definition: fom.c:1514
struct m0_fom * fc_fom
Definition: fom.h:452
M0_INTERNAL int m0_cob_delete(struct m0_cob *cob, struct m0_be_tx *tx)
Definition: cob.c:1789
uint64_t siod_magic
Definition: io_foms.h:155
M0_INTERNAL int m0_cob_locate(struct m0_cob_domain *dom, struct m0_cob_oikey *oikey, uint64_t flags, struct m0_cob **out)
Definition: cob.c:1407
static void stio_desc_fini(struct m0_stob_io_desc *stio_desc)
Definition: io_foms.c:1663
M0_INTERNAL void m0_stob_io_init(struct m0_stob_io *io)
Definition: io.c:111
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
void(* fo_fini)(struct m0_fom *fom)
Definition: fom.h:657
M0_INTERNAL void m0_fid_convert_cob2gob(const struct m0_fid *cob_fid, struct m0_fid *gob_fid)
Definition: fid_convert.c:69
M0_INTERNAL void m0_net_buffer_pool_put(struct m0_net_buffer_pool *pool, struct m0_net_buffer *buf, uint32_t colour)
Definition: buffer_pool.c:243
uint64_t cbr_cob_objects
Definition: cob.h:523
m0_bcount_t fcrw_req_count
Definition: io_foms.h:188
m0_bcount_t si_cksum_nob_read
Definition: io.h:418
struct m0_tl rb_buflist
Definition: bulk.h:256
m0_time_t fcrw_fom_start_time
Definition: io_foms.h:219
uint32_t fcrw_num_stobio_launched
Definition: io_foms.h:203
struct m0_pool_version * fcrw_pver
Definition: io_foms.h:176
M0_INTERNAL int m0_cob_alloc(struct m0_cob_domain *dom, struct m0_cob **out)
Definition: cob.c:1100
#define out(...)
Definition: gen.c:41
Definition: file.h:81
M0_INTERNAL bool m0_is_read_fop(const struct m0_fop *fop)
Definition: io_fops.c:916
M0_INTERNAL bool m0_is_cob_delete_fop(const struct m0_fop *fop)
Definition: io_fops.c:957
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
const struct m0_net_buffer_callbacks m0_rpc__buf_bulk_cb
Definition: bulk.c:238
struct m0_fid gfid
Definition: dir.c:626
const struct m0_fom_type_ops io_fom_type_ops
Definition: io_foms.c:633
struct m0_pools_common cc_pools_common
Definition: setup.h:356
M0_INTERNAL struct m0_fop_cob_rw * io_rw_get(struct m0_fop *fop)
Definition: io_fops.c:1037
Definition: io.h:229
M0_INTERNAL bool m0_is_write_fop(const struct m0_fop *fop)
Definition: io_fops.c:922
static int nbuf_release_done(struct m0_fom *fom, int still_required)
Definition: io_foms.c:1450
struct m0_fom_ops ops
Definition: io_foms.c:623
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
M0_INTERNAL int m0_rpc_bulk_buf_add(struct m0_rpc_bulk *rbulk, uint32_t segs_nr, m0_bcount_t length, struct m0_net_domain *netdom, struct m0_net_buffer *nb, struct m0_rpc_bulk_buf **out)
Definition: bulk.c:291
M0_INTERNAL struct m0_pool_version * m0_pool_version_lookup(const struct m0_pools_common *pc, const struct m0_fid *id)
Definition: pool.c:568
struct m0_mutex pc_mutex
Definition: pool.h:221
M0_INTERNAL uint64_t m0_sm_id_get(const struct m0_sm *sm)
Definition: sm.c:1021
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
struct m0_rpc_item f_item
Definition: fop.h:83
#define M0_BUF_INIT(size, data)
Definition: buf.h:64
struct m0_ioseg * ci_iosegs
Definition: vec.h:636
struct m0_io_indexvec crw_ivec
Definition: io_fops.h:411
M0_INTERNAL struct m0_motr * m0_cs_ctx_get(struct m0_reqh *reqh)
Definition: setup.c:1778
M0_INTERNAL void m0_bufvec_free2(struct m0_bufvec *bufvec)
Definition: vec.c:401
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
struct m0_stob_id so_id
Definition: stob.h:166
static int m0_io_fom_cob_rw_create(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: io_foms.c:1144
struct m0_rpc_conn * s_conn
Definition: session.h:312
struct m0_stob * fcrw_stob
Definition: io_foms.h:207
struct m0_tl fcrw_done_list
Definition: io_foms.h:215
struct m0_net_transfer_mc * m0_fop_tm_get(const struct m0_fop *fop)
Definition: fop.c:479
Definition: fop.h:79
struct m0_mutex rb_mutex
Definition: bulk.h:251
static uint64_t max64u(uint64_t a, uint64_t b)
Definition: arith.h:71
const struct m0_di_ops * fi_di_ops
Definition: file.h:92
uint64_t cnr_lid
Definition: cob.h:437
uint64_t crw_flags
Definition: io_fops.h:413
#define FID_F
Definition: fid.h:75
M0_INTERNAL struct m0_reqh * m0_fom_reqh(const struct m0_fom *fom)
Definition: fom.c:283
Definition: vec.h:145
M0_INTERNAL int m0_bufvec_empty_alloc(struct m0_bufvec *bufvec, uint32_t num_segs)
Definition: vec.c:213
M0_INTERNAL int m0_cc_cob_nskey_make(struct m0_cob_nskey **nskey, const struct m0_fid *gfid, uint32_t cob_idx)
Definition: cob_foms.c:846
struct m0_fop * rep_fop
Definition: dir.c:334
uint64_t cnr_bytecount
Definition: cob.h:431
Definition: tx.h:280
struct m0_stob_io siod_stob_io
Definition: io_foms.h:157
Definition: idx_mock.c:47
M0_INTERNAL const char * m0_fom_phase_name(const struct m0_fom *fom, int phase)
Definition: fom.c:1722
#define M0_IMPOSSIBLE(fmt,...)
enum m0_stob_io_opcode si_opcode
Definition: io.h:286
struct m0_fid cnr_fid
Definition: cob.h:419