Motr  M0
rm.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
24 #undef M0_TRACE_SUBSYSTEM
25 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_RM
26 
27 #include "lib/memory.h" /* M0_ALLOC_PTR */
28 #include "lib/misc.h" /* M0_SET_ARR0 */
29 #include "lib/errno.h" /* ETIMEDOUT */
30 #include "lib/arith.h" /* M0_CNT_{INC,DEC} */
31 #include "lib/mutex.h"
32 #include "lib/trace.h"
33 #include "lib/bob.h"
34 #include "lib/finject.h" /* M0_FI_ENABLED */
35 #include "fid/fid.h"
36 #include "addb2/addb2.h"
37 #include "motr/magic.h"
38 #include "sm/sm.h"
39 #include "conf/obj.h" /* m0_conf_obj */
40 #include "fid/fid.h" /* m0_fid */
41 
42 #include "rm/rm.h"
43 #include "rm/rm_internal.h"
44 #include "rpc/service.h" /* m0_rpc_service_session_release */
45 
51 
52 static void resource_get (struct m0_rm_resource *res);
53 static void resource_put (struct m0_rm_resource *res);
54 static bool resource_list_check (const struct m0_rm_resource *res,
55  void *datum);
56 static bool resource_type_invariant(const struct m0_rm_resource_type *rt);
57 
58 static void owner_balance (struct m0_rm_owner *o);
59 static bool owner_invariant (struct m0_rm_owner *owner);
60 static void pin_del (struct m0_rm_pin *pin);
61 static bool owner_invariant_state (const struct m0_rm_owner *owner,
62  struct owner_invariant_state *is);
63 static struct m0_rm_incoming *cr2in(const struct m0_rm_credit *cr);
64 static void incoming_check (struct m0_rm_incoming *in);
65 static int incoming_check_with (struct m0_rm_incoming *in,
66  struct m0_rm_credit *credit);
67 static void incoming_complete (struct m0_rm_incoming *in, int32_t rc);
68 static void incoming_policy_apply (struct m0_rm_incoming *in);
69 static void incoming_policy_none (struct m0_rm_incoming *in);
70 static bool incoming_invariant (const struct m0_rm_incoming *in);
71 static int incoming_pin_nr (const struct m0_rm_incoming *in,
72  uint32_t flags);
73 static void incoming_release (struct m0_rm_incoming *in);
74 
75 static int credit_pin_nr (const struct m0_rm_credit *credit,
76  uint32_t flags);
77 static int service_locate (struct m0_rm_resource_type *rtype,
78  struct m0_rm_remote *rem);
79 static int resource_locate (struct m0_rm_resource_type *rtype,
80  struct m0_rm_remote *rem);
81 static int outgoing_check (struct m0_rm_incoming *in,
83  struct m0_rm_credit *credit,
84  struct m0_rm_remote *other);
85 static int revoke_send (struct m0_rm_incoming *in,
86  struct m0_rm_loan *loan,
87  struct m0_rm_credit *credit);
88 static int cancel_send (struct m0_rm_loan *loan);
89 static int borrow_send (struct m0_rm_incoming *in,
90  struct m0_rm_credit *credit);
91 static bool credit_eq (const struct m0_rm_credit *c0,
92  const struct m0_rm_credit *c1);
93 static bool credit_is_empty (const struct m0_rm_credit *credit);
94 static bool credit_intersects (const struct m0_rm_credit *A,
95  const struct m0_rm_credit *B);
96 static bool credit_conflicts (const struct m0_rm_credit *A,
97  const struct m0_rm_credit *B);
98 static int credit_diff (struct m0_rm_credit *c0,
99  const struct m0_rm_credit *c1);
100 static void conflict_notify (struct m0_rm_credit *credit);
101 static void windup_incoming_complete(struct m0_rm_incoming *in,
102  int32_t rc);
103 static void windup_incoming_conflict(struct m0_rm_incoming *in);
104 static int cached_credits_hold (struct m0_rm_incoming *in);
105 static void cached_credits_clear (struct m0_rm_owner *owner);
106 static bool owner_is_idle (const struct m0_rm_owner *o);
107 static bool owner_is_liquidated (const struct m0_rm_owner *o);
108 static bool incoming_is_complete (const struct m0_rm_incoming *in);
109 static int remnant_credit_get (const struct m0_rm_credit *src,
110  const struct m0_rm_credit *diff,
111  struct m0_rm_credit **remnant_credit);
112 static int remnant_loan_get (const struct m0_rm_loan *loan,
113  const struct m0_rm_credit *credit,
114  struct m0_rm_loan **remnant_loan);
115 static int loan_dup (const struct m0_rm_loan *src_loan,
116  struct m0_rm_loan **dest_loan);
117 static void owner_liquidate (struct m0_rm_owner *src_owner);
118 static void owner_cleanup (struct m0_rm_owner *owner);
119 static void credit_processor (struct m0_rm_resource_type *rt);
120 static bool rm_on_remote_death_cb (struct m0_clink *link);
121 static bool owner_smgrp_is_locked (const struct m0_rm_owner *owner);
122 
123 static void rm_remote_online_handler(struct m0_rm_remote *remote);
124 static void rm_remote_death_handler(struct m0_rm_remote *remote);
125 
/*
 * Accesses the credit datum of the credit wanted by incoming request "in".
 *
 * "in" must be an expression of pointer-to-incoming type. Both the argument
 * and the whole expansion are parenthesised so that arguments such as "&req"
 * or "reqs + 1" bind as intended and the result can be used safely inside a
 * larger expression (it remains an lvalue).
 */
