Motr  M0
ping.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "lib/assert.h"
24 #include "lib/chan.h"
25 #include "lib/cond.h"
26 #include "lib/errno.h"
27 #include "lib/arith.h" /* max64u */
28 #include "lib/memory.h"
29 #include "lib/misc.h" /* M0_SET0 */
30 #include "net/net.h"
31 #include "net/lnet/lnet.h"
32 #include "net/lnet/st/ping.h"
33 
34 #define DEF_RESPONSE "active pong"
35 #define DEF_SEND "passive ping"
36 #define SEND_RESP " pong"
37 
39 enum {
42 };
43 
/*
 * Work item passed from buffer-event callbacks to the worker thread via
 * nlx_ping_ctx::pc_work_queue.
 * NOTE(review): doxygen scrape — original line 45 is missing here;
 * presumably the pwi_type (queue type) field used elsewhere in this
 * file (wi->pwi_type) — TODO confirm against the repository source.
 */
44 struct ping_work_item {
46  struct m0_net_buffer *pwi_nb;   /* buffer to (re)queue, may be NULL */
47  struct m0_list_link pwi_link;   /* linkage into pc_work_queue */
48 };
49 
50 static struct m0_mutex qstats_mutex;
52 static uint64_t ping_qs_total_errors;
53 static uint64_t ping_qs_total_retries;
54 
/*
 * Print a formatted table of per-queue statistics for all M0_NET_QT_NR
 * queues of a transfer machine, via the context's pf() output op.
 * When @accumulate is true (client contexts), each queue's counters are
 * also folded into the file-global ping_qs_total[] array and the global
 * error/retry totals, for the grand-total report printed at exit.
 * NOTE(review): doxygen scrape — several original lines are elided
 * (79, 83, 87, 97, 104-105, 108, 111, 133); fragments below reflect that.
 */
55 static void ping_print_qstats(struct nlx_ping_ctx *ctx,
56  struct m0_net_qstats *qp,
57  bool accumulate)
58 
59 {
60  int i;
61  uint64_t hr;
62  uint64_t min;
63  uint64_t sec;
64  uint64_t msec;
65  static const char *qnames[M0_NET_QT_NR] = {
66  "mRECV", "mSEND",
67  "pRECV", "pSEND",
68  "aRECV", "aSEND",
69  };
70  char tbuf[256];
71  const char *lfmt =
72 "%5s %6lu %6lu %6lu %6lu %13s %18lu %9lu\n";
73  const char *hfmt =
74 "Queue #Add #Del #Succ #Fail Time in Queue Total Bytes "
75 " Max Size\n"
76 "----- ------ ------ ------ ------ ------------- ------------------"
77 " ---------\n";
78 
80  ctx->pc_ops->pf("%s statistics:\n", ctx->pc_ident);
81  ctx->pc_ops->pf("%s", hfmt);
82  for (i = 0; i < M0_NET_QT_NR; ++qp, ++i) {
    /* NOTE(review): line 83 elided — presumably derives sec from
     * qp->nqs_time_in_queue before the splits below — TODO confirm. */
84  hr = sec / SEC_PER_HR;
85  min = sec % SEC_PER_HR / SEC_PER_MIN;
86  sec %= SEC_PER_MIN;
    /* NOTE(review): line 87 elided — msec computation start; the
     * fragment below rounds to the nearest millisecond. */
88  ONE_MILLION / 2) / ONE_MILLION;
89  sprintf(tbuf, "%02lu:%02lu:%02lu.%03lu",
90  (long unsigned int) hr,
91  (long unsigned int) min,
92  (long unsigned int) sec,
93  (long unsigned int) msec);
94  ctx->pc_ops->pf(lfmt,
95  qnames[i],
96  qp->nqs_num_adds, qp->nqs_num_dels,
98  tbuf, qp->nqs_total_bytes, qp->nqs_max_bytes);
99  if (accumulate) {
100  struct m0_net_qstats *cqp = &ping_qs_total[i];
101 
102 #define PING_QSTATS_CLIENT_TOTAL(f) cqp->nqs_##f += qp->nqs_##f
103  PING_QSTATS_CLIENT_TOTAL(time_in_queue);
106  PING_QSTATS_CLIENT_TOTAL(num_s_events);
107  PING_QSTATS_CLIENT_TOTAL(num_f_events);
109 #undef PING_QSTATS_CLIENT_TOTAL
     /* NOTE(review): line 111 elided — presumably
      * max64u(cqp->nqs_max_bytes, qp->nqs_max_bytes) (lib/arith.h
      * is included above for max64u) — TODO confirm. */
110  cqp->nqs_max_bytes =
112  }
113  }
114  if (ctx->pc_sync_events) {
115  ctx->pc_ops->pf("%s Loops: Work=%lu Blocked=%lu\n",
116  ctx->pc_ident,
117  (unsigned long) ctx->pc_worked_count,
118  (unsigned long) ctx->pc_blocked_count);
119  ctx->pc_ops->pf("%s Wakeups: WorkQ=%lu Net=%lu\n",
120  ctx->pc_ident,
121  (unsigned long) ctx->pc_wq_signal_count,
122  (unsigned long) ctx->pc_net_signal_count);
123  }
124  ctx->pc_ops->pf("%s errors: %lu\n", ctx->pc_ident,
125  (long unsigned int)m0_atomic64_get(&ctx->pc_errors));
126  ctx->pc_ops->pf("%s retries: %lu\n", ctx->pc_ident,
127  (long unsigned int)m0_atomic64_get(&ctx->pc_retries));
128  if (accumulate) {
129  ping_qs_total_errors += m0_atomic64_get(&ctx->pc_errors);
130  ping_qs_total_retries += m0_atomic64_get(&ctx->pc_retries);
131  }
132 
134 }
135 
136 void nlx_ping_print_qstats_tm(struct nlx_ping_ctx *ctx, bool reset)
137 {
138  struct m0_net_qstats qs[M0_NET_QT_NR];
139  bool is_client;
140  int rc;
141 
142  if (ctx->pc_tm.ntm_state < M0_NET_TM_INITIALIZED)
143  return;
144  is_client = ctx->pc_ident[0] == 'C';
145  rc = m0_net_tm_stats_get(&ctx->pc_tm, M0_NET_QT_NR, qs, reset);
146  M0_ASSERT(rc == 0);
147  ping_print_qstats(ctx, qs, is_client);
148 }
149 
150 
/*
 * Print the accumulated grand-total queue statistics (ping_qs_total[])
 * gathered from all client contexts.  A throwaway context is allocated
 * just to carry the output ops and identity string into
 * ping_print_qstats().  Returns 0, or -ENOMEM on allocation failure.
 * NOTE(review): doxygen scrape — lines 163-164 elided; presumably
 * qstats_mutex lock/unlock around the print — TODO confirm.
 */
151 int nlx_ping_print_qstats_total(const char *ident,
152  const struct nlx_ping_ops *ops)
153 {
154  struct nlx_ping_ctx *tctx;
155 
156  M0_ALLOC_PTR(tctx);
157  if (tctx == NULL)
158  return -ENOMEM;
159 
160  tctx->pc_ops = ops;
161  tctx->pc_ident = ident;
162 
165  ping_print_qstats(tctx, ping_qs_total, false);
166 
167  m0_free(tctx);
168  return 0;
169 }
170 
/*
 * Parse a size string such as "512", "4K", "2M", "1G" or "1T" into a
 * uint64_t (K/M/G/T are binary multipliers).  Returns 0 for a NULL
 * input; asserts on a zero value or an unrecognized unit suffix.
 */
uint64_t nlx_ping_parse_uint64(const char *s)
{
	const char *fmt2 = "%lu%c";
	const char *fmt1 = "%lu";
	uint64_t len = 0; /* fix: was uninitialized — reading it after a
			   * failed sscanf (non-numeric input) was UB;
			   * now the M0_ASSERT below fails deterministically */
	uint64_t mult = 1;
	char unit;

	if (s == NULL)
		return 0;
	if (sscanf(s, fmt2, &len, &unit) == 2) {
		if (unit == 'K')
			mult = 1 << 10;
		else if (unit == 'M')
			mult = 1 << 20;
		else if (unit == 'G')
			mult = 1 << 30;
		else if (unit == 'T')
			mult = (uint64_t) 1 << 40;
		else
			M0_ASSERT(unit == 'K' || unit == 'M' || unit == 'G' ||
				  unit == 'T');
	} else
		sscanf(s, fmt1, &len); /* may match nothing; len stays 0 */
	M0_ASSERT(len != 0);
	M0_ASSERT(mult >= 1);
	return len * mult;
}
199 
200 static void ping_sleep_secs(int secs)
201 {
202  if (secs != 0)
203  m0_nanosleep(m0_time(secs, 0), NULL);
204 }
205 
206 static int alloc_buffers(int num, uint32_t segs, m0_bcount_t segsize,
207  unsigned shift, struct m0_net_buffer **out)
208 {
209  struct m0_net_buffer *nbs;
210  struct m0_net_buffer *nb;
211  int i;
212  int rc = 0;
213 
214  M0_ALLOC_ARR(nbs, num);
215  if (nbs == NULL)
216  return -ENOMEM;
217  for (i = 0; i < num; ++i) {
218  nb = &nbs[i];
219  rc = m0_bufvec_alloc_aligned(&nb->nb_buffer, segs, segsize,
220  shift);
221  if (rc != 0)
222  break;
223  }
224 
225  if (rc == 0)
226  *out = nbs;
227  else {
228  while (--i >= 0)
229  m0_bufvec_free_aligned(&nbs[i].nb_buffer, shift);
230  m0_free(nbs);
231  }
232  return rc;
233 }
234 
/*
 * NOTE(review): doxygen scrape — the signature line(s) before this body
 * are elided; from the callers this is presumably
 *   static struct m0_net_buffer *ping_buf_get(struct nlx_ping_ctx *ctx)
 * — TODO confirm.
 * Reserves a free buffer from the context's pool by scanning the
 * in-use bitmap under pc_mutex; returns NULL if the pool is exhausted.
 */
242 {
243  int i;
244  struct m0_net_buffer *nb;
245 
246  m0_mutex_lock(&ctx->pc_mutex);
247  for (i = 0; i < ctx->pc_nr_bufs; ++i)
248  if (!m0_bitmap_get(&ctx->pc_nbbm, i)) {
249  m0_bitmap_set(&ctx->pc_nbbm, i, true); /* claim slot i */
250  break;
251  }
252  m0_mutex_unlock(&ctx->pc_mutex);
253 
254  if (i == ctx->pc_nr_bufs)
255  return NULL; /* no free buffer */
256 
257  nb = &ctx->pc_nbs[i];
     /* NOTE(review): line 258 elided — likely resets buffer state
      * before reuse — TODO confirm. */
259  return nb;
260 }
261 
/*
 * Return buffer @nb to the context's pool: computes its pool index by
 * pointer arithmetic and clears the corresponding in-use bitmap bit
 * under pc_mutex.  Asserts the buffer belongs to the pool and was
 * actually marked in use.
 * NOTE(review): doxygen scrape — line 270 elided.
 */
266 static void ping_buf_put(struct nlx_ping_ctx *ctx, struct m0_net_buffer *nb)
267 {
268  int i = nb - &ctx->pc_nbs[0];
269  M0_ASSERT(i >= 0 && i < ctx->pc_nr_bufs);
271 
272  m0_mutex_lock(&ctx->pc_mutex);
273  M0_ASSERT(m0_bitmap_get(&ctx->pc_nbbm, i));
274  m0_bitmap_set(&ctx->pc_nbbm, i, false);
275  m0_mutex_unlock(&ctx->pc_mutex);
276 }
277 
/*
 * Encode a ping message into @nb: a leading 'm' tag byte followed by
 * @str including its trailing NUL.  Always returns 0.
 * NOTE(review): doxygen scrape — lines 289-290 and 292 elided;
 * presumably cursor init on nb->nb_buffer, fetching bp from the cursor,
 * and the cursor step past the tag byte — TODO confirm.
 */
279 static int encode_msg(struct m0_net_buffer *nb, const char *str)
280 {
281  char *bp;
282  m0_bcount_t len = strlen(str) + 1; /* include trailing nul */
283  m0_bcount_t copied;
284  struct m0_bufvec in = M0_BUFVEC_INIT_BUF((void **) &str, &len);
285  struct m0_bufvec_cursor incur;
286  struct m0_bufvec_cursor cur;
287 
288  nb->nb_length = len + 1; /* payload + 1-byte message-type tag */
291  *bp = 'm';
293  m0_bufvec_cursor_init(&incur, &in);
294  copied = m0_bufvec_cursor_copy(&cur, &incur, len);
295  M0_ASSERT(copied == len);
296  return 0;
297 }
298 
/*
 * Encode a bulk transfer request into @nb: a tag byte ('s' for a
 * passive-send descriptor, 'r' for passive-receive), then two
 * NUL-terminated 8-digit decimal fields (descriptor length, passive
 * transfer size), then the raw network descriptor bytes.
 * The descriptor must fit in the first bufvec segment.
 * NOTE(review): doxygen scrape — lines 309-310 and 312-313 elided;
 * presumably cursor init and the step past the tag byte — TODO confirm.
 */
