Motr  M0
operation.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2013-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
30 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_DTM
31 
32 #include "lib/trace.h"
33 #include "lib/misc.h" /* M0_IN */
34 #include "lib/errno.h"
35 #include "lib/memory.h"
36 
37 #include "dtm/dtm_internal.h"
38 #include "dtm/history.h"
39 #include "dtm/update.h"
40 #include "dtm/remote.h"
41 #include "dtm/operation.h"
42 #include "dtm/dtm.h"
43 
/**
 * Initialises a DTM operation.
 *
 * Initialises the embedded operation (oper->oprt_op) against the nucleus of
 * the given dtm instance and, when a list of updates is supplied, moves every
 * update from that list onto the tail of oper->oprt_uu.
 *
 * @param oper operation to initialise.
 * @param dtm  dtm instance whose nucleus (dtm->d_nu) the operation is
 *             attached to.
 * @param uu   optional list of updates; may be NULL, in which case nothing is
 *             moved.
 *
 * NOTE(review): the listing numbering jumps from 49 to 51, so one original
 * line is absent from this rendering -- confirm against the source file.
 */
44 M0_INTERNAL void m0_dtm_oper_init(struct m0_dtm_oper *oper, struct m0_dtm *dtm,
45  struct m0_tl *uu)
46 {
47  struct m0_dtm_update *update;
48 
49  m0_dtm_op_init(&oper->oprt_op, &dtm->d_nu);
51  if (uu != NULL) {
    /* Drain the caller's list into the operation's own update list. */
52  m0_tl_for(oper, uu, update) {
53  oper_tlist_move_tail(&oper->oprt_uu, update);
54  } m0_tl_endfor;
55  }
56 }
57 
/**
 * Finalises a DTM operation.
 *
 * Looks up the owning dtm instance through the operation's nucleus and
 * finalises the embedded operation between dtm_lock() and dtm_unlock().
 *
 * @param oper operation to finalise.
 *
 * NOTE(review): listing lines 62 and 64 are absent from this rendering, so
 * the visible body may not show every step -- confirm against the source
 * file.
 */
58 M0_INTERNAL void m0_dtm_oper_fini(struct m0_dtm_oper *oper)
59 {
60  struct m0_dtm *dtm = nu_dtm(oper->oprt_op.op_nu);
61 
63  dtm_lock(dtm);
65  m0_dtm_op_fini(&oper->oprt_op);
66  dtm_unlock(dtm);
67 }
68 
/**
 * Consistency check for a DTM operation.
 *
 * The visible part of the predicate requires that, for every pair of updates
 * (u0, u1) on op->op_ups, equal labels imply either that both are the same
 * update, or that the operation is not yet closed (M0_DOF_CLOSED clear) and
 * the shared label is 0.
 *
 * @param oper operation to check.
 * @return true iff the invariant holds.
 *
 * NOTE(review): listing lines 73 and 80 are absent from this rendering. The
 * first conjunct (before the m0_tl_forall) and the right-hand side of the
 * final "==" comparison are therefore not visible here; the expression below
 * is incomplete as shown -- confirm against the source file.
 */
