Motr  M0
fol_fdmi_src.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2017-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_FDMI
24 #include "lib/trace.h"
25 #include "lib/memory.h"
26 #include "lib/finject.h" /* M0_FI_ENABLED */
27 #include "lib/string.h" /* strlen */
28 
29 #include "fdmi/fdmi.h"
30 #include "fdmi/source_dock.h"
31 #include "fdmi/fol_fdmi_src.h"
32 #include "fdmi/filter.h"
34 #include "fdmi/module.h"
35 #include "fop/fop.h" /* m0_fop_fol_frag */
36 #include "rpc/rpc_opcodes.h" /* M0_CAS_PUT_FOP_OPCODE */
37 #include "cas/cas.h" /* m0_cas_op */
38 
39 
206 /* ------------------------------------------------------------------
207  * Fragments handling
208  * ------------------------------------------------------------------ */
209 
/**
 * Expands to a brace-enclosed designated initialiser for one fragment
 * handler entry: @_opecode is stored in the .ffh_opecode member and
 * @_get_val_func in the .ffh_fol_frag_get_val member.
 */
#define M0_FOL_FRAG_DATA_HANDLER_DECLARE(_opecode, _get_val_func)	\
	{								\
		.ffh_opecode          = (_opecode),			\
		.ffh_fol_frag_get_val = (_get_val_func)			\
	}
