31 #define DEF_RESPONSE "active pong" 32 #define DEF_SEND "passive ping" 33 #define SEND_RESP " pong" 63 for (
i = 0;
i <
num; ++
i) {
92 for (
i = 0;
i <
ctx->pc_nr_bufs; ++
i)
99 if (
i ==
ctx->pc_nr_bufs)
102 nb = &
ctx->pc_nbs[
i];
113 int i = nb - &
ctx->pc_nbs[0];
161 *
bp = send_desc ?
's' :
'r';
222 rc = sscanf(
bp,
"%d", &len);
259 ctx->pc_ops->pf(
"%s: Msg Recv CB\n",
ctx->pc_ident);
263 ctx->pc_ops->pf(
"%s: msg recv canceled\n",
266 ctx->pc_ops->pf(
"%s: msg recv error: %d\n",
278 ctx->pc_ops->pf(
"%s: got msg: %s\n",
281 ctx->pc_ops->pf(
"%s: got msg: %u bytes\n",
282 ctx->pc_ident, len + 1);
284 if (
ctx->pc_compare_buf !=
NULL) {
285 int l = strlen(
ctx->pc_compare_buf);
290 ctx->pc_ops->pf(
"%s: msg bytes validated\n",
314 ctx->pc_ops->pf(
"%s: Msg Send CB\n",
ctx->pc_ident);
318 ctx->pc_ops->pf(
"%s: msg send canceled\n",
321 ctx->pc_ops->pf(
"%s: msg send error: %d\n",
358 ctx->pc_ops->pf(
"%s: Passive Recv CB\n",
ctx->pc_ident);
362 ctx->pc_ops->pf(
"%s: passive recv canceled\n",
365 ctx->pc_ops->pf(
"%s: passive recv error: %d\n",
376 ctx->pc_ops->pf(
"%s: got data: %s\n",
379 ctx->pc_ops->pf(
"%s: got data: %u bytes\n",
380 ctx->pc_ident, len + 1);
384 for (
i = 0;
i < len - 1; ++
i) {
386 PING_ERR(
"%s: data diff @ offset %i: " 395 ctx->pc_ops->pf(
"%s: data bytes validated\n",
420 ctx->pc_ops->pf(
"%s: Passive Send CB\n",
ctx->pc_ident);
424 ctx->pc_ops->pf(
"%s: passive send canceled\n",
427 ctx->pc_ops->pf(
"%s: passive send error: %d\n",
463 const char *
s =
"unexpected";
470 ctx->pc_ops->pf(
"%s: Event CB state change to %s, status %d\n",
474 ctx->pc_ops->pf(
"%s: Event CB for error %d\n",
477 ctx->pc_ops->pf(
"%s: Event CB for diagnostic %d\n",
511 sprintf(
buf,
"%s (peer %s)", ident,
ep->nep_addr);
527 int bulk_delay =
ctx->pc_server_bulk_delay;
537 ctx->pc_ops->pf(
"%s: msg recv canceled on shutdown\n",
540 ctx->pc_ops->pf(
"%s: msg recv error: %d\n",
556 ctx->pc_ops->pf(
"%s: dropped msg, " 557 "no buffer available\n", idbuf);
565 ctx->pc_ops->pf(
"%s: got desc for " 566 "active recv\n", idbuf);
573 if (bulk_delay != 0) {
574 ctx->pc_ops->pf(
"%s: delay %d secs\n",
579 ctx->pc_ops->pf(
"%s: got desc for " 580 "active send\n", idbuf);
587 if (
ctx->pc_passive_size == 0)
595 i <
ctx->pc_passive_size - 1; ++
i)
596 bp[
i] =
"abcdefghi"[
i % 9];
597 ctx->pc_ops->pf(
"%s: sending data " 599 ctx->pc_passive_size);
605 if (bulk_delay != 0) {
606 ctx->pc_ops->pf(
"%s: delay %d secs\n",
614 ctx->pc_ops->pf(
"%s: got msg: %s\n",
617 ctx->pc_ops->pf(
"%s: got msg: " 653 ctx->pc_ops->pf(
"%s: Msg Send CB\n", idbuf);
658 ctx->pc_ops->pf(
"%s: msg send canceled\n", idbuf);
660 ctx->pc_ops->pf(
"%s: msg send error: %d\n",
694 ctx->pc_ops->pf(
"%s: Active Recv CB\n", idbuf);
699 ctx->pc_ops->pf(
"%s: active recv canceled\n", idbuf);
701 ctx->pc_ops->pf(
"%s: active recv error: %d\n",
712 ctx->pc_ops->pf(
"%s: got data: %s\n",
715 ctx->pc_ops->pf(
"%s: got data: %u bytes\n",
720 for (
i = 0;
i < len - 1; ++
i) {
722 PING_ERR(
"%s: data diff @ offset %i: " 723 "%c != %c\n", idbuf,
i,
730 ctx->pc_ops->pf(
"%s: data bytes validated\n",
748 ctx->pc_ops->pf(
"%s: Active Send CB\n", idbuf);
753 ctx->pc_ops->pf(
"%s: active send canceled\n", idbuf);
755 ctx->pc_ops->pf(
"%s: active send error: %d\n",
808 PING_ERR(
"buffer allocation failed: %d\n",
rc);
813 PING_ERR(
"buffer bitmap allocation failed: %d\n",
rc);
817 for (
i = 0;
i <
ctx->pc_nr_bufs; ++
i) {
823 ctx->pc_nbs[
i].nb_callbacks =
ctx->pc_buf_callbacks;
827 sprintf(
addr,
"%s:%u:%u",
ctx->pc_hostname,
ctx->pc_port,
830 sprintf(
addr,
"%s:%u",
ctx->pc_hostname,
ctx->pc_port);
834 PING_ERR(
"transfer machine init failed: %d\n",
rc);
842 PING_ERR(
"transfer machine start failed: %d\n",
rc);
853 PING_ERR(
"transfer machine start failed: %d\n",
rc);
880 (*
ctx->pc_ops->pqs)(
ctx,
false);
886 for (
i = 0;
i <
ctx->pc_nr_bufs; ++
i) {
895 if (
ctx->pc_dom.nd_xprt !=
NULL)
917 ctx->pc_hostname =
"127.0.0.1";
918 if (
ctx->pc_port == 0)
920 ctx->pc_ident =
"Server";
926 for (
i = 0;
i < (
ctx->pc_nr_bufs / 4); ++
i) {
927 nb = &
ctx->pc_nbs[
i];
966 for (
i = 0;
i < (
ctx->pc_nr_bufs / 4); ++
i) {
967 nb = &
ctx->pc_nbs[
i];
979 while (!m0_net_tm_tlist_is_empty(&
ctx->pc_tm.ntm_q[
i])) {
980 ctx->pc_ops->pf(
"waiting for queue %d to empty\n",
i);
1022 ctx->pc_ops->pf(
"%s: starting msg send/recv sequence\n",
ctx->pc_ident);
1038 nb->
nb_ep = server_ep;
1061 ctx->pc_ops->pf(
"%s: send failed, " 1062 "no more retries\n",
1099 ctx->pc_ops->pf(
"%s: starting passive recv sequence\n",
ctx->pc_ident);
1104 nb->
nb_ep = server_ep;
1105 if (
ctx->pc_passive_bulk_timeout > 0) {
1106 ctx->pc_ops->pf(
"%s: setting nb_timeout to %ds\n",
1107 ctx->pc_ident,
ctx->pc_passive_bulk_timeout);
1124 nb->
nb_ep = server_ep;
1145 ctx->pc_ops->pf(
"%s: send failed, " 1146 "no more retries\n",
1185 data =
"passive ping";
1186 ctx->pc_ops->pf(
"%s: starting passive send sequence\n",
ctx->pc_ident);
1193 nb->
nb_ep = server_ep;
1194 if (
ctx->pc_passive_bulk_timeout > 0) {
1195 ctx->pc_ops->pf(
"%s: setting nb_timeout to %ds\n",
1196 ctx->pc_ident,
ctx->pc_passive_bulk_timeout);
1213 nb->
nb_ep = server_ep;
1234 ctx->pc_ops->pf(
"%s: send failed, " 1235 "no more retries\n",
1269 ctx->pc_hostname =
"127.0.0.1";
1270 if (
ctx->pc_rhostname ==
NULL)
1271 ctx->pc_rhostname =
"127.0.0.1";
1272 if (
ctx->pc_port == 0)
1274 if (
ctx->pc_rport == 0)
1277 ctx->pc_ident =
"Client";
1283 if (
ctx->pc_rid != 0)
1284 sprintf(
addr,
"%s:%u:%u",
ctx->pc_rhostname,
ctx->pc_rport,
1287 sprintf(
addr,
"%s:%u",
ctx->pc_rhostname,
ctx->pc_rport);
struct m0_net_buffer_callbacks sbuf_cb
#define M0_BUFVEC_INIT_BUF(addr_ptr, count_ptr)
void ping_server(struct ping_ctx *ctx)
struct m0_net_tm_callbacks stm_cb
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
void s_m_recv_cb(const struct m0_net_buffer_event *ev)
struct m0_net_transfer_mc * nb_tm
M0_INTERNAL void m0_list_add(struct m0_list *head, struct m0_list_link *new)
#define M0_ALLOC_ARR(arr, nr)
M0_INTERNAL int m0_bitmap_init(struct m0_bitmap *map, size_t nr)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
void m0_net_domain_fini(struct m0_net_domain *dom)
M0_INTERNAL int m0_net_tm_start(struct m0_net_transfer_mc *tm, const char *addr)
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
M0_INTERNAL void m0_bitmap_fini(struct m0_bitmap *map)
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
static struct buffer * cur(struct m0_addb2_mach *mach, m0_bcount_t space)
struct m0_bufvec nb_buffer
void s_p_send_cb(const struct m0_net_buffer_event *ev)
M0_INTERNAL int m0_net_buffer_register(struct m0_net_buffer *buf, struct m0_net_domain *dom)
M0_INTERNAL void m0_list_init(struct m0_list *head)
const m0_time_t M0_TIME_NEVER
void s_m_send_cb(const struct m0_net_buffer_event *ev)
union ping_msg::@371 pm_u
static m0_bcount_t segs[NR *IT]
M0_INTERNAL void m0_list_fini(struct m0_list *head)
M0_INTERNAL void m0_list_del(struct m0_list_link *old)
M0_INTERNAL void * m0_bufvec_cursor_addr(struct m0_bufvec_cursor *cur)
int encode_msg(struct m0_net_buffer *nb, const char *str)
M0_INTERNAL bool m0_net_buffer_del(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
int ping_client_fini(struct ping_ctx *ctx, struct m0_net_end_point *server_ep)
m0_bcount_t nb_min_receive_size
#define container_of(ptr, type, member)
enum ping_msg_type pm_type
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
struct m0_net_buffer * nbe_buffer
m0_time_t m0_time(uint64_t secs, long ns)
struct m0_net_end_point * nbe_ep
void event_cb(const struct m0_net_tm_event *ev)
struct m0_net_buf_desc pm_desc
int ping_client_init(struct ping_ctx *ctx, struct m0_net_end_point **server_ep)
M0_INTERNAL int m0_bufvec_alloc(struct m0_bufvec *bufvec, uint32_t num_segs, m0_bcount_t seg_size)
void c_a_send_cb(const struct m0_net_buffer_event *ev)
M0_INTERNAL void m0_bufvec_free(struct m0_bufvec *bufvec)
int ping_client_passive_recv(struct ping_ctx *ctx, struct m0_net_end_point *server_ep)
int decode_msg(struct m0_net_buffer *nb, struct ping_msg *msg)
void c_m_recv_cb(const struct m0_net_buffer_event *ev)
int alloc_buffers(int num, uint32_t segs, m0_bcount_t segsize, struct m0_net_buffer **out)
M0_INTERNAL bool m0_bufvec_cursor_move(struct m0_bufvec_cursor *cur, m0_bcount_t count)
void c_m_send_cb(const struct m0_net_buffer_event *ev)
enum m0_net_tm_state nte_next_state
M0_INTERNAL int m0_net_tm_init(struct m0_net_transfer_mc *tm, struct m0_net_domain *dom)
void s_p_recv_cb(const struct m0_net_buffer_event *ev)
static void server_event_ident(char *buf, const char *ident, const struct m0_net_buffer_event *ev)
static void ping_sleep_secs(int secs)
struct m0_net_tm_callbacks ctm_cb
void ping_fini(struct ping_ctx *ctx)
enum m0_net_queue_type nb_qtype
M0_INTERNAL m0_bcount_t m0_bufvec_cursor_copy(struct m0_bufvec_cursor *dcur, struct m0_bufvec_cursor *scur, m0_bcount_t num_bytes)
M0_INTERNAL m0_bcount_t m0_bufvec_cursor_step(const struct m0_bufvec_cursor *cur)
void c_a_recv_cb(const struct m0_net_buffer_event *ev)
uint32_t nb_max_receive_msgs
M0_INTERNAL void m0_net_tm_fini(struct m0_net_transfer_mc *tm)
void(* ntc_event_cb)(const struct m0_net_tm_event *ev)
struct m0_net_transfer_mc pc_tm
struct m0_net_buffer * pwi_nb
M0_INTERNAL void m0_bufvec_cursor_init(struct m0_bufvec_cursor *cur, const struct m0_bufvec *bvec)
M0_INTERNAL void m0_cond_signal(struct m0_cond *cond)
struct m0_list_link pwi_link
void * m0_alloc(size_t size)
M0_INTERNAL void m0_net_desc_free(struct m0_net_buf_desc *desc)
M0_INTERNAL void m0_bitmap_set(struct m0_bitmap *map, size_t idx, bool val)
static struct ping_ctx * buffer_event_to_ping_ctx(const struct m0_net_buffer_event *ev)
M0_INTERNAL void m0_net_end_point_get(struct m0_net_end_point *ep)
void c_p_send_cb(const struct m0_net_buffer_event *ev)
m0_net_buffer_cb_proc_t nbc_cb[M0_NET_QT_NR]
M0_INTERNAL int m0_net_desc_copy(const struct m0_net_buf_desc *from_desc, struct m0_net_buf_desc *to_desc)
M0_INTERNAL bool m0_list_is_empty(const struct m0_list *head)
enum m0_net_queue_type pwi_type
static struct m0_clink l[NR]
struct m0_net_buffer_callbacks cbuf_cb
M0_INTERNAL int m0_net_tm_stop(struct m0_net_transfer_mc *tm, bool abort)
#define PING_ERR(fmt,...)
int ping_client_passive_send(struct ping_ctx *ctx, struct m0_net_end_point *server_ep, const char *data)
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
struct m0_net_buffer * ping_buf_get(struct ping_ctx *ctx)
void m0_net_end_point_put(struct m0_net_end_point *ep)
M0_INTERNAL void m0_cond_wait(struct m0_cond *cond)
static uint8_t fail[DATA_UNIT_COUNT_MAX+PARITY_UNIT_COUNT_MAX]
M0_INTERNAL void m0_net_buffer_deregister(struct m0_net_buffer *buf, struct m0_net_domain *dom)
int m0_net_domain_init(struct m0_net_domain *dom, const struct m0_net_xprt *xprt)
#define M0_ALLOC_PTR(ptr)
M0_INTERNAL int m0_net_buffer_add(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
m0_time_t m0_time_from_now(uint64_t secs, long ns)
void s_a_send_cb(const struct m0_net_buffer_event *ev)
M0_INTERNAL bool m0_bitmap_get(const struct m0_bitmap *map, size_t idx)
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
static void netbuf_step(struct m0_bufvec_cursor *cur)
void ping_server_should_stop(struct ping_ctx *ctx)
int ping_init(struct ping_ctx *ctx)
struct m0_net_buf_desc nb_desc
static struct m0_atomic64 s_msg_recv_counter
enum m0_net_tm_ev_type nte_type
static struct bulkio_params * bp
M0_INTERNAL void m0_list_link_init(struct m0_list_link *link)
static struct m0_list_link * m0_list_first(const struct m0_list *head)
int encode_desc(struct m0_net_buffer *nb, bool send_desc, const struct m0_net_buf_desc *desc)
void msg_free(struct ping_msg *msg)
int ping_client_msg_send_recv(struct ping_ctx *ctx, struct m0_net_end_point *server_ep, const char *data)
static struct m0_addb2_source * s
M0_INTERNAL int m0_net_end_point_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const char *addr)
#define m0_list_entry(link, type, member)
void c_p_recv_cb(const struct m0_net_buffer_event *ev)
void ping_buf_put(struct ping_ctx *ctx, struct m0_net_buffer *nb)
void s_a_recv_cb(const struct m0_net_buffer_event *ev)
static int64_t m0_atomic64_add_return(struct m0_atomic64 *a, int64_t d)
struct m0_net_end_point * nb_ep
struct m0_net_transfer_mc * nte_tm
#define M0_IMPOSSIBLE(fmt,...)
int m0_nanosleep(const m0_time_t req, m0_time_t *rem)