69 M0_INTERNAL bool m0_dtm_oper_invariant(const struct m0_dtm_oper *oper)
70 {
71  const struct m0_dtm_op *op = &oper->oprt_op;
72  return
    /* Pairwise label check over all updates of the operation. */
74  m0_tl_forall(oper, u0, &op->op_ups,
75  m0_tl_forall(oper, u1, &op->op_ups,
76  ergo(u0->upd_label == u1->upd_label,
77  u0 == u1 || (!(oper->oprt_flags & M0_DOF_CLOSED) &&
78  u0->upd_label == 0)))) &&
79  (oper->oprt_flags & M0_DOF_CLOSED) ==
81 }
82 
/**
 * Closes a DTM operation.
 *
 * Visible steps, in order:
 *  - before taking the operation lock, for every update whose history has a
 *    remote (h_rem != NULL) and for which oper_update_unique() holds, calls
 *    m0_dtm_remote_add();
 *  - under oper_lock(), with the precondition that M0_DOF_CLOSED is not yet
 *    set:
 *      - finds the largest label among updates whose label is below
 *        M0_DTM_USER_UPDATE_BASE;
 *      - gives every update that still has label 0 the next label after that
 *        maximum, asserting the counter stays below
 *        M0_DTM_USER_UPDATE_BASE;
 *      - raises each history's h_max_ver to at least the update's up_ver;
 *      - closes the embedded operation and sets M0_DOF_CLOSED.
 *
 * @param oper operation to close; must not already be closed.
 *
 * NOTE(review): listing lines 98, 102 and 118 are absent from this rendering.
 * In particular the history_close() call at listing line 103 appears
 * unconditional inside its branch here, but the missing line 102 may carry a
 * guard for it -- confirm against the source file.
 */
83 M0_INTERNAL void m0_dtm_oper_close(struct m0_dtm_oper *oper)
84 {
85  uint32_t max_label = 0;
86 
    /* Pass 1 (unlocked): register updates with their remotes. */
87  oper_for(oper, update) {
88  struct m0_dtm_history *history;
89 
90  history = UPDATE_HISTORY(update);
91  if (history->h_rem != NULL &&
92  oper_update_unique(oper, update)) {
93  m0_dtm_remote_add(history->h_rem, oper, history, update);
94  }
95  } oper_endfor;
96  oper_lock(oper);
97  M0_PRE(!(oper->oprt_flags & M0_DOF_CLOSED));
    /* Pass 2: largest label already in use below the user-update base. */
99  oper_for(oper, update) {
100  if (update->upd_label < M0_DTM_USER_UPDATE_BASE) {
101  max_label = max32u(max_label, update->upd_label);
103  history_close(UPDATE_HISTORY(update));
104  }
105  } oper_endfor;
    /* Pass 3: label the still-unlabelled updates and track max version. */
106  oper_for(oper, update) {
107  struct m0_dtm_history *history;
108 
109  history = UPDATE_HISTORY(update);
110  if (update->upd_label == 0)
111  update->upd_label = ++max_label;
112  M0_ASSERT(max_label < M0_DTM_USER_UPDATE_BASE);
113  history->h_max_ver = max64u(history->h_max_ver,
114  update->upd_up.up_ver);
115  } oper_endfor;
116  m0_dtm_op_close(&oper->oprt_op);
117  oper->oprt_flags |= M0_DOF_CLOSED;
119  oper_unlock(oper);
120 }
121 
/**
 * Marks as prepared the parts of an operation that belong to a given remote.
 *
 * Under oper_lock(), walks every update-part ("up") of the embedded
 * operation. Each one must already be in a state >= M0_DOS_PREPARE. For those
 * whose history is associated with @a rem, calls up_prepared() and raises the
 * history's h_max_ver to at least up->up_ver. Afterwards calls advance_try()
 * on the embedded operation.
 *
 * @param oper operation being processed.
 * @param rem  remote to match against each history's h_rem; compared by
 *             pointer equality.
 *
 * NOTE(review): listing lines 125, 127 and 140 are absent from this rendering
 * -- confirm against the source file.
 */
122 M0_INTERNAL void m0_dtm_oper_prepared(const struct m0_dtm_oper *oper,
123  const struct m0_dtm_remote *rem)
124 {
126  oper_lock(oper);
128  up_for(&oper->oprt_op, up) {
129  struct m0_dtm_history *history;
130 
131  history = UP_HISTORY(up);
132  M0_PRE(up->up_state >= M0_DOS_PREPARE);
133  if (history->h_rem == rem) {
134  up_prepared(up);
135  history->h_max_ver = max64u(history->h_max_ver,
136  up->up_ver);
137  }
138  } up_endfor;
139  advance_try(&oper->oprt_op);
141  oper_unlock(oper);
142 }
143 
/**
 * Records completion of an operation for a given remote.
 *
 * Under oper_lock(), walks every update-part of the embedded operation. Each
 * one must be in a state >= M0_DOS_PREPARE. Those whose history is associated
 * with @a rem must be exactly M0_DOS_INPROGRESS and are moved to
 * M0_DOS_VOLATILE.
 *
 * @param oper operation being processed.
 * @param rem  remote to match against each history's h_rem; compared by
 *             pointer equality.
 *
 * NOTE(review): listing lines 147, 149 and 157 are absent from this rendering
 * -- confirm against the source file.
 */
