23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_BE 73 M0_ENTRY(
"en=%p gr=%p state=%d gr->tg_state=%d",
93 "Maximum transaction size shouldn't be greater than " 94 "maximum group size: " 135 goto err_service_fini;
156 (etx_tlist_init(&en->
eng_txs[
i]),
true));
184 "There is at least one transaction which didn't become " 185 "stable yet. log_size = %" PRIu64 ", log_free = %"PRIu64,
191 (etx_tlist_fini(&en->
eng_txs[
i]),
true));
236 return etx_tlist_head(&en->
eng_txs[state]);
260 etx_tlist_is_empty(&en->
eng_txs[state])) &&
306 "tx=%p engine=%p t_prepared="BETXCR_F" " 307 "t_payload_prepared=%" PRIu64 " " 309 " bec_tx_payload_max=%"PRIu64,
342 M0_ENTRY(
"en=%p gr=%p sm_grp=%p", en, gr, sm_grp);
376 M0_ENTRY(
"en=%p gr=%p sm_grp=%p", en, gr, sm_grp);
427 uint64_t grouping_q_length;
428 uint64_t tx_per_group_max;
430 M0_ENTRY(
"en=%p gr=%p sm_grp=%p", en, gr, sm_grp);
436 grouping_q_length =
min_check(grouping_q_length, tx_per_group_max);
437 delay = t_min + (t_max - t_min) * grouping_q_length / tx_per_group_max;
443 grouping_q_length,
delay);
487 if (M0_IN(
rc, (-EBUSY, -EXFULL)))
489 if (M0_IN(
rc, (0, -ENOSPC)))
519 bool group_recovery_started =
false;
531 group_recovery_started =
true;
537 M0_LEAVE(
"group_recovery_started=%d eng_recovery_finished=%d",
620 return M0_RC(rc_bool);
635 etx_tlink_del_fini(tx);
650 etx_tlist_add_tail(&en->
eng_txs[state], tx);
760 for (
i = 0;
i <
nr; ++
i) {
915 M0_LEAVE(
"en=%p tx=%p state=%s", en, tx,
929 etx_tlist_is_empty(&en->
eng_txs[state])) &&
943 if (payload_size !=
NULL)
949 uint32_t *tx_per_group)
951 if (group_nr !=
NULL)
953 if (tx_per_group !=
NULL)
958 #undef M0_TRACE_SUBSYSTEM m0_bcount_t tgc_payload_max
struct m0_be_tx_credit t_prepared
M0_INTERNAL void m0_be_engine_fini(struct m0_be_engine *en)
struct m0_be_domain * bec_domain
static bool be_engine_recovery_is_finished(struct m0_be_engine *en)
m0_time_t bec_group_freeze_timeout_max
#define M0_ALLOC_ARR(arr, nr)
M0_TL_DEFINE(etx, M0_INTERNAL, struct m0_be_tx)
static void be_engine_group_timer_cb(struct m0_sm_timer *timer)
M0_INTERNAL void m0_be_tx_group_close(struct m0_be_tx_group *gr)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
static void be_engine_tx_state_post(struct m0_be_engine *en, struct m0_be_tx *tx, enum m0_be_tx_state state)
struct m0_semaphore eng_recovery_wait_sem
static void be_engine_tx_group_state_move(struct m0_be_engine *en, struct m0_be_tx_group *gr, enum m0_be_tx_group_state state)
bool bec_wait_for_recovery
M0_INTERNAL struct m0_be_tx * m0_be_engine__tx_find(struct m0_be_engine *en, uint64_t id)
M0_INTERNAL int m0_be_tx_group_init(struct m0_be_tx_group *gr, struct m0_be_tx_group_cfg *gr_cfg)
M0_INTERNAL void m0_be_tx_group_recovery_prepare(struct m0_be_tx_group *gr, struct m0_be_log *log)
M0_INTERNAL void m0_be_seg_dict_init(struct m0_be_seg *seg)
m0_bcount_t t_log_reserved_size
static void be_engine_group_timer_arm(struct m0_sm_group *sm_grp, struct m0_sm_ast *ast)
M0_INTERNAL bool m0_be_log_recovery_record_available(struct m0_be_log *log)
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
M0_INTERNAL struct m0_sm_group * m0_be_tx_group__sm_group(struct m0_be_tx_group *gr)
M0_INTERNAL void m0_be_engine__tx_group_ready(struct m0_be_engine *en, struct m0_be_tx_group *gr)
M0_INTERNAL void m0_be_tx_group_tx_closed(struct m0_be_tx_group *gr, struct m0_be_tx *tx)
struct m0_be_log_discard * tgc_log_discard
static struct m0_sm_group * grp
#define M0_LOG(level,...)
static int be_engine_tx_trygroup(struct m0_be_engine *en, struct m0_be_tx *tx)
static struct m0_be_tx * be_engine_tx_opening_peek(struct m0_be_engine *en)
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
static struct m0_be_tx * be_engine_recovery_tx_find(struct m0_be_engine *en, enum m0_be_tx_state state)
static struct m0_be_tx_group * be_engine_group_find(struct m0_be_engine *en)
struct m0_be_domain * tgc_domain
struct m0_mutex * bec_lock
static void be_engine_group_tryclose(struct m0_be_engine *en, struct m0_be_tx_group *gr)
M0_INTERNAL int m0_be_engine_start(struct m0_be_engine *en)
M0_INTERNAL struct m0_be_seg * m0_be_domain_seg0_get(struct m0_be_domain *dom)
struct m0_sm_timer tg_close_timer
M0_TL_DESCR_DEFINE(etx, "m0_be_engine::eng_txs[]", M0_INTERNAL, struct m0_be_tx, t_engine_linkage, t_magic, M0_BE_TX_MAGIC, M0_BE_TX_ENGINE_MAGIC)
static void be_engine_recovery_finish(struct m0_be_engine *en)
struct m0_be_engine * tg_engine
#define container_of(ptr, type, member)
M0_INTERNAL int m0_be_tx_group_start(struct m0_be_tx_group *gr)
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
struct m0_be_engine_cfg * eng_cfg
static void be_engine_group_timer_disarm(struct m0_sm_group *sm_grp, struct m0_sm_ast *ast)
struct m0_be_tx_credit bec_tx_size_max
struct m0_sm_ast tg_close_timer_arm
M0_INTERNAL int m0_be_engine__exclusive_open_invariant(struct m0_be_engine *en, struct m0_be_tx *excl)
uint64_t bec_tx_active_max
static void be_engine_tx_group_open(struct m0_be_engine *en, struct m0_be_tx_group *gr)
struct m0_be_domain_cfg bd_cfg
M0_INTERNAL bool m0_be_tx__is_fast(struct m0_be_tx *tx)
struct m0_tl eng_groups[M0_BGS_NR]
static struct m0_sm_ast ast[NR]
m0_time_t bec_group_freeze_timeout_limit
struct m0_reqh * bec_reqh
static int be_engine_group_start(struct m0_be_engine *en, size_t index)
M0_INTERNAL void m0_be_engine_got_log_space_cb(struct m0_be_log *log)
M0_INTERNAL bool m0_sm_timer_is_armed(const struct m0_sm_timer *timer)
struct m0_be_log * tgc_log
return M0_ERR(-EOPNOTSUPP)
struct m0_be_tx_group * eng_group
#define M0_AMB(obj, ptr, field)
struct m0_be_tx_group * t_group
M0_INTERNAL void m0_be_engine__tx_state_set(struct m0_be_engine *en, struct m0_be_tx *tx, enum m0_be_tx_state state)
static uint64_t be_engine_tx_id_allocate(struct m0_be_engine *en)
m0_time_t tg_close_deadline
struct m0_be_tx_credit tgc_size_max
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
static void be_engine_group_freeze(struct m0_be_engine *en, struct m0_be_tx_group *gr)
struct m0_sm_ast tg_close_timer_disarm
struct m0_be_log_discard * bec_log_discard
static void be_engine_group_stop_nr(struct m0_be_engine *en, size_t nr)
M0_INTERNAL void m0_be_tx__group_assign(struct m0_be_tx *tx, struct m0_be_tx_group *gr)
struct m0_sm_group * tr_grp
m0_time_t m0_time_now(void)
static void be_engine_got_tx_closed(struct m0_be_engine *en, struct m0_be_tx *tx)
static struct m0_stob_domain * dom
M0_INTERNAL bool m0_be_tx_group_is_recovering(struct m0_be_tx_group *gr)
static bool be_engine_is_locked(const struct m0_be_engine *en)
struct m0_be_domain * eng_domain
M0_INTERNAL size_t m0_be_tx_group_tx_nr(struct m0_be_tx_group *gr)
unsigned long tgc_tx_nr_max
M0_INTERNAL bool m0_be_engine__invariant(struct m0_be_engine *en)
m0_bcount_t t_payload_prepared
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
struct m0_reqh * tgc_reqh
M0_INTERNAL bool m0_be_tx__is_exclusive(const struct m0_be_tx *tx)
static void be_engine_got_tx_done(struct m0_be_engine *en, struct m0_be_tx *tx)
M0_INTERNAL void m0_be_tx_get(struct m0_be_tx *tx)
static void be_engine_tx_group_ready(struct m0_be_engine *en, struct m0_be_tx_group *gr)
struct m0_tl eng_txs[M0_BTS_NR+1]
M0_INTERNAL void m0_be_engine__tx_fini(struct m0_be_engine *en, struct m0_be_tx *tx)
M0_INTERNAL void m0_be_engine_tx_size_max(struct m0_be_engine *en, struct m0_be_tx_credit *cred, m0_bcount_t *payload_size)
M0_INTERNAL int m0_be_log_reserve(struct m0_be_log *log, m0_bcount_t size)
M0_INTERNAL void m0_be_engine__group_limits(struct m0_be_engine *en, uint32_t *group_nr, uint32_t *tx_per_group)
static void be_engine_group_timeout_arm(struct m0_be_engine *en, struct m0_be_tx_group *gr)
static bool be_engine_invariant(struct m0_be_engine *en)
M0_INTERNAL void m0_be_tx_group_stop(struct m0_be_tx_group *gr)
#define m0_forall(var, nr,...)
M0_INTERNAL int m0_be_tx_group_tx_add(struct m0_be_tx_group *gr, struct m0_be_tx *tx)
M0_INTERNAL void m0_be_engine_stop(struct m0_be_engine *en)
M0_INTERNAL void m0_sm_timer_cancel(struct m0_sm_timer *timer)
M0_INTERNAL void m0_be_log_discard_sync(struct m0_be_log_discard *ld)
M0_INTERNAL int m0_be_tx_service_init(struct m0_be_engine *en, struct m0_reqh *reqh)
static void be_engine_got_tx_grouping(struct m0_be_engine *en)
struct m0_be_engine * tgc_engine
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
static struct m0_be_tx * be_engine_tx_peek(struct m0_be_engine *en, enum m0_be_tx_state state)
m0_time_t bec_group_freeze_timeout_min
M0_INTERNAL void m0_be_tx_service_fini(struct m0_be_engine *en)
M0_INTERNAL void m0_be_tx_group_fini(struct m0_be_tx_group *gr)
M0_INTERNAL m0_bcount_t m0_be_group_format_log_reserved_size(struct m0_be_log *log, struct m0_be_tx_credit *cred, m0_bcount_t cred_payload)
M0_INTERNAL void m0_be_engine__tx_force(struct m0_be_engine *en, struct m0_be_tx *tx)
M0_INTERNAL int m0_be_engine_init(struct m0_be_engine *en, struct m0_be_domain *dom, struct m0_be_engine_cfg *en_cfg)
M0_INTERNAL bool m0_be_tx__is_recovering(struct m0_be_tx *tx)
static void be_engine_try_recovery(struct m0_be_engine *en)
static bool is_engine_at_stopped_or_done(const struct m0_be_engine *en, bool stopped)
#define M0_ASSERT_INFO(cond, fmt,...)
struct m0_be_tx_group_cfg bec_group_cfg
M0_INTERNAL void m0_be_engine_full_log_cb(struct m0_be_log *log)
struct m0_be_tx_group_cfg * bec_groups_cfg
M0_INTERNAL const char * m0_be_tx_state_name(enum m0_be_tx_state state)
M0_INTERNAL void m0_sm_ast_cancel(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
M0_INTERNAL void m0_be_tx_group_stable(struct m0_be_tx_group *gr)
M0_INTERNAL bool m0_be_tx_credit_le(const struct m0_be_tx_credit *c0, const struct m0_be_tx_credit *c1)
enum m0_be_tx_group_state tg_state
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
M0_INTERNAL void m0_sm_timer_fini(struct m0_sm_timer *timer)
M0_INTERNAL void m0_sm_timer_init(struct m0_sm_timer *timer)
#define m0_tl_find(name, var, head,...)
static void be_engine_unlock(struct m0_be_engine *en)
M0_INTERNAL void m0_be_tx__state_post(struct m0_be_tx *tx, enum m0_be_tx_state state)
static void be_engine_lock(struct m0_be_engine *en)
M0_INTERNAL int m0_sm_timer_start(struct m0_sm_timer *timer, struct m0_sm_group *group, void(*cb)(struct m0_sm_timer *), m0_time_t deadline)
bool eng_recovery_finished
#define m0_tl_exists(name, var, head,...)
static int be_engine_cfg_validate(struct m0_be_engine_cfg *en_cfg)
M0_INTERNAL void m0_be_engine__tx_init(struct m0_be_engine *en, struct m0_be_tx *tx, enum m0_be_tx_state state)
static void be_engine_got_tx_open(struct m0_be_engine *en, struct m0_be_tx *tx)
M0_INTERNAL void m0_be_log_unreserve(struct m0_be_log *log, m0_bcount_t size)
m0_bcount_t bec_tx_payload_max