Motr  M0
plugin_dock_fom.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2017-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_FDMI
24 
25 
26 #include "lib/trace.h"
27 #include "motr/magic.h" /* M0_CONFC_MAGIC, M0_CONFC_CTX_MAGIC */
28 #include "lib/misc.h" /* M0_IN */
29 #include "lib/errno.h" /* ENOMEM, EPROTO */
30 #include "lib/memory.h" /* M0_ALLOC_ARR, m0_free */
31 #include "rpc/rpc_opcodes.h" /* M0_FDMI_PLUGIN_DOCK_OPCODE */
32 #include "fop/fom.h" /* M0_FOM_PHASE_INIT, etc. */
33 #include "fop/fop.h"
34 #include "fdmi/fdmi.h"
35 #include "fdmi/plugin_dock.h"
37 #include "fdmi/fops.h" /* m0_fop_fdmi_record */
38 #include "fdmi/service.h"
39 
40 static int pdock_fom_create(struct m0_fop *fop,
41  struct m0_fom **out,
42  struct m0_reqh *reqh);
43 
49 };
50 
51 static const struct m0_fom_type_ops pdock_fom_type_ops = {
53 };
54 
56 {
57  return &pdock_fom_type_ops;
58 }
59 
61 
65  .sd_name = "Init",
66  .sd_allowed =
68  },
69 
71  .sd_flags = M0_SDF_TERMINAL,
72  .sd_name = "Fini",
73  .sd_allowed = 0
74  },
75 
77  .sd_flags = 0,
78  .sd_name = "Feed Plugins With Record",
80  },
81 
83  .sd_flags = 0,
84  .sd_name = "Finish With Record",
85  .sd_allowed = M0_BITS(FDMI_PLG_DOCK_FOM_FINI)
86  },
87 };
88 
90  .scf_name = "fdmi-plugin-dock-fom-sm",
91  .scf_nr_states = ARRAY_SIZE(fdmi_plugin_dock_state_descr),
92  .scf_state = fdmi_plugin_dock_state_descr
93 };
94 
95 static void pdock_fom_fini(struct m0_fom *fom);
96 static int pdock_fom_tick(struct m0_fom *fom);
97 static size_t pdock_fom_home_locality(const struct m0_fom *fom);
98 
99 static const struct m0_fom_ops pdock_fom_ops = {
101  .fo_tick = pdock_fom_tick,
102  .fo_home_locality = pdock_fom_home_locality,
103 };
104 
105 static int pdock_fom_create(struct m0_fop *fop,
106  struct m0_fom **out,
107  struct m0_reqh *reqh)
108 {
109  struct pdock_fom *pd_fom;
110  struct m0_fom *fom;
111  struct m0_fop *reply_fop;
112  struct m0_fop_fdmi_record_reply *reply_fop_data;
113  struct m0_fop_fdmi_record *frec;
114  struct m0_fdmi_record_reg *rreg;
115  int rc;
116 
117  M0_ENTRY();
118  M0_ASSERT(fop != NULL);
119  M0_ASSERT(out != NULL);
120  M0_ASSERT(reqh != NULL);
122 
123  M0_ALLOC_PTR(pd_fom);
124  if (pd_fom == NULL)
125  return M0_RC(-ENOMEM);
126  M0_SET0(pd_fom);
127 
128  M0_ALLOC_PTR(reply_fop_data);
129  if (reply_fop_data == NULL) {
130  rc = -ENOMEM;
131  goto fom_fini;
132  }
133 
135  if (rreg == NULL) {
136  M0_LOG(M0_ERROR, "FDMI record failed to register");
137  rc = -ENOENT;
138  goto rep_fini;
139  } else {
140  int idx;
141  struct m0_fdmi_flt_id_arr *filters =
142  &rreg->frr_rec->fr_matched_flts;
143 
144  M0_LOG(M0_DEBUG, "FDMI record arrived: id = "
145  U128X_F, U128_P(&rreg->frr_rec->fr_rec_id));
146  M0_LOG(M0_DEBUG, "FDMI record type = %d",
147  rreg->frr_rec->fr_rec_type);
148  M0_LOG(M0_DEBUG, "* matched filters count = [%d]",
149  filters->fmf_count);
150 
151  for (idx = 0; idx < filters->fmf_count; idx++) {
152  M0_LOG(M0_DEBUG, "* [%4d] = "
153  FID_SF, idx, FID_P(&filters->fmf_flt_id[idx]));
154  }
155  }
156 
157  /* get prepared to inspecting record guts */
158  frec = m0_fop_data(fop);
159 
160  /* set up reply fop */
161  reply_fop_data->frn_frt = frec->fr_rec_type;
162 
165  if (reply_fop == NULL) {
166  rc = -ENOMEM;
167  goto rep_fini;
168  }
169 
170  if (m0_fop_to_rpc_item(fop)->ri_rmachine == NULL) {
171  /* seems odd, as no rpc machine found attached !
172  * it must be ut in run, so reply has to be skipped
173  */
175  m0_free(reply_fop_data);
176  reply_fop = NULL;
177  }
178 
179  /* set up fom */
180  fom = &pd_fom->pf_fom;
181 
184 
186  *out = fom;
187 
188  return M0_RC(0);
189 
190 rep_fini:
191  m0_free(reply_fop_data);
192  if (rreg != NULL)
193  m0_ref_put(&rreg->frr_ref);
194 fom_fini:
195  m0_free(pd_fom);
196  return M0_RC(rc);
197 
198 
199 }
200 
201 static void pdock_fom_fini(struct m0_fom *fom)
202 {
203  struct pdock_fom *pd_fom;
204 
205  M0_ENTRY();
206 
207  pd_fom = container_of(fom, struct pdock_fom, pf_fom);
208 
209  if (pd_fom->pf_custom_fom_fini != NULL) {
210  (*pd_fom->pf_custom_fom_fini)(fom);
211  } else {
212  m0_fom_fini(fom);
213  }
214 
215  m0_free(pd_fom);
216 
217  M0_LEAVE();
218 }
219 
220 static int pdock_fom_tick__init(struct m0_fom *fom)
221 {
222  struct pdock_fom *pd_fom;
223 
224  M0_ENTRY();
225 
226  if (fom->fo_rep_fop != NULL) {
227  struct m0_fop_fdmi_record *fdmi_rec =
228  (struct m0_fop_fdmi_record*) m0_fop_data(fom->fo_fop);
229 
230  M0_LOG(M0_DEBUG, "send reply fop data %p, rid " U128X_F,
231  fdmi_rec, U128_P(&fdmi_rec->fr_rec_id));
232 
234  m0_fop_to_rpc_item(fom->fo_rep_fop));
235  }
236 
237  pd_fom = container_of(fom, struct pdock_fom, pf_fom);
238 
239  /* reset position in filter id array */
240  pd_fom->pf_pos = 0;
241 
242  /* unveil fop data */
243  pd_fom->pf_rec = m0_fop_data(fom->fo_fop);
244 
246 
247  M0_LEAVE();
248  return M0_FSO_AGAIN;
249 }
250 
252 {
253  struct pdock_fom *pd_fom;
254  struct m0_fop_fdmi_record *frec;
255  struct m0_fid *fids;
256  struct m0_fdmi_filter_reg *freg;
257  struct m0_fdmi_plugin_ops *pcb;
258  int rc;
259 
260  M0_ENTRY();
261 
262  pd_fom = container_of(fom, struct pdock_fom, pf_fom);
263  frec = pd_fom->pf_rec;
265 
266  M0_ASSERT(pd_fom->pf_pos < frec->fr_matched_flts.fmf_count);
267 
268  /* filter lookup */
269  freg = m0_fdmi__pdock_filter_reg_find(&fids[pd_fom->pf_pos]);
270 
271  if (freg == NULL) {
272  /* filter not found, quit */
275  "filter reg not found: ffid = "FID_SF,
276  FID_P(&fids[pd_fom->pf_pos]));
277  M0_LEAVE();
278  return M0_FSO_AGAIN;
279  }
280 
281  /* test filter reg for being in active state */
284  M0_LOG(M0_NOTICE, "filter reg is not active: id = "FID_SF,
285  FID_P(&freg->ffr_ffid));
286  /* an odd situation, but possible and not critical
287  * one, so plugin feed-up is anyway going to happen
288  */
289  }
290 
291  pcb = freg->ffr_pcb;
292 
293  if (pcb == NULL) {
294  M0_LOG(M0_ERROR, "filter reg contains null in pcb");
295  goto move_on;
296  }
297 
298  /* plugin feed */
299  rc = (*pcb->po_fdmi_rec)(&frec->fr_rec_id,
300  frec->fr_payload,
301  fids[pd_fom->pf_pos]);
302  if (rc == 0) {
303  /*
304  * Plugin accepted the record. We already have a ref on
305  * this record. No need to take extra ref.
306  */
307  /*
308  * If the plugin wants to do more work after returning
309  * from the pcb->po_fdmi_rec(), it should increase fdmi
310  * record refc, and then release it when done.
311  * pseudo code:
312  * struct m0_fdmi_record_reg *rreg;
313  * rreg = m0_fdmi__pdock_record_reg_find(&frec->fr_rec_id);
314  * if (rreg != NULL)
315  * m0_ref_get(&rreg->frr_ref);
316  */
317  } else {
319  "plugin has rejected the record processing: "
320  "rc = %i, frid = "U128X_F", ffid = "FID_SF, rc,
321  U128_P(&frec->fr_rec_id),
322  FID_P(&fids[pd_fom->pf_pos]));
323  }
324 
325 move_on:
326  /* move forward and yeild */
327  ++pd_fom->pf_pos;
328 
329  if (pd_fom->pf_pos >= frec->fr_matched_flts.fmf_count) {
331  }
332 
333  M0_LEAVE();
334  return M0_FSO_AGAIN;
335 }
336 
338 {
339  struct pdock_fom *pd_fom;
340  struct m0_fop_fdmi_record *frec;
341  struct m0_fdmi_record_reg *rreg;
342 
343  M0_ENTRY();
344 
345  pd_fom = container_of(fom, struct pdock_fom, pf_fom);
346  frec = pd_fom->pf_rec;
348  if (rreg != NULL) {
362  M0_LOG(M0_DEBUG, "Dec ref for "U128X_F " to ref_cnt=%d",
363  U128_P(&frec->fr_rec_id),
364  (int)m0_ref_read(&rreg->frr_ref) - 1);
366  m0_ref_put(&rreg->frr_ref);
368  }
369 
370  M0_LOG(M0_DEBUG, "set fom state FOM_FINI");
372 
373  M0_LEAVE();
374  return M0_FSO_WAIT;
375 }
376 
377 static int pdock_fom_tick(struct m0_fom *fom)
378 {
379  int rc = M0_FSO_AGAIN;
380 
381  M0_ENTRY("fom %p", fom);
382  M0_LOG(M0_DEBUG, "fom phase %i", m0_fom_phase(fom));
383 
384  switch (m0_fom_phase(fom)) {
385 
388  break;
389 
392  break;
393 
396  break;
397 
398  default:
399  M0_IMPOSSIBLE("Phase not defined");
400  }
401 
402  M0_LEAVE("<< rc %d", rc);
403  return rc;
404 }
405 
406 static size_t pdock_fom_home_locality(const struct m0_fom *fom)
407 {
408  M0_ENTRY();
409  M0_PRE(fom != NULL);
410  M0_LEAVE();
411  return 1;
412 }
413 
414 M0_INTERNAL int m0_fdmi__plugin_dock_fom_init(void)
415 {
416  M0_ENTRY();
417  /* Initialize FDMI plugin dock fom */
421 
422  return M0_RC(0);
423 }
424 
425 #undef M0_TRACE_SUBSYSTEM
426 
427 /*
428  * Local variables:
429  * c-indentation-style: "K&R"
430  * c-basic-offset: 8
431  * tab-width: 8
432  * fill-column: 80
433  * scroll-step: 1
434  * End:
435  */
436 /*
437  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
438  */
M0_EXTERN struct m0_reqh_service_type m0_fdmi_service_type
Definition: fdmi.h:194
#define M0_PRE(cond)
#define FID_SF
Definition: fid.h:76
struct m0_fdmi_flt_id_arr fr_matched_flts
Definition: fops.h:78
static struct m0_sm_state_descr fdmi_plugin_dock_state_descr[]
static size_t pdock_fom_home_locality(const struct m0_fom *fom)
struct m0_rpc_machine * m0_fdmi__pdock_conn_pool_rpc_machine()
Definition: plugin_dock.c:97
M0_INTERNAL void m0_fom_block_enter(struct m0_fom *fom)
Definition: fom.c:538
#define NULL
Definition: misc.h:38
Definition: sm.h:350
struct m0_fdmi_record_reg * m0_fdmi__pdock_record_reg_find(const struct m0_uint128 *rid)
Definition: plugin_dock.c:135
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
void(* pf_custom_fom_fini)(struct m0_fom *fom)
int(* fto_create)(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: fom.h:650
static const struct m0_fom_ops pdock_fom_ops
struct m0_fid ffr_ffid
Definition: plugin_dock.h:80
M0_INTERNAL struct m0_fdmi_record_reg * m0_fdmi__pdock_fdmi_record_register(struct m0_fop *fop)
Definition: plugin_dock.c:353
struct m0_fdmi_plugin_ops * ffr_pcb
Definition: plugin_dock.h:84
const struct m0_sm_conf fdmi_plugin_dock_fom_sm_conf
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
#define M0_BITS(...)
Definition: misc.h:236
static int pdock_fom_tick__finish_with_rec(struct m0_fom *fom)
struct m0_fop_fdmi_record * pf_rec
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
m0_fom_phase
Definition: fom.h:372
static int pdock_fom_tick__feed_plugin_with_rec(struct m0_fom *fom)
uint32_t ffr_flags
Definition: plugin_dock.h:91
struct m0_fom_type ft_fom_type
Definition: fop.h:232
int(* po_fdmi_rec)(struct m0_uint128 *rec_id, struct m0_buf fdmi_rec, struct m0_fid filter_id)
Definition: plugin_dock.h:62
return M0_RC(rc)
#define M0_ENTRY(...)
Definition: trace.h:170
struct m0_buf fr_payload
Definition: fops.h:75
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
static int pdock_fom_tick(struct m0_fom *fom)
fdmi_plugin_dock_fom_phase
struct m0_fop_type * f_type
Definition: fop.h:81
M0_INTERNAL void m0_ref_put(struct m0_ref *ref)
Definition: refs.c:38
struct m0_fom pf_fom
m0_fdmi_rec_type_id_t frn_frt
Definition: fops.h:86
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
#define U128_P(x)
Definition: types.h:45
static struct m0_fop reply_fop
Definition: fsync.c:64
M0_INTERNAL void m0_fom_type_init(struct m0_fom_type *type, uint64_t id, const struct m0_fom_type_ops *ops, const struct m0_reqh_service_type *svc_type, const struct m0_sm_conf *sm)
Definition: fom.c:1596
static int pdock_fom_tick__init(struct m0_fom *fom)
M0_INTERNAL void m0_fom_block_leave(struct m0_fom *fom)
Definition: fom.c:582
uint32_t fr_rec_type
Definition: fops.h:72
static int pdock_fom_create(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: reqh.h:94
Definition: dump.c:103
static const struct m0_fid fids[]
Definition: diter.c:76
M0_INTERNAL int m0_fdmi__plugin_dock_fom_init(void)
void m0_rpc_reply_post(struct m0_rpc_item *request, struct m0_rpc_item *reply)
Definition: rpc.c:135
#define FID_P(f)
Definition: fid.h:77
struct m0_fop_type m0_fop_fdmi_rec_not_rep_fopt
Definition: fops.c:48
#define U128X_F
Definition: types.h:42
static void pdock_fom_fini(struct m0_fom *fom)
uint32_t sd_flags
Definition: sm.h:378
Definition: fom.h:481
M0_INTERNAL int64_t m0_ref_read(const struct m0_ref *ref)
Definition: refs.c:44
struct m0_reqh reqh
Definition: rm_foms.c:48
Definition: fid.h:38
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
uint32_t fmf_count
Definition: fops.h:58
static const struct m0_fom_type_ops pdock_fom_type_ops
static void fom_fini(struct m0_fom *fom, enum cob_fom_type fomtype)
Definition: cob_foms.c:604
struct m0_rpc_item * m0_fop_to_rpc_item(const struct m0_fop *fop)
Definition: fop.c:338
static struct m0_fop * fop
Definition: item.c:57
struct m0_fdmi_filter_reg * m0_fdmi__pdock_filter_reg_find(const struct m0_fid *fid)
Definition: plugin_dock.c:118
struct m0_fid * fmf_flt_id
Definition: fops.h:61
struct m0_uint128 fr_rec_id
Definition: fops.h:69
const struct m0_fom_type_ops * m0_fdmi__pdock_fom_type_ops_get(void)
void(* fo_fini)(struct m0_fom *fom)
Definition: fom.h:657
struct m0_ref frr_ref
Definition: plugin_dock.h:112
static struct m0_fom_type pdock_fom_type
#define out(...)
Definition: gen.c:41
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
struct m0_fop_fdmi_record * frr_rec
Definition: plugin_dock.h:103
void m0_free(void *data)
Definition: memory.c:146
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
Definition: fop.h:79
struct m0_fop * m0_fop_alloc(struct m0_fop_type *fopt, void *data, struct m0_rpc_machine *mach)
Definition: fop.c:96
#define M0_IMPOSSIBLE(fmt,...)