300 static int encode_desc(struct m0_net_buffer *nb,
301  bool send_desc,
302  unsigned passive_size,
303  const struct m0_net_buf_desc *desc)
304 {
305  struct m0_bufvec_cursor cur;
306  char *bp;
307  m0_bcount_t step;
308 
311  *bp = send_desc ? 's' : 'r';
314 
315  /* only support sending net_desc in single chunks in this test */
316  step = m0_bufvec_cursor_step(&cur);
317  M0_ASSERT(step >= 18 + desc->nbd_len);
318  nb->nb_length = 19 + desc->nbd_len; /* tag + 2*(8 digits + nul) + desc */
319 
320  bp += sprintf(bp, "%08u", desc->nbd_len);
321  ++bp; /* +nul */
322  bp += sprintf(bp, "%08u", passive_size);
323  ++bp; /* +nul */
324  memcpy(bp, desc->nbd_data, desc->nbd_len);
325  return 0;
326 }
327 
334 };
335 
/*
 * Decoded form of a received ping message (see decode_msg()).
 * pm_u holds either the message text (PM_MSG) or a network buffer
 * descriptor (PM_SEND_DESC / PM_RECV_DESC).
 */
336 struct ping_msg {
337  enum ping_msg_type pm_type;          /* which union member is valid */
338  union {
339  char *pm_str;                        /* PM_MSG: allocated text */
340  struct m0_net_buf_desc pm_desc;      /* *_DESC: bulk descriptor */
341  } pm_u;
342  unsigned pm_passive_size;            /* requested bulk transfer size */
343 };
344 
/*
 * Decode the wire format produced by encode_msg()/encode_desc() from
 * @nb starting at @nb_offset, filling @msg.  The first byte selects the
 * type: 'm' (plain message), 's' (send descriptor), 'r' (receive
 * descriptor).  Allocates pm_str or nbd_data; caller frees via
 * msg_free().  Always returns 0 (malformed input trips asserts).
 * NOTE(review): doxygen scrape — lines 355, 358 and 360 elided;
 * presumably cursor init on nb->nb_buffer, fetching bp from the cursor,
 * and the step past the tag byte — TODO confirm.
 */
346 static int decode_msg(struct m0_net_buffer *nb,
347  m0_bcount_t nb_len,
348  m0_bcount_t nb_offset,
349  struct ping_msg *msg)
350 {
351  struct m0_bufvec_cursor cur;
352  char *bp;
353  m0_bcount_t step;
354 
356  if (nb_offset > 0)
357  m0_bufvec_cursor_move(&cur, nb_offset);
359  M0_ASSERT(*bp == 'm' || *bp == 's' || *bp == 'r');
361  if (*bp == 'm') {
362  m0_bcount_t len = nb_len - 1; /* payload length minus tag byte */
363  void *str;
364  struct m0_bufvec out = M0_BUFVEC_INIT_BUF(&str, &len);
365  struct m0_bufvec_cursor outcur;
366 
367  msg->pm_type = PM_MSG;
368  str = msg->pm_u.pm_str = m0_alloc(len + 1);
369  M0_ASSERT(str != NULL);
370  m0_bufvec_cursor_init(&outcur, &out);
371  step = m0_bufvec_cursor_copy(&outcur, &cur, len);
372  M0_ASSERT(step == len);
373  } else {
374  int len;
375  char nine[9];
376  int i;
377  void *buf;
378  m0_bcount_t buflen;
379  struct m0_bufvec bv = M0_BUFVEC_INIT_BUF(&buf, &buflen);
380  struct m0_bufvec_cursor bv_cur;
381 
382  msg->pm_type = (*bp == 's') ? PM_SEND_DESC : PM_RECV_DESC;
383 
     /* first 8-digit field + nul: descriptor length */
384  buf = nine;
385  buflen = 9;
386  m0_bufvec_cursor_init(&bv_cur, &bv);
387  i = m0_bufvec_cursor_copy(&bv_cur, &cur, 9);
388  M0_ASSERT(i == 9);
389  i = sscanf(nine, "%u", &len);
390  M0_ASSERT(i == 1);
391 
     /* second 8-digit field + nul: passive transfer size */
392  m0_bufvec_cursor_init(&bv_cur, &bv);
393  i = m0_bufvec_cursor_copy(&bv_cur, &cur, 9);
394  M0_ASSERT(i == 9);
395  i = sscanf(nine, "%u", &msg->pm_passive_size);
396  M0_ASSERT(i == 1);
397 
     /* raw descriptor bytes follow */
398  buflen = len;
399  msg->pm_u.pm_desc.nbd_len = len;
400  msg->pm_u.pm_desc.nbd_data = buf = m0_alloc(len);
401  M0_ASSERT(buf != NULL);
402  m0_bufvec_cursor_init(&bv_cur, &bv);
403  i = m0_bufvec_cursor_copy(&bv_cur, &cur, len);
404  M0_ASSERT(i == len);
405  }
406  return 0;
407 }
408 
/*
 * Release the storage held by a decoded message: the string for PM_MSG,
 * the descriptor otherwise.
 * NOTE(review): doxygen scrape — line 412 (the non-PM_MSG branch body,
 * presumably m0_net_desc_free(&msg->pm_u.pm_desc)) is elided — TODO
 * confirm.
 */
409 static void msg_free(struct ping_msg *msg)
410 {
411  if (msg->pm_type != PM_MSG)
413  else
414  m0_free(msg->pm_u.pm_str);
415 }
416 
418 {
419  int i;
420  ctx->pc_ops->pf("%s: Available interfaces\n", ctx->pc_ident);
421  for (i = 0; ctx->pc_interfaces[i] != NULL; ++i)
422  ctx->pc_ops->pf("\t%s\n", ctx->pc_interfaces[i]);
423  return;
424 }
425 
426 static struct nlx_ping_ctx *
428 {
429  struct m0_net_buffer *nb = ev->nbe_buffer;
430  M0_ASSERT(nb != NULL);
431  return container_of(nb->nb_tm, struct nlx_ping_ctx, pc_tm);
432 }
433 
434 /* client callbacks */
/*
 * Client message-receive completion callback.  On success decodes the
 * reply, optionally validates it against pc_compare_buf (expects the
 * sent string + SEND_RESP appended), then queues a work item so the
 * client main loop can recycle the receive buffer.
 * NOTE(review): doxygen scrape — lines 437/443 (ctx derivation,
 * presumably via buffer_event_to_ping_ctx(ev)) and 484/487-488 (work
 * item type/buffer initialization) are elided — TODO confirm.
 */
435 static void c_m_recv_cb(const struct m0_net_buffer_event *ev)
436 {
438  int rc;
439  int len;
440  struct ping_work_item *wi;
441  struct ping_msg msg;
442 
444  PING_OUT(ctx, 1, "%s: Msg Recv CB\n", ctx->pc_ident);
445 
446  if (ev->nbe_status < 0) {
447  if (ev->nbe_status == -ECANCELED)
448  PING_OUT(ctx, 1, "%s: msg recv canceled\n",
449  ctx->pc_ident);
450  else {
451  ctx->pc_ops->pf("%s: msg recv error: %d\n",
452  ctx->pc_ident, ev->nbe_status);
453  m0_atomic64_inc(&ctx->pc_errors);
454  }
455  } else {
456  ev->nbe_buffer->nb_length = ev->nbe_length;
457  M0_ASSERT(ev->nbe_offset == 0);
458  rc = decode_msg(ev->nbe_buffer, ev->nbe_length, 0, &msg);
459  M0_ASSERT(rc == 0);
460 
461  if (msg.pm_type != PM_MSG)
462  M0_IMPOSSIBLE("Client: got desc\n");
463 
464  len = strlen(msg.pm_u.pm_str);
465  if (strlen(msg.pm_u.pm_str) < 32)
466  PING_OUT(ctx, 1, "%s: got msg: %s\n",
467  ctx->pc_ident, msg.pm_u.pm_str);
468  else
469  PING_OUT(ctx, 1, "%s: got msg: %u bytes\n",
470  ctx->pc_ident, len + 1);
471 
     /* validate reply = original message + SEND_RESP suffix */
472  if (ctx->pc_compare_buf != NULL) {
473  int l = strlen(ctx->pc_compare_buf);
474  M0_ASSERT(strlen(msg.pm_u.pm_str) == l + 5);
475  M0_ASSERT(strncmp(ctx->pc_compare_buf,
476  msg.pm_u.pm_str, l) == 0);
477  M0_ASSERT(strcmp(&msg.pm_u.pm_str[l], SEND_RESP) == 0);
478  PING_OUT(ctx, 1, "%s: msg bytes validated\n",
479  ctx->pc_ident);
480  }
481  msg_free(&msg);
482  }
483 
485 
486  M0_ALLOC_PTR(wi);
489 
490  m0_mutex_lock(&ctx->pc_mutex);
491  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
492  m0_cond_signal(&ctx->pc_cond);
493  m0_mutex_unlock(&ctx->pc_mutex);
494 }
495 
/*
 * Client message-send completion callback.  Both on error and on
 * success a work item is queued for the main loop; on error the buffer
 * pointer is included so the application can retry or release it.
 * NOTE(review): doxygen scrape — lines 498/501 (ctx derivation),
 * 516-517 and 525-530 (work item initialization for the two paths) are
 * elided — TODO confirm.
 */
496 static void c_m_send_cb(const struct m0_net_buffer_event *ev)
497 {
499  struct ping_work_item *wi;
500 
502  PING_OUT(ctx, 1, "%s: Msg Send CB\n", ctx->pc_ident);
503 
504  if (ev->nbe_status < 0) {
505  if (ev->nbe_status == -ECANCELED)
506  PING_OUT(ctx, 1, "%s: msg send canceled\n",
507  ctx->pc_ident);
508  else {
509  ctx->pc_ops->pf("%s: msg send error: %d\n",
510  ctx->pc_ident, ev->nbe_status);
511  m0_atomic64_inc(&ctx->pc_errors);
512  }
513 
514  /* let main app deal with it */
515  M0_ALLOC_PTR(wi);
518  wi->pwi_nb = ev->nbe_buffer;
519 
520  m0_mutex_lock(&ctx->pc_mutex);
521  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
522  m0_cond_signal(&ctx->pc_cond);
523  m0_mutex_unlock(&ctx->pc_mutex);
524  } else {
527 
528  M0_ALLOC_PTR(wi);
531 
532  m0_mutex_lock(&ctx->pc_mutex);
533  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
534  m0_cond_signal(&ctx->pc_cond);
535  m0_mutex_unlock(&ctx->pc_mutex);
536  }
537 }
538 
/*
 * Client passive-receive (bulk) completion callback.  On success
 * decodes the received data and validates it: either the fixed
 * DEF_RESPONSE string or the repeating "abcdefghi" fill pattern.  A
 * work item is then queued for the main loop.
 * NOTE(review): doxygen scrape — lines 541/547 (ctx derivation) and
 * 595-596/599-600 (work item initialization) are elided — TODO confirm.
 */
539 static void c_p_recv_cb(const struct m0_net_buffer_event *ev)
540 {
542  int rc;
543  int len;
544  struct ping_work_item *wi;
545  struct ping_msg msg;
546 
548  PING_OUT(ctx, 1, "%s: Passive Recv CB\n", ctx->pc_ident);
549 
550  if (ev->nbe_status < 0) {
551  if (ev->nbe_status == -ECANCELED)
552  PING_OUT(ctx, 1, "%s: passive recv canceled\n",
553  ctx->pc_ident);
554  else {
555  ctx->pc_ops->pf("%s: passive recv error: %d\n",
556  ctx->pc_ident, ev->nbe_status);
557  m0_atomic64_inc(&ctx->pc_errors);
558  }
559  } else {
560  ev->nbe_buffer->nb_length = ev->nbe_length;
561  M0_ASSERT(ev->nbe_offset == 0);
562  rc = decode_msg(ev->nbe_buffer, ev->nbe_length, 0, &msg);
563  M0_ASSERT(rc == 0);
564 
565  if (msg.pm_type != PM_MSG)
566  M0_IMPOSSIBLE("Client: got desc\n");
567  len = strlen(msg.pm_u.pm_str);
568  if (strlen(msg.pm_u.pm_str) < 32)
569  PING_OUT(ctx, 1, "%s: got data: %s\n",
570  ctx->pc_ident, msg.pm_u.pm_str);
571  else
572  PING_OUT(ctx, 1, "%s: got data: %u bytes\n",
573  ctx->pc_ident, len + 1);
574  M0_ASSERT(ev->nbe_length == len + 2);
     /* bulk payloads other than DEF_RESPONSE carry the repeating
      * "abcdefghi" pattern generated by the server */
575  if (strcmp(msg.pm_u.pm_str, DEF_RESPONSE) != 0) {
576  int i;
577  for (i = 0; i < len - 1; ++i) {
578  if (msg.pm_u.pm_str[i] != "abcdefghi"[i % 9]) {
579  PING_ERR("%s: data diff @ offset %i: "
580  "%c != %c\n",
581  ctx->pc_ident, i,
582  msg.pm_u.pm_str[i],
583  "abcdefghi"[i % 9]);
584  m0_atomic64_inc(&ctx->pc_errors);
585  break;
586  }
587  }
588  if (i == len - 1)
589  PING_OUT(ctx, 1, "%s: data bytes validated\n",
590  ctx->pc_ident);
591  }
592  msg_free(&msg);
593  }
594 
597 
598  M0_ALLOC_PTR(wi);
601 
602  m0_mutex_lock(&ctx->pc_mutex);
603  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
604  m0_cond_signal(&ctx->pc_cond);
605  m0_mutex_unlock(&ctx->pc_mutex);
606 }
607 
/*
 * Client passive-send (bulk) completion callback: report status and
 * queue a work item so the main loop can release the buffer.
 * NOTE(review): doxygen scrape — lines 610/613 (ctx derivation) and
 * 627-628/631-632 (work item initialization) are elided — TODO confirm.
 */
