Motr  M0
engine.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_BE
24 #include "lib/trace.h"
25 
26 #include "be/engine.h"
27 
28 #include "lib/memory.h" /* m0_free */
29 #include "lib/errno.h" /* ENOMEM */
30 #include "lib/misc.h" /* m0_forall */
31 #include "lib/time.h" /* m0_time_now */
32 
33 #include "be/tx_service.h" /* m0_be_tx_service_init */
34 #include "be/tx_group.h" /* m0_be_tx_group */
35 #include "be/tx_internal.h" /* m0_be_tx__state_post */
36 #include "be/seg_dict.h" /* XXX remove it */
37 #include "be/domain.h" /* XXX remove it */
38 
45 M0_TL_DESCR_DEFINE(etx, "m0_be_engine::eng_txs[]", M0_INTERNAL,
46  struct m0_be_tx, t_engine_linkage, t_magic,
48 M0_TL_DEFINE(etx, M0_INTERNAL, struct m0_be_tx);
49 
50 M0_TL_DESCR_DEFINE(egr, "m0_be_engine::eng_groups[]", static,
51  struct m0_be_tx_group, tg_engine_linkage, tg_magic,
52  M0_BE_TX_MAGIC /* XXX */, M0_BE_TX_ENGINE_MAGIC /* XXX */);
53 M0_TL_DEFINE(egr, static, struct m0_be_tx_group);
54 
/* Static helpers that are referenced before their definitions below. */
static bool be_engine_is_locked(const struct m0_be_engine *en);
static void be_engine_group_freeze(struct m0_be_engine   *en,
				   struct m0_be_tx_group *gr);
static void be_engine_group_tryclose(struct m0_be_engine   *en,
				     struct m0_be_tx_group *gr);
static void be_engine_tx_group_open(struct m0_be_engine   *en,
				    struct m0_be_tx_group *gr);
62 
64  struct m0_be_tx_group *gr,
65  enum m0_be_tx_group_state state)
66 {
67  static const enum m0_be_tx_group_state prev_state[] = {
72  };
73  M0_ENTRY("en=%p gr=%p state=%d gr->tg_state=%d",
74  en, gr, state, gr->tg_state);
75 
77  M0_PRE(gr->tg_state == prev_state[state]);
78  M0_PRE(egr_tlist_contains(&en->eng_groups[gr->tg_state], gr));
79  M0_PRE(egr_tlist_length(&en->eng_groups[M0_BGS_OPEN]) +
80  egr_tlist_length(&en->eng_groups[M0_BGS_FROZEN]) <= 1);
81 
82  egr_tlist_move(&en->eng_groups[state], gr);
83  gr->tg_state = state;
84 
85  M0_POST(egr_tlist_length(&en->eng_groups[M0_BGS_OPEN]) +
86  egr_tlist_length(&en->eng_groups[M0_BGS_FROZEN]) <= 1);
87 }
88 
89 static int be_engine_cfg_validate(struct m0_be_engine_cfg *en_cfg)
90 {
92  &en_cfg->bec_group_cfg.tgc_size_max),
93  "Maximum transaction size shouldn't be greater than "
94  "maximum group size: "
95  "tx_size_max = " BETXCR_F ", group_size_max = " BETXCR_F,
96  BETXCR_P(&en_cfg->bec_tx_size_max),
98  M0_ASSERT(en_cfg->bec_tx_payload_max <=
100  return 0;
101 }
102 
103 M0_INTERNAL int m0_be_engine_init(struct m0_be_engine *en,
104  struct m0_be_domain *dom,
105  struct m0_be_engine_cfg *en_cfg)
106 {
107  struct m0_be_tx_group_cfg *gr_cfg;
108  struct m0_be_tx_group *gr;
109  int rc;
110  int i;
111 
112  M0_ENTRY();
113 
114  rc = be_engine_cfg_validate(en_cfg);
115  M0_ASSERT(rc == 0);
116 
117  en->eng_cfg = en_cfg;
118  en->eng_group_nr = en_cfg->bec_group_nr;
119  en->eng_domain = dom;
120 
121  M0_ALLOC_ARR(en->eng_group, en_cfg->bec_group_nr);
122  if (en->eng_group == NULL) {
123  rc = -ENOMEM;
124  goto err;
125  }
126  M0_ALLOC_ARR(en_cfg->bec_groups_cfg, en_cfg->bec_group_nr);
127  M0_ASSERT(en_cfg->bec_groups_cfg != NULL);
128 
130  (egr_tlist_init(&en->eng_groups[i]), true));
131  rc = m0_be_tx_service_init(en, en_cfg->bec_reqh);
132  if (rc != 0)
133  goto err_free;
134  if (rc != 0)
135  goto err_service_fini;
136  for (i = 0; i < en->eng_group_nr; ++i) {
137  gr = &en->eng_group[i];
138  gr_cfg = &en_cfg->bec_groups_cfg[i];
139 
140  *gr_cfg = en_cfg->bec_group_cfg;
141  gr_cfg->tgc_domain = en_cfg->bec_domain;
142  gr_cfg->tgc_engine = en;
143  gr_cfg->tgc_log = &en->eng_log;
144  gr_cfg->tgc_log_discard = en_cfg->bec_log_discard;
145  gr_cfg->tgc_pd = en_cfg->bec_pd;
146  gr_cfg->tgc_reqh = en_cfg->bec_reqh;
147 
148  rc = m0_be_tx_group_init(gr, gr_cfg);
149  M0_ASSERT(rc == 0);
151  egr_tlink_init(gr);
152  }
153  en->eng_tx_id_next = 0;
154 
156  (etx_tlist_init(&en->eng_txs[i]), true));
157 
159  en->eng_recovery_finished = false;
160 
162  return M0_RC(0);
163  err_service_fini:
165  err_free:
166  m0_free(en->eng_group);
167  err:
168  return M0_RC(rc);
169 }
170 
171 M0_INTERNAL void m0_be_engine_fini(struct m0_be_engine *en)
172 {
173  m0_bcount_t log_size = 0; /* XXX */
174  m0_bcount_t log_free = 0; /* XXX */
175  int i;
176 
177  M0_ENTRY();
179 
180  /*
181  * TODO this check should be implemented.
182  */
183  M0_ASSERT_INFO(log_size == log_free,
184  "There is at least one transaction which didn't become "
185  "stable yet. log_size = %" PRIu64 ", log_free = %"PRIu64,
186  log_size, log_free);
187 
189 
191  (etx_tlist_fini(&en->eng_txs[i]), true));
192  for (i = 0; i < en->eng_group_nr; ++i) {
193  egr_tlink_fini(&en->eng_group[i]);
196  }
199  (egr_tlist_fini(&en->eng_groups[i]), true));
200  m0_free(en->eng_group);
202 
203  M0_LEAVE();
204 }
205 
206 static void be_engine_lock(struct m0_be_engine *en)
207 {
209 }
210 
211 static void be_engine_unlock(struct m0_be_engine *en)
212 {
214 }
215 
216 static bool be_engine_is_locked(const struct m0_be_engine *en)
217 {
218  return m0_mutex_is_locked(en->eng_cfg->bec_lock);
219 }
220 
/**
 * Engine invariant.
 *
 * The only property checked at present is the engine lock state, as
 * reported by be_engine_is_locked(). The engine is only inspected here,
 * so the pointer is const-qualified. This matches be_engine_is_locked(),
 * and non-const callers keep working unchanged.
 */
