Motr  M0
ping.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "lib/assert.h"
24 #include "lib/errno.h"
25 #include "lib/memory.h"
26 #include "lib/misc.h" /* strlen */
27 #include "net/net.h"
28 #include "net/bulk_mem.h"
30 
31 #define DEF_RESPONSE "active pong"
32 #define DEF_SEND "passive ping"
33 #define SEND_RESP " pong"
34 
36 enum {
38 };
39 
44 };
45 
/**
 * Sleeps for the requested number of whole seconds using m0_nanosleep().
 * A request of zero seconds returns at once without calling the sleep
 * primitive.
 */
static void ping_sleep_secs(int secs)
{
	if (secs == 0)
		return;
	m0_nanosleep(m0_time(secs, 0), NULL);
}
51 
52 int alloc_buffers(int num, uint32_t segs, m0_bcount_t segsize,
53  struct m0_net_buffer **out)
54 {
55  struct m0_net_buffer *nbs;
56  struct m0_net_buffer *nb;
57  int i;
58  int rc = 0;
59 
60  M0_ALLOC_ARR(nbs, num);
61  if (nbs == NULL)
62  return -ENOMEM;
63  for (i = 0; i < num; ++i) {
64  nb = &nbs[i];
65  rc = m0_bufvec_alloc(&nb->nb_buffer, segs, segsize);
66  if (rc != 0)
67  break;
68  }
69 
70  if (rc == 0)
71  *out = nbs;
72  else {
73  while (--i >= 0)
74  m0_bufvec_free(&nbs[i].nb_buffer);
75  m0_free(nbs);
76  }
77  return rc;
78 }
79 
87 {
88  int i;
89  struct m0_net_buffer *nb;
90 
91  m0_mutex_lock(&ctx->pc_mutex);
92  for (i = 0; i < ctx->pc_nr_bufs; ++i)
93  if (!m0_bitmap_get(&ctx->pc_nbbm, i)) {
94  m0_bitmap_set(&ctx->pc_nbbm, i, true);
95  break;
96  }
97  m0_mutex_unlock(&ctx->pc_mutex);
98 
99  if (i == ctx->pc_nr_bufs)
100  return NULL;
101 
102  nb = &ctx->pc_nbs[i];
104  return nb;
105 }
106 
/**
 * Marks a buffer of the context's buffer array as free again.
 *
 * The slot index is computed from the position of nb inside ctx->pc_nbs, so
 * nb must point into that array. The corresponding bit in ctx->pc_nbbm must
 * currently be set (asserted) and is cleared under ctx->pc_mutex.
 *
 * @param ctx ping context owning the buffer array and bitmap
 * @param nb  buffer to release
 */
111 void ping_buf_put(struct ping_ctx *ctx, struct m0_net_buffer *nb)
112 {
113  int i = nb - &ctx->pc_nbs[0];
114  M0_ASSERT(i >= 0 && i < ctx->pc_nr_bufs);
116 
117  m0_mutex_lock(&ctx->pc_mutex);
118  M0_ASSERT(m0_bitmap_get(&ctx->pc_nbbm, i));
119  m0_bitmap_set(&ctx->pc_nbbm, i, false);
120  m0_mutex_unlock(&ctx->pc_mutex);
121 }
122 
/**
 * Advances the cursor by exactly one byte.
 * The cursor must not reach the end of its buffer vector as a result; this
 * is asserted.
 */
static void netbuf_step(struct m0_bufvec_cursor *cur)
{
	bool at_end;

	/* Keep the move outside the assertion so it always executes. */
	at_end = m0_bufvec_cursor_move(cur, 1);
	M0_ASSERT(!at_end);
}
128 
/**
 * Encodes a text message into a network buffer.
 *
 * The encoded form is a one-byte 'm' tag followed by the string including its
 * terminating nul; nb->nb_length is set to that total (strlen(str) + 2).
 *
 * @param nb  buffer to fill; it must be large enough for the whole message
 *            (asserted through the cursor step and the copied-length check)
 * @param str nul-terminated text to send
 * @return always 0
 */
130 int encode_msg(struct m0_net_buffer *nb, const char *str)
131 {
132  char *bp;
133  m0_bcount_t len = strlen(str) + 1; /* include trailing nul */
134  m0_bcount_t copied;
 /* one-segment vector wrapping the caller's string, used as copy source */
135  struct m0_bufvec in = M0_BUFVEC_INIT_BUF((void **) &str, &len);
136  struct m0_bufvec_cursor incur;
137  struct m0_bufvec_cursor cur;
138 
 /* payload plus the one-byte type tag */
139  nb->nb_length = len + 1;
 /* NOTE(review): bp/cur are presumably set to the start of nb's buffer just above — confirm */
142  *bp = 'm';
143  netbuf_step(&cur);
144  m0_bufvec_cursor_init(&incur, &in);
145  copied = m0_bufvec_cursor_copy(&cur, &incur, len);
146  M0_ASSERT(copied == len);
147  return 0;
148 }
149 
/**
 * Encodes a network buffer descriptor into a message buffer.
 *
 * The encoded form is a one-byte tag ('s' when send_desc is true, 'r'
 * otherwise), the descriptor length printed as 8 decimal digits plus a nul,
 * and then the raw descriptor bytes. nb->nb_length is set to
 * 10 + desc->nbd_len.
 *
 * @param nb        buffer to fill
 * @param send_desc selects the tag byte written in front of the descriptor
 * @param desc      descriptor whose nbd_len/nbd_data are serialised
 * @return always 0
 */
151 int encode_desc(struct m0_net_buffer *nb,
152  bool send_desc,
153  const struct m0_net_buf_desc *desc)
154 {
155  struct m0_bufvec_cursor cur;
156  char *bp;
157  m0_bcount_t step;
158 
161  *bp = send_desc ? 's' : 'r';
162  netbuf_step(&cur);
164 
165  /* only support sending net_desc in single chunks in this test */
166  step = m0_bufvec_cursor_step(&cur);
 /* 9 = eight length digits plus the nul that follows them */
167  M0_ASSERT(step >= 9 + desc->nbd_len);
168  nb->nb_length = 10 + desc->nbd_len;
169 
 /* NOTE(review): nbd_len is uint32_t but is formatted with %d — confirm intended */
170  bp += sprintf(bp, "%08d", desc->nbd_len);
171  ++bp; /* +nul */
172  memcpy(bp, desc->nbd_data, desc->nbd_len);
173  return 0;
174 }
175 
182 };
183 
/**
 * Decoded form of a message received over the wire, as produced by
 * decode_msg() and released by msg_free().
 */
184 struct ping_msg {
186  union {
 /* heap-allocated text of a plain message */
187  char *pm_str;
189  } pm_u;
190 };
191 
/**
 * Decodes the contents of a received network buffer into a ping_msg.
 *
 * The first byte is a tag and must be 'm', 's' or 'r' (asserted).
 *  - 'm': the remaining nb->nb_length - 1 bytes are copied into a newly
 *    allocated string stored in msg->pm_u.pm_str, type PM_MSG.
 *  - 's' / 'r': a decimal length field occupying 9 bytes (nul at offset 8) is
 *    parsed, and that many descriptor bytes following it are copied into a
 *    newly allocated msg->pm_u.pm_desc.nbd_data; type is PM_SEND_DESC or
 *    PM_RECV_DESC respectively.
 *
 * The memory allocated here is owned by msg afterwards.
 *
 * @param nb  received buffer; nb_length must already hold the received size
 * @param msg output message
 * @return always 0
 */
193 int decode_msg(struct m0_net_buffer *nb, struct ping_msg *msg)
194 {
195  struct m0_bufvec_cursor cur;
196  char *bp;
197  m0_bcount_t step;
198 
201  M0_ASSERT(*bp == 'm' || *bp == 's' || *bp == 'r');
 /* bp keeps pointing at the tag byte; only the cursor moves past it */
202  netbuf_step(&cur);
203  if (*bp == 'm') {
204  m0_bcount_t len = nb->nb_length - 1;
205  void *str;
206  struct m0_bufvec out = M0_BUFVEC_INIT_BUF(&str, &len);
207  struct m0_bufvec_cursor outcur;
208 
209  msg->pm_type = PM_MSG;
 /* NOTE(review): allocation result is not checked before the copy below */
210  str = msg->pm_u.pm_str = m0_alloc(len + 1);
211  m0_bufvec_cursor_init(&outcur, &out);
212  step = m0_bufvec_cursor_copy(&outcur, &cur, len);
213  M0_ASSERT(step == len);
214  } else {
215  int rc;
216  int len = 0;
217 
218  msg->pm_type = (*bp == 's') ? PM_SEND_DESC : PM_RECV_DESC;
220  step = m0_bufvec_cursor_step(&cur);
221  M0_ASSERT(step >= 9 && bp[8] == 0);
222  rc = sscanf(bp, "%d", &len);
223  M0_ASSERT(rc == 1);
224  msg->pm_u.pm_desc.nbd_len = len;
225  M0_ASSERT(step >= 9 + msg->pm_u.pm_desc.nbd_len);
 /* skip the 8 digits and their nul to reach the descriptor bytes */
226  bp += 9;
 /* NOTE(review): allocation result is not checked before the memcpy */
227  msg->pm_u.pm_desc.nbd_data = m0_alloc(len);
228  memcpy(msg->pm_u.pm_desc.nbd_data, bp, len);
229  }
230  return 0;
231 }
232 
/**
 * Releases the payload held by a decoded message.
 * For a PM_MSG message the text string is freed with m0_free(); any other
 * message type takes the first branch.
 */
