Motr  M0
node_ping.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "lib/memory.h" /* M0_ALLOC_PTR */
24 #include "lib/misc.h" /* M0_SET0 */
25 #include "lib/time.h" /* m0_time_t */
26 #include "lib/errno.h" /* ENOMEM */
27 #include "lib/thread.h" /* M0_THREAD_INIT */
28 #include "lib/tlist.h" /* m0_tlist */
29 
30 #include "motr/magic.h" /* M0_NET_TEST_BS_LINK_MAGIC */
31 
32 #include "net/test/network.h" /* m0_net_test_network_ctx */
33 #include "net/test/node.h" /* m0_net_test_node_ctx */
34 #include "net/test/node_helper.h" /* m0_net_test_node_ctx */
35 
36 #include "net/test/node_ping.h"
37 
38 #define NET_TEST_MODULE_NAME node_ping
39 #include "net/test/debug.h" /* LOGD */
40 
41 
77 enum {
80 };
81 
83 struct buf_state {
85  uint64_t bs_link_magic;
92  int bs_errno;
103  uint64_t bs_seq;
110  uint64_t bs_cb_nr;
119 };
120 
121 M0_TL_DESCR_DEFINE(buf_state, "buf_state", static, struct buf_state, bs_link,
122  bs_link_magic, M0_NET_TEST_BS_LINK_MAGIC,
124 M0_TL_DEFINE(buf_state, static, struct buf_state);
125 
141  size_t npcc_msg_rt;
150  struct m0_tl npcc_to;
151 };
152 
156 };
157 
170  size_t npc_buf_nr;
201  union {
204  };
214 };
215 
217 static void sd_update(struct node_ping_ctx *ctx,
219  enum m0_net_test_nh_msg_status status,
220  enum m0_net_test_nh_msg_direction direction)
221 {
222  m0_net_test_nh_sd_update(&ctx->npc_nh, type, status, direction);
223 }
224 
225 static void node_ping_tm_event_cb(const struct m0_net_tm_event *ev)
226 {
227  /* nothing for now */
228 }
229 
230 static const struct m0_net_tm_callbacks node_ping_tm_cb = {
232 };
233 
234 static struct node_ping_ctx *
236 {
237  return container_of(net_ctx, struct node_ping_ctx, npc_net);
238 }
239 
241  uint32_t buf_index,
242  uint64_t seq)
243 {
244  struct m0_net_test_timestamp ts;
245  struct m0_net_buffer *buf;
246 
247  buf = m0_net_test_network_buf(net_ctx, M0_NET_TEST_BUF_PING, buf_index);
249  /* buffer size should be enough to hold timestamp */
251  &buf->nb_buffer, 0);
252  return ts.ntt_time;
253 }
254 
256  uint32_t buf_index,
257  struct m0_net_test_timestamp *ts)
258 {
259  struct m0_net_buffer *buf;
260  m0_bcount_t len;
261 
262  M0_PRE(net_ctx != NULL);
263  M0_PRE(ts != NULL);
264 
265  buf = m0_net_test_network_buf(net_ctx, M0_NET_TEST_BUF_PING, buf_index);
267  &buf->nb_buffer, 0);
268  return len != 0;
269 }
270 
271 static void node_ping_to_add(struct node_ping_ctx *ctx,
272  size_t buf_index)
273 {
274  LOGD("buf_index = %lu", buf_index);
275  buf_state_tlist_add_tail(&ctx->npc_client->npcc_to,
276  &ctx->npc_buf_state[buf_index]);
277 }
278 
279 static void node_ping_to_del(struct node_ping_ctx *ctx,
280  size_t buf_index)
281 {
282  LOGD("buf_index = %lu", buf_index);
283  buf_state_tlist_del(&ctx->npc_buf_state[buf_index]);
284 }
285 
286 static ssize_t node_ping_to_peek(struct node_ping_ctx *ctx)
287 {
288  struct buf_state *bs;
289  ssize_t buf_index;
290 
291  bs = buf_state_tlist_head(&ctx->npc_client->npcc_to);
292  buf_index = bs == NULL ? -1 : bs - ctx->npc_buf_state;
293 
294  M0_POST(buf_index == -1 ||
295  (buf_index >= 0 && buf_index < ctx->npc_buf_nr));
296  return buf_index;
297 }
298 
300  size_t server_index,
301  uint64_t seq)
302 {
303  size_t i;
304  size_t buf_index;
305  size_t concurrency = ctx->npc_client->npcc_concurrency;
306 
307  for (i = 0; i < concurrency; ++i) {
308  buf_index = concurrency * server_index + i;
309  if (ctx->npc_buf_state[buf_index].bs_seq == seq)
310  return buf_index;
311  }
312  return -1;
313 }
314 
316  size_t buf_index,
317  enum m0_net_queue_type q,
318  struct m0_net_end_point *ep,
319  size_t ep_index)
320 {
321  struct m0_net_test_network_ctx *nctx = &ctx->npc_net;
322  struct buf_state *bs = &ctx->npc_buf_state[buf_index];
323  bool decreased;
324 
326 
327  decreased = m0_semaphore_trydown(&ctx->npc_buf_q_sem);
328  if (!decreased) {
329  /* worker thread is stopping */
330  M0_ASSERT(ctx->npc_buf_rb_done);
331  bs->bs_errno = -EWOULDBLOCK;
332  return;
333  }
334  LOGD("q = %d, buf_index = %lu, ep_index = %lu, %s",
335  q, buf_index, ep_index, ctx->npc_net.ntc_tm->ntm_ep->nep_addr);
336  if (q == M0_NET_QT_MSG_SEND) {
337  LOGD(" => %s", (ep == NULL ?
338  m0_net_test_network_ep(&ctx->npc_net, ep_index) :
339  ep)->nep_addr);
340  } else {
341  LOGD(" <= ");
342  }
343  bs->bs_errno = (bs->bs_q = q) == M0_NET_QT_MSG_RECV ?
344  m0_net_test_network_msg_recv(nctx, buf_index) : ep == NULL ?
345  m0_net_test_network_msg_send(nctx, buf_index, ep_index) :
346  m0_net_test_network_msg_send_ep(nctx, buf_index, ep);
347  if (bs->bs_errno != 0) {
348  m0_net_test_ringbuf_push(&ctx->npc_buf_rb, buf_index);
349  m0_semaphore_up(&ctx->npc_buf_q_sem);
350  m0_semaphore_up(&ctx->npc_buf_rb_sem);
351  }
352 }
353 
355  size_t buf_index)
356 {
358 }
359 
361  size_t buf_index)
362 {
363  struct node_ping_client_ctx *cctx;
364  struct buf_state *bs;
365  size_t ep_index;
366  m0_time_t begin;
367  bool transfer_next;
368 
369  M0_PRE(ctx != NULL && ctx->npc_client != NULL);
370  M0_PRE(buf_index < ctx->npc_buf_nr / 2);
371 
372  bs = &ctx->npc_buf_state[buf_index];
373  cctx = ctx->npc_client;
374  ep_index = buf_index / cctx->npcc_concurrency;
375  /* check for max number of messages */
376  transfer_next = m0_net_test_nh_transfer_next(&ctx->npc_nh);
377  if (!transfer_next)
378  return;
379  /* put timestamp and sequence number */
380  bs->bs_seq = ctx->npc_nh.ntnh_transfers_started_nr;
381  bs->bs_cb_nr = 0;
382  begin = node_ping_timestamp_put(&ctx->npc_net, buf_index, bs->bs_seq);
383  bs->bs_deadline = m0_time_add(begin, ctx->npc_buf_send_timeout);
384  /* add message to send queue */
386  NULL, ep_index);
387  if (bs->bs_errno != 0)
389 }
390 
392  size_t buf_index)
393 {
394  struct buf_state *bs = &ctx->npc_buf_state[buf_index];
395 
396  M0_PRE(bs->bs_cb_nr == 0 || bs->bs_cb_nr == 1);
397 
398  ++bs->bs_cb_nr;
399  if (bs->bs_cb_nr != 2) {
400  node_ping_to_add(ctx, buf_index);
401  } else {
402  node_ping_to_del(ctx, buf_index);
403  /* enqueue recv and send buffers */
405  node_ping_client_send(ctx, buf_index);
406  }
407 }
408 
410  struct buf_state *bs,
411  size_t buf_index)
412 {
413  struct m0_net_test_timestamp ts;
414  struct buf_state *bs_send = NULL;
415  ssize_t server_index;
416  ssize_t buf_index_send;
417  bool decoded;
418  m0_time_t rtt;
419 
420  M0_PRE(ctx != NULL && ctx->npc_client != NULL);
421  M0_PRE(buf_index >= ctx->npc_buf_nr / 2 &&
422  buf_index < ctx->npc_buf_nr);
423  M0_PRE(bs != NULL);
424 
425  /* check buffer length and offset */
426  if (bs->bs_ev.nbe_length != ctx->npc_buf_size ||
427  bs->bs_ev.nbe_offset != 0)
428  goto bad_buf;
429  /* search for test server index */
430  server_index = m0_net_test_network_ep_search(&ctx->npc_net,
431  bs->bs_ev.nbe_ep->nep_addr);
432  if (server_index == -1)
433  goto bad_buf;
434  /* decode buffer */
435  decoded = node_ping_timestamp_get(&ctx->npc_net, buf_index, &ts);
436  if (!decoded)
437  goto bad_buf;
438  /* check time in received buffer */
439  if (bs->bs_time < ts.ntt_time)
440  goto bad_buf;
441  /* search sequence number */
442  buf_index_send = node_ping_client_search_seq(ctx, server_index,
443  ts.ntt_seq);
444  if (buf_index_send == -1)
445  goto bad_buf;
446  bs_send = &ctx->npc_buf_state[buf_index_send];
447  bs_send->bs_index_pair = buf_index;
448  bs->bs_index_pair = buf_index_send;
449  /* successfully received message */
451  /* update RTT statistics */
452  rtt = m0_time_sub(bs->bs_time, ts.ntt_time);
453  m0_net_test_nh_sd_update_rtt(&ctx->npc_nh, rtt);
454  goto good_buf;
455 bad_buf:
457 good_buf:
458  /* enqueue recv buffer */
459  if (bs_send == NULL) {
460  node_ping_buf_enqueue_recv(ctx, buf_index);
461  }
462  return bs_send != NULL;
463 }
464 
465 static void node_ping_msg_cb(struct m0_net_test_network_ctx *net_ctx,
466  uint32_t buf_index,
467  enum m0_net_queue_type q,
468  const struct m0_net_buffer_event *ev)
469 {
470  struct node_ping_ctx *ctx;
471  struct buf_state *bs;
472  m0_time_t now = m0_time_now();
473 
475 
476  ctx = node_ping_ctx_from_net_ctx(net_ctx);
477  bs = &ctx->npc_buf_state[buf_index];
478 
479  LOGD("role = %d, buf_index = %u, nbe_status = %d, q = %d",
480  ctx->npc_nh.ntnh_role, buf_index, ev->nbe_status, q);
481  LOGD(", ev->nbe_length = %lu", (long unsigned) ev->nbe_length);
482 
483  if (q == M0_NET_QT_MSG_RECV && ev->nbe_status == 0)
484  LOGD(", ev->nbe_ep->nep_addr = %s", ev->nbe_ep->nep_addr);
485 
486  if (ev->nbe_status == -ECANCELED) {
487  m0_semaphore_up(&ctx->npc_buf_q_sem);
488  return;
489  }
490  /* save buffer event */
491  bs->bs_ev = *ev;
492  /* save endpoint from successfully received buffer */
493  if (ev->nbe_status == 0 &&
496  bs->bs_time = now;
497  bs->bs_q = ev->nbe_buffer->nb_qtype;
498 
499  m0_net_test_ringbuf_push(&ctx->npc_buf_rb, buf_index);
500  m0_semaphore_up(&ctx->npc_buf_q_sem);
501  m0_semaphore_up(&ctx->npc_buf_rb_sem);
502 }
503 
505  const uint32_t buf_index,
506  enum m0_net_queue_type q,
507  const struct m0_net_buffer_event *ev)
508 {
509  M0_IMPOSSIBLE("Impossible network bulk callback: "
510  "net-test ping node can't have it.");
511 }
512 
514  .ntnbc_cb = {
521  }
522 };
523 
525 {
526  struct buf_state *bs;
527  m0_time_t now = m0_time_now();
528  ssize_t buf_index;
529 
530  while ((buf_index = node_ping_to_peek(ctx)) != -1) {
531  bs = &ctx->npc_buf_state[buf_index];
532  if (bs->bs_deadline > now)
533  break;
534  /* message timed out */
535  node_ping_to_del(ctx, buf_index);
536  ++ctx->npc_client->npcc_msg_rt;
537  node_ping_client_send(ctx, buf_index);
538  }
539 }
540 
542  struct buf_state *bs,
543  size_t buf_index)
544 {
545  bool good_buf;
546 
547  M0_PRE(ctx != NULL);
548  M0_PRE(bs != NULL);
549 
550  if (bs->bs_q == M0_NET_QT_MSG_SEND) {
551  if (bs->bs_errno != 0 || bs->bs_ev.nbe_status != 0) {
552  /* try to send again */
553  node_ping_client_send(ctx, buf_index);
554  } else {
555  node_ping_client_cb2(ctx, buf_index);
556  }
557  } else {
558  if (bs->bs_errno != 0 || bs->bs_ev.nbe_status != 0) {
559  /* try to receive again */
560  node_ping_buf_enqueue_recv(ctx, buf_index);
561  } else {
562  /* buffer was successfully received from test server */
563  good_buf = node_ping_client_recv_cb(ctx, bs, buf_index);
564  if (good_buf)
566  }
567  }
568 }
569 
571  struct buf_state *bs,
572  size_t buf_index)
573 {
574  if (bs->bs_q == M0_NET_QT_MSG_RECV && bs->bs_errno == 0 &&
575  bs->bs_ev.nbe_status == 0) {
576  /* send back to test client */
578  bs->bs_ev.nbe_ep, 0);
579  } else {
580  /* add to recv queue */
581  node_ping_buf_enqueue_recv(ctx, buf_index);
582  }
583 }
584 
585 static void node_ping_worker(struct node_ping_ctx *ctx)
586 {
587  struct buf_state *bs;
588  size_t buf_index;
589  bool failed;
590  size_t i;
591  m0_time_t to_check_interval;
592  m0_time_t deadline;
593  struct m0_net_end_point *ep;
594  bool rb_is_empty;
595  ssize_t to_index;
596 
597  M0_PRE(ctx != NULL);
598 
599  to_check_interval = M0_MKTIME(TO_CHECK_INTERVAL / 1000,
600  TO_CHECK_INTERVAL * 1000000);
601  while (1) {
602  /* get buffer index from ringbuf */
603  deadline = m0_time_add(m0_time_now(), to_check_interval);
604  rb_is_empty = !m0_semaphore_timeddown(&ctx->npc_buf_rb_sem,
605  deadline);
606  /* check timeout list */
607  if (ctx->npc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT)
609  if (rb_is_empty)
610  continue;
611  if (ctx->npc_buf_rb_done)
612  break;
613  buf_index = m0_net_test_ringbuf_pop(&ctx->npc_buf_rb);
614  bs = &ctx->npc_buf_state[buf_index];
615  LOGD("POP from ringbuf: %lu, role = %d",
616  buf_index, ctx->npc_nh.ntnh_role);
617  /* update total/failed stats */
618  failed = bs->bs_errno != 0 || bs->bs_ev.nbe_status != 0;
619  sd_update(ctx, MT_MSG, failed ? MS_FAILED : MS_SUCCESS,
621  ep = bs->bs_errno == 0 && bs->bs_ev.nbe_status == 0 &&
622  bs->bs_q == M0_NET_QT_MSG_RECV ? bs->bs_ev.nbe_ep : NULL;
623  /* handle buffer */
624  if (ctx->npc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT)
625  node_ping_client_handle(ctx, bs, buf_index);
626  else
627  node_ping_server_handle(ctx, bs, buf_index);
628  if (ep != NULL)
630  /* update copy of statistics */
632  }
633  /* dequeue all buffers */
634  for (i = 0; i < ctx->npc_buf_nr; ++i) {
637  }
638  /* wait for buffer callbacks */
639  for (i = 0; i < ctx->npc_buf_nr; ++i)
640  m0_semaphore_down(&ctx->npc_buf_q_sem);
641  /* clear timeouts list for the test client */
642  if (ctx->npc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
643  while ((to_index = node_ping_to_peek(ctx)) != -1)
644  node_ping_to_del(ctx, to_index);
645  }
646  /*
647  * Clear ringbuf, put() every saved endpoint.
648  * Use !m0_net_test_ringbuf_is_empty(&ctx->npc_buf_rb) instead of
649  * m0_semaphore_trydown(&ctx->npc_buf_rb_sem) because
650  * ctx->npc_buf_rb_sem may not be up()'ed in buffer callback.
651  */
652  while (!m0_net_test_ringbuf_is_empty(&ctx->npc_buf_rb)) {
653  buf_index = m0_net_test_ringbuf_pop(&ctx->npc_buf_rb);
654  bs = &ctx->npc_buf_state[buf_index];
655  if (bs->bs_q == M0_NET_QT_MSG_RECV &&
656  bs->bs_errno == 0 && bs->bs_ev.nbe_status == 0)
658  }
659 }
660 
661 static void node_ping_rb_fill(struct node_ping_ctx *ctx)
662 {
663  size_t i;
664  size_t half_buf = ctx->npc_buf_nr / 2;
665 
666  if (ctx->npc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
667  M0_ASSERT(ctx->npc_buf_nr % 2 == 0);
668  /* add recv buffers */
669  for (i = 0; i < half_buf; ++i)
670  node_ping_buf_enqueue_recv(ctx, half_buf + i);
671  /* add send buffers */
672  for (i = 0; i < half_buf; ++i)
674  } else {
675  for (i = 0; i < ctx->npc_buf_nr; ++i)
677  }
678 }
679 
681  const struct m0_net_test_cmd *cmd)
682 {
683  struct m0_net_test_network_cfg net_cfg;
684  int rc;
685  int i;
686 
687  if (cmd == NULL) {
688  rc = 0;
689  if (ctx->npc_nh.ntnh_test_initialized)
690  goto fini;
691  else
692  goto exit;
693  }
694 
695  rc = m0_semaphore_init(&ctx->npc_buf_q_sem, ctx->npc_buf_nr);
696  if (rc != 0)
697  goto exit;
698  rc = m0_semaphore_init(&ctx->npc_buf_rb_sem, 0);
699  if (rc != 0)
700  goto free_buf_q_sem;
701  rc = m0_net_test_ringbuf_init(&ctx->npc_buf_rb, ctx->npc_buf_nr);
702  if (rc != 0)
703  goto free_buf_rb_sem;
704  M0_ALLOC_ARR(ctx->npc_buf_state, ctx->npc_buf_nr);
705  if (ctx->npc_buf_state == NULL)
706  goto free_buf_rb;
707 
708  /* initialize network context */
709  M0_SET0(&net_cfg);
710  net_cfg.ntncfg_tm_cb = node_ping_tm_cb;
712  net_cfg.ntncfg_buf_size_ping = ctx->npc_buf_size;
713  net_cfg.ntncfg_buf_ping_nr = ctx->npc_buf_nr;
714  net_cfg.ntncfg_ep_max = cmd->ntc_init.ntci_ep.ntsl_nr;
717  ctx->npc_buf_send_timeout;
718  rc = m0_net_test_network_ctx_init(&ctx->npc_net, &net_cfg,
719  cmd->ntc_init.ntci_tm_ep);
720  if (rc != 0)
721  goto free_buf_state;
722  /* add test node endpoints to the network context endpoint list */
724  &cmd->ntc_init.ntci_ep);
725  if (rc != 0)
726  goto fini;
727  if (ctx->npc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
728  buf_state_tlist_init(&ctx->npc_client->npcc_to);
729  for (i = 0; i < ctx->npc_buf_nr; ++i)
730  buf_state_tlink_init(&ctx->npc_buf_state[i]);
731  }
732  rc = 0;
733  goto exit;
734 fini:
735  if (ctx->npc_nh.ntnh_role == M0_NET_TEST_ROLE_CLIENT) {
736  for (i = 0; i < ctx->npc_buf_nr; ++i)
737  buf_state_tlink_fini(&ctx->npc_buf_state[i]);
738  buf_state_tlist_fini(&ctx->npc_client->npcc_to);
739  }
741 free_buf_state:
742  m0_free(ctx->npc_buf_state);
743 free_buf_rb:
744  m0_net_test_ringbuf_fini(&ctx->npc_buf_rb);
745 free_buf_rb_sem:
746  m0_semaphore_fini(&ctx->npc_buf_rb_sem);
747 free_buf_q_sem:
748  m0_semaphore_fini(&ctx->npc_buf_q_sem);
749 exit:
750  return rc;
751 }
752 
753 static void *node_ping_initfini(void *ctx_, struct m0_net_test_service *svc)
754 {
755  struct node_ping_ctx *ctx;
756  int rc;
757 
758  M0_PRE(equi(ctx_ == NULL, svc != NULL));
759 
760  if (svc != NULL) {
761  M0_ALLOC_PTR(ctx);
762  if (ctx != NULL) {
763  ctx->npc_svc = svc;
764  ctx->npc_nh.ntnh_test_initialized = false;
765  }
766  } else {
767  ctx = ctx_;
769  M0_ASSERT(rc == 0);
770  m0_free(ctx);
771  }
772  return svc != NULL ? ctx : NULL;
773 }
774 
776 {
777  return node_ping_initfini(NULL, svc);
778 }
779 
780 static void node_ping_fini(void *ctx_)
781 {
782  void *rc = node_ping_initfini(ctx_, NULL);
783  M0_POST(rc == NULL);
784 }
785 
786 static int node_ping_step(void *ctx_)
787 {
788  return 0;
789 }
790 
791 static int node_ping_cmd_init(void *ctx_,
792  const struct m0_net_test_cmd *cmd,
793  struct m0_net_test_cmd *reply)
794 {
795  const struct m0_net_test_cmd_init *icmd;
796  struct node_ping_ctx *ctx = ctx_;
797  int rc;
798  bool role_client;
799 
800  M0_PRE(ctx != NULL);
801  M0_PRE(cmd != NULL);
802  M0_PRE(reply != NULL);
803 
804  icmd = &cmd->ntc_init;
805  role_client = icmd->ntci_role == M0_NET_TEST_ROLE_CLIENT;
806 
807  /* ep wasn't recognized */
808  if (cmd->ntc_ep_index == -1) {
809  rc = -ENOENT;
810  goto reply;
811  }
812  /* network context already initialized */
813  if (ctx->npc_nh.ntnh_test_initialized) {
814  rc = -EALREADY;
815  goto reply;
816  }
817  /* check command type */
819  /* parse INIT command */
820  m0_net_test_nh_init(&ctx->npc_nh, icmd);
821  ctx->npc_buf_size = icmd->ntci_msg_size;
822  ctx->npc_buf_send_timeout = icmd->ntci_buf_send_timeout;
823 
824  ctx->npc_buf_nr = icmd->ntci_msg_concurrency;
825  ctx->npc_buf_nr *= role_client ? 2 * icmd->ntci_ep.ntsl_nr : 1;
826 
827  ctx->npc_client = role_client ? &ctx->npc__client : NULL;
828  ctx->npc_server = !role_client ? &ctx->npc__server : NULL;
829 
830  if (role_client) {
831  M0_SET0(ctx->npc_client);
832  ctx->npc_client->npcc_msg_rt_max = icmd->ntci_msg_nr;
833  ctx->npc_client->npcc_concurrency = icmd->ntci_msg_concurrency;
834  }
835 
836  /* do sanity check */
837  rc = 0;
838  if (ctx->npc_buf_size < 1 || ctx->npc_buf_nr < 1 ||
839  equi(ctx->npc_client == NULL, ctx->npc_server == NULL))
840  rc = -EINVAL;
841  /* init node_ping_ctx fields */
842  if (rc == 0)
844  if (rc != 0) {
845  /* change service state */
848  }
849 reply:
850  /* fill reply */
851  reply->ntc_type = M0_NET_TEST_CMD_INIT_DONE;
852  reply->ntc_done.ntcd_errno = rc;
853  return rc;
854 }
855 
856 static int node_ping_cmd_start(void *ctx_,
857  const struct m0_net_test_cmd *cmd,
858  struct m0_net_test_cmd *reply)
859 {
860  struct m0_net_test_cmd_status_data *sd;
861  struct node_ping_ctx *ctx = ctx_;
862  int rc;
863  const m0_time_t _1s = M0_MKTIME(1, 0);
864 
865  M0_PRE(ctx != NULL);
866  M0_PRE(cmd != NULL);
867  M0_PRE(reply != NULL);
868 
869  sd = &ctx->npc_nh.ntnh_sd;
870  M0_SET0(sd);
871 
872  /* fill test start time */
874  /* initialize stats */
877  /* add buffer indexes to ringbuf */
879  /* start test */
880  ctx->npc_buf_rb_done = false;
881  rc = M0_THREAD_INIT(&ctx->npc_thread, struct node_ping_ctx *, NULL,
882  &node_ping_worker, ctx, "net-test ping");
883  if (rc != 0) {
884  /* change service state */
887  }
888  /* fill reply */
889  reply->ntc_type = M0_NET_TEST_CMD_START_DONE;
890  reply->ntc_done.ntcd_errno = rc;
891  return rc;
892 }
893 
894 static int node_ping_cmd_stop(void *ctx_,
895  const struct m0_net_test_cmd *cmd,
896  struct m0_net_test_cmd *reply)
897 {
898  struct node_ping_ctx *ctx = ctx_;
899  int rc;
900 
901  M0_PRE(ctx != NULL);
902  M0_PRE(cmd != NULL);
903  M0_PRE(reply != NULL);
904 
905  /* stop worker thread */
906  ctx->npc_buf_rb_done = true;
907  m0_semaphore_up(&ctx->npc_buf_rb_sem);
908  rc = m0_thread_join(&ctx->npc_thread);
909  M0_ASSERT(rc == 0);
910  m0_thread_fini(&ctx->npc_thread);
911  /* change service state */
914  /* fill reply */
915  reply->ntc_type = M0_NET_TEST_CMD_STOP_DONE;
916  reply->ntc_done.ntcd_errno = 0;
917  return 0;
918 }
919 
920 static int node_ping_cmd_status(void *ctx_,
921  const struct m0_net_test_cmd *cmd,
922  struct m0_net_test_cmd *reply)
923 {
924  struct node_ping_ctx *ctx = ctx_;
925 
926  M0_PRE(ctx != NULL);
927 
928  m0_net_test_nh_cmd_status(&ctx->npc_nh, cmd, reply);
929 
930  return 0;
931 }
932 
934  {
936  .ntsch_handler = node_ping_cmd_init,
937  },
938  {
939  .ntsch_type = M0_NET_TEST_CMD_START,
940  .ntsch_handler = node_ping_cmd_start,
941  },
942  {
943  .ntsch_type = M0_NET_TEST_CMD_STOP,
944  .ntsch_handler = node_ping_cmd_stop,
945  },
946  {
947  .ntsch_type = M0_NET_TEST_CMD_STATUS,
948  .ntsch_handler = node_ping_cmd_status,
949  },
950 };
951 
954  .ntso_fini = node_ping_fini,
955  .ntso_step = node_ping_step,
956  .ntso_cmd_handler = node_ping_cmd_handler,
957  .ntso_cmd_handler_nr = ARRAY_SIZE(node_ping_cmd_handler),
958 };
959 
960 #undef NET_TEST_MODULE_NAME
961 
966 /*
967  * Local variables:
968  * c-indentation-style: "K&R"
969  * c-basic-offset: 8
970  * tab-width: 8
971  * fill-column: 79
972  * scroll-step: 1
973  * End:
974  */
static int node_ping_cmd_stop(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
Definition: node_ping.c:894
static int node_ping_cmd_start(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
Definition: node_ping.c:856
void m0_net_test_network_buffer_dequeue(struct m0_net_test_network_ctx *ctx, enum m0_net_test_network_buf_type buf_type, int32_t buf_index)
Definition: network.c:500
enum m0_net_test_cmd_type ntsch_type
Definition: service.h:52
struct m0_net_test_network_timeouts m0_net_test_network_timeouts_never(void)
Definition: network.c:803
uint64_t bs_link_magic
Definition: node_ping.c:85
size_t bs_index_pair
Definition: node_ping.c:116
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
void m0_net_test_timestamp_init(struct m0_net_test_timestamp *t, uint64_t seq)
Definition: stats.c:185
m0_net_test_nh_msg_type
Definition: node_helper.h:48
struct m0_net_test_slist ntci_ep
Definition: commands.h:157
uint64_t bs_seq
Definition: node_ping.c:103
m0_time_t ntt_time
Definition: stats.h:209
uint32_t ntncfg_buf_ping_nr
Definition: network.h:86
static void node_ping_to_del(struct node_ping_ctx *ctx, size_t buf_index)
Definition: node_ping.c:279
static struct m0_semaphore q
Definition: rwlock.c:55
#define NULL
Definition: misc.h:38
int m0_net_test_network_msg_recv(struct m0_net_test_network_ctx *ctx, uint32_t buf_ping_index)
Definition: network.c:469
int m0_net_test_network_ctx_init(struct m0_net_test_network_ctx *ctx, struct m0_net_test_network_cfg *cfg, const char *tm_addr)
Definition: network.c:367
uint64_t ntt_seq
Definition: stats.h:211
struct m0_net_test_mps ntcsd_mps_recv
Definition: commands.h:186
struct m0_net_test_service * npc_svc
Definition: node_ping.c:165
static void node_ping_to_add(struct node_ping_ctx *ctx, size_t buf_index)
Definition: node_ping.c:271
size_t npc_buf_nr
Definition: node_ping.c:170
static bool node_ping_client_recv_cb(struct node_ping_ctx *ctx, struct buf_state *bs, size_t buf_index)
Definition: node_ping.c:409
static void node_ping_client_cb2(struct node_ping_ctx *ctx, size_t buf_index)
Definition: node_ping.c:391
#define ergo(a, b)
Definition: misc.h:293
M0_INTERNAL bool m0_semaphore_trydown(struct m0_semaphore *semaphore)
Definition: semaphore.c:60
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
void m0_net_test_service_state_change(struct m0_net_test_service *svc, enum m0_net_test_service_state state)
Definition: service.c:147
static void * node_ping_init(struct m0_net_test_service *svc)
Definition: node_ping.c:775
static void sd_update(struct node_ping_ctx *ctx, enum m0_net_test_nh_msg_type type, enum m0_net_test_nh_msg_status status, enum m0_net_test_nh_msg_direction direction)
Definition: node_ping.c:217
struct m0_net_end_point * m0_net_test_network_ep(struct m0_net_test_network_ctx *ctx, size_t ep_index)
Definition: network.c:783
uint64_t m0_time_t
Definition: time.h:37
M0_INTERNAL bool m0_semaphore_timeddown(struct m0_semaphore *semaphore, const m0_time_t abs_timeout)
Definition: semaphore.c:75
void m0_net_test_nh_sd_copy_locked(struct m0_net_test_nh *nh)
Definition: node_helper.c:63
m0_net_test_network_buffer_cb_proc_t ntnbc_cb[M0_NET_QT_NR]
Definition: network.h:66
int m0_net_test_ringbuf_init(struct m0_net_test_ringbuf *rb, size_t size)
Definition: ringbuf.c:36
struct m0_net_test_ringbuf npc_buf_rb
Definition: node_ping.c:192
static ssize_t node_ping_to_peek(struct node_ping_ctx *ctx)
Definition: node_ping.c:286
void m0_net_test_nh_sd_update_rtt(struct m0_net_test_nh *nh, m0_time_t rtt)
Definition: node_helper.c:129
struct node_ping_server_ctx * npc_server
Definition: node_ping.c:213
static const struct m0_net_tm_callbacks node_ping_tm_cb
Definition: node_ping.c:230
static struct m0_rpc_client_ctx cctx
Definition: rconfc.c:69
int m0_net_test_network_msg_send_ep(struct m0_net_test_network_ctx *ctx, uint32_t buf_ping_index, struct m0_net_end_point *ep)
Definition: network.c:442
static struct node_ping_ctx * node_ping_ctx_from_net_ctx(struct m0_net_test_network_ctx *net_ctx)
Definition: node_ping.c:235
uint64_t m0_bcount_t
Definition: types.h:77
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
struct m0_semaphore npc_buf_rb_sem
Definition: node_ping.c:194
M0_TL_DEFINE(buf_state, static, struct buf_state)
static int void * buf
Definition: dir.c:1019
struct m0_thread npc_thread
Definition: node_ping.c:200
static int node_ping_test_init_fini(struct node_ping_ctx *ctx, const struct m0_net_test_cmd *cmd)
Definition: node_ping.c:680
const char * nep_addr
Definition: net.h:503
m0_bindex_t nbe_offset
Definition: net.h:1238
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
m0_bcount_t nbe_length
Definition: net.h:1226
struct m0_net_buffer * nbe_buffer
Definition: net.h:1194
m0_time_t ntnt_timeout[M0_NET_QT_NR]
Definition: network.h:71
struct m0_semaphore npc_buf_q_sem
Definition: node_ping.c:190
struct m0_net_end_point * nbe_ep
Definition: net.h:1251
Definition: sock.c:887
static int node_ping_step(void *ctx_)
Definition: node_ping.c:786
m0_bcount_t ntci_msg_size
Definition: commands.h:118
uint64_t bs_cb_nr
Definition: node_ping.c:110
m0_net_test_nh_msg_status
Definition: node_helper.h:42
m0_bcount_t m0_net_test_timestamp_serialize(enum m0_net_test_serialize_op op, struct m0_net_test_timestamp *t, struct m0_bufvec *bv, m0_bcount_t bv_offset)
Definition: stats.c:200
#define equi(a, b)
Definition: misc.h:297
Definition: sock.c:754
int bs_errno
Definition: node_ping.c:92
struct m0_net_test_nh npc_nh
Definition: node_ping.c:161
m0_net_test_nh_msg_direction
Definition: node_helper.h:55
static m0_time_t node_ping_timestamp_put(struct m0_net_test_network_ctx *net_ctx, uint32_t buf_index, uint64_t seq)
Definition: node_ping.c:240
m0_bcount_t npc_buf_size
Definition: node_ping.c:172
static int node_ping_cmd_init(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
Definition: node_ping.c:791
int i
Definition: dir.c:1033
size_t m0_net_test_ringbuf_pop(struct m0_net_test_ringbuf *rb)
Definition: ringbuf.c:88
struct m0_net_buffer_event bs_ev
Definition: node_ping.c:99
int32_t nbe_status
Definition: net.h:1218
#define LOGD(...)
Definition: tx_regmap.c:37
m0_time_t ntci_buf_send_timeout
Definition: commands.h:151
struct node_ping_server_ctx npc__server
Definition: node_ping.c:203
bool m0_net_test_nh_transfer_next(struct m0_net_test_nh *nh)
Definition: node_helper.c:136
M0_TL_DESCR_DEFINE(buf_state, "buf_state", static, struct buf_state, bs_link, bs_link_magic, M0_NET_TEST_BS_LINK_MAGIC, M0_NET_TEST_BS_HEAD_MAGIC)
bool npc_test_initialized
Definition: node_ping.c:176
static struct m0_net_test_service_cmd_handler node_ping_cmd_handler[]
Definition: node_ping.c:933
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
void m0_net_test_ringbuf_fini(struct m0_net_test_ringbuf *rb)
Definition: ringbuf.c:52
struct m0_net_test_cmd_init ntc_init
Definition: commands.h:208
enum m0_net_test_role ntci_role
Definition: commands.h:104
struct m0_net_test_network_buffer_callbacks ntncfg_buf_cb
Definition: network.h:82
#define M0_ASSERT(cond)
static void node_ping_buf_enqueue_recv(struct node_ping_ctx *ctx, size_t buf_index)
Definition: node_ping.c:354
static void node_ping_to_check(struct node_ping_ctx *ctx)
Definition: node_ping.c:524
void(* ntc_event_cb)(const struct m0_net_tm_event *ev)
Definition: net.h:752
m0_time_t m0_time_now(void)
Definition: time.c:134
bool npc_buf_rb_done
Definition: node_ping.c:196
Definition: tlist.h:251
struct m0_net_test_mps ntcsd_mps_send
Definition: commands.h:184
static void node_ping_tm_event_cb(const struct m0_net_tm_event *ev)
Definition: node_ping.c:225
void m0_net_test_nh_cmd_status(struct m0_net_test_nh *nh, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
Definition: node_helper.c:146
void m0_thread_fini(struct m0_thread *q)
Definition: thread.c:92
void m0_net_test_ringbuf_push(struct m0_net_test_ringbuf *rb, size_t value)
Definition: ringbuf.c:77
static void node_ping_rb_fill(struct node_ping_ctx *ctx)
Definition: node_ping.c:661
static void node_ping_cb_impossible(struct m0_net_test_network_ctx *ctx, const uint32_t buf_index, enum m0_net_queue_type q, const struct m0_net_buffer_event *ev)
Definition: node_ping.c:504
void m0_net_test_network_ctx_fini(struct m0_net_test_network_ctx *ctx)
Definition: network.c:374
static void node_ping_buf_enqueue(struct node_ping_ctx *ctx, size_t buf_index, enum m0_net_queue_type q, struct m0_net_end_point *ep, size_t ep_index)
Definition: node_ping.c:315
size_t ntsl_nr
Definition: slist.h:49
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
void m0_net_test_nh_sd_update(struct m0_net_test_nh *nh, enum m0_net_test_nh_msg_type type, enum m0_net_test_nh_msg_status status, enum m0_net_test_nh_msg_direction direction)
Definition: node_helper.c:83
#define M0_POST(cond)
m0_time_t npc_buf_send_timeout
Definition: node_ping.c:174
static int node_ping_cmd_status(void *ctx_, const struct m0_net_test_cmd *cmd, struct m0_net_test_cmd *reply)
Definition: node_ping.c:920
M0_INTERNAL void m0_net_end_point_get(struct m0_net_end_point *ep)
Definition: ep.c:88
uint64_t ntci_msg_nr
Definition: commands.h:112
uint64_t ntci_msg_concurrency
Definition: commands.h:144
int m0_net_test_network_msg_send(struct m0_net_test_network_ctx *ctx, uint32_t buf_ping_index, uint32_t ep_index)
Definition: network.c:457
static void node_ping_msg_cb(struct m0_net_test_network_ctx *net_ctx, uint32_t buf_index, enum m0_net_queue_type q, const struct m0_net_buffer_event *ev)
Definition: node_ping.c:465
m0_time_t m0_time_add(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:47
static struct fdmi_ctx ctx
Definition: main.c:80
static struct m0_net_test_network_buffer_callbacks node_ping_buf_cb
Definition: node_ping.c:513
enum m0_net_test_type ntci_type
Definition: commands.h:106
ssize_t ntc_ep_index
Definition: commands.h:218
static bool node_ping_timestamp_get(struct m0_net_test_network_ctx *net_ctx, uint32_t buf_index, struct m0_net_test_timestamp *ts)
Definition: node_ping.c:255
void m0_net_test_mps_init(struct m0_net_test_mps *mps, unsigned long messages, m0_time_t timestamp, m0_time_t interval)
Definition: stats.c:227
struct m0_tl npcc_to
Definition: node_ping.c:150
void m0_net_end_point_put(struct m0_net_end_point *ep)
Definition: ep.c:98
struct m0_net_buffer * m0_net_test_network_buf(struct m0_net_test_network_ctx *ctx, enum m0_net_test_network_buf_type buf_type, uint32_t buf_index)
Definition: network.c:725
static void node_ping_client_send(struct node_ping_ctx *ctx, size_t buf_index)
Definition: node_ping.c:360
static ssize_t node_ping_client_search_seq(struct node_ping_ctx *ctx, size_t server_index, uint64_t seq)
Definition: node_ping.c:299
static void node_ping_client_handle(struct node_ping_ctx *ctx, struct buf_state *bs, size_t buf_index)
Definition: node_ping.c:541
char * ep
Definition: sw.h:132
uint32_t ntncfg_ep_max
Definition: network.h:92
m0_net_queue_type
Definition: net.h:591
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
m0_bcount_t ntncfg_buf_size_ping
Definition: network.h:84
struct buf_state * npc_buf_state
Definition: node_ping.c:198
void m0_net_test_nh_init(struct m0_net_test_nh *nh, const struct m0_net_test_cmd_init *icmd)
Definition: node_helper.c:34
m0_time_t m0_time_sub(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:65
static struct m0_net_test_service svc
Definition: service.c:34
struct m0_net_test_service_ops m0_net_test_node_ping_ops
Definition: node_ping.c:952
enum m0_net_queue_type bs_q
Definition: node_ping.c:87
struct m0_net_test_network_ctx npc_net
Definition: node_ping.c:163
struct m0_net_test_network_timeouts ntncfg_timeouts
Definition: network.h:97
bool m0_net_test_ringbuf_is_empty(struct m0_net_test_ringbuf *rb)
Definition: ringbuf.c:99
m0_time_t bs_deadline
Definition: node_ping.c:105
struct node_ping_client_ctx * npc_client
Definition: node_ping.c:209
int fini(struct workload *w)
#define M0_MKTIME(secs, ns)
Definition: time.h:86
int m0_net_test_network_ep_add_slist(struct m0_net_test_network_ctx *ctx, const struct m0_net_test_slist *eps)
Definition: network.c:400
ssize_t m0_net_test_network_ep_search(struct m0_net_test_network_ctx *ctx, const char *ep_addr)
Definition: network.c:791
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
Definition: nucleus.c:42
void *(* ntso_init)(struct m0_net_test_service *svc)
Definition: service.h:76
int type
Definition: dir.c:1031
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
static void node_ping_fini(void *ctx_)
Definition: node_ping.c:780
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
static void node_ping_server_handle(struct node_ping_ctx *ctx, struct buf_state *bs, size_t buf_index)
Definition: node_ping.c:570
static void node_ping_worker(struct node_ping_ctx *ctx)
Definition: node_ping.c:585
void m0_free(void *data)
Definition: memory.c:146
static void * node_ping_initfini(void *ctx_, struct m0_net_test_service *svc)
Definition: node_ping.c:753
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
struct m0_net_tm_callbacks ntncfg_tm_cb
Definition: network.h:80
struct m0_tlink bs_link
Definition: node_ping.c:118
uint64_t seq
Definition: common.c:97
m0_time_t bs_time
Definition: node_ping.c:101
struct node_ping_client_ctx npc__client
Definition: node_ping.c:202
#define M0_IMPOSSIBLE(fmt,...)