Motr  M0
libfab.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2021 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
176 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_NET
177 #include "lib/trace.h" /* M0_ENTRY() */
178 
179 #ifdef ENABLE_LIBFAB
180 
181 #include <netinet/in.h> /* INET_ADDRSTRLEN */
182 #include <arpa/inet.h> /* inet_pton, htons */
183 #include <sched.h> /* sched_yield */
184 #include <stdlib.h> /* atoi */
185 #include <sys/epoll.h> /* struct epoll_event */
186 #include <unistd.h> /* close */
187 #include "net/buffer_pool.h" /* struct m0_net_buffer_pool */
188 #include "net/net.h" /* struct m0_net_domain */
189 #include "lib/errno.h" /* errno */
190 #include "lib/finject.h" /* M0_FI_ENABLED */
191 #include "lib/hash.h" /* m0_htable */
192 #include "lib/memory.h" /* M0_ALLOC_PTR()*/
193 #include "lib/processor.h" /* m0_processor_is_vm()*/
194 #include "libfab_internal.h"
195 #include "lib/string.h" /* m0_streq */
196 #include "net/net_internal.h" /* m0_net__buffer_invariant() */
197 
/*
 * Helpers to carry a 32-bit value through a void pointer and back.
 * The argument is parenthesised so that an expression argument (for example
 * "a - b") is evaluated as a whole before it is converted.
 */
#define U32_TO_VPTR(a) ((void *)((uintptr_t)(a)))
#define VPTR_TO_U32(a) ((uint32_t)((uintptr_t)(a)))
200 
201 /* Assert the equivalence of return code for libfabric and motr */
202 M0_BASSERT(FI_SUCCESS == 0);
203 
204 static const char *providers[FAB_FABRIC_PROV_MAX] = {
205  [FAB_FABRIC_PROV_VERBS] = "verbs",
206  [FAB_FABRIC_PROV_TCP] = "tcp",
207  [FAB_FABRIC_PROV_SOCK] = "sockets" };
208 
/*
 * Typed-list descriptors and definitions used by this file:
 *  - fab_buf    : list of struct m0_fab__buf linked through fb_linkage;
 *  - fab_sndbuf : list of struct m0_fab__buf linked through fb_snd_link;
 *  - fab_fabs   : list of struct m0_fab__fab linked through fab_link;
 *  - fab_bulk   : list of struct m0_fab__bulk_op linked through fbl_link.
 *
 * NOTE(review): every M0_TL_DESCR_DEFINE() below ends after the magic-field
 * argument; listing lines 211, 216, 221 and 226 (presumably the remaining
 * arguments and the closing parenthesis) are absent from this extract and
 * must be restored from the original file.
 */
