Motr  M0
service.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2015-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
29 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_ADDB
30 
31 #include "lib/trace.h"
32 #include "lib/assert.h"
33 #include "lib/chan.h"
34 #include "lib/mutex.h"
35 #include "lib/memory.h"
36 #include "lib/errno.h" /* ENOMEM */
37 #include "lib/misc.h" /* M0_AMB */
38 #include "reqh/reqh_service.h"
39 #include "reqh/reqh.h"
40 #include "fop/fom.h"
41 #include "fop/fop.h"
42 
43 #include "addb2/addb2.h"
44 #include "addb2/addb2_xc.h"
45 #include "addb2/internal.h"
46 #include "addb2/consumer.h"
47 #include "addb2/sys.h"
48 #include "addb2/service.h"
49 
/*
 * Per-instance state of the addb2 service.
 *
 * NOTE(review): the member declarations are absent from this listing. Code
 * elsewhere in the file accesses two members through this type:
 *   - ase_service: its address is handed out as the generic
 *     struct m0_reqh_service and rs_type/rs_ops are set through it;
 *   - ase_src: passed to m0_addb2_source_init()/m0_addb2_source_fini() and
 *     used as the source given to m0_addb2_consume().
 * Confirm the exact declarations against the full source.
 */
50 struct addb2_service {
53 };
54 
/*
 * Fom object used by this file. It wraps the generic fom so that callbacks
 * receiving a struct m0_fom pointer can get back to the wrapper via M0_AMB().
 *
 * NOTE(review): addb2_fom_tick() also uses an a2_cur member (a cursor) that is
 * not visible in this listing -- confirm against the full source.
 */
55 struct addb2_fom {
     /* Embedded generic fom; its address is what addb2_fom_create() returns. */
56  struct m0_fom a2_fom;
58 };
59 
60 static int addb2_service_start(struct m0_reqh_service *service);
61 static void addb2_service_stop(struct m0_reqh_service *service);
62 static void addb2_service_fini(struct m0_reqh_service *service);
63 static void addb2_done(struct m0_addb2_trace_obj *obj);
64 static size_t addb2_fom_home_locality(const struct m0_fom *fom);
66  const struct m0_reqh_service_type *stype);
67 
70 static const struct m0_fom_ops addb2_fom_ops;
71 
/*
 * Module-level initialisation entry point for the addb2 service.
 *
 * NOTE(review): the body is absent from this listing. Presumably it registers
 * m0_addb2_service_type with m0_reqh_service_type_register() and returns that
 * call's result -- confirm against the full source.
 */
72 M0_INTERNAL int m0_addb2_service_module_init(void)
73 {
75 }
76 
/*
 * Module-level finalisation entry point for the addb2 service.
 *
 * NOTE(review): the body is absent from this listing. Presumably it undoes
 * m0_addb2_service_module_init() by unregistering the service type -- confirm
 * against the full source.
 */
77 M0_INTERNAL void m0_addb2_service_module_fini(void)
78 {
80 }
81 
83 {
85  return 0;
86 }
87 
89 {
91 
93 }
94 
96 {
98 
100  m0_addb2_source_fini(&service->ase_src);
101  m0_free(service);
102 }
103 
105  const struct m0_reqh_service_type *stype)
106 {
107  struct addb2_service *service;
108 
110  if (service != NULL) {
111  *svc = &service->ase_service;
112  (*svc)->rs_type = stype;
113  (*svc)->rs_ops = &addb2_service_ops;
114  m0_addb2_source_init(&service->ase_src);
115  return M0_RC(0);
116  } else
117  return M0_ERR(-ENOMEM);
118 }
119 
/*
 * Creates the fom that will process an incoming addb2 fop.
 *
 * @param fop  fop the fom is created for; forwarded to the fom initialisation.
 * @param out  on success receives the address of the generic fom embedded in
 *             the newly allocated addb2_fom.
 * @param reqh request handler, forwarded to the fom initialisation.
 *
 * @return M0_RC(0) on success; M0_ERR(-ENOMEM) when the allocation fails, in
 *         which case *out is not written.
 */
120 static int addb2_fom_create(struct m0_fop *fop,
121  struct m0_fom **out, struct m0_reqh *reqh)
122 {
123  struct addb2_fom *fom;
124 
125  M0_ALLOC_PTR(fom);
126  if (fom != NULL) {
     /* Callers only ever see the embedded generic fom. */
127  *out = &fom->a2_fom;
     /*
      * NOTE(review): the line that opens this call is missing from the
      * listing. The trailing arguments (ops, fop, NULL reply, reqh) line up
      * with m0_fom_init() -- confirm the callee and its first two arguments
      * against the full source.
      */
129  &addb2_fom_ops, fop, NULL, reqh);
130  return M0_RC(0);
131  } else
132  return M0_ERR(-ENOMEM);
133 }
134 
/*
 * NOTE(review): the enumerators are absent from this listing. The constants
 * ADDB2_CONSUME, ADDB2_SUBMIT and ADDB2_DONE are used as fom phases elsewhere
 * in this file and are presumably defined here -- confirm names and values
 * against the full source.
 */
