From 755c1458f0197d4da1dcb86ac832a8ffa8d02b27 Mon Sep 17 00:00:00 2001 From: Matt Johnston Date: Wed, 18 Feb 2015 22:46:15 +0800 Subject: async connections working --HG-- branch : fastopen --- channel.h | 4 +++ cli-main.c | 8 ++--- cli-session.c | 9 +++++ cli-tcpfwd.c | 14 +------- common-channel.c | 45 +++++++++++------------- common-session.c | 7 +++- dbutil.c | 104 +++++++++++++++++++++++++++++++------------------------ dbutil.h | 3 ++ packet.c | 2 ++ session.h | 1 + svr-tcpfwd.c | 14 +------- 11 files changed, 107 insertions(+), 104 deletions(-) diff --git a/channel.h b/channel.h index a310d44..b543ea1 100644 --- a/channel.h +++ b/channel.h @@ -73,6 +73,7 @@ struct Channel { * to ensure we don't run it twice (nor type->checkclose()). */ int close_handler_done; + struct dropbear_progress_connection *conn_pending; int initconn; /* used for TCP forwarding, whether the channel has been fully initialised */ @@ -100,6 +101,9 @@ struct ChanType { void (*closehandler)(struct Channel*); }; +/* Callback for connect_remote */ +void channel_connect_done(int result, int sock, void* user_data, const char* errstring); + void chaninitialise(const struct ChanType *chantypes[]); void chancleanup(); void setchannelfds(fd_set *readfd, fd_set *writefd); diff --git a/cli-main.c b/cli-main.c index a956721..6834d1d 100644 --- a/cli-main.c +++ b/cli-main.c @@ -72,12 +72,8 @@ int main(int argc, char ** argv) { } else #endif { - int sock = connect_remote(cli_opts.remotehost, cli_opts.remoteport, &error); - sock_in = sock_out = sock; - } - - if (sock_in < 0) { - dropbear_exit("%s", error); + connect_remote(cli_opts.remotehost, cli_opts.remoteport, cli_connected, NULL); + sock_in = sock_out = -1; } cli_session(sock_in, sock_out); diff --git a/cli-session.c b/cli-session.c index a484bf7..a5ae728 100644 --- a/cli-session.c +++ b/cli-session.c @@ -93,6 +93,15 @@ static const struct ChanType *cli_chantypes[] = { NULL /* Null termination */ }; +void cli_connected(int result, int sock, void* userdata, const char *errstring) +{ + if (result == DROPBEAR_FAILURE) + { + dropbear_exit("Connect failed: %s", errstring); + } + ses.sock_in = ses.sock_out = sock; +} + void cli_session(int sock_in, int sock_out) { common_session_init(sock_in, sock_out); diff --git a/cli-tcpfwd.c b/cli-tcpfwd.c index 3894044..3e87ffd 100644 --- a/cli-tcpfwd.c +++ b/cli-tcpfwd.c @@ -254,19 +254,7 @@ static int newtcpforwarded(struct Channel * channel) { } snprintf(portstring, sizeof(portstring), "%d", fwd->connectport); - sock = connect_remote(fwd->connectaddr, portstring, NULL); - if (sock < 0) { - TRACE(("leave newtcpdirect: sock failed")) - err = SSH_OPEN_CONNECT_FAILED; - goto out; - } - - ses.maxfd = MAX(ses.maxfd, sock); - - /* We don't set readfd, that will get set after the connection's - * progress succeeds */ - channel->writefd = sock; - channel->initconn = 1; + channel->conn_pending = connect_remote(fwd->connectaddr, portstring, channel_connect_done, channel); channel->prio = DROPBEAR_CHANNEL_PRIO_UNKNOWABLE; diff --git a/common-channel.c b/common-channel.c index 049658d..40f8613 100644 --- a/common-channel.c +++ b/common-channel.c @@ -48,7 +48,6 @@ static void send_msg_channel_data(struct Channel *channel, int isextended); static void send_msg_channel_eof(struct Channel *channel); static void send_msg_channel_close(struct Channel *channel); static void remove_channel(struct Channel *channel); -static void check_in_progress(struct Channel *channel); static unsigned int write_pending(struct Channel * channel); static void check_close(struct Channel *channel); static void close_chan_fd(struct Channel *channel, int fd, int how); @@ -163,7 +162,6 @@ static struct Channel* newchannel(unsigned int remotechan, newchan->writefd = FD_UNINIT; newchan->readfd = FD_UNINIT; newchan->errfd = FD_CLOSED; /* this isn't always set to start with */ - newchan->initconn = 0; newchan->await_open = 0; newchan->flushing = 0; @@ -242,12 +240,6 @@ void channelio(fd_set *readfds, fd_set *writefds) { /* write to program/pipe stdin */ if (channel->writefd >= 0 && FD_ISSET(channel->writefd, writefds)) { - if (channel->initconn) { - /* XXX should this go somewhere cleaner? */ - check_in_progress(channel); - continue; /* Important not to use the channel after - check_in_progress(), as it may be NULL */ - } writechannel(channel, channel->writefd, channel->writebuf); do_check_close = 1; } @@ -374,27 +366,27 @@ static void check_close(struct Channel *channel) { * if so, set up the channel properly. Otherwise, the channel is cleaned up, so * it is important that the channel reference isn't used after a call to this * function */ -static void check_in_progress(struct Channel *channel) { +void channel_connect_done(int result, int sock, void* user_data, const char* UNUSED(errstring)) { - int val; - socklen_t vallen = sizeof(val); + struct Channel *channel = user_data; - TRACE(("enter check_in_progress")) + TRACE(("enter channel_connect_done")) - if (getsockopt(channel->writefd, SOL_SOCKET, SO_ERROR, &val, &vallen) - || val != 0) { + if (result == DROPBEAR_SUCCESS) + { + channel->readfd = channel->writefd = sock; + channel->conn_pending = NULL; + chan_initwritebuf(channel); + send_msg_channel_open_confirmation(channel, channel->recvwindow, + channel->recvmaxpacket); + TRACE(("leave channel_connect_done: success")) + } + else + { send_msg_channel_open_failure(channel->remotechan, SSH_OPEN_CONNECT_FAILED, "", ""); - close(channel->writefd); remove_channel(channel); TRACE(("leave check_in_progress: fail")) - } else { - chan_initwritebuf(channel); - send_msg_channel_open_confirmation(channel, channel->recvwindow, - channel->recvmaxpacket); - channel->readfd = channel->writefd; - channel->initconn = 0; - TRACE(("leave check_in_progress: success")) } } @@ -514,8 +506,7 @@ void setchannelfds(fd_set *readfds, fd_set *writefds) { } /* Stuff from the wire */ - if (channel->initconn - ||(channel->writefd >= 0 && cbuf_getused(channel->writebuf) > 0)) { + if (channel->writefd >= 0 && cbuf_getused(channel->writebuf) > 0) { FD_SET(channel->writefd, writefds); } @@ -599,6 +590,10 @@ static void remove_channel(struct Channel * channel) { channel->close_handler_done = 1; } + if (channel->conn_pending) { + cancel_connect(channel->conn_pending); + } + ses.channels[channel->index] = NULL; m_free(channel); ses.chancount--; @@ -1149,7 +1144,7 @@ struct Channel* get_any_ready_channel() { struct Channel *chan = ses.channels[i]; if (chan && !(chan->sent_eof || chan->recv_eof) - && !(chan->await_open || chan->initconn)) { + && !(chan->await_open)) { return chan; } } diff --git a/common-session.c b/common-session.c index 26ef147..6ca7f54 100644 --- a/common-session.c +++ b/common-session.c @@ -167,6 +167,9 @@ void session_loop(void(*loophandler)()) { /* set up for channels which can be read/written */ setchannelfds(&readfd, &writefd); + /* Pending connections to test */ + set_connect_fds(&writefd); + val = select(ses.maxfd+1, &readfd, &writefd, NULL, &timeout); if (exitflag) { @@ -214,11 +217,13 @@ void session_loop(void(*loophandler)()) { process_packet(); } } - + /* if required, flush out any queued reply packets that were being held up during a KEX */ maybe_flush_reply_queue(); + handle_connect_fds(&writefd); + /* process pipes etc for the channels, ses.dataallowed == 0 * during rekeying ) */ channelio(&readfd, &writefd); diff --git a/dbutil.c b/dbutil.c index edae1af..7b3a664 100644 --- a/dbutil.c +++ b/dbutil.c @@ -995,6 +995,8 @@ struct dropbear_progress_connection or NULL. */ int sock; + + char* errstring; }; /* Deallocate a progress connection. Removes from the pending list if iter!=NULL. @@ -1005,6 +1007,7 @@ static void remove_connect(struct dropbear_progress_connection *c, m_list_elem * } m_free(c->remotehost); m_free(c->remoteport); + m_free(c->errstring); m_free(c); if (iter) { @@ -1012,12 +1015,24 @@ static void remove_connect(struct dropbear_progress_connection *c, m_list_elem * } } -static int connect_try_next(struct dropbear_progress_connection *c) { +static void cancel_callback(int result, int sock, void* UNUSED(data), const char* UNUSED(errstring)) { + if (result == DROPBEAR_SUCCESS) + { + m_close(sock); + } +} + +void cancel_connect(struct dropbear_progress_connection *c) { + c->cb = cancel_callback; + c->cb_data = NULL; +} + +static void connect_try_next(struct dropbear_progress_connection *c) { int err = EADDRNOTAVAIL; struct addrinfo *r; if (!c->res_iter) { - return DROPBEAR_FAILURE; + return; } for (r = c->res_iter; r; r = r->ai_next) @@ -1030,6 +1045,7 @@ static int connect_try_next(struct dropbear_progress_connection *c) { continue; } + ses.maxfd = MAX(ses.maxfd, c->sock); setnonblocking(c->sock); #if defined(__linux__) && defined(TCP_DEFER_ACCEPT) @@ -1060,8 +1076,12 @@ static int connect_try_next(struct dropbear_progress_connection *c) { if (c->sock >= 0 || (errno == EINPROGRESS)) { /* Success */ set_sock_nodelay(c->sock); - return DROPBEAR_SUCCESS; + return; } else { + if (!c->res_iter) + { + + } /* XXX - returning error message through */ #if 0 /* Failed */ @@ -1073,15 +1093,10 @@ static int connect_try_next(struct dropbear_progress_connection *c) { } TRACE(("Error connecting: %s", strerror(err))) #endif - return DROPBEAR_FAILURE; } } -/* Connect via TCP to a host. Connection will try ipv4 or ipv6, will - * return immediately if nonblocking is set. On failure, if errstring - * wasn't null, it will be a newly malloced error message */ - -/* TODO: maxfd */ +/* Connect via TCP to a host. */ struct dropbear_progress_connection *connect_remote(const char* remotehost, const char* remoteport, connect_callback cb, void* cb_data) { @@ -1096,6 +1111,8 @@ struct dropbear_progress_connection *connect_remote(const char* remotehost, cons c->cb = cb; c->cb_data = cb_data; + list_append(&ses.conn_pending, c); + memset(&hints, 0, sizeof(hints)); hints.ai_socktype = SOCK_STREAM; hints.ai_family = PF_UNSPEC; @@ -1103,29 +1120,18 @@ struct dropbear_progress_connection *connect_remote(const char* remotehost, cons err = getaddrinfo(remotehost, remoteport, &hints, &c->res); if (err) { int len; - char *errstring; len = 100 + strlen(gai_strerror(err)); - errstring = (char*)m_malloc(len); - snprintf(errstring, len, "Error resolving '%s' port '%s'. %s", + c->errstring = (char*)m_malloc(len); + snprintf(c->errstring, len, "Error resolving '%s' port '%s'. %s", remotehost, remoteport, gai_strerror(err)); - c->cb(DROPBEAR_FAILURE, -1, c->cb_data, errstring); - m_free(errstring); TRACE(("Error resolving: %s", gai_strerror(err))) - remove_connect(c, NULL); return NULL; } c->res_iter = c->res; - if (connect_try_next(c) == DROPBEAR_FAILURE) { - /* Should not happen - getaddrinfo() should return failure if there are no addresses */ - c->cb(DROPBEAR_FAILURE, -1, c->cb_data, "No address to try"); - TRACE(("leave handle_connect_fds - failed")) - remove_connect(c, NULL); - return NULL; - } - - list_append(&ses.conn_pending, c); + /* Set one going */ + connect_try_next(c); return c; } @@ -1136,12 +1142,24 @@ void set_connect_fds(fd_set *writefd) { TRACE(("enter handle_connect_fds")) for (iter = ses.conn_pending.first; iter; iter = iter->next) { struct dropbear_progress_connection *c = iter->item; + /* Set one going */ + while (c->res_iter && c->sock < 0) + { + connect_try_next(c); + } if (c->sock >= 0) { FD_SET(c->sock, writefd); - } - else - { - + } else { + m_list_elem *remove_iter; + /* Final failure */ + if (!c->errstring) { + c->errstring = m_strdup("unexpected failure"); + } + c->cb(DROPBEAR_FAILURE, -1, c->cb_data, c->errstring); + /* Safely remove without invalidating iter */ + remove_iter = iter; + iter = iter->prev; + remove_connect(c, remove_iter); } } } @@ -1162,31 +1180,25 @@ void handle_connect_fds(fd_set *writefd) { if (getsockopt(c->sock, SOL_SOCKET, SO_ERROR, &val, &vallen) != 0) { TRACE(("handle_connect_fds getsockopt(%d) SO_ERROR failed: %s", c->sock, strerror(errno))) + /* This isn't expected to happen - Unix has surprises though, continue gracefully. */ + m_close(c->sock); + c->sock = -1; } else if (val != 0) { /* Connect failed */ TRACE(("connect to %s port %s failed.", c->remotehost, c->remoteport)) m_close(c->sock); c->sock = -1; - if (connect_try_next(c) == DROPBEAR_FAILURE) { - c->cb(DROPBEAR_FAILURE, -1, c->cb_data, strerror(val)); - TRACE(("leave handle_connect_fds - failed")) - remove_connect(c, iter); - /* Must return here - remove_connect() invalidates iter */ - return; - } else { - /* new connection try was successfuly started, will be finished by a - later call to handle_connect_fds() */ - TRACE(("leave handle_connect_fds - new try")) - continue; - } + m_free(c->errstring); + c->errstring = strerror(val); + } else { + /* New connection has been established */ + c->cb(DROPBEAR_SUCCESS, c->sock, c->cb_data, NULL); + remove_connect(c, iter); + TRACE(("leave handle_connect_fds - success")) + /* Must return here - remove_connect() invalidates iter */ + return; } - /* New connection has been established */ - c->cb(DROPBEAR_SUCCESS, c->sock, c->cb_data, ""); - remove_connect(c, iter); - TRACE(("leave handle_connect_fds - success")) - /* Must return here - remove_connect() invalidates iter */ - return; } TRACE(("leave handle_connect_fds - end iter")) } diff --git a/dbutil.h b/dbutil.h index 3f4f165..e05b249 100644 --- a/dbutil.h +++ b/dbutil.h @@ -129,4 +129,7 @@ struct dropbear_progress_connection * connect_remote (const char* remotehost, co void set_connect_fds(fd_set *writefd); void handle_connect_fds(fd_set *writefd); +/* Doesn't actually stop the connect, but adds a dummy callback instead */ +void cancel_connect(struct dropbear_progress_connection *c); + #endif /* _DBUTIL_H_ */ diff --git a/packet.c b/packet.c index 10ee88e..d02ec69 100644 --- a/packet.c +++ b/packet.c @@ -52,6 +52,7 @@ static buffer* buf_decompress(buffer* buf, unsigned int len); static void buf_compress(buffer * dest, buffer * src, unsigned int len); #endif +#if 0 struct iovec * dropbear_queue_to_iovec(struct Queue *queue) { struct iovec *iov = NULL; @@ -69,6 +70,7 @@ struct iovec * dropbear_queue_to_iovec(struct Queue *queue) { void dropbear_queue_consume(struct Queue *queue, ssize_t written) { } +#endif /* non-blocking function writing out a current encrypted packet */ void write_packet() { diff --git a/session.h b/session.h index 50d8d10..3786346 100644 --- a/session.h +++ b/session.h @@ -61,6 +61,7 @@ void svr_dropbear_log(int priority, const char* format, va_list param); /* Client */ void cli_session(int sock_in, int sock_out); +void cli_connected(int result, int sock, void* userdata, const char *errstring); void cleantext(unsigned char* dirtytext); /* crypto parameters that are stored individually for transmit and receive */ diff --git a/svr-tcpfwd.c b/svr-tcpfwd.c index f2c4b93..8f364b5 100644 --- a/svr-tcpfwd.c +++ b/svr-tcpfwd.c @@ -270,19 +270,7 @@ static int newtcpdirect(struct Channel * channel) { } snprintf(portstring, sizeof(portstring), "%d", destport); - sock = connect_remote(desthost, portstring, NULL); - if (sock < 0) { - err = SSH_OPEN_CONNECT_FAILED; - TRACE(("leave newtcpdirect: sock failed")) - goto out; - } - - ses.maxfd = MAX(ses.maxfd, sock); - - /* We don't set readfd, that will get set after the connection's - * progress succeeds */ - channel->writefd = sock; - channel->initconn = 1; + channel->conn_pending = connect_remote(desthost, portstring, channel_connect_done, channel); channel->prio = DROPBEAR_CHANNEL_PRIO_UNKNOWABLE; -- cgit v1.2.3