213 
216 };
217 
218 /* ------------------------------------------------------------------
219  * List of locked transactions
220  * ------------------------------------------------------------------ */
221 
222 M0_TL_DESCR_DEFINE(ffs_tx, "fdmi fol src tx list", M0_INTERNAL,
223  struct m0_be_tx, t_fdmi_linkage, t_magic,
225 
226 M0_TL_DEFINE(ffs_tx, M0_INTERNAL, struct m0_be_tx);
227 
228 /* ------------------------------------------------------------------
229  * Helpers
230  * ------------------------------------------------------------------ */
231 
/*
 * Maps an FDMI source record back to the transaction that embeds it.
 *
 * src_rec is treated as the fr_fdmi_rec member of a struct m0_fol_rec, and
 * that FOL record is treated as the tx_fol_rec member of a struct m0_dtx.
 * Returns the address of the enclosing struct m0_dtx.
 *
 * NOTE(review): this listing omits source line 238; whatever statement
 * lives there is not visible here.
 */
232 static struct m0_dtx* ffs_get_dtx(struct m0_fdmi_src_rec *src_rec)
233 {
234  /* This is just a wrapper, so there is no point in using ENTRY/LEAVE */
235 
236  struct m0_fol_rec *fol_rec;
237 
239  fol_rec = container_of(src_rec, struct m0_fol_rec, fr_fdmi_rec);
240  return container_of(fol_rec, struct m0_dtx, tx_fol_rec);
241 }
242 
243 static void be_tx_put_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
244 {
245  struct m0_be_tx *be_tx = ast->sa_datum;
246 
247  M0_ENTRY("sm_group %p, ast %p (be_tx = %p)", grp, ast, be_tx);
248  M0_LOG(M0_DEBUG, "call be_tx_put direct (2)");
249  m0_be_tx_put(be_tx);
250 
251  M0_LEAVE();
252 }
253 
/*
 * Takes one FDMI reference on be_tx.
 *
 * If be_tx->t_fdmi_ref currently reads zero, the transaction is first
 * pinned with m0_be_tx_get() and appended to fdms_ffs_locked_tx_list while
 * fdms_ffs_locked_tx_lock is held. The counter is then atomically
 * incremented by one.
 *
 * be_tx   - transaction to reference; must not be NULL.
 * counter - optional; when non-NULL receives the counter value after the
 *           increment.
 *
 * NOTE(review): `m` is used below but its declaration is not visible; the
 * listing omits source line 256 (and lines 264-268).
 * NOTE(review): the zero check and the increment are two separate atomic
 * operations; confirm that callers serialise taking the first reference.
 */
254 static void ffs_tx_inc_refc(struct m0_be_tx *be_tx, int64_t *counter)
255 {
257  int64_t cnt;
258 
259  M0_ENTRY("be_tx %p", be_tx);
260 
261  M0_ASSERT(be_tx != NULL);
262 
  /* First reference: pin the tx and remember it on the locked-tx list. */
263  if (m0_atomic64_get(&be_tx->t_fdmi_ref) == 0) {
269  M0_LOG(M0_INFO, "first incref for a be_tx_get %p", be_tx);
270  m0_be_tx_get(be_tx);
271  m0_mutex_lock(&m->fdm_s.fdms_ffs_locked_tx_lock);
272  ffs_tx_tlink_init_at_tail(be_tx,
273  &m->fdm_s.fdms_ffs_locked_tx_list);
274  m0_mutex_unlock(&m->fdm_s.fdms_ffs_locked_tx_lock);
275  }
276 
277  cnt = m0_atomic64_add_return(&be_tx->t_fdmi_ref, 1);
278  M0_ASSERT(cnt > 0);
279 
  /* Reporting the new value is optional. */
280  if (counter != NULL)
281  *counter = cnt;
282  M0_LEAVE("counter = %"PRIi64, cnt);
283 }
284 
/*
 * Drops one FDMI reference on be_tx.
 *
 * The counter be_tx->t_fdmi_ref is atomically decremented by one. When it
 * reaches zero the transaction is removed from the locked-tx list (under
 * fdms_ffs_locked_tx_lock) and be_tx->t_fdmi_put_ast is posted to the
 * transaction's state machine group, with be_tx as the AST datum.
 *
 * be_tx   - transaction to release; must not be NULL.
 * counter - optional; when non-NULL receives the counter value after the
 *           decrement.
 *
 * NOTE(review): `m` is used below but its declaration is not visible; the
 * listing omits source lines 287 and 305. The AST callback is presumably
 * assigned on line 305 - confirm against the full source.
 */
285 static void ffs_tx_dec_refc(struct m0_be_tx *be_tx, int64_t *counter)
286 {
288  int64_t cnt;
289 
290  M0_ENTRY("be_tx %p, counter ptr %p", be_tx, counter);
291 
292  M0_ASSERT(be_tx != NULL);
293 
294  cnt = m0_atomic64_sub_return(&be_tx->t_fdmi_ref, 1);
295  M0_ASSERT(cnt >= 0);
296 
297  if (counter != NULL)
298  *counter = cnt;
299 
  /* Last reference gone: unlink the tx and schedule its release via AST. */
300  if (cnt == 0) {
301  m0_mutex_lock(&m->fdm_s.fdms_ffs_locked_tx_lock);
302  ffs_tx_tlink_del_fini(be_tx);
303  m0_mutex_unlock(&m->fdm_s.fdms_ffs_locked_tx_lock);
304  M0_LOG(M0_DEBUG, "call be_tx_put CB");
306  be_tx->t_fdmi_put_ast.sa_datum = be_tx;
307  m0_sm_ast_post(be_tx->t_sm.sm_grp, &be_tx->t_fdmi_put_ast);
308  M0_LOG(M0_DEBUG, "last decref for a be_tx %p "
309  "(ast callback posted)", be_tx);
310  }
311  M0_LEAVE("counter = %"PRIi64, cnt);
312 }
313 
314 static int64_t ffs_rec_get(struct m0_fdmi_src_rec *src_rec)
315 {
316  struct m0_dtx *dtx;
317  int64_t cnt;
318 
319  /*
320  * If this is the first time call on a new rec, some member
321  * of this struct may not be fully populated. But it's fine
322  * to get dtx.
323  */
324  dtx = ffs_get_dtx(src_rec);
325  M0_ASSERT(dtx != NULL);
326 
327  ffs_tx_inc_refc(&dtx->tx_betx, &cnt);
328 
329  return cnt;
330 }
331 
/*
 * Drops one FDMI reference on the BE transaction that backs src_rec.
 *
 * The owning dtx is resolved with ffs_get_dtx() and its tx_betx is passed
 * to ffs_tx_dec_refc(). Returns the counter value that call reports.
 *
 * NOTE(review): this listing omits source line 337.
 */
332 static int64_t ffs_rec_put(struct m0_fdmi_src_rec *src_rec)
333 {
334  struct m0_dtx *dtx;
335  int64_t cnt;
336 
338 
339  dtx = ffs_get_dtx(src_rec);
340  M0_ASSERT(dtx != NULL);
341 
342  ffs_tx_dec_refc(&dtx->tx_betx, &cnt);
343 
344  return cnt;
345 }
346 
347 /* ------------------------------------------------------------------
348  * FOL source interface implementation
349  * ------------------------------------------------------------------ */
350 
/*
 * fs_node_eval operation of the FOL FDMI source (installed in
 * m0_fol_fdmi_src_init()).
 *
 * In the visible lines the function resolves the dtx owning src_rec, takes
 * the head fragment of its FOL record, reads the fop opcode from the
 * fragment's rp_data and returns M0_RC(0).
 *
 * src_rec    - source record being evaluated.
 * value_desc - variable node descriptor; must not be NULL.
 * value      - output operand; must not be NULL.
 *
 * NOTE(review): this listing omits source lines 365, 373-375, 379-385 and
 * 390, so how value_desc, value and opcode are consumed is not visible
 * here - confirm against the full source.
 */
351 static int ffs_op_node_eval(struct m0_fdmi_src_rec *src_rec,
352  struct m0_fdmi_flt_var_node *value_desc,
353  struct m0_fdmi_flt_operand *value)
354 {
355  struct m0_dtx *dtx;
356  struct m0_fol_rec *fol_rec;
357  uint64_t opcode;
358  struct m0_fol_frag *rfrag;
359  struct m0_fop_fol_frag *rp;
360  int rc;
361 
362  M0_ENTRY("src_rec %p, value desc %p, value %p",
363  src_rec, value_desc, value);
364 
366  M0_ASSERT(value_desc != NULL && value != NULL);
367 
368  dtx = ffs_get_dtx(src_rec);
369  M0_ASSERT(dtx != NULL);
370 
371  fol_rec = &dtx->tx_fol_rec;
372 
  /* Only the first fragment of the record is inspected. */
376  rfrag = m0_rec_frag_tlist_head(&fol_rec->fr_frags);
377  M0_ASSERT(rfrag != NULL);
378 
386 
387  rp = rfrag->rp_data;
388  M0_ASSERT(rp != NULL);
389  opcode = rp->ffrp_fop_code;
391  rc = 0;
392 
393  return M0_RC(rc);
394 }
395 
/*
 * fs_get operation of the FOL FDMI source: takes a reference on the record
 * through ffs_rec_get() and logs the resulting counter.
 *
 * NOTE(review): this listing omits source line 400.
 */
396 static void ffs_op_get(struct m0_fdmi_src_rec *src_rec)
397 {
398  int64_t cnt;
399 
401 
402  /* Proper transactional handling is for phase 2. */
403  cnt = ffs_rec_get(src_rec);
404 
405  M0_LOG(M0_DEBUG, "src_rec %p counter=%"PRIi64, src_rec, cnt);
406 }
407 
/*
 * fs_put operation of the FOL FDMI source: drops a reference on the record
 * through ffs_rec_put() and logs the resulting counter.
 *
 * NOTE(review): this listing omits source line 411.
 */
408 static void ffs_op_put(struct m0_fdmi_src_rec *src_rec)
409 {
410  int64_t cnt;
412 
413  /* Proper transactional handling is for phase 2. */
414  cnt = ffs_rec_put(src_rec);
415 
416  M0_LOG(M0_DEBUG, "src_rec %p counter=%"PRIi64, src_rec, cnt);
417 }
418 
/*
 * fs_encode operation of the FOL FDMI source: serialises the FOL record
 * that embeds src_rec into a newly allocated buffer.
 *
 * The record is first encoded into a scratch buffer of FOL_REC_MAXSIZE
 * bytes; buf is then allocated with fr_header.rh_data_len bytes and the
 * encoded bytes are copied into it. The scratch buffer is always freed.
 *
 * src_rec - record to encode.
 * buf     - output; must be non-NULL and empty (b_addr == NULL, b_nob == 0)
 *           on entry. On a negative result it is freed again before
 *           returning.
 *
 * Returns the result of the failing step, -EINVAL when the "fail_in_final"
 * fault injection point is enabled, or the final rc otherwise.
 *
 * NOTE(review): this listing omits source lines 428, 438-444, 453 and 460;
 * the heads of the two error-reporting statements are therefore not
 * visible.
 */
419 static int ffs_op_encode(struct m0_fdmi_src_rec *src_rec,
420  struct m0_buf *buf)
421 {
422  struct m0_dtx *dtx;
423  struct m0_fol_rec *fol_rec;
424  struct m0_buf local_buf = {};
425  int rc;
426 
427  M0_ASSERT(buf != NULL);
429  M0_ASSERT(buf->b_addr == NULL && buf->b_nob == 0);
430 
431  M0_ENTRY("src_rec %p, cur " BUF_F, src_rec, BUF_P(buf));
432 
433  dtx = ffs_get_dtx(src_rec);
434  M0_ASSERT(dtx != NULL);
435 
436  fol_rec = &dtx->tx_fol_rec;
437 
  /* Scratch buffer sized for the largest possible record. */
445  rc = m0_buf_alloc(&local_buf, FOL_REC_MAXSIZE);
446  if (rc != 0) {
447  return M0_ERR_INFO(rc, "Failed to allocate internal buffer "
448  "for encoded FOL FDMI record.");
449  }
450 
451  rc = m0_fol_rec_encode(fol_rec, &local_buf);
452  if (rc != 0) {
454  "Failed to encoded FOL FDMI record.");
455  goto done;
456  }
457 
  /* The caller's buffer gets exactly the length recorded in the header. */
458  rc = m0_buf_alloc(buf, fol_rec->fr_header.rh_data_len);
459  if (rc != 0) {
461  "Failed to allocate encoded FOL FDMI record.");
462  goto done;
463  }
464  memcpy(buf->b_addr, local_buf.b_addr, buf->b_nob);
465 
466  if (M0_FI_ENABLED("fail_in_final"))
467  rc = -EINVAL;
468 
469 done:
470  /* Finalization */
471  if (local_buf.b_addr != NULL)
472  m0_buf_free(&local_buf);
473  /* On-Error cleanup. */
474  if (rc < 0) {
475  if (buf->b_addr != NULL)
476  m0_buf_free(buf);
477  }
478 
479  return M0_RC(rc);
480 }
481 
482 static int ffs_op_decode(struct m0_buf *buf, void **handle)
483 {
484  struct m0_fol_rec *fol_rec = 0;
485  int rc = 0;
486 
487  M0_ASSERT(buf != NULL && buf->b_addr != NULL && handle != NULL);
488 
489  M0_ENTRY("buf " BUF_F ", handle %p", BUF_P(buf), handle);
490 
491  M0_ALLOC_PTR(fol_rec);
492  if (fol_rec == NULL) {
493  M0_LOG(M0_ERROR, "failed to allocate m0_fol_rec object");
494  rc = -ENOMEM;
495  goto done;
496  }
497  m0_fol_rec_init(fol_rec, NULL);
498 
499  rc = m0_fol_rec_decode(fol_rec, buf);
500  if (rc < 0)
501  goto done;
502 
503  *handle = fol_rec;
504 
505  if (M0_FI_ENABLED("fail_in_final"))
506  rc = -EINVAL;
507 
508 done:
509  if (rc < 0) {
510  if (fol_rec != NULL) {
511  m0_fol_rec_fini(fol_rec);
512  m0_free0(&fol_rec);
513  }
514  *handle = NULL;
515  }
516 
517  return M0_RC(rc);
518 }
519 
/*
 * fs_begin operation of the FOL FDMI source. In the visible lines it only
 * traces entry/exit and marks src_rec as intentionally unused.
 *
 * NOTE(review): this listing omits source lines 524-532.
 */