209 M0_TL_DESCR_DEFINE(fab_buf, "libfab_buf",
210  static, struct m0_fab__buf, fb_linkage, fb_magic,
212 M0_TL_DEFINE(fab_buf, static, struct m0_fab__buf);
213 
214 M0_TL_DESCR_DEFINE(fab_sndbuf, "libfab_sndbuf",
215  static, struct m0_fab__buf, fb_snd_link, fb_sndmagic,
217 M0_TL_DEFINE(fab_sndbuf, static, struct m0_fab__buf);
218 
219 M0_TL_DESCR_DEFINE(fab_fabs, "libfab_fabrics",
220  static, struct m0_fab__fab, fab_link, fab_magic,
222 M0_TL_DEFINE(fab_fabs, static, struct m0_fab__fab);
223 
224 M0_TL_DESCR_DEFINE(fab_bulk, "libfab_bulkops",
225  static, struct m0_fab__bulk_op, fbl_link, fbl_magic,
227 M0_TL_DEFINE(fab_bulk, static, struct m0_fab__bulk_op);
228 
229 static uint32_t libfab_bht_func(const struct m0_htable *ht, const void *key)
230 {
231  const union m0_fab__token *token = key;
232 
233  /*
234  * Max buckets = ((M0_NET_QT_NR + 1) * FAB_NUM_BUCKETS_PER_QTYPE)
235  * The bucket id is defined by the queue_id and the queue_num
236  * fields of the token
237  */
238  return ((token->t_Fields.tf_queue_id * FAB_NUM_BUCKETS_PER_QTYPE) +
239  token->t_Fields.tf_queue_num);
240 }
241 
242 static bool libfab_bht_key_eq(const void *key1, const void *key2)
243 {
244  const union m0_fab__token *token1 = key1;
245  const union m0_fab__token *token2 = key2;
246 
247  return token1->t_val == token2->t_val;
248 }
249 
/*
 * Hash table of struct m0_fab__buf, keyed by the fb_token field, hashed with
 * libfab_bht_func() and compared with libfab_bht_key_eq().
 *
 * NOTE(review): listing line 252 (one argument line between the magic and
 * the key field) is absent from this extract; restore it from the original
 * file.
 */
250 M0_HT_DESCR_DEFINE(fab_bufhash, "Hash of bufs", static, struct m0_fab__buf,
251  fb_htlink, fb_htmagic, M0_NET_LIBFAB_BUF_HT_MAGIC,
253  fb_token, libfab_bht_func, libfab_bht_key_eq);
254 
255 M0_HT_DEFINE(fab_bufhash, static, struct m0_fab__buf, uint32_t);
256 
257 static int libfab_ep_txres_init(struct m0_fab__active_ep *aep,
258  struct m0_fab__tm *tm, void *ctx);
259 static int libfab_ep_rxres_init(struct m0_fab__active_ep *aep,
260  struct m0_fab__tm *tm, void *ctx);
261 static int libfab_pep_res_init(struct m0_fab__passive_ep *pep,
262  struct m0_fab__tm *tm, void *ctx);
263 static struct m0_fab__ep *libfab_ep(struct m0_net_end_point *net);
264 static int libfab_ep_find(struct m0_net_transfer_mc *tm, const char *name,
265  struct m0_net_ip_params *addr,
266  struct m0_net_end_point **epp);
267 static int libfab_ep_create(struct m0_net_transfer_mc *tm, const char *name,
268  struct m0_net_ip_params *addr,
269  struct m0_net_end_point **epp);
270 static int libfab_active_ep_create(struct m0_fab__ep *ep,
271  struct m0_fab__tm *tm);
272 static int libfab_passive_ep_create(struct m0_fab__ep *ep,
273  struct m0_fab__tm *tm);
274 static int libfab_aep_param_free(struct m0_fab__active_ep *aep,
275  struct m0_fab__tm *tm);
276 static int libfab_pep_param_free(struct m0_fab__passive_ep *pep,
277  struct m0_fab__tm *tm);
278 static int libfab_ep_param_free(struct m0_fab__ep *ep, struct m0_fab__tm *tm);
279 static int libfab_pep_res_free(struct m0_fab__pep_res *pep_res,
280  struct m0_fab__tm *tm);
281 static int libfab_ep_txres_free(struct m0_fab__tx_res *tx_res,
282  struct m0_fab__tm *tm);
283 static int libfab_ep_rxres_free(struct m0_fab__rx_res *rx_res,
284  struct m0_fab__tm *tm);
285 static void libfab_poller(struct m0_fab__tm *ma);
286 static int libfab_waitfd_init(struct m0_fab__tm *tm);
287 static void libfab_tm_event_post(struct m0_fab__tm *tm,
288  enum m0_net_tm_state state);
289 static inline void libfab_tm_lock(struct m0_fab__tm *tm);
290 static inline void libfab_tm_unlock(struct m0_fab__tm *tm);
291 static inline void libfab_tm_evpost_lock(struct m0_fab__tm *tm);
292 static inline void libfab_tm_evpost_unlock(struct m0_fab__tm *tm);
293 static inline bool libfab_tm_is_locked(const struct m0_fab__tm *tm);
294 static void libfab_buf_complete(struct m0_fab__buf *buf);
295 static void libfab_buf_done(struct m0_fab__buf *buf, int rc, bool add_to_list);
296 static inline struct m0_fab__tm *libfab_buf_tm(struct m0_fab__buf *buf);
297 static inline struct m0_fab__tm *libfab_buf_ma(struct m0_net_buffer *buf);
298 static bool libfab_tm_invariant(const struct m0_fab__tm *tm);
299 static void libfab_buf_del(struct m0_net_buffer *nb);
300 static inline void libfab_ep_get(struct m0_fab__ep *ep);
301 static void libfab_ep_release(struct m0_ref *ref);
302 static uint64_t libfab_mr_keygen(struct m0_fab__tm *tm);
303 static int libfab_check_for_event(struct fid_eq *eq, uint32_t *ev);
304 static int libfab_check_for_comp(struct fid_cq *cq, uint32_t *ctx,
305  m0_bindex_t *len, uint64_t *rem_cq_data);
306 static void libfab_tm_fini(struct m0_net_transfer_mc *tm);
307 static int libfab_buf_dom_reg(struct m0_net_buffer *nb, struct m0_fab__tm *tm);
308 static int libfab_buf_dom_dereg(struct m0_fab__buf *fbp);
309 static void libfab_pending_bufs_send(struct m0_fab__ep *ep);
310 static int libfab_target_notify(struct m0_fab__buf *buf);
311 static int libfab_conn_init(struct m0_fab__ep *ep, struct m0_fab__tm *ma,
312  struct m0_fab__buf *fbp);
313 static int libfab_conn_accept(struct m0_fab__ep *ep, struct m0_fab__tm *tm,
314  struct fi_info *info);
315 static int libfab_fab_ep_find(struct m0_fab__tm *tm, const char *name,
316  struct m0_net_ip_params *addr,
317  struct m0_fab__ep **ep);
318 static void libfab_ep_pton(struct m0_net_ip_addr *name, uint64_t *out);
319 static void libfab_txep_event_check(struct m0_fab__ep *txep,
320  struct m0_fab__active_ep *aep,
321  struct m0_fab__tm *tm);
322 static int libfab_txep_init(struct m0_fab__active_ep *aep,
323  struct m0_fab__tm *tm, void *ctx);
324 static int libfab_waitfd_bind(struct fid* fid, struct m0_fab__tm *tm,
325  void *ctx);
326 static int libfab_waitfd_unbind(struct fid* fid, struct m0_fab__tm *tm,
327  void *ctx);
328 static inline struct m0_fab__active_ep *libfab_aep_get(struct m0_fab__ep *ep);
329 static int libfab_ping_op(struct m0_fab__active_ep *ep, struct m0_fab__buf *fb);
330 static int libfab_bulk_op(struct m0_fab__active_ep *ep, struct m0_fab__buf *fb);
331 static inline bool libfab_is_verbs(struct m0_fab__tm *tm);
332 static int libfab_txbuf_list_add(struct m0_fab__tm *tm, struct m0_fab__buf *fb,
333  struct m0_fab__active_ep *aep);
334 static void libfab_bufq_process(struct m0_fab__tm *tm);
335 static uint32_t libfab_buf_token_get(struct m0_fab__tm *tm,
336  struct m0_fab__buf *fb);
337 static bool libfab_buf_invariant(const struct m0_fab__buf *buf);
338 
339 /* libfab init and fini() : initialized in motr init */
340 M0_INTERNAL int m0_net_libfab_init(void)
341 {
342  m0_net_xprt_register(&m0_net_libfab_xprt);
343  if (m0_streq(M0_DEFAULT_NETWORK, "LF"))
344  m0_net_xprt_default_set(&m0_net_libfab_xprt);
345  return M0_RC(0);
346 }
347 
348 M0_INTERNAL void m0_net_libfab_fini(void)
349 {
350  m0_net_xprt_deregister(&m0_net_libfab_xprt);
351 }
352 
356 static void libfab_straddr_gen(struct m0_net_ip_params *addr,
357  char *ip)
358 {
359  if (likely((addr->nip_fmt_pvt.ia.nia_family == M0_NET_IP_AF_INET) ||
360  (addr->nip_format == M0_NET_IP_LNET_FORMAT)))
361  inet_ntop(AF_INET, &addr->nip_ip_n.sn[0], ip,
362  LIBFAB_ADDR_LEN_MAX);
363  else if (addr->nip_fmt_pvt.ia.nia_family == M0_NET_IP_AF_INET6)
364  inet_ntop(AF_INET6, &addr->nip_ip_n.ln[0], ip,
365  LIBFAB_ADDR_LEN_MAX);
366  else
367  M0_LOG(M0_ERROR, "Family is not supported.");
368 }
369 
388 static int libfab_ep_addr_decode(struct m0_fab__ep *ep, const char *name)
389 {
390  M0_ENTRY("name=%s", name);
391  M0_PRE(name != NULL);
392 
393  if (name[0] == '\0')
394  return M0_ERR(-EPROTO);
395  else
396  return M0_RC(m0_net_ip_parse(name, &ep->fep_name));
397 }
398 
402 static inline void libfab_tm_lock(struct m0_fab__tm *tm)
403 {
404  m0_mutex_lock(&tm->ftm_ntm->ntm_mutex);
405 }
406 
410 static inline void libfab_tm_unlock(struct m0_fab__tm *tm)
411 {
412  m0_mutex_unlock(&tm->ftm_ntm->ntm_mutex);
413 }
414 
415 static inline int libfab_tm_trylock(struct m0_fab__tm *tm)
416 {
417  return m0_mutex_trylock(&tm->ftm_ntm->ntm_mutex);
418 }
419 
423 static inline void libfab_tm_evpost_lock(struct m0_fab__tm *tm)
424 {
425  m0_mutex_lock(&tm->ftm_evpost);
426 }
427 
431 static inline void libfab_tm_evpost_unlock(struct m0_fab__tm *tm)
432 {
433  m0_mutex_unlock(&tm->ftm_evpost);
434 }
435 
440 static inline bool libfab_tm_is_locked(const struct m0_fab__tm *tm)
441 {
442  return m0_mutex_is_locked(&tm->ftm_ntm->ntm_mutex);
443 }
444 
448 static void libfab_tm_event_post(struct m0_fab__tm *tm,
449  enum m0_net_tm_state state)
450 {
451  struct m0_net_end_point *listen = NULL;
452 
453  if (state == M0_NET_TM_STARTED) {
454  /* Check for LISTENING Passive endpoint */
455  listen = &tm->ftm_pep->fep_nep;
456  M0_ASSERT(listen != NULL);
457  }
458 
460  .nte_type = M0_NET_TEV_STATE_CHANGE,
461  .nte_next_state = state,
462  .nte_time = m0_time_now(),
463  .nte_ep = listen,
464  .nte_tm = tm->ftm_ntm,
465  });
466 }
467 
/**
 * Completes, with -ETIMEDOUT, every queued buffer whose deadline has passed.
 *
 * Walks all queues of the network transfer machine and, for each buffer
 * whose nb_timeout is earlier than the time sampled on entry, de-registers
 * it, marks it FAB_BUF_TIMEDOUT and passes it to libfab_buf_done().
 * The next check time (ftm_tmout_check) is re-armed before the scan.
 *
 * The transfer machine lock must be held and the TM invariant must hold on
 * entry; the invariant is checked again on exit.
 *
 * @param ftm libfabric transfer machine.
 */
472 static void libfab_tm_buf_timeout(struct m0_fab__tm *ftm)
473 {
474  struct m0_net_transfer_mc *net = ftm->ftm_ntm;
475  struct m0_net_buffer *nb;
476  struct m0_fab__buf *fb;
477  int i;
478  m0_time_t now = m0_time_now();
479 
480  M0_PRE(libfab_tm_is_locked(ftm));
481  M0_PRE(libfab_tm_invariant(ftm));
482 
483  ftm->ftm_tmout_check = m0_time_from_now(FAB_BUF_TMOUT_CHK_INTERVAL, 0);
484  for (i = 0; i < ARRAY_SIZE(net->ntm_q); ++i) {
485  m0_tl_for(m0_net_tm, &net->ntm_q[i], nb) {
486  if (nb->nb_timeout < now) {
487  fb = nb->nb_xprt_private;
/*
 * NOTE(review): listing line 488 is absent from this extract; one statement
 * is missing here and must be restored from the original file.
 */
/* The return value of libfab_buf_dom_dereg() is not checked. */
489  libfab_buf_dom_dereg(fb);
490  fb->fb_state = FAB_BUF_TIMEDOUT;
491  libfab_buf_done(fb, -ETIMEDOUT, false);
492  }
493  } m0_tl_endfor;
494  }
495  M0_POST(libfab_tm_invariant(ftm));
496 }
497 
505 static void libfab_tm_buf_done(struct m0_fab__tm *ftm)
506 {
507  struct m0_fab__buf *buffer;
508  int nr = 0;
509 
510  M0_PRE(libfab_tm_is_locked(ftm) && libfab_tm_invariant(ftm));
511  m0_tl_teardown(fab_buf, &ftm->ftm_done, buffer) {
512  libfab_buf_complete(buffer);
513  nr++;
514  }
515 
516  if (nr > 0 && ftm->ftm_ntm->ntm_callback_counter == 0)
517  m0_chan_broadcast(&ftm->ftm_ntm->ntm_chan);
518  M0_POST(libfab_tm_invariant(ftm));
519 }
520 
/**
 * Drains the event queue of the listening (passive) endpoint.
 *
 * Events are read with fi_eq_read() until it returns -EAGAIN:
 *  - FI_CONNREQ: the peer address is taken from the payload of the
 *    connection request, the matching endpoint is looked up with
 *    libfab_fab_ep_find() and the connection is accepted with
 *    libfab_conn_accept(); failures are only logged;
 *  - -FI_EAVAIL: the error entry is fetched with fi_eq_readerr() and logged;
 *  - anything else other than -EAGAIN is logged and skipped.
 *
 * @param tm libfabric transfer machine whose listening endpoint is polled.
 * @return always 0.
 */
530 static uint32_t libfab_handle_connect_request_events(struct m0_fab__tm *tm)
531 {
532  struct m0_fab__ep *ep = NULL;
533  struct fid_eq *eq;
534  struct fi_eq_err_entry eq_err = {};
535  struct fi_eq_cm_entry *cm_entry;
/* Room for one CM entry followed by the connection-request payload. */
536  char entry[(sizeof(struct fi_eq_cm_entry) +
537  sizeof(struct m0_fab__conn_data))];
538  uint32_t event;
539  int rc;
540  struct m0_net_ip_addr addr;
541  int ret;
542 
543  eq = tm->ftm_pep->fep_listen->pep_res.fpr_eq;
544  do {
545  rc = fi_eq_read(eq, &event, &entry, sizeof(entry), 0);
/*
 * rc is overwritten by the calls below; ret keeps the fi_eq_read() result
 * that drives the loop condition.
 */
546  ret = rc;
547  if (rc >= (int)sizeof(struct fi_eq_cm_entry) &&
548  event == FI_CONNREQ) {
549  cm_entry = (struct fi_eq_cm_entry *)entry;
/*
 * NOTE(review): the payload is read as a struct m0_net_ip_params without
 * checking that rc also covers it - confirm that the connecting side always
 * sends at least that much data.
 */
550  addr.nia_n = *((struct m0_net_ip_params *)
551  (cm_entry->data));
552  rc = libfab_fab_ep_find(tm, NULL, &addr.nia_n, &ep);
553  if (rc == 0) {
554  rc = libfab_conn_accept(ep, tm, cm_entry->info);
555  if (rc != 0)
556  M0_LOG(M0_ERROR, "Conn accept fail %d",
557  rc);
558  } else
559  M0_LOG(M0_ERROR, "fab_ep_find fail rc=%d", rc);
/* The fi_info delivered with the request is released on every path. */
560  fi_freeinfo(cm_entry->info);
561  } else if (rc == -FI_EAVAIL) {
562  rc = fi_eq_readerr(eq, &eq_err, 0);
563  if (rc != sizeof(eq_err))
564  M0_LOG(M0_ERROR, "fi_eq_readerr error =%s",
565  fi_strerror((int) -rc));
566  else
567  M0_LOG(M0_ERROR, "fi_eq_readerr prov err %d:%s",
568  eq_err.prov_errno,
569  fi_eq_strerror(eq, eq_err.prov_errno,
570  eq_err.err_data, NULL,
571  0));
572  } else if (rc != -EAGAIN)
573  /*
574  * For all other events, there is no error info available.
575  * Hence, all such events can be ignored.
576  */
577  M0_LOG(M0_ERROR, "Unexpected event tm=%p rc=%d", tm, rc);
/*
 * NOTE(review): the loop ends only on -EAGAIN; a different, persistent
 * error from fi_eq_read() would keep it spinning - confirm this cannot
 * happen.
 */
578  } while (ret != -EAGAIN);
579  return 0;
580 }
581 
/**
 * Processes connection-management events of one endpoint.
 *
 * Steps, in order:
 *  1. while the rx side is in FAB_CONNECTING, drain its event queue; on
 *     FI_CONNECTED mark the rx side connected (and, for the transfer
 *     machine's own endpoint, set FAB_CONNLINK_RXEP_RDY);
 *  2. drain the tx event queue: FI_CONNECTED updates the tx state and the
 *     connlink flags, FI_SHUTDOWN flushes the rx event queue and
 *     re-initialises the tx endpoint, -ECONNREFUSED while connecting
 *     re-initialises the tx endpoint and fails all pending send buffers;
 *  3. once the link is FAB_CONNLINK_RDY_TO_SEND, send the pending buffers;
 *  4. if the tx side is still FAB_CONNECTING after aep_connecting_tmout,
 *     reset the endpoint and fail the pending buffers with -ECONNREFUSED.
 *
 * @param txep endpoint being checked.
 * @param aep  active endpoint of txep.
 * @param tm   libfabric transfer machine.
 */
586 static void libfab_txep_event_check(struct m0_fab__ep *txep,
587  struct m0_fab__active_ep *aep,
588  struct m0_fab__tm *tm)
589 {
590  struct m0_fab__buf *fbp;
591  uint32_t event;
592  int rc;
593 
594  if (aep->aep_rx_state == FAB_CONNECTING) {
595  do {
596  rc = libfab_check_for_event(aep->aep_rx_res.frr_eq,
597  &event);
598  if (rc >= 0 && event == FI_CONNECTED) {
599  aep->aep_rx_state = FAB_CONNECTED;
600  if (txep == tm->ftm_pep)
601  txep->fep_connlink |=
602  FAB_CONNLINK_RXEP_RDY;
603  }
604  } while (rc != -EAGAIN);
605  }
606 
607  do {
608  rc = libfab_check_for_event(aep->aep_tx_res.ftr_eq, &event);
609  if (rc >= 0) {
610  if (event == FI_CONNECTED) {
611  aep->aep_tx_state = FAB_CONNECTED;
/*
 * For the transfer machine's own endpoint only the tx flag is set here (the
 * rx flag is set in the rx loop above); for any other endpoint both flags
 * are set at once.
 */
612  if (txep == tm->ftm_pep)
613  txep->fep_connlink |=
614  FAB_CONNLINK_TXEP_RDY;
615  else
616  txep->fep_connlink |=
617  FAB_CONNLINK_TXEP_RDY |
618  FAB_CONNLINK_RXEP_RDY;
619  } else if (event == FI_SHUTDOWN) {
620  /* Flush all events from rxep EQ. */
621  if (aep->aep_rx_res.frr_eq != NULL) {
622  while (libfab_check_for_event(
623  aep->aep_rx_res.frr_eq,
624  &event) != -EAGAIN);
625  }
626  /* Reset and reopen endpoint */
/* The return value of libfab_txep_init() is not checked here. */
627  libfab_txep_init(aep, tm, txep);
628  }
629  } else if (rc == -ECONNREFUSED &&
630  aep->aep_tx_state == FAB_CONNECTING) {
631  libfab_txep_init(aep, tm, txep);
/* Every buffer still waiting on this endpoint is failed with rc. */
632  m0_tl_teardown(fab_sndbuf, &txep->fep_sndbuf, fbp) {
633  libfab_buf_done(fbp, rc, false);
634  }
635  }
636  } while (rc != -EAGAIN);
637  /* All other types of events can be ignored */
638 
639  if (txep->fep_connlink == FAB_CONNLINK_RDY_TO_SEND) {
640  libfab_pending_bufs_send(txep);
641  txep->fep_connlink = FAB_CONNLINK_PENDING_SEND_DONE;
642  }
643 
644  /*
645  * For version >= 1.12, the libfabric library does not return the error
646  * -111 (ECONNREFUSED) if the remote service has not yet started. Thus
647  * the connection request does not receive any reply and the endpoint
648  * keeps waiting in the FAB_CONNECTING state. To avoid this, the state
649  * of the endpoint is reset after a timeout of 5s to FAB_DISCONNECTED.
650  * Thus, when the next buffer is posted to the endpoint, it will again
651  * try to establish connection by sending out a new connection request.
652  */
653  if (aep->aep_tx_state == FAB_CONNECTING &&
654  m0_time_is_in_past(aep->aep_connecting_tmout)) {
655  M0_LOG(M0_DEBUG,"Reset Conn from %s to %s",
656  (char*)tm->ftm_pep->fep_name.nia_p,
657  (char*)txep->fep_name.nia_p);
658  libfab_txep_init(aep, tm, txep);
659  m0_tl_teardown(fab_sndbuf, &txep->fep_sndbuf, fbp) {
660  libfab_buf_done(fbp, -ECONNREFUSED, false);
661  }
662  }
663 }
664 
668 static void libfab_rxep_comp_read(struct fid_cq *cq, struct m0_fab__ep *ep,
669  struct m0_fab__tm *tm)
670 {
671  struct m0_fab__buf *fb = NULL;
672  uint32_t token[FAB_MAX_COMP_READ];
673  m0_bindex_t len[FAB_MAX_COMP_READ];
674  uint64_t data[FAB_MAX_COMP_READ];
675  int i;
676  int cnt;
677  uint32_t rem_token;
678 
679  if (cq != NULL) {
680  cnt = libfab_check_for_comp(cq, token, len, data);
681  for (i = 0; i < cnt; i++) {
682  fb = fab_bufhash_htable_lookup(
683  &tm->ftm_bufhash.bht_hash,
684  &token[i]);
685  if (fb != NULL) {
686  if (fb->fb_length == 0)
687  fb->fb_length = len[i];
688  fb->fb_ev_ep = ep;
689  libfab_buf_done(fb, 0, false);
690  }
691  if (data[i] != 0) {
692  rem_token = (uint32_t)data[i];
693  fb = fab_bufhash_htable_lookup(
694  &tm->ftm_bufhash.bht_hash,
695  &rem_token);
696  if (fb != NULL)
697  libfab_buf_done(fb, 0, false);
698  }
699  }
700  }
701 }
702 
/**
 * Reads completions from the transmit completion queue.
 *
 * A zero token is ignored. For a non-zero token the buffer is looked up in
 * the buffer hash:
 *  - if the M0_NET_QT_NR bits are all set in the buffer's token, the buffer
 *    is removed from the hash, the bulk counter of its endpoint is
 *    decremented, the "tx queue full" flag is cleared and the buffer is
 *    freed;
 *  - otherwise the target is notified and the buffer is completed.
 *
 * @param cq transmit completion queue.
 * @param tm libfabric transfer machine owning the buffer hash.
 */
706 static void libfab_txep_comp_read(struct fid_cq *cq, struct m0_fab__tm *tm)
707 {
708  struct m0_fab__active_ep *aep;
709  struct m0_fab__buf *fb = NULL;
710  uint32_t token[FAB_MAX_COMP_READ];
711  int i;
712  int cnt;
713 
/* No length or remote-data arrays are requested for tx completions. */
714  cnt = libfab_check_for_comp(cq, token, NULL, NULL);
715  for (i = 0; i < cnt; i++) {
716  if (token[i] != 0)
717  fb = fab_bufhash_htable_lookup(
718  &tm->ftm_bufhash.bht_hash,
719  &token[i]);
720  else
721  fb = NULL;
722  if (fb != NULL) {
723  aep = libfab_aep_get(fb->fb_txctx);
724  if ((fb->fb_token & M0_NET_QT_NR) == M0_NET_QT_NR) {
725  fab_bufhash_htable_del(
726  &tm->ftm_bufhash.bht_hash, fb);
727  M0_ASSERT(aep->aep_bulk_cnt);
728  --aep->aep_bulk_cnt;
729  aep->aep_txq_full = false;
730  m0_free(fb);
731  } else {
/*
 * NOTE(review): listing lines 733-735 (the rest of this M0_IN() condition)
 * are absent from this extract; restore them from the original file.
 */
732  if (M0_IN(fb->fb_nb->nb_qtype,
736  M0_ASSERT(aep->aep_bulk_cnt >=
737  fb->fb_wr_cnt);
738  aep->aep_bulk_cnt -= fb->fb_wr_cnt;
739  aep->aep_txq_full = false;
740  }
/* The return value of libfab_target_notify() is not checked. */
741  libfab_target_notify(fb);
742  libfab_buf_done(fb, 0, false);
743  }
744  }
745  }
746 }
747 
/**
 * Main loop of the poller.
 *
 * Posts the M0_NET_TM_STARTED event and then, until the transfer machine
 * state becomes FAB_TM_SHUTDOWN, repeats:
 *  1. wait for activity: fi_trywait() followed, when it returns 0, by
 *     epoll_wait() on ftm_epfd; the wait is retried on EINTR;
 *  2. take the transfer machine lock with trylock, re-checking the shutdown
 *     state under ftm_endlock before every attempt;
 *  3. process connection requests and transmit completions of the common
 *     queues;
 *  4. if epoll reported an event on a private queue, process that endpoint;
 *  5. process one endpoint from the head of the endpoint list and move it
 *     to the tail (round-robin);
 *  6. process the buffer queue, expire timed-out buffers when the check
 *     time has passed, and finish completed buffers;
 *  7. release the transfer machine lock.
 *
 * @param tm libfabric transfer machine served by this poller.
 */
752 static void libfab_poller(struct m0_fab__tm *tm)
753 {
754  struct m0_net_end_point *net;
755  struct m0_fab__ev_ctx *ctx;
756  struct m0_fab__ep *xep;
757  struct m0_fab__active_ep *aep;
758  struct fid_cq *cq;
759  struct epoll_event ev;
760  int ev_cnt;
761  int ret;
762  int err;
763 
764  libfab_tm_event_post(tm, M0_NET_TM_STARTED);
765  while (tm->ftm_state != FAB_TM_SHUTDOWN) {
766  do {
767  ret = fi_trywait(tm->ftm_fab->fab_fab,
768  tm->ftm_fids.ftf_head,
769  tm->ftm_fids.ftf_cnt);
770  /*
771  * TBD : Add handling of other return values of
772  * fi_trywait() if it returns something other than
773  * -EAGAIN and 0. Also, observed that fi_trywait()
774  * returns -22(EINVAL) which is not mentioned in
775  * libfabric documentation, hence added it to the list
776  * of possible return values of fi_trywait().
777  */
778  if (!M0_IN(ret, (0, -EAGAIN, -EINVAL)))
779  M0_LOG(M0_ERROR, "Unexpected fi_trywait rc=%d",
780  ret);
781 
782  if (ret == 0) {
/* At most one epoll event is fetched per iteration. */
783  ret = epoll_wait(tm->ftm_epfd, &ev, 1,
784  FAB_WAIT_FD_TMOUT);
785  /*
786  * M0_ERR is omitted because we expect only one
787  * particular error, and this error gets
788  * handled by the loop.
789  */
/* errno is captured immediately after epoll_wait(). */
790  err = ret < 0 ? -errno : 0;
/*
 * NOTE(review): listing lines 792 and 796 (presumably the opening of the
 * two logging calls whose arguments follow) are absent from this extract;
 * restore them from the original file.
 */
791  if (!M0_IN(ret, (-1, 0, 1)))
793  "Unexpected epoll_wait rc=%d",
794  ret);
795  if (ret == -1 && err != -EINTR)
797  "Unexpected epoll_wait err=%d",
798  err);
799  ev_cnt = ret > 0 ? ret : 0;
800  } else {
801  ev_cnt = 0;
802  err = 0;
803  }
804  } while (err == -EINTR);
805 
/*
 * Spin on trylock rather than blocking on the TM lock, so that a shutdown
 * request is noticed (under ftm_endlock) between attempts.
 */
806  while (1) {
807  m0_mutex_lock(&tm->ftm_endlock);
808  if (tm->ftm_state == FAB_TM_SHUTDOWN) {
809  m0_mutex_unlock(&tm->ftm_endlock);
810  return;
811  }
812 
813  ret = libfab_tm_trylock(tm);
814  m0_mutex_unlock(&tm->ftm_endlock);
815  if (ret == 0) {
816  /*
817  * Got tm lock.
818  * Let's continue processing events.
819  */
820  break;
821  }
822  }
823 
824  M0_ASSERT(libfab_tm_is_locked(tm) && libfab_tm_invariant(tm));
825 
826  /* Check the common queue of the transfer machine for events */
827  libfab_handle_connect_request_events(tm);
828  libfab_txep_comp_read(tm->ftm_tx_cq, tm);
829 
830  if (ev_cnt > 0) {
/* The epoll user data is the event context registered for the fid. */
831  ctx = ev.data.ptr;
832  if (ctx->evctx_type != FAB_COMMON_Q_EVENT) {
833  /*
834  * Check the private queue of the
835  * endpoint for events.
836  */
837  xep = ctx->evctx_ep;
838  aep = libfab_aep_get(xep);
839  libfab_txep_event_check(xep, aep, tm);
840  cq = aep->aep_rx_res.frr_cq;
841  libfab_rxep_comp_read(cq, xep, tm);
842  }
843  }
844 
845  /*
846  * Process events on private queue of the endpoints in round-
847  * robin fashion.
848  */
/* The head endpoint is popped, re-queued at the tail, then processed. */
849  net = m0_nep_tlist_pop(&tm->ftm_ntm->ntm_end_points);
850  M0_ASSERT(net != NULL);
851  m0_nep_tlist_add_tail(&tm->ftm_ntm->ntm_end_points, net);
852  xep = libfab_ep(net);
853  aep = libfab_aep_get(xep);
854  libfab_txep_event_check(xep, aep, tm);
855  cq = aep->aep_rx_res.frr_cq;
856  libfab_rxep_comp_read(cq, xep, tm);
857 
858  libfab_bufq_process(tm);
859  if (m0_time_is_in_past(tm->ftm_tmout_check))
860  libfab_tm_buf_timeout(tm);
861  libfab_tm_buf_done(tm);
862 
863  M0_ASSERT(libfab_tm_invariant(tm));
864  libfab_tm_unlock(tm);
865  }
866 }
867 
871 static inline struct m0_fab__ep *libfab_ep(struct m0_net_end_point *net)
872 {
873  return net != NULL ? net->nep_xprt_pvt : NULL;
874 }
875 
880 static bool libfab_ep_find_by_num(struct m0_net_ip_addr *addr,
881  struct m0_net_transfer_mc *ntm,
882  struct m0_fab__ep **ep)
883 {
884  struct m0_net_end_point *net;
885 
886  net = m0_tl_find(m0_nep, net, &ntm->ntm_end_points,
887  m0_net_ip_addr_eq(&(libfab_ep(net))->fep_name,
888  addr, true));
889 
890  *ep = net != NULL ? libfab_ep(net) : NULL;
891 
892  return net != NULL;
893 }
894 
899 static bool libfab_ep_find_by_str(const char *name,
900  struct m0_net_transfer_mc *ntm,
901  struct m0_fab__ep **ep)
902 {
903  struct m0_net_end_point *net;
904 
905  net = m0_tl_find(m0_nep, net, &ntm->ntm_end_points,
906  strcmp((libfab_ep(net))->fep_name.nia_p, name) == 0);
907 
908  *ep = net != NULL ? libfab_ep(net) : NULL;
909 
910  return net != NULL;
911 }
912 
922 static int libfab_ep_find(struct m0_net_transfer_mc *tm, const char *name,
923  struct m0_net_ip_params *addr,
924  struct m0_net_end_point **epp)
925 {
926  struct m0_fab__ep *ep;
927  struct m0_fab__active_ep *aep;
928  struct m0_net_ip_addr net_ip = {};
929  struct m0_fab__tm *ma;
930  char *wc = NULL;
931  int rc = 0;
932  bool found = false;
933 
934  if (addr != NULL)
935  net_ip.nia_n = *addr;
936  M0_ASSERT(libfab_tm_is_locked(tm->ntm_xprt_private));
937 
938  if (likely(addr != NULL))
939  found = libfab_ep_find_by_num(&net_ip, tm, &ep);
940  else
941  found = libfab_ep_find_by_str(name, tm, &ep);
942 
943  if (!found) {
944  M0_ASSERT(name != NULL || addr != NULL);
945  if (name != NULL)
946  rc = libfab_ep_create(tm, name, addr, epp);
947  else {
948  m0_net_ip_print(&net_ip);
949  rc = libfab_ep_create(tm, net_ip.nia_p, addr, epp);
950  }
951  } else {
952  *epp = &ep->fep_nep;
953  if (name != NULL && addr != NULL) {
954  wc = strchr(name,'*');
955  /*
956  * In lnet format, the epname can contain a wildchar(*)
957  * which can be present instead of numeric tmid
958  */
959  if (wc != NULL &&
960  ep->fep_name.nia_n.nip_port !=
961  net_ip.nia_n.nip_port) {
962  ep->fep_name.nia_n.nip_ip_n.sn[0] =
963  net_ip.nia_n.nip_ip_n.sn[0];
964  ep->fep_name.nia_n.nip_port =
965  net_ip.nia_n.nip_port;
966  ep->fep_name.nia_n.nip_fmt_pvt.la.nla_tmid =
967  net_ip.nia_n.nip_fmt_pvt.la.nla_tmid;
968  libfab_ep_pton(&ep->fep_name,
969  &ep->fep_name_n);
970  aep = libfab_aep_get(ep);
971  ma = tm->ntm_xprt_private;
972  if (aep->aep_tx_state == FAB_CONNECTED)
973  rc = libfab_txep_init(aep, ma, ep);
974  }
975  }
976 
977  if (rc == 0)
978  libfab_ep_get(ep);
979 
980  }
981 
982  return M0_RC(rc);
983 }
984 
988 static int libfab_ep_create(struct m0_net_transfer_mc *tm, const char *name,
989  struct m0_net_ip_params *addr,
990  struct m0_net_end_point **epp)
991 {
992  struct m0_fab__tm *ma = tm->ntm_xprt_private;
993  struct m0_fab__ep *ep = NULL;
994  int rc;
995 
996  M0_ENTRY("name=%s", name);
997  M0_PRE(name != NULL);
998 
999  M0_ALLOC_PTR(ep);
1000  if (ep == NULL)
1001  return M0_ERR(-ENOMEM);
1002 
1003  M0_ALLOC_PTR(ep->fep_aep);
1004  if (ep->fep_aep == NULL) {
1005  m0_free(ep);
1006  return M0_ERR(-ENOMEM);
1007  }
1008 
1009  ep->fep_listen = NULL;
1010 
1011  rc = libfab_ep_addr_decode(ep, name);
1012  if (rc != 0) {
1013  libfab_aep_param_free(ep->fep_aep, ma);
1014  m0_free(ep);
1015  return M0_ERR(rc);
1016  }
1017 
1018  /*
1019  * Due to wildchar '*' as tmid in lnet format, we need to make sure that
1020  * tmid and port are correctly reconstructed.
1021  */
1022  if (addr != NULL && addr->nip_format == M0_NET_IP_LNET_FORMAT &&
1023  addr->nip_fmt_pvt.la.nla_autotm) {
1024  ep->fep_name.nia_n.nip_port = addr->nip_port;
1025  ep->fep_name.nia_n.nip_fmt_pvt.la.nla_tmid =
1026  addr->nip_fmt_pvt.la.nla_tmid;
1027  }
1028 
1029  rc = libfab_active_ep_create(ep, ma);
1030  if (rc != 0) {
1031  libfab_aep_param_free(ep->fep_aep, ma);
1032  m0_free(ep);
1033  return M0_ERR(rc);
1034  }
1035 
1036  fab_sndbuf_tlist_init(&ep->fep_sndbuf);
1037  *epp = &ep->fep_nep;
1038  return M0_RC(0);
1039 }
1040 
1044 static int libfab_tm_res_init(struct m0_fab__tm *tm)
1045 {
1046  struct m0_fab__fab *fab;
1047  struct m0_fab__passive_ep *pep;
1048  struct fi_cq_attr cq_attr = {};
1049  int rc = 0;
1050 
1051  M0_PRE(tm != NULL);
1052 
1053  pep = tm->ftm_pep->fep_listen;
1054  fab = tm->ftm_fab;
1055  /* Initialise completion queues for tx */
1056  cq_attr.wait_obj = FI_WAIT_FD;
1057  cq_attr.wait_cond = FI_CQ_COND_NONE;
1058  cq_attr.format = FI_CQ_FORMAT_DATA;
1059  cq_attr.size = FAB_MAX_TX_CQ_EV;
1060  rc = fi_cq_open(fab->fab_dom, &cq_attr, &tm->ftm_tx_cq, NULL);
1061  if (rc != 0)
1062  return M0_ERR(rc);
1063 
1064  /* Initialize and bind resources to tx ep */
1065  tm->ftm_txcq_ctx.evctx_type = FAB_COMMON_Q_EVENT;
1066  tm->ftm_txcq_ctx.evctx_ep = NULL;
1067  tm->ftm_txcq_ctx.evctx_dbg = "txep cq";
1068  rc = libfab_waitfd_bind(&tm->ftm_tx_cq->fid, tm, &tm->ftm_txcq_ctx);
1069  if (rc != 0)
1070  return M0_ERR(rc);
1071 
1072  return M0_RC(libfab_txep_init(pep->pep_aep, tm, tm->ftm_pep));
1073 }
1074 
1079 static int libfab_ep_txres_init(struct m0_fab__active_ep *aep,
1080  struct m0_fab__tm *tm, void *ctx)
1081 {
1082  struct fi_eq_attr eq_attr = {};
1083  struct m0_fab__fab *fab;
1084  int rc;
1085 
1086  fab = tm->ftm_fab;
1087 
1088  /* Bind the ep to tx completion queue */
1089  rc = fi_ep_bind(aep->aep_txep, &tm->ftm_tx_cq->fid,
1090  FI_TRANSMIT | FI_RECV | FI_SELECTIVE_COMPLETION);
1091  if (rc != 0)
1092  return M0_ERR(rc);
1093 
1094  /* Initialise and bind event queue */
1095  eq_attr.wait_obj = FI_WAIT_FD;
1096  eq_attr.size = FAB_MAX_AEP_EQ_EV;
1097  rc = fi_eq_open(fab->fab_fab, &eq_attr, &aep->aep_tx_res.ftr_eq, NULL);
1098  if (rc != 0)
1099  return M0_ERR(rc);
1100 
1101  aep->aep_tx_res.ftr_ctx.evctx_type = FAB_PRIVATE_Q_EVENT;
1102  aep->aep_tx_res.ftr_ctx.evctx_ep = ctx;
1103  aep->aep_tx_res.ftr_ctx.evctx_dbg = "txep eq";
1104  rc = libfab_waitfd_bind(&aep->aep_tx_res.ftr_eq->fid, tm,
1105  &aep->aep_tx_res.ftr_ctx);
1106  if (rc != 0)
1107  return M0_ERR(rc);
1108 
1109  rc = fi_ep_bind(aep->aep_txep, &aep->aep_tx_res.ftr_eq->fid, 0);
1110 
1111  return rc != 0 ? M0_ERR(rc) : M0_RC(0);
1112 }
1113 
1118 static int libfab_ep_rxres_init(struct m0_fab__active_ep *aep,
1119  struct m0_fab__tm *tm, void *ctx)
1120 {
1121  struct fi_cq_attr cq_attr = {};
1122  struct fi_eq_attr eq_attr = {};
1123  struct m0_fab__fab *fab;
1124  int rc;
1125 
1126  fab = tm->ftm_fab;
1127 
1128  /* Initialise and bind completion queues for rx */
1129  cq_attr.wait_obj = FI_WAIT_FD;
1130  cq_attr.wait_cond = FI_CQ_COND_NONE;
1131  cq_attr.format = FI_CQ_FORMAT_DATA;
1132  cq_attr.size = FAB_MAX_RX_CQ_EV;
1133  rc = fi_cq_open(fab->fab_dom, &cq_attr, &aep->aep_rx_res.frr_cq, NULL);
1134  if (rc != 0)
1135  return M0_ERR(rc);
1136 
1137  aep->aep_rx_res.frr_cq_ctx.evctx_type = FAB_PRIVATE_Q_EVENT;
1138  aep->aep_rx_res.frr_cq_ctx.evctx_ep = ctx;
1139  aep->aep_rx_res.frr_cq_ctx.evctx_dbg = "rxep cq";
1140  rc = libfab_waitfd_bind(&aep->aep_rx_res.frr_cq->fid, tm,
1141  &aep->aep_rx_res.frr_cq_ctx);
1142  if (rc != 0)
1143  return M0_ERR(rc);
1144 
1145  rc = fi_ep_bind(aep->aep_rxep, &tm->ftm_tx_cq->fid,
1146  FI_TRANSMIT | FI_SELECTIVE_COMPLETION) ? :
1147  fi_ep_bind(aep->aep_rxep, &aep->aep_rx_res.frr_cq->fid, FI_RECV);
1148  if (rc != 0)
1149  return M0_ERR(rc);
1150 
1151  /* Initialise and bind event queue */
1152  eq_attr.wait_obj = FI_WAIT_FD;
1153  eq_attr.size = FAB_MAX_AEP_EQ_EV;
1154  aep->aep_rx_res.frr_eq_ctx.evctx_type = FAB_PRIVATE_Q_EVENT;
1155  aep->aep_rx_res.frr_eq_ctx.evctx_ep = ctx;
1156  aep->aep_rx_res.frr_eq_ctx.evctx_dbg = "rxep eq";
1157  rc = fi_eq_open(fab->fab_fab, &eq_attr, &aep->aep_rx_res.frr_eq,
1158  NULL) ? :
1159  libfab_waitfd_bind(&aep->aep_rx_res.frr_eq->fid, tm,
1160  &aep->aep_rx_res.frr_eq_ctx) ? :
1161  fi_ep_bind(aep->aep_rxep, &aep->aep_rx_res.frr_eq->fid, 0) ? :
1162  fi_ep_bind(aep->aep_rxep, &tm->ftm_rctx->fid, 0);
1163 
1164  return rc != 0 ? M0_ERR(rc) : M0_RC(0);
1165 }
1166 
1171 static int libfab_pep_res_init(struct m0_fab__passive_ep *pep,
1172  struct m0_fab__tm *tm, void *ctx)
1173 {
1174  struct fi_eq_attr eq_attr = {};
1175  int rc = 0;
1176 
1177  /* Initialise and bind event queue */
1178  eq_attr.wait_obj = FI_WAIT_FD;
1179  eq_attr.size = FAB_MAX_PEP_EQ_EV;
1180  rc = fi_eq_open(tm->ftm_fab->fab_fab, &eq_attr, &pep->pep_res.fpr_eq,
1181  NULL);
1182  if (rc != 0)
1183  return M0_ERR(rc);
1184 
1185  pep->pep_res.fpr_ctx.evctx_type = FAB_COMMON_Q_EVENT;
1186  pep->pep_res.fpr_ctx.evctx_ep = ctx;
1187  pep->pep_res.fpr_ctx.evctx_dbg = "pep eq";
1188  rc = libfab_waitfd_bind(&pep->pep_res.fpr_eq->fid, tm,
1189  &pep->pep_res.fpr_ctx) ? :
1190  fi_pep_bind(pep->pep_pep, &pep->pep_res.fpr_eq->fid, 0);
1191 
1192  return rc != 0 ? M0_ERR(rc) : M0_RC(0);
1193 }
1194 
1198 static int libfab_conn_accept(struct m0_fab__ep *ep, struct m0_fab__tm *tm,
1199  struct fi_info *info)
1200 {
1201  struct m0_fab__active_ep *aep;
1202  struct fid_domain *dp;
1203  int rc;
1204 
1205  M0_ENTRY("from ep=%s -> tm=%s", (char*)ep->fep_name.nia_p,
1206  (char*)tm->ftm_pep->fep_name.nia_p);
1207 
1208  aep = libfab_aep_get(ep);
1209  dp = tm->ftm_fab->fab_dom;
1210 
1211  if (aep->aep_rxep != NULL) {
1212  rc = fi_close(&aep->aep_rxep->fid);
1213  if (rc != 0)
1214  M0_LOG(M0_ERROR, "ep close = %d",rc);
1215  libfab_ep_rxres_free(&aep->aep_rx_res, tm);
1216  }
1217  aep->aep_rx_state = FAB_NOT_CONNECTED;
1218  ep->fep_connlink = FAB_CONNLINK_DOWN;
1219 
1220  rc = fi_endpoint(dp, info, &aep->aep_rxep, NULL) ? :
1221  libfab_ep_rxres_init(aep, tm, ep) ? :
1222  fi_enable(aep->aep_rxep) ? :
1223  fi_accept(aep->aep_rxep, NULL, 0);
1224 
1225  if (rc != 0) {
1226  libfab_aep_param_free(aep, tm);
1227  return M0_ERR(rc);
1228  }
1229 
1230  aep->aep_rx_state = FAB_CONNECTING;
1231 
1232  return M0_RC(0);
1233 }
1234 
1238 static int libfab_active_ep_create(struct m0_fab__ep *ep, struct m0_fab__tm *tm)
1239 {
1240  struct m0_net_end_point *net;
1241  struct m0_fab__active_ep *aep;
1242  int rc;
1243 
1244  M0_ASSERT(libfab_tm_is_locked(tm));
1245  aep = ep->fep_aep;
1246  rc = libfab_txep_init(aep, tm, ep);
1247  if (rc != 0) {
1248  libfab_aep_param_free(aep, tm);
1249  return M0_ERR(rc);
1250  }
1251 
1252  net = &ep->fep_nep;
1253  net->nep_xprt_pvt = ep;
1254  net->nep_tm = tm->ftm_ntm;
1255  libfab_ep_pton(&ep->fep_name, &ep->fep_name_n);
1256  m0_nep_tlink_init_at_tail(net, &tm->ftm_ntm->ntm_end_points);
1257  net->nep_addr = (const char *)(&ep->fep_name.nia_p);
1258  m0_ref_init(&ep->fep_nep.nep_ref, 1, &libfab_ep_release);
1259 
1260  return M0_RC(0);
1261 }
1262 
1267 static int libfab_passive_ep_create(struct m0_fab__ep *ep,
1268  struct m0_fab__tm *tm)
1269 {
1270  struct m0_fab__passive_ep *pep;
1271  struct fi_info *hints;
1272  struct fi_info *fi;
1273  enum m0_fab__prov_type idx;
1274  int rc;
1275  int rx_size;
1276  char addr[LIBFAB_ADDR_LEN_MAX] = {};
1277  char port[LIBFAB_PORT_LEN_MAX] = {};
1278 
1279  M0_ENTRY("ep=%s nip_ip_n=[0x%" PRIx64 ",0x%" PRIx64 "] port=%d",
1280  (char*)ep->fep_name.nia_p,
1281  ep->fep_name.nia_n.nip_ip_n.ln[0],
1282  ep->fep_name.nia_n.nip_ip_n.ln[1],
1283  (int)ep->fep_name.nia_n.nip_port);
1284 
1285  M0_ALLOC_PTR(ep->fep_listen);
1286  if (ep->fep_listen == NULL)
1287  return M0_ERR(-ENOMEM);
1288  M0_ALLOC_PTR(ep->fep_listen->pep_aep);
1289  if (ep->fep_listen->pep_aep == NULL) {
1290  m0_free(ep->fep_listen);
1291  return M0_ERR(-ENOMEM);
1292  }
1293 
1294  pep = ep->fep_listen;
1295  ep->fep_listen->pep_aep->aep_rxep = NULL;
1296  ep->fep_listen->pep_aep->aep_txep = NULL;
1297 
1298  libfab_straddr_gen(&ep->fep_name.nia_n, addr);
1299  snprintf(port, ARRAY_SIZE(port), "%d", ep->fep_name.nia_n.nip_port);
1300 
1301  hints = fi_allocinfo();
1302  if (hints == NULL) {
1303  m0_free(pep->pep_aep);
1304  m0_free(pep);
1305  return M0_ERR(-ENOMEM);
1306  }
1307 
1308  hints->ep_attr->type = FI_EP_MSG;
1309  hints->caps = FI_MSG | FI_RMA;
1310  hints->mode |= FI_RX_CQ_DATA;
1311  hints->domain_attr->mr_mode = FI_MR_LOCAL | FI_MR_ALLOCATED |
1312  FI_MR_PROV_KEY | FI_MR_VIRT_ADDR;
1313  hints->domain_attr->cq_data_size = 4;
1314 
1315  for (idx = 0; idx < FAB_FABRIC_PROV_MAX; idx++) {
1316  hints->fabric_attr->prov_name = (char *)providers[idx];
1317  rc = fi_getinfo(LIBFAB_VERSION, addr, port, FI_SOURCE, hints,
1318  &fi);
1319  if (rc == 0)
1320  break;
1321  }
1322 
1323  if (rc != 0)
1324  return M0_ERR(rc);
1325 
1326  M0_ASSERT(idx < FAB_FABRIC_PROV_MAX);
1327 
1328  M0_LOG(M0_DEBUG, "tm=%s Provider selected %s",
1329  (char*)ep->fep_name.nia_p, fi->fabric_attr->prov_name);
1330  hints->fabric_attr->prov_name = NULL;
1331  tm->ftm_fab->fab_fi = fi;
1332  tm->ftm_fab->fab_prov = idx;
1333  tm->ftm_fab->fab_max_iov = min32u(fi->tx_attr->iov_limit,
1334  fi->tx_attr->rma_iov_limit);
1335  fi_freeinfo(hints);
1336 
1337  M0_ALLOC_ARR(tm->ftm_rem_iov, tm->ftm_fab->fab_max_iov);
1338  M0_ALLOC_ARR(tm->ftm_loc_iov, tm->ftm_fab->fab_max_iov);
1339  if (tm->ftm_rem_iov == NULL || tm->ftm_loc_iov == NULL) {
1340  m0_free(tm->ftm_rem_iov);
1341  m0_free(tm->ftm_loc_iov);
1342  return M0_ERR(-ENOMEM);
1343  }
1344 
1345  rc = fi_fabric(tm->ftm_fab->fab_fi->fabric_attr, &tm->ftm_fab->fab_fab,
1346  NULL) ? :
1347  libfab_waitfd_init(tm) ? :
1348  fi_passive_ep(tm->ftm_fab->fab_fab, tm->ftm_fab->fab_fi,
1349  &pep->pep_pep, NULL) ? :
1350  libfab_pep_res_init(pep, tm, ep) ? :
1351  fi_listen(pep->pep_pep) ? :
1352  fi_domain(tm->ftm_fab->fab_fab, tm->ftm_fab->fab_fi,
1353  &tm->ftm_fab->fab_dom, NULL);
1354 
1355  if (rc != 0) {
1356  libfab_pep_param_free(pep, tm);
1357  return M0_ERR(rc);
1358  }
1359 
1360  rx_size = tm->ftm_fab->fab_fi->rx_attr->size;
1361  tm->ftm_fab->fab_fi->rx_attr->size = FAB_MAX_SRX_SIZE;
1362  rc = fi_srx_context(tm->ftm_fab->fab_dom, tm->ftm_fab->fab_fi->rx_attr,
1363  &tm->ftm_rctx, NULL);
1364  tm->ftm_fab->fab_fi->rx_attr->size = rx_size;
1365  if (rc != 0) {
1366  M0_LOG(M0_ERROR," \n fi_srx_context = %d \n ", rc);
1367  libfab_pep_param_free(pep, tm);
1368  return M0_ERR(rc);
1369  }
1370 
1371  rc = libfab_tm_res_init(tm);
1372  if (rc != 0) {
1373  M0_LOG(M0_ERROR," \n libfab_tm_res_init = %d \n ", rc);
1374  libfab_pep_param_free(pep, tm);
1375  return M0_ERR(rc);
1376  }
1377 
1378  fab_sndbuf_tlist_init(&ep->fep_sndbuf);
1379  m0_ref_init(&tm->ftm_pep->fep_nep.nep_ref, 1, &libfab_ep_release);
1380  libfab_ep_get(tm->ftm_pep);
1381 
1382  return M0_RC(0);
1383 }
1384 
1388 static int libfab_pep_res_free(struct m0_fab__pep_res *pep_res,
1389  struct m0_fab__tm *tm)
1390 {
1391  int rc = 0;
1392 
1393  if (pep_res->fpr_eq != NULL) {
1394  rc = libfab_waitfd_unbind(&pep_res->fpr_eq->fid, tm,
1395  &pep_res->fpr_ctx);
1396  if (rc != 0)
1397  M0_LOG(M0_ERROR, "epoll_ctl_del failed %d", rc);
1398  rc = fi_close(&pep_res->fpr_eq->fid);
1399  if (rc != 0)
1400  M0_LOG(M0_ERROR, "fpr_eq fi_close ret=%d fid=%d",
1401  rc, (int)pep_res->fpr_eq->fid.fclass);
1402  pep_res->fpr_eq = NULL;
1403  }
1404 
1405  return M0_RC(rc);
1406 }
1407 
1411 static int libfab_ep_txres_free(struct m0_fab__tx_res *tx_res,
1412  struct m0_fab__tm *tm)
1413 {
1414  int rc = 0;
1415 
1416  if (tx_res->ftr_eq != NULL) {
1417  rc = libfab_waitfd_unbind(&tx_res->ftr_eq->fid, tm,
1418  &tx_res->ftr_ctx);
1419  if (rc != 0)
1420  M0_LOG(M0_ERROR, "epoll_ctl_del failed %d", rc);
1421  rc = fi_close(&tx_res->ftr_eq->fid);
1422  if (rc != 0)
1423  M0_LOG(M0_ERROR, "ftr_eq fi_close ret=%d fid=%d",
1424  rc, (int)tx_res->ftr_eq->fid.fclass);
1425  tx_res->ftr_eq = NULL;
1426  }
1427 
1428  return M0_RC(rc);
1429 }
1430 
1434 static int libfab_ep_rxres_free(struct m0_fab__rx_res *rx_res,
1435  struct m0_fab__tm *tm)
1436 {
1437  int rc = 0;
1438 
1439  if (rx_res->frr_eq != NULL) {
1440  rc = libfab_waitfd_unbind(&rx_res->frr_eq->fid, tm,
1441  &rx_res->frr_eq_ctx);
1442  if (rc != 0)
1443  M0_LOG(M0_ERROR,"epoll_ctl_del failed %d", rc);
1444  rc = fi_close(&rx_res->frr_eq->fid);
1445  if (rc != 0)
1446  M0_LOG(M0_ERROR, "frr_eq fi_close ret=%d fid=%d",
1447  rc, (int)rx_res->frr_eq->fid.fclass);
1448  rx_res->frr_eq = NULL;
1449  }
1450 
1451  if (rx_res->frr_cq != NULL) {
1452  rc = libfab_waitfd_unbind(&rx_res->frr_cq->fid, tm,
1453  &rx_res->frr_cq_ctx);
1454  if (rc != 0)
1455  M0_LOG(M0_ERROR, "epoll_ctl_del failed %d", rc);
1456  rc = fi_close(&rx_res->frr_cq->fid);
1457  if (rc != 0)
1458  M0_LOG(M0_ERROR, "frr_cq fi_close ret=%d fid=%d",
1459  rc, (int)rx_res->frr_cq->fid.fclass);
1460  rx_res->frr_cq = NULL;
1461  }
1462 
1463  return M0_RC(rc);
1464 }
1465 
1469 static int libfab_aep_param_free(struct m0_fab__active_ep *aep,
1470  struct m0_fab__tm *tm)
1471 {
1472  int rc = 0;
1473 
1474  if (aep == NULL)
1475  return M0_RC(0);
1476  if (aep->aep_txep != NULL) {
1477  rc = fi_close(&aep->aep_txep->fid);
1478  if (rc != 0)
1479  M0_LOG(M0_ERROR, "aep_txep fi_close ret=%d fid=%d",
1480  rc, (int)aep->aep_txep->fid.fclass);
1481  aep->aep_txep = NULL;
1482  }
1483 
1484  if (aep->aep_rxep != NULL) {
1485  rc = fi_close(&aep->aep_rxep->fid);
1486  if (rc != 0)
1487  M0_LOG(M0_ERROR, "aep_rxep fi_close ret=%d fid=%d",
1488  rc, (int)aep->aep_rxep->fid.fclass);
1489  aep->aep_rxep = NULL;
1490  }
1491 
1492  rc = libfab_ep_txres_free(&aep->aep_tx_res, tm);
1493  if (rc != 0)
1494  M0_LOG(M0_ERROR, "ep_txres_free failed %d", rc);
1495 
1496  rc = libfab_ep_rxres_free(&aep->aep_rx_res, tm);
1497  if (rc != 0)
1498  M0_LOG(M0_ERROR, "ep_rxres_free failed %d", rc);
1499 
1500  m0_free(aep);
1501 
1502  return M0_RC(rc);
1503 }
1504 
1508 static int libfab_pep_param_free(struct m0_fab__passive_ep *pep,
1509  struct m0_fab__tm *tm)
1510 {
1511  int rc = 0;
1512 
1513  if (pep == NULL)
1514  return M0_RC(0);
1515 
1516  if (pep->pep_pep != NULL) {
1517  rc = fi_close(&pep->pep_pep->fid);
1518  if (rc != 0)
1519  M0_LOG(M0_ERROR, "fep_pep fi_close ret=%d fid=%d",
1520  rc, (int)pep->pep_pep->fid.fclass);
1521  pep->pep_pep = NULL;
1522  }
1523 
1524  rc = libfab_aep_param_free(pep->pep_aep, tm);
1525  if (rc != 0)
1526  M0_LOG(M0_ERROR, "aep_param_free failed %d", rc);
1527 
1528  rc = libfab_pep_res_free(&pep->pep_res, tm);
1529  if (rc != 0)
1530  M0_LOG(M0_ERROR, "pep_res_free failed %d", rc);
1531 
1532  m0_free(pep);
1533 
1534  return M0_RC(rc);
1535 }
1536 
1540 static int libfab_ep_param_free(struct m0_fab__ep *ep, struct m0_fab__tm *tm)
1541 {
1542  int rc;
1543 
1544  if (ep == NULL)
1545  return M0_RC(0);
1546 
1547  rc = libfab_pep_param_free(ep->fep_listen, tm) ? :
1548  libfab_aep_param_free(ep->fep_aep, tm);
1549 
1550  if (rc != 0)
1551  return M0_ERR(rc);
1552 
1553  M0_SET0(&ep->fep_name);
1554 
1555  m0_free(ep);
1556  return M0_RC(0);
1557 }
1558 
1562 static int libfab_tm_param_free(struct m0_fab__tm *tm)
1563 {
1564  struct m0_fab__bulk_op *op;
1565  struct m0_net_end_point *net;
1566  struct m0_fab__ep *xep;
1567  struct m0_fab__buf *fbp;
1568  int rc;
1569 
1570  if (tm == NULL)
1571  return M0_RC(0);
1572 
1573  if (tm->ftm_poller.t_func != NULL) {
1574  m0_thread_join(&tm->ftm_poller);
1575  m0_thread_fini(&tm->ftm_poller);
1576  }
1577 
1578  M0_ASSERT(libfab_tm_is_locked(tm));
1579  m0_tl_teardown(m0_nep, &tm->ftm_ntm->ntm_end_points, net) {
1580  xep = libfab_ep(net);
1581  rc = libfab_ep_param_free(xep, tm);
1582  }
1583  M0_ASSERT(m0_nep_tlist_is_empty(&tm->ftm_ntm->ntm_end_points));
1584  tm->ftm_ntm->ntm_ep = NULL;
1585 
1586  if (tm->ftm_rctx != NULL) {
1587  rc = fi_close(&tm->ftm_rctx->fid);
1588  if (rc != 0)
1589  M0_LOG(M0_ERROR, "ftm_rctx fi_close ret=%d fid=%d",
1590  rc, (int)tm->ftm_rctx->fid.fclass);
1591  tm->ftm_rctx = NULL;
1592  }
1593 
1594  if (tm->ftm_tx_cq != NULL) {
1595  rc = libfab_waitfd_unbind(&tm->ftm_tx_cq->fid, tm,
1596  &tm->ftm_txcq_ctx);
1597  if (rc != 0)
1598  M0_LOG(M0_ERROR, "epoll_ctl_del failed %d", rc);
1599  rc = fi_close(&tm->ftm_tx_cq->fid);
1600  if (rc != 0)
1601  M0_LOG(M0_ERROR, "tx_cq fi_close ret=%d fid=%d",
1602  rc, (int)tm->ftm_tx_cq->fid.fclass);
1603  tm->ftm_tx_cq = NULL;
1604  }
1605 
1606  close(tm->ftm_epfd);
1607  m0_free(tm->ftm_fids.ftf_head);
1608  m0_free(tm->ftm_fids.ftf_ctx);
1609  m0_free(tm->ftm_rem_iov);
1610  m0_free(tm->ftm_loc_iov);
1611 
1612  m0_htable_for(fab_bufhash, fbp, &tm->ftm_bufhash.bht_hash) {
1613  fab_bufhash_htable_del(&tm->ftm_bufhash.bht_hash, fbp);
1614  } m0_htable_endfor;
1615  fab_bufhash_htable_fini(&tm->ftm_bufhash.bht_hash);
1616 
1617  m0_tl_teardown(fab_bulk, &tm->ftm_bulk, op) {
1618  m0_free(op);
1619  }
1620  fab_bulk_tlist_fini(&tm->ftm_bulk);
1621 
1622  return M0_RC(0);
1623 }
1624 
1629 static int libfab_waitfd_init(struct m0_fab__tm *tm)
1630 {
1631  M0_PRE(tm->ftm_epfd == -1);
1632 
1633  tm->ftm_epfd = epoll_create(1);
1634  if (tm->ftm_epfd < 0)
1635  return M0_ERR(-errno);
1636 
1637  return M0_RC(0);
1638 }
1639 
1643 static inline struct m0_fab__tm *libfab_buf_tm(struct m0_fab__buf *buf)
1644 {
1645  return buf->fb_nb->nb_tm->ntm_xprt_private;
1646 }
1647 
1651 static inline struct m0_fab__tm *libfab_buf_ma(struct m0_net_buffer *buf)
1652 {
1653  return buf->nb_tm != NULL ? buf->nb_tm->ntm_xprt_private : NULL;
1654 }
1655 
1659 static void libfab_buf_fini(struct m0_fab__buf *buf)
1660 {
1661  struct m0_fab__buf *fbp;
1662 
1663  M0_ENTRY("fb=%p q=%d rc=%d", buf, buf->fb_nb->nb_qtype, buf->fb_status);
1664 
1665  libfab_buf_invariant(buf);
1666 
1667  fab_buf_tlink_fini(buf);
1668  if (buf->fb_ev_ep != NULL)
1669  buf->fb_ev_ep = NULL;
1670  if (buf->fb_bulk_op != NULL && fab_bulk_tlink_is_in(buf->fb_bulk_op)) {
1671  fab_bulk_tlist_del(buf->fb_bulk_op);
1672  m0_free(buf->fb_bulk_op);
1673  buf->fb_bulk_op = NULL;
1674  }
1675 
1676  if (buf->fb_txctx != NULL) {
1677  fbp = m0_tl_find(fab_sndbuf, fbp, &buf->fb_txctx->fep_sndbuf,
1678  fbp == buf);
1679  if (fbp != NULL) {
1680  fab_sndbuf_tlist_del(fbp);
1681  M0_LOG(M0_DEBUG, "buf=%p tmout/del before queued", fbp);
1682  }
1683  buf->fb_txctx = NULL;
1684  }
1685  buf->fb_status = 0;
1686  buf->fb_length = 0;
1687  buf->fb_token = 0;
1688  /*
1689  * If the buffer operation has timedout or has been cancelled by
1690  * application, then the buffer has also been de-registered to prevent
1691  * data corruption due to any ongoing operations. In such cases, the
1692  * buffer state is reset to FAB_BUF_INITIALIZED so that it will be
1693  * re-registered when the application will try to re-use it.
1694  */
1695  buf->fb_state = (buf->fb_state == FAB_BUF_CANCELED ||
1696  buf->fb_state == FAB_BUF_TIMEDOUT) ?
1697  FAB_BUF_INITIALIZED : FAB_BUF_REGISTERED;
1698 
1699  M0_LEAVE("fb_state=%d", buf->fb_state);
1700 }
1701 
1705 static bool libfab_dom_invariant(const struct m0_net_domain *dom)
1706 {
1707  struct m0_fab__ndom *fnd = dom->nd_xprt_private;
1708  return _0C(!fab_fabs_tlist_is_empty(&fnd->fnd_fabrics)) &&
1709  _0C(dom->nd_xprt == &m0_net_libfab_xprt);
1710 }
1711 
1715 static bool libfab_tm_invariant(const struct m0_fab__tm *fab_tm)
1716 {
1717  return fab_tm != NULL &&
1718  fab_tm->ftm_ntm->ntm_xprt_private == fab_tm &&
1719  libfab_dom_invariant(fab_tm->ftm_ntm->ntm_dom);
1720 }
1721 
/*
 * Buffer invariant.
 *
 * As far as the visible lines show, a buffer is valid when exactly one of two
 * conditions holds (the two are combined with XOR): either it is merely
 * registered and not attached to a transfer machine, or it is queued to one.
 *
 * NOTE(review): this listing skips source lines 1732 and 1734, so part of the
 * second condition is not visible here — consult the original file before
 * changing this function.
 */
1725 static bool libfab_buf_invariant(const struct m0_fab__buf *buf)
1726 {
1727  const struct m0_net_buffer *nb = buf->fb_nb;
1728 
1729  return (nb->nb_flags == M0_NET_BUF_REGISTERED &&
1730  nb->nb_tm == NULL) ^ /* or (exclusively) ... */
1731  /* it is queued to a machine. */
1733  _0C(nb->nb_tm != NULL) &&
1735 }
1736 
/*
 * Complete a buffer operation.
 *
 * Builds a buffer event carrying the network buffer, the buffer status and the
 * current time, removes the buffer from the transfer machine's hash table and
 * resets it with libfab_buf_fini(). ntm_callback_counter is incremented at the
 * start and decremented at the end.
 *
 * NOTE(review): this listing skips source lines 1753-1754 (the rest of the
 * M0_IN() queue-type list) and 1771 (the statement executed while the tm lock
 * is released, presumably the delivery of the event) — verify against the
 * original file.
 */
1740 static void libfab_buf_complete(struct m0_fab__buf *buf)
1741 {
1742  struct m0_fab__tm *ma = libfab_buf_tm(buf);
1743  struct m0_net_buffer *nb = buf->fb_nb;
1744  struct m0_net_buffer_event ev = {
1745  .nbe_buffer = nb,
1746  .nbe_status = buf->fb_status,
1747  .nbe_time = m0_time_now()
1748  };
1749 
1750  M0_ENTRY("fb=%p nb=%p q=%d rc=%d", buf, nb, buf->fb_nb->nb_qtype,
1751  buf->fb_status);
 /* The received length is reported only for the listed queue types. */
1752  if (M0_IN(nb->nb_qtype, (M0_NET_QT_MSG_RECV,
1755  ev.nbe_length = buf->fb_length;
1756  }
1757 
 /*
  * A successfully received message with a known source endpoint reports that
  * endpoint in the event and takes a reference on it.
  */
1758  if (nb->nb_qtype == M0_NET_QT_MSG_RECV) {
1759  if (ev.nbe_status == 0 && buf->fb_ev_ep != NULL) {
1760  ev.nbe_ep = &buf->fb_ev_ep->fep_nep;
1761  libfab_ep_get(buf->fb_ev_ep);
1762  }
1763  }
1764  ma->ftm_ntm->ntm_callback_counter++;
1765 
1766  fab_bufhash_htable_del(&ma->ftm_bufhash.bht_hash, buf);
1767  libfab_buf_fini(buf);
1768  M0_ASSERT(libfab_tm_invariant(ma));
 /*
  * The evpost lock is taken before the tm lock is dropped and released only
  * after the tm lock has been re-acquired.
  */
1769  libfab_tm_evpost_lock(ma);
1770  libfab_tm_unlock(ma);
1772  libfab_tm_lock(ma);
1773  libfab_tm_evpost_unlock(ma);
1774  M0_ASSERT(libfab_tm_invariant(ma));
1775  M0_ASSERT(M0_IN(ma->ftm_ntm->ntm_state, (M0_NET_TM_STARTED,
1776  M0_NET_TM_STOPPING)));
1777  ma->ftm_ntm->ntm_callback_counter--;
1778 }
1779 
1785 static int libfab_dummy_msg_rcv_chk(struct m0_fab__buf *fbp)
1786 {
1787  struct m0_fab__tm *ma = libfab_buf_tm(fbp);
1788  struct m0_net_buffer *nb = fbp->fb_nb;
1789  struct m0_fab__buf *pas_buf;
1790  struct iovec iv;
1791  uint32_t *ptr;
1792  uint32_t token;
1793  int ret = -1;
1794 
1795  if (fbp->fb_length == (sizeof(uint32_t) * 2)) {
1796  ptr = (uint32_t *)nb->nb_buffer.ov_buf[0];
1797  if (*ptr == FAB_DUMMY_DATA) {
1798  ptr++;
1799  token = *ptr;
1800  pas_buf = fab_bufhash_htable_lookup(
1801  &ma->ftm_bufhash.bht_hash,
1802  &token);
1803  if (pas_buf != NULL)
1804  libfab_buf_complete(pas_buf);
1805 
1806  /*
1807  * Repost this buffer to the receive
1808  * queue without generating a callback
1809  * as it contains only dummy data
1810  */
1811  fbp->fb_length = nb->nb_length;
1812  iv.iov_base = nb->nb_buffer.ov_buf[0];
1813  iv.iov_len = nb->nb_buffer.ov_vec.v_count[0];
1814  M0_ASSERT(fi_recvv(ma->ftm_rctx, &iv,
1815  fbp->fb_mr.bm_desc, 1, 0,
1816  U32_TO_VPTR(fbp->fb_token)) == 0);
1817  ret = 0;
1818  }
1819  }
1820 
1821  return ret;
1822 }
1823 
1827 static void libfab_buf_done(struct m0_fab__buf *buf, int rc, bool add_to_list)
1828 {
1829  struct m0_fab__tm *ma = libfab_buf_tm(buf);
1830  struct m0_net_buffer *nb = buf->fb_nb;
1831 
1832  M0_ENTRY("fb=%p nb=%p q=%d len=%d rc=%d", buf, nb, nb->nb_qtype,
1833  (int)buf->fb_length, rc);
1834  M0_PRE(libfab_tm_is_locked(ma));
1835  /*
1836  * Multiple libfab_buf_done() calls on the same buffer are possible if
1837  * the buffer is cancelled.
1838  */
1839  if (!fab_buf_tlink_is_in(buf)) {
1840  buf->fb_status = buf->fb_status == 0 ? rc : buf->fb_status;
1841  /* Try to finalise. */
1842  if (m0_thread_self() == &ma->ftm_poller && !add_to_list) {
1843  if (libfab_dummy_msg_rcv_chk(buf) != 0)
1844  libfab_buf_complete(buf);
1845  } else {
1846  /*
1847  * Otherwise, postpone finalisation to
1848  * libfab_tm_buf_done().
1849  */
1850  buf->fb_status = rc;
1851  fab_buf_tlist_add_tail(&ma->ftm_done, buf);
1852  }
1853  }
1854 }
1855 
1859 static inline void libfab_ep_get(struct m0_fab__ep *ep)
1860 {
1861  m0_ref_get(&ep->fep_nep.nep_ref);
1862 }
1863 
1870 static void libfab_ep_release(struct m0_ref *ref)
1871 {
1872  struct m0_net_end_point *nep;
1873  struct m0_fab__ep *ep;
1874  struct m0_fab__tm *tm;
1875 
1876  nep = container_of(ref, struct m0_net_end_point, nep_ref);
1877  ep = libfab_ep(nep);
1878  tm = nep->nep_tm->ntm_xprt_private;
1879  M0_LOG(M0_DEBUG, "free endpoint %s", (char*)ep->fep_name.nia_p);
1880 
1881  m0_nep_tlist_del(nep);
1882  libfab_ep_param_free(ep, tm);
1883 }
1884 
1888 static uint64_t libfab_mr_keygen(struct m0_fab__tm *tm)
1889 {
1890  uint64_t key = FAB_MR_KEY + tm->ftm_mr_key_idx;
1891  tm->ftm_mr_key_idx++;
1892  return key;
1893 }
1894 
1898 static int libfab_check_for_event(struct fid_eq *eq, uint32_t *ev)
1899 {
1900  struct fi_eq_cm_entry entry;
1901  struct fi_eq_err_entry err_entry;
1902  uint32_t event = 0;
1903  int rc;
1904 
1905  rc = fi_eq_read(eq, &event, &entry, sizeof(entry), 0);
1906  if (rc == -FI_EAVAIL) {
1907  memset(&err_entry, 0, sizeof(err_entry));
1908  fi_eq_readerr(eq, &err_entry, 0);
1909  rc = -err_entry.err;
1910  M0_LOG(M0_DEBUG, "Error = %d %s %s\n", rc,
1911  fi_strerror(err_entry.err),
1912  fi_eq_strerror(eq, err_entry.prov_errno,
1913  err_entry.err_data,NULL, 0));
1914  }
1915 
1916  *ev = rc < 0 ? 0xFF : event;
1917  return rc;
1918 }
1919 
1925 static int libfab_check_for_comp(struct fid_cq *cq, uint32_t *ctx,
1926  m0_bindex_t *len, uint64_t *data)
1927 {
1928  struct fi_cq_data_entry entry[FAB_MAX_COMP_READ];
1929  struct fi_cq_err_entry err_entry;
1930  uint64_t wr_cqdata = FI_REMOTE_WRITE | FI_REMOTE_CQ_DATA;
1931  int i;
1932  int ret;
1933 
1934  ret = fi_cq_read(cq, entry, FAB_MAX_COMP_READ);
1935  if (ret > 0) {
1936  for (i = 0; i < ret; i++) {
1937  ctx[i] = entry[i].op_context == NULL ? 0 :
1938  VPTR_TO_U32(entry[i].op_context);
1939  if (len != NULL)
1940  len[i] = entry[i].len;
1941  if (data != NULL)
1942  data[i] = ((entry[i].flags & wr_cqdata)) ?
1943  entry[i].data : 0;
1944  }
1945  } else if (ret != -FI_EAGAIN) {
1946  memset(&err_entry, 0, sizeof(err_entry));
1947  fi_cq_readerr(cq, &err_entry, 0);
1948  M0_LOG(M0_DEBUG, "Error = %d %s %s\n", ret,
1949  fi_strerror(err_entry.err),
1950  fi_cq_strerror(cq, err_entry.prov_errno,
1951  err_entry.err_data, NULL, 0));
1952  }
1953 
1954  return ret;
1955 }
1956 
1963 static void libfab_tm_fini(struct m0_net_transfer_mc *tm)
1964 {
1965  struct m0_fab__tm *ma = tm->ntm_xprt_private;
1966  int rc;
1967 
1968  if (ma->ftm_state != FAB_TM_SHUTDOWN) {
1969  while (1) {
1970  libfab_tm_lock(ma);
1971  if (m0_mutex_trylock(&ma->ftm_evpost) != 0) {
1972  libfab_tm_unlock(ma);
1973  } else
1974  break;
1975  }
1976  m0_mutex_unlock(&ma->ftm_evpost);
1977  m0_mutex_lock(&ma->ftm_endlock);
1978  ma->ftm_state = FAB_TM_SHUTDOWN;
1979  m0_mutex_unlock(&ma->ftm_endlock);
1980 
1981  libfab_tm_buf_done(ma);
1982 
1983  rc = libfab_tm_param_free(ma);
1984  if (rc != 0)
1985  M0_LOG(M0_ERROR, "libfab_tm_param_free ret=%d", rc);
1986 
1987  m0_mutex_fini(&ma->ftm_endlock);
1988  m0_mutex_fini(&ma->ftm_evpost);
1989  libfab_tm_unlock(ma);
1990  }
1991 
1992  M0_LEAVE();
1993 }
1994 
1998 static int libfab_bdesc_encode(struct m0_fab__buf *buf)
1999 {
2000  struct m0_fab__bdesc *fbd;
2001  struct fi_rma_iov *iov;
2002  struct m0_net_buf_desc *nbd = &buf->fb_nb->nb_desc;
2003  struct m0_net_buffer *nb = buf->fb_nb;
2004  struct m0_fab__tm *tm = libfab_buf_ma(nb);
2005  int seg_nr = nb->nb_buffer.ov_vec.v_nr;
2006  struct m0_fab__ndom *nd = nb->nb_dom->nd_xprt_private;
2007  int i;
2008  bool is_verbs = libfab_is_verbs(tm);
2009 
2010  M0_PRE(seg_nr <= nd->fnd_seg_nr);
2011 
2012  nbd->nbd_len = (sizeof(struct m0_fab__bdesc) +
2013  (sizeof(struct fi_rma_iov) * seg_nr));
2014  nbd->nbd_data = m0_alloc(nbd->nbd_len);
2015  if (nbd->nbd_data == NULL)
2016  return M0_RC(-ENOMEM);
2017 
2018  fbd = (struct m0_fab__bdesc *)nbd->nbd_data;
2019  fbd->fbd_netaddr = tm->ftm_pep->fep_name.nia_n;
2020  fbd->fbd_buftoken = buf->fb_token;
2021 
2022  fbd->fbd_iov_cnt = (uint32_t)seg_nr;
2023  iov = (struct fi_rma_iov *)(nbd->nbd_data +
2024  sizeof(struct m0_fab__bdesc));
2025 
2026  for (i = 0; i < seg_nr; i++) {
2027  iov[i].addr = is_verbs ? (uint64_t)nb->nb_buffer.ov_buf[i] : 0;
2028  iov[i].key = fi_mr_key(buf->fb_mr.bm_mr[i]);
2029  iov[i].len = nb->nb_buffer.ov_vec.v_count[i];
2030  }
2031 
2032  return M0_RC(0);
2033 }
2034 
2038 static void libfab_bdesc_decode(struct m0_fab__buf *fb,
2039  struct m0_net_ip_params *addr)
2040 {
2041  struct m0_net_buffer *nb = fb->fb_nb;
2042  struct m0_fab__ndom *ndom = nb->nb_dom->nd_xprt_private;
2043 
2044  fb->fb_rbd = (struct m0_fab__bdesc *)(nb->nb_desc.nbd_data);
2045  fb->fb_riov = (struct fi_rma_iov *)(nb->nb_desc.nbd_data +
2046  sizeof(struct m0_fab__bdesc));
2047  *addr = fb->fb_rbd->fbd_netaddr;
2048  M0_ASSERT(fb->fb_rbd->fbd_iov_cnt <= ndom->fnd_seg_nr);
2049 }
2050 
2054 static int libfab_buf_dom_reg(struct m0_net_buffer *nb, struct m0_fab__tm *tm)
2055 {
2056  struct m0_fab__buf *fbp;
2057  struct m0_fab__buf_mr *mr;
2058  struct m0_fab__ndom *ndom;
2059  struct fid_domain *dp;
2060  uint64_t key;
2061  uint32_t retry_cnt;
2062  int seg_nr;
2063  int i;
2064  int ret = 0;
2065 
2066  M0_PRE(nb != NULL && nb->nb_dom != NULL && tm != NULL);
2067  fbp = nb->nb_xprt_private;
2068  seg_nr = nb->nb_buffer.ov_vec.v_nr;
2069  ndom = nb->nb_dom->nd_xprt_private;
2070  dp = tm->ftm_fab->fab_dom;
2071 
2072  M0_ASSERT(fbp != NULL && dp != NULL && ndom != NULL);
2073  M0_ASSERT(seg_nr <= ndom->fnd_seg_nr);
2074 
2075  mr = &fbp->fb_mr;
2076  if (fbp->fb_dp == dp)
2077  return M0_RC(ret);
2078 
2079  if (fbp->fb_state == FAB_BUF_REGISTERED)
2080  M0_LOG(M0_ERROR,"Re-registration of buffer");
2081 
2082  for (i = 0; i < seg_nr; i++) {
2083  /*
2084  * Sometimes the requested key is not available and
2085  * hence try with some other key for registration
2086  */
2087  ret = -1;
2088  retry_cnt = 20;
2089 
2090  while (ret != 0 && retry_cnt > 0) {
2091  key = libfab_mr_keygen(tm);
2092  ret = fi_mr_reg(dp, nb->nb_buffer.ov_buf[i],
2093  nb->nb_buffer.ov_vec.v_count[i],
2094  FAB_MR_ACCESS, FAB_MR_OFFSET, key,
2095  FAB_MR_FLAG, &mr->bm_mr[i], NULL);
2096  --retry_cnt;
2097  }
2098 
2099  if (ret != 0) {
2100  M0_LOG(M0_ERROR, "fi_mr_reg failed %d key=0x%"PRIx64,
2101  ret, key);
2102  break;
2103  }
2104 
2105  mr->bm_desc[i] = fi_mr_desc(mr->bm_mr[i]);
2106  }
2107 
2108  if (ret == 0) {
2109  fbp->fb_dp = dp;
2110  fbp->fb_state = FAB_BUF_REGISTERED;
2111  }
2112 
2113  return M0_RC(ret);
2114 }
2115 
/*
 * Submit all buffers waiting on the endpoint's pending send list.
 *
 * Every buffer is removed from ep->fep_sndbuf, bound to the endpoint as its
 * transmit context and added to the transmit list via libfab_txbuf_list_add().
 * A buffer that fails to be added is completed with the error. Finally, if at
 * least one buffer was handled, the buffer queue of the last buffer's transfer
 * machine is processed.
 *
 * NOTE(review): this listing skips source lines 2133-2134, presumably further
 * case labels that share the body of M0_NET_QT_MSG_SEND — verify against the
 * original file.
 */
2120 static void libfab_pending_bufs_send(struct m0_fab__ep *ep)
2121 {
2122  struct m0_fab__active_ep *aep;
2123  struct m0_fab__buf *fbp;
2124  struct m0_net_buffer *nb = NULL;
2125  int ret = 0;
2126 
2127  aep = libfab_aep_get(ep);
2128  m0_tl_teardown(fab_sndbuf, &ep->fep_sndbuf, fbp) {
2129  nb = fbp->fb_nb;
2130  fbp->fb_txctx = ep;
2131  switch (nb->nb_qtype) {
2132  case M0_NET_QT_MSG_SEND:
2135  ret = libfab_txbuf_list_add(libfab_buf_ma(nb),
2136  fbp, aep);
2137  break;
2138  default:
2139  M0_ASSERT(0); /* Invalid queue type */
2140  break;
2141  }
 /* ret is not reset per iteration; it keeps the latest add result. */
2142  if (ret != 0)
2143  libfab_buf_done(fbp, ret, false);
2144  }
2145 
 /* nb is still NULL when the pending list was empty. */
2146  if (nb != NULL)
2147  libfab_bufq_process(libfab_buf_ma(nb));
2148 }
2149 
2154 static int libfab_target_notify(struct m0_fab__buf *buf)
2155 {
2156  struct m0_fab__active_ep *aep;
2157  struct m0_fab__buf *fbp;
2158  struct m0_fab__tm *tm;
2159  struct iovec iv;
2160  struct fi_msg op_msg;
2161  int ret = 0;
2162 
2163  M0_PRE(buf != NULL && buf->fb_txctx != NULL);
2164  aep = libfab_aep_get(buf->fb_txctx);
2165  M0_ASSERT(aep != NULL);
2166 
2167  if (buf->fb_nb->nb_qtype == M0_NET_QT_ACTIVE_BULK_RECV &&
2168  aep->aep_tx_state == FAB_CONNECTED) {
2169  M0_ALLOC_PTR(fbp);
2170  if (fbp == NULL)
2171  return M0_ERR(-ENOMEM);
2172 
2173  fbp->fb_nb = NULL;
2174  fbp->fb_dummy[0] = FAB_DUMMY_DATA;
2175  fbp->fb_dummy[1] = buf->fb_rbd->fbd_buftoken;
2176  fbp->fb_txctx = buf->fb_txctx;
2177  tm = libfab_buf_tm(buf);
2178  fbp->fb_token = libfab_buf_token_get(tm, fbp);
2179  aep->aep_bulk_cnt++;
2180  m0_tlink_init(&fab_bufhash_tl, fbp);
2181  fab_bufhash_htable_add(&tm->ftm_bufhash.bht_hash, fbp);
2182 
2183  iv.iov_base = fbp->fb_dummy;
2184  iv.iov_len = sizeof(fbp->fb_dummy);
2185  op_msg.msg_iov = &iv;
2186  op_msg.desc = NULL;
2187  op_msg.iov_count = 1;
2188  op_msg.addr = 0;
2189  op_msg.context = U32_TO_VPTR(fbp->fb_token);
2190  op_msg.data = 0;
2191  fbp->fb_wr_cnt = 1;
2192  ret = fi_sendmsg(aep->aep_txep, &op_msg, FI_COMPLETION);
2193  if (ret != 0) {
2194  M0_LOG(M0_ERROR,"tgt notify fail %d opcnt=%d", ret,
2195  aep->aep_bulk_cnt);
2196  fab_bufhash_htable_del(&tm->ftm_bufhash.bht_hash, fbp);
2197  --aep->aep_bulk_cnt;
2198  m0_free(fbp);
2199  }
2200  }
2201 
2202  return M0_RC(ret);
2203 }
2204 
2208 static struct m0_fab__fab *libfab_newfab_init(struct m0_fab__ndom *fnd)
2209 {
2210  struct m0_fab__fab *fab = NULL;
2211 
2212  M0_ALLOC_PTR(fab);
2213  if (fab != NULL)
2214  fab_fabs_tlink_init_at_tail(fab, &fnd->fnd_fabrics);
2215  return fab;
2216 }
2217 
/*
 * Retry the name resolution of an endpoint address.
 *
 * The host name is taken from the printable endpoint name by skipping its
 * first two ':'-terminated fields. On successful resolution the binary IP in
 * the endpoint name and the packed 64-bit address (fep_name_n) are refreshed;
 * on failure an error is logged.
 *
 * NOTE(review): this listing skips source line 2231, which presumably opens
 * the conditional block closed at line 2248 (a check of whether the address
 * is already valid) — verify against the original file.
 */
2222 static int libfab_dns_resolve_retry(struct m0_fab__ep *ep)
2223 {
2224  struct m0_net_ip_addr *en = &ep->fep_name;
2225  int rc = 0;
2226  enum m0_net_ip_format not_used;
2227  char *fqdn = en->nia_p;
2228  char ip[LIBFAB_ADDR_LEN_MAX] = {};
2229 
2230  /* Verify if ip addr is resolved and ip is valid */
2232  fqdn = strchr(fqdn, ':'); /* Skip '<inet/inet6>:' */
2233  fqdn = strchr(fqdn + 1, ':'); /* Skip '<tcp/verbs>:' */
2234  fqdn++;
2235 
2236  rc = m0_net_hostname_to_ip(fqdn, ip, &not_used);
2237  if (rc == 0) {
 /* Store the resolved address in network form and repack fep_name_n. */
2238  inet_pton(en->nia_n.nip_fmt_pvt.ia.nia_family ==
2239  M0_NET_IP_AF_INET ? AF_INET : AF_INET6,
2240  ip, &en->nia_n.nip_ip_n.sn[0]);
2241  libfab_ep_pton(en, &ep->fep_name_n);
2242  M0_LOG(M0_DEBUG, "ip=%s port=%d fqdn=%s", (char *)ip,
2243  (int)en->nia_n.nip_port, (char *)fqdn);
2244  } else
2245  M0_LOG(M0_ERROR, "%s failed with err %d for %s",
2246  rc > 0 ? "gethostbyname()" : "hostname_to_ip()",
2247  rc, fqdn);
2248  }
2249 
2250  return M0_RC(rc);
2251 }
2252 
/**
 * Start connecting the transmit side of an endpoint, if it is not connected
 * yet, and queue the buffer on the endpoint's pending send list.
 *
 * The connection request carries the network address of the local passive
 * endpoint as connection data.
 *
 * @param ep  destination endpoint.
 * @param ma  local transfer machine.
 * @param fbp buffer waiting for the connection.
 *
 * @return 0 if the buffer was queued, or if the connection was refused
 *         immediately (see below); otherwise the result of fi_connect().
 */
static int libfab_conn_init(struct m0_fab__ep *ep, struct m0_fab__tm *ma,
			    struct m0_fab__buf *fbp)
{
	struct m0_fab__active_ep *aep;
	uint64_t                  dst;
	size_t                    cm_max_size = 0;
	size_t                    opt_size = sizeof(size_t);
	/*
	 * Zero-filled because the whole structure, not just fcd_addr, is
	 * handed to fi_connect() as connection data: no uninitialised stack
	 * bytes may end up in it.
	 */
	struct m0_fab__conn_data  cd = {};
	int                       ret = 0;

	aep = libfab_aep_get(ep);
	if (aep->aep_tx_state == FAB_NOT_CONNECTED) {
		/*
		 * Verify if destination addr is resolved and ip is valid.
		 * If not resolved, try to resolve again.
		 */
		libfab_dns_resolve_retry(ep);
		dst = ep->fep_name_n | 0x02;
		cd.fcd_addr = ma->ftm_pep->fep_name.nia_n;

		ret = fi_getopt(&aep->aep_txep->fid, FI_OPT_ENDPOINT,
				FI_OPT_CM_DATA_SIZE,
				&cm_max_size, &opt_size);
		M0_ASSERT(ret == 0 && sizeof(cd) < cm_max_size);

		ret = fi_connect(aep->aep_txep, &dst, &cd, sizeof(cd));
		if (ret == 0) {
			aep->aep_tx_state = FAB_CONNECTING;
			aep->aep_connecting_tmout = m0_time_from_now(
							FAB_CONNECTING_TMOUT, 0);
		} else
			M0_LOG(M0_DEBUG, "Conn req failed ret=%d dst=%"PRIx64,
			       ret, dst);
	}

	if (ret == 0)
		fab_sndbuf_tlink_init_at_tail(fbp, &ep->fep_sndbuf);

	/*
	 * If fi_connect immediately returns -ECONNREFUSED, that means the
	 * remote service has not yet started. In this case, set the buffer
	 * status as -ECONNREFUSED and return the status as 0 so as to avoid
	 * flooding the network with repeated retries by the RPC layer. The
	 * buffer status will be automatically returned when the buf_done list
	 * is processed.
	 */
	if (ret == -ECONNREFUSED) {
		libfab_buf_done(fbp, -ECONNREFUSED, true);
		ret = 0;
		M0_LOG(M0_DEBUG, "Err=%d fb=%p nb=%p", fbp->fb_status, fbp,
		       fbp->fb_nb);
	}

	return ret;
}
2312 
2316 static int libfab_fab_ep_find(struct m0_fab__tm *tm, const char *name,
2317  struct m0_net_ip_params *addr,
2318  struct m0_fab__ep **ep)
2319 {
2320  struct m0_net_transfer_mc *ntm = tm->ftm_ntm;
2321  struct m0_net_end_point *net;
2322  int ret;
2323 
2324  ret = libfab_ep_find(ntm, name, addr, &net);
2325  if (ret == 0)
2326  *ep = libfab_ep(net);
2327 
2328  return M0_RC(ret);
2329 }
2330 
2334 static void libfab_ep_pton(struct m0_net_ip_addr *name, uint64_t *out)
2335 {
2336  uint32_t addr = name->nia_n.nip_ip_n.sn[0];
2337  uint32_t port = name->nia_n.nip_port;
2338 
2339  M0_ASSERT(port < 65536);
2340  port = htonl(port);
2341 
2342  *out = ((uint64_t)addr << 32) | port;
2343 }
2344 
2352 static int libfab_txep_init(struct m0_fab__active_ep *aep,
2353  struct m0_fab__tm *tm, void *ctx)
2354 {
2355  struct m0_fab__ep *ep = (struct m0_fab__ep *)ctx;
2356  struct m0_net_ip_addr *en = &ep->fep_name;
2357  struct m0_fab__fab *fab = tm->ftm_fab;
2358  struct fi_info *info;
2359  struct fi_info *hints = NULL;
2360  int rc;
2361  bool is_verbs = libfab_is_verbs(tm);
2362  char ip[LIBFAB_ADDR_LEN_MAX] = {};
2363  char port[LIBFAB_PORT_LEN_MAX] = {};
2364 
2365  if (aep->aep_txep != NULL) {
2366  rc = fi_close(&aep->aep_txep->fid);
2367  if (rc != 0)
2368  M0_LOG(M0_ERROR,"aep_txep close failed %d",rc);
2369 
2370  rc = libfab_ep_txres_free(&aep->aep_tx_res, tm);
2371  if (rc != 0)
2372  M0_LOG(M0_ERROR,"ep_txres_free failed %d",rc);
2373  }
2374  aep->aep_tx_state = FAB_NOT_CONNECTED;
2375  aep->aep_txq_full = false;
2376  ep->fep_connlink = FAB_CONNLINK_DOWN;
2377 
2378  if (is_verbs) {
2379  hints = fi_allocinfo();
2380  if (hints == NULL)
2381  return M0_ERR(-ENOMEM);
2382  hints->ep_attr->type = FI_EP_MSG;
2383  hints->caps = FI_MSG | FI_RMA;
2384 
2385  hints->mode |= FI_RX_CQ_DATA;
2386  hints->domain_attr->cq_data_size = 4;
2387  hints->domain_attr->mr_mode = FI_MR_LOCAL | FI_MR_ALLOCATED |
2388  FI_MR_PROV_KEY | FI_MR_VIRT_ADDR;
2389  hints->fabric_attr->prov_name =
2390  fab->fab_fi->fabric_attr->prov_name;
2391 
2392  libfab_straddr_gen(&en->nia_n, ip);
2393  snprintf(port, ARRAY_SIZE(port), "%d", en->nia_n.nip_port);
2394  rc = fi_getinfo(LIBFAB_VERSION, ip, port, 0,
2395  hints, &info);
2396  if (rc != 0)
2397  return M0_ERR(rc);
2398  } else
2399  info = tm->ftm_fab->fab_fi;
2400 
2401  rc = fi_endpoint(fab->fab_dom, info, &aep->aep_txep, NULL) ? :
2402  libfab_ep_txres_init(aep, tm, ctx) ? :
2403  fi_enable(aep->aep_txep);
2404 
2405  if (is_verbs) {
2406  hints->fabric_attr->prov_name = NULL;
2407  fi_freeinfo(hints);
2408  fi_freeinfo(info);
2409  }
2410 
2411  return M0_RC(rc);
2412 }
2413 
2419 static int libfab_fid_array_grow(struct m0_fab__tm_fids *tmfid, uint32_t incr)
2420 {
2421  struct m0_fab__ev_ctx **old_ctx = tmfid->ftf_ctx;
2422  struct m0_fab__ev_ctx **new_ctx = NULL;
2423  struct fid **old_fid = tmfid->ftf_head;
2424  struct fid **new_fid = NULL;
2425  uint32_t old_size = tmfid->ftf_arr_size;
2426  uint32_t new_size = old_size + incr;
2427  int i;
2428 
2429  M0_PRE(old_ctx != NULL && old_fid != NULL && old_size < new_size);
2430 
2431  M0_ALLOC_ARR(new_ctx, new_size);
2432  M0_ALLOC_ARR(new_fid, new_size);
2433  if (new_ctx == NULL || new_fid == NULL) {
2434  m0_free(new_ctx);
2435  m0_free(new_fid);
2436  return M0_ERR(-ENOMEM);
2437  }
2438 
2439  /* Copy fids from old array to new array. */
2440  for (i = 0; i < old_size; i++) {
2441  new_ctx[i] = old_ctx[i];
2442  new_fid[i] = old_fid[i];
2443  }
2444  tmfid->ftf_ctx = new_ctx;
2445  tmfid->ftf_head = new_fid;
2446  tmfid->ftf_arr_size = new_size;
2447 
2448  M0_LOG(M0_DEBUG,"old={fid=%p ctx=%p size=%d} new={fid=%p ctx=%p size=%d}",
2449  old_fid, old_ctx, old_size, new_fid, new_ctx, new_size);
2450 
2451  /* Free old array */
2452  m0_free(old_ctx);
2453  m0_free(old_fid);
2454 
2455  return M0_RC(0);
2456 }
2457 
2461 static int libfab_waitfd_bind(struct fid* fid, struct m0_fab__tm *tm, void *ctx)
2462 {
2463  struct m0_fab__tm_fids *tmfid = &tm->ftm_fids;
2464  struct m0_fab__ev_ctx *ptr = ctx;
2465  struct epoll_event ev;
2466  int fd;
2467  int rc;
2468 
2469  rc = fi_control(fid, FI_GETWAIT, &fd);
2470  if (rc != 0)
2471  return M0_ERR(rc);
2472 
2473  ev.events = EPOLLIN;
2474  ev.data.ptr = ptr;
2475  M0_LOG(M0_DEBUG, "ADD_TO_EPOLL %s=%p fd=%d ctx=%p tm=%p pos=%d",
2476  ptr->evctx_dbg, fid, fd, ctx, tm, (int)tmfid->ftf_cnt);
2477  rc = epoll_ctl(tm->ftm_epfd, EPOLL_CTL_ADD, fd, &ev);
2478 
2479  if (rc == 0) {
2480  if (tmfid->ftf_cnt >= (tmfid->ftf_arr_size - 1)) {
2481  rc = libfab_fid_array_grow(tmfid,
2482  FAB_TM_FID_MALLOC_STEP);
2483  if (rc != 0)
2484  return M0_ERR(rc);
2485  }
2486  tmfid->ftf_head[tmfid->ftf_cnt] = fid;
2487  tmfid->ftf_ctx[tmfid->ftf_cnt] = ptr;
2488  ptr->evctx_pos = tmfid->ftf_cnt;
2489  tmfid->ftf_cnt++;
2490  M0_ASSERT(tmfid->ftf_cnt < tmfid->ftf_arr_size);
2491  }
2492 
2493  return M0_RC(rc);
2494 }
2495 
2500 static int libfab_waitfd_unbind(struct fid* fid, struct m0_fab__tm *tm,
2501  void *ctx)
2502 {
2503  struct m0_fab__tm_fids *tmfid = &tm->ftm_fids;
2504  struct m0_fab__ev_ctx *ptr = ctx;
2505  struct epoll_event ev = {};
2506  int fd;
2507  int rc;
2508  int i;
2509 
2510  rc = fi_control(fid, FI_GETWAIT, &fd);
2511  if (rc != 0)
2512  return M0_ERR(rc);
2513 
2514  rc = epoll_ctl(tm->ftm_epfd, EPOLL_CTL_DEL, fd, &ev);
2515  if (rc == 0) {
2516  M0_LOG(M0_DEBUG, "DEL_FROM_EPOLL %s fid=%p fd=%d tm=%p pos=%d",
2517  ptr->evctx_dbg, fid, fd, tm, ptr->evctx_pos);
2518  for (i = ptr->evctx_pos; i < tmfid->ftf_cnt - 1; i++) {
2519  tmfid->ftf_head[i] = tmfid->ftf_head[i + 1];
2520  tmfid->ftf_ctx[i] = tmfid->ftf_ctx[i + 1];
2521  tmfid->ftf_ctx[i]->evctx_pos--;
2522  }
2523  --tmfid->ftf_cnt;
2524  tmfid->ftf_head[tmfid->ftf_cnt] = 0;
2525  tmfid->ftf_ctx[tmfid->ftf_cnt] = 0;
2526  ptr->evctx_pos = 0;
2527  }
2528 
2529  return M0_RC(rc);
2530 }
2531 
2535 static inline struct m0_fab__active_ep *libfab_aep_get(struct m0_fab__ep *ep)
2536 {
2537  return ep->fep_listen == NULL ? ep->fep_aep : ep->fep_listen->pep_aep;
2538 }
2539 
2543 static inline bool libfab_is_verbs(struct m0_fab__tm *tm)
2544 {
2545  return tm->ftm_fab->fab_prov == FAB_FABRIC_PROV_VERBS;
2546 }
2547 
2551 static int libfab_txbuf_list_add(struct m0_fab__tm *tm, struct m0_fab__buf *fb,
2552  struct m0_fab__active_ep *aep)
2553 {
2554  struct m0_fab__bulk_op *op;
2555 
2556  M0_ALLOC_PTR(op);
2557  if (op == NULL)
2558  return M0_ERR(-ENOMEM);
2559  op->fbl_aep = aep;
2560  op->fbl_buf = fb;
2561  fb->fb_bulk_op = op;
2562  fb->fb_wr_cnt = 0;
2563  M0_SET0(&fb->fb_xfer_params);
2564  fab_bulk_tlink_init_at_tail(op, &tm->ftm_bulk);
2565 
2566  return M0_RC(0);
2567 }
2568 
2572 static void libfab_bufq_process(struct m0_fab__tm *tm)
2573 {
2574  struct m0_fab__bulk_op *op;
2575  int ret;
2576 
2577  m0_tl_for(fab_bulk, &tm->ftm_bulk, op) {
2578  /*
2579  * Only post the bulk buffer if the endpoint is in
2580  * connected state.
2581  */
2582  if (op->fbl_aep->aep_tx_state == FAB_CONNECTED &&
2583  !op->fbl_aep->aep_txq_full) {
2584  if (op->fbl_buf->fb_nb->nb_qtype == M0_NET_QT_MSG_SEND)
2585  ret = libfab_ping_op(op->fbl_aep, op->fbl_buf);
2586  else
2587  ret = libfab_bulk_op(op->fbl_aep, op->fbl_buf);
2588 
2589  if (ret == 0) {
2590  fab_bulk_tlist_del(op);
2591  op->fbl_buf->fb_bulk_op = NULL;
2592  m0_free(op);
2593  } else
2594  op->fbl_aep->aep_txq_full = true;
2595  }
2596  } m0_tl_endfor;
2597 
2598 
2599 }
2600 
2605 static int libfab_ping_op(struct m0_fab__active_ep *aep, struct m0_fab__buf *fb)
2606 {
2607  struct fi_msg op_msg;
2608  struct iovec iv;
2609  int ret;
2610 
2611  iv.iov_base = fb->fb_nb->nb_buffer.ov_buf[0];
2612  iv.iov_len = fb->fb_nb->nb_buffer.ov_vec.v_count[0];
2613  op_msg.msg_iov = &iv;
2614  op_msg.desc = fb->fb_mr.bm_desc;
2615  op_msg.iov_count = 1;
2616  op_msg.addr = 0;
2617  op_msg.context = U32_TO_VPTR(fb->fb_token);
2618  op_msg.data = 0;
2619  fb->fb_wr_cnt = 1;
2620  ret = fi_sendmsg(aep->aep_txep, &op_msg, FI_COMPLETION);
2621  if (ret == 0)
2622  aep->aep_bulk_cnt += fb->fb_wr_cnt;
2623 
2624  return ret;
2625 }
2626 
2631 static int libfab_bulk_op(struct m0_fab__active_ep *aep, struct m0_fab__buf *fb)
2632 {
2633  struct m0_fab__buf_xfer_params xp;
2634  struct m0_fab__tm *tm = libfab_buf_tm(fb);
2635  struct fi_msg_rma op_msg;
2636  struct fi_rma_iov *r_iov;
2637  struct fi_rma_iov *remote = tm->ftm_rem_iov;
2638  struct iovec *loc_iv = tm->ftm_loc_iov;
2639  m0_bcount_t *v_cnt;
2640  uint64_t op_flag;
2641  uint32_t loc_slen;
2642  uint32_t rem_slen;
2643  uint32_t wr_cnt = 0;
2644  uint32_t max_iov = tm->ftm_fab->fab_max_iov;
2645  uint32_t idx;
2646  int ret = 0;
2647  bool isread;
2648  bool last_seg = false;
2649 
2650  M0_ENTRY("loc_buf=%p q=%d loc_seg=%d rem_buf=%d rem_seg=%d iov_max=%d",
2651  fb, fb->fb_nb->nb_qtype, fb->fb_nb->nb_buffer.ov_vec.v_nr,
2652  (int)fb->fb_rbd->fbd_buftoken, (int)fb->fb_rbd->fbd_iov_cnt,
2653  (int)max_iov);
2654  M0_PRE(fb->fb_rbd != NULL);
2655  M0_PRE(remote != NULL && loc_iv != NULL);
2656 
2657  v_cnt = fb->fb_nb->nb_buffer.ov_vec.v_count;
2658  /* Pick last succesfully transfered bulk buf params */
2659  xp = fb->fb_xfer_params;
2660  r_iov = fb->fb_riov;
2661  isread = (fb->fb_nb->nb_qtype == M0_NET_QT_ACTIVE_BULK_RECV);
2662 
2663  while (xp.bxp_xfer_len < fb->fb_nb->nb_length) {
2664  for (idx = 0; idx < max_iov && !last_seg; idx++) {
2665  M0_ASSERT(xp.bxp_rem_sidx <= fb->fb_rbd->fbd_iov_cnt);
2666  loc_slen = v_cnt[xp.bxp_loc_sidx] - xp.bxp_loc_soff;
2667  rem_slen = r_iov[xp.bxp_rem_sidx].len - xp.bxp_rem_soff;
2668 
2669  loc_iv[idx].iov_base = fb->fb_nb->nb_buffer.ov_buf[
2670  xp.bxp_loc_sidx] +
2671  xp.bxp_loc_soff;
2672  loc_iv[idx].iov_len = min64u(loc_slen, rem_slen);
2673  remote[idx] = r_iov[xp.bxp_rem_sidx];
2674  remote[idx].addr += xp.bxp_rem_soff;
2675  remote[idx].len -= xp.bxp_rem_soff;
2676 
2677  if (loc_slen > rem_slen) {
2678  xp.bxp_rem_sidx++;
2679  xp.bxp_rem_soff = 0;
2680  xp.bxp_loc_soff += loc_iv[idx].iov_len;
2681  } else {
2682  xp.bxp_loc_sidx++;
2683  xp.bxp_loc_soff = 0;
2684  xp.bxp_rem_soff += loc_iv[idx].iov_len;
2685  if (xp.bxp_rem_soff >=
2686  r_iov[xp.bxp_rem_sidx].len) {
2687  xp.bxp_rem_sidx++;
2688  xp.bxp_rem_soff = 0;
2689  }
2690  }
2691 
2692  xp.bxp_xfer_len += loc_iv[idx].iov_len;
2693  if (xp.bxp_xfer_len >= fb->fb_nb->nb_length)
2694  last_seg = true;
2695  }
2696 
2697  op_msg.msg_iov = &loc_iv[0];
2698  op_msg.desc = &fb->fb_mr.bm_desc[xp.bxp_loc_sidx];
2699  op_msg.iov_count = idx;
2700  op_msg.addr = xp.bxp_rem_soff;
2701  op_msg.rma_iov = &remote[0];
2702  op_msg.rma_iov_count = idx;
2703  op_msg.context = U32_TO_VPTR(fb->fb_token);
2704 
2705  op_msg.data = (isread || (!last_seg)) ? 0 :
2706  fb->fb_rbd->fbd_buftoken;
2707  op_flag = (isread || (!last_seg)) ? 0 : FI_REMOTE_CQ_DATA;
2708  op_flag |= last_seg ? FI_COMPLETION : 0;
2709 
2710  ret = isread ? fi_readmsg(aep->aep_txep, &op_msg, op_flag) :
2711  fi_writemsg(aep->aep_txep, &op_msg, op_flag);
2712 
2713  if (ret != 0) {
2714  M0_LOG(M0_ERROR,"bulk-op failed %d b=%p q=%d l_seg=%d \
2715  opcnt=%d", ret, fb, fb->fb_nb->nb_qtype,
2716  xp.bxp_loc_sidx, aep->aep_bulk_cnt);
2717  break;
2718  } else {
2719  wr_cnt++;
2720  aep->aep_bulk_cnt++;
2721  /* Save last succesfully transfered bulk buf params */
2722  fb->fb_xfer_params = xp;
2723  }
2724  }
2725  fb->fb_wr_cnt += wr_cnt;
2726  return M0_RC(ret);
2727 }
2728 
2732 static uint32_t libfab_buf_token_get(struct m0_fab__tm *tm,
2733  struct m0_fab__buf *fb)
2734 {
2735  union m0_fab__token token;
2736 
2737  token.t_val = 0;
2738  token.t_Fields.tf_queue_id = (fb->fb_nb == NULL) ? M0_NET_QT_NR :
2739  fb->fb_nb->nb_qtype;
2740  ++tm->ftm_op_id;
2741  if (tm->ftm_op_id == 0)
2742  ++tm->ftm_op_id; /* 0 is treated as an invalid value for token*/
2743  /* Queue selection round robin for a queue type */
2744  ++tm->ftm_rr_qt[token.t_Fields.tf_queue_id];
2745 
2746  token.t_Fields.tf_queue_num = (tm->ftm_rr_qt[token.t_Fields.tf_queue_id]
2747  % FAB_NUM_BUCKETS_PER_QTYPE);
2748  token.t_Fields.tf_tag = tm->ftm_op_id;
2749 
2750  return token.t_val;
2751 }
2752 
2753 static int libfab_domain_params_get(struct m0_fab__ndom *fab_ndom)
2754 {
2755  struct fi_info *hints;
2756  struct fi_info *fi;
2757  struct sockaddr_in *v_src;
2758  struct sockaddr_in t_src;
2759  int result = 0;
2760 
2761  hints = fi_allocinfo();
2762  if (hints == NULL)
2763  return M0_ERR(-ENOMEM);
2764  hints->fabric_attr->prov_name = (char *)providers[FAB_FABRIC_PROV_VERBS];
2765  result = fi_getinfo(FI_VERSION(1,11), NULL, NULL, 0, hints, &fi);
2766  if (result == 0) {
2767  /* For Verbs provider */
2768  v_src = fi->src_addr;
2769  inet_ntop(AF_INET, &v_src->sin_addr, fab_ndom->fnd_loc_ip,
2770  ARRAY_SIZE(fab_ndom->fnd_loc_ip));
2771  fab_ndom->fnd_seg_nr = FAB_VERBS_IOV_MAX;
2772  fab_ndom->fnd_seg_size = FAB_VERBS_MAX_BULK_SEG_SIZE;
2773  } else {
2774  /* For TCP/Socket provider */
2775  t_src.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
2776  inet_ntop(AF_INET, &t_src.sin_addr, fab_ndom->fnd_loc_ip,
2777  ARRAY_SIZE(fab_ndom->fnd_loc_ip));
2778  fab_ndom->fnd_seg_nr = FAB_TCP_SOCK_IOV_MAX;
2779  fab_ndom->fnd_seg_size = FAB_TCP_SOCK_MAX_BULK_SEG_SIZE;
2780  }
2781 
2782  hints->fabric_attr->prov_name = NULL;
2783  fi_freeinfo(hints);
2784  fi_freeinfo(fi);
2785  return M0_RC(0);
2786 }
2787 
2788 static int libfab_buf_dom_dereg(struct m0_fab__buf *fbp)
2789 {
2790  m0_time_t tmout;
2791  int i;
2792  int ret = 0;
2793  uint32_t seg_nr;
2794 
2795  M0_PRE(fbp != NULL && fbp->fb_nb != NULL);
2796  seg_nr = fbp->fb_nb->nb_buffer.ov_vec.v_nr;
2797 
2798  for (i = 0; i < seg_nr; i++) {
2799  if (fbp->fb_mr.bm_mr[i] != NULL) {
2800  /*
2801  * If fi_close returns -EBUSY, that means that the
2802  * buffer is in use. In this case keep retry for a max
2803  * time of 5 min to deregister buffer till fi_close
2804  * returns success or some other error code.
2805  */
2806  tmout = m0_time_from_now(300, 0);
2807  ret = -EBUSY;
2808  while (ret == -EBUSY && !m0_time_is_in_past(tmout)) {
2809  ret = fi_close(&fbp->fb_mr.bm_mr[i]->fid);
2810  }
2811  if (ret != 0) {
2812  M0_LOG(M0_ERROR,"mr[%d] close failed %d fb=%p",
2813  i, ret, fbp);
2814  break;
2815  }
2816  fbp->fb_mr.bm_mr[i] = NULL;
2817  }
2818  }
2819 
2820  if (ret == 0) {
2821  fbp->fb_dp = NULL;
2822  fbp->fb_state = FAB_BUF_DEREGISTERED;
2823  }
2824 
2825  return M0_RC(ret);
2826 }
2827 
2828 /*============================================================================*/
2829 
2833 static int libfab_dom_init(const struct m0_net_xprt *xprt,
2834  struct m0_net_domain *dom)
2835 {
2836  struct m0_fab__ndom *fab_ndom;
2837  int ret = 0;
2838 
2839  M0_ENTRY("Running on %s", m0_processor_is_vm() ? "VM" : "HW");
2840 
2841  M0_ALLOC_PTR(fab_ndom);
2842  if (fab_ndom == NULL)
2843  return M0_ERR(-ENOMEM);
2844 
2845  ret = libfab_domain_params_get(fab_ndom);
2846  if (ret != 0)
2847  m0_free(fab_ndom);
2848  else {
2849  dom->nd_xprt_private = fab_ndom;
2850  fab_ndom->fnd_ndom = dom;
2851  m0_mutex_init(&fab_ndom->fnd_lock);
2852  fab_fabs_tlist_init(&fab_ndom->fnd_fabrics);
2853  }
2854  return M0_RC(ret);
2855 }
2856 
2860 static void libfab_dom_fini(struct m0_net_domain *dom)
2861 {
2862  struct m0_fab__ndom *fnd;
2863  struct m0_fab__fab *fab;
2864  int rc;
2865 
2866  M0_ENTRY();
2867  libfab_dom_invariant(dom);
2868  fnd = dom->nd_xprt_private;
2869  m0_tl_teardown(fab_fabs, &fnd->fnd_fabrics, fab) {
2870  if (fab->fab_dom != NULL) {
2871  rc = fi_close(&fab->fab_dom->fid);
2872  if (rc != 0)
2873  M0_LOG(M0_ERROR, "fab_dom fi_close ret=%d", rc);
2874  fab->fab_dom = NULL;
2875  }
2876 
2877  if (fab->fab_fab != NULL) {
2878  rc = fi_close(&fab->fab_fab->fid);
2879  if (rc != 0)
2880  M0_LOG(M0_ERROR, "fab_fabric fi_close ret=%d",
2881  rc);
2882  fab->fab_fab = NULL;
2883  }
2884 
2885  if (fab->fab_fi != NULL) {
2886  fi_freeinfo(fab->fab_fi);
2887  fab->fab_fi = NULL;
2888  }
2889 
2890  m0_free(fab);
2891  }
2892  fab_fabs_tlist_fini(&fnd->fnd_fabrics);
2893 
2894  m0_mutex_fini(&fnd->fnd_lock);
2895  fnd->fnd_ndom = NULL;
2896  m0_free(fnd);
2897  dom->nd_xprt_private = NULL;
2898 
2899  M0_LEAVE();
2900 }
2901 
2905 static void libfab_ma_fini(struct m0_net_transfer_mc *tm)
2906 {
2907  struct m0_fab__tm *ma = tm->ntm_xprt_private;
2908 
2909  M0_ENTRY();
2910  libfab_tm_fini(tm);
2911  tm->ntm_xprt_private = NULL;
2912 
2913  fab_buf_tlist_fini(&ma->ftm_done);
2914  m0_free(ma);
2915 
2916  M0_LEAVE();
2917 }
2918 
2924 static int libfab_ma_init(struct m0_net_transfer_mc *ntm)
2925 {
2926  struct m0_fab__tm *ftm;
2927  int rc = 0;
2928 
2929  M0_ASSERT(ntm->ntm_xprt_private == NULL);
2930  M0_ALLOC_PTR(ftm);
2931  if (ftm != NULL) {
2932  ftm->ftm_epfd = -1;
2933  ftm->ftm_state = FAB_TM_INIT;
2934  ntm->ntm_xprt_private = ftm;
2935  ftm->ftm_ntm = ntm;
2936  ftm->ftm_fids.ftf_cnt = 0;
2937  M0_ALLOC_ARR(ftm->ftm_fids.ftf_head, FAB_TM_FID_MALLOC_STEP);
2938  M0_ALLOC_ARR(ftm->ftm_fids.ftf_ctx, FAB_TM_FID_MALLOC_STEP);
2939  if (ftm->ftm_fids.ftf_head == NULL ||
2940  ftm->ftm_fids.ftf_ctx == NULL) {
2941  m0_free(ftm->ftm_fids.ftf_head);
2942  m0_free(ftm->ftm_fids.ftf_ctx);
2943  return M0_ERR(-ENOMEM);
2944  }
2945  ftm->ftm_fids.ftf_arr_size = FAB_TM_FID_MALLOC_STEP;
2946  fab_buf_tlist_init(&ftm->ftm_done);
2947  fab_bulk_tlist_init(&ftm->ftm_bulk);
2948  ftm->ftm_bufhash.bht_magic = M0_NET_LIBFAB_BUF_HT_HEAD_MAGIC;
2949  rc = fab_bufhash_htable_init(&ftm->ftm_bufhash.bht_hash,
2950  ((M0_NET_QT_NR + 1) *
2951  FAB_NUM_BUCKETS_PER_QTYPE));
2952  } else
2953  rc = M0_ERR(-ENOMEM);
2954 
2955  if (rc != 0 && ftm != NULL)
2956  libfab_ma_fini(ntm);
2957  return M0_RC(rc);
2958 }
2959 
2964 static int libfab_ma_start(struct m0_net_transfer_mc *ntm, const char *name)
2965 {
2966  struct m0_fab__tm *ftm = ntm->ntm_xprt_private;
2967  struct m0_fab__ndom *fnd;
2968  struct m0_net_end_point *nep;
2969  int rc = 0;
2970 
2971  M0_ASSERT(libfab_tm_is_locked(ftm));
2972  M0_ALLOC_PTR(ftm->ftm_pep);
2973  if (ftm->ftm_pep != NULL) {
2974  fnd = ntm->ntm_dom->nd_xprt_private;
2975  rc = libfab_ep_addr_decode(ftm->ftm_pep, name);
2976  if (rc != 0)
2977  return M0_ERR(rc);
2978 
2979  ftm->ftm_fab = libfab_newfab_init(fnd);
2980  ftm->ftm_fab->fab_prov = FAB_FABRIC_PROV_MAX;
2981  rc = libfab_passive_ep_create(ftm->ftm_pep, ftm);
2982  if (rc != 0)
2983  return M0_ERR(rc);
2984 
2985  nep = &ftm->ftm_pep->fep_nep;
2986  nep->nep_xprt_pvt = ftm->ftm_pep;
2987  nep->nep_tm = ntm;
2988  libfab_ep_pton(&ftm->ftm_pep->fep_name,
2989  &ftm->ftm_pep->fep_name_n);
2990  m0_nep_tlink_init_at_tail(nep, &ntm->ntm_end_points);
2991  ftm->ftm_pep->fep_nep.nep_addr = ftm->ftm_pep->fep_name.nia_p;
2992 
2993  m0_mutex_init(&ftm->ftm_endlock);
2994  m0_mutex_init(&ftm->ftm_evpost);
2995 
2996  rc = M0_THREAD_INIT(&ftm->ftm_poller, struct m0_fab__tm *, NULL,
2997  &libfab_poller, ftm, "libfab_tm");
2998  } else
2999  return M0_ERR(-ENOMEM);
3000 
3001  ftm->ftm_state = FAB_TM_STARTED;
3002  ftm->ftm_tmout_check = m0_time_from_now(FAB_BUF_TMOUT_CHK_INTERVAL, 0);
3003 
3004  return M0_RC(rc);
3005 }
3006 
3012 static int libfab_ma_stop(struct m0_net_transfer_mc *net, bool cancel)
3013 {
3014  struct m0_fab__tm *tm = net->ntm_xprt_private;
3015 
3016  M0_PRE(net->ntm_state == M0_NET_TM_STOPPING);
3017 
3018  if (cancel)
3020 
3021  libfab_tm_unlock(tm);
3022  libfab_tm_fini(net);
3023  libfab_tm_event_post(tm, M0_NET_TM_STOPPED);
3024  libfab_tm_lock(tm);
3025 
3026  return M0_RC(0);
3027 }
3028 
/**
 * Transport operation: processor confinement of a transfer machine.
 * Not supported by this transport; always fails with -ENOSYS.
 */
static int libfab_ma_confine(struct m0_net_transfer_mc *ma,
			     const struct m0_bitmap *processors)
{
	/* Both arguments are ignored. */
	return M0_ERR(-ENOSYS);
}
3037 
/**
 * Transport operation: returns the end point for the given address.
 * The work is delegated to libfab_ep_find() with no fabric-level address.
 *
 * @param epp  receives the end point
 * @param tm   transfer machine the end point belongs to
 * @param name address string of the end point
 * @return the result of libfab_ep_find().
 */
static int libfab_end_point_create(struct m0_net_end_point **epp,
				   struct m0_net_transfer_mc *tm,
				   const char *name)
{
	M0_ENTRY("name=%s", name);

	return libfab_ep_find(tm, name, NULL, epp);
}
3052 
/*
 * Transport operation: releases the libfab private state of a network buffer.
 *
 * Calls libfab_buf_dom_dereg() and libfab_buf_fini() on the private buffer,
 * frees its bm_desc and bm_mr arrays and the private buffer itself, and
 * clears nb->nb_xprt_private. The return value of libfab_buf_dom_dereg() is
 * not checked here.
 */
3060 static void libfab_buf_deregister(struct m0_net_buffer *nb)
3061 {
3062  struct m0_fab__buf *fb = nb->nb_xprt_private;
3063 
3064  M0_ENTRY("fb=%p nb=%p q=%d", fb, nb, nb->nb_qtype);
/*
 * NOTE(review): the embedded listing numbering jumps from 3064 to 3066 and
 * the next line closes a parenthesis that is never opened here; the start of
 * this statement appears to be missing from this extract — confirm against
 * the upstream file.
 */
3066  libfab_buf_invariant(fb));
3067 
3068  libfab_buf_dom_dereg(fb);
3069  libfab_buf_fini(fb);
3070  m0_free(fb->fb_mr.bm_desc);
3071  m0_free(fb->fb_mr.bm_mr);
3072  m0_free(fb);
3073  nb->nb_xprt_private = NULL;
3074 }
3075 
3083 static int libfab_buf_register(struct m0_net_buffer *nb)
3084 {
3085  struct m0_fab__buf *fb;
3086  struct m0_fab__ndom *nd = nb->nb_dom->nd_xprt_private;
3087 
3088  M0_ENTRY("nb=%p q=%d", nb, nb->nb_qtype);
3089 
3090  M0_PRE(nb->nb_xprt_private == NULL);
3091  M0_PRE(nb->nb_dom != NULL);
3092 
3093  M0_ALLOC_PTR(fb);
3094  if (fb == NULL)
3095  return M0_ERR(-ENOMEM);
3096 
3097  M0_ALLOC_ARR(fb->fb_mr.bm_desc, nd->fnd_seg_nr);
3098  M0_ALLOC_ARR(fb->fb_mr.bm_mr, nd->fnd_seg_nr);
3099 
3100  if (fb->fb_mr.bm_desc == NULL || fb->fb_mr.bm_mr == NULL) {
3101  m0_free(fb->fb_mr.bm_desc);
3102  m0_free(fb->fb_mr.bm_mr);
3103  m0_free(fb);
3104  return M0_ERR(-ENOMEM);
3105  }
3106 
3107  fab_buf_tlink_init(fb);
3108  nb->nb_xprt_private = fb;
3109  fb->fb_nb = nb;
3110  fb->fb_state = FAB_BUF_INITIALIZED;
3111 
3112  return M0_RC(0);
3113 }
3114 
/*
 * Transport operation: adds a network buffer to a transfer machine queue.
 *
 * A fresh token is assigned to the buffer and libfab_buf_dom_reg() is called
 * for it (its result is not checked here). The remaining work depends on
 * nb->nb_qtype: receive buffers are posted with fi_recvv(); send and active
 * bulk buffers either start a connection (libfab_conn_init()) or are queued
 * with libfab_txbuf_list_add() followed by libfab_bufq_process(); passive
 * bulk buffers get their buffer descriptor encoded.
 *
 * When the queue-specific step succeeds, the buffer is marked FAB_BUF_QUEUED
 * and inserted into the tm's buffer hash table.
 *
 * Returns 0 on success or the error of the queue-specific step.
 */
3122 static int libfab_buf_add(struct m0_net_buffer *nb)
3123 {
3124  struct m0_fab__buf *fbp = nb->nb_xprt_private;
3125  struct m0_fab__tm *ma = libfab_buf_ma(nb);
3126  struct m0_fab__ep *ep = NULL;
3127  struct m0_fab__active_ep *aep;
3128  struct iovec iv;
3129  struct m0_net_ip_addr addr = {};
3130  int ret = 0;
3131 
3132  M0_ENTRY("fb=%p nb=%p q=%d l=%"PRIu64, fbp, nb, nb->nb_qtype,
3133  nb->nb_length);
3134 
3135  M0_PRE(libfab_tm_is_locked(ma) && libfab_tm_invariant(ma) &&
3136  libfab_buf_invariant(fbp));
3137  M0_PRE(nb->nb_offset == 0); /* Do not support an offset during add. */
3138  M0_PRE((nb->nb_flags & M0_NET_BUF_RETAIN) == 0);
3139 
3140  fab_buf_tlink_init(fbp);
3141  fbp->fb_token = libfab_buf_token_get(ma, fbp);
3142  libfab_buf_dom_reg(nb, ma);
3143  fbp->fb_status = 0;
3144 
3145  switch (nb->nb_qtype) {
/* Message receive: post the single segment to the receive context. */
3146  case M0_NET_QT_MSG_RECV: {
3147  M0_ASSERT(nb->nb_buffer.ov_vec.v_nr == 1);
3148  fbp->fb_length = nb->nb_length;
3149  iv.iov_base = nb->nb_buffer.ov_buf[0];
3150  iv.iov_len = nb->nb_buffer.ov_vec.v_count[0];
3151  ret = fi_recvv(ma->ftm_rctx, &iv, fbp->fb_mr.bm_desc, 1, 0,
3152  U32_TO_VPTR(fbp->fb_token));
3153  break;
3154  }
3155 
/*
 * Message send: make sure the destination end point has private state, then
 * either connect first or queue the buffer for transmission.
 * NOTE(review): the embedded listing numbering jumps from 3156 to 3158; a
 * line appears to be missing from this extract — confirm against upstream.
 */
3156  case M0_NET_QT_MSG_SEND: {
3158  M0_ASSERT(nb->nb_buffer.ov_vec.v_nr == 1);
3159  M0_ASSERT(nb->nb_ep != NULL);
3160 
3161  if (nb->nb_ep->nep_xprt_pvt == NULL)
3162  ret = libfab_ep_create(ma->ftm_ntm, nb->nb_ep->nep_addr,
3163  NULL, &nb->nb_ep);
3164  if (ret != 0)
3165  break;
3166  ep = nb->nb_ep->nep_xprt_pvt;
3167  aep = libfab_aep_get(ep);
3168  fbp->fb_txctx = ep;
3169 
3170  if (aep->aep_tx_state != FAB_CONNECTED)
3171  ret = libfab_conn_init(ep, ma, fbp);
3172  else {
3173  ret = libfab_txbuf_list_add(ma, fbp, aep);
3174  libfab_bufq_process(ma);
3175  }
3176  break;
3177  }
3178 
3179  /* For passive buffers, generate the buffer descriptor. */
/*
 * NOTE(review): the embedded listing numbering jumps from 3179 to 3181; a
 * line (presumably a case label opening a block) appears to be missing from
 * this extract — confirm against upstream.
 */
3181  fbp->fb_length = nb->nb_length;
3182  if (!libfab_is_verbs(ma)) {
3183  ret = libfab_bdesc_encode(fbp);
3184  break;
3185  }
3186  /* else
3187  Intentional fall through */
3188  }
3189 
/*
 * NOTE(review): the embedded listing numbering jumps from 3189 to 3191; a
 * line (presumably a case label opening a block) appears to be missing from
 * this extract — confirm against upstream.
 * The code below posts a receive on the dummy area of the buffer when the
 * message receive queue is empty, then encodes the buffer descriptor.
 */
3191  if (m0_net_tm_tlist_is_empty(
3192  &ma->ftm_ntm->ntm_q[M0_NET_QT_MSG_RECV]))
3193  ret = fi_recv(ma->ftm_rctx, fbp->fb_dummy,
3194  sizeof(fbp->fb_dummy), NULL, 0,
3195  U32_TO_VPTR(fbp->fb_token));
3196 
3197  if (ret == 0)
3198  ret = libfab_bdesc_encode(fbp);
3199  break;
3200  }
3201 
3202  /* For active buffers, decode the passive buffer descriptor */
/*
 * NOTE(review): the embedded listing numbering jumps from 3202 to 3204 and
 * from 3206 to 3208; two lines (presumably case labels) appear to be missing
 * from this extract — confirm against upstream.
 */
3204  fbp->fb_length = nb->nb_length;
3205  /* Intentional fall through */
3206 
3208  libfab_bdesc_decode(fbp, &addr.nia_n);
3209  ret = libfab_fab_ep_find(ma, NULL, &addr.nia_n, &ep);
3210  if (ret != 0)
3211  break;
3212  fbp->fb_txctx = ep;
3213  aep = libfab_aep_get(ep);
3214  if (aep->aep_tx_state != FAB_CONNECTED)
3215  ret = libfab_conn_init(ep, ma, fbp);
3216  else {
3217  ret = libfab_txbuf_list_add(ma, fbp, aep);
3218  libfab_bufq_process(ma);
3219  }
3220  break;
3221  }
3222 
3223  default:
3224  M0_IMPOSSIBLE("invalid queue type: %x", nb->nb_qtype);
3225  break;
3226  }
3227 
/* Only a successfully added buffer becomes visible in the buffer hash. */
3228  if (ret == 0) {
3229  fbp->fb_state = FAB_BUF_QUEUED;
3230  m0_tlink_init(&fab_bufhash_tl, fbp);
3231  fab_bufhash_htable_add(&ma->ftm_bufhash.bht_hash, fbp);
3232  }
3233 
3234  return M0_RC(ret);
3235 }
3236 
/*
 * Transport operation: cancels a queued network buffer.
 *
 * With the tm locked, the buffer's memory registrations are closed through
 * libfab_buf_dom_dereg() (result not checked here), the buffer is marked
 * FAB_BUF_CANCELED and completed with -ECANCELED via libfab_buf_done().
 */
