Motr M0
mem_xprt_bulk.c — Doxygen source listing (the left-hand numbers below are original source line numbers)
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2012-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
23 /* This file is included into mem_xprt_xo.c */
24 
/*
 * NOTE(review): Doxygen listing fragment — the opening of this function is
 * missing from this dump.  Per the cross-reference index at the bottom of
 * this page the signature is:
 *   static void mem_wf_passive_bulk_cb(struct m0_net_transfer_mc *tm,
 *                                      struct m0_net_bulk_mem_work_item *wi)
 * defined at mem_xprt_bulk.c:33.  Original lines 40-41 and 43 (the rest of
 * the second M0_PRE condition) and line 46 (presumably the
 * mem_wi_post_buffer_event(wi) call announced by the comment at line 45)
 * are also elided — confirm against the real source file.
 *
 * Purpose: work function that delivers the completion callback for a
 * passive bulk buffer on its owning transfer machine.
 */
 34  struct m0_net_bulk_mem_work_item *wi)
 35 {
/* map the work item back to its embedding network buffer */
 36  struct m0_net_buffer *nb = mem_wi_to_buffer(wi);
 37 
/* contract: invoked without the TM mutex held, on a buffer owned by tm */
 38  M0_PRE(m0_mutex_is_not_locked(&tm->ntm_mutex));
 39  M0_PRE(nb != NULL &&
 42  nb->nb_tm == tm);
 44 
 45  /* post the completion callback (will clear M0_NET_BUF_IN_USE) */
/* NOTE(review): original line 46 (the call performing the post) is not shown */
 47  return;
 48 }
49 
/*
 * NOTE(review): Doxygen listing — original source lines 60-65, 74-75, 79,
 * 140 and 180 were dropped by the extraction; each gap is flagged inline
 * below.  Confirm any reading of this function against the real file.
 *
 * Work function for the active side of an in-memory bulk transfer:
 * decodes the network buffer descriptor, creates a local end point for
 * the passive address, locates the matching remote (passive) TM and its
 * queued buffer, copies the data between the two buffers, and schedules
 * completion callbacks for both the passive and the active side.
 */
 56 static void mem_wf_active_bulk(struct m0_net_transfer_mc *tm,
 57  struct m0_net_bulk_mem_work_item *wi)
 58 {
/* maps each passive queue type to the active queue type allowed to pair
 * with it; indexed by the queue type recorded in the descriptor */
 59  static const enum m0_net_queue_type inverse_qt[M0_NET_QT_NR] = {
/* NOTE(review): initializer entries (original lines 60-65) are elided here */
 66  };
 67  struct m0_net_buffer *nb = mem_wi_to_buffer(wi);
 68  int rc;
 69  struct m0_net_transfer_mc *passive_tm = NULL;
 70  struct m0_net_end_point *match_ep = NULL;
 71 
 72  M0_PRE(m0_mutex_is_not_locked(&tm->ntm_mutex));
 73  M0_PRE(nb != NULL &&
/* NOTE(review): original lines 74-75 (further M0_PRE conjuncts) are elided */
 76  nb->nb_tm == tm &&
 77  nb->nb_desc.nbd_len != 0 &&
 78  nb->nb_desc.nbd_data != NULL);
/* NOTE(review): original line 79 is elided */
 80 
 81  /* Note: this function, like all the mem buffer work functions, is
 82  called without holding the tm or domain mutex. That means this
 83  function cannot modify the tm without obtaining a lock (it also
 84  means this function can lock a different tm or domain without
 85  causing deadlock). It can access members of tm, like ntm_ep, that
 86  do not change while the tm is M0_NET_TM_STARTED.
 87  */
 88 
 89  do { /* provide a break context */
 90  struct mem_desc *md = NULL;
 91  struct m0_net_buffer *passive_nb = NULL;
 92  struct m0_net_buffer *inb;
 93  struct m0_net_buffer *s_buf;
 94  struct m0_net_buffer *d_buf;
 95  m0_bcount_t datalen;
 96  struct m0_net_bulk_mem_work_item *passive_wi;
 97  struct m0_net_bulk_mem_tm_pvt *passive_tp;
 98 
 99  /* decode the descriptor */
 100  rc = mem_desc_decode(&nb->nb_desc, &md);
 101  if (rc != 0)
 102  break;
 103 
/* the descriptor's queue type must be the inverse of this buffer's queue */
 104  if (nb->nb_qtype != inverse_qt[md->md_qt]) {
 105  rc = -EPERM; /* wrong operation */
 106  break;
 107  }
 108 
 109  /* Make a local end point matching the passive address.*/
 110  m0_mutex_lock(&tm->ntm_mutex);
 111  rc = mem_bmo_ep_create(&match_ep, tm, &md->md_passive, 0);
 112  m0_mutex_unlock(&tm->ntm_mutex);
 113  if (rc != 0) {
/* ep_create failed; clear the pointer so the cleanup below skips the put */
 114  match_ep = NULL;
 115  break;
 116  }
 117 
 118  /* Search for a remote TM matching this EP address. */
 119  rc = mem_find_remote_tm(tm, match_ep, &passive_tm, NULL);
 120  if (rc != 0)
 121  break;
 122 
 123  /* We're now operating on the destination TM while holding
 124  its mutex. The destination TM is operative.
 125  */
 126 
 127  /* locate the passive buffer */
 128  m0_tl_for(m0_net_tm, &passive_tm->ntm_q[md->md_qt], inb) {
 129  if(!mem_desc_equal(&inb->nb_desc, &nb->nb_desc))
 130  continue;
/* a matching but cancelled buffer leaves passive_nb NULL -> -ENOENT below */
 131  if ((inb->nb_flags & M0_NET_BUF_CANCELLED) == 0)
 132  passive_nb = inb;
 133  break;
 134  } m0_tl_endfor;
 135  if (passive_nb == NULL) {
 136  rc = -ENOENT;
 137  break;
 138  }
 139 
/* NOTE(review): original line 140 — the "if (...) {" selecting the copy
 * direction (presumably testing nb->nb_qtype for the active-send case,
 * where the local buffer is the source) — is elided from this dump */
 141  s_buf = nb;
 142  d_buf = passive_nb;
 143  datalen = nb->nb_length;
 144  } else {
/* active receive: the passive buffer is the source, length from descriptor */
 145  s_buf = passive_nb;
 146  d_buf = nb;
 147  datalen = md->md_len;
 148  }
 149  /*
 150  Copy the buffer.
 151  The length check was delayed until here so both buffers
 152  can get released with appropriate error code.
 153  */
 154  rc = mem_copy_buffer(d_buf, s_buf, datalen);
 155 
 156  /* schedule the passive callback */
 157  passive_wi = mem_buffer_to_wi(passive_nb);
 158  passive_wi->xwi_op = M0_NET_XOP_PASSIVE_BULK_CB;
 159  passive_wi->xwi_status = rc;
 160  passive_wi->xwi_nbe_length = datalen;
 161 
/* queue the passive-side work item on the passive TM's private worker queue */
 162  passive_tp = mem_tm_to_pvt(passive_tm);
 163  mem_wi_add(passive_wi, passive_tp);
 164 
 165  /* active side gets same status */
 166  wi->xwi_status = rc;
 167  wi->xwi_nbe_length = datalen;
 168  } while (0);
 169 
 170  /* release the destination TM mutex */
 171  if (passive_tm != NULL)
 172  m0_mutex_unlock(&passive_tm->ntm_mutex);
 173 
 174  /* free the local match end point */
 175  if (match_ep != NULL)
 176  m0_net_end_point_put(match_ep);
 177 
 178  /* post the send completion callback (will clear M0_NET_BUF_IN_USE) */
 179  wi->xwi_status = rc;
