39 #define NET_TEST_MODULE_NAME node_bulk 320 for (
i = 0;
i < allowed_size; ++
i) {
321 if (allowed[
i].sta_from == from && allowed[
i].sta_to == to)
324 return i < allowed_size;
371 M0_PRE(bs_index < ctx->nbc_bs_nr);
374 LOGD(
"state_change: role = %d, bs_index = %lu, state = %d",
375 ctx->nbc_nh.ntnh_role, bs_index, state);
377 role =
ctx->nbc_nh.ntnh_role;
378 bs = &
ctx->nbc_bs[bs_index];
381 role_client ? allowed_client : allowed_server,
428 static const struct {
432 #define TRANSITION(name) { \ 433 .nbst_transition = name, \ 434 .nbst_nr = ARRAY_SIZE(name), \ 450 for (
i = 0;
i < state_nr; ++
i) {
451 for (j =
i + 1; j < state_nr; ++j) {
477 for (
i = 0;
i < state_nr; ++
i) {
481 M0_IMPOSSIBLE(
"Invalid 'from' state in net-test bulk testing.");
490 size_t transition_size;
494 M0_PRE(bs_index < ctx->nbc_bs_nr);
501 transition_size = success ?
508 transition_size = success ?
529 M0_PRE(bs_index < ctx->nbc_bs_nr);
532 bs = &
ctx->nbc_bs[bs_index];
550 "net-test bulk testing");
564 for (
i = 0;
i <
nr; ++
i) {
586 for (
i = 0;
i <
nr; ++
i) {
599 size_t buf_bulk_index)
606 size_t buf_bulk_index)
609 M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);
612 M0_SET0(&
ctx->nbc_bs[buf_bulk_index].bsb_desc_send);
620 size_t buf_bulk_index)
623 M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);
626 ctx->nbc_bs[buf_bulk_index].bsb_desc_send);
630 size_t buf_bulk_index)
633 M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);
643 M0_PRE(bs_index * 2 + 1 <
ctx->nbc_buf_bulk_nr);
650 size_t buf_bulk_index,
651 size_t buf_ping_index,
658 M0_PRE(buf_ping_index < ctx->nbc_buf_ping_nr);
659 M0_PRE(buf_bulk_index < ctx->nbc_buf_bulk_nr);
664 &
ctx->nbc_net, buf_bulk_index,
677 &
ctx->nbc_net, buf_bulk_index,
703 size_t buf_ping_index,
707 size_t buf_bulk_index;
713 LOGD(
"--- NO UNUSED BUFS");
720 M0_ASSERT(buf_bulk_index < ctx->nbc_buf_bulk_nr);
725 LOGD(
"BADMSG: buf_bulk_index = %lu, " 726 "buf_ping_index = %lu, offset = %lu",
727 buf_bulk_index, buf_ping_index, (
unsigned long)
offset);
736 ctx->nbc_bs[buf_bulk_index].bsb_recv.bse_func_rc =
rc;
746 return rc == 0 ? len : 0;
765 buf_index < ctx->nbc_buf_bulk_nr));
772 LOGD(
"MS_BAD: nr = %lu",
nr);
778 for (
i = 0;
i <
nr; ++
i) {
785 LOGD(
"--- active bulk recv FAILED!");
797 ctx->nbc_bs[buf_index].bsb_send.bse_func_rc =
rc;
799 LOGD(
"--- active bulk send FAILED!");
821 buf_index < ctx->nbc_buf_bulk_nr));
835 bs = &
ctx->nbc_bs[buf_index / 2];
847 const uint32_t buf_index,
859 LOGD(
"node_bulk_cb: tm_addr = %s, buf_index = %u, q = %d" 860 ", ev-nbe_status = %d",
875 LOGD(
"node_bulk_cb: tm_addr = %s, buf_index = %u, q = %d" 876 ", ev-nbe_status = %d",
881 ctx->nbc_callback_executed =
true;
887 bs_index = role_client ? buf_index / 2 : buf_index;
889 bs = &
ctx->nbc_bs[bs_index];
899 (
ctx, buf_index,
q, ev);
935 return bs_index /
ctx->nbc_client_concurrency;
966 M0_PRE(bs_index < ctx->nbc_bs_nr);
968 bs = &
ctx->nbc_bs[bs_index];
998 for (
i = 0;
i <
nr; ++
i) {
1019 size_t msg_buf_index,
1068 size_t msg_index = 0;
1070 size_t msg_bd_nr = 0;
1076 struct m0_tl messages;
1081 M0_PRE(
ctx->nbc_bd_nr_max > 0 &&
ctx->nbc_bd_nr_max % 2 == 0);
1083 bsp_tlist_init(&messages);
1104 msg_bs = &
ctx->nbc_bsp[msg_index];
1111 msg_index, msg_offset);
1119 if (msg_bd_nr > 0) {
1121 bsp_tlist_add_tail(&messages, msg_bs);
1142 if (msg_bd_nr ==
ctx->nbc_bd_nr_max || buf_last) {
1143 bsp_tlist_add_tail(&messages, msg_bs);
1150 list_is_empty = bsb_tlist_is_empty(&msg_bs->
bsp_buffers);
1163 LOGD(
"--- msg send FAILED!");
1177 bsp_tlist_del(msg_bs);
1179 bsp_tlist_fini(&messages);
1196 for (
i = 0;
i <
nr; ++
i) {
1201 if (!ssb_tlink_is_in(ss))
1202 ssb_tlist_add_tail(&
servers, ss);
1217 for (
i = 0;
i <
ctx->nbc_bs_nr; ++
i) {
1221 for (
i = 0;
i <
ctx->nbc_buf_ping_nr; ++
i)
1230 for (
i = 0;
i <
ctx->nbc_buf_ping_nr; ++
i) {
1234 for (
i = 0;
i <
ctx->nbc_buf_bulk_nr; ++
i) {
1245 ctx->nbc_buf_ping_nr &&
1264 ctx->nbc_callback_executed =
false;
1273 if (!
ctx->nbc_callback_executed)
1282 struct m0_mutex tm_chan_mutex = {};
1357 for (
i = 0;
i <
ctx->nbc_bs_nr; ++
i) {
1358 ctx->nbc_bs[
i].bsb_index =
i;
1359 bsb_tlink_init(&
ctx->nbc_bs[
i]);
1368 ss = &
ctx->nbc_sstatus[
i];
1376 for (
i = 0;
i <
ctx->nbc_buf_ping_nr; ++
i) {
1377 msg_bs = &
ctx->nbc_bsp[
i];
1380 bsp_tlink_init(msg_bs);
1387 ctx->nbc_buf_ping_nr);
1393 goto free_rb_ping_unused;
1397 goto free_rb_bulk_unused;
1401 goto free_rb_bulk_queued;
1426 goto free_rb_bulk_final;
1446 free_rb_bulk_queued:
1448 free_rb_bulk_unused:
1450 free_rb_ping_unused:
1454 for (
i = 0;
i <
ctx->nbc_buf_ping_nr; ++
i) {
1455 msg_bs = &
ctx->nbc_bsp[
i];
1456 bsp_tlink_init(msg_bs);
1464 ctx->nbc_net.ntc_ep_nr;
1465 for (
i = 0;
i <
nr; ++
i) {
1466 ss = &
ctx->nbc_sstatus[
i];
1473 for (
i = 0;
i <
ctx->nbc_bs_nr; ++
i)
1474 bsb_tlink_fini(&
ctx->nbc_bs[
i]);
1492 ctx->nbc_nh.ntnh_test_initialized =
false;
1532 if (
ctx->nbc_nh.ntnh_test_initialized) {
1549 ctx->nbc_bs_nr =
ctx->nbc_buf_bulk_nr / 2;
1554 if (!
ergo(role_client,
ctx->nbc_buf_bulk_nr % 2 == 0) ||
1555 ctx->nbc_buf_bulk_nr < 1 ||
ctx->nbc_buf_size_bulk < 1 ||
1556 ctx->nbc_buf_ping_nr < 1 ||
ctx->nbc_buf_size_ping < 1 ||
1557 ctx->nbc_bd_nr_max < 1 ||
ctx->nbc_bs_nr < 1 ||
1560 !
ergo(role_client,
ctx->nbc_client_concurrency != 0) ||
1561 !
ergo(!role_client,
ctx->nbc_bs_nr ==
ctx->nbc_buf_bulk_nr) ||
1562 !
ergo(role_client, 2 *
ctx->nbc_bs_nr ==
ctx->nbc_buf_bulk_nr))
1569 reply->ntc_done.ntcd_errno =
rc;
1587 sd = &
ctx->nbc_nh.ntnh_sd;
1605 reply->ntc_done.ntcd_errno =
rc;
1621 if (!
ctx->nbc_nh.ntnh_test_initialized) {
1622 reply->ntc_done.ntcd_errno = -EINVAL;
1635 reply->ntc_done.ntcd_errno = 0;
1690 #undef NET_TEST_MODULE_NAME
int m0_net_test_node_bulk_init(void)
void m0_net_test_network_buffer_dequeue(struct m0_net_test_network_ctx *ctx, enum m0_net_test_network_buf_type buf_type, int32_t buf_index)
static void transition(void)
enum m0_net_test_cmd_type ntsch_type
static const struct state_transition node_bulk_client_success[]
struct m0_net_test_network_timeouts m0_net_test_network_timeouts_never(void)
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
size_t nbc_client_concurrency
struct m0_tlink bsb_tlink
static void node_bulk_tm_event_cb(const struct m0_net_tm_event *ev)
#define M0_ALLOC_ARR(arr, nr)
static void client_transfer_start(struct node_bulk_ctx *ctx, size_t bs_index)
static int node_bulk_cmd_stop(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
struct m0_net_test_slist ntci_ep
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
struct buf_status_errno bsb_send
uint32_t ntncfg_buf_ping_nr
m0_time_t ntci_buf_bulk_timeout
static struct m0_semaphore q
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
int m0_net_test_network_msg_recv(struct m0_net_test_network_ctx *ctx, uint32_t buf_ping_index)
static void buf_desc_set0(struct node_bulk_ctx *ctx, size_t buf_bulk_index)
int m0_net_test_network_ctx_init(struct m0_net_test_network_ctx *ctx, struct m0_net_test_network_cfg *cfg, const char *tm_addr)
static int node_bulk_test_init_fini(struct node_bulk_ctx *ctx, const struct m0_net_test_cmd *cmd)
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
struct m0_net_test_mps ntcsd_mps_recv
int m0_thread_join(struct m0_thread *q)
void m0_net_test_service_state_change(struct m0_net_test_service *svc, enum m0_net_test_service_state state)
static size_t client_server_index(struct node_bulk_ctx *ctx, size_t bs_index)
struct m0_tlink ssb_tlink
void m0_net_test_nh_sd_copy_locked(struct m0_net_test_nh *nh)
static void buf_desc_swap(struct node_bulk_ctx *ctx, size_t buf_bulk_index)
m0_bcount_t m0_net_test_network_bd_serialize(enum m0_net_test_serialize_op op, struct m0_net_test_network_ctx *ctx, uint32_t buf_bulk_index, uint32_t buf_ping_index, m0_bcount_t offset)
M0_TL_DESCR_DEFINE(bsb, "buf_status_bulk", static, struct buf_status_bulk, bsb_tlink, bsb_magic, M0_NET_TEST_BSB_MAGIC, M0_NET_TEST_BSB_HEAD_MAGIC)
m0_net_test_network_buffer_cb_proc_t ntnbc_cb[M0_NET_QT_NR]
int m0_net_test_ringbuf_init(struct m0_net_test_ringbuf *rb, size_t size)
static void node_bulk_cb_server(struct node_bulk_ctx *ctx, size_t buf_index, enum m0_net_queue_type q, const struct m0_net_buffer_event *ev)
M0_TL_DEFINE(bsb, static, struct buf_status_bulk)
struct buf_status_errno bsb_recv
static void buf_desc_client_free(struct node_bulk_ctx *ctx, size_t bs_index)
void m0_net_test_node_bulk_fini(void)
void m0_net_test_nh_sd_update_rtt(struct m0_net_test_nh *nh, m0_time_t rtt)
static const struct state_transition node_bulk_server_success[]
static const struct state_transition node_bulk_client_failure[]
struct m0_net_test_ringbuf nbc_rb_bulk_queued
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
static void node_bulk_buf_unused(struct node_bulk_ctx *ctx)
#define container_of(ptr, type, member)
static m0_bcount_t net_test_len_accumulate(m0_bcount_t accumulator, m0_bcount_t addend)
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
m0_time_t ntnt_timeout[M0_NET_QT_NR]
struct buf_status_bulk * nbc_bs
static int client_bulk_enqueue(struct node_bulk_ctx *ctx, size_t buf_index)
m0_bcount_t ntci_msg_size
m0_net_test_nh_msg_status
static const struct m0_net_tm_callbacks node_bulk_tm_cb
m0_time_t bsb_time_finish
m0_net_test_nh_msg_direction
int m0_net_test_network_bulk_enqueue(struct m0_net_test_network_ctx *ctx, int32_t buf_bulk_index, int32_t ep_index, enum m0_net_queue_type q)
size_t m0_net_test_ringbuf_pop(struct m0_net_test_ringbuf *rb)
m0_time_t ntci_buf_send_timeout
static void * node_bulk_initfini(void *ctx_, struct m0_net_test_service *svc)
bool m0_net_test_nh_transfer_next(struct m0_net_test_nh *nh)
struct buf_status_ping * nbc_bsp
M0_INTERNAL void m0_clink_attach(struct m0_clink *link, struct m0_clink *group, m0_chan_cb_t cb)
m0_bcount_t ntncfg_buf_size_bulk
enum transfer_state sta_from
void node_bulk_state_transition_auto(struct node_bulk_ctx *ctx, size_t bs_index)
static void sd_update(struct node_bulk_ctx *ctx, enum m0_net_test_nh_msg_type type, enum m0_net_test_nh_msg_status status, enum m0_net_test_nh_msg_direction direction)
#define m0_tl_teardown(name, head, obj)
static void client_bulk_dequeue(struct node_bulk_ctx *ctx, size_t buf_index)
static void client_bulk_bufs_dequeue(struct node_bulk_ctx *ctx, struct buf_status_bulk *bs)
static void node_bulk_state_check_all(void)
void m0_net_test_ringbuf_fini(struct m0_net_test_ringbuf *rb)
struct m0_net_test_cmd_init ntc_init
enum m0_net_test_role ntci_role
struct m0_net_test_service * nbc_svc
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
struct m0_net_test_network_buffer_callbacks ntncfg_buf_cb
static int node_bulk_cmd_status(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
static void net_bulk_worker_cb(struct node_bulk_ctx *ctx, bool pending)
static char servers[NTCS_NODES_MAX *NTCS_NODE_ADDR_MAX]
struct m0_chan nbc_stop_chan
void(* ntc_event_cb)(const struct m0_net_tm_event *ev)
m0_time_t m0_time_now(void)
m0_bcount_t nbc_buf_size_ping
struct server_status_bulk * nbc_sstatus
struct m0_net_test_mps ntcsd_mps_send
void m0_net_test_nh_cmd_status(struct m0_net_test_nh *nh, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
static void client_process_unused_bulk(struct node_bulk_ctx *ctx)
void m0_thread_fini(struct m0_thread *q)
void m0_net_test_ringbuf_push(struct m0_net_test_ringbuf *rb, size_t value)
static void * node_bulk_init(struct m0_net_test_service *svc)
static m0_bcount_t buf_desc_deserialize(struct node_bulk_ctx *ctx, size_t buf_bulk_index, size_t buf_ping_index, m0_bcount_t offset)
void m0_net_test_network_ctx_fini(struct m0_net_test_network_ctx *ctx)
struct m0_net_test_ringbuf nbc_rb_bulk_final
void m0_net_test_nh_sd_update(struct m0_net_test_nh *nh, enum m0_net_test_nh_msg_type type, enum m0_net_test_nh_msg_status status, enum m0_net_test_nh_msg_direction direction)
static bool node_bulk_state_is_final(enum transfer_state state)
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
static enum transfer_state node_bulk_state_search(enum transfer_state state, const struct state_transition state_list[], size_t state_nr)
static void node_bulk_worker(struct node_bulk_ctx *ctx)
static void node_bulk_state_check(const struct state_transition state_list[], size_t state_nr)
enum transfer_state bsb_ts
struct m0_tlink bsp_tlink
m0_bcount_t ntci_bd_buf_size
M0_INTERNAL void m0_net_desc_free(struct m0_net_buf_desc *desc)
m0_time_t ntcsd_time_start
static m0_bcount_t client_bds_serialize2(struct m0_net_test_network_ctx *net, size_t bsb_index, size_t msg_buf_index, m0_bcount_t offset)
static m0_bindex_t offset
struct m0_net_test_nh nbc_nh
uint64_t ntci_msg_concurrency
int m0_net_test_network_msg_send(struct m0_net_test_network_ctx *ctx, uint32_t buf_ping_index, uint32_t ep_index)
static void node_bulk_fini(void *ctx_)
size_t m0_net_test_network_bd_nr(struct m0_net_test_network_ctx *ctx, uint32_t buf_ping_index)
struct m0_atomic64 nbc_stop_flag
static struct fdmi_ctx ctx
struct buf_status_errno bsb_msg
M0_INTERNAL void m0_chan_signal_lock(struct m0_chan *chan)
static struct m0_net_test_network_buffer_callbacks node_bulk_buf_cb
M0_INTERNAL void m0_net_buffer_event_deliver_all(struct m0_net_transfer_mc *tm)
enum transfer_state sta_to
struct m0_mutex nbc_stop_chan_mutex
size_t m0_net_test_ringbuf_nr(struct m0_net_test_ringbuf *rb)
static int64_t m0_atomic64_get(const struct m0_atomic64 *a)
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
struct m0_net_transfer_mc * ntc_tm
void m0_net_test_mps_init(struct m0_net_test_mps *mps, unsigned long messages, m0_time_t timestamp, m0_time_t interval)
struct m0_net_buffer * m0_net_test_network_buf(struct m0_net_test_network_ctx *ctx, enum m0_net_test_network_buf_type buf_type, uint32_t buf_index)
static struct m0_net_buffer * net_buf_bulk_get(struct node_bulk_ctx *ctx, size_t buf_bulk_index)
M0_INTERNAL void m0_net_buffer_event_notify(struct m0_net_transfer_mc *tm, struct m0_chan *chan)
static uint8_t fail[DATA_UNIT_COUNT_MAX+PARITY_UNIT_COUNT_MAX]
const struct state_transition * nbst_transition
struct m0_net_test_ringbuf nbc_rb_bulk_unused
M0_INTERNAL bool m0_net_buffer_event_pending(struct m0_net_transfer_mc *tm)
struct m0_net_end_point * ntm_ep
#define M0_ALLOC_PTR(ptr)
static bool node_bulk_is_stopping(struct node_bulk_ctx *ctx)
m0_bcount_t ntncfg_buf_size_ping
void m0_net_test_nh_init(struct m0_net_test_nh *nh, const struct m0_net_test_cmd_init *icmd)
m0_time_t m0_time_sub(const m0_time_t t1, const m0_time_t t2)
struct m0_clink nbc_stop_clink
static struct m0_net_test_service svc
static void node_bulk_state_change_cb(struct node_bulk_ctx *ctx, size_t bs_index, bool success)
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
struct m0_net_test_network_timeouts ntncfg_timeouts
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
static int node_bulk_step(void *ctx_)
static int node_bulk_cmd_init(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
bool m0_net_test_ringbuf_is_empty(struct m0_net_test_ringbuf *rb)
struct m0_net_test_service_ops m0_net_test_node_bulk_ops
struct m0_net_buf_desc bsb_desc_send
uint32_t ntncfg_buf_bulk_nr
int fini(struct workload *w)
static void node_bulk_state_change(struct node_bulk_ctx *ctx, size_t bs_index, enum transfer_state state)
#define M0_MKTIME(secs, ns)
int m0_net_test_network_ep_add_slist(struct m0_net_test_network_ctx *ctx, const struct m0_net_test_slist *eps)
static struct m0_addb2_net * net
static bool node_bulk_bufs_unused_all(struct node_bulk_ctx *ctx)
struct m0_net_buf_desc nb_desc
void node_bulk_state_transition_auto_all(struct node_bulk_ctx *ctx)
static void client_process_queued_bulk(struct node_bulk_ctx *ctx)
static void client_bds_send(struct node_bulk_ctx *ctx, struct server_status_bulk *ss)
static const struct state_transition node_bulk_server_failure[]
static bool node_bulk_state_change_allowed(enum transfer_state from, enum transfer_state to, const struct state_transition allowed[], size_t allowed_size)
struct m0_mutex nbc_bulk_mutex
static void node_bulk_cb(struct m0_net_test_network_ctx *net_ctx, const uint32_t buf_index, enum m0_net_queue_type q, const struct m0_net_buffer_event *ev)
static struct node_bulk_ctx * node_bulk_ctx_from_net_ctx(struct m0_net_test_network_ctx *net_ctx)
void *(* ntso_init)(struct m0_net_test_service *svc)
static struct m0_net_test_service_cmd_handler node_bulk_cmd_handler[]
static void server_process_unused_ping(struct node_bulk_ctx *ctx)
static m0_bcount_t node_bulk_server_transfer_start(struct node_bulk_ctx *ctx, size_t buf_ping_index, m0_bcount_t offset)
static void node_bulk_cb_client(struct node_bulk_ctx *ctx, size_t buf_index, enum m0_net_queue_type q, const struct m0_net_buffer_event *ev)
static struct m0_dtm_oper_descr reply
#define m0_tl_for(name, head, obj)
m0_bcount_t nbc_buf_size_bulk
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
static const struct @411 node_bulk_state_transitions[]
static void node_bulk_buf_dequeue(struct node_bulk_ctx *ctx)
static int node_bulk_cmd_start(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
void m0_net_test_network_bd_nr_dec(struct m0_net_test_network_ctx *ctx, uint32_t buf_ping_index)
struct m0_net_tm_callbacks ntncfg_tm_cb
struct m0_thread nbc_thread
struct m0_net_test_network_ctx nbc_net
static void m0_atomic64_set(struct m0_atomic64 *a, int64_t num)
struct m0_net_test_ringbuf nbc_rb_ping_unused
bool nbc_callback_executed
static void buf_desc_server_free(struct node_bulk_ctx *ctx, size_t buf_bulk_index)
#define M0_IMPOSSIBLE(fmt,...)