144 M0_INTERNAL void m0_dtm_oper_done(const struct m0_dtm_oper *oper,
145  const struct m0_dtm_remote *rem)
146 {
148  oper_lock(oper);
150  up_for(&oper->oprt_op, up) {
151  M0_PRE(up->up_state >= M0_DOS_PREPARE);
152  if (UP_HISTORY(up)->h_rem == rem) {
153  M0_PRE(up->up_state == M0_DOS_INPROGRESS);
154  up->up_state = M0_DOS_VOLATILE;
155  }
156  } up_endfor;
158  oper_unlock(oper);
159 }
160 
/**
 * Packs the updates of an operation destined for a given remote into an
 * operation descriptor.
 *
 * Under oper_lock(), walks the operation's updates. Every update whose
 * UPDATE_REM() equals @a rem is packed with m0_dtm_update_pack() into the
 * next free slot of ode->od_updates.ou_update[]. The slot index is asserted
 * to stay below the incoming ode->od_updates.ou_nr, which therefore acts as
 * the array capacity on entry. On exit ou_nr is overwritten with the number
 * of slots actually filled.
 *
 * @param oper operation whose updates are packed.
 * @param rem  remote selecting which updates to pack.
 * @param ode  descriptor to fill; ou_nr is capacity on entry and count on
 *             return.
 *
 * NOTE(review): listing lines 167, 169 and 171 are absent from this
 * rendering. Listing line 172 is the tail of a statement that starts on the
 * missing line 171, so it is syntactically orphaned as shown -- confirm
 * against the source file.
 */
161 M0_INTERNAL void m0_dtm_oper_pack(struct m0_dtm_oper *oper,
162  const struct m0_dtm_remote *rem,
163  struct m0_dtm_oper_descr *ode)
164 {
165  uint32_t idx = 0;
166 
168  oper_lock(oper);
170  oper_for(oper, update) {
172  HISTORY_DTM(UPDATE_HISTORY(update)));
173  if (UPDATE_REM(update) == rem) {
174  M0_ASSERT(idx < ode->od_updates.ou_nr);
175  m0_dtm_update_pack(update,
176  &ode->od_updates.ou_update[idx++]);
177  }
178  } oper_endfor;
    /* Report how many descriptor slots were actually used. */
179  ode->od_updates.ou_nr = idx;
180  oper_unlock(oper);
181 }
182 
/**
 * Builds an operation from a received operation descriptor.
 *
 * For each of the ode->od_updates.ou_nr update descriptors, pops one update
 * from @a uu (asserting that the list is not exhausted) and builds it with
 * m0_dtm_update_build(). Stops at the first non-zero result.
 *
 * @param oper operation to populate; must not be closed (M0_DOF_CLOSED
 *             clear).
 * @param uu   list supplying pre-allocated updates; must hold at least as
 *             many updates as descriptors are consumed.
 * @param ode  descriptor to build from.
 *
 * @return 0 on success, in which case m0_dtm_oper_invariant() is asserted as
 *         a postcondition; otherwise the first non-zero value returned by
 *         m0_dtm_update_build(). On failure the operation is finalised with
 *         m0_dtm_oper_fini() before returning.
 *
 * NOTE(review): listing line 191 is absent from this rendering -- confirm
 * against the source file.
 */
