Motr  M0
sock.c
Go to the documentation of this file.
1 /* -*- C -*- */
2 /*
3  * Copyright (c) 2019-2020 Seagate Technology LLC and/or its Affiliates
4  *
5  * Licensed under the Apache License, Version 2.0 (the "License");
6  * you may not use this file except in compliance with the License.
7  * You may obtain a copy of the License at
8  *
9  * http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  *
17  * For any questions about this software or licensing,
18  * please email opensource@seagate.com or cortx-questions@seagate.com.
19  *
20  */
21 
22 
607 #include <sys/epoll.h> /* epoll_create */
608 
609 #include <sys/types.h>
610 #include <sys/uio.h>
611 #include <sys/socket.h> /* epoll_create */
612 #include <netinet/in.h> /* INET_ADDRSTRLEN */
613 #include <netinet/ip.h>
614 #include <arpa/inet.h> /* inet_pton, htons */
615 #include <string.h> /* strchr */
616 #include <unistd.h> /* close */
617 
618 #define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_NET
619 #include "lib/trace.h"
620 #include "lib/errno.h" /* EINTR, ENOMEM */
621 #include "lib/thread.h"
622 #include "lib/misc.h" /* ARRAY_SIZE, IS_IN_ARRAY */
623 #include "lib/tlist.h"
624 #include "lib/types.h"
625 #include "lib/string.h" /* m0_strdup */
626 #include "lib/chan.h"
627 #include "lib/memory.h"
628 #include "lib/cookie.h"
629 #include "lib/bitmap.h"
630 #include "lib/refs.h"
631 #include "lib/time.h"
632 #include "sm/sm.h"
633 #include "motr/magic.h"
634 #include "net/net.h"
635 #include "net/buffer_pool.h"
636 #include "net/net_internal.h" /* m0_net__tm_invariant */
637 #include "format/format.h"
638 
639 #include "net/sock/xcode.h"
640 #include "net/sock/xcode_xc.h"
641 
/*
 * EP_DEBUG enables the per-holder end-point reference counters (ep::e_r_*)
 * maintained by EP_GET()/EP_PUT() and cross-checked by ep_invariant().
 * NOTE(review): it is tested with #ifdef, so redefining it as (0) does NOT
 * disable it; remove the definition instead.
 */
642 #define EP_DEBUG (1)
643 
/*
 * MOCK_LNET is 1 when built with ENABLE_SOCK_MOCK_LNET, 0 otherwise. It is
 * tested in ordinary if-statements, making the buffer/segment limit queries
 * report fixed small values instead of "no real limit".
 */
644 #ifdef ENABLE_SOCK_MOCK_LNET
645 #define MOCK_LNET (1)
646 #else
647 #define MOCK_LNET (0)
648 #endif
649 
650 struct sock;
651 struct mover;
652 struct addr;
653 struct ep;
654 struct buf;
655 struct ma;
656 struct bdesc;
657 struct packet;
658 
691 };
692 
702 
704 };
705 
714 };
715 
740 enum rw_state {
748 
751 };
752 
/* Socket transport end-point (some fields are not shown in this listing). */
754 struct ep {
/* Address of the end-point; ep_eq() compares end-points by it. */
758  struct addr e_a;
/* Sockets whose s_ep is this end-point (see ep_invariant()). */
760  struct m0_tl e_sock;
/*
 * Writer movers whose m_ep is this end-point. A writer locked to a socket
 * is always at the head of this list (see ep_invariant()).
 */
762  struct m0_tl e_writer;
763 #ifdef EP_DEBUG
/* Per-holder reference counters, updated by EP_GET()/EP_PUT(). */
765  int e_r_sock;
766  int e_r_buf;
767  int e_r_find;
768 #endif
769 };
770 
/* Socket transport private part of a transfer machine (most fields not shown here). */
772 struct ma {
/* Buffers awaiting completion delivery; drained by ma_buf_done() via buf_complete(). */
800  struct m0_tl t_done;
801 };
802 
/* Operation-table entry of a mover. NOTE(review): presumably one per mover state — confirm. */
806 struct mover_op {
/* Step function, called with the mover and the socket it works on. */
808  int (*o_op)(struct mover *self, struct sock *s);
/* NOTE(review): presumably marks steps that perform socket I/O — confirm against mover_op(). */
815  bool o_doesio;
816 };
817 
/* Operation vector defining a kind of mover (stream/dgram reader, writer_op, get_op). */
825 struct mover_op_vec {
/* Name of this mover kind. */
827  const char *v_name;
/* Error handler; rc is the error code. */
833  void (*v_error)(struct mover *self, struct sock *s, int rc);
/* Completion handler. */
837  void (*v_done) (struct mover *self, struct sock *s);
838 };
839 
/*
 * Mover state machine. Embedded as a socket's reader (sock::s_reader) and as
 * a buffer's writer (buf::b_writer).
 */
846 struct mover {
/* Magic for the "movers" tlist. */
847  uint64_t m_magix;
/* Socket this mover is locked to, or NULL. */
859  struct sock *m_sock;
/* End-point whose e_writer list holds this mover; NULL iff not linked (mover_invariant()). */
861  struct ep *m_ep;
862  struct m0_sm m_sm;
/* Operation vector; must be writer_op, get_op or a stype[] reader. */
863  const struct mover_op_vec *m_op;
/* Buffer served by this mover (set by buf_add()). */
867  struct buf *m_buf;
869  struct packet m_pk;
/* Raw storage the size of one struct packet (presumably the wire header — TODO confirm). */
871  char m_pkbuf[sizeof(struct packet)];
881  void *m_scratch;
882 };
883 
/* Transport private part of a network buffer (m0_net_buffer::nb_xprt_private). */
887 struct buf {
/* Magic for the "buffers" tlist. */
888  uint64_t b_magix;
890  uint64_t b_cookie;
/* Mover initialised by buf_add() (writer_op or get_op) to serve this buffer. */
894  struct mover b_writer;
/*
 * Peer descriptor: buf_add() copies the destination address into bd_addr
 * for sends and decodes it from nb_desc for active bulk buffers.
 */
898  struct bdesc b_peer;
/* Other end-point of the transfer (printed by B_P()). */
900  struct ep *b_other;
910 };
911 
/* Socket. */
913 struct sock {
/* Magic for the "sockets" tlist. */
914  uint64_t s_magix;
/* File descriptor; sock_done() sets it to -1 after closing. */
916  int s_fd;
/* Flag bits (sock_in() sets HAS_READ, sock_out() sets HAS_WRITE). */
918  uint64_t s_flags;
/* Socket state machine (S_INIT, S_OPEN, S_LISTENING, S_DELETED, ...). */
919  struct m0_sm s_sm;
/* Peer end-point; for a listening socket, the local end-point (sock_init()). */
921  struct ep *s_ep;
/* Reader mover of this socket. */
923  struct mover s_reader;
928 };
929 
/* Per socket-type parameters, see stype[] (SOCK_STREAM, SOCK_DGRAM). */
935 struct socktype {
936  const char *st_name;
/* Reader mover operations for sockets of this type. */
938  const struct mover_op_vec *st_reader;
/* Packet size function for this socket type. */
940  m0_bcount_t (*st_pk_size)(const struct mover *w,
941  const struct sock *s);
/* Error handler for this socket type. */
946  void (*st_error)(struct mover *m, struct sock *s);
/* IP protocol (IPPROTO_TCP / IPPROTO_UDP in stype[]). */
948  int st_proto;
949 };
950 
/* Per protocol-family parameters, see pf[] (indexed by AF_*). */
956 struct pfamily {
957  const char *f_name;
/* Encodes a transport address into a sockaddr (NULL for unsupported families). */
963  void (*f_encode)(const struct addr *a, struct sockaddr *sa);
/* Decodes a sockaddr into a transport address. */
969  void (*f_decode)(struct addr *a, const struct sockaddr *sa);
970 };
971 
/*
 * Tracing helpers.
 *
 * SAFE(val, exp, fallback) evaluates to exp when val is non-NULL and to
 * fallback otherwise, so the *_P() argument lists below tolerate NULL
 * objects. val is evaluated in the test and again wherever exp mentions it.
 *
 * Each *_F macro is a printf-style format fragment; the matching *_P macro
 * expands to the corresponding argument list.
 */

#define SAFE(val, exp, fallback) ((val) != NULL ? (exp) : (fallback))

/* End-point: its address string, or "null". */
#define EP_F "%s"
#define EP_P(e) SAFE(e, (e)->e_ep.nep_addr, "null")
/* End-point pair: the ma source end-point, then the end-point itself. */
#define EP_FL EP_F "->" EP_F
#define EP_PL(e) EP_P(SAFE(e, ma_src(ep_ma(e)), NULL)), EP_P(e)

/* Socket: end-point pair, state, flags (hex) and file descriptor. */
#define SOCK_F EP_FL "/%i/%" PRIx64 "/%i"
#define SOCK_P(s) EP_PL(SAFE(s, (s)->s_ep, NULL)), \
	SAFE(s, (s)->s_sm.sm_state, 0),            \
	SAFE(s, (s)->s_flags, 0), SAFE(s, (s)->s_fd, 0)

/*
 * Buffer: queue type, buffer address and end-point pair. NULL-safe like
 * SOCK_P() (queue type -1 for a NULL buffer); the buffer pointer is cast to
 * void * because "%p" requires it.
 */
#define B_F "[[%i]@%p:"EP_FL"]"
#define B_P(b) SAFE(b, (b)->b_buf->nb_qtype, -1), (void *)(b), \
	EP_PL(SAFE(b, (b)->b_other, NULL))

/*
 * Debug trace point. Currently compiled out: the arguments are discarded
 * without being evaluated. Restore the M0_LOG() call to enable.
 */