233 void msg_free(struct ping_msg *msg)
234 {
235  if (msg->pm_type != PM_MSG)
237  else
238  m0_free(msg->pm_u.pm_str);
239 }
240 
241 static struct ping_ctx *
243 {
244  struct m0_net_buffer *nb = ev->nbe_buffer;
245  M0_ASSERT(nb != NULL);
246  return container_of(nb->nb_tm, struct ping_ctx, pc_tm);
247 }
248 
249 /* client callbacks */
/**
 * Client callback invoked when a message receive buffer completes.
 *
 * On failure the status is logged (cancellation is reported separately).
 * On success the buffer is decoded and must hold a plain message; the text is
 * logged, or only its size when it is 32 characters or longer. If
 * ctx->pc_compare_buf is set, the reply is asserted to be that string
 * followed by SEND_RESP.
 *
 * In every case a work item is allocated, added to ctx->pc_work_queue under
 * ctx->pc_mutex, and ctx->pc_cond is signalled.
 */
250 void c_m_recv_cb(const struct m0_net_buffer_event *ev)
251 {
252  struct ping_ctx *ctx = buffer_event_to_ping_ctx(ev);
253  int rc;
254  int len;
255  struct ping_work_item *wi;
256  struct ping_msg msg;
257 
259  ctx->pc_ops->pf("%s: Msg Recv CB\n", ctx->pc_ident);
260 
261  if (ev->nbe_status < 0) {
262  if (ev->nbe_status == -ECANCELED)
263  ctx->pc_ops->pf("%s: msg recv canceled\n",
264  ctx->pc_ident);
265  else
266  ctx->pc_ops->pf("%s: msg recv error: %d\n",
267  ctx->pc_ident, ev->nbe_status);
268  } else {
 /* decode_msg() relies on nb_length, so record the received size first */
269  ev->nbe_buffer->nb_length = ev->nbe_length;
270  rc = decode_msg(ev->nbe_buffer, &msg);
271  M0_ASSERT(rc == 0);
272 
273  if (msg.pm_type != PM_MSG)
274  M0_IMPOSSIBLE("Client: got desc\n");
275 
276  len = strlen(msg.pm_u.pm_str);
277  if (strlen(msg.pm_u.pm_str) < 32)
278  ctx->pc_ops->pf("%s: got msg: %s\n",
279  ctx->pc_ident, msg.pm_u.pm_str);
280  else
281  ctx->pc_ops->pf("%s: got msg: %u bytes\n",
282  ctx->pc_ident, len + 1);
283 
284  if (ctx->pc_compare_buf != NULL) {
285  int l = strlen(ctx->pc_compare_buf);
 /* 5 is the length of SEND_RESP (" pong") */
286  M0_ASSERT(strlen(msg.pm_u.pm_str) == l + 5);
287  M0_ASSERT(strncmp(ctx->pc_compare_buf,
288  msg.pm_u.pm_str, l) == 0);
289  M0_ASSERT(strcmp(&msg.pm_u.pm_str[l], SEND_RESP) == 0);
290  ctx->pc_ops->pf("%s: msg bytes validated\n",
291  ctx->pc_ident);
292  }
293  msg_free(&msg);
294  }
295 
297 
298  M0_ALLOC_PTR(wi);
301 
 /* hand the completion over to the thread waiting on pc_cond */
302  m0_mutex_lock(&ctx->pc_mutex);
303  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
304  m0_cond_signal(&ctx->pc_cond);
305  m0_mutex_unlock(&ctx->pc_mutex);
306 }
307 
/**
 * Client callback invoked when a message send buffer completes.
 *
 * On failure the status is logged and a work item carrying the failed buffer
 * in pwi_nb is queued so that the main thread can decide what to do. On
 * success a work item is queued as well. Either way the item is added to
 * ctx->pc_work_queue under ctx->pc_mutex and ctx->pc_cond is signalled.
 */
308 void c_m_send_cb(const struct m0_net_buffer_event *ev)
309 {
310  struct ping_ctx *ctx = buffer_event_to_ping_ctx(ev);
311  struct ping_work_item *wi;
312 
314  ctx->pc_ops->pf("%s: Msg Send CB\n", ctx->pc_ident);
315 
316  if (ev->nbe_status < 0) {
317  if (ev->nbe_status == -ECANCELED)
318  ctx->pc_ops->pf("%s: msg send canceled\n",
319  ctx->pc_ident);
320  else
321  ctx->pc_ops->pf("%s: msg send error: %d\n",
322  ctx->pc_ident, ev->nbe_status);
323 
324  /* let main app deal with it */
325  M0_ALLOC_PTR(wi);
 /* a non-NULL pwi_nb is how the consumer recognises a failed send */
328  wi->pwi_nb = ev->nbe_buffer;
329 
330  m0_mutex_lock(&ctx->pc_mutex);
331  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
332  m0_cond_signal(&ctx->pc_cond);
333  m0_mutex_unlock(&ctx->pc_mutex);
334  } else {
337 
338  M0_ALLOC_PTR(wi);
341 
342  m0_mutex_lock(&ctx->pc_mutex);
343  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
344  m0_cond_signal(&ctx->pc_cond);
345  m0_mutex_unlock(&ctx->pc_mutex);
346  }
347 }
348 
/**
 * Client callback invoked when a passive bulk receive buffer completes.
 *
 * On failure the status is logged. On success the buffer is decoded as a
 * plain message, logged (text, or size only when 32 characters or longer) and
 * its received length is asserted to be strlen + 2. Unless the payload equals
 * DEF_RESPONSE, its first len - 1 characters are compared against the
 * repeating pattern "abcdefghi"; the first mismatch is reported through
 * PING_ERR.
 *
 * In every case a work item is allocated, queued on ctx->pc_work_queue under
 * ctx->pc_mutex, and ctx->pc_cond is signalled.
 */
349 void c_p_recv_cb(const struct m0_net_buffer_event *ev)
350 {
351  struct ping_ctx *ctx = buffer_event_to_ping_ctx(ev);
352  int rc;
353  int len;
354  struct ping_work_item *wi;
355  struct ping_msg msg;
356 
358  ctx->pc_ops->pf("%s: Passive Recv CB\n", ctx->pc_ident);
359 
360  if (ev->nbe_status < 0) {
361  if (ev->nbe_status == -ECANCELED)
362  ctx->pc_ops->pf("%s: passive recv canceled\n",
363  ctx->pc_ident);
364  else
365  ctx->pc_ops->pf("%s: passive recv error: %d\n",
366  ctx->pc_ident, ev->nbe_status);
367  } else {
368  ev->nbe_buffer->nb_length = ev->nbe_length;
369  rc = decode_msg(ev->nbe_buffer, &msg);
370  M0_ASSERT(rc == 0);
371 
372  if (msg.pm_type != PM_MSG)
373  M0_IMPOSSIBLE("Client: got desc\n");
374  len = strlen(msg.pm_u.pm_str);
375  if (strlen(msg.pm_u.pm_str) < 32)
376  ctx->pc_ops->pf("%s: got data: %s\n",
377  ctx->pc_ident, msg.pm_u.pm_str);
378  else
379  ctx->pc_ops->pf("%s: got data: %u bytes\n",
380  ctx->pc_ident, len + 1);
 /* tag byte + text + trailing nul */
381  M0_ASSERT(ev->nbe_length == len + 2);
382  if (strcmp(msg.pm_u.pm_str, DEF_RESPONSE) != 0) {
383  int i;
384  for (i = 0; i < len - 1; ++i) {
385  if (msg.pm_u.pm_str[i] != "abcdefghi"[i % 9]) {
386  PING_ERR("%s: data diff @ offset %i: "
387  "%c != %c\n",
388  ctx->pc_ident, i,
389  msg.pm_u.pm_str[i],
390  "abcdefghi"[i % 9]);
391  break;
392  }
393  }
 /* loop ran to completion, i.e. no mismatch was found */
394  if (i == len - 1)
395  ctx->pc_ops->pf("%s: data bytes validated\n",
396  ctx->pc_ident);
397  }
398  msg_free(&msg);
399  }
400 
403 
404  M0_ALLOC_PTR(wi);
407 
408  m0_mutex_lock(&ctx->pc_mutex);
409  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
410  m0_cond_signal(&ctx->pc_cond);
411  m0_mutex_unlock(&ctx->pc_mutex);
412 }
413 
/**
 * Client callback invoked when a passive bulk send buffer completes.
 *
 * A failure status is logged (cancellation is reported separately). In every
 * case a work item is allocated, queued on ctx->pc_work_queue under
 * ctx->pc_mutex, and ctx->pc_cond is signalled.
 */