608 static void c_p_send_cb(const struct m0_net_buffer_event *ev)
609 {
611  struct ping_work_item *wi;
612 
614  PING_OUT(ctx, 1, "%s: Passive Send CB\n", ctx->pc_ident);
615 
616  if (ev->nbe_status < 0) {
617  if (ev->nbe_status == -ECANCELED)
618  PING_OUT(ctx, 1, "%s: passive send canceled\n",
619  ctx->pc_ident);
620  else {
621  ctx->pc_ops->pf("%s: passive send error: %d\n",
622  ctx->pc_ident, ev->nbe_status);
623  m0_atomic64_inc(&ctx->pc_errors);
624  }
625  }
626 
629 
630  M0_ALLOC_PTR(wi);
633 
634  m0_mutex_lock(&ctx->pc_mutex);
635  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
636  m0_cond_signal(&ctx->pc_cond);
637  m0_mutex_unlock(&ctx->pc_mutex);
638 }
639 
/*
 * The client never posts active-receive buffers, so this callback must
 * never fire.
 * NOTE(review): doxygen scrape — line 643 (the remainder of the assert
 * condition) is elided.
 */
640 static void c_a_recv_cb(const struct m0_net_buffer_event *ev)
641 {
642  M0_ASSERT(ev->nbe_buffer != NULL &&
644  M0_IMPOSSIBLE("Client: Active Recv CB\n");
645 }
646 
/*
 * The client never posts active-send buffers, so this callback must
 * never fire.
 * NOTE(review): doxygen scrape — line 650 (the remainder of the assert
 * condition) is elided.
 */
647 static void c_a_send_cb(const struct m0_net_buffer_event *ev)
648 {
649  M0_ASSERT(ev->nbe_buffer != NULL &&
651  M0_IMPOSSIBLE("Client: Active Send CB\n");
652 }
653 
/*
 * Transfer-machine event callback shared by client and server: logs
 * state changes (recording the status in pc_status), counts error
 * events, and logs diagnostics.
 * NOTE(review): doxygen scrape — line 662 is elided; presumably
 * "if (ev->nte_next_state == M0_NET_TM_STARTED)" guarding the
 * s = "started" assignment below — TODO confirm.
 */
654 static void event_cb(const struct m0_net_tm_event *ev)
655 {
656  struct nlx_ping_ctx *ctx = container_of(ev->nte_tm,
657  struct nlx_ping_ctx,
658  pc_tm);
659 
660  if (ev->nte_type == M0_NET_TEV_STATE_CHANGE) {
661  const char *s = "unexpected";
663  s = "started";
664  else if (ev->nte_next_state == M0_NET_TM_STOPPED)
665  s = "stopped";
666  else if (ev->nte_next_state == M0_NET_TM_FAILED)
667  s = "FAILED";
668  PING_OUT(ctx, 1, "%s: Event CB state change to %s, status %d\n",
669  ctx->pc_ident, s, ev->nte_status);
670  ctx->pc_status = ev->nte_status; /* checked by ping_init() */
671  } else if (ev->nte_type == M0_NET_TEV_ERROR) {
672  PING_OUT(ctx, 0, "%s: Event CB for error %d\n",
673  ctx->pc_ident, ev->nte_status);
674  m0_atomic64_inc(&ctx->pc_errors);
675  } else if (ev->nte_type == M0_NET_TEV_DIAGNOSTIC)
676  PING_OUT(ctx, 0, "%s: Event CB for diagnostic %d\n",
677  ctx->pc_ident, ev->nte_status);
678 }
679 
680 static bool server_stop = false;
681 
683  .nbc_cb = {
690  },
691 };
692 
/*
 * Client transfer-machine callbacks.
 * NOTE(review): doxygen scrape — line 694 (the initializer, presumably
 * .ntc_event_cb = event_cb) is elided — TODO confirm.
 */
693 static struct m0_net_tm_callbacks ctm_cb = {
695 };
696 
697 static void server_event_ident(char *buf, size_t len, const char *ident,
698  const struct m0_net_buffer_event *ev)
699 {
700  const struct m0_net_end_point *ep = NULL;
701  if (ev != NULL && ev->nbe_buffer != NULL) {
702  if (ev->nbe_buffer->nb_qtype == M0_NET_QT_MSG_RECV) {
703  if (ev->nbe_status == 0)
704  ep = ev->nbe_ep;
705  } else {
706  ep = ev->nbe_buffer->nb_ep;
707  }
708  }
709  if (ep != NULL)
710  snprintf(buf, len, "%s (peer %s)", ident, ep->nep_addr);
711  else
712  snprintf(buf, len, "%s", ident);
713 }
714 
716 
717 /* server callbacks */
/*
 * Server message-receive completion callback — the heart of the server.
 * On error the buffer is re-queued (unless shutting down).  On success
 * the message is decoded and dispatched:
 *   - PM_SEND_DESC: stage an active receive of the client's passive
 *     send (bounded by this server's bulk buffer size);
 *   - PM_RECV_DESC: stage an active send back to the client, filling
 *     the payload with DEF_RESPONSE or the "abcdefghi" pattern;
 *   - PM_MSG: stage a ping response (message + SEND_RESP).
 * The staged work item is queued for the server worker thread, and the
 * receive buffer is re-posted if the transport has released it.
 * NOTE(review): doxygen scrape — numerous lines are elided (720 ctx
 * derivation, 730/732 assert tail and recv-count bookkeeping, 746-747
 * flag mask, 778 wi type, 784-785/787 queue type and desc copy,
 * 800-801/803 likewise, 844-846 allocation checks) — fragments below
 * reflect that; TODO confirm against the repository source.
 */
718 static void s_m_recv_cb(const struct m0_net_buffer_event *ev)
719 {
721  int rc;
722  struct ping_work_item *wi;
723  struct ping_msg msg;
724  int64_t count;
725  char idbuf[IDBUF_LEN];
726  int bulk_delay = ctx->pc_server_bulk_delay;
727 
728 
729  M0_ASSERT(ev->nbe_buffer != NULL &&
731  server_event_ident(idbuf, ARRAY_SIZE(idbuf), ctx->pc_ident, ev);
733  PING_OUT(ctx, 1, "%s: Msg Recv CB %ld 0x%lx\n", idbuf, (long int) count,
734  (unsigned long int) ev->nbe_buffer->nb_flags);
735  if (ev->nbe_status < 0) {
736  if (ev->nbe_status == -ECANCELED && server_stop)
737  PING_OUT(ctx, 1, "%s: msg recv canceled on shutdown\n",
738  idbuf);
739  else {
740  ctx->pc_ops->pf("%s: msg recv error: %d\n",
741  idbuf, ev->nbe_status);
742  m0_atomic64_inc(&ctx->pc_errors);
743 
      /* re-post the receive buffer so the server keeps listening */
744  ev->nbe_buffer->nb_ep = NULL;
745  M0_ASSERT(!(ev->nbe_buffer->nb_flags &
748  rc = m0_net_buffer_add(ev->nbe_buffer, &ctx->pc_tm);
749  M0_ASSERT(rc == 0);
750  }
751  } else {
752  struct m0_net_buffer *nb;
753  unsigned bulk_size = ctx->pc_segments * ctx->pc_seg_size;
754 
755  rc = decode_msg(ev->nbe_buffer, ev->nbe_length, ev->nbe_offset,
756  &msg);
757  M0_ASSERT(rc == 0);
758 
759  nb = ping_buf_get(ctx);
760  if (nb == NULL) {
761  ctx->pc_ops->pf("%s: dropped msg, "
762  "no buffer available\n", idbuf);
763  m0_atomic64_inc(&ctx->pc_errors);
764  } else if ((msg.pm_type == PM_SEND_DESC ||
765  msg.pm_type == PM_RECV_DESC) &&
766  msg.pm_passive_size > bulk_size) {
767  const char *req = msg.pm_type == PM_SEND_DESC ?
768  "receive" : "send";
769  ctx->pc_ops->pf("%s: dropped msg, bulk %s request "
770  "too large (%u)\n", idbuf, req,
771  msg.pm_passive_size);
772  m0_atomic64_inc(&ctx->pc_errors);
773  ping_buf_put(ctx, nb);
774  } else {
775  M0_ALLOC_PTR(wi);
776  nb->nb_ep = ev->nbe_ep; /* save for later, if set */
777  ev->nbe_buffer->nb_ep = NULL;
779  wi->pwi_nb = nb;
780  if (msg.pm_type == PM_SEND_DESC) {
781  PING_OUT(ctx, 1, "%s: got desc for "
782  "active recv: sz=%u\n", idbuf,
783  msg.pm_passive_size);
786  nb->nb_length = msg.pm_passive_size;
788  &nb->nb_desc);
789  nb->nb_ep = NULL; /* not needed */
790  M0_ASSERT(rc == 0);
791  if (bulk_delay != 0) {
792  PING_OUT(ctx, 1, "%s: delay %d secs\n",
793  idbuf, bulk_delay);
794  ping_sleep_secs(bulk_delay);
795  }
796  } else if (msg.pm_type == PM_RECV_DESC) {
797  PING_OUT(ctx, 1, "%s: got desc for "
798  "active send: sz=%u\n", idbuf,
799  msg.pm_passive_size);
802  nb->nb_length = 0;
804  &nb->nb_desc);
805  nb->nb_ep = NULL; /* not needed */
806  /* reuse encode_msg for convenience */
807  if (msg.pm_passive_size == 0)
808  rc = encode_msg(nb, DEF_RESPONSE);
809  else {
810  char *bp;
811  int i;
812  bp = m0_alloc(msg.pm_passive_size);
813  M0_ASSERT(bp != NULL);
814  for (i = 0;
815  i < msg.pm_passive_size -
816  PING_MSG_OVERHEAD; ++i)
817  bp[i] = "abcdefghi"[i % 9];
818  PING_OUT(ctx, 1, "%s: sending data "
819  "%u bytes\n", idbuf,
820  msg.pm_passive_size);
821  rc = encode_msg(nb, bp);
822  m0_free(bp);
823  M0_ASSERT(rc == 0);
824  }
825  M0_ASSERT(rc == 0);
826  if (bulk_delay != 0) {
827  PING_OUT(ctx, 1, "%s: delay %d secs\n",
828  idbuf, bulk_delay);
829  ping_sleep_secs(bulk_delay);
830  }
831  } else {
832  char *data;
833  int len = strlen(msg.pm_u.pm_str);
834  if (strlen(msg.pm_u.pm_str) < 32)
835  PING_OUT(ctx, 1, "%s: got msg: %s\n",
836  idbuf, msg.pm_u.pm_str);
837  else
838  PING_OUT(ctx, 1, "%s: got msg: "
839  "%u bytes\n",
840  idbuf, len + 1);
841 
842  /* queue wi to send back ping response */
843  data = m0_alloc(len + 6);
847  strcpy(data, msg.pm_u.pm_str);
848  strcat(data, SEND_RESP);
849  rc = encode_msg(nb, data);
850  m0_free(data);
851  M0_ASSERT(rc == 0);
852  }
853  m0_mutex_lock(&ctx->pc_mutex);
854  m0_list_add(&ctx->pc_work_queue, &wi->pwi_link);
855  if (ctx->pc_sync_events)
856  m0_chan_signal(&ctx->pc_wq_chan);
857  else
858  m0_cond_signal(&ctx->pc_cond);
859  m0_mutex_unlock(&ctx->pc_mutex);
860  }
861  ev->nbe_buffer->nb_ep = NULL;
862  if (!(ev->nbe_buffer->nb_flags & M0_NET_BUF_QUEUED)) {
864  PING_OUT(ctx, 1, "%s: re-queuing buffer\n",
865  ctx->pc_ident);
866  rc = m0_net_buffer_add(ev->nbe_buffer, &ctx->pc_tm);
867  M0_ASSERT(rc == 0);
868  }
869 
870  msg_free(&msg);
871  }
872 }
873 
/*
 * Server message-send completion callback: report status (no retries),
 * drop the end point reference and return the buffer to the pool.
 * NOTE(review): doxygen scrape — lines 876/879 (ctx derivation), 894
 * (presumably end point put) and 897 (presumably ping_buf_put) are
 * elided — TODO confirm.
 */
874 static void s_m_send_cb(const struct m0_net_buffer_event *ev)
875 {
877  char idbuf[IDBUF_LEN];
878 
880  server_event_ident(idbuf, ARRAY_SIZE(idbuf), ctx->pc_ident, ev);
881  PING_OUT(ctx, 1, "%s: Msg Send CB\n", idbuf);
882 
883  if (ev->nbe_status < 0) {
884  /* no retries here */
885  if (ev->nbe_status == -ECANCELED)
886  PING_OUT(ctx, 1, "%s: msg send canceled\n", idbuf);
887  else {
888  ctx->pc_ops->pf("%s: msg send error: %d\n",
889  idbuf, ev->nbe_status);
890  m0_atomic64_inc(&ctx->pc_errors);
891  }
892  }
893 
895  ev->nbe_buffer->nb_ep = NULL;
896 
898 }
899 
/*
 * The server never posts passive-receive buffers, so this callback must
 * never fire.
 * NOTE(review): doxygen scrape — line 903 (assert condition tail) is
 * elided.
 */
900 static void s_p_recv_cb(const struct m0_net_buffer_event *ev)
901 {
902  M0_ASSERT(ev->nbe_buffer != NULL &&
904  M0_IMPOSSIBLE("Server: Passive Recv CB\n");
905 }
906 
/*
 * The server never posts passive-send buffers, so this callback must
 * never fire.
 * NOTE(review): doxygen scrape — line 910 (assert condition tail) is
 * elided.
 */
907 static void s_p_send_cb(const struct m0_net_buffer_event *ev)
908 {
909  M0_ASSERT(ev->nbe_buffer != NULL &&
911  M0_IMPOSSIBLE("Server: Passive Send CB\n");
912 }
913 
/*
 * Server active-receive (bulk) completion callback.  On success decodes
 * the data received from the client's passive buffer and validates it
 * (DEF_SEND or the "abcdefghi" fill pattern); the buffer is then
 * returned to the pool.
 * NOTE(review): doxygen scrape — lines 916/922 (ctx derivation) and
 * 971-972 (presumably desc free + ping_buf_put) are elided — TODO
 * confirm.
 */
914 static void s_a_recv_cb(const struct m0_net_buffer_event *ev)
915 {
917  int rc;
918  int len;
919  struct ping_msg msg;
920  char idbuf[IDBUF_LEN];
921 
923  server_event_ident(idbuf, ARRAY_SIZE(idbuf), ctx->pc_ident, ev);
924  PING_OUT(ctx, 1, "%s: Active Recv CB\n", idbuf);
925 
926  if (ev->nbe_status < 0) {
927  /* no retries here */
928  if (ev->nbe_status == -ECANCELED)
929  PING_OUT(ctx, 1, "%s: active recv canceled\n", idbuf);
930  else {
931  ctx->pc_ops->pf("%s: active recv error: %d\n",
932  idbuf, ev->nbe_status);
933  m0_atomic64_inc(&ctx->pc_errors);
934  }
935  } else {
936  ev->nbe_buffer->nb_length = ev->nbe_length;
937  M0_ASSERT(ev->nbe_offset == 0);
938  rc = decode_msg(ev->nbe_buffer, ev->nbe_length, 0, &msg);
939  M0_ASSERT(rc == 0);
940 
941  if (msg.pm_type != PM_MSG)
942  M0_IMPOSSIBLE("Server: got desc\n");
943  len = strlen(msg.pm_u.pm_str);
944  if (len < 32)
945  PING_OUT(ctx, 1, "%s: got data: %s\n",
946  idbuf, msg.pm_u.pm_str);
947  else
948  PING_OUT(ctx, 1, "%s: got data: %u bytes\n",
949  idbuf, len + 1);
950  M0_ASSERT(ev->nbe_length == len + 2);
     /* anything other than DEF_SEND must be the fill pattern */
951  if (strcmp(msg.pm_u.pm_str, DEF_SEND) != 0) {
952  int i;
953  for (i = 0; i < len - 1; ++i) {
954  if (msg.pm_u.pm_str[i] != "abcdefghi"[i % 9]) {
955  PING_ERR("%s: data diff @ offset %i: "
956  "%c != %c\n", idbuf, i,
957  msg.pm_u.pm_str[i],
958  "abcdefghi"[i % 9]);
959  m0_atomic64_inc(&ctx->pc_errors);
960  break;
961  }
962  }
963  if (i == len - 1)
964  PING_OUT(ctx, 1, "%s: data bytes validated\n",
965  idbuf);
966  }
967 
968  msg_free(&msg);
969  }
970 
973 }
974 
/*
 * Server active-send (bulk) completion callback: report status (no
 * retries) and return the buffer to the pool.
 * NOTE(review): doxygen scrape — lines 977/980 (ctx derivation) and
 * 995-996 (presumably desc free + ping_buf_put) are elided — TODO
 * confirm.
 */
975 static void s_a_send_cb(const struct m0_net_buffer_event *ev)
976 {
978  char idbuf[IDBUF_LEN];
979 
981  server_event_ident(idbuf, ARRAY_SIZE(idbuf), ctx->pc_ident, ev);
982  PING_OUT(ctx, 1, "%s: Active Send CB\n", idbuf);
983 
984  if (ev->nbe_status < 0) {
985  /* no retries here */
986  if (ev->nbe_status == -ECANCELED)
987  PING_OUT(ctx, 1, "%s: active send canceled\n", idbuf);
988  else {
989  ctx->pc_ops->pf("%s: active send error: %d\n",
990  idbuf, ev->nbe_status);
991  m0_atomic64_inc(&ctx->pc_errors);
992  }
993  }
994 
997 }
998 
1000  .nbc_cb = {
1007  },
1008 };
1009 
/*
 * Server transfer-machine callbacks.
 * NOTE(review): doxygen scrape — line 1011 (the initializer, presumably
 * .ntc_event_cb = event_cb) is elided — TODO confirm.
 */
1010 static struct m0_net_tm_callbacks stm_cb = {
1012 };
1013 
1014 static void ping_fini(struct nlx_ping_ctx *ctx);
1015 
1016 static bool ping_workq_clink_cb(struct m0_clink *cl)
1017 {
1018  struct nlx_ping_ctx *ctx =
1019  container_of(cl, struct nlx_ping_ctx, pc_wq_clink);
1020  ++ctx->pc_wq_signal_count;
1021  return false;
1022 }
1023 
1024 static bool ping_net_clink_cb(struct m0_clink *cl)
1025 {
1026  struct nlx_ping_ctx *ctx =
1027  container_of(cl, struct nlx_ping_ctx, pc_net_clink);
1028  ++ctx->pc_net_signal_count;
1029  return false;
1030 }
1031 
/*
 * Bring up one ping context: initialize the work queue and counters,
 * optionally wire up the synchronous-event channels/clinks, initialize
 * the network domain, discover LNet interfaces, size and allocate the
 * buffer pool, register the buffers, pick a network (first non-@lo
 * interface) and TM address, then initialize and start the transfer
 * machine, waiting until it reports STARTED.  On any failure everything
 * is unwound via ping_fini().  Returns 0 or a negative errno.
 * NOTE(review): doxygen scrape — elided lines include 1046 (addr
 * buffer declaration used by snprintf below), 1135 (TM callback
 * assignment) and 1146 (sync-event delivery setup) — TODO confirm.
 */
1042 static int ping_init(struct nlx_ping_ctx *ctx)
1043 {
1044  int i;
1045  int rc;
1047  struct m0_clink tmwait;
1048  uint64_t bsz;
1049 
1050  m0_list_init(&ctx->pc_work_queue);
1051  m0_atomic64_set(&ctx->pc_errors, 0);
1052  m0_atomic64_set(&ctx->pc_retries, 0);
1053  ctx->pc_interfaces = NULL;
1054  if (ctx->pc_sync_events) {
1055  m0_chan_init(&ctx->pc_wq_chan, &ctx->pc_mutex);
1056  m0_chan_init(&ctx->pc_net_chan, &ctx->pc_mutex);
1057 
1058  m0_clink_init(&ctx->pc_wq_clink, &ping_workq_clink_cb);
1059  m0_clink_attach(&ctx->pc_net_clink, &ctx->pc_wq_clink,
1060  &ping_net_clink_cb); /* group */
1061 
1062  m0_clink_add_lock(&ctx->pc_wq_chan, &ctx->pc_wq_clink);
1063  m0_clink_add_lock(&ctx->pc_net_chan, &ctx->pc_net_clink);
1064  }
1065 
1066  rc = m0_net_domain_init(&ctx->pc_dom, ctx->pc_xprt);
1067  if (rc != 0) {
1068  PING_ERR("domain init failed: %d\n", rc);
1069  goto fail;
1070  }
1071 
1072  if (ctx->pc_dom_debug > 0)
1073  m0_net_lnet_dom_set_debug(&ctx->pc_dom, ctx->pc_dom_debug);
1074 
1075  rc = m0_net_lnet_ifaces_get(&ctx->pc_dom, &ctx->pc_interfaces);
1076  if (rc != 0) {
1077  PING_ERR("failed to load interface names: %d\n", rc);
1078  goto fail;
1079  }
1080  M0_ASSERT(ctx->pc_interfaces != NULL);
1081 
1082  if (ctx->pc_interfaces[0] == NULL) {
1083  PING_ERR("no interfaces defined locally\n");
1084  goto fail;
1085  }
1086 
     /* size the buffer pool: round the bulk size up to whole segments */
1087  ctx->pc_seg_shift = PING_SEGMENT_SHIFT;
1088  ctx->pc_seg_size = PING_SEGMENT_SIZE;
1089  bsz = ctx->pc_bulk_size > 0 ? ctx->pc_bulk_size : PING_DEF_BUFFER_SIZE;
1090  ctx->pc_segments = bsz / ctx->pc_seg_size +
1091  (bsz % ctx->pc_seg_size != 0 ? 1 : 0);
1092  M0_ASSERT(ctx->pc_segments * ctx->pc_seg_size <= PING_MAX_BUFFER_SIZE);
1093  rc = alloc_buffers(ctx->pc_nr_bufs, ctx->pc_segments, ctx->pc_seg_size,
1094  ctx->pc_seg_shift, &ctx->pc_nbs);
1095  if (rc != 0) {
1096  PING_ERR("buffer allocation %u X %lu([%u][%u]) failed: %d\n",
1097  ctx->pc_nr_bufs, (unsigned long) bsz,
1098  ctx->pc_segments, ctx->pc_seg_size, rc);
1099  goto fail;
1100  }
1101  rc = m0_bitmap_init(&ctx->pc_nbbm, ctx->pc_nr_bufs);
1102  if (rc != 0) {
1103  PING_ERR("buffer bitmap allocation failed: %d\n", rc);
1104  goto fail;
1105  }
1106  M0_ASSERT(ctx->pc_buf_callbacks != NULL);
1107  for (i = 0; i < ctx->pc_nr_bufs; ++i) {
1108  rc = m0_net_buffer_register(&ctx->pc_nbs[i], &ctx->pc_dom);
1109  if (rc != 0) {
1110  PING_ERR("buffer register failed: %d\n", rc);
1111  goto fail;
1112  }
1113  ctx->pc_nbs[i].nb_callbacks = ctx->pc_buf_callbacks;
1114  }
1115 
     /* default to the first interface that is not loopback */
1116  if (ctx->pc_network == NULL) {
1117  ctx->pc_network = ctx->pc_interfaces[0];
1118  for (i = 0; ctx->pc_interfaces[i] != NULL; ++i) {
1119  if (strstr(ctx->pc_interfaces[i], "@lo") != NULL)
1120  continue;
1121  ctx->pc_network = ctx->pc_interfaces[i]; /* 1st !@lo */
1122  break;
1123  }
1124  }
1125  if (ctx->pc_rnetwork == NULL)
1126  ctx->pc_rnetwork = ctx->pc_network;
1127 
1128  if (ctx->pc_tmid >= 0)
1129  snprintf(addr, ARRAY_SIZE(addr), "%s:%u:%u:%u", ctx->pc_network,
1130  ctx->pc_pid, ctx->pc_portal, ctx->pc_tmid);
1131  else
1132  snprintf(addr, ARRAY_SIZE(addr), "%s:%u:%u:*", ctx->pc_network,
1133  ctx->pc_pid, ctx->pc_portal);
1134 
1136  rc = m0_net_tm_init(&ctx->pc_tm, &ctx->pc_dom);
1137  if (rc != 0) {
1138  PING_ERR("transfer machine init failed: %d\n", rc);
1139  goto fail;
1140  }
1141 
1142  if (ctx->pc_tm_debug > 0)
1143  m0_net_lnet_tm_set_debug(&ctx->pc_tm, ctx->pc_tm_debug);
1144 
1145  if (ctx->pc_sync_events) {
1147  M0_ASSERT(rc == 0);
1148  }
1149 
1150  m0_clink_init(&tmwait, NULL);
1151  m0_clink_add_lock(&ctx->pc_tm.ntm_chan, &tmwait);
1152  rc = m0_net_tm_start(&ctx->pc_tm, addr);
1153  if (rc != 0) {
1154  PING_ERR("transfer machine start failed: %d\n", rc);
1155  goto fail;
1156  }
1157 
1158  /* wait for tm to notify it has started */
1159  m0_chan_wait(&tmwait);
1160  m0_clink_del_lock(&tmwait);
1161  if (ctx->pc_tm.ntm_state != M0_NET_TM_STARTED) {
1162  rc = ctx->pc_status; /* recorded by event_cb() */
1163  if (rc == 0)
1164  rc = -EINVAL;
1165  PING_ERR("transfer machine start failed: %d\n", rc);
1166  goto fail;
1167  }
1168 
1169  return rc;
1170 fail:
1171  ping_fini(ctx);
1172  return rc;
1173 }
1174 
/*
 * Wait on clink @cl, bridging the sync-events mode: in that mode an
 * unbounded wait is implemented as a polling timed-wait loop.  Returns
 * whether the channel was signalled.
 * NOTE(review): doxygen scrape — elided lines include 1177 (the
 * m0_time_t timeout parameter), 1183/1185 (loop-local timeout setup)
 * and 1192 (presumably draining net events after a timed wait) — TODO
 * confirm.
 */
1175 static inline bool ping_tm_timedwait(struct nlx_ping_ctx *ctx,
1176  struct m0_clink *cl,
1178 {
1179  bool signalled = false;
1180  if (timeout == M0_TIME_NEVER) {
1181  if (ctx->pc_sync_events) {
1182  do {
1184  signalled = m0_chan_timedwait(cl, timeout);
1186  } while (!signalled);
1187  } else
1188  m0_chan_wait(cl);
1189  } else {
1190  signalled = m0_chan_timedwait(cl, timeout);
1191  if (ctx->pc_sync_events)
1193  }
1194  return signalled;
1195 }
1196 
/*
 * Unbounded wait on @cl.
 * NOTE(review): doxygen scrape — line 1200 (the body, presumably
 * ping_tm_timedwait(ctx, cl, M0_TIME_NEVER)) is elided — TODO confirm.
 */
1197 static inline void ping_tm_wait(struct nlx_ping_ctx *ctx,
1198  struct m0_clink *cl)
1199 {
1201 }
1202 
/*
 * Tear down a ping context in reverse order of ping_init(): stop and
 * finalize the transfer machine (printing final qstats), deregister and
 * free all pool buffers and the bitmap, release interfaces and the
 * domain, drain any leftover work items, and dismantle the sync-event
 * channels/clinks.  Safe to call on a partially initialized context.
 * NOTE(review): doxygen scrape — elided lines include 1216 (timeout
 * declaration for the stop-wait loop) and 1232/1234 (buffer queued/
 * registered state checks and bufvec free split) — TODO confirm.
 */
1203 static void ping_fini(struct nlx_ping_ctx *ctx)
1204 {
1205  struct m0_list_link *link;
1206  struct ping_work_item *wi;
1207 
1208  if (ctx->pc_tm.ntm_state != M0_NET_TM_UNDEFINED) {
1209  if (ctx->pc_tm.ntm_state != M0_NET_TM_FAILED) {
1210  struct m0_clink tmwait;
1211  m0_clink_init(&tmwait, NULL);
1212  m0_clink_add_lock(&ctx->pc_tm.ntm_chan, &tmwait);
1213  m0_net_tm_stop(&ctx->pc_tm, true);
1214  while (ctx->pc_tm.ntm_state != M0_NET_TM_STOPPED) {
1215  /* wait for it to stop */
1217  50 * ONE_MILLION);
1218  m0_chan_timedwait(&tmwait, timeout);
1219  }
1220  m0_clink_del_lock(&tmwait);
1221  }
1222 
1223  if (ctx->pc_ops->pqs != NULL)
1224  (*ctx->pc_ops->pqs)(ctx, false);
1225 
1226  m0_net_tm_fini(&ctx->pc_tm);
1227  }
1228  if (ctx->pc_nbs != NULL) {
1229  int i;
1230  for (i = 0; i < ctx->pc_nr_bufs; ++i) {
1231  struct m0_net_buffer *nb = &ctx->pc_nbs[i];
1233  m0_net_buffer_deregister(nb, &ctx->pc_dom);
1235  ctx->pc_seg_shift);
1236  }
1237  m0_free(ctx->pc_nbs);
1238  m0_bitmap_fini(&ctx->pc_nbbm);
1239  }
1240  if (ctx->pc_interfaces != NULL)
1241  m0_net_lnet_ifaces_put(&ctx->pc_dom, &ctx->pc_interfaces);
1242  if (ctx->pc_dom.nd_xprt != NULL)
1243  m0_net_domain_fini(&ctx->pc_dom);
1244 
     /* free any work items never consumed by a worker */
1245  while (!m0_list_is_empty(&ctx->pc_work_queue)) {
1246  link = m0_list_first(&ctx->pc_work_queue);
1247  wi = m0_list_entry(link, struct ping_work_item, pwi_link);
1248  m0_list_del(&wi->pwi_link);
1249  m0_free(wi);
1250  }
1251  if (ctx->pc_sync_events) {
1252  m0_clink_del_lock(&ctx->pc_net_clink);
1253  m0_clink_del_lock(&ctx->pc_wq_clink);
1254 
1255  m0_clink_fini(&ctx->pc_net_clink);
1256  m0_clink_fini(&ctx->pc_wq_clink);
1257 
1258  m0_chan_fini_lock(&ctx->pc_net_chan);
1259  m0_chan_fini_lock(&ctx->pc_wq_chan);
1260  }
1261 
1262  m0_list_fini(&ctx->pc_work_queue);
1263 }
1264 
1265 static void set_msg_timeout(struct nlx_ping_ctx *ctx,
1266  struct m0_net_buffer *nb)
1267 {
1268  if (ctx->pc_msg_timeout > 0) {
1269  PING_OUT(ctx, 1, "%s: setting msg nb_timeout to %ds\n",
1270  ctx->pc_ident, ctx->pc_msg_timeout);
1271  nb->nb_timeout = m0_time_from_now(ctx->pc_msg_timeout, 0);
1272  } else {
1273  nb->nb_timeout = M0_TIME_NEVER;
1274  }
1275 }
1276 
1277 static void set_bulk_timeout(struct nlx_ping_ctx *ctx,
1278  struct m0_net_buffer *nb)
1279 {
1280  if (ctx->pc_bulk_timeout > 0) {
1281  PING_OUT(ctx, 1, "%s: setting bulk nb_timeout to %ds\n",
1282  ctx->pc_ident, ctx->pc_bulk_timeout);
1283  nb->nb_timeout = m0_time_from_now(ctx->pc_bulk_timeout, 0);
1284  } else {
1285  nb->nb_timeout = M0_TIME_NEVER;
1286  }
1287 }
1288 
/*
 * Drain the server work queue (called with pc_mutex held, per the
 * assert below): each queued ping_work_item is submitted to the
 * transfer machine with the timeout appropriate to its queue type,
 * then unlinked and freed.  Submission failures are counted in
 * pc_errors and reported through the ops print callback.
 * NOTE(review): the signature line (1289) and the bulk-queue case
 * labels (1306-1307) were lost in extraction.
 */
1290 {
1291  struct m0_list_link *link;
1292  struct ping_work_item *wi;
1293  int rc;
1294 
1295  M0_ASSERT(m0_mutex_is_locked(&ctx->pc_mutex));
1296 
1297  while (!m0_list_is_empty(&ctx->pc_work_queue)) {
1298  link = m0_list_first(&ctx->pc_work_queue);
1299  wi = m0_list_entry(link, struct ping_work_item,
1300  pwi_link);
1301  switch (wi->pwi_type) {
1302  case M0_NET_QT_MSG_SEND:
1303  set_msg_timeout(ctx, wi->pwi_nb);
1304  rc = m0_net_buffer_add(wi->pwi_nb, &ctx->pc_tm);
1305  break;
/* NOTE(review): case labels for the bulk queue types (lines 1306-1307)
 * lost in extraction -- they fall through to the bulk-timeout path below. */
1308  set_bulk_timeout(ctx, wi->pwi_nb);
1309  rc = m0_net_buffer_add(wi->pwi_nb, &ctx->pc_tm);
1310  break;
1311  default:
1312  M0_IMPOSSIBLE("unexpected wi->pwi_type");
1313  rc = -EINVAL;
1314  break;
1315  }
1316  if (rc != 0) {
1317  m0_atomic64_inc(&ctx->pc_errors);
1318  ctx->pc_ops->pf("%s buffer_add(%d) failed %d\n",
1319  ctx->pc_ident, wi->pwi_type, rc);
1320  }
1321  m0_list_del(&wi->pwi_link);
1322  m0_free(wi);
1323  }
1324 }
1325 
/*
 * Asynchronous server loop: with pc_mutex held, sleep on pc_cond in
 * 5-second slices until the global server_stop flag is raised.  Event
 * delivery happens on the transfer machine's own callback threads.
 * NOTE(review): the signature (1326) and local declarations (1328/1333)
 * were lost in extraction.
 */
1327 {
1329 
1330  M0_ASSERT(m0_mutex_is_locked(&ctx->pc_mutex));
1331 
1332  while (!server_stop) {
1334  timeout = m0_time_from_now(5, 0);
1335  m0_cond_timedwait(&ctx->pc_cond, timeout);
1336  }
1337 
1338  return;
1339 }
1340 
/*
 * Synchronous-event server loop (pc_mutex held on entry and exit).
 * Blocks until work items are queued or buffer events are pending,
 * delivers pending events on this thread with the mutex dropped, and
 * exits when server_stop is raised.  pc_blocked_count/pc_worked_count
 * track how often the loop slept vs. did work.
 * NOTE(review): lines 1343 (decls), 1352 (event-notify call start),
 * 1366 (deliver-all call) and 1375 (work-queue drain call) were lost
 * in extraction.
 */
1342 {
1344 
1345  M0_ASSERT(m0_mutex_is_locked(&ctx->pc_mutex));
1346 
1347  while (!server_stop) {
 /* sleep until there is either queued work or a pending buffer event */
1348  while (!server_stop &&
1349  m0_list_is_empty(&ctx->pc_work_queue) &&
1350  !m0_net_buffer_event_pending(&ctx->pc_tm)) {
1351  ++ctx->pc_blocked_count;
1353  &ctx->pc_net_chan);
1354  m0_mutex_unlock(&ctx->pc_mutex);
1355  /* wait on the channel group */
1356  timeout = m0_time_from_now(15, 0);
1357  m0_chan_timedwait(&ctx->pc_wq_clink, timeout);
1358  m0_mutex_lock(&ctx->pc_mutex);
1359  }
1360 
1361  ++ctx->pc_worked_count;
1362 
1363  if (m0_net_buffer_event_pending(&ctx->pc_tm)) {
1364  m0_mutex_unlock(&ctx->pc_mutex);
1365  /* deliver events synchronously on this thread */
1367  m0_mutex_lock(&ctx->pc_mutex);
1368  }
1369 
1370  if (server_stop) {
1371  PING_OUT(ctx, 1, "%s stopping\n", ctx->pc_ident);
1372  break;
1373  }
1374 
1376  }
1377 
1378  return;
1379 }
1380 
1381 
/*
 * Server thread body.  Initialises the ping context, derives and
 * validates the receive-buffer parameters, queues the receive buffers,
 * performs the pc_ready/pc_cond startup handshake with the spawning
 * thread, runs the sync or async event loop until told to stop, then
 * dequeues the receive buffers, waits for all TM queues to drain and
 * finalises the context.
 */
1382 static void nlx_ping_server(struct nlx_ping_ctx *ctx)
1383 {
1384  int i;
1385  int rc;
1386  struct m0_net_buffer *nb;
1387  struct m0_clink tmwait;
1388  unsigned int num_recv_bufs = max32u(ctx->pc_nr_bufs / 8, 2);
1389  int buf_size;
1390 
1391  ctx->pc_tm.ntm_callbacks = &stm_cb;
1392  ctx->pc_buf_callbacks = &sbuf_cb;
1393 
1394  ctx->pc_ident = "Server";
1395  M0_ASSERT(ctx->pc_nr_bufs > 2);
 /* clamp the configured receive-buffer count to a sane range */
1396  if (ctx->pc_nr_recv_bufs > ctx->pc_nr_bufs / 2)
1397  ctx->pc_nr_recv_bufs = num_recv_bufs;
1398  if (ctx->pc_nr_recv_bufs < 2)
1399  ctx->pc_nr_recv_bufs = num_recv_bufs;
1400  num_recv_bufs = ctx->pc_nr_recv_bufs;
1401  M0_ASSERT(num_recv_bufs >= 2);
1402  rc = ping_init(ctx);
1403  M0_ASSERT(rc == 0);
1404  M0_ASSERT(ctx->pc_network != NULL);
/* NOTE(review): line 1405 lost in extraction */
1406  ctx->pc_ops->pf("Server end point: %s\n", ctx->pc_tm.ntm_ep->nep_addr);
1407 
 /* derive min_recv_size and max_recv_msgs from each other; fall back
  * to defaults when the resulting pair is inconsistent */
1408  buf_size = ctx->pc_segments * ctx->pc_seg_size;
1409  if (ctx->pc_max_recv_msgs > 0 && ctx->pc_min_recv_size <= 0)
1410  ctx->pc_min_recv_size = buf_size / ctx->pc_max_recv_msgs;
1411  else if (ctx->pc_min_recv_size > 0 && ctx->pc_max_recv_msgs <= 0)
1412  ctx->pc_max_recv_msgs = buf_size / ctx->pc_min_recv_size;
1413 
1414  if (ctx->pc_min_recv_size < PING_DEF_MIN_RECV_SIZE ||
1415  ctx->pc_min_recv_size > buf_size ||
1416  ctx->pc_max_recv_msgs < 1)
1417  ctx->pc_max_recv_msgs = ctx->pc_min_recv_size = -1;
1418 
1419  if (ctx->pc_min_recv_size <= 0 && ctx->pc_max_recv_msgs <= 0) {
1420  ctx->pc_min_recv_size = PING_DEF_MIN_RECV_SIZE;
1421  ctx->pc_max_recv_msgs = buf_size / ctx->pc_min_recv_size;
1422  }
1423  M0_ASSERT(ctx->pc_min_recv_size >= PING_DEF_MIN_RECV_SIZE);
1424  M0_ASSERT(ctx->pc_max_recv_msgs >= 1);
1425  ctx->pc_ops->pf("%s buffer parameters:\n"
1426  "\t total buffers=%u\n"
1427  "\t buffer size=%u\n"
1428  "\treceive buffers=%u\n"
1429  "\t min_recv_size=%d\n"
1430  "\t max_recv_msgs=%d\n",
1431  ctx->pc_ident, ctx->pc_nr_bufs, buf_size, num_recv_bufs,
1432  ctx->pc_min_recv_size, ctx->pc_max_recv_msgs);
1433 
 /* queue the receive buffers and mark them busy in the bitmap */
1434  m0_mutex_lock(&ctx->pc_mutex);
1435  for (i = 0; i < num_recv_bufs; ++i) {
1436  nb = &ctx->pc_nbs[i];
/* NOTE(review): line 1437 (likely the nb_qtype assignment) lost in extraction */
1438  nb->nb_timeout = M0_TIME_NEVER;
1439  nb->nb_ep = NULL;
1440  nb->nb_min_receive_size = ctx->pc_min_recv_size;
1441  nb->nb_max_receive_msgs = ctx->pc_max_recv_msgs;
1442  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1443  m0_bitmap_set(&ctx->pc_nbbm, i, true);
1444  M0_ASSERT(rc == 0);
1445  }
1446 
1447  /* startup synchronization handshake */
1448  ctx->pc_ready = true;
1449  m0_cond_signal(&ctx->pc_cond);
1450  while (ctx->pc_ready)
1451  m0_cond_wait(&ctx->pc_cond);
1452 
 /* run the main loop; calls at lines 1454/1456 lost in extraction --
  * presumably nlx_ping_server_sync() / nlx_ping_server_async() */
1453  if (ctx->pc_sync_events)
1455  else
1457  m0_mutex_unlock(&ctx->pc_mutex);
1458 
1459  /* dequeue recv buffers */
1460  m0_clink_init(&tmwait, NULL);
1461 
1462  for (i = 0; i < num_recv_bufs; ++i) {
1463  nb = &ctx->pc_nbs[i];
1464  m0_clink_add_lock(&ctx->pc_tm.ntm_chan, &tmwait);
1465  m0_net_buffer_del(nb, &ctx->pc_tm);
1466  m0_bitmap_set(&ctx->pc_nbbm, i, false);
1467  ping_tm_wait(ctx, &tmwait);
1468  m0_clink_del_lock(&tmwait);
1469  }
1470 
1471  /* wait for active buffers to flush */
1472  m0_clink_add_lock(&ctx->pc_tm.ntm_chan, &tmwait);
1473  for (i = 0; i < M0_NET_QT_NR; ++i)
1474  while (!m0_net_tm_tlist_is_empty(&ctx->pc_tm.ntm_q[i])) {
1475  PING_OUT(ctx, 1, "waiting for queue %d to empty\n", i);
1476  ping_tm_wait(ctx, &tmwait);
1477  }
1478  m0_clink_del_lock(&tmwait);
1479  m0_clink_fini(&tmwait);
1480 
1481  ping_fini(ctx);
1482  server_stop = false;
1483 }
1484 
/*
 * Request server shutdown: raise server_stop under pc_mutex and wake
 * the server loop through its work-queue channel (sync-event mode) or
 * its condition variable (async mode).
 * NOTE(review): the signature line (1485) was lost in extraction.
 */
1486 {
1487  m0_mutex_lock(&ctx->pc_mutex);
1488  server_stop = true;
1489  if (ctx->pc_sync_events)
1490  m0_chan_signal(&ctx->pc_wq_chan);
1491  else
1492  m0_cond_signal(&ctx->pc_cond);
1493  m0_mutex_unlock(&ctx->pc_mutex);
1494 }
1495 
/*
 * Spawn the ping server in its own thread and block until it signals
 * readiness through the pc_ready/pc_cond handshake, then acknowledge so
 * the server can proceed.
 * NOTE(review): the first signature line (1495) and the M0_THREAD_INIT
 * setup lines (1505-1506) were lost in extraction.
 */
1497  struct nlx_ping_ctx *sctx)
1498 {
1499  int rc;
1500 
1501  sctx->pc_xprt = &m0_net_lnet_xprt;
1502  sctx->pc_pid = M0_NET_LNET_PID;
1503 
1504  m0_mutex_lock(&sctx->pc_mutex);
1507  NULL, &nlx_ping_server, sctx, "ping_server");
1508  M0_ASSERT(rc == 0);
 /* wait for the server thread to queue its buffers and come up */
