Motr  M0
net.c
Go to the documentation of this file.
1 /*
2  * Copyright (c) 2011-2020 Seagate Technology LLC and/or its Affiliates
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  *
16  * For any questions about this software or licensing,
17  * please email opensource@seagate.com or cortx-questions@seagate.com.
18  *
19  */
20 
21 
22 #include <stdio.h>
23 
24 #include "lib/assert.h"
25 #include "motr/magic.h"
26 #include "desim/sim.h"
27 #include "desim/net.h"
28 #include "desim/elevator.h"
29 #include "stob/stob.h" /* m0_stob_id_key_get */
30 
36 M0_TL_DESCR_DEFINE(rpc, "rpcs", static, struct net_rpc,
37  nr_inqueue, nr_magic, M0_DESIM_NET_RPC_MAGIC,
39 M0_TL_DEFINE(rpc, static, struct net_rpc);
40 
41 static void net_srv_loop(struct sim *s, struct sim_thread *t, void *arg)
42 {
43  struct net_srv *srv = arg;
44  struct net_rpc *rpc;
45  unsigned sect = srv->ns_el->e_dev->sd_conf->sc_sector_size;
46  unsigned trid;
47 
48  trid = t - srv->ns_thread;
49  M0_ASSERT(trid < srv->ns_nr_threads);
50 
51  while (1) {
52  unsigned long long count;
53  unsigned long long offset;
54  unsigned dev;
55  unsigned long obj;
56 
57  while (rpc_tlist_is_empty(&srv->ns_queue)) {
59  if (srv->ns_shutdown)
61  }
62 
63  srv->ns_active++;
64  rpc = rpc_tlist_pop(&srv->ns_queue);
65  count = rpc->nr_todo;
67  obj = rpc->nr_stob_id.si_fid.f_key;
68  offset = srv->ns_file_size * obj + rpc->nr_offset;
69 
70  sim_log(s, SLL_TRACE, "S#%s %2i/%2i: [%" PRIu64 "/%u:%lu] "
71  "%10llu %8lu %10llu\n",
72  srv->ns_name, trid, srv->ns_active,
74  rpc->nr_offset, rpc->nr_todo, offset);
75  rpc->nr_srv_thread = t;
76  /* delay before bulk */
79  sim_chan_signal(&rpc->nr_wait);
80  /* wait for bulk completion */
82  /* IO delay after bulk */
84  offset / sect, count / sect);
85  srv->ns_active--;
86  }
87 }
88 
89 static int net_srv_threads_start(struct sim_callout *call)
90 {
91  struct net_srv *srv = call->sc_datum;
92  unsigned i;
93 
94  for (i = 0; i < srv->ns_nr_threads; ++i)
95  sim_thread_init(call->sc_sim, &srv->ns_thread[i], 0,
96  net_srv_loop, srv);
97  return 1;
98 }
99 
100 
101 M0_INTERNAL void net_srv_init(struct sim *s, struct net_srv *srv)
102 {
103  rpc_tlist_init(&srv->ns_queue);
104  sim_chan_init(&srv->ns_incoming, "srv#%s::incoming", srv->ns_name);
106  srv->ns_el = sim_alloc(srv->ns_nr_devices * sizeof srv->ns_el[0]);
108 }
109 
110 M0_INTERNAL void net_srv_fini(struct net_srv *srv)
111 {
112  int i;
113 
114  rpc_tlist_fini(&srv->ns_queue);
115  srv->ns_shutdown = 1;
117 
118  if (srv->ns_thread != NULL) {
119  for (i = 0; i < srv->ns_nr_threads; ++i) {
120  /* drain events added during finalisation. */
123  }
125  }
127 }
128 
129 M0_INTERNAL void net_init(struct net_conf *net)
130 {
131  sim_chan_init(&net->nc_queue, "net::queue");
132  cnt_init(&net->nc_rpc_wait, NULL, "net::rpc_wait");
133  cnt_init(&net->nc_rpc_bulk_wait, NULL, "net::rpc_bulk_wait");
134 }
135 
136 M0_INTERNAL void net_fini(struct net_conf *net)
137 {
138  sim_chan_fini(&net->nc_queue);
139  cnt_fini(&net->nc_rpc_wait);
140  cnt_fini(&net->nc_rpc_bulk_wait);
141 }
142 
143 M0_INTERNAL void net_rpc_init(struct net_rpc *rpc, struct net_conf *conf,
144  struct net_srv *srv, struct m0_stob_id *stob_id,
145  unsigned long long offset, unsigned long nob)
146 {
147  rpc->nr_conf = conf;
148  rpc->nr_srv = srv;
149  rpc->nr_stob_id = *stob_id;
150  rpc->nr_offset = offset;
151  rpc->nr_todo = nob;
152  rpc_tlink_init(rpc);
153  sim_chan_init(&rpc->nr_wait, NULL);
155  rpc->nr_wait.ch_cnt_sleep.c_parent = &conf->nc_rpc_wait;
156  rpc->nr_bulk_wait.ch_cnt_sleep.c_parent = &conf->nc_rpc_bulk_wait;
157 }
158 
159 M0_INTERNAL void net_rpc_fini(struct net_rpc *rpc)
160 {
161  M0_ASSERT(rpc->nr_todo == 0);
162  rpc_tlink_fini(rpc);
163  sim_chan_fini(&rpc->nr_wait);
165  rpc->nr_magic = 0;
166 }
167 
168 static void net_enter(struct sim_thread *t, struct net_conf *n,
169  unsigned long nob)
170 {
171  while (n->nc_nob_inflight + nob > n->nc_nob_max ||
172  n->nc_msg_inflight + 1 > n->nc_msg_max)
173  sim_chan_wait(&n->nc_queue, t);
174  n->nc_nob_inflight += nob;
175  n->nc_msg_inflight += 1;
176 }
177 
178 static void net_leave(struct sim_thread *t, struct net_conf *n,
179  unsigned long nob)
180 {
181  M0_ASSERT(n->nc_nob_inflight >= nob);
182  M0_ASSERT(n->nc_msg_inflight >= 1);
183  n->nc_msg_inflight -= 1;
184  n->nc_nob_inflight -= nob;
185  sim_chan_broadcast(&n->nc_queue);
186 }
187 
188 static void net_tx(struct sim_thread *t, struct net_conf *n,
189  unsigned long nob, sim_time_t min, sim_time_t max)
190 {
191  net_enter(t, n, nob);
192  sim_sleep(t, sim_rnd(min, max));
193  net_leave(t, n, nob);
194 }
195 
196 M0_INTERNAL void net_rpc_send(struct sim_thread *t, struct net_rpc *rpc)
197 {
198  struct net_conf *n;
199 
200  n = rpc->nr_conf;
201  net_tx(t, n, n->nc_rpc_size, n->nc_rpc_delay_min, n->nc_rpc_delay_max);
203  rpc_tlist_add_tail(&rpc->nr_srv->ns_queue, rpc);
204  sim_chan_wait(&rpc->nr_wait, t);
205  net_tx(t, n, n->nc_rpc_size, n->nc_rpc_delay_min, n->nc_rpc_delay_max);
206 }
207 
208 M0_INTERNAL void net_rpc_bulk(struct sim_thread *t, struct net_rpc *rpc)
209 {
210  struct net_conf *n;
211  sim_time_t tx_min;
212  sim_time_t tx_max;
213 
214  n = rpc->nr_conf;
215  tx_min = rpc->nr_todo * 1000000000 / n->nc_rate_max;
216  tx_max = rpc->nr_todo * 1000000000 / n->nc_rate_min;
217  net_tx(t, n, rpc->nr_todo, tx_min, tx_max);
218  rpc->nr_todo = 0;
220 }
221 
222 M0_INTERNAL void net_rpc_process(struct sim_thread *t,
223  struct net_conf *net, struct net_srv *srv,
224  struct m0_stob_id *stob_id,
225  unsigned long long offset, unsigned long count)
226 {
227  struct net_rpc rpc;
228 
229  net_rpc_init(&rpc, net, srv, stob_id, offset, count);
230  net_rpc_send(t, &rpc);
231  net_rpc_bulk(t, &rpc);
232  net_rpc_fini(&rpc);
233 }
234 
237 /*
238  * Local variables:
239  * c-indentation-style: "K&R"
240  * c-basic-offset: 8
241  * tab-width: 8
242  * fill-column: 80
243  * scroll-step: 1
244  * End:
245  */
M0_INTERNAL void sim_thread_init(struct sim *state, struct sim_thread *thread, unsigned stacksize, sim_func_t func, void *arg)
Definition: sim.c:293
Definition: sim.h:152
struct elevator * ns_el
Definition: net.h:65
#define NULL
Definition: misc.h:38
unsigned sc_sector_size
Definition: storage.h:39
struct sim_thread * ns_thread
Definition: net.h:64
Definition: net.h:72
static void net_enter(struct sim_thread *t, struct net_conf *n, unsigned long nob)
Definition: net.c:168
M0_INTERNAL void sim_chan_fini(struct sim_chan *chan)
Definition: sim.c:400
M0_INTERNAL void sim_free(void *ptr)
Definition: sim.c:98
struct net_conf * nr_conf
Definition: net.h:74
sim_time_t ns_pre_bulk_min
Definition: net.h:60
struct sim * st_sim
Definition: sim.h:204
M0_INTERNAL void net_rpc_bulk(struct sim_thread *t, struct net_rpc *rpc)
Definition: net.c:208
Definition: net.h:57
M0_INTERNAL void sim_log(struct sim *s, enum sim_log_level level, const char *format,...)
Definition: sim.c:527
Definition: conf.py:1
M0_INTERNAL void sim_chan_broadcast(struct sim_chan *chan)
Definition: sim.c:456
M0_INTERNAL void cnt_init(struct cnt *cnt, struct cnt *parent, const char *format,...)
Definition: cnt.c:44
static struct foo * obj
Definition: tlist.c:302
M0_INTERNAL void net_rpc_process(struct sim_thread *t, struct net_conf *net, struct net_srv *srv, struct m0_stob_id *stob_id, unsigned long long offset, unsigned long count)
Definition: net.c:222
static m0_bcount_t count
Definition: xcode.c:167
M0_INTERNAL void sim_chan_init(struct sim_chan *chan, char *format,...)
Definition: sim.c:385
M0_INTERNAL void sim_thread_exit(struct sim_thread *thread)
Definition: sim.c:346
M0_INTERNAL void net_rpc_fini(struct net_rpc *rpc)
Definition: net.c:159
M0_TL_DEFINE(src, static, struct source)
Definition: net.h:39
M0_INTERNAL void sim_run(struct sim *state)
Definition: sim.c:129
M0_TL_DESCR_DEFINE(src, "addb2 rpc sources", static, struct source, s_linkage, s_magix, M0_ADDB2_SOURCE_MAGIC, M0_ADDB2_SOURCE_HEAD_MAGIC)
int i
Definition: dir.c:1033
static int net_srv_threads_start(struct sim_callout *call)
Definition: net.c:89
#define PRIu64
Definition: types.h:58
unsigned long nr_todo
Definition: net.h:77
struct sim_thread * nr_srv_thread
Definition: net.h:81
int ns_shutdown
Definition: net.h:67
M0_INTERNAL void sim_thread_fini(struct sim_thread *thread)
Definition: sim.c:333
unsigned ns_active
Definition: net.h:68
#define M0_ASSERT(cond)
struct crate_conf * conf
uint64_t nr_magic
Definition: net.h:82
static struct m0_thread t[8]
Definition: service_ut.c:1230
M0_INTERNAL void sim_timer_add(struct sim *state, sim_time_t delta, sim_call_t *cfunc, void *datum)
Definition: sim.c:189
static long long max(long long a, long long b)
Definition: crate.c:196
struct cnt ch_cnt_sleep
Definition: sim.h:243
struct m0_fid si_fid
Definition: stob.h:105
sim_time_t ns_pre_bulk_max
Definition: net.h:61
M0_INTERNAL void sim_sleep(struct sim_thread *thread, sim_time_t nap)
Definition: sim.c:372
static void net_srv_loop(struct sim *s, struct sim_thread *t, void *arg)
Definition: net.c:41
M0_INTERNAL void sim_chan_wait(struct sim_chan *chan, struct sim_thread *thread)
Definition: sim.c:411
unsigned long long sim_time_t
Definition: sim.h:111
M0_INTERNAL void sim_chan_signal(struct sim_chan *chan)
Definition: sim.c:447
M0_INTERNAL unsigned long long sim_rnd(unsigned long long a, unsigned long long b)
Definition: sim.c:471
static m0_bindex_t offset
Definition: dump.c:173
M0_INTERNAL void * sim_alloc(size_t size)
Definition: sim.c:85
M0_INTERNAL void elevator_io(struct elevator *el, enum storage_req_type type, sector_t sector, unsigned long count)
Definition: elevator.c:93
struct cnt * c_parent
Definition: cnt.h:44
M0_INTERNAL void net_srv_init(struct sim *s, struct net_srv *srv)
Definition: net.c:101
struct sim_chan nr_wait
Definition: net.h:79
struct net_srv * nr_srv
Definition: net.h:73
M0_INTERNAL void net_init(struct net_conf *net)
Definition: net.c:129
void * sc_datum
Definition: sim.h:138
struct sim * sc_sim
Definition: sim.h:142
struct storage_dev * e_dev
Definition: elevator.h:37
static long long min(long long a, long long b)
Definition: crate.c:191
uint64_t n
Definition: fops.h:107
unsigned ns_nr_devices
Definition: net.h:59
struct m0_stob_id nr_stob_id
Definition: net.h:75
uint64_t f_key
Definition: fid.h:40
M0_INTERNAL uint64_t m0_stob_id_dom_id_get(const struct m0_stob_id *stob_id)
Definition: stob.c:260
M0_INTERNAL void net_rpc_send(struct sim_thread *t, struct net_rpc *rpc)
Definition: net.c:196
static void net_tx(struct sim_thread *t, struct net_conf *n, unsigned long nob, sim_time_t min, sim_time_t max)
Definition: net.c:188
char * ns_name
Definition: net.h:69
unsigned long long ns_file_size
Definition: net.h:66
static struct m0_addb2_net * net
Definition: net.c:27
struct sim_chan nr_bulk_wait
Definition: net.h:80
struct m0_tl ns_queue
Definition: net.h:63
unsigned ns_nr_threads
Definition: net.h:58
unsigned long long nr_offset
Definition: net.h:76
M0_INTERNAL void net_rpc_init(struct net_rpc *rpc, struct net_conf *conf, struct net_srv *srv, struct m0_stob_id *stob_id, unsigned long long offset, unsigned long nob)
Definition: net.c:143
M0_INTERNAL void net_srv_fini(struct net_srv *srv)
Definition: net.c:110
static struct m0_addb2_source * s
Definition: consumer.c:39
static void net_leave(struct sim_thread *t, struct net_conf *n, unsigned long nob)
Definition: net.c:178
struct storage_conf * sd_conf
Definition: storage.h:55
struct sim_chan ns_incoming
Definition: net.h:62
Definition: sim.h:287
M0_INTERNAL void cnt_fini(struct cnt *cnt)
Definition: cnt.c:83
static struct net_srv srv
Definition: net_test.c:52
M0_INTERNAL void net_fini(struct net_conf *net)
Definition: net.c:136