3244 static void libfab_buf_del(struct m0_net_buffer *nb)
3245 {
3246  struct m0_fab__buf *buf = nb->nb_xprt_private;
3247  struct m0_fab__tm *ma = libfab_buf_ma(nb);
3248 
3249  M0_PRE(libfab_tm_is_locked(ma) && libfab_tm_invariant(ma) &&
3250  libfab_buf_invariant(buf));
/*
 * NOTE(review): the embedded listing numbering jumps from 3250 to 3252; a
 * line appears to be missing from this extract — confirm against upstream.
 */
3252 
3253  libfab_buf_dom_dereg(buf);
3254  buf->fb_state = FAB_BUF_CANCELED;
3255  libfab_buf_done(buf, -ECANCELED, false);
3256 }
3257 
/**
 * Transport operation: request for synchronous buffer event delivery.
 * Nothing is done for this transport; success is reported unconditionally.
 */
static int libfab_bev_deliver_sync(struct m0_net_transfer_mc *ma)
{
	/* No-op. */
	return 0;
}
3267 
/**
 * Transport operation: deliver all pending buffer events.
 * Intentionally empty for this transport.
 */
static void libfab_bev_deliver_all(struct m0_net_transfer_mc *ma)
{
	/* No-op. */
}
3277 
/**
 * Transport operation: tells whether buffer events are pending.
 * This transport always reports that none are.
 */
