66 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_BE 97 #define BTBI_F "(qdata=%p bbd_user=%p bbd_credit="BETXCR_F" " \ 98 "bbd_payload_size=%" PRIu64 ")" 100 #define BTBI_P(btbi) (btbi), (btbi)->bbd_user, BETXCR_P(&(btbi)->bbd_credit), \ 101 (btbi)->bbd_payload_size 140 uint64_t worker_partition;
141 uint64_t workers_per_partition;
142 uint64_t partitions_per_locality;
143 uint64_t localities_nr;
180 for (
i = 0;
i <
tb_cfg->tbc_partitions_nr; ++
i) {
199 .tbw_queue_get_successful =
false,
251 localities_nr - 1) / localities_nr;
253 worker_partition = 0;
254 for (
i = 0;
i < localities_nr; ++
i) {
257 --partitions_per_locality;
258 for (j = 0; j < partitions_per_locality; ++j) {
259 if (worker_partition ==
263 ++workers_per_partition;
264 for (k = 0; k < workers_per_partition; ++k) {
279 (workers_per_partition, workers_per_partition - 1))));
339 uint32_t done_nr = 0;
382 if (worker->
tbw_rc != 0) {
403 M0_ENTRY(
"worker=%p tbw_index=%" PRIu64 " tbw_queue_get_successful=%d",
470 accum_payload_size +=
data->bbd_payload_size;
592 .bbd_credit = *credit,
593 .bbd_payload_size = payload_credit,
598 M0_PRE(partition < tb->btb_cfg.tbc_partitions_nr);
641 #undef M0_TRACE_SUBSYSTEM uint64_t bqc_consumers_nr_max
static void be_tx_bulk_open(struct be_tx_bulk_worker *worker, struct m0_be_tx_credit *cred, m0_bcount_t cred_payload)
#define M0_ALLOC_ARR(arr, nr)
M0_INTERNAL void m0_be_tx_bulk_run(struct m0_be_tx_bulk *tb, struct m0_be_op *op)
M0_INTERNAL int m0_be_tx_bulk_init(struct m0_be_tx_bulk *tb, struct m0_be_tx_bulk_cfg *tb_cfg)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
#define M0_BE_QUEUE_PUT(bq, op, ptr)
M0_INTERNAL struct m0_locality * m0_locality_get(uint64_t value)
M0_INTERNAL void m0_be_tx_bulk_end(struct m0_be_tx_bulk *tb)
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
struct m0_sm_ast tbw_queue_get
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
#define M0_BE_QUEUE_GET(bq, op, ptr, successful)
static struct m0_sm_group * grp
#define M0_LOG(level,...)
struct m0_sm_ast tbw_init
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
struct m0_be_queue * btb_q
M0_INTERNAL void m0_be_tx_fini(struct m0_be_tx *tx)
#define M0_BE_OP_SYNC(op_obj, action)
m0_bcount_t bbd_payload_size
struct m0_be_domain * tbc_dom
M0_INTERNAL void m0_be_tx_prep(struct m0_be_tx *tx, const struct m0_be_tx_credit *credit)
M0_INTERNAL void m0_be_op_callback_set(struct m0_be_op *op, m0_be_op_cb_t cb, void *param, enum m0_be_op_state state)
#define container_of(ptr, type, member)
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
static bool be_tx_bulk_open_cb(struct m0_clink *clink)
struct m0_sm_group * tbw_grp
static void be_tx_bulk_queues_drain(struct m0_be_tx_bulk *tb)
uint64_t tbc_partitions_nr
static struct m0_sm_ast ast[NR]
struct m0_sm_ast tbw_close
M0_INTERNAL int m0_be_queue_init(struct m0_be_queue *bq, struct m0_be_queue_cfg *cfg)
struct m0_be_tx_credit bbd_credit
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_be_queue_unlock(struct m0_be_queue *bq)
M0_INTERNAL int m0_be_tx_bulk_status(struct m0_be_tx_bulk *tb)
struct be_tx_bulk_item * tbw_item
M0_INTERNAL void m0_be_queue_fini(struct m0_be_queue *bq)
struct m0_be_tx_bulk * tbw_tb
M0_INTERNAL void m0_be_tx_credit_add(struct m0_be_tx_credit *c0, const struct m0_be_tx_credit *c1)
static void be_tx_bulk_close_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
bool tbw_queue_get_successful
M0_INTERNAL void m0_be_tx_close(struct m0_be_tx *tx)
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
struct m0_sm_group * lo_grp
M0_INTERNAL void m0_be_op_done(struct m0_be_op *op)
static void put_fail(void)
static struct m0_clink clink[RDWR_REQUEST_MAX]
struct m0_sm_ast tbw_finish
uint64_t tbc_work_items_per_tx_max
static void be_tx_bulk_unlock(struct m0_be_tx_bulk *tb)
static void be_tx_bulk_lock(struct m0_be_tx_bulk *tb)
M0_INTERNAL void m0_be_tx_init(struct m0_be_tx *tx, uint64_t tid, struct m0_be_domain *dom, struct m0_sm_group *sm_group, m0_be_tx_cb_t persistent, m0_be_tx_cb_t discarded, void(*filler)(struct m0_be_tx *tx, void *payload), void *datum)
#define m0_forall(var, nr,...)
static void be_tx_bulk_finish_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL void m0_be_op_reset(struct m0_be_op *op)
M0_INTERNAL bool m0_be_tx_bulk_put(struct m0_be_tx_bulk *tb, struct m0_be_op *op, struct m0_be_tx_credit *credit, m0_bcount_t payload_credit, uint64_t partition, void *user)
M0_INTERNAL void m0_be_op_active(struct m0_be_op *op)
M0_INTERNAL void m0_be_tx_bulk_fini(struct m0_be_tx_bulk *tb)
static void be_tx_bulk_gc_cb(struct m0_be_tx *tx, void *param)
m0_bcount_t bqc_item_length
struct m0_clink tbw_clink
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
M0_INTERNAL void m0_be_tx_open(struct m0_be_tx *tx)
M0_INTERNAL struct m0_fom_domain * m0_fom_dom(void)
M0_INTERNAL void m0_be_op_fini(struct m0_be_op *op)
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
#define M0_BE_QUEUE_PEEK(bq, ptr)
M0_INTERNAL bool m0_be_should_break(struct m0_be_engine *eng, const struct m0_be_tx_credit *accum, const struct m0_be_tx_credit *delta)
M0_INTERNAL void m0_be_queue_lock(struct m0_be_queue *bq)
struct m0_be_op btb_kill_put_op
void(* tbc_done)(struct m0_be_tx_bulk *tb, void *datum, void *user, uint64_t worker_index, uint64_t partition)
static void be_tx_bulk_init_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL void m0_be_tx_gc_enable(struct m0_be_tx *tx, void(*gc_free)(struct m0_be_tx *, void *param), void *param)
struct m0_be_tx_bulk_cfg btb_cfg
M0_INTERNAL void m0_be_tx_payload_prep(struct m0_be_tx *tx, m0_bcount_t size)
struct m0_be_engine bd_engine
M0_INTERNAL void m0_be_queue_end(struct m0_be_queue *bq)
bool btb_termination_in_progress
M0_INTERNAL void m0_be_op_init(struct m0_be_op *op)
struct m0_be_queue_cfg tbc_q_cfg
static void be_tx_bulk_queue_get_done_cb(struct m0_be_op *op, void *param)
struct be_tx_bulk_worker * btb_worker
static void be_tx_bulk_queue_get_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)