#define INCOMING_CREDIT(in) ((in)->rin_want.cr_datum)
127 
128 M0_TL_DESCR_DEFINE(res, "resources", , struct m0_rm_resource,
129  r_linkage, r_magix,
131 M0_TL_DEFINE(res, M0_INTERNAL, struct m0_rm_resource);
132 
133 static struct m0_bob_type resource_bob;
135 
136 M0_TL_DESCR_DEFINE(m0_rm_ur, "usage credits", , struct m0_rm_credit,
137  cr_linkage, cr_magix,
139 M0_TL_DEFINE(m0_rm_ur, M0_INTERNAL, struct m0_rm_credit);
140 
141 M0_TL_DESCR_DEFINE(m0_remotes, "remote owners", , struct m0_rm_remote,
142  rem_res_linkage, rem_magix,
144 M0_TL_DEFINE(m0_remotes, M0_INTERNAL, struct m0_rm_remote);
145 
146 M0_TL_DESCR_DEFINE(m0_owners, "local owners", , struct m0_rm_owner,
147  ro_owner_linkage, ro_magix,
149 M0_TL_DEFINE(m0_owners, M0_INTERNAL, struct m0_rm_owner);
150 
151 static const struct m0_bob_type credit_bob = {
152  .bt_name = "credit",
153  .bt_magix_offset = offsetof(struct m0_rm_credit, cr_magix),
154  .bt_magix = M0_RM_CREDIT_MAGIC,
155  .bt_check = NULL
156 };
157 M0_BOB_DEFINE(M0_INTERNAL, &credit_bob, m0_rm_credit);
158 
159 M0_TL_DESCR_DEFINE(pr, "pins-of-credit", , struct m0_rm_pin,
160  rp_credit_linkage, rp_magix,
162 M0_TL_DEFINE(pr, M0_INTERNAL, struct m0_rm_pin);
163 
164 M0_TL_DESCR_DEFINE(pi, "pins-of-incoming", , struct m0_rm_pin,
165  rp_incoming_linkage, rp_magix,
167 M0_TL_DEFINE(pi, M0_INTERNAL, struct m0_rm_pin);
168 
169 static bool pin_check(const void *bob);
170 
171 static const struct m0_bob_type pin_bob = {
172  .bt_name = "pin",
173  .bt_magix_offset = offsetof(struct m0_rm_pin, rp_magix),
174  .bt_magix = M0_RM_PIN_MAGIC,
175  .bt_check = pin_check
176 };
177 M0_BOB_DEFINE(static, &pin_bob, m0_rm_pin);
178 
179 M0_INTERNAL const struct m0_bob_type loan_bob = {
180  .bt_name = "loan",
181  .bt_magix_offset = offsetof(struct m0_rm_loan, rl_magix),
182  .bt_magix = M0_RM_LOAN_MAGIC,
183  .bt_check = NULL
184 };
186 
187 static const struct m0_bob_type incoming_bob = {
188  .bt_name = "incoming request",
189  .bt_magix_offset = offsetof(struct m0_rm_incoming, rin_magix),
190  .bt_magix = M0_RM_INCOMING_MAGIC,
191  .bt_check = NULL
192 };
194 
195 static const struct m0_bob_type outgoing_bob = {
196  .bt_name = "outgoing request ",
197  .bt_magix_offset = offsetof(struct m0_rm_outgoing, rog_magix),
198  .bt_magix = M0_RM_OUTGOING_MAGIC,
199  .bt_check = NULL
200 };
202 
203 static const struct m0_bob_type rem_bob = {
204  .bt_name = "proxy for remote owner ",
205  .bt_magix_offset = offsetof(struct m0_rm_remote, rem_magix),
206  .bt_magix = M0_RM_REMOTE_MAGIC,
207  .bt_check = NULL
208 };
209 M0_BOB_DEFINE(M0_INTERNAL, &rem_bob, m0_rm_remote);
210 
211 const struct m0_uint128 m0_rm_no_group = M0_UINT128(0, 0);
212 
214 
215 M0_INTERNAL void m0_rm_domain_init(struct m0_rm_domain *dom)
216 {
217  M0_ENTRY();
218  M0_PRE(dom != NULL);
219 
220  M0_SET_ARR0(dom->rd_types);
221  m0_mutex_init(&dom->rd_lock);
223  M0_LEAVE();
224 }
225 M0_EXPORTED(m0_rm_domain_init);
226 
227 M0_INTERNAL void m0_rm_domain_fini(struct m0_rm_domain *dom)
228 {
229  M0_ENTRY();
230  M0_PRE(m0_forall(i, ARRAY_SIZE(dom->rd_types),
231  dom->rd_types[i] == NULL));
232  m0_mutex_fini(&dom->rd_lock);
233  M0_LEAVE();
234 }
235 M0_EXPORTED(m0_rm_domain_fini);
236 
239  .rio_conflict = windup_incoming_conflict,
240 };
241 
242 M0_INTERNAL struct m0_rm_resource *
244  const struct m0_rm_resource *res)
245 {
246  M0_PRE(rt->rt_ops->rto_eq != NULL);
247 
248  return m0_tl_find(res, scan, &rt->rt_resources,
249  rt->rt_ops->rto_eq(res, scan));
250 }
251 
252 M0_INTERNAL int m0_rm_type_register(struct m0_rm_domain *dom,
253  struct m0_rm_resource_type *rt)
254 {
255  int rc;
256 
257  M0_ENTRY("resource type: %s", rt->rt_name);
258  M0_PRE(rt->rt_dom == NULL);
259  M0_PRE(IS_IN_ARRAY(rt->rt_id, dom->rd_types));
260  M0_PRE(dom->rd_types[rt->rt_id] == NULL);
261 
262  m0_mutex_init(&rt->rt_lock);
263  res_tlist_init(&rt->rt_resources);
264  rt->rt_nr_resources = 0;
265  m0_sm_group_init(&rt->rt_sm_grp);
266 
267  m0_mutex_init(&rt->rt_queue_guard);
268  m0_queue_init(&rt->rt_ha_events);
269  rt->rt_stop_worker = false;
270  rc = M0_THREAD_INIT(&rt->rt_worker, struct m0_rm_resource_type *, NULL,
271  &credit_processor, rt, "m0_rm_rt_agent");
272  if (rc != 0)
273  return M0_RC(rc);
274 
275  m0_mutex_lock(&dom->rd_lock);
276  dom->rd_types[rt->rt_id] = rt;
277  rt->rt_dom = dom;
279  m0_mutex_unlock(&dom->rd_lock);
280 
281  M0_POST(dom->rd_types[rt->rt_id] == rt);
282  M0_POST(rt->rt_dom == dom);
283 
284  return M0_RC(rc);
285 }
286 M0_EXPORTED(m0_rm_type_register);
287 
289 {
290  struct m0_rm_domain *dom = rt->rt_dom;
291 
292  M0_ENTRY("resource type: %s", rt->rt_name);
293  M0_PRE(dom != NULL);
294  M0_PRE(res_tlist_is_empty(&rt->rt_resources));
295  M0_PRE(rt->rt_nr_resources == 0);
296 
297  m0_mutex_lock(&dom->rd_lock);
298  M0_PRE(IS_IN_ARRAY(rt->rt_id, dom->rd_types));
299  M0_PRE(dom->rd_types[rt->rt_id] == rt);
301 
302  dom->rd_types[rt->rt_id] = NULL;
303  m0_mutex_unlock(&dom->rd_lock);
304 
305  m0_sm_group_lock(&rt->rt_sm_grp);
306  rt->rt_stop_worker = true;
307  m0_clink_signal(&rt->rt_sm_grp.s_clink);
308  m0_sm_group_unlock(&rt->rt_sm_grp);
309 
310  M0_LOG(M0_DEBUG, "Waiting for RM RT agent to join");
311  m0_thread_join(&rt->rt_worker);
312  m0_thread_fini(&rt->rt_worker);
313  m0_sm_group_fini(&rt->rt_sm_grp);
314  m0_queue_fini(&rt->rt_ha_events);
315  m0_mutex_fini(&rt->rt_queue_guard);
316 
317  rt->rt_dom = NULL;
318  res_tlist_fini(&rt->rt_resources);
319  m0_mutex_fini(&rt->rt_lock);
320 
321  M0_POST(rt->rt_dom == NULL);
322  M0_LEAVE();
323 }
324 M0_EXPORTED(m0_rm_type_deregister);
325 
326 M0_INTERNAL struct m0_rm_resource_type *
328  const uint64_t rtype_id)
329 {
330  M0_PRE(dom != NULL);
331  M0_PRE(IS_IN_ARRAY(rtype_id, dom->rd_types));
332  M0_PRE(dom->rd_types[rtype_id] != NULL);
333 
334  return dom->rd_types[rtype_id];
335 }
336 
337 M0_INTERNAL void m0_rm_resource_add(struct m0_rm_resource_type *rtype,
338  struct m0_rm_resource *res)
339 {
340  M0_ENTRY("res-type: %p resource : %p", rtype, res);
341 
342  m0_mutex_lock(&rtype->rt_lock);
344  M0_PRE(res->r_ref == 0);
346  res->r_type = rtype;
347  res_tlink_init_at(res, &rtype->rt_resources);
348  m0_remotes_tlist_init(&res->r_remotes);
349  m0_mutex_init(&res->r_mutex);
350  m0_owners_tlist_init(&res->r_local);
351  m0_rm_resource_bob_init(res);
352  M0_CNT_INC(rtype->rt_nr_resources);
355  m0_mutex_unlock(&rtype->rt_lock);
356  M0_POST(res->r_type == rtype);
357  M0_LEAVE();
358 }
359 M0_EXPORTED(m0_rm_resource_add);
360 
361 M0_INTERNAL void m0_rm_resource_del(struct m0_rm_resource *res)
362 {
363  struct m0_rm_resource_type *rtype = res->r_type;
364 
365  M0_ENTRY("resource : %p", res);
366  m0_mutex_lock(&rtype->rt_lock);
367  M0_PRE(res->r_ref == 0);
369  M0_PRE(m0_remotes_tlist_is_empty(&res->r_remotes));
370  M0_PRE(m0_owners_tlist_is_empty(&res->r_local));
372 
373  res_tlink_del_fini(res);
374  M0_CNT_DEC(rtype->rt_nr_resources);
375 
378  m0_remotes_tlist_fini(&res->r_remotes);
379  m0_owners_tlist_fini(&res->r_local);
380  m0_rm_resource_bob_fini(res);
381  m0_mutex_unlock(&rtype->rt_lock);
382  M0_LEAVE();
383 }
384 M0_EXPORTED(m0_rm_resource_del);
385 
386 M0_INTERNAL void m0_rm_resource_free(struct m0_rm_resource *res)
387 {
388  M0_PRE(res->r_ops != NULL && res->r_ops->rop_resource_free != NULL);
389 
390  res->r_ops->rop_resource_free(res);
391 }
392 
393 M0_INTERNAL int m0_rm_resource_encode(struct m0_rm_resource *res,
394  struct m0_buf *buf)
395 {
396  struct m0_bufvec datum_buf;
397  struct m0_bufvec_cursor cursor;
398 
399  M0_PRE(buf != NULL);
400  M0_PRE(res->r_type != NULL);
401  M0_PRE(res->r_type->rt_ops != NULL);
402  M0_PRE(res->r_type->rt_ops->rto_len != NULL);
403  M0_PRE(res->r_type->rt_ops->rto_encode != NULL);
404 
405  /*
406  * A resource type ID needs to be encoded before encoding resource
407  * onto buffer; This is required because whenever a borrow request
408  * reaches creditor, type needs to be identified to call type specific
409  * decode function.
410  */
411  buf->b_nob = sizeof res->r_type->rt_id +
412  res->r_type->rt_ops->rto_len(res);
413  buf->b_addr = m0_alloc(buf->b_nob);
414  if (buf->b_addr == NULL)
415  return M0_ERR(-ENOMEM);
416 
417  datum_buf.ov_buf = &buf->b_addr;
418  datum_buf.ov_vec.v_nr = 1;
419  datum_buf.ov_vec.v_count = &buf->b_nob;
420 
421  m0_bufvec_cursor_init(&cursor, &datum_buf);
422 
423  /*
424  * Copy resource type ID first to buffer, followed by actual
425  * resource description
426  */
427  m0_bufvec_cursor_copyto(&cursor, (void *)&res->r_type->rt_id,
428  sizeof res->r_type->rt_id);
429  return M0_RC(res->r_type->rt_ops->rto_encode(&cursor, res));
430 }
431 M0_EXPORTED(m0_rm_resource_encode);
432 
433 static void resource_get(struct m0_rm_resource *res)
434 {
435  struct m0_rm_resource_type *rtype = res->r_type;
436  uint32_t count;
437 
438  M0_ENTRY("resource : %p", res);
439  m0_mutex_lock(&rtype->rt_lock);
440  count = res->r_ref;
441  M0_CNT_INC(res->r_ref);
442  m0_mutex_unlock(&rtype->rt_lock);
443  M0_LOG(M0_DEBUG, "ref[%u -> %u]", count, count + 1);
444  M0_LEAVE();
445 }
446 
447 static void resource_put(struct m0_rm_resource *res)
448 {
449  struct m0_rm_resource_type *rtype = res->r_type;
450  uint32_t count;
451 
452  M0_ENTRY("resource : %p", res);
453  m0_mutex_lock(&rtype->rt_lock);
454  count = res->r_ref;
455  M0_CNT_DEC(res->r_ref);
456  m0_mutex_unlock(&rtype->rt_lock);
457  M0_LOG(M0_DEBUG, "ref[%u -> %u]", count, count - 1);
458  M0_LEAVE();
459 }
460 
461 static struct m0_sm_state_descr owner_states[] = {
462  [ROS_INITIAL] = {
464  .sd_name = "Init",
465  .sd_allowed = M0_BITS(ROS_INITIALISING)
466  },
467  [ROS_INITIALISING] = {
468  .sd_name = "Initialising",
469  .sd_allowed = M0_BITS(ROS_ACTIVE)
470  },
471  [ROS_ACTIVE] = {
472  .sd_name = "Active",
473  .sd_allowed = M0_BITS(ROS_QUIESCE)
474  },
475  [ROS_QUIESCE] = {
476  .sd_name = "Quiesce",
477  .sd_allowed = M0_BITS(ROS_FINALISING)
478  },
479  [ROS_FINALISING] = {
480  .sd_name = "Finalising",
481  .sd_allowed = M0_BITS(ROS_INSOLVENT, ROS_DEAD_CREDITOR,
482  ROS_FINAL)
483  },
484  [ROS_DEAD_CREDITOR] = {
485  .sd_flags = M0_SDF_FAILURE | M0_SDF_FINAL,
486  .sd_name = "Creditor_is_dead",
487  .sd_allowed = M0_BITS(ROS_ACTIVE, ROS_FINAL),
488  },
489  [ROS_INSOLVENT] = {
490  .sd_flags = M0_SDF_TERMINAL,
491  .sd_name = "Insolvent"
492  },
493  [ROS_FINAL] = {
494  .sd_flags = M0_SDF_TERMINAL,
495  .sd_name = "Fini"
496  }
497 };
498 
499 static const struct m0_sm_conf owner_conf = {
500  .scf_name = "Resource Owner",
501  .scf_nr_states = ARRAY_SIZE(owner_states),
502  .scf_state = owner_states
503 };
504 
505 /*
506  * Group lock is held
507  */
508 static inline void owner_state_set(struct m0_rm_owner *owner,
509  enum m0_rm_owner_state state)
510 {
511  M0_LOG(M0_INFO, "Owner: %p, state change:[%s -> %s]\n",
512  owner, m0_sm_state_name(&owner->ro_sm, owner->ro_sm.sm_state),
513  m0_sm_state_name(&owner->ro_sm, state));
514  m0_sm_state_set(&owner->ro_sm, state);
515 }
516 
517 static inline void owner_fail(struct m0_rm_owner *owner,
518  enum m0_rm_owner_state state,
519  int rc)
520 {
521  M0_LOG(M0_INFO, "Owner: %p, state change:[%s -> %s] err %d\n",
522  owner, m0_sm_state_name(&owner->ro_sm, owner->ro_sm.sm_state),
523  m0_sm_state_name(&owner->ro_sm, state), rc);
524  m0_sm_fail(&owner->ro_sm, state, rc);
525 }
526 
527 static bool owner_has_loans(struct m0_rm_owner *owner)
528 {
529  return !m0_rm_ur_tlist_is_empty(&owner->ro_sublet) ||
531 }
532 
533 static void owner_finalisation_check(struct m0_rm_owner *owner)
534 {
536  switch (owner_state(owner)) {
537  case ROS_QUIESCE:
538  if (owner_is_idle(owner)) {
539  /*
540  * No more user-credit requests are pending.
541  * Flush the loans and cached credits.
542  */
544  if (owner_has_loans(owner)) {
545  cached_credits_clear(owner);
546  if (M0_FI_ENABLED("drop_loans")) {
547  owner_cleanup(owner);
548  owner_state_set(owner, ROS_FINAL);
549  }
550  }
551  owner_liquidate(owner);
552  M0_POST(owner_invariant(owner));
553  }
554  break;
555  case ROS_FINALISING:
556  /*
557  * owner_liquidate() creates requests. Make sure that all those
558  * requests are processed. Once the owner is liquidated, if
559  * there are no pending loans, finalise owner. Otherwise put it
560  * in INSOLVENT state. Currently there is no recovery from
561  * INSOLVENT state.
562  */
563  if (owner_is_liquidated(owner)) {
564  cached_credits_clear(owner);
565  if (owner_has_loans(owner))
567  else if (owner->ro_user_windup)
568  owner_state_set(owner, ROS_FINAL);
569  else
570  owner_fail(owner, ROS_DEAD_CREDITOR, -ENODEV);
571  if (owner_state(owner) == ROS_INSOLVENT)
572  owner_cleanup(owner);
573  }
574  break;
575  case ROS_INSOLVENT:
576  case ROS_FINAL:
577  break;
578  default:
579  break;
580  }
581 }
582 
583 static bool owner_smgrp_is_locked(const struct m0_rm_owner *owner)
584 {
585  struct m0_sm_group *smgrp;
586 
587  M0_PRE(owner != NULL);
588  smgrp = owner_grp(owner);
589  return m0_mutex_is_locked(&smgrp->s_lock);
590 }
591 
592 M0_INTERNAL void m0_rm_owner_lock(struct m0_rm_owner *owner)
593 {
594  m0_sm_group_lock(owner_grp(owner));
595 }
596 M0_EXPORTED(m0_rm_owner_lock);
597 
598 static int m0_rm_owner_trylock(struct m0_rm_owner *owner)
599 {
600  return m0_mutex_trylock(&owner_grp(owner)->s_lock);
601 }
602 
603 M0_INTERNAL void m0_rm_owner_unlock(struct m0_rm_owner *owner)
604 {
606 }
607 M0_EXPORTED(m0_rm_owner_unlock);
608 
609 M0_INTERNAL void m0_rm_owner_init(struct m0_rm_owner *owner,
610  struct m0_fid *fid,
611  const struct m0_uint128 *group,
612  struct m0_rm_resource *res,
613  struct m0_rm_remote *creditor)
614 {
617  M0_PRE(fid != NULL);
619 
620  M0_ENTRY("owner: %p resource: %p creditor: %p",
621  owner, res, creditor);
622  owner->ro_resource = res;
623  m0_sm_init(&owner->ro_sm, &owner_conf, ROS_INITIAL, owner_grp(owner));
624  m0_rm_owner_lock(owner);
626  m0_rm_owner_unlock(owner);
627  owner->ro_group_id = *group;
628  m0_fid_set(&owner->ro_fid, fid->f_container, fid->f_key);
629  owner->ro_seq = 0;
630 
631  RM_OWNER_LISTS_FOR(owner, m0_rm_ur_tlist_init);
632  resource_get(res);
633  m0_rm_owner_lock(owner);
634  owner_state_set(owner, ROS_ACTIVE);
635  m0_rm_owner_unlock(owner);
636  owner->ro_creditor = creditor;
637 
638  m0_cookie_new(&owner->ro_id);
639  m0_mutex_lock(&res->r_type->rt_lock);
640  m0_owners_tlink_init_at_tail(owner, &res->r_local);
641  m0_mutex_unlock(&res->r_type->rt_lock);
642  M0_POST(owner_invariant(owner));
643  M0_POST(owner->ro_resource == res);
644 
645  M0_LEAVE();
646 }
647 M0_EXPORTED(m0_rm_owner_init);
648 
649 M0_INTERNAL void m0_rm_owner_init_rfid(struct m0_rm_owner *owner,
650  const struct m0_uint128 *group,
651  struct m0_rm_resource *res,
652  struct m0_rm_remote *creditor)
653 {
654  struct m0_fid fid;
655 
657  m0_rm_owner_init(owner, &fid, group, res, creditor);
658 }
659 M0_EXPORTED(m0_rm_owner_init_rfid);
660 
662 {
663  struct m0_rm_ha_event *event;
664  struct m0_queue_link *q_link;
665  struct m0_rm_ha_tracker *tracker;
666  struct m0_rm_remote *remote;
667 
668  m0_mutex_lock(&rt->rt_queue_guard);
669  q_link = m0_queue_get(&rt->rt_ha_events);
670  m0_mutex_unlock(&rt->rt_queue_guard);
671 
672  if (q_link == NULL)
673  return;
674  event = M0_AMB(event, q_link, rhe_link);
675  tracker = event->rhe_tracker;
676  remote = M0_AMB(remote, tracker, rem_tracker);
677 
678  if (tracker->rht_state == event->rhe_state)
679  goto end;
680  switch (event->rhe_state) {
681  case M0_NC_ONLINE:
683  break;
684  case M0_NC_FAILED:
686  break;
687  case M0_NC_TRANSIENT:
688  break;
689  default:
690  M0_IMPOSSIBLE("Impossible state (%d) for rm owner",
691  event->rhe_state);
692  }
693 end:
694  m0_free(event);
695 }
696 
698 {
699  M0_ENTRY();
700  M0_PRE(rt != NULL);
701 
702  while (true) {
703  m0_sm_group_lock(&rt->rt_sm_grp);
705  if (rt->rt_stop_worker) {
706  m0_sm_group_unlock(&rt->rt_sm_grp);
707  M0_LEAVE("RM RT agent STOPPED");
708  return;
709  }
710  m0_sm_group_unlock(&rt->rt_sm_grp);
711  m0_chan_wait(&rt->rt_sm_grp.s_clink);
712  }
713 }
714 
715 static void reserve_prio_set(struct m0_rm_reserve_prio *prio,
716  m0_time_t timestamp,
717  struct m0_rm_owner *owner)
718 {
719  M0_ASSERT(prio != NULL);
720  M0_ASSERT(owner != NULL);
721 
722  prio->rrp_time = timestamp;
723  prio->rrp_owner = owner->ro_fid;
724  prio->rrp_seq = owner->ro_seq++;
725 }
726 
727 static bool reserve_prio_is_set(struct m0_rm_reserve_prio *prio)
728 {
729  return m0_fid_is_set(&prio->rrp_owner) && prio->rrp_time != 0;
730 }
731 
732 M0_INTERNAL int m0_rm_owner_selfadd(struct m0_rm_owner *owner,
733  struct m0_rm_credit *r)
734 {
735  struct m0_rm_credit *credit_transfer;
736  struct m0_rm_loan *nominal_capital;
737  int rc;
738 
739  M0_ENTRY("owner: %p", owner);
740  M0_PRE(r != NULL);
741  M0_PRE(r->cr_owner == owner);
742  /* Owner must be "top-most" */
743  M0_PRE(owner->ro_creditor == NULL);
744 
745  M0_ALLOC_PTR(nominal_capital);
746  M0_ALLOC_PTR(credit_transfer);
747  if (nominal_capital != NULL && credit_transfer != NULL) {
748  /*
749  * Immediately transfer the credits. Otherwise owner will not
750  * be balanced.
751  */
752  m0_rm_credit_init(credit_transfer, owner);
753  rc = m0_rm_credit_copy(credit_transfer, r) ?:
754  m0_rm_loan_init(nominal_capital, r, NULL);
755  if (rc == 0) {
756  nominal_capital->rl_other = owner->ro_creditor;
757  nominal_capital->rl_id = M0_RM_LOAN_SELF_ID;
758  /* Add capital to the borrowed list. */
759  m0_rm_ur_tlist_add(&owner->ro_borrowed,
760  &nominal_capital->rl_credit);
761  /* Add credit transfer to the CACHED list. */
762  m0_rm_ur_tlist_add(&owner->ro_owned[OWOS_CACHED],
763  credit_transfer);
764  }
765  } else
766  rc = M0_ERR(-ENOMEM);
767  if (rc != 0) {
768  m0_free(nominal_capital);
769  m0_free(credit_transfer);
770  }
771  M0_POST(ergo(rc == 0, owner_invariant(owner)));
772  return M0_RC(rc);
773 }
774 M0_EXPORTED(m0_rm_owner_selfadd);
775 
776 static bool owner_is_idle(const struct m0_rm_owner *o)
777 {
778  /*
779  * Owner is considered to be idle when all local requests are
780  * satisfied and there are no outgoing requests.
781  */
782  return m0_forall(i, ARRAY_SIZE(o->ro_incoming),
784  m0_tl_forall(m0_rm_ur, c, &o->ro_incoming[i][j],
785  incoming_state(cr2in(c)) == RI_SUCCESS))) &&
788 }
789 
790 static bool owner_is_liquidated(const struct m0_rm_owner *o)
791 {
792  /*
793  * Owner is considered to be liquidated if all locally granted requests
794  * are released and there are no outgoing requests.
795  */
796  return m0_forall(i, ARRAY_SIZE(o->ro_incoming),
799  &o->ro_incoming[i][j]))) &&
802 }
803 
804 static void owner_liquidate(struct m0_rm_owner *o)
805 {
806  struct m0_rm_credit *credit;
807  struct m0_rm_loan *loan;
808  struct m0_rm_incoming *in;
809  int rc = 0;
810 
811  M0_ENTRY("owner: %p", o);
812  /*
813  * While processing the queues, if -ENOMEM or other error occurs
814  * then the owner will be in a limbo. A force cleanup remains one of
815  * the options.
816  */
817  m0_tl_for (m0_rm_ur, &o->ro_sublet, credit) {
818  M0_ALLOC_PTR(in);
819  if (in == NULL)
820  break;
823  in->rin_priority = 0;
826  /*
827  * This is convoluted. Now that user incoming requests have
828  * drained, we add our incoming requests for REVOKE and CANCEL
829  * processing to the incoming queue.
830  *
831  * If there are any errors then loans (sublets, borrows) will
832  * remain in the list. Eventually owner will enter INSOLVENT
833  * state.
834  */
835  M0_ASSERT(m0_rm_credit_bob_check(credit));
836  rc = m0_rm_credit_copy(&in->rin_want, credit);
837  if (rc == 0) {
838  m0_rm_ur_tlist_add(
840  &in->rin_want);
841  } else
842  break;
843  } m0_tl_endfor;
844 
845  /*
846  * Call conflict callback for all held credits. Users are expected to
847  * put them, so owner can continue with windup process.
848  */
849  m0_tl_for (m0_rm_ur, &o->ro_owned[OWOS_HELD], credit) {
850  conflict_notify(credit);
851  } m0_tl_endfor;
852 
853  m0_tl_for (m0_rm_ur, &o->ro_borrowed, credit) {
854  M0_ASSERT(m0_rm_credit_bob_check(credit));
855  loan = bob_of(credit, struct m0_rm_loan, rl_credit, &loan_bob);
856  if (loan->rl_id == M0_RM_LOAN_SELF_ID ||
857  (o->ro_creditor != NULL && o->ro_creditor->rem_dead)) {
858  m0_rm_ur_tlist_del(credit);
859  m0_rm_loan_fini(loan);
860  m0_free(loan);
861  } else
862  cancel_send(loan);
863  } m0_tl_endfor;
864  owner_balance(o);
865  M0_LEAVE();
866 }
867 
868 static void owner_cleanup(struct m0_rm_owner *o)
869 {
870  struct m0_rm_credit *credit;
871  struct m0_rm_loan *loan;
872 
873  M0_PRE(owner_is_idle(o));
874 
875  m0_tl_for (m0_rm_ur, &o->ro_sublet, credit) {
876  M0_ASSERT(m0_rm_credit_bob_check(credit));
877  loan = bob_of(credit, struct m0_rm_loan, rl_credit, &loan_bob);
878  m0_rm_ur_tlist_del(credit);
879  m0_rm_loan_fini(loan);
880  m0_free(loan);
881  } m0_tl_endfor;
882 
883  m0_tl_for (m0_rm_ur, &o->ro_borrowed, credit) {
884  M0_ASSERT(m0_rm_credit_bob_check(credit));
885  loan = bob_of(credit, struct m0_rm_loan, rl_credit, &loan_bob);
886  m0_rm_ur_tlist_del(credit);
887  m0_rm_loan_fini(loan);
888  m0_free(loan);
889  } m0_tl_endfor;
890 }
891 
892 M0_INTERNAL int m0_rm_owner_timedwait(struct m0_rm_owner *owner,
893  uint64_t state,
894  const m0_time_t abs_timeout)
895 {
896  int rc;
897 
898  m0_rm_owner_lock(owner);
899  rc = m0_sm_timedwait(&owner->ro_sm, state, abs_timeout);
900  m0_rm_owner_unlock(owner);
901  return rc;
902 }
903 
904 M0_INTERNAL void m0_rm_owner_creditor_reset(struct m0_rm_owner *owner,
905  struct m0_rm_remote *creditor)
906 {
907  M0_ENTRY();
909  owner->ro_creditor == NULL);
910  m0_rm_owner_lock(owner);
911  owner->ro_creditor = creditor;
912  if (owner_state(owner) == ROS_DEAD_CREDITOR)
913  owner_state_set(owner, ROS_ACTIVE);
914  m0_rm_owner_unlock(owner);
915  M0_LEAVE();
916 }
917 
918 
919 static void owner_windup_locked(struct m0_rm_owner *owner)
920 {
922  /*
923  * Put the owner in ROS_QUIESCE. This will prevent any new
924  * incoming requests on it.
925  */
927  owner_balance(owner);
928 }
929 
930 M0_INTERNAL void m0_rm_owner_windup(struct m0_rm_owner *owner)
931 {
932  M0_ENTRY("owner %p", owner);
933  m0_rm_owner_lock(owner);
934  owner->ro_user_windup = true;
935  if (owner_state(owner) == ROS_ACTIVE)
936  owner_windup_locked(owner);
937  else if (owner_state(owner) == ROS_DEAD_CREDITOR)
938  owner_state_set(owner, ROS_FINAL);
939  m0_rm_owner_unlock(owner);
940  M0_LEAVE();
941 }
942 
943 M0_INTERNAL void m0_rm_owner_fini(struct m0_rm_owner *owner)
944 {
945  struct m0_rm_resource *res = owner->ro_resource;
946 
947  M0_ENTRY("owner: %p", owner);
948  M0_PRE(owner_invariant(owner));
949  M0_PRE(M0_IN(owner_state(owner),
951 
952  m0_mutex_lock(&res->r_type->rt_lock);
953  m0_owners_tlink_del_fini(owner);
954  m0_mutex_unlock(&res->r_type->rt_lock);
955  RM_OWNER_LISTS_FOR(owner, m0_rm_ur_tlist_fini);
956  owner->ro_resource = NULL;
957  owner->ro_creditor = NULL;
958  resource_put(res);
959 
960  M0_LEAVE();
961 }
962 M0_EXPORTED(m0_rm_owner_fini);
963 
964 M0_INTERNAL void m0_rm_credit_init(struct m0_rm_credit *credit,
965  struct m0_rm_owner *owner)
966 {
967  M0_ENTRY("credit: %p with owner: %p", credit, owner);
968  M0_PRE(credit != NULL);
969  M0_PRE(owner->ro_resource->r_ops != NULL);
971 
972  credit->cr_datum = 0;
973  credit->cr_group_id = m0_rm_no_group;
974  m0_rm_ur_tlink_init(credit);
975  pr_tlist_init(&credit->cr_pins);
976  m0_rm_credit_bob_init(credit);
977  credit->cr_owner = owner;
978  owner->ro_resource->r_ops->rop_credit_init(owner->ro_resource, credit);
979 
980  M0_POST(credit->cr_ops != NULL);
981  M0_LEAVE();
982 }
983 M0_EXPORTED(m0_rm_credit_init);
984 
985 M0_INTERNAL void m0_rm_credit_fini(struct m0_rm_credit *credit)
986 {
987  M0_ENTRY("credit: %p", credit);
988  M0_PRE(credit != NULL);
989 
990  m0_rm_ur_tlink_fini(credit);
991  pr_tlist_fini(&credit->cr_pins);
992  m0_rm_credit_bob_fini(credit);
993  credit->cr_ops->cro_free(credit);
994  M0_LEAVE();
995 }
996 M0_EXPORTED(m0_rm_credit_fini);
997 
998 static struct m0_sm_state_descr inc_states[] = {
999  [RI_INITIALISED] = {
1001  .sd_name = "Initialised",
1002  .sd_allowed = M0_BITS(RI_CHECK, RI_FAILURE, RI_FINAL)
1003  },
1004  [RI_CHECK] = {
1005  .sd_name = "Check",
1006  .sd_allowed = M0_BITS(RI_SUCCESS, RI_FAILURE, RI_WAIT)
1007  },
1008  [RI_SUCCESS] = {
1009  .sd_name = "Success",
1010  .sd_allowed = M0_BITS(RI_RELEASED)
1011  },
1012  [RI_FAILURE] = {
1013  .sd_flags = M0_SDF_FAILURE,
1014  .sd_name = "Failure",
1015  .sd_allowed = M0_BITS(RI_FINAL)
1016  },
1017  [RI_WAIT] = {
1018  .sd_name = "Wait",
1019  .sd_allowed = M0_BITS(RI_WAIT, RI_FAILURE, RI_CHECK)
1020  },
1021  [RI_RELEASED] = {
1022  .sd_name = "Released",
1023  .sd_allowed = M0_BITS(RI_FINAL)
1024  },
1025  [RI_FINAL] = {
1026  .sd_flags = M0_SDF_TERMINAL,
1027  .sd_name = "Final",
1028  }
1029 };
1030 
1031 static const struct m0_sm_conf inc_conf = {
1032  .scf_name = "Incoming Request",
1033  .scf_nr_states = ARRAY_SIZE(inc_states),
1034  .scf_state = inc_states
1035 };
1036 
1037 static struct m0_rm_incoming *cr2in(const struct m0_rm_credit *cr)
1038 {
1039  return container_of(cr, struct m0_rm_incoming, rin_want);
1040 }
1041 
1042 static inline void incoming_state_set(struct m0_rm_incoming *in,
1043  enum m0_rm_incoming_state state)
1044 {
1045  /*
1046  * Incoming sm group is owner's group, which in turn is the owner
1047  * resource type's group. It is not always safe to access the group via
1048  * owner, because under certain circumstances the incoming object may
1049  * outlive its owner. One example is an incoming CREDIT REVOKE request
1050  * that arrives right before the owner it refers to is finalised for
1051  * natural reasons.
1052  */
1054  M0_LOG(M0_INFO, "Incoming req: %p, state change:[%s -> %s]\n",
1055  in, m0_sm_state_name(&in->rin_sm, in->rin_sm.sm_state),
1056  m0_sm_state_name(&in->rin_sm, state));
1057  m0_sm_state_set(&in->rin_sm, state);
1058 }
1059 
1060 M0_INTERNAL void m0_rm_incoming_init(struct m0_rm_incoming *in,
1061  struct m0_rm_owner *owner,
1063  enum m0_rm_incoming_policy policy,
1064  uint64_t flags)
1065 {
1066  M0_ENTRY("incoming: %p for owner: %p", in, owner);
1067  M0_PRE(in != NULL);
1068 
1069  M0_SET0(in);
1071  in->rin_type = type;
1072  in->rin_policy = policy;
1073  in->rin_flags = flags;
1074  pi_tlist_init(&in->rin_pins);
1075  m0_rm_credit_init(&in->rin_want, owner);
1076  m0_rm_incoming_bob_init(in);
1078  M0_LEAVE();
1079 }
1080 M0_EXPORTED(m0_rm_incoming_init);
1081 
/*
 * Unlinks the request's wanted credit (in->rin_want) from the m0_rm_ur list
 * it is on and asserts that the request no longer has any pins.
 *
 * NOTE(review): the embedded listing numbers skip 1084; a statement at the
 * top of the body may be missing from this view.
 */
1082 M0_INTERNAL void incoming_surrender(struct m0_rm_incoming *in)
1083 {
1085  m0_rm_ur_tlist_del(&in->rin_want);
1086  M0_ASSERT(pi_tlist_is_empty(&in->rin_pins));
1087 }
1088 
/*
 * Finalises the internals of an incoming request: its state machine, its
 * bob and its pin list.
 *
 * NOTE(review): the embedded listing numbers skip 1094, so one statement
 * between the bob and pin-list finalisation is not visible in this view.
 */
1089 M0_INTERNAL void internal_incoming_fini(struct m0_rm_incoming *in)
1090 {
1091  M0_ENTRY();
1092  m0_sm_fini(&in->rin_sm);
1093  m0_rm_incoming_bob_fini(in);
1095  pi_tlist_fini(&in->rin_pins);
1096  M0_LEAVE();
1097 }
1098 
/*
 * Public finaliser for an incoming request.
 *
 * NOTE(review): this listing is badly truncated. The embedded numbers skip
 * 1102, 1105, 1115 and 1117-1120: the body of the "rin_remote != NULL"
 * branch, the locking described by the comment below, the tail of the
 * M0_PRE() state check and whatever follows it are all missing from this
 * view. Do not edit this function from this view; consult the original file.
 */
1099 M0_INTERNAL void m0_rm_incoming_fini(struct m0_rm_incoming *in)
1100 {
1101  M0_ENTRY();
1103 
1104  if (in->rin_remote != NULL)
1106  /*
1107  * This is to lock sm group, which in fact is owner group, which in turn
1108  * is resource type group.
1109  *
1110  * Please note, locking via owner object may be not always safe here due
1111  * to races between finalisation of incoming object and owner it's bound
1112  * to. Owner may appear lockable at the beginning, but being already
1113  * finalised when it is time to unlock sm.
1114  */
1116  M0_PRE(M0_IN(incoming_state(in),
1121  M0_LEAVE();
1122 }
1123 M0_EXPORTED(m0_rm_incoming_fini);
1124 
/*
 * NOTE(review): the function header (listing line 1128) is missing from this
 * view; only the comment and the body are visible. The body consists solely
 * of an M0_IMPOSSIBLE() invocation, i.e. the callback is not expected to be
 * reached.
 */
1125 /*
1126  * Impossible condition.
1127  */
1129 {
1130  M0_IMPOSSIBLE("Conflict not possible during windup");
1131 }
1132 
/*
 * Completion handler for a windup incoming request. When rc is 0 the request
 * is released and surrendered; in every case the request object is freed
 * with m0_free() at the end.
 *
 * NOTE(review): the embedded listing numbers skip 1140 and 1142-1144, so a
 * statement before the M0_PRE() and the remainder of that M0_PRE() state
 * check are missing from this view.
 */
1133 static void windup_incoming_complete(struct m0_rm_incoming *in, int32_t rc)
1134 {
1135  M0_ENTRY();
1136  if (rc == 0) {
1137  incoming_release(in);
1138  incoming_surrender(in);
1139  }
1141  M0_PRE(M0_IN(incoming_state(in),
1145 
1146  m0_free(in);
1147  M0_LEAVE();
1148 }
1149 
1150 M0_INTERNAL int m0_rm_outgoing_init(struct m0_rm_outgoing *out,
1151  enum m0_rm_outgoing_type req_type,
1152  struct m0_rm_remote *other,
1153  struct m0_rm_credit *credit)
1154 {
1155  M0_ENTRY("outgoing: %p", out);
1156  M0_PRE(out != NULL);
1157 
1158  out->rog_rc = 0;
1159  out->rog_type = req_type;
1160  out->rog_sent = false;
1161  m0_rm_outgoing_bob_init(out);
1162  return M0_RC(m0_rm_loan_init(&out->rog_want, credit, other));
1163 }
1164 M0_EXPORTED(m0_rm_outgoing_init);
1165 
1166 M0_INTERNAL void m0_rm_outgoing_fini(struct m0_rm_outgoing *out)
1167 {
1168  M0_ENTRY("outgoing: %p", out);
1169  M0_PRE(out != NULL);
1170  m0_rm_loan_fini(&out->rog_want);
1171  m0_rm_outgoing_bob_fini(out);
1172  M0_LEAVE();
1173 }
1174 M0_EXPORTED(m0_rm_outgoing_fini);
1175 
1176 static int loan_dup(const struct m0_rm_loan *src_loan,
1177  struct m0_rm_loan **dest_loan)
1178 {
1179  return m0_rm_loan_alloc(dest_loan, &src_loan->rl_credit,
1180  src_loan->rl_other);
1181 }
1182 
1183 M0_INTERNAL int m0_rm_loan_alloc(struct m0_rm_loan **loan,
1184  const struct m0_rm_credit *credit,
1185  struct m0_rm_remote *creditor)
1186 {
1187  struct m0_rm_loan *new_loan;
1188  int rc = -ENOMEM;
1189 
1190  M0_ENTRY("loan credit: %llu", (long long unsigned) credit->cr_datum);
1191  M0_PRE(loan != NULL);
1192  M0_PRE(credit != NULL);
1193 
1194  M0_ALLOC_PTR(new_loan);
1195  if (new_loan != NULL) {
1196  rc = m0_rm_loan_init(new_loan, credit, creditor);
1197  if (rc != 0)
1198  m0_free0(&new_loan);
1199  }
1200 
1201  *loan = new_loan;
1202  return M0_RC(rc);
1203 }
1204 M0_EXPORTED(m0_rm_loan_alloc);
1205 
1206 /*
1207  * Allocates a new loan and calculates the difference between
1208  * loan->rl_credit and credit.
1209  */
1210 static int remnant_loan_get(const struct m0_rm_loan *loan,
1211  const struct m0_rm_credit *credit,
1212  struct m0_rm_loan **remnant_loan)
1213 {
1214  struct m0_rm_loan *new_loan;
1215  int rc;
1216 
1217  M0_ENTRY();
1218  M0_PRE(loan != NULL);
1219  M0_PRE(credit != NULL);
1220  M0_PRE(remnant_loan != NULL);
1221  M0_LOG(M0_DEBUG, "split loan credit: %llu with credit: %llu",
1222  (long long unsigned) loan->rl_credit.cr_datum,
1223  (long long unsigned) credit->cr_datum);
1224 
1225  rc = loan_dup(loan, &new_loan) ?:
1226  credit_diff(&new_loan->rl_credit, credit);
1227  if (rc != 0 && new_loan != NULL) {
1228  m0_rm_loan_fini(new_loan);
1229  m0_free0(&new_loan);
1230  }
1231  *remnant_loan = new_loan;
1232  return M0_RC(rc);
1233 }
1234 
/*
 * Initialises a loan: assigns a fresh rl_id, initialises the embedded credit
 * with the owner of "credit", initialises the bob, remembers "creditor" in
 * rl_other (taking a reference on it when it is not NULL) and finally copies
 * "credit" into the loan's credit.
 *
 * Returns the result of m0_rm_credit_copy(). Note that when the copy fails
 * the earlier initialisation steps, including the remote reference, have
 * already been performed.
 *
 * NOTE(review): the embedded listing numbers skip 1250, so one statement
 * inside the "rl_other != NULL" branch is not visible in this view.
 */
1235 M0_INTERNAL int m0_rm_loan_init(struct m0_rm_loan *loan,
1236  const struct m0_rm_credit *credit,
1237  struct m0_rm_remote *creditor)
1238 {
1239  M0_PRE(loan != NULL);
1240  M0_PRE(credit != NULL);
1241 
1242  M0_ENTRY("loan: %p", loan);
1243  loan->rl_id = 0;
1244  m0_cookie_new(&loan->rl_id);
1245  m0_rm_credit_init(&loan->rl_credit, credit->cr_owner);
1246  m0_rm_loan_bob_init(loan);
1247  loan->rl_other = creditor;
1248  if (loan->rl_other != NULL) {
1249  M0_RM_REMOTE_GET(loan->rl_other);
1251  }
1252 
1253  return M0_RC(m0_rm_credit_copy(&loan->rl_credit, credit));
1254 }
1255 M0_EXPORTED(m0_rm_loan_init);
1256 
/*
 * Finalises a loan: finalises the embedded credit, drops the reference on
 * the remote stored in rl_other (when there is one), clears rl_id and
 * finalises the bob.
 *
 * NOTE(review): the embedded listing numbers skip 1264, so one statement
 * inside the "rl_other != NULL" branch, before M0_RM_REMOTE_PUT(), is not
 * visible in this view.
 */
1257 M0_INTERNAL void m0_rm_loan_fini(struct m0_rm_loan *loan)
1258 {
1259  M0_PRE(loan != NULL);
1260 
1261  M0_ENTRY("loan: %p", loan);
1262  m0_rm_credit_fini(&loan->rl_credit);
1263  if (loan->rl_other != NULL) {
1265  M0_RM_REMOTE_PUT(loan->rl_other);
1266  }
1267  loan->rl_id = 0;
1268  m0_rm_loan_bob_fini(loan);
1269  M0_LEAVE();
1270 }
1271 M0_EXPORTED(m0_rm_loan_fini);
1272 
1273 static void pending_outgoing_send(struct m0_rm_owner *owner,
1274  struct m0_clink *link)
1275 {
1276  struct m0_rm_outgoing *outgoing;
1277  struct m0_rm_credit *credit;
1278 
1279  M0_ENTRY("owner: %p link: %p", owner, link);
1280  M0_PRE(owner != NULL);
1281  M0_PRE(link != NULL);
1282  M0_PRE(owner_smgrp_is_locked(owner));
1283 
1284  m0_tl_for(m0_rm_ur, &owner->ro_outgoing[OQS_GROUND], credit) {
1285  M0_ASSERT(m0_rm_credit_bob_check(credit));
1286  outgoing = bob_of(credit, struct m0_rm_outgoing,
1287  rog_want.rl_credit, &outgoing_bob);
1288  if (!outgoing->rog_sent &&
1289  &outgoing->rog_want.rl_other->rem_rev_sess_clink == link)
1290  m0_rm_outgoing_send(outgoing);
1291  } m0_tl_endfor;
1292  M0_LEAVE();
1293 }
1294 
/*
 * Clink callback attached to a remote's rem_rev_sess_clink. When the status
 * held in rc is zero, it visits every local owner of the remote's resource
 * and, for each owner whose lock could be taken with
 * m0_rm_owner_trylock(), sends the pending outgoing requests bound to this
 * clink. The whole pass is repeated while some owner was found busy, and is
 * abandoned if the remote is found in the REM_FREED state.
 *
 * Always returns true.
 *
 * NOTE(review): the embedded listing numbers skip 1312 and 1350-1353. The
 * statement that assigns rc is therefore not visible, and some lines between
 * the comment inside the loop and m0_mutex_unlock() are missing from this
 * view -- confirm against the original file.
 * NOTE(review): the REM_FREED path calls M0_LEAVE() and then breaks out to
 * the M0_LEAVE() after the loop, so the leave trace is emitted twice on that
 * path.
 */
1299 static bool rev_session_clink_cb(struct m0_clink *link)
1300 {
1301  int rc;
1302  struct m0_rm_remote *remote;
1303  struct m0_rm_resource *resource;
1304  struct m0_rm_owner *owner;
1305  bool busy;
1306 
1307  M0_ENTRY("link: %p", link);
1308  remote = bob_of(link, struct m0_rm_remote,
1309  rem_rev_sess_clink, &rem_bob);
1310  resource = remote->rem_resource;
1311 
1313  if (rc != 0) {
1314  M0_LEAVE("remote rev_session status is not good: rc=%d", rc);
1315  return true;
1316  }
1317 
1318  /*
1319  * Do not break RM lock ordering.
1320  * Use trylock-repeat cycle to avoid deadlocks.
1321  *
1322  * Problem with possible multiple sending of outgoing requests
1323  * for the same owner is solved by m0_rm_outgoing::rog_sent flag.
1324  */
1325  do {
1326  /*
1327  * We are holding the channel lock here to which
1328  * rem_rev_sess_clink is linked, so m0_rm_remote_fini() in
1329  * another thread would block at m0_clink_cleanup().
1330  * From another side, that thread might hold the owner lock,
1331  * in which case we will get stuck in the loop here trying to
1332  * take the same. So we do this check to avoid the deadlock.
1333  */
1334  if (remote->rem_state == REM_FREED) {
1335  M0_LEAVE("remote is finalising");
1336  break;
1337  }
1338  busy = false;
1339  m0_mutex_lock(&resource->r_type->rt_lock);
1340  m0_tl_for(m0_owners, &resource->r_local, owner) {
1341  busy = m0_rm_owner_trylock(owner);
1342  if (!busy) {
1343  pending_outgoing_send(owner, link);
1344  /*
1345  * Use m0_mutex_unlock() directly instead of
1346  * m0_rm_owner_unlock(). The latter causes asts
1347  * for owner sm group to run, which can lock
1348  * already held resource type lock.
1349  */
1354  m0_mutex_unlock(&owner_grp(owner)->s_lock);
1355  } else
1356  break;
1357  } m0_tl_endfor;
1358  m0_mutex_unlock(&resource->r_type->rt_lock);
1359  } while (busy);
1360 
1361  M0_LEAVE();
1362  return true;
1363 }
1364 
/*
 * Looks up, under res->r_mutex, the remote on the resource's r_remotes list
 * whose rem_cookie equals the owner cookie carried by the remote incoming
 * request (rem_in->ri_rem_owner_cookie). A reference is taken on the remote
 * when one is found. Returns the lookup result, which the code treats as
 * NULL when there is no match.
 *
 * NOTE(review): the line with the function name and parameter list (listing
 * line 1366) is missing from this view.
 */
1365 M0_INTERNAL struct m0_rm_remote *
1367 {
1368  struct m0_cookie *cookie = &rem_in->ri_rem_owner_cookie;
1369  struct m0_rm_resource *res = incoming_to_resource(&rem_in->ri_incoming);
1370  struct m0_rm_remote *rem;
1371 
1372  M0_PRE(cookie != NULL);
1373  M0_PRE(res != NULL);
1374  /* XXX is it scalable? */
1375  m0_mutex_lock(&res->r_mutex);
1376  rem = m0_tl_find(m0_remotes, other, &res->r_remotes,
1377  m0_cookie_is_eq(&other->rem_cookie, cookie));
1378  if (rem != NULL)
1379  M0_RM_REMOTE_GET(rem);
1380  m0_mutex_unlock(&res->r_mutex);
1381  return rem;
1382 }
1383 
/*
 * m0_ref release callback for a remote. Recovers the m0_rm_remote from its
 * embedded rem_refcnt and returns without doing anything when the remote is
 * not linked into a remotes list or its reference count is still positive.
 * Otherwise the remote is unlinked, finalised and freed. The session pointer
 * is read into "sess" before the remote is freed so it can still be used
 * afterwards.
 *
 * NOTE(review): the embedded listing numbers skip 1408, so the statement
 * guarded by the final "sess != NULL" check is missing from this view.
 */
1384 static void rm_remote_free(struct m0_ref *ref)
1385 {
1386  struct m0_rm_remote *rem =
1387  container_of(ref, struct m0_rm_remote, rem_refcnt);
1388  struct m0_rpc_session *sess = rem->rem_session;
1389 
1390  /*
1391  * Free only those remotes who connected to us asking for
1392  * loans (i.e. debtors).
1393  */
1394  M0_LOG(M0_DEBUG, "rem=%p refcnt=%d", rem,
1395  (int)m0_ref_read(&rem->rem_refcnt));
1396  if (!m0_remotes_tlink_is_in(rem) || m0_ref_read(&rem->rem_refcnt) > 0)
1397  return;
1398 
1399  m0_remotes_tlist_del(rem);
1400  m0_rm_remote_fini(rem);
1401  m0_free(rem);
1402 
1403  /*
1404  * Note: it seems the session can be NULL only in UTs here.
1405  * (In particular, at rm-ut:fom-funcs.)
1406  */
1407  if (sess != NULL)
1409 }
1410 
/*
 * Initialises a remote for resource "res". The remote must be zero-filled on
 * entry. Sets the state to REM_INITIALISED, records the resource, resets the
 * cookie, initialises the rem_signal channel on the resource type's lock,
 * the remotes list link and the bob, and takes a reference on the resource.
 *
 * NOTE(review): the embedded listing numbers skip 1421, 1422 and 1424, so
 * three statements of the initialisation sequence are not visible in this
 * view -- confirm against the original file.
 */
1411 M0_INTERNAL void m0_rm_remote_init(struct m0_rm_remote *rem,
1412  struct m0_rm_resource *res)
1413 {
1414  M0_PRE(M0_IS0(rem));
1415 
1416  M0_ENTRY("remote: %p", rem);
1417  rem->rem_state = REM_INITIALISED;
1418  rem->rem_resource = res;
1419  rem->rem_cookie = M0_COOKIE_NULL;
1420  m0_chan_init(&rem->rem_signal, &res->r_type->rt_lock);
1423  m0_remotes_tlink_init(rem);
1425  m0_rm_remote_bob_init(rem);
1426  resource_get(res);
1427  M0_LEAVE();
1428 }
1429 M0_EXPORTED(m0_rm_remote_init);
1430 
/*
 * Finalises a remote: marks it REM_FREED, handles the configuration-expiry
 * clink channel (locking it when the clink is attached to one), finalises
 * the remotes list link, drops the reference on the resource and finalises
 * the bob.
 *
 * NOTE(review): this listing is badly truncated. The embedded numbers skip
 * 1438, 1445-1447, 1449, 1452, 1455 and 1458: part of the allowed-state list
 * in M0_PRE(), the reverse-session handling announced by the comment, and
 * the bodies of both branches of the conf_exp_chan check are missing from
 * this view. Do not edit this function from this view; consult the original
 * file.
 */
1431 M0_INTERNAL void m0_rm_remote_fini(struct m0_rm_remote *rem)
1432 {
1433  struct m0_chan *conf_exp_chan = NULL;
1434 
1435  M0_ENTRY("remote: %p", rem);
1436  M0_PRE(rem != NULL);
1437  M0_PRE(M0_IN(rem->rem_state, (REM_INITIALISED,
1439  REM_OWNER_LOCATED)));
1440  rem->rem_state = REM_FREED;
1441  /*
1442  * It is possible that reverse session is not yet established,
1443  * because no revoke requests were sent to remote owner.
1444  */
1448 
1450  conf_exp_chan = rem->rem_tracker.rht_conf_exp.cl_chan;
1451  if (conf_exp_chan == NULL) {
1453  } else {
1454  m0_chan_lock(conf_exp_chan);
1456  m0_chan_unlock(conf_exp_chan);
1457  }
1459 
1460  m0_remotes_tlink_fini(rem);
1461  resource_put(rem->rem_resource);
1462  m0_rm_remote_bob_fini(rem);
1463  M0_LEAVE();
1464 }
1465 M0_EXPORTED(m0_rm_remote_fini);
1466 
1467 static void cached_credits_clear(struct m0_rm_owner *owner)
1468 {
1469  struct m0_rm_credit *credit;
1470 
1471  M0_ENTRY("owner: %p", owner);
1472  m0_tl_teardown(m0_rm_ur, &owner->ro_owned[OWOS_CACHED], credit) {
1473  m0_rm_credit_fini(credit);
1474  m0_free(credit);
1475  }
1476  M0_LEAVE();
1477 }
1478 
/*
 * NOTE(review): the function header (listing line 1485) and one statement in
 * the pin loop (listing line 1504) are missing from this view. From the
 * visible body: "in" is an incoming request, the function returns an int rc,
 * and it must not be called for M0_RIT_LOCAL requests.
 *
 * Visible behaviour: every pin of the request is deleted and the pinned
 * credit is moved to a temporary "retain" list when it does not intersect
 * the wanted credit, or to a temporary "remove" list when it does. For an
 * intersecting credit a remnant (credit minus wanted) is computed and put on
 * the remove list when empty, on the retain list otherwise. Finally one list
 * is torn down and the other is moved to OWOS_CACHED, with the roles of the
 * two lists swapped when rc is non-zero.
 */
1479 /*
1480  * Remove the OWOS_CACHED credits that match incoming credits. If the credit(s)
1481  * completely intersects the incoming credit, remove it(them) from the cache.
1482  * If the CACHED credit partly intersects with the incoming credit, retain the
1483  * difference in the CACHE.
1484  */
1486 {
1487  struct m0_rm_pin *pin;
1488  struct m0_rm_credit *credit;
1489  struct m0_rm_credit *remnant_credit;
1490  struct m0_rm_owner *owner = in->rin_want.cr_owner;
1491  struct m0_tl retain_list;
1492  struct m0_tl remove_list;
1493  int rc = 0;
1494 
1495  M0_ENTRY("owner: %p credit: %llu", owner,
1496  (long long unsigned) INCOMING_CREDIT(in));
1497  /* Credits can be removed for remote requests */
1498  M0_PRE(in->rin_type != M0_RIT_LOCAL);
1499 
1500  m0_rm_ur_tlist_init(&retain_list);
1501  m0_rm_ur_tlist_init(&remove_list);
1502  m0_tl_for (pi, &in->rin_pins, pin) {
1503  M0_ASSERT(m0_rm_pin_bob_check(pin));
1505  credit = pin->rp_credit;
1506 
1507  pin_del(pin);
1508  M0_ASSERT(credit_pin_nr(credit, M0_RPF_TRACK) == 0);
1509  if (!credit->cr_ops->cro_intersects(credit, &in->rin_want))
1510  m0_rm_ur_tlist_move(&retain_list, credit);
1511  else {
1512  m0_rm_ur_tlist_move(&remove_list, credit);
1513  /*
1514  * The cached credit does not completely intersect
1515  * incoming credit.
1516  *
1517  * Make a copy of the cached credit and calculate
1518  * the difference with incoming credit. Store
1519  * the difference in the remnant credit.
1520  */
1521  rc = remnant_credit_get(credit, &in->rin_want,
1522  &remnant_credit);
1523  if (rc == 0) {
1524  if (credit_is_empty(remnant_credit))
1525  m0_rm_ur_tlist_add(&remove_list,
1526  remnant_credit);
1527  else
1528  m0_rm_ur_tlist_add(&retain_list,
1529  remnant_credit);
1530  }
1531  }
1532 
1533  } m0_tl_endfor;
1534 
1535  /*
1536  * On successful completion, remove the credits from the "remove-list"
1537  * and move the remnant credits to the OWOS_CACHED. Do the opposite
1538  * on failure.
1539  */
1540  m0_tl_teardown(m0_rm_ur, rc ? &retain_list : &remove_list, credit) {
1541  m0_rm_credit_fini(credit);
1542  m0_free(credit);
1543  }
1544 
1545  m0_tl_for (m0_rm_ur, rc ? &remove_list : &retain_list, credit) {
1546  m0_rm_ur_tlist_move(&owner->ro_owned[OWOS_CACHED], credit);
1547  } m0_tl_endfor;
1548 
1549  m0_rm_ur_tlist_fini(&retain_list);
1550  m0_rm_ur_tlist_fini(&remove_list);
1551  return M0_RC(rc);
1552 }
1553 
/*
 * Returns the resource type reached from a credit through its owner and the
 * owner's resource.
 *
 * NOTE(review): the line with the function name, parameter list and opening
 * brace (listing line 1555) is missing from this view.
 */
1554 M0_UNUSED static inline struct m0_rm_resource_type *
1556  return credit->cr_owner->ro_resource->r_type;
1557 }
1558 
/*
 * Returns the resource type of the credit wanted by a remote incoming
 * request, by delegating to credit_to_resource_type().
 *
 * NOTE(review): the line with the function name, parameter list and opening
 * brace (listing line 1560) is missing from this view.
 */
1559 M0_UNUSED static inline struct m0_rm_resource_type *
1561  return credit_to_resource_type(&rem_in->ri_incoming.rin_want);
1562 }
1563 
/*
 * Commits a granted BORROW request: finds the debtor remote for the request,
 * allocates a loan carrying the wanted credit, tags the loan's credit with
 * the request's group id and, on success, adds the loan to the owner's
 * sublet list and stores the loan cookie in rem_in->ri_loan_cookie. On
 * failure an allocated loan is finalised and freed.
 *
 * Returns 0 on success or the error code collected in rc.
 *
 * NOTE(review): the embedded listing numbers skip 1595 and 1610. The last
 * operand of the "?:" chain that computes rc is missing, as is a statement
 * just before the return -- confirm against the original file.
 * NOTE(review): loan->rl_credit.cr_group_id is written before rc is checked.
 * m0_rm_loan_alloc() stores NULL in its output when it fails, so this looks
 * like a NULL dereference on the allocation failure path -- confirm.
 */
1564 M0_INTERNAL int m0_rm_borrow_commit(struct m0_rm_remote_incoming *rem_in)
1565 {
1566  struct m0_rm_incoming *in = &rem_in->ri_incoming;
1567  struct m0_rm_owner *o = in->rin_want.cr_owner;
1568  struct m0_rm_loan *loan = NULL;
1569  struct m0_rm_remote *debtor = NULL;
1570  int rc;
1571 
1572  M0_ENTRY("owner: %p credit: %llu", o,
1573  (long long unsigned) INCOMING_CREDIT(in));
1574  M0_PRE(in->rin_type == M0_RIT_BORROW);
1575 
1576  debtor = m0_rm_remote_find(rem_in);
1577  M0_ASSERT(debtor != NULL);
1578  /*
1579  * The needed reference is taken at request_pre_process() already.
1580  * So we just compensate the reference from the above
1581  * m0_rm_remote_find() here.
1582  */
1583  M0_RM_REMOTE_PUT(debtor);
1584 
1585  /*
1586  * Allocate loan and copy the credit (to be borrowed).
1587  * Clear the credits cache and remove incoming credits from the cache.
1588  * If everything succeeds add loan to the sublet list.
1589  *
1590  * If there are no pins, sublet within the same group has been
1591  * granted.
1592  */
1593  rc = m0_rm_loan_alloc(&loan, &in->rin_want, debtor) ?:
1594  pi_tlist_is_empty(&in->rin_pins) ? 0 :
1596  /* @todo */
1597  loan->rl_credit.cr_group_id = rem_in->ri_group_id;
1598  if (rc != 0 && loan != NULL) {
1599  m0_rm_loan_fini(loan);
1600  m0_free(loan);
1601  } else if (rc == 0) {
1602  /*
1603  * Store the loan in the sublet list.
1604  */
1605  m0_rm_ur_tlist_add(&o->ro_sublet, &loan->rl_credit);
1606  m0_cookie_init(&loan->rl_cookie, &loan->rl_id);
1607  rem_in->ri_loan_cookie = loan->rl_cookie;
1608  }
1609 
1611  return M0_RC(rc);
1612 }
1613 M0_EXPORTED(m0_rm_borrow_commit);
1614 
/*
 * Commits a REVOKE request: finds, on the owner's borrowed list, the loan
 * whose cookie equals rem_in->ri_loan_cookie and computes the remnant of
 * that loan after removing the revoked credit. On success the original loan
 * is unlinked, finalised and freed, and the remnant is either added to the
 * borrowed list (non-empty remnant) or finalised and freed (empty remnant).
 * On failure only the remnant, if any, is finalised and freed.
 *
 * Returns 0 on success or the error code collected in rc.
 *
 * NOTE(review): the embedded listing numbers skip 1630 and 1660. A statement
 * after the M0_PRE() and the second operand of the "?:" expression that
 * computes rc are missing from this view -- confirm against the original
 * file.
 */
1615 M0_INTERNAL int m0_rm_revoke_commit(struct m0_rm_remote_incoming *rem_in)
1616 {
1617  struct m0_rm_incoming *in = &rem_in->ri_incoming;
1618  struct m0_rm_owner *owner = in->rin_want.cr_owner;
1619  struct m0_rm_loan *brwd_loan = NULL;
1620  struct m0_rm_loan *remnant_loan;
1621  struct m0_rm_loan *add_loan = NULL;
1622  struct m0_rm_loan *remove_loan = NULL;
1623  struct m0_rm_credit *credit;
1624  struct m0_cookie *cookie;
1625  int rc = 0;
1626 
1627  M0_ENTRY("owner: %p credit: %llu", owner,
1628  (long long unsigned) INCOMING_CREDIT(in));
1629  M0_PRE(in->rin_type == M0_RIT_REVOKE);
1631 
1632  cookie = &rem_in->ri_loan_cookie;
1633  /*
1634  * Clear the credits cache and remove incoming credits from the cache.
1635  *
1636  * Check the difference between the borrowed credits and the revoke
1637  * credits. If the revoke fully intersects the previously borrowed
1638  * credit, remove it from the list.
1639  *
1640  * If it's a partial revoke, credit_diff() will retain the remnant
1641  * borrowed credit. cached_credits_remove() will leave
1642  * remnant credit in the CACHE.
1643  */
1644  /*
1645  * Find the matching loan and remove it from the borrowed list.
1646  */
1647  credit = m0_tl_find(m0_rm_ur, credit, &owner->ro_borrowed,
1648  (brwd_loan = bob_of(credit, struct m0_rm_loan,
1649  rl_credit, &loan_bob),
1650  m0_cookie_is_eq(&brwd_loan->rl_cookie, cookie)));
1651 
1652  M0_ASSERT(brwd_loan != NULL);
1653  M0_ASSERT(credit != NULL);
1654 
1655  /*
1656  * Check if there is partial revoke.
1657  * Also remove the corresponding credit from the OWOS_CACHED list.
1658  */
1659  rc = remnant_loan_get(brwd_loan, &in->rin_want, &remnant_loan) ?:
1661 
1662  if (rc == 0) {
1663  m0_rm_ur_tlist_del(credit);
1664  m0_rm_loan_fini(brwd_loan);
1665  m0_free(brwd_loan);
1666  if (credit_is_empty(&remnant_loan->rl_credit))
1667  remove_loan = remnant_loan;
1668  else
1669  add_loan = remnant_loan;
1670  } else
1671  remove_loan = remnant_loan;
1672 
1673  if (add_loan != NULL)
1674  m0_rm_ur_tlist_add(&owner->ro_borrowed, &add_loan->rl_credit);
1675 
1676  if (remove_loan != NULL) {
1677  m0_rm_loan_fini(remove_loan);
1678  m0_free(remove_loan);
1679  }
1680 
1681  M0_POST(owner_invariant(owner));
1682  return M0_RC(rc);
1683 }
1684 M0_EXPORTED(m0_rm_revoke_commit);
1685 
1724 /*
1725  * Returns:
1726  * True - when both the groups are different or they are m0_rm_no_group.
1727  * False - when both the groups are same.
1728  */
1729 
1730 static bool credit_group_conflict(const struct m0_uint128 *g1,
1731  const struct m0_uint128 *g2)
1732 {
1733  return (m0_uint128_eq(g1, g2) &&
1734  m0_uint128_eq(g1, &m0_rm_no_group)) || !m0_uint128_eq(g1, g2);
1735 }
1736 
/*
 * Puts the request's wanted credit on the owner's OQS_EXCITED incoming queue
 * for the request's priority, sets the reserve priority for M0_RIT_LOCAL
 * requests and then runs owner_balance() to process the queue.
 *
 * NOTE(review): the embedded listing numbers skip 1744, 1747 and 1750, so a
 * statement after the list insertion, one inside the M0_RIT_LOCAL branch and
 * one before owner_balance() are not visible in this view.
 */
1737 static void incoming_queue(struct m0_rm_owner *owner, struct m0_rm_incoming *in)
1738 {
1739  /*
1740  * Mark incoming request "excited". owner_balance() will process it.
1741  */
1742  m0_rm_ur_tlist_add(&owner->ro_incoming[in->rin_priority][OQS_EXCITED],
1743  &in->rin_want);
1745 
1746  if (in->rin_type == M0_RIT_LOCAL) {
1748  reserve_prio_set(&in->rin_reserve, in->rin_req_time, owner);
1749  }
1751  owner_balance(owner);
1752 }
1753 
/*
 * Submits an incoming credit request to its owner. Under the owner lock the
 * request is queued when the owner is in ROS_ACTIVE state; otherwise it is
 * completed immediately with -ENODEV or -EAGAIN, depending on the owner
 * state.
 *
 * The request must have no error recorded (sm_rc and rin_rc are 0) and no
 * pins.
 *
 * NOTE(review): the embedded listing numbers skip 1764 and 1790. One
 * precondition and the rest of the owner-state list inside M0_IN() are
 * missing from this view, so the exact set of states mapped to -ENODEV
 * cannot be read from here.
 */
1758 M0_INTERNAL void m0_rm_credit_get(struct m0_rm_incoming *in)
1759 {
1760  struct m0_rm_owner *owner = in->rin_want.cr_owner;
1761 
1762  M0_ENTRY("owner: %p credit: %llu", owner,
1763  (long long unsigned) INCOMING_CREDIT(in));
1765  M0_PRE(in->rin_sm.sm_rc == 0);
1766  M0_PRE(in->rin_rc == 0);
1767  M0_PRE(pi_tlist_is_empty(&in->rin_pins));
1768 
1769  m0_rm_owner_lock(owner);
1770  /*
1771  * This check will make sure that new requests are added
1772  * while owner is in ACTIVE state. This will take care
1773  * of races between owner state transition and credit requests.
1774  */
1775  if (owner_state(owner) == ROS_ACTIVE)
1776  incoming_queue(owner, in);
1777  else
1778  /*
1779  * Reject all credit requests with -EAGAIN error until owner is
1780  * in one of its final states. During finalisation owner still
1781  * possesses some credits, but can't grant them. Let client to
1782  * re-request them till they are returned to creditor and
1783  * overall credits state in cluster is more "stable".
1784  * For example, if creditor revokes loan during owner
1785  * finalisation, then race is possible between 'REVOKE' and
1786  * 'CANCEL' requests. Creditor will re-send 'REVOKE' requests
1787  * until 'CANCEL' is received.
1788  */
1789  incoming_complete(in, M0_IN(owner_state(owner), (ROS_FINAL,
1791  -ENODEV : -EAGAIN);
1792  m0_rm_owner_unlock(owner);
1793  M0_LEAVE();
1794 }
1795 M0_EXPORTED(m0_rm_credit_get);
1796 
1797 M0_INTERNAL void m0_rm_credit_put(struct m0_rm_incoming *in)
1798 {
1799  struct m0_rm_owner *owner = in->rin_want.cr_owner;
1800 
1801  M0_ENTRY("owner: %p credit: %llu", owner,
1802  (long long unsigned) INCOMING_CREDIT(in));
1803  m0_rm_owner_lock(owner);
1804  incoming_release(in);
1805  incoming_surrender(in);
1806  /*
1807  * Release of this credit may excite other waiting incoming-requests.
1808  * Hence, call owner_balance() to process them.
1809  */
1810  owner_balance(owner);
1811  m0_rm_owner_unlock(owner);
1812  M0_LEAVE();
1813 }
1814 M0_EXPORTED(m0_rm_credit_put);
1815 
/*
 * Works on "rest", a private copy of the wanted credit, and walks the
 * request's pins. A pinned credit that already has more than one PROTECT pin
 * is only subtracted from "rest". A credit that is a subset of "rest" is
 * moved to the temporary "transfers" list and subtracted. Otherwise the
 * intersection is split off into a newly allocated credit via cro_disjoin(),
 * added to "transfers", subtracted, pinned with M0_RPF_PROTECT, and the
 * original pin is deleted. Afterwards everything on "transfers" goes to
 * OWOS_HELD when rc is 0 and to OWOS_CACHED otherwise.
 *
 * Only valid for M0_RIT_LOCAL requests. Returns 0 or the first error
 * encountered (including -ENOMEM).
 *
 * NOTE(review): the embedded listing numbers skip 1843, so one statement at
 * the top of the pin loop is not visible in this view.
 */
1816 /*
1817  * After successful completion of incoming request, move OWOS_CACHED credits
1818  * to OWOS_HELD credits.
1819  */
1820 static int cached_credits_hold(struct m0_rm_incoming *in)
1821 {
1822  enum m0_rm_owner_owned_state ltype;
1823  struct m0_rm_pin *pin;
1824  struct m0_rm_owner *owner = in->rin_want.cr_owner;
1825  struct m0_rm_credit *credit;
1826  struct m0_rm_credit *held_credit;
1827  struct m0_rm_credit rest;
1828  struct m0_tl transfers;
1829  int rc;
1830 
1831  M0_ENTRY("owner: %p credit: %llu", owner,
1832  (long long unsigned) INCOMING_CREDIT(in));
1833  /* Only local request can hold the credits */
1834  M0_PRE(in->rin_type == M0_RIT_LOCAL);
1835 
1836  m0_rm_credit_init(&rest, in->rin_want.cr_owner);
1837  rc = m0_rm_credit_copy(&rest, &in->rin_want);
1838  if (rc != 0)
1839  goto out;
1840 
1841  m0_rm_ur_tlist_init(&transfers);
1842  m0_tl_for (pi, &in->rin_pins, pin) {
1844  credit = pin->rp_credit;
1845  M0_ASSERT(credit != NULL);
1846  M0_ASSERT(credit->cr_ops != NULL);
1847  M0_ASSERT(credit->cr_ops->cro_is_subset != NULL);
1848  M0_ASSERT(credit_intersects(&rest, credit));
1849 
1850  /* If the credit is already part of HELD list, skip it */
1851  if (credit_pin_nr(credit, M0_RPF_PROTECT) > 1) {
1852  rc = credit_diff(&rest, credit);
1853  if (rc != 0)
1854  break;
1855  else
1856  continue;
1857  }
1858 
1859  /*
1860  * Check if the cached credit is a subset (including a
1861  * proper subset) of incoming credit (request).
1862  */
1863  if (credit->cr_ops->cro_is_subset(credit, &rest)) {
1864  /* Move the subset from CACHED list to HELD list */
1865  m0_rm_ur_tlist_move(&transfers, credit);
1866  rc = credit_diff(&rest, credit);
1867  if (rc != 0)
1868  break;
1869  } else {
1870  M0_ALLOC_PTR(held_credit);
1871  if (held_credit == NULL) {
1872  rc = M0_ERR(-ENOMEM);
1873  break;
1874  }
1875 
1876  m0_rm_credit_init(held_credit, owner);
1877  /*
1878  * If incoming credit partly intersects, then move
1879  * intersection to the HELD list. Retain the difference
1880  * in the CACHED list. This may lead to fragmentation of
1881  * credits.
1882  */
1883  rc = credit->cr_ops->cro_disjoin(credit, &rest,
1884  held_credit);
1885  if (rc != 0) {
1886  m0_rm_credit_fini(held_credit);
1887  m0_free(held_credit);
1888  break;
1889  }
1890  m0_rm_ur_tlist_add(&transfers, held_credit);
1891  rc = credit_diff(&rest, held_credit);
1892  if (rc != 0)
1893  break;
1894  m0_rm_pin_add(in, held_credit, M0_RPF_PROTECT);
1895  pin_del(pin);
1896  }
1897 
1898  } m0_tl_endfor;
1899 
1900  M0_POST(ergo(rc == 0, credit_is_empty(&rest)));
1901  /*
1902  * Only cached credits are part of transfer list.
1903  * On success, move the credits to OWOS_HELD list. Otherwise move
1904  * them back OWOS_CACHED list.
1905  */
1906  ltype = rc ? OWOS_CACHED : OWOS_HELD;
1907  m0_tl_for (m0_rm_ur, &transfers, credit) {
1908  m0_rm_ur_tlist_move(&owner->ro_owned[ltype], credit);
1909  } m0_tl_endfor;
1910 
1911  m0_rm_ur_tlist_fini(&transfers);
1912 
1913 out:
1914  m0_rm_credit_fini(&rest);
1915  return M0_RC(rc);
1916 }
1917 
1921 static void incoming_pins_del(struct m0_rm_incoming *in, uint32_t flags)
1922 {
1923  struct m0_rm_pin *pin;
1924 
1925  m0_tl_for (pi, &in->rin_pins, pin) {
1926  M0_ASSERT(m0_rm_pin_bob_check(pin));
1927  if (pin->rp_flags & flags)
1928  pin_del(pin);
1929  } m0_tl_endfor;
1930 }
1931 
/*
 * Main balancing loop of an owner. Each pass first drains the OQS_EXCITED
 * outgoing queue: for every completed outgoing request the pins on its
 * credit are deleted, the request's error (with -EAGAIN mapped to 0) is
 * propagated to pinned incoming requests that have no error yet, and the
 * credit is unlinked. It then visits the OQS_EXCITED incoming queues from
 * the highest priority index down, moves each request to OQS_GROUND and runs
 * incoming_check() on it. Passes repeat until one of them finds no excited
 * request at all.
 *
 * NOTE(review): the embedded listing numbers skip 1980, 1987, 1994 and 2002.
 * A statement after the outgoing credit is unlinked, the tail of the
 * bob_of() call that computes "in", a statement before incoming_check() and
 * the owner-finalisation check announced by the last comment are missing
 * from this view -- confirm against the original file.
 */
1938 static void owner_balance(struct m0_rm_owner *o)
1939 {
1940  struct m0_rm_pin *pin;
1941  struct m0_rm_credit *credit;
1942  struct m0_rm_outgoing *out;
1943  struct m0_rm_incoming *in;
1944  bool todo;
1945  int prio;
1946 
1947  M0_ENTRY("owner %p", o);
1948  do {
1949  todo = false;
1950  m0_tl_for (m0_rm_ur, &o->ro_outgoing[OQS_EXCITED], credit) {
1951  M0_ASSERT(m0_rm_credit_bob_check(credit));
1952  todo = true;
1953  out = bob_of(credit, struct m0_rm_outgoing,
1954  rog_want.rl_credit, &outgoing_bob);
1955  /*
1956  * Outgoing request completes: remove all pins stuck in
1957  * and finalise them. Also pass the processing error, if
1958  * any, to the corresponding incoming structure(s).
1959  *
1960  * Removing of pins might excite incoming requests
1961  * waiting for outgoing request completion.
1962  */
1963  m0_tl_for (pr, &credit->cr_pins, pin) {
1964  int32_t out_rc =
1965  out->rog_rc == -EAGAIN ? 0 : out->rog_rc;
1966 
1967  M0_ASSERT(m0_rm_pin_bob_check(pin));
1968  M0_ASSERT(pin->rp_flags == M0_RPF_TRACK);
1969  /*
1970  * If one outgoing request has set an error,
1971  * then don't overwrite the error code. It's
1972  * possible that an error code could be
1973  * reset to 0 if other requests succeed.
1974  */
1975  pin->rp_incoming->rin_rc =
1976  pin->rp_incoming->rin_rc ?: out_rc;
1977  pin_del(pin);
1978  } m0_tl_endfor;
1979  m0_rm_ur_tlist_del(credit);
1981  } m0_tl_endfor;
1982  for (prio = ARRAY_SIZE(o->ro_incoming) - 1; prio >= 0; --prio) {
1983  m0_tl_for (m0_rm_ur,
1984  &o->ro_incoming[prio][OQS_EXCITED], credit) {
1985  todo = true;
1986  in = bob_of(credit, struct m0_rm_incoming,
1988  /*
1989  * All waits completed, go to CHECK state.
1990  */
1991  m0_rm_ur_tlist_move(
1992  &o->ro_incoming[prio][OQS_GROUND],
1993  &in->rin_want);
1995  incoming_check(in);
1996  } m0_tl_endfor;
1997  }
1998  } while (todo);
1999  /*
2000  * Check if owner needs to be finalised.
2001  */
2003  M0_LEAVE();
2004 }
2005 
2016 static void barrier_pins_del(struct m0_rm_incoming *in)
2017 {
2018  struct m0_rm_pin *in_pin;
2019  struct m0_rm_pin *cr_pin;
2020  struct m0_rm_credit *cr;
2021 
2022  m0_tl_for(pi, &in->rin_pins, in_pin) {
2023  M0_ASSERT(m0_rm_pin_bob_check(in_pin));
2024  if (in_pin->rp_flags & M0_RPF_BARRIER) {
2025  cr = in_pin->rp_credit;
2026  m0_tl_for(pr, &cr->cr_pins, cr_pin) {
2027  if (cr_pin->rp_flags & M0_RPF_TRACK) {
2028  pin_del(cr_pin);
2029  }
2030  } m0_tl_endfor;
2031  pin_del(in_pin);
2032  }
2033  } m0_tl_endfor;
2034 }
2035 
/*
 * Runs one check pass over an incoming request. When no error is recorded in
 * in->rin_rc, a copy of the wanted credit is matched with
 * incoming_check_with(); otherwise the recorded error is used as rc. A
 * positive rc ends the pass early. Otherwise the barrier pins are removed,
 * cached credits are moved to the held list for successful M0_RIT_LOCAL
 * requests, and the request is completed with rc when no M0_RPF_TRACK pins
 * remain; if some remain, rc is stored in in->rin_rc instead.
 *
 * NOTE(review): the embedded listing numbers skip 2075, 2079, 2094-2095 and
 * 2119. The body of the first "rc != 0" branch, a statement in the "rc > 0"
 * branch, the start of the "rc == 0" branch and a statement in the final
 * else branch are missing from this view -- confirm against the original
 * file.
 */
2043 static void incoming_check(struct m0_rm_incoming *in)
2044 {
2045  struct m0_rm_credit rest;
2046  int rc;
2047 
2048  M0_ENTRY();
2049  M0_PRE(m0_rm_incoming_bob_check(in));
2050 
2051  /*
2052  * This function is reentrant. An outgoing request might set
2053  * the processing error for the incoming structure. Check for the
2054  * error. If there is an error, there is no need to continue the
2055  * processing.
2056  */
2057  if (in->rin_rc == 0) {
2058  m0_rm_credit_init(&rest, in->rin_want.cr_owner);
2059  rc = m0_rm_credit_copy(&rest, &in->rin_want) ?:
2060  incoming_check_with(in, &rest);
2061  M0_ASSERT(ergo(rc >= 0, credit_is_empty(&rest)));
2062  m0_rm_credit_fini(&rest);
2063  } else
2064  rc = in->rin_rc;
2065 
2066  if (rc != 0) {
2067  /*
2068  * Delete all PROTECT pins set by incoming request to
2069  * allow corresponding credits to be used for other incoming
2070  * requests. That prevents credit dependencies and possible
2071  * dead-locks.
2072  *
2073  * Also, request could protect some credits before failure.
2074  */
2076  }
2077 
2078  if (rc > 0) {
2080  M0_LEAVE();
2081  return;
2082  }
2083 
2091  barrier_pins_del(in);
2092 
2093  if (rc == 0) {
2096  /*
2097  * Transfer the CACHED credits to HELD list. Later
2098  * it may be subsumed by policy functions (or
2099  * vice versa). Credits are held only for local
2100  * request. For remote requests, they are removed
2101  * and converted into loans.
2102  */
2103  if (in->rin_type == M0_RIT_LOCAL)
2104  rc = cached_credits_hold(in);
2105  }
2106 
2107  /*
2108  * Check if incoming request is complete. When there is
2109  * partial failure (with part of the request failing)
2110  * of incoming request, it's necessary to check that it's
2111  * complete (and there are no outstanding outgoing requests
2112  * pending against it). On partial failure put the request in
2113  * RI_WAIT state.
2114  */
2115  if (incoming_is_complete(in)) {
2116  incoming_complete(in, rc);
2117  } else {
2118  in->rin_rc = rc;
2120  }
2121 
2122  M0_LEAVE();
2123 }
2124 
2125 /*
2126  * Checks if there are outstanding "outgoing requests" for this incoming
2127  * requests.
2128  */
2129 static bool incoming_is_complete(const struct m0_rm_incoming *in)
2130 {
2131  return incoming_pin_nr(in, M0_RPF_TRACK) == 0;
2132 }
2133 
2134 static bool credit_is_reserved(const struct m0_rm_credit *cr)
2135 {
2136  return credit_pin_nr(cr, M0_RPF_BARRIER) > 0;
2137 }
2138 
/*
 * Invokes the rio_conflict() callback of every incoming request that holds
 * an M0_RPF_PROTECT pin on "credit". The credit must have at least one such
 * pin.
 *
 * NOTE(review): the embedded listing numbers skip 2168, so the statement
 * between the comment and the rio_conflict() call (presumably the assertion
 * the comment refers to -- confirm) is not visible in this view.
 */
2147 static void conflict_notify(struct m0_rm_credit *credit)
2148 {
2149  struct m0_rm_pin *pin;
2150  struct m0_rm_incoming *in;
2151 
2152  M0_PRE(credit != NULL);
2153  M0_PRE(credit_pin_nr(credit, M0_RPF_PROTECT) > 0);
2154 
2155  m0_tl_for(pr, &credit->cr_pins, pin) {
2156  M0_ASSERT(m0_rm_pin_bob_check(pin));
2157  if (pin->rp_flags & M0_RPF_PROTECT) {
2158  in = pin->rp_incoming;
2159  /*
2160  * In current design rio_conflict() will never be called
2161  * before rio_complete() for any incoming request.
2162  * M0_RPF_PROTECT pins are deleted if incoming request
2163  * can't be fully granted and lifetime of such pins is
2164  * incoming_check() function. It is important to hold
2165  * this assertion to not confuse incoming request
2166  * originator.
2167  */
2169  in->rin_ops->rio_conflict(in);
2170  }
2171  } m0_tl_endfor;
2172 }
2173 
/*
 * Makes the incoming request wait for credit "cr" unless the request has the
 * RIF_LOCAL_TRY flag set. Without the flag an M0_RPF_TRACK pin is added,
 * holders of the credit are notified when "notify" is true and the pin was
 * added, and *wait is incremented. With the flag set -EBUSY is returned.
 *
 * Returns the result of m0_rm_pin_add(), or -EBUSY.
 *
 * NOTE(review): *wait is incremented even when m0_rm_pin_add() fails.
 * NOTE(review): the embedded listing numbers skip 2185, so one statement at
 * the top of the body is not visible in this view.
 */
2178 static int credit_maybe_track(struct m0_rm_incoming *in,
2179  struct m0_rm_credit *cr,
2180  bool notify,
2181  int *wait)
2182 {
2183  int rc;
2184 
2186 
2187  if (!(in->rin_flags & RIF_LOCAL_TRY)) {
2188  rc = m0_rm_pin_add(in, cr, M0_RPF_TRACK);
2189  if (rc == 0 && notify)
2190  conflict_notify(cr);
2191  (*wait)++;
2192  return M0_RC(rc);
2193  } else {
2194  return M0_ERR(-EBUSY);
2195  }
2196 }
2197 
/*
 * Decides how an incoming request treats a held credit that intersects
 * "rest". A BORROW request that does not conflict returns 0 with *cr_used
 * left false. A REVOKE request, or a conflicting request with any of the
 * WAIT_TRY_FLAGS set, tries to track the credit via credit_maybe_track().
 * The remaining case must be a LOCAL request that need not wait. Except for
 * the early BORROW return, *cr_used is set to (rc == 0).
 *
 * Returns 0 or the error from credit_maybe_track().
 *
 * NOTE(review): the embedded listing numbers skip 2228-2239 inside the body,
 * between "*cr_used = false" and the first "if". Whether the gap held only a
 * comment or also code cannot be told from this view -- confirm against the
 * original file.
 */
2210 static int incoming_check_held(struct m0_rm_incoming *in,
2211  struct m0_rm_credit *rest,
2212  struct m0_rm_credit *held,
2213  int *wait,
2214  bool *cr_used)
2215 {
2216  int rc = 0;
2217  /*
2218  * Note that second parameter is the whole credit, not 'rest'.
2219  * 'Rest' could be reduced the way that it doesn't conflict
2220  * with a local credit anymore, but "try/wait" flags by design
2221  * are applied to the whole requested credit.
2222  */
2223  bool conflict = credit_conflicts(held, &in->rin_want);
2224 
2225  M0_PRE(credit_intersects(held, rest));
2226 
2227  *cr_used = false;
2240  if (in->rin_type == M0_RIT_BORROW && !conflict) {
2241  return M0_RC(0);
2242  } else if (in->rin_type == M0_RIT_REVOKE ||
2243  (conflict && in->rin_flags & WAIT_TRY_FLAGS)) {
2244  rc = credit_maybe_track(in, held, true, wait);
2245  } else {
2246  /*
2247  * This is the case of a LOCAL request, when we
2248  * don't have to wait until the credit is released.
2249  */
2250  M0_ASSERT(in->rin_type == M0_RIT_LOCAL &&
2251  (!conflict || (in->rin_flags & WAIT_TRY_FLAGS) == 0));
2252  }
2253 
2254  *cr_used = (rc == 0);
2255  return M0_RC(rc);
2256 }
2257 
/*
 * Compares the reserve priorities of two distinct incoming requests that
 * both have RIF_RESERVE set. The comparison uses rrp_time first, then the
 * owner fid, then rrp_seq; the three-way result is asserted to be non-zero.
 *
 * Returns true when in1 compares lower than in2 under that ordering.
 *
 * NOTE(review): the embedded listing numbers skip 2269-2270, so two lines
 * between the first and second visible M0_PRE() are missing from this view.
 */
2261 static bool has_reserve_priority(struct m0_rm_incoming *in1,
2262  struct m0_rm_incoming *in2)
2263 {
2264  struct m0_rm_reserve_prio *pr1 = &in1->rin_reserve;
2265  struct m0_rm_reserve_prio *pr2 = &in2->rin_reserve;
2266  int cmp;
2267 
2268  M0_PRE(in1 != in2);
2271  M0_PRE(in1->rin_flags & RIF_RESERVE);
2272  M0_PRE(in2->rin_flags & RIF_RESERVE);
2273 
2274  cmp = M0_3WAY(pr1->rrp_time, pr2->rrp_time) ?:
2275  m0_fid_cmp(&pr1->rrp_owner, &pr2->rrp_owner) ?:
2276  M0_3WAY(pr1->rrp_seq, pr2->rrp_seq);
2277  M0_ASSERT(cmp != 0);
2278  return cmp < 0;
2279 }
2280 
/*
 * NOTE(review): the first line of the function header (listing line 2286,
 * with the return type, the name and the first parameter) is missing from
 * this view; the body uses that parameter as "in", an incoming request.
 *
 * Visible behaviour: looks for an M0_RPF_BARRIER pin on "cr".
 *  - If the barrier belongs to "in" itself, returns 0.
 *  - If "in" is not a RIF_RESERVE request, or the barrier's request has
 *    reserve priority over "in", "in" is made to track the credit via
 *    credit_maybe_track().
 *  - Otherwise the existing barrier pin is turned into an M0_RPF_TRACK pin
 *    and "in" gets a new M0_RPF_BARRIER pin on the credit.
 *  - If there is no barrier and "in" is a RIF_RESERVE request, a barrier pin
 *    is added for "in".
 * Returns 0 or the error from credit_maybe_track() / m0_rm_pin_add().
 */
2287  struct m0_rm_credit *cr,
2288  int *wait)
2289 {
2290  struct m0_rm_pin *pin;
2291  int rc = 0;
2292 
2293  pin = m0_tl_find(pr, p, &cr->cr_pins, p->rp_flags & M0_RPF_BARRIER);
2294  if (pin != NULL){
2295  if (in == pin->rp_incoming)
2296  return M0_RC(0);
2297 
2298  if (!(in->rin_flags & RIF_RESERVE) ||
2299  has_reserve_priority(pin->rp_incoming, in)) {
2300  rc = credit_maybe_track(in, cr, false, wait);
2301  } else {
2302  /* That can be a second M0_RPF_TRACK pin added
2303  * to a credit from a single request */
2304  pin->rp_flags &= ~M0_RPF_BARRIER;
2305  pin->rp_flags |= M0_RPF_TRACK;
2306  rc = m0_rm_pin_add(in, cr, M0_RPF_BARRIER);
2307  }
2308  } else if (in->rin_flags & RIF_RESERVE) {
2309  rc = m0_rm_pin_add(in, cr, M0_RPF_BARRIER);
2310  }
2311  return M0_RC(rc);
2312 }
2313 
2341 static int incoming_check_with(struct m0_rm_incoming *in,
2342  struct m0_rm_credit *rest)
2343 {
2344  struct m0_rm_credit *want = &in->rin_want;
2345  struct m0_rm_owner *o = want->cr_owner;
2346  struct m0_rm_credit *r;
2347  struct m0_rm_loan *loan;
2348  bool group_matches;
2353  bool use_credit = false;
2354  int i;
2355  int wait = 0;
2356  int rc = 0;
2357 
2358  M0_ENTRY("incoming: %p credit: %llu", in,
2359  (long long unsigned) rest->cr_datum);
2361  &o->ro_incoming[in->rin_priority][OQS_GROUND], want));
2362 
2363  /*
2364  * 1. Scan owned lists first. Check for "local" wait/try conditions.
2365  */
2366  for (i = ARRAY_SIZE(o->ro_owned) - 1; i >= 0; --i) {
2367  /*
2368  * Make sure cached credits are checked first.
2369  * It is better to use cached credit than held
2370  * if there are suitable credits in both lists.
2371  */
2373  m0_tl_for (m0_rm_ur, &o->ro_owned[i], r) {
2374  M0_ASSERT(m0_rm_credit_bob_check(r));
2375  if (!credit_intersects(r, rest))
2376  continue;
2377 
2378  if (i == OWOS_HELD)
2379  rc = incoming_check_held(in, rest, r, &wait,
2380  &use_credit);
2381  else
2382  use_credit = true;
2383 
2384  if (rc == 0 && use_credit) {
2385  rc = credit_reservation_check(in, r, &wait);
2386  if (rc == 0 && wait == 0)
2387  rc = m0_rm_pin_add(in, r,
2388  M0_RPF_PROTECT);
2389  rc = rc ?: credit_diff(rest, r);
2390  }
2391 
2392  if (rc != 0)
2393  return M0_RC(rc);
2394  } m0_tl_endfor;
2395  }
2396 
2397  /*
2398  * 2. If the credit request cannot still be satisfied, check against the
2399  * sublet list.
2400  */
2401  if (!credit_is_empty(rest)) {
2402  m0_tl_for (m0_rm_ur, &o->ro_sublet, r) {
2403  M0_ASSERT(m0_rm_credit_bob_check(r));
2405  if (!credit_intersects(r, rest))
2406  continue;
2407 
2408  group_matches =
2409  !credit_group_conflict(&r->cr_group_id,
2410  &rest->cr_group_id);
2411  if (group_matches) {
2412  rc = credit_diff(rest, r);
2413  if (rc != 0)
2414  return M0_RC(rc);
2415  if (!credit_is_empty(rest))
2416  continue;
2417  else
2418  break;
2419  } else if (!(in->rin_flags & RIF_MAY_REVOKE))
2420  return M0_ERR(-EREMOTE);
2421 
2422  loan = bob_of(r, struct m0_rm_loan, rl_credit,
2423  &loan_bob);
2424  /*
2425  * It is possible that this loop emits multiple
2426  * outgoing requests toward the same remote
2427  * owner. Don't bother to coalesce them here.
2428  * The rpc layer would do this more efficiently.
2429  *
2430  * @todo use rpc grouping here.
2431  */
2432  /*
2433  * @todo - Revoke entire loan?? rest could be
2434  * subset of r.
2435  */
2436  wait++;
2437  rc = revoke_send(in, loan, r) ?:
2438  credit_diff(rest, r);
2439  if (rc != 0)
2440  return M0_ERR(rc);
2441  } m0_tl_endfor;
2442  }
2443 
2444  /*
2445  * 3. If the credit request still cannot be satisfied, check
2446  * if it's possible borrow remaining credit from the creditor.
2447  */
2448  if (!credit_is_empty(rest)) {
2449  if (o->ro_creditor != NULL) {
2450  if (!(in->rin_flags & RIF_MAY_BORROW))
2451  return M0_ERR(-EREMOTE);
2452  wait++;
2453  rc = borrow_send(in, rest);
2454  } else
2455  rc = -ESRCH;
2456  }
2457 
2458  return M0_RC(rc ?: wait);
2459 }
2460 
/*
 * Marks an outgoing request as complete.
 *
 * Moves the request's credit (og->rog_want.rl_credit) onto the owner's
 * ro_outgoing[OQS_EXCITED] list and then runs owner_balance() on that owner.
 *
 * og must not be NULL.
 */
2465 M0_INTERNAL void m0_rm_outgoing_complete(struct m0_rm_outgoing *og)
2466 {
2467  struct m0_rm_owner *owner;
2468 
2469  M0_ENTRY();
2470  M0_PRE(og != NULL);
2471 
2472  owner = og->rog_want.rl_credit.cr_owner;
2473  m0_rm_ur_tlist_move(&owner->ro_outgoing[OQS_EXCITED],
2474  &og->rog_want.rl_credit);
2475  owner_balance(owner);
2476  M0_LEAVE();
2477 }
2478 M0_EXPORTED(m0_rm_outgoing_complete);
2479 
2486 static void incoming_complete(struct m0_rm_incoming *in, int32_t rc)
2487 {
2488  struct m0_rm_owner *owner = in->rin_want.cr_owner;
2489 
2490  M0_ENTRY("incoming: %p error: [%d]", in, rc);
2491  M0_PRE(in->rin_ops != NULL);
2492  M0_PRE(in->rin_ops->rio_complete != NULL);
2493  M0_PRE(M0_IN(incoming_state(in), (RI_CHECK, RI_INITIALISED)));
2494  M0_PRE(rc <= 0);
2495  M0_PRE_EX(!m0_rm_ur_tlink_is_in(&in->rin_want) ||
2497  &owner->ro_incoming[in->rin_priority][OQS_GROUND],
2498  &in->rin_want));
2499 
2500  in->rin_rc = rc;
2501  M0_LOG(M0_DEBUG, "Incoming req: %p, state change:[%s -> %s]\n",
2502  in, m0_sm_state_name(&in->rin_sm, in->rin_sm.sm_state),
2503  m0_sm_state_name(&in->rin_sm,
2504  rc == 0 ? RI_SUCCESS : RI_FAILURE));
2505  m0_sm_move(&in->rin_sm, rc, rc == 0 ? RI_SUCCESS : RI_FAILURE);
2506  if (rc != 0) {
2507  incoming_release(in);
2508  m0_rm_ur_tlist_remove(&in->rin_want);
2509  M0_ASSERT(pi_tlist_is_empty(&in->rin_pins));
2510  }
2512  in->rin_ops->rio_complete(in, rc);
2513  M0_POST(owner_invariant(owner));
2514  M0_LEAVE();
2515 }
2516 
/* Policy handler with an empty body: takes no action on the request. */
2517 static void incoming_policy_none(struct m0_rm_incoming *in)
2518 {
2519 }
2520 
2521 static void incoming_policy_apply(struct m0_rm_incoming *in)
2522 {
2523  static void (*generic[RIP_NR])(struct m0_rm_incoming *) = {
2529  };
2530 
2531  if (IS_IN_ARRAY(in->rin_policy, generic))
2532  generic[in->rin_policy](in);
2533  else {
2534  struct m0_rm_resource *resource = incoming_to_resource(in);
2535 
2536  resource->r_ops->rop_policy(resource, in);
2537  }
2538 }
2539 
2544 static int outgoing_check(struct m0_rm_incoming *in,
2545  enum m0_rm_outgoing_type otype,
2546  struct m0_rm_credit *credit,
2547  struct m0_rm_remote *other)
2548 {
2549  int i;
2550  int rc = 0;
2551  struct m0_rm_owner *owner = in->rin_want.cr_owner;
2552  struct m0_rm_credit *scan;
2553  struct m0_rm_outgoing *out;
2554 
2555  M0_ENTRY();
2556  for (i = 0; i < ARRAY_SIZE(owner->ro_outgoing); ++i) {
2557  m0_tl_for (m0_rm_ur, &owner->ro_outgoing[i], scan) {
2558  M0_ASSERT(m0_rm_credit_bob_check(scan));
2559  out = bob_of(scan, struct m0_rm_outgoing,
2561  if (out->rog_type == otype &&
2562  credit_intersects(scan, credit)) {
2563  M0_LOG(M0_DEBUG, "want %p, other %p",
2564  out->rog_want.rl_other, other);
2565  /*
2566  * Conflicting credits are always requested from
2567  * the same remote, since only one owner can
2568  * possess this credit at a time. It's also
2569  * impossible that 'credit' is composed of
2570  * several parts owned by different remotes.
2571  * Revoke requests are already split by loans,
2572  * borrow requests always go to a single
2573  * creditor.
2574  */
2576  out->rog_want.rl_other == other));
2577  /*
2578  * There is only one creditor, all borrow
2579  * requests should go to him
2580  */
2581  M0_ASSERT(ergo(otype == M0_ROT_BORROW,
2582  out->rog_want.rl_other == other));
2583  /*
2584  * Intersecting non-conflicting credits can be
2585  * owned by several remotes in case of revoke.
2586  * We should revoke all these loans.
2587  */
2588  if (out->rog_want.rl_other == other)
2589  {
2594  rc = m0_rm_pin_add(in, scan,
2595  M0_RPF_TRACK) ?:
2596  credit_diff(credit, scan);
2597  if (rc != 0)
2598  return M0_ERR(rc);
2599  }
2600  }
2601  } m0_tl_endfor;
2602  }
2603  return M0_RC(rc);
2604 }
2605 
2611 static int revoke_send(struct m0_rm_incoming *in,
2612  struct m0_rm_loan *loan,
2613  struct m0_rm_credit *credit)
2614 {
2615  struct m0_rm_credit rest;
2616  struct m0_rm_remote *other;
2617  int rc;
2618 
2619  M0_ENTRY("incoming: %p credit: %llu", in,
2620  (long long unsigned)credit->cr_datum);
2621  M0_PRE(loan != NULL);
2622 
2623  /*
2624  * Credit is part of sublet loan (sent from incoming_check_with()).
2625  * outgoing_check() destructively modifies outgoing credit. Hence,
2626  * make a copy.
2627  */
2628  other = loan->rl_other;
2629  m0_rm_credit_init(&rest, in->rin_want.cr_owner);
2630  rc = m0_rm_credit_copy(&rest, credit) ?:
2631  outgoing_check(in, M0_ROT_REVOKE, &rest, other);
2632  if (!credit_is_empty(&rest) && rc == 0)
2634  loan, &rest, other);
2635 
2636  m0_rm_credit_fini(&rest);
2637  return M0_RC(rc);
2638 }
2639 
2644 static int borrow_send(struct m0_rm_incoming *in, struct m0_rm_credit *credit)
2645 {
2646  int rc;
2647  struct m0_rm_remote *other = in->rin_want.cr_owner->ro_creditor;
2648 
2649  M0_ENTRY("incoming: %p credit: %llu", in,
2650  (long long unsigned) credit->cr_datum);
2651  M0_PRE(other != NULL);
2652 
2653  /*
2654  * Borrow sends the remaining credit request to the creditor. It's
2655  * ok if outgoing_check() modifies it. It goes well with
2656  * incoming_check_with() - which keeps on reducing the request set.
2657  */
2658  rc = outgoing_check(in, M0_ROT_BORROW, credit, other);
2659  /*
2660  * Sends the entire credit request to the creditor. Empty the credit.
2661  */
2662  if (!credit_is_empty(credit) && rc == 0)
2664  NULL, credit, other) ?:
2665  credit_diff(credit, credit);
2666  return M0_RC(rc);
2667 }
2668 
2672 static int cancel_send(struct m0_rm_loan *loan)
2673 {
2674  int rc;
2675 
2676  M0_ENTRY();
2677  M0_PRE(loan != NULL);
2678  M0_LOG(M0_DEBUG, "credit: %llu", (long long unsigned)
2679  loan->rl_credit.cr_datum);
2680 
2682  &loan->rl_credit, loan->rl_other);
2683  return M0_RC(rc);
2684 }
2685 
/*
 * Subtracts a paid loan from the loans kept on 'list'.
 *
 * For every credit on 'list' that intersects paid_loan's credit and whose
 * loan cookie equals paid_loan's cookie, a remnant loan is computed with
 * remnant_loan_get(). Empty remnants are queued on a local remove-list,
 * non-empty remnants on a local retain-list, and the original credit is
 * moved to the remove-list.
 *
 * On success the remove-list entries are finalised and freed and the
 * retain-list entries are moved onto 'list'. On failure the roles of the
 * two local lists are swapped.
 *
 * owner and paid_loan must not be NULL.
 * Returns 0 on success or the error returned by remnant_loan_get().
 *
 * NOTE(review): on the failure path, empty remnant loans queued on the
 * remove-list by earlier iterations appear to be moved onto 'list' together
 * with the original credits -- confirm this is intended.
 */
