Motr  M0
consumer.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2015-2020 Seagate Technology LLC and/or its Affiliates
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * For any questions about this software or licensing,
17  * please email opensource@seagate.com or cortx-questions@seagate.com.
18  *
19  */
20 
21 
22 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_UT
23 
24 #include "lib/trace.h"
25 #include "lib/misc.h" /* ARRAY_SIZE */
26 #include "ut/ut.h"
27 #include "addb2/addb2.h"
28 #include "addb2/consumer.h"
29 #include "addb2/ut/common.h"
30 
31 static int noop_submit(const struct m0_addb2_mach *m,
32  struct m0_addb2_trace *t)
33 {
34  ++submitted;
35  return 0;
36 }
37 
/*
 * File-scope fixtures shared by the tests below.
 *
 * `m` is handed to mach_put() and fill_one(), `s` to m0_addb2_philter_add()
 * and `sen` to m0_addb2_sensor_add(). `p` and `c` are not referenced in the
 * visible part of this listing; presumably source_get()/source_put() set
 * them up and tear them down -- confirm against the full file.
 */
38 static struct m0_addb2_mach *m;
39 static struct m0_addb2_source *s;
40 static struct m0_addb2_philter p;
41 static struct m0_addb2_callback c;
42 static struct m0_addb2_sensor sen;
43 
/*
 * Handler that test_fire() forwards every record to. Each test assigns it
 * (to &cmp_fire in the visible tests) before emitting records, and
 * source_put() resets it to NULL.
 */
44 static void (*fire)(const struct m0_addb2_source *,
45  const struct m0_addb2_philter *,
46  const struct m0_addb2_callback *,
47  const struct m0_addb2_record *) = NULL;
48 
/**
 * Callback trampoline: hands the record, together with its source, philter
 * and callback, to whichever handler the running test stored in `fire`.
 *
 * `fire` must be non-NULL when this is invoked.
 */
static void test_fire(const struct m0_addb2_source *s,
		      const struct m0_addb2_philter *p,
		      const struct m0_addb2_callback *c,
		      const struct m0_addb2_record *r)
{
	fire(s, p, c, r);
}
56 
/*
 * Set-up half of the source_get()/source_put() pair that the tests call
 * around the records they emit.
 *
 * NOTE(review): this listing shows no statements in the body -- the embedded
 * numbering jumps from 58 to 65, so listing lines 59-64 are not visible here.
 * Consult the original file before relying on, or editing, this function.
 */
57 static void source_get(void)
58 {
65 }
66 
/*
 * Tear-down half of the source_get()/source_put() pair: the visible
 * statements call mach_put(m) and reset `fire` to NULL.
 *
 * NOTE(review): listing lines 69-72 are not visible here (the embedded
 * numbering jumps from 68 to 73); further tear-down steps presumably precede
 * mach_put() -- confirm against the original file.
 */
67 static void source_put(void)
68 {
73  mach_put(m);
74  fire = NULL;
75 }
76 
/*
 * Description of the records a test expects to observe; `shouldbe` points at
 * the expectation of the currently running test, or is NULL when the test
 * only counts records.
 */
77 static struct stream {
    /* Set by the tests to match the number of entries placed in `rec`. */
78  unsigned nr;
    /* Incremented by cmp_fire() for every record while `shouldbe` is set. */
79  unsigned seen;
    /* Expected records. */
80  struct small_record *rec;
81 } *shouldbe;
82 
/* Number of cmp_fire() invocations; tests reset it and assert on it. */
83 static uint64_t fired;
84 
/*
 * Record handler installed through `fire` by the tests.
 *
 * When an expectation is installed in `shouldbe`, its `seen` counter is
 * incremented; `fired` is incremented unconditionally.
 *
 * NOTE(review): listing lines 91-92 (inside the `if`, before the increment)
 * are not visible here; presumably they compare `r` against the expected
 * record -- confirm against the original file.
 */