1509  while (!sctx->pc_ready)
1510  m0_cond_wait(&sctx->pc_cond);
1511  sctx->pc_ready = false;
1512  m0_cond_signal(&sctx->pc_cond);
1513  m0_mutex_unlock(&sctx->pc_mutex);
1514 }
1515 
/*
 * One message ping-pong with the server: queue a receive buffer for the
 * response, send @data (defaults to "ping"), then wait under pc_mutex
 * until both the send and the receive complete.  Failed sends are
 * retried up to SEND_RETRIES times with an increasing delay; a
 * configured pc_msg_timeout bounds the wait for the response.
 * Returns 0 on success or -ETIMEDOUT.
 * NOTE(review): the first signature line (1524) and the nb_qtype
 * assignments (1545/1556) were lost in extraction.
 */
1525  struct m0_net_end_point *server_ep,
1526  const char *data)
1527 {
1528  int rc;
1529  struct m0_net_buffer *nb;
1530  struct m0_list_link *link;
1531  struct ping_work_item *wi;
1532  int recv_done = 0;
1533  int retries = SEND_RETRIES;
1534  m0_time_t session_timeout = M0_TIME_NEVER;
1535 
1536  if (data == NULL)
1537  data = "ping";
1538  ctx->pc_compare_buf = data;
1539 
1540  PING_OUT(ctx, 1, "%s: starting msg send/recv sequence\n",
1541  ctx->pc_ident);
1542  /* queue buffer for response, must do before sending msg */
1543  nb = ping_buf_get(ctx);
1544  M0_ASSERT(nb != NULL);
1546  nb->nb_timeout = M0_TIME_NEVER;
1547  nb->nb_ep = NULL;
1548  nb->nb_min_receive_size = ctx->pc_segments * ctx->pc_seg_size;
1549  nb->nb_max_receive_msgs = 1;
1550  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1551  M0_ASSERT(rc == 0);
1552 
 /* encode and send the outgoing message */
1553  nb = ping_buf_get(ctx);
1554  M0_ASSERT(nb != NULL);
1555  rc = encode_msg(nb, data);
1557  nb->nb_ep = server_ep;
1558  M0_ASSERT(rc == 0);
1559  set_msg_timeout(ctx, nb);
1560  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1561  M0_ASSERT(rc == 0);
1562 
1563  /* wait for receive response to complete */
1564  m0_mutex_lock(&ctx->pc_mutex);
1565  while (1) {
1566  while (!m0_list_is_empty(&ctx->pc_work_queue)) {
1567  link = m0_list_first(&ctx->pc_work_queue);
1568  wi = m0_list_entry(link, struct ping_work_item,
1569  pwi_link);
1570  m0_list_del(&wi->pwi_link);
1571  if (wi->pwi_type == M0_NET_QT_MSG_RECV) {
1572  ctx->pc_compare_buf = NULL;
1573  recv_done++;
1574  } else if (wi->pwi_type == M0_NET_QT_MSG_SEND &&
1575  wi->pwi_nb != NULL) {
1576  m0_time_t delay;
1577  /* send error, retry a few times */
1578  if (retries == 0) {
1579  ctx->pc_compare_buf = NULL;
1580  ctx->pc_ops->pf("%s: send failed, "
1581  "no more retries\n",
1582  ctx->pc_ident);
1583  m0_mutex_unlock(&ctx->pc_mutex);
1584  ping_buf_put(ctx, nb);
1585  m0_free(wi);
1586  m0_atomic64_inc(&ctx->pc_errors);
1587  return -ETIMEDOUT;
1588  }
1589  delay = m0_time(SEND_RETRIES + 1 - retries, 0);
1590  --retries;
/* NOTE(review): line 1591 (likely the sleep using @delay) lost in extraction */
1592  m0_atomic64_inc(&ctx->pc_retries);
1593  set_msg_timeout(ctx, nb);
1594  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1595  M0_ASSERT(rc == 0);
1596  } else if (wi->pwi_type == M0_NET_QT_MSG_SEND) {
1597  recv_done++;
 /* successful send: bound the wait for the response */
1598  if (ctx->pc_msg_timeout > 0)
1599  session_timeout =
1601  ctx->pc_msg_timeout, 0);
1602  }
1603  m0_free(wi);
1604  }
 /* both the send and the receive completed */
1605  if (recv_done == 2)
1606  break;
1607  if (session_timeout == M0_TIME_NEVER)
1608  m0_cond_wait(&ctx->pc_cond);
1609  else if (!m0_cond_timedwait(&ctx->pc_cond,
1610  session_timeout)) {
1611  ctx->pc_ops->pf("%s: Receive TIMED OUT\n",
1612  ctx->pc_ident);
1613  rc = -ETIMEDOUT;
1614  m0_atomic64_inc(&ctx->pc_errors);
1615  break;
1616  }
1617  }
1618 
1619  m0_mutex_unlock(&ctx->pc_mutex);
1620  return rc;
1621 }
1622 
/*
 * Passive bulk-receive sequence: stage a passive receive buffer, copy
 * its network descriptor, send the descriptor to the server in a
 * message, then wait under pc_mutex for both the message send and the
 * bulk receive to complete, retrying failed sends up to SEND_RETRIES
 * times.  Returns 0 on success or -ETIMEDOUT.
 * NOTE(review): the first signature line (1623), the nb_qtype
 * assignments (1638/1651) and the bulk-receive completion test
 * (1666) were lost in extraction.
 */