2690 M0_INTERNAL int m0_rm_owner_loan_debit(struct m0_rm_owner *owner,
2691  struct m0_rm_loan *paid_loan,
2692  struct m0_tl *list)
2693 {
2694  struct m0_rm_credit *cr;
2695  struct m0_rm_loan *loan;
2696  struct m0_rm_loan *remnant_loan;
2697  struct m0_tl retain_list;
2698  struct m0_tl remove_list;
2699  int rc = 0;
2700 
2701  M0_PRE(owner != NULL);
2702  M0_PRE(paid_loan != NULL);
2703  M0_ENTRY("loan=%p credit: %llu", paid_loan,
2704  (long long unsigned) paid_loan->rl_credit.cr_datum);
2705 
2706  m0_rm_ur_tlist_init(&retain_list);
2707  m0_rm_ur_tlist_init(&remove_list);
2708  m0_tl_for (m0_rm_ur, list, cr) {
 /* Skip credits that do not overlap the paid loan. */
2709  if (!cr->cr_ops->cro_intersects(&paid_loan->rl_credit, cr))
2710  continue;
2711 
2712  loan = bob_of(cr, struct m0_rm_loan, rl_credit, &loan_bob);
2713 
 /* Only loans with the same cookie as the paid loan are debited. */
2714  if (!m0_cookie_is_eq(&loan->rl_cookie, &paid_loan->rl_cookie))
2715  continue;
2716 
2717  rc = remnant_loan_get(loan, &paid_loan->rl_credit,
2718  &remnant_loan);
2719  if (rc != 0)
2720  break;
2721 
2722  if (credit_is_empty(&remnant_loan->rl_credit))
2723  m0_rm_ur_tlist_add(&remove_list, &remnant_loan->rl_credit);
2724  else
2725  m0_rm_ur_tlist_add(&retain_list, &remnant_loan->rl_credit);
2726 
 /* The original credit leaves 'list' for now; see the epilogue below. */
2727  m0_rm_ur_tlist_move(&remove_list, cr);
2728  } m0_tl_endfor;
2729  /*
2730  * On successful completion, remove the credits from the "remove-list"
2731  * and move the remnant credits to the 'list'. Do the opposite
2732  * on failure.
2733  */
2734  m0_tl_teardown(m0_rm_ur, rc ? &retain_list : &remove_list, cr) {
2735  loan = bob_of(cr, struct m0_rm_loan, rl_credit, &loan_bob);
2736  m0_rm_loan_fini(loan);
2737  m0_free(loan);
2738  }
2739 
2740  m0_tl_for (m0_rm_ur, rc ? &remove_list : &retain_list, cr) {
2741  m0_rm_ur_tlist_move(list, cr);
2742  } m0_tl_endfor;
2743 
2744  m0_rm_ur_tlist_fini(&retain_list);
2745  m0_rm_ur_tlist_fini(&remove_list);
2746  return M0_RC(rc);
2747 }
2748 M0_EXPORTED(m0_rm_owner_loan_debit);
2749 
2750 M0_INTERNAL int granted_maybe_reserve(struct m0_rm_credit *granted,
2751  struct m0_rm_credit *to_cache)
2752 {
2753  struct m0_rm_pin *pin;
2754  struct m0_rm_incoming *curr;
2755  struct m0_rm_incoming *best = NULL;
2756  int rc = 0;
2757 
2758  M0_ENTRY("granted %p to_cache %p", granted, to_cache);
2759  /*
2760  * First iteration: find request with highest reserve priority.
2761  */
2762  m0_tl_for(pr, &granted->cr_pins, pin) {
2763  M0_ASSERT(m0_rm_pin_bob_check(pin));
2764  curr = pin->rp_incoming;
2765  if ((curr->rin_flags & RIF_RESERVE) &&
2766  (best == NULL || has_reserve_priority(curr, best)))
2767  best = curr;
2768  } m0_tl_endfor;
2769 
2770  if (best == NULL)
2771  return M0_RC(0);
2772  /*
2773  * Second iteration: add M0_RPF_BARRIER to 'best'
2774  * and M0_RPF_TRACK pins for others.
2775  */
2776  m0_tl_for(pr, &granted->cr_pins, pin) {
2777  curr = pin->rp_incoming;
2778  rc = m0_rm_pin_add(curr, to_cache,
2779  (curr == best) ?
2781  if (rc != 0)
2782  break;
2783  } m0_tl_endfor;
2784 
2785  return M0_RC(rc);
2786 }
2787 
2788 /* Checks if the loaned credit can be cached */
2789 static int loan_check(struct m0_rm_owner *owner,
2790  struct m0_tl *list,
2791  struct m0_rm_credit *rest)
2792 {
2793  int rc = 0;
2794  struct m0_rm_credit *cr;
2795 
2796  M0_ENTRY("loan check against credit: %llu",
2797  (long long unsigned) rest->cr_datum);
2798  m0_tl_for (m0_rm_ur, list, cr) {
2799  /*
2800  * Credits that are granted to the owners from the same group
2801  * never conflicts, so intersecting (or even conflicting)
2802  * credits can be potentially granted to several owners.
2803  * Don't put the credit to cached list until all copies given
2804  * to the owners from the same group are returned back.
2805  */
2806  if (cr->cr_ops->cro_intersects(rest, cr) &&
2808  &cr->cr_group_id))
2809  {
2810  rc = credit_diff(rest, cr);
2811  if (rc != 0 || credit_is_empty(rest))
2812  break;
2813  }
2814  } m0_tl_endfor;
2815  return M0_RC(rc);
2816 }
2817 
/*
 * Settles a sublet loan and, where possible, returns its credit to the
 * owner's cached list.
 *
 * Steps, as visible here:
 *  1. duplicate the loan's credit into 'cached' and let
 *     granted_maybe_reserve() add pins for it;
 *  2. debit the loan from owner->ro_sublet (m0_rm_owner_loan_debit()) and
 *     reduce 'cached' by whatever loan_check() still finds on ro_sublet;
 *  3. if something is left, reset its group id to m0_rm_no_group and add it
 *     to owner->ro_owned[OWOS_CACHED].
 *
 * owner and loan must not be NULL.
 * Returns 0 on success (including the case where nothing is left to cache)
 * or a negative error code from one of the steps above.
 *
 * NOTE(review): if m0_rm_credit_dup() succeeds but granted_maybe_reserve()
 * fails, the function returns without releasing 'cached' -- looks like a
 * leak, confirm.
 * NOTE(review): the early-exit path releases 'cached' with m0_free() only;
 * no m0_rm_credit_fini() and no removal of pins possibly added in step 1 is
 * visible here -- confirm.
 */