135 enum {
139 };
140 
/*
 * Phase handler of the addb2 fom.
 *
 * The trace being processed is the data of the fom's fop. Two phases are
 * handled:
 *
 *  - ADDB2_CONSUME: walks every record of the trace with a cursor and passes
 *    each one to m0_addb2_consume() on the service's source;
 *  - ADDB2_SUBMIT: wraps the trace in a freshly allocated m0_addb2_trace_obj
 *    and hands it to m0_addb2_sys_submit().
 *
 * Any other phase trips M0_IMPOSSIBLE().
 *
 * NOTE(review): this listing omits several statements of the function (one
 * local declaration, the statements that follow the consume loop, and one
 * statement before the final return of the submit phase). Presumably these
 * derive `service` from `svc`, finalise the cursor and advance the fom phase
 * -- confirm against the full source.
 *
 * @param fom0 generic fom embedded in an addb2_fom.
 * @return M0_FSO_AGAIN after the consume phase, M0_FSO_WAIT after the submit
 *         phase.
 */
141 static int addb2_fom_tick(struct m0_fom *fom0)
142 {
     /* Recover the enclosing addb2_fom from its embedded generic fom. */
143  struct addb2_fom *fom = M0_AMB(fom, fom0, a2_fom);
     /* The fop payload is the trace to process. */
144  struct m0_addb2_trace *trace = m0_fop_data(fom0->fo_fop);
145  struct m0_reqh_service *svc = fom0->fo_service;
     /* NOTE(review): the declaration of `service` is missing from the listing. */
147  struct m0_addb2_source *src = &service->ase_src;
148  struct m0_addb2_cursor *cur = &fom->a2_cur;
149  struct m0_addb2_sys *sys = m0_fom_dom()->fd_addb2_sys;
150  struct m0_addb2_trace_obj *obj;
151 
152  switch (m0_fom_phase(fom0)) {
153  case ADDB2_CONSUME:
     /* Deliver each record of the trace to the source, in cursor order. */
154  m0_addb2_cursor_init(cur, trace);
155  while (m0_addb2_cursor_next(cur) > 0)
156  m0_addb2_consume(src, &cur->cu_rec);
     /* NOTE(review): statements between the loop and this return are not shown. */
159  return M0_FSO_AGAIN;
160  case ADDB2_SUBMIT:
161  M0_ALLOC_PTR(obj);
162  if (obj != NULL) {
     /* Shallow copy: the object now refers to the same trace body. */
163  obj->o_tr = *trace;
164  obj->o_done = &addb2_done;
165  /*
166  * Reset the data, so fom finalisation doesn't free it.
167  *
168  * The trace is freed in addb2_done().
169  */
170  fom0->fo_fop->f_data.fd_data = NULL;
     /*
      * On a zero return the object is released right here, through the
      * same routine installed as its o_done callback. Presumably zero means
      * the trace was not taken over -- confirm.
      */
171  if (m0_addb2_sys_submit(sys, obj) == 0)
172  addb2_done(obj);
173  } else
     /* Allocation failure: the trace is not submitted; only a notice is logged. */
174  M0_LOG(M0_NOTICE, "Lost trace.");
     /* NOTE(review): one statement before this return is not shown. */
176  return M0_FSO_WAIT;
177  default:
178  M0_IMPOSSIBLE("Invalid phase");
179  }
180 }
181 
182 static void addb2_fom_fini(struct m0_fom *fom0)
183 {
184  struct addb2_fom *fom = M0_AMB(fom, fom0, a2_fom);
185 
186  m0_fom_fini(fom0);
187  m0_free(fom);
188 }
189 
190 static void addb2_done(struct m0_addb2_trace_obj *obj)
191 {
192  m0_free(obj->o_tr.tr_body);
193  m0_free(obj);
194 }
195 
/*
 * Home-locality selector, installed as addb2_fom_ops.fo_home_locality.
 *
 * Hands out consecutive values (0, 1, 2, ...) from a function-local counter,
 * one per call; the fom argument is not inspected. The counter is a plain
 * static with no synchronisation in this function.
 *
 * NOTE(review): the value is returned unreduced; presumably the caller maps it
 * onto the available localities -- confirm.
 *
 * @param fom ignored.
 * @return the counter value before this call's increment.
 */
