Motr  M0
sync.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "motr/addb.h"
24 #include "motr/client.h"
25 #include "motr/client_internal.h"
26 #include "motr/sync.h" /* sync_interactions */
27 
28 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CLIENT
29 #include "lib/trace.h"
30 #include "lib/finject.h"
31 #include "mdservice/fsync_fops.h" /* m0_fop_fsync_mds_fopt */
32 #include "fop/fom_generic.h" /* m0_rpc_item_is_generic_reply_fop */
33 #include "lib/memory.h" /* m0_alloc, m0_free */
34 #include "lib/tlist.h"
35 #include "lib/hash.h" /* m0_htable */
36 #include "file/file.h"
37 #include "motr/magic.h" /* M0_T1FS_FFW_TLIST_MAGIC? */
38 #include "pool/pool.h" /* pools_common_svc_ctx_tl */
39 
/*
 * Forward (tentative) declaration of the bob type for struct m0_op_sync;
 * the initialised definition follows. Presumably needed so that
 * M0_BOB_DEFINE(static, &os_bobtype, m0_op_sync) can refer to it first —
 * confirm.
 */
83 static const struct m0_bob_type os_bobtype;
85 static const struct m0_bob_type os_bobtype = {
86  .bt_name = "os_bobtype",
87  .bt_magix_offset = offsetof(struct m0_op_sync, os_magic),
88  .bt_magix = M0_OS_MAGIC,
89  .bt_check = NULL,
90 };
91 
93  "Targets to synced for a client SYNC request",
94  static, struct sync_target,
95  srt_tlink, srt_tlink_magic,
97 
/*
 * List of struct sync_target linked through srt_tlink; used for
 * sync_request::sr_targets. NOTE(review): the opening
 * "M0_TL_DESCR_DEFINE(sync_target," line and the magic arguments of the
 * descriptor above are not visible in this listing.
 */
98 M0_TL_DEFINE(sync_target, static, struct sync_target);
99 
/*
 * List of struct sync_fop_wrapper linked through sfw_tlink: the FSYNC fops
 * of a SYNC request that are still pending (sync_request::sr_fops).
 * NOTE(review): the last descriptor argument is not visible in this listing.
 */