2818 M0_INTERNAL int m0_rm_loan_settle(struct m0_rm_owner *owner,
2819  struct m0_rm_loan *loan)
2820 {
2821  int rc;
2822  struct m0_rm_credit *cached = NULL;
2823 
2824  M0_PRE(owner != NULL);
2825  M0_PRE(loan != NULL);
2826 
2827  rc = m0_rm_credit_dup(&loan->rl_credit, &cached) ?:
2828  granted_maybe_reserve(&loan->rl_credit, cached);
2829  if (rc != 0) {
2830  M0_LOG(M0_ERROR, "credit allocation failed: rc=%d", rc);
2831  return M0_RC(rc);
2832  }
2833 
2834  rc = m0_rm_owner_loan_debit(owner, loan, &owner->ro_sublet) ?:
2835  loan_check(owner, &owner->ro_sublet, cached);
 /* Nothing to cache: either an error occurred or the credit is used up. */
2836  if (rc != 0 || credit_is_empty(cached)) {
2837  m0_free(cached);
2838  if (rc != 0)
2839  M0_LOG(M0_ERROR, "credit removal failed: rc=%d", rc);
2840  return M0_RC(rc);
2841  }
2842 
2843  /* @todo */
2844  cached->cr_group_id = m0_rm_no_group;
2845  m0_rm_ur_tlist_add(&owner->ro_owned[OWOS_CACHED], cached);
2846  M0_LOG(M0_INFO, "credit cached");
2847 
2848  return M0_RC(rc);
2849 }
2850 M0_EXPORTED(m0_rm_loan_settle);
2851 
/*
 * Per-resource predicate handed to res_tlist_invariant_ext().
 *
 * 'datum' is the resource type being checked. Returns true iff looking the
 * resource up in that type with m0_rm_resource_find() yields the very same
 * resource object, and the resource's r_type points back at the type.
 */
