Motr  M0
transmit.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
29 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_DTM
30 #include "lib/trace.h"
31 
32 #include "ut/ut.h"
33 #include "ut/ut_rpc_machine.h"
34 #include "lib/misc.h" /* m0_forall, IS_IN_ARRAY */
35 #include "lib/tlist.h"
36 #include "lib/errno.h" /* EPROTO */
37 #include "lib/assert.h"
38 #include "lib/memory.h"
39 #include "fop/fop.h"
40 #include "fop/fom.h"
41 #include "fop/fom_generic.h"
42 #include "reqh/reqh.h"
43 #include "reqh/reqh_service.h"
44 #include "rpc/rpc_opcodes.h"
45 #include "dtm/dtm_internal.h"
46 #include "dtm/operation.h"
47 #include "dtm/operation_xc.h"
48 #include "dtm/history.h"
49 #include "dtm/remote.h"
50 #include "dtm/update.h"
51 #include "dtm/fol.h"
52 #include "dtm/ltx.h"
53 #include "dtm/dtm.h"
54 
55 M0_INTERNAL void up_print(const struct m0_dtm_up *up);
56 M0_INTERNAL void op_print(const struct m0_dtm_op *op);
57 M0_INTERNAL void hi_print(const struct m0_dtm_hi *hi);
58 
59 enum {
60  OPER_NR = 64,
61  UPDATE_NR = 6,
63 };
64 
65 static struct m0_uint128 dtm_id_src = { 1, 1 };
66 static struct m0_uint128 dtm_id_tgt = { 2, 2 };
67 static struct m0_tl uu;
68 static struct m0_dtm dtm_src;
69 static struct m0_dtm dtm_tgt;
70 static struct m0_dtm_local_remote tgt;
71 static struct m0_dtm_local_remote local;
72 static struct m0_dtm_oper oper_src[OPER_NR];
73 static struct m0_dtm_oper oper_tgt[OPER_NR];
80 static struct m0_uint128 id_src[UPDATE_NR];
81 static struct m0_uint128 id_tgt[UPDATE_NR];
84 static struct m0_fop redo_fop[OPER_NR];
85 static struct m0_fom redo_fom[OPER_NR];
88 static struct m0_dtm_oper_descr ode = {
89  .od_updates = {
91  .ou_update = udescr
92  }
93 };
94 static struct m0_dtm_oper_descr reply = {
95  .od_updates = {
97  .ou_update = udescr_reply
98  }
99 };
102 
103 static void noop(struct m0_dtm_op *op)
104 {}
105 
106 static int undo_redo(struct m0_dtm_update *updt)
107 {
108  return 0;
109 }
110 
111 static const struct m0_dtm_update_type test_utype = {
112  .updtt_id = 0,
113  .updtt_name = "test update"
114 };
115 
116 static const struct m0_dtm_update_ops test_ops = {
117  .updo_redo = &undo_redo,
118  .updo_undo = &undo_redo,
119  .updo_type = &test_utype
120 };
121 
122 static int update_init(struct m0_dtm_history *history, uint8_t id,
123  struct m0_dtm_update *update)
124 {
125  M0_ASSERT(id == 0);
126  update->upd_ops = &test_ops;
127  return 0;
128 }
129 
130 static void test_persistent(struct m0_dtm_history *history)
131 {}
132 
133 static const struct m0_dtm_op_ops op_ops = {
134  .doo_ready = noop,
135  .doo_late = noop,
136  .doo_miser = noop
137 };
138 
139 static int src_find(struct m0_dtm *dtm,
140  const struct m0_dtm_history_type *ht,
141  const struct m0_uint128 *id,
142  struct m0_dtm_history **out)
143 {
144  if (id->u_hi == 0 && IS_IN_ARRAY(id->u_lo, history_src)) {
145  *out = &history_src[id->u_lo];
146  return 0;
147  } else
148  return -EPROTO;
149 }
150 
151 static const struct m0_dtm_history_type_ops src_htype_ops = {
153 };
154 
155 static const struct m0_dtm_history_type src_htype = {
156  .hit_id = 2,
157  .hit_rem_id = 2,
158  .hit_name = "source histories",
159  .hit_ops = &src_htype_ops
160 };
161 
162 static const struct m0_uint128 *src_id(const struct m0_dtm_history *history)
163 {
164  int idx = history - history_src;
165 
166  M0_PRE(IS_IN_ARRAY(idx, id_src));
167  id_src[idx].u_hi = 0;
168  id_src[idx].u_lo = idx;
169  return &id_src[idx];
170 }
171 
172 static const struct m0_dtm_history_ops src_ops = {
173  .hio_type = &src_htype,
174  .hio_id = &src_id,
175  .hio_persistent = &test_persistent,
176  .hio_update = &update_init
177 };
178 
179 static int tgt_find(struct m0_dtm *dtm,
180  const struct m0_dtm_history_type *ht,
181  const struct m0_uint128 *id,
182  struct m0_dtm_history **out)
183 {
184  if (id->u_hi == 0 && IS_IN_ARRAY(id->u_lo, history_tgt)) {
185  *out = &history_tgt[id->u_lo];
186  return 0;
187  } else
188  return -EPROTO;
189 }
190 
191 static const struct m0_dtm_history_type_ops tgt_htype_ops = {
193 };
194 
195 static const struct m0_dtm_history_type tgt_htype = {
196  .hit_id = 2,
197  .hit_rem_id = 2,
198  .hit_name = "target histories",
199  .hit_ops = &tgt_htype_ops
200 };
201 
202 static const struct m0_uint128 *tgt_id(const struct m0_dtm_history *history)
203 {
204  int idx = history - history_tgt;
205 
206  M0_PRE(IS_IN_ARRAY(idx, id_tgt));
207  id_tgt[idx].u_hi = 0;
208  id_tgt[idx].u_lo = idx;
209  return &id_tgt[idx];
210 }
211 
212 static const struct m0_dtm_history_ops tgt_ops = {
213  .hio_type = &tgt_htype,
214  .hio_id = &tgt_id,
215  .hio_persistent = &test_persistent,
216  .hio_update = &update_init
217 };
218 
219 static struct m0_fop_type test_fopt;
220 static struct m0_reqh_service *test_svc;
221 static const struct m0_fop_type_ops test_ftype_ops;
222 
224 {
225  return 0;
226 }
227 
229 {}
230 
232 {
233  m0_free(service);
234 }
235 
236 static const struct m0_reqh_service_ops test_service_ops = {
238  .rso_stop = &service_stop,
239  .rso_fini = &service_fini
240 };
241 
243  const struct m0_reqh_service_type *stype)
244 {
245  struct m0_reqh_service *svc;
246 
247  M0_ALLOC_PTR(svc);
248  M0_UT_ASSERT(svc != NULL);
249  svc->rs_type = stype;
250  svc->rs_ops = &test_service_ops;
251  *service = svc;
252  return 0;
253 }
254 
255 static const struct m0_reqh_service_type_ops stype_ops = {
257 };
258 
260  .rst_name = "dtm-ub-service",
261  .rst_ops = &stype_ops,
262  .rst_level = M0_RS_LEVEL_NORMAL,
263 };
264 
265 static void test_fom_fini(struct m0_fom *fom)
266 {
267  m0_fom_fini(fom);
268 }
269 
270 static unsigned ticked;
271 
272 static void op_ready(struct m0_dtm_op *op)
273 {
274  struct m0_dtm_oper *oper;
275  unsigned idx;
276 
277  oper = M0_AMB(oper, op, oprt_op);
278  idx = oper - &oper_tgt[0];
280  m0_fom_wakeup(&redo_fom[idx]);
281 }
282 
283 static void op_late(struct m0_dtm_op *op)
284 {
285  M0_UT_ASSERT(0);
286 }
287 
288 static void op_miser(struct m0_dtm_op *op)
289 {
290  M0_UT_ASSERT(0);
291 }
292 
293 static const struct m0_dtm_op_ops test_op_ops = {
294  .doo_ready = &op_ready,
295  .doo_late = &op_late,
296  .doo_miser = &op_miser
297 };
298 
299 enum {
302 };
303 
304 static struct m0_sm_state_descr fom_phases[] = {
305  [FOM_INIT] = {
307  .sd_name = "init",
308  .sd_allowed = M0_BITS(FOM_READY)
309  },
310  [FOM_READY] = {
311  .sd_name = "ready",
312  .sd_allowed = M0_BITS(M0_FOPH_FINISH)
313  },
314  [M0_FOPH_FINISH] = {
315  .sd_flags = M0_SDF_TERMINAL,
316  .sd_name = "SM finish",
317  }
318 };
319 
320 const struct m0_sm_conf test_conf = {
321  .scf_name = "dtm up fom",
322  .scf_nr_states = ARRAY_SIZE(fom_phases),
323  .scf_state = fom_phases
324 };
325 
326 static struct m0_mutex lock;
327 static struct m0_semaphore seq;
328 
329 static int test_fom_tick(struct m0_fom *fom)
330 {
331  int result;
332  int idx = fom->fo_fop - &redo_fop[0];
333  struct m0_dtm_oper_descr *ode = m0_fop_data(fom->fo_fop);
334  struct m0_tl uu;
335  struct m0_dtm_oper *oper = &oper_tgt[idx];
336 
339 
340  switch (m0_fom_phase(fom)) {
341  case FOM_INIT:
345  m0_dtm_oper_init(oper, &dtm_tgt, &uu);
349 
350  result = m0_dtm_oper_build(oper, &uu, ode);
351  M0_UT_ASSERT(result == 0);
352  oper->oprt_op.op_ops = &test_op_ops;
354  m0_dtm_oper_close(oper);
356  break;
357  case FOM_READY:
359  m0_dtm_oper_prepared(oper, NULL);
360  m0_dtm_oper_done(oper, NULL);
361  ++ticked;
364  break;
365  default:
366  M0_UT_ASSERT(0);
367  }
368  return M0_FSO_WAIT;
369 }
370 
371 static size_t test_fom_home_locality(const struct m0_fom *fom)
372 {
373  static size_t locality = 0;
374 
375  return locality++;
376 }
377 
378 static const struct m0_fom_ops test_fom_ops = {
380  .fo_tick = &test_fom_tick,
381  .fo_home_locality = &test_fom_home_locality
382 };
383 
384 static int test_fom_create(struct m0_fop *fop, struct m0_fom **out,
385  struct m0_reqh *reqh)
386 {
387  unsigned idx = fop - &redo_fop[0];
388 
390  M0_UT_ASSERT(redo_fom[idx].fo_magic == 0);
391 
392  *out = &redo_fom[idx];
394  fop, NULL, reqh);
395  return 0;
396 }
397 
398 static const struct m0_fom_type_ops test_fom_type_ops = {
400 };
401 
402 static void rpc_fop_fom_init(void)
403 {
404  int result;
405 
407  m0_semaphore_init(&seq, 0);
408  test_ctx = (struct m0_ut_rpc_mach_ctx) {
409  .rmc_cob_id = { 20 },
410  .rmc_ep_addr = "0@lo:12345:34:10"
411  };
413 
415  M0_UT_ASSERT(result == 0);
416 
418  .name = "dtm test fop",
420  .xt = m0_dtm_oper_descr_xc,
421  .rpc_flags = M0_RPC_ITEM_TYPE_REQUEST,
422  .fop_ops = &test_ftype_ops,
423  .fom_ops = &test_fom_type_ops,
424  .sm = &test_conf,
425  .svc_type = &test_stype,
427 
429  M0_UT_ASSERT(result == 0);
431 
433  M0_UT_ASSERT(result == 0);
434 }
435 
436 static void rpc_fop_fom_fini(void)
437 {
447 }
448 
449 static void src_init(struct m0_dtm_remote *dtm, unsigned flags, int ctrl)
450 {
451  int i;
452 
453  M0_ASSERT(ctrl <= TGT_DELTA);
454 
456 
457  memset(oper_src, 0, sizeof oper_src);
458  memset(update_src, 0, sizeof update_src);
459  memset(control_src, 0, sizeof control_src);
460  memset(history_src, 0, sizeof history_src);
461  M0_SET0(&dtm_src);
462  M0_SET0(&tgt);
463  M0_SET0(&redo_fop);
464  M0_SET0(&redo_fom);
465  M0_SET0(&redo_ode);
471  &test_ctx.rmc_reqh);
472 
473  for (i = 0; i < ARRAY_SIZE(history_src); ++i) {
475  history_src[i].h_hi.hi_ver = 1;
478  history_src[i].h_rem = dtm;
479  }
480  for (i = 0; i < ARRAY_SIZE(oper_src); ++i) {
485  redo_ode[i] = (struct m0_dtm_oper_descr) {
486  .od_updates = {
488  .ou_update = redo_udescr[i]
489  }
490  };
492  &m0_fop_release);
494  }
495  for (i = 0; i < ARRAY_SIZE(last); ++i)
496  last[i] = 1;
497 }
498 
499 static void src_fini(void)
500 {
501  int i;
502 
503  for (i = 0; i < ARRAY_SIZE(oper_src); ++i) {
507  }
508  for (i = 0; i < ARRAY_SIZE(history_src); ++i)
514 
516 }
517 
518 static void tgt_init(void)
519 {
520  int i;
521 
522  memset(oper_tgt, 0, sizeof oper_tgt);
523  memset(update_tgt, 0, sizeof update_tgt);
524  memset(control_tgt, 0, sizeof control_tgt);
525  memset(history_tgt, 0, sizeof history_tgt);
526  M0_SET0(&dtm_tgt);
527  M0_SET0(&local);
532 
533  for (i = 0; i < ARRAY_SIZE(history_tgt); ++i) {
535  history_tgt[i].h_hi.hi_ver = 1;
538  history_tgt[i].h_rem = NULL;
539  }
540  for (i = 0; i < ARRAY_SIZE(oper_tgt); ++i) {
545  }
546 }
547 
548 static void tgt_fini(void)
549 {
550  int i;
551 
552  for (i = 0; i < ARRAY_SIZE(oper_tgt); ++i)
555  for (i = 0; i < ARRAY_SIZE(history_tgt); ++i)
561 }
562 
563 static void oper_populate(int i, unsigned nr)
564 {
565  int j;
566 
567  dtm_lock(&dtm_src);
568  for (j = 0; j < nr; ++j) {
569  update_src[i][j].upd_ops = &test_ops;
571  &oper_src[i],
573  M0_DUR_SET, i + 2, last[j]));
574  last[j] = i + 2;
575  }
578 
581 }
582 
583 static void transmit_build(void)
584 {
585  int i;
586  int result;
587 
588  src_init(&tgt.lre_rem, 0, 2);
589  tgt_init();
590  for (i = 0; i < ARRAY_SIZE(oper_src); ++i) {
591  unsigned nr = (i%UPDATE_NR) + 1;
592  oper_populate(i, nr);
596 
599 
600  result = m0_dtm_oper_build(&oper_tgt[i], &uu, &ode);
601  M0_UT_ASSERT(result == 0);
605 
610  }
611 }
612 
613 static void transmit_test(void)
614 {
615  transmit_build();
616  tgt_fini();
617  src_fini();
618 }
619 
620 #if 0
621 
622 static void db_init(void)
623 {
624  int result;
625 
626  result = m0_dbenv_init(&db, db_name, 0, true);
627  M0_UT_ASSERT(result == 0);
628  result = m0_emap_init(&emap, &db, "nonce");
629  M0_UT_ASSERT(result == 0);
630 }
631 
632 static void db_fini(void)
633 {
634  m0_emap_fini(&emap);
635  m0_dbenv_fini(&db);
636 }
637 
638 static struct m0_dtm_ltx ltx;
639 static uint64_t hi = 7;
640 
641 static void ltx_init(void)
642 {
643  M0_SET0(&ltx);
646  db_init();
647 }
648 
649 static void ltx_fini(unsigned nr)
650 {
651  int result;
652  int i;
653 
654  result = m0_emap_obj_insert(&emap, &ltx.lx_tx, &M0_UINT128(hi++, 0), 9);
655  M0_UT_ASSERT(result == 0);
656  for (i = 0; i < nr; ++i) {
659  }
660  m0_dtm_ltx_close(&ltx);
661  db_fini();
663  _0C(op_state(&oper_src[j].oprt_op, M0_DOS_PERSISTENT))));
666  src_fini();
667  M0_SET0(&ltx);
668 }
669 
670 static void ltx_test_1_N(void)
671 {
672  int result;
673 
674  ltx_init();
675 
676  m0_dtm_ltx_init(&ltx, &dtm_src, &db);
677  result = m0_dtm_ltx_open(&ltx);
678  M0_UT_ASSERT(result == 0);
679 
680  m0_dtm_ltx_add(&ltx, &oper_src[0]);
682  ltx_fini(1);
683 }
684 
685 static void ltx_test_N_N(void)
686 {
687  int result;
688  int i;
689 
690  ltx_init();
691 
693 
694  m0_dtm_ltx_init(&ltx, &dtm_src, &db);
695  result = m0_dtm_ltx_open(&ltx);
696  M0_UT_ASSERT(result == 0);
697 
698  for (i = 0; i < UPDATE_NR; ++i) {
699  m0_dtm_ltx_add(&ltx, &oper_src[i]);
700  oper_populate(i, ((i*3) % UPDATE_NR) + 1);
701  }
702  ltx_fini(UPDATE_NR);
703 }
704 #endif
705 
706 static void redo_test(void)
707 {
708  int i;
709 
710  transmit_build();
711 
712  /* crash tgt */
713  tgt_fini();
714  tgt_init();
715 
716  ticked = 0;
717  /* reinitialise the semaphore to count foms afresh. */
719  m0_semaphore_init(&seq, 0);
720 
722  /* wait until all foms start execution. */
723  for (i = 0; i < OPER_NR; ++i)
727  tgt_fini();
728  src_fini();
729 }
730 
732  .ts_name = "dtm-transmit-ut",
733  .ts_tests = {
734  { "transmit", transmit_test },
735 #if 0
736  { "ltx-1-N", ltx_test_1_N },
737  { "ltx-N-N", ltx_test_N_N },
738 #endif
739  { "redo", redo_test },
740  { NULL, NULL }
741  }
742 };
743 M0_EXPORTED(dtm_transmit_ut);
744 
745 #undef M0_TRACE_SUBSYSTEM
746 
748 /*
749  * Local variables:
750  * c-indentation-style: "K&R"
751  * c-basic-offset: 8
752  * tab-width: 8
753  * fill-column: 80
754  * scroll-step: 1
755  * End:
756  */
757 /*
758  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
759  */
static const struct m0_dtm_op_ops test_op_ops
Definition: transmit.c:293
static const struct m0_dtm_history_type tgt_htype
Definition: transmit.c:195
#define M0_DTM_UPDATE_DATA(label, rule, ver, orig_ver)
Definition: update.h:102
void * fd_data
Definition: fop.h:75
uint64_t id
Definition: cob.h:2380
static struct m0_mutex lock
Definition: transmit.c:326
const struct m0_rpc_item_type_ops m0_fop_default_item_type_ops
static const struct m0_fom_ops test_fom_ops
Definition: transmit.c:378
static size_t nr
Definition: dump.c:1505
static int service_start(struct m0_reqh_service *service)
Definition: transmit.c:223
static struct m0_dtm_local_remote local
Definition: transmit.c:71
static struct m0_dtm_update_descr udescr[UPDATE_NR+TGT_DELTA]
Definition: transmit.c:82
static struct m0_uint128 dtm_id_src
Definition: transmit.c:65
M0_INTERNAL void m0_fom_wakeup(struct m0_fom *fom)
Definition: fom.c:532
#define M0_PRE(cond)
M0_INTERNAL int m0_reqh_service_start(struct m0_reqh_service *service)
Definition: reqh_service.c:343
static struct m0_dtm_update control_tgt[OPER_NR][TGT_DELTA]
Definition: transmit.c:77
static void test_fom_fini(struct m0_fom *fom)
Definition: transmit.c:265
static const struct m0_dtm_op_ops op_ops
Definition: transmit.c:133
struct m0_dtm_update_comm upd_comm
Definition: update.h:75
struct m0_fop * uc_body
Definition: update.h:68
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static struct m0_dtm_oper oper_src[OPER_NR]
Definition: transmit.c:72
M0_INTERNAL void m0_ut_rpc_mach_init_and_add(struct m0_ut_rpc_mach_ctx *ctx)
int(* rso_start)(struct m0_reqh_service *service)
Definition: reqh_service.h:360
struct m0_ut_suite dtm_transmit_ut
Definition: transmit.c:731
M0_INTERNAL const struct m0_dtm_history_type m0_dtm_ltx_htype
Definition: ltx.c:98
static const struct m0_dtm_history_type src_htype
Definition: transmit.c:155
int const char const void size_t int flags
Definition: dir.c:328
M0_INTERNAL void m0_dtm_fini(struct m0_dtm *dtm)
Definition: dtm.c:54
M0_INTERNAL void m0_reqh_service_stop(struct m0_reqh_service *service)
Definition: reqh_service.c:402
struct m0_reqh_service_type test_stype
Definition: transmit.c:259
#define NULL
Definition: misc.h:38
static struct m0_dtm dtm_tgt
Definition: transmit.c:69
static size_t locality(const struct m0_fom *fom)
Definition: rm_foms.c:269
static int test_fom_tick(struct m0_fom *fom)
Definition: transmit.c:329
#define M0_FOP_TYPE_INIT(ft,...)
Definition: fop.h:307
Definition: sm.h:350
static struct m0_uint128 dtm_id_tgt
Definition: transmit.c:66
uint64_t hi_flags
Definition: nucleus.h:65
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:79
struct m0_dtm_remote * h_rem
Definition: history.h:59
M0_INTERNAL void m0_reqh_service_prepare_to_stop(struct m0_reqh_service *service)
Definition: reqh_service.c:375
static struct m0_fop redo_fop[OPER_NR]
Definition: transmit.c:84
#define M0_CASSERT(cond)
static struct m0_dtm_update control_src[OPER_NR][TGT_DELTA]
Definition: transmit.c:76
struct m0_rpc_machine rmc_rpc
struct m0_dtm_fol_remote re_fol
Definition: remote.h:58
int(* fto_create)(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: fom.h:650
M0_INTERNAL void m0_dtm_oper_init(struct m0_dtm_oper *oper, struct m0_dtm *dtm, struct m0_tl *uu)
Definition: operation.c:44
static void service_stop(struct m0_reqh_service *service)
Definition: transmit.c:228
static void rpc_fop_fom_init(void)
Definition: transmit.c:402
uint8_t hit_id
Definition: history.h:88
struct m0_cob_domain_id rmc_cob_id
struct m0_dtm_op oprt_op
Definition: operation.h:48
static struct m0_dtm_update_descr redo_udescr[OPER_NR][UPDATE_NR+TGT_DELTA]
Definition: transmit.c:87
M0_INTERNAL void m0_dtm_oper_prepared(const struct m0_dtm_oper *oper, const struct m0_dtm_remote *rem)
Definition: operation.c:122
static void op_ready(struct m0_dtm_op *op)
Definition: transmit.c:272
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
void m0_fop_type_fini(struct m0_fop_type *fopt)
Definition: fop.c:232
static const struct m0_dtm_history_ops src_ops
Definition: transmit.c:172
struct m0_dtm_hi h_hi
Definition: history.h:56
M0_INTERNAL void m0_dtm_oper_done(const struct m0_dtm_oper *oper, const struct m0_dtm_remote *rem)
Definition: operation.c:144
#define M0_BITS(...)
Definition: misc.h:236
M0_INTERNAL int m0_dtm_oper_build(struct m0_dtm_oper *oper, struct m0_tl *uu, const struct m0_dtm_oper_descr *ode)
Definition: operation.c:183
static void transmit_build(void)
Definition: transmit.c:583
static struct m0_fom redo_fom[OPER_NR]
Definition: transmit.c:85
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
static struct m0_xcode_type ** xt[]
Definition: protocol.c:64
Definition: ut.h:77
M0_INTERNAL void dtm_lock(struct m0_dtm *dtm)
Definition: dtm.c:159
def db_init(path)
Definition: addb2db.py:314
static struct m0_dtm_update update_tgt[OPER_NR][UPDATE_NR+TGT_DELTA]
Definition: transmit.c:75
static const struct m0_reqh_service_ops test_service_ops
Definition: transmit.c:236
static struct m0_semaphore seq
Definition: transmit.c:327
int m0_reqh_service_type_register(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:473
M0_INTERNAL void m0_dtm_history_fini(struct m0_dtm_history *history)
Definition: history.c:63
m0_fom_phase
Definition: fom.h:372
const struct m0_dtm_update_ops * upd_ops
Definition: update.h:74
struct m0_fom_type ft_fom_type
Definition: fop.h:232
static void redo_test(void)
Definition: transmit.c:706
static void op_miser(struct m0_dtm_op *op)
Definition: transmit.c:288
M0_INTERNAL void m0_reqh_shutdown_wait(struct m0_reqh *reqh)
Definition: reqh.c:647
op
Definition: libdemo.c:64
static const struct m0_uint128 * tgt_id(const struct m0_dtm_history *history)
Definition: transmit.c:202
static int test_fom_create(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: transmit.c:384
static const struct m0_reqh_service_type_ops stype_ops
Definition: transmit.c:255
M0_INTERNAL void m0_dtm_update_link(struct m0_tl *list, struct m0_dtm_update *update, uint32_t nr)
Definition: update.c:221
struct m0_dtm_history ch_history
Definition: history.h:102
static int update_init(struct m0_dtm_history *history, uint8_t id, struct m0_dtm_update *update)
Definition: transmit.c:122
static struct m0_uint128 id_tgt[UPDATE_NR]
Definition: transmit.c:81
M0_INTERNAL void m0_dtm_history_type_register(struct m0_dtm *dtm, const struct m0_dtm_history_type *ht)
Definition: history.c:216
int opcode
Definition: crate.c:301
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
int i
Definition: dir.c:1033
M0_INTERNAL void up_print(const struct m0_dtm_up *up)
Definition: nucleus.c:548
static const struct m0_reqh_service_ops rpc_ops
Definition: service.c:92
static const struct m0_dtm_history_type_ops tgt_htype_ops
Definition: transmit.c:191
static struct m0_dtm dtm_src
Definition: transmit.c:68
const struct m0_dtm_op_ops * op_ops
Definition: nucleus.h:108
static int undo_redo(struct m0_dtm_update *updt)
Definition: transmit.c:106
static void service_fini(struct m0_reqh_service *service)
Definition: transmit.c:231
static const struct m0_fop_type_ops test_ftype_ops
Definition: transmit.c:221
const char * name
Definition: trace.c:110
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
static const struct socktype stype[]
Definition: sock.c:1156
M0_INTERNAL void m0_dtm_oper_pack(struct m0_dtm_oper *oper, const struct m0_dtm_remote *rem, struct m0_dtm_oper_descr *ode)
Definition: operation.c:161
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
uint64_t m0_dtm_ver_t
Definition: nucleus.h:48
static void transmit_test(void)
Definition: transmit.c:613
const char * rst_name
Definition: reqh_service.h:447
M0_INTERNAL void m0_reqh_service_fini(struct m0_reqh_service *service)
Definition: reqh_service.c:457
M0_INTERNAL void op_print(const struct m0_dtm_op *op)
Definition: nucleus.c:555
Definition: tlist.h:251
static struct m0_dtm_history history_tgt[UPDATE_NR]
Definition: transmit.c:79
static struct m0_dtm_history history_src[UPDATE_NR]
Definition: transmit.c:78
static void test_persistent(struct m0_dtm_history *history)
Definition: transmit.c:130
uint64_t u_hi
Definition: types.h:36
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
static int service_allocate(struct m0_reqh_service **service, const struct m0_reqh_service_type *stype)
Definition: transmit.c:242
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
static struct m0_dtm_oper_descr redo_ode[OPER_NR]
Definition: transmit.c:86
static struct m0_dtm_local_remote tgt
Definition: transmit.c:70
Definition: ltx.h:44
M0_INTERNAL int m0_reqh_service_allocate(struct m0_reqh_service **out, const struct m0_reqh_service_type *stype, struct m0_reqh_context *rctx)
Definition: reqh_service.c:185
Definition: reqh.h:94
const struct m0_sm_conf test_conf
Definition: transmit.c:320
Definition: dump.c:103
M0_INTERNAL void m0_dtm_history_reset(struct m0_dtm_history *history, m0_dtm_ver_t since)
Definition: history.c:125
static const struct m0_dtm_history_ops tgt_ops
Definition: transmit.c:212
struct m0_reqh rmc_reqh
M0_INTERNAL void m0_reqh_service_init(struct m0_reqh_service *service, struct m0_reqh *reqh, const struct m0_fid *fid)
Definition: reqh_service.c:428
M0_INTERNAL void m0_fop_fini(struct m0_fop *fop)
Definition: fop.c:136
static size_t test_fom_home_locality(const struct m0_fom *fom)
Definition: transmit.c:371
M0_INTERNAL void m0_dtm_history_type_deregister(struct m0_dtm *dtm, const struct m0_dtm_history_type *ht)
Definition: history.c:225
struct m0_dtm_controlh rfo_ch
Definition: fol.h:56
void(* doo_ready)(struct m0_dtm_op *op)
Definition: nucleus.h:113
static void rpc_fop_fom_fini(void)
Definition: transmit.c:436
M0_INTERNAL void m0_reqh_idle_wait(struct m0_reqh *reqh)
Definition: reqh.c:606
static struct m0_be_emap * emap
Definition: extmap.c:44
static const struct m0_dtm_update_ops test_ops
Definition: transmit.c:116
static void oper_populate(int i, unsigned nr)
Definition: transmit.c:563
static const struct m0_dtm_history_type_ops src_htype_ops
Definition: transmit.c:151
static void src_init(struct m0_dtm_remote *dtm, unsigned flags, int ctrl)
Definition: transmit.c:449
#define m0_forall(var, nr,...)
Definition: misc.h:112
uint32_t sd_flags
Definition: sm.h:378
M0_INTERNAL void m0_dtm_init(struct m0_dtm *dtm, struct m0_uint128 *id)
Definition: dtm.c:41
static void noop(struct m0_dtm_op *op)
Definition: transmit.c:103
Definition: fom.h:481
M0_INTERNAL const struct m0_dtm_history_type m0_dtm_fol_remote_htype
Definition: fol.c:173
const char * ts_name
Definition: ut.h:99
struct m0_fop_data f_data
Definition: fop.h:82
M0_INTERNAL void m0_dtm_reply_unpack(struct m0_dtm_oper *oper, const struct m0_dtm_oper_descr *reply)
Definition: operation.c:236
static struct m0_reqh_service * test_svc
Definition: transmit.c:220
struct m0_reqh reqh
Definition: rm_foms.c:48
M0_INTERNAL void m0_dtm_update_init(struct m0_dtm_update *update, struct m0_dtm_history *history, struct m0_dtm_oper *oper, const struct m0_dtm_update_data *data)
Definition: update.c:47
int(* rsto_service_allocate)(struct m0_reqh_service **service, const struct m0_reqh_service_type *stype)
Definition: reqh_service.h:435
static void tgt_init(void)
Definition: transmit.c:518
static struct m0_dtm_update_descr udescr_reply[UPDATE_NR+TGT_DELTA]
Definition: transmit.c:83
static const struct m0_dtm_update_type test_utype
Definition: transmit.c:111
int(* hito_find)(struct m0_dtm *dtm, const struct m0_dtm_history_type *ht, const struct m0_uint128 *id, struct m0_dtm_history **out)
Definition: history.h:95
static int src_find(struct m0_dtm *dtm, const struct m0_dtm_history_type *ht, const struct m0_uint128 *id, struct m0_dtm_history **out)
Definition: transmit.c:139
static m0_dtm_ver_t last[UPDATE_NR]
Definition: transmit.c:100
M0_INTERNAL void m0_fop_release(struct m0_ref *ref)
Definition: fop.c:148
const struct m0_dtm_history_type * hio_type
Definition: history.h:79
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
m0_dtm_ver_t hi_ver
Definition: nucleus.h:66
static struct m0_fop_type test_fopt
Definition: transmit.c:219
static struct m0_ut_rpc_mach_ctx test_ctx
Definition: transmit.c:101
M0_INTERNAL void m0_ut_rpc_mach_fini(struct m0_ut_rpc_mach_ctx *ctx)
static struct m0_dtm_update update_src[OPER_NR][UPDATE_NR]
Definition: transmit.c:74
static struct m0_net_test_service svc
Definition: service.c:34
M0_INTERNAL void m0_dtm_oper_close(struct m0_dtm_oper *oper)
Definition: operation.c:83
static unsigned ticked
Definition: transmit.c:270
M0_INTERNAL void m0_dtm_update_list_fini(struct m0_tl *list)
Definition: update.c:182
M0_INTERNAL bool op_state(const struct m0_dtm_op *op, enum m0_dtm_state state)
Definition: nucleus.c:511
M0_INTERNAL void dtm_unlock(struct m0_dtm *dtm)
Definition: dtm.c:164
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
static struct m0_uint128 id_src[UPDATE_NR]
Definition: transmit.c:80
static struct m0_fop * fop
Definition: item.c:57
static const struct m0_fom_type_ops test_fom_type_ops
Definition: transmit.c:398
#define IS_IN_ARRAY(idx, array)
Definition: misc.h:311
static struct m0_dtm_oper oper_tgt[OPER_NR]
Definition: transmit.c:73
static const struct m0_uint128 * src_id(const struct m0_dtm_history *history)
Definition: transmit.c:162
M0_INTERNAL void m0_dtm_history_init(struct m0_dtm_history *history, struct m0_dtm *dtm)
Definition: history.c:51
static struct m0_dtm_oper_descr ode
Definition: transmit.c:88
M0_INTERNAL void m0_dtm_local_remote_fini(struct m0_dtm_local_remote *lre)
Definition: remote.c:276
#define M0_UINT128(hi, lo)
Definition: types.h:40
M0_INTERNAL void hi_print(const struct m0_dtm_hi *hi)
Definition: nucleus.c:563
Definition: dtm.h:529
void(* fo_fini)(struct m0_fom *fom)
Definition: fom.h:657
M0_INTERNAL void m0_dtm_oper_fini(struct m0_dtm_oper *oper)
Definition: operation.c:58
static struct m0_tl uu
Definition: transmit.c:67
M0_INTERNAL void m0_dtm_update_list_init(struct m0_tl *list)
Definition: update.c:177
uint8_t updtt_id
Definition: update.h:91
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
#define out(...)
Definition: gen.c:41
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
M0_INTERNAL void m0_dtm_local_remote_init(struct m0_dtm_local_remote *lre, struct m0_uint128 *id, struct m0_dtm *local, struct m0_reqh *reqh)
Definition: remote.c:266
struct m0_dtm_remote lre_rem
Definition: remote.h:104
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
static void op_late(struct m0_dtm_op *op)
Definition: transmit.c:283
uint64_t u_lo
Definition: types.h:37
struct m0_dtm_oper_updates od_updates
Definition: operation.h:66
M0_INTERNAL void m0_dtm_ltx_add(struct m0_dtm_ltx *ltx, struct m0_dtm_oper *oper)
Definition: ltx.c:69
struct m0_rpc_machine * ri_rmachine
Definition: item.h:160
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
M0_INTERNAL void m0_dtm_ltx_close(struct m0_dtm_ltx *ltx)
Definition: ltx.c:56
static void tgt_fini(void)
Definition: transmit.c:548
void m0_free(void *data)
Definition: memory.c:146
Definition: mutex.h:47
struct m0_rpc_item f_item
Definition: fop.h:83
void m0_reqh_service_type_unregister(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:490
static struct m0_sm_state_descr fom_phases[]
Definition: transmit.c:304
static struct m0_reqh_service * service[REQH_IN_UT_MAX]
Definition: long_lock_ut.c:46
M0_INTERNAL void m0_dtm_ltx_init(struct m0_dtm_ltx *ltx, struct m0_dtm *dtm, struct m0_be_domain *dom)
Definition: ltx.c:40
static void hi(void)
Definition: nucleus.c:93
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_INTERNAL void m0_dtm_ltx_open(struct m0_dtm_ltx *ltx)
Definition: ltx.c:50
#define M0_UT_ASSERT(a)
Definition: ut.h:46
Definition: fop.h:79
static void src_fini(void)
Definition: transmit.c:499
static int tgt_find(struct m0_dtm *dtm, const struct m0_dtm_history_type *ht, const struct m0_uint128 *id, struct m0_dtm_history **out)
Definition: transmit.c:179
M0_INTERNAL void m0_dtm_reply_pack(const struct m0_dtm_oper *oper, const struct m0_dtm_oper_descr *request, struct m0_dtm_oper_descr *reply)
Definition: operation.c:209
const struct m0_dtm_history_ops * h_ops
Definition: history.h:65
int(* updo_redo)(struct m0_dtm_update *updt)
Definition: update.h:84