100 /* TODO: use Client-defined magic values */
101 M0_TL_DESCR_DEFINE(spf, "sync_fop_wrappers pending fsync-fops",
102  static, struct sync_fop_wrapper, sfw_tlink,
103  sfw_tlink_magic, M0_T1FS_FFW_TLIST_MAGIC1,
105 
106 M0_TL_DEFINE(spf, static, struct sync_fop_wrapper);
107 
108 /* SPTI -> Service to Pending Transaction Id list */
/*
 * List of struct m0_reqh_service_txid linked through stx_tlink: one record
 * per service holding its highest pending txid. Used for the pending lists
 * of entities (en_pending_tx), ops (op_pending_tx) and SYNC requests
 * (sr_stxs). Declared M0_INTERNAL rather than static.
 * NOTE(review): the magic arguments are not visible in this listing.
 */
109 M0_TL_DESCR_DEFINE(spti, "m0_reqh_service_txid pending list", M0_INTERNAL,
110  struct m0_reqh_service_txid,
111  stx_tlink, stx_link_magic,
114 
115 M0_TL_DEFINE(spti, M0_INTERNAL, struct m0_reqh_service_txid);
116 
/*
 * Table of the RPC/fop entry points this file calls (post, wait for reply,
 * fop fini, fop put). Calling through this table rather than directly
 * presumably lets unit tests substitute them — confirm.
 * NOTE(review): si.si_post_rpc is called in this file (and asserted
 * non-NULL in m0_sync()) but its initialiser is not visible in this listing.
 */
121 static struct sync_interactions si = {
123  .si_wait_for_reply = &m0_rpc_item_wait_for_reply,
124  /* fini is for requests, allocated in a bigger structure */
125  .si_fop_fini = &m0_fop_fini,
126  /* put is for replies, allocated by a lower layer */
127  .si_fop_put = &m0_fop_put_lock,
128 };
129 
/*
 * Cleanup for an FSYNC request fop.
 *
 * @ref is the fop's f_ref. The fop is finalised through si.si_fop_fini(),
 * and then the struct sync_fop_wrapper that embeds it (as sfw_fop) is
 * freed. Presumably registered as the fop release callback when the fop
 * is initialised — confirm.
 */
133 static void sync_fop_cleanup(struct m0_ref *ref)
134 {
135  struct m0_fop *fop;
136  struct sync_fop_wrapper *sfw;
137 
138  M0_ENTRY();
140 
141  fop = M0_AMB(fop, ref, f_ref);
142  si.si_fop_fini(fop);
143 
144  sfw = M0_AMB(sfw, fop, sfw_fop);
145  m0_free(sfw);
146 
147  M0_LEAVE("sync_fop_cleanup");
148 }
149 
/*
 * Completes the SYNC op that owns @sreq. In order, it:
 *  - moves op_sm to M0_OS_EXECUTED;
 *  - copies sreq->sr_rc into op->op_rc;
 *  - moves op_sm to M0_OS_STABLE;
 *  - calls m0_op_stable().
 * Signature per the cross-reference:
 * static void sync_request_done_locked(struct sync_request *sreq).
 * Presumably the caller holds the op's sm group lock (see
 * sync_request_done()) — confirm.
 * NOTE(review): the header line and one line after the EXECUTED transition
 * are not visible in this listing.
 */
151 {
152  struct m0_op *op;
153  struct m0_op_sync *os;
154 
155  os = sreq->sr_op_sync;
156  op = &os->os_oc.oc_op;
157 
158  /* Updates SYNC op's state. */
159  m0_sm_move(&op->op_sm, 0, M0_OS_EXECUTED);
161 
162  /*
163  * Currently, a SYNC request is considered successful only if all services
164  * involved persist the requested txid successfully.
165  * TODO: in some cases a SYNC request can be considered successful even if
166  * some FSYNC fops fail. For example, when an application writes
167  * to an N+K object, it is not necessary to wait for all N+K services.
168  * N services returning successfully should be considered success for
169  * a SYNC request.
170  */
171  /* A server reply will cause the op to complete stably,
172  * with return code being reported in op->op_rc */
173  op->op_rc = sreq->sr_rc;
174  m0_sm_move(&op->op_sm, 0, M0_OS_STABLE);
175  m0_op_stable(op);
176 }
177 
/*
 * Holds the sm group lock of the SYNC op owning @sreq (its op_sm_group)
 * around the completion step.
 * NOTE(review): the call made under the lock is not visible in this
 * listing; presumably sync_request_done_locked(sreq) — confirm.
 */
178 static void sync_request_done(struct sync_request *sreq)
179 {
180  struct m0_sm_group *op_grp;
181 
182  op_grp = &sreq->sr_op_sync->os_oc.oc_op.op_sm_group;
183  m0_sm_group_lock(op_grp);
185  m0_sm_group_unlock(op_grp);
186 }
187 
188 /*
189  * AST callback for completion of a SYNC request (all fops have been replied to).
190  * The AST is in the sm group of the SYNC op.
191  */
192 static void sync_request_ast(struct m0_sm_group *grp,
193  struct m0_sm_ast *ast)
194 {
195  struct sync_request *sreq;
196 
197  M0_ENTRY();
198 
199  M0_PRE(ast != NULL);
200  M0_PRE(grp != NULL);
202 
	/* @ast is embedded in struct sync_request as sr_ast. */
203  sreq = M0_AMB(sreq, ast, sr_ast);
	/* NOTE(review): what is done with sreq is not visible in this listing. */
205 
206  M0_LEAVE();
207 }
208 
/*
 * Accounts for one finished FSYNC fop of a SYNC request.
 *
 * Decrements sreq->sr_nr_fops and unlinks @sfw from sreq->sr_fops. Once
 * the last fop is done, sreq->sr_rc keeps its existing error or otherwise
 * takes @rc.
 * NOTE(review): as visible, @rc of any fop other than the last one is not
 * recorded in sr_rc, so an earlier failure could be lost. Confirm that the
 * lines not shown in this listing handle it.
 */
213 static void sync_fop_done(struct sync_fop_wrapper *sfw, int rc)
214 {
215  struct m0_op_sync *os;
216  struct sync_request *sreq;
217  struct m0_fop *fop;
218 
219  M0_ENTRY();
220 
221  sreq = sfw->sfw_req;
222  os = sreq->sr_op_sync;
223 
225  sreq->sr_nr_fops--;
226  spf_tlist_del(sfw);
227  fop = &sfw->sfw_fop;
229 
230  if (sreq->sr_nr_fops == 0) {
231  /* All fops for this SYNC request have been replied. */
232  sreq->sr_rc = sreq->sr_rc?:rc;
234  }
236 
237  M0_LEAVE();
238 }
239 
245  uint64_t txid)
246 {
247  M0_PRE(stx != NULL);
248 
249  if (stx->stx_tri.tri_txid <= txid) {
250  /*
251  * Our transaction got committed, update the record to be
252  * ignored in the future.
253  */
254  M0_SET0(&stx->stx_tri);
255  }
256  /*
257  * Else the stx_maximum_txid got increased while we were waiting, it
258  * is acceptable for fsync to return as long as the
259  * correct-at-time-of-sending txn was committed (which the caller
260  * should assert).
261  */
262 }
263 
/*
 * Checks the reply to the FSYNC fop held in @sfw and propagates the
 * committed txid.
 *
 * Returns:
 *  - the RPC item error (logged as such) or the reply's ffr_rc, if non-zero;
 *  - -EIO if the reply's committed txid is below the requested one;
 *  - 0 otherwise. In that case the committed txid has been passed to
 *    sync_fop_stx_update() for the wrapper's own stx and for the matching
 *    record in every target's pending list (entity or op), under that
 *    target's pending-tx mutex.
 * NOTE(review): a SYNC_INSTANCE target reaching the target loop would hit
 * M0_IMPOSSIBLE(). Several lines are not visible in this listing: the rc
 * source, the reply lookup, the loop header, the find predicate and the
 * instance-wide update.
 */
267 static int sync_reply_process(struct sync_fop_wrapper *sfw)
268 {
269  int rc;
270  uint64_t reply_txid;
271  struct m0_fop *fop;
272  struct m0_fop_fsync *ffd;
273  struct m0_fop_fsync_rep *ffr;
274  struct m0_rpc_item *item;
276  struct m0_reqh_service_txid *stx;
277  struct m0_tl *pending_tx_tl;
278  struct m0_mutex *pending_tx_lock;
279  struct m0_entity *ent;
280  struct m0_op *op;
281  struct sync_request *sreq;
282  struct sync_target *tgt;
283 
284  M0_ENTRY();
285 
286  M0_PRE(sfw != NULL);
287 
288  fop = &sfw->sfw_fop;
289  sreq = sfw->sfw_req;
290  item = &fop->f_item;
292  if (rc != 0) {
293  M0_LOG(M0_ERROR, "rpc item error = %d", rc);
294  goto out;
295  }
296 
297  /* Gets the {fop,reply} data */
298  ffd = m0_fop_data(fop);
299  M0_ASSERT(ffd != NULL);
301  M0_ASSERT(ffr != NULL);
302 
303  rc = ffr->ffr_rc;
304  if (rc != 0) {
305  M0_LOG(M0_ERROR, "reply rc=%d", rc);
306  goto out;
307  }
308 
309  /* Is this a valid reply to our request */
310  reply_txid = ffr->ffr_be_remid.tri_txid;
311  if (reply_txid < ffd->ff_be_remid.tri_txid) {
312  /* Error (most likely) caused by an ioservice. */
313  rc = M0_ERR(-EIO);
314  M0_LOG(M0_ERROR, "Committed transaction is smaller "
315  "than that requested.");
316  goto out;
317  }
318 
319  /* Update the stx stored in sfw to avoid sending repeated fops. */
320  sync_fop_stx_update(sfw->sfw_stx, reply_txid);
321 
322  /* Update matched stx in all targets of a SYNC request. */
325  if (tgt->srt_type == SYNC_ENTITY) {
326  ent = tgt->u.srt_ent;
327  pending_tx_tl = &ent->en_pending_tx;
328  pending_tx_lock = &ent->en_pending_tx_lock;
329  } else if (tgt->srt_type == SYNC_OP) {
330  op = tgt->u.srt_op;
331  pending_tx_tl = &op->op_pending_tx;
332  pending_tx_lock = &op->op_pending_tx_lock;
333  } else
334  M0_IMPOSSIBLE("SYNC type not supported yet.");
335 
336  m0_mutex_lock(pending_tx_lock);
337  stx = m0_tl_find(spti, stx, pending_tx_tl,
339  if (stx != NULL)
340  sync_fop_stx_update(stx, reply_txid);
341  m0_mutex_unlock(pending_tx_lock);
342  } m0_tl_endfor;
343 
344  /* Updates Client instance wide stx. */
348 
349 out:
350  return M0_RC(rc);
351 }
352 
/*
 * AST callback posted by sync_rio_replied(). @ast is the sfw_ast member of
 * a struct sync_fop_wrapper. The callback processes the reply and then
 * accounts for the fop with the resulting rc via sync_fop_done().
 */
356 static void sync_fop_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
357 {
358  int rc;
359  struct sync_fop_wrapper *sfw;
360 
361  M0_ENTRY();
362 
363  M0_PRE(grp != NULL);
365  M0_PRE(ast != NULL);
366 
367  sfw = M0_AMB(sfw, ast, sfw_ast);
368  rc = sync_reply_process(sfw);
369  sync_fop_done(sfw, rc);
370 
371  M0_LEAVE();
372 }
373 
/*
 * Reply callback for an FSYNC item, presumably installed as
 * sync_ri_ops.rio_replied. It defers reply handling by posting
 * sync_fop_ast on the SYNC op's sm group (os_sm_grp).
 * NOTE(review): the line that derives fop from @item is not visible in
 * this listing.
 */
378 static void sync_rio_replied(struct m0_rpc_item *item)
379 {
380  struct m0_fop *fop;
381  struct m0_op_sync *os;
382  struct sync_fop_wrapper *sfw;
383 
384  M0_ENTRY();
385 
387  sfw = M0_AMB(sfw, fop, sfw_fop);
388  os = sfw->sfw_req->sr_op_sync;
389 
390  /* Trigger AST for post processing FSYNC fop. */
391  sfw->sfw_ast.sa_cb = &sync_fop_ast;
392  m0_sm_ast_post(os->os_sm_grp, &sfw->sfw_ast);
393 
394  M0_LEAVE();
395  return;
396 }
397 
/*
 * RPC item ops attached to FSYNC fops when sync_request_fop_send() is
 * called with set_ri_ops == true.
 * NOTE(review): the member initialiser (presumably
 * .rio_replied = &sync_rio_replied) is not visible in this listing.
 */
401 static const struct m0_rpc_item_ops sync_ri_ops = {
403 };
404 
/*
 * Builds and posts one FSYNC fop for the service of @stx.
 *
 * Allocates a sync_fop_wrapper tied to @sreq and @stx and selects the fop
 * type from the service type (MDS, IOS or CAS). The fop carries
 * stx->stx_tri and @mode and is posted through si.si_post_rpc(). With
 * @set_ri_ops the item gets sync_ri_ops; otherwise it has no item ops.
 * On success *sfw_out points to the posted wrapper and 0 is returned.
 *
 * NOTE(review): if posting fails, the fop is finalised but sfw is never
 * freed, whereas the two earlier failure paths do m0_free(sfw). This looks
 * like a leak. Also confirm whether the fop-data allocation failure path
 * needs to finalise the fop before freeing sfw.
 */
410 static
412  struct m0_reqh_service_txid *stx,
413  enum m0_fsync_mode mode,
414  bool set_ri_ops,
415  struct sync_fop_wrapper **sfw_out)
416 {
417  int rc;
418  struct m0_fop *fop;
419  struct m0_rpc_item *item;
420  struct m0_fop_fsync *ffd;
421  struct m0_fop_type *fopt;
422  struct sync_fop_wrapper *sfw;
423 
424  M0_ENTRY();
425 
426  M0_ALLOC_PTR(sfw);
427  if (sfw == NULL)
428  return M0_ERR(-ENOMEM);
429 
430  /* Stores the pending txid reference with the fop */
431  sfw->sfw_stx = stx;
432  sfw->sfw_req = sreq;
433 
435  if (rc != 0) {
436  m0_free(sfw);
437  return M0_ERR_INFO(rc, "Service tx session invalid");
438  }
439 
440  if (stx->stx_service_ctx->sc_type == M0_CST_MDS)
441  fopt = &m0_fop_fsync_mds_fopt;
442  else if (stx->stx_service_ctx->sc_type == M0_CST_IOS)
443  fopt = &m0_fop_fsync_ios_fopt;
444  else if (stx->stx_service_ctx->sc_type == M0_CST_CAS)
445  fopt = &m0_fop_fsync_cas_fopt;
446  else
447  M0_IMPOSSIBLE("invalid service type:%d",
449 
450  fop = &sfw->sfw_fop;
453  if (rc != 0) {
454  m0_free(sfw);
455  return M0_ERR_INFO(rc, "Allocating sync fop data failed.");
456  }
457 
458  ffd = m0_fop_data(fop);
459  ffd->ff_be_remid = stx->stx_tri;
460  ffd->ff_fsync_mode = mode;
461 
462  /*
463  * Posts the rpc_item directly so that this is asynchronous.
464  * Prepare the fop as an rpc item
465  */
466  item = &fop->f_item;
467  item->ri_ops = (set_ri_ops == true)?&sync_ri_ops:NULL;
470  item->ri_deadline = 0;
473 
474  rc = si.si_post_rpc(item);
475  if (rc != 0) {
476  si.si_fop_fini(fop);
477  return M0_ERR_INFO(rc, "Calling m0_rpc_post() failed.");
478  }
479 
480  *sfw_out = sfw;
481  return M0_RC(0);
482 }
483 
/*
 * Sends one FSYNC fop per record in sreq->sr_stxs that has a non-zero
 * pending txid, stopping at the first send failure. Fops are sent with
 * set_ri_ops = !wait_after_launch. Honours the "launch_failed"
 * fault-injection point by returning -EAGAIN.
 *
 * Returns:
 *  - -EAGAIN if nothing was sent and no send failed;
 *  - the send error if nothing was sent;
 *  - 0 otherwise, with any send error recorded in sreq->sr_rc.
 * NOTE(review): the lines that bump sreq->sr_nr_fops are not visible in
 * this listing, but the return logic depends on them.
 */
484 /*
485  * If wait_after_launch == true, caller of this function will wait for
486  * the FSYNC reply fops.
487  */
489  enum m0_fsync_mode mode,
490  bool wait_after_launch)
491 {
492  int rc;
493  int saved_error = 0;
494  struct m0_reqh_service_txid *iter;
495  struct sync_fop_wrapper *sfw = NULL;
496  struct m0_tl *stx_tl;
497 
498  M0_ENTRY();
499 
500  if (M0_FI_ENABLED("launch_failed"))
501  return M0_ERR(-EAGAIN);
502 
503  /*
504  * For each service in sreq->sr_stxs with a pending transaction,
505  * sends an fsync fop. This is the fop sending loop.
506  */
508  stx_tl = &sreq->sr_stxs;
509  m0_tl_for(spti, stx_tl, iter) {
510  /*
511  * Sends an fsync fop for
512  * iter->stx_tri.tri_txid to iter->stx_service_ctx
513  */
514 
515  /* Checks if this service has any pending transactions. */
516  if (iter->stx_tri.tri_txid == 0)
517  continue;
518 
519  /* Creates and sends an FSYNC fop. */
521  !wait_after_launch, &sfw);
522  if (rc != 0) {
523  saved_error = rc;
524  break;
525  } else {
526  /* Add to list of pending fops */
528  spf_tlink_init_at(sfw, &sreq->sr_fops);
529  }
530  } m0_tl_endfor;
531 
532  /*
533  * How to handle those fops which have been sent?
534  * (1) Cancel them by calling m0_rpc_item_cancel,
535  * (2) Simply treat them as 'normal' fsync fops and let rpc item
536  * callback to handle the replies.
537  * As m0_rpc_item_cancel() only cancels rpc item locally, fops are still
538  * executed in service side, client will choose the option 2 to allow
539  * client to update its records on FSYNC.
540  *
541  * Returns saved_error directly only if no fops are sent, otherwise the
542  * error is stored in sync_request::sr_rc.
543  */
544  if (sreq->sr_nr_fops == 0) {
545  if (saved_error == 0)
546  /*
547  * It may happen when there are no pending txid. For
548  * example, sync op is launched even before
549  * a write request gets replies and sets txid.
550  */
551  rc = -EAGAIN;
552  else
553  rc = M0_ERR(saved_error);
554  } else {
555  sreq->sr_rc = saved_error != 0 ? M0_ERR(saved_error) :
556  saved_error;
557  rc = 0;
558  }
560  return M0_RC(rc);
561 }
562 
/*
 * Waits for the reply to the FSYNC fop in @sfw (presumably via
 * si.si_wait_for_reply; that call is not visible in this listing). On
 * success the reply is processed with sync_reply_process(). The fop
 * reference is dropped with si.si_fop_put() on every path, so @sfw must
 * not be used by the caller afterwards. Presumably the last put frees the
 * wrapper through sync_fop_cleanup() — confirm.
 * Returns the wait or processing error, or 0.
 */