414 void c_p_send_cb(const struct m0_net_buffer_event *ev)
415 {
416  struct ping_ctx *ctx = buffer_event_to_ping_ctx(ev);
417  struct ping_work_item *wi;
418 
420  ctx->pc_ops->pf("%s: Passive Send CB\n", ctx->pc_ident);
421 
422  if (ev->nbe_status < 0) {
423  if (ev->nbe_status == -ECANCELED)
424  ctx->pc_ops->pf("%s: passive send canceled\n",
425  ctx->pc_ident);
426  else
427  ctx->pc_ops->pf("%s: passive send error: %d\n",
428  ctx->pc_ident, ev->nbe_status);
429  }
430 
433 
434  M0_ALLOC_PTR(wi);
437 
438  m0_mutex_lock(&ctx->pc_mutex);
439  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
440  m0_cond_signal(&ctx->pc_cond);
441  m0_mutex_unlock(&ctx->pc_mutex);
442 }
443 
/**
 * Client callback for active bulk receive completion.
 * Reaching it is treated as a fatal logic error via M0_IMPOSSIBLE.
 */
444 void c_a_recv_cb(const struct m0_net_buffer_event *ev)
445 {
446  M0_ASSERT(ev->nbe_buffer != NULL &&
448  M0_IMPOSSIBLE("Client: Active Recv CB\n");
449 }
450 
/**
 * Client callback for active bulk send completion.
 * Reaching it is treated as a fatal logic error via M0_IMPOSSIBLE.
 */
451 void c_a_send_cb(const struct m0_net_buffer_event *ev)
452 {
453  M0_ASSERT(ev->nbe_buffer != NULL &&
455  M0_IMPOSSIBLE("Client: Active Send CB\n");
456 }
457 
/**
 * Transfer machine event callback.
 *
 * The owning ping_ctx is recovered from the transfer machine embedded in it
 * as pc_tm. State-change events are logged with a readable name for the next
 * state ("unexpected" when it is none of the recognised ones) and their
 * status is stored in ctx->pc_status. Error and diagnostic events are only
 * logged; other event types are ignored.
 */
458 void event_cb(const struct m0_net_tm_event *ev)
459 {
460  struct ping_ctx *ctx = container_of(ev->nte_tm, struct ping_ctx, pc_tm);
461 
462  if (ev->nte_type == M0_NET_TEV_STATE_CHANGE) {
463  const char *s = "unexpected";
465  s = "started";
466  else if (ev->nte_next_state == M0_NET_TM_STOPPED)
467  s = "stopped";
468  else if (ev->nte_next_state == M0_NET_TM_FAILED)
469  s = "FAILED";
470  ctx->pc_ops->pf("%s: Event CB state change to %s, status %d\n",
471  ctx->pc_ident, s, ev->nte_status);
 /* ping_init() reads this back when the machine fails to start */
472  ctx->pc_status = ev->nte_status;
473  } else if (ev->nte_type == M0_NET_TEV_ERROR)
474  ctx->pc_ops->pf("%s: Event CB for error %d\n",
475  ctx->pc_ident, ev->nte_status);
476  else if (ev->nte_type == M0_NET_TEV_DIAGNOSTIC)
477  ctx->pc_ops->pf("%s: Event CB for diagnostic %d\n",
478  ctx->pc_ident, ev->nte_status);
479 }
480 
/*
 * Shutdown request flag for the ping server. The main loop of ping_server()
 * runs while it is false and resets it to false on exit; s_m_recv_cb() checks
 * it to tell a shutdown cancellation from an error.
 */
481 bool server_stop = false;
482 
484  .nbc_cb = {
491  },
492 };
493 
496 };
497 
498 static void server_event_ident(char *buf, const char *ident,
499  const struct m0_net_buffer_event *ev)
500 {
501  const struct m0_net_end_point *ep = NULL;
502  if (ev != NULL && ev->nbe_buffer != NULL) {
503  if (ev->nbe_buffer->nb_qtype == M0_NET_QT_MSG_RECV) {
504  if (ev->nbe_status == 0)
505  ep = ev->nbe_ep;
506  } else {
507  ep = ev->nbe_buffer->nb_ep;
508  }
509  }
510  if (ep != NULL)
511  sprintf(buf, "%s (peer %s)", ident, ep->nep_addr);
512  else
513  strcpy(buf, ident);
514 }
515 
517 
518 /* server callbacks */
/**
 * Server callback invoked when a message receive buffer completes.
 *
 * Failure: a cancellation while server_stop is set is only logged; any other
 * error is logged and the receive buffer is queued again.
 *
 * Success: the message is decoded and a free buffer is taken from the pool.
 * If none is available the message is dropped. Otherwise a work item is
 * built around the new buffer according to the message type:
 *  - PM_SEND_DESC: the client's descriptor is used for an active receive;
 *  - PM_RECV_DESC: the client's descriptor is used for an active send, whose
 *    payload is DEF_RESPONSE or, when ctx->pc_passive_size is non-zero, an
 *    "abcdefghi" pattern of that size;
 *  - plain message: the reply is the received text with SEND_RESP appended.
 * The work item is queued on ctx->pc_work_queue and ctx->pc_cond is
 * signalled. Finally the receive buffer is queued again and the decoded
 * message is freed.
 *
 * When ctx->pc_server_bulk_delay is non-zero the descriptor paths sleep for
 * that many seconds inside this callback.
 */
