Motr  M0
net.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2015-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
39 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_ADDB
40 
41 #include "lib/trace.h"
42 #include "lib/tlist.h"
43 #include "lib/mutex.h"
44 #include "lib/memory.h"
45 #include "lib/types.h"
46 #include "lib/time.h"
47 #include "motr/magic.h"
48 #include "fop/fop.h"
49 #include "rpc/rpc.h" /* m0_rpc_oneway_item_post */
50 #include "rpc/conn.h"
51 #include "rpc/item_source.h"
52 #include "rpc/rpc_opcodes.h" /* M0_ADDB_FOP_OPCODE */
53 #include "addb2/addb2.h"
54 #include "addb2/addb2_xc.h"
55 #include "addb2/service.h"
56 #include "addb2/internal.h"
57 
61 struct m0_addb2_net {
63  struct m0_mutex ne_lock;
68  struct m0_tl ne_queue;
72  struct m0_tl ne_src;
82  void (*ne_callback)(struct m0_addb2_net *, void *);
86  void *ne_datum;
87 };
88 
93 struct source {
100  uint64_t s_magix;
101 };
102 
103 M0_TL_DESCR_DEFINE(src, "addb2 rpc sources",
104  static, struct source, s_linkage, s_magix,
106 M0_TL_DEFINE(src, static, struct source);
107 
108 static void src_fini (struct source *s);
109 static void net_lock (struct m0_addb2_net *net);
110 static void net_unlock (struct m0_addb2_net *net);
111 static void net_force (struct m0_addb2_net *net);
112 static void net_sent (struct m0_rpc_item *item);
113 static bool net_invariant(const struct m0_addb2_net *net);
114 static void net_fop_init (struct m0_fop *fop, struct m0_addb2_net *net,
115  struct m0_addb2_trace *trace);
116 
117 static bool src_has_item(const struct m0_rpc_item_source *ris);
118 static void src_conn_terminating(struct m0_rpc_item_source *ris);
119 static struct m0_rpc_item *src_get_item(struct m0_rpc_item_source *ris,
120  m0_bcount_t size);
121 
122 static const struct m0_rpc_item_source_ops src_ops;
123 static struct m0_fop_type net_fopt;
124 static const struct m0_rpc_item_ops net_rpc_ops;
125 
126 static const m0_time_t IDLE_THRESHOLD = M0_MKTIME(10, 0);
127 
128 M0_INTERNAL struct m0_addb2_net *m0_addb2_net_init(void)
129 {
130  struct m0_addb2_net *net;
131 
132  M0_ALLOC_PTR(net);
133  if (net != NULL) {
135  tr_tlist_init(&net->ne_queue);
136  src_tlist_init(&net->ne_src);
138  }
139  return net;
140 }
141 
142 M0_INTERNAL void m0_addb2_net_fini(struct m0_addb2_net *net)
143 {
144  struct source *s;
145 
146  /*
147  * This lock-unlock is a barrier against concurrently finishing last
148  * src_get_item(), which executed the idle call-back.
149  *
150  * It *is* valid to acquire and finalise a lock in the same function in
151  * this particular case.
152  */
153  net_lock(net);
154  net_unlock(net);
155 
156  M0_PRE(net->ne_callback == NULL);
157  m0_tl_teardown(src, &net->ne_src, s) {
158  src_fini(s);
159  }
160  src_tlist_fini(&net->ne_src);
161  tr_tlist_fini(&net->ne_queue);
163  m0_free(net);
164 }
165 
166 M0_INTERNAL int m0_addb2_net_add(struct m0_addb2_net *net,
167  struct m0_rpc_conn *conn)
168 {
169  struct source *s;
170  int result;
171 
172  M0_ALLOC_PTR(s);
173  if (s != NULL) {
174  s->s_net = net;
175  /*
176  * Lock ordering: addb-net lock nests within rpc machine lock.
177  */
179  net_lock(net);
180  src_tlink_init_at_tail(s, &net->ne_src);
181  m0_rpc_item_source_init(&s->s_src,
182  "addb2 item source", &src_ops);
184  net_unlock(net);
186  result = 0;
187  } else
188  result = M0_ERR(-ENOMEM);
189  return M0_RC(result);
190 }
191 
192 M0_INTERNAL void m0_addb2_net_del(struct m0_addb2_net *net,
193  struct m0_rpc_conn *conn)
194 {
195  struct source *s;
196 
197  net_lock(net);
198  s = m0_tl_find(src, scan, &net->ne_src, scan->s_src.ris_conn == conn);
199  M0_ASSERT(s != NULL);
200  src_fini(s);
201  net_unlock(net);
202 }
203 
204 M0_INTERNAL int m0_addb2_net_submit(struct m0_addb2_net *net,
205  struct m0_addb2_trace_obj *obj)
206 {
207  net_lock(net);
208  M0_PRE(net->ne_callback == NULL);
209  tr_tlink_init_at_tail(obj, &net->ne_queue);
210  net_unlock(net);
211  return +1;
212 }
213 
214 M0_INTERNAL void m0_addb2_net_tick(struct m0_addb2_net *net)
215 {
216  /*
217  * If a trace was sent out recently, do nothing, otherwise send a dummy
218  * null trace through some of connections to trigger piggy-backing of
219  * more traces.
220  */
222  net_force(net);
223 }
224 
225 M0_INTERNAL void m0_addb2_net_stop(struct m0_addb2_net *net,
226  void (*callback)(struct m0_addb2_net *,
227  void *),
228  void *datum)
229 {
230  struct m0_tl *q = &net->ne_queue;
231  struct m0_addb2_trace_obj *obj;
232 
233  net_lock(net);
234  M0_PRE(net->ne_callback == NULL);
235  /*
236  * If there are no sources, it makes no sense to wait for queue drain.
237  */
238  if (src_tlist_is_empty(&net->ne_src)) {
239  m0_tl_teardown(tr, q, obj) {
240  m0_addb2_trace_done(&obj->o_tr);
241  }
242  }
243  /*
244  * @todo What to do when addb2 net is stopping, but there are no rpc
245  * packets to piggy-back remaining traces to?
246  *
247  * This usually happens during umount.
248  *
249  * For now, just discard all the traces.
250  */
251  if (!tr_tlist_is_empty(q)) {
252  M0_LOG(M0_NOTICE, "Traces discarded: %zi", tr_tlist_length(q));
253  m0_tl_teardown(tr, q, obj) {
254  m0_addb2_trace_done(&obj->o_tr);
255  }
256  }
257  if (tr_tlist_is_empty(q))
258  (*callback)(net, datum);
259  else {
260  net->ne_callback = callback;
261  net->ne_datum = datum;
262  }
263  net_unlock(net);
264 }
265 
266 M0_INTERNAL bool m0_addb2_net__is_not_locked(const struct m0_addb2_net *net)
267 {
269 }
270 
271 M0_INTERNAL int m0_addb2_net_module_init(void)
272 {
274  .name = "addb2-fop",
276  .rpc_flags = M0_RPC_ITEM_TYPE_ONEWAY,
277  .xt = m0_addb2_trace_xc,
278  .fom_ops = &m0_addb2__fom_type_ops,
279  .sm = &m0_addb2__sm_conf,
280  .svc_type = &m0_addb2_service_type);
281  return 0;
282 }
283 
284 M0_INTERNAL void m0_addb2_net_module_fini(void)
285 {
287 }
288 
289 
290 static void src_fini(struct source *s)
291 {
293  m0_rpc_item_source_fini(&s->s_src);
294  src_tlist_remove(s);
295  m0_free(s);
296 }
297 
298 static void net_lock(struct m0_addb2_net *net)
299 {
302 }
303 
304 static void net_unlock(struct m0_addb2_net *net)
305 {
308 }
309 
310 static bool net_invariant(const struct m0_addb2_net *net)
311 {
312  return m0_tl_forall(src, s, &net->ne_src,
313  s->s_net == net);
314 }
315 
316 static void net_force(struct m0_addb2_net *net)
317 {
318  struct source *s;
319  struct m0_fop *fop;
320  static struct m0_addb2_trace NULL_TRACE = {
321  .tr_nr = 0,
322  .tr_body = NULL
323  };
324 
325  M0_ALLOC_PTR(fop);
326  if (fop != NULL) {
327  net_fop_init(fop, net, &NULL_TRACE);
328  net_lock(net);
329  /* Send everything to the first addb service. Scalability
330  problem. */
331  s = src_tlist_head(&net->ne_src);
332  net_unlock(net);
333  if (s != NULL) {
334  m0_rpc_oneway_item_post(s->s_src.ris_conn,
335  &fop->f_item);
336  } else {
337  fop->f_data.fd_data = NULL;
338  m0_fop_fini(fop);
339  m0_free(fop);
340  }
341  } else
342  M0_LOG(M0_ERROR, "Cannot allocate fop.");
343 }
344 
352 static bool src_has_item(const struct m0_rpc_item_source *ris)
353 {
354  struct source *s = M0_AMB(s, ris, s_src);
355  struct m0_addb2_net *net = s->s_net;
356  bool empty;
357 
358  net_lock(net);
359  empty = tr_tlist_is_empty(&net->ne_queue);
360  net_unlock(net);
361  return !empty;
362 }
363 
367 static void net_fop_init(struct m0_fop *fop, struct m0_addb2_net *net,
368  struct m0_addb2_trace *trace)
369 {
370  /* Do not bother to xcode the trace. */
373 }
374 
384 static struct m0_rpc_item *src_get_item(struct m0_rpc_item_source *ris,
386 {
387  struct source *s = M0_AMB(s, ris, s_src);
388  struct m0_addb2_net *net = s->s_net;
389  struct m0_rpc_item *item = NULL;
390  struct m0_fop *fop;
391  struct m0_addb2_trace_obj *obj;
392  struct m0_addb2_trace *t;
393 
394  M0_ALLOC_PTR(fop);
395  if (fop != NULL) {
396  net_lock(net);
397  obj = tr_tlist_head(&net->ne_queue);
398  if (obj != NULL) {
399  t = &obj->o_tr;
400  if (m0_addb2_trace_size(t) <= size) {
401  net_fop_init(fop, net, t);
402  item = &fop->f_item;
403  tr_tlist_del(obj);
404  net->ne_last = m0_time_now();
405  if (tr_tlist_is_empty(&net->ne_queue) &&
406  net->ne_callback != NULL) {
408  net->ne_callback = NULL;
409  }
410  }
411  }
412  net_unlock(net);
413  } else
414  M0_LOG(M0_ERROR, "Cannot allocate fop.");
415  if (item == NULL)
416  m0_free(fop);
417  return item;
418 }
419 
425 static void src_conn_terminating(struct m0_rpc_item_source *ris)
426 {
427  struct source *s = M0_AMB(s, ris, s_src);
428 
429  m0_addb2_net_del(s->s_net, ris->ris_conn);
430 }
431 
437 static void net_sent(struct m0_rpc_item *item)
438 {
439  struct m0_fop *fop = m0_rpc_item_to_fop(item);
441  if (item->ri_error != 0)
442  M0_LOG(M0_ERROR, "Addb trace lost in rpc.");
443  /*
444  * Clear fop data, which points to a trace, so that m0_fop_fini() won't
445  * try to free it.
446  */
447  fop->f_data.fd_data = NULL;
448 }
449 
450 static const struct m0_rpc_item_source_ops src_ops = {
452  .riso_get_item = &src_get_item,
453  .riso_conn_terminating = &src_conn_terminating
454 };
455 
456 static const struct m0_rpc_item_ops net_rpc_ops = {
457  .rio_sent = &net_sent
458 };
459 
460 #undef M0_TRACE_SUBSYSTEM
461 
464 /*
465  * Local variables:
466  * c-indentation-style: "K&R"
467  * c-basic-offset: 8
468  * tab-width: 8
469  * fill-column: 80
470  * scroll-step: 1
471  * End:
472  */
473 /*
474  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
475  */
static void src_conn_terminating(struct m0_rpc_item_source *ris)
Definition: net.c:425
void m0_rpc_item_source_fini(struct m0_rpc_item_source *ris)
Definition: item_source.c:70
void * fd_data
Definition: fop.h:75
M0_INTERNAL void m0_addb2_net_fini(struct m0_addb2_net *net)
Definition: net.c:142
#define M0_PRE(cond)
M0_INTERNAL void m0_rpc_oneway_item_post(const struct m0_rpc_conn *conn, struct m0_rpc_item *item)
Definition: rpc.c:169
struct m0_rpc_conn * ris_conn
Definition: item_source.h:64
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static struct m0_semaphore q
Definition: rwlock.c:55
#define NULL
Definition: misc.h:38
#define M0_FOP_TYPE_INIT(ft,...)
Definition: fop.h:307
void m0_rpc_item_source_deregister(struct m0_rpc_item_source *ris)
Definition: item_source.c:107
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:79
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
void(* ne_callback)(struct m0_addb2_net *, void *)
Definition: net.c:82
M0_INTERNAL bool m0_mutex_is_not_locked(const struct m0_mutex *mutex)
Definition: mutex.c:101
void m0_rpc_item_source_init(struct m0_rpc_item_source *ris, const char *name, const struct m0_rpc_item_source_ops *ops)
Definition: item_source.c:52
static const struct m0_rpc_item_ops net_rpc_ops
Definition: net.c:124
int32_t ri_error
Definition: item.h:161
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
void m0_fop_type_fini(struct m0_fop_type *fopt)
Definition: fop.c:232
M0_EXTERN const struct m0_sm_conf m0_addb2__sm_conf
Definition: internal.h:70
struct m0_tlink s_linkage
Definition: net.c:99
static void src_fini(struct source *s)
Definition: net.c:290
uint64_t m0_bcount_t
Definition: types.h:77
static void net_fop_init(struct m0_fop *fop, struct m0_addb2_net *net, struct m0_addb2_trace *trace)
Definition: net.c:367
M0_INTERNAL bool m0_addb2_net__is_not_locked(const struct m0_addb2_net *net)
Definition: net.c:266
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
static struct m0_xcode_type ** xt[]
Definition: protocol.c:64
static struct m0_rpc_item * item
Definition: item.c:56
static struct foo * obj
Definition: tlist.c:302
M0_INTERNAL struct m0_addb2_net * m0_addb2_net_init(void)
Definition: net.c:128
return M0_RC(rc)
static const m0_time_t IDLE_THRESHOLD
Definition: net.c:126
M0_TL_DEFINE(src, static, struct source)
int opcode
Definition: crate.c:301
M0_TL_DESCR_DEFINE(src, "addb2 rpc sources", static, struct source, s_linkage, s_magix, M0_ADDB2_SOURCE_MAGIC, M0_ADDB2_SOURCE_HEAD_MAGIC)
struct m0_rpc_machine * c_rpc_machine
Definition: conn.h:278
void m0_addb2_trace_done(const struct m0_addb2_trace *ctrace)
Definition: addb2.c:650
bool m0_time_is_in_past(m0_time_t t)
Definition: time.c:102
struct m0_tl ne_queue
Definition: net.c:68
static bool src_has_item(const struct m0_rpc_item_source *ris)
Definition: net.c:352
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_rpc_machine_unlock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:558
static void net_unlock(struct m0_addb2_net *net)
Definition: net.c:304
const char * name
Definition: trace.c:110
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
#define M0_ASSERT(cond)
static struct m0_rpc_item * src_get_item(struct m0_rpc_item_source *ris, m0_bcount_t size)
Definition: net.c:384
m0_time_t m0_time_now(void)
Definition: time.c:134
static struct m0_thread t[8]
Definition: service_ut.c:1230
Definition: tlist.h:251
static const struct m0_rpc_item_source_ops src_ops
Definition: net.c:122
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
#define M0_POST(cond)
M0_INTERNAL struct m0_reqh_service_type m0_addb2_service_type
Definition: service.c:245
M0_INTERNAL int m0_addb2_net_submit(struct m0_addb2_net *net, struct m0_addb2_trace_obj *obj)
Definition: net.c:204
struct m0_rpc_conn conn
Definition: fsync.c:96
M0_INTERNAL void m0_fop_fini(struct m0_fop *fop)
Definition: fop.c:136
struct m0_tl ne_src
Definition: net.c:72
void(* rio_sent)(struct m0_rpc_item *item)
Definition: item.h:267
m0_time_t m0_time_add(const m0_time_t t1, const m0_time_t t2)
Definition: time.c:47
M0_INTERNAL void m0_addb2_net_del(struct m0_addb2_net *net, struct m0_rpc_conn *conn)
Definition: net.c:192
struct m0_rpc_item_source s_src
Definition: net.c:94
struct m0_fop_data f_data
Definition: fop.h:82
M0_INTERNAL int m0_addb2_net_add(struct m0_addb2_net *net, struct m0_rpc_conn *conn)
Definition: net.c:166
m0_time_t ne_last
Definition: net.c:78
M0_INTERNAL void m0_addb2_net_module_fini(void)
Definition: net.c:284
void * ne_datum
Definition: net.c:86
M0_INTERNAL void m0_rpc_machine_lock(struct m0_rpc_machine *machine)
Definition: rpc_machine.c:551
M0_INTERNAL void m0_fop_release(struct m0_ref *ref)
Definition: fop.c:148
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
M0_INTERNAL m0_bcount_t m0_addb2_trace_size(const struct m0_addb2_trace *trace)
Definition: addb2.c:683
m0_bcount_t size
Definition: di.c:39
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
static struct m0_fop * fop
Definition: item.c:57
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
#define M0_MKTIME(secs, ns)
Definition: time.h:86
static void net_sent(struct m0_rpc_item *item)
Definition: net.c:437
M0_INTERNAL void m0_addb2_net_stop(struct m0_addb2_net *net, void(*callback)(struct m0_addb2_net *, void *), void *datum)
Definition: net.c:225
static struct m0_addb2_net * net
Definition: net.c:27
M0_INTERNAL int m0_addb2_net_module_init(void)
Definition: net.c:271
static bool net_invariant(const struct m0_addb2_net *net)
Definition: net.c:310
M0_EXTERN const struct m0_fom_type_ops m0_addb2__fom_type_ops
Definition: internal.h:69
struct m0_addb2_net * s_net
Definition: net.c:95
uint64_t tr_nr
Definition: addb2.h:442
M0_INTERNAL void m0_addb2_net_tick(struct m0_addb2_net *net)
Definition: net.c:214
uint64_t s_magix
Definition: net.c:100
static int scan(struct scanner *s)
Definition: beck.c:963
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
static struct m0_fop_type net_fopt
Definition: net.c:123
Definition: net.c:93
void m0_free(void *data)
Definition: memory.c:146
Definition: mutex.h:47
static struct m0_addb2_source * s
Definition: consumer.c:39
struct m0_rpc_item f_item
Definition: fop.h:83
static void net_lock(struct m0_addb2_net *net)
Definition: net.c:298
struct m0_pdclust_src_addr src
Definition: fd.c:108
void m0_rpc_item_source_register_locked(struct m0_rpc_conn *conn, struct m0_rpc_item_source *ris)
Definition: item_source.c:83
struct m0_mutex ne_lock
Definition: net.c:63
Definition: fop.h:79
static void empty(void)
Definition: consumer.c:101
static void net_force(struct m0_addb2_net *net)
Definition: net.c:316
bool(* riso_has_item)(const struct m0_rpc_item_source *ris)
Definition: item_source.h:85
#define m0_tl_forall(name, var, head,...)
Definition: tlist.h:735