567 static int sync_reply_wait(struct sync_fop_wrapper *sfw)
568 {
569  int rc;
570  struct m0_rpc_item *item;
571  struct m0_fop *fop;
572 
573  M0_ENTRY();
574 
575  fop = &sfw->sfw_fop;
576  item = &fop->f_item;
577 
579  if (rc != 0)
580  goto out;
581 
582  rc = sync_reply_process(sfw);
583 
584 out:
585  si.si_fop_put(fop);
586 
587  return M0_RC(rc);
588 }
589 
601  enum m0_fsync_mode mode)
602 {
603  int rc;
604  int saved_error;
605  struct sync_fop_wrapper *sfw;
606 
607  M0_ENTRY();
608 
609  M0_PRE(sreq != NULL);
610 
611  /*
612  * After calling sync_core() we may have sent some fops,
613  * but stopped when one failed - collect all the replies before
614  * returning.
615  */
616  rc = sync_request_launch(sreq, mode, true);
617  if (rc != 0)
618  return M0_ERR(rc);
619 
620  /* This is the fop-reply receiving loop. */
621  saved_error = sreq->sr_rc;
622  m0_tl_teardown(spf, &sreq->sr_fops, sfw) {
623  /* Get and process the reply. */
624  rc = sync_reply_wait(sfw);
625  saved_error = saved_error ? : rc;
626  }
627 
628  return M0_RC(saved_error);
629 }
630 
632  int type, void *target)
633 {
	/*
	 * Records @target as a target of @sreq and links it into
	 * sreq->sr_targets. @type selects how @target is interpreted:
	 *  - SYNC_ENTITY: a struct m0_entity of type M0_ET_OBJ or M0_ET_IDX;
	 *  - SYNC_OP: a struct m0_op;
	 *  - SYNC_INSTANCE: the whole client instance.
	 * Returns -ENOMEM if the target cannot be allocated, 0 otherwise.
	 * NOTE(review): the header line, the allocation line and the
	 * SYNC_ENTITY/SYNC_INSTANCE type assignments are not visible in this
	 * listing.
	 */
634  struct m0_entity *ent;
635  struct m0_op *op;
636  struct sync_target *stgt;
637 
638  M0_ENTRY();
639 
641  if (stgt == NULL)
642  return M0_ERR(-ENOMEM);
643 
644  switch(type) {
645  case SYNC_ENTITY:
646  ent = (struct m0_entity *)target;
647  M0_ASSERT(M0_IN(ent->en_type,
648  (M0_ET_OBJ, M0_ET_IDX)));
650  stgt->u.srt_ent = ent;
651  break;
652  case SYNC_OP:
653  /*
654  * Only those ops which have been executed and received txid
655  * can be sync-ed in current version.
656  */
657  op = (struct m0_op *)target;
658  stgt->srt_type = SYNC_OP;
659  stgt->u.srt_op = op;
660  break;
661  case SYNC_INSTANCE:
663  break;
664  default:
665  M0_IMPOSSIBLE("Unknow type for SYNC request.");
666  break;
667  }
668  sync_target_tlink_init_at(stgt, &sreq->sr_targets);
669 
670  return M0_RC(0);
671 }
672 
674  struct m0_tl *pending_tx_tl,
675  struct m0_mutex *pending_tx_lock)
676 {
	/*
	 * Merges the records of @pending_tx_tl into sreq->sr_stxs while
	 * holding @pending_tx_lock. For a service already present the larger
	 * txid is kept; otherwise a new record is allocated and linked.
	 * On -ENOMEM the records touched so far are rolled back using
	 * saved_stxs, where a NULL stx_service_ctx marks a record created by
	 * this call. Returns 0 or -ENOMEM.
	 *
	 * NOTE(review): apparent defects:
	 *  - If the very first record allocation fails, nr_saved_stxs is 0,
	 *    so the `i == nr_saved_stxs` check never stops the undo loop. The
	 *    first entry has no record in sr_stxs, so M0_ASSERT(stx != NULL)
	 *    would fire.
	 *  - Records created by this call are unlinked with spti_tlist_del()
	 *    during rollback but never freed.
	 *  - With an empty pending list the array has 0 elements. Confirm that
	 *    M0_ALLOC_ARR does not return NULL in that case, or this reports
	 *    -ENOMEM.
	 * The m0_tl_find() predicates and some assignments are not visible in
	 * this listing.
	 */
677  int i = 0;
678  int rc = 0;
679  int nr_saved_stxs = 0;
680  struct m0_reqh_service_txid *saved_stxs;
681  struct m0_reqh_service_txid *stx = NULL;
682  struct m0_reqh_service_txid *iter;
683 
684  m0_mutex_lock(pending_tx_lock);
685 
686  M0_ALLOC_ARR(saved_stxs, spti_tlist_length(pending_tx_tl));
687  if (saved_stxs == NULL) {
688  rc = M0_ERR(-ENOMEM);
689  goto error;
690  }
691 
692  m0_tl_for(spti, pending_tx_tl, iter) {
693  /* Find the record for this service */
694  stx = m0_tl_find(spti, stx, &sreq->sr_stxs,
696  if (stx != NULL) {
697  saved_stxs[nr_saved_stxs].stx_service_ctx =
699  saved_stxs[nr_saved_stxs].stx_tri = stx->stx_tri;
700  if (iter->stx_tri.tri_txid > stx->stx_tri.tri_txid)
701  stx->stx_tri = iter->stx_tri;
702  } else {
703  /* Not found - add a new stx. */
704  M0_ALLOC_PTR(stx);
705  if (stx == NULL) {
706  rc = M0_ERR(-ENOMEM);
707  goto undo;
708  }
710  stx->stx_tri = iter->stx_tri;
711  spti_tlink_init_at(stx, &sreq->sr_stxs);
712  }
713  nr_saved_stxs++;
714  } m0_tl_endfor;
715 
716  m0_free(saved_stxs);
717  m0_mutex_unlock(pending_tx_lock);
718  return M0_RC(0);
719 
720 undo:
721  m0_tl_for(spti, pending_tx_tl, iter) {
722  stx = m0_tl_find(spti, stx, &sreq->sr_stxs,
724  M0_ASSERT(stx != NULL);
725 
726  if (saved_stxs[i].stx_service_ctx == NULL)
727  /* It is a newly created stx, so simply remove it. */
728  spti_tlist_del(stx);
729  else if (stx->stx_tri.tri_txid >
730  saved_stxs[i].stx_tri.tri_txid) {
731  /* Restores the old value. */
733  saved_stxs[i].stx_service_ctx);
734  stx->stx_tri = saved_stxs[i].stx_tri;
735  }
736 
737  i++;
738  if ( i == nr_saved_stxs)
739  break;
740  } m0_tl_endfor;
741 
742 error:
743  m0_free(saved_stxs);
744  m0_mutex_unlock(pending_tx_lock);
745  return M0_RC(rc);
746 }
747 
749  struct m0_mutex *pending_tx_lock,
750  struct m0_tl *pending_tx,
751  struct m0_be_tx_remid *btr)
752 {
753  struct m0_reqh_service_txid *stx;
754 
	/*
	 * Records @btr as the pending transaction of the given service in
	 * @pending_tx, under @pending_tx_lock. An existing record for the
	 * service is raised to @btr when @btr's txid is larger; otherwise a
	 * new record is allocated and linked. If that allocation fails, the
	 * update is silently dropped.
	 * NOTE(review): the header line, the m0_tl_find() predicate and the
	 * new record's service assignment are not visible in this listing.
	 */
755  /*
756  * TODO: replace this O(N) search with something better.
757  * Embed the struct m0_reqh_service_txid in a list of
758  * 'services for this inode'? See RB1667
759  */
760  /* Find the record for this service */
761  m0_mutex_lock(pending_tx_lock);
762  stx = m0_tl_find(spti, stx, pending_tx,
764 
765  if (stx != NULL) {
766  if (btr->tri_txid > stx->stx_tri.tri_txid)
767  stx->stx_tri = *btr;
768  } else {
769  /*
770  * not found - add a new record
771  */
772  M0_ALLOC_PTR(stx);
773  if (stx != NULL) {
775  stx->stx_tri = *btr;
776 
777  spti_tlink_init_at(stx, pending_tx);
778  }
779  }
780  m0_mutex_unlock(pending_tx_lock);
781 }
782 
789  struct m0_entity *ent,
790  struct m0_op *op,
791  struct m0_be_tx_remid *btr)
792 {
	/*
	 * Raises the pending txid recorded for @service to @btr in up to three
	 * places: the entity @ent (skipped when NULL), the op @op (skipped
	 * when NULL) and the client-instance-wide record. @service must not
	 * be NULL.
	 * NOTE(review): the header line and the lines that select (and
	 * presumably lock) the instance-wide record are not visible in this
	 * listing. stx starts as NULL here and is dereferenced below.
	 */
793  struct m0_reqh_service_txid *stx = NULL;
794 
795  M0_ENTRY();
796 
797  M0_PRE(service != NULL);
798 
799  /* Updates pending transaction number in the entity. */
800  if (ent != NULL)
801  sync_pending_stx_update(service, &ent->en_pending_tx_lock,
802  &ent->en_pending_tx, btr);
803 
804  /* Updates pending transaction number in the op. */
805  if (op != NULL)
806  sync_pending_stx_update(service, &op->op_pending_tx_lock,
807  &op->op_pending_tx, btr);
808 
809  /* update pending transaction number in the Client instance */
812  /* update the value from the reply_fop */
813  if (btr->tri_txid > stx->stx_tri.tri_txid) {
815  stx->stx_tri = *btr;
816  }
818 
819  M0_LEAVE("Client sync record updated.");
820 }
821 
828 static bool sync_op_invariant(struct m0_op_sync *os)
829 {
830  return M0_RC(os != NULL &&
831  m0_op_sync_bob_check(os) &&
832  os->os_oc.oc_op.op_size >= sizeof *os &&
833  m0_ast_rc_bob_check(&os->os_ar) &&
834  m0_op_common_bob_check(&os->os_oc));
835 }
836 
/*
 * Finalisation callback for SYNC ops, presumably installed as
 * oc_cb_fini. It finalises the bob state of the op_common part, the
 * ast_rc and the m0_op_sync itself. @oc's op must be sized to hold a
 * struct m0_op_sync.
 * NOTE(review): one line after the bob_of() lookup is not visible in this
 * listing.
 */