519 void s_m_recv_cb(const struct m0_net_buffer_event *ev)
520 {
521  struct ping_ctx *ctx = buffer_event_to_ping_ctx(ev);
522  int rc;
523  struct ping_work_item *wi;
524  struct ping_msg msg;
525  int64_t count;
526  char idbuf[64];
527  int bulk_delay = ctx->pc_server_bulk_delay;
528 
529 
530  M0_ASSERT(ev->nbe_buffer != NULL &&
532  server_event_ident(idbuf, ctx->pc_ident, ev);
534  ctx->pc_ops->pf("%s: Msg Recv CB %" PRId64 "\n", idbuf, count);
535  if (ev->nbe_status < 0) {
536  if (ev->nbe_status == -ECANCELED && server_stop)
537  ctx->pc_ops->pf("%s: msg recv canceled on shutdown\n",
538  idbuf);
539  else {
540  ctx->pc_ops->pf("%s: msg recv error: %d\n",
541  idbuf, ev->nbe_status);
 /* put the receive buffer back on the queue so the server keeps listening */
543  ev->nbe_buffer->nb_ep = NULL;
544  rc = m0_net_buffer_add(ev->nbe_buffer, &ctx->pc_tm);
545  M0_ASSERT(rc == 0);
546  }
547  } else {
548  struct m0_net_buffer *nb;
549 
550  ev->nbe_buffer->nb_length = ev->nbe_length;
551  rc = decode_msg(ev->nbe_buffer, &msg);
552  M0_ASSERT(rc == 0);
553 
554  nb = ping_buf_get(ctx);
555  if (nb == NULL) {
556  ctx->pc_ops->pf("%s: dropped msg, "
557  "no buffer available\n", idbuf);
558  } else {
559  M0_ALLOC_PTR(wi);
560  nb->nb_ep = ev->nbe_ep; /* save for later, if set */
561  ev->nbe_buffer->nb_ep = NULL;
563  wi->pwi_nb = nb;
564  if (msg.pm_type == PM_SEND_DESC) {
565  ctx->pc_ops->pf("%s: got desc for "
566  "active recv\n", idbuf);
570  &nb->nb_desc);
571  nb->nb_ep = NULL; /* not needed */
572  M0_ASSERT(rc == 0);
573  if (bulk_delay != 0) {
574  ctx->pc_ops->pf("%s: delay %d secs\n",
575  idbuf, bulk_delay);
576  ping_sleep_secs(bulk_delay);
577  }
578  } else if (msg.pm_type == PM_RECV_DESC) {
579  ctx->pc_ops->pf("%s: got desc for "
580  "active send\n", idbuf);
584  &nb->nb_desc);
585  nb->nb_ep = NULL; /* not needed */
586  /* reuse encode_msg for convenience */
587  if (ctx->pc_passive_size == 0)
588  rc = encode_msg(nb, DEF_RESPONSE);
589  else {
590  char *bp;
591  int i;
592  bp = m0_alloc(ctx->pc_passive_size);
593  M0_ASSERT(bp != NULL);
 /* the last byte is left untouched and serves as the terminator — presumably m0_alloc() zero-fills; confirm */
594  for (i = 0;
595  i < ctx->pc_passive_size - 1; ++i)
596  bp[i] = "abcdefghi"[i % 9];
597  ctx->pc_ops->pf("%s: sending data "
598  "%d bytes\n", idbuf,
599  ctx->pc_passive_size);
600  rc = encode_msg(nb, bp);
601  m0_free(bp);
602  M0_ASSERT(rc == 0);
603  }
604  M0_ASSERT(rc == 0);
605  if (bulk_delay != 0) {
606  ctx->pc_ops->pf("%s: delay %d secs\n",
607  idbuf, bulk_delay);
608  ping_sleep_secs(bulk_delay);
609  }
610  } else {
611  char *data;
612  int len = strlen(msg.pm_u.pm_str);
613  if (strlen(msg.pm_u.pm_str) < 32)
614  ctx->pc_ops->pf("%s: got msg: %s\n",
615  idbuf, msg.pm_u.pm_str);
616  else
617  ctx->pc_ops->pf("%s: got msg: "
618  "%u bytes\n",
619  idbuf, len + 1);
620 
621  /* queue wi to send back ping response */
 /* room for the text, the 5 characters of SEND_RESP and the nul */
622  data = m0_alloc(len + 6);
626  strcpy(data, msg.pm_u.pm_str);
627  strcat(data, SEND_RESP);
628  rc = encode_msg(nb, data);
629  m0_free(data);
630  M0_ASSERT(rc == 0);
631  }
632  m0_mutex_lock(&ctx->pc_mutex);
633  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
634  m0_cond_signal(&ctx->pc_cond);
635  m0_mutex_unlock(&ctx->pc_mutex);
636  }
 /* the receive buffer is always handed back, even if the message was dropped */
638  ev->nbe_buffer->nb_ep = NULL;
639  rc = m0_net_buffer_add(ev->nbe_buffer, &ctx->pc_tm);
640  M0_ASSERT(rc == 0);
641 
642  msg_free(&msg);
643  }
644 }
645 
/**
 * Server callback invoked when a message send buffer completes.
 *
 * The outcome is logged; failed sends are not retried. The buffer's end
 * point pointer is cleared afterwards.
 */
646 void s_m_send_cb(const struct m0_net_buffer_event *ev)
647 {
648  struct ping_ctx *ctx = buffer_event_to_ping_ctx(ev);
649  char idbuf[64];
650 
652  server_event_ident(idbuf, ctx->pc_ident, ev);
653  ctx->pc_ops->pf("%s: Msg Send CB\n", idbuf);
654 
655  if (ev->nbe_status < 0) {
656  /* no retries here */
657  if (ev->nbe_status == -ECANCELED)
658  ctx->pc_ops->pf("%s: msg send canceled\n", idbuf);
659  else
660  ctx->pc_ops->pf("%s: msg send error: %d\n",
661  idbuf, ev->nbe_status);
662  }
663 
665  ev->nbe_buffer->nb_ep = NULL;
666 
668 }
669 
/**
 * Server callback for passive bulk receive completion.
 * Reaching it is treated as a fatal logic error via M0_IMPOSSIBLE.
 */
670 void s_p_recv_cb(const struct m0_net_buffer_event *ev)
671 {
672  M0_ASSERT(ev->nbe_buffer != NULL &&
674  M0_IMPOSSIBLE("Server: Passive Recv CB\n");
675 }
676 
/**
 * Server callback for passive bulk send completion.
 * Reaching it is treated as a fatal logic error via M0_IMPOSSIBLE.
 */
677 void s_p_send_cb(const struct m0_net_buffer_event *ev)
678 {
679  M0_ASSERT(ev->nbe_buffer != NULL &&
681  M0_IMPOSSIBLE("Server: Passive Send CB\n");
682 }
683 
/**
 * Server callback invoked when an active bulk receive buffer completes.
 *
 * On failure the status is logged; there is no retry. On success the buffer
 * is decoded as a plain message, logged (text, or size only when 32
 * characters or longer) and its received length is asserted to be
 * strlen + 2. Unless the payload equals DEF_SEND, its first len - 1
 * characters are compared against the repeating pattern "abcdefghi"; the
 * first mismatch is reported through PING_ERR.
 */
684 void s_a_recv_cb(const struct m0_net_buffer_event *ev)
685 {
686  struct ping_ctx *ctx = buffer_event_to_ping_ctx(ev);
687  int rc;
688  int len;
689  struct ping_msg msg;
690  char idbuf[64];
691 
693  server_event_ident(idbuf, ctx->pc_ident, ev);
694  ctx->pc_ops->pf("%s: Active Recv CB\n", idbuf);
695 
696  if (ev->nbe_status < 0) {
697  /* no retries here */
698  if (ev->nbe_status == -ECANCELED)
699  ctx->pc_ops->pf("%s: active recv canceled\n", idbuf);
700  else
701  ctx->pc_ops->pf("%s: active recv error: %d\n",
702  idbuf, ev->nbe_status);
703  } else {
704  ev->nbe_buffer->nb_length = ev->nbe_length;
705  rc = decode_msg(ev->nbe_buffer, &msg);
706  M0_ASSERT(rc == 0);
707 
708  if (msg.pm_type != PM_MSG)
709  M0_IMPOSSIBLE("Server: got desc\n");
710  len = strlen(msg.pm_u.pm_str);
711  if (len < 32)
712  ctx->pc_ops->pf("%s: got data: %s\n",
713  idbuf, msg.pm_u.pm_str);
714  else
715  ctx->pc_ops->pf("%s: got data: %u bytes\n",
716  idbuf, len + 1);
 /* tag byte + text + trailing nul */
717  M0_ASSERT(ev->nbe_length == len + 2);
718  if (strcmp(msg.pm_u.pm_str, DEF_SEND) != 0) {
719  int i;
720  for (i = 0; i < len - 1; ++i) {
721  if (msg.pm_u.pm_str[i] != "abcdefghi"[i % 9]) {
722  PING_ERR("%s: data diff @ offset %i: "
723  "%c != %c\n", idbuf, i,
724  msg.pm_u.pm_str[i],
725  "abcdefghi"[i % 9]);
726  break;
727  }
728  }
 /* loop ran to completion, i.e. no mismatch was found */
729  if (i == len - 1)
730  ctx->pc_ops->pf("%s: data bytes validated\n",
731  idbuf);
732  }
733 
734  msg_free(&msg);
735  }
736 
739 }
740 
/**
 * Server callback invoked when an active bulk send buffer completes.
 * The outcome is logged; failed sends are not retried.
 */
741 void s_a_send_cb(const struct m0_net_buffer_event *ev)
742 {
743  struct ping_ctx *ctx = buffer_event_to_ping_ctx(ev);
744  char idbuf[64];
745 
747  server_event_ident(idbuf, ctx->pc_ident, ev);
748  ctx->pc_ops->pf("%s: Active Send CB\n", idbuf);
749 
750  if (ev->nbe_status < 0) {
751  /* no retries here */
752  if (ev->nbe_status == -ECANCELED)
753  ctx->pc_ops->pf("%s: active send canceled\n", idbuf);
754  else
755  ctx->pc_ops->pf("%s: active send error: %d\n",
756  idbuf, ev->nbe_status);
757  }
758 
761 }
762 
764  .nbc_cb = {
771  },
772 };
773 
776 };
777 
778 void ping_fini(struct ping_ctx *ctx);
779 
/**
 * Brings up the networking state of a ping context.
 *
 * In order: initialises the work queue and the network domain, allocates
 * ctx->pc_nr_bufs buffers plus the bitmap tracking them, registers every
 * buffer with the domain and attaches ctx->pc_buf_callbacks (which must be
 * set beforehand), builds the local address as "host:port" or
 * "host:port:id", then initialises and starts the transfer machine and waits
 * for its start notification.
 *
 * @param ctx ping context with transport, sizes, callbacks and address
 *            fields already filled in
 * @return 0 on success. On any failure ping_fini() is called on the context
 *         and the error code is returned; if the transfer machine does not
 *         reach the started state the code is ctx->pc_status, or -EINVAL
 *         when that is 0.
 */
