|
#define | M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_NET |
|
#define | EP_DEBUG (1) |
|
#define | MOCK_LNET (0) |
|
#define | SAFE(val, exp, fallback) ((val) != NULL ? (exp) : (fallback)) |
|
#define | EP_F "%s" |
|
#define | EP_P(e) SAFE(e, (e)->e_ep.nep_addr, "null") |
|
#define | EP_FL EP_F "->" EP_F |
|
#define | EP_PL(e) EP_P(SAFE(e, ma_src(ep_ma(e)), NULL)), EP_P(e) |
|
#define | SOCK_F EP_FL "/%i/%" PRIx64 "/%i" |
|
#define | SOCK_P(s) |
|
#define | B_F "[[%i]@%p:"EP_FL"]" |
|
#define | B_P(b) (b)->b_buf->nb_qtype, (b), EP_PL((b)->b_other) |
|
#define | TLOG(...) do { /* M0_LOG(M0_DEBUG, __VA_ARGS__) */; } while (0) |
|
#define | EP_GET(e, f) ({ struct ep *__ep = (e); ep_get(__ep); M0_CNT_INC(__ep->e_r_ ## f); }) |
|
#define | EP_PUT(e, f) ({ struct ep *__ep = (e); ep_put(__ep); M0_CNT_DEC(__ep->e_r_ ## f); }) |
|
#define | __MOTR_NET_SOCK_XCODE_H__ |
|
|
enum | sock_state {
S_INIT,
S_LISTENING,
S_CONNECTING,
S_OPEN,
S_DELETED
} |
|
enum | mover_opcode { M_READ,
M_WRITE,
M_CLOSE,
M_NR
} |
|
enum | sock_flags { HAS_READ = M0_BITS(M_READ),
HAS_WRITE = M0_BITS(M_WRITE),
WRITE_POLL = M0_BITS(M_NR + 1)
} |
|
enum | rw_state {
R_IDLE,
R_PK,
R_HEADER,
R_INTERVAL,
R_PK_DONE,
R_FAIL,
R_DONE,
R_NR,
STATE_NR = R_NR
} |
|
enum | { M0_NET_SOCK_PROTO_VERSION = 1,
M0_NET_SOCK_PROTO_PUT = 0x1,
M0_NET_SOCK_PROTO_GET = 0x2
} |
|
enum | { WADDR_LEN = sizeof(struct in6_addr)
} |
|
|
| M0_TL_DESCR_DEFINE (s, "sockets", static, struct sock, s_linkage, s_magix, M0_NET_SOCK_SOCK_MAGIC, M0_NET_SOCK_SOCK_HEAD_MAGIC) |
|
| M0_TL_DEFINE (s, static, struct sock) |
|
| M0_TL_DESCR_DEFINE (m, "movers", static, struct mover, m_linkage, m_magix, M0_NET_SOCK_MOVER_MAGIC, M0_NET_SOCK_MOVER_HEAD_MAGIC) |
|
| M0_TL_DEFINE (m, static, struct mover) |
|
| M0_TL_DESCR_DEFINE (b, "buffers", static, struct buf, b_linkage, b_magix, M0_NET_SOCK_BUF_MAGIC, M0_NET_SOCK_BUF_HEAD_MAGIC) |
|
| M0_TL_DEFINE (b, static, struct buf) |
|
static int | dom_init (const struct m0_net_xprt *xprt, struct m0_net_domain *dom) |
|
static void | dom_fini (struct m0_net_domain *dom) |
|
static int | ma_init (struct m0_net_transfer_mc *ma) |
|
static int | ma_confine (struct m0_net_transfer_mc *ma, const struct m0_bitmap *processors) |
|
static int | ma_start (struct m0_net_transfer_mc *ma, const char *name) |
|
static int | ma_stop (struct m0_net_transfer_mc *ma, bool cancel) |
|
static void | ma_fini (struct m0_net_transfer_mc *ma) |
|
static int | end_point_create (struct m0_net_end_point **epp, struct m0_net_transfer_mc *ma, const char *name) |
|
static int | buf_register (struct m0_net_buffer *nb) |
|
static void | buf_deregister (struct m0_net_buffer *nb) |
|
static int | buf_add (struct m0_net_buffer *nb) |
|
static void | buf_del (struct m0_net_buffer *nb) |
|
static int | bev_deliver_sync (struct m0_net_transfer_mc *ma) |
|
static void | bev_deliver_all (struct m0_net_transfer_mc *ma) |
|
static bool | bev_pending (struct m0_net_transfer_mc *ma) |
|
static void | bev_notify (struct m0_net_transfer_mc *ma, struct m0_chan *chan) |
|
static m0_bcount_t | get_max_buffer_size (const struct m0_net_domain *dom) |
|
static m0_bcount_t | get_max_buffer_segment_size (const struct m0_net_domain *) |
|
static int32_t | get_max_buffer_segments (const struct m0_net_domain *dom) |
|
static m0_bcount_t | get_max_buffer_desc_size (const struct m0_net_domain *) |
|
static void | poller (struct ma *ma) |
|
static void | ma__fini (struct ma *ma) |
|
static void | ma_prune (struct ma *ma) |
|
static void | ma_lock (struct ma *ma) |
|
static void | ma_unlock (struct ma *ma) |
|
static bool | ma_is_locked (const struct ma *ma) |
|
static bool | ma_invariant (const struct ma *ma) |
|
static void | ma_event_post (struct ma *ma, enum m0_net_tm_state state) |
|
static void | ma_buf_done (struct ma *ma) |
|
static void | ma_buf_timeout (struct ma *ma) |
|
static struct buf * | ma_recv_buf (struct ma *ma, m0_bcount_t len) |
|
static struct ep * | ma_src (struct ma *ma) |
|
static int | ep_find (struct ma *ma, const char *name, struct ep **out) |
|
static int | ep_create (struct ma *ma, struct addr *addr, const char *name, struct ep **out) |
|
static void | ep_free (struct ep *ep) |
|
static void | ep_put (struct ep *ep) |
|
static void | ep_get (struct ep *ep) |
|
static bool | ep_eq (const struct ep *ep, const struct addr *addr) |
|
static struct ma * | ep_ma (struct ep *ep) |
|
static struct ep * | ep_net (struct m0_net_end_point *net) |
|
static void | ep_release (struct m0_ref *ref) |
|
static bool | ep_invariant (const struct ep *ep) |
|
static int | ep_add (struct ep *ep, struct mover *w) |
|
static void | ep_del (struct mover *w) |
|
static int | ep_balance (struct ep *ep) |
|
static int | addr_resolve (struct addr *addr, const char *name) |
|
static int | addr_parse (struct addr *addr, const char *name) |
|
static int | addr_parse_lnet (struct addr *addr, const char *name) |
|
static int | addr_parse_native (struct addr *addr, const char *name) |
|
static void | addr_decode (struct addr *addr, const struct sockaddr *sa) |
|
static void | addr_encode (const struct addr *addr, struct sockaddr *sa) |
|
static char * | addr_print (const struct addr *addr) |
|
static bool | addr_invariant (const struct addr *addr) |
|
static bool | addr_eq (const struct addr *a0, const struct addr *a1) |
|
static int | sock_in (struct sock *s) |
|
static void | sock_out (struct sock *s) |
|
static void | sock_close (struct sock *s) |
|
static void | sock_done (struct sock *s, bool balance) |
|
static void | sock_fini (struct sock *s) |
|
static bool | sock_event (struct sock *s, uint32_t ev) |
|
static int | sock_ctl (struct sock *s, int op, uint32_t flags) |
|
static int | sock_init_fd (int fd, struct sock *s, struct ep *ep, uint32_t flags) |
|
static int | sock_init (int fd, struct ep *src, struct ep *tgt, uint32_t flags) |
|
static struct mover * | sock_writer (struct sock *s) |
|
static bool | sock_invariant (const struct sock *s) |
|
static struct ma * | buf_ma (struct buf *buf) |
|
static bool | buf_invariant (const struct buf *buf) |
|
static void | buf_fini (struct buf *buf) |
|
static int | buf_accept (struct buf *buf, struct mover *m) |
|
static void | buf_done (struct buf *buf, int rc) |
|
static void | buf_complete (struct buf *buf) |
|
static int | bdesc_create (struct addr *addr, struct buf *buf, struct m0_net_buf_desc *out) |
|
static int | bdesc_encode (const struct bdesc *bd, struct m0_net_buf_desc *out) |
|
static int | bdesc_decode (const struct m0_net_buf_desc *nbd, struct bdesc *out) |
|
static void | mover_init (struct mover *m, struct ma *ma, const struct mover_op_vec *vop) |
|
static void | mover_fini (struct mover *m) |
|
static int | mover_op (struct mover *m, struct sock *s, int op) |
|
static bool | mover_is_reader (const struct mover *m) |
|
static bool | mover_is_writer (const struct mover *m) |
|
static bool | mover_invariant (const struct mover *m) |
|
static m0_bcount_t | pk_size (const struct mover *m, const struct sock *s) |
|
static m0_bcount_t | pk_tsize (const struct mover *m) |
|
static m0_bcount_t | pk_dnob (const struct mover *m) |
|
static int | pk_state (const struct mover *w) |
|
static int | pk_io (struct mover *w, struct sock *s, uint64_t flag, struct m0_bufvec *bv, m0_bcount_t size) |
|
static int | pk_iov_prep (struct mover *m, struct iovec *iv, int nr, struct m0_bufvec *bv, m0_bcount_t size, int *count) |
|
static void | pk_header_init (struct mover *m, struct sock *s) |
|
static int | pk_header_done (struct mover *m) |
|
static void | pk_done (struct mover *m) |
|
static void | pk_encdec (struct mover *m, enum m0_xcode_what what) |
|
static void | pk_decode (struct mover *m) |
|
static void | pk_encode (struct mover *m) |
|
static int | stream_idle (struct mover *self, struct sock *s) |
|
static int | stream_pk (struct mover *self, struct sock *s) |
|
static int | stream_header (struct mover *self, struct sock *s) |
|
static int | stream_interval (struct mover *self, struct sock *s) |
|
static int | stream_pk_done (struct mover *self, struct sock *s) |
|
static int | dgram_idle (struct mover *self, struct sock *s) |
|
static int | dgram_pk (struct mover *self, struct sock *s) |
|
static int | dgram_header (struct mover *self, struct sock *s) |
|
static int | dgram_interval (struct mover *self, struct sock *s) |
|
static int | dgram_pk_done (struct mover *self, struct sock *s) |
|
static int | writer_idle (struct mover *self, struct sock *s) |
|
static int | writer_pk (struct mover *self, struct sock *s) |
|
static int | writer_write (struct mover *self, struct sock *s) |
|
static int | writer_pk_done (struct mover *self, struct sock *s) |
|
static int | get_idle (struct mover *self, struct sock *s) |
|
static int | get_pk (struct mover *self, struct sock *s) |
|
static void | writer_done (struct mover *self, struct sock *s) |
|
static void | writer_error (struct mover *w, struct sock *s, int rc) |
|
static m0_bcount_t | stream_pk_size (const struct mover *w, const struct sock *s) |
|
static void | stream_error (struct mover *m, struct sock *s) |
|
static m0_bcount_t | dgram_pk_size (const struct mover *w, const struct sock *s) |
|
static void | dgram_error (struct mover *m, struct sock *s) |
|
static void | ip4_encode (const struct addr *a, struct sockaddr *sa) |
|
static void | ip4_decode (struct addr *a, const struct sockaddr *sa) |
|
static void | ip6_encode (const struct addr *a, struct sockaddr *sa) |
|
static void | ip6_decode (struct addr *a, const struct sockaddr *sa) |
|
static m0_bcount_t | get_rpc_max_seg_size (struct m0_net_domain *ndom) |
|
static uint32_t | get_rpc_max_segs_nr (struct m0_net_domain *ndom) |
|
static void | get_done (struct mover *w, struct sock *s) |
|
M0_INTERNAL int | m0_net_sock_mod_init (void) |
|
M0_INTERNAL void | m0_net_sock_mod_fini (void) |
|
M0_INTERNAL void | mover__print (const struct mover *m) |
|
M0_INTERNAL void | addr__print (const struct addr *addr) |
|
M0_INTERNAL void | sock__print (const struct sock *sock) |
|
M0_INTERNAL void | ep__print (const struct ep *ep) |
|
M0_INTERNAL void | buf__print (const struct buf *buf) |
|
M0_INTERNAL void | ma__print (const struct ma *ma) |
|
| M0_BASSERT (sizeof(struct in_addr)<=sizeof(struct in6_addr)) |
|
struct addrdata | M0_XCA_DOMAIN (rpc) |
|
Overview
net/sock.[ch] contains an implementation of the interfaces defined in net/net.h. Together these interfaces define a network transport layer used by motr. A network transport provides an unreliable, unordered, asynchronous point-to-point messaging interface.
The rpc layer (rpc/) is the only user of the network transport; it provides a higher-level interface that the rest of motr uses.
The documentation uses "io" as a verb, e.g., "X bytes were ioed".
Main abstractions provided by a network transport are:
- an end-point (m0_net_end_point) identifies a network peer. Messages are
sent to and received from peers. An end-point has a string name. The
name syntax depends on the transport, see addr_parse().
- a network buffer (m0_net_buffer) represents an operation of
transferring data between a local memory buffer, specified by
m0_net_buffer::nb_buffer bufvec, and a remote buffer located in the
address space of a peer across network and associated with a network
buffer on that peer.
- a network transfer machine (m0_net_transfer_mc) represents the local
network end-point. It keeps track of buffers being sent to or received
from other peers.
A network buffer is "added" (also referred to as "queued") to a transfer machine (m0_net_buffer_add()) to initiate the operation. When the transfer operation completes a per-buffer completion call-back (m0_net_buffer::nb_callbacks) specified by the user is invoked and the buffer is removed from the transfer machine queue (there is a special mode, when multiple transfer operations and multiple call-backs can be served for the same network buffer, see M0_NET_BUF_RETAIN). "Transfer operation completion" here means that the network transfer is done with the operation and won't touch the local buffer memory again. Completion doesn't guarantee that the peer has seen the buffer data.
There are multiple transfer operation types corresponding to queues within a transfer machine (m0_net_transfer_mc::ntm_q[]) and identified by m0_net_buffer::nb_qtype field taken from enum m0_net_queue_type.
M0_NET_QT_MSG_RECV and M0_NET_QT_MSG_SEND operation types correspond to the simplest form of messaging:
- to send a message, the network transport user places the message in the
memory buffer associated with a network buffer and then adds the buffer
to the M0_NET_QT_MSG_SEND queue;
- to receive messages, the user adds some number of buffers to the
M0_NET_QT_MSG_RECV queue. When data sent via an M0_NET_QT_MSG_SEND
operation are received, a buffer from the M0_NET_QT_MSG_RECV queue is
selected and the data are placed in the associated memory buffer. If no
receive buffers are available, the incoming data are dropped with an
error message. See the Provisioning section below.
Other operation types are for so-called "bulk" operations which are efficient for transfers of large amounts of data. They are based on "network buffer
descriptors" (m0_net_buf_desc). A network buffer descriptor is a transport-specific datum (opaque to the upper layers) which uniquely identifies a particular buffer within its local peer. Each bulk operation involves 2 network buffers: one passive and one active. When the passive buffer is added to a transfer machine, its buffer descriptor is generated and stored in m0_net_buffer::nb_desc. The user then somehow (e.g., by means of the simple messaging described above) transmits this descriptor to the other peer. At that peer an active buffer is prepared, the received descriptor is stored in the buffer's ->nb_desc field and the buffer is added to the active queue. At that moment data transfer is initiated between these two buffers. The direction of transfer is independent of which buffer is active, which gives 4 queues (M0_NET_QT_PASSIVE_BULK_RECV, M0_NET_QT_PASSIVE_BULK_SEND, M0_NET_QT_ACTIVE_BULK_RECV and M0_NET_QT_ACTIVE_BULK_SEND), which can be used as follows:
- buffer A: passive-recv, buffer B: active-send: the descriptor is moved
from A to B, the data are moved from B to A;
- buffer A: passive-send, buffer B: active-recv: the descriptor is moved
from A to B, the data are moved from A to B;
Bulk transfers are exported to the upper layers (io client and io service) via rpc-bulk interface (rpc/bulk.h).
sock network transport is implemented on top of the standard bsd socket interface. It is user-space only.
Data structures
sock.c introduces data-structures corresponding to the abstractions in net.h:
- struct ep: end-point. It embeds generic m0_net_end_point as e::e_ep;
- struct buf: network buffer. It links to the corresponding m0_net_buffer
through buf::b_buf. Generic buffer points back to buf via
m0_net_buffer::nb_xprt_private;
- struct ma: transfer machine (because struct tm is defined in
<time.h>). It links to the corresponding m0_net_transfer_mc through
ma::t_ma. Generic transfer machine points back to ma via
m0_net_transfer_mc::ntm_xprt_private;
Additional data-structures:
- struct addr: network address;
- struct sock: represents a socket---a connection between two
end-points. A sock is a state machine with states taken from enum
sock_state;
- struct packet. Data are transferred through network in units called
"packets". A packet has a header that identifies its source and target
buffers, length, offset, etc.;
- struct mover: a state machine copying data between a socket and a
buffer. A mover is a state machine.
A transfer machine keeps a list of end-points (m0_net_transfer_mc::ntm_end_points). An end-point keeps a list of sockets connected to the peer represented by this end-point (ep::e_sock).
A mover can be either a reader (reading data from a socket and writing them to a buffer) or a writer (reading from a buffer and writing to a socket).
A reader is associated with every socket (sock::s_reader). It reads incoming packets and copies them to the appropriate buffers. This reader is initialised and finalised together with the socket (sock_init(), sock_fini()).
A writer is associated with every M0_NET_QT_MSG_SEND and every M0_NET_QT_ACTIVE_BULK_SEND buffer (buf::b_writer). This writer is initialised when the buffer is queued (buf_add()) and finalised when the operation completes. As described in the Protocol section below, a writer is usually associated with an M0_NET_QT_PASSIVE_BULK_SEND buffer as part of the bulk protocol.
An end-point keeps a list of all writers writing data to its sockets (ep::e_writer). Note that a writer is associated with an end-point rather than a particular socket to this end-point. This allows, in principle, use of multiple sockets to write the same buffer in parallel. While writing a particular packet, the writer and the socket are "locked" together and the socket cannot be used to write other packets (because doing so would make it impossible to parse packets at the other end). While locked, the writer mover::m_sock points to the socket (see sock_writer()).
An address uniquely identifies an end-point in the network. An end-point embeds its address (ep::e_a). An address has an address-family-independent part (address family, socket type, protocol and port, all in processor byte order) and an address-family-specific part (addr::a_data). The latter contains the appropriate in{,6}_addr struct in network byte order. An addr is encoded to a sockaddr (addr_encode()) before calling socket functions (like bind(2), connect(2), inet_ntop(3)). An addr is recovered (addr_decode()) from the sockaddr returned by accept4(2).
States and transitions
sock uses non-blocking socket operations through linux epoll(2) interface (a switch to poll(2) or select(2) can be made easily).
There are two types of activity in sock module:
- "synchronous" that is initiated by net/net.h entry-points being called
by a user and
- "asynchronous" initiated by network related events, such as incoming
connections or sockets being ready for non-blocking io.
Synchronous activities.
A buffer is added to a transfer machine (m0_net_buffer_add()->...->buf_add()).
If the buffer is added to M0_NET_QT_MSG_RECV, M0_NET_QT_PASSIVE_BULK_SEND or M0_NET_QT_PASSIVE_BULK_RECV queue it is just placed on the queue, waiting for the incoming data from a matching buffer.
If the buffer is queued to M0_NET_QT_MSG_SEND or M0_NET_QT_ACTIVE_BULK_SEND, its writer is initialised and this writer is added to the target end-point (ep_add()). If necessary, this initiates the asynchronous creation of a socket toward that end-point (non-blocking connect(2)): ep_balance()->sock_init().
The remaining queuing mode, M0_NET_QT_ACTIVE_BULK_RECV, is curious: it indicates that our side should initiate transfer of data from the remote M0_NET_QT_PASSIVE_BULK_SEND buffer to our buffer. The only way to do this is to send a special message to the remote side, see the GET packet description in the Protocol section. buf_add() initialises the buffer writer to send a GET packet and adds it to the target end-point (ep_add()) as in the previous case.
Asynchronous activities.
When a transfer machine is started, an end-point, representing the local peer, is created. Then a socket to this end-point is created. This is a "listening socket": listen(2) is called on it and it becomes readable (in epoll sense) when a new incoming connection arrives. This socket forever stays in S_LISTENING mode.
The starting point of asynchronous activity associated with a sock transfer machine is poller(). Currently, this function is run as a separate thread, but it can easily be adapted to be executed as a chore (m0_locality_chore_init()) within a locality.
poller() gets from epoll_wait(2) a list of readable and writable sockets and calls sock_event(), which is the socket state machine transition function. sock_event() handles the following cases:
- an incoming connection (the S_LISTENING socket becomes readable):
accept the connection by calling accept4(2), create the end-point for
the remote peer, create the sock for the connection and initialise its
reader;
- an outgoing connection is established (an S_CONNECTING socket becomes
writable): this happens when a non-blocking connect(2) completes. Moves
the socket to the S_OPEN state;
- incoming data on an S_OPEN socket: call the socket reader (sock_in()) to
handle the arrived data;
- a non-blocking write on an S_OPEN socket is possible (that is, some
space in the socket in-kernel buffer is available): call writers
associated with the socket end-point (sock_out()) to write to the
socket;
- an incoming GET packet for an M0_NET_QT_PASSIVE_BULK_SEND buffer. The
buffer writer is initialised and transfer of data to the target buffer
starts (pk_header_done());
- an error event is raised for a socket, or a socket is closed by the
peer (sock_close()): close the socket.
What happens here is that there is a collection of movers (readers and writers) associated with end-points, sockets and buffers, and their state machines are advanced when non-blocking io is possible.
Readers and writers are state machines and they have the same set of states (enum rw_state) and the graph of state transitions (rw_conf_state[], rw_conf_trans[]), because, after all, they are just data-copying loops:
- R_IDLE: initial state. Here readers wait for incoming data;
- R_PK: here processing (input or output) of the next packet starts;
- R_HEADER: in this state packet header is ioed. This state loops until
the entire header is ioed;
- R_INTERVAL: once the header has been ioed, the packet payload is
ioed. The payload is split in a number of "intervals", where each
interval is ioed without blocking. This state loops until the entire
payload is ioed;
- R_PK_DONE: packet completed. For a writer: switch to the next packet,
if available and go to R_PK. If all packets have been written, complete
the buffer (buf_done()) and go to R_DONE. For a reader, go back to
R_IDLE.
- R_DONE: writer completes.
The differences between various movers are represented by mover_op_vec structure. It contains functions called when the mover reaches a particular state, see mover_op(). All writers use the same op_vec: writer_op, readers use stream_reader_op or dgram_reader_op depending on the socket type. Writers for GET packets (see Protocol section) use get_op.
Protocol
Network communication over sockets happens in the form of "packets". There are 2 types of packets:
- PUT packet contains data to be copied from the source buffer to the
target buffer. A PUT packet is sent from the peer containing the source
buffer to the peer containing the target buffer;
- GET packet is a request to initiate transfer of the source buffer to
the target buffer. It is sent to the peer containing the source buffer.
A packet on the wire starts with the header (struct packet). The header identifies the source buffer, the target buffer and, for a PUT packet, identifies which part of the source buffer is transmitted in this packet.
In a PUT packet, the header is followed by the payload data. The header specifies the total number of packets used to transfer this buffer, the starting offset, size and index of this packet and the total buffer size. The receiver uses these data to verify the header (pk_header_done(), buf_accept()), to update the current state of the copy operation and to detect operation completion (pk_done()). PUT packets for the same buffer can be sent out of order and through different sockets.
A GET packet has no payload. GET packets are used to initiate passive-send -> active-recv bulk transfers (see above). Note that a GET packet is not necessarily sent from the target buffer's peer: it can be sent from a 3rd peer to command 2 peers to transfer data between their buffers.
A buffer (either source or target) is identified in the packet by a struct bdesc, which contains the peer address (struct addr) and a buffer cookie (see lib/cookie.h).
Concurrency
sock module uses a very simple locking model: all state transitions are protected by a per-tm mutex: m0_net_transfer_mc::ntm_mutex. For synchronous activity, this mutex is taken by the entry-point code in net/ and is not released until the entry-point completes. For asynchronous activity, poller() keeps the lock taken most of the time.
A few items related to concurrency worth mentioning:
- the tm lock is not held while epoll_wait() is executed by poller(). On
the other hand, the pointer to a sock structure is stored inside of
epoll kernel-state (sock_ctl()). This means that sock structures cannot
be freed in a synchronous context, lest the epoll-stored state points
to an invalid memory region. To deal with this, a sock is not freed
immediately. Instead it is moved to S_DELETED state and placed on a
special per-tm list: ma::t_deathrow. Actual freeing is done by
ma_prune() called from poller();
- buffer completion (buf_done()) includes removing the buffer from its
queue and invoking a user-supplied call-back
(m0_net_buffer::nb_callbacks). Completion can happen both
asynchronously (when a transfer operation completes for the buffer or
the buffer times out (ma_buf_timeout())) and synchronously (when the
buffer is canceled by the user: m0_net_buffer_del()->...->buf_del()). The
difficulty is that net.h interface specifies that the transfer machine
lock is to be released before invoking the call-back. This cannot be
done in a synchronous context (to avoid breaking invariants), so in
this case the buffer is queued to a special ma::t_done queue which is
processed asynchronously by ma_buf_done().
Socket interface use
sock module uses standard bsd socket interface, but
- accept4(2), a linux-specific system call, is used instead of accept(2)
to set SOCK_NONBLOCK option on the new socket, eliminating the cost of
a separate setsockopt(2) call, see sock_event(). Switch to accept(2) is
trivial;
- similarly, a non-portable extension to socket(2) is used to set
SOCK_NONBLOCK in socket(2), see sock_init_fd();
- bsd Reno "len" fields in socket address structures are optionally used;
- ipv4 and ipv6 protocol families are supported. Unix domain sockets are
not supported.
When a socket is created, it is added to the epoll instance monitored by poller() (sock_init_fd()). All sockets are monitored for read events. Only sockets to end-points with a non-empty list of writers are monitored for writes (ep_balance()).
- Todo:
- It is not clear how to manage write-monitoring in case of multiple "parallel" sockets to the same end-point. If writeability of all such sockets is monitored, epoll_wait() can busy-loop. If not all of them are monitored, they are useless for concurrent writes.
Buffer data are transmitted as a collection of PUT packets. For each packet, first the header is transmitted, then the payload. The payload is transmitted as a sequence of "intervals" (see above). This design is needed to accommodate both stream (IPPROTO_TCP, SOCK_STREAM) and datagram (IPPROTO_UDP, SOCK_DGRAM) sockets within the same mover state machine:
- for stream sockets, packet size is equal to the buffer data size
(stream_pk_size()), that is, the entire buffer is transmitted as a
single packet, consisting of multiple intervals. Note that it is not
required that the entire header is written in one write;
- for datagram sockets, packet size is equal to the maximal datagram size
(dgram_pk_size()). The buffer is transmitted as a sequence of
packets. Each packet (both header and payload) is transmitted as a
single datagram (i.e., one interval per packet).
In the write path, the differences between stream and datagram sockets are hidden in pk_iov_prep() and pk_io(), which track how much of the packet has been ioed.
In the read path, the differences cannot be hidden, because the header must be read and parsed to understand to which buffer the incoming data should be placed. For a stream socket, a reader reads the header (stream_header()) parses it (pk_header_done()) and then reads payload directly into the target buffer (stream_interval(), pk_io()).
For a datagram socket, the packet header and payload are read together as a single datagram, hence the reader cannot read the data directly in the (yet unknown) buffer. Instead, a per-reader temporary area (mover::m_scratch) is allocated (dgram_pk()). The packet payload is read in this area, the header is parsed and the data are copied in the target buffer (all done in dgram_header()).
When a socket to a remote end-point is opened, the kernel automatically assigns a source address to it (the local ip address and the selected ephemeral port). This address is returned from accept4(2) at the remote peer. The remote peer then creates an end-point for it (sock_event()). Each packet header contains the "real" (not ephemeral) source address, used by the remote end to establish a connection in the opposite direction. As a result, 2 peers exchanging data will use 2 sockets, one in each direction:
*
* +-----------------------+ +-----------------------+
* | peer-A, ip: A | | peer-B, ip: B |
* +-----------------------+ +-----------------------+
* | tm: src address: A:p0 | | tm: src address: B:p1 |
* | | sock: A -> B | |
* | A:eph0 ---------+---------------+---> B:p1 |
* | | sock: B -> A | |
* | A:p0 <--------+---------------+---- B:eph1 |
* | | | |
* +-----------------------+ +-----------------------+
*
*
Here eph0 and eph1 are ephemeral source ports assigned by the kernel.
Data structures life-cycles
Transfer machine life-cycle:
- m0_net_tm_init() -> ... -> ma_init(): allocate ma data structure;
- m0_net_tm_start() -> ... -> ma_start(): start poller and initialise
epoll;
- transfer machine is active;
- m0_net_tm_stop() -> ... -> ma_stop(): stop poller;
- m0_net_tm_fini() -> ... -> ma_fini().
Buffer life-cycle:
- m0_net_buffer_register() -> ... -> buf_register(): register the buffer
with its network transport. Once registered, a buffer can be used for
multiple operations. Registration is used by some transports (but not
by sock currently) to do things like registering the buffer with a
hardware rdma engine;
- m0_net_buffer_add() -> ... -> buf_add(): queue the buffer. If
necessary, initialise buffer writer, generate buffer descriptor;
- operation is in progress: data are transferred to or from the buffer;
- normal operation completion: buf_done(), ma_buf_done(). User-provided
call-back is invoked;
- alternatively: timeout (ma_buf_timeout());
- alternatively: cancellation: m0_net_buffer_del() -> ... -> buf_del();
- at this point the buffer is removed from the queue, but still
registered. It can be re-queued;
- de-registration: m0_net_buffer_deregister() -> ... ->
buf_deregister(). The buffer is finalised.
End-point life-cycle.
End-points are reference counted. Users can take references to the end-points (m0_net_end_point_create() -> ... -> end_point_create()). An additional reference is taken by each sock to this end-point (sock_init(), sock_fini()) and each buffer writing to this end-point (ep_add(), ep_del()).
Sock life-cycle.
A sock is allocated (sock_init()) when a transfer machine starts (ma_start()), when an outgoing connection is initiated (ep_add() -> ep_balance()) or when an incoming connection is received (sock_event()).
sock is finalised (file descriptor closed, etc.) by sock_done() when the transfer machine stops, when an error condition is detected on the socket, or when an error happens in a mover locked to the socket (stream_error(), writer_error()). The sock has to be closed in the latter case, because the remote end won't be able to correctly parse data from other writers anyway. The sock structure is not freed by sock_done(), see the Concurrency section.
Mover life-cycle.
Movers are embedded in buffers (writers) or socks (readers) and have no independent life-cycle.
Provisioning
Bulk operations happen between buffers known to the user and it is up to the user to allocate and queue both active and passive buffers.
For non-bulk messaging (M0_NET_QT_MSG_RECV, M0_NET_QT_MSG_SEND), on the other hand, the user has no direct control over the buffers selected for receiving messages and cannot guarantee that the receive queue is not empty. To deal with this, provisioning (net/tm_provisioning.[ch]) module tries to keep some number of buffers on the receive queue at all times. Unfortunately, this mechanism is not suitable for sock transport as is, see the comment in tm_provision_recv_q().
sock has its own provisioning, see the comment in pk_header_done().
Limitations and options
Only TCP sockets have been tested so far.
Only 1 socket to a particular end-point is supported, that is, multiple parallel sockets are not opened. Whether they are needed at all is an interesting question.
Once opened, a socket is never closed until an error or tm finalisation. Sockets should perhaps be garbage collected after a period of inactivity.
Packets for a buffer are sent sequentially.
rdma (ROCE or iWARP) is not supported.
Multiple incoming messages in one buffer (see M0_NET_QT_MSG_RECV, m0_net_buffer::nb_min_receive_size, m0_net_buffer::nb_max_receive_msgs) are not supported.
Differences with lnet
sock is a faithful implementation of net.h transport protocol. Moreover it goes out of its way to be lnet compatible: sock understands and supports lnet format of end-point addresses (addr_parse_lnet()).
Still, sock is not a drop-in replacement for lnet. The main compatibility problem is due to buffer fragmentation limitations. Memory associated with a buffer (m0_net_buffer::nb_buffer) is a scatter-gather list (m0_bufvec) of "segments" (a segment is a contiguous memory buffer). A network transport can place some restrictions on:
- the total size of a memory buffer (m0_net_domain_get_max_buffer_size(),
get_max_buffer_size());
- the number of segments in a buffer
(m0_net_domain_get_max_buffer_segments(), get_max_buffer_segments());
- the size of a segment (m0_net_domain_get_max_buffer_segment_size(),
get_max_buffer_segment_size()).
These restrictions are required by rdma hardware. Specifically, lnet sets the maximal segment size to 4KB and large buffers submitted to lnet have a large number of small 4KB segments. The actual values of lnet limits are assumed by its users. Specifically, it is assumed that the maximal segment size is so small that the size of any network io is its multiple. That is, it is assumed that all segments, including the last one, of any network buffer have the same size: 4KB and buffer size is simply 4KB times the number of segments.
sock has no inherent fragmentation restrictions (because it internally does all that splitting and assembling of packets anyway). Using a large number of small segments is pure overhead and sock sets the maximal segment size to a very large number (get_max_buffer_segment_size()), which breaks the assumption that all io is done in multiples of segment size.
Fixing the transport users to make no assumptions about fragmentation limits (or using different allocation strategies depending on the transport) is a work in progress.
A brief history
Historically, the first motr network transport was based on the sunrpc/xdr library (and similar interfaces in the linux kernel). This was abandoned because of the insurmountable problems with making it reliably multi-threaded and performant.
Next transport was based on lnet (net/lnet/, based on lustre lnet, which is itself based on Sandia portals). lnet is a kernel module, which complicates some deployments.
sock transport (net/sock/) is based on the bsd sockets interface, which is fairly portable. While it currently lacks advanced lnet features like rdma, it is a useful starting point and can be sufficiently performant for cloud deployments (as opposed to hpc).
◆ __MOTR_NET_SOCK_XCODE_H__
#define __MOTR_NET_SOCK_XCODE_H__ |
◆ B_F
#define B_F "[[%i]@%p:"EP_FL"]" |
◆ B_P
#define B_P |
( |
|
b | ) |
(b)->b_buf->nb_qtype, (b), EP_PL((b)->b_other) |
◆ EP_DEBUG
◆ EP_F
◆ EP_FL
◆ EP_GET
◆ EP_P
#define EP_P(e) SAFE(e, (e)->e_ep.nep_addr, "null")
◆ EP_PL
◆ EP_PUT
◆ M0_TRACE_SUBSYSTEM
#define M0_TRACE_SUBSYSTEM M0_TRACE_SUBSYS_NET |
◆ MOCK_LNET
◆ SAFE
#define SAFE(val, exp, fallback) ((val) != NULL ? (exp) : (fallback))
◆ SOCK_F
#define SOCK_F EP_FL "/%i/%" PRIx64 "/%i" |
◆ SOCK_P
Value: SAFE(
s, (
s)->s_sm.sm_state, 0), \
SAFE(
s, (
s)->s_flags, 0),
SAFE(
s, (
s)->s_fd, 0)
#define SAFE(val, exp, fallback)
static struct m0_addb2_source * s
Definition at line 984 of file sock.c.
◆ TLOG
◆ anonymous enum
Enumerator |
---|
M0_NET_SOCK_PROTO_VERSION | |
M0_NET_SOCK_PROTO_PUT | |
M0_NET_SOCK_PROTO_GET | |
Definition at line 3870 of file sock.c.
◆ anonymous enum
sock module binary structures
Definition at line 61 of file xcode.h.
◆ mover_opcode
Type of event that a mover state machine can handle.
- See also
- mover_op()
Enumerator |
---|
M_READ | |
M_WRITE | |
M_CLOSE | |
M_NR | |
Definition at line 698 of file sock.c.
◆ rw_state
State of a mover state machine, stored in mover::m_sm.sm_state.
The state transition diagram (without failure transitions to R_FAIL) is the following:
*
* +-->IDLE +-----+
* | | | |
* | V V |
* | PK--->HEADER--+
* | ^ |
* | | |
* DONE<---PK_DONE---+ |
* ^ |
* | V
* +---------INTERVAL<--+
* | |
* +-------+
*
*
Enumerator |
---|
R_IDLE | |
R_PK | |
R_HEADER | |
R_INTERVAL | |
R_PK_DONE | |
R_FAIL | |
R_DONE | |
R_NR | |
STATE_NR | |
Definition at line 740 of file sock.c.
◆ sock_flags
Flags stored in sock::s_flags.
Enumerator |
---|
HAS_READ | sock has data available for non-blocking read.
|
HAS_WRITE | Non blocking write is possible on the sock.
|
WRITE_POLL | Non-blocking writes are monitored for this sock by epoll(2).
|
Definition at line 707 of file sock.c.
◆ sock_state
State of a sock state machine. Stored in sock::s_sm.sm_state.
Enumerator |
---|
S_INIT | sock has been initialised.
|
S_LISTENING | sock is listening for incoming connections.
Each transfer machine has a single listening socket, initialised in ma_start()->sock_init().
|
S_CONNECTING | Outgoing connection is being established through this sock.
A non-blocking connect(2) has been executed (sock_init()) and its completion has not been reported (in sock_event()).
|
S_OPEN | An outgoing or incoming connection has been established. Data can be transferred through this sock.
|
S_DELETED | The sock has been finalised (sock_done()) and is now placed on ma::t_deathrow list. It will be collected and freed by ma_prune().
|
Definition at line 662 of file sock.c.
◆ addr__print()
M0_INTERNAL void addr__print |
( |
const struct addr * |
addr | ) |
|
◆ addr_decode()
static void addr_decode |
( |
struct addr * |
addr, |
|
|
const struct sockaddr * |
sa |
|
) |
| |
|
static |
Constructs an addr structure from a sockaddr.
Definition at line 2788 of file sock.c.
◆ addr_encode()
static void addr_encode |
( |
const struct addr * |
addr, |
|
|
struct sockaddr * |
sa |
|
) |
| |
|
static |
Encodes an addr in a sockaddr.
Definition at line 2797 of file sock.c.
◆ addr_eq()
static bool addr_eq |
( |
const struct addr * |
a0, |
|
|
const struct addr * |
a1 |
|
) |
| |
|
static |
Returns true iff 2 addresses are equal.
Definition at line 2806 of file sock.c.
◆ addr_invariant()
static bool addr_invariant |
( |
const struct addr * |
addr | ) |
|
|
static |
◆ addr_parse()
static int addr_parse |
( |
struct addr * |
addr, |
|
|
const char * |
name |
|
) |
| |
|
static |
Parses network address.
The following address formats are supported:
- lnet compatible, see nlx_core_ep_addr_decode():
nid:pid:portal:tmid
for example: "10.0.2.15@tcp:12345:34:123" or
"192.168.96.128@tcp1:12345:31:*"
- native sock format, see socket(2):
family:type:ipaddr[@port]
for example: "inet:stream:lanl.gov@23",
"inet6:dgram:FE80::0202:B3FF:FE1E:8329@6663" or
"unix:dgram:/tmp/socket".
Definition at line 2554 of file sock.c.
◆ addr_parse_lnet()
static int addr_parse_lnet |
( |
struct addr * |
addr, |
|
|
const char * |
name |
|
) |
| |
|
static |
◆ addr_parse_native()
static int addr_parse_native |
( |
struct addr * |
addr, |
|
|
const char * |
name |
|
) |
| |
|
static |
◆ addr_print()
static char * addr_print |
( |
const struct addr * |
a | ) |
|
|
static |
Returns the canonical name for an addr.
Definition at line 2748 of file sock.c.
◆ addr_resolve()
static int addr_resolve |
( |
struct addr * |
addr, |
|
|
const char * |
name |
|
) |
| |
|
static |
Converts address name to address.
Currently this involves only name parsing. In the future, things like dns resolution can be added here.
Definition at line 2518 of file sock.c.
◆ bdesc_create()
Creates the descriptor for a (passive) network buffer.
Definition at line 2969 of file sock.c.
◆ bdesc_decode()
◆ bdesc_encode()
◆ bev_deliver_all()
◆ bev_deliver_sync()
◆ bev_notify()
◆ bev_pending()
◆ buf__print()
M0_INTERNAL void buf__print |
( |
const struct buf * |
buf | ) |
|
◆ buf_accept()
static int buf_accept |
( |
struct buf * |
buf, |
|
|
struct mover * |
m |
|
) |
| |
|
static |
Checks that a valid incoming packet (m->m_pk) is received for "buf".
If first packet is received for the buffer, setup the buffer (initialise bitmap, set peer, etc.). Otherwise, check that the packet matches already ongoing transfer.
Definition at line 2835 of file sock.c.
◆ buf_add()
◆ buf_complete()
static void buf_complete |
( |
struct buf * |
buf | ) |
|
|
static |
Invokes completion call-back (releasing tm lock).
Definition at line 2925 of file sock.c.
◆ buf_del()
◆ buf_deregister()
◆ buf_done()
static void buf_done |
( |
struct buf * |
buf, |
|
|
int |
rc |
|
) |
| |
|
static |
Completes the buffer operation.
Definition at line 2900 of file sock.c.
◆ buf_fini()
static void buf_fini |
( |
struct buf * |
buf | ) |
|
|
static |
Finalises the buffer after transfer completes (including failures and cancellations). Note that buffer completion event hasn't yet been delivered. The buffer still remains attached to the network domain, until finally freed by buf_deregister().
Definition at line 2883 of file sock.c.
◆ buf_invariant()
static bool buf_invariant |
( |
const struct buf * |
buf | ) |
|
|
static |
◆ buf_ma()
static struct ma * buf_ma |
( |
struct buf * |
buf | ) |
|
|
static |
◆ buf_register()
◆ dgram_error()
static void dgram_error |
( |
struct mover * |
m, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
◆ dgram_header()
static int dgram_header |
( |
struct mover * |
self, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Processes an incoming packet over a datagram socket.
Read the entire packet (header and payload) in one go. The header is read in self->m_pkbuf (see pk_io_prep()). The payload is read in the scratch area.
Parse and verify the header (pk_header_done()), then copy the payload in the associated buffer.
Definition at line 3552 of file sock.c.
◆ dgram_idle()
static int dgram_idle |
( |
struct mover * |
self, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Input is available on the previously idle datagram socket.
Do nothing, fall through to the R_PK state.
Definition at line 3523 of file sock.c.
◆ dgram_interval()
static int dgram_interval |
( |
struct mover * |
self, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
◆ dgram_pk()
static int dgram_pk |
( |
struct mover * |
self, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Starts processing of an incoming packet over a datagram socket.
Allocate, if still not allocated, a scratch area for the payload.
Definition at line 3533 of file sock.c.
◆ dgram_pk_done()
static int dgram_pk_done |
( |
struct mover * |
self, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Completes the processing of an incoming packet over a datagram socket.
Definition at line 3587 of file sock.c.
◆ dgram_pk_size()
Returns the maximal packet size for a datagram socket.
A packet must fit in a single datagram.
Definition at line 3598 of file sock.c.
◆ dom_fini()
◆ dom_init()
◆ end_point_create()
◆ ep__print()
M0_INTERNAL void ep__print |
( |
const struct ep * |
ep | ) |
|
◆ ep_add()
static int ep_add |
( |
struct ep * |
ep, |
|
|
struct mover * |
w |
|
) |
| |
|
static |
Adds a writer to an endpoint.
If necessary creates a new socket to the endpoint.
Definition at line 2414 of file sock.c.
◆ ep_balance()
static int ep_balance |
( |
struct ep * |
ep | ) |
|
|
static |
Updates end-point when a writer is added or removed.
If there are writers, but no sockets, open a socket.
If there are sockets, but no writers, stop monitoring the sockets for writes.
Definition at line 2456 of file sock.c.
◆ ep_create()
static int ep_create |
( |
struct ma * |
ma, |
|
|
struct addr * |
addr, |
|
|
const char * |
name, |
|
|
struct ep ** |
out |
|
) |
| |
|
static |
Returns the end-point with a given address.
Either finds and returns an existing end-point with elevated reference count or creates a new end-point.
"name" can be provided by the caller. If it is NULL, the canonical name (addr_print()) is used.
Definition at line 2333 of file sock.c.
◆ ep_del()
static void ep_del |
( |
struct mover * |
w | ) |
|
|
static |
Removes a (completed) writer from its end-point.
Stops monitoring end-point sockets for writes, when the last writer is removed.
Definition at line 2431 of file sock.c.
◆ ep_eq()
static bool ep_eq |
( |
const struct ep * |
ep, |
|
|
const struct addr * |
a0 |
|
) |
| |
|
static |
Returns true iff an end-point has a given addr.
Definition at line 2816 of file sock.c.
◆ ep_find()
static int ep_find |
( |
struct ma * |
ma, |
|
|
const char * |
name, |
|
|
struct ep ** |
out |
|
) |
| |
|
static |
Returns (finds or creates) the end-point with the given name.
Definition at line 2379 of file sock.c.
◆ ep_free()
static void ep_free |
( |
struct ep * |
ep | ) |
|
|
static |
Finalises the end-point.
Definition at line 2493 of file sock.c.
◆ ep_get()
static void ep_get |
( |
struct ep * |
ep | ) |
|
|
static |
◆ ep_invariant()
static bool ep_invariant |
( |
const struct ep * |
ep | ) |
|
|
static |
◆ ep_ma()
static struct ma * ep_ma |
( |
struct ep * |
ep | ) |
|
|
static |
Returns end-point transfer machine.
Definition at line 2387 of file sock.c.
◆ ep_net()
Converts generic end-point to its sock structure.
Definition at line 2393 of file sock.c.
◆ ep_put()
static void ep_put |
( |
struct ep * |
ep | ) |
|
|
static |
◆ ep_release()
static void ep_release |
( |
struct m0_ref * |
ref | ) |
|
|
static |
◆ get_done()
static void get_done |
( |
struct mover * |
w, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Completes a GET packet.
Definition at line 3709 of file sock.c.
◆ get_idle()
static int get_idle |
( |
struct mover * |
self, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Starts processing of a GET packet.
Definition at line 3689 of file sock.c.
◆ get_max_buffer_desc_size()
◆ get_max_buffer_segment_size()
◆ get_max_buffer_segments()
static int32_t get_max_buffer_segments |
( |
const struct m0_net_domain * |
dom | ) |
|
|
static |
◆ get_max_buffer_size()
◆ get_pk()
static int get_pk |
( |
struct mover * |
cmd, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Fills GET packet, prepares on-wire representation.
Definition at line 3695 of file sock.c.
◆ get_rpc_max_seg_size()
◆ get_rpc_max_segs_nr()
static uint32_t get_rpc_max_segs_nr |
( |
struct m0_net_domain * |
ndom | ) |
|
|
static |
◆ ip4_decode()
static void ip4_decode |
( |
struct addr * |
a, |
|
|
const struct sockaddr * |
sa |
|
) |
| |
|
static |
Fills an addr struct from an ipv4 sockaddr.
Definition at line 2715 of file sock.c.
◆ ip4_encode()
static void ip4_encode |
( |
const struct addr * |
a, |
|
|
struct sockaddr * |
sa |
|
) |
| |
|
static |
Encodes an addr structure in an ipv4 sockaddr.
Definition at line 2702 of file sock.c.
◆ ip6_decode()
static void ip6_decode |
( |
struct addr * |
a, |
|
|
const struct sockaddr * |
sa |
|
) |
| |
|
static |
Fills an addr struct from an ipv6 sockaddr.
Definition at line 2738 of file sock.c.
◆ ip6_encode()
static void ip6_encode |
( |
const struct addr * |
a, |
|
|
struct sockaddr * |
sa |
|
) |
| |
|
static |
Encodes an addr structure in an ipv6 sockaddr.
Definition at line 2724 of file sock.c.
◆ M0_BASSERT()
M0_BASSERT |
( |
sizeof(struct in_addr)<=sizeof(struct in6_addr) |
| ) |
|
◆ m0_net_sock_mod_fini()
M0_INTERNAL void m0_net_sock_mod_fini |
( |
void |
| ) |
|
◆ m0_net_sock_mod_init()
M0_INTERNAL int m0_net_sock_mod_init |
( |
void |
| ) |
|
◆ M0_TL_DEFINE() [1/3]
M0_TL_DEFINE |
( |
s |
, |
|
|
static |
, |
|
|
struct sock |
|
|
) |
| |
◆ M0_TL_DEFINE() [2/3]
M0_TL_DEFINE |
( |
m |
, |
|
|
static |
, |
|
|
struct mover |
|
|
) |
| |
◆ M0_TL_DEFINE() [3/3]
M0_TL_DEFINE |
( |
b |
, |
|
|
static |
, |
|
|
struct buf |
|
|
) |
| |
◆ M0_TL_DESCR_DEFINE() [1/3]
◆ M0_TL_DESCR_DEFINE() [2/3]
◆ M0_TL_DESCR_DEFINE() [3/3]
◆ M0_XCA_DOMAIN()
◆ ma__fini()
static void ma__fini |
( |
struct ma * |
ma | ) |
|
|
static |
Finalises the bulk of ma state.
This is called from the normal finalisation path (ma_fini()) and in error cleanup case during initialisation (ma_start()).
Definition at line 1484 of file sock.c.
◆ ma__print()
M0_INTERNAL void ma__print |
( |
const struct ma * |
ma | ) |
|
◆ ma_buf_done()
static void ma_buf_done |
( |
struct ma * |
ma | ) |
|
|
static |
Finds buffers pending completion and completes them.
A buffer is placed on ma::t_done queue when its operation is done, but the completion call-back cannot be immediately invoked, for example, because completion happened in a synchronous context.
Definition at line 1683 of file sock.c.
◆ ma_buf_timeout()
static void ma_buf_timeout |
( |
struct ma * |
ma | ) |
|
|
static |
Finds queued buffers that timed out and completes them with a prejudice^Werror.
Definition at line 1656 of file sock.c.
◆ ma_confine()
◆ ma_event_post()
Helper function that posts a ma state change event.
Definition at line 1628 of file sock.c.
◆ ma_fini()
Used as m0_net_xprt_ops::xo_ma_fini().
Definition at line 1530 of file sock.c.
◆ ma_init()
Initialises transport-specific part of the transfer machine.
Allocates sock ma and attaches it to the generic part of ma.
Listening socket (ma::t_listen) cannot be initialised before the local address to bind, which is supplied as a parameter to m0_net_xprt_ops::xo_tm_start(), is known.
Poller thread (ma::t_poller) cannot be started, because a call to m0_net_tm_confine() can be done after initialisation.
->epollfd can be initialised here, but it is easier to initialise everything in ma_start().
Used as m0_net_xprt_ops::xo_tm_init().
Definition at line 1445 of file sock.c.
◆ ma_invariant()
static bool ma_invariant |
( |
const struct ma * |
ma | ) |
|
|
static |
◆ ma_is_locked()
static bool ma_is_locked |
( |
const struct ma * |
ma | ) |
|
|
static |
◆ ma_lock()
static void ma_lock |
( |
struct ma * |
ma | ) |
|
|
static |
◆ ma_prune()
static void ma_prune |
( |
struct ma * |
ma | ) |
|
|
static |
Frees finalised sock structures.
Definition at line 1467 of file sock.c.
◆ ma_recv_buf()
Finds a buffer on M0_NET_QT_MSG_RECV queue, ready to receive "len" bytes of data.
Definition at line 1703 of file sock.c.
◆ ma_src()
static struct ep * ma_src |
( |
struct ma * |
ma | ) |
|
|
static |
Returns the "self" end-point of a transfer machine.
Definition at line 1716 of file sock.c.
◆ ma_start()
Starts initialised ma.
Initialises everything that ma_init() didn't. Note that ma is in M0_NET_TM_STARTING state after this returns. The switch to M0_NET_TM_STARTED happens when the poller thread posts a special event.
Used as m0_net_xprt_ops::xo_tm_start().
Definition at line 1551 of file sock.c.
◆ ma_stop()
Stops a ma that has been started or is being started.
Reverses the actions of ma_start(). Again, the ma stays in M0_NET_TM_STOPPING state on return; a state change event is delivered separately by the poller thread.
Used as m0_net_xprt_ops::xo_tm_stop().
Definition at line 1603 of file sock.c.
◆ ma_unlock()
static void ma_unlock |
( |
struct ma * |
ma | ) |
|
|
static |
◆ mover__print()
M0_INTERNAL void mover__print |
( |
const struct mover * |
m | ) |
|
◆ mover_fini()
static void mover_fini |
( |
struct mover * |
m | ) |
|
|
static |
◆ mover_init()
◆ mover_invariant()
static bool mover_invariant |
( |
const struct mover * |
m | ) |
|
|
static |
◆ mover_is_reader()
static bool mover_is_reader |
( |
const struct mover * |
m | ) |
|
|
static |
◆ mover_is_writer()
static bool mover_is_writer |
( |
const struct mover * |
m | ) |
|
|
static |
◆ mover_op()
Runs mover state transition(s).
This is a rather complicated function, which tries to achieve multiple intertwined things:
- common mover state machine infrastructure is used both for readers and
writers. Readers come in multiple types depending on the socket type
(stream vs. datagram), while writers send both GET and PUT
packets. This is abstracted by keeping state transition functions in
mover::m_op::v_op[][] array;
- some state transitions do no io (stream_idle(), stream_pk(), etc.),
they only update mover fields. Some others need socket to be writable
(writer_write(), etc.) or readable (stream_header(), etc.). This
function attempts to execute as many state transitions as possible
without blocking;
- avoid monopolising the processor while achieving the previous goal;
- handle unexpected events (e.g., readability of a socket for a writer),
socket being closed by the peer, network errors and state transition
errors;
- handle writer completion.
Definition at line 3056 of file sock.c.
◆ pk_decode()
static void pk_decode |
( |
struct mover * |
m | ) |
|
|
static |
◆ pk_dnob()
Returns how many payload bytes have been transferred in the current packet.
Definition at line 3140 of file sock.c.
◆ pk_done()
static void pk_done |
( |
struct mover * |
m | ) |
|
|
static |
Completes an incoming packet processing.
If all packets for the buffer have been received, complete the buffer. Disassociate the buffer from the reader.
Definition at line 3406 of file sock.c.
◆ pk_encdec()
◆ pk_encode()
static void pk_encode |
( |
struct mover * |
m | ) |
|
|
static |
◆ pk_header_done()
static int pk_header_done |
( |
struct mover * |
m | ) |
|
|
static |
Completes the processing of current packet header for a reader.
This is called when the header has been completely read.
Sanity checks the header, then tries to associate it with a buffer.
Also handles GET packets (see "isget").
Definition at line 3283 of file sock.c.
◆ pk_header_init()
static void pk_header_init |
( |
struct mover * |
m, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Initialises the header for the current packet in a writer.
Definition at line 3261 of file sock.c.
◆ pk_io()
Does packet io.
"bv" is a data buffer for payload.
"tgt" is how many bytes (both header and payload) to try to io.
Definition at line 3222 of file sock.c.
◆ pk_iov_prep()
Prepares iovec for packet io.
Fills a supplied iovec "iv", with "nr" elements, to launch vectorised io.
Returns the number of elements filled. Returns in *count the total number of bytes to be ioed.
"bv" is a data buffer for payload.
"tgt" is how many bytes (both header and payload) to try to io.
Definition at line 3177 of file sock.c.
◆ pk_size()
Returns preferred packet payload size for a given sock.
All packets for a given source buffer, except the last one, have the same size, determined by this function.
Definition at line 3126 of file sock.c.
◆ pk_state()
static int pk_state |
( |
const struct mover * |
m | ) |
|
|
static |
Returns the new state, to which the writer moves after io.
This function is not suitable for readers, because header (specifically, mover::m_pk::p_size) is not initially known.
Definition at line 3151 of file sock.c.
◆ pk_tsize()
Returns the size of the entire current packet (header plus payload).
Definition at line 3132 of file sock.c.
◆ poller()
static void poller |
( |
struct ma * |
ma | ) |
|
|
static |
Main loop of a per-ma thread that polls sockets.
Definition at line 1360 of file sock.c.
◆ sock__print()
M0_INTERNAL void sock__print |
( |
const struct sock * |
sock | ) |
|
◆ sock_close()
static void sock_close |
( |
struct sock * |
s | ) |
|
|
static |
Processes an "error" event for a socket.
Definition at line 1989 of file sock.c.
◆ sock_ctl()
static int sock_ctl |
( |
struct sock * |
s, |
|
|
int |
op, |
|
|
uint32_t |
flags |
|
) |
| |
|
static |
Adds, removes a socket to the epoll instance, or modifies which events are monitored.
Definition at line 2305 of file sock.c.
◆ sock_done()
static void sock_done |
( |
struct sock * |
s, |
|
|
bool |
balance |
|
) |
| |
|
static |
◆ sock_event()
static bool sock_event |
( |
struct sock * |
s, |
|
|
uint32_t |
ev |
|
) |
| |
|
static |
Implements a state transition in a socket state machine.
Returns true iff the event wasn't processed because of buffer shortage. The caller must re-try.
Definition at line 2201 of file sock.c.
◆ sock_fini()
static void sock_fini |
( |
struct sock * |
s | ) |
|
|
static |
◆ sock_in()
static int sock_in |
( |
struct sock * |
s | ) |
|
|
static |
Processes a "readable" event for a socket.
Definition at line 1959 of file sock.c.
◆ sock_init()
static int sock_init |
( |
int |
fd, |
|
|
struct ep * |
src, |
|
|
struct ep * |
tgt, |
|
|
uint32_t |
flags |
|
) |
| |
|
static |
Allocates and initialises a new socket between given endpoints.
"src" is the source end-point, the "self" end-point of the local transfer machine.
"tgt" is the end-point to which the connection is established. If "tgt" is NULL, a socket listening for incoming connections is created instead.
If "fd" is negative, a new socket is created and connected (without blocking). Otherwise (fd >= 0), the socket already exists (returned from accept4(2), see sock_event()), and a sock structure should be created for it.
Definition at line 2079 of file sock.c.
◆ sock_init_fd()
static int sock_init_fd |
( |
int |
fd, |
|
|
struct sock * |
s, |
|
|
struct ep * |
ep, |
|
|
uint32_t |
flags |
|
) |
| |
|
static |
A helper for sock_init().
Create the socket, set options, bind(2) if necessary.
Definition at line 2154 of file sock.c.
◆ sock_invariant()
static bool sock_invariant |
( |
const struct sock * |
s | ) |
|
|
static |
◆ sock_out()
static void sock_out |
( |
struct sock * |
s | ) |
|
|
static |
Processes a "writable" event for a socket.
Definition at line 1968 of file sock.c.
◆ sock_writer()
static struct mover * sock_writer |
( |
struct sock * |
s | ) |
|
|
static |
Returns the writer locked to the socket, if any.
Definition at line 1999 of file sock.c.
◆ stream_error()
static void stream_error |
( |
struct mover * |
m, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Handles an error for a stream socket.
Definition at line 3510 of file sock.c.
◆ stream_header()
static int stream_header |
( |
struct mover * |
self, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Reads packet header (or some part thereof).
Loops in R_HEADER state until the entire header has been read.
Definition at line 3466 of file sock.c.
◆ stream_idle()
static int stream_idle |
( |
struct mover * |
self, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Input is available on the previously idle stream socket.
Do nothing, fall through to the R_PK state.
Definition at line 3447 of file sock.c.
◆ stream_interval()
static int stream_interval |
( |
struct mover * |
self, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Reads an interval (i.e., part of the packet payload).
Loops in R_INTERVAL until the entire packet is read.
Definition at line 3485 of file sock.c.
◆ stream_pk()
static int stream_pk |
( |
struct mover * |
self, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Starts incoming packet processing.
Definition at line 3455 of file sock.c.
◆ stream_pk_done()
static int stream_pk_done |
( |
struct mover * |
self, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Completes processing of an incoming packet.
Definition at line 3492 of file sock.c.
◆ stream_pk_size()
Returns the maximal packet size for a stream socket.
There is no reason to split a buffer into multiple packets over a stream socket, so return a very large value here.
Definition at line 3504 of file sock.c.
◆ writer_done()
static void writer_done |
( |
struct mover * |
w, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Handles R_DONE state in a writer.
Definition at line 3670 of file sock.c.
◆ writer_error()
static void writer_error |
( |
struct mover * |
w, |
|
|
struct sock * |
s, |
|
|
int |
rc |
|
) |
| |
|
static |
Completes a writer.
This handles both normal (rc == 0) and error cases.
Remove the writer from the socket and complete the buffer.
Definition at line 3682 of file sock.c.
◆ writer_idle()
static int writer_idle |
( |
struct mover * |
w, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Initialises a writer.
Definition at line 3612 of file sock.c.
◆ writer_pk()
static int writer_pk |
( |
struct mover * |
w, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Starts a packet write-out.
Select the packet size, lock the writer to the socket.
Definition at line 3629 of file sock.c.
◆ writer_pk_done()
static int writer_pk_done |
( |
struct mover * |
w, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Completes packet write-out.
Unlock the writer from the socket. If all packets for the buffer have been written, complete the writer; otherwise switch to the next packet.
Definition at line 3658 of file sock.c.
◆ writer_write()
static int writer_write |
( |
struct mover * |
w, |
|
|
struct sock * |
s |
|
) |
| |
|
static |
Writes a part of a packet.
Write as much as the socket allows. This deals with both header and payload.
Definition at line 3646 of file sock.c.
◆ autotm
Bitmap of used transfer machine identifiers.
This is used to allocate unique transfer machine identifiers for LNet network addresses with wildcard transfer machine identifier (like "192.168.96.128@tcp1:12345:31:*").
- Todo:
- Move it to m0 instance or make per-domain.
Definition at line 2578 of file sock.c.
◆ dgram_reader_op
Initial value:= {
.v_name = "dgram-reader",
.v_op = {
}
}
static int dgram_header(struct mover *self, struct sock *s)
static int dgram_idle(struct mover *self, struct sock *s)
static int dgram_interval(struct mover *self, struct sock *s)
static int dgram_pk(struct mover *self, struct sock *s)
static int dgram_pk_done(struct mover *self, struct sock *s)
Definition at line 1151 of file sock.c.
◆ get_op
Initial value:= {
.v_name = "get",
.v_op = {
}
}
static int writer_pk_done(struct mover *self, struct sock *s)
static int writer_write(struct mover *self, struct sock *s)
static void get_done(struct mover *w, struct sock *s)
static int get_idle(struct mover *self, struct sock *s)
static int get_pk(struct mover *self, struct sock *s)
static void writer_error(struct mover *w, struct sock *s, int rc)
Definition at line 1153 of file sock.c.
◆ get_tag
Initial value:= {
}
struct m0_format_footer p_footer
#define offsetof(typ, memb)
Definition at line 1159 of file sock.c.
◆ m0_net_lnet_xprt
◆ m0_net_sock_xprt
Initial value:= {
.nx_name = "sock",
}
static const struct m0_net_xprt_ops xprt_ops
Definition at line 3927 of file sock.c.
◆ M0_XCA_DOMAIN
◆ pf
Initial value:= {
[AF_UNIX] = {
.f_name = "unix"
},
[AF_INET] = {
.f_name = "inet",
},
[AF_INET6] = {
.f_name = "inet6",
}
}
static void ip6_decode(struct addr *a, const struct sockaddr *sa)
static void ip4_decode(struct addr *a, const struct sockaddr *sa)
static void ip4_encode(const struct addr *a, struct sockaddr *sa)
static void ip6_encode(const struct addr *a, struct sockaddr *sa)
Definition at line 1155 of file sock.c.
◆ put_tag
Initial value:= {
}
struct m0_format_footer p_footer
#define offsetof(typ, memb)
Definition at line 1158 of file sock.c.
◆ rw_conf
Initial value:= {
.scf_name = "reader-writer",
}
static struct m0_sm_trans_descr rw_conf_trans[]
static struct m0_sm_state_descr rw_conf_state[]
Definition at line 1148 of file sock.c.
◆ rw_conf_state
◆ rw_conf_trans
Initial value:
Definition at line 3791 of file sock.c.
◆ sock_conf
Initial value:= {
.scf_name = "sock",
}
static struct m0_sm_state_descr sock_conf_state[]
static struct m0_sm_trans_descr sock_conf_trans[]
Definition at line 1147 of file sock.c.
◆ sock_conf_state
Initial value:= {
.sd_name = "init",
},
.sd_name = "listening",
},
.sd_name = "connecting",
},
.sd_name = "active",
},
.sd_name = "deleted",
}
}
Definition at line 3714 of file sock.c.
◆ sock_conf_trans
Initial value:
Definition at line 3739 of file sock.c.
◆ stream_reader_op
Initial value:= {
.v_name = "stream-reader",
.v_op = {
}
}
static int stream_pk_done(struct mover *self, struct sock *s)
static int stream_idle(struct mover *self, struct sock *s)
static int stream_header(struct mover *self, struct sock *s)
static int stream_interval(struct mover *self, struct sock *s)
static int stream_pk(struct mover *self, struct sock *s)
Definition at line 1150 of file sock.c.
◆ stype
Initial value:= {
[SOCK_STREAM] = {
.st_name = "stream",
.st_proto = IPPROTO_TCP
},
[SOCK_DGRAM] = {
.st_name = "dgram",
.st_proto = IPPROTO_UDP
}
}
static void stream_error(struct mover *m, struct sock *s)
static m0_bcount_t stream_pk_size(const struct mover *w, const struct sock *s)
static m0_bcount_t dgram_pk_size(const struct mover *w, const struct sock *s)
static const struct mover_op_vec dgram_reader_op
static void dgram_error(struct mover *m, struct sock *s)
static const struct mover_op_vec stream_reader_op
Definition at line 1156 of file sock.c.
◆ writer_op
Initial value:= {
.v_name = "writer",
.v_op = {
}
}
static int writer_idle(struct mover *self, struct sock *s)
static int writer_pk_done(struct mover *self, struct sock *s)
static int writer_pk(struct mover *self, struct sock *s)
static int writer_write(struct mover *self, struct sock *s)
static void writer_done(struct mover *self, struct sock *s)
static void writer_error(struct mover *w, struct sock *s, int rc)
Definition at line 1152 of file sock.c.
◆ xprt_ops
Initial value:= {
}
static void ma_fini(struct m0_net_transfer_mc *ma)
static m0_bcount_t get_max_buffer_segment_size(const struct m0_net_domain *)
static int ma_confine(struct m0_net_transfer_mc *ma, const struct m0_bitmap *processors)
static void buf_deregister(struct m0_net_buffer *nb)
static void dom_fini(struct m0_net_domain *dom)
static void buf_del(struct m0_net_buffer *nb)
static int ma_stop(struct m0_net_transfer_mc *ma, bool cancel)
static int end_point_create(struct m0_net_end_point **epp, struct m0_net_transfer_mc *ma, const char *name)
static int buf_add(struct m0_net_buffer *nb)
static m0_bcount_t get_max_buffer_size(const struct m0_net_domain *dom)
static int buf_register(struct m0_net_buffer *nb)
static void bev_deliver_all(struct m0_net_transfer_mc *ma)
static int ma_start(struct m0_net_transfer_mc *ma, const char *name)
static int bev_deliver_sync(struct m0_net_transfer_mc *ma)
static void bev_notify(struct m0_net_transfer_mc *ma, struct m0_chan *chan)
static int ma_init(struct m0_net_transfer_mc *ma)
static int dom_init(const struct m0_net_xprt *xprt, struct m0_net_domain *dom)
static m0_bcount_t get_max_buffer_desc_size(const struct m0_net_domain *)
static uint32_t get_rpc_max_segs_nr(struct m0_net_domain *ndom)
static bool bev_pending(struct m0_net_transfer_mc *ma)
static m0_bcount_t get_rpc_max_seg_size(struct m0_net_domain *ndom)
M0_INTERNAL uint32_t default_xo_rpc_max_recv_msgs(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
static int32_t get_max_buffer_segments(const struct m0_net_domain *dom)
M0_INTERNAL m0_bcount_t default_xo_rpc_max_msg_size(struct m0_net_domain *ndom, m0_bcount_t rpc_size)
Definition at line 3888 of file sock.c.