520 static void ffs_op_begin(struct m0_fdmi_src_rec *src_rec)
521 {
522  M0_ENTRY("src_rec %p", src_rec);
523 
525 
533  (void)src_rec;
534 
535  M0_LEAVE();
536 }
537 
/*
 * fs_end operation of the FOL FDMI source. In the visible lines it only
 * traces entry/exit.
 *
 * NOTE(review): this listing omits source lines 541-542.
 */
538 static void ffs_op_end(struct m0_fdmi_src_rec *src_rec)
539 {
540  M0_ENTRY("src_rec %p", src_rec);
541 
543 
544  M0_LEAVE();
545 }
546 
547 /* ------------------------------------------------------------------
548  * Init/fini
549  * ------------------------------------------------------------------ */
550 
/*
 * Initialises the FOL FDMI source.
 *
 * Sets the context magic, obtains a source object into
 * fdms_ffs_ctx.ffsc_src, initialises the locked-tx list and its mutex,
 * installs the ffs_op_*() callbacks on the source, registers it with
 * m0_fdmi_source_register() and finally records the fragment handler table
 * in the context.
 *
 * Returns 0 on success or the error of the failing step.
 *
 * NOTE(review): `m` is used below but its declaration is not visible; the
 * listing omits source lines 553, 561 and 585 (including the head of the
 * call that fills ffsc_src and the value assigned to ffsc_handler_number).
 * NOTE(review): error_free_src is reached only when
 * m0_fdmi_source_register() failed, yet it calls
 * m0_fdmi_source_deregister(); confirm that deregistering a source that
 * was never registered is safe. The list and mutex initialised above are
 * also not finalised on that path.
 */