static bool libfab_bev_pending(struct m0_net_transfer_mc *ma)
{
	/* Never any pending events. */
	return false;
}
3287 
/**
 * Transport operation: request notification of buffer events on a channel.
 * Intentionally empty for this transport; both arguments are ignored.
 */
static void libfab_bev_notify(struct m0_net_transfer_mc *ma,
			      struct m0_chan *chan)
{
	/* No-op. */
}
3298 
3306 static m0_bcount_t libfab_get_max_buf_size(const struct m0_net_domain *dom)
3307 {
3308  struct m0_fab__ndom *nd = dom->nd_xprt_private;
3309 
3310  return (m0_bcount_t)(nd->fnd_seg_size * nd->fnd_seg_nr);
3311 }
3312 
3320 static m0_bcount_t libfab_get_max_buf_seg_size(const struct m0_net_domain *dom)
3321 {
3322  return ((struct m0_fab__ndom *)dom->nd_xprt_private)->fnd_seg_size;
3323 }
3324 
3332 static int32_t libfab_get_max_buf_segments(const struct m0_net_domain *dom)
3333 {
3334  return ((struct m0_fab__ndom *)dom->nd_xprt_private)->fnd_seg_nr;
3335 }
3343 static m0_bcount_t libfab_get_max_buf_desc_size(const struct m0_net_domain *dom)
3344 {
3345  struct m0_fab__ndom *nd = dom->nd_xprt_private;
3346  m0_bcount_t max_bd_size = sizeof(struct fi_rma_iov);
3347 
3348  max_bd_size = (max_bd_size * nd->fnd_seg_nr) +
3349  sizeof(struct m0_fab__bdesc);
3350 
3351  return max_bd_size;
3352 }
3353 
3361 static m0_bcount_t libfab_rpc_max_seg_size(struct m0_net_domain *ndom)
3362 {
3363  M0_PRE(ndom != NULL);
3364  return FAB_MAX_RPC_SEG_SIZE;
3365 }
3366 
3374 static uint32_t libfab_rpc_max_segs_nr(struct m0_net_domain *ndom)
3375 {
3376  M0_PRE(ndom != NULL);
3377  return FAB_MAX_RPC_SEG_NR;
3378 }
3379 
3387 static m0_bcount_t libfab_rpc_max_msg_size(struct m0_net_domain *ndom,
3388  m0_bcount_t rpc_size)
3389 {
3390  m0_bcount_t mbs;
3391  M0_PRE(ndom != NULL);
3392 
3393  mbs = libfab_rpc_max_seg_size(ndom) * libfab_rpc_max_segs_nr(ndom);
3394  return rpc_size != 0 ? m0_clip64u(M0_SEG_SIZE, mbs, rpc_size) : mbs;
3395 }
3396 
3404 static uint32_t libfab_rpc_max_recv_msgs(struct m0_net_domain *ndom,
3405  m0_bcount_t rpc_size)
3406 {
3407  M0_PRE(ndom != NULL);
3408  return FAB_MAX_RPC_RECV_MSG_NR;
3409 }
3410 
3411 static const struct m0_net_xprt_ops libfab_xprt_ops = {
3412  .xo_dom_init = &libfab_dom_init,
3413  .xo_dom_fini = &libfab_dom_fini,
3414  .xo_tm_init = &libfab_ma_init,
3415  .xo_tm_confine = &libfab_ma_confine,
3416  .xo_tm_start = &libfab_ma_start,
3417  .xo_tm_stop = &libfab_ma_stop,
3418  .xo_tm_fini = &libfab_ma_fini,
3419  .xo_end_point_create = &libfab_end_point_create,
3420  .xo_buf_register = &libfab_buf_register,
3421  .xo_buf_deregister = &libfab_buf_deregister,
3422  .xo_buf_add = &libfab_buf_add,
3423  .xo_buf_del = &libfab_buf_del,
3424  .xo_bev_deliver_sync = &libfab_bev_deliver_sync,
3425  .xo_bev_deliver_all = &libfab_bev_deliver_all,
3426  .xo_bev_pending = &libfab_bev_pending,
3427  .xo_bev_notify = &libfab_bev_notify,
3428  .xo_get_max_buffer_size = &libfab_get_max_buf_size,
3429  .xo_get_max_buffer_segment_size = &libfab_get_max_buf_seg_size,
3430  .xo_get_max_buffer_segments = &libfab_get_max_buf_segments,
3431  .xo_get_max_buffer_desc_size = &libfab_get_max_buf_desc_size,
3432  .xo_rpc_max_seg_size = &libfab_rpc_max_seg_size,
3433  .xo_rpc_max_segs_nr = &libfab_rpc_max_segs_nr,
3434  .xo_rpc_max_msg_size = &libfab_rpc_max_msg_size,
3435  .xo_rpc_max_recv_msgs = &libfab_rpc_max_recv_msgs,
3436 };
3437 
3438 struct m0_net_xprt m0_net_libfab_xprt = {
3439  .nx_name = "libfab",
3440  .nx_ops = &libfab_xprt_ops
3441 };
3442 M0_EXPORTED(m0_net_libfab_xprt);
3443 
3444 #else /* ENABLE_LIBFAB */
3445 
3446 /* libfab init and fini() : initialized in motr init */
3447 M0_INTERNAL int m0_net_libfab_init(void)
3448 {
3449  return M0_RC(0);
3450 }
3451 
3452 M0_INTERNAL void m0_net_libfab_fini(void)
3453 {
3454 
3455 }
3456 
3457 #endif /* ENABLE_LIBFAB */
3458 
3459 #undef M0_TRACE_SUBSYSTEM
3460 
3463 /*
3464  * Local variables:
3465  * c-indentation-style: "K&R"
3466  * c-basic-offset: 8
3467  * tab-width: 8
3468  * fill-column: 80
3469  * scroll-step: 1
3470  * End:
3471  */
3472 /*
3473  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
3474  */
union m0_net_ip_params::@377 nip_fmt_pvt
M0_INTERNAL int m0_mutex_trylock(struct m0_mutex *mutex)
Definition: mutex.c:84
static void ptr(struct m0_addb2__context *ctx, const uint64_t *v, char *buf)
Definition: dump.c:440
struct m0_net_ip_lnet_addr la
Definition: ip.h:81
static size_t nr
Definition: dump.c:1505
struct m0_net_transfer_mc * nb_tm
Definition: net.h:1357
#define M0_PRE(cond)
#define M0_ALLOC_ARR(arr, nr)
Definition: memory.h:84
struct m0t1fs_fsync_interactions fi
Definition: fsync.c:55
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
enum m0_net_ip_format nip_format
Definition: ip.h:73
#define m0_htable_for(name, var, htable)
Definition: hash.h:483
#define M0_HT_DESCR_DEFINE(name, htname, scope, amb_type, amb_link_field, amb_magic_field, amb_magic, head_magic, key_field, hash_func, key_eq)
Definition: hash.h:326
static uint32_t seg_nr
Definition: net.c:119
M0_INTERNAL void m0_net_libfab_fini(void)
Definition: libfab.c:3452
#define NULL
Definition: misc.h:38
m0_bindex_t nb_offset
Definition: net.h:1344
static struct m0_bufvec dst
Definition: xform.c:61
m0_net_ip_format
Definition: ip.h:33
struct m0_bufvec nb_buffer
Definition: net.h:1322
M0_INTERNAL int m0_net_hostname_to_ip(const char *hostname, char *ip, enum m0_net_ip_format *fmt)
Definition: ip.c:415
M0_INTERNAL void m0_net__tm_cancel(struct m0_net_transfer_mc *tm)
Definition: tm.c:145
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
uint32_t nbd_len
Definition: net_otw_types.h:37
uint64_t m0_time_t
Definition: time.h:37
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
uint16_t nia_family
Definition: ip.h:59
uint8_t * nbd_data
Definition: net_otw_types.h:38
struct m0_vec ov_vec
Definition: vec.h:147
Definition: sock.c:772
struct m0_bufvec data
Definition: di.c:40
m0_bcount_t nb_length
Definition: net.h:1334
uint64_t nb_flags
Definition: net.h:1489
struct m0_net_domain * ntm_dom
Definition: net.h:853
M0_INTERNAL void m0_net_xprt_default_set(const struct m0_net_xprt *xprt)
Definition: net.c:143
uint64_t m0_bindex_t
Definition: types.h:80
uint64_t m0_bcount_t
Definition: types.h:77
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
const char * nep_addr
Definition: net.h:503
#define container_of(ptr, type, member)
Definition: misc.h:33
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
m0_bcount_t nbe_length
Definition: net.h:1226
struct m0_net_buffer * nbe_buffer
Definition: net.h:1194
M0_INTERNAL void m0_net_buffer_event_post(const struct m0_net_buffer_event *ev)
Definition: buf.c:314
struct m0_net_ip_params nia_n
Definition: ip.h:87
struct m0_net_end_point * nbe_ep
Definition: net.h:1251
static int struct dentry int struct nameidata * nd
Definition: dir.c:593
void ** ov_buf
Definition: vec.h:149
#define PRIx64
Definition: types.h:61
Definition: sock.c:887
struct m0_tl ntm_end_points
Definition: net.h:856
#define m0_tl_endfor
Definition: tlist.h:700
char nia_p[M0_NET_IP_STRLEN_MAX]
Definition: ip.h:88
struct m0_fid fid
Definition: di.c:46
M0_INTERNAL int m0_net_ip_print(const struct m0_net_ip_addr *nia)
Definition: ip.c:349
return M0_RC(rc)
op
Definition: libdemo.c:64
Definition: sock.c:754
static uint64_t m0_clip64u(uint64_t lo, uint64_t hi, uint64_t x)
Definition: arith.h:131
#define M0_ENTRY(...)
Definition: trace.h:170
uint16_t nip_port
Definition: ip.h:78
static char * addr
Definition: node_k.c:37
int i
Definition: dir.c:1033
#define PRIu64
Definition: types.h:58
M0_INTERNAL void m0_net_xprt_register(const struct m0_net_xprt *xprt)
Definition: net.c:182
struct m0_fid new_fid
Definition: dir.c:625
bool m0_time_is_in_past(m0_time_t t)
Definition: time.c:102
int32_t nbe_status
Definition: net.h:1218
void m0_ref_init(struct m0_ref *ref, int init_num, void(*release)(struct m0_ref *ref))
Definition: refs.c:24
return M0_ERR(-EOPNOTSUPP)
M0_INTERNAL void m0_ref_get(struct m0_ref *ref)
Definition: refs.c:32
void * ntm_xprt_private
Definition: net.h:886
Definition: cnt.h:36
static int key
Definition: locality.c:283
const char * name
Definition: trace.c:110
struct m0_net_ip_inet_addr ia
Definition: ip.h:80
Definition: refs.h:34
#define m0_tl_teardown(name, head, obj)
Definition: tlist.h:708
if(value==NULL)
Definition: dir.c:350
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
void * nd_xprt_private
Definition: net.h:393
union m0_net_ip_params::@376 nip_ip_n
#define M0_ASSERT(cond)
m0_time_t nb_timeout
Definition: net.h:1387
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
void * nep_xprt_pvt
Definition: net.h:505
m0_time_t m0_time_now(void)
Definition: time.c:134
#define M0_DEFAULT_NETWORK
Definition: config.h:281
m0_net_tm_state
Definition: net.h:630
#define m0_streq(a, b)
Definition: string.h:34
void m0_thread_fini(struct m0_thread *q)
Definition: thread.c:92
static struct m0_stob_domain * dom
Definition: storage.c:38
struct m0_net_domain * nb_dom
Definition: net.h:1351
void * m0_alloc(size_t size)
Definition: memory.c:126
M0_INTERNAL void m0_net_tm_event_post(const struct m0_net_tm_event *ev)
Definition: tm.c:84
M0_INTERNAL void m0_mutex_init(struct m0_mutex *mutex)
Definition: mutex.c:35
#define M0_POST(cond)
Definition: xcode.h:73
static void token(struct ff2c_context *ctx, struct ff2c_term *term, struct ff2c_token *tok)
Definition: parser.c:66
uint32_t v_nr
Definition: vec.h:51
Definition: chan.h:229
struct m0_net_transfer_mc * nep_tm
Definition: net.h:493
m0_bcount_t * v_count
Definition: vec.h:53
static uint64_t min64u(uint64_t a, uint64_t b)
Definition: arith.h:66
M0_INTERNAL void m0_tlink_init(const struct m0_tl_descr *d, void *obj)
Definition: tlist.c:63
#define M0_TL_DEFINE(name, scope, amb_type)
Definition: tlist.h:550
static struct fdmi_ctx ctx
Definition: main.c:80
M0_INTERNAL m0_bcount_t m0_vec_count(const struct m0_vec *vec)
Definition: vec.c:53
uint16_t nla_tmid
Definition: ip.h:67
static uint32_t min32u(uint32_t a, uint32_t b)
Definition: arith.h:56
char * ep
Definition: sw.h:132
M0_INTERNAL bool m0_net__buffer_invariant(const struct m0_net_buffer *buf)
Definition: buf.c:46
static struct m0_chan chan[RDWR_REQUEST_MAX]
Definition: glob.c:32
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
m0_time_t m0_time_from_now(uint64_t secs, long ns)
Definition: time.c:96
M0_INTERNAL int m0_net_libfab_init(void)
Definition: libfab.c:3447
M0_INTERNAL struct m0_thread * m0_thread_self(void)
Definition: thread.c:122
Definition: addb2.c:200
static struct m0_rm_remote * remote
Definition: rm_fops.c:35
M0_INTERNAL int m0_net_ip_parse(const char *name, struct m0_net_ip_addr *addr)
Definition: ip.c:343
#define _0C(exp)
Definition: assert.h:311
M0_INTERNAL void m0_mutex_fini(struct m0_mutex *mutex)
Definition: mutex.c:42
#define M0_TL_DESCR_DEFINE(name, hname, scope, amb_type, amb_link_field, amb_magic_field, amb_magic, head_magic)
Definition: tlist.h:535
const char * nx_name
Definition: net.h:125
struct m0t1fs_filedata * fd
Definition: dir.c:1030
M0_INTERNAL void m0_net_xprt_deregister(const struct m0_net_xprt *xprt)
Definition: net.c:197
int(* xo_dom_init)(const struct m0_net_xprt *xprt, struct m0_net_domain *dom)
Definition: net.h:139
static uint64_t found
Definition: base.c:376
static struct m0_addb2_net * net
Definition: net.c:27
struct m0_net_buf_desc nb_desc
Definition: net.h:1412
#define likely(x)
Definition: assert.h:70
uint32_t sn[4]
Definition: ip.h:76
Definition: nucleus.c:42
#define out(...)
Definition: gen.c:41
struct m0_net_xprt * xprt
Definition: module.c:61
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
void m0_free(void *data)
Definition: memory.c:146
M0_INTERNAL bool m0_processor_is_vm(void)
Definition: processor.c:1171
#define m0_htable_endfor
Definition: hash.h:491
void * nb_xprt_private
Definition: net.h:1461
M0_INTERNAL void m0_chan_broadcast(struct m0_chan *chan)
Definition: chan.c:172
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
#define M0_BASSERT(cond)
int const char void * buffer
Definition: dir.c:435
struct m0_net_end_point * nb_ep
Definition: net.h:1424
#define M0_HT_DEFINE(name, scope, amb_type, key_type)
Definition: hash.h:377
Definition: idx_mock.c:47
#define M0_IMPOSSIBLE(fmt,...)
M0_INTERNAL bool m0_net_ip_addr_eq(const struct m0_net_ip_addr *addr1, const struct m0_net_ip_addr *addr2, bool is_ncmp)
Definition: ip.c:464