790 int ping_init(struct ping_ctx *ctx)
791 {
792  int i;
793  int rc;
795  struct m0_clink tmwait;
796 
797  m0_list_init(&ctx->pc_work_queue);
798 
799  rc = m0_net_domain_init(&ctx->pc_dom, ctx->pc_xprt);
800  if (rc != 0) {
801  PING_ERR("domain init failed: %d\n", rc);
802  goto fail;
803  }
804 
805  rc = alloc_buffers(ctx->pc_nr_bufs, ctx->pc_segments, ctx->pc_seg_size,
806  &ctx->pc_nbs);
807  if (rc != 0) {
808  PING_ERR("buffer allocation failed: %d\n", rc);
809  goto fail;
810  }
811  rc = m0_bitmap_init(&ctx->pc_nbbm, ctx->pc_nr_bufs);
812  if (rc != 0) {
813  PING_ERR("buffer bitmap allocation failed: %d\n", rc);
814  goto fail;
815  }
816  M0_ASSERT(ctx->pc_buf_callbacks != NULL);
817  for (i = 0; i < ctx->pc_nr_bufs; ++i) {
818  rc = m0_net_buffer_register(&ctx->pc_nbs[i], &ctx->pc_dom);
819  if (rc != 0) {
820  PING_ERR("buffer register failed: %d\n", rc);
821  goto fail;
822  }
823  ctx->pc_nbs[i].nb_callbacks = ctx->pc_buf_callbacks;
824  }
825 
 /* the id component is appended only when it is non-zero */
826  if (ctx->pc_id != 0)
827  sprintf(addr, "%s:%u:%u", ctx->pc_hostname, ctx->pc_port,
828  ctx->pc_id);
829  else
830  sprintf(addr, "%s:%u", ctx->pc_hostname, ctx->pc_port);
831 
832  rc = m0_net_tm_init(&ctx->pc_tm, &ctx->pc_dom);
833  if (rc != 0) {
834  PING_ERR("transfer machine init failed: %d\n", rc);
835  goto fail;
836  }
837 
 /* register on the channel before starting so the notification cannot be missed */
838  m0_clink_init(&tmwait, NULL);
839  m0_clink_add_lock(&ctx->pc_tm.ntm_chan, &tmwait);
840  rc = m0_net_tm_start(&ctx->pc_tm, addr);
841  if (rc != 0) {
842  PING_ERR("transfer machine start failed: %d\n", rc);
 /* NOTE(review): tmwait is still attached to ntm_chan on this path — looks like a missing m0_clink_del_lock(); confirm */
843  goto fail;
844  }
845 
846  /* wait for tm to notify it has started */
847  m0_chan_wait(&tmwait);
848  m0_clink_del_lock(&tmwait);
849  if (ctx->pc_tm.ntm_state != M0_NET_TM_STARTED) {
850  rc = ctx->pc_status;
851  if (rc == 0)
852  rc = -EINVAL;
853  PING_ERR("transfer machine start failed: %d\n", rc);
854  goto fail;
855  }
856 
857  return rc;
858 fail:
859  ping_fini(ctx);
860  return rc;
861 }
862 
/**
 * Tears down the networking state of a ping context.
 *
 * Each stage is guarded by a check of the corresponding state, which lets
 * ping_init() call this after a partial initialisation:
 *  - if the transfer machine is not in the undefined state it is stopped
 *    (unless it has failed) and waited for, the optional ctx->pc_ops->pqs
 *    hook is invoked with (ctx, false), and the machine is finalised;
 *  - if the buffer array exists, every buffer is deregistered from the
 *    domain, then the array and the bitmap are released;
 *  - if the domain has a transport set it is finalised;
 *  - every work item still on ctx->pc_work_queue is freed and the list is
 *    finalised.
 */
863 void ping_fini(struct ping_ctx *ctx)
864 {
865  struct m0_list_link *link;
866  struct ping_work_item *wi;
867 
868  if (ctx->pc_tm.ntm_state != M0_NET_TM_UNDEFINED) {
869  if (ctx->pc_tm.ntm_state != M0_NET_TM_FAILED) {
870  struct m0_clink tmwait;
871  m0_clink_init(&tmwait, NULL);
872  m0_clink_add_lock(&ctx->pc_tm.ntm_chan, &tmwait);
873  m0_net_tm_stop(&ctx->pc_tm, true);
874  while (ctx->pc_tm.ntm_state != M0_NET_TM_STOPPED)
875  m0_chan_wait(&tmwait); /* wait for it to stop */
876  m0_clink_del_lock(&tmwait);
877  }
878 
879  if (ctx->pc_ops->pqs != NULL)
880  (*ctx->pc_ops->pqs)(ctx, false);
881 
882  m0_net_tm_fini(&ctx->pc_tm);
883  }
884  if (ctx->pc_nbs != NULL) {
885  int i;
886  for (i = 0; i < ctx->pc_nr_bufs; ++i) {
887  struct m0_net_buffer *nb = &ctx->pc_nbs[i];
889  m0_net_buffer_deregister(nb, &ctx->pc_dom);
891  }
892  m0_free(ctx->pc_nbs);
893  m0_bitmap_fini(&ctx->pc_nbbm);
894  }
895  if (ctx->pc_dom.nd_xprt != NULL)
896  m0_net_domain_fini(&ctx->pc_dom);
897 
 /* discard completions that nobody consumed */
898  while (!m0_list_is_empty(&ctx->pc_work_queue)) {
899  link = m0_list_first(&ctx->pc_work_queue);
900  wi = m0_list_entry(link, struct ping_work_item, pwi_link);
901  m0_list_del(&wi->pwi_link);
902  m0_free(wi);
903  }
904  m0_list_fini(&ctx->pc_work_queue);
905 }
906 
/**
 * Runs the ping server until server_stop becomes true.
 *
 * Installs the server callbacks, applies defaults for host ("127.0.0.1"),
 * port (PING_PORT1) and identity ("Server"), and initialises the context;
 * at least 20 buffers are required. The first quarter of the buffers is
 * queued for receiving and marked in use in the bitmap.
 *
 * The main loop holds ctx->pc_mutex, drains ctx->pc_work_queue by queueing
 * each work item's buffer on the transfer machine, and then waits on
 * ctx->pc_cond. Once asked to stop, the receive buffers are removed one by
 * one, the remaining queues are waited on until empty, the context is
 * finalised and server_stop is reset to false.
 */