static size_t addb2_fom_home_locality(const struct m0_fom *fom)
{
	static size_t next_locality;
	size_t        chosen = next_locality;

	next_locality = chosen + 1;
	return chosen;
}
201 
/*
 * Operation vector attached to every addb2 fom by addb2_fom_create().
 *
 * NOTE(review): one initializer between the opening brace and
 * .fo_home_locality is absent from this listing (presumably
 * .fo_tick = &addb2_fom_tick) -- confirm against the full source.
 */
202 static const struct m0_fom_ops addb2_fom_ops = {
204  .fo_home_locality = &addb2_fom_home_locality,
205  .fo_fini = &addb2_fom_fini
206 };
207 
/*
 * Fom type operations exported for the addb2 fop type.
 *
 * NOTE(review): the initializer is absent from this listing; presumably it
 * sets .fto_create = &addb2_fom_create -- confirm against the full source.
 */
208 M0_INTERNAL const struct m0_fom_type_ops m0_addb2__fom_type_ops = {
210 };
211 
213  [ADDB2_CONSUME] = {
214  .sd_name = "consume",
215  .sd_allowed = M0_BITS(ADDB2_SUBMIT),
216  .sd_flags = M0_SDF_INITIAL
217  },
218  [ADDB2_SUBMIT] = {
219  .sd_name = "submit",
220  .sd_allowed = M0_BITS(ADDB2_DONE),
221  },
222  [ADDB2_DONE] = {
223  .sd_name = "done",
224  .sd_flags = M0_SDF_TERMINAL
225  }
226 };
227 
228 M0_INTERNAL const struct m0_sm_conf m0_addb2__sm_conf = {
229  .scf_name = "addb2 fom",
230  .scf_nr_states = ARRAY_SIZE(addb2_fom_phases),
231  .scf_state = addb2_fom_phases
232 };
233 
/*
 * Service type operations, referenced by the .rst_ops field of the service
 * type defined later in this file.
 *
 * NOTE(review): the initializer is absent from this listing; presumably it
 * sets .rsto_service_allocate to the allocation routine of this file --
 * confirm against the full source.
 */
234 static const struct m0_reqh_service_type_ops addb2_service_type_ops = {
236 };
237 
/*
 * Per-service operation vector; the allocation routine of this file stores
 * its address in rs_ops of every service it creates.
 *
 * NOTE(review): one initializer between the opening brace and .rso_start is
 * absent from this listing (presumably .rso_start_async) -- confirm against
 * the full source.
 */