/* NOTE(review): original line 180 — presumably the call that posts the
 * buffer completion event announced at line 178 — is elided */
 181  return;
 182 }
183  /* bulkmem */
185 
186 /*
187  * Local variables:
188  * c-indentation-style: "K&R"
189  * c-basic-offset: 8
190  * tab-width: 8
191  * fill-column: 80
192  * scroll-step: 1
193  * End:
194  */
uint8_t * s_buf
Definition: string.h:100
struct m0_net_transfer_mc * nb_tm
Definition: net.h:1357
#define M0_PRE(cond)
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static struct m0_net_buffer * mem_wi_to_buffer(struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt.h:386
#define NULL
Definition: misc.h:38
uint32_t nbd_len
Definition: net_otw_types.h:37
static int mem_copy_buffer(struct m0_net_buffer *dest_nb, struct m0_net_buffer *src_nb, m0_bcount_t num_bytes)
M0_INTERNAL bool m0_mutex_is_not_locked(const struct m0_mutex *mutex)
Definition: mutex.c:101
static struct m0_mdstore md
Definition: sd_common.c:42
uint8_t * nbd_data
Definition: net_otw_types.h:38
static void mem_wi_add(struct m0_net_bulk_mem_work_item *wi, struct m0_net_bulk_mem_tm_pvt *tp)
m0_bcount_t nb_length
Definition: net.h:1334
static void mem_wi_post_buffer_event(struct m0_net_bulk_mem_work_item *wi)
enum m0_net_bulk_mem_work_opcode xwi_op
Definition: mem_xprt.h:163
uint64_t nb_flags
Definition: net.h:1489
uint64_t m0_bcount_t
Definition: types.h:77
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
#define m0_tl_endfor
Definition: tlist.h:700
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
m0_bcount_t xwi_nbe_length
Definition: mem_xprt.h:176
static bool mem_desc_equal(struct m0_net_buf_desc *d1, struct m0_net_buf_desc *d2)
Definition: mem_xprt_ep.c:255
static int mem_desc_decode(struct m0_net_buf_desc *desc, struct mem_desc **p_md)
Definition: mem_xprt_ep.c:242
static int mem_find_remote_tm(struct m0_net_transfer_mc *tm, struct m0_net_end_point *match_ep, struct m0_net_transfer_mc **p_dest_tm, struct m0_net_end_point **p_dest_ep)
Definition: mem_xprt_msg.c:67
static void mem_wf_active_bulk(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_bulk.c:56
static struct m0_net_bulk_mem_work_item * mem_buffer_to_wi(struct m0_net_buffer *buf)
Definition: mem_xprt.h:399
struct m0_tl ntm_q[M0_NET_QT_NR]
Definition: net.h:877
void m0_net_end_point_put(struct m0_net_end_point *ep)
Definition: ep.c:98
m0_net_queue_type
Definition: net.h:591
static void mem_wf_passive_bulk_cb(struct m0_net_transfer_mc *tm, struct m0_net_bulk_mem_work_item *wi)
Definition: mem_xprt_bulk.c:33
struct m0_net_buf_desc nb_desc
Definition: net.h:1412
static struct m0_net_bulk_mem_tm_pvt * mem_tm_to_pvt(const struct m0_net_transfer_mc *tm)
Definition: mem_xprt.h:231
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
int32_t rc
Definition: trigger_fop.h:47
static int mem_bmo_ep_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *tm, const struct sockaddr_in *sa, uint32_t id)
Definition: mem_xprt_pvt.h:96