840 static void sync_op_cb_fini(struct m0_op_common *oc)
841 {
842  struct m0_op_sync *os;
843 
844  M0_ENTRY();
845 
846  M0_PRE(oc != NULL);
847  M0_PRE(oc->oc_op.op_size >= sizeof *os);
848 
849  os = bob_of(oc, struct m0_op_sync, os_oc, &os_bobtype);
851  m0_op_common_bob_fini(&os->os_oc);
852  m0_ast_rc_bob_fini(&os->os_ar);
853  m0_op_sync_bob_fini(os);
854 
855  M0_LEAVE();
856 }
857 
/*
 * Free callback for SYNC ops, presumably installed as oc_cb_free. It
 * frees the sync request's service-txid records and targets, the request
 * itself and the m0_op_sync. M0_AMB() is used instead of bob_of() because
 * the bob state has already been finalised.
 * NOTE(review): the loop that pops tgt from the target list is not visible
 * in this listing.
 */
861 static void sync_op_cb_free(struct m0_op_common *oc)
862 {
863  struct m0_op_sync *os;
864  struct sync_target *tgt;
865  struct m0_reqh_service_txid *stx = NULL;
866 
867  M0_ENTRY();
868 
869  M0_PRE(oc != NULL);
870  M0_PRE((oc->oc_op.op_size >= sizeof *os));
871 
872  /* By now, fini() has been called and bob_of cannot be used */
873  os = M0_AMB(os, oc, os_oc);
874  m0_tl_teardown(spti, &os->os_req->sr_stxs, stx)
875  m0_free(stx);
877  m0_free(tgt);
878  m0_free(os->os_req);
879  m0_free(os);
880 
881  M0_LEAVE();
882 }
883 
/*
 * Launch callback for SYNC ops, presumably installed as oc_cb_launch. It
 * sends the request's FSYNC fops without waiting: they go out with item
 * ops set, so replies come back through sync_ri_ops. The op is then moved
 * to M0_OS_LAUNCHED. If nothing could be sent, the error is recorded in
 * sreq->sr_rc unless an earlier error is already there.
 * NOTE(review): the follow-up step in the error branch is not visible in
 * this listing.
 */
