fom.c
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_FOP
24 #include "lib/trace.h"
25 
26 #include "lib/misc.h"
27 #include "lib/errno.h"
28 #include "lib/assert.h"
29 #include "lib/memory.h"
30 #include "lib/locality.h"
31 #include "lib/processor.h"
32 #include "lib/time.h"
33 #include "lib/timer.h"
34 #include "lib/arith.h"
35 #include "lib/uuid.h" /* m0_node_uuid */
36 #include "lib/semaphore.h" /* m0_semaphore */
37 #include "lib/finject.h" /* M0_FI_ENABLED */
38 #include "addb2/net.h"
39 #include "addb2/addb2.h"
40 #include "addb2/storage.h"
41 #include "addb2/identifier.h"
42 #include "addb2/sys.h"
43 #include "addb2/global.h"
44 #include "motr/magic.h"
45 #include "fop/fop.h"
46 #include "fop/fom_long_lock.h"
47 #include "module/instance.h" /* m0_get */
48 #include "reqh/reqh.h"
49 #include "reqh/reqh_service.h"
50 #include "sm/sm.h"
51 #include "rpc/rpc_machine.h"
52 #include "rpc/rpc_opcodes.h"
53 #include "fdmi/fol_fdmi_src.h"
54 #include "motr/iem.h"
55 
136 enum {
141 };
142 
147  HANDLER = 1,
151 };
152 
171  uint64_t lt_magix;
172 };
173 
176  uint64_t wd_phases;
177  struct m0_fom *wd_fom;
179  int wd_rc;
180 };
181 
182 M0_TL_DESCR_DEFINE(thr, "fom thread", static, struct m0_loc_thread, lt_linkage,
184 M0_TL_DEFINE(thr, static, struct m0_loc_thread);
185 
186 M0_TL_DESCR_DEFINE(runq, "runq fom", static, struct m0_fom, fo_linkage,
188 M0_TL_DEFINE(runq, static, struct m0_fom);
189 
190 M0_TL_DESCR_DEFINE(wail, "wail fom", static, struct m0_fom, fo_linkage,
192 M0_TL_DEFINE(wail, static, struct m0_fom);
193 
194 static bool fom_wait_time_is_out(const struct m0_fom_domain *dom,
195  const struct m0_fom *fom);
196 static int loc_thr_create(struct m0_fom_locality *loc);
197 
198 static void hung_foms_notify(struct m0_locality_chore *chore,
199  struct m0_locality *loc, void *place);
200 
202 M0_INTERNAL struct m0_sm_conf fom_states_conf;
203 
210 };
211 
217 };
218 
219 static void group_lock(struct m0_fom_locality *loc)
220 {
221  m0_sm_group_lock(&loc->fl_group);
222 }
223 
224 static void group_unlock(struct m0_fom_locality *loc)
225 {
226  m0_sm_group_unlock(&loc->fl_group);
227 }
228 
229 M0_INTERNAL bool m0_fom_group_is_locked(const struct m0_fom *fom)
230 {
231  return m0_mutex_is_locked(&fom->fo_loc->fl_group.s_lock);
232 }
233 
234 static bool is_in_runq(const struct m0_fom *fom)
235 {
236  return runq_tlist_contains(&fom->fo_loc->fl_runq, fom);
237 }
238 
239 static bool is_in_wail(const struct m0_fom *fom)
240 {
241  return wail_tlist_contains(&fom->fo_loc->fl_wail, fom);
242 }
243 
244 static bool thread_invariant(const struct m0_loc_thread *t)
245 {
246  struct m0_fom_locality *loc = t->lt_loc;
247 
248  return
249  _0C(M0_IN(t->lt_state, (HANDLER, BLOCKED, UNBLOCKING, IDLE))) &&
250  _0C((loc->fl_handler == t) == (t->lt_state == HANDLER)) &&
251  _0C(ergo(t->lt_state == UNBLOCKING,
252  m0_atomic64_get(&loc->fl_unblocking) > 0));
253 }
254 
255 M0_INTERNAL bool m0_fom_domain_invariant(const struct m0_fom_domain *dom)
256 {
257  size_t cpu_max = m0_processor_nr_max();
258  return
259  _0C(dom != NULL && dom->fd_localities != NULL) &&
260  _0C(dom->fd_localities_nr <= cpu_max) &&
261  _0C(m0_forall(i, dom->fd_localities_nr,
262  dom->fd_localities[i] != NULL)) &&
263  _0C(dom->fd_ops != NULL);
264 }
265 
266 M0_INTERNAL bool m0_locality_invariant(const struct m0_fom_locality *loc)
267 {
268  return
269  _0C(loc != NULL && loc->fl_dom != NULL) &&
271  _0C(M0_CHECK_EX(m0_tlist_invariant(&runq_tl, &loc->fl_runq))) &&
272  _0C(M0_CHECK_EX(m0_tlist_invariant(&wail_tl, &loc->fl_wail))) &&
273  _0C(m0_tl_forall(thr, t, &loc->fl_threads,
274  t->lt_loc == loc && thread_invariant(t))) &&
275  _0C(ergo(loc->fl_handler != NULL,
276  thr_tlist_contains(&loc->fl_threads, loc->fl_handler))) &&
277  _0C(M0_CHECK_EX(m0_tl_forall(runq, fom, &loc->fl_runq,
278  fom->fo_loc == loc))) &&
279  _0C(M0_CHECK_EX(m0_tl_forall(wail, fom, &loc->fl_wail,
280  fom->fo_loc == loc)));
281 }
282 
283 M0_INTERNAL struct m0_reqh *m0_fom_reqh(const struct m0_fom *fom)
284 {
285  return fom->fo_service->rs_reqh;
286 }
287 
288 static inline enum m0_fom_state fom_state(const struct m0_fom *fom)
289 {
290  return fom->fo_sm_state.sm_state;
291 }
292 
293 static inline void fom_state_set(struct m0_fom *fom, enum m0_fom_state state)
294 {
295  m0_sm_state_set(&fom->fo_sm_state, state);
296 }
297 
298 static bool fom_is_blocked(const struct m0_fom *fom)
299 {
300  return
302  M0_IN(fom->fo_thread->lt_state, (BLOCKED, UNBLOCKING));
303 }
304 
305 /* Returns fom from state machine m0_fom::fo_sm_state */
306 M0_UNUSED static inline struct m0_fom *sm2fom(struct m0_sm *sm)
307 {
308  return container_of(sm, struct m0_fom, fo_sm_state);
309 }
310 
311 M0_INTERNAL bool m0_fom_invariant(const struct m0_fom *fom)
312 {
313  return
314  _0C(fom != NULL) && _0C(fom->fo_loc != NULL) &&
315  _0C(fom->fo_type != NULL) && _0C(fom->fo_ops != NULL) &&
316 
318 
319  /* fom magic is the same in runq and wail tlists,
320  * so we can use either one here.
321  * @todo replace this with bob_check() */
322  M0_CHECK_EX(m0_tlink_invariant(&runq_tl, fom)) &&
323 
328  is_in_wail(fom))) &&
329  _0C(ergo(fom->fo_thread != NULL,
330  fom_state(fom) == M0_FOS_RUNNING)) &&
331  _0C(ergo(fom->fo_pending != NULL,
333  _0C(ergo(fom->fo_cb.fc_state != M0_FCS_DONE,
334  fom_state(fom) == M0_FOS_WAITING)) &&
335  _0C(fom->fo_service->rs_type == fom->fo_type->ft_rstype);
336 }
337 
338 /*
339  * TODO: replace with corresponding HA handler when it's integrated
340  */
341 static bool hung_fom_notify(const struct m0_fom *fom)
342 {
343  m0_time_t diff;
344  uint32_t fop_opcode = 0;
345  struct m0_rpc_item *item = NULL;
346  struct m0_rpc_conn *conn = NULL;
347  struct m0_rpc_machine *rpc_mc = NULL;
348 
349  if (M0_IN(fom->fo_type->ft_id, (M0_BE_TX_GROUP_OPCODE,
353  return true;
354 
355  diff = m0_time_sub(m0_time_now(), fom->fo_sm_state.sm_state_epoch);
358  M0_LOG(M0_WARN, "FOP HUNG[" TIME_F " seconds in processing]: "
359  "fom=%p, fop %p[%u] phase: %s", TIME_P(diff), fom,
360  &fom->fo_fop,
361  fom->fo_fop == NULL ? 0 : m0_fop_opcode(fom->fo_fop),
363 
364  if (false && /* XXX disabled temporarily */
366  if (fom->fo_fop != NULL) {
367  fop_opcode = m0_fop_opcode(fom->fo_fop);
368  item = &fom->fo_fop->f_item;
369  if (item->ri_session != NULL) {
371  if (conn != NULL) {
372  rpc_mc = conn->c_rpc_machine;
373  }
374  }
375  }
376 
380  "HUNG_FOM_NOTIFY diff_in_sec="TIME_F" "
381  "fom=%p fop=%p fop_opcode=%u fom_phase=\"%s\" "
382  "rpc_machine_ep=%s "
383  "remote_machine_ep=%s",
384  TIME_P(diff), fom, &fom->fo_fop, fop_opcode,
386  (rpc_mc == NULL) ? "NOT_AVAILABLE" :
387  m0_rpc_machine_ep(rpc_mc),
388  (item == NULL) ? "NOT_AVAILABLE" :
390  }
391 
392  if (fom->fo_ops->fo_hung_notify != NULL)
393  fom->fo_ops->fo_hung_notify(fom);
394  }
395 
396  return true; /* by convention of m0_tl_forall */
397 }
398 
399 static bool fom_wait_time_is_out(const struct m0_fom_domain *dom,
400  const struct m0_fom *fom)
401 {
402  return hung_fom_notify(fom);
403 }
404 
413 static void fom_ready(struct m0_fom *fom)
414 {
415  struct m0_fom_locality *loc;
416  bool empty;
417 
419  loc = fom->fo_loc;
420  empty = runq_tlist_is_empty(&loc->fl_runq);
421  runq_tlist_add_tail(&loc->fl_runq, fom);
422  M0_CNT_INC(loc->fl_runq_nr);
424  if (empty)
425  m0_chan_signal(&loc->fl_runrun);
427 }
428 
429 M0_INTERNAL void m0_fom_ready(struct m0_fom *fom)
430 {
431  struct m0_fom_locality *loc = fom->fo_loc;
432 
434 
435  wail_tlist_del(fom);
436  M0_CNT_DEC(loc->fl_wail_nr);
438  fom_ready(fom);
439 }
440 
441 static void readyit(struct m0_sm_group *grp, struct m0_sm_ast *ast)
442 {
443  struct m0_fom *fom = container_of(ast, struct m0_fom, fo_cb.fc_ast);
444 
445  m0_fom_ready(fom);
446 }
447 
448 static void fom_addb2_push(struct m0_fom *fom)
449 {
451  fom->fo_transitions, fom->fo_sm_phase.sm_state);
452 }
453 
454 static void addb2_introduce(struct m0_fom *fom)
455 {
456  struct m0_rpc_item *req;
457  static struct m0_sm_addb2_stats phase_stats = {
458  .as_id = M0_AVI_PHASE,
459  .as_nr = 0
460  };
461  uint64_t sender_id = 0;
462  uint64_t item_sm_id = 0;
463  uint64_t phase_sm_id = 0;
464  uint64_t state_sm_id = 0;
465 
466  if (!m0_sm_addb2_counter_init(&fom->fo_sm_phase))
467  fom->fo_sm_phase.sm_addb2_stats = &phase_stats;
468  if (!m0_sm_addb2_counter_init(&fom->fo_sm_state))
469  fom->fo_sm_state.sm_addb2_stats =
471 
472  req = fom->fo_fop != NULL ? &fom->fo_fop->f_item : NULL;
473 
475 
476  if (req != NULL && req->ri_session != NULL) {
477  sender_id = req->ri_session->s_conn->c_sender_id;
478  item_sm_id = m0_sm_id_get(&req->ri_sm);
479  phase_sm_id = m0_sm_id_get(&fom->fo_sm_phase);
480  state_sm_id = m0_sm_id_get(&fom->fo_sm_state);
481  }
482 
484  FID_P(&fom->fo_service->rs_service_fid),
485  /*
486  * Session can be NULL for connection and session
487  * establishing fops.
488  */
489  sender_id,
490  req != NULL ? req->ri_type->rit_opcode : 0,
491  fom->fo_rep_fop != NULL ?
492  fom->fo_rep_fop->f_item.ri_type->rit_opcode : 0,
493  fom->fo_local,
494  item_sm_id,
495  phase_sm_id,
496  state_sm_id);
497  if (fom->fo_ops->fo_addb2_descr != NULL)
498  fom->fo_ops->fo_addb2_descr(fom);
500 }
501 
502 static void queueit(struct m0_sm_group *grp, struct m0_sm_ast *ast)
503 {
504  struct m0_fom *fom = container_of(ast, struct m0_fom, fo_cb.fc_ast);
505 
508 
511  fom_ready(fom);
512 }
513 
514 static void thr_addb2_enter(struct m0_loc_thread *thr,
515  struct m0_fom_locality *loc)
516 {
518  M0_ASSERT(m0_thread_tls()->tls_addb2_mach == NULL);
521 }
522 
523 static void thr_addb2_leave(struct m0_loc_thread *thr,
524  struct m0_fom_locality *loc)
525 {
526  M0_PRE(m0_thread_tls()->tls_addb2_mach == loc->fl_addb2_mach);
530 }
531 
532 M0_INTERNAL void m0_fom_wakeup(struct m0_fom *fom)
533 {
534  fom->fo_cb.fc_ast.sa_cb = readyit;
535  m0_sm_ast_post(&fom->fo_loc->fl_group, &fom->fo_cb.fc_ast);
536 }
537 
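/*
 * Usage sketch (hypothetical, not part of this file): waking up a waiting fom
 * from a completion callback.  m0_fom_wakeup() only posts an AST to the fom's
 * locality, so the caller does not need the group lock; the fom is expected to
 * be in the WAITING state by the time the AST (readyit()) runs.  "my_io" and
 * "io_fom" are placeholders.
 *
 * @code
 * static void my_io_complete(struct my_io *io)
 * {
 *         m0_fom_wakeup(io->io_fom);  // "io_fom" is a back-pointer to the fom
 * }
 * @endcode
 */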
538 M0_INTERNAL void m0_fom_block_enter(struct m0_fom *fom)
539 {
540  struct m0_fom_locality *loc;
541  struct m0_loc_thread *thr;
542 
546 
547  loc = fom->fo_loc;
548  thr = fom->fo_thread;
549 
550  M0_PRE(thr->lt_state == HANDLER);
551  M0_PRE(thr == loc->fl_handler);
552 
553  /*
554  * If there are unblocking threads trying to complete an
555  * m0_fom_block_leave() call, do nothing and release the group lock. One
556  * of these threads will grab it and become the handler.
557  *
558  * Otherwise, wake up one idle thread, creating it if necessary.
559  *
560  * Note that loc->fl_unblocking can change under us, but:
561  *
562  * - it cannot become 0 if it wasn't, because it is decremented
563  * under the group lock and,
564  *
565  * - if it were increased after we check it, nothing bad would
566  * happen: an extra idle thread wakeup is harmless.
567  */
568  if (m0_atomic64_get(&loc->fl_unblocking) == 0) {
569  if (!m0_chan_has_waiters(&loc->fl_idle))
570  loc_thr_create(loc);
571  m0_chan_signal(&loc->fl_idle);
572  }
573 
574  thr->lt_state = BLOCKED;
575  loc->fl_handler = NULL;
578  thr_addb2_leave(thr, loc);
579  group_unlock(loc);
580 }
581 
582 M0_INTERNAL void m0_fom_block_leave(struct m0_fom *fom)
583 {
584  struct m0_fom_locality *loc;
585  struct m0_loc_thread *thr;
586 
587  loc = fom->fo_loc;
588  thr = fom->fo_thread;
589 
590  M0_PRE(thr->lt_state == BLOCKED);
591  /*
592  * Signal the handler that there is a thread that wants to unblock, just
593  * in case the handler is sleeping on an empty runqueue.
594  *
595  * It is enough to do this only when loc->fl_unblocking increments from
596  * 0 to 1, because the handler won't go to sleep until
597  * loc->fl_unblocking drops to 0.
598  */
599  if (m0_atomic64_add_return(&loc->fl_unblocking, 1) == 1)
601  /*
602  * lt_state must be changed after fl_unblocking increment
603  * to avoid panics at thread_invariant(). Also, we rely
604  * on the following fact from the Linux kernel here:
605  *
606  * - RMW operations that have a return value are fully ordered;
607  *
608  * (See https://www.kernel.org/doc/Documentation/atomic_t.txt)
609  */
610  thr->lt_state = UNBLOCKING;
611 
612  group_lock(loc);
613  thr_addb2_enter(thr, loc);
617  M0_ASSERT(loc->fl_handler == NULL);
618  loc->fl_handler = thr;
619  thr->lt_state = HANDLER;
622 }
623 
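/*
 * Usage sketch (hypothetical, not part of this file): a fom tick that has to
 * perform a long blocking operation brackets it with m0_fom_block_enter() and
 * m0_fom_block_leave(), so the locality can promote another thread to handler
 * while this one blocks.  "my_blocking_io" and MY_PHASE_NEXT are placeholders.
 *
 * @code
 * static int my_fom_tick(struct m0_fom *fom)
 * {
 *         m0_fom_block_enter(fom);
 *         my_blocking_io(fom);          // runs outside the group lock
 *         m0_fom_block_leave(fom);      // reacquires the group lock
 *         m0_fom_phase_set(fom, MY_PHASE_NEXT);
 *         return M0_FSO_AGAIN;
 * }
 * @endcode
 */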
624 M0_INTERNAL void m0_fom_queue(struct m0_fom *fom)
625 {
626  struct m0_fom_domain *dom;
627  size_t loc_idx;
628 
629  M0_PRE(fom != NULL);
630 
631  dom = m0_fom_dom();
632  loc_idx = fom->fo_ops->fo_home_locality(fom) % dom->fd_localities_nr;
633  M0_ASSERT(loc_idx < dom->fd_localities_nr);
634  fom->fo_loc = dom->fd_localities[loc_idx];
635  fom->fo_loc_idx = loc_idx;
637  fom->fo_cb.fc_ast.sa_cb = &queueit;
638  m0_sm_ast_post(&fom->fo_loc->fl_group, &fom->fo_cb.fc_ast);
639 }
640 
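/*
 * Usage sketch (hypothetical, not part of this file): creating a fom for an
 * incoming fop and queueing it.  In a real service the request handler
 * normally performs the queueing after invoking the fom type's create method;
 * "my_fom_ops" and "my_service_start_fom" are placeholders.
 *
 * @code
 * static int my_service_start_fom(struct m0_reqh *reqh, struct m0_fop *fop)
 * {
 *         struct m0_fom *fom;
 *
 *         M0_ALLOC_PTR(fom);
 *         if (fom == NULL)
 *                 return M0_ERR(-ENOMEM);
 *         m0_fom_init(fom, &fop->f_type->ft_fom_type, &my_fom_ops,
 *                     fop, NULL, reqh);
 *         m0_fom_queue(fom);
 *         return 0;
 * }
 * @endcode
 */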
654 static void fom_wait(struct m0_fom *fom)
655 {
656  struct m0_fom_locality *loc;
657 
659  loc = fom->fo_loc;
660  wail_tlist_add_tail(&loc->fl_wail, fom);
661  M0_CNT_INC(loc->fl_wail_nr);
664 }
665 
666 static bool fom_wait_is_completed(const struct fom_wait_data *wd)
667 {
668  return (M0_BITS(m0_fom_phase(wd->wd_fom)) &
670 }
671 
672 static int fom_wait_rc(const struct fom_wait_data *wd)
673 {
675  M0_LOG(M0_DEBUG, "conf=%s phase = %s",
678  m0_fom_phase(wd->wd_fom)));
679 
680  return m0_fom_phase(wd->wd_fom) == M0_FOM_PHASE_FINISH ?
681  M0_ERR(-ESRCH) : 0;
682 }
683 
684 static bool fom_wait_cb(struct m0_clink *clink)
685 {
686  struct fom_wait_data *wd = M0_AMB(wd, clink, wd_clink);
687 
688  if (fom_wait_is_completed(wd)) {
689  wd->wd_rc = fom_wait_rc(wd);
690  /*
691  * Detach the clink from the FOM phase SM channel so that no
692  * assertion fires on FOM finalisation.
693  */
694  m0_clink_del(&wd->wd_clink);
695  /* That will signal on semaphore in m0_fom_timedwait(). */
696  return false;
697  }
698  return true;
699 }
700 
701 static int fom_wait_init(void *data)
702 {
703  struct fom_wait_data *wd = data;
704 
705  /*
706  * The FOM may already be in the finish state; it is worth checking here
707  * to avoid waiting until the deadline.
708  */
710  if (!wd->wd_completed)
712  return wd->wd_completed ? fom_wait_rc(wd) : 0;
713 }
714 
715 static int fom_wait_fini(void *data)
716 {
717  struct fom_wait_data *wd = data;
718 
719  if (m0_clink_is_armed(&wd->wd_clink))
720  m0_clink_del(&wd->wd_clink);
721  return 0;
722 }
723 
724 M0_INTERNAL int m0_fom_timedwait(struct m0_fom *fom, uint64_t phases,
725  m0_time_t deadline)
726 {
727  struct fom_wait_data wd;
728  struct m0_locality *loc = &fom->fo_loc->fl_locality;
729  int result;
730 
731  wd.wd_phases = phases;
732  wd.wd_fom = fom;
734  result = m0_locality_call(loc, &fom_wait_init, &wd);
735  if (!wd.wd_completed) {
736  result = m0_chan_timedwait(&wd.wd_clink, deadline) ?
737  wd.wd_rc : M0_ERR(-ETIMEDOUT);
738  m0_locality_call(loc, &fom_wait_fini, &wd);
739  }
740  m0_clink_fini(&wd.wd_clink);
741  return result;
742 }
743 
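/*
 * Usage sketch (hypothetical): waiting, from outside the fom's locality, for
 * the fom to reach one of a set of phases, with a deadline.  MY_PHASE_DONE is
 * a placeholder and m0_time_from_now() is assumed to be available from
 * lib/time.h.
 *
 * @code
 * rc = m0_fom_timedwait(fom, M0_BITS(MY_PHASE_DONE, M0_FOM_PHASE_FINISH),
 *                       m0_time_from_now(10, 0));
 * if (rc == -ETIMEDOUT)
 *         M0_LOG(M0_WARN, "fom %p is still running", fom);
 * @endcode
 */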
747 static void cb_done(struct m0_fom_callback *cb)
748 {
749  struct m0_clink *clink = &cb->fc_clink;
750 
751  M0_PRE(cb->fc_state == M0_FCS_ARMED);
752 
754  cb->fc_state = M0_FCS_DONE;
755 
757 }
758 
762 static void cb_run(struct m0_fom_callback *cb)
763 {
765 
766  cb_done(cb);
767  cb->fc_bottom(cb);
768 }
769 
770 static void *cb_next(struct m0_fom_callback *cb)
771 {
772  return cb->fc_ast.sa_next;
773 }
774 
780 static void fom_exec(struct m0_fom *fom)
781 {
782  int rc;
783  struct m0_fom_locality *loc;
784 
785  loc = fom->fo_loc;
786  fom->fo_thread = loc->fl_handler;
788  do {
791  rc = fom->fo_ops->fo_tick(fom);
792  if (FOM_PHASE_DEBUG) {
793  fom->fo_log[fom->fo_transitions %
794  ARRAY_SIZE(fom->fo_log)] =
795  m0_fom_phase(fom);
796  }
797  /*
798  * (rc == M0_FSO_AGAIN) means that the next phase transition is
799  * possible. The current policy is to execute the transition
800  * immediately. An alternative is to put the fom back on the runqueue
801  * and select "the best" fom from the runqueue.
802  */
803  fom->fo_transitions++;
804  } while (rc == M0_FSO_AGAIN);
805 
806  fom->fo_thread = NULL;
807 
810 
812  /*
813  * Finish fom itself.
814  */
815  fom->fo_ops->fo_fini(fom);
816  /*
817  * Don't touch the fom after this point.
818  */
819  } else {
820  struct m0_fom_callback *cb;
821 
822  fom_wait(fom);
823  /*
824  * If there are pending call-backs, execute them, until one of
825  * them wakes the fom up. Don't bother to optimize moving
826  * between queues: this is a rare case.
827  *
828  * Note: call-backs are executed in LIFO order.
829  */
831  while ((cb = fom->fo_pending) != NULL) {
832  fom->fo_pending = cb_next(cb);
833  cb_run(cb);
834  /*
835  * call-back is not allowed to destroy a fom.
836  */
838  if (fom_state(fom) != M0_FOS_WAITING)
839  break;
840  }
843  }
844 }
845 
851 static struct m0_fom *fom_dequeue(struct m0_fom_locality *loc)
852 {
853  struct m0_fom *fom;
854 
855  fom = runq_tlist_pop(&loc->fl_runq);
856  if (fom != NULL) {
857  M0_ASSERT(fom->fo_loc == loc);
858  M0_CNT_DEC(loc->fl_runq_nr);
860  }
861  return fom;
862 }
863 
867 static void loc_handler_thread(struct m0_loc_thread *th)
868 {
869  struct m0_clink *clink = &th->lt_clink;
870  struct m0_fom_locality *loc = th->lt_loc;
871 
872  while (1) {
873  /*
874  * start idle, wait for work to do. The clink was registered
875  * with &loc->fl_idle by loc_thr_create().
876  */
877  M0_ASSERT(th->lt_state == IDLE);
879 
880  /* become the handler thread */
881  group_lock(loc);
882  M0_ASSERT(loc->fl_handler == NULL);
883  loc->fl_handler = th;
884  th->lt_state = HANDLER;
885  thr_addb2_enter(th, loc);
886 
887  /*
888  * re-initialise the clink and arrange for it to receive group
889  * AST notifications and runrun wakeups.
890  */
895  m0_clink_add(&loc->fl_runrun, clink);
896 
897  /*
898  * main handler loop.
899  *
900  * This loop terminates when the locality is finalised
901  * (loc->fl_shutdown) or this thread should go back to the idle
902  * state.
903  */
904  while (1) {
905  struct m0_fom *fom;
907 
908  m0_addb2_force(M0_MKTIME(5, 0));
909  /*
910  * All foms that have been queued on the runqueue will be
911  * executed, clearing their accumulation in the semaphore.
912  */
914  /*
915  * Check for a blocked thread that tries to unblock and
916  * complete a phase transition.
917  */
918  if (m0_atomic64_get(&loc->fl_unblocking) > 0)
919  /*
920  * Idle ourselves. The unblocking thread (first
921  * to grab the group lock in case there are
922  * many), becomes the new handler.
923  */
924  break;
928  fom = fom_dequeue(loc);
929  if (fom != NULL) {
931  fom_exec(fom);
933  } else if (loc->fl_shutdown)
934  break;
935  else
936  /*
937  * Yes, sleep with the lock held. Knock on
938  * &loc->fl_runrun or &loc->fl_group.s_clink to
939  * wake.
940  */
942  }
943  loc->fl_handler = NULL;
944  th->lt_state = IDLE;
947  m0_clink_init(&th->lt_clink, NULL);
948  m0_clink_add(&loc->fl_idle, &th->lt_clink);
949  thr_addb2_leave(th, loc);
950  group_unlock(loc);
951  if (loc->fl_shutdown)
952  break;
953  }
954 }
955 
960 static int loc_thr_init(struct m0_loc_thread *th)
961 {
962  return m0_thread_confine(&th->lt_thread, &th->lt_loc->fl_processors);
963 }
964 
965 static void loc_thr_fini(struct m0_loc_thread *th)
966 {
968  M0_PRE(th->lt_state == IDLE);
969  m0_clink_del(&th->lt_clink);
970  m0_clink_fini(&th->lt_clink);
972  thr_tlink_del_fini(th);
973  m0_free(th);
974 }
975 
976 static int loc_thr_create(struct m0_fom_locality *loc)
977 {
978  struct m0_loc_thread *thr;
979  int res;
980 
982 
983  M0_ENTRY("%p", loc);
984 
985  M0_ALLOC_PTR(thr);
986  if (thr == NULL)
987  return M0_ERR(-ENOMEM);
988  thr->lt_state = IDLE;
990  thr->lt_loc = loc;
991  thr_tlink_init_at_tail(thr, &loc->fl_threads);
992 
993  m0_clink_init(&thr->lt_clink, NULL);
994  m0_clink_add(&loc->fl_idle, &thr->lt_clink);
995 
996  res = M0_THREAD_INIT(&thr->lt_thread, struct m0_loc_thread *,
998  "m0_loc_thread");
999  if (res != 0)
1000  loc_thr_fini(thr);
1001  return M0_RC(res);
1002 }
1003 
1004 static void loc_addb2_fini(struct m0_fom_locality *loc)
1005 {
1006  struct m0_addb2_mach *orig = m0_thread_tls()->tls_addb2_mach;
1007 
1012  m0_thread_tls()->tls_addb2_mach = orig;
1014 }
1015 
1019 static void loc_fini(struct m0_fom_locality *loc)
1020 {
1021  struct m0_loc_thread *th;
1022 
1023  loc->fl_shutdown = true;
1025 
1026  group_lock(loc);
1028  m0_chan_broadcast(&loc->fl_idle);
1029  while ((th = thr_tlist_head(&loc->fl_threads)) != NULL) {
1030  group_unlock(loc);
1031  m0_thread_join(&th->lt_thread);
1032  group_lock(loc);
1033  loc_thr_fini(th);
1034  }
1035  group_unlock(loc);
1036 
1037  runq_tlist_fini(&loc->fl_runq);
1038  M0_ASSERT(loc->fl_runq_nr == 0);
1039  wail_tlist_fini(&loc->fl_wail);
1040  M0_ASSERT(loc->fl_wail_nr == 0);
1041  thr_tlist_fini(&loc->fl_threads);
1043  m0_chan_fini_lock(&loc->fl_idle);
1045  m0_sm_group_fini(&loc->fl_group);
1047  loc_addb2_fini(loc);
1049 }
1050 
1065 static int loc_init(struct m0_fom_locality *loc, struct m0_fom_domain *dom,
1066  size_t idx)
1067 {
1068  int res;
1069  struct m0_addb2_mach *orig = m0_thread_tls()->tls_addb2_mach;
1070 
1071  M0_PRE(loc != NULL);
1072 
1073  M0_ENTRY();
1074 
1075  loc->fl_dom = dom;
1076  loc->fl_addb2_mach = m0_addb2_sys_get(dom->fd_addb2_sys);
1077  if (loc->fl_addb2_mach == NULL) {
1078  res = M0_ERR(-ENOMEM);
1079  goto err;
1080  }
1081 
1082  runq_tlist_init(&loc->fl_runq);
1083  loc->fl_runq_nr = 0;
1084  wail_tlist_init(&loc->fl_wail);
1085  loc->fl_wail_nr = 0;
1086  loc->fl_idx = idx;
1093  m0_addb2_hist_add(&loc->fl_runq_counter, 1, 30, M0_AVI_RUNQ, -1);
1094  m0_addb2_hist_add(&loc->fl_wail_counter, 1, 30, M0_AVI_WAIL, -1);
1096  M0_AVI_LOCALITY_FORQ, -1);
1104  m0_thread_tls()->tls_addb2_mach = orig;
1105 
1107  &loc->fl_group, loc->fl_dom, loc->fl_idx);
1108  m0_sm_group_init(&loc->fl_group);
1109  loc->fl_group.s_addb2 = &loc->fl_grp_addb2;
1110  m0_chan_init(&loc->fl_runrun, &loc->fl_group.s_lock);
1111  loc->fl_runrun.ch_addb2 = &loc->fl_chan_addb2;
1112  thr_tlist_init(&loc->fl_threads);
1113  m0_atomic64_set(&loc->fl_unblocking, 0);
1114  m0_chan_init(&loc->fl_idle, &loc->fl_group.s_lock);
1115 
1116  res = m0_bitmap_init(&loc->fl_processors, dom->fd_localities_nr);
1117  if (res == 0) {
1118  int i;
1119 
1120  m0_bitmap_set(&loc->fl_processors, idx, true);
1121  /* create a pool of idle threads plus the handler thread. */
1122  group_lock(loc);
1123  for (i = 0; i < LOC_IDLE_NR + 1; ++i) {
1124  res = loc_thr_create(loc);
1125  if (res != 0)
1126  break;
1127  }
1128  group_unlock(loc);
1129  /*
1130  * All threads created above are blocked at
1131  * loc_handler_thread()::m0_chan_wait(clink). One thread
1132  * per-locality is woken at the end of m0_fom_domain_init().
1133  */
1134  }
1135  if (res != 0)
1136  loc_fini(loc);
1137  return M0_RC(res);
1138 err:
1139  return M0_ERR(res);
1140 }
1141 
1142 /*
1143  * Compose HW core mask with preset mask from instance
1144  */
1145 static void core_mask_apply(struct m0_bitmap *onln_cpu_map)
1146 {
1147  struct m0_bitmap *cores;
1148  int i;
1149 
1150  cores = &m0_get()->i_proc_attr.pca_core_mask;
1151 
1152  if (m0_bitmap_set_nr(cores) == 0)
1153  return;
1154 
1155  for (i = 0; i < cores->b_nr && i < onln_cpu_map->b_nr; ++i)
1156  if (!m0_bitmap_get(cores, i))
1157  m0_bitmap_set(onln_cpu_map, i, false);
1158 }
1159 
1160 static void hung_foms_notify(struct m0_locality_chore *chore,
1161  struct m0_locality *loc, void *place)
1162 {
1163  struct m0_fom_locality *floc = container_of(loc, struct m0_fom_locality,
1164  fl_locality);
1165  const struct m0_fom_domain *dom = floc->fl_dom;
1166 
1167  (void)m0_tl_forall(runq, fom, &floc->fl_runq,
1168  dom->fd_ops->fdo_time_is_out(dom, fom));
1169  (void)m0_tl_forall(wail, fom, &floc->fl_wail,
1170  dom->fd_ops->fdo_time_is_out(dom, fom));
1171 }
1172 
1173 M0_INTERNAL int m0_fom_domain_init(struct m0_fom_domain **out)
1174 {
1175  struct m0_fom_domain *dom;
1176  struct m0_fom_locality *loc;
1177  int result;
1178  size_t cpu_max;
1179  size_t cpu_nr;
1180  size_t i;
1181  struct m0_bitmap cpu_map;
1182 
1183  M0_ENTRY();
1184 
1185  cpu_max = m0_processor_nr_max();
1186  result = m0_bitmap_init(&cpu_map, cpu_max);
1187  if (result != 0)
1188  return M0_ERR(result);
1189 
1190  m0_processors_online(&cpu_map);
1191  core_mask_apply(&cpu_map);
1192  cpu_nr = m0_bitmap_set_nr(&cpu_map);
1193 
1194  M0_ALLOC_PTR(dom);
1195  if (dom == NULL) {
1196  m0_bitmap_fini(&cpu_map);
1197  return M0_ERR(-ENOMEM);
1198  }
1199  dom->fd_ops = &m0_fom_dom_ops;
1200 
1201  result = m0_addb2_sys_init(&dom->fd_addb2_sys,
1202  &(struct m0_addb2_config) {
1203  .co_queue_max = (cpu_nr + 1 ) / 2 *
1204  1024 * 1024,
1205  .co_pool_min = cpu_nr,
1206  .co_pool_max = cpu_nr
1207  });
1208  if (result == 0) {
1209  M0_ALLOC_ARR(dom->fd_localities, cpu_nr);
1210  if (dom->fd_localities != NULL) {
1211  dom->fd_localities_nr = cpu_nr;
1212  for (i = 0; i < cpu_nr; ++i) {
1213  /* Do not support holes in cpu mask. */
1214  M0_ASSERT(m0_bitmap_get(&cpu_map, i));
1215  M0_ALLOC_PTR(loc);
1216  if (loc != NULL) {
1217  result = loc_init(loc, dom, i);
1218  if (result == 0)
1219  dom->fd_localities[i] = loc;
1220  else
1221  m0_free(loc);
1222  } else
1223  result = M0_ERR(-ENOMEM);
1224  if (result != 0)
1225  break;
1226  }
1227  if (result == 0) {
1229  /* Wake up handler threads. */
1230  for (i = 0; i < cpu_nr; ++i) {
1231  loc = dom->fd_localities[i];
1232  group_lock(loc);
1233  m0_chan_signal(&loc->fl_idle);
1234  group_unlock(loc);
1235  }
1236 
1237  m0_locality_chore_init(&dom->fd_hung_foms_chore,
1239  NULL,
1241  0);
1242  }
1243  } else
1244  result = M0_ERR(-ENOMEM);
1245  }
1246  m0_bitmap_fini(&cpu_map);
1247  if (result == 0)
1248  *out = dom;
1249  else {
1250  *out = NULL;
1252  }
1253  return result;
1254 }
1255 
1256 M0_INTERNAL void m0_fom_domain_fini(struct m0_fom_domain *dom)
1257 {
1258  int i;
1259 
1260  m0_locality_chore_fini(&dom->fd_hung_foms_chore);
1261  if (dom->fd_localities != NULL) {
1262  for (i = dom->fd_localities_nr - 1; i >= 0; --i) {
1263  if (dom->fd_localities[i] != NULL)
1264  loc_fini(dom->fd_localities[i]);
1265  }
1267  for (i = 0; i < dom->fd_localities_nr; i++)
1268  m0_free(dom->fd_localities[i]);
1269  m0_free(dom->fd_localities);
1270  }
1271  if (dom->fd_addb2_sys != NULL)
1272  m0_addb2_sys_fini(dom->fd_addb2_sys);
1273  m0_free(dom);
1274 }
1275 
1276 static bool is_loc_locker_empty(struct m0_fom_locality *loc, uint32_t key)
1277 {
1278  return m0_locality_lockers_is_empty(&loc->fl_locality, key);
1279 }
1280 
1281 M0_INTERNAL bool m0_fom_domain_is_idle_for(const struct m0_reqh_service *svc)
1282 {
1283  struct m0_fom_domain *dom = m0_fom_dom();
1284  return m0_forall(i, dom->fd_localities_nr,
1285  is_loc_locker_empty(dom->fd_localities[i],
1286  svc->rs_fom_key));
1287 }
1288 
1289 M0_INTERNAL bool m0_fom_domain_is_idle(const struct m0_fom_domain *dom)
1290 {
1291  return m0_forall(i, dom->fd_localities_nr,
1292  dom->fd_localities[i]->fl_foms == 0);
1293 }
1294 
1295 M0_INTERNAL void m0_fom_locality_inc(struct m0_fom *fom)
1296 {
1297  unsigned key = fom->fo_service->rs_fom_key;
1298  struct m0_fom_locality *loc = fom->fo_loc;
1299  uint64_t cnt;
1300 
1301  M0_ASSERT(key != 0);
1302  cnt = (uint64_t)m0_locality_lockers_get(&loc->fl_locality, key);
1303  M0_CNT_INC(cnt);
1304  M0_CNT_INC(loc->fl_foms);
1306  m0_locality_lockers_set(&loc->fl_locality, key, (void *)cnt);
1307 }
1308 
1309 M0_INTERNAL bool m0_fom_locality_dec(struct m0_fom *fom)
1310 {
1311  unsigned key = fom->fo_service->rs_fom_key;
1312  struct m0_fom_locality *loc = fom->fo_loc;
1313  uint64_t cnt;
1314 
1315  M0_ASSERT(key != 0);
1316  cnt = (uint64_t)m0_locality_lockers_get(&loc->fl_locality, key);
1317  M0_CNT_DEC(cnt);
1318  M0_CNT_DEC(loc->fl_foms);
1319  m0_locality_lockers_set(&loc->fl_locality, key, (void *)cnt);
1321  return cnt == 0;
1322 }
1323 
1324 void m0_fom_fini(struct m0_fom *fom)
1325 {
1326  struct m0_reqh *reqh;
1327 
1328  M0_ENTRY("fom: %p fop %p rep fop %p", fom, fom->fo_fop,
1329  fom->fo_rep_fop);
1331  M0_PRE(fom->fo_pending == NULL);
1332 
1333  reqh = m0_fom_reqh(fom);
1335 
1336  m0_sm_fini(&fom->fo_sm_phase);
1337  m0_sm_fini(&fom->fo_sm_state);
1338  runq_tlink_fini(fom);
1339  m0_fom_callback_fini(&fom->fo_cb);
1340 
1341  if (fom->fo_fop != NULL) {
1342  M0_LOG(M0_DEBUG, "fom: %p fop %p item %p[%u] rep fop %p",
1343  fom, fom->fo_fop, &fom->fo_fop->f_item,
1344  m0_fop_opcode(fom->fo_fop), fom->fo_rep_fop);
1345  m0_fop_put_lock(fom->fo_fop);
1346  }
1347  if (fom->fo_rep_fop != NULL)
1348  m0_fop_put_lock(fom->fo_rep_fop);
1349 
1350  /*
1351  * The channel lock is taken before decrementing the locality count
1352  * because otherwise a race window is left open in which
1353  * m0_reqh_idle_wait_for(), which checks the value of this locality
1354  * count to figure out whether the request handler is idle, can exit
1355  * prematurely. This could result in an attempt to lock or unlock the
1356  * channel mutex even after it has been finalised by the caller of
1357  * m0_reqh_idle_wait_for().
1358  *
1359  * TODO: This being a hot path, instead of taking and releasing a
1360  * global lock, fom and service finalisations should synchronise
1361  * through an RCU-like mechanism.
1362  */
1364  if (m0_fom_locality_dec(fom))
1367 
1368  M0_LEAVE();
1369 }
1370 M0_EXPORTED(m0_fom_fini);
1371 
1372 void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type,
1373  const struct m0_fom_ops *ops, struct m0_fop *fop,
1374  struct m0_fop *reply, struct m0_reqh *reqh)
1375 {
1376  M0_PRE(fom != NULL);
1377  M0_PRE(reqh != NULL);
1378 
1379  M0_ENTRY("fom: %p fop %p rep fop %p", fom, fop, reply);
1380 
1381  fom->fo_type = fom_type;
1382  fom->fo_ops = ops;
1383  fom->fo_transitions = 0;
1384  fom->fo_local = false;
1385  m0_fom_callback_init(&fom->fo_cb);
1386  runq_tlink_init(fom);
1387 
1388  if (fop != NULL) {
1389  m0_fop_get(fop);
1390  M0_LOG(M0_DEBUG, "fom: %p fop %p item %p[%u] rep fop %p",
1392  }
1393 
1394  fom->fo_fop = fop;
1395 
1396  if (reply != NULL) {
1397  m0_fop_get(reply);
1398  fop->f_item.ri_reply = &reply->f_item;
1399  }
1400  fom->fo_rep_fop = reply;
1401 
1406  fom->fo_service = m0_reqh_service_find(fom_type->ft_rstype, reqh);
1407 
1408  M0_ASSERT(fom->fo_service != NULL);
1409  M0_LEAVE();
1410 }
1411 M0_EXPORTED(m0_fom_init);
1412 
1413 static bool fom_clink_cb(struct m0_clink *link)
1414 {
1415  struct m0_fom_callback *cb = container_of(link, struct m0_fom_callback,
1416  fc_clink);
1417  M0_PRE(cb->fc_state >= M0_FCS_ARMED);
1418 
1419  if (cb->fc_state == M0_FCS_ARMED &&
1420  (cb->fc_top == NULL || !cb->fc_top(cb)))
1421  m0_sm_ast_post(&cb->fc_fom->fo_loc->fl_group, &cb->fc_ast);
1422 
1423  return true;
1424 }
1425 
1426 static void fom_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
1427 {
1428  struct m0_fom_callback *cb = container_of(ast, struct m0_fom_callback,
1429  fc_ast);
1430  struct m0_fom *fom = cb->fc_fom;
1431 
1433  M0_PRE(cb->fc_state == M0_FCS_ARMED);
1434 
1435  if (fom_state(fom) == M0_FOS_WAITING)
1436  cb_run(cb);
1437  else {
1439  fom_is_blocked(fom));
1440  /*
1441  * Call-back arrived while our fom is in READY state (hanging on
1442  * the runqueue, waiting for its turn) or RUNNING state (blocked
1443  * between m0_fom_block_enter() and
1444  * m0_fom_block_leave()). Instead of executing the call-back
1445  * immediately, add it to the stack of pending call-backs for
1446  * this fom. The call-back will be executed by fom_exec() when
1447  * the fom is about to return to the WAITING state.
1448  */
1449  cb->fc_ast.sa_next = (void *)fom->fo_pending;
1450  fom->fo_pending = cb;
1451  }
1452 }
1453 
1454 M0_INTERNAL void m0_fom_callback_init(struct m0_fom_callback *cb)
1455 {
1456  cb->fc_state = M0_FCS_DONE;
1458 }
1459 
1460 M0_INTERNAL void m0_fom_callback_arm(struct m0_fom *fom, struct m0_chan *chan,
1461  struct m0_fom_callback *cb)
1462 {
1463  M0_PRE(cb->fc_bottom != NULL);
1464  M0_PRE(cb->fc_state == M0_FCS_DONE);
1465 
1466  cb->fc_fom = fom;
1467 
1468  cb->fc_ast.sa_cb = &fom_ast_cb;
1469  cb->fc_state = M0_FCS_ARMED;
1470  m0_mb();
1471  cb->fc_clink.cl_is_oneshot = true;
1472  m0_clink_add(chan, &cb->fc_clink);
1473 }
1474 
1475 static bool fom_callback_is_armed(const struct m0_fom_callback *cb)
1476 {
1477  return cb->fc_state == M0_FCS_ARMED;
1478 }
1479 
1480 M0_INTERNAL bool m0_fom_is_waiting_on(const struct m0_fom *fom)
1481 {
1482  return fom_callback_is_armed(&fom->fo_cb);
1483 }
1484 
1485 static void fom_ready_cb(struct m0_fom_callback *cb)
1486 {
1487  m0_fom_ready(cb->fc_fom);
1488 }
1489 
1490 M0_INTERNAL void m0_fom_wait_on(struct m0_fom *fom, struct m0_chan *chan,
1491  struct m0_fom_callback *cb)
1492 {
1493  cb->fc_bottom = fom_ready_cb;
1495 }
1496 
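/*
 * Usage sketch (hypothetical, not part of this file): a tick arms a wakeup on
 * some channel and puts the fom to sleep; when the channel is signalled,
 * fom_ready_cb() re-queues the fom and the tick runs again in the next phase.
 * "my_object", "ob_chan" and the MY_PHASE_* names are placeholders.
 *
 * @code
 * static int my_fom_tick(struct m0_fom *fom)
 * {
 *         if (m0_fom_phase(fom) == MY_PHASE_START) {
 *                 m0_fom_wait_on(fom, &my_object->ob_chan, &fom->fo_cb);
 *                 m0_fom_phase_set(fom, MY_PHASE_WAIT_DONE);
 *                 return M0_FSO_WAIT;
 *         }
 *         ...
 * }
 * @endcode
 */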
1497 M0_INTERNAL void m0_fom_callback_fini(struct m0_fom_callback *cb)
1498 {
1499  M0_PRE(cb->fc_state == M0_FCS_DONE);
1500  m0_clink_fini(&cb->fc_clink);
1501 }
1502 
1503 static void cb_cancel(struct m0_fom_callback *cb)
1504 {
1505  struct m0_fom_callback *prev;
1506 
1507  prev = cb->fc_fom->fo_pending;
1508  while (prev != NULL && cb_next(prev) != cb)
1509  prev = cb_next(prev);
1510  if (prev != NULL)
1511  prev->fc_ast.sa_next = cb_next(cb);
1512 }
1513 
1514 M0_INTERNAL void m0_fom_callback_cancel(struct m0_fom_callback *cb)
1515 {
1516  struct m0_clink *clink = &cb->fc_clink;
1517 
1518  M0_PRE(cb->fc_state >= M0_FCS_ARMED);
1519 
1520  if (cb->fc_state == M0_FCS_ARMED) {
1522  cb_done(cb);
1523  /* Once the clink is finalised, the AST cannot be posted, cancel
1524  the AST. */
1526  /* Once the AST is cancelled, cb cannot be added to the pending
1527  list, cancel cb. */
1528  cb_cancel(cb);
1529  }
1530 }
1531 
1532 M0_INTERNAL void m0_fom_timeout_init(struct m0_fom_timeout *to)
1533 {
1534  M0_SET0(to);
1535  m0_sm_timer_init(&to->to_timer);
1537 }
1538 
1539 M0_INTERNAL void m0_fom_timeout_fini(struct m0_fom_timeout *to)
1540 {
1542  m0_sm_timer_fini(&to->to_timer);
1543 }
1544 
1545 static void fom_timeout_cb(struct m0_sm_timer *timer)
1546 {
1547  struct m0_fom_timeout *to = container_of(timer, struct m0_fom_timeout,
1548  to_timer);
1549  struct m0_fom_callback *cb = &to->to_cb;
1550 
1551  cb->fc_state = M0_FCS_ARMED;
1552  fom_ast_cb(to->to_timer.tr_grp, &cb->fc_ast);
1553 }
1554 
1555 static int fom_timeout_start(struct m0_fom_timeout *to,
1556  struct m0_fom *fom,
1557  void (*cb)(struct m0_fom_callback *),
1558  m0_time_t deadline)
1559 {
1560  to->to_cb.fc_fom = fom;
1561  to->to_cb.fc_bottom = cb;
1562  return m0_sm_timer_start(&to->to_timer, fom->fo_sm_state.sm_grp,
1563  fom_timeout_cb, deadline);
1564 }
1565 
1566 M0_INTERNAL int m0_fom_timeout_wait_on(struct m0_fom_timeout *to,
1567  struct m0_fom *fom,
1568  m0_time_t deadline)
1569 {
1570  return fom_timeout_start(to, fom, fom_ready_cb, deadline);
1571 }
1572 
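/*
 * Usage sketch (hypothetical): putting a fom to sleep for one second using a
 * struct m0_fom_timeout.  The timeout object must stay allocated until it
 * fires or is cancelled, and is finalised with m0_fom_timeout_fini();
 * "mf", "mf_to", "mf_fom" and MY_PHASE_RESUME are placeholders, and
 * m0_time_from_now() is assumed to be available from lib/time.h.
 *
 * @code
 * // "mf_to" is a struct m0_fom_timeout embedded in the fom's wrapper object.
 * m0_fom_timeout_init(&mf->mf_to);
 * rc = m0_fom_timeout_wait_on(&mf->mf_to, &mf->mf_fom, m0_time_from_now(1, 0));
 * if (rc == 0) {
 *         m0_fom_phase_set(&mf->mf_fom, MY_PHASE_RESUME);
 *         return M0_FSO_WAIT;
 * }
 * @endcode
 */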
1573 M0_INTERNAL int m0_fom_timeout_arm(struct m0_fom_timeout *to,
1574  struct m0_fom *fom,
1575  void (*cb)(struct m0_fom_callback *),
1576  m0_time_t deadline)
1577 {
1578  return fom_timeout_start(to, fom, cb, deadline);
1579 }
1580 
1581 M0_INTERNAL void m0_fom_timeout_cancel(struct m0_fom_timeout *to)
1582 {
1583  struct m0_fom_callback *cb = &to->to_cb;
1584  struct m0_sm_timer *tr = &to->to_timer;
1585 
1588 
1589  m0_sm_timer_cancel(tr);
1591  }
1592 }
1593 
1595 
1596 M0_INTERNAL void m0_fom_type_init(struct m0_fom_type *type, uint64_t id,
1597  const struct m0_fom_type_ops *ops,
1598  const struct m0_reqh_service_type *svc_type,
1599  const struct m0_sm_conf *sm)
1600 {
1602  M0_PRE(id > 0);
1603  M0_PRE(M0_IN(m0_fom__types[id], (NULL, type)));
1604 
1605  if (m0_fom__types[id] == NULL) {
1606  type->ft_id = id;
1607  type->ft_ops = ops;
1608  if (sm != NULL)
1609  type->ft_conf = *sm;
1610  type->ft_state_conf = fom_states_conf0;
1611  type->ft_rstype = svc_type;
1612  m0_fom__types[id] = type;
1613  }
1614 }
1615 
1616 static struct m0_sm_state_descr fom_states[] = {
1617  [M0_FOS_INIT] = {
1619  .sd_name = "Init",
1620  .sd_allowed = M0_BITS(M0_FOS_FINISH, M0_FOS_READY)
1621  },
1622  [M0_FOS_READY] = {
1623  .sd_name = "Ready",
1624  .sd_allowed = M0_BITS(M0_FOS_RUNNING)
1625  },
1626  [M0_FOS_RUNNING] = {
1627  .sd_name = "Running",
1628  .sd_allowed = M0_BITS(M0_FOS_READY, M0_FOS_WAITING,
1629  M0_FOS_FINISH)
1630  },
1631  [M0_FOS_WAITING] = {
1632  .sd_name = "Waiting",
1633  .sd_allowed = M0_BITS(M0_FOS_READY, M0_FOS_FINISH)
1634  },
1635  [M0_FOS_FINISH] = {
1636  .sd_flags = M0_SDF_TERMINAL,
1637  .sd_name = "Finished",
1638  }
1639 };
1640 
1642  { "Schedule", M0_FOS_INIT, M0_FOS_READY },
1643  { "Failed", M0_FOS_INIT, M0_FOS_FINISH },
1644  { "Run", M0_FOS_READY, M0_FOS_RUNNING },
1645  { "Yield", M0_FOS_RUNNING, M0_FOS_READY },
1646  { "Sleep", M0_FOS_RUNNING, M0_FOS_WAITING },
1647  { "Done", M0_FOS_RUNNING, M0_FOS_FINISH },
1648  { "Wakeup", M0_FOS_WAITING, M0_FOS_READY },
1649  { "Terminate", M0_FOS_WAITING, M0_FOS_FINISH }
1650 };
1651 
1652 M0_INTERNAL struct m0_sm_conf fom_states_conf = {
1653  .scf_name = "FOM states",
1654  .scf_nr_states = ARRAY_SIZE(fom_states),
1655  .scf_state = fom_states,
1656  .scf_trans_nr = ARRAY_SIZE(fom_trans),
1657  .scf_trans = fom_trans
1658 };
1659 
1660 static struct m0_sm_conf fom_states_conf0;
1661 
1662 M0_INTERNAL int m0_foms_init(void)
1663 {
1667 }
1668 
1669 M0_INTERNAL void m0_foms_fini(void)
1670 {
1672 }
1673 
1674 M0_INTERNAL void m0_fom_sm_init(struct m0_fom *fom)
1675 {
1676  struct m0_sm_group *fom_group;
1677 
1678  M0_PRE(fom != NULL);
1679  M0_PRE(fom->fo_loc != NULL);
1680 
1681  fom_group = &fom->fo_loc->fl_group;
1682  m0_sm_init(&fom->fo_sm_phase, &fom->fo_type->ft_conf,
1683  M0_FOM_PHASE_INIT, fom_group);
1684  m0_sm_init(&fom->fo_sm_state, &fom->fo_type->ft_state_conf,
1685  M0_FOS_INIT, fom_group);
1686 }
1687 
1688 void m0_fom_phase_set(struct m0_fom *fom, int phase)
1689 {
1690  M0_LOG(M0_DEBUG, "fom=%p, item %p[%u] phase set: %s -> %s", fom,
1691  fom->fo_fop == NULL ? NULL : &fom->fo_fop->f_item,
1692  fom->fo_fop == NULL ? 0 : m0_fop_opcode(fom->fo_fop),
1694  m0_fom_phase_name(fom, phase));
1695  m0_sm_state_set(&fom->fo_sm_phase, phase);
1696 }
1697 M0_EXPORTED(m0_fom_phase_set);
1698 
1699 void m0_fom_phase_move(struct m0_fom *fom, int32_t rc, int phase)
1700 {
1701  M0_LOG(M0_DEBUG, "fom=%p, item %p[%u] phase set: %s -> %s", fom,
1702  fom->fo_fop == NULL ? NULL : &fom->fo_fop->f_item,
1703  fom->fo_fop == NULL ? 0 : m0_fop_opcode(fom->fo_fop),
1705  m0_fom_phase_name(fom, phase));
1706  m0_sm_move(&fom->fo_sm_phase, rc, phase);
1707 }
1708 M0_EXPORTED(m0_fom_phase_move);
1709 
1710 void m0_fom_phase_moveif(struct m0_fom *fom, int32_t rc, int phase0, int phase1)
1711 {
1712  m0_fom_phase_move(fom, rc, rc == 0 ? phase0 : phase1);
1713 }
1714 M0_EXPORTED(m0_fom_phase_moveif);
1715 
1716 int m0_fom_phase(const struct m0_fom *fom)
1717 {
1718  return fom->fo_sm_phase.sm_state;
1719 }
1720 M0_EXPORTED(m0_fom_phase);
1721 
1722 M0_INTERNAL const char *m0_fom_phase_name(const struct m0_fom *fom, int phase)
1723 {
1724  return m0_sm_state_name(&fom->fo_sm_phase, phase);
1725 }
1726 
1727 M0_INTERNAL int m0_fom_rc(const struct m0_fom *fom)
1728 {
1729  return fom->fo_sm_phase.sm_rc;
1730 }
1731 
1732 M0_INTERNAL bool m0_fom_is_waiting(const struct m0_fom *fom)
1733 {
1734  return fom_state(fom) == M0_FOS_WAITING && is_in_wail(fom);
1735 }
1736 
1737 M0_INTERNAL int m0_fom_fol_rec_add(struct m0_fom *fom)
1738 {
1739  return M0_RC(m0_dtx_fol_add(&fom->fo_tx));
1740 }
1741 
1742 M0_INTERNAL void m0_fom_fdmi_record_post(struct m0_fom *fom)
1743 {
1744 #ifndef __KERNEL__
1746 #endif
1747 }
1748 
1749 M0_INTERNAL struct m0_reqh *m0_fom2reqh(const struct m0_fom *fom)
1750 {
1751  M0_PRE(fom != NULL && fom->fo_service != NULL);
1752  return fom->fo_service->rs_reqh;
1753 }
1754 
1755 #undef M0_TRACE_SUBSYSTEM
1756 
1758 /*
1759  * Local variables:
1760  * c-indentation-style: "K&R"
1761  * c-basic-offset: 8
1762  * tab-width: 8
1763  * fill-column: 80
1764  * scroll-step: 1
1765  * End:
1766  */
struct m0_addb2_sensor fl_clock
Definition: fom.h:301
static void addb2_introduce(struct m0_fom *fom)
Definition: fom.c:454
static void hung_foms_notify(struct m0_locality_chore *chore, struct m0_locality *loc, void *place)
Definition: fom.c:1160
M0_INTERNAL void m0_fom_domain_fini(struct m0_fom_domain *dom)
Definition: fom.c:1256
uint64_t id
Definition: cob.h:2380
M0_INTERNAL void m0_foms_fini(void)
Definition: fom.c:1669
void m0_fom_phase_moveif(struct m0_fom *fom, int32_t rc, int phase0, int phase1)
Definition: fom.c:1710
static void loc_handler_thread(struct m0_loc_thread *th)
Definition: fom.c:867
static void fom_ready_cb(struct m0_fom_callback *cb)
Definition: fom.c:1485
void m0_addb2_force(m0_time_t delay)
Definition: addb2.c:589
uint32_t m0_fop_opcode(const struct m0_fop *fop)
Definition: fop.c:226
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
uint64_t scf_addb2_key
Definition: sm.h:361
int wd_rc
Definition: fom.c:179
static enum m0_fom_state fom_state(const struct m0_fom *fom)
Definition: fom.c:288
M0_INTERNAL void m0_fom_wakeup(struct m0_fom *fom)
Definition: fom.c:532
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
M0_INTERNAL int m0_bitmap_init(struct m0_bitmap *map, size_t nr)
Definition: bitmap.c:86
M0_INTERNAL int m0_foms_init(void)
Definition: fom.c:1662
static struct m0_sm_state_descr phases[]
Definition: stats_fom.c:31
uint64_t wd_phases
Definition: fom.c:176
static void fom_ready(struct m0_fom *fom)
Definition: fom.c:413
struct m0_tl fl_runq
Definition: fom.h:259
static int(* diff[M0_PARITY_CAL_ALGO_NR])(struct m0_parity_math *math, struct m0_buf *old, struct m0_buf *new, struct m0_buf *parity, uint32_t index)
Definition: parity_math.c:290
struct m0_fom_domain * fl_dom
Definition: fom.h:256
M0_INTERNAL int m0_sm_addb2_init(struct m0_sm_conf *conf, uint64_t id, uint64_t counter)
Definition: sm.c:846
M0_INTERNAL void m0_fom_block_enter(struct m0_fom *fom)
Definition: fom.c:538
uint64_t ft_id
Definition: fom.h:613
#define NULL
Definition: misc.h:38
struct m0_loc_thread * fl_handler
Definition: fom.h:289
M0_INTERNAL int m0_thread_confine(struct m0_thread *q, const struct m0_bitmap *processors)
Definition: kthread.c:197
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
M0_INTERNAL void m0_bitmap_fini(struct m0_bitmap *map)
Definition: bitmap.c:97
struct m0_addb2_hist fl_fom_active
Definition: fom.h:298
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
Definition: chan.c:267
M0_INTERNAL bool m0_fom_locality_dec(struct m0_fom *fom)
Definition: fom.c:1309
static const struct m0_locality_chore_ops hung_foms_chore_ops
Definition: fom.c:215
struct m0_addb2_hist ca_queue_hist
Definition: chan.h:454
#define ergo(a, b)
Definition: misc.h:293
void * m0_locality_data(int key)
Definition: locality.c:474
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
struct m0_bitmap fl_processors
Definition: fom.h:295
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
Definition: sm.h:350
struct m0_addb2_mach * tls_addb2_mach
Definition: thread.h:68
struct m0_sm_group_addb2 * s_addb2
Definition: sm.h:519
struct m0_fom_callback to_cb
Definition: fom.h:795
static void fom_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: fom.c:1426
static struct io_request req
Definition: file.c:100
M0_INTERNAL bool m0_chan_has_waiters(struct m0_chan *chan)
Definition: chan.c:185
static struct m0_sm_group * grp
Definition: bytecount.c:38
static struct m0_sm_trans_descr fom_trans[M0_FOS_TRANS_NR]
Definition: fom.c:1641
M0_INTERNAL void m0_locality_fini(struct m0_locality *loc)
Definition: locality.c:140
static int loc_thr_create(struct m0_fom_locality *loc)
Definition: fom.c:976
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
struct m0_tlink fo_linkage
Definition: fom.h:511
M0_LEAVE()
#define min_check(a, b)
Definition: arith.h:88
M0_INTERNAL m0_processor_nr_t m0_processor_nr_max(void)
Definition: processor.c:1093
struct m0_sm_group fl_group
Definition: fom.h:274
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
struct m0_chan fl_idle
Definition: fom.h:293
M0_INTERNAL void m0_locality_dom_unset(struct m0_fom_domain *dom)
Definition: locality.c:182
struct m0_fom_locality * lt_loc
Definition: fom.c:169
M0_INTERNAL int m0_fom_domain_init(struct m0_fom_domain **out)
Definition: fom.c:1173
struct m0_clink s_clink
Definition: sm.h:516
M0_INTERNAL void m0_fom_callback_init(struct m0_fom_callback *cb)
Definition: fom.c:1454
static void thr_addb2_leave(struct m0_loc_thread *thr, struct m0_fom_locality *loc)
Definition: fom.c:523
#define M0_ADDB2_PUSH(id,...)
Definition: addb2.h:261
int fl_idx
Definition: fom.h:296
static bool fom_clink_cb(struct m0_clink *link)
Definition: fom.c:1413
struct m0_bufvec data
Definition: di.c:40
M0_INTERNAL const char * m0_sm_state_name(const struct m0_sm *mach, int state)
Definition: sm.c:781
struct m0_chan s_chan
Definition: sm.h:518
struct m0_clink wd_clink
Definition: fom.c:175
M0_INTERNAL bool m0_clink_is_armed(const struct m0_clink *link)
Definition: chan.c:303
static void fom_wait(struct m0_fom *fom)
Definition: fom.c:654
static void fom_timeout_cb(struct m0_sm_timer *timer)
Definition: fom.c:1545
M0_INTERNAL void m0_fom_wait_on(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1490
#define M0_BITS(...)
Definition: misc.h:236
M0_INTERNAL size_t m0_bitmap_set_nr(const struct m0_bitmap *map)
Definition: bitmap.c:172
static void loc_addb2_fini(struct m0_fom_locality *loc)
Definition: fom.c:1004
static void loc_fini(struct m0_fom_locality *loc)
Definition: fom.c:1019
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
M0_INTERNAL struct m0 * m0_get(void)
Definition: instance.c:41
Definition: sm.h:504
static int fom_timeout_start(struct m0_fom_timeout *to, struct m0_fom *fom, void(*cb)(struct m0_fom_callback *), m0_time_t deadline)
Definition: fom.c:1555
void m0_addb2_hist_mod(struct m0_addb2_hist *hist, int64_t val)
Definition: histogram.c:68
M0_INTERNAL void m0_chan_lock(struct m0_chan *ch)
Definition: chan.c:68
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
int m0_locality_call(struct m0_locality *loc, int(*cb)(void *), void *data)
Definition: locality.c:570
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
M0_INTERNAL bool m0_sm_addb2_counter_init(struct m0_sm *sm)
Definition: sm.c:891
M0_INTERNAL void m0_locality_chores_run(struct m0_locality *locality)
Definition: locality.c:305
#define FOM_PHASE_DEBUG
Definition: fom.h:236
#define TIME_P(t)
Definition: time.h:45
static bool fom_callback_is_armed(const struct m0_fom_callback *cb)
Definition: fom.c:1475
static bool fom_wait_is_completed(const struct fom_wait_data *wd)
Definition: fom.c:666
M0_INTERNAL const char * m0_rpc_machine_ep(const struct m0_rpc_machine *rmach)
Definition: rpc_machine.c:603
static int loc_thr_init(struct m0_loc_thread *th)
Definition: fom.c:960
static struct m0_rpc_item * item
Definition: item.c:56
uint64_t lt_magix
Definition: fom.c:171
M0_INTERNAL void m0_sm_group_fini(struct m0_sm_group *grp)
Definition: sm.c:65
m0_fom_phase
Definition: fom.h:372
M0_INTERNAL int m0_fom_timeout_arm(struct m0_fom_timeout *to, struct m0_fom *fom, void(*cb)(struct m0_fom_callback *), m0_time_t deadline)
Definition: fom.c:1573
static struct m0_fom_domain_ops m0_fom_dom_ops
Definition: fom.c:208
static void group_unlock(struct m0_fom_locality *loc)
Definition: fom.c:224
const struct m0_fom_type * fo_type
Definition: dump.c:107
static void cb_done(struct m0_fom_callback *cb)
Definition: fom.c:747
#define M0_CHECK_EX(cond)
return M0_RC(rc)
struct m0_sm fo_sm_state
Definition: fom.h:524
static void fom_state_set(struct m0_fom *fom, enum m0_fom_state state)
Definition: fom.c:293
struct m0_fom * wd_fom
Definition: fom.c:177
M0_INTERNAL struct m0_thread_tls * m0_thread_tls(void)
Definition: kthread.c:67
#define M0_ENTRY(...)
Definition: trace.h:170
static void readyit(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: fom.c:441
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
M0_INTERNAL int m0_fom_timeout_wait_on(struct m0_fom_timeout *to, struct m0_fom *fom, m0_time_t deadline)
Definition: fom.c:1566
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
static bool hung_fom_notify(const struct m0_fom *fom)
Definition: fom.c:341
M0_INTERNAL void m0_chan_signal(struct m0_chan *chan)
Definition: chan.c:159
struct m0_locality fl_locality
Definition: fom.h:302
M0_INTERNAL bool m0_fom_is_waiting(const struct m0_fom *fom)
Definition: fom.c:1732
#define TIME_F
Definition: time.h:44
static void fom_addb2_push(struct m0_fom *fom)
Definition: fom.c:448
int i
Definition: dir.c:1033
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
bool fl_shutdown
Definition: fom.h:287
struct m0_chan_addb2 fl_chan_addb2
Definition: fom.h:304
struct m0_tl fl_wail
Definition: fom.h:263
static struct m0_sm_state_descr fom_states[]
Definition: fom.c:1616
struct m0_chan_addb2 * ch_addb2
Definition: chan.h:237
M0_INTERNAL bool m0_sm_timer_is_armed(const struct m0_sm_timer *timer)
Definition: sm.c:628
return M0_ERR(-EOPNOTSUPP)
struct m0_addb2_mach * fl_addb2_mach
Definition: fom.h:297
void m0_addb2_sys_fini(struct m0_addb2_sys *sys)
Definition: sys.c:214
struct m0_chan fl_runrun
Definition: fom.h:282
uint64_t ga_forq
Definition: sm.h:803
M0_INTERNAL void m0_clink_attach(struct m0_clink *link, struct m0_clink *group, m0_chan_cb_t cb)
Definition: chan.c:215
#define M0_ADDB2_IN(id, stmnt,...)
Definition: addb2.h:281
static void cb_cancel(struct m0_fom_callback *cb)
Definition: fom.c:1503
M0_INTERNAL void m0_fom_ready(struct m0_fom *fom)
Definition: fom.c:429
Definition: cnt.h:36
void m0_addb2_push(uint64_t id, int n, const uint64_t *value)
Definition: addb2.c:412
struct m0_atomic64 fl_unblocking
Definition: fom.h:292
M0_INTERNAL int m0_dtx_fol_add(struct m0_dtx *tx)
Definition: dtm.c:169
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
M0_INTERNAL void m0_addb2_global_thread_leave(void)
Definition: global.c:58
struct m0_fom_callback * fo_pending
Definition: fom.h:530
M0_INTERNAL bool m0_fom_domain_is_idle_for(const struct m0_reqh_service *svc)
Definition: fom.c:1281
M0_INTERNAL void m0_fom_callback_fini(struct m0_fom_callback *cb)
Definition: fom.c:1497
struct m0_clink fc_clink
Definition: fom.h:446
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
int m0_addb2_sys_init(struct m0_addb2_sys **out, const struct m0_addb2_config *conf)
Definition: sys.c:176
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
Definition: chan.c:96
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
M0_INTERNAL bool m0_locality_invariant(const struct m0_fom_locality *loc)
Definition: fom.c:266
M0_INTERNAL void m0_sm_group_init(struct m0_sm_group *grp)
Definition: sm.c:53
struct m0_sm_group_addb2 fl_grp_addb2
Definition: fom.h:303
struct m0_sm_group * tr_grp
Definition: sm.h:612
m0_time_t m0_time_now(void)
Definition: time.c:134
void m0_sm_state_set(struct m0_sm *mach, int state)
Definition: sm.c:478
M0_INTERNAL bool m0_fom_group_is_locked(const struct m0_fom *fom)
Definition: fom.c:229
static int fom_wait_init(void *data)
Definition: fom.c:701
void m0_fom_phase_move(struct m0_fom *fom, int32_t rc, int phase)
Definition: fom.c:1699
static struct m0_thread t[8]
Definition: service_ut.c:1230
static bool thread_invariant(const struct m0_loc_thread *t)
Definition: fom.c:244
#define M0_ADDB2_OBJ(obj)
Definition: addb2.h:276
unsigned fl_foms
Definition: fom.h:271
M0_INTERNAL void m0_fom_fdmi_record_post(struct m0_fom *fom)
Definition: fom.c:1742
M0_INTERNAL bool m0_fom_domain_is_idle(const struct m0_fom_domain *dom)
Definition: fom.c:1289
static void m0_atomic64_dec(struct m0_atomic64 *a)
static int fom_wait_rc(const struct fom_wait_data *wd)
Definition: fom.c:672
M0_INTERNAL void m0_locality_dom_set(struct m0_fom_domain *dom)
Definition: locality.c:174
M0_INTERNAL void m0_addb2_global_thread_enter(void)
Definition: global.c:43
void m0_thread_fini(struct m0_thread *q)
Definition: thread.c:92
M0_INTERNAL void m0_clink_cleanup_locked(struct m0_clink *link)
Definition: chan.c:319
M0_INTERNAL void m0_fom_type_init(struct m0_fom_type *type, uint64_t id, const struct m0_fom_type_ops *ops, const struct m0_reqh_service_type *svc_type, const struct m0_sm_conf *sm)
Definition: fom.c:1596
static struct m0_stob_domain * dom
Definition: storage.c:38
M0_INTERNAL void m0_fom_block_leave(struct m0_fom *fom)
Definition: fom.c:582
M0_INTERNAL struct m0_sm_conf fom_states_conf
Definition: fom.c:202
struct m0_fop * m0_fop_get(struct m0_fop *fop)
Definition: fop.c:162
M0_INTERNAL void m0_clink_signal(struct m0_clink *clink)
Definition: chan.c:326
struct m0_rpc_item * ri_reply
Definition: item.h:163
static void core_mask_apply(struct m0_bitmap *onln_cpu_map)
Definition: fom.c:1145
struct m0_thread lt_thread
Definition: fom.c:167
M0_INTERNAL void m0_processors_online(struct m0_bitmap *map)
Definition: processor.c:1113
static void loc_thr_fini(struct m0_loc_thread *th)
Definition: fom.c:965
#define M0_POST(cond)
M0_INTERNAL void m0_fol_fdmi_post_record(struct m0_fom *fom)
Definition: fol_fdmi_src.c:646
int m0_locality_chore_init(struct m0_locality_chore *chore, const struct m0_locality_chore_ops *ops, void *datum, m0_time_t interval, size_t datasize)
Definition: locality.c:270
M0_INTERNAL void m0_sm_addb2_fini(struct m0_sm_conf *conf)
Definition: sm.c:870
static void cb_run(struct m0_fom_callback *cb)
Definition: fom.c:762
struct m0_sm_timer to_timer
Definition: fom.h:791
Definition: reqh.h:94
M0_INTERNAL void m0_bitmap_set(struct m0_bitmap *map, size_t idx, bool val)
Definition: bitmap.c:139
const struct m0_fom_type * fo_type
Definition: fom.h:485
struct m0_addb2_hist fl_runq_counter
Definition: fom.h:299
Definition: dump.c:103
Definition: chan.h:229
void m0_addb2_clock_add(struct m0_addb2_sensor *clock, uint64_t label, int idx)
Definition: counter.c:109
void(* fc_bottom)(struct m0_fom_callback *cb)
Definition: fom.h:463
struct m0_sm_ast * sa_next
Definition: sm.h:509
struct m0_rpc_conn conn
Definition: fsync.c:96
m0_fom_state
Definition: fom.h:355
static struct m0_clink clink[RDWR_REQUEST_MAX]
int m0_fom_phase(const struct m0_fom *fom)
Definition: fom.c:1716
struct m0_mutex s_lock
Definition: sm.h:514
uint64_t fo_magic
Definition: fom.h:534
struct m0_addb2_hist ga_forq_hist
Definition: sm.h:804
M0_INTERNAL void m0_fom_sm_init(struct m0_fom *fom)
Definition: fom.c:1674
struct m0_tlink lt_linkage
Definition: fom.c:168
#define FID_P(f)
Definition: fid.h:77
uint64_t m0_time_seconds(const m0_time_t time)
Definition: time.c:83
bool(* fc_top)(struct m0_fom_callback *cb)
Definition: fom.h:458
void m0_addb2_pop(uint64_t id)
Definition: addb2.c:440
M0_INTERNAL void m0_chan_unlock(struct m0_chan *ch)
Definition: chan.c:73
#define M0_MOTR_IEM_DESC(_sev_id, _mod_id, _evt_id, _desc,...)
Definition: iem.h:103
void m0_addb2_hist_add_auto(struct m0_addb2_hist *hist, int skip, uint64_t label, int idx)
Definition: histogram.c:50
struct m0_clink lt_clink
Definition: fom.c:170
M0_INTERNAL void m0_fom_callback_arm(struct m0_fom *fom, struct m0_chan *chan, struct m0_fom_callback *cb)
Definition: fom.c:1460
M0_TL_DEFINE(thr, static, struct m0_loc_thread)
M0_INTERNAL bool m0_fom_domain_invariant(const struct m0_fom_domain *dom)
Definition: fom.c:255
static struct m0_sm_conf fom_states_conf0
Definition: fom.c:201
static int64_t m0_atomic64_get(const struct m0_atomic64 *a)
struct m0_tl fl_threads
Definition: fom.h:291
M0_INTERNAL int m0_fom_timedwait(struct m0_fom *fom, uint64_t phases, m0_time_t deadline)
Definition: fom.c:724
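
m0_fom_timedwait() blocks the caller until the fom enters one of the phases in the given mask or the absolute deadline expires. A hedged sketch; M0_BITS() (lib/misc.h) and m0_time_from_now() (lib/time.h) are assumed helpers, and MY_PHASE_DONE is a placeholder phase number.

#include "lib/misc.h"   /* M0_BITS(), assumed */
#include "lib/time.h"   /* m0_time_from_now(), assumed */
#include "fop/fom.h"

enum { MY_PHASE_DONE = 2 };   /* placeholder */

static int wait_for_completion(struct m0_fom *fom)
{
        /* Wait at most five seconds for the fom to reach MY_PHASE_DONE;
         * presumably a negative errno is returned if the deadline passes
         * first. */
        return m0_fom_timedwait(fom, M0_BITS(MY_PHASE_DONE),
                                m0_time_from_now(5, 0));
}
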
#define m0_forall(var, nr,...)
Definition: misc.h:112
uint32_t sd_flags
Definition: sm.h:378
static void m0_mb(void)
static void fom_exec(struct m0_fom *fom)
Definition: fom.c:780
static bool is_in_runq(const struct m0_fom *fom)
Definition: fom.c:234
void m0_locality_chore_fini(struct m0_locality_chore *chore)
Definition: locality.c:296
static bool fom_is_blocked(const struct m0_fom *fom)
Definition: fom.c:298
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_find(const struct m0_reqh_service_type *st, const struct m0_reqh *reqh)
Definition: reqh_service.c:538
struct m0_reqh reqh
Definition: rm_foms.c:48
M0_INTERNAL void m0_fom_timeout_fini(struct m0_fom_timeout *to)
Definition: fom.c:1539
struct m0_addb2_mach * m0_addb2_sys_get(struct m0_addb2_sys *sys)
Definition: sys.c:272
static void * cb_next(struct m0_fom_callback *cb)
Definition: fom.c:770
#define M0_CNT_INC(cnt)
Definition: arith.h:226
struct m0_bitmap pca_core_mask
Definition: process_attr.h:54
static struct m0_chan chan[RDWR_REQUEST_MAX]
M0_INTERNAL bool m0_tlist_invariant(const struct m0_tl_descr *d, const struct m0_tl *list)
Definition: tlist.c:236
M0_INTERNAL void m0_sm_timer_cancel(struct m0_sm_timer *timer)
Definition: sm.c:610
static M0_UNUSED struct m0_fom * sm2fom(struct m0_sm *sm)
Definition: fom.c:306
struct m0_fom_locality * fo_loc
Definition: fom.h:483
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
Definition: sm.c:313
static void group_lock(struct m0_fom_locality *loc)
Definition: fom.c:219
static void queueit(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: fom.c:502
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
struct m0_chan sm_chan
Definition: sm.h:331
bool(* fdo_time_is_out)(const struct m0_fom_domain *dom, const struct m0_fom *fom)
Definition: fom.h:348
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:228
M0_INTERNAL bool m0_chan_timedwait(struct m0_clink *link, const m0_time_t abs_timeout)
Definition: chan.c:349
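
m0_clink_add() and m0_chan_timedwait() above, together with m0_clink_fini(), make up the usual channel-wait pattern. A sketch under assumptions: m0_clink_init(), m0_chan_lock() and m0_clink_del_lock() are taken on trust as the standard counterparts in lib/chan.h, and the five-second timeout is arbitrary.

#include "lib/chan.h"
#include "lib/time.h"

static bool wait_for_signal(struct m0_chan *chan)
{
        struct m0_clink waiter;
        bool            signalled;

        m0_clink_init(&waiter, NULL);      /* NULL: no filter callback */

        m0_chan_lock(chan);                /* m0_clink_add() wants the lock */
        m0_clink_add(chan, &waiter);
        m0_chan_unlock(chan);

        /* true iff the channel was signalled before the deadline. */
        signalled = m0_chan_timedwait(&waiter, m0_time_from_now(5, 0));

        m0_clink_del_lock(&waiter);
        m0_clink_fini(&waiter);
        return signalled;
}
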
struct m0_sm_group rh_sm_grp
Definition: reqh.h:107
enum loc_thread_state lt_state
Definition: fom.c:166
M0_INTERNAL struct m0_fom_domain * m0_fom_dom(void)
Definition: locality.c:575
m0_time_t m0_time_sub(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:65
struct m0_rpc_session * ri_session
Definition: item.h:147
static struct m0_net_test_service svc
Definition: service.c:34
M0_INTERNAL bool m0_fom_is_waiting_on(const struct m0_fom *fom)
Definition: fom.c:1480
M0_INTERNAL int m0_fom_rc(const struct m0_fom *fom)
Definition: fom.c:1727
M0_INTERNAL bool m0_bitmap_get(const struct m0_bitmap *map, size_t idx)
Definition: bitmap.c:105
M0_INTERNAL void m0_locality_init(struct m0_locality *loc, struct m0_sm_group *grp, struct m0_fom_domain *dom, size_t idx)
Definition: locality.c:126
struct m0_sm_conf ft_conf
Definition: fom.h:615
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL void m0_sm_move(struct m0_sm *mach, int32_t rc, int state)
Definition: sm.c:485
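
m0_sm_init(), m0_sm_move() and m0_sm_fini() (the latter appears near the end of this index) are called with the state machine's group locked. A minimal lifecycle sketch; the configuration my_conf, the state constants and m0_sm_group_unlock() as the counterpart of m0_sm_group_lock() are assumptions made for illustration.

#include "sm/sm.h"

enum { ST_INIT, ST_DONE };                 /* placeholder states */
extern const struct m0_sm_conf my_conf;    /* placeholder configuration */

static void sm_lifecycle(struct m0_sm *sm, struct m0_sm_group *grp)
{
        m0_sm_group_lock(grp);
        m0_sm_init(sm, &my_conf, ST_INIT, grp);
        /* ... transitions happen under the same lock ... */
        m0_sm_move(sm, 0, ST_DONE);        /* rc == 0: normal transition */
        m0_sm_fini(sm);
        m0_sm_group_unlock(grp);
}
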
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
static struct m0_fop * fop
Definition: item.c:57
loc_thread_state
Definition: fom.c:146
#define IS_IN_ARRAY(idx, array)
Definition: misc.h:311
M0_INTERNAL void m0_fom_callback_cancel(struct m0_fom_callback *cb)
Definition: fom.c:1514
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
struct m0_fom * fc_fom
Definition: fom.h:452
static bool fom_wait_time_is_out(const struct m0_fom_domain *dom, const struct m0_fom *fom)
Definition: fom.c:399
M0_INTERNAL void m0_fom_queue(struct m0_fom *fom)
Definition: fom.c:624
struct m0_addb2_hist ca_wait_hist
Definition: chan.h:452
M0_INTERNAL struct m0_uint128 m0_node_uuid
Definition: uuid.c:126
struct m0_proc_attr i_proc_attr
Definition: instance.h:143
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
struct m0_addb2_sys * fd_addb2_sys
Definition: fom.h:338
static struct m0_fom * fom_dequeue(struct m0_fom_locality *loc)
Definition: fom.c:851
struct m0_sm fo_sm_phase
Definition: fom.h:522
M0_INTERNAL int m0_fom_fol_rec_add(struct m0_fom *fom)
Definition: fom.c:1737
#define M0_MKTIME(secs, ns)
Definition: time.h:86
static void fom_type(struct m0_addb2__context *ctx, const uint64_t *v, char *buf)
Definition: dump.c:516
M0_INTERNAL void m0_semaphore_drain(struct m0_semaphore *semaphore)
Definition: semaphore.c:25
void(* co_tick)(struct m0_locality_chore *chore, struct m0_locality *loc, void *place)
Definition: locality.h:130
struct m0_thread_handle t_h
Definition: thread.h:112
struct m0_addb2_hist ca_cb_hist
Definition: chan.h:453
static void thr_addb2_enter(struct m0_loc_thread *thr, struct m0_fom_locality *loc)
Definition: fom.c:514
M0_INTERNAL void m0_fom_locality_inc(struct m0_fom *fom)
Definition: fom.c:1295
void m0_addb2_sys_put(struct m0_addb2_sys *sys, struct m0_addb2_mach *m)
Definition: sys.c:295
M0_INTERNAL void m0_sm_ast_cancel(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:183
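
An AST is queued against a state-machine group and runs the next time the group processes its AST list (see m0_sm_asts_run() below); m0_sm_ast_cancel() withdraws one that has not run yet. The posting call m0_sm_ast_post() and the sa_cb/sa_datum fields are assumed from sm/sm.h in this sketch.

#include "sm/sm.h"

static void my_ast_cb(struct m0_sm_group *grp, struct m0_sm_ast *ast)
{
        /* Runs under the group lock; ast->sa_datum carries user data. */
}

static struct m0_sm_ast my_ast;

static void ast_post_example(struct m0_sm_group *grp, void *datum)
{
        my_ast.sa_cb    = my_ast_cb;
        my_ast.sa_datum = datum;
        m0_sm_ast_post(grp, &my_ast);      /* assumed counterpart of cancel */
}

static void ast_cancel_example(struct m0_sm_group *grp)
{
        /* Withdraw the AST if it is still queued. */
        m0_sm_ast_cancel(grp, &my_ast);
}
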
struct m0_addb2_hist fl_wail_counter
Definition: fom.h:300
M0_INTERNAL uint64_t m0_pid(void)
Definition: kthread.c:290
#define out(...)
Definition: gen.c:41
size_t fl_wail_nr
Definition: fom.h:264
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
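
m0_fom_phase() and m0_fom_phase_set() are the accessors a fom's tick function uses to drive its phase machine. The schematic tick below is illustrative only: the phase constants and the work in each branch are placeholders, while M0_FSO_AGAIN and M0_FSO_WAIT are the standard tick return codes from fop/fom.h, taken on trust.

#include "fop/fom.h"

enum { MY_PHASE_PREPARE = 1, MY_PHASE_EXEC, MY_PHASE_DONE };   /* placeholders */

static int my_fom_tick(struct m0_fom *fom)
{
        switch (m0_fom_phase(fom)) {
        case MY_PHASE_PREPARE:
                /* ... set things up ... */
                m0_fom_phase_set(fom, MY_PHASE_EXEC);
                return M0_FSO_AGAIN;       /* run the next phase immediately */
        case MY_PHASE_EXEC:
                /* ... kick off asynchronous work, arm a callback ... */
                m0_fom_phase_set(fom, MY_PHASE_DONE);
                return M0_FSO_WAIT;        /* park the fom until it is woken */
        default:
                return M0_FSO_WAIT;
        }
}
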
int type
Definition: dir.c:1031
M0_INTERNAL void m0_fom_timeout_init(struct m0_fom_timeout *to)
Definition: fom.c:1532
size_t b_nr
Definition: bitmap.h:44
M0_INTERNAL void m0_sm_timer_fini(struct m0_sm_timer *timer)
Definition: sm.c:566
struct m0_fom_ops ops
Definition: io_foms.c:623
static int fom_wait_fini(void *data)
Definition: fom.c:715
void m0_addb2_hist_add(struct m0_addb2_hist *hist, int64_t min, int64_t max, uint64_t label, int idx)
Definition: histogram.c:36
M0_INTERNAL void m0_sm_asts_run(struct m0_sm_group *grp)
Definition: sm.c:150
M0_INTERNAL void m0_sm_timer_init(struct m0_sm_timer *timer)
Definition: sm.c:559
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
M0_INTERNAL uint64_t m0_sm_id_get(const struct m0_sm *sm)
Definition: sm.c:1021
uint64_t as_id
Definition: sm.h:797
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
Definition: chan.c:112
void m0_free(void *data)
Definition: memory.c:146
static bool is_loc_locker_empty(struct m0_fom_locality *loc, uint32_t key)
Definition: fom.c:1276
size_t fd_localities_nr
Definition: fom.h:333
struct m0_rpc_item f_item
Definition: fop.h:83
const struct m0_reqh_service_type * ft_rstype
Definition: fom.h:617
M0_INTERNAL struct m0_fom_type * m0_fom__types[M0_OPCODES_NR]
Definition: fom.c:1594
M0_INTERNAL void m0_chan_broadcast(struct m0_chan *chan)
Definition: chan.c:172
bool wd_completed
Definition: fom.c:178
M0_INTERNAL struct m0_reqh * m0_fom2reqh(const struct m0_fom *fom)
Definition: fom.c:1749
int32_t rc
Definition: trigger_fop.h:47
static bool fom_wait_cb(struct m0_clink *clink)
Definition: fom.c:684
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_INTERNAL bool m0_tlink_invariant(const struct m0_tl_descr *d, const void *obj)
Definition: tlist.c:275
size_t fl_runq_nr
Definition: fom.h:260
M0_INTERNAL int m0_sm_timer_start(struct m0_sm_timer *timer, struct m0_sm_group *group, void(*cb)(struct m0_sm_timer *), m0_time_t deadline)
Definition: sm.c:577
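
m0_sm_timer_init(), m0_sm_timer_start(), m0_sm_timer_cancel() and m0_sm_timer_fini(), all listed in this index, form the timer lifecycle that the fom timeout code builds on. A sketch under assumptions: the callback body is empty and m0_time_from_now() supplies the absolute deadline.

#include "sm/sm.h"
#include "lib/time.h"

static void my_timer_cb(struct m0_sm_timer *timer)
{
        /* Invoked once the deadline passes, in the group's context
         * (an assumption about the delivery context). */
}

static int timer_arm(struct m0_sm_timer *timer, struct m0_sm_group *grp)
{
        m0_sm_timer_init(timer);
        /* Fire roughly two seconds from now. */
        return m0_sm_timer_start(timer, grp, my_timer_cb,
                                 m0_time_from_now(2, 0));
}

static void timer_disarm(struct m0_sm_timer *timer)
{
        m0_sm_timer_cancel(timer);
        m0_sm_timer_fini(timer);
}
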
struct m0_fom_callback fo_cb
Definition: fom.h:488
struct m0_rpc_conn * s_conn
Definition: session.h:312
M0_TL_DESCR_DEFINE(thr, "fom thread", static, struct m0_loc_thread, lt_linkage, lt_magix, M0_FOM_THREAD_MAGIC, M0_FOM_THREAD_HEAD_MAGIC)
struct m0_sm_ast fc_ast
Definition: fom.h:450
struct m0_sm_conf ft_state_conf
Definition: fom.h:616
static int64_t m0_atomic64_add_return(struct m0_atomic64 *a, int64_t d)
static void empty(void)
Definition: consumer.c:101
M0_INTERNAL struct m0_reqh * m0_fom_reqh(const struct m0_fom *fom)
Definition: fom.c:283
M0_INTERNAL void m0_fom_timeout_cancel(struct m0_fom_timeout *to)
Definition: fom.c:1581
static void m0_atomic64_set(struct m0_atomic64 *a, int64_t num)
M0_INTERNAL const char * m0_rpc_item_remote_ep_addr(const struct m0_rpc_item *item)
Definition: item.c:1188
M0_INTERNAL bool m0_fom_invariant(const struct m0_fom *fom)
Definition: fom.c:311
enum m0_fc_state fc_state
Definition: fom.h:451
M0_INTERNAL const char * m0_fom_phase_name(const struct m0_fom *fom, int phase)
Definition: fom.c:1722
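
m0_fom_phase_name(), together with m0_fom_phase() and m0_fom_rc() listed above, is convenient for diagnostics. A small logging sketch; M0_LOG()/M0_DEBUG are assumed from lib/trace.h.

#include "lib/trace.h"
#include "fop/fom.h"

static void fom_report(const struct m0_fom *fom)
{
        M0_LOG(M0_DEBUG, "fom=%p phase=%s rc=%d", fom,
               m0_fom_phase_name(fom, m0_fom_phase(fom)),
               m0_fom_rc(fom));
}
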
static bool is_in_wail(const struct m0_fom *fom)
Definition: fom.c:239
#define m0_tl_forall(name, var, head,...)
Definition: tlist.h:735
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
Definition: sm.c:331
static int loc_init(struct m0_fom_locality *loc, struct m0_fom_domain *dom, size_t idx)
Definition: fom.c:1065
#define M0_UNUSED
Definition: misc.h:380