static bool be_engine_invariant(const struct m0_be_engine *en)
{
	return be_engine_is_locked(en);
}
225 
226 static uint64_t be_engine_tx_id_allocate(struct m0_be_engine *en)
227 {
228  return en->eng_tx_id_next++;
229 }
230 
231 static struct m0_be_tx *be_engine_tx_peek(struct m0_be_engine *en,
232  enum m0_be_tx_state state)
233 {
235 
236  return etx_tlist_head(&en->eng_txs[state]);
237 }
238 
239 static void be_engine_tx_state_post(struct m0_be_engine *en,
240  struct m0_be_tx *tx,
241  enum m0_be_tx_state state)
242 {
244 
245  etx_tlist_move(&en->eng_txs[M0_BTS_NR], tx);
246  m0_be_tx__state_post(tx, state);
247 }
248 
253 static bool is_engine_at_stopped_or_done(const struct m0_be_engine *en,
254  bool stopped)
255 {
257 
258  return m0_forall(state, M0_BTS_DONE,
259  state < M0_BTS_ACTIVE ? true :
260  etx_tlist_is_empty(&en->eng_txs[state])) &&
261  !m0_tl_exists(etx, tx, &en->eng_txs[M0_BTS_NR],
263  <= m0_be_tx_state(tx) &&
264  m0_be_tx_state(tx) <= M0_BTS_DONE);
265 }
266 
268 {
269  struct m0_be_tx *tx;
270 
272 
273  if (en->eng_exclusive_mode)
274  return NULL;
275 
277  if (tx != NULL && m0_be_tx__is_exclusive(tx)) {
279  if (!en->eng_exclusive_mode)
280  return NULL;
281  }
282 
283  return tx;
284 }
285 
286 /* XXX RENAME IT */
287 static void be_engine_got_tx_open(struct m0_be_engine *en,
288  struct m0_be_tx *tx)
289 {
290  int rc;
291 
293 
294  /* XXX s/tx->t_group/tx_is_recovering */
295  if (tx != NULL && tx->t_group != NULL) {
296  /* XXX this is copypaste */
297  tx->t_log_reserved = true;
299  }
300 
301  while ((tx = be_engine_tx_opening_peek(en)) != NULL) {
302  if (!m0_be_tx_credit_le(&tx->t_prepared,
303  &en->eng_cfg->bec_tx_size_max) ||
306  "tx=%p engine=%p t_prepared="BETXCR_F" "
307  "t_payload_prepared=%" PRIu64 " "
308  "bec_tx_size_max="BETXCR_F
309  " bec_tx_payload_max=%"PRIu64,
310  tx, en, BETXCR_P(&tx->t_prepared),
311  tx->t_payload_prepared,
315  } else {
316  tx->t_log_reserved_size =
318  &en->eng_log, &tx->t_prepared,
319  tx->t_payload_prepared);
321  tx->t_log_reserved_size);
322  if (rc == 0) {
323  tx->t_log_reserved = true;
325  } else {
326  /* Ignore the rest of OPENING transactions.
327  * If we don't ignore them, the big ones
328  * may starve.
329  */
330  break;
331  }
332  }
333  }
334 }
335 
336 static void be_engine_group_timer_cb(struct m0_sm_timer *timer)
337 {
338  struct m0_be_tx_group *gr = M0_AMB(gr, timer, tg_close_timer);
339  struct m0_be_engine *en = gr->tg_engine;
340  struct m0_sm_group *sm_grp = timer->tr_grp;
341 
342  M0_ENTRY("en=%p gr=%p sm_grp=%p", en, gr, sm_grp);
343 
344  be_engine_lock(en);
346  be_engine_group_freeze(en, gr);
347  be_engine_group_tryclose(en, gr);
348  be_engine_unlock(en);
349  M0_LEAVE();
350 }
351 
352 static void be_engine_group_timer_arm(struct m0_sm_group *sm_grp,
353  struct m0_sm_ast *ast)
354 {
355  struct m0_be_tx_group *gr = M0_AMB(gr, ast, tg_close_timer_arm);
356  struct m0_sm_timer *timer = &gr->tg_close_timer;
357  m0_time_t deadline = gr->tg_close_deadline;
358  int rc;
359 
360  M0_ENTRY("en=%p gr=%p sm_grp=%p", gr->tg_engine, gr, sm_grp);
361  m0_sm_timer_fini(timer);
362  m0_sm_timer_init(timer);
364  deadline);
365  M0_ASSERT_INFO(rc == 0, "rc = %d", rc);
366  M0_LEAVE();
367 }
368 
369 static void be_engine_group_timer_disarm(struct m0_sm_group *sm_grp,
370  struct m0_sm_ast *ast)
371 {
372  struct m0_be_tx_group *gr = M0_AMB(gr, ast, tg_close_timer_disarm);
373  struct m0_be_engine *en = gr->tg_engine;
374  struct m0_sm_timer *timer = &gr->tg_close_timer;
375 
376  M0_ENTRY("en=%p gr=%p sm_grp=%p", en, gr, sm_grp);
377  if (m0_sm_timer_is_armed(timer))
378  m0_sm_timer_cancel(timer);
379  m0_sm_ast_cancel(sm_grp, &gr->tg_close_timer_arm);
380  M0_LEAVE();
381 }
382 
383 static void be_engine_group_freeze(struct m0_be_engine *en,
384  struct m0_be_tx_group *gr)
385 {
387 
388  if (gr->tg_state == M0_BGS_OPEN)
390 }
391 
392 static void be_engine_group_tryclose(struct m0_be_engine *en,
393  struct m0_be_tx_group *gr)
394 {
395  struct m0_be_tx_group *group;
396 
398 
399  if (gr->tg_nr_unclosed == 0 && gr->tg_state == M0_BGS_FROZEN) {
404  &gr->tg_close_timer_disarm);
405 
406  M0_ASSERT(egr_tlist_is_empty(&en->eng_groups[M0_BGS_OPEN]) &&
407  egr_tlist_is_empty(&en->eng_groups[M0_BGS_FROZEN]));
408  group = egr_tlist_head(&en->eng_groups[M0_BGS_READY]);
409  if (group != NULL) {
410  /*
411  * XXX be_engine_tx_trygroup() calls
412  * be_engine_group_tryclose() in some cases.
413  * Therefore, this can be recursive call.
414  */
416  }
417  }
418 }
419 
421  struct m0_be_tx_group *gr)
422 {
423  struct m0_sm_group *sm_grp = m0_be_tx_group__sm_group(gr);
427  uint64_t grouping_q_length;
428  uint64_t tx_per_group_max;
429 
430  M0_ENTRY("en=%p gr=%p sm_grp=%p", en, gr, sm_grp);
432 
433  grouping_q_length = etx_tlist_length(&en->eng_txs[M0_BTS_GROUPING]);
434  M0_ASSERT(grouping_q_length > 0);
435  tx_per_group_max = en->eng_cfg->bec_group_cfg.tgc_tx_nr_max;
436  grouping_q_length = min_check(grouping_q_length, tx_per_group_max);
437  delay = t_min + (t_max - t_min) * grouping_q_length / tx_per_group_max;
441  m0_sm_ast_post(sm_grp, &gr->tg_close_timer_arm);
442  M0_LEAVE("grouping_q_length=%" PRIu64 " delay=%"PRIu64,
443  grouping_q_length, delay);
444 }
445 
447 {
448  return m0_tl_find(egr, gr, &en->eng_groups[M0_BGS_OPEN],
450 }
451 
452 static int be_engine_tx_trygroup(struct m0_be_engine *en,
453  struct m0_be_tx *tx)
454 {
455  struct m0_be_tx_group *gr;
456  int rc = -EBUSY;
457 
460 
461  while ((gr = be_engine_group_find(en)) != NULL) {
462  if (tx->t_grouped) {
463  /*
464  * The tx is already grouped in a recursive
465  * be_engine_tx_trygroup() call.
466  */
467  rc = -ELOOP;
468  break;
469  }
470  if (m0_be_tx__is_exclusive(tx) && m0_be_tx_group_tx_nr(gr) > 0) {
471  rc = -EBUSY;
472  } else if (etx_tlist_length(&en->eng_txs[M0_BTS_ACTIVE]) >=
473  en->eng_cfg->bec_tx_active_max) {
474  rc = -ENOSPC;
475  } else {
476  rc = m0_be_tx_group_tx_add(gr, tx);
477  if (rc == 0)
478  m0_be_tx__group_assign(tx, gr);
479  }
480  if (rc == -EXFULL ||
481  m0_be_tx__is_fast(tx) ||
483  be_engine_group_freeze(en, gr);
484  } else if (rc == 0 && m0_be_tx_group_tx_nr(gr) == 1) {
486  }
487  if (M0_IN(rc, (-EBUSY, -EXFULL)))
488  be_engine_group_tryclose(en, gr);
489  if (M0_IN(rc, (0, -ENOSPC)))
490  break;
491  }
492  if (rc == 0) {
493  tx->t_grouped = true;
495  }
496  return M0_RC(rc);
497 }
498 
500 {
502 
504  egr_tlist_is_empty(&en->eng_groups[M0_BGS_FROZEN]) &&
505  egr_tlist_is_empty(&en->eng_groups[M0_BGS_CLOSED]);
506 }
507 
509 {
511 
512  en->eng_recovery_finished = true;
514 }
515 
516 static void be_engine_try_recovery(struct m0_be_engine *en)
517 {
518  struct m0_be_tx_group *gr;
519  bool group_recovery_started = false;
520 
521  M0_ENTRY();
523 
525  gr = be_engine_group_find(en);
526  if (gr == NULL)
527  break;
529  be_engine_group_freeze(en, gr);
530  be_engine_group_tryclose(en, gr);
531  group_recovery_started = true;
532  }
533  if (!group_recovery_started && !en->eng_recovery_finished &&
536  }
537  M0_LEAVE("group_recovery_started=%d eng_recovery_finished=%d",
538  !!group_recovery_started, !!en->eng_recovery_finished);
539 }
540 
542  enum m0_be_tx_state state)
543 {
545 
546  return m0_tl_find(etx, tx, &en->eng_txs[state],
548 }
549 
550 /* XXX RENAME IT */
552 {
553  struct m0_be_tx *tx;
554  int rc;
555 
557 
558  /* Close recovering transactions */
559  while ((tx = be_engine_recovery_tx_find(en, M0_BTS_GROUPING)) != NULL) {
560  /*
561  * Group is already closed, we just need to add tx to the group.
562  */
565  rc = m0_be_tx_group_tx_add(tx->t_group, tx);
566  M0_ASSERT_INFO(rc == 0, "rc = %d", rc);
568  }
569  /* Close regular transactions */
570  while ((tx = be_engine_tx_peek(en, M0_BTS_GROUPING)) != NULL) {
571  rc = be_engine_tx_trygroup(en, tx);
572  if (rc != 0)
573  break;
574  /*
575  * XXX if be_engine_tx_trygroup return eny error, upper levels
576  * will not be informed. Actually, I saw situation when
577  * be_engine_tx_trygroup() does not find transaction and returns
578  * EBUSY, then transaction is forever in grouping state.
579  */
580  }
581 }
582 static void be_engine_got_tx_closed(struct m0_be_engine *en,
583  struct m0_be_tx *tx)
584 {
585  struct m0_be_tx_group *gr = tx->t_group;
586 
588 
590  m0_be_tx_group_tx_closed(gr, tx);
592  be_engine_group_tryclose(en, gr);
593 }
594 
595 static void be_engine_got_tx_done(struct m0_be_engine *en, struct m0_be_tx *tx)
596 {
597  struct m0_be_tx_group *gr = tx->t_group;
598 
600 
602  if (gr->tg_nr_unstable == 0)
604  tx->t_group = NULL;
605 
606  if (m0_be_tx__is_exclusive(tx)) {
608  en->eng_exclusive_mode = false;
609  }
610 }
611 
612 M0_INTERNAL bool m0_be_engine__invariant(struct m0_be_engine *en)
613 {
614  bool rc_bool;
615 
616  be_engine_lock(en);
617  rc_bool = be_engine_invariant(en);
618  be_engine_unlock(en);
619 
620  return M0_RC(rc_bool);
621 }
622 
623 M0_INTERNAL void m0_be_engine__tx_init(struct m0_be_engine *en,
624  struct m0_be_tx *tx,
625  enum m0_be_tx_state state)
626 {
627  etx_tlink_init(tx);
628  m0_be_engine__tx_state_set(en, tx, state);
629 }
630 
631 M0_INTERNAL void m0_be_engine__tx_fini(struct m0_be_engine *en,
632  struct m0_be_tx *tx)
633 {
634  be_engine_lock(en);
635  etx_tlink_del_fini(tx);
636  be_engine_unlock(en);
637 }
638 
639 M0_INTERNAL void m0_be_engine__tx_state_set(struct m0_be_engine *en,
640  struct m0_be_tx *tx,
641  enum m0_be_tx_state state)
642 {
643  be_engine_lock(en);
645 
646  M0_LOG(M0_DEBUG, "tx %p: => %s", tx, m0_be_tx_state_name(state));
647 
648  if (state != M0_BTS_PREPARE)
649  etx_tlist_del(tx);
650  etx_tlist_add_tail(&en->eng_txs[state], tx);
651 
652  switch (state) {
653  case M0_BTS_PREPARE:
654  /* TODO don't assign id for recovering tx */
655  tx->t_id = be_engine_tx_id_allocate(en);
656  tx->t_grouped = false;
657  M0_LOG(M0_DEBUG, "tx=%p t_id=%"PRIu64, tx, tx->t_id);
658  break;
659  case M0_BTS_OPENING:
660  be_engine_got_tx_open(en, tx);
661  break;
662  case M0_BTS_GROUPING:
664  break;
665  case M0_BTS_CLOSED:
666  be_engine_got_tx_closed(en, tx);
667  break;
668  case M0_BTS_DONE:
669  be_engine_got_tx_done(en, tx);
670  break;
671  case M0_BTS_FAILED:
672  if (tx->t_log_reserved)
674  tx->t_log_reserved_size);
675  break;
676  default:
677  break;
678  }
679 
681  be_engine_unlock(en);
682 }
683 
684 M0_INTERNAL void m0_be_engine__tx_force(struct m0_be_engine *en,
685  struct m0_be_tx *tx)
686 {
687  struct m0_be_tx_group *grp;
688 
689 
690  M0_ENTRY("en=%p tx=%p", en, tx);
691 
692  /*
693  * Note: as multiple txs may try to move tx group's fom (for example,
694  * a new tx is added to the tx group or multiple txs call
695  * m0_be_tx_force()), we use be engine's lock here
696  */
697  be_engine_lock(en);
698 
699  grp = tx->t_group;
700  if (grp == NULL) {
701  be_engine_unlock(en);
702  return;
703  }
704 
705  /*
706  * Is it possible that the tx has been committed to disk while
707  * we were waiting for the lock?
708  */
709  /* XXX race here. Let's disable this completely. */
710  // if (m0_be_tx_state(tx) < M0_BTS_LOGGED)
711  // be_engine_group_close(en, grp, true);
712 
713  be_engine_unlock(en);
714 }
715 
716 static void be_engine_tx_group_open(struct m0_be_engine *en,
717  struct m0_be_tx_group *gr)
718 {
720 
724 }
725 
726 static void be_engine_tx_group_ready(struct m0_be_engine *en,
727  struct m0_be_tx_group *gr)
728 {
730 
732  if (egr_tlist_is_empty(&en->eng_groups[M0_BGS_OPEN]) &&
733  egr_tlist_is_empty(&en->eng_groups[M0_BGS_FROZEN])) {
734  be_engine_tx_group_open(en, gr);
735  }
736 }
737 
738 M0_INTERNAL void m0_be_engine__tx_group_ready(struct m0_be_engine *en,
739  struct m0_be_tx_group *gr)
740 {
741  M0_ENTRY("en=%p gr=%p", en, gr);
742  be_engine_lock(en);
744 
745  be_engine_tx_group_ready(en, gr);
748 
750  be_engine_unlock(en);
751  M0_LEAVE();
752 }
753 
754 static void be_engine_group_stop_nr(struct m0_be_engine *en, size_t nr)
755 {
756  size_t i;
757 
759 
760  for (i = 0; i < nr; ++i) {
761  /*
762  * XXX engine lock-unlock is temporary solution
763  * to prevent deadlock.
764  */
765  be_engine_unlock(en);
767  be_engine_lock(en);
768  egr_tlist_del(&en->eng_group[i]);
769  }
770 }
771 
772 static int be_engine_group_start(struct m0_be_engine *en, size_t index)
773 {
774  int rc;
775 
777 
779  if (rc != 0)
780  return M0_RC(rc);
781  /*
782  * group is moved to READY state in
783  * be_engine_tx_group_ready().
784  */
785  egr_tlist_add_tail(&en->eng_groups[M0_BGS_CLOSED],
786  &en->eng_group[index]);
789  return M0_RC(0);
790 }
791 
792 M0_INTERNAL int m0_be_engine_start(struct m0_be_engine *en)
793 {
794  m0_time_t recovery_time = 0;
795  int rc = 0;
796  size_t i;
797 
798  M0_ENTRY();
799  be_engine_lock(en);
801 
802  /*
803  * Run BE recovery having only one group.
804  * This prevents having m0_be_tx_group_reapply() called in wrong order,
805  * i.e. not in the same order as corresponding log records in the log.
806  * See EOS-7888 and linked tickets for an example of what happens if
807  * the order of m0_be_tx_group_reapply() is wrong.
808  *
809  * An alternative solution would be to enforce the right order for
810  * m0_be_tx_group_reapply(), but with group rewrite this solution
811  * wouldn't be needed anymore. One group for recovery is just easier.
812  */
813  rc = be_engine_group_start(en, 0);
814  if (rc != 0)
815  return M0_ERR(rc);
816 
817  recovery_time = m0_time_now();
819 
821  be_engine_unlock(en);
822 
823  if (en->eng_cfg->bec_wait_for_recovery) {
825  recovery_time = m0_time_now() - recovery_time;
826  M0_LOG(M0_INFO, "BE recovery execution time: %"PRIu64,
827  recovery_time);
828  /* XXX workaround BEGIN */
829  if (!en->eng_cfg->bec_domain->bd_cfg.bc_mkfs_mode) {
831  en->eng_cfg->bec_domain));
832  }
833  /* XXX workaround END */
834  }
835  be_engine_lock(en);
836  for (i = 1; i < en->eng_group_nr; ++i) {
837  rc = be_engine_group_start(en, i);
838  if (rc != 0)
839  break;
840  }
841  if (rc != 0)
843  be_engine_unlock(en);
844  return rc == 0 ? M0_RC(rc) : M0_ERR(rc);
845 }
846 
847 M0_INTERNAL void m0_be_engine_stop(struct m0_be_engine *en)
848 {
849  M0_ENTRY();
850  be_engine_lock(en);
852 
854 
856  be_engine_unlock(en);
857  M0_LEAVE();
858 }
859 
860 M0_INTERNAL void m0_be_engine_got_log_space_cb(struct m0_be_log *log)
861 {
862  struct m0_be_engine *en =
863  container_of(log, struct m0_be_engine, eng_log);
864 
865  M0_ENTRY("en=%p log=%p", en, log);
866 
868 
870 
872 }
873 
874 M0_INTERNAL void m0_be_engine_full_log_cb(struct m0_be_log *log)
875 {
876  struct m0_be_engine *en =
877  container_of(log, struct m0_be_engine, eng_log);
878  struct m0_be_domain *dom = en->eng_domain;
879 
880  M0_ENTRY("en=%p dom=%p log=%p", en, dom, log);
881 
883 
884  m0_be_log_discard_sync(&dom->bd_log_discard);
885 }
886 
887 M0_INTERNAL struct m0_be_tx *m0_be_engine__tx_find(struct m0_be_engine *en,
888  uint64_t id)
889 {
890  struct m0_be_tx *tx = NULL;
891  size_t i;
892 
893  M0_ENTRY("en=%p id=%"PRIu64, en, id);
894 
895  be_engine_lock(en);
897 
898  for (i = 0; i < ARRAY_SIZE(en->eng_txs); ++i) {
899  tx = m0_tl_find(etx, tx, &en->eng_txs[i], tx->t_id == id);
900  if (tx != NULL) {
901  if (M0_IN(m0_be_tx_state(tx),
903  tx = NULL;
904  }
905  break;
906  }
907  }
908 
910  be_engine_unlock(en);
911 
912  if (tx != NULL)
913  m0_be_tx_get(tx);
914 
915  M0_LEAVE("en=%p tx=%p state=%s", en, tx,
916  tx == NULL ? "" : m0_be_tx_state_name(m0_be_tx_state(tx)));
917  return tx;
918 }
919 
920 M0_INTERNAL int
922  struct m0_be_tx *excl)
923 {
924  bool ret;
925 
926  be_engine_lock(en);
927 
928  ret = m0_forall(state, M0_BTS_DONE, state < M0_BTS_CLOSED ? true :
929  etx_tlist_is_empty(&en->eng_txs[state])) &&
930  etx_tlist_length(&en->eng_txs[M0_BTS_ACTIVE]) == 1 &&
931  etx_tlist_head(&en->eng_txs[M0_BTS_ACTIVE]) == excl;
932 
933  be_engine_unlock(en);
934  return ret;
935 }
936 
937 M0_INTERNAL void m0_be_engine_tx_size_max(struct m0_be_engine *en,
938  struct m0_be_tx_credit *cred,
939  m0_bcount_t *payload_size)
940 {
941  if (cred != NULL)
942  *cred = en->eng_cfg->bec_tx_size_max;
943  if (payload_size != NULL)
944  *payload_size = en->eng_cfg->bec_tx_payload_max;
945 }
946 
947 M0_INTERNAL void m0_be_engine__group_limits(struct m0_be_engine *en,
948  uint32_t *group_nr,
949  uint32_t *tx_per_group)
950 {
951  if (group_nr != NULL)
952  *group_nr = en->eng_cfg->bec_group_nr;
953  if (tx_per_group != NULL)
954  *tx_per_group = en->eng_cfg->bec_group_cfg.tgc_tx_nr_max;
955 }
956 
958 #undef M0_TRACE_SUBSYSTEM
959 
960 /*
961  * Local variables:
962  * c-indentation-style: "K&R"
963  * c-basic-offset: 8
964  * tab-width: 8
965  * fill-column: 80
966  * scroll-step: 1
967  * End:
968  */
969 /*
970  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
971  */
m0_bcount_t tgc_payload_max
Definition: tx_group.h:66
struct m0_be_tx_credit t_prepared
Definition: tx.h:300
M0_INTERNAL void m0_be_engine_fini(struct m0_be_engine *en)
Definition: engine.c:171
struct m0_be_domain * bec_domain
Definition: engine.h:88
static size_t nr
Definition: dump.c:1505
#define M0_PRE(cond)
static bool be_engine_recovery_is_finished(struct m0_be_engine *en)
Definition: engine.c:499
m0_time_t bec_group_freeze_timeout_max
Definition: engine.h:81
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
M0_TL_DEFINE(etx, M0_INTERNAL, struct m0_be_tx)
static void be_engine_group_timer_cb(struct m0_sm_timer *timer)
Definition: engine.c:336
M0_INTERNAL void m0_be_tx_group_close(struct m0_be_tx_group *gr)
Definition: tx_group.c:147
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static void be_engine_tx_state_post(struct m0_be_engine *en, struct m0_be_tx *tx, enum m0_be_tx_state state)
Definition: engine.c:239
struct m0_semaphore eng_recovery_wait_sem
Definition: engine.h:119
#define BETXCR_F
Definition: tx_credit.h:102
static void be_engine_tx_group_state_move(struct m0_be_engine *en, struct m0_be_tx_group *gr, enum m0_be_tx_group_state state)
Definition: engine.c:63
bool bec_wait_for_recovery
Definition: engine.h:86
M0_INTERNAL struct m0_be_tx * m0_be_engine__tx_find(struct m0_be_engine *en, uint64_t id)
Definition: engine.c:887
M0_INTERNAL int m0_be_tx_group_init(struct m0_be_tx_group *gr, struct m0_be_tx_group_cfg *gr_cfg)
Definition: tx_group.c:201
M0_INTERNAL void m0_be_tx_group_recovery_prepare(struct m0_be_tx_group *gr, struct m0_be_log *log)
Definition: tx_group.c:462
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_be_seg_dict_init(struct m0_be_seg *seg)
Definition: seg_dict.c:160
m0_bcount_t t_log_reserved_size
Definition: tx.h:378
static void be_engine_group_timer_arm(struct m0_sm_group *sm_grp, struct m0_sm_ast *ast)
Definition: engine.c:352
M0_INTERNAL bool m0_be_log_recovery_record_available(struct m0_be_log *log)
Definition: log.c:1214
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
m0_be_tx_state
Definition: tx.h:214
M0_INTERNAL struct m0_sm_group * m0_be_tx_group__sm_group(struct m0_be_tx_group *gr)
Definition: tx_group.c:73
M0_INTERNAL void m0_be_engine__tx_group_ready(struct m0_be_engine *en, struct m0_be_tx_group *gr)
Definition: engine.c:738
M0_INTERNAL void m0_be_tx_group_tx_closed(struct m0_be_tx_group *gr, struct m0_be_tx *tx)
Definition: tx_group.c:325
struct m0_be_log_discard * tgc_log_discard
Definition: tx_group.h:73
static struct m0_sm_group * grp
Definition: bytecount.c:38
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
#define min_check(a, b)
Definition: arith.h:88
static int be_engine_tx_trygroup(struct m0_be_engine *en, struct m0_be_tx *tx)
Definition: engine.c:452
static struct m0_be_tx * be_engine_tx_opening_peek(struct m0_be_engine *en)
Definition: engine.c:267
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
static struct m0_be_tx * be_engine_recovery_tx_find(struct m0_be_engine *en, enum m0_be_tx_state state)
Definition: engine.c:541
static struct m0_be_tx_group * be_engine_group_find(struct m0_be_engine *en)
Definition: engine.c:446
struct m0_be_domain * tgc_domain
Definition: tx_group.h:68
static int delay
Definition: dump.c:174
struct m0_mutex * bec_lock
Definition: engine.h:94
static void be_engine_group_tryclose(struct m0_be_engine *en, struct m0_be_tx_group *gr)
Definition: engine.c:392
M0_INTERNAL int m0_be_engine_start(struct m0_be_engine *en)
Definition: engine.c:792
uint64_t t_magic
Definition: tlist.h:296
M0_INTERNAL struct m0_be_seg * m0_be_domain_seg0_get(struct m0_be_domain *dom)
Definition: domain.c:466
struct m0_sm_timer tg_close_timer
Definition: tx_group.h:141
uint64_t m0_bcount_t
Definition: types.h:77
M0_TL_DESCR_DEFINE(etx, "m0_be_engine::eng_txs[]", M0_INTERNAL, struct m0_be_tx, t_engine_linkage, t_magic, M0_BE_TX_MAGIC, M0_BE_TX_ENGINE_MAGIC)
static void be_engine_recovery_finish(struct m0_be_engine *en)
Definition: engine.c:508
Definition: sm.h:504
struct m0_be_engine * tg_engine
Definition: tx_group.h:133
#define container_of(ptr, type, member)
Definition: misc.h:33
M0_INTERNAL int m0_be_tx_group_start(struct m0_be_tx_group *gr)
Definition: tx_group.c:393
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
struct m0_be_engine_cfg * eng_cfg
Definition: engine.h:98
static void be_engine_group_timer_disarm(struct m0_sm_group *sm_grp, struct m0_sm_ast *ast)
Definition: engine.c:369
struct m0_be_tx_credit bec_tx_size_max
Definition: engine.h:73
struct m0_sm_ast tg_close_timer_arm
Definition: tx_group.h:142
static bool stopped
Definition: net.c:29
M0_INTERNAL int m0_be_engine__exclusive_open_invariant(struct m0_be_engine *en, struct m0_be_tx *excl)
Definition: engine.c:921
uint64_t bec_tx_active_max
Definition: engine.h:59
static void be_engine_tx_group_open(struct m0_be_engine *en, struct m0_be_tx_group *gr)
Definition: engine.c:716
struct m0_be_pd * tgc_pd
Definition: tx_group.h:74
struct m0_be_domain_cfg bd_cfg
Definition: domain.h:135
M0_INTERNAL bool m0_be_tx__is_fast(struct m0_be_tx *tx)
Definition: tx.c:625
struct m0_be_log eng_log
Definition: engine.h:106
return M0_RC(rc)
struct m0_tl eng_groups[M0_BGS_NR]
Definition: engine.h:104
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
m0_time_t bec_group_freeze_timeout_limit
Definition: engine.h:82
struct m0_reqh * bec_reqh
Definition: engine.h:84
static int be_engine_group_start(struct m0_be_engine *en, size_t index)
Definition: engine.c:772
struct m0_be_pd * bec_pd
Definition: engine.h:90
uint32_t tg_nr_unstable
Definition: tx_group.h:118
int i
Definition: dir.c:1033
#define PRIu64
Definition: types.h:58
M0_INTERNAL void m0_be_engine_got_log_space_cb(struct m0_be_log *log)
Definition: engine.c:860
uint64_t eng_tx_id_next
Definition: engine.h:111
M0_INTERNAL bool m0_sm_timer_is_armed(const struct m0_sm_timer *timer)
Definition: sm.c:628
struct m0_be_log * tgc_log
Definition: tx_group.h:72
return M0_ERR(-EOPNOTSUPP)
struct m0_be_tx_group * eng_group
Definition: engine.h:108
Definition: trace.h:482
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
struct m0_be_tx_group * t_group
Definition: tx.h:372
M0_INTERNAL void m0_be_engine__tx_state_set(struct m0_be_engine *en, struct m0_be_tx *tx, enum m0_be_tx_state state)
Definition: engine.c:639
static uint64_t be_engine_tx_id_allocate(struct m0_be_engine *en)
Definition: engine.c:226
m0_time_t tg_close_deadline
Definition: tx_group.h:144
struct m0_be_tx_credit tgc_size_max
Definition: tx_group.h:61
#define M0_ASSERT(cond)
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
static void be_engine_group_freeze(struct m0_be_engine *en, struct m0_be_tx_group *gr)
Definition: engine.c:383
struct m0_sm_ast tg_close_timer_disarm
Definition: tx_group.h:143
struct m0_be_log_discard * bec_log_discard
Definition: engine.h:89
static void be_engine_group_stop_nr(struct m0_be_engine *en, size_t nr)
Definition: engine.c:754
M0_INTERNAL void m0_be_tx__group_assign(struct m0_be_tx *tx, struct m0_be_tx_group *gr)
Definition: tx.c:689
struct m0_sm_group * tr_grp
Definition: sm.h:612
bool t_grouped
Definition: tx.h:407
m0_time_t m0_time_now(void)
Definition: time.c:134
bool t_log_reserved
Definition: tx.h:376
static void be_engine_got_tx_closed(struct m0_be_engine *en, struct m0_be_tx *tx)
Definition: engine.c:582
static struct m0_stob_domain * dom
Definition: storage.c:38
M0_INTERNAL bool m0_be_tx_group_is_recovering(struct m0_be_tx_group *gr)
Definition: tx_group.c:78
static bool be_engine_is_locked(const struct m0_be_engine *en)
Definition: engine.c:216
struct m0_be_domain * eng_domain
Definition: engine.h:118
M0_INTERNAL size_t m0_be_tx_group_tx_nr(struct m0_be_tx_group *gr)
Definition: tx_group.c:383
unsigned long tgc_tx_nr_max
Definition: tx_group.h:57
M0_INTERNAL bool m0_be_engine__invariant(struct m0_be_engine *en)
Definition: engine.c:612
bool eng_exclusive_mode
Definition: engine.h:117
m0_bcount_t t_payload_prepared
Definition: tx.h:365
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
#define M0_POST(cond)
struct m0_reqh * tgc_reqh
Definition: tx_group.h:76
M0_INTERNAL bool m0_be_tx__is_exclusive(const struct m0_be_tx *tx)
Definition: tx.c:649
static void group(void)
Definition: sm.c:386
m0_be_tx_group_state
Definition: tx_group.h:47
#define BETXCR_P(c)
Definition: tx_credit.h:113
static void be_engine_got_tx_done(struct m0_be_engine *en, struct m0_be_tx *tx)
Definition: engine.c:595
M0_INTERNAL void m0_be_tx_get(struct m0_be_tx *tx)
Definition: stubs.c:167
uint64_t t_id
Definition: tx.h:284
static void be_engine_tx_group_ready(struct m0_be_engine *en, struct m0_be_tx_group *gr)
Definition: engine.c:726
struct m0_tl eng_txs[M0_BTS_NR+1]
Definition: engine.h:103
M0_INTERNAL void m0_be_engine__tx_fini(struct m0_be_engine *en, struct m0_be_tx *tx)
Definition: engine.c:631
M0_INTERNAL void m0_be_engine_tx_size_max(struct m0_be_engine *en, struct m0_be_tx_credit *cred, m0_bcount_t *payload_size)
Definition: engine.c:937
M0_INTERNAL int m0_be_log_reserve(struct m0_be_log *log, m0_bcount_t size)
Definition: log.c:850
M0_INTERNAL void m0_be_engine__group_limits(struct m0_be_engine *en, uint32_t *group_nr, uint32_t *tx_per_group)
Definition: engine.c:947
static void be_engine_group_timeout_arm(struct m0_be_engine *en, struct m0_be_tx_group *gr)
Definition: engine.c:420
static bool be_engine_invariant(struct m0_be_engine *en)
Definition: engine.c:221
M0_INTERNAL void m0_be_tx_group_stop(struct m0_be_tx_group *gr)
Definition: tx_group.c:398
#define m0_forall(var, nr,...)
Definition: misc.h:112
M0_INTERNAL int m0_be_tx_group_tx_add(struct m0_be_tx_group *gr, struct m0_be_tx *tx)
Definition: tx_group.c:286
M0_INTERNAL void m0_be_engine_stop(struct m0_be_engine *en)
Definition: engine.c:847
M0_INTERNAL void m0_sm_timer_cancel(struct m0_sm_timer *timer)
Definition: sm.c:610
M0_INTERNAL void m0_be_log_discard_sync(struct m0_be_log_discard *ld)
Definition: log_discard.c:367
M0_INTERNAL int m0_be_tx_service_init(struct m0_be_engine *en, struct m0_reqh *reqh)
Definition: tx_service.c:117
static void be_engine_got_tx_grouping(struct m0_be_engine *en)
Definition: engine.c:551
struct m0_be_engine * tgc_engine
Definition: tx_group.h:70
size_t bec_group_nr
Definition: engine.h:61
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
static struct m0_be_tx * be_engine_tx_peek(struct m0_be_engine *en, enum m0_be_tx_state state)
Definition: engine.c:231
m0_time_t bec_group_freeze_timeout_min
Definition: engine.h:80
M0_INTERNAL void m0_be_tx_service_fini(struct m0_be_engine *en)
Definition: tx_service.c:125
M0_INTERNAL void m0_be_tx_group_fini(struct m0_be_tx_group *gr)
Definition: tx_group.c:252
M0_INTERNAL m0_bcount_t m0_be_group_format_log_reserved_size(struct m0_be_log *log, struct m0_be_tx_credit *cred, m0_bcount_t cred_payload)
M0_INTERNAL void m0_be_engine__tx_force(struct m0_be_engine *en, struct m0_be_tx *tx)
Definition: engine.c:684
M0_INTERNAL int m0_be_engine_init(struct m0_be_engine *en, struct m0_be_domain *dom, struct m0_be_engine_cfg *en_cfg)
Definition: engine.c:103
M0_INTERNAL bool m0_be_tx__is_recovering(struct m0_be_tx *tx)
Definition: tx.c:659
static void be_engine_try_recovery(struct m0_be_engine *en)
Definition: engine.c:516
static bool is_engine_at_stopped_or_done(const struct m0_be_engine *en, bool stopped)
Definition: engine.c:253
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
#define M0_ASSERT_INFO(cond, fmt,...)
struct m0_be_tx_group_cfg bec_group_cfg
Definition: engine.h:71
M0_INTERNAL void m0_be_engine_full_log_cb(struct m0_be_log *log)
Definition: engine.c:874
struct m0_be_tx_group_cfg * bec_groups_cfg
Definition: engine.h:92
M0_INTERNAL const char * m0_be_tx_state_name(enum m0_be_tx_state state)
Definition: tx.c:417
M0_INTERNAL void m0_sm_ast_cancel(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:183
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
M0_INTERNAL void m0_be_tx_group_stable(struct m0_be_tx_group *gr)
Definition: tx_group.c:65
Definition: log.h:261
M0_INTERNAL bool m0_be_tx_credit_le(const struct m0_be_tx_credit *c0, const struct m0_be_tx_credit *c1)
Definition: tx_credit.c:98
enum m0_be_tx_group_state tg_state
Definition: tx_group.h:146
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
M0_INTERNAL void m0_sm_timer_fini(struct m0_sm_timer *timer)
Definition: sm.c:566
Definition: tx.h:270
M0_INTERNAL void m0_sm_timer_init(struct m0_sm_timer *timer)
Definition: sm.c:559
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
static void be_engine_unlock(struct m0_be_engine *en)
Definition: engine.c:211
bool bc_mkfs_mode
Definition: domain.h:98
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL void m0_be_tx__state_post(struct m0_be_tx *tx, enum m0_be_tx_state state)
Definition: tx.c:524
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
uint32_t tg_nr_unclosed
Definition: tx_group.h:114
static void be_engine_lock(struct m0_be_engine *en)
Definition: engine.c:206
M0_INTERNAL int m0_sm_timer_start(struct m0_sm_timer *timer, struct m0_sm_group *group, void(*cb)(struct m0_sm_timer *), m0_time_t deadline)
Definition: sm.c:577
bool eng_recovery_finished
Definition: engine.h:120
size_t eng_group_nr
Definition: engine.h:109
#define m0_tl_exists(name, var, head,...)
Definition: tlist.h:774
static int be_engine_cfg_validate(struct m0_be_engine_cfg *en_cfg)
Definition: engine.c:89
M0_INTERNAL void m0_be_engine__tx_init(struct m0_be_engine *en, struct m0_be_tx *tx, enum m0_be_tx_state state)
Definition: engine.c:623
Definition: tx.h:280
static void be_engine_got_tx_open(struct m0_be_engine *en, struct m0_be_tx *tx)
Definition: engine.c:287
M0_INTERNAL void m0_be_log_unreserve(struct m0_be_log *log, m0_bcount_t size)
Definition: log.c:868
m0_bcount_t bec_tx_payload_max
Definition: engine.h:75