Motr  M0
load_fom.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-2020 Seagate Technology LLC and/or its Affiliates
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * For any questions about this software or licensing,
17  * please email opensource@seagate.com or cortx-questions@seagate.com.
18  *
19  */
20 
21 
22 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_SPIEL
23 #include "lib/trace.h"
24 
25 #include "conf/load_fom.h"
26 #include "conf/confd.h" /* m0_confd, m0_confd_bob */
27 #include "conf/confd_stob.h" /* m0_conf_stob_location_generate */
28 #include "lib/memory.h" /* M0_ALLOC_PTR */
29 
35 M0_TL_DESCR_DECLARE(rpcbulk, M0_EXTERN);
36 
37 static int conf_load_fom_tick(struct m0_fom *fom);
38 static void conf_load_fom_fini(struct m0_fom *fom);
39 
40 static int conf_net_buffer_allocate(struct m0_fom *);
41 static int conf_prepare(struct m0_fom *);
42 static int conf_buffer_free(struct m0_fom *);
43 static int conf_zero_copy_initiate(struct m0_fom *);
44 static int conf_zero_copy_finish(struct m0_fom *);
45 static size_t conf_load_fom_home_locality(const struct m0_fom *fom);
46 
50 const struct m0_fom_ops conf_load_fom_ops = {
52  .fo_tick = conf_load_fom_tick,
53  .fo_home_locality = conf_load_fom_home_locality,
54 };
55 
61 };
62 
69  .sd_name = "Conf_load_Prepare",
72  },
74  .sd_name = "Network_Buffer_Acquire",
77  },
79  .sd_name = "Zero-copy_Initiate",
82  },
84  .sd_name = "Zero-copy_Finish",
85  .sd_allowed = M0_BITS(M0_FOPH_CONF_BUFFER_FREE,
87  },
89  .sd_name = "Conf_load_Finish",
90  .sd_allowed = M0_BITS(M0_FOPH_SUCCESS,
92  },
93 };
94 
95 
97  .scf_name = "Conf_load_phases",
98  .scf_nr_states = ARRAY_SIZE(conf_load_phases),
99  .scf_state = conf_load_phases,
100 };
101 
102 static bool conf_load_fom_invariant(const struct m0_conf_load_fom *fom)
103 {
104  return _0C(fom != NULL);
105 }
106 
119 M0_INTERNAL int m0_conf_load_fom_create(struct m0_fop *fop,
120  struct m0_fom **out,
121  struct m0_reqh *reqh)
122 {
123  struct m0_fom *fom;
124  struct m0_conf_load_fom *conf_load_fom;
125  struct m0_fop *rep_fop;
126 
127  M0_PRE(fop != NULL);
129  M0_PRE(out != NULL);
130 
131  M0_ENTRY("fop=%p", fop);
132 
133  M0_ALLOC_PTR(conf_load_fom);
135  if (conf_load_fom == NULL || rep_fop == NULL) {
136  m0_free(conf_load_fom);
137  m0_free(rep_fop);
138  return M0_RC(-ENOMEM);
139  }
140 
141  fom = &conf_load_fom->clm_gen;
143  fop, rep_fop, reqh);
144  *out = fom;
145 
146  M0_LOG(M0_DEBUG, "fom=%p", fom);
147  return M0_RC(0);
148 }
149 
160 #ifndef __KERNEL__
161 static int conf_prepare(struct m0_fom *fom)
162 {
163  struct m0_confd *confd;
164  struct m0_fop_conf_load_rep *rep;
165 
166  M0_PRE(fom != NULL);
167  M0_PRE(m0_is_conf_load_fop(fom->fo_fop));
168  M0_PRE(fom->fo_service != NULL);
170 
171  M0_ENTRY("fom=%p", fom);
172 
173  confd = bob_of(fom->fo_service, struct m0_confd, d_reqh, &m0_confd_bob);
174  rep = m0_conf_fop_to_load_fop_rep(fom->fo_rep_fop);
175  rep->clfr_version = m0_conf_version(confd->d_cache);
177 
178  return M0_RC(M0_FSO_AGAIN);
179 }
180 #else /* __KERNEL__ */
181 static int conf_prepare(struct m0_fom *fom)
182 {
183  M0_PRE(fom != NULL);
184  M0_ENTRY("fom=%p", fom);
185 
187 
188  return M0_RC(M0_FSO_AGAIN);
189 }
190 #endif /* __KERNEL__ */
191 
202 {
203  struct m0_fop *fop;
204  struct m0_conf_load_fom *conf_fom;
205  struct m0_fop_conf_load *conf_fop;
207  struct m0_bufvec *buf_vec;
208  int rc;
210 
211  M0_PRE(fom != NULL);
212  M0_PRE(m0_is_conf_load_fop(fom->fo_fop));
213  M0_PRE(fom->fo_service != NULL);
215 
216  M0_ENTRY("fom=%p", fom);
217 
218  conf_fom = container_of(fom, struct m0_conf_load_fom, clm_gen);
220 
221  fop = fom->fo_fop;
222  conf_fop = m0_conf_fop_to_load_fop(fop);
223 
224  size = conf_fop->clf_desc.bdd_used;
226 
227  buf_vec = &conf_fom->clm_net_buffer.nb_buffer;
228 
229  rc = m0_bufvec_alloc_aligned(buf_vec, (size + seg_size - 1) / seg_size,
231  if (rc == 0)
233 
236 
237  return M0_RC(M0_FSO_AGAIN);
238 }
239 
251 static int conf_zero_copy_initiate(struct m0_fom *fom)
252 {
253  int rc;
254  struct m0_conf_load_fom *conf_fom;
255  struct m0_fop *fop;
256  struct m0_fop_conf_load *conf_fop;
258  struct m0_rpc_bulk *rbulk;
259  struct m0_rpc_bulk_buf *rb_buf;
260  struct m0_net_buffer *nb;
261  struct m0_net_domain *dom;
262  uint32_t segs_nr = 0;
264 
265  M0_PRE(fom != NULL);
266  M0_PRE(m0_is_conf_load_fop(fom->fo_fop));
268 
269  M0_ENTRY("fom=%p", fom);
270 
271  conf_fom = container_of(fom, struct m0_conf_load_fom, clm_gen);
273 
274  fop = fom->fo_fop;
275  conf_fop = m0_conf_fop_to_load_fop(fop);
276  rbulk = &conf_fom->clm_bulk;
277  m0_rpc_bulk_init(rbulk);
278 
280  nbd_data = &conf_fop->clf_desc;
281 
282  /* Create rpc bulk bufs list using prepare net buffers */
283  nb = &conf_fom->clm_net_buffer;
284 
286 
287  /*
288  * Calculate number of segments by length of data
289  * Segment number Round up
290  */
291  segs_nr = (conf_fop->clf_desc.bdd_used + seg_size - 1) / seg_size;
292 
293  M0_LOG(M0_DEBUG, "segs_nr %d", segs_nr);
294 
295  rc = m0_rpc_bulk_buf_add(rbulk, segs_nr, conf_fop->clf_desc.bdd_used,
296  dom, nb, &rb_buf);
297  if (rc != 0) {
299  M0_LEAVE();
300  return M0_FSO_AGAIN;
301  }
302 
303  /*
304  * On completion of zero-copy on all buffers rpc_bulk
305  * sends signal on channel rbulk->rb_chan.
306  */
307  m0_mutex_lock(&rbulk->rb_mutex);
308  m0_fom_wait_on(fom, &rbulk->rb_chan, &fom->fo_cb);
309  m0_mutex_unlock(&rbulk->rb_mutex);
310 
311  /*
312  * This function deletes m0_rpc_bulk_buf object one
313  * by one as zero copy completes on respective buffer.
314  */
315  rc = m0_rpc_bulk_load(rbulk,
317  nbd_data,
319  if (rc != 0) {
320  m0_mutex_lock(&rbulk->rb_mutex);
321  m0_fom_callback_cancel(&fom->fo_cb);
322  m0_mutex_unlock(&rbulk->rb_mutex);
324  m0_rpc_bulk_fini(rbulk);
326  M0_LEAVE();
327  return M0_FSO_AGAIN;
328  }
329 
331 
332  return M0_RC(M0_FSO_WAIT);
333 }
334 
342 static int conf_fom_conf_file_save(struct m0_conf_load_fom *conf_fom)
343 {
344  int rc = 0;
345  char *location = NULL;
346  struct m0_stob *stob = NULL;
347  struct m0_net_buffer *nb = &conf_fom->clm_net_buffer;
348  struct m0_fop_conf_load *conf_fop;
349  struct m0_fop_conf_load_rep *conf_fop_rep;
350 
351  M0_ENTRY();
352 
353  conf_fop = m0_conf_fop_to_load_fop(conf_fom->clm_gen.fo_fop);
354  conf_fop_rep = m0_conf_fop_to_load_fop_rep(conf_fom->clm_gen.fo_rep_fop);
355 
358  &M0_CONFD_FID(conf_fop_rep->clfr_version,
359  conf_fop->clf_version,
360  conf_fop->clf_tx_id)) ?:
362 
364  m0_free(location);
365 
366  return M0_RC(rc);
367 }
368 
379 static int conf_zero_copy_finish(struct m0_fom *fom)
380 {
381  struct m0_conf_load_fom *conf_fom;
382  struct m0_rpc_bulk *rbulk;
383 
384  M0_ENTRY("fom=%p", fom);
385 
386  M0_PRE(fom != NULL);
387  M0_PRE(m0_is_conf_load_fop(fom->fo_fop));
389 
390  conf_fom = container_of(fom, struct m0_conf_load_fom, clm_gen);
392 
393  rbulk = &conf_fom->clm_bulk;
394  m0_mutex_lock(&rbulk->rb_mutex);
395 
396  if (rbulk->rb_rc == 0)
397  rbulk->rb_rc = conf_fom_conf_file_save(conf_fom);
398 
400  rbulk->rb_rc,
403 
404  m0_mutex_unlock(&rbulk->rb_mutex);
405  m0_rpc_bulk_fini(rbulk);
406  return M0_RC(M0_FSO_AGAIN);
407 }
408 
418 static int conf_buffer_free(struct m0_fom *fom)
419 {
420  struct m0_conf_load_fom *conf_fom;
421  struct m0_fop_conf_load *conf_fop;
422 
423  M0_PRE(fom != NULL);
424  M0_PRE(m0_is_conf_load_fop(fom->fo_fop));
426 
427  M0_ENTRY("fom=%p", fom);
428 
429  conf_fom = container_of(fom, struct m0_conf_load_fom, clm_gen);
431 
432  conf_fop = m0_conf_fop_to_load_fop(fom->fo_fop);
433 
434  if (conf_fop->clf_desc.bdd_desc.nbd_data != NULL) {
435  m0_free(conf_fop->clf_desc.bdd_desc.nbd_data);
436  conf_fop->clf_desc.bdd_desc.nbd_len = 0;
437  }
439  m0_pageshift_get());
440 
442 
443  return M0_RC(M0_FSO_AGAIN);
444 }
445 
449 static int conf_load_fom_tick(struct m0_fom *fom)
450 {
451  int rc = 0;
452  struct m0_conf_load_fom *conf_fom;
453  struct m0_fop_conf_load_rep *rep;
454 
455  M0_ENTRY("fom %p", fom);
456  M0_PRE(m0_is_conf_load_fop(fom->fo_fop));
457 
458  if (m0_fom_phase(fom) < M0_FOPH_NR)
459  return m0_fom_tick_generic(fom);
460 
461  conf_fom = container_of(fom, struct m0_conf_load_fom, clm_gen);
463 
464  switch (m0_fom_phase(fom)) {
465 #define _CASE(phase, op) \
466  case phase: \
467  rc = op(fom); \
468  break
469 
475 #undef _CASE
476  default:
477  M0_IMPOSSIBLE("");
478  }
479 
481  rep = m0_conf_fop_to_load_fop_rep(fom->fo_rep_fop);
482  rep->clfr_rc = m0_fom_rc(fom);
483  }
484  return M0_RC(rc);
485 }
486 
495 static void conf_load_fom_fini(struct m0_fom *fom)
496 {
497  struct m0_conf_load_fom *conf_fom;
498 
499  M0_PRE(fom != NULL);
500  conf_fom = container_of(fom, struct m0_conf_load_fom, clm_gen);
502 
503  m0_fom_fini(fom);
504  m0_free(conf_fom);
505 }
506 
507 
508 static size_t conf_load_fom_home_locality(const struct m0_fom *fom)
509 {
510  M0_ENTRY();
511  M0_PRE(fom != NULL);
512  M0_LEAVE();
513  return 1;
514 }
515 
516 #undef M0_TRACE_SUBSYSTEM
517 
520 /*
521  * Local variables:
522  * c-indentation-style: "K&R"
523  * c-basic-offset: 8
524  * tab-width: 8
525  * fill-column: 80
526  * scroll-step: 1
527  * End:
528  */
static m0_bcount_t seg_size
Definition: net.c:118
#define M0_CONFD_FID(old_version, new_version, tx_id)
Definition: confd_stob.h:39
void m0_fom_phase_moveif(struct m0_fom *fom, int32_t rc, int phase0, int phase1)
Definition: fom.c:1710
#define M0_PRE(cond)
m0_conf_version
Definition: cache.h:75
M0_INTERNAL struct m0_fop_conf_load * m0_conf_fop_to_load_fop(const struct m0_fop *fop)
Definition: load_fop.c:47
#define _CASE(phase, op)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
struct m0_fop * fo_fop
Definition: fom.h:490
#define NULL
Definition: misc.h:38
struct m0_bufvec nb_buffer
Definition: net.h:1322
uint32_t nbd_len
Definition: net_otw_types.h:37
Definition: sm.h:350
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
struct m0_fom clm_gen
Definition: load_fom.h:61
uint8_t * nbd_data
Definition: net_otw_types.h:38
struct m0_chan rb_chan
Definition: bulk.h:258
int(* fto_create)(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: fom.h:650
M0_INTERNAL void m0_rpc_bulk_fini(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:263
static size_t conf_load_fom_home_locality(const struct m0_fom *fom)
Definition: load_fom.c:508
static int conf_load_fom_tick(struct m0_fom *fom)
Definition: load_fom.c:449
uint8_t * nbd_data
static int conf_zero_copy_finish(struct m0_fom *)
Definition: load_fom.c:379
const struct m0_fom_type_ops conf_load_fom_type_ops
Definition: load_fom.c:59
static bool conf_load_fom_invariant(const struct m0_conf_load_fom *fom)
Definition: load_fom.c:102
M0_INTERNAL void m0_fom_wait_on(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1490
#define M0_BITS(...)
Definition: misc.h:236
uint64_t m0_bcount_t
Definition: types.h:77
M0_INTERNAL int m0_pageshift_get(void)
Definition: memory.c:238
#define container_of(ptr, type, member)
Definition: misc.h:33
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
M0_INTERNAL m0_bcount_t m0_conf_segment_size(struct m0_fop *fop)
Definition: load_fop.c:65
struct m0_fop_getxattr_rep * rep
Definition: dir.c:455
m0_fom_phase
Definition: fom.h:372
const char * location
Definition: storage.c:50
M0_INTERNAL void m0_rpc_bulk_buflist_empty(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:279
struct m0_fom_type ft_fom_type
Definition: fop.h:232
int m0_bufvec_alloc_aligned(struct m0_bufvec *bufvec, uint32_t num_segs, m0_bcount_t seg_size, unsigned shift)
Definition: vec.c:355
return M0_RC(rc)
#define M0_ENTRY(...)
Definition: trace.h:170
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
struct m0_fop_type m0_fop_conf_load_rep_fopt
Definition: fop.c:59
M0_INTERNAL int m0_rpc_bulk_load(struct m0_rpc_bulk *rbulk, const struct m0_rpc_conn *conn, struct m0_net_buf_desc_data *from_desc, const struct m0_net_buffer_callbacks *bulk_cb)
Definition: bulk.c:530
struct m0_fop_type * f_type
Definition: fop.h:81
void m0_confd_stob_fini(struct m0_stob *stob)
Definition: confd_stob.c:104
M0_INTERNAL int m0_conf_stob_location_generate(struct m0_fom *fom, char **location)
Definition: confd_stob.c:181
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
static int conf_fom_conf_file_save(struct m0_conf_load_fom *conf_fom)
Definition: load_fom.c:342
Definition: stob.h:163
int m0_fom_tick_generic(struct m0_fom *fom)
Definition: fom_generic.c:848
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
static struct m0_stob * stob
Definition: storage.c:39
int32_t rb_rc
Definition: bulk.h:266
#define M0_ASSERT(cond)
struct m0_net_buffer clm_net_buffer
Definition: load_fom.h:65
const char * scf_name
Definition: sm.h:352
M0_INTERNAL void m0_rpc_bulk_init(struct m0_rpc_bulk *rbulk)
Definition: bulk.c:247
void m0_fom_phase_move(struct m0_fom *fom, int32_t rc, int phase)
Definition: fom.c:1699
const struct m0_bob_type m0_confd_bob
Definition: confd.c:443
#define bob_of(ptr, type, field, bt)
Definition: bob.h:140
struct m0_net_buf_desc_data clf_desc
Definition: load_fop.h:101
int m0_confd_stob_init(struct m0_stob **stob, const char *location, struct m0_fid *confd_fid)
Definition: confd_stob.c:50
static struct m0_stob_domain * dom
Definition: storage.c:38
int m0_confd_stob_bufvec_write(struct m0_stob *stob, struct m0_bufvec *bufvec)
Definition: confd_stob.c:159
static void conf_load_fom_fini(struct m0_fom *fom)
Definition: load_fom.c:495
static int conf_buffer_free(struct m0_fom *)
Definition: load_fom.c:418
struct m0_rpc_bulk clm_bulk
Definition: load_fom.h:63
Definition: reqh.h:94
Definition: dump.c:103
uint64_t clf_tx_id
Definition: load_fop.h:96
struct m0_sm_state_descr conf_load_phases[]
Definition: load_fom.c:66
static uint64_t min64u(uint64_t a, uint64_t b)
Definition: arith.h:66
M0_INTERNAL void m0_bufvec_free_aligned(struct m0_bufvec *bufvec, unsigned shift)
Definition: vec.c:436
struct m0_fop * m0_fop_reply_alloc(struct m0_fop *req, struct m0_fop_type *rept)
Definition: fop.c:129
struct m0_conf_cache * d_cache
Definition: confd.h:163
uint32_t sd_flags
Definition: sm.h:378
Definition: fom.h:481
M0_INTERNAL bool m0_is_conf_load_fop(const struct m0_fop *fop)
Definition: load_fop.c:35
struct m0_reqh reqh
Definition: rm_foms.c:48
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
static int conf_zero_copy_initiate(struct m0_fom *)
Definition: load_fom.c:251
struct m0_net_domain * m0_fop_domain_get(const struct m0_fop *fop)
Definition: fop.c:486
uint32_t clfr_version
Definition: load_fop.h:86
struct m0_sm_conf conf_load_conf
Definition: load_fom.c:96
uint32_t clf_version
Definition: load_fop.h:94
struct m0_rpc_session * ri_session
Definition: item.h:147
M0_INTERNAL int m0_fom_rc(const struct m0_fom *fom)
Definition: fom.c:1727
m0_bcount_t size
Definition: di.c:39
#define _0C(exp)
Definition: assert.h:311
static struct m0_fop * fop
Definition: item.c:57
M0_INTERNAL void m0_fom_callback_cancel(struct m0_fom_callback *cb)
Definition: fom.c:1514
struct m0_net_buf_desc bdd_desc
Definition: net_otw_types.h:47
void(* fo_fini)(struct m0_fom *fom)
Definition: fom.h:657
struct m0_fop * fo_rep_fop
Definition: fom.h:492
#define out(...)
Definition: gen.c:41
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
const struct m0_net_buffer_callbacks m0_rpc__buf_bulk_cb
Definition: bulk.c:238
M0_INTERNAL int m0_rpc_bulk_buf_add(struct m0_rpc_bulk *rbulk, uint32_t segs_nr, m0_bcount_t length, struct m0_net_domain *netdom, struct m0_net_buffer *nb, struct m0_rpc_bulk_buf **out)
Definition: bulk.c:291
static int conf_net_buffer_allocate(struct m0_fom *)
Definition: load_fom.c:201
const struct m0_fom_ops conf_load_fom_ops
Definition: load_fom.c:50
void m0_free(void *data)
Definition: memory.c:146
struct m0_rpc_item f_item
Definition: fop.h:83
static int conf_prepare(struct m0_fom *)
Definition: load_fom.c:161
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
struct m0_rpc_conn * s_conn
Definition: session.h:312
M0_INTERNAL struct m0_fop_conf_load_rep * m0_conf_fop_to_load_fop_rep(const struct m0_fop *fop)
Definition: load_fop.c:56
Definition: fop.h:79
struct m0_mutex rb_mutex
Definition: bulk.h:251
Definition: vec.h:145
struct m0_fop * rep_fop
Definition: dir.c:334
M0_INTERNAL int m0_conf_load_fom_create(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: load_fom.c:119
#define M0_IMPOSSIBLE(fmt,...)
M0_TL_DESCR_DECLARE(rpcbulk, M0_EXTERN)