887 static void sync_op_cb_launch(struct m0_op_common *oc)
888 {
889  int rc = 0;
890  struct m0_op *op;
891  struct m0_op_sync *os;
892  struct sync_request *sreq;
893 
894  M0_ENTRY();
895 
896  M0_PRE(oc != NULL);
897  os = bob_of(oc, struct m0_op_sync, os_oc, &os_bobtype);
898  op = &oc->oc_op;
899 
900  sreq = os->os_req;
901  rc = sync_request_launch(sreq, os->os_mode, false);
902  m0_sm_move(&op->op_sm, 0, M0_OS_LAUNCHED);
903 
904  if (rc != 0) {
905  /*
906  * If the SYNC request is not sent to services, update the op's
907  * state here. Note: m0_op_launch_one() has held the
908  * group lock.
909  */
910  sreq->sr_rc = sreq->sr_rc?:rc;
912  }
913 }
914 
/*
 * Zeroes @sreq and initialises its pending-fop, service-txid and target
 * lists.
 * NOTE(review): one more initialisation line is not visible in this
 * listing.
 */
915 static void sync_request_init(struct sync_request *sreq)
916 {
917  M0_SET0(sreq);
918  spf_tlist_init(&sreq->sr_fops);
919  spti_tlist_init(&sreq->sr_stxs);
920  sync_target_tlist_init(&sreq->sr_targets);
922 }
923 
/*
 * Initialises an already allocated SYNC op. It:
 *  - sets op_code to M0_EO_SYNC;
 *  - sets up the m0_op_common and m0_op_sync parts;
 *  - allocates and initialises the sync_request;
 *  - binds the op to a locality's sm group (os_sm_grp);
 *  - initialises the bob and ast_rc state.
 * Returns 0, the error from the generic op initialisation (not visible in
 * this listing), or -ENOMEM.
 * NOTE(review): on -ENOMEM for the sync_request, nothing initialised
 * earlier in this function is undone — confirm that callers clean up.
 */