551 M0_INTERNAL int m0_fol_fdmi_src_init(void)
552 {
554  int rc;
555 
556  M0_ENTRY();
557 
558  M0_ASSERT(m->fdm_s.fdms_ffs_ctx.ffsc_src == NULL);
559  m->fdm_s.fdms_ffs_ctx.ffsc_magic = M0_FOL_FDMI_SRC_CTX_MAGIC;
560 
562  &m->fdm_s.fdms_ffs_ctx.ffsc_src);
563  if (rc != 0)
564  return M0_ERR(rc);
565 
566  ffs_tx_tlist_init(&m->fdm_s.fdms_ffs_locked_tx_list);
567  m0_mutex_init(&m->fdm_s.fdms_ffs_locked_tx_lock);
568 
  /* Wire the source operations implemented in this file. */
569  m->fdm_s.fdms_ffs_ctx.ffsc_src->fs_node_eval = ffs_op_node_eval;
570  m->fdm_s.fdms_ffs_ctx.ffsc_src->fs_get = ffs_op_get;
571  m->fdm_s.fdms_ffs_ctx.ffsc_src->fs_put = ffs_op_put;
572  m->fdm_s.fdms_ffs_ctx.ffsc_src->fs_begin = ffs_op_begin;
573  m->fdm_s.fdms_ffs_ctx.ffsc_src->fs_end = ffs_op_end;
574  m->fdm_s.fdms_ffs_ctx.ffsc_src->fs_encode = ffs_op_encode;
575  m->fdm_s.fdms_ffs_ctx.ffsc_src->fs_decode = ffs_op_decode;
576 
577  rc = m0_fdmi_source_register(m->fdm_s.fdms_ffs_ctx.ffsc_src);
578  if (rc != 0) {
579  M0_LOG(M0_ERROR, "Failed to register FDMI FOL source.");
580  goto error_free_src;
581  }
582 
583  m->fdm_s.fdms_ffs_ctx.ffsc_frag_handler_vector = ffs_frag_handler_array;
584  m->fdm_s.fdms_ffs_ctx.ffsc_handler_number =
586  return M0_RC(rc);
587 error_free_src:
588  m0_fdmi_source_deregister(m->fdm_s.fdms_ffs_ctx.ffsc_src);
589  m0_fdmi_source_free(m->fdm_s.fdms_ffs_ctx.ffsc_src);
590  m->fdm_s.fdms_ffs_ctx.ffsc_src = NULL;
591  return M0_RC(rc);
592 }
593 
/*
 * Finalisation entry point of the FOL FDMI source. In the visible lines it
 * only traces entry/exit.
 *
 * NOTE(review): this listing omits source line 597.
 */