183 M0_INTERNAL int m0_dtm_oper_build(struct m0_dtm_oper *oper, struct m0_tl *uu,
184  const struct m0_dtm_oper_descr *ode)
185 {
186  uint32_t i;
187  int result;
188 
189  M0_PRE(!(oper->oprt_flags & M0_DOF_CLOSED));
190  oper_lock(oper);
192  for (result = 0, i = 0; i < ode->od_updates.ou_nr; ++i) {
193  struct m0_dtm_update_descr *ud = &ode->od_updates.ou_update[i];
194  struct m0_dtm_update *update;
195 
196  update = oper_tlist_pop(uu);
197  M0_ASSERT(update != NULL);
198  result = m0_dtm_update_build(update, oper, ud);
199  if (result != 0)
200  break;
201  }
202  M0_POST(ergo(result == 0, m0_dtm_oper_invariant(oper)));
203  oper_unlock(oper);
    /* Finalisation happens after the operation lock has been released. */
204  if (result != 0)
205  m0_dtm_oper_fini(oper);
206  return result;
207 }
208 
/**
 * Packs a reply descriptor for the updates named in a request descriptor.
 *
 * Under oper_lock(), walks the request's update descriptors, looks up the
 * matching update in @a oper by label (ud->udd_data.da_label) and asserts
 * that it exists and that the reply index stays below the incoming
 * reply->od_updates.ou_nr. On exit reply->od_updates.ou_nr is overwritten
 * with the final reply index.
 *
 * @param oper    operation holding the updates.
 * @param request descriptor received from the peer.
 * @param reply   descriptor to fill.
 *
 * NOTE(review): listing lines 216, 218, 226-227, 229 and 232 are absent from
 * this rendering. The statements that use the looked-up update and advance
 * "j" are not visible, so as shown "j" is never incremented -- confirm
 * against the source file before relying on this description.
 */
209 M0_INTERNAL void m0_dtm_reply_pack(const struct m0_dtm_oper *oper,
210  const struct m0_dtm_oper_descr *request,
211  struct m0_dtm_oper_descr *reply)
212 {
213  uint32_t i;
214  uint32_t j;
215 
217  oper_lock(oper);
219  for (j = 0, i = 0; i < request->od_updates.ou_nr; ++i) {
220  struct m0_dtm_update_descr *ud;
221  const struct m0_dtm_update *update;
222 
223  ud = &request->od_updates.ou_update[i];
224  update = m0_dtm_oper_get(oper, ud->udd_data.da_label);
225  M0_ASSERT(update != NULL);
228  M0_ASSERT(j < reply->od_updates.ou_nr);
230  }
231  reply->od_updates.ou_nr = j;
233  oper_unlock(oper);
234 }
235 
/**
 * Applies a reply descriptor to the updates of an operation.
 *
 * Under oper_lock(), for every update descriptor in @a reply, finds the
 * update with the same label in @a oper, requires it to be in state
 * M0_DOS_INPROGRESS or M0_DOS_PREPARE, and unpacks the descriptor into it
 * with m0_dtm_update_unpack().
 *
 * A label with no matching update currently trips an assertion; the inline
 * comment marks this as a place where -EPROTO would apply.
 *
 * @param oper  operation whose updates are refreshed from the reply.
 * @param reply descriptor received from the peer.
 *
 * NOTE(review): listing lines 241, 243 and 255 are absent from this rendering
 * -- confirm against the source file.
 */
