Motr  M0
fdmi_sample_plugin.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2021 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_FDMI
23 #include "lib/trace.h"
24 
25 #include "motr/client.h"
26 #include "fid/fid.h"
27 #include "motr/client_internal.h"
28 #include "lib/getopts.h" /* M0_GETOPTS */
29 #include "fdmi/fdmi.h"
30 #include "fdmi/plugin_dock.h"
31 #include "fdmi/service.h"
32 #include "reqh/reqh.h"
33 
38 struct m0_fsp_params {
46 
55 };
56 
68 
78 static struct m0_semaphore fsp_sem;
79 volatile static int terminated = 0;
86 static struct m0_config fsp_conf = {};
87 
92 static struct m0_client *fsp_client = NULL;
93 
94 /* DIX service (client interface for kv storage) config. */
95 static struct m0_idx_dix_config fsp_dix_conf = {};
96 
101 const struct m0_fdmi_pd_ops *fsp_pdo;
102 
109 
110 static void dump_fol_rec_to_json(struct m0_uint128 *rec_id,
111  struct m0_fol_rec *rec)
112 {
113  struct m0_fol_frag *frag;
114  int i, j;
115 
116  m0_tl_for(m0_rec_frag, &rec->fr_frags, frag) {
117  struct m0_fop_fol_frag *fp_frag = frag->rp_data;
118  struct m0_cas_op *cas_op = fp_frag->ffrp_fop;
119  struct m0_cas_recv cg_rec = cas_op->cg_rec;
120  struct m0_cas_rec *cr_rec = cg_rec.cr_rec;
121 
122  for (i = 0; i < cg_rec.cr_nr; i++) {
123  const unsigned char *addr;
124  int len;
125 
126  printf("{ \"opcode\": \"%d\", ",
127  fp_frag->ffrp_fop_code);
128 
129  printf("\"rec_id\": \""U128X_F"\", ",
130  U128_P(rec_id));
131 
132  printf("\"fid\": \""FID_F"\", ",
133  FID_P(&cas_op->cg_id.ci_fid));
134 
135  len = cr_rec[i].cr_key.u.ab_buf.b_nob;
136  addr = cr_rec[i].cr_key.u.ab_buf.b_addr;
138  printf("\"cr_key\": \"%.*s\", ", len, addr);
139  } else {
140  printf("\"cr_key\": \"");
141  for (j = 0; j < len; j++)
142  printf("%02x", addr[j]);
143  printf("\", ");
144  }
145  len = cr_rec[i].cr_val.u.ab_buf.b_nob;
146  addr = cr_rec[i].cr_val.u.ab_buf.b_addr;
147  if (len > 0) {
149  printf("\"cr_val\": \"%.*s\"",
150  len, addr);
151  } else {
152  printf("\"cr_val\": \"");
153  for (j = 0; j < len; j++)
154  printf("%02x", addr[j]);
155  printf("\"");
156  }
157  } else {
158  printf("\"cr_val\": \"0\"");
159  }
160  printf(" }\n");
161  }
162  } m0_tl_endfor;
163 }
164 
165 static void fsp_usage(void)
166 {
167  fprintf(stderr,
168  "Usage: fdmi_sample_plugin "
169  "-l local_addr -h ha_addr -p profile_fid -f process_fid "
170  "-g fdmi_plugin_fid [-s] [-r]\n"
171  "Use -? or -i for more verbose help on common arguments.\n"
172  "Usage example for common arguments: \n"
173  "fdmi_sample_plugin -l 192.168.52.53@tcp:12345:4:1 "
174  "-h 192.168.52.53@tcp:12345:1:1 "
175  "-p 0x7000000000000001:0x37 -f 0x7200000000000001:0x19 "
176  "-g 0x6c00000000000001:0x51"
177  "-s output the key/val as a string. Otherwise as hex."
178  "-r plugin adds an extra reference on the record and then "
179  " release it when it is consumed"
180  "\n");
181 }
182 
187 static int fsp_args_parse(struct m0_fsp_params *params, int argc, char ** argv)
188 {
189  int rc = 0;
190 
191  params->spp_local_addr = NULL;
192  params->spp_hare_addr = NULL;
193  params->spp_profile_fid = NULL;
194  params->spp_process_fid = NULL;
195  params->spp_fdmi_plugin_fid_s = NULL;
196  params->spp_output_strings = false;
197  params->spp_extra_ref = false;
198 
199  rc = M0_GETOPTS("fdmi_sample_plugin", argc, argv,
200  M0_HELPARG('?'),
201  M0_VOIDARG('i', "more verbose help",
202  LAMBDA(void, (void) {
203  fsp_usage();
204  exit(0);
205  })),
206  M0_STRINGARG('l', "Local endpoint address",
207  LAMBDA(void, (const char *string) {
208  params->spp_local_addr = (char*)string;
209  })),
210  M0_STRINGARG('h', "HA address",
211  LAMBDA(void, (const char *str) {
212  params->spp_hare_addr = (char*)str;
213  })),
214  M0_STRINGARG('f', "Process FID",
215  LAMBDA(void, (const char *str) {
216  params->spp_process_fid = (char*)str;
217  })),
218  M0_STRINGARG('p', "Profile options for client",
219  LAMBDA(void, (const char *str) {
220  params->spp_profile_fid = (char*)str;
221  })),
222  M0_STRINGARG('g', "FDMI plugin fid",
223  LAMBDA(void, (const char *str) {
224  params->spp_fdmi_plugin_fid_s =
225  (char*)str;
226  })),
227  M0_VOIDARG('s', "output key/val as string",
228  LAMBDA(void, (void) {
229  params->spp_output_strings = true;
230  })),
231  M0_VOIDARG('r', "adding an extra ref count",
232  LAMBDA(void, (void) {
233  params->spp_extra_ref = true;
234  })));
235  if (rc != 0)
236  return M0_ERR(rc);
237 
238  /* All mandatory params must be defined. */
239  if (params->spp_local_addr == NULL || params->spp_hare_addr == NULL ||
240  params->spp_profile_fid == NULL || params->spp_process_fid == NULL ||
241  params->spp_fdmi_plugin_fid_s == NULL) {
242  fsp_usage();
243  return M0_ERR(-EINVAL);
244  }
245 
246  rc = m0_fid_sscanf(params->spp_fdmi_plugin_fid_s, &params->spp_fdmi_plugin_fid);
247  if (rc != 0) {
248  rc = M0_ERR_INFO(rc, "Invalid FDMI plugin fid format: fid=%s",
249  params->spp_fdmi_plugin_fid_s);
250  }
251 
252  return rc;
253 }
254 
259 static int process_fdmi_record(struct m0_uint128 *rec_id,
260  struct m0_buf fdmi_rec,
261  struct m0_fid filter_id)
262 {
263  struct m0_fdmi_record_reg *rreg;
264  struct m0_fol_rec fol_rec;
265  int rc;
266 
267  m0_fol_rec_init(&fol_rec, NULL);
268  rc = m0_fol_rec_decode(&fol_rec, &fdmi_rec);
269  if (rc != 0)
270  goto out;
271 
273  /*
274  * Adding an extra ref on this record.
275  * It will be released when plugin gets ack from consumer.
276  */
277  rreg = m0_fdmi__pdock_record_reg_find(rec_id);
278  if (rreg != NULL)
279  m0_ref_get(&rreg->frr_ref);
280  }
281 
282  dump_fol_rec_to_json(rec_id, &fol_rec);
283 out:
284  m0_fol_rec_fini(&fol_rec);
285  return rc;
286 }
287 
288 static int fdmi_service_start(struct m0_client *m0c)
289 {
290  struct m0_reqh *reqh = &m0c->m0c_reqh;
291  struct m0_reqh_service_type *stype;
292  bool start_service = false;
293  int rc = 0;
294 
295  stype = m0_reqh_service_type_find("M0_CST_FDMI");
296  if (stype == NULL) {
297  M0_LOG(M0_ERROR, "FDMI service type is not found.");
298  return M0_ERR_INFO(-EINVAL, "Unknown reqh service type: M0_CST_FDMI");
299  }
300 
302  if (fsp_fdmi_service == NULL) {
304  if (rc != 0)
305  return M0_RC_INFO(rc, "Failed to allocate FDMI service.");
307  start_service = true;
308  }
309 
310  if (start_service)
312  return M0_RC(rc);
313 }
314 
315 static void fdmi_service_stop(struct m0_client *m0c)
316 {
317  struct m0_reqh *reqh = &m0c->m0c_reqh;
318 
319  if (fsp_fdmi_service != NULL) {
325  }
326 }
327 
329 {
330  const static struct m0_fdmi_plugin_ops pcb = {
332  };
333  const struct m0_fdmi_filter_desc fd;
334  int rc;
335 
337 
338  rc = fsp_pdo->fpo_register_filter(&params->spp_fdmi_plugin_fid, &fd, &pcb);
339  fprintf(stderr, "Plugin registration: rc=%d\n", rc);
340  if (rc != 0)
341  return rc;
342 
343  fsp_pdo->fpo_enable_filters(true, &params->spp_fdmi_plugin_fid, 1);
344  return rc;
345 }
346 
348 {
349  fsp_pdo->fpo_enable_filters(false, &params->spp_fdmi_plugin_fid, 1);
350  fsp_pdo->fpo_deregister_plugin(&params->spp_fdmi_plugin_fid, 1);
351 }
352 
358 {
359  struct m0_uint128 rec_id = { 0 };
360  int len;
361 
362  setvbuf(stdin, NULL, _IONBF, 0);
363  while (1) {
364  len = scanf(" %"SCNx64" : %"SCNx64, &rec_id.u_hi, &rec_id.u_lo);
365  if (len == 2) {
367  fsp_pdo->fpo_release_fdmi_rec(&rec_id, NULL);
368  } else {
369  if (terminated != 0)
370  break;
371  }
372  }
373 }
374 
375 static int fsp_init(struct m0_fsp_params *params)
376 {
377  int rc;
378 
379  fsp_conf.mc_local_addr = params->spp_local_addr;
380  fsp_conf.mc_ha_addr = params->spp_hare_addr;
381  fsp_conf.mc_profile = params->spp_profile_fid;
382  fsp_conf.mc_process_fid = params->spp_process_fid;
389 
392 
393  /* Client instance init */
394  rc = m0_client_init(&fsp_client, &fsp_conf, true);
395  if (rc != 0)
396  return rc;
397 
398  M0_POST(fsp_client != NULL);
399 
401  if (rc != 0) {
402  m0_client_fini(fsp_client, true);
403  return rc;
404  }
405 
407  if (rc != 0) {
409  m0_client_fini(fsp_client, true);
410  }
411  return rc;
412 }
413 
414 static void fsp_fini(struct m0_fsp_params *params)
415 {
418 
419  /* Client stops its services including FDMI */
420  m0_client_fini(fsp_client, true);
421 }
422 
423 /*
424  * Signals handling
425  */
426 static void fsp_sighandler(int signum)
427 {
428  fprintf(stderr, "fdmi_sample_plugin interrupted by signal %d\n", signum);
430  terminated = 1;
431 
432  /* Restore default handlers. */
433  signal(SIGINT, SIG_DFL);
434  signal(SIGTERM, SIG_DFL);
435 }
436 
437 static int fsp_sighandler_init(void)
438 {
439  struct sigaction sa = { .sa_handler = fsp_sighandler };
440  int rc;
441 
442  sigemptyset(&sa.sa_mask);
443 
444  /* Block these signals while the handler runs. */
445  sigaddset(&sa.sa_mask, SIGINT);
446  sigaddset(&sa.sa_mask, SIGTERM);
447 
448  rc = sigaction(SIGINT, &sa, NULL) ?: sigaction(SIGTERM, &sa, NULL);
449  return rc == 0 ? 0 : M0_ERR(errno);
450 }
451 
453 {
454  fprintf(stderr,
455  "Starting params:\n"
456  " local_ep: %s\n"
457  " hare_ep : %s\n"
458  " profile_fid : %s\n"
459  " process_fid : %s\n"
460  " plugin_fid : %s\n"
461  " output as string: %s\n",
462  params->spp_local_addr, params->spp_hare_addr,
463  params->spp_profile_fid, params->spp_process_fid,
464  params->spp_fdmi_plugin_fid_s,
465  params->spp_output_strings? "true":"false");
466 }
467 
468 int main(int argc, char **argv)
469 {
470  int rc = 0;
471 
472  if (argc == 1) {
473  fsp_usage();
474  exit(EXIT_FAILURE);
475  }
476 
477  rc = fsp_args_parse(&fsp_params, argc, argv);
478  if (rc != 0) {
479  fprintf(stderr, "Args parse failed\n");
480  return M0_ERR(errno);
481  }
482 
484  if (rc != 0)
485  return M0_ERR(errno);
486 
487  rc = fsp_init(&fsp_params);
488  if (rc != 0) {
489  rc = M0_ERR(errno);
490  goto fini_sem;
491  }
492 
494  if (rc != 0)
495  goto fini_fsp;
496 
497  /* Main thread loop */
499  fprintf(stderr, "fdmi_sample_plugin waiting for signal...\n");
502 
503 fini_fsp:
505 fini_sem:
507  return M0_RC(rc < 0 ? -rc : rc);
508 }
509 
510 #undef M0_TRACE_SUBSYSTEM
511 
#define M0_GETOPTS(progname, argc, argv,...)
Definition: getopts.h:169
M0_EXTERN struct m0_reqh_service_type m0_fdmi_service_type
Definition: fdmi.h:194
static volatile int terminated
M0_INTERNAL int m0_reqh_service_start(struct m0_reqh_service *service)
Definition: reqh_service.c:343
uint32_t mc_layout_id
Definition: client.h:954
M0_INTERNAL void m0_reqh_service_stop(struct m0_reqh_service *service)
Definition: reqh_service.c:402
#define NULL
Definition: misc.h:38
M0_INTERNAL void m0_fol_rec_fini(struct m0_fol_rec *rec)
Definition: fol.c:104
const char * mc_process_fid
Definition: client.h:937
int main(int argc, char **argv)
struct m0_fdmi_record_reg * m0_fdmi__pdock_record_reg_find(const struct m0_uint128 *rid)
Definition: plugin_dock.c:135
#define M0_LOG(level,...)
Definition: trace.h:167
#define SCNx64
Definition: types.h:62
M0_INTERNAL void m0_reqh_service_prepare_to_stop(struct m0_reqh_service *service)
Definition: reqh_service.c:375
struct m0_fsp_params fsp_params
static struct m0_semaphore fsp_sem
const struct m0_fdmi_pd_ops * fsp_pdo
void m0_client_fini(struct m0_client *m0c, bool fini_m0)
Definition: client_init.c:1711
Definition: idx.h:70
static struct m0_clovis * m0c
Definition: main.c:25
struct m0_tl fr_frags
Definition: fol.h:201
void * rp_data
Definition: fol.h:249
int m0_client_init(struct m0_client **m0c, struct m0_config *conf, bool init_m0)
Definition: client_init.c:1533
#define M0_VOIDARG(ch, desc, func)
Definition: getopts.h:177
struct m0_rpc_at_buf cr_val
Definition: cas.h:182
M0_INTERNAL struct m0_reqh_service_type * m0_reqh_service_type_find(const char *sname)
Definition: reqh_service.c:168
int(* po_fdmi_rec)(struct m0_uint128 *rec_id, struct m0_buf fdmi_rec, struct m0_fid filter_id)
Definition: plugin_dock.h:62
#define m0_tl_endfor
Definition: tlist.h:700
static void fsp_sighandler(int signum)
return M0_RC(rc)
Definition: buf.h:37
struct m0_cas_rec * cr_rec
Definition: cas.h:236
struct m0_rpc_at_buf cr_key
Definition: cas.h:172
M0_INTERNAL void m0_fol_rec_init(struct m0_fol_rec *rec, struct m0_fol *fol)
Definition: fol.c:98
void(* fpo_deregister_plugin)(struct m0_fid *filter_ids, uint64_t filter_count)
Definition: plugin_dock.h:157
static char * addr
Definition: node_k.c:37
#define M0_STRINGARG(ch, desc, func)
Definition: getopts.h:207
int i
Definition: dir.c:1033
#define M0_RC_INFO(rc, fmt,...)
Definition: trace.h:209
uint64_t cr_nr
Definition: cas.h:235
const char * mc_ha_addr
Definition: client.h:935
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
return M0_ERR(-EOPNOTSUPP)
#define LAMBDA(T,...)
Definition: thread.h:153
M0_INTERNAL void m0_ref_get(struct m0_ref *ref)
Definition: refs.c:32
static const struct socktype stype[]
Definition: sock.c:1156
static int fsp_sighandler_init(void)
static int fsp_init(struct m0_fsp_params *params)
#define U128_P(x)
Definition: types.h:45
M0_INTERNAL void m0_reqh_service_fini(struct m0_reqh_service *service)
Definition: reqh_service.c:457
bool mc_is_oostore
Definition: client.h:920
static struct m0_client * fsp_client
static int fsp_args_parse(struct m0_fsp_params *params, int argc, char **argv)
uint64_t u_hi
Definition: types.h:36
M0_INTERNAL int m0_semaphore_init(struct m0_semaphore *semaphore, unsigned value)
Definition: semaphore.c:38
#define M0_POST(cond)
Definition: xcode.h:73
static struct m0_config fsp_conf
static void dump_fol_rec_to_json(struct m0_uint128 *rec_id, struct m0_fol_rec *rec)
M0_INTERNAL int m0_reqh_service_allocate(struct m0_reqh_service **out, const struct m0_reqh_service_type *stype, struct m0_reqh_context *rctx)
Definition: reqh_service.c:185
Definition: reqh.h:94
union m0_rpc_at_buf::@447 u
struct m0_cas_recv cg_rec
Definition: cas.h:294
struct m0_fid ci_fid
Definition: cas.h:113
static void fsp_print_params(struct m0_fsp_params *params)
M0_INTERNAL void m0_reqh_service_init(struct m0_reqh_service *service, struct m0_reqh *reqh, const struct m0_fid *fid)
Definition: reqh_service.c:428
M0_INTERNAL int m0_fid_sscanf(const char *s, struct m0_fid *fid)
Definition: fid.c:227
static void fdmi_service_stop(struct m0_client *m0c)
struct m0_fid spp_fdmi_plugin_fid
#define FID_P(f)
Definition: fid.h:77
struct m0_cas_recv cg_rec
Definition: cas.h:384
static void fsp_fini(struct m0_fsp_params *params)
bool mc_is_read_verify
Definition: client.h:925
Definition: cas.h:372
void * mc_idx_service_conf
Definition: client.h:957
void(* fpo_enable_filters)(bool enable, struct m0_fid *filter_ids, uint32_t filter_count)
Definition: plugin_dock.h:138
#define U128X_F
Definition: types.h:42
static void fini_fdmi_plugin(struct m0_fsp_params *params)
M0_INTERNAL struct m0_reqh_service * m0_reqh_service_find(const struct m0_reqh_service_type *st, const struct m0_reqh *reqh)
Definition: reqh_service.c:538
struct m0_reqh reqh
Definition: rm_foms.c:48
uint32_t mc_max_rpc_msg_size
Definition: client.h:949
const char * mc_local_addr
Definition: client.h:933
int mc_idx_service_id
Definition: client.h:956
Definition: fid.h:38
M0_INTERNAL void m0_semaphore_fini(struct m0_semaphore *semaphore)
Definition: semaphore.c:45
bool kc_create_meta
Definition: idx.h:183
uint32_t mc_tm_recv_queue_min_len
Definition: client.h:944
int(* fpo_register_filter)(const struct m0_fid *fid, const struct m0_fdmi_filter_desc *desc, const struct m0_fdmi_plugin_ops *pcb)
Definition: plugin_dock.h:131
#define M0_HELPARG(ch)
Definition: getopts.h:242
Definition: common.h:34
static void fsp_usage(void)
static int fdmi_service_start(struct m0_client *m0c)
M0_INTERNAL void m0_reqh_idle_wait_for(struct m0_reqh *reqh, struct m0_reqh_service *service)
Definition: reqh.c:591
static int init_fdmi_plugin(struct m0_fsp_params *params)
struct m0t1fs_filedata * fd
Definition: dir.c:1030
struct m0_cas_rec * cr_rec
Definition: cas.h:284
const char * mc_profile
Definition: client.h:938
struct m0_ref frr_ref
Definition: plugin_dock.h:112
static struct m0_idx_dix_config fsp_dix_conf
static struct m0_cas_op * cas_op(const struct m0_fom *fom)
Definition: service.c:1761
M0_INTERNAL void m0_semaphore_down(struct m0_semaphore *semaphore)
Definition: semaphore.c:49
static void fdmi_plugin_record_ack()
#define out(...)
Definition: gen.c:41
M0_INTERNAL void m0_semaphore_up(struct m0_semaphore *semaphore)
Definition: semaphore.c:65
uint64_t u_lo
Definition: types.h:37
def start_service(self, service, idx)
static int process_fdmi_record(struct m0_uint128 *rec_id, struct m0_buf fdmi_rec, struct m0_fid filter_id)
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
static struct m0_reqh_service * fsp_fdmi_service
void(* fpo_release_fdmi_rec)(struct m0_uint128 *rec_id, struct m0_fid *filter_id)
Definition: plugin_dock.h:148
int32_t rc
Definition: trigger_fop.h:47
char * spp_fdmi_plugin_fid_s
struct m0_cas_id cg_id
Definition: cas.h:374
#define FID_F
Definition: fid.h:75
uint32_t ffrp_fop_code
Definition: fop.h:354
M0_INTERNAL int m0_fol_rec_decode(struct m0_fol_rec *rec, struct m0_buf *at)
Definition: fol.c:331
const struct m0_fdmi_pd_ops * m0_fdmi_plugin_dock_api_get(void)
Definition: plugin_dock.c:563