diff options
Diffstat (limited to 'usr.sbin/nsd/xfrd-tcp.c')
-rw-r--r-- | usr.sbin/nsd/xfrd-tcp.c | 564 |
1 files changed, 479 insertions, 85 deletions
diff --git a/usr.sbin/nsd/xfrd-tcp.c b/usr.sbin/nsd/xfrd-tcp.c index b0fea134aa1..a61ac95fdfa 100644 --- a/usr.sbin/nsd/xfrd-tcp.c +++ b/usr.sbin/nsd/xfrd-tcp.c @@ -1,7 +1,7 @@ /* * xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn. * - * Copyright (c) 2001-2011, NLnet Labs. All rights reserved. + * Copyright (c) 2001-2006, NLnet Labs. All rights reserved. * * See LICENSE for the license. * @@ -20,8 +20,29 @@ #include "options.h" #include "namedb.h" #include "xfrd.h" +#include "xfrd-disk.h" #include "util.h" +/* sort tcppipe, first on IP address, for an IPadresss, sort on num_unused */ +static int +xfrd_pipe_cmp(const void* a, const void* b) +{ + const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a; + const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b; + int r; + if(y->ip_len != x->ip_len) + return (int)y->ip_len - (int)x->ip_len; + r = memcmp(&x->ip, &y->ip, x->ip_len); + if(r != 0) + return r; + /* sort that num_unused is sorted ascending, + * thus, if(x=10, y=1) then result 'bigger', 10-1>0*/ + if(x->num_unused != y->num_unused) + return x->num_unused - y->num_unused; + /* different pipelines are different still, even with same numunused*/ + return (int)(a - b); +} + xfrd_tcp_set_t* xfrd_tcp_set_create(struct region* region) { int i; @@ -31,17 +52,33 @@ xfrd_tcp_set_t* xfrd_tcp_set_create(struct region* region) tcp_set->tcp_waiting_first = 0; tcp_set->tcp_waiting_last = 0; for(i=0; i<XFRD_MAX_TCP; i++) - tcp_set->tcp_state[i] = xfrd_tcp_create(region); + tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region); + tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp); return tcp_set; } +struct xfrd_tcp_pipeline* +xfrd_tcp_pipeline_create(region_type* region) +{ + int i; + struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*) + region_alloc_zero(region, sizeof(*tp)); + tp->num_unused = ID_PIPE_NUM; + assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM); + for(i=0; i<ID_PIPE_NUM; i++) + tp->unused[i] = (uint16_t)i; + tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ); + tp->tcp_w = xfrd_tcp_create(region, 512); + return tp; +} + void xfrd_setup_packet(buffer_type* packet, - uint16_t type, uint16_t klass, const dname_type* dname) + uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid) { /* Set up the header */ buffer_clear(packet); - ID_SET(packet, qid_generate()); + ID_SET(packet, qid); FLAGS_SET(packet, 0); OPCODE_SET(packet, OPCODE_QUERY); QDCOUNT_SET(packet, 1); @@ -153,20 +190,213 @@ xfrd_write_soa_buffer(struct buffer* packet, } xfrd_tcp_t* -xfrd_tcp_create(region_type* region) +xfrd_tcp_create(region_type* region, size_t bufsize) { xfrd_tcp_t* tcp_state = (xfrd_tcp_t*)region_alloc( region, sizeof(xfrd_tcp_t)); memset(tcp_state, 0, sizeof(xfrd_tcp_t)); - tcp_state->packet = buffer_create(region, QIOBUFSZ); + tcp_state->packet = buffer_create(region, bufsize); tcp_state->fd = -1; return tcp_state; } +static struct xfrd_tcp_pipeline* +pipeline_find(xfrd_tcp_set_t* set, xfrd_zone_t* zone) +{ + rbnode_t* sme = NULL; + struct xfrd_tcp_pipeline* r; + /* smaller buf than a full pipeline with 64kb ID array, only need + * the front part with the key info, this front part contains the + * members that the compare function uses. */ + const size_t keysize = sizeof(struct xfrd_tcp_pipeline) - + ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t)); + /* void* type for alignment of the struct, + * divide the keysize by ptr-size and then add one to round up */ + void* buf[ (keysize / sizeof(void*)) + 1 ]; + struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf; + key->node.key = key; + key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip); + key->num_unused = ID_PIPE_NUM; + /* lookup existing tcp transfer to the master with highest unused */ + if(rbtree_find_less_equal(set->pipetree, key, &sme)) { + /* exact match, strange, fully unused tcp cannot be open */ + assert(0); + } + if(!sme) + return NULL; + r = (struct xfrd_tcp_pipeline*)sme->key; + /* <= key pointed at, is the master correct ? */ + if(r->ip_len != key->ip_len) + return NULL; + if(memcmp(&r->ip, &key->ip, key->ip_len) != 0) + return NULL; + /* correct master, is there a slot free for this transfer? */ + if(r->num_unused == 0) + return NULL; + return r; +} + +/* remove zone from tcp waiting list */ +static void +tcp_zone_waiting_list_popfirst(xfrd_tcp_set_t* set, xfrd_zone_t* zone) +{ + assert(zone->tcp_waiting); + set->tcp_waiting_first = zone->tcp_waiting_next; + if(zone->tcp_waiting_next) + zone->tcp_waiting_next->tcp_waiting_prev = NULL; + else set->tcp_waiting_last = 0; + zone->tcp_waiting_next = 0; + zone->tcp_waiting = 0; +} + +/* remove zone from tcp pipe write-wait list */ +static void +tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone) +{ + if(zone->in_tcp_send) { + if(zone->tcp_send_prev) + zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next; + else tp->tcp_send_first=zone->tcp_send_next; + if(zone->tcp_send_next) + zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev; + else tp->tcp_send_last=zone->tcp_send_prev; + zone->in_tcp_send = 0; + } +} + +/* remove first from write-wait list */ +static void +tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone) +{ + tp->tcp_send_first = zone->tcp_send_next; + if(tp->tcp_send_first) + tp->tcp_send_first->tcp_send_prev = NULL; + else tp->tcp_send_last = NULL; + zone->in_tcp_send = 0; +} + +/* remove zone from tcp pipe ID map */ +static void +tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone) +{ + assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0); + assert(tp->id[zone->query_id] == zone); + tp->id[zone->query_id] = NULL; + tp->unused[tp->num_unused] = zone->query_id; + /* must remove and re-add for sort order in tree */ + (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node); + tp->num_unused++; + (void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node); +} + +/* stop the tcp pipe (and all its zones need to retry) */ +static void +xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp) +{ + int i, conn = -1; + assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */ + assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */ + /* need to retry for all the zones connected to it */ + /* these could use different lists and go to a different nextmaster*/ + for(i=0; i<ID_PIPE_NUM; i++) { + if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) { + xfrd_zone_t* zone = tp->id[i]; + conn = zone->tcp_conn; + zone->tcp_conn = -1; + zone->tcp_waiting = 0; + tcp_pipe_sendlist_remove(tp, zone); + tcp_pipe_id_remove(tp, zone); + xfrd_set_refresh_now(zone); + } + } + assert(conn != -1); + /* now release the entire tcp pipe */ + xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn); +} + +static void +tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp) +{ + int fd = tp->handler.ev_fd; + struct timeval tv; + tv.tv_sec = xfrd->tcp_set->tcp_timeout; + tv.tv_usec = 0; + if(tp->handler_added) + event_del(&tp->handler); + event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ| + (tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp); + if(event_base_set(xfrd->event_base, &tp->handler) != 0) + log_msg(LOG_ERR, "xfrd tcp: event_base_set failed"); + if(event_add(&tp->handler, &tv) != 0) + log_msg(LOG_ERR, "xfrd tcp: event_add failed"); + tp->handler_added = 1; +} + +/* handle event from fd of tcp pipe */ +void +xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg) +{ + struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg; + if((event & EV_WRITE)) { + tcp_pipe_reset_timeout(tp); + if(tp->tcp_send_first) { + DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s", + tp->tcp_send_first->apex_str)); + xfrd_tcp_write(tp, tp->tcp_send_first); + } + } + if((event & EV_READ) && tp->handler_added) { + DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read")); + tcp_pipe_reset_timeout(tp); + xfrd_tcp_read(tp); + } + if((event & EV_TIMEOUT) && tp->handler_added) { + /* tcp connection timed out */ + DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout")); + xfrd_tcp_pipe_stop(tp); + } +} + +/* add a zone to the pipeline, it starts to want to write its query */ +static void +pipeline_setup_new_zone(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp, + xfrd_zone_t* zone) +{ + /* assign the ID */ + int idx; + assert(tp->num_unused > 0); + /* we pick a random ID, even though it is TCP anyway */ + idx = random_generate(tp->num_unused); + zone->query_id = tp->unused[idx]; + tp->unused[idx] = tp->unused[tp->num_unused-1]; + tp->id[zone->query_id] = zone; + /* decrement unused counter, and fixup tree */ + (void)rbtree_delete(set->pipetree, &tp->node); + tp->num_unused--; + (void)rbtree_insert(set->pipetree, &tp->node); + + /* add to sendlist, at end */ + zone->tcp_send_next = NULL; + zone->tcp_send_prev = tp->tcp_send_last; + zone->in_tcp_send = 1; + if(tp->tcp_send_last) + tp->tcp_send_last->tcp_send_next = zone; + else tp->tcp_send_first = zone; + tp->tcp_send_last = zone; + + /* is it first in line? */ + if(tp->tcp_send_first == zone) { + xfrd_tcp_setup_write_packet(tp, zone); + /* add write to event handler */ + tcp_pipe_reset_timeout(tp); + } +} + void xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone) { + struct xfrd_tcp_pipeline* tp; assert(zone->tcp_conn == -1); assert(zone->tcp_waiting == 0); @@ -176,7 +406,7 @@ xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone) set->tcp_count ++; /* find a free tcp_buffer */ for(i=0; i<XFRD_MAX_TCP; i++) { - if(set->tcp_state[i]->fd == -1) { + if(set->tcp_state[i]->tcp_r->fd == -1) { zone->tcp_conn = i; break; } @@ -186,22 +416,57 @@ xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone) return; } + tp = set->tcp_state[zone->tcp_conn]; zone->tcp_waiting = 0; /* stop udp use (if any) */ - if(zone->zone_handler.fd != -1) + if(zone->zone_handler.ev_fd != -1) xfrd_udp_release(zone); - if(!xfrd_tcp_open(set, zone)) + if(!xfrd_tcp_open(set, tp, zone)) { + zone->tcp_conn = -1; + set->tcp_count --; + xfrd_set_refresh_now(zone); return; + } + /* ip and ip_len set by tcp_open */ + tp->node.key = tp; + tp->num_unused = ID_PIPE_NUM; + tp->num_skip = 0; + tp->tcp_send_first = NULL; + tp->tcp_send_last = NULL; + memset(tp->id, 0, sizeof(tp->id)); + for(i=0; i<ID_PIPE_NUM; i++) { + tp->unused[i] = i; + } - xfrd_tcp_xfr(set, zone); + /* insert into tree */ + (void)rbtree_insert(set->pipetree, &tp->node); + xfrd_deactivate_zone(zone); + xfrd_unset_timer(zone); + pipeline_setup_new_zone(set, tp, zone); + return; + } + /* check for a pipeline to the same master with unused ID */ + if((tp = pipeline_find(set, zone))!= NULL) { + int i; + if(zone->zone_handler.ev_fd != -1) + xfrd_udp_release(zone); + for(i=0; i<XFRD_MAX_TCP; i++) { + if(set->tcp_state[i] == tp) + zone->tcp_conn = i; + } + xfrd_deactivate_zone(zone); + xfrd_unset_timer(zone); + pipeline_setup_new_zone(set, tp, zone); return; } + /* wait, at end of line */ DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp " "connections (%d) reached.", XFRD_MAX_TCP)); zone->tcp_waiting_next = 0; + zone->tcp_waiting_prev = set->tcp_waiting_last; zone->tcp_waiting = 1; if(!set->tcp_waiting_last) { set->tcp_waiting_first = zone; @@ -210,85 +475,92 @@ xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone) set->tcp_waiting_last->tcp_waiting_next = zone; set->tcp_waiting_last = zone; } + xfrd_deactivate_zone(zone); xfrd_unset_timer(zone); } int -xfrd_tcp_open(xfrd_tcp_set_t* set, xfrd_zone_t* zone) +xfrd_tcp_open(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp, + xfrd_zone_t* zone) { int fd, family, conn; - -#ifdef INET6 - struct sockaddr_storage to; -#else - struct sockaddr_in to; -#endif /* INET6 */ - socklen_t to_len; - + struct timeval tv; assert(zone->tcp_conn != -1); DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s", zone->apex_str, zone->master->ip_address_spec)); - set->tcp_state[zone->tcp_conn]->is_reading = 0; - set->tcp_state[zone->tcp_conn]->total_bytes = 0; - set->tcp_state[zone->tcp_conn]->msglen = 0; + tp->tcp_r->is_reading = 1; + tp->tcp_r->total_bytes = 0; + tp->tcp_r->msglen = 0; + buffer_clear(tp->tcp_r->packet); + tp->tcp_w->is_reading = 0; + tp->tcp_w->total_bytes = 0; + tp->tcp_w->msglen = 0; + tp->connection_established = 0; if(zone->master->is_ipv6) { #ifdef INET6 family = PF_INET6; #else xfrd_set_refresh_now(zone); - xfrd_tcp_release(set, zone); return 0; #endif } else { family = PF_INET; } fd = socket(family, SOCK_STREAM, IPPROTO_TCP); - set->tcp_state[zone->tcp_conn]->fd = fd; if(fd == -1) { log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s", zone->master->ip_address_spec, strerror(errno)); xfrd_set_refresh_now(zone); - xfrd_tcp_release(set, zone); return 0; } if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) { log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno)); + close(fd); xfrd_set_refresh_now(zone); - xfrd_tcp_release(set, zone); return 0; } - to_len = xfrd_acl_sockaddr_to(zone->master, &to); + tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip); /* bind it */ - if (!xfrd_bind_local_interface(fd, - zone->zone_options->outgoing_interface, zone->master, 1)) { - + if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern-> + outgoing_interface, zone->master, 1)) { + close(fd); xfrd_set_refresh_now(zone); - xfrd_tcp_release(set, zone); return 0; } - conn = connect(fd, (struct sockaddr*)&to, to_len); + conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len); if (conn == -1 && errno != EINPROGRESS) { log_msg(LOG_ERR, "xfrd: connect %s failed: %s", zone->master->ip_address_spec, strerror(errno)); + close(fd); xfrd_set_refresh_now(zone); - xfrd_tcp_release(set, zone); return 0; } - - zone->zone_handler.fd = fd; - zone->zone_handler.event_types = NETIO_EVENT_TIMEOUT|NETIO_EVENT_WRITE; - xfrd_set_timer(zone, xfrd_time() + set->tcp_timeout); + tp->tcp_r->fd = fd; + tp->tcp_w->fd = fd; + + /* set the tcp pipe event */ + if(tp->handler_added) + event_del(&tp->handler); + event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE, + xfrd_handle_tcp_pipe, tp); + if(event_base_set(xfrd->event_base, &tp->handler) != 0) + log_msg(LOG_ERR, "xfrd tcp: event_base_set failed"); + tv.tv_sec = set->tcp_timeout; + tv.tv_usec = 0; + if(event_add(&tp->handler, &tv) != 0) + log_msg(LOG_ERR, "xfrd tcp: event_add failed"); + tp->handler_added = 1; return 1; } void -xfrd_tcp_xfr(xfrd_tcp_set_t* set, xfrd_zone_t* zone) +xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone) { - xfrd_tcp_t* tcp = set->tcp_state[zone->tcp_conn]; + xfrd_tcp_t* tcp = tp->tcp_w; assert(zone->tcp_conn != -1); assert(zone->tcp_waiting == 0); /* start AXFR or IXFR for the zone */ @@ -298,17 +570,21 @@ xfrd_tcp_xfr(xfrd_tcp_set_t* set, xfrd_zone_t* zone) "(AXFR) for %s to %s", zone->apex_str, zone->master->ip_address_spec)); - xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex); + xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex, + zone->query_id); } else { DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone " "transfer (IXFR) for %s to %s", zone->apex_str, zone->master->ip_address_spec)); - xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex); + xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex, + zone->query_id); NSCOUNT_SET(tcp->packet, 1); xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk); } - zone->query_id = ID(tcp->packet); + /* old transfer needs to be removed still? */ + if(zone->msg_seq_nr) + xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber); zone->msg_seq_nr = 0; zone->msg_rr_count = 0; if(zone->master->key_options && zone->master->key_options->tsig_key) { @@ -317,7 +593,7 @@ xfrd_tcp_xfr(xfrd_tcp_set_t* set, xfrd_zone_t* zone) buffer_flip(tcp->packet); DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id)); tcp->msglen = buffer_limit(tcp->packet); - /* wait for select to complete connect before write */ + tcp->total_bytes = 0; } static void @@ -382,12 +658,14 @@ int conn_write(xfrd_tcp_t* tcp) } void -xfrd_tcp_write(xfrd_tcp_set_t* set, xfrd_zone_t* zone) +xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone) { int ret; - xfrd_tcp_t* tcp = set->tcp_state[zone->tcp_conn]; + xfrd_tcp_t* tcp = tp->tcp_w; assert(zone->tcp_conn != -1); - if(tcp->total_bytes == 0) { + assert(zone == tp->tcp_send_first); + /* see if for non-established connection, there is a connect error */ + if(!tp->connection_established) { /* check for pending error from nonblocking connect */ /* from Stevens, unix network programming, vol1, 3rd ed, p450 */ int error = 0; @@ -398,28 +676,51 @@ xfrd_tcp_write(xfrd_tcp_set_t* set, xfrd_zone_t* zone) if(error == EINPROGRESS || error == EWOULDBLOCK) return; /* try again later */ if(error != 0) { - log_msg(LOG_ERR, "Could not tcp connect to %s: %s", - zone->master->ip_address_spec, strerror(error)); - xfrd_set_refresh_now(zone); - xfrd_tcp_release(set, zone); + log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s", + zone->apex_str, zone->master->ip_address_spec, + strerror(error)); + xfrd_tcp_pipe_stop(tp); return; } } ret = conn_write(tcp); if(ret == -1) { log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno)); - xfrd_set_refresh_now(zone); - xfrd_tcp_release(set, zone); + xfrd_tcp_pipe_stop(tp); return; } + if(tcp->total_bytes != 0 && !tp->connection_established) + tp->connection_established = 1; if(ret == 0) { return; /* write again later */ } - /* done writing, get ready for reading */ - tcp->is_reading = 1; - tcp_conn_ready_for_reading(tcp); - zone->zone_handler.event_types = NETIO_EVENT_READ|NETIO_EVENT_TIMEOUT; - xfrd_tcp_read(set, zone); + /* done writing this message */ + + /* remove first zone from sendlist */ + tcp_pipe_sendlist_popfirst(tp, zone); + + /* see if other zone wants to write; init; let it write (now) */ + /* and use a loop, because 64k stack calls is a too much */ + while(tp->tcp_send_first) { + /* setup to write for this zone */ + xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first); + /* attempt to write for this zone (if success, continue loop)*/ + ret = conn_write(tcp); + if(ret == -1) { + log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno)); + xfrd_tcp_pipe_stop(tp); + return; + } + if(ret == 0) + return; /* write again later */ + tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first); + } + + /* if sendlist empty, remove WRITE from event */ + + /* listen to READ, and not WRITE events */ + assert(tp->tcp_send_first == NULL); + tcp_pipe_reset_timeout(tp); } int @@ -496,42 +797,71 @@ conn_read(xfrd_tcp_t* tcp) } void -xfrd_tcp_read(xfrd_tcp_set_t* set, xfrd_zone_t* zone) +xfrd_tcp_read(struct xfrd_tcp_pipeline* tp) { - xfrd_tcp_t* tcp = set->tcp_state[zone->tcp_conn]; + xfrd_zone_t* zone; + xfrd_tcp_t* tcp = tp->tcp_r; int ret; + enum xfrd_packet_result pkt_result; - assert(zone->tcp_conn != -1); ret = conn_read(tcp); if(ret == -1) { - xfrd_set_refresh_now(zone); - xfrd_tcp_release(set, zone); + xfrd_tcp_pipe_stop(tp); return; } if(ret == 0) return; - /* completed msg */ buffer_flip(tcp->packet); - switch(xfrd_handle_received_xfr_packet(zone, tcp->packet)) { + /* see which ID number it is, if skip, handle skip, NULL: warn */ + if(tcp->msglen < QHEADERSZ) { + /* too short for DNS header, skip it */ + DEBUG(DEBUG_XFRD,1, (LOG_INFO, + "xfrd: tcp skip response that is too short")); + tcp_conn_ready_for_reading(tcp); + return; + } + zone = tp->id[ID(tcp->packet)]; + if(!zone || zone == TCP_NULL_SKIP) { + /* no zone for this id? skip it */ + DEBUG(DEBUG_XFRD,1, (LOG_INFO, + "xfrd: tcp skip response with %s ID", + zone?"set-to-skip":"unknown")); + tcp_conn_ready_for_reading(tcp); + return; + } + assert(zone->tcp_conn != -1); + + /* handle message for zone */ + pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet); + /* setup for reading the next packet on this connection */ + tcp_conn_ready_for_reading(tcp); + switch(pkt_result) { case xfrd_packet_more: - tcp_conn_ready_for_reading(tcp); + /* wait for next packet */ break; - case xfrd_packet_transfer: case xfrd_packet_newlease: - xfrd_tcp_release(set, zone); + /* set to skip if more packets with this ID */ + tp->id[zone->query_id] = TCP_NULL_SKIP; + tp->num_skip++; + /* fall through to remove zone from tp */ + case xfrd_packet_transfer: + xfrd_tcp_release(xfrd->tcp_set, zone); assert(zone->round_num == -1); break; case xfrd_packet_notimpl: zone->master->ixfr_disabled = time(NULL); - xfrd_tcp_release(set, zone); + xfrd_tcp_release(xfrd->tcp_set, zone); /* query next server */ xfrd_make_request(zone); break; case xfrd_packet_bad: case xfrd_packet_tcp: default: - xfrd_tcp_release(set, zone); + /* set to skip if more packets with this ID */ + tp->id[zone->query_id] = TCP_NULL_SKIP; + tp->num_skip++; + xfrd_tcp_release(xfrd->tcp_set, zone); /* query next server */ xfrd_make_request(zone); break; @@ -542,40 +872,103 @@ void xfrd_tcp_release(xfrd_tcp_set_t* set, xfrd_zone_t* zone) { int conn = zone->tcp_conn; + struct xfrd_tcp_pipeline* tp = set->tcp_state[conn]; DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s", zone->apex_str, zone->master->ip_address_spec)); assert(zone->tcp_conn != -1); assert(zone->tcp_waiting == 0); zone->tcp_conn = -1; zone->tcp_waiting = 0; - zone->zone_handler.fd = -1; - zone->zone_handler.event_types = NETIO_EVENT_READ|NETIO_EVENT_TIMEOUT; - if(set->tcp_state[conn]->fd != -1) - close(set->tcp_state[conn]->fd); + /* remove from tcp_send list */ + tcp_pipe_sendlist_remove(tp, zone); + /* remove it from the ID list */ + if(tp->id[zone->query_id] != TCP_NULL_SKIP) + tcp_pipe_id_remove(tp, zone); + DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused", + tp->num_unused)); + /* if pipe was full, but no more, then see if waiting element is + * for the same master, and can fill the unused ID */ + if(tp->num_unused == 1 && set->tcp_waiting_first) { + struct sockaddr_storage to; + socklen_t to_len = xfrd_acl_sockaddr_to( + set->tcp_waiting_first->master, &to); + if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) { + /* use this connnection for the waiting zone */ + zone = set->tcp_waiting_first; + assert(zone->tcp_conn == -1); + zone->tcp_conn = conn; + tcp_zone_waiting_list_popfirst(set, zone); + if(zone->zone_handler.ev_fd != -1) + xfrd_udp_release(zone); + xfrd_unset_timer(zone); + pipeline_setup_new_zone(set, tp, zone); + return; + } + /* waiting zone did not go to same server */ + } - set->tcp_state[conn]->fd = -1; + /* if all unused, or only skipped leftover, close the pipeline */ + if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused) + xfrd_tcp_pipe_release(set, tp, conn); +} +void +xfrd_tcp_pipe_release(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp, + int conn) +{ + DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released")); + /* one handler per tcp pipe */ + if(tp->handler_added) + event_del(&tp->handler); + tp->handler_added = 0; + + /* fd in tcp_r and tcp_w is the same, close once */ + if(tp->tcp_r->fd != -1) + close(tp->tcp_r->fd); + tp->tcp_r->fd = -1; + tp->tcp_w->fd = -1; + + /* remove from pipetree */ + (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node); + + /* a waiting zone can use the free tcp slot (to another server) */ if(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) { - /* pop first waiting process */ - zone = set->tcp_waiting_first; - if(set->tcp_waiting_last == zone) - set->tcp_waiting_last = 0; + int i; - set->tcp_waiting_first = zone->tcp_waiting_next; - zone->tcp_waiting_next = 0; + /* pop first waiting process */ + xfrd_zone_t* zone = set->tcp_waiting_first; /* start it */ assert(zone->tcp_conn == -1); zone->tcp_conn = conn; - zone->tcp_waiting = 0; + /* stop udp (if any) */ - if(zone->zone_handler.fd != -1) + if(zone->zone_handler.ev_fd != -1) xfrd_udp_release(zone); - - if(!xfrd_tcp_open(set, zone)) + if(!xfrd_tcp_open(set, tp, zone)) { + zone->tcp_conn = -1; + set->tcp_count --; + xfrd_set_refresh_now(zone); return; + } + /* re-init this tcppipe */ + /* ip and ip_len set by tcp_open */ + tp->node.key = tp; + tp->num_unused = ID_PIPE_NUM; + tp->num_skip = 0; + tp->tcp_send_first = NULL; + tp->tcp_send_last = NULL; + memset(tp->id, 0, sizeof(tp->id)); + for(i=0; i<ID_PIPE_NUM; i++) { + tp->unused[i] = i; + } - xfrd_tcp_xfr(set, zone); + /* insert into tree */ + (void)rbtree_insert(set->pipetree, &tp->node); + /* succeeded? remove zone from lists and setup write */ + xfrd_unset_timer(zone); + tcp_zone_waiting_list_popfirst(set, zone); + pipeline_setup_new_zone(set, tp, zone); } else { assert(!set->tcp_waiting_first); @@ -583,3 +976,4 @@ xfrd_tcp_release(xfrd_tcp_set_t* set, xfrd_zone_t* zone) assert(set->tcp_count >= 0); } } + |