907 void ping_server(struct ping_ctx *ctx)
908 {
909  int i;
910  int rc;
911  struct m0_net_buffer *nb;
912  struct m0_clink tmwait;
913 
914  ctx->pc_tm.ntm_callbacks = &stm_cb;
915  ctx->pc_buf_callbacks = &sbuf_cb;
916  if (ctx->pc_hostname == NULL)
917  ctx->pc_hostname = "127.0.0.1";
918  if (ctx->pc_port == 0)
919  ctx->pc_port = PING_PORT1;
920  ctx->pc_ident = "Server";
921  M0_ASSERT(ctx->pc_nr_bufs >= 20);
922  rc = ping_init(ctx);
923  M0_ASSERT(rc == 0);
924 
925  m0_mutex_lock(&ctx->pc_mutex);
 /* dedicate the first quarter of the pool to incoming messages */
926  for (i = 0; i < (ctx->pc_nr_bufs / 4); ++i) {
927  nb = &ctx->pc_nbs[i];
930  nb->nb_ep = NULL;
931  nb->nb_min_receive_size = ctx->pc_segments * ctx->pc_seg_size;
932  nb->nb_max_receive_msgs = 1;
933  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
934  m0_bitmap_set(&ctx->pc_nbbm, i, true);
935  M0_ASSERT(rc == 0);
936  }
937 
938  while (!server_stop) {
939  struct m0_list_link *link;
940  struct ping_work_item *wi;
941  while (!m0_list_is_empty(&ctx->pc_work_queue)) {
942  link = m0_list_first(&ctx->pc_work_queue);
943  wi = m0_list_entry(link, struct ping_work_item,
944  pwi_link);
945  switch (wi->pwi_type) {
946  case M0_NET_QT_MSG_SEND:
950  rc = m0_net_buffer_add(wi->pwi_nb, &ctx->pc_tm);
951  M0_ASSERT(rc == 0);
952  break;
953  default:
954  M0_IMPOSSIBLE("unexpected wi->pwi_type");
955  }
956  m0_list_del(&wi->pwi_link);
957  m0_free(wi);
958  }
959  m0_cond_wait(&ctx->pc_cond);
960  }
961  m0_mutex_unlock(&ctx->pc_mutex);
962 
963  /* dequeue recv buffers */
964  m0_clink_init(&tmwait, NULL);
965 
966  for (i = 0; i < (ctx->pc_nr_bufs / 4); ++i) {
967  nb = &ctx->pc_nbs[i];
968  m0_clink_add_lock(&ctx->pc_tm.ntm_chan, &tmwait);
969  m0_net_buffer_del(nb, &ctx->pc_tm);
970  m0_bitmap_set(&ctx->pc_nbbm, i, false);
 /* NOTE(review): rc is not assigned by the delete above, so this re-checks an earlier result — confirm whether the assert is still meaningful */
971  M0_ASSERT(rc == 0);
972  m0_chan_wait(&tmwait);
973  m0_clink_del_lock(&tmwait);
974  }
975 
976  /* wait for active buffers to flush */
977  m0_clink_add_lock(&ctx->pc_tm.ntm_chan, &tmwait);
978  for (i = 0; i < M0_NET_QT_NR; ++i)
979  while (!m0_net_tm_tlist_is_empty(&ctx->pc_tm.ntm_q[i])) {
980  ctx->pc_ops->pf("waiting for queue %d to empty\n", i);
981  m0_chan_wait(&tmwait);
982  }
983  m0_clink_del_lock(&tmwait);
984  m0_clink_fini(&tmwait);
985 
986  ping_fini(ctx);
 /* re-arm the flag so the server can be run again */
987  server_stop = false;
988 }
989 
991 {
992  m0_mutex_lock(&ctx->pc_mutex);
993  server_stop = true;
994  m0_cond_signal(&ctx->pc_cond);
995  m0_mutex_unlock(&ctx->pc_mutex);
996 }
997 
1008  struct m0_net_end_point *server_ep,
1009  const char *data)
1010 {
1011  int rc;
1012  struct m0_net_buffer *nb;
1013  struct m0_list_link *link;
1014  struct ping_work_item *wi;
1015  int recv_done = 0;
1016  int retries = SEND_RETRIES;
1017 
1018  if (data == NULL)
1019  data = "ping";
1020  ctx->pc_compare_buf = data;
1021 
1022  ctx->pc_ops->pf("%s: starting msg send/recv sequence\n", ctx->pc_ident);
1023  /* queue buffer for response, must do before sending msg */
1024  nb = ping_buf_get(ctx);
1025  M0_ASSERT(nb != NULL);
1027  nb->nb_timeout = M0_TIME_NEVER;
1028  nb->nb_ep = NULL;
1029  nb->nb_min_receive_size = ctx->pc_segments * ctx->pc_seg_size;
1030  nb->nb_max_receive_msgs = 1;
1031  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1032  M0_ASSERT(rc == 0);
1033 
1034  nb = ping_buf_get(ctx);
1035  M0_ASSERT(nb != NULL);
1036  rc = encode_msg(nb, data);
1038  nb->nb_ep = server_ep;
1039  M0_ASSERT(rc == 0);
1040  nb->nb_timeout = M0_TIME_NEVER;
1041  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1042  M0_ASSERT(rc == 0);
1043 
1044  /* wait for receive response to complete */
1045  m0_mutex_lock(&ctx->pc_mutex);
1046  while (1) {
1047  while (!m0_list_is_empty(&ctx->pc_work_queue)) {
1048  link = m0_list_first(&ctx->pc_work_queue);
1049  wi = m0_list_entry(link, struct ping_work_item,
1050  pwi_link);
1051  m0_list_del(&wi->pwi_link);
1052  if (wi->pwi_type == M0_NET_QT_MSG_RECV) {
1053  ctx->pc_compare_buf = NULL;
1054  recv_done++;
1055  } else if (wi->pwi_type == M0_NET_QT_MSG_SEND &&
1056  wi->pwi_nb != NULL) {
1057  m0_time_t delay;
1058  /* send error, retry a few times */
1059  if (retries == 0) {
1060  ctx->pc_compare_buf = NULL;
1061  ctx->pc_ops->pf("%s: send failed, "
1062  "no more retries\n",
1063  ctx->pc_ident);
1064  m0_mutex_unlock(&ctx->pc_mutex);
1065  ping_buf_put(ctx, nb);
1066  m0_free(wi);
1067  return -ETIMEDOUT;
1068  }
1069  delay = m0_time(SEND_RETRIES + 1 - retries, 0);
1070  --retries;
1072  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1073  M0_ASSERT(rc == 0);
1074  } else if (wi->pwi_type == M0_NET_QT_MSG_SEND) {
1075  recv_done++;
1076  }
1077  m0_free(wi);
1078  }
1079  if (recv_done == 2)
1080  break;
1081  m0_cond_wait(&ctx->pc_cond);
1082  }
1083 
1084  m0_mutex_unlock(&ctx->pc_mutex);
1085  return rc;
1086 }
1087 
1089  struct m0_net_end_point *server_ep)
1090 {
1091  int rc;
1092  struct m0_net_buffer *nb;
1093  struct m0_net_buf_desc nbd;
1094  struct m0_list_link *link;
1095  struct ping_work_item *wi;
1096  int recv_done = 0;
1097  int retries = SEND_RETRIES;
1098 
1099  ctx->pc_ops->pf("%s: starting passive recv sequence\n", ctx->pc_ident);
1100  /* queue our passive receive buffer */
1101  nb = ping_buf_get(ctx);
1102  M0_ASSERT(nb != NULL);
1104  nb->nb_ep = server_ep;
1105  if (ctx->pc_passive_bulk_timeout > 0) {
1106  ctx->pc_ops->pf("%s: setting nb_timeout to %ds\n",
1107  ctx->pc_ident, ctx->pc_passive_bulk_timeout);
1108  nb->nb_timeout =
1109  m0_time_from_now(ctx->pc_passive_bulk_timeout, 0);
1110  } else {
1111  nb->nb_timeout = M0_TIME_NEVER;
1112  }
1113  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1114  M0_ASSERT(rc == 0);
1115  rc = m0_net_desc_copy(&nb->nb_desc, &nbd);
1116  M0_ASSERT(rc == 0);
1117 
1118  /* send descriptor in message to server */
1119  nb = ping_buf_get(ctx);
1120  M0_ASSERT(nb != NULL);
1121  rc = encode_desc(nb, false, &nbd);
1122  m0_net_desc_free(&nbd);
1124  nb->nb_ep = server_ep;
1125  M0_ASSERT(rc == 0);
1126  nb->nb_timeout = M0_TIME_NEVER;
1127  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1128  M0_ASSERT(rc == 0);
1129 
1130  /* wait for receive to complete */
1131  m0_mutex_lock(&ctx->pc_mutex);
1132  while (1) {
1133  while (!m0_list_is_empty(&ctx->pc_work_queue)) {
1134  link = m0_list_first(&ctx->pc_work_queue);
1135  wi = m0_list_entry(link, struct ping_work_item,
1136  pwi_link);
1137  m0_list_del(&wi->pwi_link);
1139  recv_done++;
1140  else if (wi->pwi_type == M0_NET_QT_MSG_SEND &&
1141  wi->pwi_nb != NULL) {
1142  m0_time_t delay;
1143  /* send error, retry a few times */
1144  if (retries == 0) {
1145  ctx->pc_ops->pf("%s: send failed, "
1146  "no more retries\n",
1147  ctx->pc_ident);
1148  m0_net_desc_free(&nb->nb_desc);
1149  m0_mutex_unlock(&ctx->pc_mutex);
1150  ping_buf_put(ctx, nb);
1151  return -ETIMEDOUT;
1152  }
1153  delay = m0_time(SEND_RETRIES + 1 - retries, 0);
1154  --retries;
1156  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1157  M0_ASSERT(rc == 0);
1158  } else if (wi->pwi_type == M0_NET_QT_MSG_SEND) {
1159  recv_done++;
1160  }
1161  m0_free(wi);
1162  }
1163  if (recv_done == 2)
1164  break;
1165  m0_cond_wait(&ctx->pc_cond);
1166  }
1167 
1168  m0_mutex_unlock(&ctx->pc_mutex);
1169  return rc;
1170 }
1171 
1173  struct m0_net_end_point *server_ep,
1174  const char *data)
1175 {
1176  int rc;
1177  struct m0_net_buffer *nb;
1178  struct m0_net_buf_desc nbd;
1179  struct m0_list_link *link;
1180  struct ping_work_item *wi;
1181  int send_done = 0;
1182  int retries = SEND_RETRIES;
1183 
1184  if (data == NULL)
1185  data = "passive ping";
1186  ctx->pc_ops->pf("%s: starting passive send sequence\n", ctx->pc_ident);
1187  /* queue our passive receive buffer */
1188  nb = ping_buf_get(ctx);
1189  M0_ASSERT(nb != NULL);
1190  /* reuse encode_msg for convenience */
1191  rc = encode_msg(nb, data);
1193  nb->nb_ep = server_ep;
1194  if (ctx->pc_passive_bulk_timeout > 0) {
1195  ctx->pc_ops->pf("%s: setting nb_timeout to %ds\n",
1196  ctx->pc_ident, ctx->pc_passive_bulk_timeout);
1197  nb->nb_timeout =
1198  m0_time_from_now(ctx->pc_passive_bulk_timeout, 0);
1199  } else {
1200  nb->nb_timeout = M0_TIME_NEVER;
1201  }
1202  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1203  M0_ASSERT(rc == 0);
1204  rc = m0_net_desc_copy(&nb->nb_desc, &nbd);
1205  M0_ASSERT(rc == 0);
1206 
1207  /* send descriptor in message to server */
1208  nb = ping_buf_get(ctx);
1209  M0_ASSERT(nb != NULL);
1210  rc = encode_desc(nb, true, &nbd);
1211  m0_net_desc_free(&nbd);
1213  nb->nb_ep = server_ep;
1214  M0_ASSERT(rc == 0);
1215  nb->nb_timeout = M0_TIME_NEVER;
1216  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1217  M0_ASSERT(rc == 0);
1218 
1219  /* wait for send to complete */
1220  m0_mutex_lock(&ctx->pc_mutex);
1221  while (1) {
1222  while (!m0_list_is_empty(&ctx->pc_work_queue)) {
1223  link = m0_list_first(&ctx->pc_work_queue);
1224  wi = m0_list_entry(link, struct ping_work_item,
1225  pwi_link);
1226  m0_list_del(&wi->pwi_link);
1228  send_done++;
1229  else if (wi->pwi_type == M0_NET_QT_MSG_SEND &&
1230  wi->pwi_nb != NULL) {
1231  m0_time_t delay;
1232  /* send error, retry a few times */
1233  if (retries == 0) {
1234  ctx->pc_ops->pf("%s: send failed, "
1235  "no more retries\n",
1236  ctx->pc_ident);
1237  m0_net_desc_free(&nb->nb_desc);
1238  m0_mutex_unlock(&ctx->pc_mutex);
1239  ping_buf_put(ctx, nb);
1240  return -ETIMEDOUT;
1241  }
1242  delay = m0_time(SEND_RETRIES + 1 - retries, 0);
1243  --retries;
1245  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1246  M0_ASSERT(rc == 0);
1247  } else if (wi->pwi_type == M0_NET_QT_MSG_SEND) {
1248  send_done++;
1249  }
1250  m0_free(wi);
1251  }
1252  if (send_done == 2)
1253  break;
1254  m0_cond_wait(&ctx->pc_cond);
1255  }
1256 
1257  m0_mutex_unlock(&ctx->pc_mutex);
1258  return rc;
1259 }
1260 
/**
 * Initialises a ping client context and creates the server's end point.
 *
 * Installs the client callbacks and fills in defaults for unset fields:
 * local and remote host "127.0.0.1", local port PING_PORT2, remote port
 * PING_PORT1 and identity "Client". After ping_init() succeeds, the server
 * address is built as "rhost:rport" or "rhost:rport:rid" and an end point
 * for it is created on the client's transfer machine.
 *
 * @param ctx       client context
 * @param server_ep receives the created server end point
 * @return 0 on success, otherwise the error from ping_init() or from
 *         m0_net_end_point_create().
 *
 * NOTE(review): when end point creation fails the context is left
 * initialised — presumably the caller is expected to call ping_fini();
 * confirm against callers.
 */
