23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_UT 43 static int _test(
void);
63 #define IS_INCR_BY_N(p, n) _0C(saved.rs_ ## p + (n) == stats.rs_ ## p) 64 #define IS_INCR_BY_1(p) IS_INCR_BY_N(p, 1) 411 stats.rs_nr_ha_timedout_items);
413 stats.rs_nr_ha_noted_conns);
427 bool fop_put_flag =
false;
450 if (fop_put_flag && post_sync) {
482 {
"m0_bufvec_alloc_aligned",
"oom", -ENOMEM},
483 {
"m0_net_buffer_register",
"fake_error", -EINVAL},
484 {
"m0_rpc_packet_encode",
"fake_error", -EFAULT},
485 {
"m0_net_buffer_add",
"fake_error", -EMSGSIZE},
651 if (expected_rc == 0) {
678 if (already_replied) {
711 int sub_tc = reinitialise ? 1 : 2;
834 uint32_t fop_nr = 10;
835 uint32_t fop_cancelled_nr = 0;
836 struct m0_fop *fop_arr[fop_nr];
847 for (
i = 0;
i < fop_nr; ++
i) {
865 for (
i = 0;
i < fop_nr; ++
i) {
965 for (
i = 0;
i < items_nr; ++
i) {
974 for (
i = 0;
i < items_nr; ++
i) {
980 for (
i = 0;
i < items_nr; ++
i) {
988 for (
i = 0;
i < items_nr; ++
i) {
996 for (
i = 0;
i < items_nr; ++
i) {
1029 for (
n = 1;
n <= items_nr; ++
n)
1312 #undef M0_TRACE_SUBSYSTEM
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
static struct m0_mutex lock
static struct m0_rpc_stats saved
m0_time_t ri_resend_interval
M0_INTERNAL void m0_rpc_oneway_item_post(const struct m0_rpc_conn *conn, struct m0_rpc_item *item)
#define M0_ALLOC_ARR(arr, nr)
static struct m0_semaphore wait
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
static uint64_t test_item_cache_item_get_xid
M0_INTERNAL void m0_chan_broadcast_lock(struct m0_chan *chan)
const struct m0_sm_conf incoming_item_sm_conf
enum m0_rpc_item_priority ri_prio
static void test_item_cache(void)
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
m0_chan_cb_t rpc_conn_original_ha_cb
static const struct m0_rpc_item_ops session_ut_item_ops
struct m0_semaphore arrow_hit
int m0_thread_join(struct m0_thread *q)
static void test_item_cache_add_nth(struct m0_rpc_item_cache *ic, struct m0_mutex *lock, struct m0_rpc_item *items, int items_nr, int n)
const m0_time_t M0_TIME_NEVER
const struct m0_rpc_item_type_ops * rit_ops
M0_INTERNAL int m0_rpc_session_create(struct m0_rpc_session *session, struct m0_rpc_conn *conn, m0_time_t abs_timeout)
static uint64_t tag(uint8_t code, uint64_t id)
M0_INTERNAL struct m0_conf_obj * m0_conf_cache_lookup(const struct m0_conf_cache *cache, const struct m0_fid *id)
static void test_item_cache_item_get(struct m0_rpc_item *item)
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
M0_INTERNAL bool m0_semaphore_timeddown(struct m0_semaphore *semaphore, const m0_time_t abs_timeout)
#define M0_LOG(level,...)
M0_INTERNAL int m0_file_read(const char *path, char **out)
static void fop_alloc(struct m0_fom *fom, enum cob_fom_type fomtype)
uint64_t rs_nr_ha_timedout_items
uint64_t rs_nr_ha_noted_conns
static int ts_item_fini(void)
void m0_rpc_item_get(struct m0_rpc_item *item)
int m0_rpc_session_destroy(struct m0_rpc_session *session, m0_time_t abs_timeout)
M0_INTERNAL void m0_rpc_item_cache_fini(struct m0_rpc_item_cache *ic)
void disable_packet_ready_set_reply_error(int arg)
M0_INTERNAL void m0_rconfc_stop_sync(struct m0_rconfc *rconfc)
static struct m0_rpc_client_ctx cctx
struct m0_sm_conf rit_incoming_conf
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
static void test_ha_cancel(void)
static struct m0_rpc_item_type test_item_cache_itype
#define container_of(ptr, type, member)
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
m0_time_t m0_time(uint64_t secs, long ns)
void m0_rpc_item_init(struct m0_rpc_item *item, const struct m0_rpc_item_type *itype)
static struct m0_rpc_item * item
void fop_test(int expected_rc)
int m0_rpc_item_wait_for_reply(struct m0_rpc_item *item, m0_time_t timeout)
M0_INTERNAL bool m0_rpc_item_is_request(const struct m0_rpc_item *item)
static struct m0_rpc_session * session
struct m0_sm_group rm_sm_grp
static void fop_release(struct m0_ref *ref)
void(* cho_ha_timer_cb)(struct m0_sm_timer *timer)
M0_INTERNAL struct m0_fid * m0_reqh2profile(struct m0_reqh *reqh)
void m0_rpc_item_fini(struct m0_rpc_item *item)
static void test_resend(void)
void m0_rpc_machine_get_stats(struct m0_rpc_machine *machine, struct m0_rpc_stats *stats, bool reset)
const struct m0_rpc_conn_ha_cfg * c_ha_cfg
static const struct m0_rpc_item_ops arrow_item_ops
struct m0_rpc_machine * c_rpc_machine
M0_INTERNAL bool m0_sm_timer_is_armed(const struct m0_sm_timer *timer)
struct m0_fop_type m0_rpc_arrow_fopt
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
void(* rito_item_get)(struct m0_rpc_item *item)
void m0_rpc_item_cancel(struct m0_rpc_item *item)
const struct m0_rpc_item_ops cs_ds_req_fop_rpc_item_ops
M0_INTERNAL struct m0_confc * m0_reqh2confc(struct m0_reqh *reqh)
#define M0_FID_TINIT(type, container, key)
M0_INTERNAL void m0_rpc_test_fops_fini(void)
M0_INTERNAL int m0_rpc_conn_ha_subscribe(struct m0_rpc_conn *conn, struct m0_fid *svc_fid)
M0_INTERNAL void m0_fi_disable(const char *fp_func, const char *fp_tag)
static void test_reply_item_error(void)
static void m0_fi_enable(const char *func, const char *tag)
static struct m0_confc * confc
static void m0_fi_enable_func(const char *func, const char *tag, m0_fi_fpoint_state_func_t trigger_func, void *data)
struct m0_sm_conf rit_outgoing_conf
M0_INTERNAL void m0_rpc_item_send(struct m0_rpc_item *item)
struct m0_rpc_item_header2 ri_header
enum m0_ha_obj_state co_ha_state
M0_INTERNAL void m0_rpc_session_cancel(struct m0_rpc_session *session)
M0_INTERNAL void m0_rpc_item_cache_purge(struct m0_rpc_item_cache *ic)
struct m0_rpc_conn rcx_connection
M0_INTERNAL void m0_rpc_item_cache_del(struct m0_rpc_item_cache *ic, uint64_t xid)
static bool __ha_service_event(struct m0_clink *link)
static void stop_rpc_client_and_server(void)
struct m0_conf_cache cc_cache
int m0_rpc_client_stop(struct m0_rpc_client_ctx *cctx)
int m0_rpc_client_start(struct m0_rpc_client_ctx *cctx)
struct m0_rpc_item * ri_reply
#define IS_INCR_BY_N(p, n)
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
static struct m0_rpc_server_ctx sctx
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
static bool drop_twice(void *data)
int m0_rpc_post_sync(struct m0_fop *fop, struct m0_rpc_session *session, const struct m0_rpc_item_ops *ri_ops, m0_time_t deadline)
static void test_oneway_item(void)
struct m0_ut_suite item_ut
M0_INTERNAL int m0_fop_data_alloc(struct m0_fop *fop)
static void test_dropped(struct m0_rpc_item *item)
static uint64_t test_item_cache_item_put_xid
static void _test_timer_start_failure(void)
void(* rio_sent)(struct m0_rpc_item *item)
uint32_t fop_dispatched_nr
bool REINITIALISE_AFTER_CANCEL
struct m0_semaphore arrow_destroyed
bool(* m0_chan_cb_t)(struct m0_clink *link)
static void __ha_timer__dummy(struct m0_sm_timer *timer)
static bool chk_state(const struct m0_rpc_item *item, enum m0_rpc_item_state state)
static void test_ha_notify()
M0_INTERNAL void m0_rconfc_fini(struct m0_rconfc *rconfc)
static void session_ut_item_cb(struct m0_rpc_item *item)
M0_INTERNAL void m0_rpc_item_cache_clear(struct m0_rpc_item_cache *ic)
void(* rio_replied)(struct m0_rpc_item *item)
M0_INTERNAL bool m0_fid_eq(const struct m0_fid *fid0, const struct m0_fid *fid1)
#define M0_UT_RETURN(...)
m0_time_t rchc_ha_interval
static void _ha_do_not_notify(struct m0_rpc_conn *conn, uint8_t state)
static struct m0_rpc_machine * machine
static void test_failure_before_sending(void)
M0_INTERNAL int64_t m0_ref_read(const struct m0_ref *ref)
struct m0_rpc_conn_ha_ops rchc_ops
static bool only_second_time(void *data)
static void _test_resend(struct m0_fop *fop, bool post_sync)
M0_INTERNAL void m0_rpc_conn_ha_cfg_set(struct m0_rpc_conn *conn, const struct m0_rpc_conn_ha_cfg *cfg)
M0_INTERNAL struct m0_locality * m0_locality0_get(void)
struct m0_reqh_context cc_reqh_ctx
struct m0_rpc_session rcx_session
static void test_timeout(void)
static void _test_timeout(m0_time_t deadline, m0_time_t timeout, bool reset)
M0_INTERNAL bool m0_rpc_item_cache_add(struct m0_rpc_item_cache *ic, struct m0_rpc_item *item, m0_time_t deadline)
static struct m0_rpc_item_type_ops test_item_cache_type_ops
void(* cho_ha_notify)(struct m0_rpc_conn *conn, uint8_t state)
static void test_cancel_session(void)
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
M0_INTERNAL void m0_fop_release(struct m0_ref *ref)
#define M0_ALLOC_PTR(ptr)
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
static int ts_item_init(void)
M0_INTERNAL int m0_rpc_item_cache_init(struct m0_rpc_item_cache *ic, struct m0_mutex *lock)
const struct m0_rpc_item_ops * ri_ops
static void test_item_cache_item_put(struct m0_rpc_item *item)
m0_time_t m0_time_from_now(uint64_t secs, long ns)
struct m0_sm_timer c_ha_timer
const struct m0_sm_conf outgoing_item_sm_conf
struct m0_rpc_session * ri_session
M0_INTERNAL int m0_rconfc_start(struct m0_rconfc *rconfc)
static struct m0_thread ha_thread
void __ha_accept_imitate(struct m0_fid *sfid)
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
void m0_fop_put_lock(struct m0_fop *fop)
struct m0_clink c_ha_clink
static struct m0_fop * fop
M0_INTERNAL struct m0_conf_obj * m0_rpc_conn2svc(const struct m0_rpc_conn *conn)
static void check_cancel(bool already_replied, bool reinitialise)
static void m0_fi_enable_once(const char *func, const char *tag)
M0_INTERNAL struct m0_rpc_item * m0_rpc_item_cache_lookup(struct m0_rpc_item_cache *ic, uint64_t xid)
struct m0_rconfc rh_rconfc
static void cancel_item_with_various_states(bool reinitialise)
M0_INTERNAL void m0_rpc_conn_ha_unsubscribe(struct m0_rpc_conn *conn)
M0_INTERNAL int m0_rconfc_init(struct m0_rconfc *rconfc, const struct m0_fid *profile, struct m0_sm_group *sm_group, struct m0_rpc_machine *rmach, m0_rconfc_cb_t expired_cb, m0_rconfc_cb_t ready_cb)
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
static void test_simple_transitions(void)
static enum m0_ha_obj_state expected_state
struct m0_rpc_stats rm_stats
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
static void test_cancel_item(void)
static struct m0_fid expected_fid
M0_INTERNAL void m0_rpc_test_fops_init(void)
struct m0_rpc_machine * ri_rmachine
int m0_rpc_item_timedwait(struct m0_rpc_item *item, uint64_t states, m0_time_t timeout)
struct m0_rpc_item f_item
static void start_rpc_client_and_server(void)
struct m0_rpc_conn * s_conn
static void arrow_sent_cb(struct m0_rpc_item *item)
struct m0_motr rsx_motr_ctx
static void _ha_notify(struct m0_rpc_conn *conn, uint8_t state)
static void conn_flag_unset(struct m0_rpc_conn *conn, uint64_t flag)
void(* m0_rpc__item_dropped)(struct m0_rpc_item *item)
static bool fop_release_called
static bool arrow_sent_cb_called
void m0_rpc_item_cancel_init(struct m0_rpc_item *item)
struct m0_fop * m0_fop_alloc(struct m0_fop_type *fopt, void *data, struct m0_rpc_machine *mach)
int m0_nanosleep(const m0_time_t req, m0_time_t *rem)