[ofa-general] [PATCH] sdp: no tx interrupts
Amir Vadai
amirv at mellanox.co.il
Mon Jun 22 01:18:43 PDT 2009
Poll the TX CQ with a timer instead of per-send completion interrupts.
Signed-off-by: Amir Vadai <amirv at mellanox.co.il>
---
drivers/infiniband/ulp/sdp/sdp.h | 34 ++++--
drivers/infiniband/ulp/sdp/sdp_bcopy.c | 222 ++++++++++++++++++++++++-------
drivers/infiniband/ulp/sdp/sdp_cma.c | 72 +++++++----
drivers/infiniband/ulp/sdp/sdp_main.c | 83 ++++++++-----
drivers/infiniband/ulp/sdp/sdp_proc.c | 4 +-
5 files changed, 297 insertions(+), 118 deletions(-)
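In outline: each send increments a counter, the TX CQ is drained only on every SDP_TX_POLL_MODER-th post (or when forced), and a backstop timer guarantees that completions on a connection that goes idle still get reaped. A minimal sketch of that pattern follows; struct tx_ring and process_tx_cq() are simplified stand-ins for the patch's struct sdp_tx_ring and sdp_process_tx_cq(), not drop-in code.

    /* Sketch of the moderated TX polling pattern (hypothetical
     * stand-ins, not the patch itself): poll the CQ on every
     * TX_POLL_MODER-th post, and keep a backstop timer armed so
     * completions on an idle link are still reaped. */
    #include <linux/types.h>
    #include <linux/timer.h>
    #include <linux/jiffies.h>

    #define TX_POLL_MODER   16        /* must be a power of 2 */
    #define TX_POLL_TIMEOUT (HZ / 4)  /* backstop for idle links */

    struct tx_ring {
            struct timer_list timer;  /* runs the poll routine in BH context */
            u16 poll_cnt;             /* posts since the last forced poll */
    };

    static int process_tx_cq(struct tx_ring *ring);  /* drains the CQ */

    static int xmit_poll(struct tx_ring *ring, int force)
    {
            /* Backstop: make sure a timer is pending so this post is
             * reaped even if no further sends follow. */
            if (!timer_pending(&ring->timer))
                    mod_timer(&ring->timer, jiffies + TX_POLL_TIMEOUT);

            /* Amortize: only touch the CQ on every TX_POLL_MODER-th
             * post, unless the caller forces an immediate drain. */
            if (force || (++ring->poll_cnt & (TX_POLL_MODER - 1)) == 0)
                    return process_tx_cq(ring);

            return 0;
    }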
diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index cbef0eb..8128065 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -83,12 +83,14 @@ struct sdpstats {
u32 sendmsg_seglen[25];
u32 send_size[25];
u32 post_recv;
- u32 int_count;
+ u32 rx_int_count;
+ u32 tx_int_count;
u32 bzcopy_poll_miss;
u32 send_wait_for_mem;
u32 send_miss_no_credits;
u32 rx_poll_miss;
u32 tx_poll_miss;
+ u32 tx_poll_hit;
u32 memcpy_count;
u32 credits_before_update[64];
u32 send_interval[25];
@@ -132,6 +134,11 @@ static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
#define sock_put(sk, msg) sock_ref(sk, msg, sock_put)
#define __sock_put(sk, msg) sock_ref(sk, msg, __sock_put)
+/* Interval between successive polls in the Tx routine when polling is used
+   instead of interrupts (in per-core Tx rings) - must be a power of 2 */
+#define SDP_TX_POLL_MODER 16
+#define SDP_TX_POLL_TIMEOUT (HZ / 4)
+
#define SDP_RESOLVE_TIMEOUT 1000
#define SDP_ROUTE_TIMEOUT 1000
#define SDP_RETRY_COUNT 5
@@ -242,7 +249,10 @@ struct sdp_tx_ring {
int una_seq;
unsigned credits;
+
+ struct timer_list timer;
u16 poll_cnt;
+ struct ib_cq *cq;
};
static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring)
@@ -258,7 +268,7 @@ struct sdp_sock {
struct list_head backlog_queue;
struct sock *parent;
- struct work_struct work;
+ struct work_struct rx_comp_work;
wait_queue_head_t wq;
struct delayed_work dreq_wait_work;
@@ -308,7 +318,7 @@ struct sdp_sock {
/* rdma specific */
struct ib_qp *qp;
- struct ib_cq *cq;
+ struct ib_cq *rx_cq;
struct ib_mr *mr;
struct sdp_buf *rx_ring;
@@ -349,7 +359,7 @@ struct bzcopy_state {
extern int rcvbuf_initial_size;
extern struct proto sdp_proto;
-extern struct workqueue_struct *comp_wq;
+extern struct workqueue_struct *rx_comp_wq;
extern atomic_t sdp_current_mem_usage;
extern spinlock_t sdp_large_sockets_lock;
@@ -440,11 +450,11 @@ static inline void sdp_set_error(struct sock *sk, int err)
sk->sk_error_report(sk);
}
-static inline void sdp_arm_cq(struct sock *sk)
+static inline void sdp_arm_rx_cq(struct sock *sk)
{
- sdp_dbg_data(sk, "ib_req_notify_cq on cq\n");
+ sdp_dbg_data(sk, "ib_req_notify_cq on RX cq\n");
- ib_req_notify_cq(sdp_sk(sk)->cq, IB_CQ_NEXT_COMP);
+ ib_req_notify_cq(sdp_sk(sk)->rx_cq, IB_CQ_NEXT_COMP);
}
#ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
@@ -453,12 +463,15 @@ void dump_packet(struct sock *sk, char *str, struct sk_buff *skb, const struct s
int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
void sdp_reset(struct sock *sk);
void sdp_reset_sk(struct sock *sk, int rc);
-void sdp_completion_handler(struct ib_cq *cq, void *cq_context);
-void sdp_work(struct work_struct *work);
+void sdp_rx_irq(struct ib_cq *cq, void *cq_context);
+void sdp_tx_irq(struct ib_cq *cq, void *cq_context);
+void sdp_poll_tx_cq(unsigned long data);
+void sdp_rx_comp_work(struct work_struct *work);
+void sdp_process_tx_wc_work(struct work_struct *work);
int sdp_post_credits(struct sdp_sock *ssk);
void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
void sdp_post_recvs(struct sdp_sock *ssk);
-int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq);
+int sdp_poll_rx_cq(struct sdp_sock *ssk);
void sdp_post_sends(struct sdp_sock *ssk, int nonagle);
void sdp_destroy_work(struct work_struct *work);
void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
@@ -477,6 +490,7 @@ void sdp_bzcopy_write_space(struct sdp_sock *ssk);
int sdp_init_sock(struct sock *sk);
int __init sdp_proc_init(void);
void sdp_proc_unregister(void);
+int sdp_xmit_poll(struct sdp_sock *ssk, int force);
static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp)
{
diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
index 9f7f4a0..29c9761 100644
--- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
@@ -244,6 +244,24 @@ void dump_packet(struct sock *sk, char *str, struct sk_buff *skb, const struct s
}
#endif
+static int sdp_process_tx_cq(struct sdp_sock *ssk);
+
+int sdp_xmit_poll(struct sdp_sock *ssk, int force)
+{
+ int wc_processed = 0;
+
+ /* If we don't have a pending timer, set one up to catch our recent
+ post in case the interface becomes idle */
+ if (!timer_pending(&ssk->tx_ring.timer))
+ mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+
+ /* Poll the CQ every SDP_TX_POLL_MODER packets */
+ if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
+ wc_processed = sdp_process_tx_cq(ssk);
+
+ return wc_processed;
+}
+
void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
{
struct sdp_buf *tx_req;
@@ -573,6 +591,7 @@ int sdp_post_credits(struct sdp_sock *ssk)
if (!skb)
return -ENOMEM;
sdp_post_send(ssk, skb, SDP_MID_DATA);
+ sdp_xmit_poll(ssk, 0);
}
return 0;
}
@@ -583,6 +602,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
struct sk_buff *skb;
int c;
gfp_t gfp_page;
+ int post_count = 0;
if (unlikely(!ssk->id)) {
if (ssk->isk.sk.sk_send_head) {
@@ -602,7 +622,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
if (ssk->recv_request &&
ssk->rx_tail >= ssk->recv_request_head &&
ssk->tx_ring.credits >= SDP_MIN_TX_CREDITS &&
- ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) {
+ sdp_tx_ring_slots_left(&ssk->tx_ring)) {
struct sdp_chrecvbuf *resp_size;
ssk->recv_request = 0;
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -614,6 +634,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
resp_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *resp_size);
resp_size->size = htonl(ssk->recv_frags * PAGE_SIZE);
sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF_ACK);
+ post_count++;
}
if (ssk->tx_ring.credits <= SDP_MIN_TX_CREDITS &&
@@ -634,12 +655,13 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
update_send_head(&ssk->isk.sk, skb);
__skb_dequeue(&ssk->isk.sk.sk_write_queue);
sdp_post_send(ssk, skb, SDP_MID_DATA);
+ post_count++;
}
if (ssk->tx_ring.credits == SDP_MIN_TX_CREDITS &&
!ssk->sent_request &&
ssk->tx_ring.head > ssk->sent_request_head + SDP_RESIZE_WAIT &&
- ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) {
+ sdp_tx_ring_slots_left(&ssk->tx_ring)) {
struct sdp_chrecvbuf *req_size;
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
sizeof(struct sdp_bsdh) +
@@ -652,6 +674,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
req_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *req_size);
req_size->size = htonl(ssk->sent_request);
sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF);
+ post_count++;
}
c = ssk->remote_credits;
@@ -660,7 +683,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
if (unlikely(c < ssk->rx_head - ssk->rx_tail) &&
likely(ssk->tx_ring.credits > 1) &&
- likely(ssk->tx_ring.head - ssk->tx_ring.tail < SDP_TX_SIZE) &&
+ likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) &&
likely((1 << ssk->isk.sk.sk_state) &
(TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -670,6 +693,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
BUG_ON(!skb);
SDPSTATS_COUNTER_INC(post_send_credits);
sdp_post_send(ssk, skb, SDP_MID_DATA);
+ post_count++;
}
/* send DisConn if needed
@@ -687,7 +711,10 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
/* FIXME */
BUG_ON(!skb);
sdp_post_send(ssk, skb, SDP_MID_DISCONN);
+ post_count++;
}
+ if (post_count)
+ sdp_xmit_poll(ssk, 0);
}
int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
@@ -867,7 +894,6 @@ static int sdp_handle_recv_comp(struct sdp_sock *ssk, struct ib_wc *wc)
static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
{
struct sk_buff *skb = NULL;
- struct sdp_bsdh *h;
skb = sdp_send_completion(ssk, wc->wr_id);
if (unlikely(!skb))
@@ -881,72 +907,162 @@ static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
sdp_set_error(sk, -ECONNRESET);
wake_up(&ssk->wq);
- queue_work(comp_wq, &ssk->destroy_work);
+ queue_work(rx_comp_wq, &ssk->destroy_work);
}
- goto out;
}
- h = (struct sdp_bsdh *)skb->data;
+ sk_stream_free_skb(&ssk->isk.sk, skb);
- if (likely(h->mid != SDP_MID_DISCONN))
- goto out;
+ return 0;
+}
+
+void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
+{
+ struct sock *sk = cq_context;
+ struct sdp_sock *ssk = sdp_sk(sk);
+
+ WARN_ON(ssk->rx_cq && cq != ssk->rx_cq);
+
+ if (!ssk->rx_cq)
+ sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+
+ SDPSTATS_COUNTER_INC(rx_int_count);
+
+ queue_work(rx_comp_wq, &ssk->rx_comp_work);
+}
- if ((1 << ssk->isk.sk.sk_state) & ~(TCPF_FIN_WAIT1 | TCPF_LAST_ACK)) {
- sdp_dbg(&ssk->isk.sk,
- "%s: sent DISCONNECT from unexpected state %d\n",
- __func__, ssk->isk.sk.sk_state);
+static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+ if (likely(wc->wr_id & SDP_OP_SEND)) {
+ sdp_handle_send_comp(ssk, wc);
+ return;
}
-out:
sk_wmem_free_skb(&ssk->isk.sk, skb);
- return 0;
+ /* Keepalive probe sent cleanup */
+ sdp_cnt(sdp_keepalive_probes_sent);
+
+ if (likely(!wc->status))
+ return;
+
+ sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n",
+ __func__, wc->status);
+
+ if (wc->status == IB_WC_WR_FLUSH_ERR)
+ return;
+
+ sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+ wake_up(&ssk->wq);
}
-static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+static int sdp_process_tx_cq(struct sdp_sock *ssk)
{
- if (wc->wr_id & SDP_OP_RECV) {
- if (sdp_handle_recv_comp(ssk, wc))
- return;
- } else if (likely(wc->wr_id & SDP_OP_SEND)) {
- if (sdp_handle_send_comp(ssk, wc))
- return;
- } else {
- sdp_cnt(sdp_keepalive_probes_sent);
+ struct ib_wc ibwc[SDP_NUM_WC];
+ int n, i;
+ int wc_processed = 0;
- if (likely(!wc->status))
- return;
+ if (!ssk->tx_ring.cq) {
+ sdp_warn(&ssk->isk.sk, "WARNING: tx irq when tx_cq is destroyed\n");
+ return 0;
+ }
+
+ do {
+ n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
+ for (i = 0; i < n; ++i) {
+ sdp_process_tx_wc(ssk, ibwc + i);
+ wc_processed++;
+ }
+ } while (n == SDP_NUM_WC);
- sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n",
- __func__, wc->status);
+ sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed);
- if (wc->status == IB_WC_WR_FLUSH_ERR)
- return;
+ if (wc_processed) {
+ struct sock *sk = &ssk->isk.sk;
+ sdp_post_sends(ssk, 0);
- sdp_set_error(&ssk->isk.sk, -ECONNRESET);
- wake_up(&ssk->wq);
+ if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+ sk_stream_write_space(&ssk->isk.sk);
- return;
}
+
+ return wc_processed;
}
-void sdp_completion_handler(struct ib_cq *cq, void *cq_context)
+void sdp_poll_tx_cq(unsigned long data)
+{
+ struct sdp_sock *ssk = (struct sdp_sock *)data;
+ struct sock *sk = &ssk->isk.sk;
+ u32 inflight, wc_processed;
+
+ sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
+ (u32) ssk->tx_ring.head - ssk->tx_ring.tail);
+
+ /* Only process if the socket is not in use */
+ bh_lock_sock(sk);
+ if (sock_owned_by_user(sk)) {
+ mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+ sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+ goto out;
+ }
+
+ if (sk->sk_state == TCP_CLOSE)
+ goto out;
+
+ wc_processed = sdp_process_tx_cq(ssk);
+ if (!wc_processed)
+ SDPSTATS_COUNTER_INC(tx_poll_miss);
+ else
+ SDPSTATS_COUNTER_INC(tx_poll_hit);
+
+ inflight = (u32) ssk->tx_ring.head - ssk->tx_ring.tail;
+
+ /* If there are still packets in flight and the timer has not already
+ * been scheduled by the Tx routine then schedule it here to guarantee
+ * completion processing of these packets */
+ if (inflight) { /* TODO: make sure socket is not closed */
+ sdp_dbg_data(sk, "arming timer for more polling\n");
+ mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+ }
+
+out:
+ bh_unlock_sock(sk);
+}
+
+
+void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
{
struct sock *sk = cq_context;
struct sdp_sock *ssk = sdp_sk(sk);
- schedule_work(&ssk->work);
- SDPSTATS_COUNTER_INC(int_count);
+
+ sdp_warn(sk, "Got tx comp interrupt\n");
+
+ mod_timer(&ssk->tx_ring.timer, jiffies + 1);
}
-int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq)
+
+int sdp_poll_rx_cq(struct sdp_sock *ssk)
{
+ struct ib_cq *cq = ssk->rx_cq;
struct ib_wc ibwc[SDP_NUM_WC];
int n, i;
int ret = -EAGAIN;
+ int updated_credits = 0;
+
do {
n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
for (i = 0; i < n; ++i) {
- sdp_handle_wc(ssk, &ibwc[i]);
+ struct ib_wc *wc = &ibwc[i];
+
+ BUG_ON(!(wc->wr_id & SDP_OP_RECV));
+ sdp_handle_recv_comp(ssk, wc);
+
+ if (!updated_credits) {
+ sdp_post_recvs(ssk);
+ sdp_post_sends(ssk, 0);
+ updated_credits = 1;
+ }
+
ret = 0;
}
} while (n == SDP_NUM_WC);
@@ -968,17 +1084,20 @@ int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq)
return ret;
}
-void sdp_work(struct work_struct *work)
+static inline int sdp_tx_qp_empty(struct sdp_sock *ssk)
{
- struct sdp_sock *ssk = container_of(work, struct sdp_sock, work);
- struct sock *sk = &ssk->isk.sk;
- struct ib_cq *cq;
+ return (ssk->tx_ring.head - ssk->tx_ring.tail) == 0;
+}
- sdp_dbg_data(sk, "%s\n", __func__);
+void sdp_rx_comp_work(struct work_struct *work)
+{
+ struct sdp_sock *ssk = container_of(work, struct sdp_sock, rx_comp_work);
+ struct sock *sk = &ssk->isk.sk;
+ struct ib_cq *rx_cq;
lock_sock(sk);
- cq = ssk->cq;
- if (unlikely(!cq))
+ rx_cq = ssk->rx_cq;
+ if (unlikely(!rx_cq))
goto out;
if (unlikely(!ssk->poll_cq)) {
@@ -988,15 +1107,18 @@ void sdp_work(struct work_struct *work)
goto out;
}
- sdp_poll_cq(ssk, cq);
+ sdp_poll_rx_cq(ssk);
+ sdp_xmit_poll(ssk, 1); /* transmit any tx that stalled for lack of tx credits */
release_sock(sk);
sk_mem_reclaim(sk);
lock_sock(sk);
- cq = ssk->cq;
- if (unlikely(!cq))
+ rx_cq = ssk->rx_cq;
+ if (unlikely(!rx_cq))
goto out;
- sdp_arm_cq(sk);
- sdp_poll_cq(ssk, cq);
+
+ sdp_arm_rx_cq(sk);
+ sdp_poll_rx_cq(ssk);
+ sdp_xmit_poll(ssk, 1);
out:
release_sock(sk);
}
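A subtlety in sdp_poll_tx_cq() above: the timer runs in BH context and must not race user-context socket processing, so it takes bh_lock_sock() and, if the socket is owned by user context, simply re-arms itself and retries later. Condensed to just that pattern (my_sock and drain_tx_cq() are hypothetical stand-ins for the patch's sdp_sock and sdp_process_tx_cq()):

    #include <linux/timer.h>
    #include <net/sock.h>
    #include <net/tcp_states.h>

    struct my_sock {                   /* stand-in for sdp_sock */
            struct sock sk;            /* embedded first, as in the patch */
            struct timer_list tx_timer;
    };

    static void drain_tx_cq(struct my_sock *msk);  /* hypothetical drain */

    static void tx_poll_timer(unsigned long data)  /* old-style timer arg */
    {
            struct my_sock *msk = (struct my_sock *)data;
            struct sock *sk = &msk->sk;

            bh_lock_sock(sk);
            if (sock_owned_by_user(sk)) {
                    /* User context owns the socket; defer and retry. */
                    mod_timer(&msk->tx_timer, jiffies + HZ / 4);
                    goto out;
            }
            if (sk->sk_state != TCP_CLOSE)
                    drain_tx_cq(msk);  /* reap completions, wake writers */
    out:
            bh_unlock_sock(sk);
    }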
diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c
index 1e1ff9d..7c34637 100644
--- a/drivers/infiniband/ulp/sdp/sdp_cma.c
+++ b/drivers/infiniband/ulp/sdp/sdp_cma.c
@@ -73,7 +73,7 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
.qp_type = IB_QPT_RC,
};
struct ib_device *device = id->device;
- struct ib_cq *cq;
+ struct ib_cq *rx_cq, *tx_cq;
struct ib_mr *mr;
struct ib_pd *pd;
int rc;
@@ -118,32 +118,49 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
}
sdp_sk(sk)->mr = mr;
- INIT_WORK(&sdp_sk(sk)->work, sdp_work);
+ INIT_WORK(&sdp_sk(sk)->rx_comp_work, sdp_rx_comp_work);
- cq = ib_create_cq(device, sdp_completion_handler, sdp_cq_event_handler,
- sk, SDP_TX_SIZE + SDP_RX_SIZE, 0);
+ rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_cq_event_handler,
+ sk, SDP_RX_SIZE, 0);
- if (IS_ERR(cq)) {
- rc = PTR_ERR(cq);
- sdp_warn(sk, "Unable to allocate CQ: %d.\n", rc);
- goto err_cq;
+ if (IS_ERR(rx_cq)) {
+ rc = PTR_ERR(rx_cq);
+ sdp_warn(sk, "Unable to allocate RX CQ: %d.\n", rc);
+ goto err_rx_cq;
}
- rc = ib_modify_cq(cq, 10, 200);
+ rc = ib_modify_cq(rx_cq, 10, 200);
if (rc) {
sdp_warn(sk, "Unable to modify RX CQ: %d.\n", rc);
- goto err_qp;
+ goto err_tx_cq;
}
sdp_warn(sk, "Initialized CQ moderation\n");
+ sdp_sk(sk)->rx_cq = rx_cq;
+ sdp_arm_rx_cq(sk);
+ qp_init_attr.recv_cq = rx_cq;
+
+ tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_cq_event_handler,
+ sk, SDP_TX_SIZE, 0);
+
+ if (IS_ERR(tx_cq)) {
+ rc = PTR_ERR(tx_cq);
+ sdp_warn(sk, "Unable to allocate TX CQ: %d.\n", rc);
+ goto err_tx_cq;
+ }
- qp_init_attr.send_cq = qp_init_attr.recv_cq = cq;
+ init_timer(&sdp_sk(sk)->tx_ring.timer);
+ sdp_sk(sk)->tx_ring.timer.function = sdp_poll_tx_cq;
+ sdp_sk(sk)->tx_ring.timer.data = (unsigned long) sdp_sk(sk);
+ sdp_sk(sk)->tx_ring.poll_cnt = 0;
+
+ sdp_sk(sk)->tx_ring.cq = tx_cq;
+ qp_init_attr.send_cq = tx_cq;
rc = rdma_create_qp(id, pd, &qp_init_attr);
if (rc) {
sdp_warn(sk, "Unable to create QP: %d.\n", rc);
goto err_qp;
}
- sdp_sk(sk)->cq = cq;
sdp_sk(sk)->qp = id->qp;
sdp_sk(sk)->ib_device = device;
@@ -153,8 +170,10 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
return 0;
err_qp:
- ib_destroy_cq(cq);
-err_cq:
+ ib_destroy_cq(tx_cq);
+err_tx_cq:
+ ib_destroy_cq(rx_cq);
+err_rx_cq:
ib_dereg_mr(sdp_sk(sk)->mr);
err_mr:
ib_dealloc_pd(pd);
@@ -162,7 +181,6 @@ err_pd:
kfree(sdp_sk(sk)->rx_ring);
sdp_sk(sk)->rx_ring = NULL;
err_rx:
- WARN_ON(sdp_sk(sk)->tx_ring.head != sdp_sk(sk)->tx_ring.tail);
kfree(sdp_sk(sk)->tx_ring.buffer);
sdp_sk(sk)->tx_ring.buffer = NULL;
err_tx:
@@ -258,21 +276,20 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
sdp_sk(sk)->min_bufs = sdp_sk(sk)->tx_ring.credits / 4;
sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
sizeof(struct sdp_bsdh);
- sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
- PAGE_SIZE, SDP_MAX_SEND_SKB_FRAGS);
- sdp_sk(sk)->xmit_size_goal = MIN(sdp_sk(sk)->xmit_size_goal,
- sdp_sk(sk)->send_frags * PAGE_SIZE);
+ sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
+ PAGE_SIZE, SDP_MAX_SEND_SKB_FRAGS);
+ sdp_sk(sk)->xmit_size_goal = MIN(sdp_sk(sk)->xmit_size_goal,
+ sdp_sk(sk)->send_frags * PAGE_SIZE);
- sdp_dbg(sk, "%s bufs %d xmit_size_goal %d send_frags: %d send trigger %d\n",
- __func__,
+ sdp_dbg(sk, "tx credits %d xmit_size_goal %d send_frags: %d credits update trigger %d\n",
sdp_sk(sk)->tx_ring.credits,
sdp_sk(sk)->xmit_size_goal,
- sdp_sk(sk)->send_frags,
+ sdp_sk(sk)->send_frags,
sdp_sk(sk)->min_bufs);
sdp_sk(sk)->poll_cq = 1;
- sdp_arm_cq(sk);
- sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq);
+ sdp_arm_rx_cq(sk);
+ sdp_poll_rx_cq(sdp_sk(sk));
sk->sk_state_change(sk);
sk_wake_async(sk, 0, POLL_OUT);
@@ -332,8 +349,11 @@ static int sdp_disconnected_handler(struct sock *sk)
sdp_dbg(sk, "%s\n", __func__);
- if (ssk->cq)
- sdp_poll_cq(ssk, ssk->cq);
+ if (ssk->rx_cq)
+ sdp_poll_rx_cq(ssk);
+
+ if (ssk->tx_ring.cq)
+ sdp_xmit_poll(ssk, 1);
if (sk->sk_state == TCP_SYN_RECV) {
sdp_connected_handler(sk, NULL);
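The sdp_cma.c hunks split the old combined CQ into separate RX and TX CQs so each gets its own notification policy: RX stays interrupt-driven and armed, while the TX "interrupt" handler only kicks the poll timer, so the TX CQ is normally drained by polling. A condensed sketch of that wiring (error handling, CQ moderation, and the remaining QP caps omitted; handlers and sizes as in the patch):

    struct ib_qp_init_attr qp_init_attr = {
            .cap.max_send_wr = SDP_TX_SIZE,
            .cap.max_recv_wr = SDP_RX_SIZE,
            .qp_type         = IB_QPT_RC,
    };
    struct ib_cq *rx_cq, *tx_cq;

    rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_cq_event_handler,
                         sk, SDP_RX_SIZE, 0);
    tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_cq_event_handler,
                         sk, SDP_TX_SIZE, 0);

    ib_req_notify_cq(rx_cq, IB_CQ_NEXT_COMP);   /* arm RX only */

    qp_init_attr.recv_cq = rx_cq;
    qp_init_attr.send_cq = tx_cq;               /* TX left unarmed */

    rc = rdma_create_qp(id, pd, &qp_init_attr);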
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 6b7494f..454abeb 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -135,7 +135,8 @@ static int sdp_zcopy_thresh = 65536;
module_param_named(sdp_zcopy_thresh, sdp_zcopy_thresh, int, 0644);
MODULE_PARM_DESC(sdp_zcopy_thresh, "Zero copy send threshold; 0=off.");
-struct workqueue_struct *comp_wq;
+struct workqueue_struct *sdp_wq;
+struct workqueue_struct *rx_comp_wq;
struct list_head sock_list;
spinlock_t sock_list_lock;
@@ -205,12 +206,17 @@ static int sdp_get_port(struct sock *sk, unsigned short snum)
static void sdp_destroy_qp(struct sdp_sock *ssk)
{
struct ib_pd *pd = NULL;
- struct ib_cq *cq = NULL;
+ struct ib_cq *rx_cq = NULL;
+ struct ib_cq *tx_cq = NULL;
+
+ del_timer(&ssk->tx_ring.timer);
if (ssk->qp) {
pd = ssk->qp->pd;
- cq = ssk->cq;
- ssk->cq = NULL;
+ rx_cq = ssk->rx_cq;
+ ssk->rx_cq = NULL;
+ tx_cq = ssk->tx_ring.cq;
+ ssk->tx_ring.cq = NULL;
ib_destroy_qp(ssk->qp);
ssk->qp = NULL;
@@ -231,8 +237,12 @@ static void sdp_destroy_qp(struct sdp_sock *ssk)
}
}
- if (cq)
- ib_destroy_cq(cq);
+ if (tx_cq) {
+ ib_destroy_cq(tx_cq);
+ }
+
+ if (rx_cq)
+ ib_destroy_cq(rx_cq);
if (ssk->mr) {
ib_dereg_mr(ssk->mr);
@@ -341,8 +351,11 @@ void sdp_reset_sk(struct sock *sk, int rc)
read_lock(&device_removal_lock);
- if (ssk->cq)
- sdp_poll_cq(ssk, ssk->cq);
+ if (ssk->rx_cq)
+ sdp_poll_rx_cq(ssk);
+
+ if (ssk->tx_ring.cq)
+ sdp_xmit_poll(ssk, 1);
if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk))
sdp_set_error(sk, rc);
@@ -355,7 +368,7 @@ void sdp_reset_sk(struct sock *sk, int rc)
/* Don't destroy socket before destroy work does its job */
sock_hold(sk, SOCK_REF_RESET);
- queue_work(comp_wq, &ssk->destroy_work);
+ queue_work(sdp_wq, &ssk->destroy_work);
read_unlock(&device_removal_lock);
}
@@ -773,11 +786,10 @@ out:
release_sock(sk);
if (newsk) {
lock_sock(newsk);
- if (newssk->cq) {
- sdp_dbg(newsk, "%s: ib_req_notify_cq\n", __func__);
+ if (newssk->rx_cq) {
newssk->poll_cq = 1;
- sdp_arm_cq(&newssk->isk.sk);
- sdp_poll_cq(newssk, newssk->cq);
+ sdp_arm_rx_cq(&newssk->isk.sk);
+ sdp_poll_rx_cq(newssk);
}
release_sock(newsk);
}
@@ -847,7 +859,7 @@ static inline void sdp_start_dreq_wait_timeout(struct sdp_sock *ssk, int timeo)
{
sdp_dbg(&ssk->isk.sk, "Starting dreq wait timeout\n");
- queue_delayed_work(comp_wq, &ssk->dreq_wait_work, timeo);
+ queue_delayed_work(sdp_wq, &ssk->dreq_wait_work, timeo);
ssk->dreq_wait_timeout = 1;
}
@@ -1143,9 +1155,9 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname,
static inline int poll_recv_cq(struct sock *sk)
{
int i;
- if (sdp_sk(sk)->cq) {
+ if (sdp_sk(sk)->rx_cq) {
for (i = 0; i < recv_poll; ++i)
- if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) {
+ if (!sdp_poll_rx_cq(sdp_sk(sk))) {
++recv_poll_hit;
return 0;
}
@@ -1157,9 +1169,9 @@ static inline int poll_recv_cq(struct sock *sk)
static inline void poll_send_cq(struct sock *sk)
{
int i;
- if (sdp_sk(sk)->cq) {
+ if (sdp_sk(sk)->tx_ring.cq) {
for (i = 0; i < send_poll; ++i)
- if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) {
+ if (sdp_xmit_poll(sdp_sk(sk), 1)) {
++send_poll_hit;
return;
}
@@ -1421,12 +1433,9 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
/* We can extend the last page
* fragment. */
merge = 1;
- } else if (i == ssk->send_frags ||
- (!i &&
- !(sk->sk_route_caps & NETIF_F_SG))) {
+ } else if (i == ssk->send_frags) {
/* Need to add new fragment and cannot
- * do this because interface is non-SG,
- * or because all the page slots are
+ * do this because all the page slots are
* busy. */
sdp_mark_push(ssk, skb);
return SDP_NEW_SEG;
@@ -1649,7 +1658,6 @@ void sdp_bzcopy_write_space(struct sdp_sock *ssk)
}
}
-
/* Like tcp_sendmsg */
/* TODO: check locking */
static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
@@ -1812,7 +1820,9 @@ wait_for_sndbuf:
wait_for_memory:
SDPSTATS_COUNTER_INC(send_wait_for_mem);
if (copied)
- sdp_push(sk, ssk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);
+ sdp_push(sk, ssk, flags & ~MSG_MORE, PAGE_SIZE, TCP_NAGLE_PUSH);
+
+ sdp_xmit_poll(ssk, 1);
err = (bz) ? sdp_bzcopy_wait_memory(ssk, &timeo, bz) :
sk_stream_wait_memory(sk, &timeo);
@@ -2386,17 +2396,25 @@ static int __init sdp_init(void)
sdp_proto.sockets_allocated = sockets_allocated;
sdp_proto.orphan_count = orphan_count;
- comp_wq = create_singlethread_workqueue("comp_wq");
- if (!comp_wq)
+ rx_comp_wq = create_singlethread_workqueue("rx_comp_wq");
+ if (!rx_comp_wq)
goto no_mem_rx_wq;
+ sdp_wq = create_singlethread_workqueue("sdp_wq");
+ if (!sdp_wq)
+ goto no_mem_sdp_wq;
+
rc = proto_register(&sdp_proto, 1);
- if (rc)
+ if (rc) {
+ printk(KERN_WARNING "%s: proto_register failed: %d\n", __func__, rc);
goto error_proto_reg;
+ }
rc = sock_register(&sdp_net_proto);
- if (rc)
+ if (rc) {
+ printk(KERN_WARNING "%s: sock_register failed: %d\n", __func__, rc);
goto error_sock_reg;
+ }
sdp_proc_init();
@@ -2409,7 +2427,9 @@ static int __init sdp_init(void)
error_sock_reg:
proto_unregister(&sdp_proto);
error_proto_reg:
- destroy_workqueue(comp_wq);
+ destroy_workqueue(sdp_wq);
+no_mem_sdp_wq:
+ destroy_workqueue(rx_comp_wq);
no_mem_rx_wq:
kfree(orphan_count);
no_mem_orphan_count:
@@ -2427,7 +2447,8 @@ static void __exit sdp_exit(void)
printk(KERN_WARNING "%s: orphan_count %lld\n", __func__,
percpu_counter_read_positive(orphan_count));
- destroy_workqueue(comp_wq);
+ destroy_workqueue(rx_comp_wq);
+ destroy_workqueue(sdp_wq);
flush_scheduled_work();
diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c
index e759864..0971a49 100644
--- a/drivers/infiniband/ulp/sdp/sdp_proc.c
+++ b/drivers/infiniband/ulp/sdp/sdp_proc.c
@@ -260,9 +260,11 @@ static int sdpstats_seq_show(struct seq_file *seq, void *v)
seq_printf(seq, "rx_poll_miss \t\t: %d\n", sdpstats.rx_poll_miss);
seq_printf(seq, "tx_poll_miss \t\t: %d\n", sdpstats.tx_poll_miss);
+ seq_printf(seq, "tx_poll_hit \t\t: %d\n", sdpstats.tx_poll_hit);
seq_printf(seq, "CQ stats:\n");
- seq_printf(seq, "- interrupts\t\t: %d\n", sdpstats.int_count);
+ seq_printf(seq, "- RX interrupts\t\t: %d\n", sdpstats.rx_int_count);
+ seq_printf(seq, "- TX interrupts\t\t: %d\n", sdpstats.tx_int_count);
return 0;
}
--
1.5.3.7