Motr  M0
next_merge.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2016-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 
30 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_DIX
31 #include "lib/trace.h"
32 
33 #include "lib/arith.h" /* min64 */
34 #include "lib/memory.h" /* M0_ALLOC_ARR */
35 #include "pool/pool.h" /* m0_pool_version */
36 #include "lib/errno.h"
37 #include "dix/client.h"
38 #include "dix/req.h"
39 #include "lib/finject.h"
40 
/*
 * Internal return codes of sc_rep_get():
 *  - NOENT:              there is no record to take from the context for the
 *                        key being processed;
 *  - PROCESSING_IS_DONE: all replies of the context have been consumed.
 */
#define NOENT              (-ENOENT)
#define PROCESSING_IS_DONE (-ENOKEY)
43 
/*
 * Forward declaration: sc_result_add() is defined at the end of the file but
 * is called from m0_dix_next_result_prepare().
 */
static void sc_result_add(struct m0_dix_next_sort_ctx  *key_ctx,
			  uint32_t                      cidx,
			  struct m0_dix_next_resultset *rs,
			  uint32_t                      key_id,
			  struct m0_cas_next_reply     *rep);
49 
50 static int sc_rep_cmp(const struct m0_cas_next_reply *a,
51  const struct m0_cas_next_reply *b)
52 {
53  if (a == NULL && b == NULL)
54  return 0;
55  if (a == NULL)
56  return -1;
57  if (b == NULL)
58  return 1;
59  return memcmp(a->cnp_key.b_addr, b->cnp_key.b_addr,
60  min64(a->cnp_key.b_nob, b->cnp_key.b_nob)) ?:
62 }
63 
/*
 * Tells whether reply a sorts strictly before reply b according to
 * sc_rep_cmp(). Despite the "le" suffix, equal replies yield false.
 */
static bool sc_rep_le(const struct m0_cas_next_reply *a,
		      const struct m0_cas_next_reply *b)
{
	int order = sc_rep_cmp(a, b);

	return order < 0;
}
69 
/*
 * Tells whether replies a and b compare equal according to sc_rep_cmp().
 * Two NULL replies are considered equal.
 */