85 static void cmp_fire(const struct m0_addb2_source *s,
86  const struct m0_addb2_philter *p,
87  const struct m0_addb2_callback *c,
88  const struct m0_addb2_record *r)
89 {
90  if (shouldbe != NULL) {
93  ++ shouldbe->seen;
94  }
95  ++ fired;
96 }
97 
101 static void empty(void)
102 {
103  fired = 0;
104  fire = &cmp_fire;
105  source_get();
106  source_put();
107  M0_UT_ASSERT(fired == 0);
108 }
109 
/*
 * Base identifier from which the tests derive label and measurement ids
 * (LABEL_ID, LABEL_ID + 1, ...).
 *
 * NOTE(review): the value does not fit in an int. ISO C before C23 requires
 * enumerators to be representable as int, so this relies on a compiler
 * extension -- confirm that is acceptable for all supported compilers.
 */
110 enum {
111  LABEL_ID = 0x7fff5aa4a947 /* A most important constant. */
112 };
113 
/* Backing storage whose elements empty_push_pop() passes as label payload. */
114 static uint64_t payload[1024];
115 
/*
 * "empty-push-pop" test: pushes M0_ADDB2_LABEL_MAX / 2 labels without adding
 * any measurement and checks that cmp_fire() was never invoked.
 *
 * NOTE(review): listing lines 129-130 and 133 are not visible here. Line 133
 * is the body of the `while` loop below (presumably the matching
 * m0_addb2_pop() -- confirm against the original file); as displayed,
 * source_put() would be parsed as the loop body, which is an artefact of the
 * elision and not the real control flow.
 */
120 static void empty_push_pop(void)
121 {
122  int i;
123 
124  fired = 0;
125  fire = &cmp_fire;
126  source_get();
    /* Label i carries i/8 payload words taken from payload + i. */
127  for (i = 0; i < M0_ADDB2_LABEL_MAX / 2; ++i) {
128  m0_addb2_push(LABEL_ID + i, i/8, payload + i);
131  }
    /* Walk back over the pushed labels in reverse order. */
132  while (--i >= 0)
134  source_put();
135  M0_UT_ASSERT(fired == 0);
136 }
137 
/*
 * "data" test: emits a single measurement with no labels pushed and installs
 * an expectation of exactly one record with that value and zero labels.
 *
 * NOTE(review): `shouldbe` is pointed at a compound literal that has
 * automatic storage duration, so the pointer dangles once this function
 * returns; any later user of cmp_fire() has to reassign `shouldbe` first.
 *
 * NOTE(review): listing line 158 (after source_put()) is not visible here;
 * presumably it checks the expectation -- confirm against the original file.
 */
141 static void data(void)
142 {
143  fire = &cmp_fire;
144  shouldbe = &(struct stream) {
145  .nr = 1,
146  .seen = 0,
147  .rec = (struct small_record[]) {
148  [0] = {
149  .ar_val = VAL(LABEL_ID, 2, 3, 5, 7, 11),
150  .ar_label_nr = 0
151  }
152  }
153  };
154 
155  source_get();
156  M0_ADDB2_ADD(LABEL_ID, 2, 3, 5, 7, 11);
157  source_put();
159 }
160 
/*
 * "data-label" test: emits two measurements under different label stacks and
 * installs an expectation describing both records.
 *
 * The first measurement is added while only label LABEL_ID + 1 is pushed.
 * Label LABEL_ID + 2 is pushed and popped again before the second
 * measurement, which is added under labels LABEL_ID + 1 and LABEL_ID + 3;
 * the expectation accordingly lists one label for record [0] and two for
 * record [1].
 *
 * NOTE(review): as in data(), `shouldbe` refers to a compound literal with
 * automatic storage duration and dangles after this function returns.
 *
 * NOTE(review): listing line 201 (after source_put()) is not visible here;
 * presumably it checks the expectation -- confirm against the original file.
 */