1261 int ping_client_init(struct ping_ctx *ctx, struct m0_net_end_point **server_ep)
1262 {
1263  int rc;
1265 
1266  ctx->pc_tm.ntm_callbacks = &ctm_cb;
1267  ctx->pc_buf_callbacks = &cbuf_cb;
1268  if (ctx->pc_hostname == NULL)
1269  ctx->pc_hostname = "127.0.0.1";
1270  if (ctx->pc_rhostname == NULL)
1271  ctx->pc_rhostname = "127.0.0.1";
1272  if (ctx->pc_port == 0)
1273  ctx->pc_port = PING_PORT2;
1274  if (ctx->pc_rport == 0)
1275  ctx->pc_rport = PING_PORT1;
1276  if (ctx->pc_ident == NULL)
1277  ctx->pc_ident = "Client";
1278  rc = ping_init(ctx);
1279  if (rc != 0)
1280  return rc;
1281 
1282  /* need end point for the server */
1283  if (ctx->pc_rid != 0)
1284  sprintf(addr, "%s:%u:%u", ctx->pc_rhostname, ctx->pc_rport,
1285  ctx->pc_rid);
1286  else
1287  sprintf(addr, "%s:%u", ctx->pc_rhostname, ctx->pc_rport);
1288  rc = m0_net_end_point_create(server_ep, &ctx->pc_tm, addr);
1289  return rc;
1290 }
1291 
/**
 * Shuts a ping client down.
 *
 * Drops the reference on the server end point first and then finalises the
 * context with ping_fini().
 *
 * @param ctx       client context to tear down
 * @param server_ep server end point whose reference is released
 * @return always 0
 */