238 static const struct m0_reqh_service_ops addb2_service_ops = {
240  .rso_start = &addb2_service_start,
241  .rso_stop = &addb2_service_stop,
242  .rso_fini = &addb2_service_fini
243 };
244 
246  .rst_name = "M0_CST_ADDB2",
247  .rst_ops = &addb2_service_type_ops,
248  .rst_level = M0_RS_LEVEL_NORMAL,
249  .rst_typecode = M0_CST_ADDB2
250 };
251 
252 #undef M0_TRACE_SUBSYSTEM
253 
256 /*
257  * Local variables:
258  * c-indentation-style: "K&R"
259  * c-basic-offset: 8
260  * tab-width: 8
261  * fill-column: 80
262  * scroll-step: 1
263  * End:
264  */
265 /*
266  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
267  */
M0_INTERNAL int m0_reqh_service_state_get(const struct m0_reqh_service *s)
Definition: reqh_service.c:560
void * fd_data
Definition: fop.h:75
#define M0_PRE(cond)
static void addb2_service_stop(struct m0_reqh_service *service)
Definition: service.c:88
struct m0_fop * fo_fop
Definition: fom.h:490
struct m0_reqh_service ase_service
Definition: service.c:51
#define NULL
Definition: misc.h:38
static struct buffer * cur(struct m0_addb2_mach *mach, m0_bcount_t space)
Definition: addb2.c:791
static int addb2_service_start(struct m0_reqh_service *service)
Definition: service.c:82
Definition: sm.h:350
int(* fo_tick)(struct m0_fom *fom)
Definition: fom.h:663
#define M0_LOG(level,...)
Definition: trace.h:167
static int addb2_fom_create(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: service.c:120
int(* fto_create)(struct m0_fop *fop, struct m0_fom **out, struct m0_reqh *reqh)
Definition: fom.h:650
M0_INTERNAL const struct m0_sm_conf m0_addb2__sm_conf
Definition: service.c:228
int m0_addb2_cursor_next(struct m0_addb2_cursor *cur)
Definition: addb2.c:704
void m0_addb2_cursor_fini(struct m0_addb2_cursor *cur)
Definition: addb2.c:699
static int addb2_service_type_allocate(struct m0_reqh_service **service, const struct m0_reqh_service_type *stype)
Definition: service.c:104
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
#define M0_BITS(...)
Definition: misc.h:236
static int addb2_fom_tick(struct m0_fom *fom0)
Definition: service.c:141
static void addb2_fom_fini(struct m0_fom *fom0)
Definition: service.c:182
int m0_reqh_service_type_register(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:473
static struct foo * obj
Definition: tlist.c:302
m0_fom_phase
Definition: fom.h:372
struct m0_fom_type ft_fom_type
Definition: fop.h:232
struct m0_addb2_source ase_src
Definition: service.c:52
return M0_RC(rc)
M0_INTERNAL int m0_addb2_service_module_init(void)
Definition: service.c:72
void m0_fom_init(struct m0_fom *fom, const struct m0_fom_type *fom_type, const struct m0_fom_ops *ops, struct m0_fop *fop, struct m0_fop *reply, struct m0_reqh *reqh)
Definition: fom.c:1372
struct m0_fop_type * f_type
Definition: fop.h:81
return M0_ERR(-EOPNOTSUPP)
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
static const struct socktype stype[]
Definition: sock.c:1156
void m0_fom_fini(struct m0_fom *fom)
Definition: fom.c:1324
const char * scf_name
Definition: sm.h:352
static struct m0_sm_state_descr addb2_fom_phases[]
Definition: service.c:212
const char * rst_name
Definition: reqh_service.h:447
struct m0_addb2_cursor a2_cur
Definition: service.c:57
void m0_addb2_consume(struct m0_addb2_source *src, const struct m0_addb2_record *rec)
Definition: consumer.c:143
int m0_addb2_sys_submit(struct m0_addb2_sys *sys, struct m0_addb2_trace_obj *obj)
Definition: sys.c:392
static const struct m0_fom_ops addb2_fom_ops
Definition: service.c:70
static void addb2_done(struct m0_addb2_trace_obj *obj)
Definition: service.c:190
Definition: reqh.h:94
Definition: dump.c:103
void m0_addb2_cursor_init(struct m0_addb2_cursor *cur, const struct m0_addb2_trace *trace)
Definition: addb2.c:690
M0_INTERNAL struct m0_reqh_service_type m0_addb2_service_type
Definition: service.c:245
static size_t addb2_fom_home_locality(const struct m0_fom *fom)
Definition: service.c:196
int m0_reqh_service_async_start_simple(struct m0_reqh_service_start_async_ctx *asc)
Definition: reqh_service.c:601
Definition: fom.h:481
struct m0_fop_data f_data
Definition: fop.h:82
struct m0_reqh reqh
Definition: rm_foms.c:48
const char * sd_name
Definition: sm.h:383
int(* rsto_service_allocate)(struct m0_reqh_service **service, const struct m0_reqh_service_type *stype)
Definition: reqh_service.h:435
M0_INTERNAL const struct m0_fom_type_ops m0_addb2__fom_type_ops
Definition: service.c:208
const struct m0_reqh_service_type * rs_type
Definition: reqh_service.h:227
struct m0_reqh_service * fo_service
Definition: fom.h:505
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
M0_INTERNAL struct m0_fom_domain * m0_fom_dom(void)
Definition: locality.c:575
static struct m0_net_test_service svc
Definition: service.c:34
M0_INTERNAL void m0_addb2_service_module_fini(void)
Definition: service.c:77
struct m0_fom a2_fom
Definition: service.c:56
static struct m0_fop * fop
Definition: item.c:57
static const struct m0_reqh_service_ops addb2_service_ops
Definition: service.c:68
struct m0_addb2_sys * fd_addb2_sys
Definition: fom.h:338
void m0_addb2_source_fini(struct m0_addb2_source *src)
Definition: consumer.c:60
#define out(...)
Definition: gen.c:41
static const struct m0_reqh_service_type_ops addb2_service_type_ops
Definition: service.c:69
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
void m0_free(void *data)
Definition: memory.c:146
void m0_addb2_source_init(struct m0_addb2_source *src)
Definition: consumer.c:55
void m0_reqh_service_type_unregister(struct m0_reqh_service_type *rstype)
Definition: reqh_service.c:490
static struct m0_reqh_service * service[REQH_IN_UT_MAX]
Definition: long_lock_ut.c:46
struct m0_pdclust_src_addr src
Definition: fd.c:108
static void addb2_service_fini(struct m0_reqh_service *service)
Definition: service.c:95
int(* rso_start_async)(struct m0_reqh_service_start_async_ctx *asc)
Definition: reqh_service.h:341
#define ARRAY_SIZE(a)
Definition: misc.h:45
uint64_t seq
Definition: common.c:97
Definition: fop.h:79
#define M0_IMPOSSIBLE(fmt,...)