2868 static bool resource_list_check(const struct m0_rm_resource *res, void *datum)
2869 {
2870  const struct m0_rm_resource_type *rt = datum;
2871 
2872  return m0_rm_resource_find(rt, res) == res && res->r_type == rt;
2873 }
2874 
2876 {
2877  struct m0_rm_domain *dom = rt->rt_dom;
2878  const struct m0_tl *rlist = &rt->rt_resources;
2879 
2880  return
2881  res_tlist_invariant_ext(rlist, resource_list_check,
2882  (void *)rt) &&
2883  rt->rt_nr_resources == res_tlist_length(rlist) &&
2884  dom != NULL && IS_IN_ARRAY(rt->rt_id, dom->rd_types) &&
2885  dom->rd_types[rt->rt_id] == rt;
2886 }
2887 
2888 static bool pin_check(const void *bob)
2889 {
2890  const struct m0_rm_pin *pin = bob;
2891 
2892  return pin->rp_credit != NULL &&
2893  pin->rp_incoming != NULL &&
2894  M0_IN(pin->rp_flags,
2896 }
2897 
2901 static bool incoming_invariant(const struct m0_rm_incoming *in)
2902 {
2903  return
2904  _0C((in->rin_rc != 0) == (incoming_state(in) == RI_FAILURE)) &&
2907  /* RIF_LOCAL_WAIT and RIF_LOCAL_TRY can't be set together */
2909  /* M0_RIT_BORROW and M0_RIT_REVOKE should have exactly one of
2910  * RIF_LOCAL_WAIT, RIF_LOCAL_TRY flags set */
2911  _0C(ergo(M0_IN(in->rin_type, (M0_RIT_BORROW, M0_RIT_REVOKE)),
2912  m0_is_po2(in->rin_flags & WAIT_TRY_FLAGS))) &&
2914  in->rin_want.cr_owner->ro_incoming)) &&
2915  /* a request can be in "check" state only during owner_balance()
2916  execution. */
2917  _0C(incoming_state(in) != RI_CHECK) &&
2918  _0C(pi_tlist_invariant(&in->rin_pins)) &&
2919  /* a request in the WAIT state... */
2920  _0C(ergo(incoming_state(in) == RI_WAIT,
2921  /* waits on something... */
2922  incoming_pin_nr(in, M0_RPF_TRACK) > 0 &&
2923  /* and doesn't hold anything. */
2924  incoming_pin_nr(in, M0_RPF_PROTECT) == 0)) &&
2925  /* a fulfilled request... */
2927 #if 0
2928  /* holds something... */
2929  incoming_pin_nr(in, M0_RPF_PROTECT) > 0 &&
2930 #endif
2931  /* and waits on nothing. */
2932  incoming_pin_nr(in, M0_RPF_TRACK) == 0)) &&
2933  _0C(ergo(incoming_state(in) == RI_FAILURE ||
2935  incoming_pin_nr(in, ~0) == 0)) &&
2936  _0C(pr_tlist_is_empty(&in->rin_want.cr_pins));
2937 }
2938 
2946 };
2947 
2954 };
2955 
/*
 * Per-credit predicate used while walking an owner's credit lists.
 *
 * 'data' is the struct owner_invariant_state describing which list is being
 * walked (is_phase, is_owned_idx). Checks, in order:
 *  - a credit on the OWOS_HELD owned list has at least one M0_RPF_PROTECT pin;
 *  - a reserved credit is only seen during the OIS_OWNED phase;
 *  - during the OIS_INCOMING phase the enclosing incoming request passes
 *    incoming_invariant();
 *  - a credit has at most one M0_RPF_BARRIER pin.
 *
 * NOTE(review): the first check is one-directional (held => protected); it
 * does not reject PROTECT pins on non-held credits, assuming ergo() is plain
 * logical implication -- confirm.
 */
