[ofa-general] [PATCH] sdp: process RX CQ from interrupt
Amir Vadai
amirv at mellanox.co.il
Mon Jun 22 01:18:44 PDT 2009
Queue RX packets in interrupt context to reduce unnecessary context switches.
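With this patch the interrupt handler (sdp_rx_irq) drains the RX CQ itself via
sdp_poll_rx_cq(), queues the completed skbs on a new per-socket rx_backlog queue
and reposts receive buffers, while the heavier per-packet processing
(sdp_process_rx_q) runs later under the socket lock, either from the rx_comp
work queue or from a polling caller.

The userspace sketch below models this interrupt/process-context split. It is an
illustration only, not driver code: rx_irq, process_rx_queue, struct pkt and the
pthread mutex are simplified stand-ins for the kernel structures and locking.

    /*
     * Minimal model of the RX split: the "interrupt" side only moves
     * completions onto a backlog queue; the expensive per-packet work
     * happens later, in "process" context, while holding the same lock.
     */
    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>

    struct pkt {
            int id;
            struct pkt *next;
    };

    /* models ssk->rx_backlog and its lock */
    static struct pkt *backlog_head, *backlog_tail;
    static pthread_mutex_t backlog_lock = PTHREAD_MUTEX_INITIALIZER;

    /* interrupt-context work: cheap - just queue the completions */
    static void rx_irq(int ncomp)
    {
            pthread_mutex_lock(&backlog_lock);
            for (int i = 0; i < ncomp; i++) {
                    struct pkt *p = malloc(sizeof(*p));
                    p->id = i;
                    p->next = NULL;
                    if (backlog_tail)
                            backlog_tail->next = p;
                    else
                            backlog_head = p;
                    backlog_tail = p;
            }
            pthread_mutex_unlock(&backlog_lock);
    }

    /* process-context work: drain the backlog and do the real handling */
    static void process_rx_queue(void)
    {
            pthread_mutex_lock(&backlog_lock);
            while (backlog_head) {
                    struct pkt *p = backlog_head;
                    backlog_head = p->next;
                    if (!backlog_head)
                            backlog_tail = NULL;
                    printf("processing packet %d\n", p->id);
                    free(p);
            }
            pthread_mutex_unlock(&backlog_lock);
    }

    int main(void)
    {
            rx_irq(4);              /* "interrupt" queues four completions */
            process_rx_queue();     /* socket-lock holder drains them later */
            return 0;
    }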
Signed-off-by: Amir Vadai <amirv at mellanox.co.il>
---
drivers/infiniband/ulp/sdp/sdp.h | 140 +++++++----
drivers/infiniband/ulp/sdp/sdp_bcopy.c | 66 ++----
drivers/infiniband/ulp/sdp/sdp_cma.c | 114 ++-------
drivers/infiniband/ulp/sdp/sdp_main.c | 124 +++++-----
drivers/infiniband/ulp/sdp/sdp_proc.c | 13 +-
drivers/infiniband/ulp/sdp/sdp_rx.c | 442 ++++++++++++++++++++++---------
drivers/infiniband/ulp/sdp/sdp_tx.c | 160 ++++++++----
7 files changed, 614 insertions(+), 445 deletions(-)
diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index 5e78282..57265aa 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -7,11 +7,12 @@
#include <net/tcp.h> /* For urgent data flags */
#include <rdma/ib_verbs.h>
+#undef SDP_LOCKS_CHECK
#define SDPSTATS_ON
-#undef CONFIG_INFINIBAND_SDP_DEBUG_DATA
+#define SDP_PROFILING
#define _sdp_printk(func, line, level, sk, format, arg...) \
- printk(level "%s:%d sdp_sock(%d %d:%d): " format, \
+ printk(level "%s:%d sdp_sock(%5d %d:%d): " format, \
func, line, \
current->pid, \
(sk) ? inet_sk(sk)->num : -1, \
@@ -22,6 +23,34 @@
sdp_printk(KERN_WARNING, sk, format , ## arg)
+#ifdef SDP_LOCKS_CHECK
+#define WARN_ON_UNLOCKED(sk, l) do {\
+ if (unlikely(!spin_is_locked(l))) { \
+ sdp_warn(sk, "lock " #l " should be locked\n"); \
+ WARN_ON(1); \
+ } \
+} while (0)
+
+#define WARN_ON_LOCKED(sk, l) do {\
+ if (unlikely(spin_is_locked(l))) { \
+ sdp_warn(sk, "lock " #l " should be unlocked\n"); \
+ WARN_ON(1); \
+ } \
+} while (0)
+#else
+#define WARN_ON_UNLOCKED(sk, l)
+#define WARN_ON_LOCKED(sk, l)
+#endif
+
+#define rx_ring_lock(ssk, f) do { \
+ spin_lock_irqsave(&ssk->rx_ring.lock, f); \
+} while (0)
+
+#define rx_ring_unlock(ssk, f) do { \
+ spin_unlock_irqrestore(&ssk->rx_ring.lock, f); \
+} while (0)
+
+#ifdef SDP_PROFILING
struct sk_buff;
struct sdpprf_log {
int idx;
@@ -37,7 +66,7 @@ struct sdpprf_log {
int line;
};
-#define SDPPRF_LOG_SIZE 0x10000 /* must be a power of 2 */
+#define SDPPRF_LOG_SIZE 0x20000 /* must be a power of 2 */
extern struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
extern int sdpprf_log_count;
@@ -48,7 +77,6 @@ static inline unsigned long long current_nsec(void)
getnstimeofday(&tv);
return tv.tv_sec * NSEC_PER_SEC + tv.tv_nsec;
}
-#if 1
#define sdp_prf(sk, s, format, arg...) ({ \
struct sdpprf_log *l = &sdpprf_log[sdpprf_log_count++ & (SDPPRF_LOG_SIZE - 1)]; \
l->idx = sdpprf_log_count - 1; \
@@ -66,16 +94,6 @@ static inline unsigned long long current_nsec(void)
#define sdp_prf(sk, s, format, arg...)
#endif
-#if 0
-#if 1
-#define sdp_prf_rx(sk, s, format, arg...) sdp_prf(sk, s, format, ## arg)
-#define sdp_prf_tx(sk, s, format, arg...)
-#else
-#define sdp_prf_rx(sk, s, format, arg...)
-#define sdp_prf_tx(sk, s, format, arg...) sdp_prf(sk, s, format, ## arg)
-#endif
-#endif
-
#ifdef CONFIG_INFINIBAND_SDP_DEBUG
extern int sdp_debug_level;
@@ -125,7 +143,6 @@ extern int sdp_data_debug_level;
} while (0)
#else
#define sdp_dbg_data(priv, format, arg...)
-// do { (void) (priv); } while (0)
#define SDP_DUMP_PACKET(sk, str, skb, h)
#endif
@@ -300,23 +317,36 @@ struct sdp_buf {
u64 mapping[SDP_MAX_SEND_SKB_FRAGS + 1];
};
+#define ring_head(ring) (atomic_read(&(ring).head))
+#define ring_tail(ring) (atomic_read(&(ring).tail))
+#define ring_posted(ring) (ring_head(ring) - ring_tail(ring))
struct sdp_tx_ring {
struct sdp_buf *buffer;
- unsigned head;
- unsigned tail;
+ atomic_t head;
+ atomic_t tail;
+ struct ib_cq *cq;
int una_seq;
- unsigned credits;
+ atomic_t credits;
+#define tx_credits(ssk) (atomic_read(&ssk->tx_ring.credits))
struct timer_list timer;
u16 poll_cnt;
+};
+
+struct sdp_rx_ring {
+ struct sdp_buf *buffer;
+ atomic_t head;
+ atomic_t tail;
struct ib_cq *cq;
+
+ spinlock_t lock;
};
static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring)
{
- return SDP_TX_SIZE - (tx_ring->head - tx_ring->tail);
+ return SDP_TX_SIZE - ring_posted(*tx_ring);
}
struct sdp_chrecvbuf {
@@ -329,6 +359,7 @@ struct sdp_sock {
struct list_head sock_list;
struct list_head accept_queue;
struct list_head backlog_queue;
+ struct sk_buff_head rx_backlog;
struct sock *parent;
struct work_struct rx_comp_work;
@@ -362,32 +393,27 @@ struct sdp_sock {
int sdp_disconnect;
int destruct_in_process;
-
+ struct sdp_rx_ring rx_ring;
+ struct sdp_tx_ring tx_ring;
/* Data below will be reset on error */
struct rdma_cm_id *id;
struct ib_device *ib_device;
/* SDP specific */
- struct ib_recv_wr rx_wr;
- unsigned rx_head;
- unsigned rx_tail;
- unsigned mseq_ack;
+ atomic_t mseq_ack;
+#define mseq_ack(ssk) (atomic_read(&ssk->mseq_ack))
unsigned max_bufs; /* Initial buffers offered by other side */
unsigned min_bufs; /* Low water mark to wake senders */
- int remote_credits;
+ atomic_t remote_credits;
+#define remote_credits(ssk) (atomic_read(&ssk->remote_credits))
int poll_cq;
/* rdma specific */
struct ib_qp *qp;
- struct ib_cq *rx_cq;
struct ib_mr *mr;
- struct sdp_buf *rx_ring;
- struct sdp_tx_ring tx_ring;
- struct ib_send_wr tx_wr;
-
/* SDP slow start */
int rcvbuf_scale; /* local recv buf scale for each socket */
int sent_request_head; /* mark the tx_head of the last send resize
@@ -402,8 +428,6 @@ struct sdp_sock {
/* BZCOPY data */
int zcopy_thresh;
-
- struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
};
/* Context used for synchronous zero copy bcopy (BZCOY) */
@@ -499,13 +523,6 @@ static inline void sdp_set_error(struct sock *sk, int err)
sk->sk_error_report(sk);
}
-static inline void sdp_arm_rx_cq(struct sock *sk)
-{
- sdp_dbg_data(sk, "ib_req_notify_cq on RX cq\n");
-
- ib_req_notify_cq(sdp_sk(sk)->rx_cq, IB_CQ_NEXT_COMP);
-}
-
#ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
void _dump_packet(const char *func, int line, struct sock *sk, char *str,
struct sk_buff *skb, const struct sdp_bsdh *h);
@@ -524,28 +541,53 @@ void sdp_remove_sock(struct sdp_sock *ssk);
void sdp_remove_large_sock(struct sdp_sock *ssk);
void sdp_post_keepalive(struct sdp_sock *ssk);
void sdp_start_keepalive_timer(struct sock *sk);
-void sdp_bzcopy_write_space(struct sdp_sock *ssk);
int sdp_init_sock(struct sock *sk);
int __init sdp_proc_init(void);
void sdp_proc_unregister(void);
+/* sdp_tx.c */
+int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
+void sdp_tx_ring_destroy(struct sdp_sock *ssk);
int sdp_xmit_poll(struct sdp_sock *ssk, int force);
-void sdp_tx_ring_purge(struct sdp_sock *ssk);
void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonagle);
#define sdp_post_sends(ssk, nonagle) _sdp_post_sends(__func__, __LINE__, ssk, nonagle)
-void sdp_process_tx_wc_work(struct work_struct *work);
-void sdp_poll_tx_cq(unsigned long data);
-void _sdp_poll_tx_cq(unsigned long data);
-void sdp_tx_irq(struct ib_cq *cq, void *cq_context);
+/* sdp_rx.c */
+void sdp_rx_ring_init(struct sdp_sock *ssk);
+int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
+void sdp_rx_ring_destroy(struct sdp_sock *ssk);
+int sdp_process_rx_q(struct sdp_sock *ssk);
int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
-void sdp_rx_ring_purge(struct sdp_sock *ssk);
void sdp_post_recvs(struct sdp_sock *ssk);
-void sdp_rx_comp_work(struct work_struct *work);
-int sdp_poll_rx_cq(struct sdp_sock *ssk);
-void sdp_rx_irq(struct ib_cq *cq, void *cq_context);
+
+static inline void sdp_arm_rx_cq(struct sock *sk)
+{
+ sdp_prf(sk, NULL, "Arming RX cq");
+ sdp_dbg_data(sk, "Arming RX cq\n");
+
+ ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP);
+}
+
+/* utilities */
+static inline char *mid2str(int mid)
+{
+#define ENUM2STR(e) [e] = #e
+ static char *mid2str[] = {
+ ENUM2STR(SDP_MID_HELLO),
+ ENUM2STR(SDP_MID_HELLO_ACK),
+ ENUM2STR(SDP_MID_DISCONN),
+ ENUM2STR(SDP_MID_CHRCVBUF),
+ ENUM2STR(SDP_MID_CHRCVBUF_ACK),
+ ENUM2STR(SDP_MID_DATA),
+ };
+
+ if (mid >= ARRAY_SIZE(mid2str))
+ return NULL;
+
+ return mid2str[mid];
+}
static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp)
{
diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
index 5d4441a..b98171e 100644
--- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
@@ -45,18 +45,9 @@ void _dump_packet(const char *func, int line, struct sock *sk, char *str,
{
int len = 0;
char buf[256];
-#define ENUM2STR(e) [e] = #e
- static char *mid2str[] = {
- ENUM2STR(SDP_MID_HELLO),
- ENUM2STR(SDP_MID_HELLO_ACK),
- ENUM2STR(SDP_MID_DISCONN),
- ENUM2STR(SDP_MID_CHRCVBUF),
- ENUM2STR(SDP_MID_CHRCVBUF_ACK),
- ENUM2STR(SDP_MID_DATA),
- };
- len += snprintf(buf, 255-len, "skb: %p mid: %2x:%-20s flags: 0x%x bufs: %d "
+ len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x bufs: %d "
"len: %d mseq: %d mseq_ack: %d",
- skb, h->mid, mid2str[h->mid], h->flags,
+ str, skb, h->mid, mid2str(h->mid), h->flags,
ntohs(h->bufs), ntohl(h->len),ntohl(h->mseq),
ntohl(h->mseq_ack));
@@ -117,7 +108,7 @@ static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
return (ssk->nonagle & TCP_NAGLE_OFF) ||
skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue ||
skb->len + sizeof(struct sdp_bsdh) >= ssk->xmit_size_goal ||
- (ssk->tx_ring.tail == ssk->tx_ring.head &&
+ (ring_tail(ssk->tx_ring) == ring_head(ssk->tx_ring) &&
!(ssk->nonagle & TCP_NAGLE_CORK)) ||
(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH);
}
@@ -125,26 +116,14 @@ static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
int sdp_post_credits(struct sdp_sock *ssk)
{
int post_count = 0;
- struct sk_buff *skb;
sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d "
"tx ring slots left: %d send_head: %p\n",
- ssk->tx_ring.credits, ssk->remote_credits,
+ tx_credits(ssk), remote_credits(ssk),
sdp_tx_ring_slots_left(&ssk->tx_ring),
ssk->isk.sk.sk_send_head);
- if (ssk->tx_ring.credits > SDP_MIN_TX_CREDITS &&
- sdp_tx_ring_slots_left(&ssk->tx_ring) &&
- (skb = ssk->isk.sk.sk_send_head) &&
- sdp_nagle_off(ssk, skb)) {
- update_send_head(&ssk->isk.sk, skb);
- __skb_dequeue(&ssk->isk.sk.sk_write_queue);
- sdp_post_send(ssk, skb, SDP_MID_DATA);
- post_count++;
- goto out;
- }
-
- if (likely(ssk->tx_ring.credits > 1) &&
+ if (likely(tx_credits(ssk) > 1) &&
likely(sdp_tx_ring_slots_left(&ssk->tx_ring))) {
struct sk_buff *skb;
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -156,7 +135,6 @@ int sdp_post_credits(struct sdp_sock *ssk)
post_count++;
}
-out:
if (post_count)
sdp_xmit_poll(ssk, 0);
return post_count;
@@ -188,7 +166,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
gfp_page = GFP_KERNEL;
sdp_dbg_data(&ssk->isk.sk, "credits: %d tx ring slots left: %d send_head: %p\n",
- ssk->tx_ring.credits, sdp_tx_ring_slots_left(&ssk->tx_ring),
+ tx_credits(ssk), sdp_tx_ring_slots_left(&ssk->tx_ring),
ssk->isk.sk.sk_send_head);
if (sdp_tx_ring_slots_left(&ssk->tx_ring) < SDP_TX_SIZE / 2) {
@@ -197,8 +175,8 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
}
if (ssk->recv_request &&
- ssk->rx_tail >= ssk->recv_request_head &&
- ssk->tx_ring.credits >= SDP_MIN_TX_CREDITS &&
+ ring_tail(ssk->rx_ring) >= ssk->recv_request_head &&
+ tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
sdp_tx_ring_slots_left(&ssk->tx_ring)) {
struct sdp_chrecvbuf *resp_size;
ssk->recv_request = 0;
@@ -214,7 +192,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
post_count++;
}
- if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS &&
+ if (tx_credits(ssk) <= SDP_MIN_TX_CREDITS &&
sdp_tx_ring_slots_left(&ssk->tx_ring) &&
(skb = ssk->isk.sk.sk_send_head) &&
sdp_nagle_off(ssk, skb)) {
@@ -222,7 +200,7 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
sdp_prf(&ssk->isk.sk, skb, "no credits. called from %s:%d", func, line);
}
- while (ssk->tx_ring.credits > SDP_MIN_TX_CREDITS &&
+ while (tx_credits(ssk) > SDP_MIN_TX_CREDITS &&
sdp_tx_ring_slots_left(&ssk->tx_ring) &&
(skb = ssk->isk.sk.sk_send_head) &&
sdp_nagle_off(ssk, skb)) {
@@ -232,9 +210,9 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
post_count++;
}
- if (0 && ssk->tx_ring.credits == SDP_MIN_TX_CREDITS &&
+ if (0 && tx_credits(ssk) == SDP_MIN_TX_CREDITS &&
!ssk->sent_request &&
- ssk->tx_ring.head > ssk->sent_request_head + SDP_RESIZE_WAIT &&
+ ring_head(ssk->tx_ring) > ssk->sent_request_head + SDP_RESIZE_WAIT &&
sdp_tx_ring_slots_left(&ssk->tx_ring)) {
struct sdp_chrecvbuf *req_size;
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -244,19 +222,19 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
/* FIXME */
BUG_ON(!skb);
ssk->sent_request = SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
- ssk->sent_request_head = ssk->tx_ring.head;
+ ssk->sent_request_head = ring_head(ssk->tx_ring);
req_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *req_size);
req_size->size = htonl(ssk->sent_request);
sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF);
post_count++;
}
- c = ssk->remote_credits;
+ c = remote_credits(ssk);
if (likely(c > SDP_MIN_TX_CREDITS))
c *= 2;
- if (unlikely(c < ssk->rx_head - ssk->rx_tail) &&
- likely(ssk->tx_ring.credits > 1) &&
+ if (unlikely(c < ring_posted(ssk->rx_ring)) &&
+ likely(tx_credits(ssk) > 1) &&
likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) &&
likely((1 << ssk->isk.sk.sk_state) &
(TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
@@ -276,8 +254,8 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
* messages that provide additional credits and also do not contain ULP
* payload. */
if (unlikely(ssk->sdp_disconnect) &&
- !ssk->isk.sk.sk_send_head &&
- ssk->tx_ring.credits > (ssk->remote_credits >= ssk->rx_head - ssk->rx_tail)) {
+ !ssk->isk.sk.sk_send_head &&
+ tx_credits(ssk) > 1) {
ssk->sdp_disconnect = 0;
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
sizeof(struct sdp_bsdh),
@@ -287,14 +265,10 @@ void _sdp_post_sends(const char *func, int line, struct sdp_sock *ssk, int nonag
sdp_post_send(ssk, skb, SDP_MID_DISCONN);
post_count++;
}
+
if (post_count) {
sdp_xmit_poll(ssk, 0);
- sdp_prf(&ssk->isk.sk, NULL, "finshed polling from post_sends");
+ sdp_prf(&ssk->isk.sk, NULL, "post_sends finished polling [%s:%d].", func, line);
}
}
-
-static inline int sdp_tx_qp_empty(struct sdp_sock *ssk)
-{
- return (ssk->tx_ring.head - ssk->tx_ring.tail) == 0;
-}
diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c
index e0d0b20..a9dcf77 100644
--- a/drivers/infiniband/ulp/sdp/sdp_cma.c
+++ b/drivers/infiniband/ulp/sdp/sdp_cma.c
@@ -53,10 +53,6 @@ enum {
SDP_HAH_SIZE = 180,
};
-static void sdp_cq_event_handler(struct ib_event *event, void *data)
-{
-}
-
static void sdp_qp_event_handler(struct ib_event *event, void *data)
{
}
@@ -73,36 +69,12 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
.qp_type = IB_QPT_RC,
};
struct ib_device *device = id->device;
- struct ib_cq *rx_cq, *tx_cq;
struct ib_mr *mr;
struct ib_pd *pd;
int rc;
sdp_dbg(sk, "%s\n", __func__);
- sdp_sk(sk)->tx_ring.head = 1;
- sdp_sk(sk)->tx_ring.tail = 1;
- sdp_sk(sk)->rx_head = 1;
- sdp_sk(sk)->rx_tail = 1;
-
- sdp_sk(sk)->tx_ring.buffer = kmalloc(sizeof(*sdp_sk(sk)->tx_ring.buffer) *
- (SDP_TX_SIZE + 1), GFP_KERNEL);
- if (!sdp_sk(sk)->tx_ring.buffer) {
- rc = -ENOMEM;
- sdp_warn(sk, "Unable to allocate TX Ring size %zd.\n",
- sizeof *sdp_sk(sk)->tx_ring.buffer * (SDP_TX_SIZE + 1));
- goto err_tx;
- }
-
- sdp_sk(sk)->rx_ring = kmalloc(sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE,
- GFP_KERNEL);
- if (!sdp_sk(sk)->rx_ring) {
- rc = -ENOMEM;
- sdp_warn(sk, "Unable to allocate RX Ring size %zd.\n",
- sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE);
- goto err_rx;
- }
-
pd = ib_alloc_pd(device);
if (IS_ERR(pd)) {
rc = PTR_ERR(pd);
@@ -118,43 +90,15 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
}
sdp_sk(sk)->mr = mr;
- INIT_WORK(&sdp_sk(sk)->rx_comp_work, sdp_rx_comp_work);
-
- rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_cq_event_handler,
- sk, SDP_RX_SIZE, 0);
-
- if (IS_ERR(rx_cq)) {
- rc = PTR_ERR(rx_cq);
- sdp_warn(sk, "Unable to allocate RX CQ: %d.\n", rc);
- goto err_rx_cq;
- }
- rc = ib_modify_cq(rx_cq, 10, 200);
- if (rc) {
- sdp_warn(sk, "Unable to modify RX CQ: %d.\n", rc);
- goto err_tx_cq;
- }
- sdp_warn(sk, "Initialized CQ moderation\n");
- sdp_sk(sk)->rx_cq = rx_cq;
- sdp_arm_rx_cq(sk);
- qp_init_attr.recv_cq = rx_cq;
-
- tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_cq_event_handler,
- sk, SDP_TX_SIZE, 0);
-
- if (IS_ERR(tx_cq)) {
- rc = PTR_ERR(tx_cq);
- sdp_warn(sk, "Unable to allocate TX CQ: %d.\n", rc);
- goto err_tx_cq;
- }
+ if ((rc = sdp_rx_ring_create(sdp_sk(sk), device)))
+ goto err_rx;
- init_timer(&sdp_sk(sk)->tx_ring.timer);
- sdp_sk(sk)->tx_ring.timer.function = _sdp_poll_tx_cq;
- sdp_sk(sk)->tx_ring.timer.data = (unsigned long) sdp_sk(sk);
- sdp_sk(sk)->tx_ring.poll_cnt = 0;
+ if ((rc = sdp_tx_ring_create(sdp_sk(sk), device)))
+ goto err_tx;
- sdp_sk(sk)->tx_ring.cq = tx_cq;
- qp_init_attr.send_cq = tx_cq;
+ qp_init_attr.recv_cq = sdp_sk(sk)->rx_ring.cq;
+ qp_init_attr.send_cq = sdp_sk(sk)->tx_ring.cq;
rc = rdma_create_qp(id, pd, &qp_init_attr);
if (rc) {
@@ -170,20 +114,14 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
return 0;
err_qp:
- ib_destroy_cq(tx_cq);
-err_tx_cq:
- ib_destroy_cq(rx_cq);
-err_rx_cq:
+ sdp_tx_ring_destroy(sdp_sk(sk));
+err_tx:
+ sdp_rx_ring_destroy(sdp_sk(sk));
+err_rx:
ib_dereg_mr(sdp_sk(sk)->mr);
err_mr:
ib_dealloc_pd(pd);
err_pd:
- kfree(sdp_sk(sk)->rx_ring);
- sdp_sk(sk)->rx_ring = NULL;
-err_rx:
- kfree(sdp_sk(sk)->tx_ring.buffer);
- sdp_sk(sk)->tx_ring.buffer = NULL;
-err_tx:
return rc;
}
@@ -225,8 +163,10 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
sdp_add_sock(sdp_sk(child));
- sdp_sk(child)->max_bufs = sdp_sk(child)->tx_ring.credits = ntohs(h->bsdh.bufs);
- sdp_sk(child)->min_bufs = sdp_sk(child)->tx_ring.credits / 4;
+ sdp_sk(child)->max_bufs = ntohs(h->bsdh.bufs);
+ atomic_set(&sdp_sk(child)->tx_ring.credits, sdp_sk(child)->max_bufs);
+
+ sdp_sk(child)->min_bufs = tx_credits(sdp_sk(child)) / 4;
sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
sizeof(struct sdp_bsdh);
sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) /
@@ -236,7 +176,7 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
sdp_dbg(child, "%s recv_frags: %d tx credits %d xmit_size_goal %d send trigger %d\n",
__func__,
sdp_sk(child)->recv_frags,
- sdp_sk(child)->tx_ring.credits,
+ tx_credits(sdp_sk(child)),
sdp_sk(child)->xmit_size_goal,
sdp_sk(child)->min_bufs);
@@ -272,8 +212,9 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
h = event->param.conn.private_data;
SDP_DUMP_PACKET(sk, "RX", NULL, &h->bsdh);
- sdp_sk(sk)->max_bufs = sdp_sk(sk)->tx_ring.credits = ntohs(h->bsdh.bufs);
- sdp_sk(sk)->min_bufs = sdp_sk(sk)->tx_ring.credits / 4;
+ sdp_sk(sk)->max_bufs = ntohs(h->bsdh.bufs);
+ atomic_set(&sdp_sk(sk)->tx_ring.credits, sdp_sk(sk)->max_bufs);
+ sdp_sk(sk)->min_bufs = tx_credits(sdp_sk(sk)) / 4;
sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
sizeof(struct sdp_bsdh);
sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
@@ -282,14 +223,12 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
sdp_sk(sk)->send_frags * PAGE_SIZE);
sdp_dbg(sk, "tx credits %d xmit_size_goal %d send_frags: %d credits update trigger %d\n",
- sdp_sk(sk)->tx_ring.credits,
+ tx_credits(sdp_sk(sk)),
sdp_sk(sk)->xmit_size_goal,
sdp_sk(sk)->send_frags,
sdp_sk(sk)->min_bufs);
sdp_sk(sk)->poll_cq = 1;
- sdp_arm_rx_cq(sk);
- sdp_poll_rx_cq(sdp_sk(sk));
sk->sk_state_change(sk);
sk_wake_async(sk, 0, POLL_OUT);
@@ -349,8 +288,7 @@ static int sdp_disconnected_handler(struct sock *sk)
sdp_dbg(sk, "%s\n", __func__);
- if (ssk->rx_cq)
- sdp_poll_rx_cq(ssk);
+ sdp_process_rx_q(ssk);
if (ssk->tx_ring.cq)
sdp_xmit_poll(ssk, 1);
@@ -400,7 +338,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
rc = rdma_resolve_route(id, SDP_ROUTE_TIMEOUT);
break;
case RDMA_CM_EVENT_ADDR_ERROR:
- sdp_dbg(sk, "RDMA_CM_EVENT_ADDR_ERROR\n");
+ sdp_warn(sk, "RDMA_CM_EVENT_ADDR_ERROR\n");
rc = -ENETUNREACH;
break;
case RDMA_CM_EVENT_ROUTE_RESOLVED:
@@ -408,11 +346,10 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
rc = sdp_init_qp(sk, id);
if (rc)
break;
- sdp_sk(sk)->remote_credits = sdp_sk(sk)->rx_head -
- sdp_sk(sk)->rx_tail;
+ atomic_set(&sdp_sk(sk)->remote_credits, ring_posted(sdp_sk(sk)->rx_ring));
memset(&hh, 0, sizeof hh);
hh.bsdh.mid = SDP_MID_HELLO;
- hh.bsdh.bufs = htons(sdp_sk(sk)->remote_credits);
+ hh.bsdh.bufs = htons(remote_credits(sdp_sk(sk)));
hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE);
hh.max_adverts = 1;
hh.majv_minv = SDP_MAJV_MINV;
@@ -443,11 +380,10 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
break;
}
child = id->context;
- sdp_sk(child)->remote_credits = sdp_sk(child)->rx_head -
- sdp_sk(child)->rx_tail;
+ atomic_set(&sdp_sk(child)->remote_credits, ring_posted(sdp_sk(child)->rx_ring));
memset(&hah, 0, sizeof hah);
hah.bsdh.mid = SDP_MID_HELLO_ACK;
- hah.bsdh.bufs = htons(sdp_sk(child)->remote_credits);
+ hah.bsdh.bufs = htons(remote_credits(sdp_sk(child)));
hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE);
hah.majv_minv = SDP_MAJV_MINV;
hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec,
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index b5322da..c6b17db 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -206,31 +206,24 @@ static int sdp_get_port(struct sock *sk, unsigned short snum)
static void sdp_destroy_qp(struct sdp_sock *ssk)
{
struct ib_pd *pd = NULL;
- struct ib_cq *rx_cq = NULL;
- struct ib_cq *tx_cq = NULL;
+ unsigned long flags;
+
+
+ sdp_dbg(&ssk->isk.sk, "destroying qp\n");
del_timer(&ssk->tx_ring.timer);
+ rx_ring_lock(ssk, flags);
+
+ sdp_rx_ring_destroy(ssk);
+ sdp_tx_ring_destroy(ssk);
+
if (ssk->qp) {
pd = ssk->qp->pd;
- rx_cq = ssk->rx_cq;
- ssk->rx_cq = NULL;
- tx_cq = ssk->tx_ring.cq;
- ssk->tx_ring.cq = NULL;
ib_destroy_qp(ssk->qp);
ssk->qp = NULL;
-
- sdp_rx_ring_purge(ssk);
- sdp_tx_ring_purge(ssk);
- }
-
- if (tx_cq) {
- ib_destroy_cq(tx_cq);
}
- if (rx_cq)
- ib_destroy_cq(rx_cq);
-
if (ssk->mr) {
ib_dereg_mr(ssk->mr);
ssk->mr = NULL;
@@ -241,14 +234,8 @@ static void sdp_destroy_qp(struct sdp_sock *ssk)
sdp_remove_large_sock(ssk);
- if (ssk->rx_ring) {
- kfree(ssk->rx_ring);
- ssk->rx_ring = NULL;
- }
- if (ssk->tx_ring.buffer) {
- kfree(ssk->tx_ring.buffer);
- ssk->tx_ring.buffer = NULL;
- }
+ rx_ring_unlock(ssk, flags);
+
}
static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len)
@@ -257,8 +244,8 @@ static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len)
sdp_dbg(sk, "%s\n", __func__);
- ssk->keepalive_tx_head = ssk->tx_ring.head;
- ssk->keepalive_rx_head = ssk->rx_head;
+ ssk->keepalive_tx_head = ring_head(ssk->tx_ring);
+ ssk->keepalive_rx_head = ring_head(ssk->rx_ring);
sk_reset_timer(sk, &sk->sk_timer, jiffies + len);
}
@@ -293,8 +280,8 @@ static void sdp_keepalive_timer(unsigned long data)
sk->sk_state == TCP_CLOSE)
goto out;
- if (ssk->keepalive_tx_head == ssk->tx_ring.head &&
- ssk->keepalive_rx_head == ssk->rx_head)
+ if (ssk->keepalive_tx_head == ring_head(ssk->tx_ring) &&
+ ssk->keepalive_rx_head == ring_head(ssk->rx_ring))
sdp_post_keepalive(ssk);
sdp_reset_keepalive_timer(sk, sdp_keepalive_time_when(ssk));
@@ -338,17 +325,21 @@ void sdp_reset_sk(struct sock *sk, int rc)
read_lock(&device_removal_lock);
- if (ssk->rx_cq)
- sdp_poll_rx_cq(ssk);
+ sdp_process_rx_q(ssk);
if (ssk->tx_ring.cq)
sdp_xmit_poll(ssk, 1);
- if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk))
+ if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) {
+ sdp_warn(sk, "setting state to error. shutdown: %d, mem_free: %d\n",
+ !(sk->sk_shutdown & RCV_SHUTDOWN),
+ !sk_stream_memory_free(sk));
sdp_set_error(sk, rc);
+ }
sdp_destroy_qp(ssk);
+ sdp_dbg(sk, "memset on sdp_sock\n");
memset((void *)&ssk->id, 0, sizeof(*ssk) - offsetof(typeof(*ssk), id));
sk->sk_state_change(sk);
@@ -488,6 +479,7 @@ static void sdp_close(struct sock *sk, long timeout)
lock_sock(sk);
sdp_dbg(sk, "%s\n", __func__);
+ sdp_prf(sk, NULL, __func__);
sdp_delete_keepalive_timer(sk);
@@ -773,10 +765,9 @@ out:
release_sock(sk);
if (newsk) {
lock_sock(newsk);
- if (newssk->rx_cq) {
+ if (newssk->rx_ring.cq) {
newssk->poll_cq = 1;
sdp_arm_rx_cq(&newssk->isk.sk);
- sdp_poll_rx_cq(newssk);
}
release_sock(newsk);
}
@@ -934,7 +925,11 @@ int sdp_init_sock(struct sock *sk)
sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM;
- ssk->rx_ring = NULL;
+ skb_queue_head_init(&ssk->rx_backlog);
+
+ atomic_set(&ssk->mseq_ack, 0);
+
+ sdp_rx_ring_init(ssk);
ssk->tx_ring.buffer = NULL;
ssk->sdp_disconnect = 0;
ssk->destructed_already = 0;
@@ -1142,14 +1137,13 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname,
static inline int poll_recv_cq(struct sock *sk)
{
int i;
- if (sdp_sk(sk)->rx_cq) {
- for (i = 0; i < recv_poll; ++i)
- if (!sdp_poll_rx_cq(sdp_sk(sk))) {
- ++recv_poll_hit;
- return 0;
- }
- ++recv_poll_miss;
+ for (i = 0; i < recv_poll; ++i) {
+ if (!sdp_process_rx_q(sdp_sk(sk))) {
+ ++recv_poll_hit;
+ return 0;
+ }
}
+ ++recv_poll_miss;
return 1;
}
@@ -1551,8 +1545,8 @@ static inline int slots_free(struct sdp_sock *ssk)
{
int min_free;
- min_free = MIN(ssk->tx_ring.credits,
- SDP_TX_SIZE - (ssk->tx_ring.head - ssk->tx_ring.tail));
+ min_free = MIN(tx_credits(ssk),
+ SDP_TX_SIZE - ring_posted(ssk->tx_ring));
if (min_free < SDP_MIN_TX_CREDITS)
return 0;
@@ -1608,6 +1602,9 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
sk->sk_write_pending++;
+ sdp_prf(sk, NULL, "credits: %d, head: %d, tail: %d, busy: %d",
+ tx_credits(ssk), ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring),
+ bz->busy);
sk_wait_event(sk, &current_timeo,
sdp_bzcopy_slots_avail(ssk, bz) && vm_wait);
sk->sk_write_pending--;
@@ -1627,24 +1624,6 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
return err;
}
-/* like sk_stream_write_space - execpt measures remote credits */
-void sdp_bzcopy_write_space(struct sdp_sock *ssk)
-{
- struct sock *sk = &ssk->isk.sk;
- struct socket *sock = sk->sk_socket;
-
- if (ssk->tx_ring.credits >= ssk->min_bufs &&
- ssk->tx_ring.head == ssk->tx_ring.tail &&
- sock != NULL) {
- clear_bit(SOCK_NOSPACE, &sock->flags);
-
- if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
- wake_up_interruptible(sk->sk_sleep);
- if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
- sock_wake_async(sock, 2, POLL_OUT);
- }
-}
-
/* Like tcp_sendmsg */
/* TODO: check locking */
static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
@@ -1735,7 +1714,7 @@ new_segment:
if (!skb)
goto wait_for_memory;
- sdp_prf(sk, skb, "Created");
+// sdp_prf(sk, skb, "Created");
BZCOPY_STATE(skb) = bz;
@@ -1751,7 +1730,7 @@ new_segment:
skb_entail(sk, ssk, skb);
copy = size_goal;
} else {
- sdp_prf(sk, skb, "adding %d bytes", copy);
+// sdp_prf(sk, skb, "adding %d bytes", copy);
sdp_dbg_data(sk, "adding to existing skb: %p"
" len = %d, sk_send_head: %p copy: %d\n",
skb, skb->len, sk->sk_send_head, copy);
@@ -1770,10 +1749,8 @@ new_segment:
goto new_segment;
}
-// sdp_prf(sk, skb, "before memcpy %d bytes", copy);
copy = (bz) ? sdp_bzcopy_get(sk, skb, from, copy, bz) :
sdp_bcopy_get(sk, skb, from, copy);
-// sdp_prf(sk, skb, "after memcpy. result: %d", copy);
if (unlikely(copy < 0)) {
if (!++copy)
goto wait_for_memory;
@@ -1863,6 +1840,20 @@ out_err:
return err;
}
+int dummy_memcpy_toiovec(struct iovec *iov, int len)
+{
+ while (len > 0) {
+ if (iov->iov_len) {
+ int copy = min_t(unsigned int, iov->iov_len, len);
+ len -= copy;
+ iov->iov_len -= copy;
+ iov->iov_base += copy;
+ }
+ iov++;
+ }
+
+ return 0;
+}
/* Like tcp_recvmsg */
/* Maybe use skb_recv_datagram here? */
/* Note this does not seem to handle vectored messages. Relevant? */
@@ -1884,7 +1875,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
lock_sock(sk);
sdp_dbg_data(sk, "%s\n", __func__);
- sdp_prf(sk, skb, "Read from user");
+// sdp_prf(sk, skb, "Read from user");
err = -ENOTCONN;
if (sk->sk_state == TCP_LISTEN)
@@ -2024,6 +2015,7 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
err = skb_copy_datagram_iovec(skb, offset,
/* TODO: skip header? */
msg->msg_iov, used);
+// err = dummy_memcpy_toiovec(msg->msg_iov, used);
sdp_prf(sk, skb, "Copied to user %ld bytes. err = %d", used, err);
if (err) {
sdp_dbg(sk, "%s: skb_copy_datagram_iovec failed"
diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c
index 3778c9a..537b3a6 100644
--- a/drivers/infiniband/ulp/sdp/sdp_proc.c
+++ b/drivers/infiniband/ulp/sdp/sdp_proc.c
@@ -236,15 +236,6 @@ static void sdpstats_seq_hist(struct seq_file *seq, char *str, u32 *h, int n, in
static int sdpstats_seq_show(struct seq_file *seq, void *v)
{
-#define ENUM2STR(e) [e] = #e
- static char *mid2str[] = {
- ENUM2STR(SDP_MID_HELLO),
- ENUM2STR(SDP_MID_HELLO_ACK),
- ENUM2STR(SDP_MID_DISCONN),
- ENUM2STR(SDP_MID_CHRCVBUF),
- ENUM2STR(SDP_MID_CHRCVBUF_ACK),
- ENUM2STR(SDP_MID_DATA),
- };
int i;
seq_printf(seq, "SDP statistics:\n");
@@ -268,9 +259,9 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v)
seq_printf(seq, "memcpy_count \t\t: %u\n", sdpstats.memcpy_count);
for (i = 0; i < ARRAY_SIZE(sdpstats.post_send); i++) {
- if (mid2str[i]) {
+ if (mid2str(i)) {
seq_printf(seq, "post_send %-20s\t: %d\n",
- mid2str[i], sdpstats.post_send[i]);
+ mid2str(i), sdpstats.post_send[i]);
}
}
diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c
index ba8e0fe..810dcbb 100644
--- a/drivers/infiniband/ulp/sdp/sdp_rx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_rx.c
@@ -39,7 +39,7 @@
static int rcvbuf_scale = 0x10;
-int rcvbuf_initial_size = SDP_HEAD_SIZE;
+int rcvbuf_initial_size = 32 * 1024;
module_param_named(rcvbuf_initial_size, rcvbuf_initial_size, int, 0644);
MODULE_PARM_DESC(rcvbuf_initial_size, "Receive buffer initial size in bytes.");
@@ -149,22 +149,28 @@ static void sdp_fin(struct sock *sk)
}
}
-
-static void sdp_post_recv(struct sdp_sock *ssk)
+/* lock_sock must be taken before calling this - since rx_ring.head is not
+ * protected (although it is atomic)
+ */
+static int sdp_post_recv(struct sdp_sock *ssk)
{
struct sdp_buf *rx_req;
int i, rc, frags;
u64 addr;
struct ib_device *dev;
- struct ib_sge *sge;
+ struct ib_recv_wr rx_wr = { 0 };
+ struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
+ struct ib_sge *sge = ibsge;
struct ib_recv_wr *bad_wr;
struct sk_buff *skb;
struct page *page;
skb_frag_t *frag;
struct sdp_bsdh *h;
- int id = ssk->rx_head;
+ int id = ring_head(ssk->rx_ring);
gfp_t gfp_page;
+ int ret = 0;
+ WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
/* Now, allocate and repost recv */
/* TODO: allocate from cache */
@@ -194,10 +200,9 @@ static void sdp_post_recv(struct sdp_sock *ssk)
skb->truesize += frag->size;
}
- rx_req = ssk->rx_ring + (id & (SDP_RX_SIZE - 1));
+ rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
rx_req->skb = skb;
dev = ssk->ib_device;
- sge = ssk->ibsge;
addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE);
BUG_ON(ib_dma_mapping_error(dev, addr));
@@ -221,27 +226,32 @@ static void sdp_post_recv(struct sdp_sock *ssk)
sge->lkey = ssk->mr->lkey;
}
- ssk->rx_wr.next = NULL;
- ssk->rx_wr.wr_id = id | SDP_OP_RECV;
- ssk->rx_wr.sg_list = ssk->ibsge;
- ssk->rx_wr.num_sge = frags + 1;
- rc = ib_post_recv(ssk->qp, &ssk->rx_wr, &bad_wr);
- sdp_prf(&ssk->isk.sk, skb, "rx skb was posted");
+ rx_wr.next = NULL;
+ rx_wr.wr_id = id | SDP_OP_RECV;
+ rx_wr.sg_list = ibsge;
+ rx_wr.num_sge = frags + 1;
+ rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
SDPSTATS_COUNTER_INC(post_recv);
- ++ssk->rx_head;
+ atomic_inc(&ssk->rx_ring.head);
if (unlikely(rc)) {
- sdp_dbg(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
+ sdp_warn(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
sdp_reset(&ssk->isk.sk);
+ ret = -1;
}
atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+
+ return ret;
}
-void sdp_post_recvs(struct sdp_sock *ssk)
+/* lock_sock must be taken before calling this */
+static void _sdp_post_recvs(struct sdp_sock *ssk)
{
struct sock *sk = &ssk->isk.sk;
int scale = ssk->rcvbuf_scale;
+ WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
+
if (unlikely(!ssk->id || ((1 << sk->sk_state) &
(TCPF_CLOSE | TCPF_TIME_WAIT)))) {
return;
@@ -251,12 +261,23 @@ void sdp_post_recvs(struct sdp_sock *ssk)
(top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE)
scale = 1;
- while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) &&
- (ssk->rx_head - ssk->rx_tail - SDP_MIN_TX_CREDITS) *
+ while ((likely(ring_posted(ssk->rx_ring) < SDP_RX_SIZE) &&
+ (ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) *
(SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) +
ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) ||
- unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_TX_CREDITS))
- sdp_post_recv(ssk);
+ unlikely(ring_posted(ssk->rx_ring) < SDP_MIN_TX_CREDITS)) {
+ if (sdp_post_recv(ssk))
+ break;
+ }
+}
+
+void sdp_post_recvs(struct sdp_sock *ssk)
+{
+ unsigned long flags;
+
+ rx_ring_lock(ssk, flags);
+ _sdp_post_recvs(ssk);
+ rx_ring_unlock(ssk, flags);
}
static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
@@ -328,9 +349,9 @@ int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
static void sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
- ssk->recv_request_head = ssk->rx_head + 1;
+ ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
else
- ssk->recv_request_head = ssk->rx_tail;
+ ssk->recv_request_head = ring_tail(ssk->rx_ring);
ssk->recv_request = 1;
}
@@ -347,23 +368,17 @@ static void sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *bu
ssk->sent_request = 0;
}
-static inline int credit_update_needed(struct sdp_sock *ssk, int wc_processed)
+static inline int credit_update_needed(struct sdp_sock *ssk)
{
int c;
- c = ssk->remote_credits;
+ c = remote_credits(ssk);
if (likely(c > SDP_MIN_TX_CREDITS))
c += c/2;
-/* sdp_warn(&ssk->isk.sk, "credits: %d remote credits: %d "
- "tx ring slots left: %d send_head: %p\n",
- ssk->tx_ring.credits, ssk->remote_credits,
- sdp_tx_ring_slots_left(&ssk->tx_ring),
- ssk->isk.sk.sk_send_head);
-*/
- return (unlikely(c < ssk->rx_head - ssk->rx_tail + wc_processed) &&
- likely(ssk->tx_ring.credits > 1) &&
- likely(sdp_tx_ring_slots_left(&ssk->tx_ring)));
+ return unlikely(c < ring_posted(ssk->rx_ring)) &&
+ likely(tx_credits(ssk) > 1) &&
+ likely(sdp_tx_ring_slots_left(&ssk->tx_ring));
}
@@ -374,14 +389,16 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
struct sk_buff *skb;
int i, frags;
- if (unlikely(id != ssk->rx_tail)) {
+ WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
+
+ if (unlikely(id != ring_tail(ssk->rx_ring))) {
printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
- id, ssk->rx_tail);
+ id, ring_tail(ssk->rx_ring));
return NULL;
}
dev = ssk->ib_device;
- rx_req = &ssk->rx_ring[id & (SDP_RX_SIZE - 1)];
+ rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
skb = rx_req->skb;
ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE,
DMA_FROM_DEVICE);
@@ -390,67 +407,20 @@ static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
ib_dma_unmap_page(dev, rx_req->mapping[i + 1],
skb_shinfo(skb)->frags[i].size,
DMA_FROM_DEVICE);
- ++ssk->rx_tail;
- --ssk->remote_credits;
+ atomic_inc(&ssk->rx_ring.tail);
+ atomic_dec(&ssk->remote_credits);
return skb;
}
-static int sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+/* this must be called while sock_lock is taken */
+static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
{
struct sock *sk = &ssk->isk.sk;
int frags;
- struct sk_buff *skb;
struct sdp_bsdh *h;
int pagesz, i;
- skb = sdp_recv_completion(ssk, wc->wr_id);
- if (unlikely(!skb))
- return -1;
-
- sdp_prf(sk, skb, "recv completion");
-
- atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
-
- if (unlikely(wc->status)) {
- if (wc->status != IB_WC_WR_FLUSH_ERR) {
- sdp_dbg(sk, "Recv completion with error. Status %d\n",
- wc->status);
- sdp_reset(sk);
- }
- __kfree_skb(skb);
- return 0;
- }
-
- sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
- (int)wc->wr_id, wc->byte_len);
- if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
- printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
- wc->byte_len, sizeof(struct sdp_bsdh));
- __kfree_skb(skb);
- return -1;
- }
- skb->len = wc->byte_len;
- if (likely(wc->byte_len > SDP_HEAD_SIZE))
- skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
- else
- skb->data_len = 0;
- skb->data = skb->head;
-#ifdef NET_SKBUFF_DATA_USES_OFFSET
- skb->tail = skb_headlen(skb);
-#else
- skb->tail = skb->head + skb_headlen(skb);
-#endif
h = (struct sdp_bsdh *)skb->data;
- SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
- skb_reset_transport_header(skb);
- ssk->mseq_ack = ntohl(h->mseq);
- if (ssk->mseq_ack != (int)wc->wr_id)
- printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
- ssk->mseq_ack, (int)wc->wr_id);
-
- SDPSTATS_HIST_LINEAR(credits_before_update, ssk->tx_ring.credits);
- ssk->tx_ring.credits = ntohl(h->mseq_ack) - ssk->tx_ring.head + 1 +
- ntohs(h->bufs);
frags = skb_shinfo(skb)->nr_frags;
pagesz = PAGE_ALIGN(skb->data_len);
@@ -513,13 +483,105 @@ static int sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
return 0;
}
-int sdp_poll_rx_cq(struct sdp_sock *ssk)
+/* called only from irq */
+static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{
- struct ib_cq *cq = ssk->rx_cq;
+ struct sk_buff *skb;
+ struct sdp_bsdh *h;
+ struct sock *sk = &ssk->isk.sk;
+ int credits_before;
+
+ skb = sdp_recv_completion(ssk, wc->wr_id);
+ if (unlikely(!skb))
+ return NULL;
+
+ atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
+
+ if (unlikely(wc->status)) {
+ if (wc->status != IB_WC_WR_FLUSH_ERR) {
+ sdp_warn(sk, "Recv completion with error. Status %d\n",
+ wc->status);
+ sdp_reset(sk);
+ }
+ __kfree_skb(skb);
+ return NULL;
+ }
+
+ sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
+ (int)wc->wr_id, wc->byte_len);
+ if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
+ printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
+ wc->byte_len, sizeof(struct sdp_bsdh));
+ __kfree_skb(skb);
+ return NULL;
+ }
+ skb->len = wc->byte_len;
+ if (likely(wc->byte_len > SDP_HEAD_SIZE))
+ skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
+ else
+ skb->data_len = 0;
+ skb->data = skb->head;
+#ifdef NET_SKBUFF_DATA_USES_OFFSET
+ skb->tail = skb_headlen(skb);
+#else
+ skb->tail = skb->head + skb_headlen(skb);
+#endif
+ h = (struct sdp_bsdh *)skb->data;
+ SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
+ skb_reset_transport_header(skb);
+ atomic_set(&ssk->mseq_ack, ntohl(h->mseq));
+ if (mseq_ack(ssk) != (int)wc->wr_id)
+ printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
+ mseq_ack(ssk), (int)wc->wr_id);
+
+ SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
+
+ credits_before = tx_credits(ssk);
+ atomic_set(&ssk->tx_ring.credits, ntohl(h->mseq_ack) - ring_head(ssk->tx_ring) + 1 +
+ ntohs(h->bufs));
+
+ sdp_prf(&ssk->isk.sk, skb, "RX %s bufs=%d c before:%d after:%d "
+ "mseq:%d, ack:%d", mid2str(h->mid), ntohs(h->bufs), credits_before,
+ tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
+
+ return skb;
+}
+
+/* like sk_stream_write_space - except it measures remote credits */
+static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
+{
+ struct sock *sk = &ssk->isk.sk;
+ struct socket *sock = sk->sk_socket;
+
+ if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
+ sdp_prf(&ssk->isk.sk, NULL, "credits: %d, min_bufs: %d. tx_head: %d, tx_tail: %d",
+ tx_credits(ssk), ssk->min_bufs,
+ ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
+ }
+
+ if (tx_credits(ssk) >= ssk->min_bufs &&
+ ring_head(ssk->tx_ring) == ring_tail(ssk->tx_ring) &&
+ sock != NULL) {
+ clear_bit(SOCK_NOSPACE, &sock->flags);
+
+ if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+ wake_up_interruptible(sk->sk_sleep);
+ if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
+ sock_wake_async(sock, 2, POLL_OUT);
+ }
+}
+
+/* called only from interrupt context -
+ * drain the RX CQ into the rx_backlog queue */
+static int sdp_poll_rx_cq(struct sdp_sock *ssk)
+{
+ struct ib_cq *cq = ssk->rx_ring.cq;
struct ib_wc ibwc[SDP_NUM_WC];
int n, i;
- int ret = -EAGAIN;
int wc_processed = 0;
+ struct sk_buff *skb;
+
+ WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
do {
n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
@@ -527,102 +589,224 @@ int sdp_poll_rx_cq(struct sdp_sock *ssk)
struct ib_wc *wc = &ibwc[i];
BUG_ON(!(wc->wr_id & SDP_OP_RECV));
- sdp_process_rx_wc(ssk, wc);
+ skb = sdp_process_rx_wc(ssk, wc);
+ if (!skb)
+ continue;
+ skb_queue_tail(&ssk->rx_backlog, skb);
wc_processed++;
-
- if (credit_update_needed(ssk, wc_processed)) {
- sdp_prf(&ssk->isk.sk, NULL, "credit update. remote_credits: %d, avail now: %d processed: %d",
- ssk->remote_credits,
- ssk->rx_head - ssk->rx_tail,
- wc_processed);
- sdp_post_recvs(ssk);
- if (sdp_post_credits(ssk))
- wc_processed = 0;
- }
-
- ret = 0;
}
} while (n == SDP_NUM_WC);
- if (!ret) {
- struct sock *sk = &ssk->isk.sk;
+ if (wc_processed)
+ sdp_bzcopy_write_space(ssk);
- sdp_post_recvs(ssk);
+ return wc_processed;
+}
- /* update credits */
- sdp_post_sends(ssk, 0);
+int sdp_process_rx_q(struct sdp_sock *ssk)
+{
+ struct sk_buff *skb;
+ struct sock *sk = &ssk->isk.sk;
+ unsigned long flags;
- if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
- sk_stream_write_space(&ssk->isk.sk);
- } else {
+ if (!ssk->rx_backlog.next || !ssk->rx_backlog.prev) {
+ sdp_warn(&ssk->isk.sk, "polling a zeroed rx_backlog!!!! %p\n", &ssk->rx_backlog);
+ return 0;
+ }
+
+ if (skb_queue_empty(&ssk->rx_backlog)) {
SDPSTATS_COUNTER_INC(rx_poll_miss);
+ return -EAGAIN;
}
- return ret;
+ /* update credits */
+ sdp_post_sends(ssk, 0);
+
+ spin_lock_irqsave(&ssk->rx_backlog.lock, flags);
+ while ((skb = __skb_dequeue(&ssk->rx_backlog))) {
+ sdp_process_rx_skb(ssk, skb);
+ }
+ spin_unlock_irqrestore(&ssk->rx_backlog.lock, flags);
+
+ if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+ sk_stream_write_space(&ssk->isk.sk);
+
+ return 0;
}
-void sdp_rx_comp_work(struct work_struct *work)
+static void sdp_rx_comp_work(struct work_struct *work)
{
struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
struct sock *sk = &ssk->isk.sk;
struct ib_cq *rx_cq;
lock_sock(sk);
- rx_cq = ssk->rx_cq;
+ rx_cq = ssk->rx_ring.cq;
if (unlikely(!rx_cq))
goto out;
if (unlikely(!ssk->poll_cq)) {
struct rdma_cm_id *id = ssk->id;
+ sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
if (id && id->qp)
rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
goto out;
}
- sdp_poll_rx_cq(ssk);
+ sdp_process_rx_q(ssk);
sdp_xmit_poll(ssk, 1); /* if has pending tx because run out of tx_credits - xmit it */
release_sock(sk);
sk_stream_mem_reclaim(sk);
lock_sock(sk);
- rx_cq = ssk->rx_cq;
+ rx_cq = ssk->rx_ring.cq;
if (unlikely(!rx_cq))
goto out;
- sdp_arm_rx_cq(sk);
- sdp_poll_rx_cq(ssk);
+ sdp_process_rx_q(ssk);
sdp_xmit_poll(ssk, 1);
+
out:
release_sock(sk);
}
-void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
+static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
{
struct sock *sk = cq_context;
struct sdp_sock *ssk = sdp_sk(sk);
+ unsigned long flags;
+ int wc_processed = 0;
- WARN_ON(ssk->rx_cq && cq != ssk->rx_cq);
+ sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
- if (!ssk->rx_cq)
- sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+ WARN_ON(cq != ssk->rx_ring.cq);
SDPSTATS_COUNTER_INC(rx_int_count);
- sdp_prf(sk, NULL, "rx completion");
+ sdp_prf(sk, NULL, "rx irq");
+
+ rx_ring_lock(ssk, flags);
+
+ if (unlikely(!ssk->poll_cq))
+ sdp_warn(sk, "poll cq is 0. socket was reset or wasn't initialized\n");
+
+ if (!ssk->rx_ring.cq) {
+ sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+
+ goto out;
+ }
+
+ wc_processed = sdp_poll_rx_cq(ssk);
+ sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
+
+ if (wc_processed) {
+ _sdp_post_recvs(ssk);
+
+ /* Best would be to send the credit update from here */
+/* sdp_post_credits(ssk); */
- /* issue sdp_rx_comp_work() */
- queue_work(rx_comp_wq, &ssk->rx_comp_work);
+ /* issue sdp_rx_comp_work() */
+ queue_work(rx_comp_wq, &ssk->rx_comp_work);
+ }
+
+ sdp_arm_rx_cq(sk);
+
+out:
+ rx_ring_unlock(ssk, flags);
}
-void sdp_rx_ring_purge(struct sdp_sock *ssk)
+static void sdp_rx_ring_purge(struct sdp_sock *ssk)
{
- struct sk_buff *skb;
+ WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
- while (ssk->rx_head != ssk->rx_tail) {
+ while (ring_posted(ssk->rx_ring) > 0) {
struct sk_buff *skb;
- skb = sdp_recv_completion(ssk, ssk->rx_tail);
+ skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
if (!skb)
break;
atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
__kfree_skb(skb);
}
}
+
+void sdp_rx_ring_init(struct sdp_sock *ssk)
+{
+ ssk->rx_ring.buffer = NULL;
+ spin_lock_init(&ssk->rx_ring.lock);
+}
+
+static void sdp_rx_cq_event_handler(struct ib_event *event, void *data)
+{
+}
+
+int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
+{
+ struct ib_cq *rx_cq;
+ int rc = 0;
+ unsigned long flags;
+
+ rx_ring_lock(ssk, flags);
+
+ atomic_set(&ssk->rx_ring.head, 1);
+ atomic_set(&ssk->rx_ring.tail, 1);
+
+ ssk->rx_ring.buffer = kmalloc(sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE,
+ GFP_KERNEL);
+ if (!ssk->rx_ring.buffer) {
+ rc = -ENOMEM;
+ sdp_warn(&ssk->isk.sk, "Unable to allocate RX Ring size %zd.\n",
+ sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);
+
+ goto out;
+ }
+
+ rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
+ &ssk->isk.sk, SDP_RX_SIZE, 0);
+
+ if (IS_ERR(rx_cq)) {
+ rc = PTR_ERR(rx_cq);
+ sdp_warn(&ssk->isk.sk, "Unable to allocate RX CQ: %d.\n", rc);
+ goto err_cq;
+ }
+
+ rc = ib_modify_cq(rx_cq, 10, 200);
+ if (rc) {
+ sdp_warn(&ssk->isk.sk, "Unable to modify RX CQ: %d.\n", rc);
+ goto err_mod;
+ }
+ sdp_warn(&ssk->isk.sk, "Initialized CQ moderation\n");
+ sdp_sk(&ssk->isk.sk)->rx_ring.cq = rx_cq;
+
+ INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
+
+ sdp_arm_rx_cq(&ssk->isk.sk);
+
+ goto out;
+
+err_mod:
+ ib_destroy_cq(rx_cq);
+err_cq:
+ kfree(ssk->rx_ring.buffer);
+ ssk->rx_ring.buffer = NULL;
+out:
+ rx_ring_unlock(ssk, flags);
+ return rc;
+}
+
+void sdp_rx_ring_destroy(struct sdp_sock *ssk)
+{
+ WARN_ON_UNLOCKED(&ssk->isk.sk, &ssk->rx_ring.lock);
+
+ if (ssk->rx_ring.buffer) {
+ sdp_rx_ring_purge(ssk);
+
+ kfree(ssk->rx_ring.buffer);
+ ssk->rx_ring.buffer = NULL;
+ }
+
+ if (ssk->rx_ring.cq) {
+ ib_destroy_cq(ssk->rx_ring.cq);
+ ssk->rx_ring.cq = NULL;
+ }
+
+ WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
+}
diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c
index 5e6a2dc..de9d792 100644
--- a/drivers/infiniband/ulp/sdp/sdp_tx.c
+++ b/drivers/infiniband/ulp/sdp/sdp_tx.c
@@ -55,8 +55,11 @@ int sdp_xmit_poll(struct sdp_sock *ssk, int force)
mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
/* Poll the CQ every SDP_TX_POLL_MODER packets */
- if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
+ if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0) {
wc_processed = sdp_process_tx_cq(ssk);
+ sdp_prf(&ssk->isk.sk, NULL, "processed %d wc's. inflight=%d", wc_processed,
+ ring_posted(ssk->tx_ring));
+ }
return wc_processed;
}
@@ -65,24 +68,16 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
{
struct sdp_buf *tx_req;
struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
- unsigned mseq = ssk->tx_ring.head;
+ unsigned mseq = ring_head(ssk->tx_ring);
int i, rc, frags;
u64 addr;
struct ib_device *dev;
- struct ib_sge *sge;
struct ib_send_wr *bad_wr;
-#define ENUM2STR(e) [e] = #e
- static char *mid2str[] = {
- ENUM2STR(SDP_MID_HELLO),
- ENUM2STR(SDP_MID_HELLO_ACK),
- ENUM2STR(SDP_MID_DISCONN),
- ENUM2STR(SDP_MID_CHRCVBUF),
- ENUM2STR(SDP_MID_CHRCVBUF_ACK),
- ENUM2STR(SDP_MID_DATA),
- };
- sdp_prf(&ssk->isk.sk, skb, "post_send mid = %s, bufs = %d",
- mid2str[mid], ssk->rx_head - ssk->rx_tail);
+ struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
+ struct ib_sge *sge = ibsge;
+ struct ib_send_wr tx_wr = { 0 };
+
SDPSTATS_COUNTER_MID_INC(post_send, mid);
SDPSTATS_HIST(send_size, skb->len);
@@ -93,16 +88,19 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
else
h->flags = 0;
- h->bufs = htons(ssk->rx_head - ssk->rx_tail);
+ h->bufs = htons(ring_posted(ssk->rx_ring));
h->len = htonl(skb->len);
h->mseq = htonl(mseq);
- h->mseq_ack = htonl(ssk->mseq_ack);
+ h->mseq_ack = htonl(mseq_ack(ssk));
+
+ sdp_prf(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%d ack:%d",
+ mid2str(mid), ring_posted(ssk->rx_ring), mseq, ntohl(h->mseq_ack));
SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
+
tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
tx_req->skb = skb;
dev = ssk->ib_device;
- sge = ssk->ibsge;
addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len,
DMA_TO_DEVICE);
tx_req->mapping[0] = addr;
@@ -127,14 +125,14 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
sge->lkey = ssk->mr->lkey;
}
- ssk->tx_wr.next = NULL;
- ssk->tx_wr.wr_id = ssk->tx_ring.head | SDP_OP_SEND;
- ssk->tx_wr.sg_list = ssk->ibsge;
- ssk->tx_wr.num_sge = frags + 1;
- ssk->tx_wr.opcode = IB_WR_SEND;
- ssk->tx_wr.send_flags = IB_SEND_SIGNALED;
+ tx_wr.next = NULL;
+ tx_wr.wr_id = ring_head(ssk->tx_ring) | SDP_OP_SEND;
+ tx_wr.sg_list = ibsge;
+ tx_wr.num_sge = frags + 1;
+ tx_wr.opcode = IB_WR_SEND;
+ tx_wr.send_flags = IB_SEND_SIGNALED;
if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_URG))
- ssk->tx_wr.send_flags |= IB_SEND_SOLICITED;
+ tx_wr.send_flags |= IB_SEND_SOLICITED;
{
static unsigned long last_send = 0;
@@ -145,19 +143,15 @@ void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
last_send = jiffies;
}
- rc = ib_post_send(ssk->qp, &ssk->tx_wr, &bad_wr);
- ++ssk->tx_ring.head;
- --ssk->tx_ring.credits;
- ssk->remote_credits = ssk->rx_head - ssk->rx_tail;
+ rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
+ atomic_inc(&ssk->tx_ring.head);
+ atomic_dec(&ssk->tx_ring.credits);
+ atomic_set(&ssk->remote_credits, ring_posted(ssk->rx_ring));
if (unlikely(rc)) {
sdp_dbg(&ssk->isk.sk, "ib_post_send failed with status %d.\n", rc);
sdp_set_error(&ssk->isk.sk, -ECONNRESET);
wake_up(&ssk->wq);
}
-
- if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS) {
- sdp_poll_rx_cq(ssk);
- }
}
static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
@@ -168,9 +162,9 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
struct bzcopy_state *bz;
int i, frags;
struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
- if (unlikely(mseq != tx_ring->tail)) {
+ if (unlikely(mseq != ring_tail(*tx_ring))) {
printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
- mseq, tx_ring->tail);
+ mseq, ring_tail(*tx_ring));
goto out;
}
@@ -193,7 +187,7 @@ static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
if (bz)
bz->busy--;
- ++tx_ring->tail;
+ atomic_inc(&tx_ring->tail);
out:
return skb;
@@ -219,7 +213,11 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
}
}
- sdp_prf(&ssk->isk.sk, skb, "tx completion");
+ {
+ struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
+ sdp_prf(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq));
+ }
+
sk_stream_free_skb(&ssk->isk.sk, skb);
return 0;
@@ -281,15 +279,14 @@ static int sdp_process_tx_cq(struct sdp_sock *ssk)
return wc_processed;
}
-void sdp_poll_tx_cq(unsigned long data)
+static void sdp_poll_tx_timeout(unsigned long data)
{
struct sdp_sock *ssk = (struct sdp_sock *)data;
struct sock *sk = &ssk->isk.sk;
u32 inflight, wc_processed;
-
sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
- (u32) ssk->tx_ring.head - ssk->tx_ring.tail);
+ (u32) ring_posted(ssk->tx_ring));
/* Only process if the socket is not in use */
bh_lock_sock(sk);
@@ -303,12 +300,14 @@ void sdp_poll_tx_cq(unsigned long data)
goto out;
wc_processed = sdp_process_tx_cq(ssk);
+ sdp_prf(&ssk->isk.sk, NULL, "processed %d wc's. inflight=%d", wc_processed,
+ ring_posted(ssk->tx_ring));
if (!wc_processed)
SDPSTATS_COUNTER_INC(tx_poll_miss);
else
SDPSTATS_COUNTER_INC(tx_poll_hit);
- inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail;
+ inflight = (u32) ring_posted(ssk->tx_ring);
/* If there are still packets in flight and the timer has not already
* been scheduled by the Tx routine then schedule it here to guarantee
@@ -322,17 +321,7 @@ out:
bh_unlock_sock(sk);
}
-void _sdp_poll_tx_cq(unsigned long data)
-{
- struct sdp_sock *ssk = (struct sdp_sock *)data;
- struct sock *sk = &ssk->isk.sk;
-
- sdp_prf(sk, NULL, "sdp poll tx timeout expired");
-
- sdp_poll_tx_cq(data);
-}
-
-void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
+static void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
{
struct sock *sk = cq_context;
struct sdp_sock *ssk = sdp_sk(sk);
@@ -344,11 +333,9 @@ void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
void sdp_tx_ring_purge(struct sdp_sock *ssk)
{
- struct sk_buff *skb;
-
- while (ssk->tx_ring.head != ssk->tx_ring.tail) {
+ while (ring_posted(ssk->tx_ring)) {
struct sk_buff *skb;
- skb = sdp_send_completion(ssk, ssk->tx_ring.tail);
+ skb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
if (!skb)
break;
__kfree_skb(skb);
@@ -380,3 +367,66 @@ void sdp_post_keepalive(struct sdp_sock *ssk)
sdp_cnt(sdp_keepalive_probes_sent);
}
+static void sdp_tx_cq_event_handler(struct ib_event *event, void *data)
+{
+}
+
+int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
+{
+ struct ib_cq *tx_cq;
+ int rc = 0;
+
+ atomic_set(&ssk->tx_ring.head, 1);
+ atomic_set(&ssk->tx_ring.tail, 1);
+
+ ssk->tx_ring.buffer = kmalloc(sizeof *ssk->tx_ring.buffer * SDP_TX_SIZE,
+ GFP_KERNEL);
+ if (!ssk->tx_ring.buffer) {
+ rc = -ENOMEM;
+ sdp_warn(&ssk->isk.sk, "Unable to allocate TX Ring size %zd.\n",
+ sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE);
+
+ goto out;
+ }
+
+ tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
+ &ssk->isk.sk, SDP_TX_SIZE, 0);
+
+ if (IS_ERR(tx_cq)) {
+ rc = PTR_ERR(tx_cq);
+ sdp_warn(&ssk->isk.sk, "Unable to allocate TX CQ: %d.\n", rc);
+ goto err_cq;
+ }
+
+ sdp_sk(&ssk->isk.sk)->tx_ring.cq = tx_cq;
+
+ init_timer(&ssk->tx_ring.timer);
+ ssk->tx_ring.timer.function = sdp_poll_tx_timeout;
+ ssk->tx_ring.timer.data = (unsigned long) ssk;
+ ssk->tx_ring.poll_cnt = 0;
+
+ return 0;
+
+err_cq:
+ kfree(ssk->tx_ring.buffer);
+ ssk->tx_ring.buffer = NULL;
+out:
+ return rc;
+}
+
+void sdp_tx_ring_destroy(struct sdp_sock *ssk)
+{
+ if (ssk->tx_ring.buffer) {
+ sdp_tx_ring_purge(ssk);
+
+ kfree(ssk->tx_ring.buffer);
+ ssk->tx_ring.buffer = NULL;
+ }
+
+ if (ssk->tx_ring.cq) {
+ ib_destroy_cq(ssk->tx_ring.cq);
+ ssk->tx_ring.cq = NULL;
+ }
+
+ WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
+}
--
1.5.3.7