ceph: use connection mutex to protect read and write stages
Use a single mutex (previously out_mutex) to protect both read and write activity from concurrent ceph_con_* calls. Drop the mutex when doing callbacks to avoid nested locking (the callback may need to call something like ceph_con_close). Signed-off-by: Sage Weil <sage@newdream.net>
This commit is contained in:
parent
529cfcc46f
commit
ec302645f4
@ -316,7 +316,6 @@ static void reset_connection(struct ceph_connection *con)
|
|||||||
{
|
{
|
||||||
/* reset connection, out_queue, msg_ and connect_seq */
|
/* reset connection, out_queue, msg_ and connect_seq */
|
||||||
/* discard existing out_queue and msg_seq */
|
/* discard existing out_queue and msg_seq */
|
||||||
mutex_lock(&con->out_mutex);
|
|
||||||
ceph_msg_remove_list(&con->out_queue);
|
ceph_msg_remove_list(&con->out_queue);
|
||||||
ceph_msg_remove_list(&con->out_sent);
|
ceph_msg_remove_list(&con->out_sent);
|
||||||
|
|
||||||
@ -332,7 +331,6 @@ static void reset_connection(struct ceph_connection *con)
|
|||||||
con->out_msg = NULL;
|
con->out_msg = NULL;
|
||||||
}
|
}
|
||||||
con->in_seq = 0;
|
con->in_seq = 0;
|
||||||
mutex_unlock(&con->out_mutex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -343,7 +341,9 @@ void ceph_con_close(struct ceph_connection *con)
|
|||||||
dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
|
dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
|
||||||
set_bit(CLOSED, &con->state); /* in case there's queued work */
|
set_bit(CLOSED, &con->state); /* in case there's queued work */
|
||||||
clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
|
clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
|
||||||
|
mutex_lock(&con->mutex);
|
||||||
reset_connection(con);
|
reset_connection(con);
|
||||||
|
mutex_unlock(&con->mutex);
|
||||||
queue_con(con);
|
queue_con(con);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -392,7 +392,7 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
|
|||||||
memset(con, 0, sizeof(*con));
|
memset(con, 0, sizeof(*con));
|
||||||
atomic_set(&con->nref, 1);
|
atomic_set(&con->nref, 1);
|
||||||
con->msgr = msgr;
|
con->msgr = msgr;
|
||||||
mutex_init(&con->out_mutex);
|
mutex_init(&con->mutex);
|
||||||
INIT_LIST_HEAD(&con->out_queue);
|
INIT_LIST_HEAD(&con->out_queue);
|
||||||
INIT_LIST_HEAD(&con->out_sent);
|
INIT_LIST_HEAD(&con->out_sent);
|
||||||
INIT_DELAYED_WORK(&con->work, con_work);
|
INIT_DELAYED_WORK(&con->work, con_work);
|
||||||
@ -571,11 +571,13 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
|
|||||||
int auth_len = 0;
|
int auth_len = 0;
|
||||||
int auth_protocol = 0;
|
int auth_protocol = 0;
|
||||||
|
|
||||||
|
mutex_unlock(&con->mutex);
|
||||||
if (con->ops->get_authorizer)
|
if (con->ops->get_authorizer)
|
||||||
con->ops->get_authorizer(con, &auth_buf, &auth_len,
|
con->ops->get_authorizer(con, &auth_buf, &auth_len,
|
||||||
&auth_protocol, &con->auth_reply_buf,
|
&auth_protocol, &con->auth_reply_buf,
|
||||||
&con->auth_reply_buf_len,
|
&con->auth_reply_buf_len,
|
||||||
con->auth_retry);
|
con->auth_retry);
|
||||||
|
mutex_lock(&con->mutex);
|
||||||
|
|
||||||
con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
|
con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
|
||||||
con->out_connect.authorizer_len = cpu_to_le32(auth_len);
|
con->out_connect.authorizer_len = cpu_to_le32(auth_len);
|
||||||
@ -1094,10 +1096,13 @@ static int process_connect(struct ceph_connection *con)
|
|||||||
le32_to_cpu(con->out_connect.protocol_version),
|
le32_to_cpu(con->out_connect.protocol_version),
|
||||||
le32_to_cpu(con->in_reply.protocol_version));
|
le32_to_cpu(con->in_reply.protocol_version));
|
||||||
con->error_msg = "protocol version mismatch";
|
con->error_msg = "protocol version mismatch";
|
||||||
if (con->ops->bad_proto)
|
|
||||||
con->ops->bad_proto(con);
|
|
||||||
reset_connection(con);
|
reset_connection(con);
|
||||||
set_bit(CLOSED, &con->state); /* in case there's queued work */
|
set_bit(CLOSED, &con->state); /* in case there's queued work */
|
||||||
|
|
||||||
|
mutex_unlock(&con->mutex);
|
||||||
|
if (con->ops->bad_proto)
|
||||||
|
con->ops->bad_proto(con);
|
||||||
|
mutex_lock(&con->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
case CEPH_MSGR_TAG_BADAUTHORIZER:
|
case CEPH_MSGR_TAG_BADAUTHORIZER:
|
||||||
@ -1133,9 +1138,11 @@ static int process_connect(struct ceph_connection *con)
|
|||||||
prepare_read_connect(con);
|
prepare_read_connect(con);
|
||||||
|
|
||||||
/* Tell ceph about it. */
|
/* Tell ceph about it. */
|
||||||
|
mutex_unlock(&con->mutex);
|
||||||
pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
|
pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
|
||||||
if (con->ops->peer_reset)
|
if (con->ops->peer_reset)
|
||||||
con->ops->peer_reset(con);
|
con->ops->peer_reset(con);
|
||||||
|
mutex_lock(&con->mutex);
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case CEPH_MSGR_TAG_RETRY_SESSION:
|
case CEPH_MSGR_TAG_RETRY_SESSION:
|
||||||
@ -1221,7 +1228,6 @@ static void process_ack(struct ceph_connection *con)
|
|||||||
u64 ack = le64_to_cpu(con->in_temp_ack);
|
u64 ack = le64_to_cpu(con->in_temp_ack);
|
||||||
u64 seq;
|
u64 seq;
|
||||||
|
|
||||||
mutex_lock(&con->out_mutex);
|
|
||||||
while (!list_empty(&con->out_sent)) {
|
while (!list_empty(&con->out_sent)) {
|
||||||
m = list_first_entry(&con->out_sent, struct ceph_msg,
|
m = list_first_entry(&con->out_sent, struct ceph_msg,
|
||||||
list_head);
|
list_head);
|
||||||
@ -1232,7 +1238,6 @@ static void process_ack(struct ceph_connection *con)
|
|||||||
le16_to_cpu(m->hdr.type), m);
|
le16_to_cpu(m->hdr.type), m);
|
||||||
ceph_msg_remove(m);
|
ceph_msg_remove(m);
|
||||||
}
|
}
|
||||||
mutex_unlock(&con->out_mutex);
|
|
||||||
prepare_read_tag(con);
|
prepare_read_tag(con);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1366,8 +1371,10 @@ static int read_partial_message(struct ceph_connection *con)
|
|||||||
/* find pages for data payload */
|
/* find pages for data payload */
|
||||||
want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
|
want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
|
||||||
ret = -1;
|
ret = -1;
|
||||||
|
mutex_unlock(&con->mutex);
|
||||||
if (con->ops->prepare_pages)
|
if (con->ops->prepare_pages)
|
||||||
ret = con->ops->prepare_pages(con, m, want);
|
ret = con->ops->prepare_pages(con, m, want);
|
||||||
|
mutex_lock(&con->mutex);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
dout("%p prepare_pages failed, skipping payload\n", m);
|
dout("%p prepare_pages failed, skipping payload\n", m);
|
||||||
con->in_base_pos = -data_len - sizeof(m->footer);
|
con->in_base_pos = -data_len - sizeof(m->footer);
|
||||||
@ -1454,9 +1461,8 @@ static void process_message(struct ceph_connection *con)
|
|||||||
if (con->peer_name.type == 0)
|
if (con->peer_name.type == 0)
|
||||||
con->peer_name = msg->hdr.src.name;
|
con->peer_name = msg->hdr.src.name;
|
||||||
|
|
||||||
mutex_lock(&con->out_mutex);
|
|
||||||
con->in_seq++;
|
con->in_seq++;
|
||||||
mutex_unlock(&con->out_mutex);
|
mutex_unlock(&con->mutex);
|
||||||
|
|
||||||
dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
|
dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
|
||||||
msg, le64_to_cpu(msg->hdr.seq),
|
msg, le64_to_cpu(msg->hdr.seq),
|
||||||
@ -1467,6 +1473,8 @@ static void process_message(struct ceph_connection *con)
|
|||||||
le32_to_cpu(msg->hdr.data_len),
|
le32_to_cpu(msg->hdr.data_len),
|
||||||
con->in_front_crc, con->in_middle_crc, con->in_data_crc);
|
con->in_front_crc, con->in_middle_crc, con->in_data_crc);
|
||||||
con->ops->dispatch(con, msg);
|
con->ops->dispatch(con, msg);
|
||||||
|
|
||||||
|
mutex_lock(&con->mutex);
|
||||||
prepare_read_tag(con);
|
prepare_read_tag(con);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1483,7 +1491,7 @@ static int try_write(struct ceph_connection *con)
|
|||||||
dout("try_write start %p state %lu nref %d\n", con, con->state,
|
dout("try_write start %p state %lu nref %d\n", con, con->state,
|
||||||
atomic_read(&con->nref));
|
atomic_read(&con->nref));
|
||||||
|
|
||||||
mutex_lock(&con->out_mutex);
|
mutex_lock(&con->mutex);
|
||||||
more:
|
more:
|
||||||
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
|
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
|
||||||
|
|
||||||
@ -1576,7 +1584,7 @@ static int try_write(struct ceph_connection *con)
|
|||||||
done:
|
done:
|
||||||
ret = 0;
|
ret = 0;
|
||||||
out:
|
out:
|
||||||
mutex_unlock(&con->out_mutex);
|
mutex_unlock(&con->mutex);
|
||||||
dout("try_write done on %p\n", con);
|
dout("try_write done on %p\n", con);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
@ -1600,6 +1608,8 @@ static int try_read(struct ceph_connection *con)
|
|||||||
dout("try_read start on %p\n", con);
|
dout("try_read start on %p\n", con);
|
||||||
msgr = con->msgr;
|
msgr = con->msgr;
|
||||||
|
|
||||||
|
mutex_lock(&con->mutex);
|
||||||
|
|
||||||
more:
|
more:
|
||||||
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
|
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
|
||||||
con->in_base_pos);
|
con->in_base_pos);
|
||||||
@ -1693,6 +1703,7 @@ static int try_read(struct ceph_connection *con)
|
|||||||
done:
|
done:
|
||||||
ret = 0;
|
ret = 0;
|
||||||
out:
|
out:
|
||||||
|
mutex_unlock(&con->mutex);
|
||||||
dout("try_read done on %p\n", con);
|
dout("try_read done on %p\n", con);
|
||||||
return ret;
|
return ret;
|
||||||
|
|
||||||
@ -1818,6 +1829,8 @@ static void ceph_fault(struct ceph_connection *con)
|
|||||||
|
|
||||||
clear_bit(BUSY, &con->state); /* to avoid an improbable race */
|
clear_bit(BUSY, &con->state); /* to avoid an improbable race */
|
||||||
|
|
||||||
|
mutex_lock(&con->mutex);
|
||||||
|
|
||||||
con_close_socket(con);
|
con_close_socket(con);
|
||||||
|
|
||||||
if (con->in_msg) {
|
if (con->in_msg) {
|
||||||
@ -1827,24 +1840,24 @@ static void ceph_fault(struct ceph_connection *con)
|
|||||||
|
|
||||||
/* If there are no messages in the queue, place the connection
|
/* If there are no messages in the queue, place the connection
|
||||||
* in a STANDBY state (i.e., don't try to reconnect just yet). */
|
* in a STANDBY state (i.e., don't try to reconnect just yet). */
|
||||||
mutex_lock(&con->out_mutex);
|
|
||||||
if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
|
if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
|
||||||
dout("fault setting STANDBY\n");
|
dout("fault setting STANDBY\n");
|
||||||
set_bit(STANDBY, &con->state);
|
set_bit(STANDBY, &con->state);
|
||||||
mutex_unlock(&con->out_mutex);
|
mutex_unlock(&con->mutex);
|
||||||
goto out;
|
goto out;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Requeue anything that hasn't been acked, and retry after a
|
/* Requeue anything that hasn't been acked, and retry after a
|
||||||
* delay. */
|
* delay. */
|
||||||
list_splice_init(&con->out_sent, &con->out_queue);
|
list_splice_init(&con->out_sent, &con->out_queue);
|
||||||
mutex_unlock(&con->out_mutex);
|
|
||||||
|
|
||||||
if (con->delay == 0)
|
if (con->delay == 0)
|
||||||
con->delay = BASE_DELAY_INTERVAL;
|
con->delay = BASE_DELAY_INTERVAL;
|
||||||
else if (con->delay < MAX_DELAY_INTERVAL)
|
else if (con->delay < MAX_DELAY_INTERVAL)
|
||||||
con->delay *= 2;
|
con->delay *= 2;
|
||||||
|
|
||||||
|
mutex_unlock(&con->mutex);
|
||||||
|
|
||||||
/* explicitly schedule work to try to reconnect again later. */
|
/* explicitly schedule work to try to reconnect again later. */
|
||||||
dout("fault queueing %p delay %lu\n", con, con->delay);
|
dout("fault queueing %p delay %lu\n", con, con->delay);
|
||||||
con->ops->get(con);
|
con->ops->get(con);
|
||||||
@ -1920,7 +1933,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
|
|||||||
msg->hdr.dst_erank = con->peer_addr.erank;
|
msg->hdr.dst_erank = con->peer_addr.erank;
|
||||||
|
|
||||||
/* queue */
|
/* queue */
|
||||||
mutex_lock(&con->out_mutex);
|
mutex_lock(&con->mutex);
|
||||||
BUG_ON(!list_empty(&msg->list_head));
|
BUG_ON(!list_empty(&msg->list_head));
|
||||||
list_add_tail(&msg->list_head, &con->out_queue);
|
list_add_tail(&msg->list_head, &con->out_queue);
|
||||||
dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
|
dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
|
||||||
@ -1929,7 +1942,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
|
|||||||
le32_to_cpu(msg->hdr.front_len),
|
le32_to_cpu(msg->hdr.front_len),
|
||||||
le32_to_cpu(msg->hdr.middle_len),
|
le32_to_cpu(msg->hdr.middle_len),
|
||||||
le32_to_cpu(msg->hdr.data_len));
|
le32_to_cpu(msg->hdr.data_len));
|
||||||
mutex_unlock(&con->out_mutex);
|
mutex_unlock(&con->mutex);
|
||||||
|
|
||||||
/* if there wasn't anything waiting to send before, queue
|
/* if there wasn't anything waiting to send before, queue
|
||||||
* new work */
|
* new work */
|
||||||
@ -1942,7 +1955,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
|
|||||||
*/
|
*/
|
||||||
void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
|
void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
|
||||||
{
|
{
|
||||||
mutex_lock(&con->out_mutex);
|
mutex_lock(&con->mutex);
|
||||||
if (!list_empty(&msg->list_head)) {
|
if (!list_empty(&msg->list_head)) {
|
||||||
dout("con_revoke %p msg %p\n", con, msg);
|
dout("con_revoke %p msg %p\n", con, msg);
|
||||||
list_del_init(&msg->list_head);
|
list_del_init(&msg->list_head);
|
||||||
@ -1959,7 +1972,7 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
|
|||||||
} else {
|
} else {
|
||||||
dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
|
dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
|
||||||
}
|
}
|
||||||
mutex_unlock(&con->out_mutex);
|
mutex_unlock(&con->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -155,8 +155,9 @@ struct ceph_connection {
|
|||||||
void *auth_reply_buf; /* where to put the authorizer reply */
|
void *auth_reply_buf; /* where to put the authorizer reply */
|
||||||
int auth_reply_buf_len;
|
int auth_reply_buf_len;
|
||||||
|
|
||||||
|
struct mutex mutex;
|
||||||
|
|
||||||
/* out queue */
|
/* out queue */
|
||||||
struct mutex out_mutex;
|
|
||||||
struct list_head out_queue;
|
struct list_head out_queue;
|
||||||
struct list_head out_sent; /* sending or sent but unacked */
|
struct list_head out_sent; /* sending or sent but unacked */
|
||||||
u64 out_seq; /* last message queued for send */
|
u64 out_seq; /* last message queued for send */
|
||||||
|
Loading…
Reference in New Issue
Block a user