594 M0_INTERNAL void m0_fol_fdmi_src_fini(void)
595 {
596  M0_ENTRY();
598  M0_LEAVE();
599 }
600 
/*
 * Tears down the FOL FDMI source.
 *
 * Preconditions: the source exists, its enclosing m0_fdmi_src_ctx is marked
 * registered, and fs_record_post is set.
 *
 * Every transaction still on the locked-tx list is unlinked and released
 * with m0_be_tx_put() while fdms_ffs_locked_tx_lock is held; the mutex and
 * the list are then finalised, and the source is deregistered and freed.
 *
 * Returns M0_RC(0) in the visible lines.
 *
 * NOTE(review): `m` is used below but its declaration is not visible; the
 * listing omits source lines 603, 617-621 and 626-629.
 */
601 M0_INTERNAL int m0_fol_fdmi_src_deinit(void)
602 {
604  struct m0_fdmi_src_ctx *src_ctx;
605  struct m0_be_tx *be_tx;
606  int rc = 0;
607 
608  M0_ENTRY();
609 
610  M0_PRE(m->fdm_s.fdms_ffs_ctx.ffsc_src != NULL);
611  src_ctx = container_of(m->fdm_s.fdms_ffs_ctx.ffsc_src,
612  struct m0_fdmi_src_ctx, fsc_src);
613 
614  M0_PRE(src_ctx->fsc_registered);
615  M0_PRE(m->fdm_s.fdms_ffs_ctx.ffsc_src->fs_record_post != NULL);
616 
  /* Release every transaction that is still pinned for FDMI. */
622  m0_mutex_lock(&m->fdm_s.fdms_ffs_locked_tx_lock);
623  m0_tlist_for(&ffs_tx_tl, &m->fdm_s.fdms_ffs_locked_tx_list, be_tx) {
624  ffs_tx_tlink_del_fini(be_tx);
625  m0_be_tx_put(be_tx);
630  } m0_tlist_endfor;
631  m0_mutex_unlock(&m->fdm_s.fdms_ffs_locked_tx_lock);
632  m0_mutex_fini(&m->fdm_s.fdms_ffs_locked_tx_lock);
633 
634  ffs_tx_tlist_fini(&m->fdm_s.fdms_ffs_locked_tx_list);
635 
636  m0_fdmi_source_deregister(m->fdm_s.fdms_ffs_ctx.ffsc_src);
637  m0_fdmi_source_free(m->fdm_s.fdms_ffs_ctx.ffsc_src);
638  m->fdm_s.fdms_ffs_ctx.ffsc_src = NULL;
639  return M0_RC(rc);
640 }
641 
642 /* ------------------------------------------------------------------
643  * Entry point for FOM to start FDMI processing
644  * ------------------------------------------------------------------ */
645 
/*
 * Entry point used by a FOM to hand its FOL record to FDMI.
 *
 * Nothing is posted when the source dock is not started (an error is
 * logged) or when no filters are defined. Otherwise the FDMI record
 * embedded in the FOM's transaction (fo_tx.tx_fol_rec.fr_fdmi_rec) is
 * bound to the FOL source and marked as a non-dry-run record.
 *
 * fom - FOM whose transaction record is to be posted; must not be NULL.
 *
 * NOTE(review): `m` is used below but its declaration is not visible; the
 * listing omits source lines 649, 677-681, 686-687, 690, 693, 696 and
 * 700-705, so the actual posting call and its aftermath are not visible
 * here.
 */
