23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_FORMATION 83 #define frm_first_itemq(frm) (&(frm)->f_itemq[0]) 84 #define frm_end_itemq(frm) (&(frm)->f_itemq[ARRAY_SIZE((frm)->f_itemq)]) 86 #define for_each_itemq_in_frm(itemq, frm) \ 87 for (itemq = frm_first_itemq(frm); \ 88 itemq < frm_end_itemq(frm); \ 99 uint64_t nr_items = 0;
111 nr_items += itemq_tlist_length(
q);
123 itemq_tlist_prev(
q,
item);
177 c->fc_max_nr_packets_enqed = 100;
178 c->fc_max_nr_segments = 128;
179 c->fc_max_packet_size = 4096;
180 c->fc_max_nr_bytes_accumulated = 4096;
279 M0_ENTRY(
"frm: %p item: %p size %zu opcode %lu xid %lu",
322 M0_LEAVE(
"nr_items: %llu bytes: %llu",
341 bool deadline_passed;
349 "deadline: "TIME_F" deadline_passed: %s",
423 packet_count = item_count = 0;
443 item_count +=
p->rp_ow.poh_nr_items;
457 M0_LEAVE(
"formed %d packet(s) [%d items]", packet_count, item_count);
475 bool has_urgent_items;
594 if (available_space <= header_footer_size)
597 available_space - header_footer_size);
652 itemq_tlink_del_fini(
item);
661 (
unsigned long long)limit);
677 (
unsigned long long)
p->rp_ow.poh_nr_items);
723 return &
s->s_conn->c_rpcchan->rc_frm;
726 #undef M0_TRACE_SUBSYSTEM static void frm_remove(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
static bool item_less_or_equal(const struct m0_rpc_item *i0, const struct m0_rpc_item *i1)
M0_INTERNAL void m0_rpc_frm_fini(struct m0_rpc_frm *frm)
static struct m0_addb2_philter p
enum m0_rpc_item_priority ri_prio
M0_INTERNAL void m0_rpc_item_change_state(struct m0_rpc_item *item, enum m0_rpc_item_state state)
struct m0_rpc_frm_constraints f_constraints
static struct m0_semaphore q
static void frm_insert(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
static void frm_try_merging_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item, m0_bcount_t limit)
M0_INTERNAL void m0_rpc_frm_packet_done(struct m0_rpc_packet *p)
const struct m0_rpc_item_type_ops * rit_ops
M0_INTERNAL void m0_sm_timeout_init(struct m0_sm_timeout *to)
void m0_rpc_item_put(struct m0_rpc_item *item)
#define M0_LOG(level,...)
static m0_bcount_t available_space_in_packet(const struct m0_rpc_packet *p, const struct m0_rpc_frm *frm)
M0_TL_DEFINE(itemq, M0_INTERNAL, struct m0_rpc_item)
struct m0_tl rm_outgoing_conns
M0_INTERNAL void m0_rpc_item_failed(struct m0_rpc_item *item, int32_t rc)
void m0_rpc_item_get(struct m0_rpc_item *item)
static bool frm_is_ready(const struct m0_rpc_frm *frm)
M0_INTERNAL struct m0_rpc_frm * session_frm(const struct m0_rpc_session *s)
static void __itemq_remove(struct m0_rpc_item *item)
M0_INTERNAL void m0_rpc_packet_add_item(struct m0_rpc_packet *p, struct m0_rpc_item *item)
static void frm_fill_packet(struct m0_rpc_frm *frm, struct m0_rpc_packet *p)
#define container_of(ptr, type, member)
struct m0_sm_timeout ri_deadline_timeout
M0_BASSERT(ARRAY_SIZE(str_qtype)==FRMQ_NR_QUEUES)
static struct m0_rpc_item * item
struct m0_tl c_item_sources
M0_INTERNAL struct m0_rpc_chan * frm_rchan(const struct m0_rpc_frm *frm)
struct m0_rpc_chan * c_rpcchan
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_footer_size
M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
M0_INTERNAL bool m0_rpc_packet_invariant(const struct m0_rpc_packet *p)
M0_INTERNAL m0_bcount_t m0_rpc_item_onwire_header_size
#define M0_ASSERT_EX(cond)
m0_bcount_t m0_rpc_item_size(struct m0_rpc_item *item)
static void item_move_to_urgent_queue(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
static bool frm_rmachine_is_locked(const struct m0_rpc_frm *frm)
M0_INTERNAL void m0_rpc_packet_init(struct m0_rpc_packet *p, struct m0_rpc_machine *rmach)
M0_INTERNAL bool m0_rpc_machine_is_locked(const struct m0_rpc_machine *machine)
M0_INTERNAL void m0_rpc_frm_init(struct m0_rpc_frm *frm, struct m0_rpc_frm_constraints *constraints, const struct m0_rpc_frm_ops *ops)
M0_INTERNAL void m0_rpc_packet_fini(struct m0_rpc_packet *p)
static void frm_fill_packet_from_item_sources(struct m0_rpc_frm *frm, struct m0_rpc_packet *p)
static void drop_all_items(struct m0_rpc_frm *frm)
m0_bcount_t f_nr_bytes_accumulated
static bool item_will_exceed_packet_size(struct m0_rpc_item *item, const struct m0_rpc_packet *p, const struct m0_rpc_frm *frm)
M0_INTERNAL struct m0_rpc_machine * frm_rmachine(const struct m0_rpc_frm *frm)
#define for_each_itemq_in_frm(itemq, frm)
M0_INTERNAL void m0_rpc_frm_item_deadline_passed(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
m0_bcount_t fc_max_packet_size
M0_INTERNAL void m0_rpc_item_sm_init(struct m0_rpc_item *item, enum m0_rpc_item_dir dir)
M0_INTERNAL const char * m0_bool_to_str(bool b)
M0_INTERNAL bool m0_rpc_item_is_oneway(const struct m0_rpc_item *item)
m0_time_t m0_time_now(void)
struct m0_rpc_item_header2 ri_header
static struct m0_addb2_callback c
static bool frm_invariant(const struct m0_rpc_frm *frm)
struct m0_rpc_frm * ri_frm
bool(* rito_try_merge)(struct m0_rpc_item *container, struct m0_rpc_item *component, m0_bcount_t limit)
struct m0_rpc_machine * rc_rpc_machine
const struct m0_rpc_item_type * ri_type
M0_INTERNAL void m0_rpc_frm_remove_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
M0_TL_DESCR_DEFINE(itemq, "rpc_itemq", M0_INTERNAL, struct m0_rpc_item, ri_iq_link, ri_magic, M0_RPC_ITEM_MAGIC, M0_RPC_ITEMQ_HEAD_MAGIC)
struct m0_tl f_itemq[FRMQ_NR_QUEUES]
static enum m0_rpc_frm_itemq_type frm_which_qtype(struct m0_rpc_frm *frm, const struct m0_rpc_item *item)
int(* fo_packet_ready)(struct m0_rpc_packet *p)
M0_INTERNAL void m0_rpc_frm_run_formation(struct m0_rpc_frm *frm)
M0_INTERNAL void m0_rpc_frm_constraints_get_defaults(struct m0_rpc_frm_constraints *c)
struct m0_rpc_machine machine
uint64_t f_nr_packets_enqed
M0_INTERNAL void m0_rpc_frm_enq_item(struct m0_rpc_frm *frm, struct m0_rpc_item *item)
#define m0_forall(var, nr,...)
M0_INTERNAL int m0_sm_timeout_arm(struct m0_sm *mach, struct m0_sm_timeout *to, m0_time_t timeout, int state, uint64_t bitmask)
static void frm_balance(struct m0_rpc_frm *frm)
M0_INTERNAL void m0_sm_timeout_fini(struct m0_sm_timeout *to)
#define M0_FI_ENABLED(tag)
static bool item_supports_merging(const struct m0_rpc_item *item)
#define M0_ALLOC_PTR(ptr)
static int frm_packet_ready(struct m0_rpc_frm *frm, struct m0_rpc_packet *p)
static const char * str_qtype[]
M0_INTERNAL bool item_is_in_waiting_queue(const struct m0_rpc_item *item, const struct m0_rpc_frm *frm)
const struct m0_rpc_frm_ops * f_ops
static void __itemq_insert(struct m0_tl *q, struct m0_rpc_item *new_item)
static bool constraints_are_valid(const struct m0_rpc_frm_constraints *constraints)
struct m0_rpc_machine * ri_rmachine
M0_INTERNAL bool m0_sm_timeout_is_armed(const struct m0_sm_timeout *to)
M0_INTERNAL bool m0_rpc_packet_is_empty(const struct m0_rpc_packet *p)
static bool frm_is_idle(const struct m0_rpc_frm *frm)
#define m0_tl_for(name, head, obj)
static struct m0_addb2_source * s
static m0_bcount_t itemq_nr_bytes_acc(const struct m0_tl *q)
static int conn_state(const struct m0_rpc_conn *conn)
static bool itemq_invariant(const struct m0_tl *q)
#define m0_tl_forall(name, var, head,...)