Motr  M0
net.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2017-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_DIXCM
24 #include "lib/trace.h"
25 
26 #include "lib/memory.h"
27 
28 #include "cm/proxy.h"
29 #include "dix/cm/cm.h"
30 #include "dix/cm/cp.h"
31 #include "dix/cm/dix_cp_onwire.h"
32 
33 #include "fop/fop.h"
34 #include "fop/fom.h"
35 #include "net/net.h"
36 #include "rpc/item.h"
37 #include "rpc/rpclib.h"
38 #include "rpc/session.h"
39 #include "rpc/conn.h"
40 #include "dix/fid_convert.h"
41 
47 static void dix_cp_reply_received(struct m0_rpc_item *item);
48 
49 /*
50  * Over-ridden rpc item ops, required to send notification to the copy packet
51  * send phase that reply has been received and the copy packet can be finalised.
52  */
53 static const struct m0_rpc_item_ops dix_cp_item_ops = {
55 };
56 
57 /* Converts in-memory copy packet structure to onwire copy packet structure. */
58 static int dixcp_to_dixcpx(struct m0_dix_cm_cp *dix_cp,
59  struct m0_dix_cpx *dix_cpx)
60 {
61  struct m0_cm_cp *cp;
62  int rc;
63 
64  M0_PRE(dix_cp != NULL);
65  M0_PRE(dix_cpx != NULL);
66 
67  cp = &dix_cp->dc_base;
68 
69  dix_cpx->dcx_ctg_fid = dix_cp->dc_ctg_fid;
70  dix_cpx->dcx_ctg_op_flags = dix_cp->dc_ctg_op_flags;
71  dix_cpx->dcx_cp.cpx_prio = cp->c_prio;
72  dix_cpx->dcx_phase = M0_CCP_SEND;
73  m0_cm_ag_id_copy(&dix_cpx->dcx_cp.cpx_ag_id, &cp->c_ag->cag_id);
74  m0_bitmap_onwire_init(&dix_cpx->dcx_cp.cpx_bm, 0);
75 
76  m0_rpc_at_init(&dix_cpx->dcx_ab_key);
77  m0_rpc_at_init(&dix_cpx->dcx_ab_val);
78 
79  rc = m0_rpc_at_add(&dix_cpx->dcx_ab_key, &dix_cp->dc_key,
80  cp->c_cm_proxy->px_conn);
81  if (rc == 0)
82  rc = m0_rpc_at_add(&dix_cpx->dcx_ab_val, &dix_cp->dc_val,
83  cp->c_cm_proxy->px_conn);
84  if (rc == 0) {
85  /* Now it's up to dix_cpx to free buffers. */
86  M0_SET0(&dix_cp->dc_key);
87  M0_SET0(&dix_cp->dc_val);
88  }
89 
90  return rc;
91 }
92 
93 static void dix_cp_reply_received(struct m0_rpc_item *req_item)
94 {
95  struct m0_fop *req_fop;
96  struct m0_dix_cm_cp *dix_cp;
97  struct m0_rpc_item *rep_item;
98  struct m0_cm_cp_fop *cp_fop;
99  struct m0_fop *rep_fop;
100  struct m0_cpx_reply *cpx_rep;
101 
102  M0_ENTRY();
103  req_fop = m0_rpc_item_to_fop(req_item);
104  cp_fop = M0_AMB(cp_fop, req_fop, cf_fop);
105  dix_cp = cp2dixcp(cp_fop->cf_cp);
106  M0_LOG(M0_DEBUG, "cm %p, cp %p", cpfom2cm(&dix_cp->dc_base.c_fom),
107  &dix_cp->dc_base);
108  rep_item = req_item->ri_reply;
109  if (!m0_rpc_item_error(req_item) && rep_item != NULL) {
110  if (m0_rpc_item_is_generic_reply_fop(rep_item)) {
111  dix_cp->dc_base.c_rc =
113  } else {
114  rep_fop = m0_rpc_item_to_fop(rep_item);
115  cpx_rep = m0_fop_data(rep_fop);
116  dix_cp->dc_base.c_rc = cpx_rep->cr_rc;
117  }
118  } else
119  dix_cp->dc_base.c_rc = m0_rpc_item_error(req_item);
120 
121  m0_fom_wakeup(&dix_cp->dc_base.c_fom);
122  M0_LEAVE("%d", dix_cp->dc_base.c_rc);
123 }
124 
125 static void dix_cp_fop_release(struct m0_ref *ref)
126 {
127  struct m0_cm_cp_fop *cp_fop;
128  struct m0_fop *fop = M0_AMB(fop, ref, f_ref);
129  struct m0_dix_cpx *dix_cpx = m0_fop_data(fop);
130 
131  cp_fop = M0_AMB(cp_fop, fop, cf_fop);
132  M0_ASSERT(cp_fop != NULL);
133  m0_rpc_at_fini(&dix_cpx->dcx_ab_key);
134  m0_rpc_at_fini(&dix_cpx->dcx_ab_val);
135  m0_fop_fini(fop);
136  m0_free(cp_fop);
137 }
138 
139 M0_INTERNAL int m0_dix_cm_cp_send(struct m0_cm_cp *cp, struct m0_fop_type *ft)
140 {
141  struct m0_dix_cm_cp *dix_cp;
142  struct m0_dix_cpx *dix_cpx;
143  struct m0_rpc_session *session;
144  struct m0_cm_cp_fop *cp_fop;
145  struct m0_fop *fop;
146  struct m0_rpc_item *item;
147  int rc;
148 
149  M0_ENTRY();
150  M0_PRE(cp != NULL && m0_fom_phase(&cp->c_fom) == M0_CCP_SEND);
151  M0_PRE(cp->c_cm_proxy != NULL);
152 
153  dix_cp = cp2dixcp(cp);
154  M0_ALLOC_PTR(cp_fop);
155  if (cp_fop == NULL) {
156  rc = M0_ERR(-ENOMEM);
157  goto out;
158  }
159  fop = &cp_fop->cf_fop;
162  if (rc != 0) {
163  m0_fop_fini(fop);
164  m0_free(cp_fop);
165  goto out;
166  }
167 
168  dix_cpx = m0_fop_data(fop);
169  M0_PRE(dix_cpx != NULL);
170  cp_fop->cf_cp = cp;
171  cp->c_ops->co_complete(cp);
172  rc = dixcp_to_dixcpx(dix_cp, dix_cpx);
173  if (rc != 0) {
174  m0_fop_fini(fop);
175  m0_free(cp_fop);
176  goto out;
177  }
178 
182 
187  item->ri_deadline = 0;
188 
189  m0_rpc_post(item);
191 out:
192  if (rc != 0) {
193  M0_LOG(M0_ERROR, "rc=%d", rc);
194  m0_buf_free(&dix_cp->dc_key);
195  m0_buf_free(&dix_cp->dc_val);
197  return M0_RC(M0_FSO_AGAIN);
198  }
199 
201  return M0_RC(M0_FSO_WAIT);
202 }
203 
204 M0_INTERNAL int m0_dix_cm_cp_send_wait(struct m0_cm_cp *cp)
205 {
206  M0_PRE(cp != NULL);
207 
208  M0_LOG(M0_DEBUG, "reply rc: %d", cp->c_rc);
209 
210  if (cp->c_rc != 0) {
211  M0_LOG(M0_ERROR, "rc=%d", cp->c_rc);
213  return M0_FSO_AGAIN;
214  }
216  return M0_FSO_WAIT;
217 }
218 
219 M0_INTERNAL int m0_dix_cm_cp_recv_init(struct m0_cm_cp *cp)
220 {
221  struct m0_rpc_at_buf *at_buf = NULL;
222  struct m0_dix_cm_cp *dix_cp = cp2dixcp(cp);
223  struct m0_dix_cpx *dix_cpx = m0_fop_data(cp->c_fom.fo_fop);
224 
226  at_buf = dix_cp->dc_phase_transmit < DCM_PT_VAL ?
227  &dix_cpx->dcx_ab_key :
228  &dix_cpx->dcx_ab_val;
229 
230  return m0_rpc_at_load(at_buf, &cp->c_fom, M0_CCP_RECV_WAIT);
231 }
232 
233 M0_INTERNAL int m0_dix_cm_cp_recv_wait(struct m0_cm_cp *cp)
234 {
235  struct m0_dix_cm_cp *dix_cp = cp2dixcp(cp);
236  int result = M0_FSO_AGAIN;
237 
239  if (dix_cp->dc_phase_transmit < DCM_PT_VAL) {
240  /* Start load value, key has been loaded. */
241  dix_cp->dc_phase_transmit++;
243  } else {
244  struct m0_cas_ctg *meta = m0_ctg_meta();
245 
246  M0_ASSERT(meta != NULL);
247  /* Key and value are loaded, lock meta-catalogue. */
248  result = m0_long_read_lock(m0_ctg_lock(meta),
249  &dix_cp->dc_meta_lock,
250  M0_CCP_XFORM);
251  result = M0_FOM_LONG_LOCK_RETURN(result);
252  }
253  return result;
254 }
255 
256 M0_INTERNAL int m0_dix_cm_cp_sw_check(struct m0_cm_cp *cp)
257 {
258  M0_PRE(cp != NULL && m0_fom_phase(&cp->c_fom) == M0_CCP_SW_CHECK);
259 
260  /* In DIX we do not care about sliding window, always ready to send. */
262  return M0_FSO_AGAIN;
263 }
264 
266 #undef M0_TRACE_SUBSYSTEM
267 
268 /*
269  * Local variables:
270  * c-indentation-style: "K&R"
271  * c-basic-offset: 8
272  * tab-width: 8
273  * fill-column: 80
274  * scroll-step: 1
275  * End:
276  */
M0_INTERNAL void m0_cm_ag_id_copy(struct m0_cm_ag_id *dst, const struct m0_cm_ag_id *src)
Definition: ag.c:83
M0_INTERNAL int m0_rpc_post(struct m0_rpc_item *item)
Definition: rpc.c:63
M0_INTERNAL void m0_fom_wakeup(struct m0_fom *fom)
Definition: fom.c:532
#define M0_PRE(cond)
struct m0_mutex px_mutex
Definition: proxy.h:112
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
enum m0_rpc_item_priority ri_prio
Definition: item.h:133
struct m0_fop * fo_fop
Definition: fom.h:490
uint32_t cpx_prio
Definition: cp_onwire.h:38
#define M0_FOM_LONG_LOCK_RETURN(rc)
#define NULL
Definition: misc.h:38
static int dixcp_to_dixcpx(struct m0_dix_cm_cp *dix_cp, struct m0_dix_cpx *dix_cpx)
Definition: net.c:58
struct m0_rpc_at_buf dcx_ab_key
Definition: dix_cp_onwire.h:55
bool m0_rpc_item_is_generic_reply_fop(const struct m0_rpc_item *item)
Definition: fom_generic.c:75
M0_INTERNAL int m0_dix_cm_cp_send(struct m0_cm_cp *cp, struct m0_fop_type *ft)
Definition: net.c:139
M0_INTERNAL struct m0_long_lock * m0_ctg_lock(struct m0_cas_ctg *ctg)
Definition: ctg_store.c:2162
M0_INTERNAL void m0_fop_init(struct m0_fop *fop, struct m0_fop_type *fopt, void *data, void(*fop_release)(struct m0_ref *))
Definition: fop.c:79
#define M0_LOG(level,...)
Definition: trace.h:167
Definition: cp.h:160
M0_LEAVE()
M0_INTERNAL void m0_rpc_at_init(struct m0_rpc_at_buf *ab)
Definition: at.c:433
struct m0_long_lock_link dc_meta_lock
Definition: cp.h:79
static void dix_cp_reply_received(struct m0_rpc_item *item)
Definition: net.c:93
struct m0_fid dc_ctg_fid
Definition: cp.h:53
struct m0_cm_ag_id cag_id
Definition: ag.h:72
M0_INTERNAL int m0_dix_cm_cp_send_wait(struct m0_cm_cp *cp)
Definition: net.c:204
void * m0_fop_data(const struct m0_fop *fop)
Definition: fop.c:220
M0_INTERNAL int m0_rpc_at_add(struct m0_rpc_at_buf *ab, const struct m0_buf *buf, const struct m0_rpc_conn *conn)
Definition: at.c:462
uint32_t dcx_phase
Definition: dix_cp_onwire.h:53
M0_INTERNAL int m0_rpc_at_load(struct m0_rpc_at_buf *ab, struct m0_fom *fom, int next_phase)
Definition: at.c:414
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
struct m0_buf dc_key
Definition: cp.h:90
static struct m0_rpc_item * item
Definition: item.c:56
m0_fom_phase
Definition: fom.h:372
struct m0_rpc_session * px_session
Definition: proxy.h:116
int32_t cr_rc
Definition: cp_onwire.h:64
return M0_RC(rc)
#define M0_ENTRY(...)
Definition: trace.h:170
int32_t m0_rpc_item_generic_reply_rc(const struct m0_rpc_item *reply)
Definition: fom_generic.c:81
uint32_t dc_ctg_op_flags
Definition: cp.h:71
struct m0_cpx dcx_cp
Definition: dix_cp_onwire.h:43
uint32_t dcx_ctg_op_flags
Definition: dix_cp_onwire.h:48
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL int m0_dix_cm_cp_recv_init(struct m0_cm_cp *cp)
Definition: net.c:219
#define M0_AMB(obj, ptr, field)
Definition: misc.h:320
Definition: refs.h:34
static struct m0_rpc_session session
Definition: net.c:113
M0_INTERNAL struct m0_cas_ctg * m0_ctg_meta(void)
Definition: ctg_store.c:2130
M0_INTERNAL int m0_bitmap_onwire_init(struct m0_bitmap_onwire *ow_map, size_t nr)
Definition: bitmap.c:182
#define M0_ASSERT(cond)
int c_rc
Definition: cp.h:218
struct m0_rpc_conn * px_conn
Definition: proxy.h:114
void m0_fom_phase_move(struct m0_fom *fom, int32_t rc, int phase)
Definition: fom.c:1699
struct m0_buf dc_val
Definition: cp.h:92
struct m0_rpc_at_buf dcx_ab_val
Definition: dix_cp_onwire.h:56
const struct m0_cm_cp_ops * c_ops
Definition: cp.h:169
Definition: cp.h:102
struct m0_rpc_item * ri_reply
Definition: item.h:163
M0_INTERNAL struct m0_cm * cpfom2cm(struct m0_fom *fom)
Definition: cp.c:72
struct m0_cm_aggr_group * c_ag
Definition: cp.h:172
M0_INTERNAL int m0_fop_data_alloc(struct m0_fop *fop)
Definition: fop.c:71
M0_INTERNAL void m0_buf_free(struct m0_buf *buf)
Definition: buf.c:55
M0_INTERNAL void m0_fop_fini(struct m0_fop *fop)
Definition: fop.c:136
M0_INTERNAL int m0_dix_cm_cp_recv_wait(struct m0_cm_cp *cp)
Definition: net.c:233
void(* rio_replied)(struct m0_rpc_item *item)
Definition: item.h:300
static const struct m0_rpc_item_ops dix_cp_item_ops
Definition: net.c:53
int dc_phase_transmit
Definition: cp.h:87
int32_t m0_rpc_item_error(const struct m0_rpc_item *item)
Definition: item.c:973
struct m0_cm_proxy * c_cm_proxy
Definition: cp.h:208
struct m0_cm_cp * cf_cp
Definition: cp.h:156
struct m0_ref f_ref
Definition: fop.h:80
struct m0_bitmap_onwire cpx_bm
Definition: cp_onwire.h:50
void(* co_complete)(struct m0_cm_cp *cp)
Definition: cp.h:244
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
const struct m0_rpc_item_ops * ri_ops
Definition: item.h:149
struct m0_rpc_session * ri_session
Definition: item.h:147
static void dix_cp_fop_release(struct m0_ref *ref)
Definition: net.c:125
struct m0_rpc_item * m0_fop_to_rpc_item(const struct m0_fop *fop)
Definition: fop.c:338
M0_INTERNAL void m0_rpc_at_fini(struct m0_rpc_at_buf *ab)
Definition: at.c:441
void m0_fop_put_lock(struct m0_fop *fop)
Definition: fop.c:199
static struct m0_fop * fop
Definition: item.c:57
struct m0_fop * m0_rpc_item_to_fop(const struct m0_rpc_item *item)
Definition: fop.c:346
M0_INTERNAL int m0_dix_cm_cp_sw_check(struct m0_cm_cp *cp)
Definition: net.c:256
static struct m0_fop_type * ft[]
Definition: service_ut.c:856
struct m0_fop cf_fop
Definition: cp.h:155
enum m0_cm_cp_priority c_prio
Definition: cp.h:163
#define out(...)
Definition: gen.c:41
struct m0_fid dcx_ctg_fid
Definition: dix_cp_onwire.h:46
M0_INTERNAL bool m0_long_read_lock(struct m0_long_lock *lk, struct m0_long_lock_link *link, int next_phase)
void m0_fom_phase_set(struct m0_fom *fom, int phase)
Definition: fom.c:1688
struct m0_fom c_fom
Definition: cp.h:161
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL struct m0_dix_cm_cp * cp2dixcp(const struct m0_cm_cp *cp)
Definition: cp.c:50
int32_t rc
Definition: trigger_fop.h:47
struct m0_cm_cp dc_base
Definition: cp.h:50
Definition: fop.h:79
struct m0_fop * rep_fop
Definition: dir.c:334
struct m0_cm_ag_id cpx_ag_id
Definition: cp_onwire.h:44
m0_time_t ri_deadline
Definition: item.h:141
Definition: cp.h:100