#define TLOG(...) do { /* M0_LOG(M0_DEBUG, __VA_ARGS__) */; } while (0)
992 
/* "s" list: struct sock linked through s_linkage (ep::e_sock, ma::t_deathrow). */
993 M0_TL_DESCR_DEFINE(s, "sockets",
994  static, struct sock, s_linkage, s_magix,
996 M0_TL_DEFINE(s, static, struct sock);
997 
/* "m" list: struct mover linked through m_linkage (ep::e_writer). */
998 M0_TL_DESCR_DEFINE(m, "movers",
999  static, struct mover, m_linkage, m_magix,
1001 M0_TL_DEFINE(m, static, struct mover);
1002 
/* "b" list: struct buf linked through b_linkage (ma::t_done). */
1003 M0_TL_DESCR_DEFINE(b, "buffers",
1004  static, struct buf, b_linkage, b_magix,
1006 M0_TL_DEFINE(b, static, struct buf);
1007 
1008 static int dom_init(const struct m0_net_xprt *xprt, struct m0_net_domain *dom);
1009 static void dom_fini(struct m0_net_domain *dom);
1010 static int ma_init(struct m0_net_transfer_mc *ma);
1011 static int ma_confine(struct m0_net_transfer_mc *ma,
1012  const struct m0_bitmap *processors);
1013 static int ma_start(struct m0_net_transfer_mc *ma, const char *name);
1014 static int ma_stop(struct m0_net_transfer_mc *ma, bool cancel);
1015 static void ma_fini(struct m0_net_transfer_mc *ma);
1016 static int end_point_create(struct m0_net_end_point **epp,
1017  struct m0_net_transfer_mc *ma, const char *name);
1018 static int buf_register(struct m0_net_buffer *nb);
1019 static void buf_deregister(struct m0_net_buffer *nb);
1020 static int buf_add(struct m0_net_buffer *nb);
1021 static void buf_del(struct m0_net_buffer *nb);
1022 static int bev_deliver_sync(struct m0_net_transfer_mc *ma);
1023 static void bev_deliver_all(struct m0_net_transfer_mc *ma);
1024 static bool bev_pending(struct m0_net_transfer_mc *ma);
1025 static void bev_notify(struct m0_net_transfer_mc *ma, struct m0_chan *chan);
1026 static m0_bcount_t get_max_buffer_size(const struct m0_net_domain *dom);
1028 static int32_t get_max_buffer_segments(const struct m0_net_domain *dom);
1029 static m0_bcount_t get_max_buffer_desc_size(const struct m0_net_domain *);
1030 
1031 static void poller (struct ma *ma);
1032 static void ma__fini (struct ma *ma);
1033 static void ma_prune (struct ma *ma);
1034 static void ma_lock (struct ma *ma);
1035 static void ma_unlock(struct ma *ma);
1036 static bool ma_is_locked(const struct ma *ma);
1037 static bool ma_invariant(const struct ma *ma);
1038 static void ma_event_post (struct ma *ma, enum m0_net_tm_state state);
1039 static void ma_buf_done (struct ma *ma);
1040 static void ma_buf_timeout(struct ma *ma);
1041 static struct buf *ma_recv_buf(struct ma *ma, m0_bcount_t len);
1042 
1043 static struct ep *ma_src(struct ma *ma);
1044 
1045 static int ep_find (struct ma *ma, const char *name, struct ep **out);
1046 static int ep_create (struct ma *ma, struct addr *addr, const char *name,
1047  struct ep **out);
1048 static void ep_free (struct ep *ep);
1049 static void ep_put (struct ep *ep);
1050 static void ep_get (struct ep *ep);
1051 static bool ep_eq (const struct ep *ep, const struct addr *addr);
1052 static struct ma *ep_ma (struct ep *ep);
1053 static struct ep *ep_net(struct m0_net_end_point *net);
1054 static void ep_release(struct m0_ref *ref);
1055 static bool ep_invariant(const struct ep *ep);
1056 static int ep_add(struct ep *ep, struct mover *w);
1057 static void ep_del(struct mover *w);
1058 static int ep_balance(struct ep *ep);
1059 
1060 static int addr_resolve (struct addr *addr, const char *name);
1061 static int addr_parse (struct addr *addr, const char *name);
1062 static int addr_parse_lnet (struct addr *addr, const char *name);
1063 static int addr_parse_native(struct addr *addr, const char *name);
1064 static void addr_decode (struct addr *addr, const struct sockaddr *sa);
1065 static void addr_encode (const struct addr *addr, struct sockaddr *sa);
1066 static char *addr_print (const struct addr *addr);
1067 static bool addr_invariant (const struct addr *addr);
1068 static bool addr_eq (const struct addr *a0, const struct addr *a1);
1069 
1070 static int sock_in(struct sock *s);
1071 static void sock_out(struct sock *s);
1072 static void sock_close(struct sock *s);
1073 static void sock_done(struct sock *s, bool balance);
1074 static void sock_fini(struct sock *s);
1075 static bool sock_event(struct sock *s, uint32_t ev);
1076 static int sock_ctl(struct sock *s, int op, uint32_t flags);
1077 static int sock_init_fd(int fd, struct sock *s, struct ep *ep, uint32_t flags);
1078 static int sock_init(int fd, struct ep *src, struct ep *tgt, uint32_t flags);
1079 static struct mover *sock_writer(struct sock *s);
1080 static bool sock_invariant(const struct sock *s);
1081 
1082 static struct ma *buf_ma(struct buf *buf);
1083 static bool buf_invariant(const struct buf *buf);
1084 static void buf_fini (struct buf *buf);
1085 static int buf_accept (struct buf *buf, struct mover *m);
1086 static void buf_done (struct buf *buf, int rc);
1087 static void buf_complete (struct buf *buf);
1088 
1089 static int bdesc_create(struct addr *addr, struct buf *buf,
1090  struct m0_net_buf_desc *out);
1091 static int bdesc_encode(const struct bdesc *bd, struct m0_net_buf_desc *out);
1092 static int bdesc_decode(const struct m0_net_buf_desc *nbd, struct bdesc *out);
1093 
1094 static void mover_init(struct mover *m, struct ma *ma,
1095  const struct mover_op_vec *vop);
1096 static void mover_fini(struct mover *m);
1097 static int mover_op (struct mover *m, struct sock *s, int op);
1098 static bool mover_is_reader(const struct mover *m);
1099 static bool mover_is_writer(const struct mover *m);
1100 static bool mover_invariant(const struct mover *m);
1101 
1102 static m0_bcount_t pk_size (const struct mover *m, const struct sock *s);
1103 static m0_bcount_t pk_tsize(const struct mover *m);
1104 static m0_bcount_t pk_dnob (const struct mover *m);
1105 static int pk_state(const struct mover *w);
1106 static int pk_io(struct mover *w, struct sock *s,
1107  uint64_t flag, struct m0_bufvec *bv, m0_bcount_t size);
1108 static int pk_iov_prep(struct mover *m, struct iovec *iv, int nr,
1109  struct m0_bufvec *bv, m0_bcount_t size, int *count);
1110 static void pk_header_init(struct mover *m, struct sock *s);
1111 static int pk_header_done(struct mover *m);
1112 static void pk_done (struct mover *m);
1113 static void pk_encdec(struct mover *m, enum m0_xcode_what what);
1114 static void pk_decode(struct mover *m);
1115 static void pk_encode(struct mover *m);
1116 
1117 static int stream_idle (struct mover *self, struct sock *s);
1118 static int stream_pk (struct mover *self, struct sock *s);
1119 static int stream_header (struct mover *self, struct sock *s);
1120 static int stream_interval(struct mover *self, struct sock *s);
1121 static int stream_pk_done (struct mover *self, struct sock *s);
1122 static int dgram_idle (struct mover *self, struct sock *s);
1123 static int dgram_pk (struct mover *self, struct sock *s);
1124 static int dgram_header (struct mover *self, struct sock *s);
1125 static int dgram_interval (struct mover *self, struct sock *s);
1126 static int dgram_pk_done (struct mover *self, struct sock *s);
1127 static int writer_idle (struct mover *self, struct sock *s);
1128 static int writer_pk (struct mover *self, struct sock *s);
1129 static int writer_write (struct mover *self, struct sock *s);
1130 static int writer_pk_done (struct mover *self, struct sock *s);
1131 static int get_idle (struct mover *self, struct sock *s);
1132 static int get_pk (struct mover *self, struct sock *s);
1133 static void writer_done (struct mover *self, struct sock *s);
1134 static void writer_error (struct mover *w, struct sock *s, int rc);
1135 
1136 static m0_bcount_t stream_pk_size(const struct mover *w, const struct sock *s);
1137 static void stream_error(struct mover *m, struct sock *s);
1138 
1139 static m0_bcount_t dgram_pk_size(const struct mover *w, const struct sock *s);
1140 static void dgram_error(struct mover *m, struct sock *s);
1141 
1142 static void ip4_encode(const struct addr *a, struct sockaddr *sa);
1143 static void ip4_decode(struct addr *a, const struct sockaddr *sa);
1144 static void ip6_encode(const struct addr *a, struct sockaddr *sa);
1145 static void ip6_decode(struct addr *a, const struct sockaddr *sa);
1146 
1147 static const struct m0_sm_conf sock_conf;
1148 static const struct m0_sm_conf rw_conf;
1149 
1150 static const struct mover_op_vec stream_reader_op;
1151 static const struct mover_op_vec dgram_reader_op;
1152 static const struct mover_op_vec writer_op;
1153 static const struct mover_op_vec get_op;
1154 
1155 static const struct pfamily pf[];
1156 static const struct socktype stype[];
1157 
1158 static const struct m0_format_tag put_tag;
1159 static const struct m0_format_tag get_tag;
1160 
1161 static const struct pfamily pf[] = {
1162  [AF_UNIX] = {
1163  .f_name = "unix"
1164  },
1165  [AF_INET] = {
1166  .f_name = "inet",
1167  .f_encode = &ip4_encode,
1168  .f_decode = &ip4_decode
1169  },
1170  [AF_INET6] = {
1171  .f_name = "inet6",
1172  .f_encode = &ip6_encode,
1173  .f_decode = &ip6_decode
1174  }
1175 };
1176 
1177 static const struct socktype stype[] = {
1178  [SOCK_STREAM] = {
1179  .st_name = "stream",
1180  .st_reader = &stream_reader_op,
1181  .st_pk_size = &stream_pk_size,
1182  .st_error = &stream_error,
1183  .st_proto = IPPROTO_TCP
1184  },
1185  [SOCK_DGRAM] = {
1186  .st_name = "dgram",
1187  .st_reader = &dgram_reader_op,
1188  .st_pk_size = &dgram_pk_size,
1189  .st_error = &dgram_error,
1190  .st_proto = IPPROTO_UDP
1191  }
1192 };
1193 
1194 /*
1195  * static const char *rw_name[] = {
1196  * "IDLE",
1197  * "PK",
1198  * "HEADER",
1199  * "INTERVAL",
1200  * "PK_DONE",
1201  * "FAIL",
1202  * "DONE",
1203  * };
1204  */
1205 
/*
 * EP_GET(e, f) / EP_PUT(e, f): take / drop an end-point reference on behalf
 * of holder kind f (sock, find, ...). With EP_DEBUG the per-kind counter
 * ep::e_r_<f> is updated as well; ep_invariant() cross-checks the counters.
 *
 * EP_PUT() updates the debug counter *before* dropping the reference:
 * ep_put() may drop the last reference (ep_release()/ep_free()), after which
 * touching __ep->e_r_<f> would be a use-after-free.
 */
#ifdef EP_DEBUG
#define EP_GET(e, f) \
	({ struct ep *__ep = (e); ep_get(__ep); M0_CNT_INC(__ep->e_r_ ## f); })
#define EP_PUT(e, f) \
	({ struct ep *__ep = (e); M0_CNT_DEC(__ep->e_r_ ## f); ep_put(__ep); })
#else
#define EP_GET(e, f) ep_get(e)
#define EP_PUT(e, f) ep_put(e)
#endif
1215 
/*
 * Transfer machine invariant. Checks that:
 * - the ma and its generic transfer machine point to each other;
 * - the ma is fully uninitialised, fully initialised (poller set, epoll open,
 *   some end-point owns a listening socket), or shutting down;
 * - a STARTED machine has an epoll descriptor;
 * - end-points are unique and valid;
 * - every queued buffer is valid.
 * NOTE(review): eps is computed from net before _0C(net != NULL) is checked.
 */
1216 static bool ma_invariant(const struct ma *ma)
1217 {
1218  const struct m0_net_transfer_mc *net = ma->t_ma;
1219  const struct m0_tl *eps = &net->ntm_end_points;
1220 
1221  return _0C(net != NULL) &&
1222  _0C(net->ntm_xprt_private == ma) &&
1224  s_tlist_invariant(&ma->t_deathrow) &&
1225  /* ma is either fully uninitialised or fully initialised. */
1226  _0C((ma->t_poller.t_func == NULL && ma->t_epollfd == -1 &&
1227  m0_nep_tlist_is_empty(eps) &&
1228  s_tlist_is_empty(&ma->t_deathrow)) ||
1229  (ma->t_poller.t_func != NULL && ma->t_epollfd >= 0 &&
1230  m0_tl_exists(m0_nep, nep, eps,
1231  m0_tl_exists(s, s, &ep_net(nep)->e_sock,
1232  s->s_sm.sm_state == S_LISTENING))) ||
/* ... or finalisation has begun (ma__fini() sets t_shutdown). */
1233  ma->t_shutdown) &&
1234  /* In STARTED state ma is fully initialised. */
1235  _0C(ergo(net->ntm_state == M0_NET_TM_STARTED,
1236  ma->t_epollfd >= 0)) &&
1238  /* Endpoints are unique. */
1239  _0C(m0_tl_forall(m0_nep, p, eps,
1240  m0_tl_forall(m0_nep, q, eps,
1241  ep_eq(ep_net(p), &ep_net(q)->e_a) == (p == q)))) &&
/* Every end-point belongs to this ma and is valid. */
1242  _0C(m0_tl_forall(m0_nep, p, eps, ep_ma(ep_net(p)) == ma &&
1243  ep_invariant(ep_net(p)))) &&
/* Every buffer on every queue is valid. */
1244  _0C(m0_forall(i, ARRAY_SIZE(net->ntm_q),
1245  m0_tl_forall(m0_net_tm, nb, &net->ntm_q[i],
1246  buf_invariant(nb->nb_xprt_private)))) &&
1248 }
1249 
1250 static bool sock_invariant(const struct sock *s)
1251 {
1252  struct ma *ma = ep_ma(s->s_ep);
1253 
1254  return _0C((s->s_sm.sm_state == S_DELETED) ==
1255  s_tlist_contains(&ma->t_deathrow, s)) &&
1256  _0C((s->s_sm.sm_state != S_DELETED) ==
1257  s_tlist_contains(&s->s_ep->e_sock, s));
1258 }
1259 
/*
 * Buffer invariant. Exactly one of the following holds:
 * - the buffer is only registered with the domain (nb_flags is exactly
 *   M0_NET_BUF_REGISTERED and nb_tm is NULL);
 * - it is queued to a transfer machine, and its writer mover satisfies
 *   mover_invariant().
 */
1260 static bool buf_invariant(const struct buf *buf)
1261 {
1262  const struct m0_net_buffer *nb = buf->b_buf;
1263  /* Either the buffer is only added to the domain (not associated with a
1264  transfer machine... */
1265  return (nb->nb_flags == M0_NET_BUF_REGISTERED &&
1266  nb->nb_tm == NULL) ^ /* or (exclusively) ... */
1267  /* it is queued to a machine. */
1269  _0C(nb->nb_tm != NULL) &&
1271  mover_invariant(&buf->b_writer))) &&
1273 }
1274 
1275 static bool addr_invariant(const struct addr *a)
1276 {
1277  return _0C(IS_IN_ARRAY(a->a_family, pf)) &&
1278  _0C(pf[a->a_family].f_name != NULL) &&
1279  _0C(IS_IN_ARRAY(a->a_socktype, stype)) &&
1280  _0C(stype[a->a_socktype].st_name != NULL) &&
1281  _0C(M0_IN(a->a_family, (AF_INET, AF_INET6))) &&
1282  _0C(M0_IN(a->a_socktype, (SOCK_STREAM, SOCK_DGRAM))) &&
1283  _0C(M0_IN(a->a_protocol, (0, IPPROTO_TCP, IPPROTO_UDP)));
1284 }
1285 
/*
 * End-point invariant. Checks:
 * - the address is valid and the generic end-point invariant holds;
 * - the end-point has an address string;
 * - with EP_DEBUG, the per-holder reference counters match the writers and
 *   sockets (including death-row sockets) that refer to this end-point;
 * - every socket on e_sock points back to this end-point and is valid;
 * - every writer points back here, and a writer locked to a socket is at the
 *   head of e_writer.
 */
1286 static bool ep_invariant(const struct ep *ep)
1287 {
1288  const struct ma *ma = ep_ma((void *)ep);
1289  return addr_invariant(&ep->e_a) &&
1290  m0_net__ep_invariant((void *)&ep->e_ep,
1291  (void *)ma->t_ma, true) &&
1292  _0C(ep->e_ep.nep_addr != NULL) &&
1293 #ifdef EP_DEBUG
1294  /*
1295  * Reference counters consistency:
1296  */
1297 
1298  /* Each writer got a reference... */
1299  _0C(ep->e_r_mover == m_tlist_length(&ep->e_writer)) &&
1300  /*
1301  * and each socket (including ones lingering on ma->t_deathrow)
1302  * got a reference.
1303  */
1304  _0C(ep->e_r_sock == s_tlist_length(&ep->e_sock) +
1305  m0_tl_fold(s, s, dead, &ma->t_deathrow, 0,
1306  dead + (s->s_ep == ep))) &&
1307 #endif
1308  _0C(m0_tl_forall(s, s, &ep->e_sock,
1309  s->s_ep == ep && sock_invariant(s))) &&
1310  _0C(m0_tl_forall(m, w, &ep->e_writer,
1311  w->m_ep == ep &&
1312  /*
1313  * The writer locked to a socket is always at
1314  * the head of the writers list.
1315  */
1316  ergo(w->m_sock != NULL,
1317  w == m_tlist_head(&ep->e_writer))));
1318 }
1319 
/*
 * Mover invariant:
 * - a mover is linked into a list exactly when it has an end-point (m_ep);
 * - its operation vector is writer_op, get_op, or the reader of some stype[]
 *   entry.
 */
1320 static bool mover_invariant(const struct mover *m)
1321 {
1322  return _0C(m_tlink_is_in(m) == (m->m_ep != NULL)) &&
1323  _0C(M0_IN(m->m_op, (&writer_op, &get_op)) ||
1325  stype[i].st_reader == m->m_op));
1326 }
1327 
/**
 * Initialises a network domain for this transport.
 *
 * Nothing is set up per domain; only the entry/exit trace points are emitted.
 *
 * @return always 0.
 */
static int dom_init(const struct m0_net_xprt *xprt, struct m0_net_domain *dom)
{
	int rc = 0;

	M0_ENTRY();
	return M0_RC(rc);
}

/**
 * Finalises a network domain. There is nothing to release, because
 * dom_init() sets nothing up.
 */
static void dom_fini(struct m0_net_domain *dom)
{
	M0_ENTRY();
	M0_LEAVE();
}
1341 
1342 static void ma_lock(struct ma *ma)
1343 {
1344  m0_mutex_lock(&ma->t_ma->ntm_mutex);
1345 }
1346 
1347 static void ma_unlock(struct ma *ma)
1348 {
1349  m0_mutex_unlock(&ma->t_ma->ntm_mutex);
1350 }
1351 
1352 static bool ma_is_locked(const struct ma *ma)
1353 {
1354  return m0_mutex_is_locked(&ma->t_ma->ntm_mutex);
1355 }
1356 
/*
 * Poller thread body (started by ma_start() through M0_THREAD_INIT()).
 *
 * Loops until ma->t_shutdown is set. Each iteration:
 * - waits up to 1000 ms for at most EV_NR epoll events;
 * - then, under the ma lock:
 *   - dispatches events to sock_event(), skipping deleted sockets and
 *     stopping early when sock_event() returns true;
 *   - times out expired buffers;
 *   - delivers completions;
 *   - frees deleted sockets.
 *
 * t_shutdown is read without the lock. ma__fini() sets it, drops the lock and
 * (presumably) waits for this thread to exit — confirm.
 */
1360 static void poller(struct ma *ma)
1361 {
1362  enum { EV_NR = 256 };
1363  struct epoll_event ev[EV_NR] = {};
1364  int nr;
1365  int i;
1366  /*
1367  * Notify users that ma reached M0_NET_TM_STARTED state.
1368  *
1369  * This also sets ma->ntm_ep.
1370  *
1371  * This should be done once per tm, so if multiple poller threads are
1372  * used, only 1 event should be posted.
1373  *
1374  * @todo there is a race condition here: an application (i.e., the rpc
1375  * layer), might timeout waiting for the ma to start and call
1376  * m0_net_tm_stop(), which moves ma into STOPPING state, but the event
1377  * posted below moves the ma back into STARTED state.
1378  *
1379  * Because of this, we do not assert ma states here.
1380  */
1382  while (1) {
1383  if (ma->t_shutdown)
1384  break;
1385  nr = epoll_wait(ma->t_epollfd, ev, ARRAY_SIZE(ev), 1000);
1386  if (nr == -1) {
/*
 * Only EINTR is tolerated. NOTE(review): errno is read again after
 * M0_LOG(); save it first if M0_LOG() can clobber it.
 */
1387  M0_LOG(M0_DEBUG, "epoll: %i.", -errno);
1388  M0_ASSERT(errno == EINTR);
1389  continue;
1390  }
1391  /* Check again because epoll() may block for some time */
1392  if (ma->t_shutdown)
1393  break;
1394  M0_LOG(M0_DEBUG, "Got: %d.", nr);
1395  ma_lock(ma);
1397  for (i = 0; i < nr; ++i) {
1398  struct sock *s = ev[i].data.ptr;
1399 
1400  if (s->s_sm.sm_state == S_DELETED)
1401  continue;
1402  if (sock_event(s, ev[i].events))
1403  /*
1404  * Ran out of buffers on the receive queue,
1405  * break out, deliver completion events,
1406  * re-provision.
1407  */
1408  break;
1409  }
1410  /* @todo close long-unused sockets. */
1411  ma_buf_timeout(ma);
1412  /*
1413  * Deliver buffer completion events and re-provision receive
1414  * queue if necessary.
1415  */
1416  ma_buf_done(ma);
1418  /*
1419  * This is the only place, where sock structures are freed,
1420  * except for ma finalisation.
1421  */
1422  ma_prune(ma);
1424  ma_unlock(ma);
1425  }
1426 }
1427 
1445 static int ma_init(struct m0_net_transfer_mc *net)
1446 {
1447  struct ma *ma;
1448  int result;
1449 
1450  M0_ASSERT(net->ntm_xprt_private == NULL);
1451 
1452  M0_ALLOC_PTR(ma);
1453  if (ma != NULL) {
1454  ma->t_epollfd = -1;
1455  ma->t_shutdown = false;
1456  net->ntm_xprt_private = ma;
1457  ma->t_ma = net;
1458  s_tlist_init(&ma->t_deathrow);
1459  b_tlist_init(&ma->t_done);
1460  result = 0;
1461  } else
1462  result = M0_ERR(-ENOMEM);
1463  return M0_RC(result);
1464 }
1465 
/*
 * Frees every socket on the ma death-row (sockets are moved there by
 * sock_done()). sock_fini() unlinks each socket, so the death-row is empty
 * on return.
 */
1467 static void ma_prune(struct ma *ma)
1468 {
1469  struct sock *sock;
1470 
1472  m0_tl_for(s, &ma->t_deathrow, sock) {
1473  sock_fini(sock);
1474  } m0_tl_endfor;
1475  M0_POST(s_tlist_is_empty(&ma->t_deathrow));
1476 }
1477 
/*
 * Releases the transport resources of an ma.
 *
 * Only the first call does any work; later calls are no-ops, because the
 * call sets t_shutdown. This matters because both ma_stop() and ma_fini()
 * (and ma_start() on failure) call it.
 *
 * Expected to be entered with the ma lock held. The lock is dropped while
 * the poller thread is stopped and re-acquired afterwards.
 *
 * Steps:
 * - shuts down every socket of every end-point;
 * - closes the epoll descriptor;
 * - delivers pending completions;
 * - frees dead sockets;
 * - asserts that no end-points remain.
 */
1484 static void ma__fini(struct ma *ma)
1485 {
1486  struct m0_net_end_point *net;
1487 
1489  if (!ma->t_shutdown) {
1490  /* Set the shutdown flag.
1491  * Release the lock and wait for poller(), so that the poller()
1492  * will get a chance to detect this flag and exit.
1493  */
1494  ma->t_shutdown = true;
1495  ma_unlock(ma);
1496  if (ma->t_poller.t_func != NULL) {
1499  }
1500  /* Go on finalizing the ma */
1501  ma_lock(ma);
1502  m0_tl_for(m0_nep, &ma->t_ma->ntm_end_points, net) {
1503  struct ep *ep = ep_net(net);
1504  struct sock *sock;
1505  m0_tl_for(s, &ep->e_sock, sock) {
1506  sock_done(sock, false);
1507  } m0_tl_endfor;
1508  } m0_tl_endfor;
1509  /*
1510  * Finalise epoll after sockets, because sock_done() removes the
1511  * socket from the poll set.
1512  */
1513  if (ma->t_epollfd >= 0) {
1514  close(ma->t_epollfd);
1515  ma->t_epollfd = -1;
1516  }
1517  ma_buf_done(ma);
1518  ma_prune(ma);
1519  b_tlist_fini(&ma->t_done);
1520  s_tlist_fini(&ma->t_deathrow);
1521  M0_ASSERT(m0_nep_tlist_is_empty(&ma->t_ma->ntm_end_points));
1522  ma->t_ma->ntm_ep = NULL;
1523  }
1525 }
1526 
/*
 * Transport hook: finalises the ma under its lock (ma__fini() is a no-op if
 * already done, e.g. by ma_stop()), detaches it from net and frees it.
 */
1530 static void ma_fini(struct m0_net_transfer_mc *net)
1531 {
1532  struct ma *ma = net->ntm_xprt_private;
1533 
1534  ma_lock(ma);
1536  ma__fini(ma);
1537  ma_unlock(ma);
1538  net->ntm_xprt_private = NULL;
1539  m0_free(ma);
1540 }
1541 
/*
 * Transport hook: starts a transfer machine whose local address is name.
 * The machine must be in M0_NET_TM_STARTING.
 *
 * On any failure ma__fini() is called, which also sets t_shutdown.
 *
 * NOTE(review): the epoll_create() failure path returns -errno without
 * M0_ERR(), so it is not traced as an error, unlike other error paths.
 */
1551 static int ma_start(struct m0_net_transfer_mc *net, const char *name)
1552 {
1553  struct ma *ma = net->ntm_xprt_private;
1554  int result;
1555 
1557  M0_PRE(net->ntm_state == M0_NET_TM_STARTING);
1558 
1559  /*
1560  * - initialise epoll
1561  *
1562  * - parse the address and create the source endpoint
1563  *
1564  * - create the listening socket
1565  *
1566  * - start the poller thread.
1567  *
1568  * Should be done in this order, because the poller thread uses the
1569  * listening socket to get the source endpoint to post a ma state change
1570  * event (outside of ma lock).
1571  */
/* The size argument of epoll_create() is only a hint; it must be positive. */
1572  ma->t_epollfd = epoll_create(1);
1573  if (ma->t_epollfd >= 0) {
1574  struct ep *ep;
1575 
1576  result = ep_find(ma, name, &ep);
1577  if (result == 0) {
/* fd == -1 and no target: create the listening socket on ep. */
1578  result = sock_init(-1, ep, NULL, EPOLLET);
1579  if (result == 0) {
1580  result = M0_THREAD_INIT(&ma->t_poller,
1581  struct ma *, NULL,
1582  &poller, ma, "socktm");
1583  }
/* Drop the lookup reference; the socket (if created) holds its own. */
1584  EP_PUT(ep, find);
1585  }
1586  } else
1587  result = -errno;
1588  if (result != 0)
1589  ma__fini(ma);
1591  return M0_RC(result);
1592 }
1593 
/*
 * Transport hook: stops a transfer machine that is in M0_NET_TM_STOPPING.
 *
 * Finalises the ma via ma__fini(), then releases the ma lock. The lock is
 * re-acquired before returning 0.
 *
 * NOTE(review): the statements controlled by cancel, and the one run while
 * unlocked, are not shown here. Presumably they cancel pending operations
 * and post the state-change event — confirm.
 */
1603 static int ma_stop(struct m0_net_transfer_mc *net, bool cancel)
1604 {
1605  struct ma *ma = net->ntm_xprt_private;
1606 
1608  M0_PRE(net->ntm_state == M0_NET_TM_STOPPING);
1609 
1610  if (cancel)
1612  ma__fini(ma);
1613  ma_unlock(ma);
1615  ma_lock(ma);
1616  return 0;
1617 }
1618 
/**
 * Transport hook for confining a transfer machine to a set of processors.
 *
 * This transport does not support processor confinement.
 *
 * @return -ENOSYS, always. The value is routed through M0_ERR() so the
 * failure is traced, like the file's other error paths.
 */
static int ma_confine(struct m0_net_transfer_mc *ma,
		      const struct m0_bitmap *processors)
{
	return M0_ERR(-ENOSYS);
}
1624 
/*
 * Posts a M0_NET_TEV_STATE_CHANGE event announcing that the transfer machine
 * moves to state.
 *
 * For M0_NET_TM_STARTED the event carries the end-point that owns the
 * listening socket, which is asserted to exist. For other states the
 * event's end-point is NULL.
 */
1628 static void ma_event_post(struct ma *ma, enum m0_net_tm_state state)
1629 {
1630  struct m0_net_end_point *listen;
1631 
1632  /*
1633  * Find "self" end-point. Cannot use ma_src(), because
1634  * m0_net_transfer_mc::ntm_ep can be not yet or already not set.
1635  */
1636  if (state == M0_NET_TM_STARTED) {
1637  listen = m0_tl_find(m0_nep, ne, &ma->t_ma->ntm_end_points,
1638  m0_tl_exists(s, sock, &ep_net(ne)->e_sock,
1639  sock->s_sm.sm_state == S_LISTENING));
1640  M0_ASSERT(listen != NULL);
1641  } else
1642  listen = NULL;
1644  .nte_type = M0_NET_TEV_STATE_CHANGE,
1645  .nte_next_state = state,
1646  .nte_time = m0_time_now(),
1647  .nte_ep = listen,
1648  .nte_tm = ma->t_ma,
1649  });
1650 }
1651 
/*
 * Completes with -ETIMEDOUT every buffer, on any queue, whose nb_timeout is
 * earlier than the current time. Called by poller() on every iteration.
 */
1656 static void ma_buf_timeout(struct ma *ma)
1657 {
1658  struct m0_net_transfer_mc *net = ma->t_ma;
1659  int i;
1660  m0_time_t now = m0_time_now();
1661 
1663  for (i = 0; i < ARRAY_SIZE(net->ntm_q); ++i) {
1664  struct m0_net_buffer *nb;
1665 
1666  m0_tl_for(m0_net_tm, &ma->t_ma->ntm_q[i], nb) {
1667  if (nb->nb_timeout < now) {
1669  buf_done(nb->nb_xprt_private, -ETIMEDOUT);
1670  }
1671  } m0_tl_endfor;
1672  }
1674 }
1675 
/*
 * Delivers completions for all buffers on ma->t_done. Each buffer is
 * unlinked before buf_complete() is called on it.
 *
 * If any completions were delivered while no callbacks are in progress
 * (ntm_callback_counter == 0), a follow-up step runs.
 * NOTE(review): that step is not shown here; presumably it signals waiters
 * or re-provisions — confirm.
 */
1683 static void ma_buf_done(struct ma *ma)
1684 {
1685  struct buf *buf;
1686  int nr = 0;
1687 
1689  m0_tl_for(b, &ma->t_done, buf) {
1690  b_tlist_del(buf);
1691  buf_complete(buf);
1692  nr++;
1693  } m0_tl_endfor;
1694  if (nr > 0 && ma->t_ma->ntm_callback_counter == 0)
1697 }
1698 
1703 static struct buf *ma_recv_buf(struct ma *ma, m0_bcount_t len)
1704 {
1705  struct m0_net_buffer *nb;
1706  nb = m0_tl_find(m0_net_tm, nb, &ma->t_ma->ntm_q[M0_NET_QT_MSG_RECV],({
1707  struct buf *b = nb->nb_xprt_private;
1708 
1709  b->b_done.b_words == NULL &&
1710  m0_vec_count(&nb->nb_buffer.ov_vec) >= len;
1711  }));
1712  return nb != NULL ? nb->nb_xprt_private : NULL;
1713 }
1714 
1716 static struct ep *ma_src(struct ma *ma)
1717 {
1718  return ep_net(ma->t_ma->ntm_ep);
1719 }
1720 
/*
 * Transport hook: obtains the end-point named name in the transfer machine
 * via ep_find().
 *
 * On success, *epp is set to the generic end-point embedded in it.
 * On failure, *epp is set to NULL and the error is returned.
 */
1728 static int end_point_create(struct m0_net_end_point **epp,
1729  struct m0_net_transfer_mc *net,
1730  const char *name)
1731 {
1732  struct ep *ep;
1733  struct ma *ma = net->ntm_xprt_private;
1734  int result;
1735 
1737  result = ep_find(ma, name, &ep);
1738  *epp = result == 0 ? &ep->e_ep : NULL;
1739  return M0_RC(result);
1740 }
1741 
1749 static int buf_register(struct m0_net_buffer *nb)
1750 {
1751  struct buf *b;
1752 
1753  M0_ALLOC_PTR(b);
1754  if (b != NULL) {
1755  nb->nb_xprt_private = b;
1756  b->b_buf = nb;
1757  b_tlink_init(b);
1758  return M0_RC(0);
1759  } else
1760  return M0_ERR(-ENOMEM);
1761 }
1762 
/*
 * Transport hook: finalises and frees the private struct buf allocated by
 * buf_register(), and clears nb->nb_xprt_private.
 */
1770 static void buf_deregister(struct m0_net_buffer *nb)
1771 {
1772  struct buf *buf = nb->nb_xprt_private;
1773 
1775  buf_fini(buf);
1776  m0_free(buf);
1777  nb->nb_xprt_private = NULL;
1778 }
1779 
/*
 * Transport hook: called when a buffer is added to a transfer machine queue.
 *
 * Initialises the buffer's writer mover and then, by queue type:
 * - MSG_RECV: nothing more;
 * - MSG_SEND: records the destination address in b_peer and queues the
 *   mover on the destination end-point;
 * - PASSIVE_BULK_*: creates a buffer descriptor carrying the ma source
 *   address;
 * - ACTIVE_BULK_*: decodes the passive side's descriptor and queues the
 *   mover on the peer end-point.
 *
 * On failure the mover is finalised and the error is returned.
 */
1787 static int buf_add(struct m0_net_buffer *nb)
1788 {
1789  struct buf *buf = nb->nb_xprt_private;
1790  struct ma *ma = buf_ma(buf);
1791  struct mover *w = &buf->b_writer;
1792  struct bdesc *peer = &buf->b_peer;
1793  int qt = nb->nb_qtype;
1794  int result;
1795  //printf("add: %p[%i]\n", buf, qt);
1797  /* Next 2 asserts are from nlx_xo_buf_add(). */
1798  M0_PRE(nb->nb_offset == 0); /* Do not support an offset during add. */
1799  M0_PRE((nb->nb_flags & M0_NET_BUF_RETAIN) == 0);
1800 
1802  mover_init(w, ma, &writer_op);
1803  else if (qt == M0_NET_QT_ACTIVE_BULK_RECV)
1804  mover_init(w, ma, &get_op);
1805  w->m_buf = buf;
1806  switch (qt) {
1807  case M0_NET_QT_MSG_RECV:
1808  result = 0;
1809  break;
1810  case M0_NET_QT_MSG_SEND: {
1811  struct ep *ep = ep_net(nb->nb_ep);
1812 
1814  peer->bd_addr = ep->e_a;
1815  result = ep_add(ep, w);
1816  break;
1817  }
1818  case M0_NET_QT_PASSIVE_BULK_RECV: /* For passive buffers, generate */
1819  case M0_NET_QT_PASSIVE_BULK_SEND: /* the buffer descriptor. */
1821  result = bdesc_create(&ma_src(ma)->e_a, buf, &nb->nb_desc);
1822  break;
1823  case M0_NET_QT_ACTIVE_BULK_RECV: /* For active buffers, decode the */
1824  case M0_NET_QT_ACTIVE_BULK_SEND: /* passive buffer descriptor. */
1825  result = bdesc_decode(&nb->nb_desc, peer);
1826  if (result == 0) {
1827  struct ep *ep; /* Passive peer end-point. */
1828  result = ep_create(ma, &peer->bd_addr, NULL, &ep);
1829  if (result == 0) {
1830  result = ep_add(ep, w);
/* Drop the creation reference; ep_add() gave the mover its own. */
1831  EP_PUT(ep, find);
1832  }
1833  }
1834  break;
1835  default:
1836  M0_IMPOSSIBLE("invalid queue type: %x", qt);
1837  break;
1838  }
1839  if (result != 0)
1840  mover_fini(w);
1842  TLOG(B_F, B_P(buf));
1843  return M0_RC(result);
1844 }
1845 
/* Transport hook: cancels a queued buffer by completing it with -ECANCELED. */
1853 static void buf_del(struct m0_net_buffer *nb)
1854 {
1855  struct buf *buf = nb->nb_xprt_private;
1856  struct ma *ma = buf_ma(buf);
1857 
1859  TLOG(B_F, B_P(buf));
1861  buf_done(buf, -ECANCELED);
1862 }
1863 
1865 {
1866  return 0;
1867 }
1868 
1870 {
1871 }
1872 
/** Buffer-event hook: always reports that no buffer events are pending. */
static bool bev_pending(struct m0_net_transfer_mc *ma)
{
	(void)ma;
	return false;
}

/** Buffer-event notification hook: intentionally does nothing. */
static void bev_notify(struct m0_net_transfer_mc *ma, struct m0_chan *chan)
{
	(void)ma;
	(void)chan;
}
1881 
1890 {
1891  if (MOCK_LNET)
1892  return 1024*1024;
1893  else
1894  /* There is no real limit. Return an arbitrary large number. */
1895  return M0_BCOUNT_MAX / 2;
1896 }
1897 
1906 {
1907  if (MOCK_LNET)
1908  return 4096;
1909  else
1910  /* There is no real limit. Return an arbitrary large number. */
1911  return M0_BCOUNT_MAX / 2;
1912 }
1913 
1921 static int32_t get_max_buffer_segments(const struct m0_net_domain *dom)
1922 {
1923  if (MOCK_LNET)
1924  return 512;
1925  else
1926  /* There is no real limit. Return an arbitrary large number. */
1927  return INT32_MAX / 2; /* Beat this, LNet! */
1928 }
1929 
1938 {
1939  return sizeof(struct bdesc);
1940 }
1941 
1943 {
1944  if (MOCK_LNET)
1945  return 4096;
1946  else
1947  return default_xo_rpc_max_seg_size(ndom);
1948 }
1949 
1950 static uint32_t get_rpc_max_segs_nr(struct m0_net_domain *ndom)
1951 {
1952  if (MOCK_LNET)
1953  return 512;
1954  else
1955  return default_xo_rpc_max_segs_nr(ndom);
1956 }
1957 
1959 static int sock_in(struct sock *s)
1960 {
1961  M0_PRE(sock_invariant(s) && s->s_sm.sm_state == S_OPEN);
1962  TLOG(SOCK_F, SOCK_P(s));
1963  s->s_flags |= HAS_READ;
1964  return mover_op(&s->s_reader, s, M_READ);
1965 }
1966 
/*
 * Handles writability of a socket.
 *
 * Sets HAS_WRITE on the socket. Then, while HAS_WRITE stays set and the
 * end-point has writers, it steps the head writer with M_WRITE.
 *
 * After each step, a writer is rotated to the tail of e_writer if both hold:
 * - it did not reach R_DONE;
 * - it did not lock itself to this socket.
 */
1968 static void sock_out(struct sock *s)
1969 {
1970  struct mover *w;
1971  int state;
1972 
1974  TLOG(SOCK_F, SOCK_P(s));
1975  s->s_flags |= HAS_WRITE;
1976  /*
1977  * @todo this can monopolise processor. Consider breaking out of this
1978  * loop after some number of iterations.
1979  */
1980  while ((s->s_flags & HAS_WRITE) &&
1981  (w = m_tlist_head(&s->s_ep->e_writer)) != NULL) {
1982  state = mover_op(w, s, M_WRITE);
1983  if (state != R_DONE && w->m_sock != s)
1984  m_tlist_move_tail(&s->s_ep->e_writer, w);
1985  }
1986 }
1987 
/*
 * Closes a socket at the mover level.
 *
 * Steps the reader mover with M_CLOSE, then handles the writer locked to
 * this socket, if any (see sock_writer()).
 * NOTE(review): the statement for that writer is not shown here; presumably
 * it is also sent M_CLOSE — confirm.
 */
1989 static void sock_close(struct sock *s)
1990 {
1992  TLOG(SOCK_F, SOCK_P(s));
1993  mover_op(&s->s_reader, s, M_CLOSE);
1994  if (sock_writer(s) != NULL)
1996 }
1997 
1999 static struct mover *sock_writer(struct sock *s)
2000 {
2001  struct mover *w = m_tlist_head(&s->s_ep->e_writer);
2002 
2003  return w != NULL && w->m_sock == s ? w : NULL;
2004 }
2005 
/*
 * Frees a socket that sock_done() has already moved to S_DELETED on the ma
 * death-row (called from ma_prune()).
 *
 * Steps:
 * - drops the socket's end-point reference;
 * - finalises its state machine;
 * - unlinks it from the death-row;
 * - frees it.
 *
 * NOTE(review): ep_ma(s->s_ep) is evaluated before M0_PRE(s->s_ep != NULL).
 */
2011 static void sock_fini(struct sock *s)
2012 {
2013  struct ma *ma = ep_ma(s->s_ep);
2014 
2016  M0_PRE(s->s_ep != NULL);
2017  M0_PRE(s->s_reader.m_sm.sm_conf == NULL);
2018  M0_PRE(s->s_sm.sm_conf != NULL);
2019  M0_PRE(s->s_sm.sm_state == S_DELETED);
2020  M0_PRE(s_tlist_contains(&ma->t_deathrow, s));
2021 
2022  TLOG(SOCK_F, SOCK_P(s));
2023  EP_PUT(s->s_ep, sock);
2024  s->s_ep = NULL;
2025  m0_sm_fini(&s->s_sm);
2026  s_tlink_del_fini(s);
2027  m0_free(s);
2028 }
2029 
/*
 * Shuts a socket down and queues it for freeing by ma_prune().
 *
 * Steps:
 * - if the descriptor looks open, sock_close() is called first;
 * - unless the socket is already S_DELETED:
 *   - its reader mover is finalised;
 *   - the descriptor is removed from epoll, shut down and closed;
 *   - the state becomes S_DELETED;
 *   - the socket moves to the ma death-row;
 * - with balance set, ep_balance() then runs on its end-point.
 *
 * NOTE(review): "s_fd > 0" treats descriptor 0 as closed. Presumably this is
 * because a freshly allocated sock has s_fd == 0 — confirm.
 * NOTE(review): ep_ma(s->s_ep) is evaluated before M0_PRE(s->s_ep != NULL).
 */
2035 static void sock_done(struct sock *s, bool balance)
2036 {
2037  struct ma *ma = ep_ma(s->s_ep);
2038 
2040  M0_PRE(s->s_ep != NULL);
2041  M0_PRE(s->s_reader.m_sm.sm_conf != NULL);
2042  M0_PRE(s->s_sm.sm_conf != NULL);
2044 
2045  TLOG(SOCK_F, SOCK_P(s));
2046  /* This function can be called multiple times, should be idempotent. */
2047  if (s->s_fd > 0)
2048  sock_close(s);
2049  if (s->s_sm.sm_state != S_DELETED) { /* sock_close() might finalise. */
2050  mover_fini(&s->s_reader);
2051  M0_ASSERT(sock_writer(s) == NULL);
2052  if (s->s_fd > 0) {
/* A descriptor already gone from the epoll set (ENOENT) is tolerated. */
2053  int result = sock_ctl(s, EPOLL_CTL_DEL, 0);
2054  M0_ASSERT(ergo(result != 0, errno == ENOENT));
2055  shutdown(s->s_fd, SHUT_RDWR);
2056  close(s->s_fd);
2057  s->s_fd = -1;
2058  }
2059  m0_sm_state_set(&s->s_sm, S_DELETED);
2060  s_tlist_move(&ma->t_deathrow, s);
2061  if (balance)
2062  (void)ep_balance(s->s_ep);
2063  }
2064 }
2065 
/**
 * Creates a socket for an end-point and registers it with epoll.
 *
 * @param fd    an already connected descriptor (sock_event() passes the
 *              result of accept4()), or a negative value to create a new
 *              socket.
 * @param src   local end-point of the transfer machine; a new socket is
 *              created (and, for listening, bound) with src's address.
 * @param tgt   remote end-point, or NULL to create a listening socket.
 * @param flags only EPOLLOUT and EPOLLET are allowed; EPOLLET makes
 *              sock_init_fd() bind a newly created socket.
 *
 * Resulting state:
 * - fd given: S_OPEN;
 * - no fd, no tgt: S_LISTENING (stream sockets get listen(..., 128));
 * - no fd, tgt given: S_OPEN, or S_CONNECTING when connect() returns
 *   EINPROGRESS.
 * On failure the socket is handed to sock_done(s, false), which moves it to
 * the death-row list.
 *
 * @return 0 or a negative errno.
 */
2079 static int sock_init(int fd, struct ep *src, struct ep *tgt, uint32_t flags)
2080 {
2081  struct ma *ma = ep_ma(src);
 /* The socket belongs to the remote end-point if any, else to src. */
2082  struct ep *ep = tgt ?: src;
2083  struct sock *s;
2084  int result;
2085  int state = S_INIT;
2086 
 /*
  * NOTE(review): ep_ma(src) above dereferences src before the src != NULL
  * precondition below is checked.
  */
2088  M0_PRE((flags & ~(EPOLLOUT|EPOLLET)) == 0);
2089  M0_PRE(src != NULL);
2090  M0_PRE(M0_IN(ma->t_ma->ntm_ep, (NULL, &src->e_ep)));
2091  M0_PRE(ergo(tgt != NULL, ma == ep_ma(tgt) &&
2092  src->e_a.a_family == tgt->e_a.a_family &&
2093  src->e_a.a_socktype == tgt->e_a.a_socktype &&
2094  src->e_a.a_protocol == tgt->e_a.a_protocol));
2095  M0_ALLOC_PTR(s);
2096  if (s == NULL)
2097  return M0_ERR(-ENOMEM);
2098  s->s_ep = ep;
2099  EP_GET(ep, sock);
2100  s_tlink_init_at(s, &ep->e_sock);
2101  m0_sm_init(&s->s_sm, &sock_conf, state, &ma->t_ma->ntm_group);
2102  mover_init(&s->s_reader, ma, stype[ep->e_a.a_socktype].st_reader);
2103  s->s_reader.m_sock = s;
2104  result = sock_init_fd(fd, s, src, flags);
2105  if (result == 0) {
2106  if (fd >= 0) {
2107  state = S_OPEN;
2108  } else if (tgt == NULL) {
2109  /* Listening. */
2110  if (ep->e_a.a_socktype == SOCK_STREAM)
2111  result = listen(s->s_fd, 128);
2112  if (result == 0)
2113  /*
2114  * Will be readable on an incoming connection.
2115  */
2116  state = S_LISTENING;
2117  else
2118  result = M0_ERR(-errno);
2119  } else {
2120  /* Connecting. */
2121  if (ep->e_a.a_socktype == SOCK_STREAM) {
2122  struct sockaddr_storage sa = {};
2123 
2124  addr_encode(&ep->e_a, (void *)&sa);
2125  result = connect(s->s_fd,
2126  (void *)&sa, sizeof sa);
2127  }
 /* The socket is non-blocking, so connect() may be in progress. */
2128  if (result == 0) {
2129  state = S_OPEN;
2130  } else if (errno == EINPROGRESS) {
2131  /*
2132  * Will be writable on a successful connection,
2133  * will be writable and readable on a failure.
2134  */
2135  state = S_CONNECTING;
2136  result = 0;
2137  } else
2138  result = M0_ERR(-errno);
2139  }
2140  if (result == 0)
2141  m0_sm_state_set(&s->s_sm, state);
2142  }
2143  if (result != 0)
2144  sock_done(s, false);
2145  M0_POST(ergo(result == 0, s != NULL && sock_invariant(s)));
2146  return M0_RC(result);
2147 }
2148 
2154 static int sock_init_fd(int fd, struct sock *s, struct ep *ep, uint32_t flags)
2155 {
2156  int result = 0;
2157 
2158  if (fd < 0) {
2159  int flag = true;
2160 
2161  fd = socket(ep->e_a.a_family,
2162  /* Linux: set NONBLOCK immediately. */
2163  ep->e_a.a_socktype | SOCK_NONBLOCK,
2164  ep->e_a.a_protocol);
2165  /*
2166  * Perhaps set some other socket options here? SO_LINGER, etc.
2167  */
2168  if (fd >= 0) {
2169  /* EPOLLET means the socket is for listening. */
2170  if (flags & EPOLLET) {
2171  struct sockaddr_storage sa = {};
2172 
2173  result = setsockopt(fd, SOL_SOCKET,
2174  SO_REUSEADDR,
2175  &flag, sizeof flag);
2176  if (result == 0) {
2177  addr_encode(&ep->e_a, (void *)&sa);
2178  result = bind(fd, (void *)&sa,
2179  sizeof sa);
2180  } else
2181  result = M0_ERR(-errno);
2182  } else
2183  result = 0;
2184  }
2185  }
2186  if (fd >= 0 && result == 0) {
2187  s->s_fd = fd;
2188  result = sock_ctl(s, EPOLL_CTL_ADD, flags & ~EPOLLET);
2189  }
2190  if (result != 0 || fd < 0)
2191  result = M0_ERR(-errno);
2192  return M0_RC(result);
2193 }
2194 
/**
 * Processes the epoll event mask "ev" reported for socket s, according to
 * the socket's state.
 *
 * - S_LISTENING: on EPOLLIN, accepts an incoming connection, finds or creates
 *   the peer's end-point and creates an S_OPEN socket for it. accept4()
 *   errors that are expected to be transient are logged at debug level,
 *   others at error level.
 * - S_CONNECTING: EPOLLOUT without EPOLLIN means the connection succeeded
 *   (S_OPEN); EPOLLOUT with EPOLLIN means it failed (sock_done()).
 * - S_OPEN: EPOLLIN drives the reader, EPOLLOUT the writers, and
 *   error/hang-up events close the socket.
 *
 * @return true iff the receive queue ran out of buffers (sock_in() returned
 * -ENOBUFS), telling the caller to stop processing events and re-provision
 * the queue; false otherwise.
 */
2201 static bool sock_event(struct sock *s, uint32_t ev)
2202 {
2203  enum { EV_ERR = EPOLLRDHUP|EPOLLERR|EPOLLHUP };
2204  struct sockaddr_storage sa = {};
2205  struct addr addr;
2206  int result;
2207 
2209  M0_LOG(M0_DEBUG, "State: %x, event: %x.", s->s_sm.sm_state, ev);
2210  TLOG(SOCK_F": %x", SOCK_P(s), ev);
2211  switch (s->s_sm.sm_state) {
2212  case S_INIT:
2213  case S_DELETED:
2214  default:
2215  M0_IMPOSSIBLE("Wrong state: %x.", s->s_sm.sm_state);
2216  break;
2217  case S_LISTENING:
2218  if (ev == EPOLLIN) { /* A new incoming connection. */
2219  int fd;
2220  socklen_t socklen = sizeof sa;
2221  struct ep *we = s->s_ep;
2222 
2223  addr = we->e_a; /* Copy family, socktype, proto. */
2224  fd = accept4(s->s_fd,
2225  (void *)&sa, &socklen, SOCK_NONBLOCK);
2226  if (fd >= 0) {
2227  struct ep *ep = NULL;
2228 
2229  addr_decode(&addr, (void *)&sa);
2231  result = ep_create(ep_ma(we),
2232  &addr, NULL, &ep) ?:
2233  /*
2234  * Accept incoming connections
2235  * unconditionally. Alternatively, it
2236  * can be rejected by some admission
2237  * policy.
2238  */
2239  sock_init(fd, we, ep, 0);
 /*
  * NOTE(review): if sock_init() failed after fd was
  * stored in the new socket, sock_done() has already
  * closed it; closing the same descriptor number again
  * can hit a descriptor reused by another thread.
  */
2240  if (result != 0)
2241  /*
2242  * Maybe already closed in sock_init()
2243  * failure path, do not care.
2244  */
2245  close(fd);
 /* Drop the reference returned by ep_create(). */
2246  if (ep != NULL)
2247  EP_PUT(ep, find);
2248  } else if (M0_IN(errno, (EWOULDBLOCK, /* BSD */
2249  ECONNABORTED, /* POSIX */
2250  EPROTO)) || /* SVR4 */
2251  M0_IN(errno,
2252  /*
2253  * Linux accept() (and accept4()) passes already-pending network errors
2254  * on the new socket as an error code from accept(). This behavior
2255  * differs from other BSD socket implementations. For reliable operation
2256  * the application should detect the network errors defined for the
2257  * protocol after accept() and treat them like EAGAIN by retrying. In
2258  * the case of TCP/IP, these are... -- https://manpath.be/f14/2/accept4
2259  */
2260  (ENETDOWN, EPROTO, ENOPROTOOPT,
2261  EHOSTDOWN, ENONET,
2262  EHOSTUNREACH, EOPNOTSUPP,
2263  ENETUNREACH))) {
2264  M0_LOG(M0_DEBUG, "Got: %i.", errno);
2265  } else {
2266  M0_LOG(M0_ERROR, "Got: %i.", errno);
2267  }
2268  } else
2269  M0_LOG(M0_ERROR,
2270  "Unexpected event while listening: %x.", ev);
2271  break;
2272  case S_CONNECTING:
2273  if ((ev & (EPOLLOUT|EPOLLIN)) == EPOLLOUT) {
2274  /* Successful connection. */
2275  m0_sm_state_set(&s->s_sm, S_OPEN);
2276  } else if ((ev & (EPOLLOUT|EPOLLIN)) == (EPOLLOUT|EPOLLIN)) {
2277  /* Failed connection. */
2278  sock_done(s, false);
2279  } else {
2280  M0_LOG(M0_ERROR,
2281  "Unexpected event while connecting: %x.", ev);
2282  }
2283  break;
2284  case S_OPEN:
2285  if (ev & EPOLLIN) {
2286  /* Ran out of buffer on the receive queue. */
2287  if (sock_in(s) == -ENOBUFS)
2288  return true;
2289  }
 /*
  * The state is re-checked before each step: handling of the
  * previous event bit may have moved the socket out of S_OPEN.
  */
2290  if (s->s_sm.sm_state == S_OPEN && ev & EPOLLOUT)
2291  sock_out(s);
2292  if (s->s_sm.sm_state == S_OPEN && ev & EV_ERR)
2293  sock_close(s);
2294  break;
2295  }
2296  if (ev & ~(EPOLLIN|EPOLLOUT|EV_ERR))
2297  M0_LOG(M0_ERROR, "Unexpected event: %x.", ev);
2298  return false;
2299 }
2300 
2305 static int sock_ctl(struct sock *s, int op, uint32_t flags)
2306 {
2307  int result;
2308 
2309  /* Always monitor errors. */
2310  flags |= EPOLLIN | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
2311  result = epoll_ctl(ep_ma(s->s_ep)->t_epollfd, op, s->s_fd,
2312  &(struct epoll_event){
2313  .events = flags,
2314  .data = { .ptr = s }});
2315  if (result == 0) {
2316  if ((flags & EPOLLOUT) != 0)
2317  s->s_flags |= WRITE_POLL;
2318  else
2319  s->s_flags &= ~WRITE_POLL;
2320  }
2321  return result;
2322 }
2323 
/**
 * Finds or creates the end-point for an address in transfer machine ma.
 *
 * If an end-point with an equal address and port (ep_eq()) is already on the
 * transfer machine's end-point list, a "find" reference is taken on it and it
 * is returned. Otherwise a new end-point with a single reference is
 * allocated, named after a copy of "name" or, when name is NULL, after
 * addr_print(addr), and appended to the list.
 *
 * Either way the caller owns one reference to *out; sock_event(), for
 * example, drops it with EP_PUT(ep, find).
 *
 * @return 0 or -ENOMEM.
 */
2333 static int ep_create(struct ma *ma, struct addr *addr, const char *name,
2334  struct ep **out)
2335 {
2336  struct ep *ep;
2337  struct m0_net_end_point *net;
2338  char *cname;
2339 
2341  /*
2342  * @todo this is duplicated in every transport. Should be factored out
2343  * by exporting ->xo_ep_eq() method.
2344  */
2345  m0_tl_for(m0_nep, &ma->t_ma->ntm_end_points, net) {
2346  struct ep *xep = ep_net(net);
2347  if (ep_eq(xep, addr)) {
2348  EP_GET(xep, find);
2349  *out = xep;
2350  return M0_RC(0);
2351  }
2352  } m0_tl_endfor;
2353  cname = name != NULL ? m0_strdup(name) : addr_print(addr);
2354  M0_ALLOC_PTR(ep);
2355  if (cname == NULL || ep == NULL) {
2356  m0_free(ep);
2357  m0_free(cname);
2358  return M0_ERR(-ENOMEM);
2359  }
2360  net = &ep->e_ep;
 /* The initial reference is released through ep_release(). */
2361  m0_ref_init(&net->nep_ref, 1, &ep_release);
 /* Debug accounting: the initial reference counts as a "find" one. */
2362 #ifdef EP_DEBUG
2363  ep->e_r_find = 1;
2364 #endif
2365  net->nep_tm = ma->t_ma;
2366  m0_nep_tlink_init_at_tail(net, &ma->t_ma->ntm_end_points);
2367  s_tlist_init(&ep->e_sock);
2368  m_tlist_init(&ep->e_writer);
 /* The name is freed by ep_free(). */
2369  net->nep_addr = cname;
2370  ep->e_a = *addr;
2371  *out = ep;
2372  M0_POST(*out != NULL && ep_invariant(*out));
2374  TLOG(EP_FL, EP_PL(ep));
2375  return M0_RC(0);
2376 }
2377 
2379 static int ep_find(struct ma *ma, const char *name, struct ep **out)
2380 {
2381  struct addr addr = {};
2382 
2383  return addr_resolve(&addr, name) ?: ep_create(ma, &addr, name, out);
2384 }
2385 
2387 static struct ma *ep_ma(struct ep *ep)
2388 {
2389  return ep->e_ep.nep_tm->ntm_xprt_private;
2390 }
2391 
2393 static struct ep *ep_net(struct m0_net_end_point *net)
2394 {
2395  return container_of(net, struct ep, e_ep);
2396 }
2397 
2404 static void ep_release(struct m0_ref *ref)
2405 {
2406  ep_free(container_of(ref, struct ep, e_ep.nep_ref));
2407 }
2408 
/**
 * Queues writer w on end-point ep.
 *
 * Appends w to ep's writer list, takes a "mover" reference on ep (dropped
 * when the writer is unlinked again, see ep_del() and mover_fini()), records
 * ep in w->m_ep and calls ep_balance() so that some socket to ep is, or
 * becomes, writable.
 *
 * @return the result of ep_balance().
 */
2414 static int ep_add(struct ep *ep, struct mover *w)
2415 {
2418  M0_PRE(w->m_ep == NULL);
2419  m_tlist_add_tail(&ep->e_writer, w);
2420  EP_GET(ep, mover);
2421  w->m_ep = ep;
2422  return M0_RC(ep_balance(ep));
2423 }
2424 
/**
 * Detaches a finished writer from its end-point.
 *
 * Does nothing when w is not attached (w->m_ep == NULL). Otherwise w must be
 * in R_DONE or R_FAIL; if it is still on the writer list it is unlinked, the
 * end-point is rebalanced and the "mover" reference taken by ep_add() is
 * dropped. Finally w->m_ep is cleared.
 */
2431 static void ep_del(struct mover *w)
2432 {
2433  struct ep *ep = w->m_ep;
2434 
2435  if (ep != NULL) {
2438  M0_PRE(M0_IN(w->m_sm.sm_state, (R_DONE, R_FAIL)));
2439 
2440  if (m_tlink_is_in(w)) {
2441  m_tlist_del(w);
 /* The result of ep_balance() is ignored here. */
2442  ep_balance(ep);
2443  EP_PUT(ep, mover);
2444  }
2445  w->m_ep = NULL;
2446  }
2447 }
2448 
2456 static int ep_balance(struct ep *ep)
2457 {
2458  int result = 0;
2459  struct sock *s;
2460 
2461  if (m_tlist_is_empty(&ep->e_writer)) {
2462  /*
2463  * No more writers.
2464  *
2465  * @todo Consider closing the sockets to this endpoint (after
2466  * some time?).
2467  */
2468  s = s_tlist_head(&ep->e_sock);
2469  if (s != NULL)
2470  result = sock_ctl(s, EPOLL_CTL_MOD, 0);
2471  M0_ASSERT(result == 0);
2472  } else {
2473  s = m0_tl_find(s, s, &ep->e_sock, M0_IN(s->s_sm.sm_state,
2474  (S_CONNECTING, S_OPEN)));
2475  if (s == NULL)
2476  result = sock_init(-1, ma_src(ep_ma(ep)), ep, EPOLLOUT);
2477  else {
2478  /*
2479  * @todo Alternatively, more parallel sockets can be
2480  * opened.
2481  */
2482  /* Make sure that at least one socket is writable. */
2483  if (!(s->s_flags & WRITE_POLL))
2484  result = sock_ctl(s, EPOLL_CTL_MOD, EPOLLOUT);
2485  else
2486  result = 0;
2487  }
2488  }
2489  return result;
2490 }
2491 
2493 static void ep_free(struct ep *ep)
2494 {
2495  m0_nep_tlist_del(&ep->e_ep);
2496  m_tlist_fini(&ep->e_writer);
2497  s_tlist_fini(&ep->e_sock);
2498  m0_free((void *)ep->e_ep.nep_addr);
2499  m0_free(ep);
2500 }
2501 
/**
 * Releases a reference on end-point ep; counterpart of ep_get().
 * NOTE(review): presumably m0_ref_put() on ep->e_ep.nep_ref, whose release
 * callback (ep_release(), installed by ep_create()) frees ep - confirm.
 */
2502 static void ep_put(struct ep *ep)
2503 {
2505 }
2506 
/**
 * Acquires a reference on end-point ep; counterpart of ep_put().
 * NOTE(review): presumably m0_ref_get() on ep->e_ep.nep_ref - confirm.
 */
2507 static void ep_get(struct ep *ep)
2508 {
2510 }
2511 
/**
 * Converts the textual address "name" into addr.
 *
 * Currently only numerical ip addresses are supported (see addr_parse()), so
 * this is just parsing: no resolving is needed. In the future, use
 * getaddrinfo(3).
 *
 * @return the result of addr_parse().
 */
static int addr_resolve(struct addr *addr, const char *name)
{
	return M0_RC(addr_parse(addr, name));
}
2532 
2554 static int addr_parse(struct addr *addr, const char *name)
2555 {
2556  int result;
2557 
2558  if (name[0] == 0)
2559  result = M0_ERR(-EPROTO);
2560  else if (name[0] < '0' || name[0] > '9')
2561  result = addr_parse_native(addr, name);
2562  else
2563  /* Lnet format. */
2564  result = addr_parse_lnet(addr, name);
2565  M0_POST(ergo(result == 0, addr_invariant(addr)));
2566  return M0_RC(result);
2567 }
2568 
/**
 * Transfer machine identifiers already taken by addresses parsed with
 * addr_parse_lnet(): entry tmid is non-zero once tmid has been used. An
 * address with a "*" tmid gets the first free entry. Zero-initialised as a
 * static object.
 */
static char autotm[1024];
2579 
2580 static int addr_parse_lnet(struct addr *addr, const char *name)
2581 {
2582  struct sockaddr_in sin;
2583  char *at = strchr(name, '@');
2584  char ip[INET_ADDRSTRLEN] = {};
2585  int nr;
2586  unsigned pid;
2587  unsigned portal;
2588  unsigned tmid;
2589 
2590  if (strncmp(name, "0@lo", 4) == 0) {
2591  sin.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
2592  } else {
2593  if (at == NULL || at - name >= sizeof ip)
2594  return M0_ERR(-EPROTO);
2595  memcpy(ip, name, at - name);
2596  if (inet_pton(AF_INET, ip, &sin.sin_addr) != 1)
2597  return M0_ERR(-EPROTO);
2598  }
2599  sin.sin_family = AF_INET;
2600  if ((at = strchr(at, ':')) == NULL) /* Skip 'tcp...:' bit. */
2601  return M0_ERR(-EPROTO);
2602  nr = sscanf(at + 1, "%u:%u:%u", &pid, &portal, &tmid);
2603  if (nr != 3) {
2604  nr = sscanf(at + 1, "%u:%u:*", &pid, &portal);
2605  if (nr != 2)
2606  return M0_ERR(-EPROTO);
2607  for (nr = 0; nr < ARRAY_SIZE(autotm); ++nr) {
2608  if (autotm[nr] == 0) {
2609  tmid = nr;
2610  break;
2611  }
2612  }
2613  if (nr == ARRAY_SIZE(autotm))
2614  return M0_ERR(-EADDRNOTAVAIL);
2615  }
2616  /*
2617  * Hard-code LUSTRE_SRV_LNET_PID to avoid dependencies on the Lustre
2618  * headers.
2619  */
2620  if (pid != 12345)
2621  return M0_ERR(-EPROTO);
2622  /*
2623  * Deterministically combine portal and tmid into a unique 16-bit port
2624  * number (greater than 1024). Tricky.
2625  *
2626  * Port number is, in binary: tttttttttt1ppppp, that is, 10 bits of tmid
2627  * (which must be less than 1024), followed by a set bit (guaranteeing
2628  * that the port is not reserved), followed by 5 bits of (portal - 30),
2629  * so that portal must be in the range 30..61.
2630  */
2631  if (tmid >= 1024 || (portal - 30) >= 32)
2632  return M0_ERR_INFO(-EPROTO,
2633  "portal: %u, tmid: %u", portal, tmid);
2634  sin.sin_port = htons(tmid | (1 << 10) | ((portal - 30) << 11));
2635  addr->a_family = PF_INET;
2636  addr->a_socktype = SOCK_STREAM;
2637  addr->a_protocol = IPPROTO_TCP;
2638  autotm[tmid] = 1;
2639  addr_decode(addr, (void *)&sin);
2640  return M0_RC(0);
2641 }
2642 
/**
 * Parses a native address "<family>:<socktype>:<ip>@<port>" into addr.
 *
 * <family> is matched against the names in pf[], and the matching index is
 * used as the address family for inet_pton(). <socktype> is matched against
 * the names in stype[]. <ip> is a numeric address for inet_pton(), and <port>
 * a decimal number in 0..USHRT_MAX. This is the form produced by
 * addr_print().
 *
 * @return 0, -EINVAL for a malformed string, -ERANGE for a port out of range,
 * or a negated errno.
 */
2643 static int addr_parse_native(struct addr *addr, const char *name)
2644 {
2645  int shift;
2646  int result;
2647  int f;
2648  int s;
2649  long port;
2650  char *at;
2651  char *end;
2652  char ip[INET6_ADDRSTRLEN] = {};
2653 
 /*
  * NOTE(review): both loops stop at the first name that is a prefix of the
  * input and check for the ':' separator only afterwards. If one name is a
  * prefix of another (e.g. "inet" and "inet6" in pf[]), the longer one can
  * never be parsed. Checking name[shift] == ':' inside the loop fixes it.
  */
2654  for (f = 0; f < ARRAY_SIZE(pf); ++f) {
2655  if (pf[f].f_name != NULL) {
2656  shift = strlen(pf[f].f_name);
2657  if (strncmp(name, pf[f].f_name, shift) == 0)
2658  break;
2659  }
2660  }
2661  if (f == ARRAY_SIZE(pf) || name[shift] != ':')
2662  return M0_ERR(-EINVAL);
2663  name += shift + 1;
2664  for (s = 0; s < ARRAY_SIZE(stype); ++s) {
2665  if (stype[s].st_name != NULL) {
2666  shift = strlen(stype[s].st_name);
2667  if (strncmp(name, stype[s].st_name, shift) == 0)
2668  break;
2669  }
2670  }
2671  if (s == ARRAY_SIZE(stype) || name[shift] != ':')
2672  return M0_ERR(-EINVAL);
2673  name += shift + 1;
2674  at = strchr(name, '@');
2675  if (at == NULL) {
2676  /* XXX @todo: default port? */
2677  return M0_ERR(-EINVAL);
2678  } else {
 /*
  * NOTE(review): errno is not set to 0 before strtol(), so a
  * stale errno from an earlier call makes a valid port fail the
  * errno check below. An empty port ("...@") is accepted as 0.
  */
2679  port = strtol(at + 1, &end, 10);
2680  if (*end != 0)
2681  return M0_ERR(-EINVAL);
2682  if (errno != 0)
2683  return M0_ERR(-errno);
2684  if (port < 0 || port > USHRT_MAX)
2685  return M0_ERR(-ERANGE);
2686  }
 /* Overlong ip strings are truncated and then rejected by inet_pton(). */
2687  memcpy(ip, name, min64(at - name, ARRAY_SIZE(ip) - 1));
2688  result = inet_pton(f, ip, addr->a_data.v_data);
2689  if (result == 0)
2690  return M0_ERR(-EINVAL);
2691  if (result == -1)
2692  return M0_ERR(-errno);
2693  addr->a_family = f;
2694  addr->a_socktype = s;
2696  addr->a_port = port;
2698  return 0;
2699 }
2700 
2702 static void ip4_encode(const struct addr *a, struct sockaddr *sa)
2703 {
2704  struct sockaddr_in *sin = (void *)sa;
2705 
2706  M0_SET0(sin);
2707 #if 0 /* BSD Reno. */
2708  sin->sin_len = sizeof *sin;
2709 #endif
2710  sin->sin_port = htons(a->a_port);
2711  sin->sin_addr.s_addr = *(uint32_t *)&a->a_data.v_data[0];
2712 }
2713 
2715 static void ip4_decode(struct addr *a, const struct sockaddr *sa)
2716 {
2717  const struct sockaddr_in *sin = (void *)sa;
2718 
2719  a->a_port = ntohs(sin->sin_port);
2720  *((uint32_t *)&a->a_data.v_data[0]) = sin->sin_addr.s_addr;
2721 }
2722 
2724 static void ip6_encode(const struct addr *a, struct sockaddr *sa)
2725 {
2726  struct sockaddr_in6 *sin6 = (void *)sa;
2727 
2728  M0_SET0(sin6);
2729 #if 0 /* BSD Reno. */
2730  sin6->sin6_len = sizeof *sin6;
2731 #endif
2732  sin6->sin6_port = htons(a->a_port);
2733  memcpy(sin6->sin6_addr.s6_addr, a->a_data.v_data,
2734  sizeof sin6->sin6_addr.s6_addr);
2735 }
2736 
2738 static void ip6_decode(struct addr *a, const struct sockaddr *sa)
2739 {
2740  struct sockaddr_in6 *sin6 = (void *)sa;
2741 
2742  a->a_port = ntohs(sin6->sin6_port);
2743  memcpy(a->a_data.v_data, sin6->sin6_addr.s6_addr,
2744  sizeof sin6->sin6_addr.s6_addr);
2745 }
2746 
/**
 * Returns a newly allocated printable name of address a, in the
 * "<family>:<socktype>:<ip>@<port>" form parsed by addr_parse_native(), or
 * NULL when allocation fails. The caller owns the string; names produced
 * for end-points by ep_create() are freed with m0_free() in ep_free().
 */
2748 static char *addr_print(const struct addr *a)
2749 {
2750  char *name;
2751  int nob;
2752  struct sockaddr_storage sa = {};
2753 
 /* Longest prefix and port, plus the longest textual IPv6 address. */
2754  enum { MAX_LEN = sizeof("inet6:stream:@65536") + INET6_ADDRSTRLEN };
2755  M0_PRE(addr_invariant(a));
2756 
2757  name = m0_alloc(MAX_LEN);
2758  if (name == NULL)
2759  return NULL;
 /*
  * Prefix: family and socket-type names (presumably pf[].f_name and
  * stype[].st_name - confirm).
  */
2760  nob = snprintf(name, MAX_LEN, "%s:%s:",
2762  M0_ASSERT(nob < MAX_LEN);
2763  addr_encode(a, (void *)&sa);
 /* NOTE(review): the return value of inet_ntop() is not checked. */
2764  switch (a->a_family) {
2765  case AF_INET: {
2766  struct sockaddr_in *sin = (void *)&sa;
2767 
2768  inet_ntop(AF_INET, &sin->sin_addr, name + nob, MAX_LEN - nob);
2769  break;
2770  }
2771  case AF_INET6: {
2772  struct sockaddr_in6 *sin = (void *)&sa;
2773 
2774  inet_ntop(AF_INET6, &sin->sin6_addr, name + nob, MAX_LEN - nob);
2775  break;
2776  }
2777  default:
2778  M0_IMPOSSIBLE("Wrong family: %i.", a->a_family);
2779  }
2780  nob = strlen(name);
2781  M0_ASSERT(nob < MAX_LEN);
2782  nob += snprintf(name + nob, MAX_LEN - nob, "@%i", a->a_port);
2783  M0_ASSERT(nob < MAX_LEN);
2784  return name;
2785 }
2786 
/**
 * Fills the address data (and port) of addr from the socket address sa, using
 * the decoder registered in pf[] for addr->a_family.
 *
 * addr->a_family must already be set by the caller (sock_event(), for
 * example, copies family, socket type and protocol first). a_data is zeroed
 * before decoding.
 */
2788 static void addr_decode(struct addr *addr, const struct sockaddr *sa)
2789 {
2791  M0_PRE(pf[addr->a_family].f_name != NULL);
2792  M0_SET0(&addr->a_data);
2793  pf[addr->a_family].f_decode(addr, sa);
2794 }
2795 
/**
 * Converts addr into the socket address at sa, using the encoder registered
 * in pf[] for addr->a_family, and sets sa->sa_family.
 */
2797 static void addr_encode(const struct addr *addr, struct sockaddr *sa)
2798 {
2800  M0_PRE(pf[addr->a_family].f_name != NULL);
2801  pf[addr->a_family].f_encode(addr, sa);
 /* Set after encoding: the encoders zero the whole sockaddr first. */
2802  sa->sa_family = addr->a_family;
2803 }
2804 
2806 static bool addr_eq(const struct addr *a0, const struct addr *a1)
2807 {
2808  return a0->a_family == a1->a_family &&
2809  a0->a_protocol == a1->a_protocol &&
2810  a0->a_socktype == a1->a_socktype &&
2811  memcmp(a0->a_data.v_data, a1->a_data.v_data,
2812  ARRAY_SIZE(a0->a_data.v_data)) == 0;
2813 }
2814 
2816 static bool ep_eq(const struct ep *ep, const struct addr *a0)
2817 {
2818  const struct addr *a1 = &ep->e_a;
2819 
2820  return addr_eq(a0, a1) && a0->a_port == a1->a_port;
2821 }
2822 
2823 static struct ma *buf_ma(struct buf *buf)
2824 {
2825  return buf->b_buf->nb_tm->ntm_xprt_private;
2826 }
2827 
/**
 * Checks whether the packet whose header is in mover m can be received into
 * buffer buf, and prepares buf on the first packet.
 *
 * The packet's range and the announced total size must fit into "length"
 * (presumably the capacity of buf's network buffer - confirm).
 *
 * On the first packet for buf (its completion bitmap is not allocated yet):
 * - a bitmap of p_nr bits is allocated and m is bound to buf;
 * - the peer descriptor and the total size are recorded;
 * - the sender's end-point is found or created into b_other.
 *
 * On later packets it checks that the following match the first packet:
 * - packet count;
 * - total size;
 * - sender end-point;
 * - peer descriptor.
 * It also checks that this packet index has not been received yet.
 *
 * @return 0, -EMSGSIZE, -EPROTO, or an error of m0_bitmap_init() or
 * ep_create().
 */
2835 static int buf_accept(struct buf *buf, struct mover *m)
2836 {
2837  struct packet *p = &m->m_pk;
2838  struct bdesc *src = &p->p_src;
2840  int result = 0;
2841 
2843 
2844  if (p->p_offset + p->p_size > length)
2845  return M0_ERR(-EMSGSIZE);
2846  if (p->p_totalsize > length)
2847  return M0_ERR(-EMSGSIZE);
 /* An unallocated bitmap means this is the first packet for buf. */
2848  if (buf->b_done.b_words == NULL) {
2849  result = m0_bitmap_init(&buf->b_done, p->p_nr);
2850  if (result != 0)
2851  return result;
2852  m->m_buf = buf;
2853  buf->b_peer = *src;
2854  buf->b_length = p->p_totalsize;
2855  result = ep_create(buf_ma(buf),
2856  &src->bd_addr, NULL, &buf->b_other);
2857 #ifdef EP_DEBUG
2858  if (result == 0) {
2861  }
2862 #endif
2863  } else if (buf->b_done.b_nr != p->p_nr) {
2864  result = M0_ERR(-EPROTO);
2865  } else if (buf->b_length != p->p_totalsize) {
2866  result = M0_ERR(-EPROTO);
2867  } else if (buf->b_other != NULL && !ep_eq(buf->b_other, &src->bd_addr)){
2868  result = M0_ERR(-EPROTO);
2869  } else if (memcmp(&buf->b_peer, src, sizeof *src)) {
2870  result = M0_ERR(-EPROTO);
2871  } else if (m0_bitmap_get(&buf->b_done, p->p_idx)) {
 /* Duplicate packet. */
2872  result = M0_ERR(-EPROTO);
2873  }
2874  return result;
2875 }
2876 
/**
 * Resets the transport state of a completed buffer so that it can be queued
 * again.
 *
 * Finalises:
 * - the writer mover;
 * - the list link;
 * - the completion bitmap, when allocated.
 * Drops the reference on the sender/peer end-point (b_other). Clears the peer
 * descriptor, offset, length and the result stored in b_writer.m_sm.sm_rc.
 */
2883 static void buf_fini(struct buf *buf)
2884 {
2885  mover_fini(&buf->b_writer);
2886  b_tlink_fini(buf);
 /*
  * NOTE(review): b_words is a pointer (buf_accept() compares it with
  * NULL); an ordered comparison of a pointer with 0 is not valid ISO C,
  * "!= NULL" is presumably meant. The guarded statement presumably
  * finalises b_done - confirm.
  */
2887  if (buf->b_done.b_words > 0)
2889  if (buf->b_other != NULL) {
2890  EP_PUT(buf->b_other, buf);
2891  buf->b_other = NULL;
2892  }
2893  M0_SET0(&buf->b_peer);
2894  buf->b_offset = 0;
2895  buf->b_length = 0;
2896  buf->b_writer.m_sm.sm_rc = 0;
2897 }
2898 
/**
 * Records the outcome rc of an operation on buf and arranges its completion.
 *
 * The result is kept in b_writer.m_sm.sm_rc and a later rc never overwrites
 * an earlier non-zero one. If buf is not on a list yet, it is completed right
 * away (buf_complete()) when running in the poller thread. Otherwise it is
 * appended to ma->t_done and completed later by ma_buf_done().
 */
2900 static void buf_done(struct buf *buf, int rc)
2901 {
2902  struct ma *ma = buf_ma(buf);
2903 
2905  /*printf("done: %p[%i] %" PRIi64 " %i\n", buf,
2906  buf->b_buf != NULL ? buf->b_buf->nb_qtype : -1,
2907  buf->b_buf != NULL ? buf->b_buf->nb_length : -1, rc); */
2908  if (buf->b_writer.m_sm.sm_rc == 0) /* Reuse this field for result. */
2909  buf->b_writer.m_sm.sm_rc = rc;
2910  /*
2911  * Multiple buf_done() calls on the same buffer are possible if the
2912  * buffer is cancelled.
2913  */
2914  if (!b_tlink_is_in(buf)) {
2915  /* Try to finalise. */
2916  if (m0_thread_self() == &ma->t_poller)
2917  buf_complete(buf);
2918  else
2919  /* Otherwise, postpone finalisation to ma_buf_done(). */
2920  b_tlist_add_tail(&ma->t_done, buf);
2921  }
2922 }
2923 
/**
 * Delivers the completion event of buffer buf to the network layer user.
 *
 * Builds an m0_net_buffer_event from the stored result
 * (b_writer.m_sm.sm_rc). For some queue types, including M0_NET_QT_MSG_RECV,
 * the transferred length is reported. For a successful receive, the sender
 * end-point is attached with an extra "find" reference.
 *
 * The buffer state is then reset with buf_fini(), and the ma lock is released
 * while the event is handled (presumably posted with
 * m0_net_buffer_event_post() - confirm).
 */
2925 static void buf_complete(struct buf *buf)
2926 {
2927  struct ma *ma = buf_ma(buf);
2928 
2929  struct m0_net_buffer *nb = buf->b_buf;
2930  struct m0_net_buffer_event ev = {
2931  .nbe_buffer = nb,
2932  .nbe_status = buf->b_writer.m_sm.sm_rc,
2933  .nbe_time = m0_time_now()
2934  };
2935  if (M0_IN(nb->nb_qtype, (M0_NET_QT_MSG_RECV,
2938  ev.nbe_length = buf->b_length;
2939  }
2940  if (nb->nb_qtype == M0_NET_QT_MSG_RECV) {
 /* The reference taken here is handed over with the event. */
2941  if (ev.nbe_status == 0 && buf->b_other != NULL) {
2942  ev.nbe_ep = &buf->b_other->e_ep;
2943  EP_GET(buf->b_other, find);
2944  }
2945  ev.nbe_offset = 0; /* Starting offset not supported. */
2946  }
2948  /*printf("DONE: %p[%i] %" PRIi64 " %i\n", buf,
2949  buf->b_buf != NULL ? buf->b_buf->nb_qtype : -1,
2950  buf->b_length, ev.nbe_status); */
2951  TLOG(B_F" nb: %p %i", B_P(buf), buf->b_buf, ev.nbe_status);
2952  /*
2953  * It's ok to clear buf state, because sock doesn't support
2954  * M0_NET_BUF_RETAIN flag and the buffer will be unqueued
2955  * unconditionally.
2956  */
2957  buf_fini(buf);
2959  ma_unlock(ma);
2961  ma_lock(ma);
2964  M0_NET_TM_STOPPING)));
2966 }
2967 
/**
 * Creates the network buffer descriptor of buf, located at address addr, and
 * encodes it into out with bdesc_encode().
 *
 * NOTE(review): buf presumably contributes the descriptor cookie
 * (bd.bd_cookie from buf->b_cookie) - confirm.
 *
 * @return the result of bdesc_encode().
 */
2969 static int bdesc_create(struct addr *addr, struct buf *buf,
2970  struct m0_net_buf_desc *out)
2971 {
2972  struct bdesc bd = { .bd_addr = *addr };
2973 
2975  return bdesc_encode(&bd, out);
2976 }
2977 
2978 static int bdesc_encode(const struct bdesc *bd, struct m0_net_buf_desc *out)
2979 {
2980  m0_bcount_t len;
2981  int result;
2982 
2983  /* Cannot pass &out->nbd_len below, as it is 32 bits. */
2984  result = m0_xcode_obj_enc_to_buf(&M0_XCODE_OBJ(bdesc_xc, (void *)bd),
2985  (void **)&out->nbd_data, &len);
2986  if (result == 0)
2987  out->nbd_len = len;
2988  else
2989  m0_free0(&out->nbd_data);
2990  return M0_RC(result);
2991 }
2992 
2993 static int bdesc_decode(const struct m0_net_buf_desc *nbd, struct bdesc *out)
2994 {
2995  return m0_xcode_obj_dec_from_buf(&M0_XCODE_OBJ(bdesc_xc, out),
2996  nbd->nbd_data, nbd->nbd_len);
2997 }
2998 
/**
 * Initialises mover m for the transfer machine of ma, with operation vector
 * vop.
 *
 * m must not be initialised already (m_sm.sm_conf == NULL). The mover is
 * zeroed, its list link is initialised, and its state machine starts in
 * R_IDLE in the transfer machine's state-machine group.
 */
2999 static void mover_init(struct mover *m, struct ma *ma,
3000  const struct mover_op_vec *vop)
3001 {
3002  M0_PRE(m->m_sm.sm_conf == NULL);
3003  M0_SET0(m);
3004  m_tlink_init(m);
3005  m0_sm_init(&m->m_sm, &rw_conf, R_IDLE, &ma->t_ma->ntm_group);
3006  m->m_op = vop;
3008 }
3009 
/**
 * Finalises mover m. Safe to call more than once: m_sm.sm_conf serves as the
 * "initialised" marker and is cleared at the end.
 *
 * Steps:
 * - moves the state machine to R_DONE if needed, then finalises it;
 * - if the mover is still on its end-point's writer list, unlinks it and
 *   drops the "mover" reference (unlike ep_del(), without rebalancing the
 *   end-point);
 * - clears the packet header and frees the scratch buffer.
 */
3010 static void mover_fini(struct mover *m)
3011 {
3012  if (m->m_sm.sm_conf != NULL) { /* Must be idempotent. */
3014  if (m->m_sm.sm_state != R_DONE)
3015  m0_sm_state_set(&m->m_sm, R_DONE);
3016  m0_sm_fini(&m->m_sm);
3017  if (m_tlink_is_in(m)) {
3018  m_tlist_del(m);
3019  EP_PUT(m->m_ep, mover);
3020  m->m_ep = NULL;
3021  }
3022  m_tlink_fini(m);
3023  M0_SET0(&m->m_pk);
3024  m0_free0(&m->m_scratch);
 /* Marks the mover as finalised (see the check above). */
3025  m->m_sm.sm_conf = NULL;
3026  }
3027 }
3028 
/**
 * Drives mover m through its state machine in reaction to event op (M_READ,
 * M_WRITE or M_CLOSE) on socket s.
 *
 * Transitions are taken from m->m_op->v_op[state][op]. The loop continues
 * while the M0_BITS(op) bit is set in s->s_flags, or while the current
 * transition performs no I/O (o_doesio is false). It stops when:
 * - the mover reaches R_DONE: v_done() is called if present, otherwise the
 *   mover is finalised;
 * - the socket is no longer S_OPEN;
 * - there is no handler for an event other than M_CLOSE: the event is
 *   ignored and -EPROTO is returned;
 * - a handler returns -ENOBUFS: it is returned so that the receive queue can
 *   be re-provisioned.
 * Any other negative handler result (or a missing handler for M_CLOSE) fails
 * the mover. The state goes to R_FAIL, v_error() is called if present, then
 * the state goes to R_DONE and the socket type's st_error() is called.
 *
 * @return the final mover state, or -EPROTO / -ENOBUFS as described above.
 */
3056 static int mover_op(struct mover *m, struct sock *s, int op)
3057 {
3058  int state = m->m_sm.sm_state;
3059 
3060  M0_PRE(IS_IN_ARRAY(state, m->m_op->v_op));
3061  M0_PRE(IS_IN_ARRAY(op, m->m_op->v_op[state]));
3062  M0_PRE(m->m_sm.sm_conf != NULL);
3064  /*
3065  * @todo This can monopolise processor doing state transitions for the
3066  * same mover (i.e., continuous stream of data from the same remote
3067  * end-point). Consider breaking out of this loop after some number of
3068  * iterations or bytes ioed.
3069  */
3070  while (s->s_flags & M0_BITS(op) || !m->m_op->v_op[state][op].o_doesio) {
3071  M0_LOG(M0_DEBUG, "Got %s: state: %x, op: %x.",
3072  m->m_op->v_name, state, op);
3073  if (state == R_DONE) {
3074  if (m->m_op->v_done != NULL)
3075  m->m_op->v_done(m, s);
3076  else
3077  mover_fini(m);
3078  break;
3079  } else if (s->s_sm.sm_state != S_OPEN) {
3080  break;
3081  } else if (m->m_op->v_op[state][op].o_op == NULL) {
3082  state = -EPROTO;
3083  if (op != M_CLOSE) /* Ignore unexpected events. */
3084  break;
3085  } else
3086  state = m->m_op->v_op[state][op].o_op(m, s);
3087  /*
3088  * printf("... %p: %s -> %s (%p)\n",
3089  * m, rw_name[m->m_sm.sm_state],
3090  * state >= 0 ? rw_name[state] : strerror(-state),
3091  * m->m_buf);
3092  */
 /* Non-negative: next state; negative: error code. */
3093  if (state >= 0) {
3094  M0_ASSERT(IS_IN_ARRAY(state, m->m_op->v_op));
3095  m0_sm_state_set(&m->m_sm, state);
3096  } else {
3097  if (state == -ENOBUFS) /* Unwind and re-provision. */
3098  break;
3099  m0_sm_state_set(&m->m_sm, R_FAIL);
3100  if (m->m_op->v_error != NULL)
3101  m->m_op->v_error(m, s, state);
3102  m0_sm_state_set(&m->m_sm, R_DONE);
3103  stype[s->s_ep->e_a.a_socktype].st_error(m, s);
3104  }
3105  state = m->m_sm.sm_state;
3106  }
3107  return state;
3108 }
3109 
/** Returns true iff mover m is not a writer (see mover_is_writer()). */
static bool mover_is_reader(const struct mover *m)
{
	bool writer = mover_is_writer(m);

	return !writer;
}
3114 
3115 static bool mover_is_writer(const struct mover *m)
3116 {
3117  return M0_IN(m->m_op, (&writer_op, &get_op));
3118 }
3119 
3126 static m0_bcount_t pk_size(const struct mover *m, const struct sock *s)
3127 {
3128  return stype[s->s_ep->e_a.a_socktype].st_pk_size(m, s);
3129 }
3130 
3132 static m0_bcount_t pk_tsize(const struct mover *m)
3133 {
3134  return sizeof m->m_pkbuf + m->m_pk.p_size;
3135 }
3136 
3140 static m0_bcount_t pk_dnob(const struct mover *m)
3141 {
3142  return max64(m->m_nob - sizeof m->m_pkbuf, 0);
3143 }
3144 
3151 static int pk_state(const struct mover *m)
3152 {
3153  m0_bcount_t nob = m->m_nob;
3154 
3155  if (nob < sizeof m->m_pkbuf)
3156  return R_HEADER;
3157  else if (nob < pk_tsize(m))
3158  return R_INTERVAL;
3159  else {
3160  M0_ASSERT(nob == pk_tsize(m));
3161  return R_PK_DONE;
3162  }
3163 }
3164 
/**
 * Prepares an I/O vector for the next part of the packet of mover m.
 *
 * Fills at most nr entries of iv. First comes the not yet transferred tail of
 * the header (m_pkbuf). Then come fragments of bv, starting at the packet's
 * data offset (p_offset plus the data bytes already transferred, pk_dnob()).
 * Together they bring the transfer up to tgt bytes, header included.
 * *count receives the number of bytes described by iv.
 *
 * bv is not used when tgt equals the header size; pk_io() can pass NULL in
 * that case.
 *
 * @return the number of iv entries filled.
 */
3177 static int pk_iov_prep(struct mover *m, struct iovec *iv, int nr,
3178  struct m0_bufvec *bv, m0_bcount_t tgt, int *count)
3179 {
3180  struct m0_bufvec_cursor cur;
3181  int idx;
3182 
3183  M0_PRE(nr > 0);
3184  M0_PRE(tgt >= m->m_nob);
3185  M0_PRE(tgt >= sizeof m->m_pkbuf); /* For simplicity assume the entire
3186  header is always ioed. */
3187  if (m->m_nob < sizeof m->m_pkbuf) { /* Cannot use pk_state(): header can
3188  be unintialised for reads. */
3189  iv[0].iov_base = &m->m_pkbuf[m->m_nob];
3190  *count = iv[0].iov_len = sizeof m->m_pkbuf - m->m_nob;
3191  idx = 1;
3192  } else {
3193  *count = 0;
3194  idx = 0;
3195  }
3196  if (tgt == sizeof m->m_pkbuf) /* Only header is ioed. */
3197  return idx;
3198  m0_bufvec_cursor_init(&cur, bv);
3199  m0_bufvec_cursor_move(&cur, m->m_pk.p_offset + pk_dnob(m));
 /* Stop when iv is full, bv is exhausted, or tgt is reached. */
3200  for (; idx < nr && !m0_bufvec_cursor_move(&cur, 0); ++idx) {
3202 
3203  frag = min64u(frag, tgt - m->m_nob - *count);
3204  if (frag == 0)
3205  break;
3206  iv[idx].iov_base = m0_bufvec_cursor_addr(&cur);
3207  *count += (iv[idx].iov_len = frag);
3208  m0_bufvec_cursor_move(&cur, frag);
3209  }
3210  M0_POST(m0_reduce(i, idx, 0ULL, + iv[i].iov_len) == *count);
3211  M0_POST(idx > 0); /* Check that there is some progress. */
3212  return idx;
3213 }
3214 
3222 static int pk_io(struct mover *m, struct sock *s, uint64_t flag,
3223  struct m0_bufvec *bv, m0_bcount_t tgt)
3224 {
3225  struct iovec iv[256] = {};
3226  int count;
3227  int nr;
3228  int rc;
3229 
3230  M0_PRE(M0_IN(flag, (HAS_READ, HAS_WRITE)));
3231  nr = pk_iov_prep(m, iv, ARRAY_SIZE(iv),
3232  bv ?: m->m_buf != NULL ?
3233  &m->m_buf->b_buf->nb_buffer : NULL, tgt, &count);
3234  s->s_flags &= ~flag;
3235  rc = (flag == HAS_READ ? readv : writev)(s->s_fd, iv, nr);
3236  M0_LOG(M0_DEBUG, "flag: %" PRIi64 ", rc: %i, idx: %i, errno: %i.",
3237  flag, rc, nr, errno);
3238  if (rc >= 0) {
3239  m->m_nob += rc;
3240  /*
3241  * If everything was ioed, the socket might have more space in
3242  * the buffer, try to io some more.
3243  */
3244  s->s_flags |= (rc == count ? flag : 0);
3245  } else if (errno == EWOULDBLOCK) { /* Overshoot (see s_flags above). */
3246  rc = 0;
3247  } else if (errno == EINTR) { /* Nothing was ioed, repeat. */
3248  rc = 0;
3249  } else
3250  rc = M0_ERR(-errno);
3251  /*
3252  * printf("%s -> %s, %p: flag: %" PRIx64 ", tgt: %" PRIu64 ", nob %" PRIu64 ","
3253  * " nr: %i, count: %i, rc: %i, sflags: %" PRIx64 "\n",
3254  * ma_src(ep_ma(s->s_ep))->e_ep.nep_addr, s->s_ep->e_ep.nep_addr,
3255  * m, flag, tgt, m->m_nob, nr, count, rc, s->s_flags);
3256  */
3257  return rc;
3258 }
3259 
/**
 * Fills the header of the outgoing packet of mover m, sent over socket s,
 * from m's buffer:
 * - the source cookie, initialised from m->m_buf->b_cookie;
 * - the local address of the transfer machine;
 * - the destination descriptor (the buffer's peer);
 * - the total buffer length.
 * The source cookie must be unset on entry; the destination must be set on
 * exit.
 */
3261 static void pk_header_init(struct mover *m, struct sock *s)
3262 {
3263  struct packet *p = &m->m_pk;
3264 
3265  M0_PRE(M0_IS0(&p->p_src.bd_cookie));
3267  m0_cookie_init(&p->p_src.bd_cookie, &m->m_buf->b_cookie);
3268  p->p_src.bd_addr = ma_src(ep_ma(s->s_ep))->e_a;
3269  p->p_dst = m->m_buf->b_peer;
3270  p->p_totalsize = m->m_buf->b_buf->nb_length;
3271  M0_POST(!M0_IS0(&p->p_dst));
3272 }
3273 
/**
 * Validates a fully received packet header and selects the target buffer.
 *
 * After pk_decode() and footer verification, the header must:
 * - describe a GET or PUT packet;
 * - have a consistent index, offset and size;
 * - be addressed to this transfer machine;
 * - come from the address of the socket's end-point.
 * A destination cookie, if present, must resolve to a buffer of this
 * transfer machine whose peer, if already set, is the sender.
 *
 * - GET: the referenced buffer's writer is added to the socket's end-point,
 *   so that the buffer is sent back to the requester; R_IDLE is returned.
 * - PUT without destination cookie: a buffer is taken from the receive
 *   queue. If the queue is empty, one attempt is made to re-provision it,
 *   when the pool lock can be taken without blocking.
 * - PUT: buf_accept() validates the packet against the buffer.
 *
 * @return R_INTERVAL to continue with the packet data, R_IDLE for GET, or a
 * negative error: -EPROTO, -EFBIG or -EPERM for an invalid header, -ENOBUFS
 * when no receive buffer is available (not necessarily an error), or an
 * error of buf_accept().
 */
3283 static int pk_header_done(struct mover *m)
3284 {
3285  struct packet *p = &m->m_pk;
3286  struct m0_format_tag tag;
3287  struct ma *ma = ep_ma(m->m_sock->s_ep);
3288  int result;
3289  bool isget;
3290  bool hassrc;
3291  bool hasdst;
3292  struct buf *buf = NULL;
3293  uint64_t *cookie;
3294 
3295  M0_PRE(m->m_nob >= sizeof *p);
3297 
3298  pk_decode(m);
3299  result = m0_format_footer_verify(p, false);
3300  if (result != 0)
3301  return M0_ERR(result);
3302  m0_format_header_unpack(&tag, &p->p_header);
3303  isget = memcmp(&tag, &get_tag, sizeof tag) == 0;
3304  if (!isget && memcmp(&tag, &put_tag, sizeof tag) != 0)
3305  return M0_ERR(-EPROTO);
3306  if (p->p_idx >= p->p_nr)
3307  return M0_ERR(-EPROTO);
 /* Offset + size must not overflow. */
3308  if (p->p_offset + p->p_size < p->p_offset)
3309  return M0_ERR(-EFBIG);
3310  if (p->p_offset + p->p_size > p->p_totalsize)
3311  return M0_ERR(-EPROTO);
3312  if (p->p_idx == 0 && p->p_offset != 0)
3313  return M0_ERR(-EPROTO);
3314  if (p->p_idx == p->p_nr - 1 &&
3315  p->p_offset + p->p_size != p->p_totalsize)
3316  return M0_ERR(-EPROTO);
 /* Must be sent to us, by the peer at the other end of this socket. */
3317  if (!ep_eq(ma_src(ma), &p->p_dst.bd_addr))
3318  return M0_ERR(-EPROTO);
3319  if (!addr_eq(&m->m_sock->s_ep->e_a, &p->p_src.bd_addr))
3320  return M0_ERR(-EPROTO);
3321  hassrc = !M0_IS0(&p->p_src.bd_cookie);
3322  hasdst = !M0_IS0(&p->p_dst.bd_cookie);
3323  if (!hassrc && !hasdst) /* Go I know not whither */
3324  return M0_ERR(-EPROTO); /* and fetch I know not what? */
3325  if (hasdst) {
3326  result = m0_cookie_dereference(&p->p_dst.bd_cookie, &cookie);
3327  if (result != 0)
3328  return M0_ERR_INFO(result,
3329  "Wrong cookie: %" PRIx64 ":%" PRIx64 "",
3330  p->p_dst.bd_cookie.co_addr,
3331  p->p_dst.bd_cookie.co_generation);
3332  buf = container_of(cookie, struct buf, b_cookie);
3333  if (buf_ma(buf) != ma)
3334  return M0_ERR(-EPERM);
3335  if (!M0_IS0(&buf->b_peer) &&
3336  memcmp(&buf->b_peer, &p->p_src, sizeof p->p_src) != 0)
3337  return M0_ERR(-EPERM);
3338  }
3339  if (isget) {
 /* A GET is a single empty packet that names the buffer to send. */
3340  if (p->p_idx != 0 || p->p_nr != 1 || p->p_size != 0 ||
3341  p->p_offset != 0 || !hasdst)
3342  return M0_ERR(-EPROTO);
3344  return M0_ERR(-EPERM);
3345  buf->b_peer = p->p_src;
3347  buf->b_writer.m_buf = buf;
3348  result = ep_add(m->m_sock->s_ep, &buf->b_writer);
3349  if (result != 0)
3350  buf_done(buf, result);
3351  return R_IDLE;
3352  }
3353  if (!hasdst) {
3354  /* Select a buffer from the receive queue. */
3355  buf = ma_recv_buf(ma, p->p_totalsize);
3356  if (buf == NULL) {
3357  struct m0_net_transfer_mc *tm = ma->t_ma;
3358  struct m0_net_buffer_pool *pool = tm->ntm_recv_pool;
3359  /*
3360  * A user of network transport (such as the rpc module)
3361  * has no control over consumption of buffers on the
3362  * receive queue. It might (and does) so happen that
3363  * this queue becomes empty.
3364  *
3365  * After a buffer completion event has been delivered to
3366  * the user, m0_net_buffer_event_post() tries to
3367  * re-provision the receive queue by calling
3368  * m0_net__tm_provision_recv_q(). This does not always
3369  * work because:
3370  *
3371  * - standard provisioning code does not deal with
3372  * the busy buffers and
3373  *
3374  * - the loop in poller() processes multiple events
3375  * before buffer completions are invoked.
3376  *
3377  * First, try to provision the receive queue right here
3378  * on the spot. This might fail because the try-lock
3379  * below fails or because the pool is out of buffers. In
3380  * case of failure, return -ENOBUFS all the way up to
3381  * sock_event(), which returns true to instruct poller()
3382  * to break out of the loop and to re-provision the
3383  * queue.
3384  */
3385  if (pool != NULL &&
3386  m0_mutex_trylock(&pool->nbp_mutex) == 0) {
3388  /* Got the lock. Add 2 buffers. */
3391  buf = ma_recv_buf(ma, p->p_totalsize);
3392  }
3393  }
3394  if (buf == NULL)
3395  return -ENOBUFS; /* Not always a error. */
3396  }
3397  return buf_accept(buf, m) ?: R_INTERVAL;
3398 }
3399 
/**
 * Records that the packet currently held by reader @m has been fully
 * received.
 *
 * Sets bit pk->p_idx in the buffer's b_done bitmap and, once the bitmap has
 * no zero bits left (m0_bitmap_ffz() == -1), completes the buffer with
 * buf_done(buf, 0). The mover is detached from the buffer (m->m_buf = NULL)
 * before that.
 *
 * NOTE(review): one line of this function (after the first M0_PRE) is not
 * visible in this listing — confirm against the source.
 */
static void pk_done(struct mover *m)
{
	struct buf    *buf = m->m_buf;
	struct packet *pk  = &m->m_pk;

	/* A packet index must not be completed twice. */
	M0_PRE(!m0_bitmap_get(&buf->b_done, pk->p_idx));
	m->m_buf = NULL;
	m0_bitmap_set(&buf->b_done, pk->p_idx, true);
	if (m0_bitmap_ffz(&buf->b_done) == -1)
		buf_done(buf, 0); /* If all packets have been received, done. */
}
3418 
/**
 * Runs xcode over the packet header m->m_pk in direction @what (encode or
 * decode) through cursor @cur, asserting success.
 *
 * buf/len describe the fixed-size m->m_pkbuf area.
 * NOTE(review): the line initialising @cur (presumably over buf/len) is not
 * visible in this listing; as shown, @cur is used uninitialised — confirm
 * against the source.
 */
static void pk_encdec(struct mover *m, enum m0_xcode_what what)
{
	struct m0_bufvec_cursor cur;
	m0_bcount_t             len = sizeof m->m_pkbuf;
	void                   *buf = m->m_pkbuf;
	int                     rc;

	rc = m0_xcode_encdec(&M0_XCODE_OBJ(packet_xc, &m->m_pk), &cur, what);
	M0_ASSERT(rc == 0);
}
3430 
/**
 * Decodes the packet header of mover @m.
 *
 * NOTE(review): the body line is not visible in this listing; presumably it
 * is pk_encdec(m, M0_XCODE_DECODE) — confirm against the source.
 */
static void pk_decode(struct mover *m)
{
}
3435 
/**
 * Prepares the packet header m->m_pk of mover @m for sending: first
 * refreshes its format footer with m0_format_footer_update().
 *
 * NOTE(review): the following line is not visible in this listing;
 * presumably it is pk_encdec(m, M0_XCODE_ENCODE) — confirm against the
 * source.
 */
static void pk_encode(struct mover *m)
{
	m0_format_footer_update(&m->m_pk);
}
3441 
3447 static int stream_idle(struct mover *self, struct sock *s)
3448 {
3449  return R_PK;
3450 }
3451 
3455 static int stream_pk(struct mover *self, struct sock *s)
3456 {
3457  self->m_nob = 0;
3458  return R_HEADER;
3459 }
3460 
3466 static int stream_header(struct mover *self, struct sock *s)
3467 {
3468  int result = pk_io(self, s, HAS_READ, NULL, sizeof self->m_pkbuf);
3469  if (result < 0)
3470  return M0_ERR(result);
3471  /* Cannot use pk_state() with unverified header. */
3472  else if (self->m_nob < sizeof self->m_pkbuf)
3473  return R_HEADER;
3474  else {
3475  M0_ASSERT(self->m_nob == sizeof self->m_pkbuf);
3476  return pk_header_done(self);
3477  }
3478 }
3479 
3485 static int stream_interval(struct mover *self, struct sock *s)
3486 {
3487  int result = pk_io(self, s, HAS_READ, NULL, pk_tsize(self));
3488  return result >= 0 ? pk_state(self) : result;
3489 }
3490 
3492 static int stream_pk_done(struct mover *self, struct sock *s)
3493 {
3494  pk_done(self);
3495  return R_IDLE;
3496 }
3497 
3504 static m0_bcount_t stream_pk_size(const struct mover *w, const struct sock *s)
3505 {
3506  return M0_BSIGNED_MAX / 2;
3507 }
3508 
3510 static void stream_error(struct mover *m, struct sock *s)
3511 {
3512  if (m->m_sock != NULL) {
3513  m->m_sock = NULL;
3514  sock_done(s, true);
3515  }
3516 }
3517 
3523 static int dgram_idle(struct mover *self, struct sock *s)
3524 {
3525  return R_PK;
3526 }
3527 
3533 static int dgram_pk(struct mover *self, struct sock *s)
3534 {
3535  if (self->m_scratch == NULL) {
3536  self->m_scratch = m0_alloc(pk_size(self, s));
3537  if (self->m_scratch == NULL)
3538  return M0_ERR(-ENOMEM);
3539  }
3540  return R_HEADER;
3541 }
3542 
3552 static int dgram_header(struct mover *self, struct sock *s)
3553 {
3554  m0_bcount_t maxsize = pk_size(self, s);
3555  m0_bcount_t dsize;
3556  struct m0_bufvec bv = M0_BUFVEC_INIT_BUF(&self->m_scratch, &maxsize);
3557  struct m0_bufvec_cursor cur;
3558  int result = pk_io(self, s, HAS_READ, &bv,
3559  sizeof self->m_pkbuf + maxsize);
3560  if (result < 0)
3561  return M0_ERR(result);
3562  if (self->m_nob < sizeof self->m_pkbuf)
3563  return M0_ERR(-EPROTO);
3564  result = pk_header_done(self);
3565  if (result < 0)
3566  return M0_ERR(result);
3567  dsize = self->m_pk.p_size;
3568  if (self->m_nob != sizeof self->m_pkbuf + dsize)
3569  return M0_ERR(-EMSGSIZE);
3570  m0_bufvec_cursor_init(&cur, &bv);
3571  m0_bufvec_cursor_move(&cur, self->m_pk.p_offset);
3572  m0_data_to_bufvec_copy(&cur, self->m_scratch, dsize);
3573  return pk_state(self);
3574 }
3575 
/**
 * Datagram reader, R_INTERVAL: the whole payload arrived with the header, so
 * no I/O is done here. The next state is whatever pk_state() reports.
 */
static int dgram_interval(struct mover *self, struct sock *s)
{
	int next = pk_state(self);

	return next;
}
3583 
3587 static int dgram_pk_done(struct mover *self, struct sock *s)
3588 {
3589  pk_done(self);
3590  return R_IDLE;
3591 }
3592 
3598 static m0_bcount_t dgram_pk_size(const struct mover *w, const struct sock *s)
3599 {
3600  return 65535 /* 16 bit "total length" in ip header */
3601  - 8 /* UDP header */
3602  /* IPv4 or IPV6 header size */
3603  - (s->s_ep->e_a.a_family == AF_INET ? 20 : 40)
3604  - sizeof w->m_pkbuf;
3605 }
3606 
/**
 * Datagram reader error handler: intentionally does nothing.
 */
static void dgram_error(struct mover *m, struct sock *s)
{
	(void)m;
	(void)s;
}
3610 
/**
 * Writer, R_IDLE: starts transmission of the writer's buffer.
 *
 * Initialises the packet header (pk_header_init()). It then computes p_nr,
 * the number of packets needed to send @size bytes in chunks of at most
 * pk_size() bytes (ceiling division).
 *
 * NOTE(review): the line declaring @size and one line after the p_nr
 * assignment are not visible in this listing — confirm against the source.
 */
static int writer_idle(struct mover *w, struct sock *s)
{
	m0_bcount_t pksize = pk_size(w, s);

	M0_ASSERT(size > 0);
	pk_header_init(w, s);
	w->m_pk.p_nr = (size + pksize - 1) / pksize;
	return R_PK;
}
3623 
/**
 * Writer, R_PK: prepares the next packet and binds the writer to socket @s.
 *
 * The payload size p_size is the smaller of pk_size() and
 * size - pk_dnob(w). That second value is presumably the bytes not yet
 * sent. The header is then encoded with pk_encode().
 *
 * NOTE(review): the line declaring @size is not visible in this listing.
 */
static int writer_pk(struct mover *w, struct sock *s)
{
	m0_bcount_t pksize = pk_size(w, s);

	w->m_nob = 0;
	w->m_pk.p_size = min64u(pksize, size - pk_dnob(w));
	pk_encode(w);
	w->m_sock = s; /* Lock the socket and the writer together. */
	return R_HEADER;
}
3640 
3646 static int writer_write(struct mover *w, struct sock *s)
3647 {
3648  int result = pk_io(w, s, HAS_WRITE, NULL, pk_tsize(w));
3649  return result >= 0 ? pk_state(w) : result;
3650 }
3651 
3658 static int writer_pk_done(struct mover *w, struct sock *s)
3659 {
3660  w->m_sock = NULL;
3661  if (++w->m_pk.p_idx == w->m_pk.p_nr)
3662  return R_DONE;
3663  else {
3664  w->m_pk.p_offset += w->m_pk.p_size;
3665  return R_PK;
3666  }
3667 }
3668 
/**
 * Writer completion: handled as writer_error() with a zero (success) status.
 */
static void writer_done(struct mover *w, struct sock *s)
{
	const int success = 0;

	writer_error(w, s, success);
}
3674 
3682 static void writer_error(struct mover *w, struct sock *s, int rc)
3683 {
3684  ep_del(w);
3685  buf_done(w->m_buf, rc);
3686 }
3687 
3689 static int get_idle(struct mover *self, struct sock *s)
3690 {
3691  return R_PK;
3692 }
3693 
/**
 * GET command, R_PK: builds a single packet with no payload and binds the
 * command mover to socket @s.
 *
 * The packet has p_nr = 1 and p_totalsize = p_size = 0. It is encoded with
 * pk_encode().
 *
 * NOTE(review): one line after pk_header_init() is not visible in this
 * listing — confirm against the source.
 */
static int get_pk(struct mover *cmd, struct sock *s)
{
	pk_header_init(cmd, s);
	cmd->m_nob = 0;
	cmd->m_pk.p_nr = 1;
	cmd->m_pk.p_totalsize = 0;
	cmd->m_pk.p_size = 0;
	pk_encode(cmd);
	cmd->m_sock = s;
	return R_HEADER;
}
3707 
/**
 * GET command completion: only detaches the command mover via ep_del(). No
 * buffer is completed here, unlike writer_done().
 */
static void get_done(struct mover *w, struct sock *s)
{
	struct mover *cmd = w;

	ep_del(cmd);
}
3713 
	/*
	 * Socket state descriptors. sd_allowed lists the states reachable from
	 * each state. S_DELETED is terminal and reachable from every other
	 * state.
	 * NOTE(review): the array header line is not visible in this listing.
	 */
	[S_INIT] = {
		.sd_name    = "init",
		.sd_flags   = M0_SDF_INITIAL,
		.sd_allowed = M0_BITS(S_LISTENING, S_CONNECTING,
				      S_OPEN, S_DELETED)
	},
	[S_LISTENING] = {
		.sd_name    = "listening",
		.sd_allowed = M0_BITS(S_DELETED)
	},
	/* Connection in progress: either completes (S_OPEN) or is dropped. */
	[S_CONNECTING] = {
		.sd_name    = "connecting",
		.sd_allowed = M0_BITS(S_OPEN, S_DELETED)
	},
	[S_OPEN] = {
		.sd_name    = "active",
		.sd_allowed = M0_BITS(S_DELETED)
	},
	[S_DELETED] = {
		.sd_name    = "deleted",
		.sd_flags   = M0_SDF_TERMINAL
	}
};
3738 
	/*
	 * Allowed socket state transitions: { name, from, to }.
	 * NOTE(review): the array header line is not visible in this listing.
	 */
	{ "listen",        S_INIT,       S_LISTENING },
	{ "connect",       S_INIT,       S_CONNECTING },
	{ "accept",        S_INIT,       S_OPEN },
	{ "init-deleted",  S_INIT,       S_DELETED },
	{ "listen-close",  S_LISTENING,  S_DELETED },
	{ "connected",     S_CONNECTING, S_OPEN },
	{ "connect-close", S_CONNECTING, S_DELETED },
	{ "close",         S_OPEN,       S_DELETED }
};
3749 
3750 static const struct m0_sm_conf sock_conf = {
3751  .scf_name = "sock",
3752  .scf_nr_states = ARRAY_SIZE(sock_conf_state),
3753  .scf_state = sock_conf_state,
3754  .scf_trans_nr = ARRAY_SIZE(sock_conf_trans),
3755  .scf_trans = sock_conf_trans
3756 };
3757 
	/*
	 * Reader/writer (mover) state descriptors. A packet goes through
	 * R_PK -> R_HEADER -> R_INTERVAL -> R_PK_DONE; R_FAIL leads to the
	 * terminal R_DONE.
	 * NOTE(review): the array header line is not visible in this listing.
	 */
	[R_IDLE] = {
		.sd_name    = "idle",
		.sd_flags   = M0_SDF_INITIAL,
		.sd_allowed = M0_BITS(R_PK, R_DONE, R_FAIL)
	},
	[R_PK] = {
		.sd_name    = "datagram",
		.sd_allowed = M0_BITS(R_HEADER, R_FAIL)
	},
	[R_HEADER] = {
		.sd_name    = "header",
		/*
		 * NOTE(review): the continuation line of this M0_BITS() list
		 * is not visible in this listing — confirm against the source.
		 */
		.sd_allowed = M0_BITS(R_IDLE, R_HEADER, R_INTERVAL,
	},
	[R_INTERVAL] = {
		.sd_name    = "interval",
		.sd_allowed = M0_BITS(R_INTERVAL, R_PK_DONE, R_FAIL)
	},
	[R_PK_DONE] = {
		.sd_name    = "datagram-done",
		.sd_allowed = M0_BITS(R_PK, R_IDLE, R_DONE, R_FAIL)
	},
	[R_FAIL] = {
		.sd_name    = "fail",
		.sd_allowed = M0_BITS(R_DONE)
	},
	[R_DONE] = {
		.sd_name    = "done",
		.sd_flags   = M0_SDF_TERMINAL
	}
};
3790 
	/*
	 * Allowed reader/writer transitions: { name, from, to }.
	 * NOTE(review): the array header line is not visible in this listing.
	 * NOTE(review): the name "pk-error" is used for two different
	 * transitions (R_PK -> R_FAIL and R_PK_DONE -> R_FAIL).
	 */
	{ "pk-start",       R_IDLE,     R_PK },
	{ "close",          R_IDLE,     R_DONE },
	{ "error",          R_IDLE,     R_FAIL },
	{ "header-start",   R_PK,       R_HEADER },
	{ "pk-error",       R_PK,       R_FAIL },
	{ "header-cont",    R_HEADER,   R_HEADER },
	{ "get-send-done",  R_HEADER,   R_PK_DONE },
	{ "get-rcvd-done",  R_HEADER,   R_IDLE },
	{ "interval-start", R_HEADER,   R_INTERVAL },
	{ "header-error",   R_HEADER,   R_FAIL },
	/* This transition is only possible for a GET command. */
	{ "cmd-done",       R_HEADER,   R_DONE },
	{ "interval-cont",  R_INTERVAL, R_INTERVAL },
	{ "interval-done",  R_INTERVAL, R_PK_DONE },
	{ "interval-error", R_INTERVAL, R_FAIL },
	{ "pk-next",        R_PK_DONE,  R_PK },
	/* This transition is not possible for a writer. */
	{ "pk-idle",        R_PK_DONE,  R_IDLE },
	{ "pk-error",       R_PK_DONE,  R_FAIL },
	{ "pk-close",       R_PK_DONE,  R_DONE },
	{ "done",           R_FAIL,     R_DONE },
};
3814 
3815 static const struct m0_sm_conf rw_conf = {
3816  .scf_name = "reader-writer",
3817  .scf_nr_states = ARRAY_SIZE(rw_conf_state),
3818  .scf_state = rw_conf_state,
3819  .scf_trans_nr = ARRAY_SIZE(rw_conf_trans),
3820  .scf_trans = rw_conf_trans
3821 };
3822 
3823 static const struct mover_op_vec stream_reader_op = {
3824  .v_name = "stream-reader",
3825  .v_op = {
3826  [R_IDLE] = { [M_READ] = { &stream_idle, true } },
3827  [R_PK] = { [M_READ] = { &stream_pk, false } },
3828  [R_HEADER] = { [M_READ] = { &stream_header, true } },
3829  [R_INTERVAL] = { [M_READ] = { &stream_interval, true } },
3830  [R_PK_DONE] = { [M_READ] = { &stream_pk_done, false } }
3831  }
3832 };
3833 
3834 static const struct mover_op_vec dgram_reader_op = {
3835  .v_name = "dgram-reader",
3836  .v_op = {
3837  [R_IDLE] = { [M_READ] = { &dgram_idle, true } },
3838  [R_PK] = { [M_READ] = { &dgram_pk, false } },
3839  [R_HEADER] = { [M_READ] = { &dgram_header, true } },
3840  [R_INTERVAL] = { [M_READ] = { &dgram_interval, false } },
3841  [R_PK_DONE] = { [M_READ] = { &dgram_pk_done, false } }
3842  }
3843 };
3844 
3845 static const struct mover_op_vec writer_op = {
3846  .v_name = "writer",
3847  .v_done = &writer_done,
3848  .v_error = &writer_error,
3849  .v_op = {
3850  [R_IDLE] = { [M_WRITE] = { &writer_idle, true } },
3851  [R_PK] = { [M_WRITE] = { &writer_pk, false } },
3852  [R_HEADER] = { [M_WRITE] = { &writer_write /* sic. */, true } },
3853  [R_INTERVAL] = { [M_WRITE] = { &writer_write /* sic. */, true } },
3854  [R_PK_DONE] = { [M_WRITE] = { &writer_pk_done, false } }
3855  }
3856 };
3857 
3858 static const struct mover_op_vec get_op = {
3859  .v_name = "get",
3860  .v_done = &get_done,
3861  .v_error = &writer_error,
3862  .v_op = {
3863  [R_IDLE] = { [M_WRITE] = { &get_idle, true } },
3864  [R_PK] = { [M_WRITE] = { &get_pk, false } },
3865  [R_HEADER] = { [M_WRITE] = { &writer_write, true } },
3866  [R_PK_DONE] = { [M_WRITE] = { &writer_pk_done, false } }
3867  }
3868 };
3869 
enum {
	/* NOTE(review): the enumerators are not visible in this listing. */
};
3875 
/**
 * Format tag of PUT (data-carrying) packets. pk_header_done() compares the
 * unpacked packet header with it.
 * NOTE(review): one initialiser line is not visible in this listing.
 */
static const struct m0_format_tag put_tag = {
	.ot_type          = M0_NET_SOCK_PROTO_PUT,
	.ot_footer_offset = offsetof(struct packet, p_footer)
};
3881 
/**
 * Format tag of GET command packets. pk_header_done() compares the unpacked
 * packet header with it.
 * NOTE(review): one initialiser line is not visible in this listing.
 */
static const struct m0_format_tag get_tag = {
	.ot_type          = M0_NET_SOCK_PROTO_GET,
	.ot_footer_offset = offsetof(struct packet, p_footer)
};
3887 
3888 static const struct m0_net_xprt_ops xprt_ops = {
3889  .xo_dom_init = &dom_init,
3890  .xo_dom_fini = &dom_fini,
3891  .xo_tm_init = &ma_init,
3892  .xo_tm_confine = &ma_confine,
3893  .xo_tm_start = &ma_start,
3894  .xo_tm_stop = &ma_stop,
3895  .xo_tm_fini = &ma_fini,
3896  .xo_end_point_create = &end_point_create,
3897  .xo_buf_register = &buf_register,
3898  .xo_buf_deregister = &buf_deregister,
3899  .xo_buf_add = &buf_add,
3900  .xo_buf_del = &buf_del,
3901  .xo_bev_deliver_sync = &bev_deliver_sync,
3902  .xo_bev_deliver_all = &bev_deliver_all,
3903  .xo_bev_pending = &bev_pending,
3904  .xo_bev_notify = &bev_notify,
3905  .xo_get_max_buffer_size = &get_max_buffer_size,
3906  .xo_get_max_buffer_segment_size = &get_max_buffer_segment_size,
3907  .xo_get_max_buffer_segments = &get_max_buffer_segments,
3908  .xo_get_max_buffer_desc_size = &get_max_buffer_desc_size,
3909 
3910  .xo_rpc_max_seg_size = &get_rpc_max_seg_size,
3911  .xo_rpc_max_segs_nr = &get_rpc_max_segs_nr,
3912  .xo_rpc_max_msg_size = default_xo_rpc_max_msg_size,
3913  .xo_rpc_max_recv_msgs = default_xo_rpc_max_recv_msgs,
3914 
3915 };
3916 
/*
 * If MOCK_LNET is false, m0_net_lnet_xprt is only declared here: it is
 * defined by the real LNet transport (lnet_xo.c). Otherwise this file defines
 * it, named "lnet" but using the sock transport's operations.
 */
#if !MOCK_LNET
extern const struct m0_net_xprt m0_net_lnet_xprt;
#else
const struct m0_net_xprt m0_net_lnet_xprt = {
	.nx_name = "lnet",
	.nx_ops  = &xprt_ops
};
M0_EXPORTED(m0_net_lnet_xprt);
#endif
3926 
	/*
	 * Name and operation vector of the "sock" transport.
	 * NOTE(review): the definition header line is not visible in this
	 * listing.
	 */
	.nx_name = "sock",
	.nx_ops  = &xprt_ops
};
3931 M0_EXPORTED(m0_net_sock_xprt);
3932 
/**
 * Module initialisation of the sock transport.
 *
 * Branches on MOCK_LNET and on the M0_DEFAULT_NETWORK build setting ("LNET"
 * or "SOCK"), then ignores SIGPIPE for the whole process.
 * Returns 0 on success or -errno if sigaction() fails.
 *
 * NOTE(review): the bodies of the MOCK_LNET branches and of both inner ifs
 * are not visible in this listing — confirm against the source.
 */
