Motr  M0
tx_group_fom.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_BE
24 #include "lib/trace.h"
25 
26 #include "be/tx_group_fom.h"
27 
28 #include "lib/misc.h" /* M0_BITS */
29 #include "rpc/rpc_opcodes.h" /* M0_BE_TX_GROUP_OPCODE */
30 
31 #include "be/tx_group.h"
32 #include "be/tx_service.h" /* m0_be_txs_stype */
33 
39 static struct m0_be_tx_group_fom *fom2tx_group_fom(const struct m0_fom *fom);
40 static void tx_group_fom_fini(struct m0_fom *fom);
41 
42 /* ------------------------------------------------------------------
43  * State definitions
44  * ------------------------------------------------------------------ */
45 
91  TGS_PREPARE, /* XXX rename it */
92  TGS_LOGGING, /* XXX rename it */
94  TGS_RECONSTRUCT, /* XXX rename it? */
95  TGS_TX_OPEN, /* XXX rename it? */
96  TGS_TX_CLOSE, /* XXX rename it? */
97  TGS_REAPPLY, /* XXX rename it? */
113 };
114 
116 #define _S(name, flags, allowed) \
117  [name] = { \
118  .sd_flags = flags, \
119  .sd_name = #name, \
120  .sd_allowed = allowed \
121  }
122 
140 #undef _S
141 };
142 
143 const static struct m0_sm_conf tx_group_fom_conf = {
144  .scf_name = "m0_be_tx_group_fom",
145  .scf_nr_states = ARRAY_SIZE(tx_group_fom_states),
146  .scf_state = tx_group_fom_states,
147 };
148 
149 static int tx_group_fom_tick(struct m0_fom *fom)
150 {
151  enum tx_group_fom_state phase = m0_fom_phase(fom);
153  struct m0_be_tx_group *gr = m->tgf_group;
154  struct m0_be_op *op = &m->tgf_op;
155  int rc;
156 
157  M0_ENTRY("tx_group_fom=%p group=%p phase=%s", m, gr,
158  m0_fom_phase_name(fom, phase));
159 
160  switch (phase) {
161  case TGS_INIT:
164  m0_semaphore_up(&m->tgf_start_sem);
165  return M0_FSO_WAIT;
166  case TGS_OPEN:
167  if (m->tgf_stopping) {
169  return M0_FSO_AGAIN;
170  }
171  return M0_FSO_WAIT;
172  case TGS_PREPARE:
176  case TGS_LOGGING:
178  if (m->tgf_recovery_mode) {
181  } else {
185  }
186  case TGS_RECONSTRUCT:
188  M0_ASSERT_INFO(rc == 0, "rc = %d", rc); /* XXX notify engine */
190  &m->tgf_gen.fo_loc->fl_group);
191  M0_ASSERT_INFO(rc == 0, "rc = %d", rc); /* XXX notify engine */
193  return M0_FSO_AGAIN;
194  case TGS_TX_OPEN:
198  case TGS_TX_CLOSE:
199  m0_be_op_reset(&m->tgf_op_gc);
200  /* m0_be_op_tick_ret() for the op is in TGS_TX_GC_WAIT phase */
201  m0_be_tx_group_reconstruct_tx_close(gr, &m->tgf_op_gc);
203  return M0_FSO_AGAIN;
204  case TGS_REAPPLY:
207  M0_ASSERT_INFO(rc == 0, "rc = %d", rc); /* XXX notify engine */
209  case TGS_PLACING:
215  case TGS_PLACED:
218  return M0_FSO_AGAIN;
219  case TGS_STABILIZING:
220  if (m->tgf_stable) {
222  return M0_FSO_AGAIN;
223  }
224  return M0_FSO_WAIT;
225  case TGS_STABLE:
226  m0_fom_phase_set(fom, m->tgf_recovery_mode ?
228  return M0_FSO_AGAIN;
229  case TGS_TX_GC_WAIT:
230  return m0_be_op_tick_ret(&m->tgf_op_gc, fom, TGS_RESET);
231  case TGS_RESET:
235  return M0_FSO_AGAIN;
236  case TGS_STOPPING:
239  return M0_FSO_WAIT;
240  case TGS_FAILED:
242  return M0_FSO_WAIT;
243  default:
244  M0_IMPOSSIBLE("Invalid phase: %d", phase);
245  }
246 
247  M0_LEAVE();
248  return M0_FSO_WAIT;
249 }
250 
251 static void tx_group_fom_fini(struct m0_fom *fom)
252 {
254 
255  m0_fom_fini(fom);
256  m0_semaphore_up(&m->tgf_finish_sem);
257 }
258 
259 static size_t tx_group_fom_locality(const struct m0_fom *fom)
260 {
261  return 0; /* XXX TODO: reconsider */
262 }
263 
264 static const struct m0_fom_ops tx_group_fom_ops = {
266  .fo_tick = tx_group_fom_tick,
267  .fo_home_locality = tx_group_fom_locality
268 };
269 
271 
272 static const struct m0_fom_type_ops tx_group_fom_type_ops = {
273  .fto_create = NULL
274 };
275 
276 static struct m0_be_tx_group_fom *fom2tx_group_fom(const struct m0_fom *fom)
277 {
278  /* XXX TODO bob_of() */
279  return container_of(fom, struct m0_be_tx_group_fom, tgf_gen);
280 }
281 
282 static void be_tx_group_fom_handle(struct m0_sm_group *gr,
283  struct m0_sm_ast *ast)
284 {
286 
287  M0_LOG(M0_DEBUG, "m=%p, tx_nr=%zu", m,
288  m0_be_tx_group_tx_nr(m->tgf_group));
289 
290  m0_fom_phase_set(&m->tgf_gen, TGS_PREPARE);
291  m0_fom_ready(&m->tgf_gen);
292 }
293 
294 /*
295  * Wakes up tx_group_fom iff it is waiting.
296  * It is possible that multiple fom wakeup asts are posted through different
297  * code paths. Thus we avoid waking up of already running FOM.
298  */
300 {
301  M0_ENTRY();
302  if (m0_fom_is_waiting(fom)) {
303  M0_LOG(M0_DEBUG, "waking up");
304  m0_fom_ready(fom);
305  }
306  M0_LEAVE();
307 }
308 
309 static void be_tx_group_fom_stable(struct m0_sm_group *_, struct m0_sm_ast *ast)
310 {
312 
313  M0_ENTRY();
314 
315  m->tgf_stable = true;
317  M0_LEAVE();
318 }
319 
320 static void be_tx_group_fom_stop(struct m0_sm_group *gr, struct m0_sm_ast *ast)
321 {
323 
324  M0_ENTRY();
325 
326  m->tgf_stopping = true;
328  M0_LEAVE();
329 }
330 
332  struct m0_be_tx_group *gr,
333  struct m0_reqh *reqh)
334 {
335  M0_ENTRY();
336 
337  m0_fom_init(&m->tgf_gen, &tx_group_fom_type,
339 
340  m->tgf_group = gr;
341  m->tgf_reqh = reqh;
342  m->tgf_stable = false;
343  m->tgf_stopping = false;
344 
345 #define _AST(handler) (struct m0_sm_ast){ .sa_cb = (handler) }
346  m->tgf_ast_handle = _AST(be_tx_group_fom_handle);
347  m->tgf_ast_stable = _AST(be_tx_group_fom_stable);
348  m->tgf_ast_stop = _AST(be_tx_group_fom_stop);
349 #undef _AST
350 
351  m0_semaphore_init(&m->tgf_start_sem, 0);
352  m0_semaphore_init(&m->tgf_finish_sem, 0);
353  m0_be_op_init(&m->tgf_op);
354  m0_be_op_init(&m->tgf_op_gc);
355 
356  M0_LEAVE();
357 }
358 
360 {
361  M0_PRE(m0_fom_phase(&m->tgf_gen) == TGS_FINISH);
362 
363  m0_be_op_fini(&m->tgf_op_gc);
364  m0_be_op_fini(&m->tgf_op);
365  m0_semaphore_fini(&m->tgf_start_sem);
366  m0_semaphore_fini(&m->tgf_finish_sem);
367 }
368 
370 {
371  m->tgf_recovery_mode = false;
372  m->tgf_stable = false;
373 }
374 
376  struct m0_sm_ast *ast)
377 {
379 }
380 
381 M0_INTERNAL int m0_be_tx_group_fom_start(struct m0_be_tx_group_fom *gf)
382 {
383  int rc;
384  struct m0_fom *fom = &gf->tgf_gen;
385 
386  m0_fom_queue(fom);
389  rc = m0_fom_rc(fom);
390  if (m0_fom_phase(fom) == TGS_FAILED) {
391  M0_ASSERT(rc != 0);
394  }
395 
396  return M0_RC(rc);
397 }
398 
399 M0_INTERNAL void m0_be_tx_group_fom_stop(struct m0_be_tx_group_fom *gf)
400 {
401  M0_ENTRY();
404  M0_LEAVE();
405 }
406 
408 {
409  be_tx_group_fom_ast_post(m, &m->tgf_ast_handle);
410 }
411 
412 M0_INTERNAL void m0_be_tx_group_fom_stable(struct m0_be_tx_group_fom *gf)
413 {
415 }
416 
417 M0_INTERNAL struct m0_sm_group *
419 {
420  return &m->tgf_gen.fo_loc->fl_group;
421 }
422 
423 M0_INTERNAL void
425 {
426  m->tgf_recovery_mode = true;
427 }
428 
429 M0_INTERNAL void m0_be_tx_group_fom_mod_init(void)
430 {
434 }
435 
436 M0_INTERNAL void m0_be_tx_group_fom_mod_fini(void)
437 {}
438 
440 #undef M0_TRACE_SUBSYSTEM
441 
442 /*
443  * Local variables:
444  * c-indentation-style: "K&R"
445  * c-basic-offset: 8
446  * tab-width: 8
447  * fill-column: 80
448  * scroll-step: 1
449  * End:
450  */
451 /*
452  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
453  */
M0_INTERNAL void m0_be_tx_group_seg_place(struct m0_be_tx_group *gr, struct m0_be_op *op)
Definition: tx_group.c:424
M0_INTERNAL int m0_be_tx_group_fom_start(struct m0_be_tx_group_fom *gf)
Definition: tx_group_fom.c:381
M0_INTERNAL void m0_fom_wakeup(struct m0_fom *fom)
Definition: fom.c:532
#define M0_PRE(cond)
static void be_tx_group_fom_ast_post(struct m0_be_tx_group_fom *gf, struct m0_sm_ast *ast)
Definition: tx_group_fom.c:375
#define NULL
Definition: misc.h:38
static struct m0_addb2_mach * m
Definition: consumer.c:38
Definition: sm.h:350
M0_INTERNAL void m0_be_tx_group_reconstruct_tx_close(struct m0_be_tx_group *gr, struct m0_be_op *op_gc)
Definition: tx_group.c:601
M0_INTERNAL void m0_be_tx_group_fom_recovery_prepare(struct m0_be_tx_group_fom *m)
Definition: tx_group_fom.c:424
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
struct m0_semaphore tgf_start_sem
Definition: tx_group_fom.h:62
struct m0_sm_group fl_group
Definition: fom.h:274
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
static struct m0_sm_state_descr tx_group_fom_states[TGS_NR]
Definition: tx_group_fom.c:115
M0_INTERNAL void m0_be_tx_group__deallocate(struct m0_be_tx_group *gr)
Definition: tx_group.c:408
int(* fto_create)(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: fom.h:650
static int tx_group_fom_tick(struct m0_fom *fom)
Definition: tx_group_fom.c:149
#define M0_BITS(...)
Definition: misc.h:236
M0_INTERNAL void m0_be_tx_group_fom_stable(struct m0_be_tx_group_fom *gf)
Definition: tx_group_fom.c:412
Definition: sm.h:504
struct m0_sm_ast tgf_ast_handle
Definition: tx_group_fom.h:59
#define container_of(ptr, type, member)
Definition: misc.h:33
static struct m0_fom_type tx_group_fom_type
Definition: tx_group_fom.c:270
M0_INTERNAL void m0_be_tx_group_fom_mod_fini(void)
Definition: tx_group_fom.c:436
m0_fom_phase
Definition: fom.h:372
M0_INTERNAL void m0_be_tx_group_fom_reset(struct m0_be_tx_group_fom *m)
Definition: tx_group_fom.c:369
M0_INTERNAL void m0_be_tx_group_reset(struct m0_be_tx_group *gr)
Definition: tx_group.c:178
M0_INTERNAL void m0_be_tx_group_reconstruct_tx_open(struct m0_be_tx_group *gr, struct m0_be_op *op)
Definition: tx_group.c:585
return M0_RC(rc)
op
Definition: libdemo.c:64
M0_INTERNAL void m0_be_tx_group_log_write(struct m0_be_tx_group *gr, struct m0_be_op *op)
Definition: tx_group.c:435
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
M0_INTERNAL void m0_be_tx_group_fom_mod_init(void)
Definition: tx_group_fom.c:429
M0_INTERNAL void m0_be_tx_group_encode(struct m0_be_tx_group *gr)
Definition: tx_group.c:430
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
M0_INTERNAL bool m0_fom_is_waiting(const struct m0_fom *fom)
Definition: fom.c:1732
M0_INTERNAL void m0_fom_ready(struct m0_fom *fom)
Definition: fom.c:429
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
static void be_tx_group_fom_stop(struct m0_sm_group *gr, struct m0_sm_ast *ast)
Definition: tx_group_fom.c:320
static void be_tx_group_fom_stable(struct m0_sm_group *_, struct m0_sm_ast *ast)
Definition: tx_group_fom.c:309
M0_INTERNAL void m0_be_tx_group__tx_state_post(struct m0_be_tx_group *gr, enum m0_be_tx_state state, bool del_tx_from_group)
Definition: tx_group.c:442
M0_INTERNAL int m0_be_tx_group__allocate(struct m0_be_tx_group *gr)
Definition: tx_group.c:403
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
struct m0_sm_ast tgf_ast_stop
Definition: tx_group_fom.h:61
void m0_fom_phase_move(struct m0_fom *fom, int32_t rc, int phase)
Definition: fom.c:1699
M0_INTERNAL void m0_fom_type_init(struct m0_fom_type *type, uint64_t id, const struct m0_fom_type_ops *ops, const struct m0_reqh_service_type *svc_type, const struct m0_sm_conf *sm)
Definition: fom.c:1596
tx_group_fom_state
Definition: tx_group_fom.c:82
M0_INTERNAL void m0_be_tx_group_fom_init(struct m0_be_tx_group_fom *m, struct m0_be_tx_group *gr, struct m0_reqh *reqh)
Definition: tx_group_fom.c:331
M0_INTERNAL size_t m0_be_tx_group_tx_nr(struct m0_be_tx_group *gr)
Definition: tx_group.c:383
M0_INTERNAL struct m0_sm_group * m0_be_tx_group_fom__sm_group(struct m0_be_tx_group_fom *m)
Definition: tx_group_fom.c:418
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
struct m0_fom tgf_gen
Definition: tx_group_fom.h:45
Definition: reqh.h:94
Definition: dump.c:103
static const struct m0_fom_type_ops tx_group_fom_type_ops
Definition: tx_group_fom.c:272
M0_INTERNAL void m0_be_tx_group_log_read(struct m0_be_tx_group *gr, struct m0_be_op *op)
Definition: tx_group.c:470
static const struct m0_sm_conf tx_group_fom_conf
Definition: tx_group_fom.c:143
M0_INTERNAL void m0_be_tx_group_fom_stop(struct m0_be_tx_group_fom *gf)
Definition: tx_group_fom.c:399
Definition: fom.h:481
static size_t tx_group_fom_locality(const struct m0_fom *fom)
Definition: tx_group_fom.c:259
M0_INTERNAL void m0_be_tx_group_fom_fini(struct m0_be_tx_group_fom *m)
Definition: tx_group_fom.c:359
M0_INTERNAL void m0_be_op_reset(struct m0_be_op *op)
Definition: op.c:152
struct m0_semaphore tgf_finish_sem
Definition: tx_group_fom.h:63
struct m0_reqh reqh
Definition: rm_foms.c:48
M0_INTERNAL void m0_be_tx_group_seg_place_prepare(struct m0_be_tx_group *gr)
Definition: tx_group.c:413
static void be_tx_group_fom_iff_waiting_wakeup(struct m0_fom *fom)
Definition: tx_group_fom.c:299
struct m0_fom_locality * fo_loc
Definition: fom.h:483
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
M0_INTERNAL int m0_be_op_tick_ret(struct m0_be_op *op, struct m0_fom *fom, int next_state)
Definition: op.c:267
M0_INTERNAL int m0_fom_rc(const struct m0_fom *fom)
Definition: fom.c:1727
#define _AST(handler)
M0_INTERNAL void m0_be_op_fini(struct m0_be_op *op)
Definition: stubs.c:92
M0_INTERNAL int m0_be_tx_group_decode(struct m0_be_tx_group *gr)
Definition: tx_group.c:476
M0_INTERNAL void m0_fom_queue(struct m0_fom *fom)
Definition: fom.c:624
M0_INTERNAL int m0_be_tx_group_reapply(struct m0_be_tx_group *gr, struct m0_be_op *op)
Definition: tx_group.c:620
static void tx_group_fom_fini(struct m0_fom *fom)
Definition: tx_group_fom.c:251
void(* fo_fini)(struct m0_fom *fom)
Definition: fom.h:657
#define M0_ASSERT_INFO(cond, fmt,...)
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
M0_INTERNAL int m0_be_tx_group_reconstruct(struct m0_be_tx_group *gr, struct m0_sm_group *sm_grp)
Definition: tx_group.c:577
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
static void be_tx_group_fom_handle(struct m0_sm_group *gr, struct m0_sm_ast *ast)
Definition: tx_group_fom.c:282
struct m0_sm_ast tgf_ast_stable
Definition: tx_group_fom.h:60
struct m0_reqh_service_type m0_be_txs_stype
Definition: tx_service.c:52
Definition: op.h:74
M0_INTERNAL void m0_be_op_init(struct m0_be_op *op)
Definition: stubs.c:87
#define _S(name, flags, allowed)
M0_INTERNAL void m0_be_tx_group_open(struct m0_be_tx_group *gr)
Definition: tx_group.c:388
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_INTERNAL void m0_be_tx_group_fom_handle(struct m0_be_tx_group_fom *m)
Definition: tx_group_fom.c:407
static struct m0_be_tx_group_fom * fom2tx_group_fom(const struct m0_fom *fom)
Definition: tx_group_fom.c:276
M0_INTERNAL const char * m0_fom_phase_name(const struct m0_fom *fom, int phase)
Definition: fom.c:1722
M0_INTERNAL void m0_be_tx_group_prepare(struct m0_be_tx_group *gr, struct m0_be_op *op)
Definition: tx_group.c:195
#define M0_IMPOSSIBLE(fmt,...)
static const struct m0_fom_ops tx_group_fom_ops
Definition: tx_group_fom.c:264