30 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_CLIENT 91 #include <cassandra.h> 134 sprintf(table,
"%s_%d", table_magic, table_id);
142 cass_future_error_message(future, &err_msg, &msg_len);
143 M0_LOG(
M0_ERROR,
"Cassandra query failed: %.*s", (
int)msg_len, err_msg);
155 cass_future_wait(future);
156 rc = cass_future_error_code(future);
162 result = (CassResult *)cass_future_get_result(future);
163 cass_future_free(future);
169 CassFutureCallback cb,
void *cb_data)
174 cass_future_set_callback(future, cb, cb_data);
176 cass_future_free(future);
181 CassFutureCallback cb,
187 future = cass_session_execute_batch(
session, batch);
188 error = cass_future_set_callback(future, cb, cb_data);
189 cass_future_free(future);
197 CassFuture *future =
NULL;
198 const CassPrepared *prepared;
201 cass_future_wait_timed(future, 300000);
203 rc = cass_future_error_code(future);
208 prepared = cass_future_get_prepared(future);
210 cass_future_free(future);
215 ("SELECT * FROM %s WHERE index_fid = ? AND key = ?") 218 ("INSERT INTO %s (index_fid, key, value) VALUES(?, ?, ?)") 220 ("DELETE FROM %s WHERE index_fid = ? AND key = ?") 222 ("SELECT * FROM %s WHERE index_fid = ? AND key > ?") 232 const char *table_magic)
258 const CassPrepared *prepared =
NULL;
277 int query_type,
int table_id)
281 const CassPrepared *prepared =
NULL;
311 m0_alloc(nr_statements *
sizeof(CassPrepared *));
315 for (
i = 0;
i < nr_statements;
i++)
328 for (
i = 0;
i < nr_statements;
i++) {
336 #ifdef IDX_CASS_DRV_V22 339 const char *table_magic)
343 const CassSchemaMeta *schema_meta;
344 const CassKeyspaceMeta *keyspace_meta;
345 const CassTableMeta *table_meta;
347 schema_meta = cass_session_get_schema_meta(
session);
349 cass_schema_meta_keyspace_by_name(schema_meta, keyspace);
350 if (keyspace_meta ==
NULL) {
351 cass_schema_meta_free(schema_meta);
357 table_meta = cass_keyspace_meta_table_by_name(keyspace_meta, table);
358 if (table_meta !=
NULL)
364 cass_schema_meta_free(schema_meta);
371 const char *table_magic)
375 const CassSchema *schema;
376 const CassSchemaMeta *keyspace_meta;
377 const CassSchemaMeta *table_meta;
379 schema = cass_session_get_schema(
session);
380 keyspace_meta = cass_schema_get_keyspace(schema, keyspace);
381 if (keyspace_meta ==
NULL) {
387 table_meta = cass_schema_meta_get_entry(keyspace_meta, table);
388 if (table_meta !=
NULL)
394 cass_schema_free(schema);
400 struct m0_uint128 idx_fid,
const char *table_magic)
405 CassResult *query_result;
406 CassIterator *iterator;
408 char idx_fid_str[64];
415 sprintf(
query,
"SELECT * FROM %s WHERE index_fid = ?", table);
419 cass_statement_bind_string(
statement, 0, idx_fid_str);
422 if (query_result ==
NULL)
426 iterator = cass_iterator_from_result(query_result);
427 if (cass_iterator_next(iterator))
434 cass_iterator_free(iterator);
435 cass_result_free(query_result);
473 return exist > 0 ?
M0_RC(
true) :
M0_RC(
false);
492 rc = cass_future_error_code(future);
529 char idx_fid_str[64];
533 session = cass_inst->ci_session;
551 "INSERT INTO %s (index_fid) VALUES(?)", table);
556 cass_statement_bind_string(
statement, 0, idx_fid_str);
574 char idx_fid_str[64];
581 batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED);
596 "DELETE from %s where index_fid = ?", table);
600 cass_statement_bind_string(
statement, 0, idx_fid_str);
601 cass_batch_add_statement(batch,
statement);
608 "DELETE from %s where index_fid = ?", table);
612 cass_statement_bind_string(
statement, 0, idx_fid_str);
613 cass_batch_add_statement(batch,
statement);
642 const CassResult *result;
643 CassIterator *iterator =
NULL;
645 const cass_byte_t *bytes;
654 result = cass_future_get_result(future);
655 if (result ==
NULL) {
656 oi->
oi_rcs[kv_idx] = -ENOENT;
660 iterator = cass_iterator_from_result(result);
661 if (iterator ==
NULL) {
662 oi->
oi_rcs[kv_idx] = -ENOENT;
666 if (cass_iterator_next(iterator)) {
668 row = cass_iterator_get_row(iterator);
669 cass_value_get_bytes(cass_row_get_column(row, 2),
677 memcpy(vals->
ov_buf[kv_idx], bytes, nr_bytes);
681 oi->
oi_rcs[kv_idx] = -ENOENT;
689 cass_iterator_free(iterator);
690 cass_result_free(result);
706 char idx_fid_str[64];
742 cass_statement_bind_string(
statement, 0, idx_fid_str);
744 (cass_byte_t *)keys->
ov_buf[
i],
762 rc = (nr_selects > 0)?1:
rc;
774 const CassPrepared *prepared;
779 char idx_fid_str[64];
787 return M0_RC(-EINVAL);
795 batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED);
806 if (prepared ==
NULL)
810 statement = cass_prepared_bind(prepared);
811 cass_statement_bind_string(
statement, 0, idx_fid_str);
818 cass_batch_add_statement(batch,
statement);
828 cass_batch_free(batch);
841 char idx_fid_str[64];
851 batch = cass_batch_new(CASS_BATCH_TYPE_LOGGED);
865 cass_statement_bind_string(
statement, 0, idx_fid_str);
870 cass_batch_add_statement(batch,
statement);
879 cass_batch_free(batch);
885 const cass_byte_t *k_bytes,
size_t k_size,
886 const cass_byte_t *v_bytes,
size_t v_size)
898 memcpy(keys->
ov_buf[
i], k_bytes, k_size);
901 memcpy(vals->
ov_buf[
i], v_bytes, v_size);
915 const cass_byte_t *k_bytes;
916 const cass_byte_t *v_bytes;
917 const CassResult *result;
918 CassIterator *iterator =
NULL;
932 result = cass_future_get_result(future);
936 iterator = cass_iterator_from_result(result);
937 if (iterator ==
NULL)
940 while (cass_iterator_next(iterator)) {
941 row = cass_iterator_get_row(iterator);
943 cass_value_get_bytes(cass_row_get_column(row, 1),
945 cass_value_get_bytes(cass_row_get_column(row, 2),
950 k_bytes, k_size, v_bytes, v_size);
964 cass_iterator_free(iterator);
966 cass_result_free(result);
973 char idx_fid_str[64];
979 int64_t null_key = 0;
1002 strcat(
query,
" LIMIT ");
1004 strcat(
query, keys_v_nr);
1007 cass_statement_bind_string(
statement, 0, idx_fid_str);
1017 (cass_byte_t *)&null_key,
sizeof null_key);
1054 CassCluster *cluster;
1056 cluster = cass_cluster_new();
1057 cass_cluster_set_contact_points(cluster,
ep);
1069 future = cass_session_connect_keyspace(
session, cluster, keyspace);
1071 cass_future_wait(future);
1072 rc = cass_future_error_code(future);
1073 if (
rc != CASS_OK) {
1079 cass_future_free(future);
1088 future = cass_session_close(
session);
1089 cass_future_wait(future);
1090 cass_future_free(future);
1097 char *keyspace =
NULL;
1101 CassCluster *cluster;
1111 ep =
conf->cc_cluster_ep;
1112 keyspace =
conf->cc_keyspace;
1116 if (cluster ==
NULL)
1133 ctx->isc_svc_inst = inst;
1146 if (cluster !=
NULL)
1147 cass_cluster_free(cluster);
1182 .iqo_namei_delete =
NULL,
1183 .iqo_namei_lookup =
NULL,
1184 .iqo_namei_list =
NULL,
1204 #undef M0_TRACE_SUBSYSTEM static int idx_cass_nr_tables
static int idx_cass_get(struct m0_op_idx *oi)
static int get_table_id(struct m0_uint128 fid)
static void execute_query_async(CassSession *session, CassStatement *statement, CassFutureCallback cb, void *cb_data)
M0_INTERNAL void idx_op_ast_complete(struct m0_sm_group *grp, struct m0_sm_ast *ast)
void(* sa_cb)(struct m0_sm_group *grp, struct m0_sm_ast *)
def query(from_, to_, range_end, plug_name, time_unit)
#define M0_LOG(level,...)
M0_INTERNAL void m0_sm_ast_post(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL void idx_op_ast_fail(struct m0_sm_group *grp, struct m0_sm_ast *ast)
M0_INTERNAL void m0_idx_service_register(int svc_id, struct m0_idx_service_ops *sops, struct m0_idx_query_ops *qops)
static int idx_cass_namei_lookup(struct m0_op_idx *oi)
static int idx_cass_fini(void *svc)
static struct m0_clovis * m0c
struct m0_op_common oi_oc
struct m0_bufvec * oi_keys
static int init_prepared_set(CassSession *session)
idx_cass_prepared_statement_type
static const char cass_table_magic[]
int(* iqo_namei_create)(struct m0_op_idx *oi)
static int idx_cass_next(struct m0_op_idx *oi)
struct m0_entity in_entity
static const char * query_type_map[]
static int idx_cass_init(void *svc)
static void idx_cass_query_cb(CassFuture *future, void *data)
return M0_ERR(-EOPNOTSUPP)
static int idx_cass_namei_new(struct m0_op_idx *oi)
static int idx_cass_del(struct m0_op_idx *oi)
static struct idx_cass_instance * get_cass_inst(struct m0_op_idx *oi)
static CassCluster * create_cluster(char *ep)
static struct m0_idx_service_ops idx_cass_svc_ops
static int set_prepared(CassSession *session, int query_type, int table_id)
static int table_exists(CassSession *session, char *keyspace, int table_id, const char *table_magic)
void * m0_alloc(size_t size)
int(* iso_init)(void *svc)
static int statement(struct ff2c_context *ctx, struct ff2c_term *term)
struct m0_op_idx * gcd_oi
static CassResult * execute_query_sync(CassSession *session, CassStatement *statement)
static void terminate_session(CassSession *session)
static struct fdmi_ctx ctx
static void idx_cass_next_cb(CassFuture *future, void *data)
static CassSession * connect_session(CassCluster *cluster, char *keyspace)
struct m0_sm_group * oi_sm_grp
static CassError execute_query_batch(CassSession *session, CassBatch *batch, CassFutureCallback cb, void *cb_data)
static const CassPrepared * create_prepared(CassSession *session, char *query)
static const char cass_idx_table_magic[]
struct m0_entity * op_entity
struct m0_bufvec * oi_vals
static int copy_to_kv(int i, struct m0_bufvec *keys, struct m0_bufvec *vals, const cass_byte_t *k_bytes, size_t k_size, const cass_byte_t *v_bytes, size_t v_size)
static struct m0_net_test_service svc
static bool idx_exists(struct m0_op_idx *oi)
static void free_prepared_set()
#define IS_IN_ARRAY(idx, array)
static int idx_cass_namei_drop(struct m0_op_idx *oi)
M0_INTERNAL void m0_idx_cass_register(void)
static const CassPrepared * get_prepared(CassSession *session, int query_type, int table_id)
static void idx_cass_get_cb(CassFuture *future, void *data)
static char * make_query_string(int query_type, int table_id, const char *table_magic)
static struct m0_idx_query_ops idx_cass_query_ops
static int idx_cass_namei_list(struct m0_op_idx *oi)
static int row_exists(CassSession *session, int table_id, struct m0_uint128 idx_fid, const char *table_magic)
static void print_query_error(CassFuture *future)
static int idx_cass_put(struct m0_op_idx *oi)
static const CassPrepared ** idx_cass_prepared_set
static void make_table_name(char *table, int table_id, const char *table_magic)
M0_INTERNAL struct m0_client * m0__entity_instance(const struct m0_entity *entity)