M0_INTERNAL int m0_net_sock_mod_init(void)
{
	int result;

	if (MOCK_LNET) {
		if (m0_streq(M0_DEFAULT_NETWORK, "LNET"))
	} else {
		if (m0_streq(M0_DEFAULT_NETWORK, "SOCK"))
	}
	/*
	 * Ignore SIGPIPE that a write to socket gets when RST is received.
	 *
	 * A more elegant approach is to use sendmsg(2) with MSG_NOSIGNAL flag
	 * instead of writev(2).
	 */
	result = sigaction(SIGPIPE,
			   &(struct sigaction){ .sa_handler = SIG_IGN }, NULL);
	return result != 0 ? M0_ERR(-errno) : 0;
}
3956 
/**
 * Module finalisation of the sock transport; branches on MOCK_LNET.
 *
 * NOTE(review): both branch bodies are not visible in this listing;
 * presumably they undo what m0_net_sock_mod_init() registered — confirm
 * against the source.
 */
M0_INTERNAL void m0_net_sock_mod_fini(void)
{
	if (MOCK_LNET)
	else
}
3964 
3965 M0_INTERNAL void mover__print(const struct mover *m)
3966 {
3967  printf("\t%p: %s sock: %p state: %i buf: %p\n", m,
3968  mover_is_reader(m) ? "R" : (m->m_op == &writer_op ? "W" : "G"),
3969  m->m_sock, m->m_sm.sm_state, m->m_buf);
3970 }
3971 
3972 M0_INTERNAL void addr__print(const struct addr *addr)
3973 {
3974  char *s = addr_print(addr);
3975  printf("\t%s\n", s);
3976  m0_free(s);
3977 }
3978 
/**
 * Debugging helper: prints the descriptor, flags and state of @sock.
 *
 * NOTE(review): the printf() argument line is not visible in this listing —
 * confirm against the source.
 */
