[ofa-general] [PATCH] sdp: all previous patches in one patch
Amir Vadai
amirv at mellanox.co.il
Mon Jun 22 01:18:49 PDT 2009
changes:
* fixed coding style
* made interrupt moderation adaptive (sketched in a note below)
* arm the nagle timer on unsent packets instead of on sent packets
* fixed the HELLO/HELLO_ACK buffer size mismatch + let recvmsg do posts on data packets too
* made the bzcopy poll timeout jiffies-based instead of an iteration count (sketched in a note below)
* fixed handling of unaligned buffers in bzcopy + removed the poll at the end of send
* TX from one context only; RX with minimal context switches
* don't arm the nagle timer for every sent packet
* do not nagle BZCopy packets
* don't nagle the first packet
* process the RX CQ from interrupt context
* moved rx/tx code into dedicated files: sdp_rx.c and sdp_tx.c
* /proc/net/sdpprf - performance utilities
* no tx interrupts
* moved tx_ring into a dedicated structure + many cosmetic fixes
* interrupt performance fixes
* added /proc/net/sdpstats + packet dumps
Signed-off-by: Amir Vadai <amirv at mellanox.co.il>
---
drivers/infiniband/ulp/sdp/Makefile | 2 +-
drivers/infiniband/ulp/sdp/sdp.h | 455 ++++++++++++++---
drivers/infiniband/ulp/sdp/sdp_bcopy.c | 893 +++++--------------------------
drivers/infiniband/ulp/sdp/sdp_cma.c | 191 +++-----
drivers/infiniband/ulp/sdp/sdp_main.c | 897 +++++++++++++++-----------------
drivers/infiniband/ulp/sdp/sdp_proc.c | 496 ++++++++++++++++++
drivers/infiniband/ulp/sdp/sdp_rx.c | 850 ++++++++++++++++++++++++++++++
drivers/infiniband/ulp/sdp/sdp_tx.c | 442 ++++++++++++++++
8 files changed, 2800 insertions(+), 1426 deletions(-)
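A note on the jiffies-based bzcopy poll timeout mentioned in the changelog: the waiting pattern now looks roughly like the sketch below. This is an illustration only, not part of the diff; sdp_bzcopy_wait_sketch is a made-up name, and it assumes bz->busy counts bzcopy sends that have not yet completed (the real helper lives in sdp_main.c).

static int sdp_bzcopy_wait_sketch(struct sdp_sock *ssk, struct bzcopy_state *bz)
{
        unsigned long timeout = jiffies + SDP_BZCOPY_POLL_TIMEOUT;

        /* Bound the wait by elapsed time (jiffies) rather than by a fixed
         * iteration count, so the timeout is independent of CPU speed. */
        while (bz->busy && time_before(jiffies, timeout)) {
                if (sdp_xmit_poll(ssk, 1))      /* reap TX completions */
                        continue;
                SDPSTATS_COUNTER_INC(bzcopy_poll_miss);
                cpu_relax();
        }

        return bz->busy ? -ETIME : 0;
}

Compared with counting loop iterations, this keeps the worst-case blocking time stable across fast and slow machines.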
diff --git a/drivers/infiniband/ulp/sdp/Makefile b/drivers/infiniband/ulp/sdp/Makefile
index c889cce..b14a16a 100644
--- a/drivers/infiniband/ulp/sdp/Makefile
+++ b/drivers/infiniband/ulp/sdp/Makefile
@@ -3,4 +3,4 @@ EXTRA_CFLAGS += -ggdb
obj-$(CONFIG_INFINIBAND_SDP) += ib_sdp.o
-ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o
+ib_sdp-objs := sdp_main.o sdp_cma.o sdp_bcopy.o sdp_proc.o sdp_tx.o sdp_rx.o
diff --git a/drivers/infiniband/ulp/sdp/sdp.h b/drivers/infiniband/ulp/sdp/sdp.h
index f9c2421..53f4b95 100644
--- a/drivers/infiniband/ulp/sdp/sdp.h
+++ b/drivers/infiniband/ulp/sdp/sdp.h
@@ -6,15 +6,91 @@
#include <net/inet_sock.h>
#include <net/tcp.h> /* For urgent data flags */
#include <rdma/ib_verbs.h>
+#include <linux/sched.h>
-#define sdp_printk(level, sk, format, arg...) \
- printk(level "%s:%d sdp_sock(%d:%d): " format, \
- __func__, __LINE__, \
+#define SDPSTATS_ON
+/* #define SDP_PROFILING */
+
+#define _sdp_printk(func, line, level, sk, format, arg...) \
+ printk(level "%s:%d sdp_sock(%5d:%d %d:%d): " format, \
+ func, line, \
+ current->pid, smp_processor_id(), \
(sk) ? inet_sk(sk)->num : -1, \
(sk) ? ntohs(inet_sk(sk)->dport) : -1, ## arg)
+#define sdp_printk(level, sk, format, arg...) \
+ _sdp_printk(__func__, __LINE__, level, sk, format, ## arg)
#define sdp_warn(sk, format, arg...) \
sdp_printk(KERN_WARNING, sk, format , ## arg)
+#define rx_ring_lock(ssk, f) do { \
+ spin_lock_irqsave(&ssk->rx_ring.lock, f); \
+} while (0)
+
+#define rx_ring_unlock(ssk, f) do { \
+ spin_unlock_irqrestore(&ssk->rx_ring.lock, f); \
+} while (0)
+
+#define SDP_MODPARAM_SINT(var, def_val, msg) \
+ static int var = def_val; \
+ module_param_named(var, var, int, 0644); \
+ MODULE_PARM_DESC(var, msg " [" #def_val "]"); \
+
+#define SDP_MODPARAM_INT(var, def_val, msg) \
+ int var = def_val; \
+ module_param_named(var, var, int, 0644); \
+ MODULE_PARM_DESC(var, msg " [" #def_val "]"); \
+
+#ifdef SDP_PROFILING
+struct sk_buff;
+struct sdpprf_log {
+ int idx;
+ int pid;
+ int cpu;
+ int sk_num;
+ int sk_dport;
+ struct sk_buff *skb;
+ char msg[256];
+
+ unsigned long long time;
+
+ const char *func;
+ int line;
+};
+
+#define SDPPRF_LOG_SIZE 0x20000 /* must be a power of 2 */
+
+extern struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
+extern int sdpprf_log_count;
+
+static inline unsigned long long current_nsec(void)
+{
+ struct timespec tv;
+ getnstimeofday(&tv);
+ return tv.tv_sec * NSEC_PER_SEC + tv.tv_nsec;
+}
+#define sdp_prf1(sk, s, format, arg...) ({ \
+ struct sdpprf_log *l = \
+ &sdpprf_log[sdpprf_log_count++ & (SDPPRF_LOG_SIZE - 1)]; \
+ l->idx = sdpprf_log_count - 1; \
+ l->pid = current->pid; \
+ l->sk_num = (sk) ? inet_sk(sk)->num : -1; \
+ l->sk_dport = (sk) ? ntohs(inet_sk(sk)->dport) : -1; \
+ l->cpu = smp_processor_id(); \
+ l->skb = s; \
+ snprintf(l->msg, sizeof(l->msg) - 1, format, ## arg); \
+ l->time = current_nsec(); \
+ l->func = __func__; \
+ l->line = __LINE__; \
+ 1; \
+})
+#define sdp_prf(sk, s, format, arg...)
+/* #define sdp_prf(sk, s, format, arg...) sdp_prf1(sk, s, format, ## arg) */
+
+#else
+#define sdp_prf1(sk, s, format, arg...)
+#define sdp_prf(sk, s, format, arg...)
+#endif
+
#ifdef CONFIG_INFINIBAND_SDP_DEBUG
extern int sdp_debug_level;
@@ -37,9 +113,9 @@ extern int sdp_debug_level;
})
#define sk_common_release(sk) do { \
- sdp_dbg(sk, "%s:%d - sock_put(" SOCK_REF_BORN ") - refcount = %d " \
- "from withing sk_common_release\n",\
- __FUNCTION__, __LINE__, atomic_read(&(sk)->sk_refcnt)); \
+ sdp_dbg(sk, "%s:%d - sock_put(" SOCK_REF_BORN \
+ ") - refcount = %d from withing sk_common_release\n",\
+ __func__, __LINE__, atomic_read(&(sk)->sk_refcnt));\
sk_common_release(sk); \
} while (0)
@@ -50,15 +126,80 @@ extern int sdp_debug_level;
#endif /* CONFIG_INFINIBAND_SDP_DEBUG */
#ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
+
extern int sdp_data_debug_level;
-#define sdp_dbg_data(sk, format, arg...) \
- do { \
- if (sdp_data_debug_level > 0) \
- sdp_printk(KERN_DEBUG, sk, format , ## arg); \
+#define sdp_dbg_data(sk, format, arg...) \
+ do { \
+ if (sdp_data_debug_level & 0x2) \
+ sdp_printk(KERN_DEBUG, sk, format , ## arg); \
+ } while (0)
+#define SDP_DUMP_PACKET(sk, str, skb, h) \
+ do { \
+ if (sdp_data_debug_level & 0x1) \
+ dump_packet(sk, str, skb, h); \
} while (0)
#else
-#define sdp_dbg_data(priv, format, arg...) \
- do { (void) (priv); } while (0)
+#define sdp_dbg_data(priv, format, arg...)
+#define SDP_DUMP_PACKET(sk, str, skb, h)
+#endif
+
+#ifdef SDPSTATS_ON
+
+struct sdpstats {
+ u32 post_send[256];
+ u32 sendmsg_bcopy_segment;
+ u32 sendmsg_bzcopy_segment;
+ u32 sendmsg;
+ u32 post_send_credits;
+ u32 sendmsg_nagle_skip;
+ u32 sendmsg_seglen[25];
+ u32 send_size[25];
+ u32 post_recv;
+ u32 rx_int_count;
+ u32 tx_int_count;
+ u32 bzcopy_poll_miss;
+ u32 send_wait_for_mem;
+ u32 send_miss_no_credits;
+ u32 rx_poll_miss;
+ u32 tx_poll_miss;
+ u32 tx_poll_hit;
+ u32 tx_poll_busy;
+ u32 memcpy_count;
+ u32 credits_before_update[64];
+ u32 send_interval[25];
+
+ u32 bz_clean_sum;
+ u32 bz_setup_sum;
+ u32 tx_copy_sum;
+ u32 sendmsg_sum;
+};
+extern struct sdpstats sdpstats;
+
+static inline void sdpstats_hist(u32 *h, u32 val, u32 maxidx, int is_log)
+{
+ int idx = is_log ? ilog2(val) : val;
+ if (idx > maxidx)
+ idx = maxidx;
+
+ h[idx]++;
+}
+
+#define SDPSTATS_COUNTER_INC(stat) do { sdpstats.stat++; } while (0)
+#define SDPSTATS_COUNTER_ADD(stat, val) do { sdpstats.stat += val; } while (0)
+#define SDPSTATS_COUNTER_MID_INC(stat, mid) do { sdpstats.stat[mid]++; } \
+ while (0)
+#define SDPSTATS_HIST(stat, size) \
+ sdpstats_hist(sdpstats.stat, size, ARRAY_SIZE(sdpstats.stat) - 1, 1)
+
+#define SDPSTATS_HIST_LINEAR(stat, size) \
+ sdpstats_hist(sdpstats.stat, size, ARRAY_SIZE(sdpstats.stat) - 1, 0)
+
+#else
+#define SDPSTATS_COUNTER_INC(stat)
+#define SDPSTATS_COUNTER_ADD(stat, val)
+#define SDPSTATS_COUNTER_MID_INC(stat, mid)
+#define SDPSTATS_HIST_LINEAR(stat, size)
+#define SDPSTATS_HIST(stat, size)
#endif
#define SOCK_REF_RESET "RESET"
@@ -72,6 +213,12 @@ extern int sdp_data_debug_level;
#define sock_put(sk, msg) sock_ref(sk, msg, sock_put)
#define __sock_put(sk, msg) sock_ref(sk, msg, __sock_put)
+/* Interval between successive polls in the Tx routine when polling is used
+ instead of interrupts (in per-core Tx rings) - should be a power of 2 */
+#define SDP_TX_POLL_MODER 16
+#define SDP_TX_POLL_TIMEOUT (HZ / 4)
+#define SDP_NAGLE_TIMEOUT (HZ / 10)
+
#define SDP_RESOLVE_TIMEOUT 1000
#define SDP_ROUTE_TIMEOUT 1000
#define SDP_RETRY_COUNT 5
@@ -81,7 +228,8 @@ extern int sdp_data_debug_level;
#define SDP_TX_SIZE 0x40
#define SDP_RX_SIZE 0x40
-#define SDP_MAX_SEND_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
+#define SDP_MAX_RECV_SKB_FRAGS (PAGE_SIZE > 0x8000 ? 1 : 0x8000 / PAGE_SIZE)
+#define SDP_MAX_SEND_SKB_FRAGS (SDP_MAX_RECV_SKB_FRAGS + 1)
#define SDP_HEAD_SIZE (PAGE_SIZE / 2 + sizeof(struct sdp_bsdh))
#define SDP_NUM_WC 4
#define SDP_MAX_PAYLOAD ((1 << 16) - SDP_HEAD_SIZE)
@@ -92,6 +240,21 @@ extern int sdp_data_debug_level;
#define SDP_OP_RECV 0x800000000LL
#define SDP_OP_SEND 0x400000000LL
+/* how long (in jiffies) to block the sender until tx completion */
+#define SDP_BZCOPY_POLL_TIMEOUT (HZ / 10)
+
+#define SDP_AUTO_CONF 0xffff
+#define AUTO_MOD_DELAY (HZ / 4)
+
+#define BZCOPY_STATE(skb) (*(struct bzcopy_state **)(skb->cb))
+#ifndef MIN
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+#endif
+
+extern struct workqueue_struct *sdp_wq;
+extern struct list_head sock_list;
+extern spinlock_t sock_list_lock;
+
enum sdp_mid {
SDP_MID_HELLO = 0x0,
SDP_MID_HELLO_ACK = 0x1,
@@ -107,7 +270,7 @@ enum sdp_flags {
};
enum {
- SDP_MIN_BUFS = 2
+ SDP_MIN_TX_CREDITS = 2
};
enum {
@@ -129,33 +292,132 @@ struct sdp_bsdh {
__u32 mseq_ack;
};
+union cma_ip_addr {
+ struct in6_addr ip6;
+ struct {
+ __u32 pad[3];
+ __u32 addr;
+ } ip4;
+};
+
+/* TODO: too much? Can I avoid having the src/dst and port here? */
+struct sdp_hh {
+ struct sdp_bsdh bsdh;
+ u8 majv_minv;
+ u8 ipv_cap;
+ u8 rsvd1;
+ u8 max_adverts;
+ __u32 desremrcvsz;
+ __u32 localrcvsz;
+ __u16 port;
+ __u16 rsvd2;
+ union cma_ip_addr src_addr;
+ union cma_ip_addr dst_addr;
+};
+
+struct sdp_hah {
+ struct sdp_bsdh bsdh;
+ u8 majv_minv;
+ u8 ipv_cap;
+ u8 rsvd1;
+ u8 ext_max_adverts;
+ __u32 actrcvsz;
+};
+
struct sdp_buf {
struct sk_buff *skb;
u64 mapping[SDP_MAX_SEND_SKB_FRAGS + 1];
};
+#define ring_head(ring) (atomic_read(&(ring).head))
+#define ring_tail(ring) (atomic_read(&(ring).tail))
+#define ring_posted(ring) (ring_head(ring) - ring_tail(ring))
+
+struct sdp_tx_ring {
+ struct sdp_buf *buffer;
+ atomic_t head;
+ atomic_t tail;
+ struct ib_cq *cq;
+
+ int una_seq;
+ atomic_t credits;
+#define tx_credits(ssk) (atomic_read(&ssk->tx_ring.credits))
+
+ struct timer_list timer;
+ u16 poll_cnt;
+};
+
+struct sdp_rx_ring {
+ struct sdp_buf *buffer;
+ atomic_t head;
+ atomic_t tail;
+ struct ib_cq *cq;
+
+ spinlock_t lock;
+};
+
+static inline int sdp_tx_ring_slots_left(struct sdp_tx_ring *tx_ring)
+{
+ return SDP_TX_SIZE - ring_posted(*tx_ring);
+}
+
+struct sdp_chrecvbuf {
+ u32 size;
+};
+
+#define posts_handler(ssk) atomic_read(&ssk->somebody_is_doing_posts)
+#define posts_handler_get(ssk) atomic_inc(&ssk->somebody_is_doing_posts)
+#define posts_handler_put(ssk) do {\
+ atomic_dec(&ssk->somebody_is_doing_posts); \
+ sdp_do_posts(ssk); \
+} while (0)
+
+struct sdp_moderation {
+ unsigned long last_moder_packets;
+ unsigned long last_moder_tx_packets;
+ unsigned long last_moder_bytes;
+ unsigned long last_moder_jiffies;
+ int last_moder_time;
+ u16 rx_usecs;
+ u16 rx_frames;
+ u16 tx_usecs;
+ u32 pkt_rate_low;
+ u16 rx_usecs_low;
+ u32 pkt_rate_high;
+ u16 rx_usecs_high;
+ u16 sample_interval;
+ u16 adaptive_rx_coal;
+ u32 msg_enable;
+
+ int moder_cnt;
+ int moder_time;
+};
+
struct sdp_sock {
/* sk has to be the first member of inet_sock */
struct inet_sock isk;
struct list_head sock_list;
struct list_head accept_queue;
struct list_head backlog_queue;
+ struct sk_buff_head rx_ctl_q;
struct sock *parent;
- struct work_struct work;
+ struct work_struct rx_comp_work;
wait_queue_head_t wq;
struct delayed_work dreq_wait_work;
struct work_struct destroy_work;
+ atomic_t somebody_is_doing_posts;
+
/* Like tcp_sock */
u16 urg_data;
u32 urg_seq;
u32 copied_seq;
- u32 rcv_nxt;
+#define rcv_nxt(ssk) atomic_read(&(ssk->rcv_nxt))
+ atomic_t rcv_nxt;
int write_seq;
- int snd_una;
int pushed_seq;
int xmit_size_goal;
int nonagle;
@@ -174,32 +436,29 @@ struct sdp_sock {
int sdp_disconnect;
int destruct_in_process;
- struct sdp_buf *rx_ring;
- struct sdp_buf *tx_ring;
+ struct sdp_rx_ring rx_ring;
+ struct sdp_tx_ring tx_ring;
- /* rdma specific */
- struct ib_qp *qp;
- struct ib_cq *cq;
- struct ib_mr *mr;
/* Data below will be reset on error */
struct rdma_cm_id *id;
struct ib_device *ib_device;
/* SDP specific */
- struct ib_recv_wr rx_wr;
- unsigned rx_head;
- unsigned rx_tail;
- unsigned mseq_ack;
- unsigned bufs;
+ atomic_t mseq_ack;
+#define mseq_ack(ssk) (atomic_read(&ssk->mseq_ack))
unsigned max_bufs; /* Initial buffers offered by other side */
unsigned min_bufs; /* Low water mark to wake senders */
- int remote_credits;
+ unsigned long nagle_last_unacked; /* mseq of latest unacked packet */
+ struct timer_list nagle_timer; /* timeout waiting for ack */
+
+ atomic_t remote_credits;
+#define remote_credits(ssk) (atomic_read(&ssk->remote_credits))
int poll_cq;
- unsigned tx_head;
- unsigned tx_tail;
- struct ib_send_wr tx_wr;
+ /* rdma specific */
+ struct ib_qp *qp;
+ struct ib_mr *mr;
/* SDP slow start */
int rcvbuf_scale; /* local recv buf scale for each socket */
@@ -213,11 +472,14 @@ struct sdp_sock {
int recv_frags; /* max skb frags in recv packets */
int send_frags; /* max skb frags in send packets */
+ unsigned long tx_packets;
+ unsigned long rx_packets;
+ unsigned long tx_bytes;
+ unsigned long rx_bytes;
+ struct sdp_moderation auto_mod;
+
/* BZCOPY data */
int zcopy_thresh;
-
- struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
- struct ib_wc ibwc[SDP_NUM_WC];
};
/* Context used for synchronous zero copy bcopy (BZCOY) */
@@ -236,26 +498,11 @@ struct bzcopy_state {
extern int rcvbuf_initial_size;
extern struct proto sdp_proto;
-extern struct workqueue_struct *sdp_workqueue;
+extern struct workqueue_struct *rx_comp_wq;
extern atomic_t sdp_current_mem_usage;
extern spinlock_t sdp_large_sockets_lock;
-/* just like TCP fs */
-struct sdp_seq_afinfo {
- struct module *owner;
- char *name;
- sa_family_t family;
- int (*seq_show) (struct seq_file *m, void *v);
- struct file_operations *seq_fops;
-};
-
-struct sdp_iter_state {
- sa_family_t family;
- int num;
- struct seq_operations seq_ops;
-};
-
static inline struct sdp_sock *sdp_sk(const struct sock *sk)
{
return (struct sdp_sock *)sk;
@@ -290,9 +537,10 @@ static inline int _sdp_exch_state(const char *func, int line, struct sock *sk,
int old;
spin_lock_irqsave(&sdp_sk(sk)->lock, flags);
-
+
sdp_dbg(sk, "%s:%d - set state: %s -> %s 0x%x\n", func, line,
- sdp_state_str(sk->sk_state), sdp_state_str(state), from_states);
+ sdp_state_str(sk->sk_state),
+ sdp_state_str(state), from_states);
if ((1 << sk->sk_state) & ~from_states) {
sdp_warn(sk, "trying to exchange state from unexpected state "
@@ -327,35 +575,93 @@ static inline void sdp_set_error(struct sock *sk, int err)
sk->sk_error_report(sk);
}
-extern struct workqueue_struct *sdp_workqueue;
+#ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
+void _dump_packet(const char *func, int line, struct sock *sk, char *str,
+ struct sk_buff *skb, const struct sdp_bsdh *h);
+#define dump_packet(sk, str, skb, h) \
+ _dump_packet(__func__, __LINE__, sk, str, skb, h)
+#endif
-int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
-void sdp_reset(struct sock *sk);
+/* sdp_main.c */
+void sdp_set_default_moderation(struct sdp_sock *ssk);
+int sdp_init_sock(struct sock *sk);
+void sdp_start_keepalive_timer(struct sock *sk);
+void sdp_remove_sock(struct sdp_sock *ssk);
+void sdp_add_sock(struct sdp_sock *ssk);
+void sdp_urg(struct sdp_sock *ssk, struct sk_buff *skb);
+void sdp_dreq_wait_timeout_work(struct work_struct *work);
+void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
+void sdp_destroy_work(struct work_struct *work);
void sdp_reset_sk(struct sock *sk, int rc);
-void sdp_completion_handler(struct ib_cq *cq, void *cq_context);
-void sdp_work(struct work_struct *work);
+void sdp_reset(struct sock *sk);
+
+/* sdp_proc.c */
+int __init sdp_proc_init(void);
+void sdp_proc_unregister(void);
+
+/* sdp_cma.c */
+int sdp_cma_handler(struct rdma_cm_id *, struct rdma_cm_event *);
+
+/* sdp_bcopy.c */
int sdp_post_credits(struct sdp_sock *ssk);
+
+/* sdp_tx.c */
+int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
+void sdp_tx_ring_destroy(struct sdp_sock *ssk);
+int sdp_xmit_poll(struct sdp_sock *ssk, int force);
void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid);
-void sdp_post_recvs(struct sdp_sock *ssk);
-int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq);
void sdp_post_sends(struct sdp_sock *ssk, int nonagle);
-void sdp_destroy_work(struct work_struct *work);
-void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk);
-void sdp_dreq_wait_timeout_work(struct work_struct *work);
-struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id);
-struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq);
-void sdp_urg(struct sdp_sock *ssk, struct sk_buff *skb);
-void sdp_add_sock(struct sdp_sock *ssk);
-void sdp_remove_sock(struct sdp_sock *ssk);
-void sdp_remove_large_sock(struct sdp_sock *ssk);
+void sdp_nagle_timeout(unsigned long data);
+void sdp_post_keepalive(struct sdp_sock *ssk);
+
+/* sdp_rx.c */
+void sdp_rx_ring_init(struct sdp_sock *ssk);
+int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device);
+void sdp_rx_ring_destroy(struct sdp_sock *ssk);
int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size);
int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size);
-void sdp_post_keepalive(struct sdp_sock *ssk);
-void sdp_start_keepalive_timer(struct sock *sk);
-void sdp_bzcopy_write_space(struct sdp_sock *ssk);
-int sdp_init_sock(struct sock *sk);
+void sdp_do_posts(struct sdp_sock *ssk);
+void sdp_rx_comp_full(struct sdp_sock *ssk);
+void sdp_remove_large_sock(struct sdp_sock *ssk);
-static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gfp_t gfp)
+static inline void sdp_arm_rx_cq(struct sock *sk)
+{
+ sdp_prf(sk, NULL, "Arming RX cq");
+ sdp_dbg_data(sk, "Arming RX cq\n");
+
+ ib_req_notify_cq(sdp_sk(sk)->rx_ring.cq, IB_CQ_NEXT_COMP);
+}
+
+static inline void sdp_arm_tx_cq(struct sock *sk)
+{
+ sdp_prf(sk, NULL, "Arming TX cq");
+ sdp_dbg_data(sk, "Arming TX cq. credits: %d, posted: %d\n",
+ tx_credits(sdp_sk(sk)), ring_posted(sdp_sk(sk)->tx_ring));
+
+ ib_req_notify_cq(sdp_sk(sk)->tx_ring.cq, IB_CQ_NEXT_COMP);
+}
+
+/* utilities */
+static inline char *mid2str(int mid)
+{
+#define ENUM2STR(e) [e] = #e
+ static char *mid2str[] = {
+ ENUM2STR(SDP_MID_HELLO),
+ ENUM2STR(SDP_MID_HELLO_ACK),
+ ENUM2STR(SDP_MID_DISCONN),
+ ENUM2STR(SDP_MID_CHRCVBUF),
+ ENUM2STR(SDP_MID_CHRCVBUF_ACK),
+ ENUM2STR(SDP_MID_DATA),
+ };
+
+ if (mid >= ARRAY_SIZE(mid2str))
+ return NULL;
+
+ return mid2str[mid];
+}
+
+static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size,
+ gfp_t gfp)
{
struct sk_buff *skb;
@@ -380,5 +686,4 @@ static inline struct sk_buff *sdp_stream_alloc_skb(struct sock *sk, int size, gf
return NULL;
}
-
#endif
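For context on the struct sdp_moderation fields added above, the adaptive RX interrupt moderation works roughly as sketched below. This is an illustration patterned on the usual NIC-driver approach, not the patch's exact code (which lives in sdp_rx.c); sdp_auto_moderation_sketch and the 128-byte packet-size threshold are made up for the example.

static void sdp_auto_moderation_sketch(struct sdp_moderation *mod,
                                       unsigned long rx_packets,
                                       unsigned long rx_bytes)
{
        unsigned long period = jiffies - mod->last_moder_jiffies;
        unsigned long packets, rate, avg_pkt_size;
        int moder_time;

        if (!mod->adaptive_rx_coal || !period ||
            period < mod->sample_interval * HZ)
                return;

        packets = rx_packets - mod->last_moder_packets;
        rate = packets * HZ / period;
        avg_pkt_size = packets ?
                (rx_bytes - mod->last_moder_bytes) / packets : 0;

        /* High rate of large packets -> long interrupt interval (throughput);
         * low rate or small packets -> short interval (latency). */
        if (rate < mod->pkt_rate_low || avg_pkt_size < 128)
                moder_time = mod->rx_usecs_low;
        else if (rate >= mod->pkt_rate_high)
                moder_time = mod->rx_usecs_high;
        else
                moder_time = (rate - mod->pkt_rate_low) *
                        (mod->rx_usecs_high - mod->rx_usecs_low) /
                        (mod->pkt_rate_high - mod->pkt_rate_low) +
                        mod->rx_usecs_low;

        if (moder_time != mod->last_moder_time) {
                mod->last_moder_time = moder_time;
                /* the new value would then be applied to the RX CQ,
                 * e.g. via ib_modify_cq() */
        }

        mod->last_moder_packets = rx_packets;
        mod->last_moder_bytes = rx_bytes;
        mod->last_moder_jiffies = jiffies;
}

The rx_packets/rx_bytes arguments correspond to the per-socket counters added to struct sdp_sock above.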
diff --git a/drivers/infiniband/ulp/sdp/sdp_bcopy.c b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
index 475d7c3..e2c4ffd 100644
--- a/drivers/infiniband/ulp/sdp/sdp_bcopy.c
+++ b/drivers/infiniband/ulp/sdp/sdp_bcopy.c
@@ -39,459 +39,139 @@
#define SDP_RESIZE_WAIT 16
-struct sdp_chrecvbuf {
- u32 size;
-};
-
-static int rcvbuf_scale = 0x10;
-
-int rcvbuf_initial_size = SDP_HEAD_SIZE;
-module_param_named(rcvbuf_initial_size, rcvbuf_initial_size, int, 0644);
-MODULE_PARM_DESC(rcvbuf_initial_size, "Receive buffer initial size in bytes.");
-
-module_param_named(rcvbuf_scale, rcvbuf_scale, int, 0644);
-MODULE_PARM_DESC(rcvbuf_scale, "Receive buffer size scale factor.");
-
-static int top_mem_usage = 0;
-module_param_named(top_mem_usage, top_mem_usage, int, 0644);
-MODULE_PARM_DESC(top_mem_usage, "Top system wide sdp memory usage for recv (in MB).");
-
-#ifdef CONFIG_PPC
-static int max_large_sockets = 100;
-#else
-static int max_large_sockets = 1000;
-#endif
-module_param_named(max_large_sockets, max_large_sockets, int, 0644);
-MODULE_PARM_DESC(max_large_sockets, "Max number of large sockets (32k buffers).");
-
-#define sdp_cnt(var) do { (var)++; } while (0)
-static unsigned sdp_keepalive_probes_sent = 0;
-
-module_param_named(sdp_keepalive_probes_sent, sdp_keepalive_probes_sent, uint, 0644);
-MODULE_PARM_DESC(sdp_keepalive_probes_sent, "Total number of keepalive probes sent.");
-
-static int curr_large_sockets = 0;
-atomic_t sdp_current_mem_usage;
-spinlock_t sdp_large_sockets_lock;
-
-static int sdp_get_large_socket(struct sdp_sock *ssk)
-{
- int count, ret;
-
- if (ssk->recv_request)
- return 1;
-
- spin_lock_irq(&sdp_large_sockets_lock);
- count = curr_large_sockets;
- ret = curr_large_sockets < max_large_sockets;
- if (ret)
- curr_large_sockets++;
- spin_unlock_irq(&sdp_large_sockets_lock);
-
- return ret;
-}
-
-void sdp_remove_large_sock(struct sdp_sock *ssk)
+#ifdef CONFIG_INFINIBAND_SDP_DEBUG_DATA
+void _dump_packet(const char *func, int line, struct sock *sk, char *str,
+ struct sk_buff *skb, const struct sdp_bsdh *h)
{
- if (ssk->recv_frags) {
- spin_lock_irq(&sdp_large_sockets_lock);
- curr_large_sockets--;
- spin_unlock_irq(&sdp_large_sockets_lock);
- }
-}
-
-/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
-static void sdp_fin(struct sock *sk)
-{
- sdp_dbg(sk, "%s\n", __func__);
+ struct sdp_hh *hh;
+ struct sdp_hah *hah;
+ struct sdp_chrecvbuf *req_size;
+ int len = 0;
+ char buf[256];
+ len += snprintf(buf, 255-len, "%s skb: %p mid: %2x:%-20s flags: 0x%x "
+ "bufs: %d len: %d mseq: %d mseq_ack: %d | ",
+ str, skb, h->mid, mid2str(h->mid), h->flags,
+ ntohs(h->bufs), ntohl(h->len), ntohl(h->mseq),
+ ntohl(h->mseq_ack));
- sk->sk_shutdown |= RCV_SHUTDOWN;
- sock_set_flag(sk, SOCK_DONE);
-
- switch (sk->sk_state) {
- case TCP_SYN_RECV:
- case TCP_ESTABLISHED:
- sdp_exch_state(sk, TCPF_SYN_RECV | TCPF_ESTABLISHED,
- TCP_CLOSE_WAIT);
+ switch (h->mid) {
+ case SDP_MID_HELLO:
+ hh = (struct sdp_hh *)h;
+ len += snprintf(buf + len, 255-len,
+ "max_adverts: %d majv_minv: %d "
+ "localrcvsz: %d desremrcvsz: %d |",
+ hh->max_adverts, hh->majv_minv,
+ ntohl(hh->localrcvsz),
+ ntohl(hh->desremrcvsz));
break;
-
- case TCP_FIN_WAIT1:
- /* Received a reply FIN - start Infiniband tear down */
- sdp_dbg(sk, "%s: Starting Infiniband tear down sending DREQ\n",
- __func__);
-
- sdp_cancel_dreq_wait_timeout(sdp_sk(sk));
-
- sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT);
-
- if (sdp_sk(sk)->id) {
- rdma_disconnect(sdp_sk(sk)->id);
- } else {
- sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__);
- return;
- }
+ case SDP_MID_HELLO_ACK:
+ hah = (struct sdp_hah *)h;
+ len += snprintf(buf + len, 255-len, "actrcvz: %d |",
+ ntohl(hah->actrcvsz));
break;
- case TCP_TIME_WAIT:
- /* This is a mutual close situation and we've got the DREQ from
- the peer before the SDP_MID_DISCONNECT */
+ case SDP_MID_CHRCVBUF:
+ case SDP_MID_CHRCVBUF_ACK:
+ req_size = (struct sdp_chrecvbuf *)(h+1);
+ len += snprintf(buf + len, 255-len, "req_size: %d |",
+ ntohl(req_size->size));
break;
- case TCP_CLOSE:
- /* FIN arrived after IB teardown started - do nothing */
- sdp_dbg(sk, "%s: fin in state %s\n",
- __func__, sdp_state_str(sk->sk_state));
- return;
+ case SDP_MID_DATA:
+ len += snprintf(buf + len, 255-len, "data_len: %ld |",
+ ntohl(h->len) - sizeof(struct sdp_bsdh));
default:
- sdp_warn(sk, "%s: FIN in unexpected state. sk->sk_state=%d\n",
- __func__, sk->sk_state);
break;
}
-
-
- sk_mem_reclaim(sk);
-
- if (!sock_flag(sk, SOCK_DEAD)) {
- sk->sk_state_change(sk);
-
- /* Do not send POLL_HUP for half duplex close. */
- if (sk->sk_shutdown == SHUTDOWN_MASK ||
- sk->sk_state == TCP_CLOSE)
- sk_wake_async(sk, 1, POLL_HUP);
- else
- sk_wake_async(sk, 1, POLL_IN);
- }
-}
-
-void sdp_post_keepalive(struct sdp_sock *ssk)
-{
- int rc;
- struct ib_send_wr wr, *bad_wr;
-
- sdp_dbg(&ssk->isk.sk, "%s\n", __func__);
-
- memset(&wr, 0, sizeof(wr));
-
- wr.next = NULL;
- wr.wr_id = 0;
- wr.sg_list = NULL;
- wr.num_sge = 0;
- wr.opcode = IB_WR_RDMA_WRITE;
-
- rc = ib_post_send(ssk->qp, &wr, &bad_wr);
- if (rc) {
- sdp_dbg(&ssk->isk.sk, "ib_post_keepalive failed with status %d.\n", rc);
- sdp_set_error(&ssk->isk.sk, -ECONNRESET);
- wake_up(&ssk->wq);
- }
-
- sdp_cnt(sdp_keepalive_probes_sent);
+ buf[len] = 0;
+ _sdp_printk(func, line, KERN_WARNING, sk, "%s: %s\n", str, buf);
}
+#endif
-void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
+static inline void update_send_head(struct sock *sk, struct sk_buff *skb)
{
- struct sdp_buf *tx_req;
- struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
- unsigned mseq = ssk->tx_head;
- int i, rc, frags;
- u64 addr;
- struct ib_device *dev;
- struct ib_sge *sge;
- struct ib_send_wr *bad_wr;
-
- h->mid = mid;
- if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
- h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
- else
- h->flags = 0;
-
- h->bufs = htons(ssk->rx_head - ssk->rx_tail);
- h->len = htonl(skb->len);
- h->mseq = htonl(mseq);
- h->mseq_ack = htonl(ssk->mseq_ack);
-
- tx_req = &ssk->tx_ring[mseq & (SDP_TX_SIZE - 1)];
- tx_req->skb = skb;
- dev = ssk->ib_device;
- sge = ssk->ibsge;
- addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len,
- DMA_TO_DEVICE);
- tx_req->mapping[0] = addr;
-
- /* TODO: proper error handling */
- BUG_ON(ib_dma_mapping_error(dev, addr));
-
- sge->addr = addr;
- sge->length = skb->len - skb->data_len;
- sge->lkey = ssk->mr->lkey;
- frags = skb_shinfo(skb)->nr_frags;
- for (i = 0; i < frags; ++i) {
- ++sge;
- addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
- skb_shinfo(skb)->frags[i].page_offset,
- skb_shinfo(skb)->frags[i].size,
- DMA_TO_DEVICE);
- BUG_ON(ib_dma_mapping_error(dev, addr));
- tx_req->mapping[i + 1] = addr;
- sge->addr = addr;
- sge->length = skb_shinfo(skb)->frags[i].size;
- sge->lkey = ssk->mr->lkey;
- }
-
- ssk->tx_wr.next = NULL;
- ssk->tx_wr.wr_id = ssk->tx_head | SDP_OP_SEND;
- ssk->tx_wr.sg_list = ssk->ibsge;
- ssk->tx_wr.num_sge = frags + 1;
- ssk->tx_wr.opcode = IB_WR_SEND;
- ssk->tx_wr.send_flags = IB_SEND_SIGNALED;
- if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
- ssk->tx_wr.send_flags |= IB_SEND_SOLICITED;
- rc = ib_post_send(ssk->qp, &ssk->tx_wr, &bad_wr);
- ++ssk->tx_head;
- --ssk->bufs;
- ssk->remote_credits = ssk->rx_head - ssk->rx_tail;
- if (unlikely(rc)) {
- sdp_dbg(&ssk->isk.sk, "ib_post_send failed with status %d.\n", rc);
- sdp_set_error(&ssk->isk.sk, -ECONNRESET);
- wake_up(&ssk->wq);
+ struct page *page;
+ sk->sk_send_head = skb->next;
+ if (sk->sk_send_head == (struct sk_buff *)&sk->sk_write_queue) {
+ sk->sk_send_head = NULL;
+ page = sk->sk_sndmsg_page;
+ if (page) {
+ put_page(page);
+ sk->sk_sndmsg_page = NULL;
+ }
}
}
-struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
+static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
{
- struct ib_device *dev;
- struct sdp_buf *tx_req;
- struct sk_buff *skb;
- struct bzcopy_state *bz;
- int i, frags;
-
- if (unlikely(mseq != ssk->tx_tail)) {
- printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
- mseq, ssk->tx_tail);
- return NULL;
- }
+ int send_now =
+ BZCOPY_STATE(skb) ||
+ (ssk->nonagle & TCP_NAGLE_OFF) ||
+ !ssk->nagle_last_unacked ||
+ skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue ||
+ skb->len + sizeof(struct sdp_bsdh) >= ssk->xmit_size_goal ||
+ (TCP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH);
- dev = ssk->ib_device;
- tx_req = &ssk->tx_ring[mseq & (SDP_TX_SIZE - 1)];
- skb = tx_req->skb;
- ib_dma_unmap_single(dev, tx_req->mapping[0], skb->len - skb->data_len,
- DMA_TO_DEVICE);
- frags = skb_shinfo(skb)->nr_frags;
- for (i = 0; i < frags; ++i) {
- ib_dma_unmap_page(dev, tx_req->mapping[i + 1],
- skb_shinfo(skb)->frags[i].size,
- DMA_TO_DEVICE);
+ if (send_now) {
+ unsigned long mseq = ring_head(ssk->tx_ring);
+ ssk->nagle_last_unacked = mseq;
+ } else {
+ if (!timer_pending(&ssk->nagle_timer)) {
+ mod_timer(&ssk->nagle_timer,
+ jiffies + SDP_NAGLE_TIMEOUT);
+ sdp_dbg_data(&ssk->isk.sk, "Starting nagle timer\n");
+ }
}
+ sdp_dbg_data(&ssk->isk.sk, "send_now = %d last_unacked = %ld\n",
+ send_now, ssk->nagle_last_unacked);
- ssk->snd_una += TCP_SKB_CB(skb)->end_seq;
- ++ssk->tx_tail;
-
- /* TODO: AIO and real zcopy cdoe; add their context support here */
- bz = *(struct bzcopy_state **)skb->cb;
- if (bz)
- bz->busy--;
-
- return skb;
+ return send_now;
}
-
-static void sdp_post_recv(struct sdp_sock *ssk)
+void sdp_nagle_timeout(unsigned long data)
{
- struct sdp_buf *rx_req;
- int i, rc, frags;
- u64 addr;
- struct ib_device *dev;
- struct ib_sge *sge;
- struct ib_recv_wr *bad_wr;
- struct sk_buff *skb;
- struct page *page;
- skb_frag_t *frag;
- struct sdp_bsdh *h;
- int id = ssk->rx_head;
- gfp_t gfp_page;
-
- /* Now, allocate and repost recv */
- /* TODO: allocate from cache */
-
- if (unlikely(ssk->isk.sk.sk_allocation)) {
- skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
- ssk->isk.sk.sk_allocation);
- gfp_page = ssk->isk.sk.sk_allocation | __GFP_HIGHMEM;
- } else {
- skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
- GFP_KERNEL);
- gfp_page = GFP_HIGHUSER;
- }
-
- /* FIXME */
- BUG_ON(!skb);
- h = (struct sdp_bsdh *)skb->head;
- for (i = 0; i < ssk->recv_frags; ++i) {
- page = alloc_pages(gfp_page, 0);
- BUG_ON(!page);
- frag = &skb_shinfo(skb)->frags[i];
- frag->page = page;
- frag->page_offset = 0;
-
- /* Bugzilla 1311 */
- if ( sizeof(frag->size) < 4 )
- frag->size = min(PAGE_SIZE, SDP_MAX_PAYLOAD);
- else
- frag->size = PAGE_SIZE;
-
- ++skb_shinfo(skb)->nr_frags;
- skb->len += frag->size;
- skb->data_len += frag->size;
- skb->truesize += frag->size;
- }
-
- rx_req = ssk->rx_ring + (id & (SDP_RX_SIZE - 1));
- rx_req->skb = skb;
- dev = ssk->ib_device;
- sge = ssk->ibsge;
- addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE);
- BUG_ON(ib_dma_mapping_error(dev, addr));
+ struct sdp_sock *ssk = (struct sdp_sock *)data;
+ struct sock *sk = &ssk->isk.sk;
- rx_req->mapping[0] = addr;
+ sdp_dbg_data(sk, "last_unacked = %ld\n", ssk->nagle_last_unacked);
- /* TODO: proper error handling */
- sge->addr = (u64)addr;
- sge->length = SDP_HEAD_SIZE;
- sge->lkey = ssk->mr->lkey;
- frags = skb_shinfo(skb)->nr_frags;
- for (i = 0; i < frags; ++i) {
- ++sge;
- addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
- skb_shinfo(skb)->frags[i].page_offset,
- skb_shinfo(skb)->frags[i].size,
- DMA_FROM_DEVICE);
- BUG_ON(ib_dma_mapping_error(dev, addr));
- rx_req->mapping[i + 1] = addr;
- sge->addr = addr;
- sge->length = skb_shinfo(skb)->frags[i].size;
- sge->lkey = ssk->mr->lkey;
- }
+ if (!ssk->nagle_last_unacked)
+ goto out2;
- ssk->rx_wr.next = NULL;
- ssk->rx_wr.wr_id = id | SDP_OP_RECV;
- ssk->rx_wr.sg_list = ssk->ibsge;
- ssk->rx_wr.num_sge = frags + 1;
- rc = ib_post_recv(ssk->qp, &ssk->rx_wr, &bad_wr);
- ++ssk->rx_head;
- if (unlikely(rc)) {
- sdp_dbg(&ssk->isk.sk, "ib_post_recv failed with status %d\n", rc);
- sdp_reset(&ssk->isk.sk);
+ /* Only process if the socket is not in use */
+ bh_lock_sock(sk);
+ if (sock_owned_by_user(sk)) {
+ sdp_dbg_data(sk, "socket is busy - will try later\n");
+ goto out;
}
- atomic_add(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
-}
-
-void sdp_post_recvs(struct sdp_sock *ssk)
-{
- struct sock *sk = &ssk->isk.sk;
- int scale = ssk->rcvbuf_scale;
-
- if (unlikely(!ssk->id || ((1 << sk->sk_state) &
- (TCPF_CLOSE | TCPF_TIME_WAIT)))) {
+ if (sk->sk_state == TCP_CLOSE) {
+ bh_unlock_sock(sk);
return;
}
- if (top_mem_usage &&
- (top_mem_usage * 0x100000) < atomic_read(&sdp_current_mem_usage) * PAGE_SIZE)
- scale = 1;
-
- while ((likely(ssk->rx_head - ssk->rx_tail < SDP_RX_SIZE) &&
- (ssk->rx_head - ssk->rx_tail - SDP_MIN_BUFS) *
- (SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE) +
- ssk->rcv_nxt - ssk->copied_seq < sk->sk_rcvbuf * scale) ||
- unlikely(ssk->rx_head - ssk->rx_tail < SDP_MIN_BUFS))
- sdp_post_recv(ssk);
-}
-
-struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
-{
- struct sdp_buf *rx_req;
- struct ib_device *dev;
- struct sk_buff *skb;
- int i, frags;
-
- if (unlikely(id != ssk->rx_tail)) {
- printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
- id, ssk->rx_tail);
- return NULL;
- }
+ ssk->nagle_last_unacked = 0;
+ sdp_post_sends(ssk, 0);
- dev = ssk->ib_device;
- rx_req = &ssk->rx_ring[id & (SDP_RX_SIZE - 1)];
- skb = rx_req->skb;
- ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE,
- DMA_FROM_DEVICE);
- frags = skb_shinfo(skb)->nr_frags;
- for (i = 0; i < frags; ++i)
- ib_dma_unmap_page(dev, rx_req->mapping[i + 1],
- skb_shinfo(skb)->frags[i].size,
- DMA_FROM_DEVICE);
- ++ssk->rx_tail;
- --ssk->remote_credits;
- return skb;
+ if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+ sk_stream_write_space(&ssk->isk.sk);
+out:
+ bh_unlock_sock(sk);
+out2:
+ if (sk->sk_send_head) /* If has pending sends - rearm */
+ mod_timer(&ssk->nagle_timer, jiffies + SDP_NAGLE_TIMEOUT);
}
-/* Here because I do not want queue to fail. */
-static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
- struct sk_buff *skb)
+int sdp_post_credits(struct sdp_sock *ssk)
{
- int skb_len;
- struct sdp_sock *ssk = sdp_sk(sk);
- struct sk_buff *tail = NULL;
-
- /* not needed since sk_rmem_alloc is not currently used
- * TODO - remove this?
- skb_set_owner_r(skb, sk); */
-
- skb_len = skb->len;
+ int post_count = 0;
- TCP_SKB_CB(skb)->seq = ssk->rcv_nxt;
- ssk->rcv_nxt += skb_len;
+ sdp_dbg_data(&ssk->isk.sk, "credits: %d remote credits: %d "
+ "tx ring slots left: %d send_head: %p\n",
+ tx_credits(ssk), remote_credits(ssk),
+ sdp_tx_ring_slots_left(&ssk->tx_ring),
+ ssk->isk.sk.sk_send_head);
- if (likely(skb_len && (tail = skb_peek_tail(&sk->sk_receive_queue))) &&
- unlikely(skb_tailroom(tail) >= skb_len)) {
- skb_copy_bits(skb, 0, skb_put(tail, skb_len), skb_len);
- __kfree_skb(skb);
- skb = tail;
- } else
- skb_queue_tail(&sk->sk_receive_queue, skb);
-
- if (!sock_flag(sk, SOCK_DEAD))
- sk->sk_data_ready(sk, skb_len);
- return skb;
-}
-
-static inline void update_send_head(struct sock *sk, struct sk_buff *skb)
-{
- struct page *page;
- sk->sk_send_head = skb->next;
- if (sk->sk_send_head == (struct sk_buff *)&sk->sk_write_queue) {
- sk->sk_send_head = NULL;
- page = sk->sk_sndmsg_page;
- if (page) {
- put_page(page);
- sk->sk_sndmsg_page = NULL;
- }
- }
-}
-
-static inline int sdp_nagle_off(struct sdp_sock *ssk, struct sk_buff *skb)
-{
- return (ssk->nonagle & TCP_NAGLE_OFF) ||
- skb->next != (struct sk_buff *)&ssk->isk.sk.sk_write_queue ||
- skb->len + sizeof(struct sdp_bsdh) >= ssk->xmit_size_goal ||
- (ssk->tx_tail == ssk->tx_head &&
- !(ssk->nonagle & TCP_NAGLE_CORK)) ||
- (TCP_SKB_CB(skb)->flags & TCPCB_FLAG_PSH);
-}
-
-int sdp_post_credits(struct sdp_sock *ssk)
-{
- if (likely(ssk->bufs > 1) &&
- likely(ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE)) {
+ if (likely(tx_credits(ssk) > 1) &&
+ likely(sdp_tx_ring_slots_left(&ssk->tx_ring))) {
struct sk_buff *skb;
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
sizeof(struct sdp_bsdh),
@@ -499,8 +179,12 @@ int sdp_post_credits(struct sdp_sock *ssk)
if (!skb)
return -ENOMEM;
sdp_post_send(ssk, skb, SDP_MID_DATA);
+ post_count++;
}
- return 0;
+
+ if (post_count)
+ sdp_xmit_poll(ssk, 0);
+ return post_count;
}
void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
@@ -509,6 +193,7 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
struct sk_buff *skb;
int c;
gfp_t gfp_page;
+ int post_count = 0;
if (unlikely(!ssk->id)) {
if (ssk->isk.sk.sk_send_head) {
@@ -525,10 +210,15 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
else
gfp_page = GFP_KERNEL;
+ if (sdp_tx_ring_slots_left(&ssk->tx_ring) < SDP_TX_SIZE / 2) {
+ int wc_processed = sdp_xmit_poll(ssk, 1);
+ sdp_dbg_data(&ssk->isk.sk, "freed %d\n", wc_processed);
+ }
+
if (ssk->recv_request &&
- ssk->rx_tail >= ssk->recv_request_head &&
- ssk->bufs >= SDP_MIN_BUFS &&
- ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE) {
+ ring_tail(ssk->rx_ring) >= ssk->recv_request_head &&
+ tx_credits(ssk) >= SDP_MIN_TX_CREDITS &&
+ sdp_tx_ring_slots_left(&ssk->tx_ring)) {
struct sdp_chrecvbuf *resp_size;
ssk->recv_request = 0;
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -537,45 +227,37 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
gfp_page);
/* FIXME */
BUG_ON(!skb);
- resp_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *resp_size);
+ resp_size = (struct sdp_chrecvbuf *)skb_put(skb,
+ sizeof *resp_size);
resp_size->size = htonl(ssk->recv_frags * PAGE_SIZE);
sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF_ACK);
+ post_count++;
+ }
+
+ if (tx_credits(ssk) <= SDP_MIN_TX_CREDITS &&
+ sdp_tx_ring_slots_left(&ssk->tx_ring) &&
+ ssk->isk.sk.sk_send_head &&
+ sdp_nagle_off(ssk, ssk->isk.sk.sk_send_head)) {
+ SDPSTATS_COUNTER_INC(send_miss_no_credits);
}
- while (ssk->bufs > SDP_MIN_BUFS &&
- ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE &&
+ while (tx_credits(ssk) > SDP_MIN_TX_CREDITS &&
+ sdp_tx_ring_slots_left(&ssk->tx_ring) &&
(skb = ssk->isk.sk.sk_send_head) &&
sdp_nagle_off(ssk, skb)) {
update_send_head(&ssk->isk.sk, skb);
__skb_dequeue(&ssk->isk.sk.sk_write_queue);
sdp_post_send(ssk, skb, SDP_MID_DATA);
+ post_count++;
}
- if (ssk->bufs == SDP_MIN_BUFS &&
- !ssk->sent_request &&
- ssk->tx_head > ssk->sent_request_head + SDP_RESIZE_WAIT &&
- ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE) {
- struct sdp_chrecvbuf *req_size;
- skb = sdp_stream_alloc_skb(&ssk->isk.sk,
- sizeof(struct sdp_bsdh) +
- sizeof(*req_size),
- gfp_page);
- /* FIXME */
- BUG_ON(!skb);
- ssk->sent_request = SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
- ssk->sent_request_head = ssk->tx_head;
- req_size = (struct sdp_chrecvbuf *)skb_put(skb, sizeof *req_size);
- req_size->size = htonl(ssk->sent_request);
- sdp_post_send(ssk, skb, SDP_MID_CHRCVBUF);
- }
-
- c = ssk->remote_credits;
- if (likely(c > SDP_MIN_BUFS))
+ c = remote_credits(ssk);
+ if (likely(c > SDP_MIN_TX_CREDITS))
c *= 2;
- if (unlikely(c < ssk->rx_head - ssk->rx_tail) &&
- likely(ssk->bufs > 1) &&
- likely(ssk->tx_head - ssk->tx_tail < SDP_TX_SIZE) &&
+ if (unlikely(c < ring_posted(ssk->rx_ring)) &&
+ likely(tx_credits(ssk) > 1) &&
+ likely(sdp_tx_ring_slots_left(&ssk->tx_ring)) &&
likely((1 << ssk->isk.sk.sk_state) &
(TCPF_ESTABLISHED | TCPF_FIN_WAIT1))) {
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
@@ -583,12 +265,19 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
GFP_KERNEL);
/* FIXME */
BUG_ON(!skb);
+ SDPSTATS_COUNTER_INC(post_send_credits);
sdp_post_send(ssk, skb, SDP_MID_DATA);
+ post_count++;
}
+ /* send DisConn if needed
+ * Do not send DisConn if there is only 1 credit. Compliance with CA4-82
+ * If one credit is available, an implementation shall only send SDP
+ * messages that provide additional credits and also do not contain ULP
+ * payload. */
if (unlikely(ssk->sdp_disconnect) &&
- !ssk->isk.sk.sk_send_head &&
- ssk->bufs > (ssk->remote_credits >= ssk->rx_head - ssk->rx_tail)) {
+ !ssk->isk.sk.sk_send_head &&
+ tx_credits(ssk) > 1) {
ssk->sdp_disconnect = 0;
skb = sdp_stream_alloc_skb(&ssk->isk.sk,
sizeof(struct sdp_bsdh),
@@ -596,307 +285,9 @@ void sdp_post_sends(struct sdp_sock *ssk, int nonagle)
/* FIXME */
BUG_ON(!skb);
sdp_post_send(ssk, skb, SDP_MID_DISCONN);
+ post_count++;
}
-}
-
-int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
-{
- ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
- if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
- ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
- ssk->rcvbuf_scale = rcvbuf_scale;
-
- sdp_post_recvs(ssk);
-
- return 0;
-}
-
-int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
-{
- skb_frag_t skb_frag;
- u32 curr_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
- u32 max_size = SDP_HEAD_SIZE + SDP_MAX_SEND_SKB_FRAGS * PAGE_SIZE;
-
- /* Bugzilla 1311, Kernels using smaller fragments must reject
- * re-size requests larger than 32k to prevent being sent
- * fragment larger than the receive buffer fragment.
- */
- if ( (sizeof(skb_frag.size) < 4) && (max_size > 0x8000))
- max_size = 0x8000;
-
- if (new_size > curr_size && new_size <= max_size &&
- sdp_get_large_socket(ssk)) {
- ssk->rcvbuf_scale = rcvbuf_scale;
- ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
- if (ssk->recv_frags > SDP_MAX_SEND_SKB_FRAGS)
- ssk->recv_frags = SDP_MAX_SEND_SKB_FRAGS;
- return 0;
- } else
- return -1;
-}
-
-static void sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
-{
- if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
- ssk->recv_request_head = ssk->rx_head + 1;
- else
- ssk->recv_request_head = ssk->rx_tail;
- ssk->recv_request = 1;
-}
-
-static void sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
-{
- u32 new_size = ntohl(buf->size);
-
- if (new_size > ssk->xmit_size_goal) {
- ssk->sent_request = -1;
- ssk->xmit_size_goal = new_size;
- ssk->send_frags =
- PAGE_ALIGN(ssk->xmit_size_goal) / PAGE_SIZE;
- } else
- ssk->sent_request = 0;
-}
-
-static int sdp_handle_recv_comp(struct sdp_sock *ssk, struct ib_wc *wc)
-{
- struct sock *sk = &ssk->isk.sk;
- int frags;
- struct sk_buff *skb;
- struct sdp_bsdh *h;
- int pagesz, i;
-
- skb = sdp_recv_completion(ssk, wc->wr_id);
- if (unlikely(!skb))
- return -1;
-
- atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
-
- if (unlikely(wc->status)) {
- if (wc->status != IB_WC_WR_FLUSH_ERR) {
- sdp_dbg(sk, "Recv completion with error. Status %d\n",
- wc->status);
- sdp_reset(sk);
- }
- __kfree_skb(skb);
- return 0;
- }
-
- sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
- (int)wc->wr_id, wc->byte_len);
- if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
- printk(KERN_WARNING "SDP BUG! byte_len %d < %zd\n",
- wc->byte_len, sizeof(struct sdp_bsdh));
- __kfree_skb(skb);
- return -1;
- }
- skb->len = wc->byte_len;
- if (likely(wc->byte_len > SDP_HEAD_SIZE))
- skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
- else
- skb->data_len = 0;
- skb->data = skb->head;
-#ifdef NET_SKBUFF_DATA_USES_OFFSET
- skb->tail = skb_headlen(skb);
-#else
- skb->tail = skb->head + skb_headlen(skb);
-#endif
- h = (struct sdp_bsdh *)skb->data;
- skb_reset_transport_header(skb);
- ssk->mseq_ack = ntohl(h->mseq);
- if (ssk->mseq_ack != (int)wc->wr_id)
- printk(KERN_WARNING "SDP BUG! mseq %d != wrid %d\n",
- ssk->mseq_ack, (int)wc->wr_id);
- ssk->bufs = ntohl(h->mseq_ack) - ssk->tx_head + 1 +
- ntohs(h->bufs);
-
- frags = skb_shinfo(skb)->nr_frags;
- pagesz = PAGE_ALIGN(skb->data_len);
- skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
-
- for (i = skb_shinfo(skb)->nr_frags;
- i < frags; ++i) {
- put_page(skb_shinfo(skb)->frags[i].page);
- skb->truesize -= PAGE_SIZE;
- }
-
- if (unlikely(h->flags & SDP_OOB_PEND))
- sk_send_sigurg(sk);
-
- skb_pull(skb, sizeof(struct sdp_bsdh));
-
- switch (h->mid) {
- case SDP_MID_DATA:
- if (unlikely(skb->len <= 0)) {
- __kfree_skb(skb);
- break;
- }
-
- if (unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
- /* got data in RCV_SHUTDOWN */
- if (sk->sk_state == TCP_FIN_WAIT1) {
- /* go into abortive close */
- sdp_exch_state(sk, TCPF_FIN_WAIT1,
- TCP_TIME_WAIT);
-
- sk->sk_prot->disconnect(sk, 0);
- }
-
- __kfree_skb(skb);
- break;
- }
- skb = sdp_sock_queue_rcv_skb(sk, skb);
- if (unlikely(h->flags & SDP_OOB_PRES))
- sdp_urg(ssk, skb);
- break;
- case SDP_MID_DISCONN:
- __kfree_skb(skb);
- sdp_fin(sk);
- break;
- case SDP_MID_CHRCVBUF:
- sdp_handle_resize_request(ssk,
- (struct sdp_chrecvbuf *)skb->data);
- __kfree_skb(skb);
- break;
- case SDP_MID_CHRCVBUF_ACK:
- sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)skb->data);
- __kfree_skb(skb);
- break;
- default:
- /* TODO: Handle other messages */
- printk(KERN_WARNING "SDP: FIXME MID %d\n", h->mid);
- __kfree_skb(skb);
- }
-
- return 0;
-}
-
-static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
-{
- struct sk_buff *skb;
- struct sdp_bsdh *h;
-
- skb = sdp_send_completion(ssk, wc->wr_id);
- if (unlikely(!skb))
- return -1;
-
- if (unlikely(wc->status)) {
- if (wc->status != IB_WC_WR_FLUSH_ERR) {
- struct sock *sk = &ssk->isk.sk;
- sdp_dbg(sk, "Send completion with error. "
- "Status %d\n", wc->status);
- sdp_set_error(sk, -ECONNRESET);
- wake_up(&ssk->wq);
-
- queue_work(sdp_workqueue, &ssk->destroy_work);
- }
- goto out;
- }
-
- h = (struct sdp_bsdh *)skb->data;
-
- if (likely(h->mid != SDP_MID_DISCONN))
- goto out;
-
- if ((1 << ssk->isk.sk.sk_state) & ~(TCPF_FIN_WAIT1 | TCPF_LAST_ACK)) {
- sdp_dbg(&ssk->isk.sk,
- "%s: sent DISCONNECT from unexpected state %d\n",
- __func__, ssk->isk.sk.sk_state);
- }
-
-out:
- sk_wmem_free_skb(&ssk->isk.sk, skb);
-
- return 0;
-}
-
-static void sdp_handle_wc(struct sdp_sock *ssk, struct ib_wc *wc)
-{
- if (wc->wr_id & SDP_OP_RECV) {
- if (sdp_handle_recv_comp(ssk, wc))
- return;
- } else if (likely(wc->wr_id & SDP_OP_SEND)) {
- if (sdp_handle_send_comp(ssk, wc))
- return;
- } else {
- sdp_cnt(sdp_keepalive_probes_sent);
- if (likely(!wc->status))
- return;
-
- sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n",
- __func__, wc->status);
-
- if (wc->status == IB_WC_WR_FLUSH_ERR)
- return;
-
- sdp_set_error(&ssk->isk.sk, -ECONNRESET);
- wake_up(&ssk->wq);
-
- return;
- }
-}
-
-void sdp_completion_handler(struct ib_cq *cq, void *cq_context)
-{
- struct sock *sk = cq_context;
- struct sdp_sock *ssk = sdp_sk(sk);
- schedule_work(&ssk->work);
-}
-
-int sdp_poll_cq(struct sdp_sock *ssk, struct ib_cq *cq)
-{
- int n, i;
- int ret = -EAGAIN;
- do {
- n = ib_poll_cq(cq, SDP_NUM_WC, ssk->ibwc);
- for (i = 0; i < n; ++i) {
- sdp_handle_wc(ssk, ssk->ibwc + i);
- ret = 0;
- }
- } while (n == SDP_NUM_WC);
-
- if (!ret) {
- struct sock *sk = &ssk->isk.sk;
-
- sdp_post_recvs(ssk);
- sdp_post_sends(ssk, 0);
-
- if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
- sk_stream_write_space(&ssk->isk.sk);
- }
-
- return ret;
-}
-
-void sdp_work(struct work_struct *work)
-{
- struct sdp_sock *ssk = container_of(work, struct sdp_sock, work);
- struct sock *sk = &ssk->isk.sk;
- struct ib_cq *cq;
-
- sdp_dbg_data(sk, "%s\n", __func__);
-
- lock_sock(sk);
- cq = ssk->cq;
- if (unlikely(!cq))
- goto out;
-
- if (unlikely(!ssk->poll_cq)) {
- struct rdma_cm_id *id = ssk->id;
- if (id && id->qp)
- rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
- goto out;
- }
-
- sdp_poll_cq(ssk, cq);
- release_sock(sk);
- sk_mem_reclaim(sk);
- lock_sock(sk);
- cq = ssk->cq;
- if (unlikely(!cq))
- goto out;
- ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
- sdp_poll_cq(ssk, cq);
-out:
- release_sock(sk);
+ if (post_count)
+ sdp_xmit_poll(ssk, 0);
}
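One more orientation note: the somebody_is_doing_posts counter and the posts_handler_{get,put} macros from sdp.h are presumably wrapped around any path that may post work requests, roughly as below, so the RX completion path can see that a user context is already doing posts and avoid an extra context switch. The function name is made up; the real call sites are in sdp_main.c and sdp_rx.c.

static void sdp_posts_guard_sketch(struct sdp_sock *ssk)
{
        posts_handler_get(ssk);         /* mark: this context will do the posts */

        /* ... RX/TX processing that may post receive or send WRs ... */

        posts_handler_put(ssk);         /* unmark, then flush via sdp_do_posts() */
}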
diff --git a/drivers/infiniband/ulp/sdp/sdp_cma.c b/drivers/infiniband/ulp/sdp/sdp_cma.c
index 96d65bd..f7dc7c6 100644
--- a/drivers/infiniband/ulp/sdp/sdp_cma.c
+++ b/drivers/infiniband/ulp/sdp/sdp_cma.c
@@ -46,49 +46,13 @@
#include "sdp_socket.h"
#include "sdp.h"
-union cma_ip_addr {
- struct in6_addr ip6;
- struct {
- __u32 pad[3];
- __u32 addr;
- } ip4;
-};
-
#define SDP_MAJV_MINV 0x22
-/* TODO: too much? Can I avoid having the src/dst and port here? */
-struct sdp_hh {
- struct sdp_bsdh bsdh;
- u8 majv_minv;
- u8 ipv_cap;
- u8 rsvd1;
- u8 max_adverts;
- __u32 desremrcvsz;
- __u32 localrcvsz;
- __u16 port;
- __u16 rsvd2;
- union cma_ip_addr src_addr;
- union cma_ip_addr dst_addr;
-};
-
-struct sdp_hah {
- struct sdp_bsdh bsdh;
- u8 majv_minv;
- u8 ipv_cap;
- u8 rsvd1;
- u8 ext_max_adverts;
- __u32 actrcvsz;
-};
-
enum {
SDP_HH_SIZE = 76,
SDP_HAH_SIZE = 180,
};
-static void sdp_cq_event_handler(struct ib_event *event, void *data)
-{
-}
-
static void sdp_qp_event_handler(struct ib_event *event, void *data)
{
}
@@ -98,43 +62,19 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
struct ib_qp_init_attr qp_init_attr = {
.event_handler = sdp_qp_event_handler,
.cap.max_send_wr = SDP_TX_SIZE,
- .cap.max_send_sge = SDP_MAX_SEND_SKB_FRAGS + 1, /* TODO */
+ .cap.max_send_sge = SDP_MAX_SEND_SKB_FRAGS,
.cap.max_recv_wr = SDP_RX_SIZE,
- .cap.max_recv_sge = SDP_MAX_SEND_SKB_FRAGS + 1, /* TODO */
+ .cap.max_recv_sge = SDP_MAX_RECV_SKB_FRAGS + 1,
.sq_sig_type = IB_SIGNAL_REQ_WR,
.qp_type = IB_QPT_RC,
};
struct ib_device *device = id->device;
- struct ib_cq *cq;
struct ib_mr *mr;
struct ib_pd *pd;
int rc;
sdp_dbg(sk, "%s\n", __func__);
- sdp_sk(sk)->tx_head = 1;
- sdp_sk(sk)->tx_tail = 1;
- sdp_sk(sk)->rx_head = 1;
- sdp_sk(sk)->rx_tail = 1;
-
- sdp_sk(sk)->tx_ring = kmalloc(sizeof *sdp_sk(sk)->tx_ring * SDP_TX_SIZE,
- GFP_KERNEL);
- if (!sdp_sk(sk)->tx_ring) {
- rc = -ENOMEM;
- sdp_warn(sk, "Unable to allocate TX Ring size %zd.\n",
- sizeof *sdp_sk(sk)->tx_ring * SDP_TX_SIZE);
- goto err_tx;
- }
-
- sdp_sk(sk)->rx_ring = kmalloc(sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE,
- GFP_KERNEL);
- if (!sdp_sk(sk)->rx_ring) {
- rc = -ENOMEM;
- sdp_warn(sk, "Unable to allocate RX Ring size %zd.\n",
- sizeof *sdp_sk(sk)->rx_ring * SDP_RX_SIZE);
- goto err_rx;
- }
-
pd = ib_alloc_pd(device);
if (IS_ERR(pd)) {
rc = PTR_ERR(pd);
@@ -150,27 +90,23 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
}
sdp_sk(sk)->mr = mr;
- INIT_WORK(&sdp_sk(sk)->work, sdp_work);
-
- cq = ib_create_cq(device, sdp_completion_handler, sdp_cq_event_handler,
- sk, SDP_TX_SIZE + SDP_RX_SIZE, 0);
- if (IS_ERR(cq)) {
- rc = PTR_ERR(cq);
- sdp_warn(sk, "Unable to allocate CQ: %d.\n", rc);
- goto err_cq;
- }
+ rc = sdp_rx_ring_create(sdp_sk(sk), device);
+ if (rc)
+ goto err_rx;
- ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
+ rc = sdp_tx_ring_create(sdp_sk(sk), device);
+ if (rc)
+ goto err_tx;
- qp_init_attr.send_cq = qp_init_attr.recv_cq = cq;
+ qp_init_attr.recv_cq = sdp_sk(sk)->rx_ring.cq;
+ qp_init_attr.send_cq = sdp_sk(sk)->tx_ring.cq;
rc = rdma_create_qp(id, pd, &qp_init_attr);
if (rc) {
sdp_warn(sk, "Unable to create QP: %d.\n", rc);
goto err_qp;
}
- sdp_sk(sk)->cq = cq;
sdp_sk(sk)->qp = id->qp;
sdp_sk(sk)->ib_device = device;
@@ -180,18 +116,14 @@ static int sdp_init_qp(struct sock *sk, struct rdma_cm_id *id)
return 0;
err_qp:
- ib_destroy_cq(cq);
-err_cq:
+ sdp_tx_ring_destroy(sdp_sk(sk));
+err_tx:
+ sdp_rx_ring_destroy(sdp_sk(sk));
+err_rx:
ib_dereg_mr(sdp_sk(sk)->mr);
err_mr:
ib_dealloc_pd(pd);
err_pd:
- kfree(sdp_sk(sk)->rx_ring);
- sdp_sk(sk)->rx_ring = NULL;
-err_rx:
- kfree(sdp_sk(sk)->tx_ring);
- sdp_sk(sk)->tx_ring = NULL;
-err_tx:
return rc;
}
@@ -206,6 +138,7 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
sdp_dbg(sk, "%s %p -> %p\n", __func__, sdp_sk(sk)->id, id);
h = event->param.conn.private_data;
+ SDP_DUMP_PACKET(sk, "RX", NULL, &h->bsdh);
if (!h->max_adverts)
return -EINVAL;
@@ -232,24 +165,21 @@ static int sdp_connect_handler(struct sock *sk, struct rdma_cm_id *id,
sdp_add_sock(sdp_sk(child));
- sdp_sk(child)->max_bufs = sdp_sk(child)->bufs = ntohs(h->bsdh.bufs);
- sdp_sk(child)->min_bufs = sdp_sk(child)->bufs / 4;
+ sdp_sk(child)->max_bufs = ntohs(h->bsdh.bufs);
+ atomic_set(&sdp_sk(child)->tx_ring.credits, sdp_sk(child)->max_bufs);
+
+ sdp_sk(child)->min_bufs = tx_credits(sdp_sk(child)) / 4;
sdp_sk(child)->xmit_size_goal = ntohl(h->localrcvsz) -
sizeof(struct sdp_bsdh);
sdp_sk(child)->send_frags = PAGE_ALIGN(sdp_sk(child)->xmit_size_goal) /
- PAGE_SIZE;
- sdp_init_buffers(sdp_sk(child), ntohl(h->desremrcvsz));
-
- sdp_dbg(child, "%s bufs %d xmit_size_goal %d send trigger %d\n",
- __func__,
- sdp_sk(child)->bufs,
- sdp_sk(child)->xmit_size_goal,
- sdp_sk(child)->min_bufs);
+ PAGE_SIZE + 1; /* +1 to compensate for unaligned buffers */
+ sdp_init_buffers(sdp_sk(child), rcvbuf_initial_size);
id->context = child;
sdp_sk(child)->id = id;
- list_add_tail(&sdp_sk(child)->backlog_queue, &sdp_sk(sk)->backlog_queue);
+ list_add_tail(&sdp_sk(child)->backlog_queue,
+ &sdp_sk(sk)->backlog_queue);
sdp_sk(child)->parent = sk;
sdp_exch_state(child, TCPF_LISTEN | TCPF_CLOSE, TCP_SYN_RECV);
@@ -269,6 +199,7 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
sdp_dbg(sk, "%s\n", __func__);
sdp_exch_state(sk, TCPF_SYN_SENT, TCP_ESTABLISHED);
+ sdp_set_default_moderation(sdp_sk(sk));
if (sock_flag(sk, SOCK_KEEPOPEN))
sdp_start_keepalive_timer(sk);
@@ -277,22 +208,19 @@ static int sdp_response_handler(struct sock *sk, struct rdma_cm_id *id,
return 0;
h = event->param.conn.private_data;
- sdp_sk(sk)->max_bufs = sdp_sk(sk)->bufs = ntohs(h->bsdh.bufs);
- sdp_sk(sk)->min_bufs = sdp_sk(sk)->bufs / 4;
- sdp_sk(sk)->xmit_size_goal = ntohl(h->actrcvsz) -
- sizeof(struct sdp_bsdh);
- sdp_sk(sk)->send_frags = PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
- PAGE_SIZE;
-
- sdp_dbg(sk, "%s bufs %d xmit_size_goal %d send trigger %d\n",
- __func__,
- sdp_sk(sk)->bufs,
- sdp_sk(sk)->xmit_size_goal,
- sdp_sk(sk)->min_bufs);
+ SDP_DUMP_PACKET(sk, "RX", NULL, &h->bsdh);
+ sdp_sk(sk)->max_bufs = ntohs(h->bsdh.bufs);
+ atomic_set(&sdp_sk(sk)->tx_ring.credits, sdp_sk(sk)->max_bufs);
+ sdp_sk(sk)->min_bufs = tx_credits(sdp_sk(sk)) / 4;
+ sdp_sk(sk)->xmit_size_goal =
+ ntohl(h->actrcvsz) - sizeof(struct sdp_bsdh);
+ sdp_sk(sk)->send_frags = MIN(PAGE_ALIGN(sdp_sk(sk)->xmit_size_goal) /
+ PAGE_SIZE, MAX_SKB_FRAGS) + 1; /* +1 to compensate for */
+ /* unaligned buffers */
+ sdp_sk(sk)->xmit_size_goal = MIN(sdp_sk(sk)->xmit_size_goal,
+ sdp_sk(sk)->send_frags * PAGE_SIZE);
sdp_sk(sk)->poll_cq = 1;
- ib_req_notify_cq(sdp_sk(sk)->cq, IB_CQ_NEXT_COMP);
- sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq);
sk->sk_state_change(sk);
sk_wake_async(sk, 0, POLL_OUT);
@@ -314,6 +242,8 @@ static int sdp_connected_handler(struct sock *sk, struct rdma_cm_event *event)
sdp_exch_state(sk, TCPF_SYN_RECV, TCP_ESTABLISHED);
+ sdp_set_default_moderation(sdp_sk(sk));
+
if (sock_flag(sk, SOCK_KEEPOPEN))
sdp_start_keepalive_timer(sk);
@@ -325,18 +255,12 @@ static int sdp_connected_handler(struct sock *sk, struct rdma_cm_event *event)
sdp_dbg(sk, "parent is going away.\n");
goto done;
}
-#if 0
- /* TODO: backlog */
- if (sk_acceptq_is_full(parent)) {
- sdp_dbg(parent, "%s ECONNREFUSED: parent accept queue full: %d > %d\n", __func__, parent->sk_ack_backlog, parent->sk_max_ack_backlog);
- release_sock(parent);
- return -ECONNREFUSED;
- }
-#endif
+
sk_acceptq_added(parent);
sdp_dbg(parent, "%s child connection established\n", __func__);
list_del_init(&sdp_sk(sk)->backlog_queue);
- list_add_tail(&sdp_sk(sk)->accept_queue, &sdp_sk(parent)->accept_queue);
+ list_add_tail(&sdp_sk(sk)->accept_queue,
+ &sdp_sk(parent)->accept_queue);
parent->sk_state_change(parent);
sk_wake_async(parent, 0, POLL_OUT);
@@ -352,13 +276,13 @@ static int sdp_disconnected_handler(struct sock *sk)
sdp_dbg(sk, "%s\n", __func__);
- if (ssk->cq)
- sdp_poll_cq(ssk, ssk->cq);
+ if (ssk->tx_ring.cq)
+ sdp_xmit_poll(ssk, 1);
if (sk->sk_state == TCP_SYN_RECV) {
sdp_connected_handler(sk, NULL);
- if (ssk->rcv_nxt)
+ if (rcv_nxt(ssk))
return 0;
}
@@ -400,7 +324,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
rc = rdma_resolve_route(id, SDP_ROUTE_TIMEOUT);
break;
case RDMA_CM_EVENT_ADDR_ERROR:
- sdp_dbg(sk, "RDMA_CM_EVENT_ADDR_ERROR\n");
+ sdp_warn(sk, "RDMA_CM_EVENT_ADDR_ERROR\n");
rc = -ENETUNREACH;
break;
case RDMA_CM_EVENT_ROUTE_RESOLVED:
@@ -408,17 +332,17 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
rc = sdp_init_qp(sk, id);
if (rc)
break;
- sdp_sk(sk)->remote_credits = sdp_sk(sk)->rx_head -
- sdp_sk(sk)->rx_tail;
+ atomic_set(&sdp_sk(sk)->remote_credits,
+ ring_posted(sdp_sk(sk)->rx_ring));
memset(&hh, 0, sizeof hh);
hh.bsdh.mid = SDP_MID_HELLO;
- hh.bsdh.bufs = htons(sdp_sk(sk)->remote_credits);
+ hh.bsdh.bufs = htons(remote_credits(sdp_sk(sk)));
hh.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HH_SIZE);
hh.max_adverts = 1;
hh.majv_minv = SDP_MAJV_MINV;
sdp_init_buffers(sdp_sk(sk), rcvbuf_initial_size);
hh.localrcvsz = hh.desremrcvsz = htonl(sdp_sk(sk)->recv_frags *
- PAGE_SIZE + SDP_HEAD_SIZE);
+ PAGE_SIZE + sizeof(struct sdp_bsdh));
hh.max_adverts = 0x1;
inet_sk(sk)->saddr = inet_sk(sk)->rcv_saddr =
((struct sockaddr_in *)&id->route.addr.src_addr)->sin_addr.s_addr;
@@ -428,6 +352,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
conn_param.responder_resources = 4 /* TODO */;
conn_param.initiator_depth = 4 /* TODO */;
conn_param.retry_count = SDP_RETRY_COUNT;
+ SDP_DUMP_PACKET(NULL, "TX", NULL, &hh.bsdh);
rc = rdma_connect(id, &conn_param);
break;
case RDMA_CM_EVENT_ROUTE_ERROR:
@@ -442,22 +367,24 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
break;
}
child = id->context;
- sdp_sk(child)->remote_credits = sdp_sk(child)->rx_head -
- sdp_sk(child)->rx_tail;
+ atomic_set(&sdp_sk(child)->remote_credits,
+ ring_posted(sdp_sk(child)->rx_ring));
memset(&hah, 0, sizeof hah);
hah.bsdh.mid = SDP_MID_HELLO_ACK;
- hah.bsdh.bufs = htons(sdp_sk(child)->remote_credits);
+ hah.bsdh.bufs = htons(remote_credits(sdp_sk(child)));
hah.bsdh.len = htonl(sizeof(struct sdp_bsdh) + SDP_HAH_SIZE);
hah.majv_minv = SDP_MAJV_MINV;
hah.ext_max_adverts = 1; /* Doesn't seem to be mandated by spec,
but just in case */
- hah.actrcvsz = htonl(sdp_sk(child)->recv_frags * PAGE_SIZE + SDP_HEAD_SIZE);
+ hah.actrcvsz = htonl(sdp_sk(child)->recv_frags * PAGE_SIZE +
+ sizeof(struct sdp_bsdh));
memset(&conn_param, 0, sizeof conn_param);
conn_param.private_data_len = sizeof hah;
conn_param.private_data = &hah;
conn_param.responder_resources = 4 /* TODO */;
conn_param.initiator_depth = 4 /* TODO */;
conn_param.retry_count = SDP_RETRY_COUNT;
+ SDP_DUMP_PACKET(sk, "TX", NULL, &hah.bsdh);
rc = rdma_accept(id, &conn_param);
if (rc) {
sdp_sk(child)->id = NULL;
@@ -475,7 +402,7 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
rc = rdma_accept(id, NULL);
if (!rc)
- rc = sdp_post_credits(sdp_sk(sk));
+ rc = sdp_post_credits(sdp_sk(sk)) < 0 ?: 0;
break;
case RDMA_CM_EVENT_CONNECT_ERROR:
sdp_dbg(sk, "RDMA_CM_EVENT_CONNECT_ERROR\n");
@@ -511,8 +438,9 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
if (sk->sk_state != TCP_TIME_WAIT) {
if (sk->sk_state == TCP_CLOSE_WAIT) {
- sdp_dbg(sk, "IB teardown while in TCP_CLOSE_WAIT "
- "taking reference to let close() finish the work\n");
+ sdp_dbg(sk, "IB teardown while in "
+ "TCP_CLOSE_WAIT taking reference to "
+ "let close() finish the work\n");
sock_hold(sk, SOCK_REF_CM_TW);
}
sdp_set_error(sk, EPIPE);
@@ -550,7 +478,6 @@ int sdp_cma_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
sdp_dbg(sk, "event %d done. status %d\n", event->event, rc);
if (parent) {
- sdp_dbg(parent, "deleting child %d done. status %d\n", event->event, rc);
lock_sock(parent);
if (!sdp_sk(parent)->id) { /* TODO: look at SOCK_DEAD? */
sdp_dbg(sk, "parent is going away.\n");
diff --git a/drivers/infiniband/ulp/sdp/sdp_main.c b/drivers/infiniband/ulp/sdp/sdp_main.c
index 7a38c47..8e1a6d7 100644
--- a/drivers/infiniband/ulp/sdp/sdp_main.c
+++ b/drivers/infiniband/ulp/sdp/sdp_main.c
@@ -67,7 +67,6 @@ unsigned int csum_partial_copy_from_user_new (const char *src, char *dst,
#include <linux/socket.h>
#include <net/protocol.h>
#include <net/inet_common.h>
-#include <linux/proc_fs.h>
#include <rdma/rdma_cm.h>
#include <rdma/ib_verbs.h>
/* TODO: remove when sdp_socket.h becomes part of include/linux/socket.h */
@@ -80,66 +79,42 @@ MODULE_DESCRIPTION("InfiniBand SDP module");
MODULE_LICENSE("Dual BSD/GPL");
#ifdef CONFIG_INFINIBAND_SDP_DEBUG
-int sdp_debug_level;
-
-module_param_named(debug_level, sdp_debug_level, int, 0644);
-MODULE_PARM_DESC(debug_level, "Enable debug tracing if > 0.");
+SDP_MODPARAM_INT(sdp_debug_level, 0, "Enable debug tracing if > 0.");
#endif
#ifdef CONFIG_INFINIBAND_SDP_DEBUG
-int sdp_data_debug_level;
-
-module_param_named(data_debug_level, sdp_data_debug_level, int, 0644);
-MODULE_PARM_DESC(data_debug_level, "Enable data path debug tracing if > 0.");
+SDP_MODPARAM_INT(sdp_data_debug_level, 0,
+ "Enable data path debug tracing if > 0.");
#endif
-static int send_poll_hit;
-
-module_param_named(send_poll_hit, send_poll_hit, int, 0644);
-MODULE_PARM_DESC(send_poll_hit, "How many times send poll helped.");
-
-static int send_poll_miss;
-
-module_param_named(send_poll_miss, send_poll_miss, int, 0644);
-MODULE_PARM_DESC(send_poll_miss, "How many times send poll missed.");
-
-static int recv_poll_hit;
-
-module_param_named(recv_poll_hit, recv_poll_hit, int, 0644);
-MODULE_PARM_DESC(recv_poll_hit, "How many times recv poll helped.");
-
-static int recv_poll_miss;
-
-module_param_named(recv_poll_miss, recv_poll_miss, int, 0644);
-MODULE_PARM_DESC(recv_poll_miss, "How many times recv poll missed.");
-
-static int send_poll = 100;
-
-module_param_named(send_poll, send_poll, int, 0644);
-MODULE_PARM_DESC(send_poll, "How many times to poll send.");
-
-static int recv_poll = 1000;
-
-module_param_named(recv_poll, recv_poll, int, 0644);
-MODULE_PARM_DESC(recv_poll, "How many times to poll recv.");
-
-static int send_poll_thresh = 8192;
-
-module_param_named(send_poll_thresh, send_poll_thresh, int, 0644);
-MODULE_PARM_DESC(send_poll_thresh, "Send message size thresh hold over which to start polling.");
-
-static unsigned int sdp_keepalive_time = SDP_KEEPALIVE_TIME;
-
-module_param_named(sdp_keepalive_time, sdp_keepalive_time, uint, 0644);
-MODULE_PARM_DESC(sdp_keepalive_time, "Default idle time in seconds before keepalive probe sent.");
-
-static int sdp_zcopy_thresh = 65536;
-module_param_named(sdp_zcopy_thresh, sdp_zcopy_thresh, int, 0644);
-MODULE_PARM_DESC(sdp_zcopy_thresh, "Zero copy send threshold; 0=0ff.");
-
-struct workqueue_struct *sdp_workqueue;
-
-static struct list_head sock_list;
-static spinlock_t sock_list_lock;
+SDP_MODPARAM_SINT(recv_poll_hit, -1, "How many times recv poll helped.");
+SDP_MODPARAM_SINT(recv_poll_miss, -1, "How many times recv poll missed.");
+SDP_MODPARAM_SINT(recv_poll, 1000, "How many times to poll recv.");
+SDP_MODPARAM_SINT(sdp_keepalive_time, SDP_KEEPALIVE_TIME,
+ "Default idle time in seconds before keepalive probe sent.");
+SDP_MODPARAM_SINT(sdp_zcopy_thresh, 65536, "Zero copy send threshold; 0=off.");
+
+#define SDP_RX_COAL_TIME_HIGH 128
+SDP_MODPARAM_SINT(sdp_rx_coal_target, 0x50000,
+ "Target number of bytes to coalesce with interrupt moderation.");
+SDP_MODPARAM_SINT(sdp_rx_coal_time, 0x10, "rx coal time (jiffies).");
+SDP_MODPARAM_SINT(sdp_rx_rate_low, 80000, "rx_rate low (packets/sec).");
+SDP_MODPARAM_SINT(sdp_rx_coal_time_low, 0, "low moderation usec.");
+SDP_MODPARAM_SINT(sdp_rx_rate_high, 100000, "rx_rate high (packets/sec).");
+SDP_MODPARAM_SINT(sdp_rx_coal_time_high, 128, "high moderation usec.");
+SDP_MODPARAM_SINT(sdp_rx_rate_thresh, (200000 / SDP_RX_COAL_TIME_HIGH),
+ "rx rate thresh ().");
+SDP_MODPARAM_SINT(sdp_sample_interval, (HZ / 4), "sample interval (jiffies).");
+
+SDP_MODPARAM_INT(hw_int_mod_count, -1,
+ "Forced hw interrupt moderation count; -1 for auto (packets).");
+SDP_MODPARAM_INT(hw_int_mod_usec, -1,
+ "Forced hw interrupt moderation time; -1 for auto (usec).");
+
+struct workqueue_struct *sdp_wq;
+struct workqueue_struct *rx_comp_wq;
+
+struct list_head sock_list;
+spinlock_t sock_list_lock;
static DEFINE_RWLOCK(device_removal_lock);
@@ -206,52 +181,38 @@ static int sdp_get_port(struct sock *sk, unsigned short snum)
static void sdp_destroy_qp(struct sdp_sock *ssk)
{
struct ib_pd *pd = NULL;
- struct ib_cq *cq = NULL;
+ unsigned long flags;
+
+ sdp_dbg(&ssk->isk.sk, "destroying qp\n");
+ sdp_prf(&ssk->isk.sk, NULL, "destroying qp");
+
+ del_timer(&ssk->tx_ring.timer);
+
+ rx_ring_lock(ssk, flags);
+
+ sdp_rx_ring_destroy(ssk);
+ sdp_tx_ring_destroy(ssk);
if (ssk->qp) {
pd = ssk->qp->pd;
- cq = ssk->cq;
- ssk->cq = NULL;
ib_destroy_qp(ssk->qp);
-
- while (ssk->rx_head != ssk->rx_tail) {
- struct sk_buff *skb;
- skb = sdp_recv_completion(ssk, ssk->rx_tail);
- if (!skb)
- break;
- atomic_sub(SDP_MAX_SEND_SKB_FRAGS, &sdp_current_mem_usage);
- __kfree_skb(skb);
- }
- while (ssk->tx_head != ssk->tx_tail) {
- struct sk_buff *skb;
- skb = sdp_send_completion(ssk, ssk->tx_tail);
- if (!skb)
- break;
- __kfree_skb(skb);
- }
+ ssk->qp = NULL;
}
- if (cq)
- ib_destroy_cq(cq);
-
- if (ssk->mr)
+ if (ssk->mr) {
ib_dereg_mr(ssk->mr);
+ ssk->mr = NULL;
+ }
if (pd)
ib_dealloc_pd(pd);
sdp_remove_large_sock(ssk);
- if (ssk->rx_ring) {
- kfree(ssk->rx_ring);
- ssk->rx_ring = NULL;
- }
- if (ssk->tx_ring) {
- kfree(ssk->tx_ring);
- ssk->tx_ring = NULL;
- }
-}
+ rx_ring_unlock(ssk, flags);
+}
static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len)
{
@@ -259,8 +220,8 @@ static void sdp_reset_keepalive_timer(struct sock *sk, unsigned long len)
sdp_dbg(sk, "%s\n", __func__);
- ssk->keepalive_tx_head = ssk->tx_head;
- ssk->keepalive_rx_head = ssk->rx_head;
+ ssk->keepalive_tx_head = ring_head(ssk->tx_ring);
+ ssk->keepalive_rx_head = ring_head(ssk->rx_ring);
sk_reset_timer(sk, &sk->sk_timer, jiffies + len);
}
@@ -295,8 +256,8 @@ static void sdp_keepalive_timer(unsigned long data)
sk->sk_state == TCP_CLOSE)
goto out;
- if (ssk->keepalive_tx_head == ssk->tx_head &&
- ssk->keepalive_rx_head == ssk->rx_head)
+ if (ssk->keepalive_tx_head == ring_head(ssk->tx_ring) &&
+ ssk->keepalive_rx_head == ring_head(ssk->rx_ring))
sdp_post_keepalive(ssk);
sdp_reset_keepalive_timer(sk, sdp_keepalive_time_when(ssk));
@@ -306,7 +267,7 @@ out:
sock_put(sk, SOCK_REF_BORN);
}
-static void sdp_init_timer(struct sock *sk)
+static void sdp_init_keepalive_timer(struct sock *sk)
{
init_timer(&sk->sk_timer);
@@ -332,6 +293,150 @@ void sdp_start_keepalive_timer(struct sock *sk)
sdp_reset_keepalive_timer(sk, sdp_keepalive_time_when(sdp_sk(sk)));
}
+void sdp_set_default_moderation(struct sdp_sock *ssk)
+{
+ struct sock *sk = &ssk->isk.sk;
+ struct sdp_moderation *mod = &ssk->auto_mod;
+ int rx_buf_size;
+
+ if (hw_int_mod_count > -1 || hw_int_mod_usec > -1) {
+ int err;
+
+ mod->adaptive_rx_coal = 0;
+
+ if (hw_int_mod_count > 0 && hw_int_mod_usec > 0) {
+ err = ib_modify_cq(ssk->rx_ring.cq, hw_int_mod_count,
+ hw_int_mod_usec);
+ if (err)
+ sdp_warn(sk,
+ "Failed modifying moderation for cq");
+ else
+ sdp_dbg(sk,
+ "Using fixed interrupt moderation\n");
+ }
+ return;
+ }
+
+ mod->adaptive_rx_coal = 1;
+ sdp_dbg(sk, "Using adaptive interrupt moderation\n");
+
+ /* If we haven't received a specific coalescing setting
+ * (module param), we set the moderation parameters as follows:
+ * - moder_cnt is set to the number of MTU-sized packets needed to
+ * satisfy our coalescing target.
+ * - moder_time is set to a fixed value.
+ */
+ rx_buf_size = (ssk->recv_frags * PAGE_SIZE) + sizeof(struct sdp_bsdh);
+ mod->moder_cnt = sdp_rx_coal_target / rx_buf_size + 1;
+ mod->moder_time = sdp_rx_coal_time;
+ sdp_dbg(sk, "Default coalesing params for buf size:%d - "
+ "moder_cnt:%d moder_time:%d\n",
+ rx_buf_size, mod->moder_cnt, mod->moder_time);
+
+ /* Reset auto-moderation params */
+ mod->pkt_rate_low = sdp_rx_rate_low;
+ mod->rx_usecs_low = sdp_rx_coal_time_low;
+ mod->pkt_rate_high = sdp_rx_rate_high;
+ mod->rx_usecs_high = sdp_rx_coal_time_high;
+ mod->sample_interval = sdp_sample_interval;
+
+ mod->last_moder_time = SDP_AUTO_CONF;
+ mod->last_moder_jiffies = 0;
+ mod->last_moder_packets = 0;
+ mod->last_moder_tx_packets = 0;
+ mod->last_moder_bytes = 0;
+}
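
For reference, a minimal userspace sketch of the default-moderation arithmetic above, using the module-parameter defaults from this patch (sdp_rx_coal_target = 0x50000, sdp_rx_coal_time = 0x10). The page size, recv_frags and the 16-byte BSDH size are assumptions, chosen only to show how moder_cnt falls out of the byte target:

#include <stdio.h>

/* Illustrative values only: recv_frags and the page size are system
 * dependent, and 16 bytes is only an assumed sizeof(struct sdp_bsdh). */
#define RX_PAGE_SIZE	4096
#define RECV_FRAGS	16
#define RX_COAL_TARGET	0x50000		/* sdp_rx_coal_target default */
#define RX_COAL_TIME	0x10		/* sdp_rx_coal_time default */
#define BSDH_SIZE	16

int main(void)
{
	int rx_buf_size = RECV_FRAGS * RX_PAGE_SIZE + BSDH_SIZE;
	int moder_cnt = RX_COAL_TARGET / rx_buf_size + 1;

	/* Mirrors sdp_set_default_moderation(): coalesce enough packets to
	 * cover the byte target, capped by a fixed moderation time. */
	printf("rx_buf_size=%d moder_cnt=%d moder_time=%d\n",
	       rx_buf_size, moder_cnt, RX_COAL_TIME);
	return 0;
}
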
+
+/* If tx and rx packet rates are not balanced, assume that
+ * traffic is mainly BW bound and apply maximum moderation.
+ * Otherwise, moderate according to packet rate */
+static inline int calc_moder_time(int rate, struct sdp_moderation *mod,
+ int tx_pkt_diff, int rx_pkt_diff)
+{
+ if (2 * tx_pkt_diff > 3 * rx_pkt_diff ||
+ 2 * rx_pkt_diff > 3 * tx_pkt_diff)
+ return mod->rx_usecs_high;
+
+ if (rate < mod->pkt_rate_low)
+ return mod->rx_usecs_low;
+
+ if (rate > mod->pkt_rate_high)
+ return mod->rx_usecs_high;
+
+ return (rate - mod->pkt_rate_low) *
+ (mod->rx_usecs_high - mod->rx_usecs_low) /
+ (mod->pkt_rate_high - mod->pkt_rate_low) +
+ mod->rx_usecs_low;
+}
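
A standalone sketch of the interpolation done by calc_moder_time() above; the rates fed in below are made up, and the tx/rx imbalance check is omitted, so this only shows how moder_time scales linearly between rx_usecs_low and rx_usecs_high inside the [pkt_rate_low, pkt_rate_high] window:

#include <stdio.h>

/* Same shape as the rate/usec fields of struct sdp_moderation (illustrative). */
struct mod_params {
	int pkt_rate_low, rx_usecs_low;
	int pkt_rate_high, rx_usecs_high;
};

static int interp_moder_time(int rate, const struct mod_params *m)
{
	if (rate < m->pkt_rate_low)
		return m->rx_usecs_low;
	if (rate > m->pkt_rate_high)
		return m->rx_usecs_high;
	return (rate - m->pkt_rate_low) *
	       (m->rx_usecs_high - m->rx_usecs_low) /
	       (m->pkt_rate_high - m->pkt_rate_low) + m->rx_usecs_low;
}

int main(void)
{
	/* Defaults from this patch: 80000..100000 pkt/s maps to 0..128 usec */
	struct mod_params m = { 80000, 0, 100000, 128 };
	int rates[] = { 50000, 80000, 90000, 100000, 150000 };
	int i;

	for (i = 0; i < 5; i++)
		printf("rate=%d -> moder_time=%d usec\n",
		       rates[i], interp_moder_time(rates[i], &m));
	return 0;
}
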
+
+static void sdp_auto_moderation(struct sdp_sock *ssk)
+{
+ struct sdp_moderation *mod = &ssk->auto_mod;
+
+ unsigned long period = jiffies - mod->last_moder_jiffies;
+ unsigned long packets;
+ unsigned long rate;
+ unsigned long avg_pkt_size;
+ unsigned long tx_pkt_diff;
+ unsigned long rx_pkt_diff;
+ int moder_time;
+ int err;
+
+ if (!mod->adaptive_rx_coal)
+ return;
+
+ if (period < mod->sample_interval)
+ return;
+
+ if (!mod->last_moder_jiffies || !period)
+ goto out;
+
+ tx_pkt_diff = ((unsigned long) (ssk->tx_packets -
+ mod->last_moder_tx_packets));
+ rx_pkt_diff = ((unsigned long) (ssk->rx_packets -
+ mod->last_moder_packets));
+ packets = max(tx_pkt_diff, rx_pkt_diff);
+ rate = packets * HZ / period;
+ avg_pkt_size = packets ? ((unsigned long) (ssk->rx_bytes -
+ mod->last_moder_bytes)) / packets : 0;
+
+ /* Apply auto-moderation only when the packet rate exceeds a rate at
+ * which it matters */
+ if (rate > sdp_rx_rate_thresh) {
+ moder_time = calc_moder_time(rate, mod, tx_pkt_diff,
+ rx_pkt_diff);
+ } else {
+ /* When packet rate is low, use default moderation rather
+ * than 0 to prevent interrupt storms if traffic suddenly
+ * increases */
+ moder_time = mod->moder_time;
+ }
+
+ sdp_dbg_data(&ssk->isk.sk, "tx rate:%lu rx_rate:%lu\n",
+ tx_pkt_diff * HZ / period, rx_pkt_diff * HZ / period);
+
+ sdp_dbg_data(&ssk->isk.sk, "Rx moder_time changed from:%d to %d "
+ "period:%lu [jiff] packets:%lu avg_pkt_size:%lu "
+ "rate:%lu [p/s])\n",
+ mod->last_moder_time, moder_time, period, packets,
+ avg_pkt_size, rate);
+
+ if (moder_time != mod->last_moder_time) {
+ mod->last_moder_time = moder_time;
+ err = ib_modify_cq(ssk->rx_ring.cq, mod->moder_cnt, moder_time);
+ if (err) {
+ sdp_dbg_data(&ssk->isk.sk,
+ "Failed modifying moderation for cq");
+ }
+ }
+
+out:
+ mod->last_moder_packets = ssk->rx_packets;
+ mod->last_moder_tx_packets = ssk->tx_packets;
+ mod->last_moder_bytes = ssk->rx_bytes;
+ mod->last_moder_jiffies = jiffies;
+}
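
The rate estimate above is simply packets seen over the last sample interval scaled to packets per second. A toy version of that arithmetic, with a made-up tick rate and made-up per-interval counters:

#include <stdio.h>

#define HZ 250	/* assumed tick rate; the real value is kernel-config dependent */

int main(void)
{
	unsigned long period = HZ / 4;		/* sdp_sample_interval default */
	unsigned long tx_pkt_diff = 20000;	/* made-up per-interval counts */
	unsigned long rx_pkt_diff = 21000;
	unsigned long packets = tx_pkt_diff > rx_pkt_diff ? tx_pkt_diff
							  : rx_pkt_diff;
	unsigned long rate = packets * HZ / period;

	printf("packets=%lu over %lu jiffies -> rate=%lu pkt/s\n",
	       packets, period, rate);
	return 0;
}
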
+
void sdp_reset_sk(struct sock *sk, int rc)
{
struct sdp_sock *ssk = sdp_sk(sk);
@@ -340,11 +445,15 @@ void sdp_reset_sk(struct sock *sk, int rc)
read_lock(&device_removal_lock);
- if (ssk->cq)
- sdp_poll_cq(ssk, ssk->cq);
+ if (ssk->tx_ring.cq)
+ sdp_xmit_poll(ssk, 1);
- if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk))
+ if (!(sk->sk_shutdown & RCV_SHUTDOWN) || !sk_stream_memory_free(sk)) {
+ sdp_warn(sk, "setting state to error\n");
sdp_set_error(sk, rc);
+ }
+
+ sdp_destroy_qp(ssk);
memset((void *)&ssk->id, 0, sizeof(*ssk) - offsetof(typeof(*ssk), id));
@@ -352,7 +461,7 @@ void sdp_reset_sk(struct sock *sk, int rc)
/* Don't destroy socket before destroy work does its job */
sock_hold(sk, SOCK_REF_RESET);
- queue_work(sdp_workqueue, &ssk->destroy_work);
+ queue_work(sdp_wq, &ssk->destroy_work);
read_unlock(&device_removal_lock);
}
@@ -430,7 +539,7 @@ static void sdp_destruct(struct sock *sk)
ssk->destructed_already = 1;
sdp_remove_sock(ssk);
-
+
sdp_close_sk(sk);
if (ssk->parent)
@@ -485,6 +594,7 @@ static void sdp_close(struct sock *sk, long timeout)
lock_sock(sk);
sdp_dbg(sk, "%s\n", __func__);
+ sdp_prf(sk, NULL, __func__);
sdp_delete_keepalive_timer(sk);
@@ -510,7 +620,7 @@ static void sdp_close(struct sock *sk, long timeout)
* descriptor close, not protocol-sourced closes, because the
* reader process may not have drained the data yet!
*/
- while ((skb = __skb_dequeue(&sk->sk_receive_queue)) != NULL) {
+ while ((skb = skb_dequeue(&sk->sk_receive_queue)) != NULL) {
data_was_unread = 1;
__kfree_skb(skb);
}
@@ -701,13 +811,9 @@ static int sdp_wait_for_connect(struct sock *sk, long timeo)
TASK_INTERRUPTIBLE);
release_sock(sk);
if (list_empty(&ssk->accept_queue)) {
- sdp_dbg(sk, "%s schedule_timeout\n", __func__);
timeo = schedule_timeout(timeo);
- sdp_dbg(sk, "%s schedule_timeout done\n", __func__);
}
- sdp_dbg(sk, "%s lock_sock\n", __func__);
lock_sock(sk);
- sdp_dbg(sk, "%s lock_sock done\n", __func__);
err = 0;
if (!list_empty(&ssk->accept_queue))
break;
@@ -730,7 +836,7 @@ static int sdp_wait_for_connect(struct sock *sk, long timeo)
/* Like inet_csk_accept */
static struct sock *sdp_accept(struct sock *sk, int flags, int *err)
{
- struct sdp_sock *newssk, *ssk;
+ struct sdp_sock *newssk = NULL, *ssk;
struct sock *newsk;
int error;
@@ -761,7 +867,8 @@ static struct sock *sdp_accept(struct sock *sk, int flags, int *err)
goto out_err;
}
- newssk = list_entry(ssk->accept_queue.next, struct sdp_sock, accept_queue);
+ newssk = list_entry(ssk->accept_queue.next, struct sdp_sock,
+ accept_queue);
list_del_init(&newssk->accept_queue);
newssk->parent = NULL;
sk_acceptq_removed(sk);
@@ -770,11 +877,9 @@ out:
release_sock(sk);
if (newsk) {
lock_sock(newsk);
- if (newssk->cq) {
- sdp_dbg(newsk, "%s: ib_req_notify_cq\n", __func__);
+ if (newssk->rx_ring.cq) {
newssk->poll_cq = 1;
- ib_req_notify_cq(newssk->cq, IB_CQ_NEXT_COMP);
- sdp_poll_cq(newssk, newssk->cq);
+ sdp_arm_rx_cq(&newssk->isk.sk);
}
release_sock(newsk);
}
@@ -807,8 +912,8 @@ static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg)
else if (sock_flag(sk, SOCK_URGINLINE) ||
!ssk->urg_data ||
before(ssk->urg_seq, ssk->copied_seq) ||
- !before(ssk->urg_seq, ssk->rcv_nxt)) {
- answ = ssk->rcv_nxt - ssk->copied_seq;
+ !before(ssk->urg_seq, rcv_nxt(ssk))) {
+ answ = rcv_nxt(ssk) - ssk->copied_seq;
/* Subtract 1, if FIN is in queue. */
if (answ && !skb_queue_empty(&sk->sk_receive_queue))
@@ -829,7 +934,7 @@ static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg)
if ((1 << sk->sk_state) & (TCPF_SYN_SENT | TCPF_SYN_RECV))
answ = 0;
else
- answ = ssk->write_seq - ssk->snd_una;
+ answ = ssk->write_seq - ssk->tx_ring.una_seq;
break;
default:
return -ENOIOCTLCMD;
@@ -837,14 +942,14 @@ static int sdp_ioctl(struct sock *sk, int cmd, unsigned long arg)
/* TODO: Need to handle:
case SIOCOUTQ:
*/
- return put_user(answ, (int __user *)arg);
+ return put_user(answ, (int __user *)arg);
}
static inline void sdp_start_dreq_wait_timeout(struct sdp_sock *ssk, int timeo)
{
sdp_dbg(&ssk->isk.sk, "Starting dreq wait timeout\n");
- queue_delayed_work(sdp_workqueue, &ssk->dreq_wait_work, timeo);
+ queue_delayed_work(sdp_wq, &ssk->dreq_wait_work, timeo);
ssk->dreq_wait_timeout = 1;
}
@@ -866,7 +971,8 @@ void sdp_cancel_dreq_wait_timeout(struct sdp_sock *ssk)
void sdp_destroy_work(struct work_struct *work)
{
- struct sdp_sock *ssk = container_of(work, struct sdp_sock, destroy_work);
+ struct sdp_sock *ssk = container_of(work, struct sdp_sock,
+ destroy_work);
struct sock *sk = &ssk->isk.sk;
sdp_dbg(sk, "%s: refcnt %d\n", __func__, atomic_read(&sk->sk_refcnt));
@@ -908,12 +1014,10 @@ void sdp_dreq_wait_timeout_work(struct work_struct *work)
release_sock(sk);
- if (sdp_sk(sk)->id) {
+ if (sdp_sk(sk)->id)
rdma_disconnect(sdp_sk(sk)->id);
- } else {
- sdp_warn(sk, "NOT SENDING DREQ - no need to wait for timewait exit\n");
+ else
sock_put(sk, SOCK_REF_CM_TW);
- }
out:
sock_put(sk, SOCK_REF_DREQ_TO);
@@ -932,13 +1036,19 @@ int sdp_init_sock(struct sock *sk)
sk->sk_route_caps |= NETIF_F_SG | NETIF_F_NO_CSUM;
- ssk->rx_ring = NULL;
- ssk->tx_ring = NULL;
+ skb_queue_head_init(&ssk->rx_ctl_q);
+
+ atomic_set(&ssk->mseq_ack, 0);
+
+ sdp_rx_ring_init(ssk);
+ ssk->tx_ring.buffer = NULL;
ssk->sdp_disconnect = 0;
ssk->destructed_already = 0;
ssk->destruct_in_process = 0;
spin_lock_init(&ssk->lock);
+ atomic_set(&ssk->somebody_is_doing_posts, 0);
+
return 0;
}
@@ -976,7 +1086,7 @@ static void sdp_mark_push(struct sdp_sock *ssk, struct sk_buff *skb)
{
TCP_SKB_CB(skb)->flags |= TCPCB_FLAG_PSH;
ssk->pushed_seq = ssk->write_seq;
- sdp_post_sends(ssk, 0);
+ sdp_do_posts(ssk);
}
static inline void sdp_push_pending_frames(struct sock *sk)
@@ -984,7 +1094,6 @@ static inline void sdp_push_pending_frames(struct sock *sk)
struct sk_buff *skb = sk->sk_send_head;
if (skb) {
sdp_mark_push(sdp_sk(sk), skb);
- sdp_post_sends(sdp_sk(sk), 0);
}
}
@@ -1072,8 +1181,11 @@ static int sdp_setsockopt(struct sock *sk, int level, int optname,
ssk->keepalive_time = val * HZ;
if (sock_flag(sk, SOCK_KEEPOPEN) &&
- !((1 << sk->sk_state) & (TCPF_CLOSE | TCPF_LISTEN)))
- sdp_reset_keepalive_timer(sk, ssk->keepalive_time);
+ !((1 << sk->sk_state) &
+ (TCPF_CLOSE | TCPF_LISTEN))) {
+ sdp_reset_keepalive_timer(sk,
+ ssk->keepalive_time);
+ }
}
break;
case SDP_ZCOPY_THRESH:
@@ -1140,30 +1252,16 @@ static int sdp_getsockopt(struct sock *sk, int level, int optname,
static inline int poll_recv_cq(struct sock *sk)
{
int i;
- if (sdp_sk(sk)->cq) {
- for (i = 0; i < recv_poll; ++i)
- if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) {
- ++recv_poll_hit;
- return 0;
- }
- ++recv_poll_miss;
+ for (i = 0; i < recv_poll; ++i) {
+ if (!skb_queue_empty(&sk->sk_receive_queue)) {
+ ++recv_poll_hit;
+ return 0;
+ }
}
+ ++recv_poll_miss;
return 1;
}
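
With the RX CQ now drained from interrupt context (see the changelog), this poll loop no longer touches the CQ at all; it simply spins up to recv_poll iterations waiting for the interrupt path to place an skb on sk_receive_queue.
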
-static inline void poll_send_cq(struct sock *sk)
-{
- int i;
- if (sdp_sk(sk)->cq) {
- for (i = 0; i < send_poll; ++i)
- if (!sdp_poll_cq(sdp_sk(sk), sdp_sk(sk)->cq)) {
- ++send_poll_hit;
- return;
- }
- ++send_poll_miss;
- }
-}
-
/* Like tcp_recv_urg */
/*
* Handle reading urgent data. BSD has very simple semantics for
@@ -1218,30 +1316,8 @@ static int sdp_recv_urg(struct sock *sk, long timeo,
return -EAGAIN;
}
-static void sdp_rcv_space_adjust(struct sock *sk)
-{
- sdp_post_recvs(sdp_sk(sk));
- sdp_post_sends(sdp_sk(sk), 0);
-}
-
-static unsigned int sdp_current_mss(struct sock *sk, int large_allowed)
-{
- /* TODO */
- return PAGE_SIZE;
-}
-
-static int forced_push(struct sdp_sock *sk)
-{
- /* TODO */
- return 0;
-}
-
-static inline int select_size(struct sock *sk, struct sdp_sock *ssk)
-{
- return 0;
-}
-
-static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk, int flags)
+static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk,
+ int flags)
{
if (unlikely(flags & MSG_OOB)) {
struct sk_buff *skb = sk->sk_write_queue.prev;
@@ -1249,12 +1325,11 @@ static inline void sdp_mark_urg(struct sock *sk, struct sdp_sock *ssk, int flags
}
}
-static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags,
- int mss_now, int nonagle)
+static inline void sdp_push(struct sock *sk, struct sdp_sock *ssk, int flags)
{
if (sk->sk_send_head)
sdp_mark_urg(sk, ssk, flags);
- sdp_post_sends(ssk, nonagle);
+ sdp_do_posts(sdp_sk(sk));
}
static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk,
@@ -1270,24 +1345,21 @@ static inline void skb_entail(struct sock *sk, struct sdp_sock *ssk,
ssk->nonagle &= ~TCP_NAGLE_PUSH;
}
-static void sdp_push_one(struct sock *sk, unsigned int mss_now)
-{
-}
-
static inline struct bzcopy_state *sdp_bz_cleanup(struct bzcopy_state *bz)
{
- int i, max_retry;
+ int i;
struct sdp_sock *ssk = (struct sdp_sock *)bz->ssk;
/* Wait for in-flight sends; should be quick */
if (bz->busy) {
struct sock *sk = &ssk->isk.sk;
+ unsigned long timeout = jiffies + SDP_BZCOPY_POLL_TIMEOUT;
- for (max_retry = 0; max_retry < 10000; max_retry++) {
- poll_send_cq(sk);
-
+ while (jiffies < timeout) {
+ sdp_xmit_poll(sdp_sk(sk), 1);
if (!bz->busy)
break;
+ SDPSTATS_COUNTER_INC(bzcopy_poll_miss);
}
if (bz->busy)
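
One caveat on the jiffies-based poll timeout introduced above: comparing jiffies against a deadline with a plain "<" misbehaves when the counter wraps; the kernel's time_before() macro uses a signed difference instead. A minimal userspace sketch of the difference, with values contrived to sit at the wrap point:

#include <stdio.h>

/* Wrap-safe "a is before b" test for a free-running counter, the same
 * signed-difference trick the kernel's time_before() macro relies on. */
static int before(unsigned long a, unsigned long b)
{
	return (long)(a - b) < 0;
}

int main(void)
{
	unsigned long now = (unsigned long)-5;	/* counter just before wrapping */
	unsigned long timeout = now + 100;	/* deadline lands after the wrap */

	/* The naive test claims the deadline already passed; the signed
	 * difference correctly reports that ~100 ticks still remain. */
	printf("naive expired: %d  wrap-safe expired: %d\n",
	       !(now < timeout), !before(now, timeout));
	return 0;
}
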
@@ -1319,11 +1391,14 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
int thresh;
mm_segment_t cur_fs;
- cur_fs = get_fs();
-
thresh = ssk->zcopy_thresh ? : sdp_zcopy_thresh;
- if (thresh == 0 || len < thresh || !capable(CAP_IPC_LOCK))
+ if (thresh == 0 || len < thresh || !capable(CAP_IPC_LOCK)) {
+ SDPSTATS_COUNTER_INC(sendmsg_bcopy_segment);
return NULL;
+ }
+ SDPSTATS_COUNTER_INC(sendmsg_bzcopy_segment);
+
+ cur_fs = get_fs();
/*
* Since we use the TCP segmentation fields of the skb to map user
@@ -1347,7 +1422,8 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
bz->busy = 0;
bz->ssk = ssk;
bz->page_cnt = PAGE_ALIGN(len + bz->cur_offset) >> PAGE_SHIFT;
- bz->pages = kcalloc(bz->page_cnt, sizeof(struct page *), GFP_KERNEL);
+ bz->pages = kcalloc(bz->page_cnt, sizeof(struct page *),
+ GFP_KERNEL);
if (!bz->pages) {
kfree(bz);
@@ -1386,7 +1462,6 @@ static struct bzcopy_state *sdp_bz_setup(struct sdp_sock *ssk,
return bz;
}
-
#define TCP_PAGE(sk) (sk->sk_sndmsg_page)
#define TCP_OFF(sk) (sk->sk_sndmsg_off)
static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
@@ -1413,12 +1488,9 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
/* We can extend the last page
* fragment. */
merge = 1;
- } else if (i == ssk->send_frags ||
- (!i &&
- !(sk->sk_route_caps & NETIF_F_SG))) {
+ } else if (i == ssk->send_frags) {
/* Need to add new fragment and cannot
- * do this because interface is non-SG,
- * or because all the page slots are
+ * do this because all the page slots are
* busy. */
sdp_mark_push(ssk, skb);
return SDP_NEW_SEG;
@@ -1445,6 +1517,7 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
/* Time to copy data. We are close to
* the end! */
+ SDPSTATS_COUNTER_ADD(memcpy_count, copy);
err = skb_copy_to_page(sk, from, skb, page,
off, copy);
if (err) {
@@ -1478,7 +1551,6 @@ static inline int sdp_bcopy_get(struct sock *sk, struct sk_buff *skb,
return copy;
}
-
static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
unsigned char __user *from, int copy,
struct bzcopy_state *bz)
@@ -1539,23 +1611,27 @@ static inline int sdp_bzcopy_get(struct sock *sk, struct sk_buff *skb,
return copy;
}
-static inline int slots_free(struct sdp_sock *ssk)
+/* return the min of:
+ * - tx credits
+ * - free slots in tx_ring,
+ * minus the SDP_MIN_TX_CREDITS reserve.
+ */
+static inline int tx_slots_free(struct sdp_sock *ssk)
{
int min_free;
- min_free = SDP_TX_SIZE - (ssk->tx_head - ssk->tx_tail);
- if (ssk->bufs < min_free)
- min_free = ssk->bufs;
- min_free -= (min_free < SDP_MIN_BUFS) ? min_free : SDP_MIN_BUFS;
+ min_free = MIN(tx_credits(ssk),
+ SDP_TX_SIZE - ring_posted(ssk->tx_ring));
+ if (min_free < SDP_MIN_TX_CREDITS)
+ return 0;
- return min_free;
+ return min_free - SDP_MIN_TX_CREDITS;
};
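
A tiny sketch of the bookkeeping tx_slots_free() performs, with made-up credit and ring numbers; SDP_TX_SIZE and SDP_MIN_TX_CREDITS below are placeholders, not the driver's actual values:

#include <stdio.h>

#define MIN(a, b)		((a) < (b) ? (a) : (b))
#define SDP_TX_SIZE		256	/* placeholder ring size */
#define SDP_MIN_TX_CREDITS	4	/* placeholder reserved credits */

/* Same logic as tx_slots_free(): usable slots are bounded by both the
 * peer's credits and the local ring, minus a small reserve. */
static int tx_slots_free(int tx_credits, int ring_posted)
{
	int min_free = MIN(tx_credits, SDP_TX_SIZE - ring_posted);

	if (min_free < SDP_MIN_TX_CREDITS)
		return 0;
	return min_free - SDP_MIN_TX_CREDITS;
}

int main(void)
{
	printf("%d\n", tx_slots_free(10, 250));	/* ring nearly full -> 2 */
	printf("%d\n", tx_slots_free(3, 0));	/* too few credits  -> 0 */
	return 0;
}
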
/* like sk_stream_memory_free - except measures remote credits */
static inline int sdp_bzcopy_slots_avail(struct sdp_sock *ssk,
struct bzcopy_state *bz)
{
- return slots_free(ssk) > bz->busy;
+ return tx_slots_free(ssk) > bz->busy;
}
/* like sk_stream_wait_memory - except waits on remote credits */
@@ -1595,14 +1671,24 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
+ posts_handler_put(ssk);
+
if (sdp_bzcopy_slots_avail(ssk, bz))
break;
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
sk->sk_write_pending++;
+
+ if (tx_credits(ssk) > SDP_MIN_TX_CREDITS)
+ sdp_arm_tx_cq(sk);
+
sk_wait_event(sk, ¤t_timeo,
sdp_bzcopy_slots_avail(ssk, bz) && vm_wait);
sk->sk_write_pending--;
+ sdp_prf1(sk, NULL, "finished wait for mem");
+
+ posts_handler_get(ssk);
+ sdp_do_posts(ssk);
if (vm_wait) {
vm_wait -= current_timeo;
@@ -1619,25 +1705,6 @@ static int sdp_bzcopy_wait_memory(struct sdp_sock *ssk, long *timeo_p,
return err;
}
-/* like sk_stream_write_space - execpt measures remote credits */
-void sdp_bzcopy_write_space(struct sdp_sock *ssk)
-{
- struct sock *sk = &ssk->isk.sk;
- struct socket *sock = sk->sk_socket;
-
- if (ssk->bufs >= ssk->min_bufs &&
- ssk->tx_head == ssk->tx_tail &&
- sock != NULL) {
- clear_bit(SOCK_NOSPACE, &sock->flags);
-
- if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
- wake_up_interruptible(sk->sk_sleep);
- if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
- sock_wake_async(sock, 2, POLL_OUT);
- }
-}
-
-
/* Like tcp_sendmsg */
/* TODO: check locking */
static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
@@ -1647,14 +1714,20 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
struct sdp_sock *ssk = sdp_sk(sk);
struct sk_buff *skb;
int iovlen, flags;
- int mss_now, size_goal;
+ int size_goal;
int err, copied;
long timeo;
struct bzcopy_state *bz = NULL;
-
+ unsigned long long a, b, c;
+ unsigned long long start, end;
+ SDPSTATS_COUNTER_INC(sendmsg);
lock_sock(sk);
sdp_dbg_data(sk, "%s\n", __func__);
+ rdtscll(start);
+
+ posts_handler_get(ssk);
+
flags = msg->msg_flags;
timeo = sock_sndtimeo(sk, flags & MSG_DONTWAIT);
@@ -1666,7 +1739,6 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
/* This should be in poll */
clear_bit(SOCK_ASYNC_NOSPACE, &sk->sk_socket->flags);
- mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
size_goal = ssk->xmit_size_goal;
/* Ok commence sending. */
@@ -1688,9 +1760,16 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
if (size_goal > SDP_MAX_PAYLOAD)
size_goal = SDP_MAX_PAYLOAD;
+ SDPSTATS_HIST(sendmsg_seglen, seglen);
+
+ rdtscll(a);
if (bz)
sdp_bz_cleanup(bz);
+ rdtscll(b);
bz = sdp_bz_setup(ssk, from, seglen, size_goal);
+ rdtscll(c);
+ SDPSTATS_COUNTER_ADD(bz_clean_sum, b - a);
+ SDPSTATS_COUNTER_ADD(bz_setup_sum, c - b);
if (IS_ERR(bz)) {
bz = NULL;
err = PTR_ERR(bz);
@@ -1704,8 +1783,7 @@ static int sdp_sendmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
if (!sk->sk_send_head ||
(copy = size_goal - skb->len) <= 0 ||
- bz != *(struct bzcopy_state **)skb->cb) {
-
+ bz != BZCOPY_STATE(skb)) {
new_segment:
/*
* Allocate a new segment
@@ -1722,12 +1800,12 @@ new_segment:
goto wait_for_sndbuf;
}
- skb = sdp_stream_alloc_skb(sk, select_size(sk, ssk),
- sk->sk_allocation);
+ skb = sdp_stream_alloc_skb(sk, 0,
+ sk->sk_allocation);
if (!skb)
goto wait_for_memory;
- *((struct bzcopy_state **)skb->cb) = bz;
+ BZCOPY_STATE(skb) = bz;
/*
* Check whether we can use HW checksum.
@@ -1739,6 +1817,11 @@ new_segment:
skb_entail(sk, ssk, skb);
copy = size_goal;
+ } else {
+ sdp_dbg_data(sk, "adding to existing skb: %p"
+ " len = %d, sk_send_head: %p "
+ "copy: %d\n",
+ skb, skb->len, sk->sk_send_head, copy);
}
/* Try to append data to the end of skb. */
@@ -1753,8 +1836,12 @@ new_segment:
goto new_segment;
}
+ rdtscll(a);
copy = (bz) ? sdp_bzcopy_get(sk, skb, from, copy, bz) :
sdp_bcopy_get(sk, skb, from, copy);
+ rdtscll(b);
+ if (copy > 0)
+ SDPSTATS_COUNTER_ADD(tx_copy_sum, b - a);
if (unlikely(copy < 0)) {
if (!++copy)
goto wait_for_memory;
@@ -1777,48 +1864,60 @@ new_segment:
if ((seglen -= copy) == 0 && iovlen == 0)
goto out;
- if (skb->len < mss_now || (flags & MSG_OOB))
+ if (skb->len < PAGE_SIZE || (flags & MSG_OOB))
continue;
-
- if (forced_push(ssk)) {
- sdp_mark_push(ssk, skb);
- /* TODO: and push pending frames mss_now */
- /* sdp_push_pending(sk, ssk, mss_now, TCP_NAGLE_PUSH); */
- } else if (skb == sk->sk_send_head)
- sdp_push_one(sk, mss_now);
continue;
wait_for_sndbuf:
set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
wait_for_memory:
+ sdp_prf(sk, skb, "wait for mem");
+ SDPSTATS_COUNTER_INC(send_wait_for_mem);
if (copied)
- sdp_push(sk, ssk, flags & ~MSG_MORE, mss_now, TCP_NAGLE_PUSH);
+ sdp_push(sk, ssk, flags & ~MSG_MORE);
+
+ sdp_xmit_poll(ssk, 1);
+
+ if (bz) {
+ err = sdp_bzcopy_wait_memory(ssk, &timeo, bz);
+ } else {
+ posts_handler_put(ssk);
+
+ sdp_arm_tx_cq(sk);
+
+ err = sk_stream_wait_memory(sk, &timeo);
+
+ posts_handler_get(ssk);
+ sdp_do_posts(ssk);
+ }
- err = (bz) ? sdp_bzcopy_wait_memory(ssk, &timeo, bz) :
- sk_stream_wait_memory(sk, &timeo);
if (err)
goto do_error;
- mss_now = sdp_current_mss(sk, !(flags&MSG_OOB));
size_goal = ssk->xmit_size_goal;
}
}
out:
if (copied) {
- sdp_push(sk, ssk, flags, mss_now, ssk->nonagle);
+ sdp_push(sk, ssk, flags);
if (bz)
bz = sdp_bz_cleanup(bz);
- else
- if (size > send_poll_thresh)
- poll_send_cq(sk);
}
+ posts_handler_put(ssk);
+
+ sdp_auto_moderation(ssk);
+
+ rdtscll(end);
+ SDPSTATS_COUNTER_ADD(sendmsg_sum, end - start);
release_sock(sk);
return copied;
do_fault:
+ sdp_prf(sk, skb, "prepare fault");
+
if (!skb->len) {
if (sk->sk_send_head == skb)
sk->sk_send_head = NULL;
@@ -1833,6 +1932,9 @@ out_err:
if (bz)
bz = sdp_bz_cleanup(bz);
err = sk_stream_error(sk, flags, err);
+
+ posts_handler_put(ssk);
+
release_sock(sk);
return err;
}
@@ -1841,7 +1943,7 @@ out_err:
/* Maybe use skb_recv_datagram here? */
/* Note this does not seem to handle vectored messages. Relevant? */
static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
- size_t len, int noblock, int flags,
+ size_t len, int noblock, int flags,
int *addr_len)
{
struct sk_buff *skb = NULL;
@@ -1858,6 +1960,10 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
lock_sock(sk);
sdp_dbg_data(sk, "%s\n", __func__);
+ posts_handler_get(ssk);
+
+ sdp_prf(sk, skb, "Read from user");
+
err = -ENOTCONN;
if (sk->sk_state == TCP_LISTEN)
goto out;
@@ -1878,12 +1984,14 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
do {
u32 offset;
- /* Are we at urgent data? Stop if we have read anything or have SIGURG pending. */
+ /* Are we at urgent data? Stop if we have read anything or have
+ * SIGURG pending. */
if (ssk->urg_data && ssk->urg_seq == *seq) {
if (copied)
break;
if (signal_pending(current)) {
- copied = timeo ? sock_intr_errno(timeo) : -EAGAIN;
+ copied = timeo ? sock_intr_errno(timeo) :
+ -EAGAIN;
break;
}
}
@@ -1897,8 +2005,9 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
goto found_fin_ok;
if (before(*seq, TCP_SKB_CB(skb)->seq)) {
- printk(KERN_INFO "recvmsg bug: copied %X "
- "seq %X\n", *seq, TCP_SKB_CB(skb)->seq);
+ sdp_warn(sk, "recvmsg bug: copied %X seq %X\n",
+ *seq, TCP_SKB_CB(skb)->seq);
+ sdp_reset(sk);
break;
}
@@ -1962,15 +2071,24 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
release_sock(sk);
lock_sock(sk);
} else if (rc) {
- sdp_dbg_data(sk, "%s: sk_wait_data %ld\n", __func__, timeo);
+ sdp_dbg_data(sk, "sk_wait_data %ld\n", timeo);
+
+ posts_handler_put(ssk);
+
+ /* socket lock is released inside sk_wait_data */
sk_wait_data(sk, &timeo);
+
+ posts_handler_get(ssk);
+ sdp_prf(sk, NULL, "got data");
+
+ sdp_do_posts(ssk);
}
continue;
found_ok_skb:
- sdp_dbg_data(sk, "%s: found_ok_skb len %d\n", __func__, skb->len);
- sdp_dbg_data(sk, "%s: len %Zd offset %d\n", __func__, len, offset);
- sdp_dbg_data(sk, "%s: copied %d target %d\n", __func__, copied, target);
+ sdp_dbg_data(sk, "found_ok_skb len %d\n", skb->len);
+ sdp_dbg_data(sk, "len %Zd offset %d\n", len, offset);
+ sdp_dbg_data(sk, "copied %d target %d\n", copied, target);
used = skb->len - offset;
if (len < used)
used = len;
@@ -2011,9 +2129,9 @@ static int sdp_recvmsg(struct kiocb *iocb, struct sock *sk, struct msghdr *msg,
len -= used;
*seq += used;
- sdp_dbg_data(sk, "%s: done copied %d target %d\n", __func__, copied, target);
+ sdp_dbg_data(sk, "done copied %d target %d\n", copied, target);
- sdp_rcv_space_adjust(sk);
+ sdp_do_posts(sdp_sk(sk));
skip_copy:
if (ssk->urg_data && after(ssk->copied_seq, ssk->urg_seq))
ssk->urg_data = 0;
@@ -2021,22 +2139,32 @@ skip_copy:
continue;
offset = 0;
- if (!(flags & MSG_PEEK))
- sk_eat_skb(sk, skb, 0);
-
+ if (!(flags & MSG_PEEK)) {
+ struct sdp_bsdh *h;
+ h = (struct sdp_bsdh *)skb_transport_header(skb);
+ sdp_prf1(sk, skb, "READ finished. mseq: %d mseq_ack:%d",
+ ntohl(h->mseq), ntohl(h->mseq_ack));
+ skb_unlink(skb, &sk->sk_receive_queue);
+ __kfree_skb(skb);
+ }
continue;
found_fin_ok:
++*seq;
- if (!(flags & MSG_PEEK))
- sk_eat_skb(sk, skb, 0);
-
+ if (!(flags & MSG_PEEK)) {
+ skb_unlink(skb, &sk->sk_receive_queue);
+ __kfree_skb(skb);
+ }
break;
- } while (len > 0);
- release_sock(sk);
- return copied;
+ } while (len > 0);
+ err = copied;
out:
+
+ posts_handler_put(ssk);
+
+ sdp_auto_moderation(ssk);
+
release_sock(sk);
return err;
@@ -2128,8 +2256,8 @@ static unsigned int sdp_poll(struct file *file, struct socket *socket,
/*
* Adjust for memory in later kernels
*/
- if (!sk_stream_memory_free(sk) || !slots_free(ssk))
- mask &= ~(POLLOUT | POLLWRNORM | POLLWRBAND);
+ if (!sk_stream_memory_free(sk) || !tx_slots_free(ssk))
+ mask &= ~(POLLOUT | POLLWRNORM | POLLWRBAND);
/* TODO: Slightly ugly: it would be nicer if there was function
* like datagram_poll that didn't include poll_wait,
@@ -2221,7 +2349,7 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol)
struct sock *sk;
int rc;
- sdp_dbg(NULL, "%s: type %d protocol %d\n", __func__, sock->type, protocol);
+ sdp_dbg(NULL, "type %d protocol %d\n", sock->type, protocol);
if (net != &init_net)
return -EAFNOSUPPORT;
@@ -2256,7 +2384,7 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol)
sk->sk_destruct = sdp_destruct;
- sdp_init_timer(sk);
+ sdp_init_keepalive_timer(sk);
sock->ops = &sdp_proto_ops;
sock->state = SS_UNCONNECTED;
@@ -2266,192 +2394,6 @@ static int sdp_create_socket(struct net *net, struct socket *sock, int protocol)
return 0;
}
-#ifdef CONFIG_PROC_FS
-
-static void *sdp_get_idx(struct seq_file *seq, loff_t pos)
-{
- int i = 0;
- struct sdp_sock *ssk;
-
- if (!list_empty(&sock_list))
- list_for_each_entry(ssk, &sock_list, sock_list) {
- if (i == pos)
- return ssk;
- i++;
- }
-
- return NULL;
-}
-
-static void *sdp_seq_start(struct seq_file *seq, loff_t *pos)
-{
- void *start = NULL;
- struct sdp_iter_state* st = seq->private;
-
- st->num = 0;
-
- if (!*pos)
- return SEQ_START_TOKEN;
-
- spin_lock_irq(&sock_list_lock);
- start = sdp_get_idx(seq, *pos - 1);
- if (start)
- sock_hold((struct sock *)start, SOCK_REF_SEQ);
- spin_unlock_irq(&sock_list_lock);
-
- return start;
-}
-
-static void *sdp_seq_next(struct seq_file *seq, void *v, loff_t *pos)
-{
- struct sdp_iter_state* st = seq->private;
- void *next = NULL;
-
- spin_lock_irq(&sock_list_lock);
- if (v == SEQ_START_TOKEN)
- next = sdp_get_idx(seq, 0);
- else
- next = sdp_get_idx(seq, *pos);
- if (next)
- sock_hold((struct sock *)next, SOCK_REF_SEQ);
- spin_unlock_irq(&sock_list_lock);
-
- *pos += 1;
- st->num++;
-
- return next;
-}
-
-static void sdp_seq_stop(struct seq_file *seq, void *v)
-{
-}
-
-#define TMPSZ 150
-
-static int sdp_seq_show(struct seq_file *seq, void *v)
-{
- struct sdp_iter_state* st;
- struct sock *sk = v;
- char tmpbuf[TMPSZ + 1];
- unsigned int dest;
- unsigned int src;
- int uid;
- unsigned long inode;
- __u16 destp;
- __u16 srcp;
- __u32 rx_queue, tx_queue;
-
- if (v == SEQ_START_TOKEN) {
- seq_printf(seq, "%-*s\n", TMPSZ - 1,
- " sl local_address rem_address uid inode"
- " rx_queue tx_queue state");
- goto out;
- }
-
- st = seq->private;
-
- dest = inet_sk(sk)->daddr;
- src = inet_sk(sk)->rcv_saddr;
- destp = ntohs(inet_sk(sk)->dport);
- srcp = ntohs(inet_sk(sk)->sport);
- uid = sock_i_uid(sk);
- inode = sock_i_ino(sk);
- rx_queue = sdp_sk(sk)->rcv_nxt - sdp_sk(sk)->copied_seq;
- tx_queue = sdp_sk(sk)->write_seq - sdp_sk(sk)->snd_una;
-
- sprintf(tmpbuf, "%4d: %08X:%04X %08X:%04X %5d %lu %08X:%08X %X",
- st->num, src, srcp, dest, destp, uid, inode,
- rx_queue, tx_queue, sk->sk_state);
-
- seq_printf(seq, "%-*s\n", TMPSZ - 1, tmpbuf);
-
- sock_put(sk, SOCK_REF_SEQ);
-out:
- return 0;
-}
-
-static int sdp_seq_open(struct inode *inode, struct file *file)
-{
- struct sdp_seq_afinfo *afinfo = PDE(inode)->data;
- struct seq_file *seq;
- struct sdp_iter_state *s;
- int rc;
-
- if (unlikely(afinfo == NULL))
- return -EINVAL;
-
- s = kzalloc(sizeof(*s), GFP_KERNEL);
- if (!s)
- return -ENOMEM;
- s->family = afinfo->family;
- s->seq_ops.start = sdp_seq_start;
- s->seq_ops.next = sdp_seq_next;
- s->seq_ops.show = afinfo->seq_show;
- s->seq_ops.stop = sdp_seq_stop;
-
- rc = seq_open(file, &s->seq_ops);
- if (rc)
- goto out_kfree;
- seq = file->private_data;
- seq->private = s;
-out:
- return rc;
-out_kfree:
- kfree(s);
- goto out;
-}
-
-
-static struct file_operations sdp_seq_fops;
-static struct sdp_seq_afinfo sdp_seq_afinfo = {
- .owner = THIS_MODULE,
- .name = "sdp",
- .family = AF_INET_SDP,
- .seq_show = sdp_seq_show,
- .seq_fops = &sdp_seq_fops,
-};
-
-
-static int __init sdp_proc_init(void)
-{
- int rc = 0;
- struct proc_dir_entry *p;
-
- sdp_seq_afinfo.seq_fops->owner = sdp_seq_afinfo.owner;
- sdp_seq_afinfo.seq_fops->open = sdp_seq_open;
- sdp_seq_afinfo.seq_fops->read = seq_read;
- sdp_seq_afinfo.seq_fops->llseek = seq_lseek;
- sdp_seq_afinfo.seq_fops->release = seq_release_private;
-
- p = proc_net_fops_create(&init_net, sdp_seq_afinfo.name, S_IRUGO,
- sdp_seq_afinfo.seq_fops);
- if (p)
- p->data = &sdp_seq_afinfo;
- else
- rc = -ENOMEM;
-
- return rc;
-}
-
-static void sdp_proc_unregister(void)
-{
- proc_net_remove(&init_net, sdp_seq_afinfo.name);
- memset(sdp_seq_afinfo.seq_fops, 0, sizeof(*sdp_seq_afinfo.seq_fops));
-}
-
-#else /* CONFIG_PROC_FS */
-
-static int __init sdp_proc_init(void)
-{
- return 0;
-}
-
-static void sdp_proc_unregister(void)
-{
-
-}
-#endif /* CONFIG_PROC_FS */
-
static void sdp_add_device(struct ib_device *device)
{
}
@@ -2499,12 +2441,12 @@ kill_socks:
sk->sk_shutdown |= RCV_SHUTDOWN;
sdp_reset(sk);
- if ((1 << sk->sk_state) &
+ if ((1 << sk->sk_state) &
(TCPF_FIN_WAIT1 | TCPF_CLOSE_WAIT |
TCPF_LAST_ACK | TCPF_TIME_WAIT)) {
sock_put(sk, SOCK_REF_CM_TW);
}
-
+
schedule();
spin_lock_irq(&sock_list_lock);
@@ -2532,39 +2474,44 @@ static struct ib_client sdp_client = {
static int __init sdp_init(void)
{
- int rc;
+ int rc = -ENOMEM;
INIT_LIST_HEAD(&sock_list);
spin_lock_init(&sock_list_lock);
spin_lock_init(&sdp_large_sockets_lock);
sockets_allocated = kmalloc(sizeof(*sockets_allocated), GFP_KERNEL);
+ if (!sockets_allocated)
+ goto no_mem_sockets_allocated;
+
orphan_count = kmalloc(sizeof(*orphan_count), GFP_KERNEL);
+ if (!orphan_count)
+ goto no_mem_orphan_count;
+
percpu_counter_init(sockets_allocated, 0);
percpu_counter_init(orphan_count, 0);
sdp_proto.sockets_allocated = sockets_allocated;
sdp_proto.orphan_count = orphan_count;
+ rx_comp_wq = create_singlethread_workqueue("rx_comp_wq");
+ if (!rx_comp_wq)
+ goto no_mem_rx_wq;
- sdp_workqueue = create_singlethread_workqueue("sdp");
- if (!sdp_workqueue) {
- return -ENOMEM;
- }
+ sdp_wq = create_singlethread_workqueue("sdp_wq");
+ if (!sdp_wq)
+ goto no_mem_sdp_wq;
rc = proto_register(&sdp_proto, 1);
if (rc) {
- printk(KERN_WARNING "%s: proto_register failed: %d\n", __func__, rc);
- destroy_workqueue(sdp_workqueue);
- return rc;
+ printk(KERN_WARNING "proto_register failed: %d\n", rc);
+ goto error_proto_reg;
}
rc = sock_register(&sdp_net_proto);
if (rc) {
- printk(KERN_WARNING "%s: sock_register failed: %d\n", __func__, rc);
- proto_unregister(&sdp_proto);
- destroy_workqueue(sdp_workqueue);
- return rc;
+ printk(KERN_WARNING "sock_register failed: %d\n", rc);
+ goto error_sock_reg;
}
sdp_proc_init();
@@ -2574,6 +2521,19 @@ static int __init sdp_init(void)
ib_register_client(&sdp_client);
return 0;
+
+error_sock_reg:
+ proto_unregister(&sdp_proto);
+error_proto_reg:
+ destroy_workqueue(sdp_wq);
+no_mem_sdp_wq:
+ destroy_workqueue(rx_comp_wq);
+no_mem_rx_wq:
+ kfree(orphan_count);
+no_mem_orphan_count:
+ kfree(sockets_allocated);
+no_mem_sockets_allocated:
+ return rc;
}
static void __exit sdp_exit(void)
@@ -2584,7 +2544,10 @@ static void __exit sdp_exit(void)
if (percpu_counter_read_positive(orphan_count))
printk(KERN_WARNING "%s: orphan_count %lld\n", __func__,
percpu_counter_read_positive(orphan_count));
- destroy_workqueue(sdp_workqueue);
+
+ destroy_workqueue(rx_comp_wq);
+ destroy_workqueue(sdp_wq);
+
flush_scheduled_work();
BUG_ON(!list_empty(&sock_list));
diff --git a/drivers/infiniband/ulp/sdp/sdp_proc.c b/drivers/infiniband/ulp/sdp/sdp_proc.c
new file mode 100644
index 0000000..924e2ee
--- /dev/null
+++ b/drivers/infiniband/ulp/sdp/sdp_proc.c
@@ -0,0 +1,496 @@
+/*
+ * Copyright (c) 2008 Mellanox Technologies Ltd. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+#include <linux/proc_fs.h>
+#include "sdp_socket.h"
+#include "sdp.h"
+
+#ifdef CONFIG_PROC_FS
+
+#define PROC_SDP_STATS "sdpstats"
+#define PROC_SDP_PERF "sdpprf"
+
+/* just like TCP fs */
+struct sdp_seq_afinfo {
+ struct module *owner;
+ char *name;
+ sa_family_t family;
+ int (*seq_show) (struct seq_file *m, void *v);
+ struct file_operations *seq_fops;
+};
+
+struct sdp_iter_state {
+ sa_family_t family;
+ int num;
+ struct seq_operations seq_ops;
+};
+
+static void *sdp_get_idx(struct seq_file *seq, loff_t pos)
+{
+ int i = 0;
+ struct sdp_sock *ssk;
+
+ if (!list_empty(&sock_list))
+ list_for_each_entry(ssk, &sock_list, sock_list) {
+ if (i == pos)
+ return ssk;
+ i++;
+ }
+
+ return NULL;
+}
+
+static void *sdp_seq_start(struct seq_file *seq, loff_t *pos)
+{
+ void *start = NULL;
+ struct sdp_iter_state *st = seq->private;
+
+ st->num = 0;
+
+ if (!*pos)
+ return SEQ_START_TOKEN;
+
+ spin_lock_irq(&sock_list_lock);
+ start = sdp_get_idx(seq, *pos - 1);
+ if (start)
+ sock_hold((struct sock *)start, SOCK_REF_SEQ);
+ spin_unlock_irq(&sock_list_lock);
+
+ return start;
+}
+
+static void *sdp_seq_next(struct seq_file *seq, void *v, loff_t *pos)
+{
+ struct sdp_iter_state *st = seq->private;
+ void *next = NULL;
+
+ spin_lock_irq(&sock_list_lock);
+ if (v == SEQ_START_TOKEN)
+ next = sdp_get_idx(seq, 0);
+ else
+ next = sdp_get_idx(seq, *pos);
+ if (next)
+ sock_hold((struct sock *)next, SOCK_REF_SEQ);
+ spin_unlock_irq(&sock_list_lock);
+
+ *pos += 1;
+ st->num++;
+
+ return next;
+}
+
+static void sdp_seq_stop(struct seq_file *seq, void *v)
+{
+}
+
+#define TMPSZ 150
+
+static int sdp_seq_show(struct seq_file *seq, void *v)
+{
+ struct sdp_iter_state *st;
+ struct sock *sk = v;
+ char tmpbuf[TMPSZ + 1];
+ unsigned int dest;
+ unsigned int src;
+ int uid;
+ unsigned long inode;
+ __u16 destp;
+ __u16 srcp;
+ __u32 rx_queue, tx_queue;
+
+ if (v == SEQ_START_TOKEN) {
+ seq_printf(seq, "%-*s\n", TMPSZ - 1,
+ " sl local_address rem_address "
+ "uid inode rx_queue tx_queue state");
+ goto out;
+ }
+
+ st = seq->private;
+
+ dest = inet_sk(sk)->daddr;
+ src = inet_sk(sk)->rcv_saddr;
+ destp = ntohs(inet_sk(sk)->dport);
+ srcp = ntohs(inet_sk(sk)->sport);
+ uid = sock_i_uid(sk);
+ inode = sock_i_ino(sk);
+ rx_queue = rcv_nxt(sdp_sk(sk)) - sdp_sk(sk)->copied_seq;
+ tx_queue = sdp_sk(sk)->write_seq - sdp_sk(sk)->tx_ring.una_seq;
+
+ sprintf(tmpbuf, "%4d: %08X:%04X %08X:%04X %5d %lu %08X:%08X %X",
+ st->num, src, srcp, dest, destp, uid, inode,
+ rx_queue, tx_queue, sk->sk_state);
+
+ seq_printf(seq, "%-*s\n", TMPSZ - 1, tmpbuf);
+
+ sock_put(sk, SOCK_REF_SEQ);
+out:
+ return 0;
+}
+
+static int sdp_seq_open(struct inode *inode, struct file *file)
+{
+ struct sdp_seq_afinfo *afinfo = PDE(inode)->data;
+ struct seq_file *seq;
+ struct sdp_iter_state *s;
+ int rc;
+
+ if (unlikely(afinfo == NULL))
+ return -EINVAL;
+
+ s = kzalloc(sizeof(*s), GFP_KERNEL);
+ if (!s)
+ return -ENOMEM;
+ s->family = afinfo->family;
+ s->seq_ops.start = sdp_seq_start;
+ s->seq_ops.next = sdp_seq_next;
+ s->seq_ops.show = afinfo->seq_show;
+ s->seq_ops.stop = sdp_seq_stop;
+
+ rc = seq_open(file, &s->seq_ops);
+ if (rc)
+ goto out_kfree;
+ seq = file->private_data;
+ seq->private = s;
+out:
+ return rc;
+out_kfree:
+ kfree(s);
+ goto out;
+}
+
+
+static struct file_operations sdp_seq_fops;
+static struct sdp_seq_afinfo sdp_seq_afinfo = {
+ .owner = THIS_MODULE,
+ .name = "sdp",
+ .family = AF_INET_SDP,
+ .seq_show = sdp_seq_show,
+ .seq_fops = &sdp_seq_fops,
+};
+
+#ifdef SDPSTATS_ON
+struct sdpstats sdpstats = { { 0 } };
+
+static void sdpstats_seq_hist(struct seq_file *seq, char *str, u32 *h, int n,
+ int is_log)
+{
+ int i;
+ u32 max = 0;
+
+ seq_printf(seq, "%s:\n", str);
+
+ for (i = 0; i < n; i++) {
+ if (h[i] > max)
+ max = h[i];
+ }
+
+ if (max == 0) {
+ seq_printf(seq, " - all values are 0\n");
+ return;
+ }
+
+ for (i = 0; i < n; i++) {
+ char s[51];
+ int j = 50 * h[i] / max;
+ int val = is_log ? (i == n-1 ? 0 : 1<<i) : i;
+ memset(s, '*', j);
+ s[j] = '\0';
+
+ seq_printf(seq, "%10d | %-50s - %d\n", val, s, h[i]);
+ }
+}
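
Concretely, each histogram line printed above shows the bucket value (1<<i for log-scaled histograms, the raw index otherwise), a bar of up to 50 '*' characters scaled against the largest bucket, and the raw count for that bucket.
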
+
+static int sdpstats_seq_show(struct seq_file *seq, void *v)
+{
+ int i;
+
+ seq_printf(seq, "SDP statistics:\n");
+
+ sdpstats_seq_hist(seq, "sendmsg_seglen", sdpstats.sendmsg_seglen,
+ ARRAY_SIZE(sdpstats.sendmsg_seglen), 1);
+
+ sdpstats_seq_hist(seq, "send_size", sdpstats.send_size,
+ ARRAY_SIZE(sdpstats.send_size), 1);
+
+ sdpstats_seq_hist(seq, "credits_before_update",
+ sdpstats.credits_before_update,
+ ARRAY_SIZE(sdpstats.credits_before_update), 0);
+
+ seq_printf(seq, "sdp_sendmsg() calls\t\t: %d\n", sdpstats.sendmsg);
+ seq_printf(seq, "bcopy segments \t\t: %d\n",
+ sdpstats.sendmsg_bcopy_segment);
+ seq_printf(seq, "bzcopy segments \t\t: %d\n",
+ sdpstats.sendmsg_bzcopy_segment);
+ seq_printf(seq, "post_send_credits \t\t: %d\n",
+ sdpstats.post_send_credits);
+ seq_printf(seq, "memcpy_count \t\t: %u\n",
+ sdpstats.memcpy_count);
+
+ for (i = 0; i < ARRAY_SIZE(sdpstats.post_send); i++) {
+ if (mid2str(i)) {
+ seq_printf(seq, "post_send %-20s\t: %d\n",
+ mid2str(i), sdpstats.post_send[i]);
+ }
+ }
+
+ seq_printf(seq, "\n");
+ seq_printf(seq, "post_recv \t\t: %d\n", sdpstats.post_recv);
+ seq_printf(seq, "BZCopy poll miss \t\t: %d\n",
+ sdpstats.bzcopy_poll_miss);
+ seq_printf(seq, "send_wait_for_mem \t\t: %d\n",
+ sdpstats.send_wait_for_mem);
+ seq_printf(seq, "send_miss_no_credits\t\t: %d\n",
+ sdpstats.send_miss_no_credits);
+
+ seq_printf(seq, "rx_poll_miss \t\t: %d\n", sdpstats.rx_poll_miss);
+ seq_printf(seq, "tx_poll_miss \t\t: %d\n", sdpstats.tx_poll_miss);
+ seq_printf(seq, "tx_poll_busy \t\t: %d\n", sdpstats.tx_poll_busy);
+ seq_printf(seq, "tx_poll_hit \t\t: %d\n", sdpstats.tx_poll_hit);
+
+ seq_printf(seq, "CQ stats:\n");
+ seq_printf(seq, "- RX interrupts\t\t: %d\n", sdpstats.rx_int_count);
+ seq_printf(seq, "- TX interrupts\t\t: %d\n", sdpstats.tx_int_count);
+
+ seq_printf(seq, "bz_clean \t\t: %d\n", sdpstats.sendmsg ?
+ sdpstats.bz_clean_sum / sdpstats.sendmsg : 0);
+ seq_printf(seq, "bz_setup \t\t: %d\n", sdpstats.sendmsg ?
+ sdpstats.bz_setup_sum / sdpstats.sendmsg : 0);
+ seq_printf(seq, "tx_copy \t\t: %d\n", sdpstats.sendmsg ?
+ sdpstats.tx_copy_sum / sdpstats.sendmsg : 0);
+ seq_printf(seq, "sendmsg \t\t: %d\n", sdpstats.sendmsg ?
+ sdpstats.sendmsg_sum / sdpstats.sendmsg : 0);
+ return 0;
+}
+
+static ssize_t sdpstats_write(struct file *file, const char __user *buf,
+ size_t count, loff_t *offs)
+{
+ memset(&sdpstats, 0, sizeof(sdpstats));
+ printk(KERN_WARNING "Cleared sdp statistics\n");
+
+ return count;
+}
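
Usage note, inferred from the proc registration further down: once the module is loaded the counters are readable from /proc/net/sdpstats, and writing anything to that file (e.g. `echo 1 > /proc/net/sdpstats`) resets them via the handler above.
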
+
+static int sdpstats_seq_open(struct inode *inode, struct file *file)
+{
+ return single_open(file, sdpstats_seq_show, NULL);
+}
+
+static struct file_operations sdpstats_fops = {
+ .owner = THIS_MODULE,
+ .open = sdpstats_seq_open,
+ .read = seq_read,
+ .write = sdpstats_write,
+ .llseek = seq_lseek,
+ .release = single_release,
+};
+
+#endif
+
+#ifdef SDP_PROFILING
+struct sdpprf_log sdpprf_log[SDPPRF_LOG_SIZE];
+int sdpprf_log_count;
+
+static unsigned long long start_t;
+
+static int sdpprf_show(struct seq_file *m, void *v)
+{
+ struct sdpprf_log *l = v;
+ unsigned long nsec_rem, t;
+
+ if (!sdpprf_log_count) {
+ seq_printf(m, "No performance logs\n");
+ goto out;
+ }
+
+ t = l->time - start_t;
+ nsec_rem = do_div(t, 1000000000);
+
+ seq_printf(m, "%-6d: [%5lu.%06lu] %-50s - [%d{%d} %d:%d] "
+ "skb: %p %s:%d\n",
+ l->idx, (unsigned long)t, nsec_rem/1000,
+ l->msg, l->pid, l->cpu, l->sk_num, l->sk_dport,
+ l->skb, l->func, l->line);
+out:
+ return 0;
+}
+
+static void *sdpprf_start(struct seq_file *p, loff_t *pos)
+{
+ int idx = *pos;
+
+ if (!*pos) {
+ if (!sdpprf_log_count)
+ return SEQ_START_TOKEN;
+ }
+
+ if (*pos >= MIN(sdpprf_log_count, SDPPRF_LOG_SIZE - 1))
+ return NULL;
+
+ if (sdpprf_log_count >= SDPPRF_LOG_SIZE - 1) {
+ int off = sdpprf_log_count & (SDPPRF_LOG_SIZE - 1);
+ idx = (idx + off) & (SDPPRF_LOG_SIZE - 1);
+
+ }
+
+ if (!start_t)
+ start_t = sdpprf_log[idx].time;
+ return &sdpprf_log[idx];
+}
+
+static void *sdpprf_next(struct seq_file *p, void *v, loff_t *pos)
+{
+ struct sdpprf_log *l = v;
+
+ if (++*pos >= MIN(sdpprf_log_count, SDPPRF_LOG_SIZE - 1))
+ return NULL;
+
+ ++l;
+ if (l - &sdpprf_log[0] >= SDPPRF_LOG_SIZE - 1)
+ return &sdpprf_log[0];
+
+ return l;
+}
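
A small sketch approximating the circular-log walk done by sdpprf_start()/sdpprf_next() above; it relies on SDPPRF_LOG_SIZE being a power of two so that "& (SDPPRF_LOG_SIZE - 1)" is a cheap modulo. The size below is a placeholder, and the made-up event count is chosen so the log has already wrapped:

#include <stdio.h>

#define LOG_SIZE 8	/* placeholder; must be a power of two */

int main(void)
{
	int log_count = 13;	/* total events logged so far (made up) */
	int i;

	/* Once the log has wrapped, the oldest entry sits right after the
	 * newest one; start the walk there, as the seq_file iterator does. */
	int off = log_count & (LOG_SIZE - 1);

	for (i = 0; i < LOG_SIZE - 1; i++)
		printf("slot %d\n", (i + off) & (LOG_SIZE - 1));
	return 0;
}
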
+
+static void sdpprf_stop(struct seq_file *p, void *v)
+{
+}
+
+
+const struct seq_operations sdpprf_ops = {
+ .start = sdpprf_start,
+ .stop = sdpprf_stop,
+ .next = sdpprf_next,
+ .show = sdpprf_show,
+};
+
+static int sdpprf_open(struct inode *inode, struct file *file)
+{
+ int res;
+
+ res = seq_open(file, &sdpprf_ops);
+
+ return res;
+}
+
+static ssize_t sdpprf_write(struct file *file, const char __user *buf,
+ size_t count, loff_t *offs)
+{
+ sdpprf_log_count = 0;
+ printk(KERN_INFO "Cleared sdpprf statistics\n");
+
+ return count;
+}
+
+static const struct file_operations sdpprf_fops = {
+ .open = sdpprf_open,
+ .read = seq_read,
+ .llseek = seq_lseek,
+ .release = seq_release,
+ .write = sdpprf_write,
+};
+#endif /* SDP_PROFILING */
+
+int __init sdp_proc_init(void)
+{
+ struct proc_dir_entry *p = NULL;
+ struct proc_dir_entry *sdpstats = NULL;
+ struct proc_dir_entry *sdpprf = NULL;
+
+ sdp_seq_afinfo.seq_fops->owner = sdp_seq_afinfo.owner;
+ sdp_seq_afinfo.seq_fops->open = sdp_seq_open;
+ sdp_seq_afinfo.seq_fops->read = seq_read;
+ sdp_seq_afinfo.seq_fops->llseek = seq_lseek;
+ sdp_seq_afinfo.seq_fops->release = seq_release_private;
+
+ p = proc_net_fops_create(&init_net, sdp_seq_afinfo.name, S_IRUGO,
+ sdp_seq_afinfo.seq_fops);
+ if (p)
+ p->data = &sdp_seq_afinfo;
+ else
+ goto no_mem;
+
+#ifdef SDPSTATS_ON
+
+ sdpstats = proc_net_fops_create(&init_net, PROC_SDP_STATS,
+ S_IRUGO | S_IWUGO, &sdpstats_fops);
+ if (!sdpstats)
+ goto no_mem;
+
+#endif
+
+#ifdef SDP_PROFILING
+ sdpprf = proc_net_fops_create(&init_net, PROC_SDP_PERF,
+ S_IRUGO | S_IWUGO, &sdpprf_fops);
+ if (!sdpprf)
+ goto no_mem;
+#endif
+
+ return 0;
+no_mem:
+ if (sdpprf)
+ proc_net_remove(&init_net, PROC_SDP_PERF);
+
+ if (sdpstats)
+ proc_net_remove(&init_net, PROC_SDP_STATS);
+
+ if (p)
+ proc_net_remove(&init_net, sdp_seq_afinfo.name);
+
+ return -ENOMEM;
+}
+
+void sdp_proc_unregister(void)
+{
+ proc_net_remove(&init_net, sdp_seq_afinfo.name);
+ memset(sdp_seq_afinfo.seq_fops, 0, sizeof(*sdp_seq_afinfo.seq_fops));
+
+#ifdef SDPSTATS_ON
+ proc_net_remove(&init_net, PROC_SDP_STATS);
+#endif
+#ifdef SDP_PROFILING
+ proc_net_remove(&init_net, PROC_SDP_PERF);
+#endif
+}
+
+#else /* CONFIG_PROC_FS */
+
+int __init sdp_proc_init(void)
+{
+ return 0;
+}
+
+void sdp_proc_unregister(void)
+{
+
+}
+#endif /* CONFIG_PROC_FS */
diff --git a/drivers/infiniband/ulp/sdp/sdp_rx.c b/drivers/infiniband/ulp/sdp/sdp_rx.c
new file mode 100644
index 0000000..ed5c2ea
--- /dev/null
+++ b/drivers/infiniband/ulp/sdp/sdp_rx.c
@@ -0,0 +1,850 @@
+/*
+ * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+#include <linux/interrupt.h>
+#include <linux/dma-mapping.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include "sdp.h"
+
+SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
+ "Receive buffer initial size in bytes.");
+SDP_MODPARAM_SINT(rcvbuf_scale, 0x10,
+ "Receive buffer size scale factor.");
+SDP_MODPARAM_SINT(top_mem_usage, 0,
+ "Top system wide sdp memory usage for recv (in MB).");
+
+#ifdef CONFIG_PPC
+SDP_MODPARAM_SINT(max_large_sockets, 100,
+ "Max number of large sockets (32k buffers).");
+#else
+SDP_MODPARAM_SINT(max_large_sockets, 1000,
+ "Max number of large sockets (32k buffers).");
+#endif
+
+static int curr_large_sockets;
+atomic_t sdp_current_mem_usage;
+spinlock_t sdp_large_sockets_lock;
+
+static int sdp_get_large_socket(struct sdp_sock *ssk)
+{
+ int count, ret;
+
+ if (ssk->recv_request)
+ return 1;
+
+ spin_lock_irq(&sdp_large_sockets_lock);
+ count = curr_large_sockets;
+ ret = curr_large_sockets < max_large_sockets;
+ if (ret)
+ curr_large_sockets++;
+ spin_unlock_irq(&sdp_large_sockets_lock);
+
+ return ret;
+}
+
+void sdp_remove_large_sock(struct sdp_sock *ssk)
+{
+ if (ssk->recv_frags) {
+ spin_lock_irq(&sdp_large_sockets_lock);
+ curr_large_sockets--;
+ spin_unlock_irq(&sdp_large_sockets_lock);
+ }
+}
+
+/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
+static void sdp_fin(struct sock *sk)
+{
+ sdp_dbg(sk, "%s\n", __func__);
+
+ sk->sk_shutdown |= RCV_SHUTDOWN;
+ sock_set_flag(sk, SOCK_DONE);
+
+ switch (sk->sk_state) {
+ case TCP_SYN_RECV:
+ case TCP_ESTABLISHED:
+ sdp_exch_state(sk, TCPF_SYN_RECV | TCPF_ESTABLISHED,
+ TCP_CLOSE_WAIT);
+ break;
+
+ case TCP_FIN_WAIT1:
+ /* Received a reply FIN - start InfiniBand teardown */
+ sdp_dbg(sk, "%s: Starting InfiniBand teardown, sending DREQ\n",
+ __func__);
+
+ sdp_cancel_dreq_wait_timeout(sdp_sk(sk));
+
+ sdp_exch_state(sk, TCPF_FIN_WAIT1, TCP_TIME_WAIT);
+
+ if (sdp_sk(sk)->id) {
+ rdma_disconnect(sdp_sk(sk)->id);
+ } else {
+ sdp_warn(sk, "%s: sdp_sk(sk)->id is NULL\n", __func__);
+ return;
+ }
+ break;
+ case TCP_TIME_WAIT:
+ /* This is a mutual close situation and we've got the DREQ from
+ the peer before the SDP_MID_DISCONNECT */
+ break;
+ case TCP_CLOSE:
+ /* FIN arrived after IB teardown started - do nothing */
+ sdp_dbg(sk, "%s: fin in state %s\n",
+ __func__, sdp_state_str(sk->sk_state));
+ return;
+ default:
+ sdp_warn(sk, "%s: FIN in unexpected state. sk->sk_state=%d\n",
+ __func__, sk->sk_state);
+ break;
+ }
+
+ sk_mem_reclaim(sk);
+
+ if (!sock_flag(sk, SOCK_DEAD)) {
+ sk->sk_state_change(sk);
+
+ /* Do not send POLL_HUP for half duplex close. */
+ if (sk->sk_shutdown == SHUTDOWN_MASK ||
+ sk->sk_state == TCP_CLOSE)
+ sk_wake_async(sk, 1, POLL_HUP);
+ else
+ sk_wake_async(sk, 1, POLL_IN);
+ }
+}
+
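+/* Allocate a receive skb (BSDH header in the linear part, payload in page
+ * frags), DMA-map it and post it on the QP. The ring slot is picked from
+ * ring_head(rx_ring) masked with SDP_RX_SIZE - 1.
+ */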
+static int sdp_post_recv(struct sdp_sock *ssk)
+{
+ struct sdp_buf *rx_req;
+ int i, rc, frags;
+ u64 addr;
+ struct ib_device *dev;
+ struct ib_recv_wr rx_wr = { 0 };
+ struct ib_sge ibsge[SDP_MAX_RECV_SKB_FRAGS + 1];
+ struct ib_sge *sge = ibsge;
+ struct ib_recv_wr *bad_wr;
+ struct sk_buff *skb;
+ struct page *page;
+ skb_frag_t *frag;
+ struct sdp_bsdh *h;
+ int id = ring_head(ssk->rx_ring);
+ gfp_t gfp_page;
+ int ret = 0;
+
+ /* Now, allocate and repost recv */
+ /* TODO: allocate from cache */
+
+ if (unlikely(ssk->isk.sk.sk_allocation)) {
+ skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
+ ssk->isk.sk.sk_allocation);
+ gfp_page = ssk->isk.sk.sk_allocation | __GFP_HIGHMEM;
+ } else {
+ skb = sdp_stream_alloc_skb(&ssk->isk.sk, SDP_HEAD_SIZE,
+ GFP_KERNEL);
+ gfp_page = GFP_HIGHUSER;
+ }
+
+ sdp_prf(&ssk->isk.sk, skb, "Posting skb");
+ /* FIXME */
+ BUG_ON(!skb);
+ h = (struct sdp_bsdh *)skb->head;
+ for (i = 0; i < ssk->recv_frags; ++i) {
+ page = alloc_pages(gfp_page, 0);
+ BUG_ON(!page);
+ frag = &skb_shinfo(skb)->frags[i];
+ frag->page = page;
+ frag->page_offset = 0;
+ frag->size = min(PAGE_SIZE, SDP_MAX_PAYLOAD);
+ ++skb_shinfo(skb)->nr_frags;
+ skb->len += frag->size;
+ skb->data_len += frag->size;
+ skb->truesize += frag->size;
+ }
+
+ rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
+ rx_req->skb = skb;
+ dev = ssk->ib_device;
+ addr = ib_dma_map_single(dev, h, SDP_HEAD_SIZE, DMA_FROM_DEVICE);
+ BUG_ON(ib_dma_mapping_error(dev, addr));
+
+ rx_req->mapping[0] = addr;
+
+ /* TODO: proper error handling */
+ sge->addr = (u64)addr;
+ sge->length = SDP_HEAD_SIZE;
+ sge->lkey = ssk->mr->lkey;
+ frags = skb_shinfo(skb)->nr_frags;
+ for (i = 0; i < frags; ++i) {
+ ++sge;
+ addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
+ skb_shinfo(skb)->frags[i].page_offset,
+ skb_shinfo(skb)->frags[i].size,
+ DMA_FROM_DEVICE);
+ BUG_ON(ib_dma_mapping_error(dev, addr));
+ rx_req->mapping[i + 1] = addr;
+ sge->addr = addr;
+ sge->length = skb_shinfo(skb)->frags[i].size;
+ sge->lkey = ssk->mr->lkey;
+ }
+
+ rx_wr.next = NULL;
+ rx_wr.wr_id = id | SDP_OP_RECV;
+ rx_wr.sg_list = ibsge;
+ rx_wr.num_sge = frags + 1;
+ rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
+ atomic_inc(&ssk->rx_ring.head);
+ if (unlikely(rc)) {
+ sdp_warn(&ssk->isk.sk, "ib_post_recv failed. status %d\n", rc);
+
+ lock_sock(&ssk->isk.sk);
+ sdp_reset(&ssk->isk.sk);
+ release_sock(&ssk->isk.sk);
+
+ ret = -1;
+ }
+
+ SDPSTATS_COUNTER_INC(post_recv);
+ atomic_add(ssk->recv_frags, &sdp_current_mem_usage);
+
+ return ret;
+}
+
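+/* Return 1 when another receive buffer should be posted: the RX ring is not
+ * full and the estimated in-process bytes (posted buffers beyond the
+ * SDP_MIN_TX_CREDITS reserve plus data received but not yet copied to user
+ * space) are still below sk_rcvbuf * scale. Once system wide SDP memory
+ * usage crosses top_mem_usage MB, the scale factor falls back to 1.
+ */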
+static inline int sdp_post_recvs_needed(struct sdp_sock *ssk)
+{
+ struct sock *sk = &ssk->isk.sk;
+ int scale = ssk->rcvbuf_scale;
+ int buffer_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
+ unsigned long max_bytes;
+
+ if (top_mem_usage && (top_mem_usage * 0x100000) <
+ atomic_read(&sdp_current_mem_usage) * PAGE_SIZE) {
+ scale = 1;
+ }
+
+ max_bytes = sk->sk_rcvbuf * scale;
+
+ if (unlikely(ring_posted(ssk->rx_ring) >= SDP_RX_SIZE))
+ return 0;
+
+ if (likely(ring_posted(ssk->rx_ring) > SDP_MIN_TX_CREDITS)) {
+ unsigned long bytes_in_process =
+ (ring_posted(ssk->rx_ring) - SDP_MIN_TX_CREDITS) *
+ buffer_size;
+ bytes_in_process += rcv_nxt(ssk) - ssk->copied_seq;
+
+ if (bytes_in_process >= max_bytes) {
+ sdp_prf(sk, NULL,
+ "bytes_in_process:%ld > max_bytes:%ld",
+ bytes_in_process, max_bytes);
+ return 0;
+ }
+ }
+
+ return 1;
+}
+
+static inline void sdp_post_recvs(struct sdp_sock *ssk)
+{
+again:
+ while (sdp_post_recvs_needed(ssk)) {
+ if (sdp_post_recv(ssk))
+ goto out;
+ }
+
+ sk_stream_mem_reclaim(&ssk->isk.sk);
+
+ if (sdp_post_recvs_needed(ssk))
+ goto again;
+out:
+ sk_stream_mem_reclaim(&ssk->isk.sk);
+}
+
+static inline struct sk_buff *sdp_sock_queue_rcv_skb(struct sock *sk,
+ struct sk_buff *skb)
+{
+ int skb_len;
+ struct sdp_sock *ssk = sdp_sk(sk);
+
+ /* not needed since sk_rmem_alloc is not currently used
+ * TODO - remove this?
+ skb_set_owner_r(skb, sk); */
+
+ skb_len = skb->len;
+
+ TCP_SKB_CB(skb)->seq = rcv_nxt(ssk);
+ atomic_add(skb_len, &ssk->rcv_nxt);
+
+ skb_queue_tail(&sk->sk_receive_queue, skb);
+
+ if (!sock_flag(sk, SOCK_DEAD))
+ sk->sk_data_ready(sk, skb_len);
+ return skb;
+}
+
+int sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
+{
+ ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) / PAGE_SIZE;
+ if (ssk->recv_frags > SDP_MAX_RECV_SKB_FRAGS)
+ ssk->recv_frags = SDP_MAX_RECV_SKB_FRAGS;
+ ssk->rcvbuf_scale = rcvbuf_scale;
+
+ sdp_post_recvs(ssk);
+
+ return 0;
+}
+
+int sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
+{
+ u32 curr_size = SDP_HEAD_SIZE + ssk->recv_frags * PAGE_SIZE;
+#if defined(__ia64__)
+ /* For huge PAGE_SIZE systems, i.e. IA64, limit buffer size
+ [re-]negotiation to a known working size that will not
+ trigger a HW error/rc interpreted as an IB_WC_LOC_LEN_ERR */
+ u32 max_size = (SDP_HEAD_SIZE + SDP_MAX_RECV_SKB_FRAGS * PAGE_SIZE) <=
+ 32784 ?
+ (SDP_HEAD_SIZE + SDP_MAX_RECV_SKB_FRAGS * PAGE_SIZE) : 32784;
+#else
+ u32 max_size = SDP_HEAD_SIZE + SDP_MAX_RECV_SKB_FRAGS * PAGE_SIZE;
+#endif
+
+ if (new_size > curr_size && new_size <= max_size &&
+ sdp_get_large_socket(ssk)) {
+ ssk->rcvbuf_scale = rcvbuf_scale;
+ ssk->recv_frags = PAGE_ALIGN(new_size - SDP_HEAD_SIZE) /
+ PAGE_SIZE;
+ if (ssk->recv_frags > SDP_MAX_RECV_SKB_FRAGS)
+ ssk->recv_frags = SDP_MAX_RECV_SKB_FRAGS;
+ return 0;
+ } else
+ return -1;
+}
+
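+/* Peer asked us to switch to larger receive buffers (SDP_MID_CHRCVBUF).
+ * recv_request_head records the ring position from which the new size takes
+ * effect and recv_request marks that a CHRCVBUF_ACK should be sent back.
+ */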
+static void sdp_handle_resize_request(struct sdp_sock *ssk,
+ struct sdp_chrecvbuf *buf)
+{
+ if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
+ ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
+ else
+ ssk->recv_request_head = ring_tail(ssk->rx_ring);
+ ssk->recv_request = 1;
+}
+
+static void sdp_handle_resize_ack(struct sdp_sock *ssk,
+ struct sdp_chrecvbuf *buf)
+{
+ u32 new_size = ntohl(buf->size);
+
+ if (new_size > ssk->xmit_size_goal) {
+ ssk->sent_request = -1;
+ ssk->xmit_size_goal = new_size;
+ ssk->send_frags =
+ PAGE_ALIGN(ssk->xmit_size_goal) / PAGE_SIZE + 1;
+ } else
+ ssk->sent_request = 0;
+}
+
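+/* A gratuitous credit update is needed once the credits the peer still holds
+ * (plus 50% slack) no longer cover the receive buffers we have posted, and
+ * we have both a tx credit and a free tx ring slot to send the update with.
+ */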
+static inline int credit_update_needed(struct sdp_sock *ssk)
+{
+ int c;
+
+ c = remote_credits(ssk);
+ if (likely(c > SDP_MIN_TX_CREDITS))
+ c += c/2;
+
+ return unlikely(c < ring_posted(ssk->rx_ring)) &&
+ likely(tx_credits(ssk) > 1) &&
+ likely(sdp_tx_ring_slots_left(&ssk->tx_ring));
+}
+
+static struct sk_buff *sdp_recv_completion(struct sdp_sock *ssk, int id)
+{
+ struct sdp_buf *rx_req;
+ struct ib_device *dev;
+ struct sk_buff *skb;
+ int i, frags;
+
+ if (unlikely(id != ring_tail(ssk->rx_ring))) {
+ printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
+ id, ring_tail(ssk->rx_ring));
+ return NULL;
+ }
+
+ dev = ssk->ib_device;
+ rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
+ skb = rx_req->skb;
+ ib_dma_unmap_single(dev, rx_req->mapping[0], SDP_HEAD_SIZE,
+ DMA_FROM_DEVICE);
+ frags = skb_shinfo(skb)->nr_frags;
+ for (i = 0; i < frags; ++i)
+ ib_dma_unmap_page(dev, rx_req->mapping[i + 1],
+ skb_shinfo(skb)->frags[i].size,
+ DMA_FROM_DEVICE);
+ atomic_inc(&ssk->rx_ring.tail);
+ atomic_dec(&ssk->remote_credits);
+ return skb;
+}
+
+/* socket lock should be taken before calling this */
+static int sdp_process_rx_ctl_skb(struct sdp_sock *ssk, struct sk_buff *skb)
+{
+ struct sdp_bsdh *h = (struct sdp_bsdh *)skb_transport_header(skb);
+ struct sock *sk = &ssk->isk.sk;
+
+ switch (h->mid) {
+ case SDP_MID_DATA:
+ WARN_ON(!(sk->sk_shutdown & RCV_SHUTDOWN));
+
+ sdp_warn(sk, "DATA after socket rcv was shutdown\n");
+
+ /* got data in RCV_SHUTDOWN */
+ if (sk->sk_state == TCP_FIN_WAIT1) {
+ sdp_warn(sk, "RX data when state = FIN_WAIT1\n");
+ /* go into abortive close */
+ sdp_exch_state(sk, TCPF_FIN_WAIT1,
+ TCP_TIME_WAIT);
+
+ sk->sk_prot->disconnect(sk, 0);
+ }
+ __kfree_skb(skb);
+
+ break;
+ case SDP_MID_DISCONN:
+ sdp_dbg_data(sk, "Handling RX disconnect\n");
+ sdp_prf(sk, NULL, "Handling RX disconnect");
+ sdp_fin(sk);
+ sdp_prf(sk, NULL, "Queueing fin skb - release recvmsg");
+ /* Enqueue fin skb to release sleeping recvmsg */
+ sdp_sock_queue_rcv_skb(sk, skb);
+ break;
+ case SDP_MID_CHRCVBUF:
+ sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
+ sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)h);
+ __kfree_skb(skb);
+ break;
+ case SDP_MID_CHRCVBUF_ACK:
+ sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
+ sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)h);
+ __kfree_skb(skb);
+ break;
+ default:
+ /* TODO: Handle other messages */
+ sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
+ __kfree_skb(skb);
+ }
+
+ return 0;
+}
+
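+/* Refresh the tx credits from the BSDH (mseq_ack plus the buffers the peer
+ * advertises), trim unused page frags, then either queue the skb on the
+ * socket receive queue (plain data) or defer it to rx_ctl_q so it gets
+ * handled under the socket lock.
+ */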
+static int sdp_process_rx_skb(struct sdp_sock *ssk, struct sk_buff *skb)
+{
+ struct sock *sk = &ssk->isk.sk;
+ int frags;
+ struct sdp_bsdh *h;
+ int pagesz, i;
+ unsigned long mseq_ack;
+ int credits_before;
+
+ h = (struct sdp_bsdh *)skb_transport_header(skb);
+
+ SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));
+
+ mseq_ack = ntohl(h->mseq_ack);
+ credits_before = tx_credits(ssk);
+ atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
+ 1 + ntohs(h->bufs));
+ if (mseq_ack >= ssk->nagle_last_unacked)
+ ssk->nagle_last_unacked = 0;
+
+ sdp_prf1(&ssk->isk.sk, skb, "RX %s +%d c:%d->%d mseq:%d ack:%d",
+ mid2str(h->mid), ntohs(h->bufs), credits_before,
+ tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));
+
+ frags = skb_shinfo(skb)->nr_frags;
+ pagesz = PAGE_ALIGN(skb->data_len);
+ skb_shinfo(skb)->nr_frags = pagesz / PAGE_SIZE;
+
+ for (i = skb_shinfo(skb)->nr_frags; i < frags; ++i) {
+ put_page(skb_shinfo(skb)->frags[i].page);
+ skb->truesize -= PAGE_SIZE;
+ }
+
+/* if (unlikely(h->flags & SDP_OOB_PEND))
+ sk_send_sigurg(sk);*/
+
+ skb_pull(skb, sizeof(struct sdp_bsdh));
+
+ if (h->mid != SDP_MID_DATA ||
+ unlikely(sk->sk_shutdown & RCV_SHUTDOWN)) {
+ sdp_prf(sk, NULL, "Control skb - queing to control queue");
+ skb_queue_tail(&ssk->rx_ctl_q, skb);
+ return 0;
+ }
+
+ if (unlikely(skb->len <= 0)) {
+ __kfree_skb(skb);
+ return 0;
+ }
+
+ sdp_prf(sk, NULL, "queueing a %s skb",
+ (h->mid == SDP_MID_DATA ? "data" : "disconnect"));
+ skb = sdp_sock_queue_rcv_skb(sk, skb);
+
+/* if (unlikely(h->flags & SDP_OOB_PRES))
+ sdp_urg(ssk, skb);*/
+
+ return 0;
+}
+
+/* called only from irq */
+static struct sk_buff *sdp_process_rx_wc(struct sdp_sock *ssk,
+ struct ib_wc *wc)
+{
+ struct sk_buff *skb;
+ struct sdp_bsdh *h;
+ struct sock *sk = &ssk->isk.sk;
+ int mseq;
+
+ skb = sdp_recv_completion(ssk, wc->wr_id);
+ if (unlikely(!skb))
+ return NULL;
+
+ atomic_sub(skb_shinfo(skb)->nr_frags, &sdp_current_mem_usage);
+
+ if (unlikely(wc->status)) {
+ if (wc->status != IB_WC_WR_FLUSH_ERR) {
+ sdp_warn(sk, "Recv completion with error. Status %d\n",
+ wc->status);
+ sdp_reset(sk);
+ }
+ __kfree_skb(skb);
+ return NULL;
+ }
+
+ sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
+ (int)wc->wr_id, wc->byte_len);
+ if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
+ sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
+ wc->byte_len, sizeof(struct sdp_bsdh));
+ __kfree_skb(skb);
+ return NULL;
+ }
+ skb->len = wc->byte_len;
+ if (likely(wc->byte_len > SDP_HEAD_SIZE))
+ skb->data_len = wc->byte_len - SDP_HEAD_SIZE;
+ else
+ skb->data_len = 0;
+ skb->data = skb->head;
+#ifdef NET_SKBUFF_DATA_USES_OFFSET
+ skb->tail = skb_headlen(skb);
+#else
+ skb->tail = skb->head + skb_headlen(skb);
+#endif
+ h = (struct sdp_bsdh *)skb->data;
+ SDP_DUMP_PACKET(&ssk->isk.sk, "RX", skb, h);
+ skb_reset_transport_header(skb);
+
+ ssk->rx_packets++;
+ ssk->rx_bytes += skb->len;
+
+ mseq = ntohl(h->mseq);
+ atomic_set(&ssk->mseq_ack, mseq);
+ if (mseq != (int)wc->wr_id)
+ sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
+ mseq, (int)wc->wr_id);
+
+ return skb;
+}
+
+/* Like sk_stream_write_space - except that it measures remote credits */
+static void sdp_bzcopy_write_space(struct sdp_sock *ssk)
+{
+ struct sock *sk = &ssk->isk.sk;
+ struct socket *sock = sk->sk_socket;
+
+ if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) {
+ sdp_prf1(&ssk->isk.sk, NULL, "credits: %d, min_bufs: %d. "
+ "tx_head: %d, tx_tail: %d",
+ tx_credits(ssk), ssk->min_bufs,
+ ring_head(ssk->tx_ring), ring_tail(ssk->tx_ring));
+ }
+
+ if (tx_credits(ssk) >= ssk->min_bufs && sock != NULL) {
+ clear_bit(SOCK_NOSPACE, &sock->flags);
+ sdp_prf1(sk, NULL, "Waking up sleepers");
+
+ if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+ wake_up_interruptible(sk->sk_sleep);
+ if (sock->fasync_list && !(sk->sk_shutdown & SEND_SHUTDOWN))
+ sock_wake_async(sock, 2, POLL_OUT);
+ }
+}
+
+/* only from interrupt. */
+static int sdp_poll_rx_cq(struct sdp_sock *ssk)
+{
+ struct ib_cq *cq = ssk->rx_ring.cq;
+ struct ib_wc ibwc[SDP_NUM_WC];
+ int n, i;
+ int wc_processed = 0;
+ struct sk_buff *skb;
+
+ do {
+ n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
+ for (i = 0; i < n; ++i) {
+ struct ib_wc *wc = &ibwc[i];
+
+ BUG_ON(!(wc->wr_id & SDP_OP_RECV));
+ skb = sdp_process_rx_wc(ssk, wc);
+ if (!skb)
+ continue;
+
+ sdp_process_rx_skb(ssk, skb);
+ wc_processed++;
+ }
+ } while (n == SDP_NUM_WC);
+
+ if (wc_processed)
+ sdp_bzcopy_write_space(ssk);
+
+ return wc_processed;
+}
+
+void sdp_rx_comp_work(struct work_struct *work)
+{
+ struct sdp_sock *ssk = container_of(work, struct sdp_sock,
+ rx_comp_work);
+ struct sock *sk = &ssk->isk.sk;
+
+ sdp_prf(sk, NULL, "%s", __func__);
+
+ if (unlikely(!ssk->qp)) {
+ sdp_prf(sk, NULL, "qp was destroyed");
+ return;
+ }
+ if (unlikely(!ssk->rx_ring.cq)) {
+ sdp_prf(sk, NULL, "rx_ring.cq is NULL");
+ return;
+ }
+
+ if (unlikely(!ssk->poll_cq)) {
+ struct rdma_cm_id *id = ssk->id;
+ if (id && id->qp)
+ rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
+ return;
+ }
+
+ lock_sock(sk);
+
+ sdp_do_posts(ssk);
+
+ release_sock(sk);
+}
+
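+/* Run the deferred socket work with the socket lock held: flush the RX
+ * control queue, repost receive buffers, reap finished sends and push out
+ * pending data and credit updates.
+ */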
+void sdp_do_posts(struct sdp_sock *ssk)
+{
+ struct sock *sk = &ssk->isk.sk;
+ int xmit_poll_force;
+ struct sk_buff *skb;
+
+ while ((skb = skb_dequeue(&ssk->rx_ctl_q)))
+ sdp_process_rx_ctl_skb(ssk, skb);
+
+ if (sk->sk_state == TCP_TIME_WAIT)
+ return;
+
+ if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
+ return;
+
+ sdp_post_recvs(ssk);
+
+ if (ring_posted(ssk->tx_ring))
+ sdp_xmit_poll(ssk, 1);
+
+ sdp_post_sends(ssk, 0);
+
+ sk_stream_mem_reclaim(sk);
+
+ xmit_poll_force = sk->sk_write_pending &&
+ (tx_credits(ssk) > SDP_MIN_TX_CREDITS);
+
+ if (credit_update_needed(ssk) || xmit_poll_force) {
+ /* if there is pending tx that ran out of tx_credits - transmit it now */
+ sdp_prf(sk, NULL, "Processing to free pending sends");
+ sdp_xmit_poll(ssk, xmit_poll_force);
+ sdp_prf(sk, NULL, "Sending credit update");
+ sdp_post_sends(ssk, 0);
+ }
+}
+
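+/* RX completion handler - runs in interrupt context. The CQ is drained here;
+ * the remaining socket work is deferred to rx_comp_wq unless some process is
+ * already inside the posts handler (or sleeping in recvmsg) and will do it
+ * itself. The CQ is re-armed before returning.
+ */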
+static void sdp_rx_irq(struct ib_cq *cq, void *cq_context)
+{
+ struct sock *sk = cq_context;
+ struct sdp_sock *ssk = sdp_sk(sk);
+ unsigned long flags;
+ int wc_processed = 0;
+ int credits_before;
+
+ sdp_dbg_data(&ssk->isk.sk, "rx irq called\n");
+
+ if (cq != ssk->rx_ring.cq) {
+ sdp_warn(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
+ return;
+ }
+
+ SDPSTATS_COUNTER_INC(rx_int_count);
+
+ sdp_prf(sk, NULL, "rx irq");
+
+ rx_ring_lock(ssk, flags);
+
+ credits_before = tx_credits(ssk);
+
+ if (!ssk->rx_ring.cq) {
+ sdp_warn(&ssk->isk.sk, "WARNING: rx irq after cq destroyed\n");
+
+ goto out;
+ }
+
+ wc_processed = sdp_poll_rx_cq(ssk);
+ sdp_prf(&ssk->isk.sk, NULL, "processed %d", wc_processed);
+
+ if (wc_processed) {
+ sdp_prf(&ssk->isk.sk, NULL, "credits: %d -> %d",
+ credits_before, tx_credits(ssk));
+
+ if (posts_handler(ssk) || (sk->sk_socket &&
+ test_bit(SOCK_ASYNC_WAITDATA, &sk->sk_socket->flags))) {
+
+ sdp_prf(&ssk->isk.sk, NULL,
+ "Somebody is doing the post work for me. %d",
+ posts_handler(ssk));
+
+ } else {
+ sdp_prf(&ssk->isk.sk, NULL, "Queuing work. ctl_q: %d",
+ !skb_queue_empty(&ssk->rx_ctl_q));
+ queue_work(rx_comp_wq, &ssk->rx_comp_work);
+ }
+ }
+ sdp_arm_rx_cq(sk);
+
+out:
+ rx_ring_unlock(ssk, flags);
+}
+
+static void sdp_rx_ring_purge(struct sdp_sock *ssk)
+{
+ while (ring_posted(ssk->rx_ring) > 0) {
+ struct sk_buff *skb;
+ skb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
+ if (!skb)
+ break;
+ atomic_sub(skb_shinfo(skb)->nr_frags, &sdp_current_mem_usage);
+ __kfree_skb(skb);
+ }
+}
+
+void sdp_rx_ring_init(struct sdp_sock *ssk)
+{
+ ssk->rx_ring.buffer = NULL;
+ spin_lock_init(&ssk->rx_ring.lock);
+}
+
+static void sdp_rx_cq_event_handler(struct ib_event *event, void *data)
+{
+}
+
+int sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
+{
+ struct ib_cq *rx_cq;
+ int rc = 0;
+ unsigned long flags;
+
+ rx_ring_lock(ssk, flags);
+
+ atomic_set(&ssk->rx_ring.head, 1);
+ atomic_set(&ssk->rx_ring.tail, 1);
+
+ ssk->rx_ring.buffer = kmalloc(
+ sizeof *ssk->rx_ring.buffer * SDP_RX_SIZE, GFP_KERNEL);
+ if (!ssk->rx_ring.buffer) {
+ rc = -ENOMEM;
+ sdp_warn(&ssk->isk.sk,
+ "Unable to allocate RX Ring size %zd.\n",
+ sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);
+
+ goto out;
+ }
+
+ /* TODO: use vector=IB_CQ_VECTOR_LEAST_ATTACHED when implemented
+ * in ofed-1.5 */
+ rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
+ &ssk->isk.sk, SDP_RX_SIZE, 0);
+
+ if (IS_ERR(rx_cq)) {
+ rc = PTR_ERR(rx_cq);
+ sdp_warn(&ssk->isk.sk, "Unable to allocate RX CQ: %d.\n", rc);
+ goto err_cq;
+ }
+
+ sdp_sk(&ssk->isk.sk)->rx_ring.cq = rx_cq;
+
+ INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
+
+ sdp_arm_rx_cq(&ssk->isk.sk);
+
+ goto out;
+
+err_cq:
+ kfree(ssk->rx_ring.buffer);
+ ssk->rx_ring.buffer = NULL;
+out:
+ rx_ring_unlock(ssk, flags);
+ return rc;
+}
+
+void sdp_rx_ring_destroy(struct sdp_sock *ssk)
+{
+ if (ssk->rx_ring.buffer) {
+ sdp_rx_ring_purge(ssk);
+
+ kfree(ssk->rx_ring.buffer);
+ ssk->rx_ring.buffer = NULL;
+ }
+
+ if (ssk->rx_ring.cq) {
+ ib_destroy_cq(ssk->rx_ring.cq);
+ ssk->rx_ring.cq = NULL;
+ }
+
+ WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
+}
diff --git a/drivers/infiniband/ulp/sdp/sdp_tx.c b/drivers/infiniband/ulp/sdp/sdp_tx.c
new file mode 100644
index 0000000..d53e838
--- /dev/null
+++ b/drivers/infiniband/ulp/sdp/sdp_tx.c
@@ -0,0 +1,442 @@
+/*
+ * Copyright (c) 2009 Mellanox Technologies Ltd. All rights reserved.
+ *
+ * This software is available to you under a choice of one of two
+ * licenses. You may choose to be licensed under the terms of the GNU
+ * General Public License (GPL) Version 2, available from the file
+ * COPYING in the main directory of this source tree, or the
+ * OpenIB.org BSD license below:
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer.
+ *
+ * - Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+ * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+ * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+ * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
+ * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
+ * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+ * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+#include <linux/interrupt.h>
+#include <linux/dma-mapping.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include "sdp.h"
+
+#define sdp_cnt(var) do { (var)++; } while (0)
+
+SDP_MODPARAM_SINT(sdp_keepalive_probes_sent, 0,
+ "Total number of keepalive probes sent.");
+
+static int sdp_process_tx_cq(struct sdp_sock *ssk);
+
+int sdp_xmit_poll(struct sdp_sock *ssk, int force)
+{
+ int wc_processed = 0;
+
+ sdp_prf(&ssk->isk.sk, NULL, "called from %s:%d", func, line);
+
+ /* If we don't have a pending timer, set one up to catch our recent
+ post in case the interface becomes idle */
+ if (!timer_pending(&ssk->tx_ring.timer))
+ mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+
+ /* Poll the CQ every SDP_TX_POLL_MODER packets */
+ if (force || (++ssk->tx_ring.poll_cnt & (SDP_TX_POLL_MODER - 1)) == 0)
+ wc_processed = sdp_process_tx_cq(ssk);
+
+ return wc_processed;
+}
+
+static unsigned long last_send;
+
+void sdp_post_send(struct sdp_sock *ssk, struct sk_buff *skb, u8 mid)
+{
+ struct sdp_buf *tx_req;
+ struct sdp_bsdh *h = (struct sdp_bsdh *)skb_push(skb, sizeof *h);
+ unsigned long mseq = ring_head(ssk->tx_ring);
+ int i, rc, frags;
+ u64 addr;
+ struct ib_device *dev;
+ struct ib_send_wr *bad_wr;
+ int delta;
+
+ struct ib_sge ibsge[SDP_MAX_SEND_SKB_FRAGS + 1];
+ struct ib_sge *sge = ibsge;
+ struct ib_send_wr tx_wr = { 0 };
+
+ SDPSTATS_COUNTER_MID_INC(post_send, mid);
+ SDPSTATS_HIST(send_size, skb->len);
+
+ ssk->tx_packets++;
+ ssk->tx_bytes += skb->len;
+
+ h->mid = mid;
+ if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
+ h->flags = SDP_OOB_PRES | SDP_OOB_PEND;
+ else
+ h->flags = 0;
+
+ h->bufs = htons(ring_posted(ssk->rx_ring));
+ h->len = htonl(skb->len);
+ h->mseq = htonl(mseq);
+ h->mseq_ack = htonl(mseq_ack(ssk));
+
+ sdp_prf1(&ssk->isk.sk, skb, "TX: %s bufs: %d mseq:%ld ack:%d",
+ mid2str(mid), ring_posted(ssk->rx_ring), mseq,
+ ntohl(h->mseq_ack));
+
+ SDP_DUMP_PACKET(&ssk->isk.sk, "TX", skb, h);
+
+ tx_req = &ssk->tx_ring.buffer[mseq & (SDP_TX_SIZE - 1)];
+ tx_req->skb = skb;
+ dev = ssk->ib_device;
+ addr = ib_dma_map_single(dev, skb->data, skb->len - skb->data_len,
+ DMA_TO_DEVICE);
+ tx_req->mapping[0] = addr;
+
+ /* TODO: proper error handling */
+ BUG_ON(ib_dma_mapping_error(dev, addr));
+
+ sge->addr = addr;
+ sge->length = skb->len - skb->data_len;
+ sge->lkey = ssk->mr->lkey;
+ frags = skb_shinfo(skb)->nr_frags;
+ for (i = 0; i < frags; ++i) {
+ ++sge;
+ addr = ib_dma_map_page(dev, skb_shinfo(skb)->frags[i].page,
+ skb_shinfo(skb)->frags[i].page_offset,
+ skb_shinfo(skb)->frags[i].size,
+ DMA_TO_DEVICE);
+ BUG_ON(ib_dma_mapping_error(dev, addr));
+ tx_req->mapping[i + 1] = addr;
+ sge->addr = addr;
+ sge->length = skb_shinfo(skb)->frags[i].size;
+ sge->lkey = ssk->mr->lkey;
+ }
+
+ tx_wr.next = NULL;
+ tx_wr.wr_id = ring_head(ssk->tx_ring) | SDP_OP_SEND;
+ tx_wr.sg_list = ibsge;
+ tx_wr.num_sge = frags + 1;
+ tx_wr.opcode = IB_WR_SEND;
+ tx_wr.send_flags = IB_SEND_SIGNALED;
+ if (unlikely(TCP_SKB_CB(skb)->flags & TCPCB_FLAG_URG))
+ tx_wr.send_flags |= IB_SEND_SOLICITED;
+
+ delta = jiffies - last_send;
+ if (likely(last_send))
+ SDPSTATS_HIST(send_interval, delta);
+ last_send = jiffies;
+
+ rc = ib_post_send(ssk->qp, &tx_wr, &bad_wr);
+ atomic_inc(&ssk->tx_ring.head);
+ atomic_dec(&ssk->tx_ring.credits);
+ atomic_set(&ssk->remote_credits, ring_posted(ssk->rx_ring));
+ if (unlikely(rc)) {
+ sdp_dbg(&ssk->isk.sk,
+ "ib_post_send failed with status %d.\n", rc);
+ sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+ wake_up(&ssk->wq);
+ }
+}
+
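+/* Reclaim the tx ring slot of a completed send: unmap its DMA addresses,
+ * advance the ring tail and drop the bzcopy reference if the skb carries
+ * one. Returns the skb so the caller can free it.
+ */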
+static struct sk_buff *sdp_send_completion(struct sdp_sock *ssk, int mseq)
+{
+ struct ib_device *dev;
+ struct sdp_buf *tx_req;
+ struct sk_buff *skb = NULL;
+ struct bzcopy_state *bz;
+ int i, frags;
+ struct sdp_tx_ring *tx_ring = &ssk->tx_ring;
+
+ if (unlikely(mseq != ring_tail(*tx_ring))) {
+ printk(KERN_WARNING "Bogus send completion id %d tail %d\n",
+ mseq, ring_tail(*tx_ring));
+ goto out;
+ }
+
+ dev = ssk->ib_device;
+ tx_req = &tx_ring->buffer[mseq & (SDP_TX_SIZE - 1)];
+ skb = tx_req->skb;
+ ib_dma_unmap_single(dev, tx_req->mapping[0], skb->len - skb->data_len,
+ DMA_TO_DEVICE);
+ frags = skb_shinfo(skb)->nr_frags;
+ for (i = 0; i < frags; ++i) {
+ ib_dma_unmap_page(dev, tx_req->mapping[i + 1],
+ skb_shinfo(skb)->frags[i].size,
+ DMA_TO_DEVICE);
+ }
+
+ tx_ring->una_seq += TCP_SKB_CB(skb)->end_seq;
+
+ /* TODO: AIO and real zcopy code; add their context support here */
+ bz = BZCOPY_STATE(skb);
+ if (bz)
+ bz->busy--;
+
+ atomic_inc(&tx_ring->tail);
+
+out:
+ return skb;
+}
+
+static int sdp_handle_send_comp(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+ struct sk_buff *skb = NULL;
+
+ skb = sdp_send_completion(ssk, wc->wr_id);
+ if (unlikely(!skb))
+ return -1;
+
+ if (unlikely(wc->status)) {
+ if (wc->status != IB_WC_WR_FLUSH_ERR) {
+ struct sock *sk = &ssk->isk.sk;
+ sdp_prf(sk, skb, "Send completion with error. "
+ "Status %d", wc->status);
+ sdp_warn(sk, "Send completion with error. "
+ "Status %d\n", wc->status);
+ sdp_set_error(sk, -ECONNRESET);
+ wake_up(&ssk->wq);
+
+ queue_work(sdp_wq, &ssk->destroy_work);
+ }
+ }
+
+#ifdef SDP_PROFILING
+{
+ struct sdp_bsdh *h = (struct sdp_bsdh *)skb->data;
+ sdp_prf1(&ssk->isk.sk, skb, "tx completion. mseq:%d", ntohl(h->mseq));
+}
+#endif
+ sk_wmem_free_skb(&ssk->isk.sk, skb);
+
+ return 0;
+}
+
+static inline void sdp_process_tx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
+{
+ if (likely(wc->wr_id & SDP_OP_SEND)) {
+ sdp_handle_send_comp(ssk, wc);
+ return;
+ }
+
+ /* Keepalive probe sent cleanup */
+ sdp_cnt(sdp_keepalive_probes_sent);
+
+ if (likely(!wc->status))
+ return;
+
+ sdp_dbg(&ssk->isk.sk, " %s consumes KEEPALIVE status %d\n",
+ __func__, wc->status);
+
+ if (wc->status == IB_WC_WR_FLUSH_ERR)
+ return;
+
+ sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+ wake_up(&ssk->wq);
+}
+
+static int sdp_process_tx_cq(struct sdp_sock *ssk)
+{
+ struct ib_wc ibwc[SDP_NUM_WC];
+ int n, i;
+ int wc_processed = 0;
+
+ if (!ssk->tx_ring.cq) {
+ sdp_warn(&ssk->isk.sk, "tx irq on destroyed tx_cq\n");
+ return 0;
+ }
+
+ do {
+ n = ib_poll_cq(ssk->tx_ring.cq, SDP_NUM_WC, ibwc);
+ for (i = 0; i < n; ++i) {
+ sdp_process_tx_wc(ssk, ibwc + i);
+ wc_processed++;
+ }
+ } while (n == SDP_NUM_WC);
+
+ sdp_dbg_data(&ssk->isk.sk, "processed %d wc's\n", wc_processed);
+
+ if (wc_processed) {
+ struct sock *sk = &ssk->isk.sk;
+ sdp_post_sends(ssk, 0);
+
+ if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
+ sk_stream_write_space(&ssk->isk.sk);
+ }
+
+ return wc_processed;
+}
+
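+/* TX CQ polling timer. Send completions are normally reaped by polling -
+ * from sdp_xmit_poll() on the transmit path or from this timer once the
+ * socket goes idle - rather than by per-packet interrupts.
+ */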
+static void sdp_poll_tx_timeout(unsigned long data)
+{
+ struct sdp_sock *ssk = (struct sdp_sock *)data;
+ struct sock *sk = &ssk->isk.sk;
+ u32 inflight, wc_processed;
+
+ sdp_dbg_data(&ssk->isk.sk, "Polling tx cq. inflight=%d\n",
+ (u32) ring_posted(ssk->tx_ring));
+
+ sdp_prf(&ssk->isk.sk, NULL, "%s. inflight=%d", __func__,
+ (u32) ring_posted(ssk->tx_ring));
+
+ /* Only process if the socket is not in use */
+ bh_lock_sock(sk);
+ if (sock_owned_by_user(sk)) {
+ mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+ sdp_dbg_data(&ssk->isk.sk, "socket is busy - trying later\n");
+ SDPSTATS_COUNTER_INC(tx_poll_busy);
+ goto out;
+ }
+
+ if (unlikely(sk->sk_state == TCP_CLOSE))
+ goto out;
+
+ wc_processed = sdp_process_tx_cq(ssk);
+ if (!wc_processed)
+ SDPSTATS_COUNTER_INC(tx_poll_miss);
+ else
+ SDPSTATS_COUNTER_INC(tx_poll_hit);
+
+ inflight = (u32) ring_posted(ssk->tx_ring);
+
+ /* If there are still packets in flight and the timer has not already
+ * been scheduled by the Tx routine then schedule it here to guarantee
+ * completion processing of these packets */
+ if (inflight) { /* TODO: make sure socket is not closed */
+ sdp_dbg_data(sk, "arming timer for more polling\n");
+ mod_timer(&ssk->tx_ring.timer, jiffies + SDP_TX_POLL_TIMEOUT);
+ }
+
+out:
+ bh_unlock_sock(sk);
+}
+
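+/* TX completion interrupt: do no work here, just make the polling timer fire
+ * immediately so completions are reaped outside interrupt context.
+ */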
+static void sdp_tx_irq(struct ib_cq *cq, void *cq_context)
+{
+ struct sock *sk = cq_context;
+ struct sdp_sock *ssk = sdp_sk(sk);
+
+ sdp_prf1(sk, NULL, "Got tx comp interrupt");
+
+ mod_timer(&ssk->tx_ring.timer, jiffies);
+}
+
+void sdp_tx_ring_purge(struct sdp_sock *ssk)
+{
+ while (ring_posted(ssk->tx_ring)) {
+ struct sk_buff *skb;
+ skb = sdp_send_completion(ssk, ring_tail(ssk->tx_ring));
+ if (!skb)
+ break;
+ __kfree_skb(skb);
+ }
+}
+
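+/* Post a zero length RDMA write as a keepalive probe; its completion is
+ * recognized in sdp_process_tx_wc() by the cleared SDP_OP_SEND bit.
+ */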
+void sdp_post_keepalive(struct sdp_sock *ssk)
+{
+ int rc;
+ struct ib_send_wr wr, *bad_wr;
+
+ sdp_dbg(&ssk->isk.sk, "%s\n", __func__);
+
+ memset(&wr, 0, sizeof(wr));
+
+ wr.next = NULL;
+ wr.wr_id = 0;
+ wr.sg_list = NULL;
+ wr.num_sge = 0;
+ wr.opcode = IB_WR_RDMA_WRITE;
+
+ rc = ib_post_send(ssk->qp, &wr, &bad_wr);
+ if (rc) {
+ sdp_dbg(&ssk->isk.sk,
+ "ib_post_keepalive failed with status %d.\n", rc);
+ sdp_set_error(&ssk->isk.sk, -ECONNRESET);
+ wake_up(&ssk->wq);
+ }
+
+ sdp_cnt(sdp_keepalive_probes_sent);
+}
+
+static void sdp_tx_cq_event_handler(struct ib_event *event, void *data)
+{
+}
+
+int sdp_tx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
+{
+ struct ib_cq *tx_cq;
+ int rc = 0;
+
+ atomic_set(&ssk->tx_ring.head, 1);
+ atomic_set(&ssk->tx_ring.tail, 1);
+
+ ssk->tx_ring.buffer = kmalloc(
+ sizeof *ssk->tx_ring.buffer * SDP_TX_SIZE, GFP_KERNEL);
+ if (!ssk->tx_ring.buffer) {
+ rc = -ENOMEM;
+ sdp_warn(&ssk->isk.sk, "Can't allocate TX Ring size %zd.\n",
+ sizeof(*ssk->tx_ring.buffer) * SDP_TX_SIZE);
+
+ goto out;
+ }
+
+ tx_cq = ib_create_cq(device, sdp_tx_irq, sdp_tx_cq_event_handler,
+ &ssk->isk.sk, SDP_TX_SIZE, 0);
+
+ if (IS_ERR(tx_cq)) {
+ rc = PTR_ERR(tx_cq);
+ sdp_warn(&ssk->isk.sk, "Unable to allocate TX CQ: %d.\n", rc);
+ goto err_cq;
+ }
+
+ sdp_sk(&ssk->isk.sk)->tx_ring.cq = tx_cq;
+
+ init_timer(&ssk->tx_ring.timer);
+ ssk->tx_ring.timer.function = sdp_poll_tx_timeout;
+ ssk->tx_ring.timer.data = (unsigned long) ssk;
+ ssk->tx_ring.poll_cnt = 0;
+
+ init_timer(&ssk->nagle_timer);
+ ssk->nagle_timer.function = sdp_nagle_timeout;
+ ssk->nagle_timer.data = (unsigned long) ssk;
+
+ return 0;
+
+err_cq:
+ kfree(ssk->tx_ring.buffer);
+ ssk->tx_ring.buffer = NULL;
+out:
+ return rc;
+}
+
+void sdp_tx_ring_destroy(struct sdp_sock *ssk)
+{
+ del_timer(&ssk->nagle_timer);
+
+ if (ssk->tx_ring.buffer) {
+ sdp_tx_ring_purge(ssk);
+
+ kfree(ssk->tx_ring.buffer);
+ ssk->tx_ring.buffer = NULL;
+ }
+
+ if (ssk->tx_ring.cq) {
+ ib_destroy_cq(ssk->tx_ring.cq);
+ ssk->tx_ring.cq = NULL;
+ }
+
+ WARN_ON(ring_head(ssk->tx_ring) != ring_tail(ssk->tx_ring));
+}