236 M0_INTERNAL void m0_dtm_reply_unpack(struct m0_dtm_oper *oper,
237  const struct m0_dtm_oper_descr *reply)
238 {
239  uint32_t i;
240 
242  oper_lock(oper);
244  for (i = 0; i < reply->od_updates.ou_nr; ++i) {
245  struct m0_dtm_update_descr *ud;
246  struct m0_dtm_update *update;
247 
248  ud = &reply->od_updates.ou_update[i];
249  update = m0_dtm_oper_get(oper, ud->udd_data.da_label);
250  M0_ASSERT(update != NULL); /* -EPROTO */
251  M0_PRE(M0_IN(update->upd_up.up_state, (M0_DOS_INPROGRESS,
252  M0_DOS_PREPARE)));
253  m0_dtm_update_unpack(update, ud);
254  }
256  oper_unlock(oper);
257 }
258 
/**
 * Finds an update of an operation by label.
 *
 * @param oper  operation to search.
 * @param label label to look for (compared against upd_label).
 *
 * @return the first update, in oper_for() iteration order, whose upd_label
 *         equals @a label, or NULL when there is none.
 *
 * NOTE(review): listing line 262 is absent from this rendering -- confirm
 * against the source file.
 */
259 M0_INTERNAL struct m0_dtm_update *m0_dtm_oper_get(const struct m0_dtm_oper *oper,
260  uint32_t label)
261 {
263  oper_for(oper, update) {
264  if (update->upd_label == label)
265  return update;
266  } oper_endfor;
267  return NULL;
268 }
269 
/**
 * Tells whether @a update is the first update of @a oper for its remote.
 *
 * Scans the operation's updates in oper_for() order. Reaching @a update
 * itself first yields true; encountering an earlier update with the same
 * UPDATE_REM() yields false.
 *
 * @param oper   operation to scan.
 * @param update update to test; expected to belong to @a oper. Falling off
 *               the end of the scan without meeting it is flagged with
 *               M0_IMPOSSIBLE().
 *
 * @return true iff no update preceding @a update in iteration order shares
 *         its remote.
 */
270 M0_INTERNAL bool oper_update_unique(const struct m0_dtm_oper *oper,
271  const struct m0_dtm_update *update)
272 {
273  oper_for(oper, scan) {
    /* The identity check comes first, so an update never clashes with itself. */
274  if (scan == update)
275  return true;
276  if (UPDATE_REM(scan) == UPDATE_REM(update))
277  return false;
278  } oper_endfor;
279  M0_IMPOSSIBLE("Missing update.");
280  return false;
281 }
282 
/**
 * Locks an operation by calling nu_lock() on the nucleus that the embedded
 * operation points to (oper->oprt_op.op_nu).
 *
 * @param oper operation to lock.
 */
283 M0_INTERNAL void oper_lock(const struct m0_dtm_oper *oper)
284 {
285  nu_lock(oper->oprt_op.op_nu);
286 }
287 
/**
 * Unlocks an operation by calling nu_unlock() on the nucleus that the
 * embedded operation points to (oper->oprt_op.op_nu). Counterpart of
 * oper_lock().
 *
 * @param oper operation to unlock.
 */
288 M0_INTERNAL void oper_unlock(const struct m0_dtm_oper *oper)
289 {
290  nu_unlock(oper->oprt_op.op_nu);
291 }
292 
/**
 * Dumps an operation to the trace log.
 *
 * Logs the operation's flags and the length of its oprt_uu list, then calls
 * update_print() for every update of the operation. The summary line is
 * emitted at M0_FATAL level.
 *
 * @param oper operation to print.
 */