2956 static bool credit_invariant(const struct m0_rm_credit *credit, void *data)
2957 {
2958  struct owner_invariant_state *is =
2959  (struct owner_invariant_state *) data;
2960  return
2961  /* only held credits have PROTECT pins */
2962  _0C(ergo((is->is_phase == OIS_OWNED &&
2963  is->is_owned_idx == OWOS_HELD),
2964  credit_pin_nr(credit, M0_RPF_PROTECT) > 0)) &&
2965  /* only held/cached credits can be reserved */
2966  _0C(ergo(credit_is_reserved(credit),
2967  (is->is_phase == OIS_OWNED))) &&
2968  _0C(ergo(is->is_phase == OIS_INCOMING,
2969  incoming_invariant(cr2in(credit)))) &&
2970  _0C(credit_pin_nr(credit, M0_RPF_BARRIER) <= 1);
2971 }
2972 
/*
 * Returns true iff some credit on any of owner->ro_owned[] other than 'cr'
 * itself conflicts with 'cr' according to cr->cr_ops->cro_conflicts().
 */
2973 static bool conflict_exists(const struct m0_rm_credit *cr,
2974  const struct m0_rm_owner *owner)
2975 {
2976  return m0_exists(i, ARRAY_SIZE(owner->ro_owned),
2977  m0_tl_exists(m0_rm_ur, c2, &owner->ro_owned[i],
2978  cr != c2 && cr->cr_ops->cro_conflicts(cr, c2)));
2979 }
2980 
/*
 * Walks the owner's credit lists, checking each one and accumulating the
 * owner's total credit and debit into 'is'.
 *
 * The borrowed, sublet, first outgoing and all owned lists are checked with
 * m0_rm_ur_tlist_invariant_ext() using credit_invariant() as the per-credit
 * predicate; is->is_phase (and is->is_owned_idx for owned lists) is updated
 * before each walk so the predicate knows which list it is on. Then:
 *  - no owned credit may conflict with another owned credit;
 *  - every incoming queue must pass the plain list invariant;
 *  - owned and sublet credits are joined into is->is_credit, borrowed
 *    credits into is->is_debit; each cro_join() call must return 0.
 *
 * Returns true iff every check passes.
 *
 * NOTE(review): only ro_outgoing[0] is walked, not every ro_outgoing[]
 * entry -- confirm this is intended.
 * NOTE(review): is_phase is set to OIS_INCOMING, but the incoming queues are
 * checked with m0_rm_ur_tlist_invariant(), which takes no predicate, so
 * credit_invariant() is not applied to them in the code visible here.
 */
2984 static bool owner_invariant_state(const struct m0_rm_owner *owner,
2985  struct owner_invariant_state *is)
2986 {
2987  int i;
2988 
2989  /*
2990  * Iterate over all credits lists:
2991  *
2992  * - checking their consistency as double-linked lists
2993  * (m0_rm_ur_tlist_invariant_ext());
2994  *
2995  * - making additional consistency checks:
2996  *
2997  * - that a credit is for the same resource as the owner,
2998  *
2999  * - that a credit on m0_rm_owner::ro_owned[X] is pinned iff X
3000  * == OWOS_HELD.
3001  *
3002  * - accumulating total credit and debit.
3003  */
3004  is->is_phase = OIS_BORROWED;
3005  if (!m0_rm_ur_tlist_invariant_ext(&owner->ro_borrowed,
3006  &credit_invariant, (void *)is))
3007  return M0_ERR(false);
3008  is->is_phase = OIS_SUBLET;
3009  if (!m0_rm_ur_tlist_invariant_ext(&owner->ro_sublet,
3010  &credit_invariant, (void *)is))
3011  return M0_ERR(false);
3012  is->is_phase = OIS_OUTGOING;
3013  if (!m0_rm_ur_tlist_invariant_ext(&owner->ro_outgoing[0],
3014  &credit_invariant, (void *)is))
3015  return M0_ERR(false);
3016 
3017  is->is_phase = OIS_OWNED;
3018  for (i = 0; i < ARRAY_SIZE(owner->ro_owned); ++i) {
3019  is->is_owned_idx = i;
3020  if (!m0_rm_ur_tlist_invariant_ext(&owner->ro_owned[i],
3021  &credit_invariant, (void *)is))
3022  return M0_ERR(false);
3023  }
3024  is->is_phase = OIS_INCOMING;
3025 
3026  /* No any pair of owned credits conflicts */
3027  if (m0_exists(i, ARRAY_SIZE(owner->ro_owned),
3028  m0_tl_exists(m0_rm_ur, cr, &owner->ro_owned[i],
3029  conflict_exists(cr, owner))))
3030  return M0_ERR(false);
3031 
3032  /* Calculate credit */
3033  return M0_RC(_0C(m0_forall(i, ARRAY_SIZE(owner->ro_incoming),
3034  m0_forall(j, ARRAY_SIZE(owner->ro_incoming[i]),
3035  m0_rm_ur_tlist_invariant
3036  (&owner->ro_incoming[i][j])))) &&
3037  _0C(m0_forall(i, ARRAY_SIZE(owner->ro_owned),
3038  !m0_tl_exists(m0_rm_ur, credit, &owner->ro_owned[i],
3039  credit->cr_ops->cro_join(&is->is_credit,
3040  credit)))) &&
3041  _0C(!m0_tl_exists(m0_rm_ur, credit, &owner->ro_sublet,
3042  credit->cr_ops->cro_join(&is->is_credit,
3043  credit))) &&
3044  /* Calculate debit */
3045  _0C(!m0_tl_exists(m0_rm_ur, credit, &owner->ro_borrowed,
3046  credit->cr_ops->cro_join(&is->is_debit, credit))));
3047 }
3048 
/*
 * Checks the owner invariant.
 *
 * Asserts that the owner's fid is set, then builds a zeroed
 * owner_invariant_state with freshly initialised debit and credit
 * accumulators and runs owner_invariant_state(). For an owner in ROS_ACTIVE
 * state the accumulated debit must additionally satisfy credit_eq() against
 * the accumulated credit.
 *
 * NOTE(review): this listing omits source lines 3059 and 3068-3069, so any
 * release of is.is_debit / is.is_credit is not visible here -- check the
 * full source.
 */
3052 static bool owner_invariant(struct m0_rm_owner *owner)
3053 {
3054  bool rc;
3055  struct owner_invariant_state is;
3056 
3057  M0_ENTRY("owner %p", owner);
3058  M0_ASSERT(m0_fid_is_set(&owner->ro_fid));
3060  M0_SET0(&is);
3061  m0_rm_credit_init(&is.is_debit, owner);
3062  m0_rm_credit_init(&is.is_credit, owner);
3063 
3064  rc = owner_invariant_state(owner, &is) &&
3065  (ergo(owner_state(owner) == ROS_ACTIVE,
3066  credit_eq(&is.is_debit, &is.is_credit)));
3067 
3070  return M0_RC(rc);
3071 }
3072 
/*
 * Counts the pins on credit->cr_pins whose rp_flags have at least one bit in
 * common with 'flags'.
 */
3084 static int credit_pin_nr(const struct m0_rm_credit *credit, uint32_t flags)
3085 {
3086  int nr = 0;
3087  struct m0_rm_pin *pin;
3088 
3089  m0_tl_for (pr, &credit->cr_pins, pin) {
3090  M0_ASSERT(m0_rm_pin_bob_check(pin));
3091  if (pin->rp_flags & flags)
3092  ++nr;
3093  } m0_tl_endfor;
3094  return nr;
3095 }
3096 
/*
 * Counts the pins on in->rin_pins whose rp_flags have at least one bit in
 * common with 'flags'.
 */
3101 static int incoming_pin_nr(const struct m0_rm_incoming *in, uint32_t flags)
3102 {
3103  int nr;
3104  struct m0_rm_pin *pin;
3105 
3106  nr = 0;
3107  m0_tl_for (pi, &in->rin_pins, pin) {
3108  M0_ASSERT(m0_rm_pin_bob_check(pin));
3109  if (pin->rp_flags & flags)
3110  ++nr;
3111  } m0_tl_endfor;
3112  return nr;
3113 }
3114 
/*
 * Removes every pin placed by the incoming request 'in'.
 *
 * For each M0_RPF_PROTECT pin that is the only protecting pin left on its
 * credit, the credit is first moved to the owner's OWOS_CACHED list and all
 * M0_RPF_TRACK pins on that credit are deleted with pin_del(). The pin
 * itself is then deleted in all cases.
 *
 * NOTE(review): this listing omits source line 3127.
 */
3119 static void incoming_release(struct m0_rm_incoming *in)
3120 {
3121  struct m0_rm_pin *kingpin;
3122  struct m0_rm_pin *pin;
3123  struct m0_rm_credit *credit;
3124  struct m0_rm_owner *o = in->rin_want.cr_owner;
3125 
3126  M0_ENTRY("incoming: %p", in);
3128 
3129  m0_tl_for (pi, &in->rin_pins, kingpin) {
3130  M0_ASSERT(m0_rm_pin_bob_check(kingpin));
3131  if (kingpin->rp_flags & M0_RPF_PROTECT) {
3132  credit = kingpin->rp_credit;
3133  /*
3134  * If this was the last protecting pin, wake up incoming
3135  * requests waiting on this credit release.
3136  */
3137  if (credit_pin_nr(credit, M0_RPF_PROTECT) == 1) {
3138  /*
3139  * Move the credit back to the CACHED list.
3140  */
3141  m0_rm_ur_tlist_move(&o->ro_owned[OWOS_CACHED],
3142  credit);
3143  /*
3144  * I think we are introducing "thundering herd"
3145  * problem here.
3146  */
3147  m0_tl_for (pr, &credit->cr_pins, pin) {
3148  M0_ASSERT(m0_rm_pin_bob_check(pin));
3149  if (pin->rp_flags & M0_RPF_TRACK)
3150  pin_del(pin);
3151  } m0_tl_endfor;
3152  }
3153  }
3154  pin_del(kingpin);
3155  } m0_tl_endfor;
3156  M0_LEAVE();
3157 }
3158 
/*
 * Unlinks and frees a pin.
 *
 * The pin is removed from both lists it sits on (the incoming request's pin
 * list and the credit's pin list). If the removed pin carried M0_RPF_TRACK
 * and the request has no tracking pins left afterwards, the request's wanted
 * credit is moved to the owner's OQS_EXCITED incoming queue for its
 * priority. Finally the pin is finalised and freed.
 *
 * pin must not be NULL.
 *
 * NOTE(review): this listing omits source lines 3175 and 3184.
 */
3165 static void pin_del(struct m0_rm_pin *pin)
3166 {
3167  struct m0_rm_incoming *in;
3168  struct m0_rm_owner *owner;
3169 
3170  M0_ENTRY("pin %p", pin);
3171  M0_ASSERT(pin != NULL);
3172 
3173  in = pin->rp_incoming;
3174  owner = in->rin_want.cr_owner;
3176  pi_tlink_del_fini(pin);
3177  pr_tlink_del_fini(pin);
 /* The pin is already unlinked, so the count below excludes it. */
3178  if (incoming_pin_nr(in, M0_RPF_TRACK) == 0 &&
3179  pin->rp_flags & M0_RPF_TRACK) {
3180  /*
3181  * Last tracking pin removed, excite the request.
3182  */
3183  M0_LOG(M0_INFO, "Exciting incoming: %p\n", in);
3185  m0_rm_ur_tlist_move(
3186  &owner->ro_incoming[in->rin_priority][OQS_EXCITED],
3187  &in->rin_want);
3188  }
3189  m0_rm_pin_bob_fini(pin);
3190  m0_free(pin);
3191  M0_LEAVE();
3192 }
3193 
/*
 * Allocates a pin with the given flags that connects incoming request 'in'
 * with 'credit', and links it onto both credit->cr_pins and in->rin_pins.
 *
 * Returns 0 on success, -ENOMEM if the pin cannot be allocated.
 *
 * NOTE(review): the success path calls M0_LEAVE() and then returns through
 * M0_RC(0); presumably both emit a trace record -- confirm whether the
 * duplicate is wanted.
 */
3198 M0_INTERNAL int m0_rm_pin_add(struct m0_rm_incoming *in,
3199  struct m0_rm_credit *credit,
3200  uint32_t flags)
3201 {
3202  struct m0_rm_pin *pin;
3203 
3204  M0_ENTRY("in %p credit %p flags %u", in, credit, flags);
3205 
3206  M0_ALLOC_PTR(pin);
3207  if (pin != NULL) {
3208  pin->rp_flags = flags;
3209  pin->rp_credit = credit;
3210  pin->rp_incoming = in;
3211  pr_tlink_init(pin);
3212  pi_tlink_init(pin);
3213  pr_tlist_add(&credit->cr_pins, pin);
3214  pi_tlist_add(&in->rin_pins, pin);
3215  m0_rm_pin_bob_init(pin);
3216  M0_LEAVE("new pin %p", pin);
3217  return M0_RC(0);
3218  } else
3219  return M0_ERR(-ENOMEM);
3220 }
3221 
/*
 * Thin wrapper around A's cro_intersects() operation: returns whatever
 * A->cr_ops->cro_intersects(A, B) returns. A must have cr_ops with a
 * non-NULL cro_intersects.
 */
3230 static bool credit_intersects(const struct m0_rm_credit *A,
3231  const struct m0_rm_credit *B)
3232 {
3233  M0_PRE(A->cr_ops != NULL);
3234  M0_PRE(A->cr_ops->cro_intersects != NULL);
3235 
3236  return A->cr_ops->cro_intersects(A, B);
3237 }
3238 
/*
 * Returns true iff A's cro_conflicts() operation reports a conflict with B
 * and, in addition, credit_group_conflict() reports a conflict between the
 * two credits' group ids. A must have cr_ops with a non-NULL cro_conflicts.
 */
3239 static bool credit_conflicts(const struct m0_rm_credit *A,
3240  const struct m0_rm_credit *B)
3241 {
3242  M0_PRE(A->cr_ops != NULL);
3243  M0_PRE(A->cr_ops->cro_conflicts != NULL);
3244 
3245  return A->cr_ops->cro_conflicts(A, B) ?
3246  credit_group_conflict(&A->cr_group_id, &B->cr_group_id) : false;
3247 }
3248 
3249 
/*
 * Thin wrapper around c0's cro_diff() operation: returns whatever
 * c0->cr_ops->cro_diff(c0, c1) returns. c0 is the non-const argument and is
 * presumably reduced in place by c1 -- confirm against the cro_diff
 * contract. c0 must have cr_ops with a non-NULL cro_diff.
 */
3250 static int credit_diff(struct m0_rm_credit *c0, const struct m0_rm_credit *c1)
3251 {
3252  M0_PRE(c0->cr_ops != NULL);
3253  M0_PRE(c0->cr_ops->cro_diff != NULL);
3254 
3255  return c0->cr_ops->cro_diff(c0, c1);
3256 }
3257 
/*
 * Compares two credits of the same owner.
 *
 * A temporary copy of c0 is made, c1 is taken away from it with
 * credit_diff(), and the result is true iff neither step failed and the
 * temporary is empty afterwards. The temporary is finalised before return.
 *
 * NOTE(review): only "c0 minus c1" is tested, not the reverse direction;
 * whether that amounts to full equality depends on cro_diff() semantics --
 * confirm.
 */
3258 static bool credit_eq(const struct m0_rm_credit *c0,
3259  const struct m0_rm_credit *c1)
3260 {
3261  int rc;
3262  bool res;
3263  struct m0_rm_credit credit;
3264 
3265  /* no apples and oranges comparison. */
3266  M0_PRE(c0->cr_owner == c1->cr_owner);
3267  m0_rm_credit_init(&credit, c0->cr_owner);
3268  rc = m0_rm_credit_copy(&credit, c0);
3269  rc = rc ?: credit_diff(&credit, c1);
3270 
 /* A failed copy or diff is reported as "not equal". */
3271  res = rc ? false : credit_is_empty(&credit);
3272  m0_rm_credit_fini(&credit);
3273 
3274  return res;
3275 }
3276 
3277 /*
3278  * Allocates a new credit and calculates the difference between src and diff.
3279  * Stores the diff(src, diff) in the newly allocated credit.
3280  */
/*
 * On success *remnant_credit receives the new credit. If credit_diff() fails
 * the duplicate is finalised and released before *remnant_credit is set.
 * Returns 0 or the error from m0_rm_credit_dup() / credit_diff().
 *
 * NOTE(review): M0_ENTRY() dereferences src and diff before the M0_PRE()
 * NULL checks run, so those checks cannot catch a NULL argument.
 */
3281 static int remnant_credit_get(const struct m0_rm_credit *src,
3282  const struct m0_rm_credit *diff,
3283  struct m0_rm_credit **remnant_credit)
3284 {
3285  struct m0_rm_credit *new_credit;
3286  int rc;
3287 
3288  M0_ENTRY("splitting credits %llu and %llu",
3289  (long long unsigned) src->cr_datum,
3290  (long long unsigned) diff->cr_datum);
3291  M0_PRE(remnant_credit != NULL);
3292  M0_PRE(src != NULL);
3293  M0_PRE(diff != NULL);
3294 
3295  rc = m0_rm_credit_dup(src, &new_credit) ?:
3296  credit_diff(new_credit, diff);
3297  if (rc != 0 && new_credit != NULL) {
3298  m0_rm_credit_fini(new_credit);
3299  m0_free0(&new_credit);
3300  }
3301  *remnant_credit = new_credit;
3302  return M0_RC(rc);
3303 }
3304 
/*
 * Allocates a new credit and copies src_credit into it.
 *
 * The new credit is initialised for src_credit's owner, takes over its
 * cr_ops, and is filled with m0_rm_credit_copy(). If the copy fails the new
 * credit is finalised and released again. *dest_credit is always written:
 * with the new credit on success, otherwise with whatever 'credit' holds
 * after the failure (presumably NULL, assuming m0_free0() clears the
 * pointer).
 *
 * src_credit must not be NULL.
 * Returns 0 on success, -ENOMEM if allocation fails, or the error from
 * m0_rm_credit_copy().
 */
3308 M0_INTERNAL int m0_rm_credit_dup(const struct m0_rm_credit *src_credit,
3309  struct m0_rm_credit **dest_credit)
3310 {
3311  struct m0_rm_credit *credit;
3312  int rc = -ENOMEM;
3313 
3314  M0_ENTRY("src=%p", src_credit);
3315  M0_PRE(src_credit != NULL);
3316 
3317  M0_ALLOC_PTR(credit);
3318  if (credit != NULL) {
3319  m0_rm_credit_init(credit, src_credit->cr_owner);
3320  credit->cr_ops = src_credit->cr_ops;
3321  rc = m0_rm_credit_copy(credit, src_credit);
3322  if (rc != 0) {
3323  m0_rm_credit_fini(credit);
3324  m0_free0(&credit);
3325  }
3326  }
3327  *dest_credit = credit;
3328  return M0_RC(rc);
3329 }
3330 M0_EXPORTED(m0_rm_credit_dup);
3331 
3332 M0_INTERNAL int
3334 {
3335  M0_PRE(src != NULL);
3337 
3338  dst->cr_group_id = src->cr_group_id;
3339  return src->cr_ops->cro_copy(dst, src);
3340 }
3341 
/* A credit is treated as empty exactly when its cr_datum is zero. */
3345 static bool credit_is_empty(const struct m0_rm_credit *credit)
3346 {
3347  return credit->cr_datum == 0;
3348 }
3349 
/*
 * Serialises a credit into a newly allocated buffer.
 *
 * buf->b_nob is set to the length reported by the credit's cro_len()
 * operation and buf->b_addr to a fresh m0_alloc() allocation of that size.
 * A single-segment bufvec over that memory is then handed, through a
 * cursor, to the credit's cro_encode() operation.
 *
 * buf must not be NULL; the credit must provide cro_len and cro_encode.
 * Returns -ENOMEM if the allocation fails, otherwise the result of
 * cro_encode().
 *
 * NOTE(review): buf->b_addr is not released here when cro_encode() fails;
 * presumably the caller owns and frees the buffer -- confirm.
 */
3350 M0_INTERNAL int m0_rm_credit_encode(struct m0_rm_credit *credit,
3351  struct m0_buf *buf)
3352 {
3353  struct m0_bufvec datum_buf;
3354  struct m0_bufvec_cursor cursor;
3355 
3356  M0_ENTRY("credit: %"PRIx64, credit->cr_datum);
3357  M0_PRE(buf != NULL);
3358  M0_PRE(credit->cr_ops != NULL);
3359  M0_PRE(credit->cr_ops->cro_len != NULL);
3360  M0_PRE(credit->cr_ops->cro_encode != NULL);
3361 
3362  buf->b_nob = credit->cr_ops->cro_len(credit);
3363  buf->b_addr = m0_alloc(buf->b_nob);
3364  if (buf->b_addr == NULL)
3365  return M0_ERR(-ENOMEM);
3366 
 /* Describe buf as a one-segment buffer vector for the cursor. */
3367  datum_buf.ov_buf = &buf->b_addr;
3368  datum_buf.ov_vec.v_nr = 1;
3369  datum_buf.ov_vec.v_count = &buf->b_nob;
3370 
3371  m0_bufvec_cursor_init(&cursor, &datum_buf);
3372  return M0_RC(credit->cr_ops->cro_encode(credit, &cursor));
3373 }
3374 M0_EXPORTED(m0_rm_credit_encode);
3375 
/*
 * Deserialises a credit from 'buf'.
 *
 * A bufvec is built over buf's address and size with M0_BUFVEC_INIT_BUF(),
 * a cursor is initialised on it, and the credit's cro_decode() operation is
 * invoked with that cursor.
 *
 * The credit must provide cro_decode.
 * Returns the result of cro_decode().
 */