165 static void data_label(void)
166 {
167  fire = &cmp_fire;
168  shouldbe = &(struct stream) {
169  .nr = 2,
170  .seen = 0,
171  .rec = (struct small_record[]) {
172  [0] = {
173  .ar_val = VAL(LABEL_ID, 2, 3, 5, 7, 11),
174  .ar_label_nr = 1,
175  .ar_label = {
176  [0] = VAL(LABEL_ID + 1, 1, 1, 2, 3, 5)
177  }
178  },
179  [1] = {
180  .ar_val = VAL(LABEL_ID + 4, 1, 6, 21, 107),
181  .ar_label_nr = 2,
182  .ar_label = {
183  [0] = VAL(LABEL_ID + 1, 1, 1, 2, 3, 5),
184  [1] = VAL(LABEL_ID + 3, 1, 2, 4, 8, 16,
185  32, 64, 128, 256, 512)
186  }
187  }
188  }
189  };
190 
191  source_get();
192  M0_ADDB2_PUSH(LABEL_ID + 1, 1, 1, 2, 3, 5);
193  M0_ADDB2_ADD(LABEL_ID, 2, 3, 5, 7, 11);
    /* Pushed and popped before the next measurement, so it labels nothing. */
194  M0_ADDB2_PUSH(LABEL_ID + 2, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100);
195  m0_addb2_pop(LABEL_ID + 2);
196  M0_ADDB2_PUSH(LABEL_ID + 3, 1, 2, 4, 8, 16, 32, 64, 128, 256, 512);
197  M0_ADDB2_ADD(LABEL_ID + 4, 1, 6, 21, 107); /* A060843 */
198  m0_addb2_pop(LABEL_ID + 3);
199  m0_addb2_pop(LABEL_ID + 1);
200  source_put();
202 }
203 
/*
 * Debugging helpers, compiled out by the `#if 0` below. print_fire() has the
 * same signature as cmp_fire() and prints each record instead of checking it.
 */
204 #if 0
205 #include <stdio.h>
206 
/*
 * Prints the first `x` characters of `ruler` as indentation.
 * NOTE(review): the `ruler` literal looks whitespace-collapsed in this
 * listing; take its real contents from the original file.
 */
207 static void indent(unsigned x)
208 {
209  static const char ruler[] = " "
210  " ";
211  printf("%*.*s", x, x, ruler);
212 }
213 
/*
 * Prints one value on a line, indented by `offset`: its id followed by its
 * va_nr data words, all in hexadecimal.
 */
214 static void val_print(const struct m0_addb2_value *v, int offset)
215 {
216  int i;
217 
218  indent(offset);
219  printf("[%8.8" PRIx64 " |", v->va_id);
220  for (i = 0; i < v->va_nr; ++i)
221  printf(" %8.8"PRIx64, v->va_data[i]);
222  printf("]\n");
223 }
224 
/*
 * Prints the record's value, then each of its labels indented by 8, and
 * counts the record in `fired`.
 */
225 static void print_fire(const struct m0_addb2_source *s,
226  const struct m0_addb2_philter *p,
227  const struct m0_addb2_callback *c,
228  const struct m0_addb2_record *r)
229 {
230  int i;
231 
232  val_print(&r->ar_val, 0);
233  for (i = 0; i < r->ar_label_nr; ++i)
234  val_print(&r->ar_label[i], 8);
235  ++ fired;
236 }
237 #endif
238 
/*
 * "sensor" test: registers a sensor under two pushed labels, adds a single
 * measurement, and checks that cmp_fire() ran exactly `seq + 1` times.
 *
 * `seq`, `sensor_finalised` and `sensor_ops` are defined outside this file;
 * the test resets the first two and zeroes `sen` before use.
 *
 * NOTE(review): listing line 261 (after the visible assertion) is not
 * visible here -- confirm against the original file.
 */
243 static void sensor(void)
244 {
245  fire = &cmp_fire;
    /* No per-record expectation: cmp_fire() only counts. */
246  shouldbe = NULL;
247  fired = 0;
248  seq = 0;
249  sensor_finalised = false;
250  M0_SET0(&sen);
251 
252  source_get();
253  M0_ADDB2_PUSH(LABEL_ID + 0, 1, 1, 2, 3, 5);
254  M0_ADDB2_PUSH(LABEL_ID + 1, 1, 6, 21, 107);
255  m0_addb2_sensor_add(&sen, LABEL_ID + 2, 2, -1, &sensor_ops);
256  M0_ADDB2_ADD(LABEL_ID + 4, 1, 6, 21, 107);
257  m0_addb2_pop(LABEL_ID + 1);
258  m0_addb2_pop(LABEL_ID + 0);
259  source_put();
260  M0_UT_ASSERT(seq + 1 == fired);
262 }
263 
/*
 * "sensor-N" test: like sensor(), but additionally calls fill_one(m) between
 * two explicit measurements. `issued` accumulates one per M0_ADDB2_ADD()
 * plus the value returned by fill_one().
 *
 * NOTE(review): listing lines 291-292 (after source_put()) are not visible
 * here; the checks that consume `issued` presumably live there -- confirm
 * against the original file.
 */
