Motr  M0
link.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2016-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
30 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_UT
31 #include "lib/trace.h"
32 
33 #include "ha/link.h"
34 #include "ut/ut.h"
35 
36 #include "lib/time.h" /* m0_time_now */
37 #include "lib/arith.h" /* m0_rnd64 */
38 #include "lib/memory.h" /* M0_ALLOC_PTR */
39 #include "lib/string.h" /* m0_strdup */
40 #include "fid/fid.h" /* M0_FID0 */
41 #include "reqh/reqh_service.h" /* m0_reqh_service */
42 #include "ut/threads.h" /* M0_UT_THREADS_DEFINE */
43 #include "ha/ut/helper.h" /* m0_ha_ut_rpc_ctx */
44 #include "ha/link_service.h" /* m0_ha_link_service_init */
45 
51 };
52 
53 static void ha_ut_link_conn_cfg_create(struct m0_ha_link_conn_cfg *hl_conn_cfg,
54  struct m0_uint128 *id_local,
55  struct m0_uint128 *id_remote,
56  struct m0_uint128 *id_connection,
57  bool tag_even,
58  const char *ep)
59 {
60  *hl_conn_cfg = (struct m0_ha_link_conn_cfg){
61  .hlcc_params = {
62  .hlp_id_local = *id_local,
63  .hlp_id_remote = *id_remote,
64  .hlp_id_connection = *id_connection,
65  },
66  .hlcc_rpc_service_fid = M0_FID0,
67  .hlcc_rpc_endpoint = m0_strdup(ep),
68  .hlcc_max_rpcs_in_flight = M0_HA_UT_MAX_RPCS_IN_FLIGHT,
69  .hlcc_connect_timeout = M0_MKTIME(5, 0),
70  .hlcc_disconnect_timeout = M0_MKTIME(5, 0),
71  .hlcc_resend_interval = M0_MKTIME(0, 15000000),
72  .hlcc_nr_sent_max = 1,
73  };
75  tag_even);
77  !tag_even);
78  M0_UT_ASSERT(hl_conn_cfg->hlcc_rpc_endpoint != NULL);
79 }
80 
81 static void ha_ut_link_cfg_create(struct m0_ha_link_cfg *hl_cfg,
82  struct m0_ha_ut_rpc_ctx *rpc_ctx,
83  struct m0_reqh_service *hl_service)
84 {
85  *hl_cfg = (struct m0_ha_link_cfg){
86  .hlc_reqh = &rpc_ctx->hurc_reqh,
87  .hlc_reqh_service = hl_service,
88  .hlc_rpc_machine = &rpc_ctx->hurc_rpc_machine,
89  .hlq_q_cfg_in = {},
90  .hlq_q_cfg_out = {},
91  };
92 }
93 
94 static void ha_ut_link_conn_cfg_free(struct m0_ha_link_conn_cfg *hl_conn_cfg)
95 {
96  m0_free((char *)hl_conn_cfg->hlcc_rpc_endpoint);
97 }
98 
99 static void ha_ut_link_init(struct ha_ut_link_ctx *link_ctx,
100  struct m0_ha_ut_rpc_ctx *rpc_ctx,
101  struct m0_reqh_service *hl_service,
102  struct m0_uint128 *id_local,
103  struct m0_uint128 *id_remote,
104  struct m0_uint128 *id_connection,
105  bool tag_even,
106  bool start)
107 {
108  int rc;
109 
110  m0_clink_init(&link_ctx->ulc_stop_clink, NULL);
111  link_ctx->ulc_stop_clink.cl_is_oneshot = true;
112  ha_ut_link_cfg_create(&link_ctx->ulc_cfg, rpc_ctx, hl_service);
113  rc = m0_ha_link_init(&link_ctx->ulc_link, &link_ctx->ulc_cfg);
114  M0_UT_ASSERT(rc == 0);
115  ha_ut_link_conn_cfg_create(&link_ctx->ulc_conn_cfg, id_local, id_remote,
116  id_connection, tag_even,
117  m0_rpc_machine_ep(&rpc_ctx->hurc_rpc_machine));
118  if (start)
119  m0_ha_link_start(&link_ctx->ulc_link, &link_ctx->ulc_conn_cfg);
120 }
121 
122 static void ha_ut_link_fini(struct ha_ut_link_ctx *link_ctx)
123 {
124  m0_ha_link_stop(&link_ctx->ulc_link, &link_ctx->ulc_stop_clink);
125  m0_chan_wait(&link_ctx->ulc_stop_clink);
126  m0_ha_link_fini(&link_ctx->ulc_link);
128  m0_clink_fini(&link_ctx->ulc_stop_clink);
129 }
130 
131 static void ha_ut_link_set_some_msg(struct m0_ha_msg *msg)
132 {
133  *msg = (struct m0_ha_msg){
134  .hm_fid = M0_FID_INIT(1, 2),
135  .hm_source_process = M0_FID_INIT(3, 4),
136  .hm_source_service = M0_FID_INIT(5, 6),
137  .hm_time = m0_time_now(),
138  .hm_data = {
139  .hed_type = M0_HA_MSG_STOB_IOQ,
140  .u.hed_stob_ioq = {
141  .sie_conf_sdev = M0_FID_INIT(7, 8),
142  .sie_size = 0x100,
143  },
144  },
145  };
146 }
147 
149 {
150  struct m0_ha_ut_rpc_ctx *rpc_ctx;
151  struct m0_reqh_service *hl_service;
152  struct ha_ut_link_ctx *ctx1;
153  struct ha_ut_link_ctx *ctx2;
154  struct m0_ha_link *hl1;
155  struct m0_ha_link *hl2;
156  struct m0_uint128 id1 = M0_UINT128(0, 0);
157  struct m0_uint128 id2 = M0_UINT128(0, 1);
158  struct m0_uint128 id = M0_UINT128(1, 2);
159  struct m0_ha_msg *msg;
160  struct m0_ha_msg *msg_recv;
161  uint64_t tag;
162  uint64_t tag1;
163  uint64_t tag2;
164  int rc;
165 
169  rc = m0_ha_link_service_init(&hl_service, &rpc_ctx->hurc_reqh);
170  M0_UT_ASSERT(rc == 0);
171  M0_ALLOC_PTR(ctx1);
172  M0_UT_ASSERT(ctx1 != NULL);
173  ha_ut_link_init(ctx1, rpc_ctx, hl_service, &id1, &id2, &id, true, true);
174  M0_ALLOC_PTR(ctx2);
175  M0_UT_ASSERT(ctx2 != NULL);
176  ha_ut_link_init(ctx2, rpc_ctx, hl_service, &id2, &id1, &id, false,
177  true);
178  hl1 = &ctx1->ulc_link;
179  hl2 = &ctx2->ulc_link;
180 
181  /* One way transmission. Message is sent from hl1 to hl2. */
182  M0_ALLOC_PTR(msg);
184  msg_recv = m0_ha_link_recv(hl2, &tag1);
185  M0_UT_ASSERT(msg_recv == NULL);
186  m0_ha_link_send(hl1, msg, &tag);
188  msg_recv = m0_ha_link_recv(hl2, &tag1);
189  M0_UT_ASSERT(msg_recv != NULL);
190  M0_UT_ASSERT(tag == tag1);
191  M0_UT_ASSERT(m0_ha_msg_tag(msg_recv) == tag1);
192  M0_UT_ASSERT(m0_ha_msg_eq(msg_recv, msg));
193  tag2 = m0_ha_link_delivered_consume(hl1);
195  m0_ha_link_delivered(hl2, msg_recv);
197  tag2 = m0_ha_link_delivered_consume(hl1);
198  M0_UT_ASSERT(tag1 == tag2);
201  m0_free(msg);
202  m0_ha_link_flush(hl1);
203  m0_ha_link_flush(hl2);
204 
205  ha_ut_link_fini(ctx2);
206  m0_free(ctx2);
207  ha_ut_link_fini(ctx1);
208  m0_free(ctx1);
209  m0_ha_link_service_fini(hl_service);
211  m0_free(rpc_ctx);
212 };
213 
214 enum {
217 };
218 
231  uint64_t *ulmt_tags_out;
232  uint64_t *ulmt_tags_in;
233 };
234 
235 static void ha_ut_link_mt_thread(void *param)
236 {
237  struct ha_ut_link_mt_test *test = param;
238  struct m0_ha_link *hl;
239  struct m0_ha_msg **msgs;
240  struct m0_ha_msg *msg;
241  uint64_t tag;
242  int i;
243  int j;
244 
245  ha_ut_link_init(&test->ulmt_link_ctx, test->ulmt_ctx,
246  test->ulmt_hl_service, &test->ulmt_id_local,
247  &test->ulmt_id_remote, &test->ulmt_id_connection,
248  test->ulmt_tag_even, true);
249  /* barrier with the main thread */
250  m0_semaphore_up(&test->ulmt_barrier_wait);
251  m0_semaphore_down(&test->ulmt_barrier_done);
252 
253  hl = &test->ulmt_link_ctx.ulc_link;
254  for (i = 0; i < HA_UT_MSG_PER_THREAD; ++i) {
255  m0_ha_link_send(hl, &test->ulmt_msgs_out[i],
256  &test->ulmt_tags_out[i]);
257  }
258  i = 0;
260  M0_UT_ASSERT(msgs != NULL);
261  while (i < HA_UT_MSG_PER_THREAD) {
263  j = i;
264  while (1) {
266  msg = m0_ha_link_recv(hl, &test->ulmt_tags_in[i]);
267  if (msg == NULL)
268  break;
270  msgs[i] = msg;
271  test->ulmt_msgs_in[i] = *msg;
272  ++i;
273  }
274  M0_ASSERT(j < i);
275  for ( ; j < i; ++j)
276  m0_ha_link_delivered(hl, msgs[j]);
277  }
278  m0_free(msgs);
279  for (i = 0; i < HA_UT_MSG_PER_THREAD; ++i) {
280  m0_ha_link_wait_delivery(hl, test->ulmt_tags_out[i]);
282  M0_UT_ASSERT(tag == test->ulmt_tags_out[i]);
283  }
288  m0_ha_link_flush(hl);
289 
290  /* barrier with the main thread */
291  m0_semaphore_up(&test->ulmt_barrier_wait);
292  m0_semaphore_down(&test->ulmt_barrier_done);
293 
294  ha_ut_link_fini(&test->ulmt_link_ctx);
295 }
296 
298 
300 {
301  struct m0_ha_ut_rpc_ctx *rpc_ctx;
302  struct m0_reqh_service *hl_service;
303  struct ha_ut_link_mt_test *tests;
304  struct ha_ut_link_mt_test *test1;
305  struct ha_ut_link_mt_test *test2;
306  struct m0_uint128 id1;
307  struct m0_uint128 id2;
308  struct m0_uint128 id;
309  uint64_t seed = 42;
310  int rc;
311  int i;
312  int j;
313 
317  rc = m0_ha_link_service_init(&hl_service, &rpc_ctx->hurc_reqh);
318  M0_UT_ASSERT(rc == 0);
320  M0_UT_ASSERT(tests != NULL);
321 
322  for (i = 0; i < HA_UT_THREAD_PAIR_NR; ++i) {
323  id1 = M0_UINT128(i * 2, m0_rnd64(&seed));
324  id2 = M0_UINT128(i * 2 + 1, m0_rnd64(&seed));
325  id = M0_UINT128(i, m0_rnd64(&seed));
326  tests[i * 2] = (struct ha_ut_link_mt_test){
327  .ulmt_id_local = id1,
328  .ulmt_id_remote = id2,
329  .ulmt_id_connection = id,
330  .ulmt_tag_even = true,
331  };
332  tests[i * 2 + 1] = (struct ha_ut_link_mt_test){
333  .ulmt_id_local = id2,
334  .ulmt_id_remote = id1,
335  .ulmt_id_connection = id,
336  .ulmt_tag_even = false,
337  };
338  }
339  for (i = 0; i < HA_UT_THREAD_PAIR_NR * 2; ++i) {
340  tests[i].ulmt_ctx = rpc_ctx;
341  tests[i].ulmt_hl_service = hl_service;
342  rc = m0_semaphore_init(&tests[i].ulmt_barrier_done, 0);
343  M0_UT_ASSERT(rc == 0);
344  rc = m0_semaphore_init(&tests[i].ulmt_barrier_wait, 0);
345  M0_UT_ASSERT(rc == 0);
346  M0_ALLOC_ARR(tests[i].ulmt_msgs_out, HA_UT_MSG_PER_THREAD);
347  M0_UT_ASSERT(tests[i].ulmt_msgs_out != NULL);
348  M0_ALLOC_ARR(tests[i].ulmt_msgs_in, HA_UT_MSG_PER_THREAD);
349  M0_UT_ASSERT(tests[i].ulmt_msgs_in != NULL);
350  for (j = 0; j < HA_UT_MSG_PER_THREAD; ++j) {
351  tests[i].ulmt_msgs_out[j] = (struct m0_ha_msg){
352  .hm_fid = M0_FID_INIT(m0_rnd64(&seed),
353  m0_rnd64(&seed)),
354  .hm_source_process = M0_FID_INIT(
355  m0_rnd64(&seed), m0_rnd64(&seed)),
356  .hm_source_service = M0_FID_INIT(
357  m0_rnd64(&seed), m0_rnd64(&seed)),
358  .hm_time = m0_time_now(),
359  .hm_data = {
360  .hed_type = M0_HA_MSG_STOB_IOQ,
361  }
362  };
363  }
364  M0_ALLOC_ARR(tests[i].ulmt_tags_out, HA_UT_MSG_PER_THREAD);
365  M0_UT_ASSERT(tests[i].ulmt_tags_out != NULL);
366  M0_ALLOC_ARR(tests[i].ulmt_tags_in, HA_UT_MSG_PER_THREAD);
367  M0_UT_ASSERT(tests[i].ulmt_tags_in != NULL);
368  }
369  M0_UT_THREADS_START(ha_ut_link_mt, HA_UT_THREAD_PAIR_NR * 2, tests);
370  /* Barriers with all threads. One after init, another one before fini */
371  for (j = 0; j < 2; ++j) {
372  for (i = 0; i < HA_UT_THREAD_PAIR_NR * 2; ++i)
373  m0_semaphore_down(&tests[i].ulmt_barrier_wait);
374  for (i = 0; i < HA_UT_THREAD_PAIR_NR * 2; ++i)
375  m0_semaphore_up(&tests[i].ulmt_barrier_done);
376  }
377  M0_UT_THREADS_STOP(ha_ut_link_mt);
378 
379  for (i = 0; i < HA_UT_THREAD_PAIR_NR; ++i) {
380  test1 = &tests[i * 2];
381  test2 = &tests[i * 2 + 1];
382  for (j = 0; j < HA_UT_MSG_PER_THREAD; ++j) {
383  M0_UT_ASSERT(test1->ulmt_tags_in[j] ==
384  test2->ulmt_tags_out[j]);
385  M0_UT_ASSERT(test1->ulmt_tags_out[j] ==
386  test2->ulmt_tags_in[j]);
387  M0_UT_ASSERT(m0_ha_msg_eq(&test1->ulmt_msgs_in[j],
388  &test2->ulmt_msgs_out[j]));
389  M0_UT_ASSERT(m0_ha_msg_eq(&test1->ulmt_msgs_out[j],
390  &test2->ulmt_msgs_in[j]));
391  }
392  }
393  for (i = 0; i < HA_UT_THREAD_PAIR_NR * 2; ++i) {
394  m0_free(tests[i].ulmt_tags_in);
395  m0_free(tests[i].ulmt_tags_out);
396  m0_free(tests[i].ulmt_msgs_in);
397  m0_free(tests[i].ulmt_msgs_out);
398  m0_semaphore_fini(&tests[i].ulmt_barrier_wait);
399  m0_semaphore_fini(&tests[i].ulmt_barrier_done);
400  }
401  m0_free(tests);
402  m0_ha_link_service_fini(hl_service);
404  m0_free(rpc_ctx);
405 }
406 
408  struct m0_reqh_service **hl_service,
409  int nr_links,
410  struct ha_ut_link_ctx ***ctx,
411  struct m0_ha_link ***hl,
412  struct m0_uint128 *id1,
413  struct m0_uint128 *id2,
414  struct m0_uint128 *id,
415  int *tag_even,
416  bool start)
417 {
418  int i;
419  int rc;
420 
424  rc = m0_ha_link_service_init(hl_service, &(*rpc_ctx)->hurc_reqh);
425  M0_UT_ASSERT(rc == 0);
426  M0_ALLOC_ARR(*ctx, nr_links);
427  M0_UT_ASSERT(*ctx != NULL);
428  M0_ALLOC_ARR(*hl, nr_links);
429  M0_UT_ASSERT(*hl != NULL);
430  for (i = 0; i < nr_links; ++i) {
431  M0_ALLOC_PTR((*ctx)[i]);
432  M0_UT_ASSERT((*ctx)[i] != NULL);
433  ha_ut_link_init((*ctx)[i], *rpc_ctx, *hl_service,
434  &id1[i], &id2[i], &id[i], tag_even[i] != 0,
435  start);
436  (*hl)[i] = &(*ctx)[i]->ulc_link;
437  }
438 }
439 
441  struct m0_reqh_service *hl_service,
442  int nr_links,
443  struct ha_ut_link_ctx **ctx,
444  struct m0_ha_link **hl)
445 {
446  int i;
447 
448  for (i = 0; i < nr_links; ++i) {
450  m0_free(ctx[i]);
451  }
452  m0_free(hl);
453  m0_free(ctx);
454  m0_ha_link_service_fini(hl_service);
456  m0_free(rpc_ctx);
457 }
458 
459 static void ha_ut_link_msg_transfer(struct m0_ha_link *hl1,
460  struct m0_ha_link *hl2)
461 {
462  struct m0_ha_msg *msg;
463  struct m0_ha_msg *msg_recv;
464  uint64_t tag;
465  uint64_t tag_recv;
466 
467  M0_ALLOC_PTR(msg);
468  M0_UT_ASSERT(msg != NULL);
469 
470  m0_ha_link_send(hl1, msg, &tag);
472  msg_recv = m0_ha_link_recv(hl2, &tag_recv);
473  M0_UT_ASSERT(m0_ha_msg_eq(msg, msg_recv));
474  M0_UT_ASSERT(tag == tag_recv);
475  m0_ha_link_delivered(hl2, msg_recv);
476  m0_ha_link_flush(hl1);
478  M0_UT_ASSERT(tag_recv == tag);
479 
480  m0_free(msg);
481 }
482 
484 {
485  struct m0_ha_link_conn_cfg *hl_conn_cfg;
486  struct m0_ha_ut_rpc_ctx *rpc_ctx;
487  struct m0_reqh_service *hl_service;
488  struct m0_ha_link_params lp0;
489  struct m0_ha_link_params lp0_new;
490  struct m0_ha_link_params lp2;
491  struct m0_ha_link_cfg *hl_cfg;
492  struct m0_ha_link *hl;
493  struct m0_uint128 id1 = M0_UINT128(1, 2);
494  struct m0_uint128 id2 = M0_UINT128(3, 4);
495  struct m0_uint128 id3 = M0_UINT128(5, 6);
496  struct m0_uint128 id4 = M0_UINT128(7, 8);
497  struct m0_uint128 id_connection1 = M0_UINT128(9, 10);
498  struct m0_uint128 id_connection2 = M0_UINT128(11, 12);
499  struct m0_clink stop_clink = {};
500  const char *ep;
501  int i;
502  int rc;
503 
507  ep = m0_rpc_machine_ep(&rpc_ctx->hurc_rpc_machine);
508  rc = m0_ha_link_service_init(&hl_service, &rpc_ctx->hurc_reqh);
509  M0_UT_ASSERT(rc == 0);
510  M0_ALLOC_ARR(hl, 3);
511  M0_UT_ASSERT(hl != NULL);
512  M0_ALLOC_ARR(hl_cfg, 3);
513  M0_UT_ASSERT(hl_cfg != NULL);
514  M0_ALLOC_ARR(hl_conn_cfg, 3);
515  M0_UT_ASSERT(hl_conn_cfg != NULL);
516  m0_clink_init(&stop_clink, NULL);
517  stop_clink.cl_is_oneshot = true;
518 
519  for (i = 0; i < 3; ++i) {
520  ha_ut_link_cfg_create(&hl_cfg[i], rpc_ctx, hl_service);
521  rc = m0_ha_link_init(&hl[i], &hl_cfg[i]);
522  M0_UT_ASSERT(rc == 0);
523  }
524 
525  /* start hl[0] and hl[1] connected to each other */
526  ha_ut_link_conn_cfg_create(&hl_conn_cfg[0], &id1, &id2, &id_connection1,
527  true, ep);
528  ha_ut_link_conn_cfg_create(&hl_conn_cfg[1], &id2, &id1, &id_connection1,
529  false, ep);
530  for (i = 0; i < 2; ++i) {
531  m0_ha_link_start(&hl[i], &hl_conn_cfg[i]);
532  ha_ut_link_conn_cfg_free(&hl_conn_cfg[i]);
533  }
534 
535  /* send a message from hl[0] to hl[1] */
536  ha_ut_link_msg_transfer(&hl[0], &hl[1]);
537 
538  /* stop & fini hl[1] */
539  m0_ha_link_stop(&hl[1], &stop_clink);
540  m0_chan_wait(&stop_clink);
541  m0_ha_link_fini(&hl[1]);
542 
543  /* reconnect hl[0] to hl[2] */
544  m0_ha_link_reconnect_begin(&hl[0], &lp0);
545  ha_ut_link_conn_cfg_create(&hl_conn_cfg[0], &id3, &id4, &id_connection2,
546  true, ep);
547  ha_ut_link_conn_cfg_create(&hl_conn_cfg[2], &id4, &id3, &id_connection2,
548  true, ep);
549  m0_ha_link_reconnect_params(&lp0, &lp0_new, &lp2, &id3, &id4,
550  &id_connection2);
551  hl_conn_cfg[0].hlcc_params = lp0_new;
552  hl_conn_cfg[2].hlcc_params = lp2;
553  m0_ha_link_start(&hl[2], &hl_conn_cfg[2]);
554  m0_ha_link_reconnect_end(&hl[0], &hl_conn_cfg[0]);
555  ha_ut_link_conn_cfg_free(&hl_conn_cfg[0]);
556  ha_ut_link_conn_cfg_free(&hl_conn_cfg[2]);
557 
558  /* send a message from hl[0] to hl[2] */
559  ha_ut_link_msg_transfer(&hl[0], &hl[2]);
560 
561  /* stop & fini hl[0] and hl[2] */
562  m0_ha_link_stop(&hl[0], &stop_clink);
563  m0_chan_wait(&stop_clink);
564  m0_ha_link_fini(&hl[0]);
565  m0_ha_link_stop(&hl[2], &stop_clink);
566  m0_chan_wait(&stop_clink);
567  m0_ha_link_fini(&hl[2]);
568 
569  m0_free(hl_conn_cfg);
570  m0_free(hl_cfg);
571  m0_free(hl);
572  m0_ha_link_service_fini(hl_service);
574  m0_free(rpc_ctx);
575 }
576 
577 enum {
579 };
580 
582 {
583  struct m0_ha_link_conn_cfg hl_conn_cfg[2];
584  struct m0_ha_link_params lp0;
585  struct m0_ha_ut_rpc_ctx *rpc_ctx;
586  struct m0_reqh_service *hl_service;
587  struct ha_ut_link_ctx **ctx;
588  struct m0_ha_link **hl;
589  struct m0_uint128 *id1;
590  struct m0_uint128 *id2;
591  struct m0_uint128 *id;
592  struct m0_ha_msg *msg;
593  struct m0_ha_msg *msg_recv;
594  const char *ep;
595  int *tag_even;
596  uint64_t seed = 42;
597  uint64_t tag;
598  uint64_t tag_recv;
599  uint64_t tag2;
600  int i;
601 
603  M0_UT_ASSERT(id1 != NULL);
605  M0_UT_ASSERT(id2 != NULL);
607  M0_UT_ASSERT(id != NULL);
609  M0_UT_ASSERT(tag_even != NULL);
610  M0_ALLOC_PTR(msg);
611  M0_UT_ASSERT(msg != NULL);
612  for (i = 0; i < HA_UT_LINK_RECONNECT_MULTIPLE_NR_LINKS; ++i) {
613  id1[i] = M0_UINT128(m0_rnd64(&seed), m0_rnd64(&seed));
614  id2[i] = M0_UINT128(m0_rnd64(&seed), m0_rnd64(&seed));
615  id[i] = M0_UINT128(m0_rnd64(&seed), m0_rnd64(&seed));
616  tag_even[i] = i == 0;
617  }
618  ha_ut_links_init(&rpc_ctx, &hl_service,
620  &ctx, &hl, id1, id2, id, tag_even, false);
621  m0_ha_link_start(hl[0], &ctx[0]->ulc_conn_cfg);
622 
623  /* this message shouldn't be delivered at all */
624  m0_ha_link_send(hl[0], msg, &tag);
625  for (i = 1; i < HA_UT_LINK_RECONNECT_MULTIPLE_NR_LINKS; ++i) {
626  msg_recv = m0_ha_link_recv(hl[i], &tag_recv);
627  M0_UT_ASSERT(msg_recv == NULL);
628  }
629  /* now reconnect hl[0] to all other links */
630  ep = m0_rpc_machine_ep(&rpc_ctx->hurc_rpc_machine);
631  for (i = 1; i < HA_UT_LINK_RECONNECT_MULTIPLE_NR_LINKS; ++i) {
632  msg_recv = m0_ha_link_recv(hl[i], &tag_recv);
633  M0_UT_ASSERT(msg_recv == NULL);
634  m0_ha_link_reconnect_begin(hl[0], &lp0);
635  ha_ut_link_conn_cfg_create(&hl_conn_cfg[1], &id1[i], &id2[i],
636  &id[i], !tag_even[i], ep);
637  ha_ut_link_conn_cfg_create(&hl_conn_cfg[0], &id2[i], &id1[i],
638  &id[i], tag_even[i], ep);
639  m0_ha_link_reconnect_params(&lp0, &hl_conn_cfg[0].hlcc_params,
640  &hl_conn_cfg[1].hlcc_params,
641  &id1[i], &id2[i], &id[i]);
642  m0_ha_link_reconnect_end(hl[0], &hl_conn_cfg[0]);
643  ha_ut_link_conn_cfg_free(&hl_conn_cfg[0]);
644  m0_ha_link_start(hl[i], &hl_conn_cfg[1]);
645  ha_ut_link_conn_cfg_free(&hl_conn_cfg[1]);
646  if (i == 1) {
648  msg_recv = m0_ha_link_recv(hl[i], &tag_recv);
649  M0_UT_ASSERT(msg_recv != NULL);
650  M0_UT_ASSERT(m0_ha_msg_eq(msg_recv, msg));
651  M0_UT_ASSERT(tag_recv == tag);
652  m0_ha_link_delivered(hl[i], msg_recv);
654  tag2 = m0_ha_link_delivered_consume(hl[0]);
655  M0_UT_ASSERT(tag2 == tag);
656  }
657  }
658 
659  ha_ut_links_fini(rpc_ctx, hl_service,
661  m0_free(msg);
662  m0_free(tag_even);
663  m0_free(id);
664  m0_free(id2);
665  m0_free(id1);
666 }
667 
668 #undef M0_TRACE_SUBSYSTEM
669 
672 /*
673  * Local variables:
674  * c-indentation-style: "K&R"
675  * c-basic-offset: 8
676  * tab-width: 8
677  * fill-column: 80
678  * scroll-step: 1
679  * End:
680  */
681 /*
682  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
683  */
static void ha_ut_link_cfg_create(struct m0_ha_link_cfg *hl_cfg, struct m0_ha_ut_rpc_ctx *rpc_ctx, struct m0_reqh_service *hl_service)
Definition: link.c:81
uint64_t id
Definition: cob.h:2380
static void ha_ut_link_set_some_msg(struct m0_ha_msg *msg)
Definition: link.c:131
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
struct m0_fid hm_fid
Definition: msg.h:117
#define m0_strdup(s)
Definition: string.h:43
M0_UT_THREADS_DEFINE(ha_ut_link_mt, &ha_ut_link_mt_thread)
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_ha_link_stop(struct m0_ha_link *hl, struct m0_clink *clink)
Definition: link.c:412
static void ha_ut_link_msg_transfer(struct m0_ha_link *hl1, struct m0_ha_link *hl2)
Definition: link.c:459
M0_INTERNAL void m0_ha_link_wait_arrival(struct m0_ha_link *hl)
Definition: link.c:709
static uint64_t tag(uint8_t code, uint64_t id)
Definition: addb2.c:1047
M0_INTERNAL struct m0_ha_msg * m0_ha_link_recv(struct m0_ha_link *hl, uint64_t *tag)
Definition: link.c:568
#define M0_FID_INIT(container, key)
Definition: fid.h:84
static char * tests
Definition: st_kmain.c:52
Definition: list.c:27
M0_INTERNAL void m0_ha_ut_rpc_ctx_init(struct m0_ha_ut_rpc_ctx *ctx)
Definition: helper.c:41
M0_INTERNAL const char * m0_rpc_machine_ep(const struct m0_rpc_machine *rmach)
Definition: rpc_machine.c:603
M0_INTERNAL void m0_ha_link_flush(struct m0_ha_link *hl)
Definition: link.c:767
M0_INTERNAL void m0_ha_link_start(struct m0_ha_link *hl, struct m0_ha_link_conn_cfg *hl_conn_cfg)
Definition: link.c:382
M0_INTERNAL void m0_ha_link_send(struct m0_ha_link *hl, const struct m0_ha_msg *msg, uint64_t *tag)
Definition: link.c:556
Definition: sock.c:754
int i
Definition: dir.c:1033
#define M0_ASSERT(cond)
m0_time_t m0_time_now(void)
Definition: time.c:134
void m0_ha_ut_link_reconnect_simple(void)
Definition: link.c:483
static void ha_ut_links_init(struct m0_ha_ut_rpc_ctx **rpc_ctx, struct m0_reqh_service **hl_service, int nr_links, struct ha_ut_link_ctx ***ctx, struct m0_ha_link ***hl, struct m0_uint128 *id1, struct m0_uint128 *id2, struct m0_uint128 *id, int *tag_even, bool start)
Definition: link.c:407
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
M0_INTERNAL bool m0_ha_msg_eq(const struct m0_ha_msg *msg1, const struct m0_ha_msg *msg2)
Definition: msg.c:46
Definition: msg.h:115
M0_INTERNAL void m0_ha_link_reconnect_begin(struct m0_ha_link *hl, struct m0_ha_link_params *lp)
Definition: link.c:422
static void ha_ut_link_fini(struct ha_ut_link_ctx *link_ctx)
Definition: link.c:122
M0_INTERNAL uint64_t m0_ha_msg_tag(const struct m0_ha_msg *msg)
Definition: msg.c:36
M0_INTERNAL void m0_ha_link_tags_initial(struct m0_ha_link_tags *tags, bool tag_even)
Definition: link_fops.c:44
M0_INTERNAL void m0_ha_link_reconnect_end(struct m0_ha_link *hl, const struct m0_ha_link_conn_cfg *hl_conn_cfg)
Definition: link.c:441
M0_INTERNAL int m0_ha_link_service_init(struct m0_reqh_service **hl_service, struct m0_reqh *reqh)
Definition: link_service.c:290
void m0_ha_ut_link_multithreaded(void)
Definition: link.c:299
static struct fdmi_ctx ctx
Definition: main.c:80
M0_INTERNAL uint64_t m0_ha_link_delivered_consume(struct m0_ha_link *hl)
Definition: link.c:606
M0_INTERNAL void m0_ha_link_delivered(struct m0_ha_link *hl, struct m0_ha_msg *msg)
Definition: link.c:584
static void ha_ut_link_conn_cfg_create(struct m0_ha_link_conn_cfg *hl_conn_cfg, struct m0_uint128 *id_local, struct m0_uint128 *id_remote, struct m0_uint128 *id_connection, bool tag_even, const char *ep)
Definition: link.c:53
M0_INTERNAL void m0_ha_ut_rpc_ctx_fini(struct m0_ha_ut_rpc_ctx *ctx)
Definition: helper.c:73
static void ha_ut_link_conn_cfg_free(struct m0_ha_link_conn_cfg *hl_conn_cfg)
Definition: link.c:94
void m0_ha_ut_link_reconnect_multiple(void)
Definition: link.c:581
void m0_ha_ut_link_usecase(void)
Definition: link.c:148
char * ep
Definition: sw.h:132
M0_INTERNAL uint64_t m0_rnd64(uint64_t *seed)
Definition: misc.c:100
static void ha_ut_link_init(struct ha_ut_link_ctx *link_ctx, struct m0_ha_ut_rpc_ctx *rpc_ctx, struct m0_reqh_service *hl_service, struct m0_uint128 *id_local, struct m0_uint128 *id_remote, struct m0_uint128 *id_connection, bool tag_even, bool start)
Definition: link.c:99
M0_INTERNAL void m0_ha_link_service_fini(struct m0_reqh_service *hl_service)
Definition: link_service.c:298
M0_INTERNAL void m0_ha_link_wait_delivery(struct m0_ha_link *hl, uint64_t tag)
Definition: link.c:669
static void ha_ut_links_fini(struct m0_ha_ut_rpc_ctx *rpc_ctx, struct m0_reqh_service *hl_service, int nr_links, struct ha_ut_link_ctx **ctx, struct m0_ha_link **hl)
Definition: link.c:440
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
Definition: list.c:42
static int start(struct m0_fom *fom)
Definition: trigger_fom.c:321
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
#define M0_UT_THREADS_STOP(name)
Definition: threads.h:55
#define M0_UINT128(hi, lo)
Definition: types.h:40
#define M0_MKTIME(secs, ns)
Definition: time.h:86
M0_INTERNAL void m0_ha_link_reconnect_params(const struct m0_ha_link_params *lp_alive, struct m0_ha_link_params *lp_alive_new, struct m0_ha_link_params *lp_dead_new, const struct m0_uint128 *id_alive, const struct m0_uint128 *id_dead, const struct m0_uint128 *id_connection)
Definition: link.c:486
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
Definition: nucleus.c:42
#define M0_UT_THREADS_START(name, thread_nr, param_array)
Definition: threads.h:51
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
#define M0_FID0
Definition: fid.h:93
M0_INTERNAL int m0_ha_link_init(struct m0_ha_link *hl, struct m0_ha_link_cfg *hl_cfg)
Definition: link.c:225
static void ha_ut_link_mt_thread(void *param)
Definition: link.c:235
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL void m0_ha_link_fini(struct m0_ha_link *hl)
Definition: link.c:269
M0_INTERNAL uint64_t m0_ha_link_not_delivered_consume(struct m0_ha_link *hl)
Definition: link.c:618
int32_t rc
Definition: trigger_fop.h:47
#define M0_UT_ASSERT(a)
Definition: ut.h:46