3376 M0_INTERNAL int m0_rm_credit_decode(struct m0_rm_credit *credit,
3377  struct m0_buf *buf)
3378 {
3379  struct m0_bufvec datum_buf = M0_BUFVEC_INIT_BUF(&buf->b_addr,
3380  &buf->b_nob);
3381  struct m0_bufvec_cursor cursor;
3382 
3383  M0_ENTRY("credit: %llu",
3384  (long long unsigned) credit->cr_datum);
3385  M0_PRE(credit->cr_ops != NULL);
3386  M0_PRE(credit->cr_ops->cro_decode != NULL);
3387 
3388  m0_bufvec_cursor_init(&cursor, &datum_buf);
3389  return M0_RC(credit->cr_ops->cro_decode(credit, &cursor));
3390 }
3391 M0_EXPORTED(m0_rm_credit_decode);
3392 
/*
 * Service lookup by name. In the lines visible here no lookup is performed
 * and 0 is returned; the existing comment describes the intended behaviour.
 *
 * NOTE(review): this listing omits source line 3405.
 */
3400 M0_INTERNAL int m0_rm_db_service_query(const char *name,
3401  struct m0_rm_remote *rem)
3402 {
3403  /* Create search query for DB using name as key and
3404  * find record and assign service ID */
3406  return 0;
3407 }
3408 
/*
 * Remote resource lookup. In the lines visible here nothing is sent and 0 is
 * returned; the existing comment describes the intended behaviour.
 *
 * NOTE(review): this listing omits source line 3412.
 */
3409 M0_INTERNAL int m0_rm_remote_resource_locate(struct m0_rm_remote *rem)
3410 {
3411  /* Send resource management fop to locate resource */
3413  return 0;
3414 }
3415 
/*
 * Locates the service of a remote owner.
 *
 * Registers a clink on rem->rem_signal, issues m0_rm_db_service_query() for
 * the resource type's name and, unless the remote is already in
 * REM_SERVICE_LOCATED state, waits once on the clink. If the state is still
 * not REM_SERVICE_LOCATED afterwards, -EINVAL is returned. The clink is
 * removed and finalised on every path.
 *
 * Precondition: rtype->rt_lock is held.
 * Returns 0, the query's error code, or -EINVAL.
 *
 * NOTE(review): this listing omits source lines 3426 and 3428; the clink
 * initialisation is not visible here.
 */
3419 static int service_locate(struct m0_rm_resource_type *rtype,
3420  struct m0_rm_remote *rem)
3421 {
3422  struct m0_clink clink;
3423  int rc;
3424 
3425  M0_PRE(m0_mutex_is_locked(&rtype->rt_lock));
3427 
3429  m0_clink_add(&rem->rem_signal, &clink);
3430  /*
3431  * DB callback should assign value to rem_service and
3432  * rem_state should be changed to REM_SERVICE_LOCATED.
3433  */
3434  rc = m0_rm_db_service_query(rtype->rt_name, rem);
3435  if (rc != 0) {
3436  M0_LOG(M0_ERROR, "m0_rm_db_service_query failed!\n");
3437  goto error;
3438  }
3439  if (rem->rem_state != REM_SERVICE_LOCATED)
3440  m0_chan_wait(&clink);
3441  if (rem->rem_state != REM_SERVICE_LOCATED)
3442  rc = -EINVAL;
3443 
3444 error:
3445  m0_clink_del(&clink);
3446  m0_clink_fini(&clink);
3447 
3448  return M0_RC(rc);
3449 }
3450 
/*
 * Locates the remote owner of a resource.
 *
 * Registers a clink on rem->rem_signal and, unless the remote is already in
 * REM_OWNER_LOCATED state, waits once on the clink. If the state is still
 * not REM_OWNER_LOCATED afterwards, -EINVAL is returned. The clink is
 * removed and finalised on every path.
 *
 * Precondition: rtype->rt_lock is held.
 *
 * NOTE(review): this listing omits source lines 3463, 3465 and 3471; the
 * clink initialisation and the statement that assigns 'rc' before it is
 * tested are not visible here.
 * NOTE(review): the error message names m0_rm_remote_resource_find, while
 * the lookup routine defined in this file is m0_rm_remote_resource_locate --
 * confirm which call is made on the omitted line.
 */
3456 static int resource_locate(struct m0_rm_resource_type *rtype,
3457  struct m0_rm_remote *rem)
3458 {
3459  struct m0_clink clink;
3460  int rc;
3461 
3462  M0_PRE(m0_mutex_is_locked(&rtype->rt_lock));
3464 
3466  m0_clink_add(&rem->rem_signal, &clink);
3467  /*
3468  * RPC callback should assign value to rem_id and
3469  * rem_state should be set to REM_OWNER_LOCATED.
3470  */
3472  if (rc != 0) {
3473  M0_LOG(M0_ERROR, "m0_rm_remote_resource_find failed!\n");
3474  goto error;
3475  }
3476  if (rem->rem_state != REM_OWNER_LOCATED)
3477  m0_chan_wait(&clink);
3478  if (rem->rem_state != REM_OWNER_LOCATED)
3479  rc = M0_ERR(-EINVAL);
3480 error:
3481  m0_clink_del(&clink);
3482  m0_clink_fini(&clink);
3483 
3484  return M0_RC(rc);
3485 }
3486 
/*
 * Resolves a remote owner: first its service, then the resource it owns.
 *
 * The resource type is taken from the credit's owner. service_locate() and
 * resource_locate() are run in turn (the remote is put into
 * REM_OWNER_LOCATING state between them). On success the type's resource
 * list is scanned under rt_lock and the first resource for which the type's
 * rto_is() operation matches other->rem_id is stored in other->rem_resource.
 *
 * Precondition: other->rem_state == REM_INITIALISED.
 * Returns 0 or the error from service_locate() / resource_locate().
 *
 * NOTE(review): both callees assert that rtype->rt_lock is held, yet no lock
 * acquisition is visible before those calls (source line 3497 is omitted
 * from this listing) -- check the full source.
 * NOTE(review): if no resource matches, rem_resource is left untouched and 0
 * is still returned.
 */
