Motr  M0
idx_cass.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #include "motr/client.h"
24 #include "motr/client_internal.h"
25 #include "motr/addb.h"
26 #include "motr/idx.h"
27 
28 #include "lib/errno.h" /* ENOMEM */
29 
30 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CLIENT
31 #include "lib/trace.h"
32 
88 #ifndef __KERNEL__
89 
90 #include <uv.h>
91 #include <cassandra.h>
92 
94  char *ci_keyspace;
95  CassCluster *ci_cluster;
96  CassSession *ci_session;
97 };
98 
103 static const char cass_table_magic[] = "cass_v150915";
104 static const char cass_idx_table_magic[] = "cass_idx_v150915";
105 
109 };
110 
117 };
118 
119 
121 static const CassPrepared **idx_cass_prepared_set;
122 
127 static int get_table_id(struct m0_uint128 fid)
128 {
129  return 0;
130 }
131 
/**
 * Builds a table name of the form "<table_magic>_<table_id>".
 *
 * @param table        output buffer. No length check is done here, so the
 *                     caller must provide room for the magic string, the
 *                     underscore, the decimal id and the terminating NUL.
 * @param table_id     numeric table id appended after the underscore.
 * @param table_magic  NUL-terminated prefix of the table name.
 */
static void make_table_name(char *table, int table_id, const char *table_magic)
{
	(void)sprintf(table, "%s_%d", table_magic, table_id);
}
136 
137 static void print_query_error(CassFuture *future)
138 {
139  const char *err_msg;
140  size_t msg_len;
141 
142  cass_future_error_message(future, &err_msg, &msg_len);
143  M0_LOG(M0_ERROR, "Cassandra query failed: %.*s", (int)msg_len, err_msg);
144 }
145 
146 static CassResult* execute_query_sync(CassSession *session,
147  CassStatement *statement)
148 {
149  int rc;
150  CassFuture *future;
151  CassResult *result;
152 
153  future = cass_session_execute(session, statement);
154 
155  cass_future_wait(future);
156  rc = cass_future_error_code(future);
157  if (rc != CASS_OK) {
158  print_query_error(future);
159  return NULL;
160  }
161 
162  result = (CassResult *)cass_future_get_result(future);
163  cass_future_free(future);
164 
165  return result;
166 }
167 
168 static void execute_query_async(CassSession *session, CassStatement *statement,
169  CassFutureCallback cb, void *cb_data)
170 {
171  CassFuture *future;
172 
173  future = cass_session_execute(session, statement);
174  cass_future_set_callback(future, cb, cb_data);
175 
176  cass_future_free(future);
177 }
178 
179 static CassError execute_query_batch(CassSession *session,
180  CassBatch *batch,
181  CassFutureCallback cb,
182  void *cb_data)
183 {
184  CassError error;
185  CassFuture *future;
186 
187  future = cass_session_execute_batch(session, batch);
188  error = cass_future_set_callback(future, cb, cb_data);
189  cass_future_free(future);
190 
191  return error;
192 }
193 
194 static const CassPrepared* create_prepared(CassSession *session, char *query)
195 {
196  CassError rc;
197  CassFuture *future = NULL;
198  const CassPrepared *prepared;
199 
200  future = cass_session_prepare(session, query);
201  cass_future_wait_timed(future, 300000);
202 
203  rc = cass_future_error_code(future);
204  if (rc != CASS_OK) {
205  print_query_error(future);
206  prepared = NULL;
207  } else
208  prepared = cass_future_get_prepared(future);
209 
210  cass_future_free(future);
211  return prepared;
212 }
213 
/*
 * CQL statement templates. "%s" is replaced with the table name when the
 * query string is built; every "?" is a bind marker filled in per statement.
 */
