Motr  M0
fsync_foms.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2014-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_COB
23 #include "fop/fop.h" /* m0_fom */
24 
25 #include "lib/assert.h"
26 #include "lib/errno.h"
27 #include "lib/memory.h"
28 #include "lib/trace.h"
29 
30 #include "mdservice/fsync_foms.h"
31 #include "mdservice/fsync_fops.h"
32 #include "ioservice/io_fops.h" /* m0_fop_fsync_ios_fopt */
33 #include "cas/cas.h" /* m0_fop_fsync_cas_fopt */
34 #include "rpc/rpc_opcodes.h"
35 #include "ioservice/storage_dev.h" /* m0_storage_dev */
36 #include "motr/setup.h" /* m0_cs_storage_devs_get */
37 
38 #include "fop/fom_generic.h"
39 #include "be/domain.h" /* m0_be_domain */
40 #include "be/engine.h" /* m0_be_engine__tx_find */
41 #include "reqh/reqh.h" /* m0_reqh */
42 
43 
44 static void fsync_fom_fini(struct m0_fom *fom);
45 static int fsync_fom_tick(struct m0_fom *fom);
46 static size_t fsync_fom_locality_get(const struct m0_fom *fom);
47 
51 static const struct m0_fom_ops fsync_fom_ops = {
53  .fo_tick = fsync_fom_tick,
54  .fo_home_locality = fsync_fom_locality_get
55 };
56 
64 };
65 
71  .sd_name = "start",
72  .sd_allowed = M0_BITS(M0_FOPH_FSYNC_FOM_WAIT,
76  },
78  .sd_name = "wait",
79  .sd_allowed = M0_BITS(M0_FOPH_FAILURE,
83  },
85  .sd_name = "datasync",
86  .sd_allowed = M0_BITS(M0_FOPH_FAILURE,
88  },
89 };
90 
98  { "start failed", M0_FOPH_FSYNC_FOM_START, M0_FOPH_FAILURE},
100  { "wait failed", M0_FOPH_FSYNC_FOM_WAIT, M0_FOPH_FAILURE},
102  { "wait done", M0_FOPH_FSYNC_FOM_WAIT, M0_FOPH_SUCCESS},
104  { "sync done", M0_FOPH_FSYNC_DATASYNC, M0_FOPH_SUCCESS},
105  { "sync failed", M0_FOPH_FSYNC_DATASYNC, M0_FOPH_FAILURE},
106 };
107 
123 M0_INTERNAL struct m0_sm_conf m0_fsync_fom_conf = {
124  .scf_name = "fsync",
125  .scf_nr_states = ARRAY_SIZE(m0_fsync_fom_phases),
126  .scf_state = m0_fsync_fom_phases,
127  .scf_trans_nr = ARRAY_SIZE(fsync_fom_phases_trans),
128  .scf_trans = fsync_fom_phases_trans,
129 };
130 
141 static struct m0_be_tx *fsync_target_tx_get(struct m0_fom *fom,
142  uint64_t txid)
143 {
144  struct m0_be_engine *en;
145 
146  M0_PRE(fom != NULL);
148  return m0_be_engine__tx_find(en, txid);
149 }
150 
174 static int fsync_fom_tick(struct m0_fom *fom)
175 {
176  /* return value included in the fsync reply */
177  int rc;
178  struct m0_reqh_context *rctx;
179  struct m0_fop_fsync_rep *rep;
180  struct m0_fop_fsync *req;
181  struct m0_be_tx *target_tx;
182  int phase;
183  uint64_t txid;
184 
185  M0_ENTRY();
186 
187  M0_PRE(fom != NULL);
188 
189  req = m0_fop_data(fom->fo_fop);
190  M0_ASSERT(req != NULL);
191  rep = m0_fop_data(fom->fo_rep_fop);
192  M0_ASSERT(rep != NULL);
193 
194  M0_ASSERT(M0_IN(req->ff_fsync_mode,
196 
197  txid = req->ff_be_remid.tri_txid;
198  M0_LOG(M0_DEBUG, "Target tx: %" PRId64 "\n", txid);
199  rctx = container_of(m0_fom_reqh(fom), struct m0_reqh_context, rc_reqh);
200 
201 
202  phase = m0_fom_phase(fom);
203 
204  if (phase < M0_FOPH_NR) {
205  M0_LEAVE("fom is in a generic phase");
206  return m0_fom_tick_generic(fom);
207  }
208 
209  M0_ASSERT(M0_IN(phase,
213 
214  if (phase == M0_FOPH_FSYNC_DATASYNC) {
215  struct m0_storage_devs *sdevs = m0_cs_storage_devs_get();
216 
218 
219  m0_storage_devs_lock(sdevs);
220  rc = m0_storage_devs_fdatasync(sdevs);
221  m0_storage_devs_unlock(sdevs);
222 
224 
226 
227  return M0_FSO_AGAIN;
228  }
229 
230  /* we can get the target tx because we're at the same locality */
231  target_tx = fsync_target_tx_get(fom, txid);
232  if (target_tx == NULL) {
233  /*
234  * XXX: when we finally get a last committed tx from the
235  * engine, use it to return an error if the value received
236  * is greater than the value in the engine(ahead in the future).
237  */
238  M0_LOG(M0_DEBUG, "tx not found: %" PRId64 "\n", txid);
239  rc = 0;
240  goto fsync_done;
241  }
242 
243  /* if the target tx has been logged, we're done */
244  if (m0_be_tx_state(target_tx) >= M0_BTS_LOGGED) {
245  M0_LOG(M0_DEBUG, "TX has been logged: %" PRId64 "\n",
246  target_tx->t_id);
247  m0_be_tx_put(target_tx);
248  rc = 0;
249  goto fsync_done;
250  }
251 
252  /* we must wait on the target tx */
253  switch (phase) {
255  /* maybe the tx must be forced */
256  if (req->ff_fsync_mode == M0_FSYNC_MODE_ACTIVE) {
257  M0_LOG(M0_DEBUG, "force tx: %" PRId64 "\n",
258  target_tx->t_id);
259  m0_be_tx_force(target_tx);
260  }
261 
263  M0_LOG(M0_DEBUG, "fom is now waiting for the target tx\n");
264  /* no 'break' so the following code gets executed */
265 
267  m0_fom_wait_on(fom, &target_tx->t_sm.sm_chan, &fom->fo_cb);
268  break;
269 
270  default:
271  M0_IMPOSSIBLE("wrong phase");
272  break;
273  }
274 
275  /* release the reference so the tx can be moved to M0_BTS_DONE */
276  m0_be_tx_put(target_tx);
277 
278  M0_LEAVE("wait for the target tx to be logged");
279  return M0_FSO_WAIT;
280 
281 fsync_done:
282  rep->ffr_rc = rc;
283  /*
284  * XXX:As an optimization, ffr_be_remid should contain the
285  * ID of the last committed tx (retrieved from the be engine)
286  */
287  rep->ffr_be_remid.tri_txid = txid;
288 
292  else
295 
296  M0_LEAVE("fsync fop request processed");
297 
298  /* some other generic phases will be executed now */
299  return M0_FSO_AGAIN;
300 }
301 
308 static size_t fsync_fom_locality_get(const struct m0_fom *fom)
309 {
310  M0_PRE(fom != NULL);
311  M0_PRE(fom->fo_fop != NULL);
312 
313  return ((struct m0_fop_fsync *)
314  m0_fop_data(fom->fo_fop))->ff_be_remid.tri_locality;
315 }
316 
321 static void fsync_fom_fini(struct m0_fom *fom)
322 {
323  M0_PRE(fom != NULL);
324  m0_fom_fini(fom);
325 }
326 
330 M0_INTERNAL int m0_fsync_req_fom_create(struct m0_fop *fop,
331  struct m0_fom **out,
332  struct m0_reqh *reqh)
333 {
334  struct m0_fop *rep_fop;
335  struct m0_fom *fom;
336 
337  M0_ENTRY();
338 
339  M0_PRE(fop != NULL);
340  M0_PRE(out != NULL);
341  M0_PRE(M0_IN(fop->f_type,
344  M0_PRE(M0_IN(m0_fop_opcode(fop),
347 
348  /* allocate the fom */
349  M0_ALLOC_PTR(fom);
350  if (fom == NULL) {
351  M0_LEAVE("unable to allocate fom");
352  return M0_ERR_INFO(-ENOMEM, "unable to allocate fom");
353  }
354 
355  /* create the fsync reply */
357  if (rep_fop == NULL) {
358  m0_free(fom);
359  M0_LEAVE("unable to allocate fop reply");
360  return M0_ERR_INFO(-ENOMEM, "unable to allocate fop reply");
361  }
362 
363  /* init the fom */
366  *out = fom;
367 
368  M0_LEAVE();
369  return M0_RC(0);
370 }
371 
372 #undef M0_TRACE_SUBSYSTEM
373 /*
374  * Local variables:
375  * c-indentation-style: "K&R"
376  * c-basic-offset: 8
377  * tab-width: 8
378  * fill-column: 80
379  * scroll-step: 1
380  * End:
381  */
void m0_fom_phase_moveif(struct m0_fom *fom, int32_t rc, int phase0, int phase1)
Definition: fom.c:1710
uint32_t m0_fop_opcode(const struct m0_fop *fop)
Definition: fop.c:226
struct m0_be_domain * bs_domain
Definition: seg.h:82
#define M0_PRE(cond)
M0_INTERNAL struct m0_be_tx * m0_be_engine__tx_find(struct m0_be_engine *en, uint64_t id)
Definition: engine.c:887
M0_INTERNAL void m0_fom_block_enter(struct m0_fom *fom)
Definition: fom.c:538
#define NULL
Definition: misc.h:38
struct m0_fop_type m0_fop_fsync_rep_fopt
Definition: fsync_fops.c:39
m0_be_tx_state
Definition: tx.h:214
Definition: sm.h:350
static struct io_request req
Definition: file.c:100
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
static const struct m0_fom_ops fsync_fom_ops
Definition: fsync_foms.c:51
struct m0_fop_type m0_fop_fsync_ios_fopt
Definition: io_fops.c:82
static struct m0_be_tx * fsync_target_tx_get(struct m0_fom *fom, uint64_t txid)
Definition: fsync_foms.c:141
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
M0_INTERNAL void m0_fom_wait_on(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1490
#define M0_BITS(...)
Definition: misc.h:236
#define container_of(ptr, type, member)
Definition: misc.h:33
struct m0_sm_state_descr m0_fsync_fom_phases[]
Definition: fsync_foms.c:69
static void fsync_fom_fini(struct m0_fom *fom)
Definition: fsync_foms.c:321
struct m0_fop_type m0_fop_fsync_mds_fopt
Definition: fsync_fops.c:33
struct m0_fop_getxattr_rep * rep
Definition: dir.c:455
m0_fom_phase
Definition: fom.h:372
struct m0_fom_type ft_fom_type
Definition: fop.h:232
M0_INTERNAL int m0_fsync_req_fom_create(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: fsync_foms.c:330
return M0_RC(rc)
M0_INTERNAL struct m0_sm_conf m0_fsync_fom_conf
Definition: fsync_foms.c:123
#define M0_ENTRY(...)
Definition: trace.h:170
struct m0_reqh_context rctx
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
struct m0_fop_type * f_type
Definition: fop.h:81
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
M0_INTERNAL void m0_storage_devs_lock(struct m0_storage_devs *devs)
Definition: storage_dev.c:71
int m0_fom_tick_generic(struct m0_fom *fom)
Definition: fom_generic.c:848
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
M0_INTERNAL void m0_fom_block_leave(struct m0_fom *fom)
Definition: fom.c:582
static struct m0_sm_trans_descr fsync_fom_phases_trans[]
Definition: fsync_foms.c:94
Definition: reqh.h:94
Definition: dump.c:103
fsync_fom_phase
Definition: fsync_foms.c:60
uint64_t t_id
Definition: tx.h:284
struct m0_sm t_sm
Definition: tx.h:281
M0_INTERNAL struct m0_storage_devs * m0_cs_storage_devs_get(void)
Definition: setup.c:1783
#define PRId64
Definition: types.h:57
static size_t fsync_fom_locality_get(const struct m0_fom *fom)
Definition: fsync_foms.c:308
struct m0_fop * m0_fop_reply_alloc(struct m0_fop *req, struct m0_fop_type *rept)
Definition: fop.c:129
Definition: fom.h:481
struct m0_reqh reqh
Definition: rm_foms.c:48
const char * sd_name
Definition: sm.h:383
struct m0_fop_type m0_fop_fsync_cas_fopt
Definition: cas.c:53
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
struct m0_chan sm_chan
Definition: sm.h:331
struct m0_sm_trans_descr m0_generic_phases_trans[]
Definition: fom_generic.c:765
static struct m0_fop * fop
Definition: item.c:57
M0_INTERNAL void m0_be_tx_put(struct m0_be_tx *tx)
Definition: stubs.c:171
struct m0_be_seg * rh_beseg
Definition: reqh.h:112
M0_INTERNAL void m0_storage_devs_unlock(struct m0_storage_devs *devs)
Definition: storage_dev.c:77
void(* fo_fini)(struct m0_fom *fom)
Definition: fom.h:657
static int fsync_fom_tick(struct m0_fom *fom)
Definition: fsync_foms.c:174
#define out(...)
Definition: gen.c:41
struct m0_be_engine bd_engine
Definition: domain.h:136
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
bool rc_disable_direct_io
Definition: setup.h:341
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL int m0_storage_devs_fdatasync(struct m0_storage_devs *sdevs)
Definition: storage_dev.c:802
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_INTERNAL void m0_be_tx_force(struct m0_be_tx *tx)
Definition: tx.c:638
Definition: fop.h:79
M0_INTERNAL struct m0_reqh * m0_fom_reqh(const struct m0_fom *fom)
Definition: fom.c:283
struct m0_fop * rep_fop
Definition: dir.c:334
Definition: tx.h:280
#define M0_IMPOSSIBLE(fmt,...)