summaryrefslogtreecommitdiff
path: root/usr.sbin/nsd/xfrd-tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'usr.sbin/nsd/xfrd-tcp.c')
-rw-r--r--usr.sbin/nsd/xfrd-tcp.c564
1 files changed, 479 insertions, 85 deletions
diff --git a/usr.sbin/nsd/xfrd-tcp.c b/usr.sbin/nsd/xfrd-tcp.c
index b0fea134aa1..a61ac95fdfa 100644
--- a/usr.sbin/nsd/xfrd-tcp.c
+++ b/usr.sbin/nsd/xfrd-tcp.c
@@ -1,7 +1,7 @@
/*
* xfrd-tcp.c - XFR (transfer) Daemon TCP system source file. Manages tcp conn.
*
- * Copyright (c) 2001-2011, NLnet Labs. All rights reserved.
+ * Copyright (c) 2001-2006, NLnet Labs. All rights reserved.
*
* See LICENSE for the license.
*
@@ -20,8 +20,29 @@
#include "options.h"
#include "namedb.h"
#include "xfrd.h"
+#include "xfrd-disk.h"
#include "util.h"
+/* sort tcppipe, first on IP address, for an IPadresss, sort on num_unused */
+static int
+xfrd_pipe_cmp(const void* a, const void* b)
+{
+ const struct xfrd_tcp_pipeline* x = (struct xfrd_tcp_pipeline*)a;
+ const struct xfrd_tcp_pipeline* y = (struct xfrd_tcp_pipeline*)b;
+ int r;
+ if(y->ip_len != x->ip_len)
+ return (int)y->ip_len - (int)x->ip_len;
+ r = memcmp(&x->ip, &y->ip, x->ip_len);
+ if(r != 0)
+ return r;
+ /* sort that num_unused is sorted ascending,
+ * thus, if(x=10, y=1) then result 'bigger', 10-1>0*/
+ if(x->num_unused != y->num_unused)
+ return x->num_unused - y->num_unused;
+ /* different pipelines are different still, even with same numunused*/
+ return (int)(a - b);
+}
+
xfrd_tcp_set_t* xfrd_tcp_set_create(struct region* region)
{
int i;
@@ -31,17 +52,33 @@ xfrd_tcp_set_t* xfrd_tcp_set_create(struct region* region)
tcp_set->tcp_waiting_first = 0;
tcp_set->tcp_waiting_last = 0;
for(i=0; i<XFRD_MAX_TCP; i++)
- tcp_set->tcp_state[i] = xfrd_tcp_create(region);
+ tcp_set->tcp_state[i] = xfrd_tcp_pipeline_create(region);
+ tcp_set->pipetree = rbtree_create(region, &xfrd_pipe_cmp);
return tcp_set;
}
+struct xfrd_tcp_pipeline*
+xfrd_tcp_pipeline_create(region_type* region)
+{
+ int i;
+ struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)
+ region_alloc_zero(region, sizeof(*tp));
+ tp->num_unused = ID_PIPE_NUM;
+ assert(sizeof(tp->unused)/sizeof(tp->unused[0]) == ID_PIPE_NUM);
+ for(i=0; i<ID_PIPE_NUM; i++)
+ tp->unused[i] = (uint16_t)i;
+ tp->tcp_r = xfrd_tcp_create(region, QIOBUFSZ);
+ tp->tcp_w = xfrd_tcp_create(region, 512);
+ return tp;
+}
+
void
xfrd_setup_packet(buffer_type* packet,
- uint16_t type, uint16_t klass, const dname_type* dname)
+ uint16_t type, uint16_t klass, const dname_type* dname, uint16_t qid)
{
/* Set up the header */
buffer_clear(packet);
- ID_SET(packet, qid_generate());
+ ID_SET(packet, qid);
FLAGS_SET(packet, 0);
OPCODE_SET(packet, OPCODE_QUERY);
QDCOUNT_SET(packet, 1);
@@ -153,20 +190,213 @@ xfrd_write_soa_buffer(struct buffer* packet,
}
xfrd_tcp_t*
-xfrd_tcp_create(region_type* region)
+xfrd_tcp_create(region_type* region, size_t bufsize)
{
xfrd_tcp_t* tcp_state = (xfrd_tcp_t*)region_alloc(
region, sizeof(xfrd_tcp_t));
memset(tcp_state, 0, sizeof(xfrd_tcp_t));
- tcp_state->packet = buffer_create(region, QIOBUFSZ);
+ tcp_state->packet = buffer_create(region, bufsize);
tcp_state->fd = -1;
return tcp_state;
}
+static struct xfrd_tcp_pipeline*
+pipeline_find(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
+{
+ rbnode_t* sme = NULL;
+ struct xfrd_tcp_pipeline* r;
+ /* smaller buf than a full pipeline with 64kb ID array, only need
+ * the front part with the key info, this front part contains the
+ * members that the compare function uses. */
+ const size_t keysize = sizeof(struct xfrd_tcp_pipeline) -
+ ID_PIPE_NUM*(sizeof(struct xfrd_zone*) + sizeof(uint16_t));
+ /* void* type for alignment of the struct,
+ * divide the keysize by ptr-size and then add one to round up */
+ void* buf[ (keysize / sizeof(void*)) + 1 ];
+ struct xfrd_tcp_pipeline* key = (struct xfrd_tcp_pipeline*)buf;
+ key->node.key = key;
+ key->ip_len = xfrd_acl_sockaddr_to(zone->master, &key->ip);
+ key->num_unused = ID_PIPE_NUM;
+ /* lookup existing tcp transfer to the master with highest unused */
+ if(rbtree_find_less_equal(set->pipetree, key, &sme)) {
+ /* exact match, strange, fully unused tcp cannot be open */
+ assert(0);
+ }
+ if(!sme)
+ return NULL;
+ r = (struct xfrd_tcp_pipeline*)sme->key;
+ /* <= key pointed at, is the master correct ? */
+ if(r->ip_len != key->ip_len)
+ return NULL;
+ if(memcmp(&r->ip, &key->ip, key->ip_len) != 0)
+ return NULL;
+ /* correct master, is there a slot free for this transfer? */
+ if(r->num_unused == 0)
+ return NULL;
+ return r;
+}
+
+/* remove zone from tcp waiting list */
+static void
+tcp_zone_waiting_list_popfirst(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
+{
+ assert(zone->tcp_waiting);
+ set->tcp_waiting_first = zone->tcp_waiting_next;
+ if(zone->tcp_waiting_next)
+ zone->tcp_waiting_next->tcp_waiting_prev = NULL;
+ else set->tcp_waiting_last = 0;
+ zone->tcp_waiting_next = 0;
+ zone->tcp_waiting = 0;
+}
+
+/* remove zone from tcp pipe write-wait list */
+static void
+tcp_pipe_sendlist_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
+{
+ if(zone->in_tcp_send) {
+ if(zone->tcp_send_prev)
+ zone->tcp_send_prev->tcp_send_next=zone->tcp_send_next;
+ else tp->tcp_send_first=zone->tcp_send_next;
+ if(zone->tcp_send_next)
+ zone->tcp_send_next->tcp_send_prev=zone->tcp_send_prev;
+ else tp->tcp_send_last=zone->tcp_send_prev;
+ zone->in_tcp_send = 0;
+ }
+}
+
+/* remove first from write-wait list */
+static void
+tcp_pipe_sendlist_popfirst(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
+{
+ tp->tcp_send_first = zone->tcp_send_next;
+ if(tp->tcp_send_first)
+ tp->tcp_send_first->tcp_send_prev = NULL;
+ else tp->tcp_send_last = NULL;
+ zone->in_tcp_send = 0;
+}
+
+/* remove zone from tcp pipe ID map */
+static void
+tcp_pipe_id_remove(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
+{
+ assert(tp->num_unused < ID_PIPE_NUM && tp->num_unused >= 0);
+ assert(tp->id[zone->query_id] == zone);
+ tp->id[zone->query_id] = NULL;
+ tp->unused[tp->num_unused] = zone->query_id;
+ /* must remove and re-add for sort order in tree */
+ (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
+ tp->num_unused++;
+ (void)rbtree_insert(xfrd->tcp_set->pipetree, &tp->node);
+}
+
+/* stop the tcp pipe (and all its zones need to retry) */
+static void
+xfrd_tcp_pipe_stop(struct xfrd_tcp_pipeline* tp)
+{
+ int i, conn = -1;
+ assert(tp->num_unused < ID_PIPE_NUM); /* at least one 'in-use' */
+ assert(ID_PIPE_NUM - tp->num_unused > tp->num_skip); /* at least one 'nonskip' */
+ /* need to retry for all the zones connected to it */
+ /* these could use different lists and go to a different nextmaster*/
+ for(i=0; i<ID_PIPE_NUM; i++) {
+ if(tp->id[i] && tp->id[i] != TCP_NULL_SKIP) {
+ xfrd_zone_t* zone = tp->id[i];
+ conn = zone->tcp_conn;
+ zone->tcp_conn = -1;
+ zone->tcp_waiting = 0;
+ tcp_pipe_sendlist_remove(tp, zone);
+ tcp_pipe_id_remove(tp, zone);
+ xfrd_set_refresh_now(zone);
+ }
+ }
+ assert(conn != -1);
+ /* now release the entire tcp pipe */
+ xfrd_tcp_pipe_release(xfrd->tcp_set, tp, conn);
+}
+
+static void
+tcp_pipe_reset_timeout(struct xfrd_tcp_pipeline* tp)
+{
+ int fd = tp->handler.ev_fd;
+ struct timeval tv;
+ tv.tv_sec = xfrd->tcp_set->tcp_timeout;
+ tv.tv_usec = 0;
+ if(tp->handler_added)
+ event_del(&tp->handler);
+ event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|
+ (tp->tcp_send_first?EV_WRITE:0), xfrd_handle_tcp_pipe, tp);
+ if(event_base_set(xfrd->event_base, &tp->handler) != 0)
+ log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
+ if(event_add(&tp->handler, &tv) != 0)
+ log_msg(LOG_ERR, "xfrd tcp: event_add failed");
+ tp->handler_added = 1;
+}
+
+/* handle event from fd of tcp pipe */
+void
+xfrd_handle_tcp_pipe(int ATTR_UNUSED(fd), short event, void* arg)
+{
+ struct xfrd_tcp_pipeline* tp = (struct xfrd_tcp_pipeline*)arg;
+ if((event & EV_WRITE)) {
+ tcp_pipe_reset_timeout(tp);
+ if(tp->tcp_send_first) {
+ DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp write, zone %s",
+ tp->tcp_send_first->apex_str));
+ xfrd_tcp_write(tp, tp->tcp_send_first);
+ }
+ }
+ if((event & EV_READ) && tp->handler_added) {
+ DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp read"));
+ tcp_pipe_reset_timeout(tp);
+ xfrd_tcp_read(tp);
+ }
+ if((event & EV_TIMEOUT) && tp->handler_added) {
+ /* tcp connection timed out */
+ DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: event tcp timeout"));
+ xfrd_tcp_pipe_stop(tp);
+ }
+}
+
+/* add a zone to the pipeline, it starts to want to write its query */
+static void
+pipeline_setup_new_zone(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
+ xfrd_zone_t* zone)
+{
+ /* assign the ID */
+ int idx;
+ assert(tp->num_unused > 0);
+ /* we pick a random ID, even though it is TCP anyway */
+ idx = random_generate(tp->num_unused);
+ zone->query_id = tp->unused[idx];
+ tp->unused[idx] = tp->unused[tp->num_unused-1];
+ tp->id[zone->query_id] = zone;
+ /* decrement unused counter, and fixup tree */
+ (void)rbtree_delete(set->pipetree, &tp->node);
+ tp->num_unused--;
+ (void)rbtree_insert(set->pipetree, &tp->node);
+
+ /* add to sendlist, at end */
+ zone->tcp_send_next = NULL;
+ zone->tcp_send_prev = tp->tcp_send_last;
+ zone->in_tcp_send = 1;
+ if(tp->tcp_send_last)
+ tp->tcp_send_last->tcp_send_next = zone;
+ else tp->tcp_send_first = zone;
+ tp->tcp_send_last = zone;
+
+ /* is it first in line? */
+ if(tp->tcp_send_first == zone) {
+ xfrd_tcp_setup_write_packet(tp, zone);
+ /* add write to event handler */
+ tcp_pipe_reset_timeout(tp);
+ }
+}
+
void
xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
{
+ struct xfrd_tcp_pipeline* tp;
assert(zone->tcp_conn == -1);
assert(zone->tcp_waiting == 0);
@@ -176,7 +406,7 @@ xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
set->tcp_count ++;
/* find a free tcp_buffer */
for(i=0; i<XFRD_MAX_TCP; i++) {
- if(set->tcp_state[i]->fd == -1) {
+ if(set->tcp_state[i]->tcp_r->fd == -1) {
zone->tcp_conn = i;
break;
}
@@ -186,22 +416,57 @@ xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
return;
}
+ tp = set->tcp_state[zone->tcp_conn];
zone->tcp_waiting = 0;
/* stop udp use (if any) */
- if(zone->zone_handler.fd != -1)
+ if(zone->zone_handler.ev_fd != -1)
xfrd_udp_release(zone);
- if(!xfrd_tcp_open(set, zone))
+ if(!xfrd_tcp_open(set, tp, zone)) {
+ zone->tcp_conn = -1;
+ set->tcp_count --;
+ xfrd_set_refresh_now(zone);
return;
+ }
+ /* ip and ip_len set by tcp_open */
+ tp->node.key = tp;
+ tp->num_unused = ID_PIPE_NUM;
+ tp->num_skip = 0;
+ tp->tcp_send_first = NULL;
+ tp->tcp_send_last = NULL;
+ memset(tp->id, 0, sizeof(tp->id));
+ for(i=0; i<ID_PIPE_NUM; i++) {
+ tp->unused[i] = i;
+ }
- xfrd_tcp_xfr(set, zone);
+ /* insert into tree */
+ (void)rbtree_insert(set->pipetree, &tp->node);
+ xfrd_deactivate_zone(zone);
+ xfrd_unset_timer(zone);
+ pipeline_setup_new_zone(set, tp, zone);
+ return;
+ }
+ /* check for a pipeline to the same master with unused ID */
+ if((tp = pipeline_find(set, zone))!= NULL) {
+ int i;
+ if(zone->zone_handler.ev_fd != -1)
+ xfrd_udp_release(zone);
+ for(i=0; i<XFRD_MAX_TCP; i++) {
+ if(set->tcp_state[i] == tp)
+ zone->tcp_conn = i;
+ }
+ xfrd_deactivate_zone(zone);
+ xfrd_unset_timer(zone);
+ pipeline_setup_new_zone(set, tp, zone);
return;
}
+
/* wait, at end of line */
DEBUG(DEBUG_XFRD,2, (LOG_INFO, "xfrd: max number of tcp "
"connections (%d) reached.", XFRD_MAX_TCP));
zone->tcp_waiting_next = 0;
+ zone->tcp_waiting_prev = set->tcp_waiting_last;
zone->tcp_waiting = 1;
if(!set->tcp_waiting_last) {
set->tcp_waiting_first = zone;
@@ -210,85 +475,92 @@ xfrd_tcp_obtain(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
set->tcp_waiting_last->tcp_waiting_next = zone;
set->tcp_waiting_last = zone;
}
+ xfrd_deactivate_zone(zone);
xfrd_unset_timer(zone);
}
int
-xfrd_tcp_open(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
+xfrd_tcp_open(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
+ xfrd_zone_t* zone)
{
int fd, family, conn;
-
-#ifdef INET6
- struct sockaddr_storage to;
-#else
- struct sockaddr_in to;
-#endif /* INET6 */
- socklen_t to_len;
-
+ struct timeval tv;
assert(zone->tcp_conn != -1);
DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s open tcp conn to %s",
zone->apex_str, zone->master->ip_address_spec));
- set->tcp_state[zone->tcp_conn]->is_reading = 0;
- set->tcp_state[zone->tcp_conn]->total_bytes = 0;
- set->tcp_state[zone->tcp_conn]->msglen = 0;
+ tp->tcp_r->is_reading = 1;
+ tp->tcp_r->total_bytes = 0;
+ tp->tcp_r->msglen = 0;
+ buffer_clear(tp->tcp_r->packet);
+ tp->tcp_w->is_reading = 0;
+ tp->tcp_w->total_bytes = 0;
+ tp->tcp_w->msglen = 0;
+ tp->connection_established = 0;
if(zone->master->is_ipv6) {
#ifdef INET6
family = PF_INET6;
#else
xfrd_set_refresh_now(zone);
- xfrd_tcp_release(set, zone);
return 0;
#endif
} else {
family = PF_INET;
}
fd = socket(family, SOCK_STREAM, IPPROTO_TCP);
- set->tcp_state[zone->tcp_conn]->fd = fd;
if(fd == -1) {
log_msg(LOG_ERR, "xfrd: %s cannot create tcp socket: %s",
zone->master->ip_address_spec, strerror(errno));
xfrd_set_refresh_now(zone);
- xfrd_tcp_release(set, zone);
return 0;
}
if(fcntl(fd, F_SETFL, O_NONBLOCK) == -1) {
log_msg(LOG_ERR, "xfrd: fcntl failed: %s", strerror(errno));
+ close(fd);
xfrd_set_refresh_now(zone);
- xfrd_tcp_release(set, zone);
return 0;
}
- to_len = xfrd_acl_sockaddr_to(zone->master, &to);
+ tp->ip_len = xfrd_acl_sockaddr_to(zone->master, &tp->ip);
/* bind it */
- if (!xfrd_bind_local_interface(fd,
- zone->zone_options->outgoing_interface, zone->master, 1)) {
-
+ if (!xfrd_bind_local_interface(fd, zone->zone_options->pattern->
+ outgoing_interface, zone->master, 1)) {
+ close(fd);
xfrd_set_refresh_now(zone);
- xfrd_tcp_release(set, zone);
return 0;
}
- conn = connect(fd, (struct sockaddr*)&to, to_len);
+ conn = connect(fd, (struct sockaddr*)&tp->ip, tp->ip_len);
if (conn == -1 && errno != EINPROGRESS) {
log_msg(LOG_ERR, "xfrd: connect %s failed: %s",
zone->master->ip_address_spec, strerror(errno));
+ close(fd);
xfrd_set_refresh_now(zone);
- xfrd_tcp_release(set, zone);
return 0;
}
-
- zone->zone_handler.fd = fd;
- zone->zone_handler.event_types = NETIO_EVENT_TIMEOUT|NETIO_EVENT_WRITE;
- xfrd_set_timer(zone, xfrd_time() + set->tcp_timeout);
+ tp->tcp_r->fd = fd;
+ tp->tcp_w->fd = fd;
+
+ /* set the tcp pipe event */
+ if(tp->handler_added)
+ event_del(&tp->handler);
+ event_set(&tp->handler, fd, EV_PERSIST|EV_TIMEOUT|EV_READ|EV_WRITE,
+ xfrd_handle_tcp_pipe, tp);
+ if(event_base_set(xfrd->event_base, &tp->handler) != 0)
+ log_msg(LOG_ERR, "xfrd tcp: event_base_set failed");
+ tv.tv_sec = set->tcp_timeout;
+ tv.tv_usec = 0;
+ if(event_add(&tp->handler, &tv) != 0)
+ log_msg(LOG_ERR, "xfrd tcp: event_add failed");
+ tp->handler_added = 1;
return 1;
}
void
-xfrd_tcp_xfr(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
+xfrd_tcp_setup_write_packet(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
{
- xfrd_tcp_t* tcp = set->tcp_state[zone->tcp_conn];
+ xfrd_tcp_t* tcp = tp->tcp_w;
assert(zone->tcp_conn != -1);
assert(zone->tcp_waiting == 0);
/* start AXFR or IXFR for the zone */
@@ -298,17 +570,21 @@ xfrd_tcp_xfr(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
"(AXFR) for %s to %s",
zone->apex_str, zone->master->ip_address_spec));
- xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex);
+ xfrd_setup_packet(tcp->packet, TYPE_AXFR, CLASS_IN, zone->apex,
+ zone->query_id);
} else {
DEBUG(DEBUG_XFRD,1, (LOG_INFO, "request incremental zone "
"transfer (IXFR) for %s to %s",
zone->apex_str, zone->master->ip_address_spec));
- xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex);
+ xfrd_setup_packet(tcp->packet, TYPE_IXFR, CLASS_IN, zone->apex,
+ zone->query_id);
NSCOUNT_SET(tcp->packet, 1);
xfrd_write_soa_buffer(tcp->packet, zone->apex, &zone->soa_disk);
}
- zone->query_id = ID(tcp->packet);
+ /* old transfer needs to be removed still? */
+ if(zone->msg_seq_nr)
+ xfrd_unlink_xfrfile(xfrd->nsd, zone->xfrfilenumber);
zone->msg_seq_nr = 0;
zone->msg_rr_count = 0;
if(zone->master->key_options && zone->master->key_options->tsig_key) {
@@ -317,7 +593,7 @@ xfrd_tcp_xfr(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
buffer_flip(tcp->packet);
DEBUG(DEBUG_XFRD,1, (LOG_INFO, "sent tcp query with ID %d", zone->query_id));
tcp->msglen = buffer_limit(tcp->packet);
- /* wait for select to complete connect before write */
+ tcp->total_bytes = 0;
}
static void
@@ -382,12 +658,14 @@ int conn_write(xfrd_tcp_t* tcp)
}
void
-xfrd_tcp_write(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
+xfrd_tcp_write(struct xfrd_tcp_pipeline* tp, xfrd_zone_t* zone)
{
int ret;
- xfrd_tcp_t* tcp = set->tcp_state[zone->tcp_conn];
+ xfrd_tcp_t* tcp = tp->tcp_w;
assert(zone->tcp_conn != -1);
- if(tcp->total_bytes == 0) {
+ assert(zone == tp->tcp_send_first);
+ /* see if for non-established connection, there is a connect error */
+ if(!tp->connection_established) {
/* check for pending error from nonblocking connect */
/* from Stevens, unix network programming, vol1, 3rd ed, p450 */
int error = 0;
@@ -398,28 +676,51 @@ xfrd_tcp_write(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
if(error == EINPROGRESS || error == EWOULDBLOCK)
return; /* try again later */
if(error != 0) {
- log_msg(LOG_ERR, "Could not tcp connect to %s: %s",
- zone->master->ip_address_spec, strerror(error));
- xfrd_set_refresh_now(zone);
- xfrd_tcp_release(set, zone);
+ log_msg(LOG_ERR, "%s: Could not tcp connect to %s: %s",
+ zone->apex_str, zone->master->ip_address_spec,
+ strerror(error));
+ xfrd_tcp_pipe_stop(tp);
return;
}
}
ret = conn_write(tcp);
if(ret == -1) {
log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
- xfrd_set_refresh_now(zone);
- xfrd_tcp_release(set, zone);
+ xfrd_tcp_pipe_stop(tp);
return;
}
+ if(tcp->total_bytes != 0 && !tp->connection_established)
+ tp->connection_established = 1;
if(ret == 0) {
return; /* write again later */
}
- /* done writing, get ready for reading */
- tcp->is_reading = 1;
- tcp_conn_ready_for_reading(tcp);
- zone->zone_handler.event_types = NETIO_EVENT_READ|NETIO_EVENT_TIMEOUT;
- xfrd_tcp_read(set, zone);
+ /* done writing this message */
+
+ /* remove first zone from sendlist */
+ tcp_pipe_sendlist_popfirst(tp, zone);
+
+ /* see if other zone wants to write; init; let it write (now) */
+ /* and use a loop, because 64k stack calls is a too much */
+ while(tp->tcp_send_first) {
+ /* setup to write for this zone */
+ xfrd_tcp_setup_write_packet(tp, tp->tcp_send_first);
+ /* attempt to write for this zone (if success, continue loop)*/
+ ret = conn_write(tcp);
+ if(ret == -1) {
+ log_msg(LOG_ERR, "xfrd: failed writing tcp %s", strerror(errno));
+ xfrd_tcp_pipe_stop(tp);
+ return;
+ }
+ if(ret == 0)
+ return; /* write again later */
+ tcp_pipe_sendlist_popfirst(tp, tp->tcp_send_first);
+ }
+
+ /* if sendlist empty, remove WRITE from event */
+
+ /* listen to READ, and not WRITE events */
+ assert(tp->tcp_send_first == NULL);
+ tcp_pipe_reset_timeout(tp);
}
int
@@ -496,42 +797,71 @@ conn_read(xfrd_tcp_t* tcp)
}
void
-xfrd_tcp_read(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
+xfrd_tcp_read(struct xfrd_tcp_pipeline* tp)
{
- xfrd_tcp_t* tcp = set->tcp_state[zone->tcp_conn];
+ xfrd_zone_t* zone;
+ xfrd_tcp_t* tcp = tp->tcp_r;
int ret;
+ enum xfrd_packet_result pkt_result;
- assert(zone->tcp_conn != -1);
ret = conn_read(tcp);
if(ret == -1) {
- xfrd_set_refresh_now(zone);
- xfrd_tcp_release(set, zone);
+ xfrd_tcp_pipe_stop(tp);
return;
}
if(ret == 0)
return;
-
/* completed msg */
buffer_flip(tcp->packet);
- switch(xfrd_handle_received_xfr_packet(zone, tcp->packet)) {
+ /* see which ID number it is, if skip, handle skip, NULL: warn */
+ if(tcp->msglen < QHEADERSZ) {
+ /* too short for DNS header, skip it */
+ DEBUG(DEBUG_XFRD,1, (LOG_INFO,
+ "xfrd: tcp skip response that is too short"));
+ tcp_conn_ready_for_reading(tcp);
+ return;
+ }
+ zone = tp->id[ID(tcp->packet)];
+ if(!zone || zone == TCP_NULL_SKIP) {
+ /* no zone for this id? skip it */
+ DEBUG(DEBUG_XFRD,1, (LOG_INFO,
+ "xfrd: tcp skip response with %s ID",
+ zone?"set-to-skip":"unknown"));
+ tcp_conn_ready_for_reading(tcp);
+ return;
+ }
+ assert(zone->tcp_conn != -1);
+
+ /* handle message for zone */
+ pkt_result = xfrd_handle_received_xfr_packet(zone, tcp->packet);
+ /* setup for reading the next packet on this connection */
+ tcp_conn_ready_for_reading(tcp);
+ switch(pkt_result) {
case xfrd_packet_more:
- tcp_conn_ready_for_reading(tcp);
+ /* wait for next packet */
break;
- case xfrd_packet_transfer:
case xfrd_packet_newlease:
- xfrd_tcp_release(set, zone);
+ /* set to skip if more packets with this ID */
+ tp->id[zone->query_id] = TCP_NULL_SKIP;
+ tp->num_skip++;
+ /* fall through to remove zone from tp */
+ case xfrd_packet_transfer:
+ xfrd_tcp_release(xfrd->tcp_set, zone);
assert(zone->round_num == -1);
break;
case xfrd_packet_notimpl:
zone->master->ixfr_disabled = time(NULL);
- xfrd_tcp_release(set, zone);
+ xfrd_tcp_release(xfrd->tcp_set, zone);
/* query next server */
xfrd_make_request(zone);
break;
case xfrd_packet_bad:
case xfrd_packet_tcp:
default:
- xfrd_tcp_release(set, zone);
+ /* set to skip if more packets with this ID */
+ tp->id[zone->query_id] = TCP_NULL_SKIP;
+ tp->num_skip++;
+ xfrd_tcp_release(xfrd->tcp_set, zone);
/* query next server */
xfrd_make_request(zone);
break;
@@ -542,40 +872,103 @@ void
xfrd_tcp_release(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
{
int conn = zone->tcp_conn;
+ struct xfrd_tcp_pipeline* tp = set->tcp_state[conn];
DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: zone %s released tcp conn to %s",
zone->apex_str, zone->master->ip_address_spec));
assert(zone->tcp_conn != -1);
assert(zone->tcp_waiting == 0);
zone->tcp_conn = -1;
zone->tcp_waiting = 0;
- zone->zone_handler.fd = -1;
- zone->zone_handler.event_types = NETIO_EVENT_READ|NETIO_EVENT_TIMEOUT;
- if(set->tcp_state[conn]->fd != -1)
- close(set->tcp_state[conn]->fd);
+ /* remove from tcp_send list */
+ tcp_pipe_sendlist_remove(tp, zone);
+ /* remove it from the ID list */
+ if(tp->id[zone->query_id] != TCP_NULL_SKIP)
+ tcp_pipe_id_remove(tp, zone);
+ DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: released tcp pipe now %d unused",
+ tp->num_unused));
+ /* if pipe was full, but no more, then see if waiting element is
+ * for the same master, and can fill the unused ID */
+ if(tp->num_unused == 1 && set->tcp_waiting_first) {
+ struct sockaddr_storage to;
+ socklen_t to_len = xfrd_acl_sockaddr_to(
+ set->tcp_waiting_first->master, &to);
+ if(to_len == tp->ip_len && memcmp(&to, &tp->ip, to_len) == 0) {
+ /* use this connnection for the waiting zone */
+ zone = set->tcp_waiting_first;
+ assert(zone->tcp_conn == -1);
+ zone->tcp_conn = conn;
+ tcp_zone_waiting_list_popfirst(set, zone);
+ if(zone->zone_handler.ev_fd != -1)
+ xfrd_udp_release(zone);
+ xfrd_unset_timer(zone);
+ pipeline_setup_new_zone(set, tp, zone);
+ return;
+ }
+ /* waiting zone did not go to same server */
+ }
- set->tcp_state[conn]->fd = -1;
+ /* if all unused, or only skipped leftover, close the pipeline */
+ if(tp->num_unused >= ID_PIPE_NUM || tp->num_skip >= ID_PIPE_NUM - tp->num_unused)
+ xfrd_tcp_pipe_release(set, tp, conn);
+}
+void
+xfrd_tcp_pipe_release(xfrd_tcp_set_t* set, struct xfrd_tcp_pipeline* tp,
+ int conn)
+{
+ DEBUG(DEBUG_XFRD,1, (LOG_INFO, "xfrd: tcp pipe released"));
+ /* one handler per tcp pipe */
+ if(tp->handler_added)
+ event_del(&tp->handler);
+ tp->handler_added = 0;
+
+ /* fd in tcp_r and tcp_w is the same, close once */
+ if(tp->tcp_r->fd != -1)
+ close(tp->tcp_r->fd);
+ tp->tcp_r->fd = -1;
+ tp->tcp_w->fd = -1;
+
+ /* remove from pipetree */
+ (void)rbtree_delete(xfrd->tcp_set->pipetree, &tp->node);
+
+ /* a waiting zone can use the free tcp slot (to another server) */
if(set->tcp_count == XFRD_MAX_TCP && set->tcp_waiting_first) {
- /* pop first waiting process */
- zone = set->tcp_waiting_first;
- if(set->tcp_waiting_last == zone)
- set->tcp_waiting_last = 0;
+ int i;
- set->tcp_waiting_first = zone->tcp_waiting_next;
- zone->tcp_waiting_next = 0;
+ /* pop first waiting process */
+ xfrd_zone_t* zone = set->tcp_waiting_first;
/* start it */
assert(zone->tcp_conn == -1);
zone->tcp_conn = conn;
- zone->tcp_waiting = 0;
+
/* stop udp (if any) */
- if(zone->zone_handler.fd != -1)
+ if(zone->zone_handler.ev_fd != -1)
xfrd_udp_release(zone);
-
- if(!xfrd_tcp_open(set, zone))
+ if(!xfrd_tcp_open(set, tp, zone)) {
+ zone->tcp_conn = -1;
+ set->tcp_count --;
+ xfrd_set_refresh_now(zone);
return;
+ }
+ /* re-init this tcppipe */
+ /* ip and ip_len set by tcp_open */
+ tp->node.key = tp;
+ tp->num_unused = ID_PIPE_NUM;
+ tp->num_skip = 0;
+ tp->tcp_send_first = NULL;
+ tp->tcp_send_last = NULL;
+ memset(tp->id, 0, sizeof(tp->id));
+ for(i=0; i<ID_PIPE_NUM; i++) {
+ tp->unused[i] = i;
+ }
- xfrd_tcp_xfr(set, zone);
+ /* insert into tree */
+ (void)rbtree_insert(set->pipetree, &tp->node);
+ /* succeeded? remove zone from lists and setup write */
+ xfrd_unset_timer(zone);
+ tcp_zone_waiting_list_popfirst(set, zone);
+ pipeline_setup_new_zone(set, tp, zone);
}
else {
assert(!set->tcp_waiting_first);
@@ -583,3 +976,4 @@ xfrd_tcp_release(xfrd_tcp_set_t* set, xfrd_zone_t* zone)
assert(set->tcp_count >= 0);
}
}
+