Motr  M0
remote.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2021 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
30 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_DTM
31 
32 #include "lib/trace.h"
33 #include "lib/misc.h" /* M0_IN */
34 #include "rpc/rpc.h"
35 #include "rpc/rpc_opcodes.h" /* M0_DTM_NOTIFICATION_OPCODE */
36 #include "fop/fop.h"
37 #include "reqh/reqh.h" /* m0_reqh_fop_handle */
38 
39 #include "dtm/dtm_internal.h"
40 #include "dtm/history.h"
41 #include "dtm/remote.h"
42 #include "dtm/remote_xc.h"
43 
46  R_FIXED = 2,
47  R_RESET = 3,
48  R_UNDO = 4
49 };
50 
51 static const struct m0_dtm_remote_ops rem_rpc_ops;
52 static const struct m0_dtm_remote_ops rem_local_ops;
53 static void rem_rpc_notify(struct m0_dtm_remote *rem,
54  const struct m0_dtm_history *history,
56 static int rem_rpc_deliver(struct m0_rpc_machine *mach,
57  struct m0_rpc_item *item);
58 
59 static struct m0_fop_type rem_rpc_fopt;
60 static const struct m0_fop_type_ops rem_rpc_ftype_ops;
64 
65 M0_INTERNAL void m0_dtm_remote_add(struct m0_dtm_remote *rem,
66  struct m0_dtm_oper *oper,
67  struct m0_dtm_history *history,
68  struct m0_dtm_update *update)
69 {
70  m0_dtm_fol_remote_add(&rem->re_fol, oper);
71 }
72 
73 M0_INTERNAL void m0_dtm_remote_init(struct m0_dtm_remote *remote,
74  struct m0_uint128 *id,
75  struct m0_dtm *local)
76 {
77  M0_PRE(!m0_uint128_eq(id, &local->d_id));
78  remote->re_id = *id;
80 }
81 
82 M0_INTERNAL void m0_dtm_remote_fini(struct m0_dtm_remote *remote)
83 {
85 }
86 
88  struct m0_uint128 *id,
89  struct m0_dtm *local,
90  struct m0_rpc_conn *conn)
91 {
92  m0_dtm_remote_init(&remote->rpr_rem, id, local);
93  remote->rpr_conn = conn;
94  remote->rpr_rem.re_ops = &rem_rpc_ops;
95 }
96 
98 {
99  m0_dtm_remote_fini(&remote->rpr_rem);
100 }
101 
102 static void rem_rpc_persistent(struct m0_dtm_remote *rem,
103  struct m0_dtm_history *history)
104 {
105  rem_rpc_notify(rem, history,
107 }
108 
109 static void rem_rpc_fixed(struct m0_dtm_remote *rem,
110  struct m0_dtm_history *history)
111 {
112  rem_rpc_notify(rem, history, 0, R_FIXED);
113 }
114 
115 static void rem_rpc_reset(struct m0_dtm_remote *rem,
116  struct m0_dtm_history *history)
117 {
118  rem_rpc_notify(rem, history, history->h_hi.hi_ver, R_RESET);
119 }
120 
121 static void rem_rpc_undo(struct m0_dtm_remote *rem,
122  struct m0_dtm_history *history, m0_dtm_ver_t upto)
123 {
124  rem_rpc_notify(rem, history, upto, R_UNDO);
125 }
126 
127 M0_EXTERN struct m0_rpc_session *m0_rpc_conn_session0(const struct m0_rpc_conn
128  *conn);
129 
130 static void rem_rpc_send(struct m0_dtm_remote *rem,
131  struct m0_dtm_update *update)
132 {
133  struct m0_rpc_item *item = &update->upd_comm.uc_body->f_item;
134 
135  M0_PRE(update->upd_comm.uc_body != NULL);
137  m0_rpc_post(item);
138 }
139 
140 static void rem_rpc_resend(struct m0_dtm_remote *rem,
141  struct m0_dtm_update *update)
142 {
143  struct m0_rpc_item *item = &update->upd_comm.uc_body->f_item;
144 
145  M0_PRE(update->upd_comm.uc_body != NULL);
146  M0_PRE(M0_IN(update->upd_up.up_state, (M0_DOS_INPROGRESS,
150  /* add 1/100 second deadline to give a chance to build a better rpc. */
151  item->ri_deadline = m0_time_from_now(0, 10000000);
152  m0_rpc_post(item);
153 }
154 
155 static const struct m0_dtm_remote_ops rem_rpc_ops = {
157  .reo_fixed = &rem_rpc_fixed,
158  .reo_reset = &rem_rpc_reset,
159  .reo_send = &rem_rpc_send,
160  .reo_resend = &rem_rpc_resend,
161  .reo_undo = &rem_rpc_undo
162 };
163 
164 static void notice_pack(struct m0_dtm_notice *notice,
165  const struct m0_dtm_history *history,
167 {
168  notice->dno_opcode = opcode;
169  notice->dno_ver = ver;
170  m0_dtm_history_pack(history, &notice->dno_id);
171 }
172 
173 static void rem_rpc_notify(struct m0_dtm_remote *rem,
174  const struct m0_dtm_history *history,
176 {
177  struct m0_dtm_rpc_remote *rpr;
178  struct m0_fop *fop;
179 
180  M0_PRE(rem->re_ops == &rem_rpc_ops);
181  rpr = M0_AMB(rpr, rem, rpr_rem);
183  if (fop != NULL) {
184  struct m0_rpc_item *item;
185 
186  notice_pack(m0_fop_data(fop), history, ver, opcode);
187  item = &fop->f_item;
189  item->ri_deadline = 0;
192  }
193 }
194 
195 M0_INTERNAL int m0_dtm_remote_global_init(void)
196 {
200  .name = "dtm notice",
202  .xt = m0_dtm_notice_xc,
203  .rpc_flags = M0_RPC_ITEM_TYPE_ONEWAY,
204  .fop_ops = &rem_rpc_ftype_ops,
206  return 0;
207 }
208 
209 M0_INTERNAL void m0_dtm_remote_global_fini(void)
210 {
212 }
213 
214 static void notice_deliver(struct m0_dtm_notice *notice, struct m0_dtm *dtm)
215 {
216  struct m0_dtm_history *history;
217  int result;
218 
219  result = m0_dtm_history_unpack(dtm, &notice->dno_id, &history);
220  if (result == 0) {
221  switch (notice->dno_opcode) {
222  case R_PERSISTENT:
223  m0_dtm_history_persistent(history, notice->dno_ver);
224  break;
225  case R_FIXED:
226  break;
227  case R_RESET:
228  m0_dtm_history_reset(history, notice->dno_ver);
229  break;
230  case R_UNDO:
231  m0_dtm_history_undo(history, notice->dno_ver);
232  break;
233  default:
234  M0_LOG(M0_ERROR, "DTM notice: %i.", notice->dno_opcode);
235  }
236  } else
237  M0_LOG(M0_ERROR, "DTM history: %i.", result);
238 }
239 
241  struct m0_rpc_item *item)
242 {
244  return 0;
245 }
246 
248 {}
249 
250 static const struct m0_fop_type_ops rem_rpc_ftype_ops = {
251  /* nothing */
252 };
253 
254 static struct m0_rpc_item_type_ops rem_rpc_itype_ops = {
255  /* initialised in m0_dtm_remote_global_init() */
256 };
257 
258 static const struct m0_rpc_item_ops rem_rpc_item_sender_ops = {
259  /* nothing */
260 };
261 
262 M0_UNUSED static const struct m0_rpc_item_ops rem_rpc_item_redo_ops = {
264 };
265 
266 M0_INTERNAL void m0_dtm_local_remote_init(struct m0_dtm_local_remote *lre,
267  struct m0_uint128 *id,
268  struct m0_dtm *local,
269  struct m0_reqh *reqh)
270 {
271  m0_dtm_remote_init(&lre->lre_rem, id, local);
272  lre->lre_rem.re_ops = &rem_local_ops;
273  lre->lre_reqh = reqh;
274 }
275 
276 M0_INTERNAL void m0_dtm_local_remote_fini(struct m0_dtm_local_remote *lre)
277 {
279 }
280 
281 static void rem_local_notify(struct m0_dtm_remote *rem,
282  const struct m0_dtm_history *history,
284 {
285  struct m0_dtm_notice notice;
286 
287  notice_pack(&notice, history, ver, opcode);
289 }
290 
291 static void rem_local_persistent(struct m0_dtm_remote *rem,
292  struct m0_dtm_history *history)
293 {
294  rem_local_notify(rem, history,
296 }
297 
298 static void rem_local_fixed(struct m0_dtm_remote *rem,
299  struct m0_dtm_history *history)
300 {
301  rem_local_notify(rem, history, 0, R_FIXED);
302 }
303 
304 static void rem_local_reset(struct m0_dtm_remote *rem,
305  struct m0_dtm_history *history)
306 {
307  rem_local_notify(rem, history, history->h_hi.hi_ver, R_RESET);
308 }
309 
310 static void rem_local_send(struct m0_dtm_remote *rem,
311  struct m0_dtm_update *update)
312 {
313  struct m0_dtm_local_remote *lre;
314  int result;
315 
316  lre = M0_AMB(lre, rem, lre_rem);
317  result = m0_reqh_fop_handle(lre->lre_reqh, update->upd_comm.uc_body);
318  if (result != 0)
319  M0_LOG(M0_ERROR, "redo: %i.", result);
320 }
321 
322 static void rem_local_undo(struct m0_dtm_remote *rem,
323  struct m0_dtm_history *history, m0_dtm_ver_t upto)
324 {
325  rem_local_notify(rem, history, upto, R_UNDO);
326 }
327 
328 
329 static const struct m0_dtm_remote_ops rem_local_ops = {
331  .reo_fixed = &rem_local_fixed,
332  .reo_reset = &rem_local_reset,
333  .reo_send = &rem_local_send,
334  .reo_resend = &rem_local_send,
335  .reo_undo = &rem_local_undo
336 };
337 
338 #undef M0_TRACE_SUBSYSTEM
339 
342 /*
343  * Local variables:
344  * c-indentation-style: "K&R"
345  * c-basic-offset: 8
346  * tab-width: 8
347  * fill-column: 80
348  * scroll-step: 1
349  * End:
350  */
351 /*
352  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
353  */
struct m0_dtm_up upd_up
Definition: update.h:72
static void rem_rpc_undo(struct m0_dtm_remote *rem, struct m0_dtm_history *history, m0_dtm_ver_t upto)
Definition: remote.c:121
uint8_t dno_opcode
Definition: remote.h:100
uint64_t id
Definition: cob.h:2380
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
const struct m0_rpc_item_type_ops m0_fop_default_item_type_ops
static struct m0_dtm_local_remote local
Definition: transmit.c:71
static const struct m0_rpc_item_ops rem_rpc_item_sender_ops
Definition: remote.c:62
#define M0_PRE(cond)
M0_INTERNAL void m0_rpc_oneway_item_post(const struct m0_rpc_conn *conn, struct m0_rpc_item *item)
Definition: rpc.c:169
struct m0_dtm_update_comm upd_comm
Definition: update.h:75
enum m0_dtm_state up_state
Definition: nucleus.h:94
struct m0_fop * uc_body
Definition: update.h:68
static void notice_deliver(struct m0_dtm_notice *notice, struct m0_dtm *dtm)
Definition: remote.c:214
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
static void rem_local_fixed(struct m0_dtm_remote *rem, struct m0_dtm_history *history)
Definition: remote.c:298
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_dtm_fol_remote_init(struct m0_dtm_fol_remote *frem, struct m0_dtm *dtm, struct m0_dtm_remote *remote)
Definition: fol.c:126
static void rem_local_reset(struct m0_dtm_remote *rem, struct m0_dtm_history *history)
Definition: remote.c:304
#define M0_FOP_TYPE_INIT(ft,...)
Definition: fop.h:307
M0_INTERNAL bool m0_uint128_eq(const struct m0_uint128 *u0, const struct m0_uint128 *u1)
Definition: misc.c:39
#define M0_LOG(level,...)
Definition: trace.h:167
M0_INTERNAL int m0_reqh_fop_handle(struct m0_reqh *reqh, struct m0_fop *fop)
Definition: reqh.c:546
Definition: remote.c:46
static void rem_local_send(struct m0_dtm_remote *rem, struct m0_dtm_update *update)
Definition: remote.c:310
M0_EXTERN struct m0_rpc_session * m0_rpc_conn_session0(const struct m0_rpc_conn *conn)
Definition: conn.c:749
struct m0_dtm_fol_remote re_fol
Definition: remote.h:58
void(* reo_persistent)(struct m0_dtm_remote *rem, struct m0_dtm_history *history)
Definition: remote.h:62
static void rem_rpc_send(struct m0_dtm_remote *rem, struct m0_dtm_update *update)
Definition: remote.c:130
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
void m0_fop_type_fini(struct m0_fop_type *fopt)
Definition: fop.c:232
M0_INTERNAL void m0_dtm_rpc_remote_fini(struct m0_dtm_rpc_remote *remote)
Definition: remote.c:97
static struct m0_addb2_mach * mach
Definition: storage.c:42
struct m0_dtm_hi h_hi
Definition: history.h:56
static void rem_rpc_persistent(struct m0_dtm_remote *rem, struct m0_dtm_history *history)
Definition: remote.c:102
M0_INTERNAL void m0_dtm_remote_add(struct m0_dtm_remote *rem, struct m0_dtm_oper *oper, struct m0_dtm_history *history, struct m0_dtm_update *update)
Definition: remote.c:65
static struct m0_xcode_type ** xt[]
Definition: protocol.c:64
M0_INTERNAL void m0_dtm_remote_global_fini(void)
Definition: remote.c:209
static struct m0_rpc_item * item
Definition: item.c:56
static void rem_local_notify(struct m0_dtm_remote *rem, const struct m0_dtm_history *history, m0_dtm_ver_t ver, enum rem_rpc_notification opcode)
Definition: remote.c:281
struct m0_rpc_conn * rpr_conn
Definition: remote.h:88
struct m0_dtm_history ch_history
Definition: history.h:102
struct m0_dtm_history_id dno_id
Definition: remote.h:98
struct m0_reqh * lre_reqh
Definition: remote.h:105
uint64_t dno_ver
Definition: remote.h:99
int opcode
Definition: crate.c:301
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
M0_INTERNAL void m0_dtm_history_undo(struct m0_dtm_history *history, m0_dtm_ver_t upto)
Definition: history.c:144
static const struct m0_reqh_service_ops rpc_ops
Definition: service.c:92
struct m0_dtm_update * h_persistent
Definition: history.h:60
Definition: remote.c:48
static const struct m0_fop_type_ops rem_rpc_ftype_ops
Definition: remote.c:60
M0_INTERNAL void m0_dtm_history_persistent(struct m0_dtm_history *history, m0_dtm_ver_t upto)
Definition: history.c:107
static void rem_rpc_resend(struct m0_dtm_remote *rem, struct m0_dtm_update *update)
Definition: remote.c:140
static struct m0_fop_type rem_rpc_fopt
Definition: remote.c:59
const char * name
Definition: trace.c:110
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
static void notice_pack(struct m0_dtm_notice *notice, const struct m0_dtm_history *history, m0_dtm_ver_t ver, enum rem_rpc_notification opcode)
Definition: remote.c:164
static int rem_rpc_deliver(struct m0_rpc_machine *mach, struct m0_rpc_item *item)
Definition: remote.c:240
M0_INTERNAL int m0_dtm_history_unpack(struct m0_dtm *dtm, const struct m0_dtm_history_id *id, struct m0_dtm_history **out)
Definition: history.c:248
static void rem_rpc_reset(struct m0_dtm_remote *rem, struct m0_dtm_history *history)
Definition: remote.c:115
uint64_t m0_dtm_ver_t
Definition: nucleus.h:48
static void rem_rpc_fixed(struct m0_dtm_remote *rem, struct m0_dtm_history *history)
Definition: remote.c:109
static const struct m0_dtm_remote_ops rem_local_ops
Definition: remote.c:52
#define HISTORY_DTM(history)
Definition: dtm_internal.h:106
static void rem_local_persistent(struct m0_dtm_remote *rem, struct m0_dtm_history *history)
Definition: remote.c:291
M0_INTERNAL m0_dtm_ver_t update_ver(const struct m0_dtm_update *update)
Definition: history.c:194
Definition: reqh.h:94
M0_INTERNAL void m0_dtm_history_reset(struct m0_dtm_history *history, m0_dtm_ver_t since)
Definition: history.c:125
struct m0_rpc_conn conn
Definition: fsync.c:96
Definition: remote.c:47
struct m0_dtm_controlh rfo_ch
Definition: fol.h:56
M0_INTERNAL void m0_dtm_history_pack(const struct m0_dtm_history *history, struct m0_dtm_history_id *id)
Definition: history.c:239
M0_INTERNAL void m0_dtm_fol_remote_fini(struct m0_dtm_fol_remote *frem)
Definition: fol.c:138
void(* rio_replied)(struct m0_rpc_item *item)
Definition: item.h:300
M0_INTERNAL void m0_dtm_rpc_remote_init(struct m0_dtm_rpc_remote *remote, struct m0_uint128 *id, struct m0_dtm *local, struct m0_rpc_conn *conn)
Definition: remote.c:87
struct m0_reqh reqh
Definition: rm_foms.c:48
static void rem_rpc_notify(struct m0_dtm_remote *rem, const struct m0_dtm_history *history, m0_dtm_ver_t ver, enum rem_rpc_notification opcode)
Definition: remote.c:173
static const struct m0_dtm_remote_ops rem_rpc_ops
Definition: remote.c:51
m0_dtm_ver_t hi_ver
Definition: nucleus.h:66
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
static struct m0_rm_remote * remote
Definition: rm_fops.c:35
M0_INTERNAL void m0_dtm_fol_remote_add(struct m0_dtm_fol_remote *frem, struct m0_dtm_oper *oper)
Definition: fol.c:146
const struct m0_dtm_remote_ops * re_ops
Definition: remote.h:57
static struct m0_fop * fop
Definition: item.c:57
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
M0_INTERNAL void m0_dtm_remote_init(struct m0_dtm_remote *remote, struct m0_uint128 *id, struct m0_dtm *local)
Definition: remote.c:73
M0_INTERNAL void m0_dtm_local_remote_fini(struct m0_dtm_local_remote *lre)
Definition: remote.c:276
static M0_UNUSED const struct m0_rpc_item_ops rem_rpc_item_redo_ops
Definition: remote.c:63
static struct m0_rpc_item_type_ops rem_rpc_itype_ops
Definition: remote.c:61
Definition: dtm.h:529
static void rem_local_undo(struct m0_dtm_remote *rem, struct m0_dtm_history *history, m0_dtm_ver_t upto)
Definition: remote.c:322
M0_INTERNAL void m0_dtm_local_remote_init(struct m0_dtm_local_remote *lre, struct m0_uint128 *id, struct m0_dtm *local, struct m0_reqh *reqh)
Definition: remote.c:266
struct m0_dtm_remote lre_rem
Definition: remote.h:104
M0_INTERNAL void m0_dtm_remote_fini(struct m0_dtm_remote *remote)
Definition: remote.c:82
int(* rito_deliver)(struct m0_rpc_machine *rpcmach, struct m0_rpc_item *item)
Definition: item.h:438
M0_INTERNAL int m0_dtm_remote_global_init(void)
Definition: remote.c:195
struct m0_rpc_item f_item
Definition: fop.h:83
rem_rpc_notification
Definition: remote.c:44
Definition: fop.h:79
void m0_rpc_item_cancel_init(struct m0_rpc_item *item)
Definition: item.c:947
m0_time_t ri_deadline
Definition: item.h:141
struct m0_fop * m0_fop_alloc(struct m0_fop_type *fopt, void *data, struct m0_rpc_machine *mach)
Definition: fop.c:96
#define M0_UNUSED
Definition: misc.h:380
static M0_UNUSED void rem_rpc_redo_replied(struct m0_rpc_item *item)
Definition: remote.c:247