23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_FDMI 158 .sd_name =
"WaitForTimeout",
164 .sd_name =
"Alarmed",
177 .
scf_name =
"fdmi-src-dock-timer-fom-sm",
353 M0_LEAVE(
"FDMI FOM is not initialized yet");
370 M0_ENTRY(
"sd_timer_fom %p", timer_fom);
425 M0_ENTRY(
"sd_fom %p, src_rec %p", sd_fom, src_rec);
449 fdmi_matched_filter_list_tlink_init_at(
452 }
else if (ret < 0) {
466 M0_ENTRY(
"sd_fom %p, src_rec %p", sd_fom, src_rec);
557 pending_fops_tlist_del(pending_fop);
569 if (!fdmi_record_inflight_tlink_is_in(src_rec)) {
570 fdmi_record_inflight_tlist_add_tail(
620 if (pending_fop ==
NULL)
651 if (!fdmi_record_inflight_tlink_is_in(src_rec)) {
652 fdmi_record_inflight_tlist_add_tail(
660 }
else if (
rc == -EBUSY)
669 M0_ENTRY(
"src_rec=%p, endpoint=%s", src_rec, endpoint);
671 n = m0_tl_reduce(fdmi_matched_filter_list, flt,
673 + !!
m0_streq(endpoint, flt->ff_endpoints[0]));
692 if (fop_data ==
NULL)
693 goto data_alloc_fail;
698 goto flts_alloc_fail;
714 const char *endpoint)
725 M0_ENTRY(
"src_rec %p, endpoint %s", src_rec, endpoint);
729 if (filter_num > 0) {
738 while (k < filter_num) {
739 flt = fdmi_matched_filter_list_tlist_head(
744 flt = fdmi_matched_filter_list_tlist_next(
746 fdmi_matched_filter_list_tlink_del_fini(tmp);
748 flt = fdmi_matched_filter_list_tlist_next(
757 for (idx = 0; idx < matched->
fmf_count; idx++) {
780 const char *endpoint;
782 M0_ENTRY(
"sd_ctx %p src_rec %p", sd_ctx, src_rec);
787 while (!fdmi_matched_filter_list_tlist_is_empty(
790 matched_filter = fdmi_matched_filter_list_tlist_head(
872 M0_ENTRY(
"sd_fom=%p src_rec=%p fdmi_filter=%p",
873 sd_fom, src_rec, fdmi_filter);
886 "sd_fom=%p src_rec=%p fdmi_filter=%p",
887 sd_fom, src_rec, fdmi_filter);
940 fdmi_record_inflight_tlist_remove(src_rec);
947 fdmi_record_inflight_tlist_remove(src_rec);
983 src_rec = fdmi_record_list_tlist_pop(
987 if (src_rec ==
NULL) {
1002 if (!fdmi_matched_filter_list_tlist_is_empty(
1007 }
else if (
rc != -ENOENT) {
1014 "FDMI record processing error %d",
rc);
1057 (
int)(ref_cnt - 1));
1066 if (
rc != 0 && (ref_cnt - 1) > 0) {
1069 fdmi_record_inflight_tlist_remove(src_rec);
1110 if (rr_fom ==
NULL) {
1209 #undef M0_TRACE_SUBSYSTEM static int payload_encode(struct m0_fdmi_src_rec *src_rec, struct m0_fop *fop)
M0_INTERNAL void m0_fdmi__src_dock_fom_stop(struct m0_fdmi_src_dock *src_dock)
M0_INTERNAL int m0_reqh_service_state_get(const struct m0_reqh_service *s)
struct m0_fop_type m0_fop_fdmi_rec_release_rep_fopt
M0_EXTERN struct m0_reqh_service_type m0_fdmi_service_type
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
M0_INTERNAL void m0_rpc_conn_pool_destroy(struct m0_rpc_conn_pool *pool, struct m0_rpc_session *session)
m0_time_t ri_resend_interval
M0_TL_DECLARE(fdmi_record_inflight, M0_EXTERN, struct m0_fdmi_src_rec)
#define M0_ALLOC_ARR(arr, nr)
static void fdmi_sd_fom_check(struct fdmi_sd_fom *sd_fom)
struct m0_fdmi_flt_id_arr fr_matched_flts
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
struct m0_fid ff_filter_id
enum m0_rpc_item_priority ri_prio
int(* fco_open)(struct m0_filterc_ctx *ctx, enum m0_fdmi_rec_type_id rec_type_id, struct m0_filterc_iter *iter)
struct m0_tl fsr_filter_list
M0_INTERNAL void m0_rpc_conn_pool_fini(struct m0_rpc_conn_pool *pool)
M0_INTERNAL void m0_fom_block_enter(struct m0_fom *fom)
static size_t fdmi_sd_fom_locality(const struct m0_fom *fom)
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
M0_TL_DESCR_DEFINE(pending_fops, "pending fops list", M0_INTERNAL, struct fdmi_pending_fop, fti_linkage, fti_magic, M0_FDMI_SRC_DOCK_PENDING_FOP_MAGIC, M0_FDMI_SRC_DOCK_PENDING_FOP_HEAD_MAGIC)
M0_INTERNAL int m0_fdmi__handle_release(struct m0_uint128 *fdmi_rec_id)
struct fdmi_sd_fom fsdc_sd_fom
static const struct m0_fom_type_ops fdmi_sd_fom_type_ops
static int fdmi_rr_fom_tick(struct m0_fom *fom)
struct m0_mutex fsdc_list_mutex
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
M0_TL_DESCR_DECLARE(fdmi_record_inflight, M0_EXTERN)
static struct m0_sm_group * grp
static struct m0_fop * alloc_fdmi_rec_fop(int filter_num)
int(* fco_get_next)(struct m0_filterc_iter *iter, struct m0_conf_fdmi_filter **out)
static int apply_filters(struct fdmi_sd_fom *sd_fom, struct m0_fdmi_src_rec *src_rec)
#define M0_LOG(level,...)
const struct m0_fom_ops fdmi_rr_fom_ops
struct m0_sm_ast fstf_wakeup_ast
static void fdmi_sd_fom_fini(struct m0_fom *fom)
static int fdmi_post_fop(struct m0_fop *fop, struct m0_rpc_session *session)
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL int m0_rpc_conn_pool_init(struct m0_rpc_conn_pool *pool, struct m0_rpc_machine *rpc_mach, m0_time_t conn_timeout, uint64_t max_rpcs_in_flight)
int(* ffth_handler)(struct m0_fdmi_eval_ctx *ctx, struct m0_conf_fdmi_filter *filter, struct m0_fdmi_eval_var_info *var_info)
struct fdmi_sd_fom * sd_fom
struct m0_uint128 fsr_rec_id
m0_time_t fsf_last_checkpoint
int(* fto_create)(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
M0_INTERNAL bool m0_fdmi__record_is_valid(struct m0_fdmi_src_rec *src_rec)
int(* fs_node_eval)(struct m0_fdmi_src_rec *src_rec, struct m0_fdmi_flt_var_node *value_desc, struct m0_fdmi_flt_operand *value)
static int fdmi_sd_timer_fom_tick(struct m0_fom *fom)
int const char const void * value
static int fdmi_filter_calc(struct fdmi_sd_fom *sd_fom, struct m0_fdmi_src_rec *src_rec, struct m0_conf_fdmi_filter *fdmi_filter)
void * m0_fop_data(const struct m0_fop *fop)
static void wakeup_iff_waiting(struct m0_sm_group *grp, struct m0_sm_ast *ast)
static void fdmi_rec_notif_replied(struct m0_rpc_item *item)
static int node_eval(void *data, struct m0_fdmi_flt_var_node *value_desc, struct m0_fdmi_flt_operand *value)
static int sd_fom_send_record(struct fdmi_sd_fom *sd_fom, struct m0_fop *fop, const char *ep)
static void fdmi__src_dock_timer_fom_wakeup(struct fdmi_sd_timer_fom *timer_fom)
struct m0_semaphore fsf_shutdown
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
struct m0_filterc_ctx fsf_filter_ctx
m0_time_t m0_time(uint64_t secs, long ns)
const char ** ff_endpoints
static struct m0_rpc_item * item
static const struct m0_fom_type_ops fdmi_sd_timer_fom_type_ops
fdmi_src_dock_timer_fom_phase
struct m0_clink fti_clink
M0_INTERNAL void m0_fdmi__fs_get(struct m0_fdmi_src_rec *src_rec)
const struct m0_filterc_ops * fcc_ops
struct m0_fom_type ft_fom_type
void(* fco_stop)(struct m0_filterc_ctx *ctx)
M0_INTERNAL void m0_fdmi__src_dock_fom_init(void)
struct m0_fdmi_src * fsr_src
M0_INTERNAL void m0_fdmi_eval_init(struct m0_fdmi_eval_ctx *ctx)
M0_INTERNAL const char * m0_rpc_conn_addr(const struct m0_rpc_conn *conn)
static struct m0_sm_conf fdmi_src_dock_timer_fom_sm_conf
static struct m0_sm_ast ast[NR]
int(* fs_encode)(struct m0_fdmi_src_rec *src_rec, struct m0_buf *buf)
M0_INTERNAL int m0_fom_timeout_wait_on(struct m0_fom_timeout *to, struct m0_fom *fom, m0_time_t deadline)
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
struct fdmi_sd_timer_fom fsdc_sd_timer_fom
static struct m0_rpc_machine * m0_fdmi__sd_conn_pool_rpc_machine(void)
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
static const struct m0_fom_ops fdmi_sd_fom_ops
M0_INTERNAL bool m0_fom_is_waiting(const struct m0_fom *fom)
#define M0_RC_INFO(rc, fmt,...)
struct m0_fop_type * f_type
static struct m0_fom_type fdmi_sd_timer_fom_type
struct m0_rpc_machine * cp_rpc_mach
static int filters_nr(struct m0_fdmi_src_rec *src_rec, const char *endpoint)
M0_INTERNAL void m0_ref_put(struct m0_ref *ref)
#define M0_ERR_INFO(rc, fmt,...)
struct m0_uint128 frr_frid
struct m0_rpc_fop_session_establish est
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_ref_get(struct m0_ref *ref)
struct m0_filterc_iter fsf_filter_iter
M0_INTERNAL void m0_fom_ready(struct m0_fom *fom)
#define M0_AMB(obj, ptr, field)
M0_INTERNAL void m0_fdmi__enqueue(struct m0_fdmi_src_rec *src_rec)
void m0_fom_fini(struct m0_fom *fom)
M0_TL_DEFINE(pending_fops, static, struct fdmi_pending_fop)
struct m0_sm_ast fsf_wakeup_ast
M0_INTERNAL void m0_filterc_ctx_init(struct m0_filterc_ctx *ctx, const struct m0_filterc_ops *ops)
static bool pending_fop_clink_cb(struct m0_clink *clink)
m0_time_t m0_time_now(void)
M0_INTERNAL void m0_filterc_ctx_fini(struct m0_filterc_ctx *ctx)
static int fdmi_sd_fom_tick(struct m0_fom *fom)
static struct m0_fop reply_fop
static struct m0_sm_state_descr fdmi_src_dock_state_descr[]
int(* get_value_cb)(void *user_data, struct m0_fdmi_flt_var_node *value_desc, struct m0_fdmi_flt_operand *value)
struct m0_tl rh_rpc_machines
M0_INTERNAL void m0_fom_type_init(struct m0_fom_type *type, uint64_t id, const struct m0_fom_type_ops *ops, const struct m0_reqh_service_type *svc_type, const struct m0_sm_conf *sm)
M0_INTERNAL struct m0_fdmi_src_dock * m0_fdmi_src_dock_get(void)
M0_INTERNAL void m0_fom_block_leave(struct m0_fom *fom)
M0_INTERNAL struct m0_chan * m0_rpc_conn_pool_session_chan(struct m0_rpc_session *session)
static int fdmi_rr_fom_create(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
struct m0_fop * m0_fop_get(struct m0_fop *fop)
struct m0_rpc_item * ri_reply
struct m0_semaphore fstf_shutdown
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
static struct m0_fop_fsync_rep reply_data
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
M0_INTERNAL void m0_fdmi__fs_begin(struct m0_fdmi_src_rec *src_rec)
M0_INTERNAL void m0_fdmi__src_dock_fom_wakeup(struct fdmi_sd_fom *sd_fom)
struct m0_sm_ast * sa_next
const struct m0_sm_conf fdmi_rr_fom_sm_conf
void m0_rpc_reply_post(struct m0_rpc_item *request, struct m0_rpc_item *reply)
static struct m0_clink clink[RDWR_REQUEST_MAX]
M0_INTERNAL bool m0_rpc_conn_pool_session_established(struct m0_rpc_session *session)
M0_INTERNAL int m0_fdmi_eval_flt(struct m0_fdmi_eval_ctx *ctx, struct m0_conf_fdmi_filter *filter, struct m0_fdmi_eval_var_info *var_info)
static struct m0_sm_state_descr fdmi_rr_fom_state_descr[]
static int sd_fom_save_pending_fop(struct fdmi_sd_fom *sd_fom, struct m0_fop *fop, struct m0_rpc_session *session)
static struct m0_pool pool
struct m0_mutex fsf_pending_fops_lock
M0_INTERNAL enum m0_fdmi_rec_type_id m0_fdmi__sd_rec_type_id_get(struct m0_fdmi_src_rec *src_rec)
static struct m0_sm_state_descr fdmi_src_dock_timer_fom_state_descr[]
struct m0_tl fsdc_posted_rec_list
void(* rio_replied)(struct m0_rpc_item *item)
void m0_clink_add_lock(struct m0_chan *chan, struct m0_clink *link)
struct m0_tl fsdc_rec_inflight
static void fdmi_sd_timer_fom_fini(struct m0_fom *fom)
struct m0_fop_type m0_fop_fdmi_rec_not_fopt
static const struct m0_rpc_item_ops fdmi_rec_not_item_ops
M0_INTERNAL int64_t m0_ref_read(const struct m0_ref *ref)
M0_INTERNAL void m0_fom_timeout_fini(struct m0_fom_timeout *to)
struct m0_rpc_conn_pool fsf_conn_pool
struct m0_fdmi_eval_ctx fsf_flt_eval
struct m0_tlink fti_linkage
M0_INTERNAL int m0_rpc_conn_pool_get_async(struct m0_rpc_conn_pool *pool, const char *remote_ep, struct m0_rpc_session **session)
const struct m0_fom_type_ops fdmi_rr_fom_type_ops
struct m0_tl fsf_pending_fops
#define M0_ALLOC_PTR(ptr)
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
const struct m0_rpc_item_ops * ri_ops
m0_time_t m0_time_from_now(uint64_t secs, long ns)
struct m0_fom_timeout fstf_timeout
m0_time_t m0_time_sub(const m0_time_t t1, const m0_time_t t2)
struct m0_rpc_session * ri_session
struct m0_rpc_item * m0_fop_to_rpc_item(const struct m0_fop *fop)
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
void m0_fop_put_lock(struct m0_fop *fop)
static struct m0_fop * fop
static struct m0_sm_conf fdmi_src_dock_fom_sm_conf
struct m0_fid * fmf_flt_id
struct m0_uint128 fr_rec_id
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
static void fdmi_rr_fom_fini(struct m0_fom *fom)
M0_INTERNAL void m0_fom_queue(struct m0_fom *fom)
int(* fco_start)(struct m0_filterc_ctx *ctx, struct m0_reqh *reqh)
void(* fo_fini)(struct m0_fom *fom)
#define m0_tlist_for(descr, head, obj)
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
M0_INTERNAL int m0_fol_fdmi_filter_kv_substring(struct m0_fdmi_eval_ctx *ctx, struct m0_conf_fdmi_filter *filter, struct m0_fdmi_eval_var_info *var_info)
bool fsdc_filters_defined
static struct m0_fop * fop_create(struct m0_fdmi_src_rec *src_rec, const char *endpoint)
void m0_fom_phase_set(struct m0_fom *fom, int phase)
M0_INTERNAL void m0_fom_timeout_init(struct m0_fom_timeout *to)
static const struct m0_fom_ops fdmi_sd_timer_fom_ops
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
struct m0_rpc_item f_item
M0_INTERNAL void m0_fdmi_eval_fini(struct m0_fdmi_eval_ctx *ctx)
enum m0_fdmi_filter_type_id ff_type
M0_INTERNAL void m0_fdmi__fs_put(struct m0_fdmi_src_rec *src_rec)
static struct m0_fdmi_sd_filter_type_handler fdmi_filter_type_handlers[]
enum m0_fdmi_filter_type_id ffth_id
static int sd_fom_process_matched_filters(struct m0_fdmi_src_dock *sd_ctx, struct m0_fdmi_src_rec *src_rec)
struct m0_rpc_session * fti_session
M0_INTERNAL void m0_fdmi__enqueue_locked(struct m0_fdmi_src_rec *src_rec)
struct m0_rpc_conn * s_conn
static struct m0_fom_type fdmi_sd_fom_type
const m0_time_t M0_TIME_IMMEDIATELY
static int process_fdmi_rec(struct fdmi_sd_fom *sd_fom, struct m0_fdmi_src_rec *src_rec)
M0_INTERNAL void m0_fom_timeout_cancel(struct m0_fom_timeout *to)
M0_INTERNAL int m0_fdmi__src_dock_fom_start(struct m0_fdmi_src_dock *src_dock, const struct m0_filterc_ops *filterc_ops, struct m0_reqh *reqh)
M0_INTERNAL void m0_rpc_conn_pool_put(struct m0_rpc_conn_pool *pool, struct m0_rpc_session *session)
struct m0_fop * m0_fop_alloc(struct m0_fop_type *fopt, void *data, struct m0_rpc_machine *mach)
void(* fco_close)(struct m0_filterc_iter *iter)