268 static void sensor_N(void)
269 {
270  int issued = 0;
271 
272  fire = &cmp_fire;
273  shouldbe = NULL;
274  fired = 0;
275  seq = 0;
276  sensor_finalised = false;
277  M0_SET0(&sen);
278 
279  source_get();
280  M0_ADDB2_PUSH(LABEL_ID + 0, 1, 1, 2, 3, 5);
281  M0_ADDB2_PUSH(LABEL_ID + 1, 1, 6, 21, 107);
282  m0_addb2_sensor_add(&sen, LABEL_ID + 2, 2, -1, &sensor_ops);
283  M0_ADDB2_ADD(LABEL_ID + 4, 1, 6, 21, 107);
284  ++ issued;
285  issued += fill_one(m);
286  M0_ADDB2_ADD(LABEL_ID + 5, 127, 0, 0, 1);
287  ++ issued;
288  m0_addb2_pop(LABEL_ID + 1);
289  m0_addb2_pop(LABEL_ID + 0);
290  source_put();
293 }
294 
295 static void sensor_check_fire(const struct m0_addb2_source *s,
296  const struct m0_addb2_philter *p,
297  const struct m0_addb2_callback *c,
298  const struct m0_addb2_record *r)
299 {
300  M0_UT_ASSERT(receq(r, &(struct small_record) {
301  .ar_val = VAL(LABEL_ID + 2, SENSOR_MARKER, seq),
302  .ar_label_nr = 2,
303  .ar_label = {
304  [0] = VAL(LABEL_ID + 0, 1, 1, 2, 3, 5),
305  [1] = VAL(LABEL_ID + 1, 1, 6, 21, 107)
306  }
307  }));
308  ++ *(int *)c->ca_datum;
309 }
310 
/*
 * "id-philter" test: attaches an extra philter (`idph`) with the
 * sensor_check_fire() callback to the source, registers a sensor under two
 * labels, calls fill_one(m) four times, and finally checks that the callback
 * consumed exactly `seq` records.
 *
 * NOTE(review): listing lines 331 and 353 are not visible here. `idph` is
 * used by m0_addb2_callback_add() without a visible initialisation, so line
 * 331 presumably initialises it -- confirm against the original file.
 */
316 static void id_philter(void)
317 {
318  struct m0_addb2_philter idph;
319  struct m0_addb2_callback sensor_check;
    /* Incremented by sensor_check_fire() through the callback datum. */
320  int sensor_consumed = 0;
321 
322  fire = &cmp_fire;
323  shouldbe = NULL;
324  fired = 0;
325  seq = 0;
326  sensor_finalised = false;
327  M0_SET0(&sen);
328 
329  source_get();
330 
332  m0_addb2_callback_init(&sensor_check,
333  &sensor_check_fire, &sensor_consumed);
334  m0_addb2_callback_add(&idph, &sensor_check);
335  m0_addb2_philter_add(s, &idph);
336 
337  M0_ADDB2_PUSH(LABEL_ID + 0, 1, 1, 2, 3, 5);
338  M0_ADDB2_PUSH(LABEL_ID + 1, 1, 6, 21, 107);
339  m0_addb2_sensor_add(&sen, LABEL_ID + 2, 2, -1, &sensor_ops);
340  fill_one(m);
341  fill_one(m);
342  fill_one(m);
343  fill_one(m);
344  m0_addb2_pop(LABEL_ID + 1);
345  m0_addb2_pop(LABEL_ID + 0);
346 
    /* Detach and finalise the extra philter before the fixture goes away. */
347  m0_addb2_philter_del(&idph);
348  m0_addb2_callback_del(&sensor_check);
349  m0_addb2_callback_fini(&sensor_check);
350  m0_addb2_philter_fini(&idph);
351  source_put();
352  M0_UT_ASSERT(sensor_consumed == seq);
354 }
355 
/*
 * "global-philter" test: adds the same measurement three times and expects
 * cmp_fire() to have run once, then not at all, then twice.
 *
 * NOTE(review): most of this function is not visible in the listing (lines
 * 364-367, 370-373, 377-381 and 384-388 are elided). The philter
 * registration and removal steps that explain the changing expectations
 * presumably live in those lines -- confirm against the original file.
 * `shouldbe` is not assigned in the visible lines, so cmp_fire() uses
 * whatever value an earlier test left behind.
 */