646 M0_INTERNAL void m0_fol_fdmi_post_record(struct m0_fom *fom)
647 {
648  struct m0_fdmi_src_dock *src_dock = m0_fdmi_src_dock_get();
650  struct m0_dtx *dtx;
651  struct m0_be_tx *be_tx;
652 
653  M0_ENTRY("fom: %p", fom);
654 
655  M0_ASSERT(fom != NULL);
656  M0_ASSERT(m->fdm_s.fdms_ffs_ctx.ffsc_src->fs_record_post != NULL);
657 
658  if (!src_dock->fsdc_started) {
659  /*
660  * It should really be M0_ASSERT() here, because posting FDMI
661  * records when there is nothing running to process them is a
662  * bug (FDMI ensures guaranteed delivery and that is impossible
663  * if nothing is running that could take care of delivery).
664  *
665  * But the current codebase is not ready for such a change.
666  * Let's mark it as just another "Phase 2" TODO.
667  */
668  M0_LOG(M0_ERROR, "src dock fom is not running");
669  return;
670  }
671  if (!src_dock->fsdc_filters_defined) {
672  /* No filters defined. Let's not post the records. */
673  return;
674  }
675 
676 
682  dtx = &fom->fo_tx;
683  be_tx = &fom->fo_tx.tx_betx;
684 
685 
688  dtx->tx_fol_rec.fr_fdmi_rec.fsr_src = m->fdm_s.fdms_ffs_ctx.ffsc_src;
689  dtx->tx_fol_rec.fr_fdmi_rec.fsr_dryrun = false;
691 
692  /* Post record. */
694  M0_LOG(M0_DEBUG, "M0_FDMI_SOURCE_POST_RECORD fr_fdmi_rec=%p "
695  "fsr_rec_id="U128X_F, &dtx->tx_fol_rec.fr_fdmi_rec,
697 
698  /* Aftermath. */
699 
706  M0_LEAVE();
707 }
708 
709 M0_INTERNAL bool
711  const char **substrings)
712 {
713  struct m0_buf s;
714  m0_bcount_t j;
715  bool match;
716  int i;
717 
718  for (i = 0; substrings[i] != NULL; ++i) {
719  s = M0_BUF_INIT_CONST(strlen(substrings[i]), substrings[i]);
720  if (value->b_nob < s.b_nob)
721  return false;
722  match = false;
723  /* brute-force */
724  for (j = 0; j <= value->b_nob - s.b_nob; ++j) {
725  if (m0_buf_eq(&s, &M0_BUF_INIT(s.b_nob,
726  value->b_addr + j))) {
727  match = true;
728  break;
729  }
730  }
731  if (!match)
732  return false;
733  }
734  return true;
735 }
736 
/*
 * Key-value substring filter over the fragments of a FOL record.
 *
 * The source record is taken from var_info->user_data and mapped to its
 * enclosing FOL record. Only fragments of type m0_fop_fol_frag_type whose
 * fop code is M0_CAS_PUT_FOP_OPCODE or M0_CAS_DEL_FOP_OPCODE are examined;
 * for each CAS record in such a fragment the value buffer
 * (cr_val.u.ab_buf) is tested against filter->ff_substrings.
 *
 * Returns 1 as soon as one record passes the test, 0 if none does.
 *
 * NOTE(review): this listing omits source line 738 (the declarator with
 * the function name and first parameter) and line 762 (the head of the
 * `if` that performs the test - presumably a call to
 * m0_fol_fdmi__filter_kv_substring_match(); confirm against the full
 * source).
 */