int ping_client_fini(struct ping_ctx *ctx, struct m0_net_end_point *server_ep)
{
	/* Release the end point while its transfer machine still exists. */
	m0_net_end_point_put(server_ep);

	ping_fini(ctx);

	return 0;
}
1298 
1299 /*
1300  * Local variables:
1301  * c-indentation-style: "K&R"
1302  * c-basic-offset: 8
1303  * tab-width: 8
1304  * fill-column: 80
1305  * scroll-step: 1
1306  * End:
1307  */
struct m0_net_buffer_callbacks sbuf_cb
Definition: ping.c:763
#define M0_BUFVEC_INIT_BUF(addr_ptr, count_ptr)
Definition: vec.h:165
void ping_server(struct ping_ctx *ctx)
Definition: ping.c:907
struct m0_net_tm_callbacks stm_cb
Definition: ping.c:774
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
void s_m_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:519
struct m0_net_transfer_mc * nb_tm
Definition: net.h:1357
M0_INTERNAL void m0_list_add(struct m0_list *head, struct m0_list_link *new)
Definition: list.c:113
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
M0_INTERNAL int m0_bitmap_init(struct m0_bitmap *map, size_t nr)
Definition: bitmap.c:86
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
void m0_net_domain_fini(struct m0_net_domain *dom)
Definition: domain.c:71
M0_INTERNAL int m0_net_tm_start(struct m0_net_transfer_mc *tm, const char *addr)
Definition: tm.c:261
#define DEF_SEND
Definition: ping.c:32
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_bitmap_fini(struct m0_bitmap *map)
Definition: bitmap.c:97
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
static struct buffer * cur(struct m0_addb2_mach *mach, m0_bcount_t space)
Definition: addb2.c:791
struct m0_bufvec nb_buffer
Definition: net.h:1322
void s_p_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:677
M0_INTERNAL int m0_net_buffer_register(struct m0_net_buffer *buf, struct m0_net_domain *dom)
Definition: buf.c:65
uint32_t nbd_len
Definition: net_otw_types.h:37
M0_INTERNAL void m0_list_init(struct m0_list *head)
Definition: list.c:29
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
Definition: ping.c:181
uint64_t m0_time_t
Definition: time.h:37
void s_m_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:646
uint8_t * nbd_data
Definition: net_otw_types.h:38
union ping_msg::@371 pm_u
static int delay
Definition: dump.c:174
static m0_bcount_t segs[NR *IT]
Definition: vec.c:45
struct m0_bufvec data
Definition: di.c:40
m0_bcount_t nb_length
Definition: net.h:1334
M0_INTERNAL void m0_list_fini(struct m0_list *head)
Definition: list.c:36
uint64_t nb_flags
Definition: net.h:1489
M0_INTERNAL void m0_list_del(struct m0_list_link *old)
Definition: list.c:147
M0_INTERNAL void * m0_bufvec_cursor_addr(struct m0_bufvec_cursor *cur)
Definition: vec.c:597
uint64_t m0_bcount_t
Definition: types.h:77
int encode_msg(struct m0_net_buffer *nb, const char *str)
Definition: ping.c:130
M0_INTERNAL bool m0_net_buffer_del(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
Definition: buf.c:261
int ping_client_fini(struct ping_ctx *ctx, struct m0_net_end_point *server_ep)
Definition: ping.c:1292
m0_bcount_t nb_min_receive_size
Definition: net.h:1496
#define container_of(ptr, type, member)
Definition: misc.h:33
enum ping_msg_type pm_type
Definition: ping.c:185
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
m0_bcount_t nbe_length
Definition: net.h:1226
struct m0_net_buffer * nbe_buffer
Definition: net.h:1194
m0_time_t m0_time(uint64_t secs, long ns)
Definition: time.c:41
struct m0_net_end_point * nbe_ep
Definition: net.h:1251
void event_cb(const struct m0_net_tm_event *ev)
Definition: ping.c:458
struct m0_net_buf_desc pm_desc
Definition: ping.c:188
Definition: sock.c:887
static m0_bcount_t count
Definition: xcode.c:167
int ping_client_init(struct ping_ctx *ctx, struct m0_net_end_point **server_ep)
Definition: ping.c:1261
M0_INTERNAL int m0_bufvec_alloc(struct m0_bufvec *bufvec, uint32_t num_segs, m0_bcount_t seg_size)
Definition: vec.c:220
void c_a_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:451
Definition: ping.c:184
Definition: sock.c:754
M0_INTERNAL void m0_bufvec_free(struct m0_bufvec *bufvec)
Definition: vec.c:395
int ping_client_passive_recv(struct ping_ctx *ctx, struct m0_net_end_point *server_ep)
Definition: ping.c:1088
int decode_msg(struct m0_net_buffer *nb, struct ping_msg *msg)
Definition: ping.c:193
void c_m_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:250
int alloc_buffers(int num, uint32_t segs, m0_bcount_t segsize, struct m0_net_buffer **out)
Definition: ping.c:52
M0_INTERNAL bool m0_bufvec_cursor_move(struct m0_bufvec_cursor *cur, m0_bcount_t count)
Definition: vec.c:574
int i
Definition: dir.c:1033
void c_m_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:308
int32_t nbe_status
Definition: net.h:1218
enum m0_net_tm_state nte_next_state
Definition: net.h:723
M0_INTERNAL int m0_net_tm_init(struct m0_net_transfer_mc *tm, struct m0_net_domain *dom)
Definition: tm.c:160
#define DEF_RESPONSE
Definition: ping.c:31
void s_p_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:670
static void server_event_ident(char *buf, const char *ident, const struct m0_net_buffer_event *ev)
Definition: ping.c:498
static void ping_sleep_secs(int secs)
Definition: ping.c:46
struct m0_net_tm_callbacks ctm_cb
Definition: ping.c:494
void ping_fini(struct ping_ctx *ctx)
Definition: ping.c:863
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
M0_INTERNAL m0_bcount_t m0_bufvec_cursor_copy(struct m0_bufvec_cursor *dcur, struct m0_bufvec_cursor *scur, m0_bcount_t num_bytes)
Definition: vec.c:620
M0_INTERNAL m0_bcount_t m0_bufvec_cursor_step(const struct m0_bufvec_cursor *cur)
Definition: vec.c:581
void c_a_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:444
uint32_t nb_max_receive_msgs
Definition: net.h:1502
#define M0_ASSERT(cond)
m0_time_t nb_timeout
Definition: net.h:1387
M0_INTERNAL void m0_net_tm_fini(struct m0_net_transfer_mc *tm)
Definition: tm.c:204
void(* ntc_event_cb)(const struct m0_net_tm_event *ev)
Definition: net.h:752
struct m0_net_transfer_mc pc_tm
Definition: ping.h:59
struct m0_net_buffer * pwi_nb
Definition: ping.c:42
M0_INTERNAL void m0_bufvec_cursor_init(struct m0_bufvec_cursor *cur, const struct m0_bufvec *bvec)
Definition: vec.c:563
M0_INTERNAL void m0_cond_signal(struct m0_cond *cond)
Definition: cond.c:94
struct m0_list_link pwi_link
Definition: ping.c:43
void * m0_alloc(size_t size)
Definition: memory.c:126
Definition: xcode.h:73
bool server_stop
Definition: ping.c:481
M0_INTERNAL void m0_net_desc_free(struct m0_net_buf_desc *desc)
Definition: net.c:87
M0_INTERNAL void m0_bitmap_set(struct m0_bitmap *map, size_t idx, bool val)
Definition: bitmap.c:139
static struct ping_ctx * buffer_event_to_ping_ctx(const struct m0_net_buffer_event *ev)
Definition: ping.c:242
M0_INTERNAL void m0_net_end_point_get(struct m0_net_end_point *ep)
Definition: ep.c:88
void c_p_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:414
m0_net_buffer_cb_proc_t nbc_cb[M0_NET_QT_NR]
Definition: net.h:1272
M0_INTERNAL int m0_net_desc_copy(const struct m0_net_buf_desc *from_desc, struct m0_net_buf_desc *to_desc)
Definition: net.c:74
M0_INTERNAL bool m0_list_is_empty(const struct m0_list *head)
Definition: list.c:42
enum m0_net_queue_type pwi_type
Definition: ping.c:41
static struct m0_clink l[NR]
Definition: chan.c:37
struct m0_net_buffer_callbacks cbuf_cb
Definition: ping.c:483
#define PRId64
Definition: types.h:57
M0_INTERNAL int m0_net_tm_stop(struct m0_net_transfer_mc *tm, bool abort)
Definition: tm.c:293
#define PING_ERR(fmt,...)
Definition: ping.h:80
int ping_client_passive_send(struct ping_ctx *ctx, struct m0_net_end_point *server_ep, const char *data)
Definition: ping.c:1172
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
struct m0_net_buffer * ping_buf_get(struct ping_ctx *ctx)
Definition: ping.c:86
void m0_net_end_point_put(struct m0_net_end_point *ep)
Definition: ep.c:98
M0_INTERNAL void m0_cond_wait(struct m0_cond *cond)
Definition: cond.c:52
static uint8_t fail[DATA_UNIT_COUNT_MAX+PARITY_UNIT_COUNT_MAX]
M0_INTERNAL void m0_net_buffer_deregister(struct m0_net_buffer *buf, struct m0_net_domain *dom)
Definition: buf.c:107
Definition: ping.h:41
int m0_net_domain_init(struct m0_net_domain *dom, const struct m0_net_xprt *xprt)
Definition: domain.c:36
m0_net_queue_type
Definition: net.h:591
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL int m0_net_buffer_add(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
Definition: buf.c:247
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
void s_a_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:741
M0_INTERNAL bool m0_bitmap_get(const struct m0_bitmap *map, size_t idx)
Definition: bitmap.c:105
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
static void netbuf_step(struct m0_bufvec_cursor *cur)
Definition: ping.c:123
void ping_server_should_stop(struct ping_ctx *ctx)
Definition: ping.c:990
int ping_init(struct ping_ctx *ctx)
Definition: ping.c:790
char * pm_str
Definition: ping.c:187
struct m0_net_buf_desc nb_desc
Definition: net.h:1412
static struct m0_atomic64 s_msg_recv_counter
Definition: ping.c:516
ping_msg_type
Definition: ping.c:176
enum m0_net_tm_ev_type nte_type
Definition: net.h:691
static struct bulkio_params * bp
Definition: bulkio_ut.c:44
Definition: nucleus.c:42
M0_INTERNAL void m0_list_link_init(struct m0_list_link *link)
Definition: list.c:169
#define out(...)
Definition: gen.c:41
#define SEND_RESP
Definition: ping.c:33
static struct m0_list_link * m0_list_first(const struct m0_list *head)
Definition: list.h:191
int encode_desc(struct m0_net_buffer *nb, bool send_desc, const struct m0_net_buf_desc *desc)
Definition: ping.c:151
int num
Definition: bulk_if.c:54
void msg_free(struct ping_msg *msg)
Definition: ping.c:233
int ping_client_msg_send_recv(struct ping_ctx *ctx, struct m0_net_end_point *server_ep, const char *data)
Definition: ping.c:1007
void m0_free(void *data)
Definition: memory.c:146
static struct m0_addb2_source * s
Definition: consumer.c:39
int32_t rc
Definition: trigger_fop.h:47
M0_INTERNAL int m0_net_end_point_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const char *addr)
Definition: ep.c:56
#define m0_list_entry(link, type, member)
Definition: list.h:217
void c_p_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:349
void ping_buf_put(struct ping_ctx *ctx, struct m0_net_buffer *nb)
Definition: ping.c:111
void s_a_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:684
int32_t nte_status
Definition: net.h:715
static int64_t m0_atomic64_add_return(struct m0_atomic64 *a, int64_t d)
Definition: vec.h:145
struct m0_net_end_point * nb_ep
Definition: net.h:1424
struct m0_net_transfer_mc * nte_tm
Definition: net.h:696
#define M0_IMPOSSIBLE(fmt,...)
int m0_nanosleep(const m0_time_t req, m0_time_t *rem)
Definition: ktime.c:73