1624  struct m0_net_end_point *server_ep)
1625 {
1626  int rc;
1627  struct m0_net_buffer *nb;
1628  struct m0_net_buf_desc nbd;
1629  struct m0_list_link *link;
1630  struct ping_work_item *wi;
1631  int recv_done = 0;
1632  int retries = SEND_RETRIES;
1633 
1634  PING_OUT(ctx, 1, "%s: starting passive recv sequence\n", ctx->pc_ident);
1635  /* queue our passive receive buffer */
1636  nb = ping_buf_get(ctx);
1637  M0_ASSERT(nb != NULL);
1639  nb->nb_ep = server_ep;
1640  set_bulk_timeout(ctx, nb);
1641  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1642  M0_ASSERT(rc == 0);
1643  rc = m0_net_desc_copy(&nb->nb_desc, &nbd);
1644  M0_ASSERT(rc == 0);
1645 
1646  /* send descriptor in message to server */
1647  nb = ping_buf_get(ctx);
1648  M0_ASSERT(nb != NULL);
1649  rc = encode_desc(nb, false, ctx->pc_seg_size * ctx->pc_segments, &nbd);
1650  m0_net_desc_free(&nbd);
1652  nb->nb_ep = server_ep;
1653  M0_ASSERT(rc == 0);
1654  nb->nb_timeout = M0_TIME_NEVER;
1655  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1656  M0_ASSERT(rc == 0);
1657 
1658  /* wait for receive to complete */
1659  m0_mutex_lock(&ctx->pc_mutex);
1660  while (1) {
1661  while (!m0_list_is_empty(&ctx->pc_work_queue)) {
1662  link = m0_list_first(&ctx->pc_work_queue);
1663  wi = m0_list_entry(link, struct ping_work_item,
1664  pwi_link);
1665  m0_list_del(&wi->pwi_link);
/* NOTE(review): the if-condition (line 1666, bulk-recv completion) lost
 * in extraction */
1667  recv_done++;
1668  else if (wi->pwi_type == M0_NET_QT_MSG_SEND &&
1669  wi->pwi_nb != NULL) {
1670  m0_time_t delay;
1671  /* send error, retry a few times */
1672  if (retries == 0) {
1673  ctx->pc_ops->pf("%s: send failed, "
1674  "no more retries\n",
1675  ctx->pc_ident);
1676  m0_net_desc_free(&nb->nb_desc);
1677  m0_mutex_unlock(&ctx->pc_mutex);
1678  ping_buf_put(ctx, nb);
1679  m0_atomic64_inc(&ctx->pc_errors);
1680  return -ETIMEDOUT;
1681  }
1682  delay = m0_time(SEND_RETRIES + 1 - retries, 0);
1683  --retries;
/* NOTE(review): line 1684 (likely the sleep using @delay) lost in extraction */
1685  m0_atomic64_inc(&ctx->pc_retries);
1686  set_msg_timeout(ctx, nb);
1687  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1688  M0_ASSERT(rc == 0);
1689  } else if (wi->pwi_type == M0_NET_QT_MSG_SEND) {
1690  recv_done++;
1691  }
1692  m0_free(wi);
1693  }
 /* both the descriptor send and the bulk transfer completed */
1694  if (recv_done == 2)
1695  break;
1696  m0_cond_wait(&ctx->pc_cond);
1697  }
1698 
1699  m0_mutex_unlock(&ctx->pc_mutex);
1700  return rc;
1701 }
1702 
/*
 * Passive bulk-send sequence: encode @data (default "passive ping")
 * into a passive send buffer, copy its network descriptor, message the
 * descriptor to the server, then wait under pc_mutex for both the
 * message send and the bulk send to complete, retrying failed message
 * sends.  Returns 0 on success or -ETIMEDOUT.
 * NOTE(review): the first signature line (1703), the nb_qtype
 * assignments (1723/1736) and the bulk-send completion test (1751)
 * were lost in extraction.
 */