924 static int sync_op_init(struct m0_op *op)
925 {
926  int rc;
927  struct m0_op_common *oc;
928  struct m0_op_sync *os;
929  struct m0_locality *locality;
930  struct sync_request *sreq;
931 
932  M0_ENTRY();
933 
934  op->op_code = M0_EO_SYNC;
936  if (rc != 0)
937  return M0_RC(rc);
938  /*
939  * Initialise m0_op_common part.
940  * bob_init()'s haven't been called yet: we use M0_AMB().
941  */
942  oc = M0_AMB(oc, op, oc_op);
943  os = M0_AMB(os, oc, os_oc);
945 
946  m0_op_common_bob_init(oc);
950 
951  /* Allocates and initialises a sync request. */
953  if (sreq == NULL)
954  return M0_ERR(-ENOMEM);
956  sreq->sr_op_sync = os;
958  os->os_req = sreq;
959 
960  /* Picks a locality thread for this op. */
962  M0_ASSERT(locality != NULL);
963  os->os_sm_grp = locality->lo_grp;
964  M0_SET0(&os->os_ar);
965 
966  m0_op_sync_bob_init(os);
967  m0_ast_rc_bob_init(&os->os_ar);
968 
969  return M0_RC(0);
970 }
971 
972 int m0_sync_op_init(struct m0_op **sop)
973 {
974  int rc = 0;
975 
976  M0_ENTRY();
977 
978  rc = m0_op_alloc(sop, sizeof(struct m0_op_sync))?:
979  sync_op_init(*sop);
980 
981  return M0_RC(rc);
982 }
983 M0_EXPORTED(m0_sync_op_init);
984 
/**
 * Adds entity @ent as a target of the SYNC op @sop and merges @ent's
 * pending service txids into the op's request. Must be called before the
 * op is launched.
 * Returns 0 on success, or the error from adding the target or from
 * merging the txids. A merge failure leaves the target added.
 * NOTE(review): one precondition line and the target-add call are not
 * visible in this listing.
 */
985 int m0_sync_entity_add(struct m0_op *sop,
986  struct m0_entity *ent)
987 {
988  int rc;
989  struct m0_op_common *oc;
990  struct m0_op_sync *os;
991  struct sync_request *sreq;
992 
993  M0_ENTRY();
994 
995  /*
996  * New elements can only be added to the SYNC op
997  * before the op is launched.
998  */
999  M0_PRE(sop != NULL);
1001 
1002  oc = bob_of(sop, struct m0_op_common, oc_op, &oc_bobtype);
1003  os = bob_of(oc, struct m0_op_sync, os_oc, &os_bobtype);
1004 
1005  /* Stores the target. */
1006  sreq = os->os_req;
1007  M0_ASSERT(sreq != NULL);
1009  if (rc != 0)
1010  return M0_ERR(rc);
1011 
1012  /* Adds service txid (if stx exists in the list, then merge.).*/
1013  rc = sync_request_stx_add(sreq, &ent->en_pending_tx,
1014  &ent->en_pending_tx_lock);
1015 
1016  return M0_RC(rc);
1017 }
1018 M0_EXPORTED(m0_sync_entity_add);
1019 
/**
 * Adds op @op as a target of the SYNC op @sop and merges @op's pending
 * service txids into the request. Must be called before @sop is launched.
 * @op must be in one of the states listed in the M0_IN() precondition;
 * presumably executed/stable, since only executed ops carry txids —
 * confirm.
 * Returns 0 on success, or the error from adding the target or from
 * merging the txids.
 * NOTE(review): some precondition lines and the target-add call are not
 * visible in this listing.
 */
