Motr  M0
source_dock_fom.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2017-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_FDMI
24 #include "lib/trace.h"
25 
26 #include "lib/memory.h"
27 #include "rpc/rpc_opcodes.h" /* M0_FDMI_SOURCE_DOCK_OPCODE */
28 #include "fop/fom_generic.h" /* m0_rpc_item_generic_reply_rc */
29 #include "fdmi/fdmi.h"
30 #include "fdmi/source_dock.h"
32 #include "fdmi/fops.h"
33 
34 #include "fdmi/fol_fdmi_src.h" /* m0_fol_fdmi_filter_kv_substring */
35 
36 
37 static void fdmi_sd_fom_fini(struct m0_fom *fom);
38 static int fdmi_sd_fom_tick(struct m0_fom *fom);
39 static size_t fdmi_sd_fom_locality(const struct m0_fom *fom);
40 static int sd_fom_send_record(struct fdmi_sd_fom *sd_fom,
41  struct m0_fop *fop,
42  const char *ep);
43 static int fdmi_filter_calc(struct fdmi_sd_fom *sd_fom,
44  struct m0_fdmi_src_rec *src_rec,
45  struct m0_conf_fdmi_filter *fdmi_filter);
46 
47 static int fdmi_rr_fom_create(struct m0_fop *fop, struct m0_fom **out,
48  struct m0_reqh *reqh);
49 static void fdmi_rr_fom_fini(struct m0_fom *fom);
50 static int fdmi_rr_fom_tick(struct m0_fom *fom);
51 
52 static void fdmi_rec_notif_replied(struct m0_rpc_item *item);
53 
54 static const struct m0_rpc_item_ops fdmi_rec_not_item_ops = {
56 };
57 
59  uint64_t fti_magic;
60  struct m0_fop *fti_fop;
65 };
66 
67 M0_TL_DESCR_DEFINE(pending_fops, "pending fops list", M0_INTERNAL,
68  struct fdmi_pending_fop, fti_linkage, fti_magic,
71 
72 M0_TL_DEFINE(pending_fops, static, struct fdmi_pending_fop);
73 
74 M0_TL_DESCR_DECLARE(fdmi_record_inflight, M0_EXTERN);
75 M0_TL_DECLARE(fdmi_record_inflight, M0_EXTERN, struct m0_fdmi_src_rec);
76 
77 /*
78  ******************************************************************************
79  * FDMI Source Dock: Main FOM
80  ******************************************************************************
81  */
82 
88 };
89 
93  .sd_name = "Init",
95  },
97  .sd_flags = 0,
98  .sd_name = "WaitRec",
101  },
103  .sd_flags = 0,
104  .sd_name = "GetRec",
107  },
109  .sd_flags = M0_SDF_TERMINAL,
110  .sd_name = "Fini",
111  .sd_allowed = 0
112  }
113 };
114 
115 
117  .scf_name = "fdmi-src-dock-fom-sm",
118  .scf_nr_states = ARRAY_SIZE(fdmi_src_dock_state_descr),
119  .scf_state = fdmi_src_dock_state_descr
120 };
121 
122 
123 static const struct m0_fom_ops fdmi_sd_fom_ops = {
125  .fo_tick = fdmi_sd_fom_tick,
126  .fo_home_locality = fdmi_sd_fom_locality
127 };
128 
129 static const struct m0_fom_type_ops fdmi_sd_fom_type_ops = {
130 };
131 
133 
134 /*
135  ******************************************************************************
136  * FDMI Source Dock Timer FOM
137  ******************************************************************************
138  */
139 static void fdmi_sd_timer_fom_fini(struct m0_fom *fom);
140 static int fdmi_sd_timer_fom_tick(struct m0_fom *fom);
141 
147 };
148 
152  .sd_name = "Init",
155  },
157  .sd_flags = 0,
158  .sd_name = "WaitForTimeout",
161  },
163  .sd_flags = 0,
164  .sd_name = "Alarmed",
167  },
169  .sd_flags = M0_SDF_TERMINAL,
170  .sd_name = "Fini",
171  .sd_allowed = 0
172  }
173 };
174 
175 
177  .scf_name = "fdmi-src-dock-timer-fom-sm",
180 };
181 
182 
183 static const struct m0_fom_ops fdmi_sd_timer_fom_ops = {
185  .fo_tick = fdmi_sd_timer_fom_tick,
186  .fo_home_locality = fdmi_sd_fom_locality
187 };
188 
190 };
191 
193 
194 
195 /*
196  ******************************************************************************
197  * FDMI Source Dock: Release Record FOP Handling FOM
198  ******************************************************************************
199  */
200 
205 };
206 
210  .sd_name = "Init",
211  .sd_allowed = M0_BITS(FDMI_RR_FOM_PHASE_FINI)
212  },
214  .sd_flags = M0_SDF_TERMINAL,
215  .sd_name = "Fini",
216  .sd_allowed = 0
217  }
218 };
219 
220 /* extern */ const struct m0_sm_conf fdmi_rr_fom_sm_conf = {
221  .scf_name = "fdmi-rr-fom-sm",
222  .scf_nr_states = ARRAY_SIZE(fdmi_rr_fom_state_descr),
223  .scf_state = fdmi_rr_fom_state_descr
224 };
225 
226 /* extern */ const struct m0_fom_type_ops fdmi_rr_fom_type_ops = {
228 };
229 
230 const struct m0_fom_ops fdmi_rr_fom_ops = {
232  .fo_tick = fdmi_rr_fom_tick,
234  .fo_home_locality = fdmi_sd_fom_locality
235 };
236 
237 /*
238  ******************************************************************************
239  * FDMI Source Dock Main FOM specific functions
240  ******************************************************************************
241  */
242 
243 M0_INTERNAL void m0_fdmi__src_dock_fom_init(void)
244 {
245  M0_ENTRY();
253  M0_LEAVE();
254 }
255 
256 M0_INTERNAL int
258  const struct m0_filterc_ops *filterc_ops,
259  struct m0_reqh *reqh)
260 {
261  enum { MAX_RPCS_IN_FLIGHT = 32 };
262  struct fdmi_sd_fom *sd_fom = &src_dock->fsdc_sd_fom;
263  struct m0_fom *fom = &sd_fom->fsf_fom;
264  struct m0_rpc_machine *rpc_mach;
265  int rc;
266  struct fdmi_sd_timer_fom *timer_fom = &src_dock->fsdc_sd_timer_fom;
267 
268  M0_ENTRY();
269  M0_PRE(!src_dock->fsdc_started);
270  M0_PRE(!src_dock->fsdc_filters_defined);
271 
272  m0_semaphore_init(&sd_fom->fsf_shutdown, 0);
273 
274  rpc_mach = m0_reqh_rpc_mach_tlist_head(&reqh->rh_rpc_machines);
275  M0_SET0(&sd_fom->fsf_conn_pool);
276  rc = m0_rpc_conn_pool_init(&sd_fom->fsf_conn_pool, rpc_mach,
277  m0_time(30, 0), /* connection timeout */
279  if (rc != 0)
280  return M0_ERR(rc);
281  M0_SET0(&sd_fom->fsf_filter_ctx);
282  m0_filterc_ctx_init(&sd_fom->fsf_filter_ctx, filterc_ops);
283  rc = sd_fom->fsf_filter_ctx.fcc_ops->fco_start(&sd_fom->fsf_filter_ctx,
284  reqh);
285  if (rc != 0) {
290  M0_LOG(M0_WARN, "Cannot start filterc %d", rc);
293  return M0_ERR(rc);
294  }
295  rc = filterc_ops->fco_open(&sd_fom->fsf_filter_ctx,
297  &sd_fom->fsf_filter_iter);
298  if (rc == 0) {
299  src_dock->fsdc_filters_defined = true;
300  filterc_ops->fco_close(&sd_fom->fsf_filter_iter);
301  }
302  rc = filterc_ops->fco_open(&sd_fom->fsf_filter_ctx,
304  &sd_fom->fsf_filter_iter);
305  if (rc == 0) {
306  src_dock->fsdc_filters_defined = true;
307  filterc_ops->fco_close(&sd_fom->fsf_filter_iter);
308  }
309  M0_LOG(M0_DEBUG, "Filters?=%d", !!src_dock->fsdc_filters_defined);
310 
313  pending_fops_tlist_init(&sd_fom->fsf_pending_fops);
314  sd_fom->fsf_has_records = false;
316  m0_fom_queue(fom);
317  sd_fom->fsf_last_checkpoint = m0_time_now();
318  src_dock->fsdc_started = true;
321  NULL, NULL, reqh);
322  m0_fom_timeout_init(&timer_fom->fstf_timeout);
323  m0_semaphore_init(&timer_fom->fstf_shutdown, 0);
324  m0_fom_queue(&timer_fom->fstf_fom);
325  return M0_RC(0);
326 }
327 
328 static void wakeup_iff_waiting(struct m0_sm_group *grp, struct m0_sm_ast *ast)
329 {
330  struct m0_fom *fom = ast->sa_datum;
331 
332  M0_ENTRY();
333  if (m0_fom_is_waiting(fom))
334  m0_fom_ready(fom);
335  M0_LEAVE();
336 }
337 
338 M0_INTERNAL void m0_fdmi__src_dock_fom_wakeup(struct fdmi_sd_fom *sd_fom)
339 {
340  struct m0_fom *fom;
341 
342  M0_ENTRY("sd_fom %p", sd_fom);
343  M0_PRE(sd_fom != NULL);
344 
345  fom = &sd_fom->fsf_fom;
346 
352  if (fom == NULL || fom->fo_loc == NULL) {
353  M0_LEAVE("FDMI FOM is not initialized yet");
354  return;
355  }
356  if (sd_fom->fsf_wakeup_ast.sa_next == NULL) {
357  sd_fom->fsf_wakeup_ast = (struct m0_sm_ast) {
359  .sa_datum = fom
360  };
361  m0_sm_ast_post(&fom->fo_loc->fl_group, &sd_fom->fsf_wakeup_ast);
362  }
363  M0_LEAVE();
364 }
365 
367 {
368  struct m0_fom *fom;
369 
370  M0_ENTRY("sd_timer_fom %p", timer_fom);
371  M0_PRE(timer_fom != NULL);
372 
373  fom = &timer_fom->fstf_fom;
374 
375  if (timer_fom->fstf_wakeup_ast.sa_next == NULL) {
376  timer_fom->fstf_wakeup_ast = (struct m0_sm_ast) {
378  .sa_datum = fom
379  };
380  m0_sm_ast_post(&fom->fo_loc->fl_group,
381  &timer_fom->fstf_wakeup_ast);
382  }
383  M0_LEAVE();
384 }
385 
386 
387 M0_INTERNAL void
389 {
390  M0_ENTRY();
391  M0_PRE(src_dock->fsdc_started);
392  src_dock->fsdc_started = false;
393  src_dock->fsdc_filters_defined = false;
394 
395  /* Wake up the timer FOM, so it can stop itself */
397  /* Wake up FOM, so it can stop itself */
399 
400  /* Wait for timer fom finished */
403  /* Wait for fom finished */
406 
407  M0_LEAVE();
408 }
409 
410 static size_t fdmi_sd_fom_locality(const struct m0_fom *fom)
411 {
412  return 1;
413 }
414 
415 static int apply_filters(struct fdmi_sd_fom *sd_fom,
416  struct m0_fdmi_src_rec *src_rec)
417 {
418  struct m0_fom *fom = &sd_fom->fsf_fom;
419  struct m0_filterc_ctx *filterc = &sd_fom->fsf_filter_ctx;
420  struct m0_conf_fdmi_filter *fdmi_filter;
421  int matched;
422  int rc = 0;
423  int ret;
424 
425  M0_ENTRY("sd_fom %p, src_rec %p", sd_fom, src_rec);
427 
428  do {
429  /* @todo fco_get_next shouldn't block (phase 2) */
431  ret = filterc->fcc_ops->fco_get_next(&sd_fom->fsf_filter_iter,
432  &fdmi_filter);
434  if (ret > 0) {
435  matched = fdmi_filter_calc(sd_fom, src_rec,
436  fdmi_filter);
437  src_rec->fsr_matched = (matched > 0);
438  if (matched < 0) {
443  } else if (matched && !src_rec->fsr_dryrun) {
449  fdmi_matched_filter_list_tlink_init_at(
450  fdmi_filter, &src_rec->fsr_filter_list);
451  }
452  } else if (ret < 0) {
453  rc = ret;
454  }
455  } while (ret > 0);
456  return M0_RC(rc);
457 }
458 
459 static int process_fdmi_rec(struct fdmi_sd_fom *sd_fom,
460  struct m0_fdmi_src_rec *src_rec)
461 {
462  struct m0_fom *fom = &sd_fom->fsf_fom;
463  struct m0_filterc_ctx *filterc = &sd_fom->fsf_filter_ctx;
464  int ret;
465 
466  M0_ENTRY("sd_fom %p, src_rec %p", sd_fom, src_rec);
468 
469  M0_LOG(M0_DEBUG, "FDMI record id = "U128X_F,
470  U128_P(&src_rec->fsr_rec_id));
471  /*
472  * Inform source that posted fdmi record handling started, call
473  * fs_begin()
474  */
475  m0_fdmi__fs_begin(src_rec);
476 
478  ret = filterc->fcc_ops->fco_open(filterc,
480  &sd_fom->fsf_filter_iter);
482 
483  if (ret == 0) {
484  ret = apply_filters(sd_fom, src_rec);
485  filterc->fcc_ops->fco_close(&sd_fom->fsf_filter_iter);
486  }
487  return M0_RC(ret);
488 }
489 
490 static void fdmi_sd_fom_fini(struct m0_fom *fom)
491 {
492  struct fdmi_sd_fom *sd_fom = M0_AMB(sd_fom, fom, fsf_fom);
493  struct m0_filterc_ctx *filterc_ctx = &sd_fom->fsf_filter_ctx;
494 
495  M0_ENTRY("fom %p", fom);
496 
497  M0_LOG(M0_DEBUG, "deinit filterc ctx");
498  filterc_ctx->fcc_ops->fco_stop(filterc_ctx);
499  m0_filterc_ctx_fini(filterc_ctx);
500 
502 
505  pending_fops_tlist_fini(&sd_fom->fsf_pending_fops);
506  sd_fom->fsf_has_records = false;
507  m0_semaphore_up(&sd_fom->fsf_shutdown);
508  m0_fom_fini(fom);
509 
510  M0_LEAVE();
511 }
512 
513 enum { FDMI_RPC_MAX_RETRIES = 60 }; /* @see M0_RPC_MAX_RETRIES */
514 
515 static int fdmi_post_fop(struct m0_fop *fop, struct m0_rpc_session *session)
516 {
517  struct m0_rpc_item *item;
518 
519  M0_ENTRY("fop: %p, session: %p", fop, session);
520 
521  item = &fop->f_item;
525 
528 
531  /* timeout val = (item->ri_resend_interval * item->ri_nr_sent_max) */
532 
533  return M0_RC(m0_rpc_post(item));
534 }
535 
537 static bool pending_fop_clink_cb(struct m0_clink *clink)
538 {
539  struct fdmi_pending_fop *pending_fop = M0_AMB(pending_fop, clink,
540  fti_clink);
541  struct fdmi_sd_fom *sd_fom = pending_fop->sd_fom;
542  struct m0_fop *fop = pending_fop->fti_fop;
543  struct m0_fdmi_src_rec *src_rec = fop->f_opaque;
544  struct m0_rpc_session *session = pending_fop->fti_session;
545  struct m0_fdmi_src_dock *sd_dock = M0_AMB(sd_dock, sd_fom, fsdc_sd_fom);
546  m0_time_t now;
547  bool est;
548  int rc;
549  M0_ENTRY();
550 
552  M0_LOG(M0_DEBUG, "ASYNC CONN CB: %d sd_fom=%p remote=%s",
553  !!est, sd_fom,
555 
557  pending_fops_tlist_del(pending_fop);
559  m0_free(pending_fop);
560 
561  if (est) {
563  if (rc == 0) {
564  /*
565  * At this moment, the fop may already fail and
566  * fail replied.
567  */
568  m0_mutex_lock(&sd_dock->fsdc_list_mutex);
569  if (!fdmi_record_inflight_tlink_is_in(src_rec)) {
570  fdmi_record_inflight_tlist_add_tail(
571  &sd_dock->fsdc_rec_inflight, src_rec);
572  M0_LOG(M0_DEBUG, "added to inflight "
573  "list id = " U128X_F,
574  U128_P(&src_rec->fsr_rec_id));
575  }
576  m0_mutex_unlock(&sd_dock->fsdc_list_mutex);
577  }
578  } else {
580  /*
581  * Destroy this session.
582  */
584  M0_LOG(M0_DEBUG, "CANNOT SEND src_rec =" U128X_F " ref cnt:%d",
585  U128_P(&src_rec->fsr_rec_id),
586  (int)m0_ref_read(&src_rec->fsr_ref));
587  m0_ref_put(&src_rec->fsr_ref);
588  m0_fdmi__fs_put(src_rec);
589 
590  /*
591  * re-send FDMI it, or release it.
592  */
593  now = m0_time_now();
594  if (m0_time_sub(now, src_rec->fsr_init_time) >
596  M0_LOG(M0_WARN, "Given up record %p, ID:" U128X_F,
597  src_rec, U128_P(&src_rec->fsr_rec_id));
598  m0_ref_put(&src_rec->fsr_ref);
599  m0_fdmi__fs_put(src_rec);
600  } else {
601  M0_LOG(M0_DEBUG, "Enqueue record again %p, ID:" U128X_F,
602  src_rec, U128_P(&src_rec->fsr_rec_id));
603  m0_fdmi__enqueue(src_rec);
604  }
605  }
607  M0_LEAVE();
608  return true;
609 }
610 
611 static int sd_fom_save_pending_fop(struct fdmi_sd_fom *sd_fom,
612  struct m0_fop *fop,
613  struct m0_rpc_session *session)
614 {
615  struct fdmi_pending_fop *pending_fop;
616 
617  M0_ENTRY();
618 
619  M0_ALLOC_PTR(pending_fop);
620  if (pending_fop == NULL)
621  return M0_ERR(-ENOMEM);
622 
623  m0_fop_get(fop);
624  pending_fop->fti_fop = fop;
626  pending_fop->fti_clink.cl_is_oneshot = true;
627  pending_fop->fti_session = session;
628  pending_fop->sd_fom = sd_fom;
630  &pending_fop->fti_clink);
632  pending_fops_tlink_init_at_tail(pending_fop, &sd_fom->fsf_pending_fops);
634  return M0_RC(0);
635 }
636 
637 static int sd_fom_send_record(struct fdmi_sd_fom *sd_fom, struct m0_fop *fop,
638  const char *ep)
639 {
640  int rc;
641  struct m0_rpc_session *session;
642  struct m0_fdmi_src_dock *src_dock = m0_fdmi_src_dock_get();
643  struct m0_fdmi_src_rec *src_rec = fop->f_opaque;
644 
645  M0_LOG(M0_DEBUG, "sd_fom %p, sending fop %p to ep %s", sd_fom, fop, ep);
647  if (rc == 0) {
649  if (rc == 0) {
650  m0_mutex_lock(&src_dock->fsdc_list_mutex);
651  if (!fdmi_record_inflight_tlink_is_in(src_rec)) {
652  fdmi_record_inflight_tlist_add_tail(
653  &src_dock->fsdc_rec_inflight, src_rec);
654  M0_LOG(M0_DEBUG, "added to inflight list id = "
655  U128X_F,
656  U128_P(&src_rec->fsr_rec_id));
657  }
658  m0_mutex_unlock(&src_dock->fsdc_list_mutex);
659  }
660  } else if (rc == -EBUSY)
662  return M0_RC(rc);
663 }
664 
665 static int filters_nr(struct m0_fdmi_src_rec *src_rec, const char *endpoint)
666 {
667  int n;
668 
669  M0_ENTRY("src_rec=%p, endpoint=%s", src_rec, endpoint);
671  n = m0_tl_reduce(fdmi_matched_filter_list, flt,
672  &src_rec->fsr_filter_list, 0,
673  + !!m0_streq(endpoint, flt->ff_endpoints[0]));
674  M0_LEAVE("==> %d", n);
675  return n;
676 }
677 
679 {
680  struct m0_fdmi_src_dock *src_dock = m0_fdmi_src_dock_get();
681  return src_dock->fsdc_sd_fom.fsf_conn_pool.cp_rpc_mach;
682 }
683 
684 static struct m0_fop *alloc_fdmi_rec_fop(int filter_num)
685 {
686  struct m0_fop_fdmi_record *fop_data;
687  struct m0_fop *fop;
688 
689  M0_PRE(filter_num > 0);
690 
691  M0_ALLOC_PTR(fop_data);
692  if (fop_data == NULL)
693  goto data_alloc_fail;
694  fop_data->fr_matched_flts.fmf_count = filter_num;
695 
696  M0_ALLOC_ARR(fop_data->fr_matched_flts.fmf_flt_id, filter_num);
697  if (fop_data->fr_matched_flts.fmf_flt_id == NULL)
698  goto flts_alloc_fail;
699 
702  if (fop == NULL)
703  goto fop_alloc_fail;
704  return fop;
705 fop_alloc_fail:
707 flts_alloc_fail:
708  m0_free(fop_data);
709 data_alloc_fail:
710  return NULL;
711 }
712 
713 static struct m0_fop *fop_create(struct m0_fdmi_src_rec *src_rec,
714  const char *endpoint)
715 {
716  int filter_num;
717  struct m0_conf_fdmi_filter *flt;
718  struct m0_conf_fdmi_filter *tmp;
719  int k;
720  struct m0_fop *fop = NULL;
721  struct m0_fop_fdmi_record *fop_data;
722  int idx; /* XXX: TEMP */
723  struct m0_fdmi_flt_id_arr *matched = NULL;
724 
725  M0_ENTRY("src_rec %p, endpoint %s", src_rec, endpoint);
727 
728  filter_num = filters_nr(src_rec, endpoint);
729  if (filter_num > 0) {
730  fop = alloc_fdmi_rec_fop(filter_num);
731  if (fop == NULL)
732  return NULL;
733  fop_data = m0_fop_data(fop);
734  fop_data->fr_rec_id = src_rec->fsr_rec_id;
735  fop_data->fr_rec_type = m0_fdmi__sd_rec_type_id_get(src_rec);
736  matched = &fop_data->fr_matched_flts;
737  k = 0;
738  while (k < filter_num) {
739  flt = fdmi_matched_filter_list_tlist_head(
740  &src_rec->fsr_filter_list);
741  if (m0_streq(endpoint, flt->ff_endpoints[0])) {
742  matched->fmf_flt_id[k++] = flt->ff_filter_id;
743  tmp = flt;
744  flt = fdmi_matched_filter_list_tlist_next(
745  &src_rec->fsr_filter_list, flt);
746  fdmi_matched_filter_list_tlink_del_fini(tmp);
747  } else {
748  flt = fdmi_matched_filter_list_tlist_next(
749  &src_rec->fsr_filter_list, flt);
750  }
751  }
752  M0_LOG(M0_DEBUG, "FDMI record id = "U128X_F,
753  U128_P(&fop_data->fr_rec_id));
754  M0_LOG(M0_DEBUG, "FDMI record type = %x", fop_data->fr_rec_type);
755  M0_LOG(M0_DEBUG, "* matched filters count = [%d]",
756  matched->fmf_count);
757  for (idx = 0; idx < matched->fmf_count; idx++) {
758  M0_LOG(M0_DEBUG, "* [%4d] = "FID_SF, idx,
759  FID_P(&matched->fmf_flt_id[idx]));
760  }
761  }
762  M0_LOG(M0_DEBUG, "ret fop %p", fop);
763  return fop;
764 }
765 
766 static int payload_encode(struct m0_fdmi_src_rec *src_rec, struct m0_fop *fop)
767 {
768  struct m0_fop_fdmi_record *rec = m0_fop_data(fop);
769 
770  M0_ENTRY("src_rec %p fop %p", src_rec, fop);
772  return M0_RC(src_rec->fsr_src->fs_encode(src_rec, &rec->fr_payload));
773 }
774 
776  struct m0_fdmi_src_rec *src_rec)
777 {
778  int rc = 0;
779  struct m0_conf_fdmi_filter *matched_filter;
780  const char *endpoint;
781 
782  M0_ENTRY("sd_ctx %p src_rec %p", sd_ctx, src_rec);
784 
785  M0_LOG(M0_DEBUG, "FDMI record id = "U128X_F,
786  U128_P(&src_rec->fsr_rec_id));
787  while (!fdmi_matched_filter_list_tlist_is_empty(
788  &src_rec->fsr_filter_list)) {
789  struct m0_fop *fop;
790  matched_filter = fdmi_matched_filter_list_tlist_head(
791  &src_rec->fsr_filter_list);
792  /*
793  * Currently only 1 endpoint is specified
794  * for a filter => take 1st array item
795  */
796  endpoint = matched_filter->ff_endpoints[0];
797  fop = fop_create(src_rec, endpoint);
798  if (fop == NULL)
799  continue;
800  M0_LOG(M0_DEBUG, "will send fop=%p fdmi rec:%p", fop, src_rec);
801  fop->f_opaque = src_rec;
802 
803  /* @todo check rc and handle errors properly. */
804  rc = payload_encode(src_rec, fop);
805  M0_ASSERT(rc == 0);
806 
807  /* Adding a ref. It will be dropped when reply is received. */
808  m0_fdmi__fs_get(src_rec);
809  m0_ref_get(&src_rec->fsr_ref);
810  M0_LOG(M0_DEBUG, "src_rec ="U128X_F" ref cnt:%d",
811  U128_P(&src_rec->fsr_rec_id),
812  (int)m0_ref_read(&src_rec->fsr_ref));
813 
814  rc = sd_fom_send_record(&sd_ctx->fsdc_sd_fom, fop, endpoint);
815  if (rc == 0) {
816  /*
817  * Adding a ref. It will be dropped when
818  * "FDMI record release" is received.
819  */
820  m0_fdmi__fs_get(src_rec);
821  m0_ref_get(&src_rec->fsr_ref);
822  M0_LOG(M0_DEBUG, "src_rec ="U128X_F" ref cnt:%d",
823  U128_P(&src_rec->fsr_rec_id),
824  (int)m0_ref_read(&src_rec->fsr_ref));
829  } else {
830  /* Send failure. Drop ref now. */
831  M0_LOG(M0_DEBUG, "src_rec ="U128X_F" ref cnt:%d",
832  U128_P(&src_rec->fsr_rec_id),
833  (int)m0_ref_read(&src_rec->fsr_ref));
834  m0_ref_put(&src_rec->fsr_ref);
835  m0_fdmi__fs_put(src_rec);
836  }
838  }
839  return M0_RC(rc);
840 }
841 
842 static int node_eval(void *data,
843  struct m0_fdmi_flt_var_node *value_desc,
844  struct m0_fdmi_flt_operand *value)
845 {
846  struct m0_fdmi_src_rec *src_rec = data;
847 
849  return src_rec->fsr_src->fs_node_eval(src_rec, value_desc, value);
850 }
851 
853  {
855  .ffth_handler = &m0_fdmi_eval_flt,
856  },
857  {
859  .ffth_handler = &m0_fol_fdmi_filter_kv_substring,
860  },
861 };
862 
863 static int fdmi_filter_calc(struct fdmi_sd_fom *sd_fom,
864  struct m0_fdmi_src_rec *src_rec,
865  struct m0_conf_fdmi_filter *fdmi_filter)
866 {
867  struct m0_fdmi_sd_filter_type_handler *handler;
868  struct m0_fdmi_eval_var_info get_var_info;
869  int rc;
870  int i;
871 
872  M0_ENTRY("sd_fom=%p src_rec=%p fdmi_filter=%p",
873  sd_fom, src_rec, fdmi_filter);
875 
876  get_var_info.user_data = src_rec;
877  get_var_info.get_value_cb = node_eval;
878 
879  for (i = 0; i < ARRAY_SIZE(fdmi_filter_type_handlers); ++i) {
880  handler = &fdmi_filter_type_handlers[i];
881  if (handler->ffth_id == fdmi_filter->ff_type) {
882  rc = handler->ffth_handler(&sd_fom->fsf_flt_eval,
883  fdmi_filter,
884  &get_var_info);
885  return M0_RC_INFO(rc,
886  "sd_fom=%p src_rec=%p fdmi_filter=%p",
887  sd_fom, src_rec, fdmi_filter);
888 
889  }
890  }
891  return M0_ERR_INFO(-EINVAL, "ff_filter_id=%d", fdmi_filter->ff_type);
892 }
893 
894 
901 static void fdmi_sd_fom_check(struct fdmi_sd_fom *sd_fom)
902 {
903  m0_time_t now = m0_time_now();
904  struct m0_fdmi_src_dock *sd_dock = M0_AMB(sd_dock, sd_fom, fsdc_sd_fom);
905  struct m0_fdmi_src_rec *src_rec;
906  uint64_t ref_cnt;
907 
908  if (m0_time_sub(now, sd_fom->fsf_last_checkpoint) <
910  !sd_fom->fsf_has_records) {
911  /* Not enough time elapsed, or no records at all. */
912  return;
913  }
914 
915  M0_LOG(M0_DEBUG, "do checking for %p", sd_fom);
916  sd_fom->fsf_last_checkpoint = now;
917 
918  m0_mutex_lock(&sd_dock->fsdc_list_mutex);
919  m0_tlist_for(&fdmi_record_inflight_tl,
920  &sd_dock->fsdc_rec_inflight,
921  src_rec) {
923  ref_cnt = m0_ref_read(&src_rec->fsr_ref);
924 
925  M0_LOG(M0_DEBUG, "No ack fop=%p src_rec=" U128X_F " ref cnt:%d",
926  src_rec->fsr_data,
927  U128_P(&src_rec->fsr_rec_id),
928  (int)(ref_cnt));
929  if (ref_cnt >= 2) {
930  /* replied is not arrived. Let's wait a bit. */
931  continue;
932  }
933 
934  /*
935  * re-send it, or release it.
936  */
937  now = m0_time_now();
938  if (m0_time_sub(now, src_rec->fsr_init_time) >
940  fdmi_record_inflight_tlist_remove(src_rec);
941  M0_LOG(M0_WARN, "Given up record %p, ID:" U128X_F,
942  src_rec, U128_P(&src_rec->fsr_rec_id));
943  m0_ref_put(&src_rec->fsr_ref);
944  m0_fdmi__fs_put(src_rec);
945  } else if (m0_time_sub(now, src_rec->fsr_init_time) >
947  fdmi_record_inflight_tlist_remove(src_rec);
948  M0_LOG(M0_DEBUG, "Enqueue record again %p, ID:" U128X_F,
949  src_rec, U128_P(&src_rec->fsr_rec_id));
950  m0_fdmi__enqueue_locked(src_rec);
951  }
952  } m0_tlist_endfor;
953  m0_mutex_unlock(&sd_dock->fsdc_list_mutex);
954 }
955 
956 static int fdmi_sd_fom_tick(struct m0_fom *fom)
957 {
958  struct fdmi_sd_fom *sd_fom = M0_AMB(sd_fom, fom, fsf_fom);
959  struct m0_fdmi_src_dock *sd_ctx = M0_AMB(sd_ctx, sd_fom, fsdc_sd_fom);
960  struct m0_reqh_service *rsvc = fom->fo_service;
961  struct m0_fdmi_src_rec *src_rec;
962  int rc;
963 
964  M0_ENTRY("fom %p", fom);
965 
966  M0_LOG(M0_DEBUG, "sd_fom %p phase=%d", sd_fom, m0_fom_phase(fom));
967 
968  switch (m0_fom_phase(fom)) {
970  M0_LOG(M0_DEBUG, "init phase");
972  return M0_RC(M0_FSO_AGAIN);
974  M0_LOG(M0_DEBUG, "wait phase");
976  return M0_RC(M0_FSO_AGAIN);
978  M0_LOG(M0_DEBUG, "get rec");
979 
980  fdmi_sd_fom_check(sd_fom);
981 
982  m0_mutex_lock(&sd_ctx->fsdc_list_mutex);
983  src_rec = fdmi_record_list_tlist_pop(
984  &sd_ctx->fsdc_posted_rec_list);
986 
987  if (src_rec == NULL) {
991  else
994  return M0_RC(M0_FSO_WAIT);
995  } else {
996  M0_LOG(M0_DEBUG, "popped from record list id =" U128X_F,
997  U128_P(&src_rec->fsr_rec_id));
999  sd_fom->fsf_has_records = true;
1000  rc = process_fdmi_rec(sd_fom, src_rec);
1001  if (rc == 0) {
1002  if (!fdmi_matched_filter_list_tlist_is_empty(
1003  &src_rec->fsr_filter_list)) {
1005  src_rec);
1006  }
1007  } else if (rc != -ENOENT) {
1008  /*
1009  * -ENOENT error means that configuration does
1010  * not have filters group matching the record
1011  * type. This is fine, ignoring.
1012  */
1013  M0_LOG(M0_ERROR,
1014  "FDMI record processing error %d", rc);
1015  }
1016  /*
1017  * Source dock is done with this record (however,
1018  * there is still a ref held while sending this record
1019  * to plugin).
1020  */
1021  M0_LOG(M0_DEBUG, "src_rec =" U128X_F " ref cnt:%d",
1022  U128_P(&src_rec->fsr_rec_id),
1023  (int)m0_ref_read(&src_rec->fsr_ref) - 1);
1024  m0_ref_put(&src_rec->fsr_ref);
1025  m0_fdmi__fs_put(src_rec);
1026  return M0_RC(M0_FSO_AGAIN);
1027  }
1028  }
1029  return M0_RC(M0_FSO_WAIT);
1030 }
1031 
1033 {
1034  struct m0_fdmi_src_rec *src_rec;
1035  struct m0_fdmi_src_dock *src_dock;
1036  struct m0_rpc_conn_pool *pool;
1037  int rc;
1038  int64_t ref_cnt;
1039 
1040  M0_ENTRY("item=%p", item);
1041 
1042  src_dock = m0_fdmi_src_dock_get();
1043  src_rec = m0_rpc_item_to_fop(item)->f_opaque;
1045 
1047  if (rc != 0)
1048  M0_LOG(M0_ERROR, "FDMI reply error %d item->ri_error %d to %s",
1049  rc, item->ri_error,
1051 
1052  pool = &src_dock->fsdc_sd_fom.fsf_conn_pool;
1054  ref_cnt = m0_ref_read(&src_rec->fsr_ref);
1055  M0_LOG(M0_DEBUG, "src_rec ="U128X_F" ref cnt:%d",
1056  U128_P(&src_rec->fsr_rec_id),
1057  (int)(ref_cnt - 1));
1058  m0_ref_put(&src_rec->fsr_ref);
1059  m0_fdmi__fs_put(src_rec);
1060 
1061  /*
1062  * The "FDMI release" request may come before this reply.
1063  * So, the ref cnt may drop to zero at this moment.
1064  * In that case, the record is freed and no need to re-send again.
1065  */
1066  if (rc != 0 && (ref_cnt - 1) > 0) {
1068  m0_mutex_lock(&src_dock->fsdc_list_mutex);
1069  fdmi_record_inflight_tlist_remove(src_rec);
1070  M0_LOG(M0_DEBUG, "removed from inflight list id = " U128X_F,
1071  U128_P(&src_rec->fsr_rec_id));
1072 
1073  m0_mutex_unlock(&src_dock->fsdc_list_mutex);
1074  /*
1075  * The failed fop will be released.
1076  * Now let's enqueue the FDMI record again. It will be
1077  * processed and sent again.
1078  */
1079  /*
1080  * There is a rare case that the failed reply comes before
1081  * the processing of this record in fom tick.
1082  * So, the refcount here may be more than 1. But the extra
1083  * refcount will be droppped soon.
1084  */
1085  M0_LOG(M0_DEBUG, "Enqueue fdmi record again %p, ID:" U128X_F,
1086  src_rec, U128_P(&src_rec->fsr_rec_id));
1087  m0_fdmi__enqueue(src_rec);
1088  }
1089 
1090  M0_LEAVE();
1091 }
1092 
1093 /*
1094  ******************************************************************************
1095  * FDMI SD Release Record FOM specific functions
1096  ******************************************************************************
1097  */
1098 
1099 static int fdmi_rr_fom_create(struct m0_fop *fop, struct m0_fom **out,
1100  struct m0_reqh *reqh)
1101 {
1102  struct fdmi_rr_fom *rr_fom;
1103  struct m0_fom *fom;
1104  struct m0_fop *reply_fop;
1105  int rc = 0;
1106 
1107  M0_ENTRY("fop %p", fop);
1108 
1109  M0_ALLOC_PTR(rr_fom);
1110  if (rr_fom == NULL) {
1111  rc = M0_ERR(-ENOMEM);
1112  goto end;
1113  }
1114  fom = &rr_fom->frf_fom;
1117  if (reply_fop == NULL) {
1118  rc = M0_ERR(-ENOMEM);
1119  goto end;
1120  }
1122  fop, reply_fop, reqh);
1124  *out = fom;
1125 end:
1126  if (rc != 0) {
1127  m0_free(rr_fom);
1128  *out = NULL;
1129  }
1130  return M0_RC(rc);
1131 }
1132 
1133 static void fdmi_rr_fom_fini(struct m0_fom *fom)
1134 {
1135  struct fdmi_rr_fom *rr_fom = M0_AMB(rr_fom, fom, frf_fom);
1136 
1137  M0_ENTRY("fom %p", fom);
1138  m0_fom_fini(fom);
1139  m0_free(rr_fom);
1140  M0_LEAVE();
1141 }
1142 
1143 static int fdmi_rr_fom_tick(struct m0_fom *fom)
1144 {
1145  struct m0_fop_fdmi_rec_release *fop_data;
1147  struct m0_rpc_item *item;
1148 
1149  M0_ENTRY("fom %p", fom);
1150 
1151  fop_data = m0_fop_data(fom->fo_fop);
1152  m0_fdmi__handle_release(&fop_data->frr_frid);
1153  reply_data = m0_fop_data(fom->fo_rep_fop);
1154  reply_data->frrr_rc = 0;
1155  item = m0_fop_to_rpc_item(fom->fo_rep_fop);
1156  m0_rpc_reply_post(&fom->fo_fop->f_item, item);
1158 
1159  M0_LEAVE();
1160  return M0_FSO_WAIT;
1161 }
1162 
1163 static void fdmi_sd_timer_fom_fini(struct m0_fom *fom)
1164 {
1165  struct fdmi_sd_timer_fom *timer_fom = M0_AMB(timer_fom, fom, fstf_fom);
1166  M0_ENTRY("fom %p", fom);
1167 
1168  m0_fom_timeout_fini(&timer_fom->fstf_timeout);
1169  m0_semaphore_up(&timer_fom->fstf_shutdown);
1170  m0_fom_fini(fom);
1171  M0_LEAVE();
1172 }
1173 
1174 static int fdmi_sd_timer_fom_tick(struct m0_fom *fom)
1175 {
1176  struct fdmi_sd_timer_fom *timer_fom = M0_AMB(timer_fom, fom, fstf_fom);
1177  struct m0_reqh_service *rsvc = fom->fo_service;
1178  struct m0_fdmi_src_dock *src_dock = m0_fdmi_src_dock_get();
1179 
1181  M0_LOG(M0_DEBUG, "timer fom stopping");
1182  m0_fom_timeout_cancel(&timer_fom->fstf_timeout);
1184  return M0_RC(M0_FSO_WAIT);
1185  }
1186 
1187  switch (m0_fom_phase(fom)) {
1190  return M0_RC(M0_FSO_AGAIN);
1192  M0_LOG(M0_DEBUG, "wait phase");
1194  return M0_RC(M0_FSO_AGAIN);
1196  M0_LOG(M0_DEBUG, "Now, WAKEUP the source dock fom");
1197  m0_fom_timeout_fini(&timer_fom->fstf_timeout);
1198  m0_fom_timeout_init(&timer_fom->fstf_timeout);
1200  fom,
1201  m0_time_from_now(30, 0));
1204  return M0_RC(M0_FSO_WAIT);
1205  }
1206  return M0_RC(M0_FSO_WAIT);
1207 }
1208 
1209 #undef M0_TRACE_SUBSYSTEM
1210 
1211 /*
1212  * Local variables:
1213  * c-indentation-style: "K&R"
1214  * c-basic-offset: 8
1215  * tab-width: 8
1216  * fill-column: 80
1217  * scroll-step: 1
1218  * End:
1219  */
1220 /*
1221  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
1222  */
static int payload_encode(struct m0_fdmi_src_rec *src_rec, struct m0_fop *fop)
M0_INTERNAL void m0_fdmi__src_dock_fom_stop(struct m0_fdmi_src_dock *src_dock)
M0_INTERNAL int m0_reqh_service_state_get(const struct m0_reqh_service *s)
Definition: reqh_service.c:560
struct m0_fop_type m0_fop_fdmi_rec_release_rep_fopt
Definition: fops.c:50
M0_EXTERN struct m0_reqh_service_type m0_fdmi_service_type
Definition: fdmi.h:194
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
M0_INTERNAL void m0_rpc_conn_pool_destroy(struct m0_rpc_conn_pool *pool, struct m0_rpc_session *session)
Definition: conn_pool.c:450
m0_time_t ri_resend_interval
Definition: item.h:144
M0_TL_DECLARE(fdmi_record_inflight, M0_EXTERN, struct m0_fdmi_src_rec)
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
static void fdmi_sd_fom_check(struct fdmi_sd_fom *sd_fom)
#define FID_SF
Definition: fid.h:76
struct m0_fdmi_flt_id_arr fr_matched_flts
Definition: fops.h:78
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
struct m0_fid ff_filter_id
Definition: obj.h:730
fdmi_rr_fom_phase
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
int(* fco_open)(struct m0_filterc_ctx *ctx, enum m0_fdmi_rec_type_id rec_type_id, struct m0_filterc_iter *iter)
Definition: filterc.h:76
struct m0_tl fsr_filter_list
Definition: src_rec.h:92
M0_INTERNAL void m0_rpc_conn_pool_fini(struct m0_rpc_conn_pool *pool)
Definition: conn_pool.c:345
M0_INTERNAL void m0_fom_block_enter(struct m0_fom *fom)
Definition: fom.c:538
static size_t fdmi_sd_fom_locality(const struct m0_fom *fom)
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_TL_DESCR_DEFINE(pending_fops, "pending fops list", M0_INTERNAL, struct fdmi_pending_fop, fti_linkage, fti_magic, M0_FDMI_SRC_DOCK_PENDING_FOP_MAGIC, M0_FDMI_SRC_DOCK_PENDING_FOP_HEAD_MAGIC)
M0_INTERNAL int m0_fdmi__handle_release(struct m0_uint128 *fdmi_rec_id)
Definition: source_dock.c:445
struct fdmi_sd_fom fsdc_sd_fom
static const struct m0_fom_type_ops fdmi_sd_fom_type_ops
static int fdmi_rr_fom_tick(struct m0_fom *fom)
struct m0_mutex fsdc_list_mutex
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
M0_TL_DESCR_DECLARE(fdmi_record_inflight, M0_EXTERN)
Definition: sm.h:350
bool fsr_dryrun
Definition: src_rec.h:106
static struct m0_sm_group * grp
Definition: bytecount.c:38
static struct m0_fop * alloc_fdmi_rec_fop(int filter_num)
int(* fco_get_next)(struct m0_filterc_iter *iter, struct m0_conf_fdmi_filter **out)
Definition: filterc.h:89
static int apply_filters(struct fdmi_sd_fom *sd_fom, struct m0_fdmi_src_rec *src_rec)
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
const struct m0_fom_ops fdmi_rr_fom_ops
struct m0_sm_ast fstf_wakeup_ast
static void fdmi_sd_fom_fini(struct m0_fom *fom)
static int fdmi_post_fop(struct m0_fop *fop, struct m0_rpc_session *session)
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
M0_INTERNAL int m0_rpc_conn_pool_init(struct m0_rpc_conn_pool *pool, struct m0_rpc_machine *rpc_mach, m0_time_t conn_timeout, uint64_t max_rpcs_in_flight)
Definition: conn_pool.c:325
int(* ffth_handler)(struct m0_fdmi_eval_ctx *ctx, struct m0_conf_fdmi_filter *filter, struct m0_fdmi_eval_var_info *var_info)
struct fdmi_sd_fom * sd_fom
struct m0_uint128 fsr_rec_id
Definition: src_rec.h:80
m0_time_t fsf_last_checkpoint
int(* fto_create)(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: fom.h:650
M0_INTERNAL bool m0_fdmi__record_is_valid(struct m0_fdmi_src_rec *src_rec)
Definition: source_dock.c:121
struct m0_fom fsf_fom
int(* fs_node_eval)(struct m0_fdmi_src_rec *src_rec, struct m0_fdmi_flt_var_node *value_desc, struct m0_fdmi_flt_operand *value)
Definition: source_dock.h:62
struct m0_bufvec data
Definition: di.c:40
static int fdmi_sd_timer_fom_tick(struct m0_fom *fom)
int const char const void * value
Definition: dir.c:325
static int fdmi_filter_calc(struct fdmi_sd_fom *sd_fom, struct m0_fdmi_src_rec *src_rec, struct m0_conf_fdmi_filter *fdmi_filter)
int32_t ri_error
Definition: item.h:161
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
static void wakeup_iff_waiting(struct m0_sm_group *grp, struct m0_sm_ast *ast)
bool fsr_matched
Definition: src_rec.h:101
static void fdmi_rec_notif_replied(struct m0_rpc_item *item)
#define M0_BITS(...)
Definition: misc.h:236
Definition: sm.h:504
static int node_eval(void *data, struct m0_fdmi_flt_var_node *value_desc, struct m0_fdmi_flt_operand *value)
static int sd_fom_send_record(struct fdmi_sd_fom *sd_fom, struct m0_fop *fop, const char *ep)
static void fdmi__src_dock_timer_fom_wakeup(struct fdmi_sd_timer_fom *timer_fom)
struct m0_semaphore fsf_shutdown
static struct m0_rpc_session session
Definition: formation2.c:38
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
struct m0_filterc_ctx fsf_filter_ctx
m0_time_t m0_time(uint64_t secs, long ns)
Definition: time.c:41
const char ** ff_endpoints
Definition: obj.h:760
static struct m0_rpc_item * item
Definition: item.c:56
static const struct m0_fom_type_ops fdmi_sd_timer_fom_type_ops
fdmi_src_dock_timer_fom_phase
m0_fom_phase
Definition: fom.h:372
struct m0_clink fti_clink
M0_INTERNAL void m0_fdmi__fs_get(struct m0_fdmi_src_rec *src_rec)
Definition: source_dock.c:388
const struct m0_filterc_ops * fcc_ops
Definition: filterc.h:108
struct m0_fom_type ft_fom_type
Definition: fop.h:232
void(* fco_stop)(struct m0_filterc_ctx *ctx)
Definition: filterc.h:63
M0_INTERNAL void m0_fdmi__src_dock_fom_init(void)
struct m0_ref fsr_ref
Definition: src_rec.h:87
return M0_RC(rc)
struct m0_fdmi_src * fsr_src
Definition: src_rec.h:72
M0_INTERNAL void m0_fdmi_eval_init(struct m0_fdmi_eval_ctx *ctx)
Definition: flt_eval.c:130
Definition: sock.c:754
M0_INTERNAL const char * m0_rpc_conn_addr(const struct m0_rpc_conn *conn)
Definition: conn.c:1306
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_sm_conf fdmi_src_dock_timer_fom_sm_conf
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
int(* fs_encode)(struct m0_fdmi_src_rec *src_rec, struct m0_buf *buf)
Definition: source_dock.h:82
M0_INTERNAL int m0_fom_timeout_wait_on(struct m0_fom_timeout *to, struct m0_fom *fom, m0_time_t deadline)
Definition: fom.c:1566
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
Definition: fom_generic.c:81
struct m0_buf fr_payload
Definition: fops.h:75
struct fdmi_sd_timer_fom fsdc_sd_timer_fom
static struct m0_rpc_machine * m0_fdmi__sd_conn_pool_rpc_machine(void)
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
static const struct m0_fom_ops fdmi_sd_fom_ops
M0_INTERNAL bool m0_fom_is_waiting(const struct m0_fom *fom)
Definition: fom.c:1732
int i
Definition: dir.c:1033
#define M0_RC_INFO(rc, fmt,...)
Definition: trace.h:209
struct m0_fop_type * f_type
Definition: fop.h:81
static struct m0_fom_type fdmi_sd_timer_fom_type
struct m0_rpc_machine * cp_rpc_mach
Definition: conn_pool.h:48
static int filters_nr(struct m0_fdmi_src_rec *src_rec, const char *endpoint)
M0_INTERNAL void m0_ref_put(struct m0_ref *ref)
Definition: refs.c:38
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
struct m0_uint128 frr_frid
Definition: fops.h:94
struct m0_rpc_fop_session_establish est
Definition: session.c:51
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_ref_get(struct m0_ref *ref)
Definition: refs.c:32
struct m0_filterc_iter fsf_filter_iter
void * sa_datum
Definition: sm.h:508
M0_INTERNAL void m0_fom_ready(struct m0_fom *fom)
Definition: fom.c:429
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
M0_INTERNAL void m0_fdmi__enqueue(struct m0_fdmi_src_rec *src_rec)
Definition: source_dock.c:250
fdmi_src_dock_fom_phase
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
M0_TL_DEFINE(pending_fops, static, struct fdmi_pending_fop)
struct m0_sm_ast fsf_wakeup_ast
M0_INTERNAL void m0_filterc_ctx_init(struct m0_filterc_ctx *ctx, const struct m0_filterc_ops *ops)
Definition: filterc.c:49
#define U128_P(x)
Definition: types.h:45
static bool pending_fop_clink_cb(struct m0_clink *clink)
m0_time_t m0_time_now(void)
Definition: time.c:134
M0_INTERNAL void m0_filterc_ctx_fini(struct m0_filterc_ctx *ctx)
Definition: filterc.c:80
static int fdmi_sd_fom_tick(struct m0_fom *fom)
static struct m0_fop reply_fop
Definition: fsync.c:64
static struct m0_sm_state_descr fdmi_src_dock_state_descr[]
#define m0_streq(a, b)
Definition: string.h:34
void * fsr_data
Definition: src_rec.h:75
int(* get_value_cb)(void *user_data, struct m0_fdmi_flt_var_node *value_desc, struct m0_fdmi_flt_operand *value)
Definition: flt_eval.h:70
struct m0_tl rh_rpc_machines
Definition: reqh.h:135
M0_INTERNAL void m0_fom_type_init(struct m0_fom_type *type, uint64_t id, const struct m0_fom_type_ops *ops, const struct m0_reqh_service_type *svc_type, const struct m0_sm_conf *sm)
Definition: fom.c:1596
M0_INTERNAL struct m0_fdmi_src_dock * m0_fdmi_src_dock_get(void)
Definition: fdmi.c:890
M0_INTERNAL void m0_fom_block_leave(struct m0_fom *fom)
Definition: fom.c:582
M0_INTERNAL struct m0_chan * m0_rpc_conn_pool_session_chan(struct m0_rpc_session *session)
Definition: conn_pool.c:304
static int fdmi_rr_fom_create(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
struct m0_fop * m0_fop_get(struct m0_fop *fop)
Definition: fop.c:162
struct m0_rpc_item * ri_reply
Definition: item.h:163
struct m0_semaphore fstf_shutdown
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
static struct m0_fop_fsync_rep reply_data
Definition: fsync.c:67
uint32_t fr_rec_type
Definition: fops.h:72
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
void * f_opaque
Definition: fop.h:84
uint64_t ri_nr_sent_max
Definition: item.h:146
M0_INTERNAL void m0_fdmi__fs_begin(struct m0_fdmi_src_rec *src_rec)
Definition: source_dock.c:415
Definition: reqh.h:94
M0_INTERNAL void m0_fdmi__src_dock_fom_wakeup(struct fdmi_sd_fom *sd_fom)
Definition: dump.c:103
struct m0_sm_ast * sa_next
Definition: sm.h:509
struct m0_fop * fti_fop
const struct m0_sm_conf fdmi_rr_fom_sm_conf
void m0_rpc_reply_post(struct m0_rpc_item *request, struct m0_rpc_item *reply)
Definition: rpc.c:135
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL bool m0_rpc_conn_pool_session_established(struct m0_rpc_session *session)
Definition: conn_pool.c:310
M0_INTERNAL int m0_fdmi_eval_flt(struct m0_fdmi_eval_ctx *ctx, struct m0_conf_fdmi_filter *filter, struct m0_fdmi_eval_var_info *var_info)
Definition: flt_eval.c:184
static struct m0_sm_state_descr fdmi_rr_fom_state_descr[]
#define FID_P(f)
Definition: fid.h:77
static int sd_fom_save_pending_fop(struct fdmi_sd_fom *sd_fom, struct m0_fop *fop, struct m0_rpc_session *session)
static struct m0_pool pool
Definition: iter_ut.c:58
struct m0_mutex fsf_pending_fops_lock
M0_INTERNAL enum m0_fdmi_rec_type_id m0_fdmi__sd_rec_type_id_get(struct m0_fdmi_src_rec *src_rec)
Definition: source_dock.c:439
static struct m0_sm_state_descr fdmi_src_dock_timer_fom_state_descr[]
#define U128X_F
Definition: types.h:42
struct m0_tl fsdc_posted_rec_list
void(* rio_replied)(struct m0_rpc_item *item)
Definition: item.h:300
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:255
uint32_t sd_flags
Definition: sm.h:378
struct m0_tl fsdc_rec_inflight
static void fdmi_sd_timer_fom_fini(struct m0_fom *fom)
Definition: fom.h:481
struct m0_fop_type m0_fop_fdmi_rec_not_fopt
Definition: fops.c:47
static const struct m0_rpc_item_ops fdmi_rec_not_item_ops
M0_INTERNAL int64_t m0_ref_read(const struct m0_ref *ref)
Definition: refs.c:44
uint64_t n
Definition: fops.h:107
m0_time_t fsr_init_time
Definition: src_rec.h:111
struct m0_reqh reqh
Definition: rm_foms.c:48
M0_INTERNAL void m0_fom_timeout_fini(struct m0_fom_timeout *to)
Definition: fom.c:1539
struct m0_rpc_conn_pool fsf_conn_pool
struct m0_fdmi_eval_ctx fsf_flt_eval
struct m0_tlink fti_linkage
struct m0_fom frf_fom
M0_INTERNAL int m0_rpc_conn_pool_get_async(struct m0_rpc_conn_pool *pool, const char *remote_ep, struct m0_rpc_session **session)
Definition: conn_pool.c:230
const struct m0_fom_type_ops fdmi_rr_fom_type_ops
struct m0_tl fsf_pending_fops
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
uint32_t fmf_count
Definition: fops.h:58
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
struct m0_fom_timeout fstf_timeout
m0_time_t m0_time_sub(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:65
struct m0_rpc_session * ri_session
Definition: item.h:147
struct m0_rpc_item * m0_fop_to_rpc_item(const struct m0_fop *fop)
Definition: fop.c:338
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
static struct m0_fop * fop
Definition: item.c:57
static struct m0_sm_conf fdmi_src_dock_fom_sm_conf
struct m0_fid * fmf_flt_id
Definition: fops.h:61
#define m0_tlist_endfor
Definition: tlist.h:448
struct m0_uint128 fr_rec_id
Definition: fops.h:69
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
static void fdmi_rr_fom_fini(struct m0_fom *fom)
M0_INTERNAL void m0_fom_queue(struct m0_fom *fom)
Definition: fom.c:624
int(* fco_start)(struct m0_filterc_ctx *ctx, struct m0_reqh *reqh)
Definition: filterc.h:56
void(* fo_fini)(struct m0_fom *fom)
Definition: fom.h:657
#define m0_tlist_for(descr, head, obj)
Definition: tlist.h:435
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
M0_INTERNAL int m0_fol_fdmi_filter_kv_substring(struct m0_fdmi_eval_ctx *ctx, struct m0_conf_fdmi_filter *filter, struct m0_fdmi_eval_var_info *var_info)
Definition: fol_fdmi_src.c:738
#define out(...)
Definition: gen.c:41
static struct m0_fop * fop_create(struct m0_fdmi_src_rec *src_rec, const char *endpoint)
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
M0_INTERNAL void m0_fom_timeout_init(struct m0_fom_timeout *to)
Definition: fom.c:1532
static const struct m0_fom_ops fdmi_sd_timer_fom_ops
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
void m0_free(void *data)
Definition: memory.c:146
struct m0_rpc_item f_item
Definition: fop.h:83
M0_INTERNAL void m0_fdmi_eval_fini(struct m0_fdmi_eval_ctx *ctx)
Definition: flt_eval.c:204
enum m0_fdmi_filter_type_id ff_type
Definition: obj.h:729
M0_INTERNAL void m0_fdmi__fs_put(struct m0_fdmi_src_rec *src_rec)
Definition: source_dock.c:402
static struct m0_fdmi_sd_filter_type_handler fdmi_filter_type_handlers[]
enum m0_fdmi_filter_type_id ffth_id
static int sd_fom_process_matched_filters(struct m0_fdmi_src_dock *sd_ctx, struct m0_fdmi_src_rec *src_rec)
struct m0_rpc_session * fti_session
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_INTERNAL void m0_fdmi__enqueue_locked(struct m0_fdmi_src_rec *src_rec)
Definition: source_dock.c:236
struct m0_rpc_conn * s_conn
Definition: session.h:312
static struct m0_fom_type fdmi_sd_fom_type
const m0_time_t M0_TIME_IMMEDIATELY
Definition: time.c:107
Definition: fop.h:79
static int process_fdmi_rec(struct fdmi_sd_fom *sd_fom, struct m0_fdmi_src_rec *src_rec)
Definition: trace.h:478
M0_INTERNAL void m0_fom_timeout_cancel(struct m0_fom_timeout *to)
Definition: fom.c:1581
M0_INTERNAL int m0_fdmi__src_dock_fom_start(struct m0_fdmi_src_dock *src_dock, const struct m0_filterc_ops *filterc_ops, struct m0_reqh *reqh)
m0_time_t ri_deadline
Definition: item.h:141
M0_INTERNAL void m0_rpc_conn_pool_put(struct m0_rpc_conn_pool *pool, struct m0_rpc_session *session)
Definition: conn_pool.c:288
struct m0_fop * m0_fop_alloc(struct m0_fop_type *fopt, void *data, struct m0_rpc_machine *mach)
Definition: fop.c:96
void(* fco_close)(struct m0_filterc_iter *iter)
Definition: filterc.h:96