1704  struct m0_net_end_point *server_ep,
1705  const char *data)
1706 {
1707  int rc;
1708  struct m0_net_buffer *nb;
1709  struct m0_net_buf_desc nbd;
1710  struct m0_list_link *link;
1711  struct ping_work_item *wi;
1712  int send_done = 0;
1713  int retries = SEND_RETRIES;
1714 
1715  if (data == NULL)
1716  data = "passive ping";
1717  PING_OUT(ctx, 1, "%s: starting passive send sequence\n", ctx->pc_ident);
1718  /* queue our passive receive buffer */
1719  nb = ping_buf_get(ctx);
1720  M0_ASSERT(nb != NULL);
1721  /* reuse encode_msg for convenience */
1722  rc = encode_msg(nb, data);
1724  nb->nb_ep = server_ep;
1725  set_bulk_timeout(ctx, nb);
1726  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1727  M0_ASSERT(rc == 0);
1728  rc = m0_net_desc_copy(&nb->nb_desc, &nbd);
1729  M0_ASSERT(rc == 0);
1730 
1731  /* send descriptor in message to server */
1732  nb = ping_buf_get(ctx);
1733  M0_ASSERT(nb != NULL);
1734  rc = encode_desc(nb, true, ctx->pc_seg_size * ctx->pc_segments, &nbd);
1735  m0_net_desc_free(&nbd);
1737  nb->nb_ep = server_ep;
1738  M0_ASSERT(rc == 0);
1739  nb->nb_timeout = M0_TIME_NEVER;
1740  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1741  M0_ASSERT(rc == 0);
1742 
1743  /* wait for send to complete */
1744  m0_mutex_lock(&ctx->pc_mutex);
1745  while (1) {
1746  while (!m0_list_is_empty(&ctx->pc_work_queue)) {
1747  link = m0_list_first(&ctx->pc_work_queue);
1748  wi = m0_list_entry(link, struct ping_work_item,
1749  pwi_link);
1750  m0_list_del(&wi->pwi_link);
/* NOTE(review): the if-condition (line 1751, bulk-send completion) lost
 * in extraction */
1752  send_done++;
1753  else if (wi->pwi_type == M0_NET_QT_MSG_SEND &&
1754  wi->pwi_nb != NULL) {
1755  m0_time_t delay;
1756  /* send error, retry a few times */
1757  if (retries == 0) {
1758  ctx->pc_ops->pf("%s: send failed, "
1759  "no more retries\n",
1760  ctx->pc_ident);
1761  m0_net_desc_free(&nb->nb_desc);
1762  m0_mutex_unlock(&ctx->pc_mutex);
1763  ping_buf_put(ctx, nb);
1764  m0_atomic64_inc(&ctx->pc_errors);
1765  return -ETIMEDOUT;
1766  }
1767  delay = m0_time(SEND_RETRIES + 1 - retries, 0);
1768  --retries;
/* NOTE(review): line 1769 (likely the sleep using @delay) lost in extraction */
1770  m0_atomic64_inc(&ctx->pc_retries);
1771  set_msg_timeout(ctx, nb);
1772  rc = m0_net_buffer_add(nb, &ctx->pc_tm);
1773  M0_ASSERT(rc == 0);
1774  } else if (wi->pwi_type == M0_NET_QT_MSG_SEND) {
1775  send_done++;
1776  }
1777  m0_free(wi);
1778  }
 /* both the descriptor send and the bulk transfer completed */
1779  if (send_done == 2)
1780  break;
1781  m0_cond_wait(&ctx->pc_cond);
1782  }
1783 
1784  m0_mutex_unlock(&ctx->pc_mutex);
1785  return rc;
1786 }
1787 
1789  struct m0_net_end_point **server_ep)
1790 {
1791  int rc;
1793  const char *fmt = "Client %s";
1794  char *ident;
1795 
1796  M0_ALLOC_ARR(ident, ARRAY_SIZE(addr) + strlen(fmt) + 1);
1797  if (ident == NULL)
1798  return -ENOMEM;
1799  sprintf(ident, fmt, "starting"); /* temporary */
1800  ctx->pc_ident = ident;
1801 
1802  ctx->pc_tm.ntm_callbacks = &ctm_cb;
1803  ctx->pc_buf_callbacks = &cbuf_cb;
1804  rc = ping_init(ctx);
1805  if (rc != 0) {
1806  m0_free0((char **)&ctx->pc_ident);
1807  return rc;
1808  }
1809  M0_ASSERT(ctx->pc_network != NULL);
1810  M0_ASSERT(ctx->pc_rnetwork != NULL);
1811 
1812  /* need end point for the server */
1813  snprintf(addr, ARRAY_SIZE(addr), "%s:%u:%u:%u", ctx->pc_rnetwork,
1814  ctx->pc_rpid, ctx->pc_rportal, ctx->pc_rtmid);
1815  rc = m0_net_end_point_create(server_ep, &ctx->pc_tm, addr);
1816  if (rc != 0) {
1817  ping_fini(ctx);
1818  m0_free0((char **)&ctx->pc_ident);
1819  return rc;
1820  }
1821 
1822  /* clients can have dynamically assigned TMIDs so use the EP addr
1823  in the ident.
1824  */
1825  sprintf(ident, fmt, ctx->pc_tm.ntm_ep->nep_addr);
1826  return 0;
1827 }
1828 
/*
 * Tear down a client context: release the server end point reference,
 * finalise the ping context and free the identity string allocated by
 * nlx_ping_client_init().
 * NOTE(review): the first signature line (1829) was lost in extraction.
 */