#define CQL_GET  ("SELECT * FROM %s WHERE index_fid = ? AND key = ?")
#define CQL_PUT  ("INSERT INTO %s (index_fid, key, value) VALUES(?, ?, ?)")
#define CQL_DEL  ("DELETE FROM %s WHERE index_fid = ? AND key = ?")
#define CQL_NEXT ("SELECT * FROM %s WHERE index_fid = ? AND key > ?")
223 
224 static const char *query_type_map[] = {
225  [IDX_CASS_GET] = CQL_GET,
226  [IDX_CASS_PUT] = CQL_PUT,
227  [IDX_CASS_DEL] = CQL_DEL,
229 };
230 
231 static char *make_query_string(int query_type, int table_id,
232  const char *table_magic)
233 {
234  char table[MAX_CASS_TABLE_NAME_LEN];
235  char *query = NULL;
236 
238  if (query == NULL)
239  goto exit;
240 
241  make_table_name(table, table_id, table_magic);
242 
244  if (IS_IN_ARRAY(query_type, query_type_map))
245  sprintf(query, query_type_map[query_type], table);
246  else {
247  M0_LOG(M0_ERROR, "Query type %i NOT supported.", query_type);
248  m0_free0(&query);
249  }
250 exit:
251  return query;
252 }
253 
254 static int set_prepared(CassSession *session, int query_type, int table_id)
255 {
256  int loc;
257  char *query;
258  const CassPrepared *prepared = NULL;
259 
260  M0_ENTRY();
261 
262  query = make_query_string(query_type, table_id, cass_table_magic);
263  if (query == NULL)
264  return M0_ERR(-ENOMEM);
265 
266  /* Put into prepared statement array for future reference. */
267  prepared = create_prepared(session, query);
268  loc = table_id * IDX_CASS_STATEMENT_TYPE_NR + query_type;
269  M0_LOG(M0_DEBUG, "loc = %d", loc);
270  idx_cass_prepared_set[loc] = prepared;
271 
272  m0_free(query);
273  return M0_RC(0);
274 }
275 
276 static const CassPrepared* get_prepared(CassSession *session,
277  int query_type, int table_id)
278 {
279  int rc;
280  int loc;
281  const CassPrepared *prepared = NULL;
282 
283  M0_ENTRY();
284 
285  loc = table_id * IDX_CASS_STATEMENT_TYPE_NR + query_type;
286  if( idx_cass_prepared_set[loc] != NULL) {
287  prepared = idx_cass_prepared_set[loc];
288  goto exit;
289  }
290 
291  /* Create a prepared statement if it does not exist. */
292  rc = set_prepared(session, query_type, table_id);
293  if (rc != 0)
294  prepared = NULL;
295  else
296  prepared = idx_cass_prepared_set[loc];
297 
298 exit:
299  M0_LEAVE();
300  return prepared;
301 }
302 
303 static int init_prepared_set(CassSession *session)
304 {
305  int i;
306  int nr_statements;
307 
308  nr_statements =
311  m0_alloc(nr_statements * sizeof(CassPrepared *));
313  return M0_ERR(-ENOMEM);
314 
315  for (i = 0; i < nr_statements; i++)
317 
318  return M0_RC(0);
319 }
320 
321 static void free_prepared_set()
322 {
323  int i;
324  int nr_statements;
325 
326  nr_statements =
328  for (i = 0; i < nr_statements; i++) {
329  if (idx_cass_prepared_set[i] != NULL)
330  cass_prepared_free(idx_cass_prepared_set[i]);
331  }
332 
334 }
335 
336 #ifdef IDX_CASS_DRV_V22 /* Cassandra cpp driver version >=2.2. */
337 
338 static int table_exists(CassSession *session, char *keyspace, int table_id,
339  const char *table_magic)
340 {
341  int rc;
342  char table[MAX_CASS_TABLE_NAME_LEN];
343  const CassSchemaMeta *schema_meta;
344  const CassKeyspaceMeta *keyspace_meta;
345  const CassTableMeta *table_meta;
346 
347  schema_meta = cass_session_get_schema_meta(session);
348  keyspace_meta =
349  cass_schema_meta_keyspace_by_name(schema_meta, keyspace);
350  if (keyspace_meta == NULL) {
351  cass_schema_meta_free(schema_meta);
352  rc = -EINVAL;
353  goto exit;
354  }
355 
356  make_table_name(table, table_id, table_magic);
357  table_meta = cass_keyspace_meta_table_by_name(keyspace_meta, table);
358  if (table_meta != NULL)
359  rc = 1;
360  else
361  rc = 0;
362 
363 exit:
364  cass_schema_meta_free(schema_meta);
365  return M0_RC(rc);
366 }
367 
368 #else /* Cassandra cpp driver version < 2.2. */
369 
370 static int table_exists(CassSession *session, char *keyspace, int table_id,
371  const char *table_magic)
372 {
373  int rc;
374  char table[MAX_CASS_TABLE_NAME_LEN];
375  const CassSchema *schema;
376  const CassSchemaMeta *keyspace_meta;
377  const CassSchemaMeta *table_meta;
378 
379  schema = cass_session_get_schema(session);
380  keyspace_meta = cass_schema_get_keyspace(schema, keyspace);
381  if (keyspace_meta == NULL) {
382  rc = -EINVAL;
383  goto exit;
384  }
385 
386  make_table_name(table, table_id, table_magic);
387  table_meta = cass_schema_meta_get_entry(keyspace_meta, table);
388  if (table_meta != NULL)
389  rc = 1;
390  else
391  rc = 0;
392 
393 exit:
394  cass_schema_free(schema);
395  return M0_RC(rc);
396 }
397 #endif
398 
399 static int row_exists(CassSession *session, int table_id,
400  struct m0_uint128 idx_fid, const char *table_magic)
401 {
402  int rc = 0;
403  char *query;
404  CassStatement *statement;
405  CassResult *query_result;
406  CassIterator *iterator;
407  char table[MAX_CASS_TABLE_NAME_LEN];
408  char idx_fid_str[64];
409 
411  if (query == NULL)
412  return M0_ERR(-ENOMEM);
413 
414  make_table_name(table, table_id, table_magic);
415  sprintf(query, "SELECT * FROM %s WHERE index_fid = ?", table);
416 
417  statement = cass_statement_new(query, 1);
418  sprintf(idx_fid_str, "%" PRIx64 "_%"PRIx64, idx_fid.u_hi, idx_fid.u_lo);
419  cass_statement_bind_string(statement, 0, idx_fid_str);
420 
421  query_result = execute_query_sync(session, statement);
422  if (query_result == NULL)
423  return M0_ERR(-EINVAL);
424 
425  /* Check if Cassandra returns any rows. */
426  iterator = cass_iterator_from_result(query_result);
427  if (cass_iterator_next(iterator))
428  rc = 1;
429  else
430  rc = 0;
431 
432  m0_free(query);
433  cass_statement_free(statement);
434  cass_iterator_free(iterator);
435  cass_result_free(query_result);
436 
437  return M0_RC(rc);
438 }
439 
440 static struct idx_cass_instance* get_cass_inst(struct m0_op_idx *oi)
441 {
442  struct m0_client *m0c;
443 
445  return (struct idx_cass_instance *) m0c->m0c_idx_svc_ctx.isc_svc_inst;
446 }
447 
448 static bool idx_exists(struct m0_op_idx *oi)
449 {
450  int table_id;
451  struct m0_uint128 idx_fid;
452  CassSession *session;
453  int exist;
454  char *keyspace;
455 
456  M0_ENTRY();
457 
458  idx_fid = oi->oi_idx->in_entity.en_id;
459 
460  /* Form a 'SELECT' query string. */
462  keyspace = get_cass_inst(oi)->ci_keyspace;
463  table_id = get_table_id(idx_fid);
464 
465  /* Check if the table storing rows of this index exists. */
466  exist = table_exists(session, keyspace, table_id, cass_idx_table_magic);
467  if (exist > 0) {
468  /* Check if there exists any row for this index. */
469  exist = row_exists(session, table_id, idx_fid,
471  }
472 
473  return exist > 0 ? M0_RC(true) : M0_RC(false);
474 }
475 
483 static void idx_cass_query_cb(CassFuture* future, void* data)
484 {
485  struct m0_op_idx *oi;
486  CassError rc;
487 
488  M0_ENTRY();
489 
490  oi = (struct m0_op_idx *)data;
491 
492  rc = cass_future_error_code(future);
493  if (rc != CASS_OK) {
494  oi->oi_query_rc = rc;
495  print_query_error(future);
496  }
497 
498  oi->oi_nr_queries--;
499  if (oi->oi_nr_queries != 0)
500  return;
501 
502  /*
503  * An index GET query is considered successful as long as the operation
504  * itself executed successfully. If the operation failed to execute, i.e.
505  * the global error code, oi->oi_query_rc, is not success, the client
506  * will call the fail callback.
507  * In all other cases the operation is considered successful
508  * (even if all the keys presented in the queries are absent from the store).
509  */
510  if (rc == CASS_OK) {
512  oi->oi_ar.ar_rc = 0;
514 
515  } else {
517  oi->oi_ar.ar_rc = -rc;
519  }
520 }
521 
522 static int idx_cass_namei_new(struct m0_op_idx *oi)
523 {
524  char *query;
525  char table[MAX_CASS_TABLE_NAME_LEN];
526  CassSession *session;
527  CassStatement *statement;
528  struct idx_cass_instance *cass_inst;
529  char idx_fid_str[64];
530  struct m0_uint128 idx_fid;
531 
532  cass_inst = get_cass_inst(oi);
533  session = cass_inst->ci_session;
534  idx_fid = oi->oi_idx->in_entity.en_id;
535 
536  /* If the index exists */
537  if (idx_exists(oi))
538  return M0_ERR(-EEXIST);
539 
540  oi->oi_query_rc = CASS_OK;
541  oi->oi_nr_queries = 1;
542 
544  if (query == NULL)
545  return M0_ERR(-ENOMEM);
546 
547  make_table_name(table,
550  sprintf(query,
551  "INSERT INTO %s (index_fid) VALUES(?)", table);
552  statement = cass_statement_new(query, 1);
553  sprintf(idx_fid_str, "%" PRIx64 "_%"PRIx64,
554  idx_fid.u_hi, idx_fid.u_lo);
555 
556  cass_statement_bind_string(statement, 0, idx_fid_str);
557 
560  m0_free(query);
561  cass_statement_free(statement);
562 
563  /* Return 1 */
564  return M0_RC(1);
565 }
566 
567 static int idx_cass_namei_drop(struct m0_op_idx *oi)
568 {
569  char *query;
570  char table[MAX_CASS_TABLE_NAME_LEN];
571  CassSession *session;
572  CassStatement *statement;
573  struct idx_cass_instance *cass_inst;
574  char idx_fid_str[64];
575  struct m0_uint128 idx_fid;
576  CassBatch *batch;
577 
578  cass_inst = get_cass_inst(oi);
579  session = cass_inst->ci_session;
580  idx_fid = oi->oi_idx->in_entity.en_id;
581  batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED);
582 
583  /*
584  * Delete all the KV-pairs associated with an index.
585  */
586  oi->oi_query_rc = CASS_OK;
587  oi->oi_nr_queries = 1;
588 
590  if (query == NULL)
591  return M0_ERR(-ENOMEM);
592 
595  sprintf(query,
596  "DELETE from %s where index_fid = ?", table);
597 
598  statement = cass_statement_new(query, 1);
599  sprintf(idx_fid_str, "%" PRIx64 "_%"PRIx64, idx_fid.u_hi, idx_fid.u_lo);
600  cass_statement_bind_string(statement, 0, idx_fid_str);
601  cass_batch_add_statement(batch, statement);
602  cass_statement_free(statement);
603 
604  /* Now delete the index entry from the index table. */
607  sprintf(query,
608  "DELETE from %s where index_fid = ?", table);
609 
610  statement = cass_statement_new(query, 1);
611  sprintf(idx_fid_str, "%" PRIx64 "_%"PRIx64, idx_fid.u_hi, idx_fid.u_lo);
612  cass_statement_bind_string(statement, 0, idx_fid_str);
613  cass_batch_add_statement(batch, statement);
615 
616  m0_free(query);
617  cass_statement_free(statement);
618 
619  return M0_RC(1);
620 }
621 
/**
 * Index lookup entry point of the Cassandra service; not implemented.
 *
 * @param oi  index operation (unused).
 * @return    -ENOSYS, always.
 */
static int idx_cass_namei_lookup(struct m0_op_idx *oi)
{
	int rc = -ENOSYS;

	return M0_ERR(rc);
}
626 
/**
 * Index list entry point of the Cassandra service; not implemented.
 *
 * @param oi  index operation (unused).
 * @return    -ENOSYS, always.
 */
static int idx_cass_namei_list(struct m0_op_idx *oi)
{
	int rc = -ENOSYS;

	return M0_ERR(rc);
}
631 
632 struct get_cb_data {
634  struct m0_op_idx *gcd_oi;
635 };
636 
637 static void idx_cass_get_cb(CassFuture* future, void* data)
638 {
639  int kv_idx = 0;
640  struct m0_bufvec *vals;
641  struct m0_op_idx *oi;
642  const CassResult *result;
643  CassIterator *iterator = NULL;
644  const CassRow *row;
645  const cass_byte_t *bytes;
646  size_t nr_bytes;
647 
648  M0_ENTRY();
649 
650  oi = ((struct get_cb_data *)data)->gcd_oi;
651  kv_idx = ((struct get_cb_data *)data)->gcd_kv_idx;
652  vals = oi->oi_vals;
653 
654  result = cass_future_get_result(future);
655  if (result == NULL) {
656  oi->oi_rcs[kv_idx] = -ENOENT;
657  goto query_cb;
658  }
659 
660  iterator = cass_iterator_from_result(result);
661  if (iterator == NULL) {
662  oi->oi_rcs[kv_idx] = -ENOENT;
663  goto query_cb;
664  }
665 
666  if (cass_iterator_next(iterator)) {
667  M0_LOG(M0_DEBUG, "Get row and extract values.");
668  row = cass_iterator_get_row(iterator);
669  cass_value_get_bytes(cass_row_get_column(row, 2),
670  &bytes, &nr_bytes);
671 
672  /* Copy value back to K-V pair. */
673  M0_LOG(M0_DEBUG, "Copy value.");
674  vals->ov_buf[kv_idx] = m0_alloc(nr_bytes);
675  if (vals->ov_buf[kv_idx] == NULL) /*TODO: How to handle this error? */
676  goto query_cb;
677  memcpy(vals->ov_buf[kv_idx], bytes, nr_bytes);
678  vals->ov_vec.v_count[kv_idx] = nr_bytes;
679  oi->oi_rcs[kv_idx] = 0;
680  } else {
681  oi->oi_rcs[kv_idx] = -ENOENT;
682  }
683 
684 
685 query_cb:
686  idx_cass_query_cb(future, oi);
687 
688  /* Free result and 'data'. */
689  cass_iterator_free(iterator);
690  cass_result_free(result);
691  m0_free(data);
692  M0_LEAVE();
693 }
694 
695 /*
696  * The simplest implementation: one query for one K-V pair, it is not
697  * optimized for performance just to make index GET work.
698  */
699 static int idx_cass_get(struct m0_op_idx *oi)
700 {
701  int i;
702  int rc = 0;
703  int nr_selects = 0;
704  int table_id;
705  char *query = NULL;
706  char idx_fid_str[64];
707  struct m0_uint128 idx_fid;
708  struct m0_bufvec *keys;
709  CassSession *session;
710  CassStatement *statement;
711  struct get_cb_data *gcd;
712  bool exist;
713 
714  M0_ENTRY();
715 
716  idx_fid = oi->oi_idx->in_entity.en_id;
717  sprintf(idx_fid_str, "%" PRIx64 "_%"PRIx64, idx_fid.u_hi, idx_fid.u_lo);
718 
719  /* Form a 'SELECT' query string. */
721  table_id = get_table_id(idx_fid);
722 
723  /* If the index exists */
724  exist = idx_exists(oi);
725  if (!exist)
726  return M0_ERR(-ENOENT);
727 
729  if (query == NULL)
730  return M0_ERR(-ENOMEM);
731 
732  /*
733  * Examine each key, form a new statement for a key and
734  * pack into a batch query.
735  */
736  keys = oi->oi_keys;
737  oi->oi_query_rc = CASS_OK;
738  oi->oi_nr_queries = keys->ov_vec.v_nr;
739  for (i = 0; i < keys->ov_vec.v_nr; i++) {
740  /* Create a statement for each K-V pair. */
741  statement = cass_statement_new(query, 2);
742  cass_statement_bind_string(statement, 0, idx_fid_str); //"0_8ad6");
743  cass_statement_bind_bytes(statement, 1,
744  (cass_byte_t *)keys->ov_buf[i],
745  keys->ov_vec.v_count[i]);
746 
747  /* Set callback and its data, then issue the query. */
748  gcd = m0_alloc(sizeof(struct get_cb_data));
749  if (gcd == NULL) {
750  rc = -ENOMEM;
751  break;
752  }
753 
754  gcd->gcd_oi = oi;
755  gcd->gcd_kv_idx = i;
757 
758  /* Post-processing.*/
759  nr_selects++;
760  cass_statement_free(statement);
761  }
762  rc = (nr_selects > 0)?1:rc;
763 
764  m0_free(query);
765  return M0_RC(rc);
766 }
767 
768 static int idx_cass_put(struct m0_op_idx *oi)
769 {
770  int i;
771  int table_id;
772  CassSession *session;
773  CassStatement *statement;
774  const CassPrepared *prepared;
775  CassBatch *batch;
776  struct m0_bufvec *keys;
777  struct m0_bufvec *vals;
778  struct m0_uint128 idx_fid;
779  char idx_fid_str[64];
780  bool exist;
781 
782  M0_ENTRY();
783 
784  keys = oi->oi_keys;
785  vals = oi->oi_vals;
786  if (keys->ov_vec.v_nr != vals->ov_vec.v_nr)
787  return M0_RC(-EINVAL);
788 
789  idx_fid = oi->oi_idx->in_entity.en_id;
790  sprintf(idx_fid_str, "%" PRIx64 "_%"PRIx64, idx_fid.u_hi, idx_fid.u_lo);
791 
792  /* All K-V pair inserts are added into one 'big' BATCH query. */
793  table_id = get_table_id(idx_fid);
795  batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED);
796 
797  /* If the index exists */
798  exist = idx_exists(oi);
799  if (!exist)
800  return M0_ERR(-ENOENT);
801 
802  oi->oi_nr_queries = 1;
803  oi->oi_query_rc = CASS_OK;
804  for (i = 0; i < keys->ov_vec.v_nr; i++) {
805  prepared = get_prepared(session, IDX_CASS_PUT, table_id);
806  if (prepared == NULL)
807  goto exit;
808 
809  M0_LOG(M0_DEBUG, "Bind Cassandra statement.");
810  statement = cass_prepared_bind(prepared);
811  cass_statement_bind_string(statement, 0, idx_fid_str);
812  cass_statement_bind_bytes(statement, 1,
813  keys->ov_buf[i], keys->ov_vec.v_count[i]);
814  cass_statement_bind_bytes(statement, 2,
815  vals->ov_buf[i], vals->ov_vec.v_count[i]);
816 
817  M0_LOG(M0_DEBUG, "Add Cassandra statement to batch.");
818  cass_batch_add_statement(batch, statement);
819 
820  cass_statement_free(statement);
821  }
822 
823  /* Actual BATCH query is executed here. */
824  M0_LOG(M0_DEBUG, "Issue batch query for K-V PUT.");
826 
827 exit:
828  cass_batch_free(batch);
829  return M0_RC(1);
830 }
831 
832 static int idx_cass_del(struct m0_op_idx *oi)
833 {
834  int i;
835  int table_id;
836  CassSession *session;
837  CassStatement *statement;
838  CassBatch *batch;
839  struct m0_bufvec *keys;
840  struct m0_uint128 idx_fid;
841  char idx_fid_str[64];
842  char *query;
843 
844  M0_ENTRY();
845 
846  keys = oi->oi_keys;
847  idx_fid = oi->oi_idx->in_entity.en_id;
848  sprintf(idx_fid_str, "%" PRIx64 "_%"PRIx64, idx_fid.u_hi, idx_fid.u_lo);
849 
850  /* All K-V pair deletes are added into one 'big' BATCH query. */
851  batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED);
852 
853  /* Form a 'DELETE' query string. */
855  table_id = get_table_id(idx_fid);
857  if (query == NULL)
858  return M0_ERR(-ENOMEM);
859 
860  oi->oi_nr_queries = 1;
861  oi->oi_query_rc = CASS_OK;
862  for (i = 0; i < keys->ov_vec.v_nr; i++) {
863  M0_LOG(M0_DEBUG, "Bind Cassandra statement.");
864  statement = cass_statement_new(query, 2);
865  cass_statement_bind_string(statement, 0, idx_fid_str);
866  cass_statement_bind_bytes(statement, 1,
867  keys->ov_buf[i], keys->ov_vec.v_count[i]);
868 
869  M0_LOG(M0_DEBUG, "Add Cassandra statement to batch.");
870  cass_batch_add_statement(batch, statement);
871 
872  cass_statement_free(statement);
873  }
874 
875  /* Actual BATCH query is executed here. */
876  M0_LOG(M0_DEBUG, "Issue batch query for K-V DEL.");
878 
879  cass_batch_free(batch);
880  m0_free(query);
881  return M0_RC(1);
882 }
883 
884 static int copy_to_kv(int i, struct m0_bufvec *keys, struct m0_bufvec *vals,
885  const cass_byte_t *k_bytes, size_t k_size,
886  const cass_byte_t *v_bytes, size_t v_size)
887 {
888  keys->ov_buf[i] = m0_alloc(k_size);
889  if (keys->ov_buf[i] == NULL)
890  return M0_ERR(-ENOMEM);
891 
892  vals->ov_buf[i] = m0_alloc(v_size);
893  if (vals->ov_buf[i] == NULL) {
894  m0_free(keys->ov_buf[i]);
895  return M0_ERR(-ENOMEM);
896  }
897 
898  memcpy(keys->ov_buf[i], k_bytes, k_size);
899  keys->ov_vec.v_count[i] = k_size;
900 
901  memcpy(vals->ov_buf[i], v_bytes, v_size);
902  vals->ov_vec.v_count[i] = v_size;
903 
904  return M0_RC(0);
905 }
906 
907 static void idx_cass_next_cb(CassFuture* future, void* data)
908 {
909  int rc;
910  int kv_cnt = 0;
911  struct m0_bufvec *keys;
912  struct m0_bufvec *vals;
913  size_t k_size;
914  size_t v_size;
915  const cass_byte_t *k_bytes;
916  const cass_byte_t *v_bytes;
917  const CassResult *result;
918  CassIterator *iterator = NULL;
919  const CassRow *row;
920  struct m0_op_idx *oi;
921 
922  oi = (struct m0_op_idx *)data;
923  keys = oi->oi_keys;
924  vals = oi->oi_vals;
925 
926  /* Reset buf for the first key. */
927  m0_free(keys->ov_buf[0]);
928  keys->ov_buf[0] = NULL;
929  keys->ov_vec.v_count[0] = 0;
930 
931  /* Iterate over each returned rows and copy keys and values. */
932  result = cass_future_get_result(future);
933  if (result == NULL)
934  goto query_cb;
935 
936  iterator = cass_iterator_from_result(result);
937  if (iterator == NULL)
938  goto query_cb;
939 
940  while (cass_iterator_next(iterator)) {
941  row = cass_iterator_get_row(iterator);
942 
943  cass_value_get_bytes(cass_row_get_column(row, 1),
944  &k_bytes, &k_size);
945  cass_value_get_bytes(cass_row_get_column(row, 2),
946  &v_bytes, &v_size);
947 
948  /* Copy key and value. */
949  rc = copy_to_kv(kv_cnt, keys, vals,
950  k_bytes, k_size, v_bytes, v_size);
951  if (rc != 0) {
952  break;
953  }
954 
955  kv_cnt++;
956  if (kv_cnt >= keys->ov_vec.v_nr)
957  break;
958  }
959 
960 
961 query_cb:
962  idx_cass_query_cb(future, oi);
963 
964  cass_iterator_free(iterator);
965  /* Free query result. */
966  cass_result_free(result);
967 }
968 
969 static int idx_cass_next(struct m0_op_idx *oi)
970 {
971  int table_id;
972  char *query = NULL;
973  char idx_fid_str[64];
974  char keys_v_nr[12];
975  struct m0_uint128 idx_fid;
976  struct m0_bufvec *keys;
977  CassSession *session;
978  CassStatement *statement;
979  int64_t null_key = 0;
980  bool exist;
981 
982  M0_ENTRY();
983 
984  keys = oi->oi_keys;
985  idx_fid = oi->oi_idx->in_entity.en_id;
986  sprintf(idx_fid_str, "%" PRIx64 "_%"PRIx64, idx_fid.u_hi, idx_fid.u_lo);
987 
989 
990  /* Make a query statement for NEXT. */
991  table_id = get_table_id(idx_fid);
992 
993  /* If the index exists */
994  exist = idx_exists(oi);
995  if (!exist)
996  return M0_ERR(-ENOENT);
997 
999  if (query == NULL)
1000  return M0_ERR(-ENOMEM);
1001 
1002  strcat(query, " LIMIT ");
1003  sprintf(keys_v_nr, "%d", keys->ov_vec.v_nr);
1004  strcat(query, keys_v_nr);
1005 
1006  statement = cass_statement_new(query, 2);
1007  cass_statement_bind_string(statement, 0, idx_fid_str);
1008 
1009  /*
1010  * If the start key is not specified, records are retrieved
1011  * from the very beginning of the index. 0 can be safely seen
1012  * as the minimum key value no matter how an application defines
1013  * its own key data structure.
1014  */
1015  if (keys->ov_buf[0] == NULL) {
1016  cass_statement_bind_bytes(statement, 1,
1017  (cass_byte_t *)&null_key, sizeof null_key);
1018  } else {
1019  cass_statement_bind_bytes(statement, 1,
1020  keys->ov_buf[0], keys->ov_vec.v_count[0]);
1021 
1022  }
1023 
1024  /* Set callback and its data, then issue the query. */
1025  oi->oi_nr_queries = 1;
1026  oi->oi_query_rc = CASS_OK;
1028 
1029  /* Post-processing.*/
1030  cass_statement_free(statement);
1031 
1032  m0_free(query);
1033  return M0_RC(1);
1034 }
1035 
1038  .iqo_namei_delete = idx_cass_namei_drop,
1039  .iqo_namei_lookup = idx_cass_namei_lookup,
1040  .iqo_namei_list = idx_cass_namei_list,
1041 
1042  .iqo_get = idx_cass_get,
1043  .iqo_put = idx_cass_put,
1044  .iqo_del = idx_cass_del,
1045  .iqo_next = idx_cass_next,
1046 };
1047 
1052 static CassCluster* create_cluster(char *ep)
1053 {
1054  CassCluster *cluster;
1055 
1056  cluster = cass_cluster_new();
1057  cass_cluster_set_contact_points(cluster, ep);
1058 
1059  return cluster;
1060 }
1061 
1062 static CassSession* connect_session(CassCluster *cluster, char *keyspace)
1063 {
1064  CassError rc;
1065  CassSession *session;
1066  CassFuture *future;
1067 
1068  session = cass_session_new();
1069  future = cass_session_connect_keyspace(session, cluster, keyspace);
1070 
1071  cass_future_wait(future);
1072  rc = cass_future_error_code(future);
1073  if (rc != CASS_OK) {
1074  print_query_error(future);
1075 
1076  cass_session_free(session);
1077  session = NULL;
1078  }
1079  cass_future_free(future);
1080 
1081  return session;
1082 }
1083 
1084 static void terminate_session(CassSession *session)
1085 {
1086  CassFuture *future;
1087 
1088  future = cass_session_close(session);
1089  cass_future_wait(future);
1090  cass_future_free(future);
1091 }
1092 
1093 static int idx_cass_init(void *svc)
1094 {
1095  int rc;
1096  char *ep;
1097  char *keyspace = NULL;
1098  struct m0_idx_service_ctx *ctx;
1099  struct m0_idx_cass_config *conf;
1100  struct idx_cass_instance *inst;
1101  CassCluster *cluster;
1102  CassSession *session = NULL;
1103 
1104  M0_ENTRY();
1105 
1106  /* Connect to Cassandra cluster. */
1107  if (svc == NULL)
1108  return M0_ERR(-EINVAL);
1109  ctx = (struct m0_idx_service_ctx *)svc;
1110  conf = (struct m0_idx_cass_config *)ctx->isc_svc_conf;
1111  ep = conf->cc_cluster_ep;
1112  keyspace = conf->cc_keyspace;
1113 
1114  rc = -ENETUNREACH;
1115  cluster = create_cluster(ep);
1116  if (cluster == NULL)
1117  goto error;
1118 
1119  session = connect_session(cluster, keyspace);
1120  if (session == NULL)
1121  goto error;
1122 
1123  /* Set Cassandra instance. */
1124  inst = m0_alloc(sizeof *inst);
1125  if (inst == NULL) {
1126  rc = -ENOMEM;
1127  goto error;
1128  }
1129 
1130  inst->ci_cluster = cluster;
1131  inst->ci_session = session;
1132  inst->ci_keyspace = keyspace;
1133  ctx->isc_svc_inst = inst;
1134 
1135  /* Set prepared statements. */
1136  idx_cass_nr_tables = conf->cc_max_column_family_num;
1138 
1139  return M0_RC(0);
1140 
1141 error:
1142  if (session != NULL) {
1144  cass_session_free(session);
1145  }
1146  if (cluster != NULL)
1147  cass_cluster_free(cluster);
1148 
1149  return M0_RC(rc);
1150 }
1151 
1152 static int idx_cass_fini(void *svc)
1153 {
1154  struct idx_cass_instance *inst;
1155 
1156  M0_ENTRY();
1157 
1158  if (svc == NULL)
1159  return M0_ERR(-EINVAL);
1160  inst = ((struct m0_idx_service_ctx *)svc)->isc_svc_inst;
1161 
1163  cass_session_free(inst->ci_session);
1164  cass_cluster_free(inst->ci_cluster);
1165 
1166  m0_free(inst);
1167 
1168  /*TODO: prepared statements. */
1170 
1171  return M0_RC(0);
1172 }
1173 
1176  .iso_fini = idx_cass_fini
1177 };
1178 
1179 #else
1180 static struct m0_idx_query_ops idx_cass_query_ops = {
1182  .iqo_namei_delete = NULL,
1183  .iqo_namei_lookup = NULL,
1184  .iqo_namei_list = NULL,
1185  .iqo_get = NULL,
1186  .iqo_put = NULL,
1187  .iqo_del = NULL,
1188  .iqo_next = NULL,
1189 };
1190 
1191 static struct m0_idx_service_ops idx_cass_svc_ops = {
1192  .iso_init = NULL,
1193  .iso_fini = NULL
1194 };
1195 
1196 #endif
1197 
1198 M0_INTERNAL void m0_idx_cass_register(void)
1199 {
1202 }
1203 
1204 #undef M0_TRACE_SUBSYSTEM
1205 
1206 /*
1207  * Local variables:
1208  * c-indentation-style: "K&R"
1209 
1210  * c-basic-offset: 8
1211  * tab-width: 8
1212  * fill-column: 80
1213  * scroll-step: 1
1214  * End:
1215  */
1216 /*
1217  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
1218  */
static int idx_cass_nr_tables
Definition: idx_cass.c:120
#define CQL_PUT
Definition: idx_cass.c:217
static int idx_cass_get(struct m0_op_idx *oi)
Definition: idx_cass.c:699
int32_t oi_nr_queries
CassCluster * ci_cluster
Definition: idx_cass.c:95
#define NULL
Definition: misc.h:38
static int get_table_id(struct m0_uint128 fid)
Definition: idx_cass.c:127
static void execute_query_async(CassSession *session, CassStatement *statement, CassFutureCallback cb, void *cb_data)
Definition: idx_cass.c:168
M0_INTERNAL void idx_op_ast_complete(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: idx.c:311
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
Definition: sm.h:506
def query(from_, to_, range_end, plug_name, time_unit)
Definition: hist.py:65
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: sm.c:135
#define M0_CASSERT(cond)
M0_INTERNAL void idx_op_ast_fail(struct m0_sm_group *grp, struct m0_sm_ast *ast)
Definition: idx.c:367
M0_INTERNAL void m0_idx_service_register(int svc_id, struct m0_idx_service_ops *sops, struct m0_idx_query_ops *qops)
Definition: idx.c:670
struct m0_vec ov_vec
Definition: vec.h:147
static int idx_cass_namei_lookup(struct m0_op_idx *oi)
Definition: idx_cass.c:622
struct m0_bufvec data
Definition: di.c:40
static int idx_cass_fini(void *svc)
Definition: idx_cass.c:1152
static struct m0_clovis * m0c
Definition: main.c:25
idx_cass_table_query_len
Definition: idx_cass.c:106
struct m0_op oc_op
Definition: conf.py:1
static int error
Definition: mdstore.c:64
static struct m0_rpc_session session
Definition: formation2.c:38
struct m0_op_common oi_oc
struct m0_bufvec * oi_keys
static int init_prepared_set(CassSession *session)
Definition: idx_cass.c:303
void ** ov_buf
Definition: vec.h:149
idx_cass_prepared_statement_type
Definition: idx_cass.c:111
#define PRIx64
Definition: types.h:61
#define CQL_DEL
Definition: idx_cass.c:219
static const char cass_table_magic[]
Definition: idx_cass.c:103
struct m0_fid fid
Definition: di.c:46
return M0_RC(rc)
Definition: sock.c:754
#define M0_ENTRY(...)
Definition: trace.h:170
int(* iqo_namei_create)(struct m0_op_idx *oi)
Definition: idx.h:123
static int idx_cass_next(struct m0_op_idx *oi)
Definition: idx_cass.c:969
struct m0_entity in_entity
Definition: client.h:836
int i
Definition: dir.c:1033
static const char * query_type_map[]
Definition: idx_cass.c:224
static int idx_cass_init(void *svc)
Definition: idx_cass.c:1093
static void idx_cass_query_cb(CassFuture *future, void *data)
Definition: idx_cass.c:483
return M0_ERR(-EOPNOTSUPP)
static int idx_cass_namei_new(struct m0_op_idx *oi)
Definition: idx_cass.c:522
struct m0_idx * oi_idx
char * ci_keyspace
Definition: idx_cass.c:94
static int idx_cass_del(struct m0_op_idx *oi)
Definition: idx_cass.c:832
static struct idx_cass_instance * get_cass_inst(struct m0_op_idx *oi)
Definition: idx_cass.c:440
#define m0_free0(pptr)
Definition: memory.h:77
static CassCluster * create_cluster(char *ep)
Definition: idx_cass.c:1052
struct crate_conf * conf
static struct m0_idx_service_ops idx_cass_svc_ops
Definition: idx_cass.c:1174
static int set_prepared(CassSession *session, int query_type, int table_id)
Definition: idx_cass.c:254
struct m0_ast_rc oi_ar
uint64_t u_hi
Definition: types.h:36
static int table_exists(CassSession *session, char *keyspace, int table_id, const char *table_magic)
Definition: idx_cass.c:370
void * m0_alloc(size_t size)
Definition: memory.c:126
int(* iso_init)(void *svc)
Definition: idx.h:137
static int statement(struct ff2c_context *ctx, struct ff2c_term *term)
Definition: parser.c:195
struct m0_op_idx * gcd_oi
Definition: idx_cass.c:634
uint32_t v_nr
Definition: vec.h:51
static CassResult * execute_query_sync(CassSession *session, CassStatement *statement)
Definition: idx_cass.c:146
static void terminate_session(CassSession *session)
Definition: idx_cass.c:1084
int32_t * oi_rcs
m0_bcount_t * v_count
Definition: vec.h:53
struct m0_uint128 en_id
Definition: client.h:708
static struct fdmi_ctx ctx
Definition: main.c:80
static void idx_cass_next_cb(CassFuture *future, void *data)
Definition: idx_cass.c:907
CassSession * ci_session
Definition: idx_cass.c:96
int gcd_kv_idx
Definition: idx_cass.c:633
static CassSession * connect_session(CassCluster *cluster, char *keyspace)
Definition: idx_cass.c:1062
struct m0_sm_ast ar_ast
struct m0_sm_group * oi_sm_grp
char * ep
Definition: sw.h:132
static CassError execute_query_batch(CassSession *session, CassBatch *batch, CassFutureCallback cb, void *cb_data)
Definition: idx_cass.c:179
static const CassPrepared * create_prepared(CassSession *session, char *query)
Definition: idx_cass.c:194
static const char cass_idx_table_magic[]
Definition: idx_cass.c:104
struct m0_entity * op_entity
Definition: client.h:660
struct m0_bufvec * oi_vals
static int copy_to_kv(int i, struct m0_bufvec *keys, struct m0_bufvec *vals, const cass_byte_t *k_bytes, size_t k_size, const cass_byte_t *v_bytes, size_t v_size)
Definition: idx_cass.c:884
static struct m0_net_test_service svc
Definition: service.c:34
static bool idx_exists(struct m0_op_idx *oi)
Definition: idx_cass.c:448
static void free_prepared_set()
Definition: idx_cass.c:321
#define IS_IN_ARRAY(idx, array)
Definition: misc.h:311
static int idx_cass_namei_drop(struct m0_op_idx *oi)
Definition: idx_cass.c:567
#define CQL_NEXT
Definition: idx_cass.c:221
M0_INTERNAL void m0_idx_cass_register(void)
Definition: idx_cass.c:1198
static const CassPrepared * get_prepared(CassSession *session, int query_type, int table_id)
Definition: idx_cass.c:276
static void idx_cass_get_cb(CassFuture *future, void *data)
Definition: idx_cass.c:637
#define CQL_GET
Definition: idx_cass.c:214
Definition: nucleus.c:42
static char * make_query_string(int query_type, int table_id, const char *table_magic)
Definition: idx_cass.c:231
uint64_t u_lo
Definition: types.h:37
static struct m0_idx_query_ops idx_cass_query_ops
Definition: idx_cass.c:1036
static int idx_cass_namei_list(struct m0_op_idx *oi)
Definition: idx_cass.c:627
void m0_free(void *data)
Definition: memory.c:146
static int row_exists(CassSession *session, int table_id, struct m0_uint128 idx_fid, const char *table_magic)
Definition: idx_cass.c:399
static void print_query_error(CassFuture *future)
Definition: idx_cass.c:137
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
static int idx_cass_put(struct m0_op_idx *oi)
Definition: idx_cass.c:768
Definition: vec.h:145
static const CassPrepared ** idx_cass_prepared_set
Definition: idx_cass.c:121
static void make_table_name(char *table, int table_id, const char *table_magic)
Definition: idx_cass.c:132
M0_INTERNAL struct m0_client * m0__entity_instance(const struct m0_entity *entity)
Definition: client.c:226