static bool sc_rep_eq(const struct m0_cas_next_reply *a,
		      const struct m0_cas_next_reply *b)
{
	int order = sc_rep_cmp(a, b);

	return order == 0;
}
75 
76 static void sc_next(struct m0_dix_next_sort_ctx *ctx)
77 {
78  ctx->sc_pos++;
79  if (ctx->sc_pos >= ctx->sc_reps_nr)
80  ctx->sc_done = true;
81 }
82 
83 static int sc_rep_get(struct m0_dix_next_sort_ctx *ctx,
84  struct m0_cas_next_reply **rep)
85 {
86  *rep = NULL;
87  if (ctx->sc_reps_nr != 0 &&
88  (ctx->sc_done || ctx->sc_pos >= ctx->sc_reps_nr))
89  return PROCESSING_IS_DONE;
90  if (ctx->sc_stop || ctx->sc_reps_nr == 0)
91  return NOENT;
92  *rep = &ctx->sc_reps[ctx->sc_pos];
93  if ((*rep)->cnp_rc == NOENT) {
94  ctx->sc_stop = true;
95  return NOENT;
96  }
97  return 0;
98 }
99 
105  uint32_t key_idx,
106  const uint32_t *recs_nr)
107 {
108  uint32_t pos = 0;
109  uint32_t start_pos = 0;
110  uint32_t i;
111  struct m0_cas_next_reply *rep;
112 
113  if (ctx->sc_reps_nr == 0) {
114  ctx->sc_stop = true;
115  return 0;
116  }
117 
118  for (i = 0; i < key_idx && pos < ctx->sc_reps_nr; i++) {
119  rep = &ctx->sc_reps[pos];
120  start_pos += recs_nr[i];
122  if (rep->cnp_rc == NOENT) {
123  pos++;
124  continue;
125  }
126  for (; pos < start_pos && pos < ctx->sc_reps_nr; pos++);
127  }
128  ctx->sc_pos = pos;
129  ctx->sc_stop = false;
130  return ctx->sc_pos;
131 }
132 
147 static bool sc_min_val_get(struct m0_dix_next_sort_ctx_arr *ctxarr,
148  struct m0_cas_next_reply **rep,
149  struct m0_dix_next_sort_ctx **ret_ctx,
150  uint32_t *ret_idx)
151 {
152  uint32_t ctx_id;
153  uint32_t done_cnt = 0;
154  uint32_t nokey_cnt = 0;
155  struct m0_dix_next_sort_ctx *ctx;
156  struct m0_dix_next_sort_ctx *ctx_arr;
157  struct m0_cas_next_reply *min = NULL;
158  struct m0_cas_next_reply *val;
159  int rc;
160 
161  *ret_ctx = NULL;
162  ctx_arr = ctxarr->sca_ctx;
163  /* Find minimal value in sort contexts. */
164  for (ctx_id = 0; ctx_id < ctxarr->sca_nr; ctx_id++) {
165  ctx = &ctx_arr[ctx_id];
166  rc = sc_rep_get(ctx, &val);
167  if (rc == NOENT) {
168  nokey_cnt++;
169  continue;
170  }
171  if (rc == PROCESSING_IS_DONE) {
172  done_cnt++;
173  continue;
174  }
175  if (sc_rep_eq(NULL, min) || sc_rep_le(val, min)) {
176  min = val;
177  *ret_ctx = ctx;
178  *ret_idx = ctx->sc_pos;
179  }
180  }
181  *rep = min;
182  if (done_cnt == ctxarr->sca_nr || nokey_cnt == ctxarr->sca_nr)
183  return true;
184 
185  /* Advance positions (if necessary) in all sort contexts. */
186  for (ctx_id = 0; ctx_id < ctxarr->sca_nr; ctx_id++) {
187  ctx = &ctx_arr[ctx_id];
188  if (ctx->sc_stop)
189  continue;
190  sc_rep_get(ctx, &val);
191  if (val == NULL || sc_rep_eq(val, min))
192  sc_next(ctx);
193  }
194  return false;
195 }
196 
198  uint32_t key_idx, uint32_t nr)
199 {
200  M0_ASSERT(rs != NULL);
201  M0_ASSERT(rs->nrs_res != NULL);
202  M0_ASSERT(key_idx < rs->nrs_res_nr);
203 
204  rs->nrs_res[key_idx].drs_nr = nr;
205  M0_ALLOC_ARR(rs->nrs_res[key_idx].drs_reps,
206  rs->nrs_res[key_idx].drs_nr);
207  if (rs->nrs_res[key_idx].drs_reps == NULL)
208  return M0_ERR(-ENOMEM);
209  return 0;
210 }
211 
219 static int dix_data_load(struct m0_dix_req *req,
220  struct m0_dix_next_resultset *rs)
221 {
222  struct m0_dix_cas_rop *cas_rop;
223  struct m0_cas_req *creq;
224  struct m0_dix_rop_ctx *rop = req->dr_rop;
225  struct m0_dix_next_sort_ctx *ctx;
226  uint32_t ctx_id = 0;
227  struct m0_dix_next_sort_ctx *ctxs;
228  uint32_t i;
229 
230  ctxs = rs->nrs_sctx_arr.sca_ctx;
231  m0_tl_for(cas_rop, &rop->dg_cas_reqs, cas_rop) {
232  ctx = &ctxs[ctx_id++];
233  creq = &cas_rop->crp_creq;
234  ctx->sc_creq = creq;
235  ctx->sc_reps_nr = m0_cas_req_nr(creq);
236  if (ctx->sc_reps_nr == 0)
237  continue;
238  M0_ALLOC_ARR(ctx->sc_reps, ctx->sc_reps_nr);
239  if (ctx->sc_reps == NULL) {
240  /* Free already allocated reps. */
241  for (i = 0; i < ctx_id; i++)
242  m0_free(ctxs[i].sc_reps);
243  return M0_ERR(-ENOMEM);
244  }
245  for (i = 0; i < ctx->sc_reps_nr; i++)
246  m0_cas_next_rep(creq, i, &ctx->sc_reps[i]);
247  } m0_tl_endfor;
248  M0_ASSERT(ctx_id == rs->nrs_sctx_arr.sca_nr);
249  return M0_RC(0);
250 }
251 
252 M0_INTERNAL int m0_dix_next_result_prepare(struct m0_dix_req *req)
253 {
254  struct m0_cas_next_reply *rep;
255  struct m0_cas_next_reply *last_rep = NULL;
256  struct m0_dix_next_sort_ctx_arr *ctx_arr;
257  struct m0_dix_next_sort_ctx *ctxs;
258  uint32_t i;
259  uint32_t key_id;
260  uint32_t ctx_id;
261  const uint32_t *recs_nr;
262  uint64_t start_keys_nr;
263  struct m0_dix_next_resultset *rs;
264  uint32_t ctxs_nr;
265  bool done = false;
266  uint32_t rc = 0;
267 
268  recs_nr = req->dr_recs_nr;
269  start_keys_nr = req->dr_items_nr;
270  rs = &req->dr_rs;
271  if (!M0_FI_ENABLED("mock_data_load")) {
272  ctxs_nr = req->dr_rop->dg_cas_reqs_nr;
273  rc = m0_dix_rs_init(rs, start_keys_nr, ctxs_nr);
274  } else
275  ctxs_nr = rs->nrs_sctx_arr.sca_nr;
276  if (rc != 0)
277  goto end;
278  for (i = 0; rc == 0 && i < start_keys_nr; i++)
279  rc = dix_rs_vals_alloc(rs, i, recs_nr[i]);
280  if (rc != 0)
281  goto end;
282  ctx_arr = &rs->nrs_sctx_arr;
283  /*
284  * Initialise all contexts and load all results from cas_rop into sort
285  * contexts.
286  */
287  ctxs = ctx_arr->sca_ctx;
288  if (!M0_FI_ENABLED("mock_data_load"))
289  rc = dix_data_load(req, rs);
290  if (rc != 0)
291  goto end;
292  /* Scan all results and merge-sort values into resultset. */
293  for (key_id = 0; !done && rc == 0 && key_id < start_keys_nr; key_id++) {
294  uint32_t cidx = 0;
295  struct m0_dix_next_sort_ctx *key_ctx = NULL;
296 
297  /* Setup key position for all contexts. */
298  for (ctx_id = 0; ctx_id < ctxs_nr; ctx_id++)
299  sc_key_pos_set(&ctxs[ctx_id], key_id, recs_nr);
300  i = 0;
301  while (rc == 0 && i < recs_nr[key_id]) {
302  if ((done = sc_min_val_get(ctx_arr, &rep, &key_ctx,
303  &cidx)))
304  break;
305  if (rep != NULL &&
306  (i == 0 || !sc_rep_eq(last_rep, rep))) {
307  sc_result_add(key_ctx, cidx, rs, key_id, rep);
308  last_rep = rep;
309  i++;
310  }
311  }
312  }
313  /* Free all creqs. We don't need any data from them. */
314  if (!M0_FI_ENABLED("mock_data_load"))
315  for (ctx_id = 0; ctx_id < ctxs_nr; ctx_id++)
316  m0_cas_req_fini(ctxs[ctx_id].sc_creq);
317  return rc;
318 end:
319  if (!M0_FI_ENABLED("mock_data_load"))
320  m0_dix_rs_fini(rs);
321  return rc;
322 }
323 
324 static int sc_init(struct m0_dix_next_sort_ctx_arr *ctx_arr, uint32_t nr)
325 {
326  ctx_arr->sca_nr = nr;
327  M0_ALLOC_ARR(ctx_arr->sca_ctx, ctx_arr->sca_nr);
328  if (ctx_arr->sca_ctx == NULL)
329  return M0_ERR(-ENOMEM);
330  return 0;
331 }
332 
333 static void sc_fini(struct m0_dix_next_sort_ctx_arr *ctx_arr)
334 {
335  uint32_t i;
336 
337  for (i = 0; i < ctx_arr->sca_nr; i++)
338  m0_free(ctx_arr->sca_ctx[i].sc_reps);
339  m0_free(ctx_arr->sca_ctx);
340 }
341 
342 M0_INTERNAL int m0_dix_rs_init(struct m0_dix_next_resultset *rs,
343  uint32_t start_keys_nr,
344  uint32_t sctx_nr)
345 {
346  int rc;
347 
348  rs->nrs_res_nr = start_keys_nr;
349  M0_ALLOC_ARR(rs->nrs_res, start_keys_nr);
350  if (rs->nrs_res == NULL)
351  return M0_ERR(-ENOMEM);
352  rc = sc_init(&rs->nrs_sctx_arr, sctx_nr);
353  return rc;
354 }
355 
356 M0_INTERNAL void m0_dix_rs_fini(struct m0_dix_next_resultset *rs)
357 {
358  struct m0_dix_next_results *res;
359  int i;
360  int j;
361 
362  M0_ASSERT(rs != NULL);
363  if (rs->nrs_res != NULL) {
364  for (i = 0; i < rs->nrs_res_nr; i++) {
365  /*
366  * Free result keys and values, cause all reps are
367  * mlocked in sc_result_add(). All other keys and values
368  * have been destroyed by m0_cas_req_fini() at the end
369  * of m0_dix_next_result_prepare().
370  */
371  res = &rs->nrs_res[i];
372  if (!M0_FI_ENABLED("mock_data_load"))
373  for (j = 0; j < rs->nrs_res[i].drs_pos; j++) {
374  m0_buf_free(&res->drs_reps[j]->cnp_key);
375  m0_buf_free(&res->drs_reps[j]->cnp_val);
376  }
377  m0_free(res->drs_reps);
378  }
379  m0_free(rs->nrs_res);
380  }
381  sc_fini(&rs->nrs_sctx_arr);
382 }
383 
384 /*
385  * Key_ctx and cidx (cas key index) are required to hold record in memory.
386  * Key/value pair is deallocated in m0_dix_req_fini()->m0_dix_rs_fini().
387  */
388 static void sc_result_add(struct m0_dix_next_sort_ctx *key_ctx,
389  uint32_t cidx,
390  struct m0_dix_next_resultset *rs,
391  uint32_t key_id,
392  struct m0_cas_next_reply *rep)
393 {
394  struct m0_cas_next_reply **reps;
395  struct m0_dix_next_results *res;
396 
397  M0_ASSERT(key_ctx != NULL);
398  M0_ASSERT(rs->nrs_res[key_id].drs_pos < rs->nrs_res[key_id].drs_nr);
399  M0_ASSERT(key_id < rs->nrs_res_nr);
400  /*
401  * Value will be freed at m0_dix_req_fini().
402  * Pointers to keys and vals are stored in next_resultset.
403  * Position sc_pos (returned from sc_min_val_get()) is equal to position
404  * in cas_rep array, that's why we can use cidx for mlock.
405  */
406  if (!M0_FI_ENABLED("mock_data_load"))
407  m0_cas_rep_mlock(key_ctx->sc_creq, cidx);
408  res = &rs->nrs_res[key_id];
409  reps = res->drs_reps;
410  reps[res->drs_pos++] = rep;
411 }
412 
413 #undef M0_TRACE_SUBSYSTEM
414 
417 /*
418  * Local variables:
419  * c-indentation-style: "K&R"
420  * c-basic-offset: 8
421  * tab-width: 8
422  * fill-column: 80
423  * scroll-step: 1
424  * End:
425  */
426 /*
427  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
428  */
M0_INTERNAL void m0_cas_req_fini(struct m0_cas_req *req)
Definition: client.c:288
static bool sc_rep_le(const struct m0_cas_next_reply *a, const struct m0_cas_next_reply *b)
Definition: next_merge.c:64
static int sc_rep_get(struct m0_dix_next_sort_ctx *ctx, struct m0_cas_next_reply **rep)
Definition: next_merge.c:83
struct m0_cas_next_reply * sc_reps
Definition: req.h:135
static size_t nr
Definition: dump.c:1505
static void sc_next(struct m0_dix_next_sort_ctx *ctx)
Definition: next_merge.c:76
static bool sc_min_val_get(struct m0_dix_next_sort_ctx_arr *ctxarr, struct m0_cas_next_reply **rep, struct m0_dix_next_sort_ctx **ret_ctx, uint32_t *ret_idx)
Definition: next_merge.c:147
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
#define NOENT
Definition: next_merge.c:41
#define NULL
Definition: misc.h:38
uint32_t nrs_res_nr
Definition: req.h:167
Definition: idx_mock.c:52
struct m0_dix_next_sort_ctx_arr nrs_sctx_arr
Definition: req.h:168
#define M0_3WAY(v0, v1)
Definition: arith.h:199
static int dix_data_load(struct m0_dix_req *req, struct m0_dix_next_resultset *rs)
Definition: next_merge.c:219
void * b_addr
Definition: buf.h:39
static struct io_request req
Definition: file.c:100
M0_INTERNAL int m0_dix_next_result_prepare(struct m0_dix_req *req)
Definition: next_merge.c:252
struct m0_dix_next_sort_ctx * sca_ctx
Definition: req.h:143
static int sc_key_pos_set(struct m0_dix_next_sort_ctx *ctx, uint32_t key_idx, const uint32_t *recs_nr)
Definition: next_merge.c:104
#define PROCESSING_IS_DONE
Definition: next_merge.c:42
struct m0_fop_getxattr_rep * rep
Definition: dir.c:455
M0_INTERNAL void m0_dix_rs_fini(struct m0_dix_next_resultset *rs)
Definition: next_merge.c:356
#define m0_tl_endfor
Definition: tlist.h:700
struct m0_cas_req crp_creq
Definition: req_internal.h:94
return M0_RC(rc)
int i
Definition: dir.c:1033
static bool sc_rep_eq(const struct m0_cas_next_reply *a, const struct m0_cas_next_reply *b)
Definition: next_merge.c:70
return M0_ERR(-EOPNOTSUPP)
uint32_t drs_nr
Definition: req.h:155
struct m0_cas_next_reply ** drs_reps
Definition: req.h:154
struct m0_cas_req * sc_creq
Definition: req.h:134
m0_bcount_t b_nob
Definition: buf.h:38
#define M0_ASSERT(cond)
M0_INTERNAL void m0_cas_next_rep(const struct m0_cas_req *req, uint32_t idx, struct m0_cas_next_reply *rep)
Definition: client.c:1825
static int sc_rep_cmp(const struct m0_cas_next_reply *a, const struct m0_cas_next_reply *b)
Definition: next_merge.c:50
static void sc_result_add(struct m0_dix_next_sort_ctx *key_ctx, uint32_t cidx, struct m0_dix_next_resultset *rs, uint32_t key_id, struct m0_cas_next_reply *rep)
Definition: next_merge.c:388
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
Definition: buf.c:55
static struct fdmi_ctx ctx
Definition: main.c:80
static int sc_init(struct m0_dix_next_sort_ctx_arr *ctx_arr, uint32_t nr)
Definition: next_merge.c:324
static long long min(long long a, long long b)
Definition: crate.c:191
uint32_t drs_pos
Definition: req.h:156
M0_INTERNAL uint64_t m0_cas_req_nr(const struct m0_cas_req *req)
Definition: client.c:1308
#define M0_FI_ENABLED(tag)
Definition: finject.h:231
M0_INTERNAL int m0_dix_rs_init(struct m0_dix_next_resultset *rs, uint32_t start_keys_nr, uint32_t sctx_nr)
Definition: next_merge.c:342
static int64_t min64(int64_t a, int64_t b)
Definition: arith.h:46
static int dix_rs_vals_alloc(struct m0_dix_next_resultset *rs, uint32_t key_idx, uint32_t nr)
Definition: next_merge.c:197
M0_INTERNAL void m0_cas_rep_mlock(const struct m0_cas_req *req, uint64_t idx)
Definition: client.c:1816
static unsigned done
Definition: storage.c:91
struct m0_tl dg_cas_reqs
Definition: req_internal.h:168
Definition: nucleus.c:42
struct m0_dix_next_results * nrs_res
Definition: req.h:166
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
int32_t rc
Definition: trigger_fop.h:47
static void sc_fini(struct m0_dix_next_sort_ctx_arr *ctx_arr)
Definition: next_merge.c:333
struct m0_buf cnp_key
Definition: client.h:219