1830  struct m0_net_end_point *server_ep)
1831 {
1832  m0_net_end_point_put(server_ep);
1833  ping_fini(ctx);
1834  m0_free0((char **)&ctx->pc_ident);
1835 }
1836 
/*
 * Client entry point: build a ping context from @params, connect to the
 * server, run @params->loops iterations of the msg send/recv, passive
 * recv and passive send sequences, print queue statistics and tear
 * everything down.  Uses asynchronous event delivery
 * (pc_sync_events = false).
 * NOTE(review): the signature line (1837) and line 1876 (the
 * client_id == 1 branch body) were lost in extraction.
 */
1838 {
1839  int i;
1840  int rc;
1841  struct m0_net_end_point *server_ep;
1842  char *bp = NULL;
1843  char *send_msg = NULL;
1844  struct nlx_ping_ctx *cctx;
1845  M0_ALLOC_PTR(cctx);
1846  if (cctx == NULL)
1847  goto free_ctx;
1848 
 /* populate the context from the caller-supplied parameters */
1849  cctx->pc_xprt = &m0_net_lnet_xprt;
1850  cctx->pc_ops = params->ops;
1851  cctx->pc_nr_bufs = params->nr_bufs;
1852  cctx->pc_bulk_size = params->bulk_size;
1853  cctx->pc_tm.ntm_state = M0_NET_TM_UNDEFINED;
1854  cctx->pc_bulk_timeout = params->bulk_timeout;
1855  cctx->pc_msg_timeout = params->msg_timeout;
1856  cctx->pc_network = params->client_network;
1857  cctx->pc_pid = params->client_pid;
1858  cctx->pc_portal = params->client_portal;
1859  cctx->pc_tmid = params->client_tmid;
1860  cctx->pc_rnetwork = params->server_network;
1861  cctx->pc_rpid = params->server_pid;
1862  cctx->pc_rportal = params->server_portal;
1863  cctx->pc_rtmid = params->server_tmid;
1864  cctx->pc_dom_debug = params->debug;
1865  cctx->pc_tm_debug = params->debug;
1866  cctx->pc_verbose = params->verbose;
1867  cctx->pc_sync_events = false;
1868 
1869  m0_mutex_init(&cctx->pc_mutex);
1870  m0_cond_init(&cctx->pc_cond, &cctx->pc_mutex);
1871  rc = nlx_ping_client_init(cctx, &server_ep);
1872  if (rc != 0)
1873  goto fail;
1874 
1875  if (params->client_id == 1)
1877 
 /* fill the bulk payload with a repeating pattern */
1878  if (params->bulk_size != 0) {
1879  bp = m0_alloc(params->bulk_size);
1880  M0_ASSERT(bp != NULL);
1881  for (i = 0; i < params->bulk_size - PING_MSG_OVERHEAD; ++i)
1882  bp[i] = "abcdefghi"[i % 9];
1883  }
1884 
 /* optional custom message payload */
1885  if (params->send_msg_size > 0) {
1886  send_msg = m0_alloc(params->send_msg_size);
1887  M0_ASSERT(send_msg);
1888  for (i = 0; i < params->send_msg_size - 1; ++i)
1889  send_msg[i] = "ABCDEFGHI"[i % 9];
1890  }
1891 
 /* main loop: stop at the first failing sequence */
1892  for (i = 1; i <= params->loops; ++i) {
1893  PING_OUT(cctx, 1, "%s: Loop %d\n", cctx->pc_ident, i);
1894  rc = nlx_ping_client_msg_send_recv(cctx, server_ep, send_msg);
1895  if (rc != 0)
1896  break;
1897  rc = nlx_ping_client_passive_recv(cctx, server_ep);
1898  if (rc != 0)
1899  break;
1900  rc = nlx_ping_client_passive_send(cctx, server_ep, bp);
1901  if (rc != 0)
1902  break;
1903  }
1904 
1905  cctx->pc_ops->pqs(cctx, false);
1906  nlx_ping_client_fini(cctx, server_ep);
1907  m0_free(bp);
1908  m0_free(send_msg);
1909 fail:
1910  m0_cond_fini(&cctx->pc_cond);
1911  m0_mutex_fini(&cctx->pc_mutex);
1912 free_ctx:
1913  m0_free(cctx);
1914 }
1915 
/*
 * Module initialiser.  NOTE(review): the signature (1916) and body
 * (1918) were lost in extraction -- presumably initialises
 * qstats_mutex; confirm against the full source.
 */
1917 {
1919 }
1920 
/*
 * Module finaliser.  NOTE(review): the signature (1921) and body
 * (1923) were lost in extraction -- presumably finalises
 * qstats_mutex; confirm against the full source.
 */
