[openib-general] [PATCH (draft)] sdp: fix aio/sync completion race
Michael S. Tsirkin
mst at mellanox.co.il
Tue Jun 28 13:18:57 PDT 2005
Libor, here's a stab at solving an old problem.
Most of the patch is passing sdp_opt to iocb complete and cancel functions.
I didn't test it yet, and I won't have the time today,
but maybe you could tell me whether I'm going in the right
direction with this.
---
Use "users" in the lock to prevent more iocbs from completing before
a current aio is done.
Any synchronous transfer would then wait till socket is unlocked.
(Once we switch to get_user_pages, we'll be able to do it only for receive
iocbs).
Signed-off-by: Michael S. Tsirkin <mst at mellanox.co.il>
Index: sdp_write.c
===================================================================
--- sdp_write.c (revision 2726)
+++ sdp_write.c (working copy)
@@ -109,7 +109,7 @@ int sdp_event_write(struct sdp_opt *conn
SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
- sdp_iocb_complete(iocb, 0);
+ sdp_iocb_complete(conn, iocb, 0);
break;
case SDP_DESC_TYPE_NONE:
Index: sdp_rcvd.c
===================================================================
--- sdp_rcvd.c (revision 2726)
+++ sdp_rcvd.c (working copy)
@@ -152,7 +152,7 @@ static int sdp_rcvd_send_sm(struct sdp_o
conn->src_sent--;
- sdp_iocb_complete(iocb, 0);
+ sdp_iocb_complete(conn, iocb, 0);
}
/*
* Cancel complete, clear the state.
@@ -209,7 +209,7 @@ static int sdp_rcvd_rdma_wr(struct sdp_o
conn->snk_sent--;
- sdp_iocb_complete(iocb, 0);
+ sdp_iocb_complete(conn, iocb, 0);
return 0;
}
@@ -270,7 +270,7 @@ static int sdp_rcvd_rdma_rd(struct sdp_o
SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
- sdp_iocb_complete(iocb, 0);
+ sdp_iocb_complete(conn, iocb, 0);
}
/*
* If Source Cancel was in process, and there are no more outstanding
@@ -562,7 +562,7 @@ static int sdp_rcvd_snk_cancel_ack(struc
while ((iocb = sdp_iocb_q_get_head(&conn->r_snk))) {
conn->snk_sent--;
- sdp_iocb_complete(iocb, 0);
+ sdp_iocb_complete(conn, iocb, 0);
}
/*
* cancellation is complete. Cancel flag is cleared in RECV post.
@@ -684,7 +684,7 @@ static int sdp_rcvd_snk_avail(struct sdp
sdp_desc_q_put_head(&conn->send_queue,
(struct sdpc_desc *)iocb);
else
- sdp_iocb_complete(iocb, 0);
+ sdp_iocb_complete(conn, iocb, 0);
}
/*
* If Source Cancel was in process, it should now
Index: sdp_proto.h
===================================================================
--- sdp_proto.h (revision 2726)
+++ sdp_proto.h (working copy)
@@ -162,7 +162,8 @@ void sdp_iocb_q_put_tail(struct sdpc_ioc
struct sdpc_iocb *sdp_iocb_q_lookup(struct sdpc_iocb_q *table, u32 key);
-void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp);
+void sdp_iocb_q_cancel(struct sdp_opt *conn, struct sdpc_iocb_q *table,
+ u32 mask, ssize_t comp);
void sdp_iocb_q_remove(struct sdpc_iocb *iocb);
@@ -170,7 +171,8 @@ int sdp_iocb_register(struct sdpc_iocb *
void sdp_iocb_release(struct sdpc_iocb *iocb);
-void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status);
+void sdp_iocb_complete(struct sdp_opt *conn, struct sdpc_iocb *iocb,
+ ssize_t status);
int sdp_iocb_lock(struct sdpc_iocb *iocb);
Index: sdp_read.c
===================================================================
--- sdp_read.c (revision 2726)
+++ sdp_read.c (working copy)
@@ -205,7 +205,7 @@ int sdp_event_read(struct sdp_opt *conn,
SDP_CONN_STAT_READ_INC(conn, iocb->post);
SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
- sdp_iocb_complete(iocb, 0);
+ sdp_iocb_complete(conn, iocb, 0);
break;
case SDP_DESC_TYPE_NONE:
@@ -226,7 +226,7 @@ int sdp_event_read(struct sdp_opt *conn,
SDP_CONN_STAT_READ_INC(conn, iocb->post);
SDP_CONN_STAT_RQ_DEC(conn, iocb->size);
- sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
+ sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
break;
default:
Index: sdp_send.c
===================================================================
--- sdp_send.c (revision 2726)
+++ sdp_send.c (working copy)
@@ -859,7 +859,7 @@ static int sdp_send_data_iocb(struct sdp
SDP_CONN_STAT_WRITE_INC(conn, iocb->post);
SDP_CONN_STAT_WQ_DEC(conn, iocb->size);
- sdp_iocb_complete(iocb, 0);
+ sdp_iocb_complete(conn, iocb, 0);
}
goto done;
@@ -1730,7 +1730,7 @@ static int sdp_inet_write_cancel(struct
* needs to be compelted.
*/
if (iocb->post > 0) {
- sdp_iocb_complete(iocb, 0);
+ sdp_iocb_complete(conn, iocb, 0);
result = -EAGAIN;
} else {
sdp_iocb_destroy(iocb);
Index: sdp_conn.c
===================================================================
--- sdp_conn.c (revision 2726)
+++ sdp_conn.c (working copy)
@@ -697,7 +697,8 @@ static int sdp_desc_q_cancel_lookup_func
return ((element->type == SDP_DESC_TYPE_IOCB) ? 0 : -ERANGE);
}
-static void sdp_desc_q_cancel_iocb(struct sdpc_desc_q *table, ssize_t error)
+static void sdp_desc_q_cancel_iocb(struct sdp_opt *conn,
+ struct sdpc_desc_q *table, ssize_t error)
{
struct sdpc_iocb *iocb;
@@ -707,24 +708,24 @@ static void sdp_desc_q_cancel_iocb(struc
NULL))) {
sdp_iocb_q_remove(iocb);
- sdp_iocb_complete(iocb, error);
+ sdp_iocb_complete(conn, iocb, error);
}
}
void sdp_iocb_q_cancel_all_read(struct sdp_opt *conn, ssize_t error)
{
- sdp_iocb_q_cancel(&conn->r_pend, SDP_IOCB_F_ALL, error);
- sdp_iocb_q_cancel(&conn->r_snk, SDP_IOCB_F_ALL, error);
+ sdp_iocb_q_cancel(conn, &conn->r_pend, SDP_IOCB_F_ALL, error);
+ sdp_iocb_q_cancel(conn, &conn->r_snk, SDP_IOCB_F_ALL, error);
- sdp_desc_q_cancel_iocb(&conn->r_src, error);
+ sdp_desc_q_cancel_iocb(conn, &conn->r_src, error);
}
void sdp_iocb_q_cancel_all_write(struct sdp_opt *conn, ssize_t error)
{
- sdp_iocb_q_cancel(&conn->w_src, SDP_IOCB_F_ALL, error);
+ sdp_iocb_q_cancel(conn, &conn->w_src, SDP_IOCB_F_ALL, error);
- sdp_desc_q_cancel_iocb(&conn->send_queue, error);
- sdp_desc_q_cancel_iocb(&conn->w_snk, error);
+ sdp_desc_q_cancel_iocb(conn, &conn->send_queue, error);
+ sdp_desc_q_cancel_iocb(conn, &conn->w_snk, error);
}
void sdp_iocb_q_cancel_all(struct sdp_opt *conn, ssize_t error)
Index: sdp_recv.c
===================================================================
--- sdp_recv.c (revision 2726)
+++ sdp_recv.c (working copy)
@@ -663,7 +663,7 @@ static int sdp_recv_buff_iocb_active(str
/*
* callback to complete IOCB
*/
- sdp_iocb_complete(iocb, 0);
+ sdp_iocb_complete(conn, iocb, 0);
return (buff->tail - buff->data);
}
@@ -716,7 +716,7 @@ static int sdp_recv_buff_iocb_pending(st
/*
* callback to complete IOCB
*/
- sdp_iocb_complete(sdp_iocb_q_get_head(&conn->r_pend), 0);
+ sdp_iocb_complete(conn, sdp_iocb_q_get_head(&conn->r_pend), 0);
}
return (buff->tail - buff->data);
@@ -875,7 +875,7 @@ static int sdp_inet_read_cancel(struct k
/*
* callback to complete IOCB, or drop reference
*/
- sdp_iocb_complete(iocb, 0);
+ sdp_iocb_complete(conn, iocb, 0);
result = -EAGAIN;
}
else {
Index: sdp_conn.h
===================================================================
--- sdp_conn.h (revision 2726)
+++ sdp_conn.h (working copy)
@@ -471,7 +471,7 @@ static inline void sdp_conn_unlock(struc
sdp_conn_internal_unlock(conn);
}
- conn->lock.users = 0;
+ --conn->lock.users;
wake_up(&conn->lock.waitq);
spin_unlock_irqrestore(&conn->lock.slock, flags);
Index: sdp_iocb.c
===================================================================
--- sdp_iocb.c (revision 2726)
+++ sdp_iocb.c (working copy)
@@ -508,6 +508,8 @@ static void do_iocb_complete(void *arg)
* we ignore the value.
*/
(void)aio_complete(iocb->req, value, 0);
+
+ sdp_conn_unlock(iocb->conn);
/*
* delete IOCB
*/
@@ -517,11 +519,14 @@ static void do_iocb_complete(void *arg)
/*
* sdp_iocb_complete - complete an IOCB
*/
-void sdp_iocb_complete(struct sdpc_iocb *iocb, ssize_t status)
+void sdp_iocb_complete(struct sdp_opt *conn, struct sdpc_iocb *iocb,
+ ssize_t status)
{
iocb->status = status;
if (in_atomic() || irqs_disabled()) {
+ conn->lock.users++;
+ iocb->conn = conn;
INIT_WORK(&iocb->completion, do_iocb_complete, (void *)iocb);
schedule_work(&iocb->completion);
} else
@@ -750,7 +755,8 @@ void sdp_iocb_q_put_head(struct sdpc_ioc
/*
* sdp_iocb_q_cancel - cancel all outstanding AIOs in a queue
*/
-void sdp_iocb_q_cancel(struct sdpc_iocb_q *table, u32 mask, ssize_t comp)
+void sdp_iocb_q_cancel(struct sdp_opt *conn, struct sdpc_iocb_q *table,
+ u32 mask, ssize_t comp)
{
struct sdpc_iocb *iocb;
struct sdpc_iocb *next;
@@ -772,7 +778,7 @@ void sdp_iocb_q_cancel(struct sdpc_iocb_
iocb->post, iocb->len);
sdp_iocb_q_remove(iocb);
- sdp_iocb_complete(iocb, comp);
+ sdp_iocb_complete(conn, iocb, comp);
}
iocb = next;
Index: sdp_iocb.h
===================================================================
--- sdp_iocb.h (revision 2726)
+++ sdp_iocb.h (working copy)
@@ -109,7 +109,8 @@ struct sdpc_iocb {
int page_count; /* number of physical pages. */
int page_offset; /* offset into first page. */
- struct work_struct completion; /* task for defered completion. */
+ struct work_struct completion; /* task for deferred completion. */
+ struct sdp_opt *conn; /* conn to unlock for deferred completion. */
/*
* kernel iocb structure
*/
--
MST
More information about the general
mailing list