737 M0_INTERNAL int
739  struct m0_conf_fdmi_filter *filter,
740  struct m0_fdmi_eval_var_info *var_info)
741 {
742  struct m0_fdmi_src_rec *src_rec = var_info->user_data;
743  struct m0_fop_fol_frag *fop_fol_frag;
744  struct m0_fol_frag *fol_frag;
745  struct m0_fol_rec *fol_rec;
746  struct m0_cas_rec *cas_rec;
747  struct m0_cas_op *cas_op;
748  int i;
749 
750  fol_rec = container_of(src_rec, struct m0_fol_rec, fr_fdmi_rec);
751  m0_tl_for(m0_rec_frag, &fol_rec->fr_frags, fol_frag) {
  /* Skip fragments that are not fop fragments. */
752  if (fol_frag->rp_ops->rpo_type != &m0_fop_fol_frag_type)
753  continue;
754  fop_fol_frag = fol_frag->rp_data;
  /* Only CAS PUT and DEL operations are of interest. */
755  if (fop_fol_frag->ffrp_fop_code != M0_CAS_PUT_FOP_OPCODE &&
756  fop_fol_frag->ffrp_fop_code != M0_CAS_DEL_FOP_OPCODE)
757  continue;
758  cas_op = fop_fol_frag->ffrp_fop;
759  M0_ASSERT(cas_op != NULL);
760  for (i = 0; i < cas_op->cg_rec.cr_nr; ++i) {
761  cas_rec = &cas_op->cg_rec.cr_rec[i];
763  &cas_rec->cr_val.u.ab_buf,
764  filter->ff_substrings))
765  return 1;
766  }
767  } m0_tl_endfor;
768  return 0;
769 }
770 
775 #undef M0_TRACE_SUBSYSTEM
776 /*
777  * Local variables:
778  * c-indentation-style: "K&R"
779  * c-basic-offset: 8
780  * tab-width: 8
781  * fill-column: 80
782  * scroll-step: 1
783  * End:
784  */
785 /*
786  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
787  */
#define M0_PRE(cond)
Definition: dtm.h:554
struct m0_fol_rec_header fr_header
Definition: fol.h:192
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static void ffs_op_put(struct m0_fdmi_src_rec *src_rec)
Definition: fol_fdmi_src.c:408
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_fol_rec_fini(struct m0_fol_rec *rec)
Definition: fol.c:104
static struct m0_addb2_mach * m
Definition: consumer.c:38
struct m0_atomic64 t_fdmi_ref
Definition: tx.h:412
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
const struct m0_fol_frag_ops * rp_ops
Definition: fol.h:244
static void be_tx_put_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: fol_fdmi_src.c:243
void * b_addr
Definition: buf.h:39
Definition: ub.c:78
bool fsr_dryrun
Definition: src_rec.h:106
M0_INTERNAL bool m0_buf_eq(const struct m0_buf *x, const struct m0_buf *y)
Definition: buf.c:90
static struct m0_sm_group * grp
Definition: bytecount.c:38
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
struct m0_uint128 fsr_rec_id
Definition: src_rec.h:80
M0_INTERNAL bool m0_fdmi__record_is_valid(struct m0_fdmi_src_rec *src_rec)
Definition: source_dock.c:121
M0_INTERNAL int m0_fol_fdmi_src_deinit(void)
Definition: fol_fdmi_src.c:601
M0_INTERNAL int m0_fol_fdmi_src_init(void)
Definition: fol_fdmi_src.c:551
static int64_t m0_atomic64_sub_return(struct m0_atomic64 *a, int64_t d)
uint64_t t_magic
Definition: tlist.h:296
int const char const void * value
Definition: dir.c:325
struct m0_tl fr_frags
Definition: fol.h:201
M0_INTERNAL void m0_fdmi_source_free(struct m0_fdmi_src *src)
Definition: source_dock.c:301
uint64_t m0_bcount_t
Definition: types.h:77
M0_INTERNAL struct m0_fdmi_module * m0_fdmi_module__get(void)
Definition: module.c:51
#define M0_FOL_FRAG_DATA_HANDLER_DECLARE(_opecode, _get_val_func)
Definition: fol_fdmi_src.c:210
Definition: sm.h:504
void * rp_data
Definition: fol.h:249
#define container_of(ptr, type, member)
Definition: misc.h:33
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
static int ffs_op_node_eval(struct m0_fdmi_src_rec *src_rec, struct m0_fdmi_flt_var_node *value_desc, struct m0_fdmi_flt_operand *value)
Definition: fol_fdmi_src.c:351
struct m0_fol_frag_type m0_fop_fol_frag_type
Definition: sock.c:887
const struct m0_fol_frag_type * rpo_type
Definition: fol.h:283
struct m0_rpc_at_buf cr_val
Definition: cas.h:182
#define m0_tl_endfor
Definition: tlist.h:700
#define BUF_F
Definition: buf.h:75
return M0_RC(rc)
struct m0_fdmi_src * fsr_src
Definition: src_rec.h:72
#define M0_ENTRY(...)
Definition: trace.h:170
Definition: buf.h:37
struct m0_cas_rec * cr_rec
Definition: cas.h:236
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
M0_INTERNAL void m0_fol_rec_init(struct m0_fol_rec *rec, struct m0_fol *fol)
Definition: fol.c:98
Definition: filter.py:1
int opcode
Definition: crate.c:301
M0_INTERNAL int m0_fdmi_source_alloc(enum m0_fdmi_rec_type_id type_id, struct m0_fdmi_src **src)
Definition: source_dock.c:285
int i
Definition: dir.c:1033
uint64_t cr_nr
Definition: cas.h:235
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
return M0_ERR(-EOPNOTSUPP)
void * sa_datum
Definition: sm.h:508
Definition: trace.h:482
Definition: cnt.h:36
M0_INTERNAL void m0_fdmi_flt_uint_opnd_fill(struct m0_fdmi_flt_operand *opnd, uint64_t value)
Definition: filter.c:181
#define m0_free0(pptr)
Definition: memory.h:77
#define M0_ASSERT(cond)
#define U128_P(x)
Definition: types.h:45
M0_INTERNAL void m0_fdmi_source_deregister(struct m0_fdmi_src *src)
Definition: source_dock.c:337
static int counter
Definition: mutex.c:32
void * fsr_data
Definition: src_rec.h:75
static struct m0_dtx * ffs_get_dtx(struct m0_fdmi_src_rec *src_rec)
Definition: fol_fdmi_src.c:232
static void ffs_op_get(struct m0_fdmi_src_rec *src_rec)
Definition: fol_fdmi_src.c:396
M0_INTERNAL struct m0_fdmi_src_dock * m0_fdmi_src_dock_get(void)
Definition: fdmi.c:890
M0_INTERNAL int m0_fdmi_source_register(struct m0_fdmi_src *src)
Definition: source_dock.c:311
M0_INTERNAL void m0_fol_fdmi_src_fini(void)
Definition: fol_fdmi_src.c:594
struct m0_fdmi_src_rec fr_fdmi_rec
Definition: fol.h:203
M0_INTERNAL int m0_buf_alloc(struct m0_buf *buf, size_t size)
Definition: buf.c:43
struct m0_sm_group * sm_grp
Definition: sm.h:321
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
M0_INTERNAL void m0_fol_fdmi_post_record(struct m0_fom *fom)
Definition: fol_fdmi_src.c:646
union m0_rpc_at_buf::@447 u
Definition: dump.c:103
M0_INTERNAL void m0_be_tx_lsn_get(struct m0_be_tx *tx, m0_bindex_t *lsn, m0_bindex_t *lsn_discarded)
Definition: tx.c:761
struct m0_fol_rec tx_fol_rec
Definition: dtm.h:561
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
Definition: buf.c:55
M0_INTERNAL void m0_be_tx_get(struct m0_be_tx *tx)
Definition: stubs.c:167
static int ffs_op_decode(struct m0_buf *buf, void **handle)
Definition: fol_fdmi_src.c:482
static void ffs_tx_inc_refc(struct m0_be_tx *be_tx, int64_t *counter)
Definition: fol_fdmi_src.c:254
struct m0_sm t_sm
Definition: tx.h:281
struct m0_cas_recv cg_rec
Definition: cas.h:384
static void ffs_op_end(struct m0_fdmi_src_rec *src_rec)
Definition: fol_fdmi_src.c:538
uint64_t rh_lsn_discarded
Definition: fol.h:177
Definition: cas.h:372
uint64_t rh_lsn
Definition: fol.h:172
M0_INTERNAL bool m0_fol_fdmi__filter_kv_substring_match(struct m0_buf *value, const char **substrings)
Definition: fol_fdmi_src.c:710
#define U128X_F
Definition: types.h:42
static int64_t m0_atomic64_get(const struct m0_atomic64 *a)
static struct ffs_fol_frag_handler ffs_frag_handler_array[]
Definition: fol_fdmi_src.c:214
Definition: fom.h:481
struct m0_sm_ast t_fdmi_put_ast
Definition: tx.h:424
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
struct m0_be_tx tx_betx
Definition: dtm.h:559
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
#define PRIi64
Definition: types.h:59
#define M0_FDMI_SOURCE_POST_RECORD(_src_rec_ptr)
Definition: source_dock.h:148
#define BUF_P(p)
Definition: buf.h:76
M0_TL_DESCR_DEFINE(ffs_tx, "fdmi fol src tx list", M0_INTERNAL, struct m0_be_tx, t_fdmi_linkage, t_magic, M0_BE_TX_MAGIC, M0_BE_TX_ENGINE_MAGIC)
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
static int64_t ffs_rec_put(struct m0_fdmi_src_rec *src_rec)
Definition: fol_fdmi_src.c:332
uint32_t rh_data_len
Definition: fol.h:164
M0_INTERNAL void m0_be_tx_put(struct m0_be_tx *tx)
Definition: stubs.c:171
#define m0_tlist_endfor
Definition: tlist.h:448
static void ffs_tx_dec_refc(struct m0_be_tx *be_tx, int64_t *counter)
Definition: fol_fdmi_src.c:285
static int ffs_op_encode(struct m0_fdmi_src_rec *src_rec, struct m0_buf *buf)
Definition: fol_fdmi_src.c:419
static unsigned done
Definition: storage.c:91
#define m0_tlist_for(descr, head, obj)
Definition: tlist.h:435
static struct m0_cas_op * cas_op(const struct m0_fom *fom)
Definition: service.c:1761
Definition: nucleus.c:42
M0_INTERNAL int m0_fol_fdmi_filter_kv_substring(struct m0_fdmi_eval_ctx *ctx, struct m0_conf_fdmi_filter *filter, struct m0_fdmi_eval_var_info *var_info)
Definition: fol_fdmi_src.c:738
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
static struct m0_addb2_source * s
Definition: consumer.c:39
#define M0_BUF_INIT_CONST(size, data)
Definition: buf.h:66
#define M0_BUF_INIT(size, data)
Definition: buf.h:64
static void ffs_op_begin(struct m0_fdmi_src_rec *src_rec)
Definition: fol_fdmi_src.c:520
int32_t rc
Definition: trigger_fop.h:47
static int64_t ffs_rec_get(struct m0_fdmi_src_rec *src_rec)
Definition: fol_fdmi_src.c:314
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_TL_DEFINE(ffs_tx, M0_INTERNAL, struct m0_be_tx)
static int64_t m0_atomic64_add_return(struct m0_atomic64 *a, int64_t d)
M0_INTERNAL int m0_fol_rec_encode(struct m0_fol_rec *rec, struct m0_buf *at)
Definition: fol.c:314
Definition: tx.h:280
uint32_t ffrp_fop_code
Definition: fop.h:354
M0_INTERNAL int m0_fol_rec_decode(struct m0_fol_rec *rec, struct m0_buf *at)
Definition: fol.c:331