293 M0_INTERNAL void oper_print(const struct m0_dtm_oper *oper)
294 {
295  M0_LOG(M0_FATAL, "oper flags: %lx uu: %u",
296  (unsigned long)oper->oprt_flags,
297  (unsigned)oper_tlist_length(&oper->oprt_uu));
298  oper_for(oper, update) {
299  update_print(update);
300  } oper_endfor;
301 }
302 
303 #undef M0_TRACE_SUBSYSTEM
304 
307 /*
308  * Local variables:
309  * c-indentation-style: "K&R"
310  * c-basic-offset: 8
311  * tab-width: 8
312  * fill-column: 80
313  * scroll-step: 1
314  * End:
315  */
316 /*
317  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
318  */
M0_INTERNAL void m0_dtm_op_init(struct m0_dtm_op *op, struct m0_dtm_nu *nu)
Definition: nucleus.c:77
M0_INTERNAL int m0_dtm_update_build(struct m0_dtm_update *update, struct m0_dtm_oper *oper, const struct m0_dtm_update_descr *updd)
Definition: update.c:132
struct m0_dtm_up upd_up
Definition: update.h:72
#define M0_PRE(cond)
enum m0_dtm_state up_state
Definition: nucleus.h:94
struct m0_dtm_update_data udd_data
Definition: update.h:118
M0_INTERNAL void m0_dtm_update_unpack(struct m0_dtm_update *update, const struct m0_dtm_update_descr *updd)
Definition: update.c:113
struct m0_dtm_nu * op_nu
Definition: nucleus.h:106
#define NULL
Definition: misc.h:38
static struct m0_dtm_oper_descr ode[REM_NR][OPER_NR]
Definition: dtx.c:68
#define ergo(a, b)
Definition: misc.h:293
#define oper_for(o, update)
Definition: dtm_internal.h:77
struct m0_dtm_oper_updates od_updates
Definition: operation.h:53
struct m0_dtm_remote * h_rem
Definition: history.h:59
#define M0_LOG(level,...)
Definition: trace.h:167
uint32_t da_label
Definition: update.h:98
struct m0_dtm_fol_remote re_fol
Definition: remote.h:58
M0_INTERNAL void m0_dtm_oper_init(struct m0_dtm_oper *oper, struct m0_dtm *dtm, struct m0_tl *uu)
Definition: operation.c:44
#define UP_HISTORY(up)
Definition: dtm_internal.h:103
struct m0_dtm_op oprt_op
Definition: operation.h:48
M0_INTERNAL void m0_dtm_oper_prepared(const struct m0_dtm_oper *oper, const struct m0_dtm_remote *rem)
Definition: operation.c:122
M0_INTERNAL struct m0_dtm * nu_dtm(struct m0_dtm_nu *nu)
Definition: dtm.c:154
M0_INTERNAL void m0_dtm_oper_done(const struct m0_dtm_oper *oper, const struct m0_dtm_remote *rem)
Definition: operation.c:144
M0_INTERNAL void up_prepared(struct m0_dtm_up *up)
Definition: nucleus.c:222
M0_INTERNAL int m0_dtm_oper_build(struct m0_dtm_oper *oper, struct m0_tl *uu, const struct m0_dtm_oper_descr *ode)
Definition: operation.c:183
#define oper_endfor
Definition: dtm_internal.h:83
M0_INTERNAL void m0_dtm_remote_add(struct m0_dtm_remote *rem, struct m0_dtm_oper *oper, struct m0_dtm_history *history, struct m0_dtm_update *update)
Definition: remote.c:65
M0_INTERNAL void dtm_lock(struct m0_dtm *dtm)
Definition: dtm.c:159
#define m0_tl_endfor
Definition: tlist.h:700
op
Definition: libdemo.c:64
struct m0_dtm_history ch_history
Definition: history.h:102
M0_INTERNAL void update_print(const struct m0_dtm_update *update)
Definition: update.c:317
int i
Definition: dir.c:1033
#define UPDATE_HISTORY(update)
Definition: dtm_internal.h:104
M0_INTERNAL bool oper_update_unique(const struct m0_dtm_oper *oper, const struct m0_dtm_update *update)
Definition: operation.c:270
M0_INTERNAL void m0_dtm_oper_pack(struct m0_dtm_oper *oper, const struct m0_dtm_remote *rem, struct m0_dtm_oper_descr *ode)
Definition: operation.c:161
#define M0_ASSERT(cond)
M0_INTERNAL bool m0_dtm_descr_matches_update(const struct m0_dtm_update *update, const struct m0_dtm_update_descr *updd)
Definition: update.c:166
#define HISTORY_DTM(history)
Definition: dtm_internal.h:106
Definition: tlist.h:251
#define up_endfor
Definition: dtm_internal.h:54
struct m0_dtm_nu d_nu
Definition: dtm.h:533
M0_INTERNAL bool m0_dtm_controlh_update_is_close(const struct m0_dtm_update *update)
Definition: history.c:414
M0_INTERNAL bool m0_dtm_op_invariant(const struct m0_dtm_op *op)
Definition: nucleus.c:470
struct m0_dtm_update_descr * ou_update
Definition: operation.h:62
#define M0_POST(cond)
M0_INTERNAL void history_close(struct m0_dtm_history *history)
Definition: history.c:169
M0_INTERNAL void nu_lock(struct m0_dtm_nu *nu)
Definition: nucleus.c:538
M0_INTERNAL struct m0_dtm_update * m0_dtm_oper_get(const struct m0_dtm_oper *oper, uint32_t label)
Definition: operation.c:259
struct m0_dtm_controlh rfo_ch
Definition: fol.h:56
M0_INTERNAL void m0_dtm_op_close(const struct m0_dtm_op *op)
Definition: nucleus.c:84
#define UPDATE_REM(update)
Definition: dtm_internal.h:105
M0_INTERNAL void oper_unlock(const struct m0_dtm_oper *oper)
Definition: operation.c:288
M0_INTERNAL void m0_dtm_op_fini(struct m0_dtm_op *op)
Definition: nucleus.c:135
M0_INTERNAL void nu_unlock(struct m0_dtm_nu *nu)
Definition: nucleus.c:543
M0_INTERNAL void m0_dtm_reply_unpack(struct m0_dtm_oper *oper, const struct m0_dtm_oper_descr *reply)
Definition: operation.c:236
uint64_t oprt_flags
Definition: operation.h:50
struct m0_tl oprt_uu
Definition: operation.h:49
static uint32_t max32u(uint32_t a, uint32_t b)
Definition: arith.h:61
M0_INTERNAL void m0_dtm_oper_close(struct m0_dtm_oper *oper)
Definition: operation.c:83
M0_INTERNAL void m0_dtm_update_list_fini(struct m0_tl *list)
Definition: update.c:182
M0_INTERNAL bool op_state(const struct m0_dtm_op *op, enum m0_dtm_state state)
Definition: nucleus.c:511
M0_INTERNAL void dtm_unlock(struct m0_dtm *dtm)
Definition: dtm.c:164
M0_INTERNAL void m0_dtm_update_pack(const struct m0_dtm_update *update, struct m0_dtm_update_descr *updd)
Definition: update.c:93
#define up_for(o, up)
Definition: dtm_internal.h:48
Definition: dtm.h:529
M0_INTERNAL void m0_dtm_oper_fini(struct m0_dtm_oper *oper)
Definition: operation.c:58
static struct m0_tl uu
Definition: dtx.c:65
M0_INTERNAL void oper_print(const struct m0_dtm_oper *oper)
Definition: operation.c:293
M0_INTERNAL void m0_dtm_update_list_init(struct m0_tl *list)
Definition: update.c:177
struct m0_dtm_oper_updates od_updates
Definition: operation.h:66
static struct m0_dtm_oper_descr reply
Definition: transmit.c:94
static int scan(struct scanner *s)
Definition: beck.c:963
M0_INTERNAL bool m0_dtm_oper_invariant(const struct m0_dtm_oper *oper)
Definition: operation.c:69
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
M0_INTERNAL void advance_try(const struct m0_dtm_op *op)
Definition: nucleus.c:145
m0_dtm_ver_t h_max_ver
Definition: history.h:69
M0_INTERNAL void oper_lock(const struct m0_dtm_oper *oper)
Definition: operation.c:283
static uint64_t max64u(uint64_t a, uint64_t b)
Definition: arith.h:71
M0_INTERNAL void m0_dtm_reply_pack(const struct m0_dtm_oper *oper, const struct m0_dtm_oper_descr *request, struct m0_dtm_oper_descr *reply)
Definition: operation.c:209
#define m0_tl_forall(name, var, head,...)
Definition: tlist.h:735
#define M0_IMPOSSIBLE(fmt,...)