M0_INTERNAL void sock__print(const struct sock *sock)
{
	printf("\t\tfd: %i, flags: %" PRIx64 ", state: %i\n",
}
3984 
3985 M0_INTERNAL void ep__print(const struct ep *ep)
3986 {
3987  struct sock *s;
3988  struct mover *w;
3989  if (ep == NULL)
3990  printf("NULL ep\n");
3991  else {
3992  printf("\t%p: ", ep);
3993  addr__print(&ep->e_a);
3994  m0_tl_for(s, &ep->e_sock, s) {
3995  sock__print(s);
3996  } m0_tl_endfor;
3997  m0_tl_for(m, &ep->e_writer, w) {
3998  mover__print(w);
3999  } m0_tl_endfor;
4000  }
4001 }
4002 
/**
 * Debugging helper: prints a buffer and then its b_other end-point via
 * ep__print(). The buffer line shows the buffer's cookie, the first word
 * of its b_done bitmap (0 if the bitmap has no words) and its peer.
 *
 * NOTE(review): the remaining printf() argument lines (presumably the peer
 * cookie fields) are not visible in this listing — confirm against the
 * source.
 */
M0_INTERNAL void buf__print(const struct buf *buf)
{
	printf("\t%p: %" PRIx64 " bitmap: %"PRIx64
	       " peer: %" PRIx64 ":%" PRIx64 "\n", buf,
	       buf->b_cookie,
	       buf->b_done.b_words != NULL ? buf->b_done.b_words[0] : 0,
	ep__print(buf->b_other);
}
4013 
/**
 * Debugging helper: prints transfer machine @ma.
 *
 * Prints its state, every end-point on ntm_end_points (via ep__print()) and,
 * for each of the ntm_q queues, a separator followed by the queued buffers.
 *
 * NOTE(review): the statement inside the per-buffer loop is not visible in
 * this listing — confirm against the source.
 */
M0_INTERNAL void ma__print(const struct ma *ma)
{
	struct m0_net_end_point   *ne;
	struct m0_net_buffer      *nb;
	struct m0_net_transfer_mc *tm = ma->t_ma;
	int                        i;

	printf("%p, state: %x\n", ma, ma->t_ma->ntm_state);
	m0_tl_for(m0_nep, &ma->t_ma->ntm_end_points, ne) {
		ep__print(ep_net(ne));
	} m0_tl_endfor;
	for (i = 0; i < ARRAY_SIZE(tm->ntm_q); ++i) {
		printf("\t ---%i[]---\n", i);
		m0_tl_for(m0_net_tm, &tm->ntm_q[i], nb) {
		} m0_tl_endfor;
	}
}
4032 #undef M0_TRACE_SUBSYSTEM
4033 
4036 /*
4037  * Local variables:
4038  * c-indentation-style: "K&R"
4039  * c-basic-offset: 8
4040  * tab-width: 8
4041  * fill-column: 80
4042  * scroll-step: 1
4043  * End:
4044  */
4045 /*
4046  * vim: tabstop=8 shiftwidth=8 noexpandtab textwidth=80 nowrap
4047  */
Definition: sock.c:685
static void buf_complete(struct buf *buf)
Definition: sock.c:2925
#define M0_BUFVEC_INIT_BUF(addr_ptr, count_ptr)
Definition: vec.h:165
m0_bcount_t p_size
Definition: xcode.h:110
M0_INTERNAL int m0_xcode_encdec(struct m0_xcode_obj *obj, struct m0_bufvec_cursor *cur, enum m0_xcode_what what)
Definition: xcode.c:416
struct m0_tlink m_linkage
Definition: sock.c:865
struct mover b_writer
Definition: sock.c:894
M0_INTERNAL int m0_mutex_trylock(struct m0_mutex *mutex)
Definition: mutex.c:84
static void stream_error(struct mover *m, struct sock *s)
Definition: sock.c:3510
static int dgram_header(struct mover *self, struct sock *s)
Definition: sock.c:3552
static void ma_fini(struct m0_net_transfer_mc *ma)
Definition: sock.c:1530
static struct m0_addb2_philter p
Definition: consumer.c:40
static size_t nr
Definition: dump.c:1505
static void ip6_decode(struct addr *a, const struct sockaddr *sa)
Definition: sock.c:2738
static bool buf_invariant(const struct buf *buf)
Definition: sock.c:1260
static struct buf * ma_recv_buf(struct ma *ma, m0_bcount_t len)
Definition: sock.c:1703
struct m0_net_transfer_mc * nb_tm
Definition: net.h:1357
#define M0_PRE(cond)
Definition: sock.c:699
static char autotm[1024]
Definition: sock.c:2578
m0_bcount_t m_nob
Definition: sock.c:876
M0_INTERNAL int m0_bitmap_init(struct m0_bitmap *map, size_t nr)
Definition: bitmap.c:86
#define B_P(b)
Definition: sock.c:989
static bool addr_eq(const struct addr *a0, const struct addr *a1)
Definition: sock.c:2806
M0_INTERNAL void m0_mutex_unlock(struct m0_mutex *mutex)
Definition: mutex.c:66
static m0_bcount_t get_max_buffer_segment_size(const struct m0_net_domain *)
Definition: sock.c:1905
static void sock_done(struct sock *s, bool balance)
Definition: sock.c:2035
#define m0_strdup(s)
Definition: string.h:43
static void ep_del(struct mover *w)
Definition: sock.c:2431
int(* o_op)(struct mover *self, struct sock *s)
Definition: sock.c:808
char v_data[WADDR_LEN]
Definition: xcode.h:69
M0_INTERNAL void m0_format_header_pack(struct m0_format_header *dest, const struct m0_format_tag *src)
Definition: format.c:40
struct m0_format_footer p_footer
Definition: xcode.h:86
static int writer_idle(struct mover *self, struct sock *s)
Definition: sock.c:3612
M0_INTERNAL void m0_format_header_unpack(struct m0_format_tag *dest, const struct m0_format_header *src)
Definition: format.c:49
int const char const void size_t int flags
Definition: dir.c:328
static void sock_close(struct sock *s)
Definition: sock.c:1989
static struct m0_semaphore q
Definition: rwlock.c:55
static bool ma_invariant(const struct ma *ma)
Definition: sock.c:1216
#define NULL
Definition: misc.h:38
m0_bindex_t nb_offset
Definition: net.h:1344
static void poller(struct ma *ma)
Definition: sock.c:1360
M0_INTERNAL void m0_bitmap_fini(struct m0_bitmap *map)
Definition: bitmap.c:97
static void ip4_decode(struct addr *a, const struct sockaddr *sa)
Definition: sock.c:2715
static int sock_init(int fd, struct ep *src, struct ep *tgt, uint32_t flags)
Definition: sock.c:2079
static struct m0_addb2_mach * m
Definition: consumer.c:38
static int pk_iov_prep(struct mover *m, struct iovec *iv, int nr, struct m0_bufvec *bv, m0_bcount_t size, int *count)
Definition: sock.c:3177
uint64_t m_magix
Definition: sock.c:847
static struct buffer * cur(struct m0_addb2_mach *mach, m0_bcount_t space)
Definition: addb2.c:791
static const struct m0_sm_conf sock_conf
Definition: sock.c:1147
struct m0_bufvec nb_buffer
Definition: net.h:1322
static int dgram_idle(struct mover *self, struct sock *s)
Definition: sock.c:3523
#define ergo(a, b)
Definition: misc.h:293
M0_INTERNAL void m0_net__tm_cancel(struct m0_net_transfer_mc *tm)
Definition: tm.c:145
int m0_thread_join(struct m0_thread *q)
Definition: kthread.c:169
uint32_t nbd_len
Definition: net_otw_types.h:37
static struct ma * ep_ma(struct ep *ep)
Definition: sock.c:2387
M0_INTERNAL int m0_net_sock_mod_init(void)
Definition: sock.c:3933
Definition: sock.c:846
Definition: sm.h:350
uint64_t b_cookie
Definition: sock.c:890
static FILE * f
Definition: adieu.c:79
static uint64_t tag(uint8_t code, uint64_t id)
Definition: addb2.c:1047
static int stream_pk_done(struct mover *self, struct sock *s)
Definition: sock.c:3492
struct m0_thread t_poller
Definition: sock.c:793
uint64_t m0_time_t
Definition: time.h:37
static m0_bcount_t pk_tsize(const struct mover *m)
Definition: sock.c:3132
#define M0_LOG(level,...)
Definition: trace.h:167
M0_LEAVE()
M0_INTERNAL void m0_net_buffer_pool_unlock(struct m0_net_buffer_pool *pool)
Definition: buffer_pool.c:203
static void ma_lock(struct ma *ma)
Definition: sock.c:1342
uint32_t a_socktype
Definition: xcode.h:75
struct m0_sm_group ntm_group
Definition: net.h:821
static int stream_idle(struct mover *self, struct sock *s)
Definition: sock.c:3447
static void mover_init(struct mover *m, struct ma *ma, const struct mover_op_vec *vop)
Definition: sock.c:2999
uint8_t * nbd_data
Definition: net_otw_types.h:38
m0_bindex_t b_length
Definition: sock.c:909
struct m0_vec ov_vec
Definition: vec.h:147
static void ma__fini(struct ma *ma)
Definition: sock.c:1484
enum m0_net_tm_state ntm_state
Definition: net.h:819
Definition: sock.c:772
m0_bcount_t nb_length
Definition: net.h:1334
uint64_t nb_flags
Definition: net.h:1489
static int ma_confine(struct m0_net_transfer_mc *ma, const struct m0_bitmap *processors)
Definition: sock.c:1619
char m_pkbuf[sizeof(struct packet)]
Definition: sock.c:871
const char * v_name
Definition: sock.c:827
M0_INTERNAL void m0_net_xprt_default_set(const struct m0_net_xprt *xprt)
Definition: net.c:143
M0_INTERNAL bool m0_net__ep_invariant(struct m0_net_end_point *ep, struct m0_net_transfer_mc *tm, bool under_tm_mutex)
Definition: ep.c:42
static const struct m0_format_tag get_tag
Definition: sock.c:1159
static int ep_create(struct ma *ma, struct addr *addr, const char *name, struct ep **out)
Definition: sock.c:2333
#define SOCK_P(s)
Definition: sock.c:984
uint64_t m0_bindex_t
Definition: types.h:80
#define m0_exists(var, nr,...)
Definition: misc.h:134
static int ep_add(struct ep *ep, struct mover *w)
Definition: sock.c:2414
struct sock * m_sock
Definition: sock.c:859
static bool addr_invariant(const struct addr *addr)
Definition: sock.c:1275
#define M0_BITS(...)
Definition: misc.h:236
M0_INTERNAL void * m0_bufvec_cursor_addr(struct m0_bufvec_cursor *cur)
Definition: vec.c:597
uint64_t m0_bcount_t
Definition: types.h:77
struct m0_tlink s_linkage
Definition: sock.c:925
#define M0_THREAD_INIT(thread, TYPE, init, func, arg, namefmt,...)
Definition: thread.h:139
static void pk_decode(struct mover *m)
Definition: sock.c:3431
#define SOCK_F
Definition: sock.c:983
struct m0_tlink b_linkage
Definition: sock.c:902
static int void * buf
Definition: dir.c:1019
struct m0_cookie bd_cookie
Definition: xcode.h:91
struct m0_format_header p_header
Definition: xcode.h:97
const char * nep_addr
Definition: net.h:503
m0_bindex_t nbe_offset
Definition: net.h:1238
#define container_of(ptr, type, member)
Definition: misc.h:33
m0_xcode_what
Definition: xcode.h:647
#define M0_SET0(obj)
Definition: misc.h:64
M0_INTERNAL void ep__print(const struct ep *ep)
Definition: sock.c:3985
M0_INTERNAL void m0_mutex_lock(struct m0_mutex *mutex)
Definition: mutex.c:49
#define EP_PUT(e, f)
Definition: sock.c:1209
m0_bindex_t b_offset
Definition: sock.c:904
int e_r_mover
Definition: sock.c:764
M0_INTERNAL void m0_net_sock_mod_fini(void)
Definition: sock.c:3957
m0_bcount_t nbe_length
Definition: net.h:1226
M0_INTERNAL int m0_data_to_bufvec_copy(struct m0_bufvec_cursor *cur, void *data, size_t len)
Definition: vec.c:946
struct m0_net_buffer * nbe_buffer
Definition: net.h:1194
static void ma_unlock(struct ma *ma)
Definition: sock.c:1347
struct bdesc b_peer
Definition: sock.c:898
static int pk_io(struct mover *w, struct sock *s, uint64_t flag, struct m0_bufvec *bv, m0_bcount_t size)
Definition: sock.c:3222
static int writer_pk_done(struct mover *self, struct sock *s)
Definition: sock.c:3658
static int writer_pk(struct mover *self, struct sock *s)
Definition: sock.c:3629
M0_INTERNAL void m0_net_buffer_event_post(const struct m0_net_buffer_event *ev)
Definition: buf.c:314
static void pk_encdec(struct mover *m, enum m0_xcode_what what)
Definition: sock.c:3419
struct m0_net_end_point * nbe_ep
Definition: net.h:1251
uint64_t s_flags
Definition: sock.c:918
#define PRIx64
Definition: types.h:61
bool t_shutdown
Definition: sock.c:796
struct packet m_pk
Definition: sock.c:869
Definition: sock.c:887
static m0_bcount_t count
Definition: xcode.c:167
M0_INTERNAL uint32_t default_xo_rpc_max_segs_nr(struct m0_net_domain *ndom)
Definition: net.c:247
struct m0_tl ntm_end_points
Definition: net.h:856
m0_bindex_t p_offset
Definition: xcode.h:112
#define m0_tl_endfor
Definition: tlist.h:700
uint32_t p_idx
Definition: xcode.h:103
struct m0_bitmap b_done
Definition: sock.c:896
return M0_RC(rc)
op
Definition: libdemo.c:64
Definition: sock.c:754
static bool mover_is_writer(const struct mover *m)
Definition: sock.c:3115
struct m0_chan ntm_chan
Definition: net.h:874
Definition: sock.c:703
static bool mover_invariant(const struct mover *m)
Definition: sock.c:1320
#define M0_ENTRY(...)
Definition: trace.h:170
static int writer_write(struct mover *self, struct sock *s)
Definition: sock.c:3646
static bool sock_invariant(const struct sock *s)
Definition: sock.c:1250
m0_bcount_t p_totalsize
Definition: xcode.h:114
static int ep_balance(struct ep *ep)
Definition: sock.c:2456
#define EP_GET(e, f)
Definition: sock.c:1207
M0_INTERNAL int m0_net__tm_provision_buf(struct m0_net_transfer_mc *tm)
Definition: tm_provision.c:414
static m0_bcount_t stream_pk_size(const struct mover *w, const struct sock *s)
Definition: sock.c:3504
M0_INTERNAL bool m0_bufvec_cursor_move(struct m0_bufvec_cursor *cur, m0_bcount_t count)
Definition: vec.c:574
static void buf_deregister(struct m0_net_buffer *nb)
Definition: sock.c:1770
static char * addr
Definition: node_k.c:37
static int pk_state(const struct mover *w)
Definition: sock.c:3151
int i
Definition: dir.c:1033
static void ip4_encode(const struct addr *a, struct sockaddr *sa)
Definition: sock.c:2702
static void dom_fini(struct m0_net_domain *dom)
Definition: sock.c:1336
uint16_t ot_version
Definition: format.h:63
Definition: sock.c:709
Definition: sock.c:746
M0_INTERNAL void m0_net_xprt_register(const struct m0_net_xprt *xprt)
Definition: net.c:182
bool o_doesio
Definition: sock.c:815
static int64_t max64(int64_t a, int64_t b)
Definition: arith.h:51
int32_t nbe_status
Definition: net.h:1218
const char * f_name
Definition: sock.c:957
struct addr bd_addr
Definition: xcode.h:90
M0_INTERNAL void m0_ref_put(struct m0_ref *ref)
Definition: refs.c:38
#define M0_ERR_INFO(rc, fmt,...)
Definition: trace.h:215
static struct ep * ep_net(struct m0_net_end_point *net)
Definition: sock.c:2393
void m0_ref_init(struct m0_ref *ref, int init_num, void(*release)(struct m0_ref *ref))
Definition: refs.c:24
return M0_ERR(-EOPNOTSUPP)
static const struct m0_net_xprt_ops xprt_ops
Definition: sock.c:3888
static void buf_del(struct m0_net_buffer *nb)
Definition: sock.c:1853
M0_INTERNAL void m0_ref_get(struct m0_ref *ref)
Definition: refs.c:32
int s_fd
Definition: sock.c:916
void * ntm_xprt_private
Definition: net.h:886
static const struct m0_format_tag put_tag
Definition: sock.c:1158
static void ep_free(struct ep *ep)
Definition: sock.c:2493
static bool ma_is_locked(const struct ma *ma)
Definition: sock.c:1352
static bool sock_event(struct sock *s, uint32_t ev)
Definition: sock.c:2201
const char * name
Definition: trace.c:110
struct buf * m_buf
Definition: sock.c:867
Definition: refs.h:34
static const struct socktype stype[]
Definition: sock.c:1156
enum m0_net_queue_type nb_qtype
Definition: net.h:1363
Definition: sock.c:741
static struct ma * buf_ma(struct buf *buf)
Definition: sock.c:2823
#define EP_PL(e)
Definition: sock.c:981
m0_time_t s_last
Definition: sock.c:927
M0_INTERNAL m0_bcount_t m0_bufvec_cursor_step(const struct m0_bufvec_cursor *cur)
Definition: vec.c:581
#define m0_free0(pptr)
Definition: memory.h:77
mover_opcode
Definition: sock.c:698
#define M0_ASSERT(cond)
m0_time_t nb_timeout
Definition: net.h:1387
const char * scf_name
Definition: sm.h:352
static struct m0_sm_state_descr sock_conf_state[]
Definition: sock.c:3714
M0_INTERNAL bool m0_mutex_is_locked(const struct m0_mutex *mutex)
Definition: mutex.c:95
static int stream_header(struct mover *self, struct sock *s)
Definition: sock.c:3466
static int ma_stop(struct m0_net_transfer_mc *ma, bool cancel)
Definition: sock.c:1603
Definition: sock.c:742
m0_time_t m0_time_now(void)
Definition: time.c:134
void m0_sm_state_set(struct m0_sm *mach, int state)
Definition: sm.c:478
struct m0_tl t_deathrow
Definition: sock.c:798
#define M0_DEFAULT_NETWORK
Definition: config.h:281
Definition: xcode.h:89
Definition: tlist.h:251
static int addr_parse_lnet(struct addr *addr, const char *name)
Definition: sock.c:2580
Definition: xcode.h:95
static void addr_encode(const struct addr *addr, struct sockaddr *sa)
Definition: sock.c:2797
m0_net_tm_state
Definition: net.h:630
static int end_point_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *ma, const char *name)
Definition: sock.c:1728
#define m0_streq(a, b)
Definition: string.h:34
M0_INTERNAL void mover__print(const struct mover *m)
Definition: sock.c:3965
static const struct pfamily pf[]
Definition: sock.c:1155
static const struct m0_sm_conf rw_conf
Definition: sock.c:1148
static void pk_encode(struct mover *m)
Definition: sock.c:3436
void(* f_decode)(struct addr *a, const struct sockaddr *sa)
Definition: sock.c:969
void m0_thread_fini(struct m0_thread *q)
Definition: thread.c:92
#define MOCK_LNET
Definition: sock.c:647
static int buf_add(struct m0_net_buffer *nb)
Definition: sock.c:1787
static struct m0_stob_domain * dom
Definition: storage.c:38
struct m0_net_buffer * b_buf
Definition: sock.c:892
Definition: sock.c:806
const struct m0_net_xprt m0_net_sock_xprt
Definition: sock.c:3927
M0_INTERNAL void m0_bufvec_cursor_init(struct m0_bufvec_cursor *cur, const struct m0_bufvec *bvec)
Definition: vec.c:563
int t_epollfd
Definition: sock.c:795
M0_INTERNAL int m0_xcode_obj_enc_to_buf(struct m0_xcode_obj *obj, void **buf, m0_bcount_t *len)
Definition: xcode.c:832
static bool ep_invariant(const struct ep *ep)
Definition: sock.c:1286
M0_INTERNAL int m0_bitmap_ffz(const struct m0_bitmap *map)
Definition: bitmap.c:126
int e_r_buf
Definition: sock.c:766
uint32_t ntm_callback_counter
Definition: net.h:850
M0_INTERNAL m0_bcount_t default_xo_rpc_max_seg_size(struct m0_net_domain *ndom)
Definition: net.c:239
void * m_scratch
Definition: sock.c:881
static void mover_fini(struct mover *m)
Definition: sock.c:3010
void(* v_done)(struct mover *self, struct sock *s)
Definition: sock.c:837
static int addr_parse_native(struct addr *addr, const char *name)
Definition: sock.c:2643
static struct mover * sock_writer(struct sock *s)
Definition: sock.c:1999
uint32_t p_nr
Definition: xcode.h:108
void * m0_alloc(size_t size)
Definition: memory.c:126
M0_INTERNAL void m0_net_tm_event_post(const struct m0_net_tm_event *ev)
Definition: tm.c:84
static m0_bcount_t get_max_buffer_size(const struct m0_net_domain *dom)
Definition: sock.c:1889
int st_proto
Definition: sock.c:948
static void get_done(struct mover *w, struct sock *s)
Definition: sock.c:3709
static int buf_register(struct m0_net_buffer *nb)
Definition: sock.c:1749
static void sock_fini(struct sock *s)
Definition: sock.c:2011
m0_bcount_t(* st_pk_size)(const struct mover *w, const struct sock *s)
Definition: sock.c:940
const struct mover_op_vec * st_reader
Definition: sock.c:938
#define M0_POST(cond)
Definition: xcode.h:73
static int sock_in(struct sock *s)
Definition: sock.c:1959
struct ep * m_ep
Definition: sock.c:861
static void bev_deliver_all(struct m0_net_transfer_mc *ma)
Definition: sock.c:1869
M0_INTERNAL void m0_bitmap_set(struct m0_bitmap *map, size_t idx, bool val)
Definition: bitmap.c:139
static int ma_start(struct m0_net_transfer_mc *ma, const char *name)
Definition: sock.c:1551
int32_t sm_rc
Definition: sm.h:336
static m0_bcount_t pk_dnob(const struct mover *m)
Definition: sock.c:3140
Definition: sock.c:750
Definition: chan.h:229
const struct m0_net_xprt m0_net_lnet_xprt
Definition: lnet_xo.c:679
struct m0_net_transfer_mc * nep_tm
Definition: net.h:493
static void ma_buf_done(struct ma *ma)
Definition: sock.c:1683
static void ep_get(struct ep *ep)
Definition: sock.c:2507
static uint64_t min64u(uint64_t a, uint64_t b)
Definition: arith.h:66
uint32_t a_family
Definition: xcode.h:74
static int addr_resolve(struct addr *addr, const char *name)
Definition: sock.c:2518
struct ep * s_ep
Definition: sock.c:921
static int bev_deliver_sync(struct m0_net_transfer_mc *ma)
Definition: sock.c:1864
struct m0_tl ntm_q[M0_NET_QT_NR]
Definition: net.h:877
M0_INTERNAL int m0_format_footer_verify(const void *buffer, bool iem)
Definition: format.c:149
static void buf_done(struct buf *buf, int rc)
Definition: sock.c:2900
M0_INTERNAL m0_bcount_t m0_vec_count(const struct m0_vec *vec)
Definition: vec.c:53
static void sock_out(struct sock *s)
Definition: sock.c:1968
static char * addr_print(const struct addr *addr)
Definition: sock.c:2748
static void ma_buf_timeout(struct ma *ma)
Definition: sock.c:1656
uint32_t a_port
Definition: xcode.h:77
static int sock_init_fd(int fd, struct sock *s, struct ep *ep, uint32_t flags)
Definition: sock.c:2154
int e_r_find
Definition: sock.c:767
struct m0_ref nep_ref
Definition: net.h:491
uint32_t a_protocol
Definition: xcode.h:76
static struct m0_pool pool
Definition: iter_ut.c:58
struct m0_tl t_done
Definition: sock.c:800
uint64_t s_magix
Definition: sock.c:914
static void writer_done(struct mover *self, struct sock *s)
Definition: sock.c:3670
#define m0_forall(var, nr,...)
Definition: misc.h:112
static m0_bcount_t dgram_pk_size(const struct mover *w, const struct sock *s)
Definition: sock.c:3598
static void bev_notify(struct m0_net_transfer_mc *ma, struct m0_chan *chan)
Definition: sock.c:1878
static bool at(struct ff2c_context *ctx, char c)
Definition: lex.c:77
void(* v_error)(struct mover *self, struct sock *s, int rc)
Definition: sock.c:833
int e_r_sock
Definition: sock.c:765
struct m0_pdclust_tgt_addr tgt
Definition: fd.c:110
static int dgram_interval(struct mover *self, struct sock *s)
Definition: sock.c:3579
static int get_idle(struct mover *self, struct sock *s)
Definition: sock.c:3689
const char * sd_name
Definition: sm.h:383
Definition: sock.c:701
static int ma_init(struct m0_net_transfer_mc *ma)
Definition: sock.c:1445
uint64_t * b_words
Definition: bitmap.h:46
Definition: sock.c:700
char * ep
Definition: sw.h:132
#define EP_FL
Definition: sock.c:980
#define M0_CNT_INC(cnt)
Definition: arith.h:226
static void ep_put(struct ep *ep)
Definition: sock.c:2502
static const struct mover_op_vec dgram_reader_op
Definition: sock.c:1151
struct mover s_reader
Definition: sock.c:923
M0_INTERNAL bool m0_net__buffer_invariant(const struct m0_net_buffer *buf)
Definition: buf.c:46
static struct m0_chan chan[RDWR_REQUEST_MAX]
Definition: queue.c:27
struct m0_net_end_point * ntm_ep
Definition: net.h:868
static void ep_release(struct m0_ref *ref)
Definition: sock.c:2404
M0_INTERNAL void m0_format_footer_update(const void *buffer)
Definition: format.c:95
M0_INTERNAL void m0_sm_init(struct m0_sm *mach, const struct m0_sm_conf *conf, uint32_t state, struct m0_sm_group *grp)
Definition: sm.c:313
#define M0_IS0(obj)
Definition: misc.h:70
Definition: sock.c:743
void(* t_func)(void *)
Definition: thread.h:114
static int dom_init(const struct m0_net_xprt *xprt, struct m0_net_domain *dom)
Definition: sock.c:1329
#define M0_ALLOC_PTR(ptr)
Definition: memory.h:86
Definition: sock.c:666
M0_INTERNAL bool m0_net__tm_invariant(const struct m0_net_transfer_mc *tm)
Definition: tm.c:67
#define PRIi64
Definition: types.h:59
static int64_t min64(int64_t a, int64_t b)
Definition: arith.h:46
Definition: sock.c:956
M0_INTERNAL int m0_xcode_obj_dec_from_buf(struct m0_xcode_obj *obj, void *buf, m0_bcount_t len)
Definition: xcode.c:850
M0_INTERNAL struct m0_thread * m0_thread_self(void)
Definition: thread.c:122
static m0_bcount_t pk_size(const struct mover *m, const struct sock *s)
Definition: sock.c:3126
static bool flag
Definition: nucleus.c:266
struct m0_tl e_sock
Definition: sock.c:760
M0_INTERNAL void ma__print(const struct ma *ma)
Definition: sock.c:4014
static int bdesc_decode(const struct m0_net_buf_desc *nbd, struct bdesc *out)
Definition: sock.c:2993
Definition: sm.h:301
static m0_bcount_t get_max_buffer_desc_size(const struct m0_net_domain *)
Definition: sock.c:1937
struct m0_tl e_writer
Definition: sock.c:762
struct mover_op v_op[STATE_NR][M_NR]
Definition: sock.c:828
sock_flags
Definition: sock.c:707
M0_INTERNAL bool m0_bitmap_get(const struct m0_bitmap *map, size_t idx)
Definition: bitmap.c:105
m0_bcount_t size
Definition: di.c:39
#define _0C(exp)
Definition: assert.h:311
struct addr e_a
Definition: sock.c:758
static int get_pk(struct mover *self, struct sock *s)
Definition: sock.c:3695
M0_INTERNAL void addr__print(const struct addr *addr)
Definition: sock.c:3972
M0_TL_DESCR_DEFINE(s, "sockets", static, struct sock, s_linkage, s_magix, M0_NET_SOCK_SOCK_MAGIC, M0_NET_SOCK_SOCK_HEAD_MAGIC)
static struct m0_sm_trans_descr rw_conf_trans[]
Definition: sock.c:3791
struct ep * b_other
Definition: sock.c:900
static struct ep * ma_src(struct ma *ma)
Definition: sock.c:1716
Definition: sock.c:747
sock_state
Definition: sock.c:662
const char * nx_name
Definition: net.h:125
#define IS_IN_ARRAY(idx, array)
Definition: misc.h:311
static void ma_event_post(struct ma *ma, enum m0_net_tm_state state)
Definition: sock.c:1628
static int addr_parse(struct addr *addr, const char *name)
Definition: sock.c:2554
static struct m0_sm_trans_descr sock_conf_trans[]
Definition: sock.c:3739
struct m0t1fs_filedata * fd
Definition: dir.c:1030
static bool mover_is_reader(const struct mover *m)
Definition: sock.c:3110
M0_INTERNAL void m0_net_xprt_deregister(const struct m0_net_xprt *xprt)
Definition: net.c:197
static int sock_ctl(struct sock *s, int op, uint32_t flags)
Definition: sock.c:2305
#define M0_CNT_DEC(cnt)
Definition: arith.h:219
#define B_F
Definition: sock.c:988
Definition: sock.c:913
int(* xo_dom_init)(const struct m0_net_xprt *xprt, struct m0_net_domain *dom)
Definition: net.h:139
#define M0_XCODE_OBJ(type, ptr)
Definition: xcode.h:962
struct m0_sm s_sm
Definition: sock.c:919
static uint32_t get_rpc_max_segs_nr(struct m0_net_domain *ndom)
Definition: sock.c:1950
struct m0_sm m_sm
Definition: sock.c:862
static struct m0_addb2_net * net
Definition: net.c:27
static bool bev_pending(struct m0_net_transfer_mc *ma)
Definition: sock.c:1873
struct m0_net_buf_desc nb_desc
Definition: net.h:1412
uint64_t b_magix
Definition: sock.c:888
static bool ep_eq(const struct ep *ep, const struct addr *addr)
Definition: sock.c:2816
static int ep_find(struct ma *ma, const char *name, struct ep **out)
Definition: sock.c:2379
static m0_bcount_t get_rpc_max_seg_size(struct m0_net_domain *ndom)
Definition: sock.c:1942
#define TLOG(...)
Definition: sock.c:991
static int mover_op(struct mover *m, struct sock *s, int op)
Definition: sock.c:3056
static void buf_fini(struct buf *buf)
Definition: sock.c:2883
static void writer_error(struct mover *w, struct sock *s, int rc)
Definition: sock.c:3682
#define out(...)
Definition: gen.c:41
static const struct mover_op_vec writer_op
Definition: sock.c:1152
size_t b_nr
Definition: bitmap.h:44
struct m0_net_xprt * xprt
Definition: module.c:61
static int pk_header_done(struct mover *m)
Definition: sock.c:3283
const char * st_name
Definition: sock.c:936
void(* st_error)(struct mover *m, struct sock *s)
Definition: sock.c:946
M0_INTERNAL void buf__print(const struct buf *buf)
Definition: sock.c:4003
static void ma_prune(struct ma *ma)
Definition: sock.c:1467
static const struct mover_op_vec get_op
Definition: sock.c:1153
static void dgram_error(struct mover *m, struct sock *s)
Definition: sock.c:3607
static int stream_interval(struct mover *self, struct sock *s)
Definition: sock.c:3485
static void pk_done(struct mover *m)
Definition: sock.c:3406
#define m0_tl_find(name, var, head,...)
Definition: tlist.h:757
M0_INTERNAL void sock__print(const struct sock *sock)
Definition: sock.c:3979
static int bdesc_encode(const struct bdesc *bd, struct m0_net_buf_desc *out)
Definition: sock.c:2978
struct m0_net_transfer_mc * t_ma
Definition: sock.c:774
#define m0_tl_for(name, head, obj)
Definition: tlist.h:695
M0_INTERNAL uint32_t default_xo_rpc_max_recv_msgs(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition: net.c:267
static int buf_accept(struct buf *buf, struct mover *m)
Definition: sock.c:2835
void m0_free(void *data)
Definition: memory.c:146
static struct m0_addb2_source * s
Definition: consumer.c:39
static const struct mover_op_vec stream_reader_op
Definition: sock.c:1150
static struct m0_sm_state_descr rw_conf_state[]
Definition: sock.c:3758
Definition: sock.c:935
static int stream_pk(struct mover *self, struct sock *s)
Definition: sock.c:3455
uint32_t sm_state
Definition: sm.h:307
static int32_t get_max_buffer_segments(const struct m0_net_domain *dom)
Definition: sock.c:1921
M0_INTERNAL m0_bcount_t default_xo_rpc_max_msg_size(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition: net.c:255
void * nb_xprt_private
Definition: net.h:1461
struct m0_pdclust_src_addr src
Definition: fd.c:108
M0_INTERNAL void m0_chan_broadcast(struct m0_chan *chan)
Definition: chan.c:172
struct addrdata a_data
Definition: xcode.h:78
int32_t rc
Definition: trigger_fop.h:47
#define ARRAY_SIZE(a)
Definition: misc.h:45
#define offsetof(typ, memb)
Definition: misc.h:29
const struct m0_sm_conf * sm_conf
Definition: sm.h:320
M0_TL_DEFINE(s, static, struct sock)
#define m0_tl_exists(name, var, head,...)
Definition: tlist.h:774
static int bdesc_create(struct addr *addr, struct buf *buf, struct m0_net_buf_desc *out)
Definition: sock.c:2969
rw_state
Definition: sock.c:740
static void ip6_encode(const struct addr *a, struct sockaddr *sa)
Definition: sock.c:2724
Definition: sock.c:749
struct m0_net_end_point e_ep
Definition: sock.c:756
static void pk_header_init(struct mover *m, struct sock *s)
Definition: sock.c:3261
const struct mover_op_vec * m_op
Definition: sock.c:863
static int dgram_pk(struct mover *self, struct sock *s)
Definition: sock.c:3533
Definition: vec.h:145
struct m0_net_end_point * nb_ep
Definition: net.h:1424
#define INT32_MAX
Definition: types.h:43
#define m0_tl_forall(name, var, head,...)
Definition: tlist.h:735
static void addr_decode(struct addr *addr, const struct sockaddr *sa)
Definition: sock.c:2788
void(* f_encode)(const struct addr *a, struct sockaddr *sa)
Definition: sock.c:963
#define M0_IMPOSSIBLE(fmt,...)
static int dgram_pk_done(struct mover *self, struct sock *s)
Definition: sock.c:3587
M0_INTERNAL void m0_sm_fini(struct m0_sm *mach)
Definition: sm.c:331
struct m0_net_buffer_pool * ntm_recv_pool
Definition: net.h:896