360 static void global_philter(void)
361 {
362  fire = &cmp_fire;
363  fired = 0;
368  M0_ADDB2_ADD(LABEL_ID, 2, 3, 5, 7, 11);
369  M0_UT_ASSERT(fired == 1);
374  fired = 0;
375  M0_ADDB2_ADD(LABEL_ID, 2, 3, 5, 7, 11);
376  M0_UT_ASSERT(fired == 0);
382  M0_ADDB2_ADD(LABEL_ID, 2, 3, 5, 7, 11);
383  M0_UT_ASSERT(fired == 2);
389 }
390 
/*
 * Initialiser of the "addb2-consumer" unit-test suite: no suite-level
 * init/fini hooks, and a NULL-terminated table mapping test names to the
 * test functions defined above.
 *
 * NOTE(review): the opening line of this definition (listing line 391) is
 * not visible here; per the generated index it declares
 * `struct m0_ut_suite addb2_consumer_ut`.
 */
392  .ts_name = "addb2-consumer",
393  .ts_init = NULL,
394  .ts_fini = NULL,
395  .ts_tests = {
396  { "empty", &empty },
397  { "empty-push-pop", &empty_push_pop },
398  { "data", &data },
399  { "data-label", &data_label },
400  { "sensor", &sensor },
401  { "sensor-N", &sensor_N },
402  { "id-philter", &id_philter },
403  { "global-philter", &global_philter },
404  { NULL, NULL }
405  }
406 };
407 
408 #undef M0_TRACE_SUBSYSTEM
409 
410 /*
411  * Local variables:
412  * c-indentation-style: "K&R"
413  * c-basic-offset: 8
414  * tab-width: 8
415  * fill-column: 80
416  * scroll-step: 1
417  * End:
418  */
static void id_philter(void)
Definition: consumer.c:316
static struct m0_addb2_philter p
Definition: consumer.c:40
static void sensor(void)
Definition: consumer.c:243
void m0_addb2_callback_fini(struct m0_addb2_callback *callback)
Definition: consumer.c:109
#define NULL
Definition: misc.h:38
static struct m0_addb2_mach * m
Definition: consumer.c:38
static int noop_submit(const struct m0_addb2_mach *m, struct m0_addb2_trace *t)
Definition: consumer.c:31
struct m0_ut_suite addb2_consumer_ut
Definition: consumer.c:391
static bool x
Definition: sm.c:168
unsigned va_nr
Definition: consumer.h:105
#define M0_ADDB2_PUSH(id,...)
Definition: addb2.h:261
static void test_fire(const struct m0_addb2_source *s, const struct m0_addb2_philter *p, const struct m0_addb2_callback *c, const struct m0_addb2_record *r)
Definition: consumer.c:49
bool sensor_finalised
Definition: common.c:98
uint64_t va_id
Definition: consumer.h:103
void m0_addb2_philter_global_del(struct m0_addb2_philter *ph)
Definition: consumer.c:196
static uint64_t fired
Definition: consumer.c:83
#define M0_SET0(obj)
Definition: misc.h:64
M0_ADDB2_ADD(M0_AVI_FS_CREATE, new_fid.f_container, new_fid.f_key, mode, rc)
Definition: ut.h:77
#define PRIx64
Definition: types.h:61
int submitted
Definition: common.c:30
static void data(void)
Definition: consumer.c:141
unsigned seen
Definition: consumer.c:79
static void empty_push_pop(void)
Definition: consumer.c:120
void mach_put(struct m0_addb2_mach *m)
Definition: common.c:89
int i
Definition: dir.c:1033
static void sensor_check_fire(const struct m0_addb2_source *s, const struct m0_addb2_philter *p, const struct m0_addb2_callback *c, const struct m0_addb2_record *r)
Definition: consumer.c:295
void m0_addb2_callback_add(struct m0_addb2_philter *ph, struct m0_addb2_callback *callback)
Definition: consumer.c:114
#define VAL(id,...)
Definition: common.h:62
void m0_addb2_push(uint64_t id, int n, const uint64_t *value)
Definition: addb2.c:412
static void global_philter(void)
Definition: consumer.c:360
const uint64_t SENSOR_MARKER
Definition: common.c:96
void m0_addb2_philter_id_init(struct m0_addb2_philter *ph, uint64_t id)
Definition: consumer.c:165
void m0_addb2_philter_true_init(struct m0_addb2_philter *ph)
Definition: consumer.c:160
static struct m0_addb2_callback c
Definition: consumer.c:41
static struct m0_thread t[8]
Definition: service_ut.c:1230
static struct m0_addb2_sensor sen
Definition: consumer.c:42
void m0_addb2_sensor_add(struct m0_addb2_sensor *s, uint64_t id, unsigned nr, int idx, const struct m0_addb2_sensor_ops *ops)
Definition: addb2.c:480
static void source_get(void)
Definition: consumer.c:57
bool receq(const struct m0_addb2_record *r0, const struct small_record *r1)
Definition: common.c:134
static m0_bindex_t offset
Definition: dump.c:173
void m0_addb2_philter_global_add(struct m0_addb2_philter *ph)
Definition: consumer.c:182
void m0_addb2_callback_del(struct m0_addb2_callback *callback)
Definition: consumer.c:120
void m0_addb2_philter_fini(struct m0_addb2_philter *philter)
Definition: consumer.c:78
struct m0_addb2_mach * mach_set(int(*s)(const struct m0_addb2_mach *, struct m0_addb2_trace *))
Definition: common.c:65
void m0_addb2_pop(uint64_t id)
Definition: addb2.c:440
static void source_put(void)
Definition: consumer.c:67
static void indent(int depth)
Definition: gen.c:43
const char * ts_name
Definition: ut.h:99
static uint64_t payload[1024]
Definition: consumer.c:114
const uint64_t * va_data
Definition: consumer.h:106
void m0_addb2_philter_add(struct m0_addb2_source *src, struct m0_addb2_philter *ph)
Definition: consumer.c:86
static int r[NR]
Definition: thread.c:46
static void sensor_N(void)
Definition: consumer.c:268
static const struct m0_addb2_sensor_ops sensor_ops
Definition: counter.c:41
static void data_label(void)
Definition: consumer.c:165
static struct stream * shouldbe
void m0_addb2_callback_init(struct m0_addb2_callback *callback, void(*fire)(const struct m0_addb2_source *, const struct m0_addb2_philter *, const struct m0_addb2_callback *, const struct m0_addb2_record *), void *datum)
Definition: consumer.c:97
unsigned nr
Definition: consumer.c:78
static void cmp_fire(const struct m0_addb2_source *s, const struct m0_addb2_philter *p, const struct m0_addb2_callback *c, const struct m0_addb2_record *r)
Definition: consumer.c:85
void m0_addb2_philter_del(struct m0_addb2_philter *ph)
Definition: consumer.c:92
int fill_one(struct m0_addb2_mach *m)
Definition: common.c:117
static unsigned issued
Definition: storage.c:262
static struct m0_addb2_source * s
Definition: consumer.c:39
struct m0_addb2_source * m0_addb2_mach_source(struct m0_addb2_mach *m)
Definition: addb2.c:678
#define M0_UT_ASSERT(a)
Definition: ut.h:46
static void(* fire)(const struct m0_addb2_source *, const struct m0_addb2_philter *, const struct m0_addb2_callback *, const struct m0_addb2_record *)
Definition: consumer.c:44
uint64_t seq
Definition: common.c:97
static void empty(void)
Definition: consumer.c:101
struct small_record * rec
Definition: consumer.c:80