1922 {
1924 }
1925 
1926 /*
1927  * Local variables:
1928  * c-indentation-style: "K&R"
1929  * c-basic-offset: 8
1930  * tab-width: 8
1931  * fill-column: 80
1932  * scroll-step: 1
1933  * End:
1934  */
struct m0_net_buffer_callbacks sbuf_cb
Definition: ping.c:763
static int nlx_ping_client_init(struct nlx_ping_ctx *ctx, struct m0_net_end_point **server_ep)
Definition: ping.c:1788
M0_INTERNAL void m0_net_lnet_dom_set_debug(struct m0_net_domain *dom, unsigned dbg)
Definition: lnet_main.c:977
#define M0_BUFVEC_INIT_BUF(addr_ptr, count_ptr)
Definition: vec.h:165
static void m0_atomic64_inc(struct m0_atomic64 *a)
uint64_t nqs_num_f_events
Definition: net.h:784
static uint64_t num_adds[M0_NET_QT_NR]
Definition: bulk_if.c:497
Definition: ping.c:41
struct m0_net_tm_callbacks stm_cb
Definition: ping.c:774
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
uint64_t nqs_num_adds
Definition: net.h:764
void s_m_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:519
struct m0_net_transfer_mc * nb_tm
Definition: net.h:1357
M0_INTERNAL void m0_list_add(struct m0_list *head, struct m0_list_link *new)
Definition: list.c:113
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
M0_INTERNAL int m0_bitmap_init(struct m0_bitmap *map, size_t nr)
Definition: bitmap.c:86
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
void m0_net_domain_fini(struct m0_net_domain *dom)
Definition: domain.c:71
m0_time_t nqs_time_in_queue
Definition: net.h:791
M0_INTERNAL int m0_net_tm_start(struct m0_net_transfer_mc *tm, const char *addr)
Definition: tm.c:261
#define PING_OUT(ctx, num, fmt,...)
Definition: ping.h:151
static struct m0_thread server_thread
uint64_t nqs_num_s_events
Definition: net.h:774
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_bitmap_fini(struct m0_bitmap *map)
Definition: bitmap.c:97
M0_INTERNAL void m0_clink_del_lock(struct m0_clink *link)
Definition: chan.c:293
static struct buffer * cur(struct m0_addb2_mach *mach, m0_bcount_t space)
Definition: addb2.c:791
struct m0_bufvec nb_buffer
Definition: net.h:1322
void s_p_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:677
M0_INTERNAL int m0_net_buffer_register(struct m0_net_buffer *buf, struct m0_net_domain *dom)
Definition: buf.c:65
uint32_t nbd_len
Definition: net_otw_types.h:37
M0_INTERNAL void m0_list_init(struct m0_list *head)
Definition: list.c:29
const struct m0_net_xprt m0_net_lnet_xprt
Definition: lnet_xo.c:679
const m0_time_t M0_TIME_NEVER
Definition: time.c:108
static struct io_request req
Definition: file.c:100
static void ping_sleep_secs(int secs)
Definition: ping.c:200
Definition: ping.c:181
uint64_t nqs_num_dels
Definition: net.h:769
uint64_t m0_time_t
Definition: time.h:37
int nlx_ping_print_qstats_total(const char *ident, const struct nlx_ping_ops *ops)
Definition: ping.c:151
void s_m_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:646
M0_INTERNAL int m0_net_tm_stats_get(struct m0_net_transfer_mc *tm, enum m0_net_queue_type qtype, struct m0_net_qstats *qs, bool reset)
Definition: tm.c:343
uint8_t * nbd_data
Definition: net_otw_types.h:38
uint64_t m0_time_nanoseconds(const m0_time_t time)
Definition: time.c:89
union ping_msg::@371 pm_u
static int delay
Definition: dump.c:174
static struct m0_mutex qstats_mutex
Definition: ping.c:50
static m0_bcount_t segs[NR *IT]
Definition: vec.c:45
struct m0_bufvec data
Definition: di.c:40
m0_bcount_t nb_length
Definition: net.h:1334
M0_INTERNAL void m0_list_fini(struct m0_list *head)
Definition: list.c:36
uint64_t nb_flags
Definition: net.h:1489
static uint64_t ping_qs_total_errors
Definition: ping.c:52
M0_INTERNAL void m0_list_del(struct m0_list_link *old)
Definition: list.c:147
static struct m0_rpc_client_ctx cctx
Definition: rconfc.c:69
const struct nlx_ping_ops * pc_ops
Definition: ping.h:41
static struct m0_net_qstats ping_qs_total[M0_NET_QT_NR]
Definition: ping.c:51
static void ping_tm_wait(struct nlx_ping_ctx *ctx, struct m0_clink *cl)
Definition: ping.c:1197
static int nlx_ping_client_passive_recv(struct nlx_ping_ctx *ctx, struct m0_net_end_point *server_ep)
Definition: ping.c:1623
M0_INTERNAL void * m0_bufvec_cursor_addr(struct m0_bufvec_cursor *cur)
Definition: vec.c:597
uint64_t m0_bcount_t
Definition: types.h:77
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
int encode_msg(struct m0_net_buffer *nb, const char *str)
Definition: ping.c:130
M0_INTERNAL bool m0_net_buffer_del(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
Definition: buf.c:261
uint64_t nqs_total_bytes
Definition: net.h:797
static int void * buf
Definition: dir.c:1019
m0_bcount_t nb_min_receive_size
Definition: net.h:1496
m0_bindex_t nbe_offset
Definition: net.h:1238
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
enum ping_msg_type pm_type
Definition: ping.c:185
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
m0_bcount_t nbe_length
Definition: net.h:1226
struct m0_net_buffer * nbe_buffer
Definition: net.h:1194
m0_time_t m0_time(uint64_t secs, long ns)
Definition: time.c:41
struct m0_net_end_point * nbe_ep
Definition: net.h:1251
void event_cb(const struct m0_net_tm_event *ev)
Definition: ping.c:458
struct m0_net_buf_desc pm_desc
Definition: ping.c:188
M0_INTERNAL void m0_cond_init(struct m0_cond *cond, struct m0_mutex *mutex)
Definition: cond.c:40
const char * pc_ident
Definition: ping.h:68
Definition: sock.c:887
static m0_bcount_t count
Definition: xcode.c:167
static int nlx_ping_client_msg_send_recv(struct nlx_ping_ctx *ctx, struct m0_net_end_point *server_ep, const char *data)
Definition: ping.c:1524
static void nlx_ping_server_sync(struct nlx_ping_ctx *ctx)
Definition: ping.c:1341
void nlx_ping_server_spawn(struct m0_thread *server_thread, struct nlx_ping_ctx *sctx)
Definition: ping.c:1496
int m0_bufvec_alloc_aligned(struct m0_bufvec *bufvec, uint32_t num_segs, m0_bcount_t seg_size, unsigned shift)
Definition: vec.c:355
void c_a_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:451
Definition: ping.c:184
Definition: sock.c:754
static void nlx_ping_client_fini(struct nlx_ping_ctx *ctx, struct m0_net_end_point *server_ep)
Definition: ping.c:1829
static bool ping_workq_clink_cb(struct m0_clink *cl)
Definition: ping.c:1016
int decode_msg(struct m0_net_buffer *nb, struct ping_msg *msg)
Definition: ping.c:193
void c_m_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:250
int alloc_buffers(int num, uint32_t segs, m0_bcount_t segsize, struct m0_net_buffer **out)
Definition: ping.c:52
M0_INTERNAL bool m0_bufvec_cursor_move(struct m0_bufvec_cursor *cur, m0_bcount_t count)
Definition: vec.c:574
M0_INTERNAL void m0_chan_signal(struct m0_chan *chan)
Definition: chan.c:159
int i
Definition: dir.c:1033
void c_m_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:308
static void nlx_ping_server_async(struct nlx_ping_ctx *ctx)
Definition: ping.c:1326
int32_t nbe_status
Definition: net.h:1218
enum m0_net_tm_state nte_next_state
Definition: net.h:723
M0_INTERNAL int m0_net_tm_init(struct m0_net_transfer_mc *tm, struct m0_net_domain *dom)
Definition: tm.c:160
M0_INTERNAL void m0_clink_attach(struct m0_clink *link, struct m0_clink *group, m0_chan_cb_t cb)
Definition: chan.c:215
void s_p_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:670
struct m0_net_tm_callbacks ctm_cb
Definition: ping.c:494
void ping_fini(struct ping_ctx *ctx)
Definition: ping.c:863
M0_INTERNAL void m0_net_lnet_tm_set_debug(struct m0_net_transfer_mc *tm, unsigned dbg)
Definition: lnet_main.c:992
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
M0_INTERNAL m0_bcount_t m0_bufvec_cursor_copy(struct m0_bufvec_cursor *dcur, struct m0_bufvec_cursor *scur, m0_bcount_t num_bytes)
Definition: vec.c:620
M0_INTERNAL m0_bcount_t m0_bufvec_cursor_step(const struct m0_bufvec_cursor *cur)
Definition: vec.c:581
#define SEND_RESP
Definition: ping.c:36
void c_a_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:444
uint32_t nb_max_receive_msgs
Definition: net.h:1502
#define m0_free0(pptr)
Definition: memory.h:77
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
Definition: chan.c:96
#define M0_ASSERT(cond)
m0_time_t nb_timeout
Definition: net.h:1387
M0_INTERNAL void m0_net_tm_fini(struct m0_net_transfer_mc *tm)
Definition: tm.c:204
struct m0_net_transfer_mc pc_tm
Definition: ping.h:64
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_INTERNAL int m0_net_lnet_ifaces_get(struct m0_net_domain *dom, char ***addrs)
Definition: lnet_main.c:955
M0_INTERNAL void m0_cond_fini(struct m0_cond *cond)
Definition: cond.c:46
void(* ntc_event_cb)(const struct m0_net_tm_event *ev)
Definition: net.h:752
static bool ping_net_clink_cb(struct m0_clink *cl)
Definition: ping.c:1024
struct m0_net_buffer * pwi_nb
Definition: ping.c:42
M0_INTERNAL void m0_bufvec_cursor_init(struct m0_bufvec_cursor *cur, const struct m0_bufvec *bvec)
Definition: vec.c:563
M0_INTERNAL void m0_cond_signal(struct m0_cond *cond)
Definition: cond.c:94
struct m0_atomic64 pc_errors
Definition: ping.h:86
char * fmt(const char *format,...) __attribute__((format(printf
struct m0_list_link pwi_link
Definition: ping.c:43
void * m0_alloc(size_t size)
Definition: memory.c:126
static struct m0_rpc_server_ctx sctx
Definition: console.c:88
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
static int nlx_ping_client_passive_send(struct nlx_ping_ctx *ctx, struct m0_net_end_point *server_ep, const char *data)
Definition: ping.c:1703
void nlx_ping_init()
Definition: ping.c:1916
Definition: xcode.h:73
M0_INTERNAL void m0_net_lnet_ifaces_put(struct m0_net_domain *dom, char ***addrs)
Definition: lnet_main.c:966
void nlx_ping_fini()
Definition: ping.c:1921
bool server_stop
Definition: ping.c:481
M0_INTERNAL void m0_net_desc_free(struct m0_net_buf_desc *desc)
Definition: net.c:87
M0_INTERNAL void m0_bitmap_set(struct m0_bitmap *map, size_t idx, bool val)
Definition: bitmap.c:139
static void set_bulk_timeout(struct nlx_ping_ctx *ctx, struct m0_net_buffer *nb)
Definition: ping.c:1277
#define DEF_RESPONSE
Definition: ping.c:34
struct m0_clink pc_net_clink
Definition: ping.h:82
M0_INTERNAL void m0_net_end_point_get(struct m0_net_end_point *ep)
Definition: ep.c:88
void c_p_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:414
m0_net_buffer_cb_proc_t nbc_cb[M0_NET_QT_NR]
Definition: net.h:1272
struct m0_atomic64 pc_retries
Definition: ping.h:87
M0_INTERNAL int m0_net_desc_copy(const struct m0_net_buf_desc *from_desc, struct m0_net_buf_desc *to_desc)
Definition: net.c:74
M0_INTERNAL bool m0_list_is_empty(const struct m0_list *head)
Definition: list.c:42
uint64_t m0_time_seconds(const m0_time_t time)
Definition: time.c:83
enum m0_net_queue_type pwi_type
Definition: ping.c:41
static struct m0_clink l[NR]
Definition: chan.c:37
struct m0_net_buffer_callbacks cbuf_cb
Definition: ping.c:483
M0_INTERNAL int m0_net_tm_stop(struct m0_net_transfer_mc *tm, bool abort)
Definition: tm.c:293
M0_INTERNAL void m0_bufvec_free_aligned(struct m0_bufvec *bufvec, unsigned shift)
Definition: vec.c:436
#define DEF_SEND
Definition: ping.c:35
M0_INTERNAL void m0_net_buffer_event_deliver_all(struct m0_net_transfer_mc *tm)
Definition: tm.c:397
static void server_event_ident(char *buf, size_t len, const char *ident, const struct m0_net_buffer_event *ev)
Definition: ping.c:697
#define PING_ERR(fmt,...)
Definition: ping.h:80
static struct nlx_ping_ctx * buffer_event_to_ping_ctx(const struct m0_net_buffer_event *ev)
Definition: ping.c:427
static uint32_t timeout
Definition: console.c:52
static int64_t m0_atomic64_get(const struct m0_atomic64 *a)
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
struct m0_net_buffer * ping_buf_get(struct ping_ctx *ctx)
Definition: ping.c:86
void m0_net_end_point_put(struct m0_net_end_point *ep)
Definition: ep.c:98
M0_INTERNAL void m0_net_buffer_event_notify(struct m0_net_transfer_mc *tm, struct m0_chan *chan)
Definition: tm.c:423
M0_INTERNAL void m0_cond_wait(struct m0_cond *cond)
Definition: cond.c:52
static uint8_t fail[DATA_UNIT_COUNT_MAX+PARITY_UNIT_COUNT_MAX]
static long long min(long long a, long long b)
Definition: crate.c:191
M0_INTERNAL void m0_net_buffer_deregister(struct m0_net_buffer *buf, struct m0_net_domain *dom)
Definition: buf.c:107
int m0_net_domain_init(struct m0_net_domain *dom, const struct m0_net_xprt *xprt)
Definition: domain.c:36
void nlx_ping_server_should_stop(struct nlx_ping_ctx *ctx)
Definition: ping.c:1485
M0_INTERNAL bool m0_net_buffer_event_pending(struct m0_net_transfer_mc *tm)
Definition: tm.c:409
static uint64_t ping_qs_total_retries
Definition: ping.c:53
M0_INTERNAL int m0_net_buffer_event_deliver_synchronously(struct m0_net_transfer_mc *tm)
Definition: tm.c:377
m0_net_queue_type
Definition: net.h:591
static uint64_t num_dels[M0_NET_QT_NR]
Definition: bulk_if.c:498
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL bool m0_chan_timedwait(struct m0_clink *link, const m0_time_t abs_timeout)
Definition: chan.c:349
static void nlx_ping_server_work(struct nlx_ping_ctx *ctx)
Definition: ping.c:1289
static char * bulk_size
void nlx_ping_client(struct nlx_ping_client_params *params)
Definition: ping.c:1837
M0_INTERNAL int m0_net_buffer_add(struct m0_net_buffer *buf, struct m0_net_transfer_mc *tm)
Definition: buf.c:247
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
void s_a_send_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:741
static uint32_t max32u(uint32_t a, uint32_t b)
Definition: arith.h:61
uint64_t nlx_ping_parse_uint64(const char *s)
Definition: ping.c:171
Definition: common.h:34
M0_INTERNAL bool m0_bitmap_get(const struct m0_bitmap *map, size_t idx)
Definition: bitmap.c:105
struct m0_clink pc_wq_clink
Definition: ping.h:79
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
static m0_bcount_t total_bytes[M0_NET_QT_NR]
Definition: bulk_if.c:499
M0_INTERNAL bool m0_cond_timedwait(struct m0_cond *cond, const m0_time_t abs_timeout)
Definition: cond.c:74
static struct m0_atomic64 s_msg_recv_counter
Definition: ping.c:715
int ping_init(struct ping_ctx *ctx)
Definition: ping.c:790
char * pm_str
Definition: ping.c:187
unsigned pm_passive_size
Definition: ping.c:342
struct m0_net_buf_desc nb_desc
Definition: net.h:1412
static void set_msg_timeout(struct nlx_ping_ctx *ctx, struct m0_net_buffer *nb)
Definition: ping.c:1265
ping_msg_type
Definition: ping.c:176
static uint32_t buf_size
Definition: ad.c:75
enum m0_net_tm_ev_type nte_type
Definition: net.h:691
static struct bulkio_params * bp
Definition: bulkio_ut.c:44
Definition: nucleus.c:42
static void ping_print_qstats(struct nlx_ping_ctx *ctx, struct m0_net_qstats *qp, bool accumulate)
Definition: ping.c:55
M0_INTERNAL void m0_list_link_init(struct m0_list_link *link)
Definition: list.c:169
#define out(...)
Definition: gen.c:41
static struct m0_list_link * m0_list_first(const struct m0_list *head)
Definition: list.h:191
static bool ping_tm_timedwait(struct nlx_ping_ctx *ctx, struct m0_clink *cl, m0_time_t timeout)
Definition: ping.c:1175
struct m0_fom_ops ops
Definition: io_foms.c:623
int encode_desc(struct m0_net_buffer *nb, bool send_desc, const struct m0_net_buf_desc *desc)
Definition: ping.c:151
static void ping_print_interfaces(struct nlx_ping_ctx *ctx)
Definition: ping.c:417
int num
Definition: bulk_if.c:54
void msg_free(struct ping_msg *msg)
Definition: ping.c:233
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
Definition: chan.c:112
void m0_free(void *data)
Definition: memory.c:146
Definition: mutex.h:47
static struct m0_addb2_source * s
Definition: consumer.c:39
int32_t rc
Definition: trigger_fop.h:47
M0_INTERNAL int m0_net_end_point_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const char *addr)
Definition: ep.c:56
#define m0_list_entry(link, type, member)
Definition: list.h:217
#define ARRAY_SIZE(a)
Definition: misc.h:45
void c_p_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:349
void ping_buf_put(struct ping_ctx *ctx, struct m0_net_buffer *nb)
Definition: ping.c:111
void s_a_recv_cb(const struct m0_net_buffer_event *ev)
Definition: ping.c:684
static void nlx_ping_server(struct nlx_ping_ctx *ctx)
Definition: ping.c:1382
uint64_t nqs_max_bytes
Definition: net.h:804
int32_t nte_status
Definition: net.h:715
#define PING_QSTATS_CLIENT_TOTAL(f)
static int64_t m0_atomic64_add_return(struct m0_atomic64 *a, int64_t d)
static uint64_t max64u(uint64_t a, uint64_t b)
Definition: arith.h:71
void nlx_ping_print_qstats_tm(struct nlx_ping_ctx *ctx, bool reset)
Definition: ping.c:136
Definition: vec.h:145
static void m0_atomic64_set(struct m0_atomic64 *a, int64_t num)
struct m0_net_end_point * nb_ep
Definition: net.h:1424
struct m0_net_transfer_mc * nte_tm
Definition: net.h:696
#define M0_IMPOSSIBLE(fmt,...)
int m0_nanosleep(const m0_time_t req, m0_time_t *rem)
Definition: ktime.c:73