3487 M0_INTERNAL int m0_rm_net_locate(struct m0_rm_credit *credit,
3488  struct m0_rm_remote *other)
3489 {
3490  struct m0_rm_resource_type *rtype;
3491  struct m0_rm_resource *res;
3492  int rc;
3493 
3494  M0_PRE(other->rem_state == REM_INITIALISED);
3495 
3496  rtype = credit->cr_owner->ro_resource->r_type;
3498  rc = service_locate(rtype, other);
3499  if (rc != 0)
3500  goto error;
3501 
3502  other->rem_state = REM_OWNER_LOCATING;
3503  rc = resource_locate(rtype, other);
3504  if (rc != 0)
3505  goto error;
3506 
3507  /* Search for resource having resource id equal to remote id */
3508  m0_mutex_lock(&rtype->rt_lock);
3509  m0_tl_for (res, &rtype->rt_resources, res) {
3510  if (rtype->rt_ops->rto_is(res, other->rem_id)) {
3511  other->rem_resource = res;
3512  break;
3513  }
3514  } m0_tl_endfor;
3515  m0_mutex_unlock(&rtype->rt_lock);
3516 
3517 error:
3518  return M0_RC(rc);
3519 }
3520 M0_EXPORTED(m0_rm_net_locate);
3521 
3553 {
3554  struct m0_rm_owner *owner;
3555  struct m0_rm_credit *credit;
3556  struct m0_rm_loan *loan;
3557  int rc = 0;
3558 
3559  remote->rem_dead = true;
3560  m0_tl_for(m0_owners, &remote->rem_resource->r_local, owner) {
3561  m0_tl_for(m0_rm_ur, &owner->ro_sublet, credit) {
3562  loan = bob_of(credit, struct m0_rm_loan, rl_credit,
3563  &loan_bob);
3564  if (loan->rl_other == remote) {
3565  /* put the loan's credit back to CACHED */
3566  rc = m0_rm_loan_settle(owner, loan);
3574  if (rc != 0)
3575  M0_LOG(M0_WARN, "continuing even"
3576  " after rc = %d with ep = %s",
3578  }
3579  } m0_tl_endfor;
3580 
3581  if (owner->ro_creditor == remote &&
3582  owner_state(owner) == ROS_ACTIVE) {
3583  owner_windup_locked(owner);
3584  }
3585  } m0_tl_endfor;
3586  if (rc == 0)
3588 }
3589 
3600 {
3601  struct m0_rm_owner *owner;
3602 
3603  remote->rem_dead = false;
3604  m0_tl_for(m0_owners, &remote->rem_resource->r_local, owner) {
3605  if (owner->ro_creditor == remote &&
3606  owner_state(owner) == ROS_DEAD_CREDITOR) {
3607  owner_state_set(owner, ROS_ACTIVE);
3608  }
3609  } m0_tl_endfor;
3611 }
3612 
3614 {
3616 }
3617 
/*
 * Clink callback for HA state changes of the conf object a remote tracks.
 *
 * The conf object is recovered from the channel the clink is attached to and
 * its current co_ha_state is read. For M0_NC_FAILED and M0_NC_ONLINE an
 * m0_rm_ha_event is allocated, filled with the remote's tracker and the new
 * state, and the resource type's rt_sm_grp clink is signalled after the
 * rt_queue_guard critical section. Other states are ignored.
 *
 * Always returns false. If the event cannot be allocated an error is logged
 * and the state change is dropped.
 *
 * NOTE(review): this listing omits source line 3650, which sits inside the
 * rt_queue_guard critical section; presumably it queues the event -- check
 * the full source.
 */
3626 static bool rm_on_remote_death_cb(struct m0_clink *link)
3627 {
3628  struct m0_rm_remote *remote;
3629  struct m0_conf_obj *obj;
3630  struct m0_rm_ha_event *event;
3631  struct m0_rm_resource_type *r_type;
3632  enum m0_ha_obj_state new_state;
3633 
3634  obj = container_of(link->cl_chan, struct m0_conf_obj, co_ha_chan);
3635  new_state = obj->co_ha_state;
3636  M0_LOG(M0_DEBUG, "link=%p new_state=%d", link, new_state);
3637  if (M0_IN(new_state, (M0_NC_FAILED,M0_NC_ONLINE))) {
3638  remote = container_of(link, struct m0_rm_remote,
3639  rem_tracker.rht_clink);
3640  M0_ALLOC_PTR(event);
3641  if (event == NULL) {
3642  M0_LOG(M0_ERROR, "Insufficient memory.");
3643  return false;
3644  }
3645  event->rhe_tracker = &remote->rem_tracker;
3646  event->rhe_state = new_state;
3647  r_type = remote->rem_resource->r_type;
3648  m0_queue_link_init(&event->rhe_link);
3649  m0_mutex_lock(&r_type->rt_queue_guard);
3651  m0_mutex_unlock(&r_type->rt_queue_guard);
3652  m0_clink_signal(&r_type->rt_sm_grp.s_clink);
3653  }
3654  return false;
3655 }
3656 
3661 /*
3662  * Local variables:
3663  * c-indentation-style: "K&R"
3664  * c-basic-offset: 8
3665  * tab-width: 8
3666  * fill-column: 80
3667  * scroll-step: 1
3668  * End:
3669  */
struct m0_rm_credit is_credit
Definition: rm.c:2952
const struct m0_rm_credit_ops * cr_ops
Definition: rm.h:502
static int m0_rm_owner_trylock(struct m0_rm_owner *owner)
Definition: rm.c:598
M0_INTERNAL int m0_rm_outgoing_init(struct m0_rm_outgoing *out, enum m0_rm_outgoing_type req_type, struct m0_rm_remote *other, struct m0_rm_credit *credit)
Definition: rm.c:1150
#define M0_BUFVEC_INIT_BUF(addr_ptr, count_ptr)
Definition: vec.h:165
M0_INTERNAL int m0_rm_owner_selfadd(struct m0_rm_owner *owner, struct m0_rm_credit *r)
Definition: rm.c:732
Definition: rm.h:886
M0_INTERNAL int m0_rm_request_out(enum m0_rm_outgoing_type otype, struct m0_rm_incoming *in, struct m0_rm_loan *loan, struct m0_rm_credit *credit, struct m0_rm_remote *other)
Definition: rm_fops.c:354
M0_INTERNAL int m0_mutex_trylock(struct m0_mutex *mutex)
Definition: mutex.c:84
static void credit_processor(struct m0_rm_resource_type *rt)
Definition: rm.c:697
static void incoming_queue(struct m0_rm_owner *owner, struct m0_rm_incoming *in)
Definition: rm.c:1737
struct m0_rm_resource * rem_resource
Definition: rm.h:752
struct m0_rm_resource_type * r_type
Definition: rm.h:304
static struct m0_addb2_philter p
Definition: consumer.c:40
static size_t nr
Definition: dump.c:1505
M0_INTERNAL void m0_chan_wait(struct m0_clink *link)
Definition: chan.c:336
void(* rio_conflict)(struct m0_rm_incoming *in)
Definition: rm.h:1498
bool(* rto_is)(const struct m0_rm_resource *resource, uint64_t id)
Definition: rm.h:459
#define M0_PRE(cond)
static int credit_diff(struct m0_rm_credit *c0, const struct m0_rm_credit *c1)
Definition: rm.c:3250
static struct m0_semaphore wait
Definition: item.c:151
static const struct m0_bob_type credit_bob
Definition: rm.c:151
static bool rev_session_clink_cb(struct m0_clink *link)
Definition: rm.c:1299
M0_INTERNAL void m0_sm_fail(struct m0_sm *mach, int fail_state, int32_t rc)
Definition: sm.c:468
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static struct m0_list list
Definition: list.c:144
enum m0_rm_remote_state rem_state
Definition: rm.h:748
static int cancel_send(struct m0_rm_loan *loan)
Definition: rm.c:2672
m0_time_t rin_req_time
Definition: rm.h:1473
M0_INTERNAL void m0_rm_ha_unsubscribe_lock(struct m0_rm_ha_tracker *tracker)
Definition: rm_ha.c:435
static int(* diff[M0_PARITY_CAL_ALGO_NR])(struct m0_parity_math *math, struct m0_buf *old, struct m0_buf *new, struct m0_buf *parity, uint32_t index)
Definition: parity_math.c:290
int const char const void size_t int flags
Definition: dir.c:328
uint64_t cr_datum
Definition: rm.h:514
M0_INTERNAL int m0_rm_owner_loan_debit(struct m0_rm_owner *owner, struct m0_rm_loan *paid_loan, struct m0_tl *list)
Definition: rm.c:2690
static void reserve_prio_set(struct m0_rm_reserve_prio *prio, m0_time_t timestamp, struct m0_rm_owner *owner)
Definition: rm.c:715
#define NULL
Definition: misc.h:38
struct m0_cookie ri_rem_owner_cookie
Definition: rm_internal.h:59
M0_INTERNAL void m0_clink_init(struct m0_clink *link, m0_chan_cb_t cb)
Definition: chan.c:201
static bool owner_invariant(struct m0_rm_owner *owner)
Definition: rm.c:3052
enum m0_rm_incoming_type rin_type
Definition: rm.h:1435
static struct m0_bufvec dst
Definition: xform.c:61
M0_INTERNAL void m0_clink_del(struct m0_clink *link)
Definition: chan.c:267
static void incoming_complete(struct m0_rm_incoming *in, int32_t rc)
Definition: rm.c:2486
bool(* cro_intersects)(const struct m0_rm_credit *self, const struct m0_rm_credit *c1)
Definition: rm.h:579
struct m0_clink rem_rev_sess_clink
Definition: rm.h:754
static struct m0_rm_remote creditor
Definition: file.c:95
struct m0_rm_remote * ro_creditor
Definition: rm.h:1026
bool res_tlist_is_empty(const struct m0_tl *list)
static bool rm_on_remote_death_cb(struct m0_clink *link)
Definition: rm.c:3626
#define ergo(a, b)
Definition: misc.h:293
static bool credit_is_reserved(const struct m0_rm_credit *cr)
Definition: rm.c:2134
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
#define M0_3WAY(v0, v1)
Definition: arith.h:199
static bool credit_intersects(const struct m0_rm_credit *A, const struct m0_rm_credit *B)
Definition: rm.c:3230
void(* rio_complete)(struct m0_rm_incoming *in, int32_t rc)
Definition: rm.h:1493
static int credit_maybe_track(struct m0_rm_incoming *in, struct m0_rm_credit *cr, bool notify, int *wait)
Definition: rm.c:2178
Definition: sm.h:350
M0_INTERNAL int m0_rm_credit_decode(struct m0_rm_credit *credit, struct m0_buf *buf)
Definition: rm.c:3376
static void pending_outgoing_send(struct m0_rm_owner *owner, struct m0_clink *link)
Definition: rm.c:1273
M0_INTERNAL bool m0_uint128_eq(const struct m0_uint128 *u0, const struct m0_uint128 *u1)
Definition: misc.c:39
m0_rm_owner_owned_state
Definition: rm.h:880
M0_INTERNAL struct m0_rm_resource_type * m0_rm_resource_type_lookup(const struct m0_rm_domain *dom, const uint64_t rtype_id)
Definition: rm.c:327
struct m0_fid rrp_owner
Definition: rm.h:1255
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
static struct m0_queue * remote_to_queue(struct m0_rm_remote *remote)
Definition: rm.c:3613
struct m0_cookie ri_loan_cookie
Definition: rm_internal.h:60
Definition: rm.c:213
struct m0_cookie rl_cookie
Definition: rm.h:1110
#define M0_CASSERT(cond)
M0_INTERNAL const struct m0_bob_type loan_bob
Definition: rm.c:179
enum m0_rm_incoming_policy rin_policy
Definition: rm.h:1447
struct m0_clink s_clink
Definition: sm.h:516
struct m0_vec ov_vec
Definition: vec.h:147
M0_INTERNAL int m0_rm_credit_dup(const struct m0_rm_credit *src_credit, struct m0_rm_credit **dest_credit)
Definition: rm.c:3308
static bool m0_is_po2(uint64_t val)
Definition: arith.h:153
static void windup_incoming_complete(struct m0_rm_incoming *in, int32_t rc)
Definition: rm.c:1133
struct m0_rm_ha_tracker rem_tracker
Definition: rm.h:784
m0_rm_outgoing_type
Definition: rm.h:1504
bool(* cro_conflicts)(const struct m0_rm_credit *self, const struct m0_rm_credit *c1)
Definition: rm.h:618
struct m0_rm_incoming * rp_incoming
Definition: rm.h:1679
static void resource_get(struct m0_rm_resource *res)
Definition: rm.c:433
Definition: rm.h:1149
static bool credit_group_conflict(const struct m0_uint128 *g1, const struct m0_uint128 *g2)
Definition: rm.c:1730
struct m0_bufvec data
Definition: di.c:40
struct m0_rm_owner * is_owner
Definition: rm.c:2953
Definition: rm.c:2943
static void rm_remote_free(struct m0_ref *ref)
Definition: rm.c:1384
M0_INTERNAL uint8_t m0_fid_tget(const struct m0_fid *fid)
Definition: fid.c:133
struct m0_queue rt_ha_events
Definition: rm.h:447
M0_INTERNAL const char * m0_sm_state_name(const struct m0_sm *mach, int state)
Definition: sm.c:781
m0_time_t rrp_time
Definition: rm.h:1251
static int error
Definition: mdstore.c:64
M0_INTERNAL bool m0_clink_is_armed(const struct m0_clink *link)
Definition: chan.c:303
static int remnant_loan_get(const struct m0_rm_loan *loan, const struct m0_rm_credit *credit, struct m0_rm_loan **remnant_loan)
Definition: rm.c:1210
M0_INTERNAL void m0_rm_domain_init(struct m0_rm_domain *dom)
Definition: rm.c:215
#define m0_exists(var, nr,...)
Definition: misc.h:134
static int outgoing_check(struct m0_rm_incoming *in, enum m0_rm_outgoing_type, struct m0_rm_credit *credit, struct m0_rm_remote *other)
Definition: rm.c:2544
m0_rm_owner_state
Definition: rm.h:810
#define M0_BITS(...)
Definition: misc.h:236
M0_INTERNAL void m0_rm_remote_init(struct m0_rm_remote *rem, struct m0_rm_resource *res)
Definition: rm.c:1411
uint64_t ro_id
Definition: rm.h:1066
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
Definition: queue.h:43
M0_INTERNAL void m0_chan_lock(struct m0_chan *ch)
Definition: chan.c:68
#define container_of(ptr, type, member)
Definition: misc.h:33
struct m0_rm_credit rin_want
Definition: rm.h:1450
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
struct m0_rpc_session * rem_session
Definition: rm.h:755
Definition: rm.h:1137
bool m0_rm_ur_tlist_contains(const struct m0_tl *list, const struct m0_rm_credit *credit)
M0_INTERNAL int m0_fid_cmp(const struct m0_fid *fid0, const struct m0_fid *fid1)
Definition: fid.c:170
static struct m0_rm_incoming * cr2in(const struct m0_rm_credit *cr)
Definition: rm.c:1037
static const struct m0_addb2_trace * busy[1000]
Definition: base.c:305
enum m0_ha_obj_state rhe_state
Definition: rm_ha.h:150
M0_INTERNAL bool m0_fid_is_set(const struct m0_fid *fid)
Definition: fid.c:106
M0_INTERNAL int m0_rm_loan_init(struct m0_rm_loan *loan, const struct m0_rm_credit *credit, struct m0_rm_remote *creditor)
Definition: rm.c:1235
M0_BOB_DEFINE(M0_INTERNAL, &resource_bob, m0_rm_resource)
void(* cro_free)(struct m0_rm_credit *self)
Definition: rm.h:537
void ** ov_buf
Definition: vec.h:149
M0_INTERNAL void m0_sm_group_fini(struct m0_sm_group *grp)
Definition: sm.c:65
static struct foo * obj
Definition: tlist.c:302
M0_INTERNAL int m0_rm_net_locate(struct m0_rm_credit *credit, struct m0_rm_remote *other)
Definition: rm.c:3487
#define PRIx64
Definition: types.h:61
const char * bt_name
Definition: bob.h:73
M0_INTERNAL int m0_rm_credit_encode(struct m0_rm_credit *credit, struct m0_buf *buf)
Definition: rm.c:3350
struct m0_rm_owner * cr_owner
Definition: rm.h:501
Definition: sock.c:887
static m0_bcount_t count
Definition: xcode.c:167
M0_INTERNAL int m0_rm_owner_timedwait(struct m0_rm_owner *owner, uint64_t state, const m0_time_t abs_timeout)
Definition: rm.c:892
static bool conflict_exists(const struct m0_rm_credit *cr, const struct m0_rm_owner *owner)
Definition: rm.c:2973
struct m0_rm_credit is_debit
Definition: rm.c:2951
static bool owner_smgrp_is_locked(const struct m0_rm_owner *owner)
Definition: rm.c:583
#define m0_tl_endfor
Definition: tlist.h:700
M0_INTERNAL int m0_sm_timedwait(struct m0_sm *mach, uint64_t states, m0_time_t deadline)
Definition: sm.c:387
struct m0_fid fid
Definition: di.c:46
return M0_RC(rc)
uint64_t rrp_seq
Definition: rm.h:1265
const struct m0_rm_resource_ops * r_ops
Definition: rm.h:305
static bool incoming_invariant(const struct m0_rm_incoming *in)
Definition: rm.c:2901
#define M0_ENTRY(...)
Definition: trace.h:170
static M0_UNUSED struct m0_rm_resource_type * rem_incoming_to_resource_type(struct m0_rm_remote_incoming *rem_in)
Definition: rm.c:1560
Definition: buf.h:37
M0_INTERNAL void incoming_surrender(struct m0_rm_incoming *in)
Definition: rm.c:1082
static M0_UNUSED struct m0_rm_resource_type * credit_to_resource_type(struct m0_rm_credit *credit)
Definition: rm.c:1555
M0_INTERNAL int m0_rm_resource_encode(struct m0_rm_resource *res, struct m0_buf *buf)
Definition: rm.c:393
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
static void ha_events_handle(struct m0_rm_resource_type *rt)
Definition: rm.c:661
struct m0_rm_credit rl_credit
Definition: rm.h:1099
uint64_t rem_id
Definition: rm.h:782
int i
Definition: dir.c:1033
M0_INTERNAL void m0_rm_outgoing_send(struct m0_rm_outgoing *outgoing)
Definition: rm_fops.c:273
struct m0_sm rin_sm
Definition: rm.h:1436
#define M0_SET_ARR0(arr)
Definition: misc.h:72
struct m0_tl r_local
Definition: rm.h:323
static int service_locate(struct m0_rm_resource_type *rtype, struct m0_rm_remote *rem)
Definition: rm.c:3419
M0_INTERNAL void m0_fid_set(struct m0_fid *fid, uint64_t container, uint64_t key)
Definition: fid.c:116
Definition: rm.c:213
M0_INTERNAL void m0_rpc_service_reverse_session_put(struct m0_rpc_session *sess)
Definition: service.c:220
bool ro_user_windup
Definition: rm.h:1071
struct m0_sm ro_sm
Definition: rm.h:1005
M0_INTERNAL void m0_rm_ha_tracker_fini(struct m0_rm_ha_tracker *tracker)
Definition: rm_ha.c:408
void m0_ref_init(struct m0_ref *ref, int init_num, void(*release)(struct m0_ref *ref))
Definition: refs.c:24
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL int m0_rm_loan_settle(struct m0_rm_owner *owner, struct m0_rm_loan *loan)
Definition: rm.c:2818
struct m0_rm_credit * rp_credit
Definition: rm.h:1677
static void incoming_release(struct m0_rm_incoming *in)
Definition: rm.c:3119
static void owner_windup_locked(struct m0_rm_owner *owner)
Definition: rm.c:919
bool res_tlist_contains(const struct m0_tl *list, const struct m0_rm_resource *res)
struct m0_tl ro_borrowed
Definition: rm.h:1033
static enum m0_rm_owner_state owner_state(const struct m0_rm_owner *owner)
Definition: rm_internal.h:297
static int loan_check(struct m0_rm_owner *owner, struct m0_tl *list, struct m0_rm_credit *rest)
Definition: rm.c:2789
bool(* cro_is_subset)(const struct m0_rm_credit *self, const struct m0_rm_credit *c1)
Definition: rm.h:584
M0_INTERNAL void m0_rm_incoming_init(struct m0_rm_incoming *in, struct m0_rm_owner *owner, enum m0_rm_incoming_type type, enum m0_rm_incoming_policy policy, uint64_t flags)
Definition: rm.c:1060
Definition: trace.h:482
uint32_t rt_nr_resources
Definition: rm.h:415
m0_rm_incoming_state
Definition: rm.h:1134
const char * name
Definition: trace.c:110
static const struct m0_rm_incoming_ops windup_incoming_ops
Definition: rm.c:237
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
Definition: refs.h:34
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
static bool incoming_is_complete(const struct m0_rm_incoming *in)
Definition: rm.c:2129
static bool owner_invariant_state(const struct m0_rm_owner *owner, struct owner_invariant_state *is)
Definition: rm.c:2984
static bool resource_type_invariant(const struct m0_rm_resource_type *rt)
Definition: rm.c:2875
struct m0_tl cr_pins
Definition: rm.h:524
struct m0_tl ro_incoming[M0_RM_REQUEST_PRIORITY_NR][OQS_NR]
Definition: rm.h:1054
static int cached_credits_remove(struct m0_rm_incoming *in)
Definition: rm.c:1485
static bool owner_is_liquidated(const struct m0_rm_owner *o)
Definition: rm.c:790
M0_INTERNAL int m0_rm_remote_resource_locate(struct m0_rm_remote *rem)
Definition: rm.c:3409
struct m0_chan rem_signal
Definition: rm.h:757
#define m0_free0(pptr)
Definition: memory.h:77
const struct m0_uint128 m0_rm_no_group
Definition: rm.c:211
M0_INTERNAL void m0_chan_init(struct m0_chan *chan, struct m0_mutex *ch_guard)
Definition: chan.c:96
#define M0_ASSERT(cond)
const char * scf_name
Definition: sm.h:352
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
Definition: rm.h:828
Definition: rm.h:1176
static enum m0_rm_incoming_state incoming_state(const struct m0_rm_incoming *in)
Definition: rm_internal.h:285
#define M0_RM_REMOTE_PUT(remote)
Definition: rm.h:1971
M0_INTERNAL void m0_sm_group_init(struct m0_sm_group *grp)
Definition: sm.c:53
M0_INTERNAL int m0_rpc_session_status(struct m0_rpc_session *session)
Definition: service.c:281
static void incoming_check(struct m0_rm_incoming *in)
Definition: rm.c:2043
M0_INTERNAL void m0_fid_tgenerate(struct m0_fid *fid, const uint8_t tid)
Definition: fid.c:155
void(* rop_policy)(struct m0_rm_resource *resource, struct m0_rm_incoming *in)
Definition: rm.h:359
M0_INTERNAL void m0_rm_owner_fini(struct m0_rm_owner *owner)
Definition: rm.c:943
M0_INTERNAL void m0_rm_owner_init_rfid(struct m0_rm_owner *owner, const struct m0_uint128 *group, struct m0_rm_resource *res, struct m0_rm_remote *creditor)
Definition: rm.c:649
m0_time_t m0_time_now(void)
Definition: time.c:134
#define INCOMING_CREDIT(in)
Definition: rm.c:126
static struct m0_addb2_callback c
Definition: consumer.c:41
void m0_sm_state_set(struct m0_sm *mach, int state)
Definition: sm.c:478
M0_INTERNAL int m0_rm_db_service_query(const char *name, struct m0_rm_remote *rem)
Definition: rm.c:3400
Definition: tlist.h:251
struct m0_tl ro_outgoing[OQS_NR]
Definition: rm.h:1058
M0_INTERNAL int m0_rm_credit_copy(struct m0_rm_credit *dst, const struct m0_rm_credit *src)
Definition: rm.c:3333
struct m0_tl ro_sublet
Definition: rm.h:1042
#define bob_of(ptr, type, field, bt)
Definition: bob.h:140
char * rht_ep
Definition: rm_ha.h:118
static int resource_locate(struct m0_rm_resource_type *rtype, struct m0_rm_remote *rem)
Definition: rm.c:3456
m0_ha_obj_state
Definition: note.h:119
void m0_thread_fini(struct m0_thread *q)
Definition: thread.c:92
M0_INTERNAL void m0_queue_link_init(struct m0_queue_link *ql)
Definition: queue.c:71
const struct m0_rm_incoming_ops * rin_ops
Definition: rm.h:1471
int32_t rin_rc
Definition: rm.h:1446
static struct m0_stob_domain * dom
Definition: storage.c:38
static int loan_dup(const struct m0_rm_loan *src_loan, struct m0_rm_loan **dest_loan)
Definition: rm.c:1176
M0_INTERNAL void m0_bufvec_cursor_init(struct m0_bufvec_cursor *cur, const struct m0_bufvec *bvec)
Definition: vec.c:563
static struct rectype rt[]
Definition: beck.c:428
static void owner_finalisation_check(struct m0_rm_owner *owner)
Definition: rm.c:533
static int cmp(const struct m0_ut_suite **s0, const struct m0_ut_suite **s1)
Definition: ut.c:654
M0_INTERNAL void m0_clink_signal(struct m0_clink *clink)
Definition: chan.c:326
void * m0_alloc(size_t size)
Definition: memory.c:126
void(* rop_credit_init)(struct m0_rm_resource *resource, struct m0_rm_credit *credit)
Definition: rm.h:365
struct m0_sm_group * sm_grp
Definition: sm.h:321
M0_INTERNAL int m0_rm_pin_add(struct m0_rm_incoming *in, struct m0_rm_credit *credit, uint32_t flags)
Definition: rm.c:3198
static int cached_credits_hold(struct m0_rm_incoming *in)
Definition: rm.c:1820
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
static void barrier_pins_del(struct m0_rm_incoming *in)
Definition: rm.c:2016
static void incoming_state_set(struct m0_rm_incoming *in, enum m0_rm_incoming_state state)
Definition: rm.c:1042
struct m0_pdclust_instance pi
Definition: fd.c:107
uint64_t f_container
Definition: fid.h:39
#define M0_POST(cond)
static int revoke_send(struct m0_rm_incoming *in, struct m0_rm_loan *loan, struct m0_rm_credit *credit)
Definition: rm.c:2611
static struct m0_sm_group * owner_grp(const struct m0_rm_owner *owner)
Definition: rm_internal.h:309
uint32_t v_nr
Definition: vec.h:51
M0_INTERNAL void m0_clink_cleanup(struct m0_clink *link)
Definition: chan.c:310
Definition: rm.c:2945
static void cached_credits_clear(struct m0_rm_owner *owner)
Definition: rm.c:1467
static void owner_fail(struct m0_rm_owner *owner, enum m0_rm_owner_state state, int rc)
Definition: rm.c:517
int32_t sm_rc
Definition: sm.h:336
Definition: chan.h:229
static void windup_incoming_conflict(struct m0_rm_incoming *in)
Definition: rm.c:1128
static void group(void)
Definition: sm.c:386
static const struct m0_bob_type rem_bob
Definition: rm.c:203
struct m0_rm_remote * rl_other
Definition: rm.h:1105
struct m0_mutex rt_lock
Definition: rm.h:404
m0_bcount_t * v_count
Definition: vec.h:53
static bool resource_list_check(const struct m0_rm_resource *res, void *datum)
Definition: rm.c:2868
static struct m0_rm_resource * incoming_to_resource(struct m0_rm_incoming *in)
Definition: rm_internal.h:291
static struct m0_clink clink[RDWR_REQUEST_MAX]
static void owner_cleanup(struct m0_rm_owner *owner)
Definition: rm.c:868
struct m0_mutex s_lock
Definition: sm.h:514
Definition: rm.h:1145
struct m0_rm_remote * rin_remote
Definition: rm.h:1477
M0_INTERNAL int granted_maybe_reserve(struct m0_rm_credit *granted, struct m0_rm_credit *to_cache)
Definition: rm.c:2750
M0_INTERNAL void m0_rm_resource_free(struct m0_rm_resource *res)
Definition: rm.c:386
M0_INTERNAL void m0_chan_unlock(struct m0_chan *ch)
Definition: chan.c:73
static void incoming_pins_del(struct m0_rm_incoming *in, uint32_t flags)
Definition: rm.c:1921
static const struct m0_sm_conf owner_conf
Definition: rm.c:499
static const struct m0_bob_type outgoing_bob
Definition: rm.c:195
m0_rm_incoming_policy
Definition: rm.h:1155
int(* cro_disjoin)(struct m0_rm_credit *self, const struct m0_rm_credit *c1, struct m0_rm_credit *intersection)
Definition: rm.h:597
Definition: rm.h:676
m0_time_t cr_get_time
Definition: rm.h:510
static struct m0_sm_state_descr owner_states[]
Definition: rm.c:461
M0_TL_DESCR_DEFINE(res, "resources",, struct m0_rm_resource, r_linkage, r_magix, M0_RM_RESOURCE_MAGIC, M0_RM_RESOURCE_HEAD_MAGIC)
M0_INTERNAL int m0_rm_revoke_commit(struct m0_rm_remote_incoming *rem_in)
Definition: rm.c:1615
enum m0_ha_obj_state rht_state
Definition: rm_ha.h:122
static void conflict_notify(struct m0_rm_credit *credit)
Definition: rm.c:2147
#define m0_forall(var, nr,...)
Definition: misc.h:112
uint32_t sd_flags
Definition: sm.h:378
M0_INTERNAL struct m0_queue_link * m0_queue_get(struct m0_queue *q)
Definition: queue.c:112
static const struct m0_bob_type incoming_bob
Definition: rm.c:187
enum credit_queue is_phase
Definition: rm.c:2949
const char * rt_name
Definition: rm.h:389
M0_INTERNAL void m0_rm_credit_put(struct m0_rm_incoming *in)
Definition: rm.c:1797
static bool owner_has_loans(struct m0_rm_owner *owner)
Definition: rm.c:527
Definition: rm.c:213
static void rm_remote_online_handler(struct m0_rm_remote *remote)
Definition: rm.c:3599
M0_INTERNAL void m0_rm_resource_add(struct m0_rm_resource_type *rtype, struct m0_rm_resource *res)
Definition: rm.c:337
M0_INTERNAL int64_t m0_ref_read(const struct m0_ref *ref)
Definition: refs.c:44
M0_INTERNAL void m0_rm_loan_fini(struct m0_rm_loan *loan)
Definition: rm.c:1257
M0_INTERNAL void m0_queue_put(struct m0_queue *q, struct m0_queue_link *ql)
Definition: queue.c:131
#define M0_RM_REMOTE_GET(remote)
Definition: rm.h:1957
static int incoming_check_held(struct m0_rm_incoming *in, struct m0_rm_credit *rest, struct m0_rm_credit *held, int *wait, bool *cr_used)
Definition: rm.c:2210
static void resource_put(struct m0_rm_resource *res)
Definition: rm.c:447
static int incoming_pin_nr(const struct m0_rm_incoming *in, uint32_t flags)
Definition: rm.c:3101
struct m0_rm_incoming ri_incoming
Definition: rm_internal.h:50
#define M0_CNT_INC(cnt)
Definition: arith.h:226
static void pin_del(struct m0_rm_pin *pin)
Definition: rm.c:3165
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
Definition: fid.h:38
static bool credit_is_empty(const struct m0_rm_credit *credit)
Definition: rm.c:3345
uint64_t f_key
Definition: fid.h:40
M0_INTERNAL void m0_rm_credit_init(struct m0_rm_credit *credit, struct m0_rm_owner *owner)
Definition: rm.c:964
M0_INTERNAL void m0_rm_remote_fini(struct m0_rm_remote *rem)
Definition: rm.c:1431
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
Definition: sm.c:313
#define M0_IS0(obj)
Definition: misc.h:70
static const struct m0_sm_conf inc_conf
Definition: rm.c:1031
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL void m0_clink_add(struct m0_chan *chan, struct m0_clink *link)
Definition: chan.c:228
static bool has_reserve_priority(struct m0_rm_incoming *in1, struct m0_rm_incoming *in2)
Definition: rm.c:2261
static int r[NR]
Definition: thread.c:46
static void owner_state_set(struct m0_rm_owner *owner, enum m0_rm_owner_state state)
Definition: rm.c:508
bool m0_rm_ur_tlist_is_empty(const struct m0_tl *list)
M0_INTERNAL void m0_rm_outgoing_complete(struct m0_rm_outgoing *og)
Definition: rm.c:2465
uint64_t rin_flags
Definition: rm.h:1448
static void incoming_policy_apply(struct m0_rm_incoming *in)
Definition: rm.c:2521
Definition: rm.h:863
static struct m0_rm_remote * remote
Definition: rm_fops.c:35
M0_INTERNAL void m0_rm_owner_windup(struct m0_rm_owner *owner)
Definition: rm.c:930
M0_INTERNAL int m0_rm_loan_alloc(struct m0_rm_loan **loan, const struct m0_rm_credit *credit, struct m0_rm_remote *creditor)
Definition: rm.c:1183
uint32_t rp_flags
Definition: rm.h:1676
M0_INTERNAL m0_bcount_t m0_bufvec_cursor_copyto(struct m0_bufvec_cursor *dcur, void *sdata, m0_bcount_t num_bytes)
Definition: vec.c:677
struct m0_tl ro_owned[OWOS_NR]
Definition: rm.h:1047
static bool credit_conflicts(const struct m0_rm_credit *A, const struct m0_rm_credit *B)
Definition: rm.c:3239
m0_rm_incoming_type
Definition: rm.h:247
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL void m0_sm_move(struct m0_sm *mach, int32_t rc, int state)
Definition: sm.c:485
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
M0_INTERNAL void m0_clink_fini(struct m0_clink *link)
Definition: chan.c:208
static bool owner_is_idle(const struct m0_rm_owner *o)
Definition: rm.c:776
int(* cro_diff)(struct m0_rm_credit *self, const struct m0_rm_credit *c1)
Definition: rm.h:660
M0_INTERNAL void m0_queue_init(struct m0_queue *q)
Definition: queue.c:53
#define IS_IN_ARRAY(idx, array)
Definition: misc.h:311
m0_bcount_t(* cro_len)(const struct m0_rm_credit *self)
Definition: rm.h:551
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
static struct m0_bob_type resource_bob
Definition: rm.c:133
M0_INTERNAL void m0_rm_incoming_fini(struct m0_rm_incoming *in)
Definition: rm.c:1099
struct m0_clink rht_conf_exp
Definition: rm_ha.h:113
const struct m0_rm_resource_type_ops * rt_ops
Definition: rm.h:388
M0_INTERNAL void m0_rm_owner_unlock(struct m0_rm_owner *owner)
Definition: rm.c:603
static int credit_reservation_check(struct m0_rm_incoming *in, struct m0_rm_credit *cr, int *wait)
Definition: rm.c:2286
struct m0_fid ro_fid
Definition: rm.h:1011
M0_INTERNAL int m0_rm_type_register(struct m0_rm_domain *dom, struct m0_rm_resource_type *rt)
Definition: rm.c:252
#define M0_UINT128(hi, lo)
Definition: types.h:40
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
int rin_priority
Definition: rm.h:1470
M0_INTERNAL void m0_rm_owner_creditor_reset(struct m0_rm_owner *owner, struct m0_rm_remote *creditor)
Definition: rm.c:904
static void owner_liquidate(struct m0_rm_owner *src_owner)
Definition: rm.c:804
struct m0_ref rem_refcnt
Definition: rm.h:781
M0_INTERNAL void m0_rm_ha_tracker_init(struct m0_rm_ha_tracker *tracker, m0_chan_cb_t cb)
Definition: rm_ha.c:399
struct m0_tl rin_pins
Definition: rm.h:1466
static const struct m0_bob_type pin_bob
Definition: rm.c:171
void m0_remotes_tlist_del(struct m0_rm_remote *rem)
M0_TL_DEFINE(res, M0_INTERNAL, struct m0_rm_resource)
Definition: rm.h:1175
static void owner_balance(struct m0_rm_owner *o)
Definition: rm.c:1938
struct m0_cookie rem_cookie
Definition: rm.h:768
#define out(...)
Definition: gen.c:41
Definition: rm.h:1171
int type
Definition: dir.c:1031
M0_INTERNAL void m0_rm_owner_lock(struct m0_rm_owner *owner)
Definition: rm.c:592
static bool credit_eq(const struct m0_rm_credit *c0, const struct m0_rm_credit *c1)
Definition: rm.c:3258
static bool pin_check(const void *bob)
Definition: rm.c:2888
uint64_t rl_id
Definition: rm.h:1111
static int remnant_credit_get(const struct m0_rm_credit *src, const struct m0_rm_credit *diff, struct m0_rm_credit **remnant_credit)
Definition: rm.c:3281
struct m0_uint128 cr_group_id
Definition: rm.h:506
static int credit_pin_nr(const struct m0_rm_credit *credit, uint32_t flags)
Definition: rm.c:3084
M0_INTERNAL void m0_rm_type_deregister(struct m0_rm_resource_type *rt)
Definition: rm.c:288
static bool reserve_prio_is_set(struct m0_rm_reserve_prio *prio)
Definition: rm.c:727
M0_INTERNAL struct m0_rm_remote * m0_rm_remote_find(struct m0_rm_remote_incoming *rem_in)
Definition: rm.c:1366
M0_INTERNAL void m0_rm_credit_fini(struct m0_rm_credit *credit)
Definition: rm.c:985
struct m0_uint128 ro_group_id
Definition: rm.h:1022
struct m0_uint128 ri_group_id
Definition: rm_internal.h:61
Definition: module.c:67
M0_INTERNAL void m0_queue_fini(struct m0_queue *q)
Definition: queue.c:59
#define M0_PRE_EX(cond)
static int scan(struct scanner *s)
Definition: beck.c:963
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
M0_INTERNAL void m0_rm_owner_init(struct m0_rm_owner *owner, struct m0_fid *fid, const struct m0_uint128 *group, struct m0_rm_resource *res, struct m0_rm_remote *creditor)
Definition: rm.c:609
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
M0_INTERNAL void m0_chan_fini_lock(struct m0_chan *chan)
Definition: chan.c:112
void m0_free(void *data)
Definition: memory.c:146
static struct m0_sm_state_descr inc_states[]
Definition: rm.c:998
struct m0_sm_group rt_sm_grp
Definition: rm.h:416
static void incoming_policy_none(struct m0_rm_incoming *in)
Definition: rm.c:2517
int(* cro_decode)(struct m0_rm_credit *self, struct m0_bufvec_cursor *cur)
Definition: rm.h:546
uint32_t sm_state
Definition: sm.h:307
M0_INTERNAL int m0_rm_borrow_commit(struct m0_rm_remote_incoming *rem_in)
Definition: rm.c:1564
M0_INTERNAL void internal_incoming_fini(struct m0_rm_incoming *in)
Definition: rm.c:1089
struct m0_mutex rt_queue_guard
Definition: rm.h:430
struct m0_pdclust_src_addr src
Definition: fd.c:108
struct m0_rm_reserve_prio rin_reserve
Definition: rm.h:1475
M0_INTERNAL void m0_bob_type_tlist_init(struct m0_bob_type *bt, const struct m0_tl_descr *td)
Definition: bob.c:41
static int incoming_check_with(struct m0_rm_incoming *in, struct m0_rm_credit *credit)
Definition: rm.c:2341
int32_t rc
Definition: trigger_fop.h:47
Definition: rm.h:1675
#define ARRAY_SIZE(a)
Definition: misc.h:45
M0_INTERNAL void m0_rm_domain_fini(struct m0_rm_domain *dom)
Definition: rm.c:227
#define M0_POST_EX(cond)
bool rog_sent
Definition: rm.h:1552
#define offsetof(typ, memb)
Definition: misc.h:29
M0_INTERNAL bool m0_sm_group_is_locked(const struct m0_sm_group *grp)
Definition: sm.c:107
M0_INTERNAL void m0_rm_resource_del(struct m0_rm_resource *res)
Definition: rm.c:361
#define m0_tl_exists(name, var, head,...)
Definition: tlist.h:774
struct m0_rm_loan rog_want
Definition: rm.h:1550
#define RM_OWNER_LISTS_FOR(owner, expr)
Definition: rm_internal.h:263
credit_queue
Definition: rm.c:2939
static void rm_remote_death_handler(struct m0_rm_remote *remote)
Definition: rm.c:3552
Definition: rm.h:903
uint64_t ro_seq
Definition: rm.h:1077
static bool credit_invariant(const struct m0_rm_credit *credit, void *data)
Definition: rm.c:2956
int(* cro_encode)(struct m0_rm_credit *self, struct m0_bufvec_cursor *cur)
Definition: rm.h:541
M0_INTERNAL void m0_rm_credit_get(struct m0_rm_incoming *in)
Definition: rm.c:1758
struct m0_queue_link rhe_link
Definition: rm_ha.h:148
Definition: module.c:67
struct m0_rm_resource * ro_resource
Definition: rm.h:1015
Definition: trace.h:478
Definition: vec.h:145
struct m0_tl rt_resources
Definition: rm.h:409
M0_INTERNAL struct m0_rm_resource * m0_rm_resource_find(const struct m0_rm_resource_type *rt, const struct m0_rm_resource *res)
Definition: rm.c:243
Definition: rm.h:1156
#define m0_tl_forall(name, var, head,...)
Definition: tlist.h:735
M0_INTERNAL void m0_rm_outgoing_fini(struct m0_rm_outgoing *out)
Definition: rm.c:1166
#define M0_IMPOSSIBLE(fmt,...)
static int borrow_send(struct m0_rm_incoming *in, struct m0_rm_credit *credit)
Definition: rm.c:2644
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
Definition: sm.c:331
bool rem_dead
Definition: rm.h:788
#define M0_UNUSED
Definition: misc.h:380