1020 int m0_sync_op_add(struct m0_op *sop,
1021  struct m0_op *op)
1022 {
1023  int rc;
1024  struct m0_op_common *oc;
1025  struct m0_op_sync *os;
1026  struct sync_request *sreq;
1027 
1028  M0_ENTRY();
1029 
1030  /*
1031  * New elements can only be added to the SYNC op
1032  * before the op is launched.
1033  */
1034  M0_PRE(sop != NULL);
1036  M0_PRE(op != NULL);
1037  M0_PRE(M0_IN(op->op_sm.sm_state,
1039 
1040  oc = bob_of(sop, struct m0_op_common, oc_op, &oc_bobtype);
1041  os = bob_of(oc, struct m0_op_sync, os_oc, &os_bobtype);
1042 
1043  /* Stores the target. */
1044  sreq = os->os_req;
1045  M0_ASSERT(sreq != NULL);
1047  if (rc != 0)
1048  return M0_ERR(rc);
1049 
1050  /* Adds service txid (if stx exists in the list, then merge.).*/
1051  rc = sync_request_stx_add(sreq, &op->op_pending_tx,
1052  &op->op_pending_tx_lock);
1053 
1054  return M0_RC(rc);
1055 }
1056 M0_EXPORTED(m0_sync_op_add);
1057 
/*
 * Synchronously syncs @ent using a sync_request on the stack. The signature
 * per the cross-reference is int m0_entity_sync(struct m0_entity *ent). It
 * adds the entity as a target and merges its pending txids; presumably it
 * then launches and waits. Finally it frees the request's stx records and
 * its target. Returns 0 or the first error.
 * NOTE(review): the header line and several lines (request init, target
 * add, the launch call, the target pop) are not visible in this listing.
 */
1062 {
1063  int rc;
1064  struct sync_request sreq;
1065  struct sync_target *tgt;
1066  struct m0_reqh_service_txid *stx;
1067 
1068  M0_ENTRY();
1069  M0_PRE(ent != NULL);
1070 
1073  if (rc != 0)
1074  return M0_ERR(rc);
1075 
1077  &sreq, &ent->en_pending_tx, &ent->en_pending_tx_lock)?:
1079  m0_tl_teardown(spti, &sreq.sr_stxs, stx)
1080  m0_free(stx);
1082  m0_free(tgt);
1083 
1084  return M0_RC(rc);
1085 }
1086 M0_EXPORTED(m0_entity_sync);
1087 
/**
 * Syncs every MDS and IO service of client instance @m0c that has a
 * pending transaction. For each such service it sends one FSYNC fop, built
 * from the service's sc_max_pending_tx record, and stops sending at the
 * first failure. It then waits for and processes the replies of all fops
 * already sent.
 * Returns 0 or the first error seen.
 *
 * NOTE(review): items are posted with set_ri_ops == true, and ri_ops is
 * reset to NULL only after posting. A reply arriving in between would go
 * through sync_rio_replied() and an AST on a stack sync_request. Consider
 * sending with set_ri_ops == false instead.
 * NOTE(review): @wait is not referenced in the lines visible in this
 * listing. Several lines are also not visible: request initialisation,
 * per-service handling and the send call.
 */
1093 int m0_sync(struct m0_client *m0c, bool wait)
1094 {
1095  int rc;
1096  int saved_error = 0;
1097  struct m0_reqh_service_txid *stx;
1098  struct m0_reqh_service_ctx *iter;
1099  struct sync_request sreq;
1100  struct sync_fop_wrapper *sfw;
1101 
1102  M0_ENTRY();
1103 
1104  M0_PRE(si.si_post_rpc != NULL);
1106  M0_PRE(si.si_fop_fini != NULL);
1107 
1109 
1110  /*
1111  * loop over all services associated with this client instance,
1112  * send an fsync fop for those with pending transactions
1113  *
1114  * fop sending loop
1115  */
1116  m0_tl_for(pools_common_svc_ctx, &m0c->m0c_pools_common.pc_svc_ctxs,
1117  iter) {
1118  /*
1119  * Send an fsync fop for iter->sc_max_pending_tx to iter.
1120  */
1122  stx = &iter->sc_max_pending_tx;
1123 
1124  /*
1125  * Check if this service has any pending transactions.
1126  * Currently fsync operations are supported only for
1127  * ioservice and mdservice.
1128  */
1129  if (stx->stx_tri.tri_txid == 0 ||
1130  !M0_IN(stx->stx_service_ctx->sc_type,
1131  (M0_CST_MDS, M0_CST_IOS))) {
1133  continue;
1134  }
1135 
1136  /* Create and send a request */
1139  true, &sfw);
1140  if (rc != 0) {
1141  saved_error = rc;
1143  break;
1144  } else {
1145  /* Reset the rpc item ops to NULL. */
1146  sfw->sfw_fop.f_item.ri_ops = NULL;
1147  /* Add to list of pending fops */
1148  spf_tlink_init_at(sfw, &sreq.sr_fops);
1149  }
1150 
1152  } m0_tl_endfor;
1153 
1154  /*
1155  * At this point we may have sent some fops, but stopped when one
1156  * failed - collect all the replies before returning
1157  */
1158 
1159  /* reply receiving loop */
1160  m0_tl_teardown(spf, &sreq.sr_fops, sfw) {
1161  /* get and process the reply */
1162  rc = sync_reply_wait(sfw);
1163  saved_error = saved_error ? : rc;
1164  }
1165 
1166  M0_LEAVE();
1167  return (saved_error == 0) ? M0_RC(saved_error): M0_ERR(saved_error);
1168 }
1169 M0_EXPORTED(m0_sync);
1170 
1171 M0_INTERNAL struct m0_entity *
1173 {
1174  struct m0_op_sync *os;
1175  struct sync_target *stgt;
1176  struct m0_op_common *oc;
1177 
1178  M0_PRE(op != NULL);
1179  M0_PRE(op->op_code == M0_EO_SYNC);
1180 
1181  oc = bob_of(op, struct m0_op_common, oc_op, &oc_bobtype);
1182  M0_PRE(oc != NULL);
1183  os = M0_AMB(os, oc, os_oc);
1184  M0_PRE(os != NULL && os->os_req != NULL);
1185 
1186  stgt = sync_target_tlist_head(&os->os_req->sr_targets);
1187  M0_PRE(stgt != NULL);
1188  switch (stgt->srt_type) {
1189  case SYNC_ENTITY:
1190  return stgt->u.srt_ent;
1191  case SYNC_OP:
1192  return stgt->u.srt_op->op_entity;
1193  case SYNC_INSTANCE:
1194  break;
1195  default:
1196  M0_IMPOSSIBLE("Unknow type for SYNC request.");
1197  }
1198 
1199  return NULL;
1200 }
1201 
1202 #undef M0_TRACE_SUBSYSTEM
1203 /*
1204  * Local variables:
1205  * c-indentation-style: "K&R"
1206 
1207  * c-basic-offset: 8
1208  * tab-width: 8
1209  * fill-column: 80
1210  * scroll-step: 1
1211  * End:
1212  */
1213 /*
1214  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
1215  */
struct m0_sm_ast sfw_ast
Definition: sync.h:114
int32_t os_mode
struct m0_sm_group * os_sm_grp
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
struct m0_fop sfw_fop
Definition: sync.h:101
m0_time_t ri_resend_interval
Definition: item.h:144
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
static struct m0_semaphore wait
Definition: item.c:151
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static void sync_fop_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sync.c:356
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
#define NULL
Definition: misc.h:38
struct m0_sm_ast sr_ast
Definition: sync.h:89
struct m0_tl sr_stxs
Definition: sync.h:76
struct m0_mutex sc_max_pending_tx_lock
Definition: reqh_service.h:773
static size_t locality(const struct m0_fom *fom)
Definition: rm_foms.c:269
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
int32_t sr_rc
Definition: sync.h:91
static int sync_request_launch(struct sync_request *sreq, enum m0_fsync_mode mode, bool wait_after_launch)
Definition: sync.c:488
static struct m0_sm_group * grp
Definition: bytecount.c:38
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:79
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
uint32_t srt_type
Definition: sync.h:56
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
int m0_entity_sync(struct m0_entity *ent)
Definition: sync.c:1061
static void sync_request_done_locked(struct sync_request *sreq)
Definition: sync.c:150
struct m0_reqh_service_txid * sfw_stx
Definition: sync.h:109
static void sync_fop_stx_update(struct m0_reqh_service_txid *stx, uint64_t txid)
Definition: sync.c:244
struct m0_fop_type m0_fop_fsync_ios_fopt
Definition: io_fops.c:82
static struct sync_request sreq
Definition: sync.c:68
static struct m0_clovis * m0c
Definition: main.c:25
struct m0_op oc_op
M0_INTERNAL int m0_op_init(struct m0_op *op, const struct m0_sm_conf *conf, struct m0_entity *entity)
Definition: client.c:806
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
struct sync_request * os_req
static int error
Definition: mdstore.c:64
int m0_sync_op_add(struct m0_op *sop, struct m0_op *op)
Definition: sync.c:1020
Definition: sm.h:504
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
int32_t ffr_rc
Definition: fsync_fops.h:64
struct m0_fop_type m0_fop_fsync_mds_fopt
Definition: fsync_fops.c:33
static struct m0_rpc_item * item
Definition: item.c:56
int m0_rpc_item_wait_for_reply(struct m0_rpc_item *item, m0_time_t timeout)
Definition: item.c:824
const char * bt_name
Definition: bob.h:73
#define m0_tl_endfor
Definition: tlist.h:700
return M0_RC(rc)
op
Definition: libdemo.c:64
#define M0_ENTRY(...)
Definition: trace.h:170
static struct m0_sm_ast ast[NR]
Definition: locality.c:44
M0_INTERNAL void m0_sm_group_unlock(struct m0_sm_group *grp)
Definition: sm.c:96
int i
Definition: dir.c:1033
size_t op_size
Definition: client.h:664
static bool sync_op_invariant(struct m0_op_sync *os)
Definition: sync.c:828
struct m0_entity * srt_ent
Definition: sync.h:58
Definition: client.h:641
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
const struct m0_bob_type oc_bobtype
Definition: client.c:44
void(* oc_cb_free)(struct m0_op_common *oc)
return M0_ERR(-EOPNOTSUPP)
static void sync_op_cb_launch(struct m0_op_common *oc)
Definition: sync.c:887
struct m0_sm op_sm
Definition: client.h:656
static int sync_request_stx_add(struct sync_request *sreq, struct m0_tl *pending_tx_tl, struct m0_mutex *pending_tx_lock)
Definition: sync.c:673
M0_INTERNAL int m0_op_stable(struct m0_op *op)
Definition: client.c:520
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
Definition: refs.h:34
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
void(* si_fop_fini)(struct m0_fop *fop)
Definition: sync.h:129
static struct sync_target stgt
Definition: sync.c:69
struct m0_ast_rc os_ar
static int sync_reply_process(struct sync_fop_wrapper *sfw)
Definition: sync.c:267
#define M0_ASSERT(cond)
static int sync_request_launch_and_wait(struct sync_request *sreq, enum m0_fsync_mode mode)
Definition: sync.c:600
struct m0_be_tx_remid stx_tri
Definition: reqh_service.h:739
struct m0_be_tx_remid ffr_be_remid
Definition: fsync_fops.h:70
Definition: tlist.h:251
#define bob_of(ptr, type, field, bt)
Definition: bob.h:140
int(* si_post_rpc)(struct m0_rpc_item *item)
Definition: sync.h:127
struct m0_reqh_service_ctx * stx_service_ctx
Definition: reqh_service.h:733
enum m0_conf_service_type sc_type
Definition: reqh_service.h:757
M0_INTERNAL struct m0_entity * m0__op_sync_entity(const struct m0_op *op)
Definition: sync.c:1172
int m0_sync_entity_add(struct m0_op *sop, struct m0_entity *ent)
Definition: sync.c:985
struct m0_rpc_item * ri_reply
Definition: item.h:163
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
M0_BOB_DEFINE(static, &os_bobtype, m0_op_sync)
uint64_t ri_nr_sent_max
Definition: item.h:146
Definition: sync.h:48
static const struct m0_rpc_item_ops sync_ri_ops
Definition: sync.c:401
static int struct dentry int mode
Definition: dir.c:589
struct m0_tl sr_targets
Definition: sync.h:72
struct m0_op * srt_op
Definition: sync.h:59
static struct m0_reqh_service_ctx service
Definition: sync.c:93
void(* oc_cb_fini)(struct m0_op_common *oc)
static const struct m0_bob_type os_bobtype
Definition: sync.c:83
static struct m0_reqh_service_txid stx[NUM_STRECORDS]
Definition: sync.c:73
M0_INTERNAL int m0_fop_data_alloc(struct m0_fop *fop)
Definition: fop.c:71
static void sync_request_init(struct sync_request *sreq)
Definition: sync.c:915
int m0_sync(struct m0_client *m0c, bool wait)
Definition: sync.c:1093
M0_INTERNAL void m0_fop_fini(struct m0_fop *fop)
Definition: fop.c:136
int m0_sync_op_init(struct m0_op **sop)
Definition: sync.c:972
struct m0_be_tx_remid ff_be_remid
Definition: fsync_fops.h:115
struct sync_request * sfw_req
Definition: sync.h:111
uint64_t tri_txid
Definition: tx.h:431
static int sync_request_fop_send(struct sync_request *sreq, struct m0_reqh_service_txid *stx, enum m0_fsync_mode mode, bool set_ri_ops, struct sync_fop_wrapper **sfw_out)
Definition: sync.c:411
M0_INTERNAL int m0_op_executed(struct m0_op *op)
Definition: client.c:500
static void sync_op_cb_fini(struct m0_op_common *oc)
Definition: sync.c:840
M0_TL_DESCR_DEFINE(sync_target, "Targets to synced for a client SYNC request", static, struct sync_target, srt_tlink, srt_tlink_magic, M0_SYNC_TGT_TL_MAGIC, M0_SYNC_TGT_TL_MAGIC)
void(* rio_replied)(struct m0_rpc_item *item)
Definition: item.h:300
struct m0_op_sync * sr_op_sync
Definition: sync.h:69
M0_INTERNAL int m0_rpc_session_validate(struct m0_rpc_session *session)
Definition: session.c:573
int32_t m0_rpc_item_error(const struct m0_rpc_item *item)
Definition: item.c:973
static void sync_request_ast(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sync.c:192
struct m0_pdclust_tgt_addr tgt
Definition: fd.c:110
static void sync_request_done(struct sync_request *sreq)
Definition: sync.c:178
static void sync_rio_replied(struct m0_rpc_item *item)
Definition: sync.c:378
struct m0_sm_group op_sm_group
Definition: client.h:654
int(* si_wait_for_reply)(struct m0_rpc_item *item, m0_time_t timeout)
Definition: sync.h:128
#define M0_CNT_INC(cnt)
Definition: arith.h:226
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
static int sync_op_init(struct m0_op *op)
Definition: sync.c:924
struct m0_be_tx_remid ff_be_remid
Definition: fsync_fops.h:48
struct m0_fop_type m0_fop_fsync_cas_fopt
Definition: cas.c:53
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
struct m0_entity * op_entity
Definition: client.h:660
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
M0_INTERNAL struct m0_locality * m0__locality_pick(struct m0_client *cinst)
Definition: client.c:290
void(* si_fop_put)(struct m0_fop *fop)
Definition: sync.h:130
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
struct m0_rpc_session * ri_session
Definition: item.h:147
M0_INTERNAL void m0_sm_move(struct m0_sm *mach, int32_t rc, int state)
Definition: sm.c:485
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
static struct m0_fop * fop
Definition: item.c:57
M0_INTERNAL void m0_sm_group_lock(struct m0_sm_group *grp)
Definition: sm.c:83
int32_t sr_nr_fops
Definition: sync.h:85
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
m0_fsync_mode
Definition: fsync_fops.h:54
M0_INTERNAL int m0_op_alloc(struct m0_op **op, size_t op_size)
Definition: client.c:779
M0_TL_DEFINE(sync_target, static, struct sync_target)
static int sync_request_target_add(struct sync_request *sreq, int type, void *target)
Definition: sync.c:631
struct m0_tl sr_fops
Definition: sync.h:84
static int sync_reply_wait(struct sync_fop_wrapper *sfw)
Definition: sync.c:567
uint32_t ff_fsync_mode
Definition: fsync_fops.h:51
#define out(...)
Definition: gen.c:41
static void sync_fop_done(struct sync_fop_wrapper *sfw, int rc)
Definition: sync.c:213
int type
Definition: dir.c:1031
struct m0_dirent * ent
Definition: dir.c:1029
void(* oc_cb_launch)(struct m0_op_common *oc)
struct m0_rpc_link sc_rlink
Definition: reqh_service.h:759
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
void sync_record_update(struct m0_reqh_service_ctx *service, struct m0_entity *ent, struct m0_op *op, struct m0_be_tx_remid *btr)
Definition: sync.c:788
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
Definition: mutex.h:47
struct m0_rpc_item f_item
Definition: fop.h:83
uint32_t sm_state
Definition: sm.h:307
int32_t rc
Definition: trigger_fop.h:47
union sync_target::@361 u
#define offsetof(typ, memb)
Definition: misc.h:29
M0_INTERNAL bool m0_sm_group_is_locked(const struct m0_sm_group *grp)
Definition: sm.c:107
static struct sync_interactions si
Definition: sync.c:121
static void sync_op_cb_free(struct m0_op_common *oc)
Definition: sync.c:861
struct m0_sm_conf m0_op_conf
Definition: client.c:145
struct m0_mutex sr_fops_lock
Definition: sync.h:83
struct m0_reqh_service_txid sc_max_pending_tx
Definition: reqh_service.h:772
Definition: fop.h:79
static void sync_pending_stx_update(struct m0_reqh_service_ctx *service, struct m0_mutex *pending_tx_lock, struct m0_tl *pending_tx, struct m0_be_tx_remid *btr)
Definition: sync.c:748
static void sync_fop_cleanup(struct m0_ref *ref)
Definition: sync.c:133
m0_time_t ri_deadline
Definition: item.h:141
#define M0_IMPOSSIBLE(fmt,...)
struct m0_op_common os_oc