From 89ca14ddb53440286ab2ed0c2b9e4473dda6d12e Mon Sep 17 00:00:00 2001 From: Eric Faurot Date: Fri, 19 Jul 2013 21:14:53 +0000 Subject: Many MTA improvements: - Better transient error handling logic: failing destinations are automatically disabled for a while. When a destination is active again, ask the scheduler to retry previous envelopes immediatly. - More informative error report when all routes fail for a mail. - Implement a "smtpctl show hoststats" command to get the latest stat message per MX domain. - Implement a "smtpctl show routes" command to show the state the currently known routes to remote MXs. - Implement a "smtpctl resume route" command to re-enable a route that has been disabled. - Do not hardcode limits - Minor code improvements --- usr.sbin/smtpd/control.c | 41 +- usr.sbin/smtpd/limit.c | 104 ++++ usr.sbin/smtpd/makemap/Makefile | 4 +- usr.sbin/smtpd/mta.c | 1169 +++++++++++++++++++++++++++------------ usr.sbin/smtpd/mta_session.c | 249 +++++++-- usr.sbin/smtpd/parse.y | 54 +- usr.sbin/smtpd/queue.c | 21 +- usr.sbin/smtpd/runq.c | 203 +++++++ usr.sbin/smtpd/scheduler.c | 8 +- usr.sbin/smtpd/smtpctl.8 | 38 +- usr.sbin/smtpd/smtpctl.c | 53 +- usr.sbin/smtpd/smtpd.c | 10 +- usr.sbin/smtpd/smtpd.conf.5 | 26 +- usr.sbin/smtpd/smtpd.h | 119 ++-- usr.sbin/smtpd/smtpd/Makefile | 6 +- 15 files changed, 1620 insertions(+), 485 deletions(-) create mode 100644 usr.sbin/smtpd/limit.c create mode 100644 usr.sbin/smtpd/runq.c diff --git a/usr.sbin/smtpd/control.c b/usr.sbin/smtpd/control.c index 67c83740fd5..5c92ae0243c 100644 --- a/usr.sbin/smtpd/control.c +++ b/usr.sbin/smtpd/control.c @@ -1,4 +1,4 @@ -/* $OpenBSD: control.c,v 1.88 2013/07/19 15:14:23 eric Exp $ */ +/* $OpenBSD: control.c,v 1.89 2013/07/19 21:14:52 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade @@ -116,6 +116,22 @@ control_imsg(struct mproc *p, struct imsg *imsg) return; } } + if (p->proc == PROC_MTA) { + switch (imsg->hdr.type) { + case IMSG_CTL_MTA_SHOW_ROUTES: + c = tree_get(&ctl_conns, imsg->hdr.peerid); + if (c == NULL) + return; + m_forward(&c->mproc, imsg); + return; + case IMSG_CTL_MTA_SHOW_HOSTSTATS: + c = tree_get(&ctl_conns, imsg->hdr.peerid); + if (c == NULL) + return; + m_forward(&c->mproc, imsg); + return; + } + } switch (imsg->hdr.type) { case IMSG_STAT_INCREMENT: @@ -659,6 +675,15 @@ control_dispatch_ext(struct mproc *p, struct imsg *imsg) m_compose(p, IMSG_CTL_OK, 0, 0, -1, NULL, 0); return; + case IMSG_CTL_RESUME_ROUTE: + if (c->euid) + goto badcred; + + log_info("info: route resumed"); + m_forward(p_mta, imsg); + m_compose(p, IMSG_CTL_OK, 0, 0, -1, NULL, 0); + return; + case IMSG_CTL_LIST_MESSAGES: if (c->euid) goto badcred; @@ -673,6 +698,20 @@ control_dispatch_ext(struct mproc *p, struct imsg *imsg) imsg->data, imsg->hdr.len - sizeof(imsg->hdr)); return; + case IMSG_CTL_MTA_SHOW_ROUTES: + if (c->euid) + goto badcred; + m_compose(p_mta, IMSG_CTL_MTA_SHOW_ROUTES, c->id, 0, -1, + imsg->data, imsg->hdr.len - sizeof(imsg->hdr)); + return; + + case IMSG_CTL_MTA_SHOW_HOSTSTATS: + if (c->euid) + goto badcred; + m_compose(p_mta, IMSG_CTL_MTA_SHOW_HOSTSTATS, c->id, 0, -1, + imsg->data, imsg->hdr.len - sizeof(imsg->hdr)); + return; + case IMSG_CTL_SCHEDULE: if (c->euid) goto badcred; diff --git a/usr.sbin/smtpd/limit.c b/usr.sbin/smtpd/limit.c new file mode 100644 index 00000000000..bdacef93d3f --- /dev/null +++ b/usr.sbin/smtpd/limit.c @@ -0,0 +1,104 @@ +/* $OpenBSD: limit.c,v 1.1 2013/07/19 21:14:52 eric Exp $ */ + +/* + * Copyright (c) 2013 Eric Faurot + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "smtpd.h" +#include "log.h" + +void +limit_mta_set_defaults(struct mta_limits *limits) +{ + limits->maxconn_per_host = 10; + limits->maxconn_per_route = 5; + limits->maxconn_per_source = 50; + limits->maxconn_per_connector = 20; + limits->maxconn_per_relay = 100; + limits->maxconn_per_domain = 100; + + limits->conndelay_host = 0; + limits->conndelay_route = 5; + limits->conndelay_source = 0; + limits->conndelay_connector = 0; + limits->conndelay_relay = 2; + limits->conndelay_domain = 0; + + limits->discdelay_route = 3; + + limits->max_mail_per_session = 100; + limits->sessdelay_transaction = 1; + limits->sessdelay_keepalive = 10; + + limits->family = AF_UNSPEC; +} + +int +limit_mta_set(struct mta_limits *limits, const char *key, int64_t value) +{ + if (!strcmp(key, "max-conn-per-host")) + limits->maxconn_per_host = value; + else if (!strcmp(key, "max-conn-per-route")) + limits->maxconn_per_route = value; + else if (!strcmp(key, "max-conn-per-source")) + limits->maxconn_per_source = value; + else if (!strcmp(key, "max-conn-per-connector")) + limits->maxconn_per_connector = value; + else if (!strcmp(key, "max-conn-per-relay")) + limits->maxconn_per_relay = value; + else if (!strcmp(key, "max-conn-per-domain")) + limits->maxconn_per_domain = value; + + else if (!strcmp(key, "conn-delay-host")) + limits->conndelay_host = value; + else if (!strcmp(key, "conn-delay-route")) + limits->conndelay_route = value; + else if (!strcmp(key, "conn-delay-source")) + limits->conndelay_source = value; + else if (!strcmp(key, "conn-delay-connector")) + limits->conndelay_connector = value; + else if (!strcmp(key, "conn-delay-relay")) + limits->conndelay_relay = value; + else if (!strcmp(key, "conn-delay-domain")) + limits->conndelay_domain = value; + + else if (!strcmp(key, "reconn-delay-route")) + limits->discdelay_route = value; + + else if (!strcmp(key, "session-mail-max")) + limits->max_mail_per_session = value; + else if (!strcmp(key, "session-transaction-delay")) + limits->sessdelay_transaction = value; + else if (!strcmp(key, "session-keepalive")) + limits->sessdelay_keepalive = value; + else + return (0); + + return (1); +} diff --git a/usr.sbin/smtpd/makemap/Makefile b/usr.sbin/smtpd/makemap/Makefile index 6eec1d07e91..ce47caaefb1 100644 --- a/usr.sbin/smtpd/makemap/Makefile +++ b/usr.sbin/smtpd/makemap/Makefile @@ -1,4 +1,4 @@ -# $OpenBSD: Makefile,v 1.20 2013/07/19 19:53:33 eric Exp $ +# $OpenBSD: Makefile,v 1.21 2013/07/19 21:14:52 eric Exp $ .PATH: ${.CURDIR}/.. @@ -19,7 +19,7 @@ CFLAGS+= -Wshadow -Wpointer-arith -Wcast-qual CFLAGS+= -Wsign-compare -Wbounded CFLAGS+= -DNO_IO -SRCS= aliases.c dict.c expand.c log.c makemap.c parse.y \ +SRCS= aliases.c dict.c expand.c limit.c log.c makemap.c parse.y \ table.c to.c tree.c util.c SRCS+= table_static.c diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c index 7f42ad043f5..9b8a8c93e8e 100644 --- a/usr.sbin/smtpd/mta.c +++ b/usr.sbin/smtpd/mta.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mta.c,v 1.162 2013/07/19 15:14:23 eric Exp $ */ +/* $OpenBSD: mta.c,v 1.163 2013/07/19 21:14:52 eric Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard @@ -44,14 +44,14 @@ #define MAXERROR_PER_HOST 4 -#define MAXCONN_PER_HOST 10 -#define MAXCONN_PER_ROUTE 5 -#define MAXCONN_PER_SOURCE 50 /* XXX missing */ -#define MAXCONN_PER_CONNECTOR 20 -#define MAXCONN_PER_RELAY 100 +#define DELAY_CHECK_SOURCE 1 +#define DELAY_CHECK_SOURCE_SLOW 10 +#define DELAY_CHECK_SOURCE_FAST 0 +#define DELAY_CHECK_LIMIT 5 -#define CONNECTOR_DELAY_CONNECT 1 -#define CONNECTOR_DELAY_LIMIT 5 +#define DELAY_QUADRATIC 1 +#define DELAY_ROUTE_BASE 200 +#define DELAY_ROUTE_MAX (3600 * 4) static void mta_imsg(struct mproc *, struct imsg *); static void mta_shutdown(void); @@ -62,14 +62,17 @@ static void mta_query_secret(struct mta_relay *); static void mta_query_preference(struct mta_relay *); static void mta_query_source(struct mta_relay *); static void mta_on_mx(void *, void *, void *); +static void mta_on_secret(struct mta_relay *, const char *); +static void mta_on_preference(struct mta_relay *, int, int); static void mta_on_source(struct mta_relay *, struct mta_source *); +static void mta_on_timeout(struct runq *, void *); static void mta_connect(struct mta_connector *); -static void mta_recycle(struct mta_connector *); +static void mta_route_enable(struct mta_route *); +static void mta_route_disable(struct mta_route *, int, int); static void mta_drain(struct mta_relay *); -static void mta_relay_schedule(struct mta_relay *, unsigned int); -static void mta_relay_timeout(int, short, void *); static void mta_flush(struct mta_relay *, int, const char *); -static struct mta_route *mta_find_route(struct mta_connector *); +static struct mta_route *mta_find_route(struct mta_connector *, time_t, int*, + time_t*); static void mta_log(const struct mta_envelope *, const char *, const char *, const char *, const char *); @@ -111,9 +114,7 @@ static const char *mta_connector_to_text(struct mta_connector *); SPLAY_HEAD(mta_route_tree, mta_route); static struct mta_route *mta_route(struct mta_source *, struct mta_host *); -#if 0 static void mta_route_ref(struct mta_route *); -#endif static void mta_route_unref(struct mta_route *); static const char *mta_route_to_text(struct mta_route *); static int mta_route_cmp(const struct mta_route *, const struct mta_route *); @@ -125,61 +126,77 @@ static struct mta_host_tree hosts; static struct mta_source_tree sources; static struct mta_route_tree routes; -static struct tree batches; - static struct tree wait_mx; static struct tree wait_preference; static struct tree wait_secret; static struct tree wait_source; +static struct runq *runq_relay; +static struct runq *runq_connector; +static struct runq *runq_route; +static struct runq *runq_hoststat; + +static time_t max_seen_conndelay_route; +static time_t max_seen_discdelay_route; + +#define HOSTSTAT_EXPIRE_DELAY (4 * 3600) +struct hoststat { + char name[SMTPD_MAXHOSTNAMELEN]; + time_t tm; + char error[SMTPD_MAXLINESIZE]; + struct tree deferred; +}; +static struct dict hoststat; + +void mta_hoststat_update(const char *, const char *); +void mta_hoststat_cache(const char *, uint64_t); +void mta_hoststat_uncache(const char *, uint64_t); +void mta_hoststat_reschedule(const char *); +static void mta_hoststat_remove_entry(struct hoststat *); + + void mta_imsg(struct mproc *p, struct imsg *imsg) { struct mta_relay *relay; struct mta_task *task; - struct mta_source *source; struct mta_domain *domain; + struct mta_route *route; struct mta_mx *mx, *imx; + struct hoststat *hs; struct mta_envelope *e; struct sockaddr_storage ss; - struct tree *batch; struct envelope evp; struct msg m; const char *secret; + const char *hostname; uint64_t reqid; + time_t t; char buf[SMTPD_MAXLINESIZE]; int dnserror, preference, v, status; + void *iter; + uint64_t u64; if (p->proc == PROC_QUEUE) { switch (imsg->hdr.type) { - case IMSG_MTA_BATCH: - m_msg(&m, imsg); - m_get_id(&m, &reqid); - m_end(&m); - batch = xmalloc(sizeof *batch, "mta_batch"); - tree_init(batch); - tree_xset(&batches, reqid, batch); - log_trace(TRACE_MTA, - "mta: batch:%016" PRIx64 " created", reqid); - return; - - case IMSG_MTA_BATCH_ADD: + case IMSG_MTA_TRANSFER: m_msg(&m, imsg); - m_get_id(&m, &reqid); m_get_envelope(&m, &evp); m_end(&m); relay = mta_relay(&evp); - batch = tree_xget(&batches, reqid); - if ((task = tree_get(batch, relay->id)) == NULL) { - log_trace(TRACE_MTA, "mta: new task for %s", - mta_relay_to_text(relay)); + TAILQ_FOREACH(task, &relay->tasks, entry) + if (task->msgid == evpid_to_msgid(evp.id)) + break; + + if (task == NULL) { task = xmalloc(sizeof *task, "mta_task"); TAILQ_INIT(&task->envelopes); task->relay = relay; - tree_xset(batch, relay->id, task); + relay->ntask += 1; + TAILQ_INSERT_TAIL(&relay->tasks, task, entry); task->msgid = evpid_to_msgid(evp.id); if (evp.sender.user[0] || evp.sender.domain[0]) snprintf(buf, sizeof buf, "%s@%s", @@ -187,16 +204,8 @@ mta_imsg(struct mproc *p, struct imsg *imsg) else buf[0] = '\0'; task->sender = xstrdup(buf, "mta_task:sender"); - } else - mta_relay_unref(relay); /* from here */ - - /* - * Technically, we could handle that by adding a msg - * level, but the batch sent by the scheduler should - * be valid. - */ - if (task->msgid != evpid_to_msgid(evp.id)) - errx(1, "msgid mismatch in batch"); + stat_increment("mta.task", 1); + } e = xcalloc(1, sizeof *e, "mta_envelope"); e->id = evp.id; @@ -209,32 +218,15 @@ mta_imsg(struct mproc *p, struct imsg *imsg) if (strcmp(buf, e->dest)) e->rcpt = xstrdup(buf, "mta_envelope:rcpt"); e->task = task; - /* XXX honour relay->maxrcpt */ + TAILQ_INSERT_TAIL(&task->envelopes, e, entry); - stat_increment("mta.envelope", 1); log_debug("debug: mta: received evp:%016" PRIx64 " for <%s>", e->id, e->dest); - return; - case IMSG_MTA_BATCH_END: - m_msg(&m, imsg); - m_get_id(&m, &reqid); - m_end(&m); - batch = tree_xpop(&batches, reqid); - log_trace(TRACE_MTA, "mta: batch:%016" PRIx64 " closed", - reqid); - /* For all tasks, queue them on its relay */ - while (tree_poproot(batch, &reqid, (void**)&task)) { - if (reqid != task->relay->id) - errx(1, "relay id mismatch!"); - relay = task->relay; - relay->ntask += 1; - TAILQ_INSERT_TAIL(&relay->tasks, task, entry); - stat_increment("mta.task", 1); - mta_drain(relay); - mta_relay_unref(relay); /* from BATCH_APPEND */ - } - free(batch); + stat_increment("mta.envelope", 1); + + mta_drain(relay); + mta_relay_unref(relay); /* from here */ return; case IMSG_QUEUE_MESSAGE_FD: @@ -252,40 +244,20 @@ mta_imsg(struct mproc *p, struct imsg *imsg) m_get_string(&m, &secret); m_end(&m); relay = tree_xpop(&wait_secret, reqid); - if (secret[0]) - relay->secret = strdup(secret); - if (relay->secret == NULL) { - log_warnx("warn: Failed to retrieve secret " - "for %s", mta_relay_to_text(relay)); - relay->fail = IMSG_DELIVERY_TEMPFAIL; - relay->failstr = "Could not retrieve secret"; - } - relay->status &= ~RELAY_WAIT_SECRET; - mta_drain(relay); - mta_relay_unref(relay); /* from mta_query_secret() */ + mta_on_secret(relay, secret[0] ? secret : NULL); return; case IMSG_LKA_SOURCE: m_msg(&m, imsg); m_get_id(&m, &reqid); m_get_int(&m, &status); - - relay = tree_xpop(&wait_source, reqid); - relay->status &= ~RELAY_WAIT_SOURCE; - if (status == LKA_OK) { + if (status == LKA_OK) m_get_sockaddr(&m, (struct sockaddr*)&ss); - source = mta_source((struct sockaddr *)&ss); - mta_on_source(relay, source); - mta_source_unref(source); - } - else { - log_warnx("warn: Failed to get source address" - "for %s", mta_relay_to_text(relay)); - } m_end(&m); - mta_drain(relay); - mta_relay_unref(relay); /* from mta_query_source() */ + relay = tree_xpop(&wait_source, reqid); + mta_on_source(relay, (status == LKA_OK) ? + mta_source((struct sockaddr *)&ss) : NULL); return; case IMSG_LKA_HELO: @@ -337,23 +309,12 @@ mta_imsg(struct mproc *p, struct imsg *imsg) m_msg(&m, imsg); m_get_id(&m, &reqid); m_get_int(&m, &dnserror); - relay = tree_xpop(&wait_preference, reqid); - if (dnserror) { - log_debug("debug: couldn't find backup " - "preference for %s", - mta_relay_to_text(relay)); - relay->backuppref = INT_MAX; - } else { - m_get_int(&m, &relay->backuppref); - log_debug("debug: found backup preference %i " - "for %s", - relay->backuppref, - mta_relay_to_text(relay)); - } + if (dnserror == 0) + m_get_int(&m, &preference); m_end(&m); - relay->status &= ~RELAY_WAIT_PREFERENCE; - mta_drain(relay); - mta_relay_unref(relay); /* from mta_query_preference() */ + + relay = tree_xpop(&wait_preference, reqid); + mta_on_preference(relay, dnserror, preference); return; case IMSG_DNS_PTR: @@ -388,6 +349,65 @@ mta_imsg(struct mproc *p, struct imsg *imsg) } } + if (p->proc == PROC_CONTROL) { + switch (imsg->hdr.type) { + + case IMSG_CTL_RESUME_ROUTE: + u64 = *((uint64_t *)imsg->data); + if (u64) + log_debug("resuming route: %llu", + (unsigned long long)u64); + else + log_debug("resuming all routes"); + SPLAY_FOREACH(route, mta_route_tree, &routes) { + if (u64 && route->id != u64) + continue; + mta_route_enable(route); + if (u64) + break; + } + return; + + case IMSG_CTL_MTA_SHOW_ROUTES: + SPLAY_FOREACH(route, mta_route_tree, &routes) { + v = runq_pending(runq_route, NULL, route, &t); + snprintf(buf, sizeof(buf), + "%llu. %s %c%c%c%c nconn=%zu penalty=%i timeout=%s", + (unsigned long long)route->id, + mta_route_to_text(route), + route->flags & ROUTE_NEW ? 'N' : '-', + route->flags & ROUTE_DISABLED ? 'D' : '-', + route->flags & ROUTE_RUNQ ? 'Q' : '-', + route->flags & ROUTE_KEEPALIVE ? 'K' : '-', + route->nconn, + route->penalty, + v ? duration_to_text(t - time(NULL)) : "-"); + m_compose(p, IMSG_CTL_MTA_SHOW_ROUTES, + imsg->hdr.peerid, 0, -1, + buf, strlen(buf) + 1); + } + m_compose(p, IMSG_CTL_MTA_SHOW_ROUTES, imsg->hdr.peerid, + 0, -1, NULL, 0); + return; + case IMSG_CTL_MTA_SHOW_HOSTSTATS: + iter = NULL; + while (dict_iter(&hoststat, &iter, &hostname, + (void **)&hs)) { + snprintf(buf, sizeof(buf), + "%s|%llu|%s", + hostname, (unsigned long long) hs->tm, + hs->error); + m_compose(p, IMSG_CTL_MTA_SHOW_HOSTSTATS, + imsg->hdr.peerid, 0, -1, + buf, strlen(buf) + 1); + } + m_compose(p, IMSG_CTL_MTA_SHOW_HOSTSTATS, + imsg->hdr.peerid, + 0, -1, NULL, 0); + return; + } + } + errx(1, "mta_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type)); } @@ -451,15 +471,20 @@ mta(void) SPLAY_INIT(&sources); SPLAY_INIT(&routes); - tree_init(&batches); tree_init(&wait_secret); tree_init(&wait_mx); tree_init(&wait_preference); tree_init(&wait_source); + dict_init(&hoststat); imsg_callback = mta_imsg; event_init(); + runq_init(&runq_relay, mta_on_timeout); + runq_init(&runq_connector, mta_on_timeout); + runq_init(&runq_route, mta_on_timeout); + runq_init(&runq_hoststat, mta_on_timeout); + signal_set(&ev_sigint, SIGINT, mta_sig_handler, NULL); signal_set(&ev_sigterm, SIGTERM, mta_sig_handler, NULL); signal_add(&ev_sigint, NULL); @@ -487,14 +512,15 @@ void mta_source_error(struct mta_relay *relay, struct mta_route *route, const char *e) { struct mta_connector *c; + /* * Remember the source as broken for this connector. */ c = mta_connector(relay, route->src); - if (!(c->flags & CONNECTOR_SOURCE_ERROR)) - log_info("smtp-out: Error on connector %s: %s", - mta_connector_to_text(c), e); - c->flags |= CONNECTOR_SOURCE_ERROR; + if (!(c->flags & CONNECTOR_ERROR_SOURCE)) + log_info("smtp-out: Error on %s: %s", + mta_route_to_text(route), e); + c->flags |= CONNECTOR_ERROR_SOURCE; } /* @@ -526,12 +552,22 @@ mta_route_ok(struct mta_relay *relay, struct mta_route *route) { struct mta_connector *c; - log_debug("debug: mta: route ok %s", mta_route_to_text(route)); + if (!(route->flags & ROUTE_NEW)) + return; + + log_debug("debug: mta-routing: route %s is now valid.", + mta_route_to_text(route)); - route->dst->nerror = 0; + route->flags &= ~ROUTE_NEW; c = mta_connector(relay, route->src); - c->flags &= ~CONNECTOR_SOURCE_ERROR; + mta_connect(c); +} + +void +mta_route_down(struct mta_relay *relay, struct mta_route *route) +{ + mta_route_disable(route, 2, ROUTE_DISABLED_SMTP); } void @@ -539,26 +575,24 @@ mta_route_collect(struct mta_relay *relay, struct mta_route *route) { struct mta_connector *c; - log_debug("debug: mta: route collect %s", mta_route_to_text(route)); + log_debug("debug: mta_route_collect(%s)", + mta_route_to_text(route)); relay->nconn -= 1; + relay->domain->nconn -= 1; route->nconn -= 1; route->src->nconn -= 1; route->dst->nconn -= 1; + route->lastdisc = time(NULL); - c = mta_connector(relay, route->src); - mta_route_unref(route); /* from mta_find_route() */ + /* First connection failed */ + if (route->flags & ROUTE_NEW) + mta_route_disable(route, 2, ROUTE_DISABLED_NET); + c = mta_connector(relay, route->src); c->nconn -= 1; - - if (c->flags & CONNECTOR_LIMIT) { - log_debug("debug: mta; resetting limit flags on connector %s", - mta_connector_to_text(c)); - c->flags &= ~CONNECTOR_LIMIT; - } - - mta_recycle(c); - mta_drain(relay); + mta_connect(c); + mta_route_unref(route); /* from mta_find_route() */ mta_relay_unref(relay); /* from mta_connect() */ } @@ -577,29 +611,53 @@ mta_route_next_task(struct mta_relay *relay, struct mta_route *route) } void -mta_delivery(struct mta_envelope *e, const char *source, const char *relay, +mta_delivery_log(struct mta_envelope *e, const char *source, const char *relay, int delivery, const char *status) { if (delivery == IMSG_DELIVERY_OK) { mta_log(e, "Ok", source, relay, status); - queue_ok(e->id); } else if (delivery == IMSG_DELIVERY_TEMPFAIL) { mta_log(e, "TempFail", source, relay, status); - queue_tempfail(e->id, 0, status); } else if (delivery == IMSG_DELIVERY_PERMFAIL) { mta_log(e, "PermFail", source, relay, status); - queue_permfail(e->id, status); } else if (delivery == IMSG_DELIVERY_LOOP) { mta_log(e, "PermFail", source, relay, "Loop detected"); + } + else + errx(1, "bad delivery"); +} + +void +mta_delivery_notify(struct mta_envelope *e, int delivery, const char *status, + uint32_t penalty) +{ + if (delivery == IMSG_DELIVERY_OK) { + queue_ok(e->id); + } + else if (delivery == IMSG_DELIVERY_TEMPFAIL) { + queue_tempfail(e->id, penalty, status); + } + else if (delivery == IMSG_DELIVERY_PERMFAIL) { + queue_permfail(e->id, status); + } + else if (delivery == IMSG_DELIVERY_LOOP) { queue_loop(e->id); } else errx(1, "bad delivery"); } +void +mta_delivery(struct mta_envelope *e, const char *source, const char *relay, + int delivery, const char *status, uint32_t penalty) +{ + mta_delivery_log(e, source, relay, delivery, status); + mta_delivery_notify(e, delivery, status, penalty); +} + static void mta_query_mx(struct mta_relay *relay) { @@ -608,7 +666,8 @@ mta_query_mx(struct mta_relay *relay) if (relay->status & RELAY_WAIT_MX) return; - log_debug("debug: mta_query_mx(%s)", relay->domain->name); + log_debug("debug: mta: querying MX for %s...", + mta_relay_to_text(relay)); if (waitq_wait(&relay->domain->mxs, mta_on_mx, relay)) { id = generate_uid(); @@ -623,13 +682,30 @@ mta_query_mx(struct mta_relay *relay) mta_relay_ref(relay); } +static void +mta_query_limits(struct mta_relay *relay) +{ + if (relay->status & RELAY_WAIT_LIMITS) + return; + + relay->limits = dict_get(env->sc_limits_dict, relay->domain->name); + if (relay->limits == NULL) + relay->limits = dict_get(env->sc_limits_dict, "default"); + + if (max_seen_conndelay_route < relay->limits->conndelay_route) + max_seen_conndelay_route = relay->limits->conndelay_route; + if (max_seen_discdelay_route < relay->limits->discdelay_route) + max_seen_discdelay_route = relay->limits->discdelay_route; +} + static void mta_query_secret(struct mta_relay *relay) { if (relay->status & RELAY_WAIT_SECRET) return; - log_debug("debug: mta_query_secret(%s)", mta_relay_to_text(relay)); + log_debug("debug: mta: querying secret for %s...", + mta_relay_to_text(relay)); tree_xset(&wait_secret, relay->id, relay); relay->status |= RELAY_WAIT_SECRET; @@ -649,7 +725,8 @@ mta_query_preference(struct mta_relay *relay) if (relay->status & RELAY_WAIT_PREFERENCE) return; - log_debug("debug: mta_query_preference(%s)", mta_relay_to_text(relay)); + log_debug("debug: mta: querying preference for %s...", + mta_relay_to_text(relay)); tree_xset(&wait_preference, relay->id, relay); relay->status |= RELAY_WAIT_PREFERENCE; @@ -661,7 +738,20 @@ mta_query_preference(struct mta_relay *relay) static void mta_query_source(struct mta_relay *relay) { - log_debug("debug: mta_query_source(%s)", mta_relay_to_text(relay)); + log_debug("debug: mta: querying source for %s...", + mta_relay_to_text(relay)); + + relay->sourceloop += 1; + + if (relay->sourcetable == NULL) { + /* + * This is a recursive call, but it only happens once, since + * another source will not be queried immediatly. + */ + mta_relay_ref(relay); + mta_on_source(relay, mta_source(NULL)); + return; + } m_create(p_lka, IMSG_LKA_SOURCE, 0, 0, -1); m_add_id(p_lka, relay->id); @@ -679,7 +769,7 @@ mta_on_mx(void *tag, void *arg, void *data) struct mta_domain *domain = data; struct mta_relay *relay = arg; - log_debug("debug: mta_on_mx(%p, %s, %s)", + log_debug("debug: mta: ... got mx (%p, %s, %s)", tag, domain->name, mta_relay_to_text(relay)); switch (domain->mxstatus) { @@ -715,35 +805,233 @@ mta_on_mx(void *tag, void *arg, void *data) mta_relay_unref(relay); /* from mta_drain() */ } +static void +mta_on_secret(struct mta_relay *relay, const char *secret) +{ + log_debug("debug: mta: ... got secret for %s: %s", + mta_relay_to_text(relay), secret); + + if (secret) + relay->secret = strdup(secret); + + if (relay->secret == NULL) { + log_warnx("warn: Failed to retrieve secret " + "for %s", mta_relay_to_text(relay)); + relay->fail = IMSG_DELIVERY_TEMPFAIL; + relay->failstr = "Could not retrieve credentials"; + } + + relay->status &= ~RELAY_WAIT_SECRET; + mta_drain(relay); + mta_relay_unref(relay); /* from mta_query_secret() */ +} + +static void +mta_on_preference(struct mta_relay *relay, int dnserror, int preference) +{ + if (dnserror) { + log_warnx("warn: Couldn't find backup preference for %s", + mta_relay_to_text(relay)); + relay->backuppref = INT_MAX; + } + else { + log_debug("debug: mta: ... got preference for %s: %i, %i", + mta_relay_to_text(relay), dnserror, preference); + relay->backuppref = preference; + } + + relay->status &= ~RELAY_WAIT_PREFERENCE; + mta_drain(relay); + mta_relay_unref(relay); /* from mta_query_preference() */ +} + static void mta_on_source(struct mta_relay *relay, struct mta_source *source) { - mta_connector(relay, source); + struct mta_connector *c; + void *iter; + int delay, errmask; + + log_debug("debug: mta: ... got source for %s: %s", + mta_relay_to_text(relay), source ? mta_source_to_text(source) : "NULL"); + + relay->lastsource = time(NULL); + delay = DELAY_CHECK_SOURCE_SLOW; + + if (source) { + c = mta_connector(relay, source); + if (c->flags & CONNECTOR_NEW) { + c->flags &= ~CONNECTOR_NEW; + delay = DELAY_CHECK_SOURCE; + } + mta_connect(c); + if ((c->flags & CONNECTOR_ERROR) == 0) + relay->sourceloop = 0; + else + delay = DELAY_CHECK_SOURCE_FAST; + mta_source_unref(source); /* from constructor */ + } + else { + log_warnx("warn: Failed to get source address" + "for %s", mta_relay_to_text(relay)); + } + + if (tree_count(&relay->connectors) == 0) { + relay->fail = IMSG_DELIVERY_TEMPFAIL; + relay->failstr = "Could not retrieve source address"; + } + if (tree_count(&relay->connectors) < relay->sourceloop) { + relay->fail = IMSG_DELIVERY_TEMPFAIL; + relay->failstr = "No valid route to remote MX"; + + errmask = 0; + iter = NULL; + while (tree_iter(&relay->connectors, &iter, NULL, (void **)&c)) + errmask |= c->flags; + + if (errmask & CONNECTOR_ERROR_ROUTE_SMTP) + relay->failstr = "Destination seem to reject all mails"; + else if (errmask & CONNECTOR_ERROR_ROUTE_NET) + relay->failstr = "Network error on destination MXs"; + else if (errmask & CONNECTOR_ERROR_MX) + relay->failstr = "No MX found for destination"; + else if (errmask & CONNECTOR_ERROR_FAMILY) + relay->failstr = "Address family mismatch on destination MXs"; + else + relay->failstr = "No valid route to destination"; + } + + relay->nextsource = relay->lastsource + delay; + relay->status &= ~RELAY_WAIT_SOURCE; + mta_drain(relay); + mta_relay_unref(relay); /* from mta_query_source() */ } static void mta_connect(struct mta_connector *c) { struct mta_route *route; + struct mta_limits *l = c->relay->limits; + int limits; + time_t nextconn, now; + + again: + + log_debug("debug: mta: connecting with %s", mta_connector_to_text(c)); + + /* Do not connect if this connector has an error. */ + if (c->flags & CONNECTOR_ERROR) { + log_debug("debug: mta: connector error"); + return; + } + + if (c->flags & CONNECTOR_WAIT) { + log_debug("debug: mta: canceling connector timeout"); + runq_cancel(runq_connector, NULL, c); + } + + /* No job. */ + if (c->relay->ntask == 0) { + log_debug("debug: mta: no task for connector"); + return; + } + + /* Do not create more connections than necessary */ + if ((c->relay->nconn_ready >= c->relay->ntask) || + (c->relay->nconn > 2 && c->relay->nconn >= c->relay->ntask / 2)) { + log_debug("debug: mta: enough connections already"); + return; + } + + limits = 0; + nextconn = now = time(NULL); + + if (c->relay->domain->lastconn + l->conndelay_domain > nextconn) { + log_debug("debug: mta: cannot use domain %s before %llus", + c->relay->domain->name, + (unsigned long long) c->relay->domain->lastconn + l->conndelay_domain - now); + nextconn = c->relay->domain->lastconn + l->conndelay_domain; + } + if (c->relay->domain->nconn >= l->maxconn_per_domain) { + log_debug("debug: mta: hit domain limit"); + limits |= CONNECTOR_LIMIT_DOMAIN; + } + + if (c->source->lastconn + l->conndelay_source > nextconn) { + log_debug("debug: mta: cannot use source %s before %llus", + mta_source_to_text(c->source), + (unsigned long long) c->source->lastconn + l->conndelay_source - now); + nextconn = c->source->lastconn + l->conndelay_source; + } + if (c->source->nconn >= l->maxconn_per_source) { + log_debug("debug: mta: hit source limit"); + limits |= CONNECTOR_LIMIT_SOURCE; + } + + if (c->lastconn + l->conndelay_connector > nextconn) { + log_debug("debug: mta: cannot use %s before %llus", + mta_connector_to_text(c), + (unsigned long long) c->lastconn + l->conndelay_connector - now); + nextconn = c->lastconn + l->conndelay_connector; + } + if (c->nconn >= l->maxconn_per_connector) { + log_debug("debug: mta: hit connector limit"); + limits |= CONNECTOR_LIMIT_CONN; + } + + if (c->relay->lastconn + l->conndelay_relay > nextconn) { + log_debug("debug: mta: cannot use %s before %llus", + mta_relay_to_text(c->relay), + (unsigned long long) c->relay->lastconn + l->conndelay_relay - now); + nextconn = c->relay->lastconn + l->conndelay_relay; + } + if (c->relay->nconn >= l->maxconn_per_relay) { + log_debug("debug: mta: hit relay limit"); + limits |= CONNECTOR_LIMIT_RELAY; + } - log_debug("debug: mta_connect() for %s", mta_connector_to_text(c)); + /* We can connect now, find a route */ + if (!limits && nextconn <= now) + route = mta_find_route(c, now, &limits, &nextconn); + else + route = NULL; - route = mta_find_route(c); + /* No route */ if (route == NULL) { - mta_recycle(c); - if (c->queue == &c->relay->c_limit) - c->clearlimit = time(NULL) + CONNECTOR_DELAY_LIMIT; - if (c->queue == &c->relay->c_ready) - fatalx("connector with no route ended up in ready list"); + + if (c->flags & CONNECTOR_ERROR) { + /* XXX we might want to clear this flag later */ + log_debug("debug: mta-routing: no route available for %s: errors on connector", + mta_connector_to_text(c)); + return; + } + else if (limits) { + log_debug("debug: mta-routing: no route available for %s: limits reached", + mta_connector_to_text(c)); + nextconn = now + DELAY_CHECK_LIMIT; + } + else { + log_debug("debug: mta-routing: no route available for %s: must wait a bit", + mta_connector_to_text(c)); + } + log_debug("debug: mta: retrying to connect on %s in %llus...", + mta_connector_to_text(c), + (unsigned long long) nextconn - time(NULL)); + c->flags |= CONNECTOR_WAIT; + runq_schedule(runq_connector, nextconn, NULL, c); return; } + log_debug("debug: mta-routing: spawning new connection on %s", + mta_route_to_text(route)); + c->nconn += 1; c->lastconn = time(NULL); - c->nextconn = c->lastconn + CONNECTOR_DELAY_CONNECT; c->relay->nconn += 1; c->relay->lastconn = c->lastconn; + c->relay->domain->nconn += 1; + c->relay->domain->lastconn = c->lastconn; route->nconn += 1; route->lastconn = c->lastconn; route->src->nconn += 1; @@ -751,105 +1039,101 @@ mta_connect(struct mta_connector *c) route->dst->nconn += 1; route->dst->lastconn = c->lastconn; - mta_recycle(c); - - mta_relay_ref(c->relay); mta_session(c->relay, route); /* this never fails synchronously */ + mta_relay_ref(c->relay); + + goto again; } static void -mta_recycle(struct mta_connector *c) +mta_on_timeout(struct runq *runq, void *arg) { - TAILQ_REMOVE(c->queue, c, lst_entry); - - if (c->flags & CONNECTOR_ERROR) { - log_debug("debug: mta: putting %s on error queue", - mta_connector_to_text(c)); - c->queue = &c->relay->c_error; + struct mta_connector *connector = arg; + struct mta_relay *relay = arg; + struct mta_route *route = arg; + struct hoststat *hs = arg; + + if (runq == runq_relay) { + log_debug("debug: mta: ... timeout for %s", + mta_relay_to_text(relay)); + relay->status &= ~RELAY_WAIT_CONNECTOR; + mta_drain(relay); + mta_relay_unref(relay); /* from mta_drain() */ } - else if (c->flags & CONNECTOR_LIMIT) { - log_debug("debug: mta: putting %s on limit queue", - mta_connector_to_text(c)); - c->queue = &c->relay->c_limit; + else if (runq == runq_connector) { + log_debug("debug: mta: ... timeout for %s", + mta_connector_to_text(connector)); + connector->flags &= ~CONNECTOR_WAIT; + mta_connect(connector); } - else if (c->nextconn > time(NULL)) { - log_debug("debug: mta: putting %s on delay queue", - mta_connector_to_text(c)); - c->queue = &c->relay->c_delay; + else if (runq == runq_route) { + route->flags &= ~ROUTE_RUNQ; + mta_route_enable(route); + mta_route_unref(route); } - else { - log_debug("debug: mta: putting %s on ready queue", - mta_connector_to_text(c)); - c->queue = &c->relay->c_ready; + else if (runq == runq_hoststat) { + log_debug("debug: mta: ... timeout for hoststat %s", + hs->name); + mta_hoststat_remove_entry(hs); + free(hs); } - - TAILQ_INSERT_TAIL(c->queue, c, lst_entry); } static void -mta_relay_timeout(int fd, short ev, void *arg) +mta_route_disable(struct mta_route *route, int penalty, int reason) { - struct mta_relay *r = arg; - struct mta_connector *c; - time_t t; - - log_debug("debug: mta: timeout for %s", mta_relay_to_text(r)); + unsigned long long delay; - t = time(NULL); + route->penalty += penalty; + route->lastpenalty = time(NULL); + delay = (unsigned long long)DELAY_ROUTE_BASE * route->penalty * route->penalty; + if (delay > DELAY_ROUTE_MAX) + delay = DELAY_ROUTE_MAX; +#if 0 + delay = 60; +#endif - /* - * Clear the limit flags on all connectors. - */ - while ((c = TAILQ_FIRST(&r->c_limit))) { - /* This requires that the list is always sorted */ - if (c->clearlimit > t) - break; - log_debug("debug: mta: clearing limits on %s", - mta_connector_to_text(c)); - c->flags &= ~CONNECTOR_LIMIT; - mta_recycle(c); - } + log_info("smtp-out: Disabling route %s for %llus", + mta_route_to_text(route), delay); - while ((c = TAILQ_FIRST(&r->c_delay))) { - /* This requires that the list is always sorted */ - if (c->nextconn > t) - break; - log_debug("debug: mta: delay expired for %s", - mta_connector_to_text(c)); - mta_recycle(c); + if (route->flags & ROUTE_DISABLED) { + runq_cancel(runq_route, NULL, route); + mta_route_unref(route); /* from last call to here */ } - - mta_drain(r); - mta_relay_unref(r); /* from mta_relay_schedule() */ + route->flags |= reason & ROUTE_DISABLED; + runq_schedule(runq_route, time(NULL) + delay, NULL, route); + mta_route_ref(route); } static void -mta_relay_schedule(struct mta_relay *r, unsigned int delay) +mta_route_enable(struct mta_route *route) { - struct timeval tv; - - if (evtimer_pending(&r->ev, &tv)) - return; - - log_debug("debug: mta: adding relay timeout: %u", delay); - - tv.tv_sec = delay; - tv.tv_usec = 0; - evtimer_add(&r->ev, &tv); - mta_relay_ref(r); + if (route->flags & ROUTE_DISABLED) { + log_info("smtp-out: Enabling route %s", + mta_route_to_text(route)); + route->flags &= ~ROUTE_DISABLED; + route->flags |= ROUTE_NEW; + } + + if (route->penalty) { +#if DELAY_QUADRATIC + route->penalty -= 1; + route->lastpenalty = time(NULL); +#else + route->penalty = 0; +#endif + } } static void mta_drain(struct mta_relay *r) { - struct mta_connector *c; - struct mta_source *s; char buf[64]; log_debug("debug: mta: draining %s " "refcount=%i, ntask=%zu, nconnector=%zu, nconn=%zu", mta_relay_to_text(r), - r->refcount, r->ntask, r->nconnector, r->nconn); + r->refcount, r->ntask, tree_count(&r->connectors), r->nconn); /* * All done. @@ -879,99 +1163,40 @@ mta_drain(struct mta_relay *r) if (r->domain->lastmxquery == 0) mta_query_mx(r); + /* Query the limits if needed. */ + if (r->limits == NULL) + mta_query_limits(r); + /* Wait until we are ready to proceed. */ if (r->status & RELAY_WAITMASK) { buf[0] = '\0'; if (r->status & RELAY_WAIT_MX) - strlcat(buf, "MX ", sizeof buf); + strlcat(buf, " MX", sizeof buf); if (r->status & RELAY_WAIT_PREFERENCE) - strlcat(buf, "preference ", sizeof buf); + strlcat(buf, " preference", sizeof buf); if (r->status & RELAY_WAIT_SECRET) - strlcat(buf, "secret ", sizeof buf); + strlcat(buf, " secret", sizeof buf); if (r->status & RELAY_WAIT_SOURCE) - strlcat(buf, "source ", sizeof buf); - log_debug("debug: mta: %s waiting for %s", + strlcat(buf, " source", sizeof buf); + if (r->status & RELAY_WAIT_CONNECTOR) + strlcat(buf, " connector", sizeof buf); + log_debug("debug: mta: %s waiting for%s", mta_relay_to_text(r), buf); return; } /* - * Start new connections if possible. - * XXX find a better heuristic for the good number of connections - * depending on the number of tasks and other factors. We might - * want to try more than the number of task, to have a chance to - * hit a mx faster if the first ones timeout. + * We have pending task, and it's maybe time too try a new source. */ - while (r->nconn < r->ntask) { - log_debug("debug: mta: trying to create new connection: " - "refcount=%i, ntask=%zu, nconnector=%zu, nconn=%zu", - r->refcount, r->ntask, r->nconnector, r->nconn); - - /* Check the per-relay connection limit */ - if (r->nconn >= MAXCONN_PER_RELAY) { - log_debug("debug: mta: hit connection limit on %s", - mta_relay_to_text(r)); - return; - } - - /* Use the first connector if ready */ - c = TAILQ_FIRST(&r->c_ready); - if (c) { - log_debug("debug: mta: using connector %s", - mta_connector_to_text(c)); - r->sourceloop = 0; - mta_connect(c); - continue; - } - - /* No new connectors */ - if (r->sourceloop > r->nconnector) { - log_debug("debug: mta: no new connector available"); - - if (TAILQ_FIRST(&r->c_delay)) { - mta_relay_schedule(r, 1); - log_debug( - "debug: mta: waiting for relay timeout"); - return; - } - - if (TAILQ_FIRST(&r->c_limit)) { - mta_relay_schedule(r, 5); - log_debug( - "debug: mta: waiting for relay timeout"); - return; - } - - log_debug("debug: mta: failing..."); - /* - * All sources have been tried and no connectors can - * be used. - */ - if (r->nconnector == 0) { - r->fail = IMSG_DELIVERY_TEMPFAIL; - r->failstr = "No source address"; - } - else { - r->fail = IMSG_DELIVERY_TEMPFAIL; - r->failstr = "No MX could be reached"; - } - mta_flush(r, r->fail, r->failstr); - return; - } - - r->sourceloop++; - log_debug("debug: mta: need new connector (attempt %zu)", - r->sourceloop); - if (r->sourcetable) { - log_debug("debug: mta: querying source %s", - r->sourcetable); - mta_query_source(r); - return; - } - log_debug("debug: mta: using default source"); - s = mta_source(NULL); - mta_on_source(r, s); - mta_source_unref(s); + if (r->nextsource <= time(NULL)) + mta_query_source(r); + else { + log_debug("debug: mta: scheduling relay %s in %llus...", + mta_relay_to_text(r), + (unsigned long long) r->nextsource - time(NULL)); + runq_schedule(runq_relay, r->nextsource, NULL, r); + r->status |= RELAY_WAIT_CONNECTOR; + mta_relay_ref(r); } } @@ -980,7 +1205,11 @@ mta_flush(struct mta_relay *relay, int fail, const char *error) { struct mta_envelope *e; struct mta_task *task; + const char *domain; + void *iter; + struct mta_connector *c; size_t n; + size_t r; log_debug("debug: mta_flush(%s, %i, \"%s\")", mta_relay_to_text(relay), fail, error); @@ -993,7 +1222,26 @@ mta_flush(struct mta_relay *relay, int fail, const char *error) TAILQ_REMOVE(&relay->tasks, task, entry); while ((e = TAILQ_FIRST(&task->envelopes))) { TAILQ_REMOVE(&task->envelopes, e, entry); - mta_delivery(e, NULL, relay->domain->name, fail, error); + mta_delivery(e, NULL, relay->domain->name, fail, error, 0); + + /* + * host was suspended, cache envelope id in hoststat tree + * so that it can be retried when a delivery succeeds for + * that domain. + */ + domain = strchr(e->dest, '@'); + if (fail == IMSG_DELIVERY_TEMPFAIL && domain) { + r = 0; + iter = NULL; + while (tree_iter(&relay->connectors, &iter, + NULL, (void **)&c)) { + if (c->flags & CONNECTOR_ERROR_ROUTE) + r++; + } + if (tree_count(&relay->connectors) == r) + mta_hoststat_cache(domain+1, e->id); + } + free(e->dest); free(e->rcpt); free(e); @@ -1012,27 +1260,28 @@ mta_flush(struct mta_relay *relay, int fail, const char *error) * Find a route to use for this connector */ static struct mta_route * -mta_find_route(struct mta_connector *c) +mta_find_route(struct mta_connector *c, time_t now, int *limits, + time_t *nextconn) { struct mta_route *route, *best; + struct mta_limits *l = c->relay->limits; struct mta_mx *mx; int level, limit_host, limit_route; - int family_mismatch, seen; + int family_mismatch, seen, suspended_route; + time_t tm; + log_debug("debug: mta-routing: searching new route for %s...", + mta_connector_to_text(c)); + + tm = 0; limit_host = 0; limit_route = 0; + suspended_route = 0; family_mismatch = 0; level = -1; best = NULL; seen = 0; - if (c->nconn >= MAXCONN_PER_CONNECTOR) { - log_debug("debug: mta: hit limit on connector %s", - mta_connector_to_text(c)); - c->flags |= CONNECTOR_LIMIT_SOURCE; - return (NULL); - } - TAILQ_FOREACH(mx, &c->relay->domain->mxs, entry) { /* * New preference level @@ -1047,9 +1296,10 @@ mta_find_route(struct mta_connector *c) /* * No candidate found. There are valid MXs at this - * preference level but they reached their limit. + * preference level but they reached their limit, or + * we can't connect yet. */ - if (limit_host || limit_route) + if (limit_host || limit_route || tm) break; /* @@ -1073,27 +1323,81 @@ mta_find_route(struct mta_connector *c) /* Found a possibly valid mx */ seen++; - if (c->source->sa && - c->source->sa->sa_family != mx->host->sa->sa_family) { + if ((c->source->sa && + c->source->sa->sa_family != mx->host->sa->sa_family) || + (l->family && l->family != mx->host->sa->sa_family)) { + log_debug("debug: mta-routing: skipping host %s: AF mismatch", + mta_host_to_text(mx->host)); family_mismatch = 1; continue; } - if (mx->host->nconn >= MAXCONN_PER_HOST) { + if (mx->host->nconn >= l->maxconn_per_host) { + log_debug("debug: mta-routing: skipping host %s: too many connections", + mta_host_to_text(mx->host)); limit_host = 1; continue; } + if (mx->host->lastconn + l->conndelay_host > now) { + log_debug("debug: mta-routing: skipping host %s: cannot use before %llus", + mta_host_to_text(mx->host), + (unsigned long long) mx->host->lastconn + l->conndelay_host - now); + if (tm == 0 || mx->host->lastconn + l->conndelay_host < tm) + tm = mx->host->lastconn + l->conndelay_host; + continue; + } + route = mta_route(c->source, mx->host); - if (route->nconn >= MAXCONN_PER_ROUTE) { + if (route->flags & ROUTE_DISABLED) { + log_debug("debug: mta-routing: skipping route %s: suspend", + mta_route_to_text(route)); + suspended_route |= route->flags & ROUTE_DISABLED; + mta_route_unref(route); /* from here */ + continue; + } + + if (route->nconn && (route->flags & ROUTE_NEW)) { + log_debug("debug: mta-routing: skipping route %s: not validated yet", + mta_route_to_text(route)); + limit_route = 1; + mta_route_unref(route); /* from here */ + continue; + } + + if (route->nconn >= l->maxconn_per_route) { + log_debug("debug: mta-routing: skipping route %s: too many connections", + mta_route_to_text(route)); limit_route = 1; mta_route_unref(route); /* from here */ continue; } + if (route->lastconn + l->conndelay_route > now) { + log_debug("debug: mta-routing: skipping route %s: cannot use before %llus (delay after connect)", + mta_route_to_text(route), + (unsigned long long) route->lastconn + l->conndelay_route - now); + if (tm == 0 || route->lastconn + l->conndelay_route < tm) + tm = route->lastconn + l->conndelay_route; + mta_route_unref(route); /* from here */ + continue; + } + + if (route->lastdisc + l->discdelay_route > now) { + log_debug("debug: mta-routing: skipping route %s: cannot use before %llus (delay after disconnect)", + mta_route_to_text(route), + (unsigned long long) route->lastdisc + l->discdelay_route - now); + if (tm == 0 || route->lastdisc + l->discdelay_route < tm) + tm = route->lastdisc + l->discdelay_route; + mta_route_unref(route); /* from here */ + continue; + } + /* Use the route with the lowest number of connections. */ if (best && route->nconn >= best->nconn) { + log_debug("debug: mta-routing: skipping route %s: current one is better", + mta_route_to_text(route)); mta_route_unref(route); /* from here */ continue; } @@ -1101,30 +1405,43 @@ mta_find_route(struct mta_connector *c) if (best) mta_route_unref(best); /* from here */ best = route; + log_debug("debug: mta-routing: selecting candidate route %s", + mta_route_to_text(route)); } if (best) return (best); + /* Order is important */ if (seen == 0) { - log_info("smtp-out: No reachable MX for connector %s", + log_info("smtp-out: No MX found for %s", mta_connector_to_text(c)); - c->flags |= CONNECTOR_MX_ERROR; + c->flags |= CONNECTOR_ERROR_MX; } else if (limit_route) { - log_debug("debug: mta: hit route limit on connector %s", - mta_connector_to_text(c)); - c->flags |= CONNECTOR_LIMIT_ROUTE; + log_debug("debug: mta: hit route limit"); + *limits |= CONNECTOR_LIMIT_ROUTE; } else if (limit_host) { - log_debug("debug: mta: hit host limit on connector %s", - mta_connector_to_text(c)); - c->flags |= CONNECTOR_LIMIT_HOST; + log_debug("debug: mta: hit host limit"); + *limits |= CONNECTOR_LIMIT_HOST; + } + else if (tm) { + if (tm > *nextconn) + *nextconn = tm; } else if (family_mismatch) { - log_info("smtp-out: Address family mismatch on connector %s", + log_info("smtp-out: Address family mismatch on %s", mta_connector_to_text(c)); - c->flags |= CONNECTOR_FAMILY_ERROR; + c->flags |= CONNECTOR_ERROR_FAMILY; + } + else if (suspended_route) { + log_info("smtp-out: No valid route for %s", + mta_connector_to_text(c)); + if (suspended_route & ROUTE_DISABLED_NET) + c->flags |= CONNECTOR_ERROR_ROUTE_NET; + if (suspended_route & ROUTE_DISABLED_SMTP) + c->flags |= CONNECTOR_ERROR_ROUTE_SMTP; } return (NULL); @@ -1134,33 +1451,16 @@ static void mta_log(const struct mta_envelope *evp, const char *prefix, const char *source, const char *relay, const char *status) { - char session[SMTPD_MAXLINESIZE]; - char rcpt[SMTPD_MAXLINESIZE]; - char src[SMTPD_MAXLINESIZE]; - - session[0] = '\0'; - if (evp->session) - snprintf(session, sizeof session, "session=%016"PRIx64", ", - evp->session); - - rcpt[0] = '\0'; - if (evp->rcpt) - snprintf(rcpt, sizeof rcpt, "rcpt=<%s>, ", evp->rcpt); - - src[0] = '\0'; - if (source) - snprintf(src, sizeof src, "source=%s, ", source); - - - log_info("relay: %s for %016" PRIx64 ": %sfrom=<%s>, to=<%s>, " - "%s%srelay=%s, delay=%s, stat=%s", + log_info("relay: %s for %016" PRIx64 ": session=%016"PRIx64", " + "from=<%s>, to=<%s>, rcpt=<%s>, source=%s, " + "relay=%s, delay=%s, stat=%s", prefix, evp->id, - session, + evp->session, evp->task->sender, evp->dest, - src, - rcpt, + evp->rcpt ? evp->rcpt : "-", + source ? source : "-", relay, duration_to_text(time(NULL) - evp->creation), status); @@ -1204,10 +1504,6 @@ mta_relay(struct envelope *e) if ((r = SPLAY_FIND(mta_relay_tree, &relays, &key)) == NULL) { r = xcalloc(1, sizeof *r, "mta_relay"); TAILQ_INIT(&r->tasks); - TAILQ_INIT(&r->c_ready); - TAILQ_INIT(&r->c_delay); - TAILQ_INIT(&r->c_limit); - TAILQ_INIT(&r->c_error); r->id = generate_uid(); r->flags = key.flags; r->domain = key.domain; @@ -1227,12 +1523,9 @@ mta_relay(struct envelope *e) r->helotable = xstrdup(key.helotable, "mta: helotable"); SPLAY_INSERT(mta_relay_tree, &relays, r); - evtimer_set(&r->ev, mta_relay_timeout, r); - log_trace(TRACE_MTA, "mta: new %s", mta_relay_to_text(r)); stat_increment("mta.relay", 1); } else { mta_domain_unref(key.domain); /* from here */ - log_trace(TRACE_MTA, "mta: reusing %s", mta_relay_to_text(r)); } r->refcount++; @@ -1267,9 +1560,6 @@ mta_relay_unref(struct mta_relay *relay) free(relay->secret); free(relay->sourcetable); - if (evtimer_pending(&relay->ev, NULL)) - evtimer_del(&relay->ev); - mta_domain_unref(relay->domain); /* from constructor */ free(relay); stat_decrement("mta.relay", 1); @@ -1594,14 +1884,11 @@ mta_connector(struct mta_relay *relay, struct mta_source *source) c = xcalloc(1, sizeof(*c), "mta_connector"); c->relay = relay; c->source = source; + c->flags |= CONNECTOR_NEW; mta_source_ref(source); - c->queue = &relay->c_ready; - TAILQ_INSERT_HEAD(c->queue, c, lst_entry); tree_xset(&relay->connectors, (uintptr_t)(source), c); - relay->nconnector++; stat_increment("mta.connector", 1); - log_debug("debug: mta: new connector %s", - mta_connector_to_text(c)); + log_debug("debug: mta: new %s", mta_connector_to_text(c)); } return (c); @@ -1610,10 +1897,17 @@ mta_connector(struct mta_relay *relay, struct mta_source *source) static void mta_connector_free(struct mta_connector *c) { - c->relay->nconnector--; - TAILQ_REMOVE(c->queue, c, lst_entry); - mta_source_unref(c->source); + log_debug("debug: mta: freeing %s", + mta_connector_to_text(c)); + + if (c->flags & CONNECTOR_WAIT) { + log_debug("debug: mta: canceling timeout for %s", + mta_connector_to_text(c)); + runq_cancel(runq_connector, NULL, c); + } + mta_source_unref(c->source); /* from constructor */ free(c); + stat_decrement("mta.connector", 1); } @@ -1622,9 +1916,10 @@ mta_connector_to_text(struct mta_connector *c) { static char buf[1024]; - snprintf(buf, sizeof buf, "%s->%s", + snprintf(buf, sizeof buf, "[connector:%s->%s,0x%x]", mta_source_to_text(c->source), - mta_relay_to_text(c->relay)); + mta_relay_to_text(c->relay), + c->flags); return (buf); } @@ -1632,6 +1927,7 @@ static struct mta_route * mta_route(struct mta_source *src, struct mta_host *dst) { struct mta_route key, *r; + static uint64_t rid = 0; key.src = src; key.dst = dst; @@ -1641,30 +1937,80 @@ mta_route(struct mta_source *src, struct mta_host *dst) r = xcalloc(1, sizeof(*r), "mta_route"); r->src = src; r->dst = dst; + r->flags |= ROUTE_NEW; + r->id = ++rid; SPLAY_INSERT(mta_route_tree, &routes, r); mta_source_ref(src); mta_host_ref(dst); stat_increment("mta.route", 1); } + else if (r->flags & ROUTE_RUNQ) { + log_debug("debug: mta: mta_route_ref(): canceling runq for route %s", + mta_route_to_text(r)); + r->flags &= ~(ROUTE_RUNQ | ROUTE_KEEPALIVE); + runq_cancel(runq_route, NULL, r); + r->refcount--; /* from mta_route_unref() */ + } r->refcount++; return (r); } -#if 0 static void mta_route_ref(struct mta_route *r) { r->refcount++; } -#endif static void mta_route_unref(struct mta_route *r) { + time_t sched, now; + int delay; + if (--r->refcount) return; + /* + * Nothing references this route, but we might want to keep it alive + * for a while. + */ + now = time(NULL); + sched = 0; + + if (r->penalty) { +#if DELAY_QUADRATIC + delay = DELAY_ROUTE_BASE * r->penalty * r->penalty; +#else + delay = 15 * 60; +#endif + if (delay > DELAY_ROUTE_MAX) + delay = DELAY_ROUTE_MAX; + sched = r->lastpenalty + delay; + log_debug("debug: mta: mta_route_unref(): keeping route %s alive for %llus (penalty %i)", + mta_route_to_text(r), (unsigned long long) sched - now, r->penalty); + } else if (!(r->flags & ROUTE_KEEPALIVE)) { + if (r->lastconn + max_seen_conndelay_route > now) + sched = r->lastconn + max_seen_conndelay_route; + if (r->lastdisc + max_seen_discdelay_route > now && + r->lastdisc + max_seen_discdelay_route < sched) + sched = r->lastdisc + max_seen_discdelay_route; + + if (sched > now) + log_debug("debug: mta: mta_route_unref(): keeping route %s alive for %llus (imposed delay)", + mta_route_to_text(r), (unsigned long long) sched - now); + } + + if (sched > now) { + r->flags |= ROUTE_RUNQ; + runq_schedule(runq_route, sched, NULL, r); + r->refcount++; + return; + } + + log_debug("debug: mta: ma_route_unref(): really discarding route %s", + mta_route_to_text(r)); + SPLAY_REMOVE(mta_route_tree, &routes, r); mta_source_unref(r->src); /* from constructor */ mta_host_unref(r->dst); /* from constructor */ @@ -1701,3 +2047,94 @@ mta_route_cmp(const struct mta_route *a, const struct mta_route *b) } SPLAY_GENERATE(mta_route_tree, mta_route, entry, mta_route_cmp); + + +/* hoststat errors are not critical, we do best effort */ +void +mta_hoststat_update(const char *host, const char *error) +{ + struct hoststat *hs = NULL; + char buf[SMTPD_MAXHOSTNAMELEN]; + time_t tm; + + if (! lowercase(buf, host, sizeof buf)) + return; + + tm = time(NULL); + hs = dict_get(&hoststat, buf); + if (hs == NULL) { + hs = calloc(1, sizeof *hs); + if (hs == NULL) + return; + tree_init(&hs->deferred); + runq_schedule(runq_hoststat, tm+HOSTSTAT_EXPIRE_DELAY, NULL, hs); + } + strlcpy(hs->name, buf, sizeof hs->name); + strlcpy(hs->error, error, sizeof hs->error); + hs->tm = time(NULL); + dict_set(&hoststat, buf, hs); + + runq_cancel(runq_hoststat, NULL, hs); + runq_schedule(runq_hoststat, tm+HOSTSTAT_EXPIRE_DELAY, NULL, hs); +} + +void +mta_hoststat_cache(const char *host, uint64_t evpid) +{ + struct hoststat *hs = NULL; + char buf[SMTPD_MAXHOSTNAMELEN]; + + if (! lowercase(buf, host, sizeof buf)) + return; + + hs = dict_get(&hoststat, buf); + if (hs == NULL) + return; + + tree_set(&hs->deferred, evpid, NULL); +} + +void +mta_hoststat_uncache(const char *host, uint64_t evpid) +{ + struct hoststat *hs = NULL; + char buf[SMTPD_MAXHOSTNAMELEN]; + + if (! lowercase(buf, host, sizeof buf)) + return; + + hs = dict_get(&hoststat, buf); + if (hs == NULL) + return; + + tree_pop(&hs->deferred, evpid); +} + +void +mta_hoststat_reschedule(const char *host) +{ + struct hoststat *hs = NULL; + char buf[SMTPD_MAXHOSTNAMELEN]; + uint64_t evpid; + + if (! lowercase(buf, host, sizeof buf)) + return; + + hs = dict_get(&hoststat, buf); + if (hs == NULL) + return; + + while (tree_poproot(&hs->deferred, &evpid, NULL)) { + m_compose(p_queue, IMSG_MTA_SCHEDULE, 0, 0, -1, + &evpid, sizeof evpid); + } +} + +static void +mta_hoststat_remove_entry(struct hoststat *hs) +{ + while (tree_poproot(&hs->deferred, NULL, NULL)) + ; + dict_pop(&hoststat, hs->name); + runq_cancel(runq_hoststat, NULL, hs); +} diff --git a/usr.sbin/smtpd/mta_session.c b/usr.sbin/smtpd/mta_session.c index a5a00d6823c..e3c8896c96d 100644 --- a/usr.sbin/smtpd/mta_session.c +++ b/usr.sbin/smtpd/mta_session.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mta_session.c,v 1.38 2013/07/19 07:37:29 eric Exp $ */ +/* $OpenBSD: mta_session.c,v 1.39 2013/07/19 21:14:52 eric Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard @@ -45,9 +45,9 @@ #include "log.h" #include "ssl.h" -#define MAX_MAIL 100 +#define MAX_TRYBEFOREDISABLE 10 -#define MTA_HIWAT 65535 +#define MTA_HIWAT 65535 enum mta_state { MTA_INIT, @@ -84,11 +84,17 @@ enum mta_state { #define MTA_FREE 0x0400 #define MTA_LMTP 0x0800 #define MTA_WAIT 0x1000 +#define MTA_HANGON 0x2000 #define MTA_EXT_STARTTLS 0x01 #define MTA_EXT_AUTH 0x02 #define MTA_EXT_PIPELINING 0x04 +struct failed_evp { + int delivery; + char error[SMTPD_MAXLINESIZE]; + struct mta_envelope *evp; +}; struct mta_session { uint64_t id; @@ -108,13 +114,19 @@ struct mta_session { struct io io; int ext; - int msgcount; - int rcptcount; + size_t msgtried; + size_t msgcount; + size_t rcptcount; + int hangon; enum mta_state state; struct mta_task *task; struct mta_envelope *currevp; FILE *datafp; + +#define MAX_FAILED_ENVELOPES 15 + struct failed_evp failed[MAX_FAILED_ENVELOPES]; + int failedcount; }; static void mta_session_init(void); @@ -122,9 +134,10 @@ static void mta_start(int fd, short ev, void *arg); static void mta_io(struct io *, int); static void mta_free(struct mta_session *); static void mta_on_ptr(void *, void *, void *); +static void mta_on_timeout(struct runq *, void *); static void mta_connect(struct mta_session *); static void mta_enter_state(struct mta_session *, int); -static void mta_flush_task(struct mta_session *, int, const char *, size_t); +static void mta_flush_task(struct mta_session *, int, const char *, size_t, int); static void mta_error(struct mta_session *, const char *, ...); static void mta_send(struct mta_session *, char *, ...); static ssize_t mta_queue_data(struct mta_session *); @@ -134,6 +147,11 @@ static int mta_check_loop(FILE *); static void mta_start_tls(struct mta_session *); static int mta_verify_certificate(struct mta_session *); static struct mta_session *mta_tree_pop(struct tree *, uint64_t); +static void mta_flush_failedqueue(struct mta_session *); +void mta_hoststat_update(const char *, const char *); +void mta_hoststat_reschedule(const char *); +void mta_hoststat_cache(const char *, uint64_t); +void mta_hoststat_uncache(const char *, uint64_t); static struct tree wait_helo; static struct tree wait_ptr; @@ -141,6 +159,8 @@ static struct tree wait_fd; static struct tree wait_ssl_init; static struct tree wait_ssl_verify; +static struct runq *hangon; + static void mta_session_init(void) { @@ -152,6 +172,7 @@ mta_session_init(void) tree_init(&wait_fd); tree_init(&wait_ssl_init); tree_init(&wait_ssl_verify); + runq_init(&hangon, mta_on_timeout); init = 1; } } @@ -245,7 +266,7 @@ mta_session_imsg(struct mproc *p, struct imsg *imsg) if (imsg->fd == -1) { log_debug("debug: mta: failed to obtain msg fd"); mta_flush_task(s, IMSG_DELIVERY_TEMPFAIL, - "Could not get message fd", 0); + "Could not get message fd", 0, 0); mta_enter_state(s, MTA_READY); io_reload(&s->io); return; @@ -260,7 +281,7 @@ mta_session_imsg(struct mproc *p, struct imsg *imsg) fclose(s->datafp); s->datafp = NULL; mta_flush_task(s, IMSG_DELIVERY_LOOP, - "Loop detected", 0); + "Loop detected", 0, 0); mta_enter_state(s, MTA_READY); } else { mta_enter_state(s, MTA_MAIL); @@ -295,7 +316,7 @@ mta_session_imsg(struct mproc *p, struct imsg *imsg) return; if (resp_ca_cert->status == CA_FAIL) { - log_info("smtp-out: Disconnecting session %016" PRIx64 + log_info("smtp-out: Disconnecting session %016"PRIx64 ": CA failure", s->id); mta_free(s); return; @@ -391,6 +412,16 @@ mta_free(struct mta_session *s) log_debug("debug: mta: %p: session done", s); + if (s->ready) + s->relay->nconn_ready -= 1; + + if (s->flags & MTA_HANGON) { + log_debug("debug: mta: %p: cancelling hangon timer", s); + runq_cancel(hangon, NULL, s); + } + + mta_flush_failedqueue(s); + io_clear(&s->io); iobuf_clear(&s->iobuf); @@ -408,6 +439,20 @@ mta_free(struct mta_session *s) mta_route_collect(relay, route); } +static void +mta_on_timeout(struct runq *runq, void *arg) +{ + struct mta_session *s = arg; + + log_debug("mta: timeout for session hangon"); + + s->flags &= ~MTA_HANGON; + s->hangon++; + + mta_enter_state(s, MTA_READY); + io_reload(&s->io); +} + static void mta_on_ptr(void *tag, void *arg, void *data) { @@ -469,7 +514,8 @@ mta_connect(struct mta_session *s) break; } default: - goto fail; + mta_free(s); + return; } portno = s->use_smtps ? 465 : 25; @@ -518,12 +564,6 @@ mta_connect(struct mta_session *s) mta_error(s, "Connection failed: %s", s->io.error); mta_free(s); } - return; - -fail: - mta_error(s, "Could not connect"); - mta_free(s); - return; } static void @@ -597,10 +637,19 @@ mta_enter_state(struct mta_session *s, int newstate) /* Ready to send a new mail */ if (s->ready == 0) { s->ready = 1; + s->relay->nconn_ready += 1; mta_route_ok(s->relay, s->route); } - if (s->msgcount >= MAX_MAIL) { + if (s->msgtried >= MAX_TRYBEFOREDISABLE) { + log_info("smtp-out: Remote host seems to reject all mails on session %016"PRIx64, + s->id); + mta_route_down(s->relay, s->route); + mta_enter_state(s, MTA_QUIT); + break; + } + + if (s->msgcount >= s->relay->limits->max_mail_per_session) { log_debug("debug: mta: " "%p: cannot send more message to relay %s", s, mta_relay_to_text(s->relay)); @@ -612,7 +661,17 @@ mta_enter_state(struct mta_session *s, int newstate) if (s->task == NULL) { log_debug("debug: mta: %p: no task for relay %s", s, mta_relay_to_text(s->relay)); - mta_enter_state(s, MTA_QUIT); + + if (s->relay->nconn > 1 || + s->hangon >= s->relay->limits->sessdelay_keepalive) { + mta_enter_state(s, MTA_QUIT); + break; + } + + log_debug("mta: debug: last connection: hanging on for %is", + s->relay->limits->sessdelay_keepalive - s->hangon); + s->flags |= MTA_HANGON; + runq_schedule(hangon, time(NULL) + 1, NULL, s); break; } @@ -631,6 +690,8 @@ mta_enter_state(struct mta_session *s, int newstate) break; case MTA_MAIL: + s->hangon = 0; + s->msgtried++; mta_send(s, "MAIL FROM:<%s>", s->task->sender); break; @@ -699,10 +760,12 @@ static void mta_response(struct mta_session *s, char *line) { struct mta_envelope *e; - char buf[SMTPD_MAXLINESIZE]; - int delivery; + struct failed_evp *fevp; struct sockaddr sa; + const char *domain; socklen_t sa_len; + char buf[SMTPD_MAXLINESIZE]; + int delivery; switch (s->state) { @@ -779,7 +842,7 @@ mta_response(struct mta_session *s, char *line) delivery = IMSG_DELIVERY_PERMFAIL; else delivery = IMSG_DELIVERY_TEMPFAIL; - mta_flush_task(s, delivery, line, 0); + mta_flush_task(s, delivery, line, 0, 0); mta_enter_state(s, MTA_RSET); return; } @@ -788,20 +851,38 @@ mta_response(struct mta_session *s, char *line) case MTA_RCPT: e = s->currevp; + + /* remove envelope from hosttat cache if there */ + if ((domain = strchr(e->dest, '@')) != NULL) { + domain++; + mta_hoststat_uncache(domain, e->id); + } + s->currevp = TAILQ_NEXT(s->currevp, entry); - if (line[0] != '2') { + if (line[0] == '2') { + mta_flush_failedqueue(s); + /* + * this host is up, reschedule envelopes that + * were cached for reschedule. + */ + if (domain) + mta_hoststat_reschedule(domain); + } + else { if (line[0] == '5') delivery = IMSG_DELIVERY_PERMFAIL; else delivery = IMSG_DELIVERY_TEMPFAIL; + /* remove failed envelope from task list */ TAILQ_REMOVE(&s->task->envelopes, e, entry); + stat_decrement("mta.envelope", 1); + + /* log right away */ snprintf(buf, sizeof(buf), "%s", mta_host_to_text(s->route->dst)); - /* we're about to log, associate session to envelope */ e->session = s->id; - /* XXX */ /* * getsockname() can only fail with ENOBUFS here @@ -809,22 +890,45 @@ mta_response(struct mta_session *s, char *line) */ sa_len = sizeof sa; if (getsockname(s->io.sock, &sa, &sa_len) < 0) - mta_delivery(e, NULL, buf, delivery, line); + mta_delivery_log(e, NULL, buf, delivery, line); else - mta_delivery(e, sa_to_text(&sa), + mta_delivery_log(e, sa_to_text(&sa), buf, delivery, line); - free(e->dest); - free(e->rcpt); - free(e); - stat_decrement("mta.envelope", 1); + /* push failed envelope to the session fail queue */ + e->delivery = delivery; + fevp = &s->failed[s->failedcount]; + fevp->delivery = delivery; + fevp->evp = e; + strlcpy(fevp->error, line, sizeof fevp->error); + s->failedcount++; + + /* + * if session fail queue is full: + * - flush failed queue (failure w/ penalty) + * - flush remaining tasks with TempFail + * - mark route down + */ + if (s->failedcount == MAX_FAILED_ENVELOPES) { + mta_flush_failedqueue(s); + mta_flush_task(s, IMSG_DELIVERY_TEMPFAIL, + "Host temporarily disabled", 0, 1); + mta_route_down(s->relay, s->route); + mta_enter_state(s, MTA_QUIT); + break; + } + + /* + * if no more envelopes, flush failed queue + */ if (TAILQ_EMPTY(&s->task->envelopes)) { mta_flush_task(s, IMSG_DELIVERY_OK, - "No envelope", 0); + "No envelope", 0, 0); mta_enter_state(s, MTA_RSET); break; } } + if (s->currevp == NULL) mta_enter_state(s, MTA_DATA); else @@ -832,6 +936,7 @@ mta_response(struct mta_session *s, char *line) break; case MTA_DATA: + mta_flush_failedqueue(s); if (line[0] == '2' || line[0] == '3') { mta_enter_state(s, MTA_BODY); break; @@ -840,7 +945,7 @@ mta_response(struct mta_session *s, char *line) delivery = IMSG_DELIVERY_PERMFAIL; else delivery = IMSG_DELIVERY_TEMPFAIL; - mta_flush_task(s, delivery, line, 0); + mta_flush_task(s, delivery, line, 0, 0); mta_enter_state(s, MTA_RSET); break; @@ -848,24 +953,47 @@ mta_response(struct mta_session *s, char *line) case MTA_EOM: if (line[0] == '2') { delivery = IMSG_DELIVERY_OK; + s->msgtried = 0; s->msgcount++; } else if (line[0] == '5') delivery = IMSG_DELIVERY_PERMFAIL; else delivery = IMSG_DELIVERY_TEMPFAIL; - mta_flush_task(s, delivery, line, (s->flags & MTA_LMTP) ? 1 : 0 ); + mta_flush_task(s, delivery, line, (s->flags & MTA_LMTP) ? 1 : 0, 0); if (s->task) { s->rcptcount--; mta_enter_state(s, MTA_LMTP_EOM); } else { s->rcptcount = 0; - mta_enter_state(s, MTA_READY); + if (s->relay->limits->sessdelay_transaction) { + log_debug("debug: mta: waiting for %llis before next transaction", + (long long int)s->relay->limits->sessdelay_transaction); + s->hangon = s->relay->limits->sessdelay_transaction -1; + s->flags |= MTA_HANGON; + runq_schedule(hangon, time(NULL) + + s->relay->limits->sessdelay_transaction, + NULL, s); + } + else + mta_enter_state(s, MTA_READY); } break; case MTA_RSET: - mta_enter_state(s, MTA_READY); + mta_flush_failedqueue(s); + s->rcptcount = 0; + if (s->relay->limits->sessdelay_transaction) { + log_debug("debug: mta: waiting for %llis after reset", + (long long int)s->relay->limits->sessdelay_transaction); + s->hangon = s->relay->limits->sessdelay_transaction -1; + s->flags |= MTA_HANGON; + runq_schedule(hangon, time(NULL) + + s->relay->limits->sessdelay_transaction, + NULL, s); + } + else + mta_enter_state(s, MTA_READY); break; default: @@ -961,7 +1089,7 @@ mta_io(struct io *io, int evt) if (s->state == MTA_QUIT) { log_info("smtp-out: Closing session %016"PRIx64 - ": %i message%s sent.", s->id, s->msgcount, + ": %zu message%s sent.", s->id, s->msgcount, (s->msgcount > 1) ? "s" : ""); mta_free(s); return; @@ -1072,7 +1200,7 @@ mta_queue_data(struct mta_session *s) if (ferror(s->datafp)) { mta_flush_task(s, IMSG_DELIVERY_TEMPFAIL, - "Error reading content file", 0); + "Error reading content file", 0, 0); return (-1); } @@ -1085,16 +1213,17 @@ mta_queue_data(struct mta_session *s) } static void -mta_flush_task(struct mta_session *s, int delivery, const char *error, size_t count) +mta_flush_task(struct mta_session *s, int delivery, const char *error, size_t count, + int cache) { struct mta_envelope *e; char relay[SMTPD_MAXLINESIZE]; size_t n; struct sockaddr sa; socklen_t sa_len; + const char *domain; snprintf(relay, sizeof relay, "%s", mta_host_to_text(s->route->dst)); - n = 0; while ((e = TAILQ_FIRST(&s->task->envelopes))) { @@ -1115,10 +1244,17 @@ mta_flush_task(struct mta_session *s, int delivery, const char *error, size_t co */ sa_len = sizeof sa; if (getsockname(s->io.sock, &sa, &sa_len) < 0) - mta_delivery(e, NULL, relay, delivery, error); + mta_delivery(e, NULL, relay, delivery, error, 0); else mta_delivery(e, sa_to_text(&sa), - relay, delivery, error); + relay, delivery, error, 0); + + domain = strchr(e->dest, '@'); + if (domain) { + mta_hoststat_update(domain + 1, error); + if (cache) + mta_hoststat_cache(domain + 1, e->id); + } free(e->dest); free(e->rcpt); @@ -1140,6 +1276,32 @@ mta_flush_task(struct mta_session *s, int delivery, const char *error, size_t co stat_decrement("mta.task", 1); } +static void +mta_flush_failedqueue(struct mta_session *s) +{ + int i; + struct failed_evp *fevp; + struct mta_envelope *e; + const char *domain; + uint32_t penalty; + + penalty = s->failedcount == MAX_FAILED_ENVELOPES ? 1 : 0; + for (i = 0; i < s->failedcount; ++i) { + fevp = &s->failed[i]; + e = fevp->evp; + mta_delivery_notify(e, fevp->delivery, fevp->error, penalty); + + domain = strchr(e->dest, '@'); + if (domain) + mta_hoststat_update(domain + 1, fevp->error); + + free(e->dest); + free(e->rcpt); + free(e); + } + s->failedcount = 0; +} + static void mta_error(struct mta_session *s, const char *fmt, ...) { @@ -1154,7 +1316,7 @@ mta_error(struct mta_session *s, const char *fmt, ...) if (s->msgcount) log_info("smtp-out: Error on session %016"PRIx64 - " after %i message%s sent: %s", s->id, s->msgcount, + " after %zu message%s sent: %s", s->id, s->msgcount, (s->msgcount > 1) ? "s" : "", error); else log_info("smtp-out: Error on session %016"PRIx64 ": %s", @@ -1173,7 +1335,7 @@ mta_error(struct mta_session *s, const char *fmt, ...) mta_route_error(s->relay, s->route); if (s->task) - mta_flush_task(s, IMSG_DELIVERY_TEMPFAIL, error, 0); + mta_flush_task(s, IMSG_DELIVERY_TEMPFAIL, error, 0, 0); free(error); } @@ -1307,6 +1469,7 @@ mta_verify_certificate(struct mta_session *s) return 1; } + #define CASE(x) case x : return #x static const char * diff --git a/usr.sbin/smtpd/parse.y b/usr.sbin/smtpd/parse.y index c31187f15c1..dedaa27c582 100644 --- a/usr.sbin/smtpd/parse.y +++ b/usr.sbin/smtpd/parse.y @@ -1,4 +1,4 @@ -/* $OpenBSD: parse.y,v 1.122 2013/07/19 20:37:07 eric Exp $ */ +/* $OpenBSD: parse.y,v 1.123 2013/07/19 21:14:52 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade @@ -91,6 +91,7 @@ static int errors = 0; struct table *table = NULL; struct rule *rule = NULL; struct listener l; +struct mta_limits *limits; struct listener *host_v4(const char *, in_port_t); struct listener *host_v6(const char *, in_port_t); @@ -119,9 +120,9 @@ typedef struct { %} %token AS QUEUE COMPRESSION ENCRYPTION MAXMESSAGESIZE LISTEN ON ANY PORT EXPIRE -%token TABLE SSL SMTPS CERTIFICATE DOMAIN BOUNCEWARN INET4 INET6 +%token TABLE SSL SMTPS CERTIFICATE DOMAIN BOUNCEWARN LIMIT INET4 INET6 %token RELAY BACKUP VIA DELIVER TO LMTP MAILDIR MBOX HOSTNAME HELO -%token ACCEPT REJECT INCLUDE ERROR MDA FROM FOR SOURCE +%token ACCEPT REJECT INCLUDE ERROR MDA FROM FOR SOURCE MTA %token ARROW AUTH TLS LOCAL VIRTUAL TAG TAGGED ALIAS FILTER KEY %token AUTH_OPTIONAL TLS_REQUIRE USERBASE SENDER %token STRING @@ -339,6 +340,26 @@ listen_helo : HOSTNAME STRING { $$ = $2; } | /* empty */ { $$ = NULL; } ; +opt_limit : INET4 { + limits->family = AF_INET; + } + | INET6 { + limits->family = AF_INET6; + } + | STRING NUMBER { + if (!limit_mta_set(limits, $1, $2)) { + yyerror("invalid limit keyword"); + free($1); + YYERROR; + } + free($1); + } + ; + +limits : opt_limit limits + | /* empty */ + ; + main : BOUNCEWARN { bzero(conf->sc_bounce_warn, sizeof conf->sc_bounce_warn); } bouncedelays @@ -361,6 +382,21 @@ main : BOUNCEWARN { | MAXMESSAGESIZE size { conf->sc_maxsize = $2; } + | LIMIT MTA FOR DOMAIN STRING { + struct mta_limits *d; + + limits = dict_get(conf->sc_limits_dict, $5); + if (limits == NULL) { + limits = xcalloc(1, sizeof(*limits), "mta_limits"); + dict_xset(conf->sc_limits_dict, $5, limits); + d = dict_xget(conf->sc_limits_dict, "default"); + memmove(limits, d, sizeof(*limits)); + } + free($5); + } limits + | LIMIT MTA { + limits = dict_get(conf->sc_limits_dict, "default"); + } limits | LISTEN { bzero(&l, sizeof l); } ON STRING address_family port ssl certificate auth tag listen_helo { @@ -990,6 +1026,7 @@ lookup(char *s) { "inet4", INET4 }, { "inet6", INET6 }, { "key", KEY }, + { "limit", LIMIT }, { "listen", LISTEN }, { "lmtp", LMTP }, { "local", LOCAL }, @@ -997,6 +1034,7 @@ lookup(char *s) { "max-message-size", MAXMESSAGESIZE }, { "mbox", MBOX }, { "mda", MDA }, + { "mta", MTA }, { "on", ON }, { "port", PORT }, { "queue", QUEUE }, @@ -1359,6 +1397,7 @@ parse_config(struct smtpd *x_conf, const char *filename, int opts) conf->sc_rules = calloc(1, sizeof(*conf->sc_rules)); conf->sc_listeners = calloc(1, sizeof(*conf->sc_listeners)); conf->sc_ssl_dict = calloc(1, sizeof(*conf->sc_ssl_dict)); + conf->sc_limits_dict = calloc(1, sizeof(*conf->sc_limits_dict)); /* Report mails delayed for more than 4 hours */ conf->sc_bounce_warn[0] = 3600 * 4; @@ -1366,12 +1405,14 @@ parse_config(struct smtpd *x_conf, const char *filename, int opts) if (conf->sc_tables_dict == NULL || conf->sc_rules == NULL || conf->sc_listeners == NULL || - conf->sc_ssl_dict == NULL) { + conf->sc_ssl_dict == NULL || + conf->sc_limits_dict == NULL) { log_warn("warn: cannot allocate memory"); free(conf->sc_tables_dict); free(conf->sc_rules); free(conf->sc_listeners); free(conf->sc_ssl_dict); + free(conf->sc_limits_dict); return (-1); } @@ -1385,6 +1426,11 @@ parse_config(struct smtpd *x_conf, const char *filename, int opts) dict_init(conf->sc_ssl_dict); dict_init(conf->sc_tables_dict); + dict_init(conf->sc_limits_dict); + limits = xcalloc(1, sizeof(*limits), "mta_limits"); + limit_mta_set_defaults(limits); + dict_xset(conf->sc_limits_dict, "default", limits); + TAILQ_INIT(conf->sc_listeners); TAILQ_INIT(conf->sc_rules); diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index 177be1a7c38..d328979f5bd 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.152 2013/07/19 20:37:07 eric Exp $ */ +/* $OpenBSD: queue.c,v 1.153 2013/07/19 21:14:52 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade @@ -63,7 +63,6 @@ queue_imsg(struct mproc *p, struct imsg *imsg) struct delivery_bounce bounce; struct bounce_req_msg *req_bounce; struct envelope evp; - static uint64_t batch_id; struct msg m; const char *reason; uint64_t reqid, evpid; @@ -266,14 +265,7 @@ queue_imsg(struct mproc *p, struct imsg *imsg) bounce_add(evpid); return; - case IMSG_MTA_BATCH: - batch_id = generate_uid(); - m_create(p_mta, IMSG_MTA_BATCH, 0, 0, -1); - m_add_id(p_mta, batch_id); - m_close(p_mta); - return; - - case IMSG_MTA_BATCH_ADD: + case IMSG_MTA_TRANSFER: m_msg(&m, imsg); m_get_evpid(&m, &evpid); m_end(&m); @@ -286,18 +278,11 @@ queue_imsg(struct mproc *p, struct imsg *imsg) return; } evp.lasttry = time(NULL); - m_create(p_mta, IMSG_MTA_BATCH_ADD, 0, 0, -1); - m_add_id(p_mta, batch_id); + m_create(p_mta, IMSG_MTA_TRANSFER, 0, 0, -1); m_add_envelope(p_mta, &evp); m_close(p_mta); return; - case IMSG_MTA_BATCH_END: - m_create(p_mta, IMSG_MTA_BATCH_END, 0, 0, -1); - m_add_id(p_mta, batch_id); - m_close(p_mta); - return; - case IMSG_CTL_LIST_ENVELOPES: if (imsg->hdr.len == sizeof imsg->hdr) { m_forward(p_control, imsg); diff --git a/usr.sbin/smtpd/runq.c b/usr.sbin/smtpd/runq.c new file mode 100644 index 00000000000..30fd9e17f67 --- /dev/null +++ b/usr.sbin/smtpd/runq.c @@ -0,0 +1,203 @@ +/* $OpenBSD: runq.c,v 1.1 2013/07/19 21:14:52 eric Exp $ */ + +/* + * Copyright (c) 2013 Eric Faurot + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "smtpd.h" + +struct job { + TAILQ_ENTRY(job) entry; + time_t when; + void (*cb)(struct runq *, void *); + void *arg; +}; + +struct runq { + TAILQ_HEAD(, job) jobs; + void (*cb)(struct runq *, void *); + struct event ev; +}; + +static void runq_timeout(int, short, void *); + +static struct runq *active; + +static void +runq_reset(struct runq *runq) +{ + struct timeval tv; + struct job *job; + time_t now; + + job = TAILQ_FIRST(&runq->jobs); + if (job == NULL) + return; + + now = time(NULL); + if (job->when <= now) + tv.tv_sec = 0; + else + tv.tv_sec = job->when - now; + tv.tv_usec = 0; + evtimer_add(&runq->ev, &tv); +} + +static void +runq_timeout(int fd, short ev, void *arg) +{ + struct runq *runq = arg; + struct job *job; + time_t now; + + active = runq; + now = time(NULL); + + while((job = TAILQ_FIRST(&runq->jobs))) { + if (job->when > now) + break; + TAILQ_REMOVE(&runq->jobs, job, entry); + if (job->cb) + job->cb(runq, job->arg); + else + runq->cb(runq, job->arg); + free(job); + } + + active = NULL; + runq_reset(runq); +} + +int +runq_init(struct runq **runqp, void (*cb)(struct runq *, void *)) +{ + struct runq *runq; + + runq = malloc(sizeof(*runq)); + if (runq == NULL) + return (0); + + runq->cb = cb; + TAILQ_INIT(&runq->jobs); + evtimer_set(&runq->ev, runq_timeout, runq); + + *runqp = runq; + + return (1); +} + +int +runq_schedule(struct runq *runq, time_t when, void (*cb)(struct runq *, void *), + void *arg) +{ + struct job *job, *tmpjob; + + job = malloc(sizeof(*job)); + if (job == NULL) + return (0); + + job->arg = arg; + job->cb = cb; + job->when = when; + + TAILQ_FOREACH(tmpjob, &runq->jobs, entry) { + if (tmpjob->when > job->when) { + TAILQ_INSERT_BEFORE(tmpjob, job, entry); + goto done; + } + } + TAILQ_INSERT_TAIL(&runq->jobs, job, entry); + + done: + if (runq != active && job == TAILQ_FIRST(&runq->jobs)) { + evtimer_del(&runq->ev); + runq_reset(runq); + } + return (1); +} + +int +runq_delay(struct runq *runq, unsigned int delay, + void (*cb)(struct runq *, void *), void *arg) +{ + return runq_schedule(runq, time(NULL) + delay, cb, arg); +} + +int +runq_cancel(struct runq *runq, void (*cb)(struct runq *, void *), void *arg) +{ + struct job *job, *first; + + first = TAILQ_FIRST(&runq->jobs); + TAILQ_FOREACH(job, &runq->jobs, entry) { + if (job->cb == cb && job->arg == arg) { + TAILQ_REMOVE(&runq->jobs, job, entry); + free(job); + if (runq != active && job == first) { + evtimer_del(&runq->ev); + runq_reset(runq); + } + return (1); + } + } + + return (0); +} + +int +runq_pending(struct runq *runq, void (*cb)(struct runq *, void *), void *arg, + time_t *when) +{ + struct job *job; + + TAILQ_FOREACH(job, &runq->jobs, entry) { + if (job->cb == cb && job->arg == arg) { + if (when) + *when = job->when; + return (1); + } + } + + return (0); +} + +int +runq_next(struct runq *runq, void (**cb)(struct runq *, void *), void **arg, + time_t *when) +{ + struct job *job; + + job = TAILQ_FIRST(&runq->jobs); + if (job == NULL) + return (0); + if (cb) + *cb = job->cb; + if (arg) + *arg = job->arg; + if (when) + *when = job->when; + + return (1); +} diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c index d6a07bdeed4..fb06b6bc016 100644 --- a/usr.sbin/smtpd/scheduler.c +++ b/usr.sbin/smtpd/scheduler.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler.c,v 1.31 2013/07/19 15:14:23 eric Exp $ */ +/* $OpenBSD: scheduler.c,v 1.32 2013/07/19 21:14:52 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade @@ -555,17 +555,13 @@ scheduler_process_mta(struct scheduler_batch *batch) { size_t i; - m_compose(p_queue, IMSG_MTA_BATCH, 0, 0, -1, NULL, 0); - for (i = 0; i < batch->evpcount; i++) { log_debug("debug: scheduler: evp:%016" PRIx64 " scheduled (mta)", batch->evpids[i]); - m_create(p_queue, IMSG_MTA_BATCH_ADD, 0, 0, -1); + m_create(p_queue, IMSG_MTA_TRANSFER, 0, 0, -1); m_add_evpid(p_queue, batch->evpids[i]); m_close(p_queue); } - m_compose(p_queue, IMSG_MTA_BATCH_END, 0, 0, -1, NULL, 0); - stat_increment("scheduler.envelope.inflight", batch->evpcount); } diff --git a/usr.sbin/smtpd/smtpctl.8 b/usr.sbin/smtpd/smtpctl.8 index e9d7ae4cfe4..22293162fac 100644 --- a/usr.sbin/smtpd/smtpctl.8 +++ b/usr.sbin/smtpd/smtpctl.8 @@ -1,4 +1,4 @@ -.\" $OpenBSD: smtpctl.8,v 1.42 2013/07/19 15:14:23 eric Exp $ +.\" $OpenBSD: smtpctl.8,v 1.43 2013/07/19 21:14:52 eric Exp $ .\" .\" Copyright (c) 2006 Pierre-Yves Ritschard .\" Copyright (c) 2012 Gilles Chehade @@ -79,7 +79,7 @@ Envelopes removed by the administrator. Generated bounces. .El .It Cm pause envelope Ar envelope-id | message-id -Temporarily suspend scheduling for the envelope with the given ID, +Temporarily suspend scheduling for the envelope with the given ID, or all envelopes with the given message ID. .It Cm pause mda Temporarily stop deliveries to local users. @@ -108,6 +108,9 @@ or all envelopes with the given message ID. Resume deliveries to local users. .It Cm resume mta Resume relaying and deliveries to remote users. +.It Cm resume route Ar route-id +Resume routing on disabled route +.Ar route-id . .It Cm resume smtp Resume accepting incoming sessions. .It Cm schedule all @@ -117,6 +120,19 @@ Mark a single envelope, or all envelopes with the same message ID, as ready for immediate delivery. .It Cm show envelope Ar envelope-id Display envelope content for the given ID. +.It Cm show hoststats +Display status of last delivery for domains that have been active in the +last 4 hours. +It consists of the following fields, separated by a "|": +.Pp +.Bl -bullet -compact +.It +domain. +.It +Unix timestamp of last delivery. +.It +Status of last delivery. +.El .It Cm show message Ar envelope-id Display message content for the given ID. .It Cm show queue @@ -160,6 +176,24 @@ is not running. .It Error string for the last failed delivery or relay attempt. .El +.It Cm show routes +Display status of routes currently known by +.Xr smtpd 8 . +Each line consists in a route number, a source address, a destination +address, a set of flags, the number of connections on this +route, the current penalty level which determines the amount of time +the route is disabled if an error occurs, and the delay before it +gets re-activated. The following flags are defined: +.Pp +.Bl -tag -width xx -compact +.It N +The route is new. No SMTP session has been established yet. +.It D +The route is currently disabled. +.It Q +The route as a timeout registered to lower its penality level and possibly +re-activate or discard it. +.El .It Cm show stats Displays runtime statistics concerning .Xr smtpd 8 . diff --git a/usr.sbin/smtpd/smtpctl.c b/usr.sbin/smtpd/smtpctl.c index 6c7dfe3d766..7d453410690 100644 --- a/usr.sbin/smtpd/smtpctl.c +++ b/usr.sbin/smtpd/smtpctl.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpctl.c,v 1.107 2013/07/19 15:14:23 eric Exp $ */ +/* $OpenBSD: smtpctl.c,v 1.108 2013/07/19 21:14:52 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot @@ -494,6 +494,20 @@ do_resume_mta(int argc, struct parameter *argv) return srv_check_result(); } +static int +do_resume_route(int argc, struct parameter *argv) +{ + uint64_t v; + + if (argc == 0) + v = 0; + else + v = argv[0].u.u_routeid; + + srv_send(IMSG_CTL_RESUME_ROUTE, &v, sizeof(v)); + return srv_check_result(); +} + static int do_resume_smtp(int argc, struct parameter *argv) { @@ -547,6 +561,23 @@ do_show_envelope(int argc, struct parameter *argv) return (0); } +static int +do_show_hoststats(int argc, struct parameter *argv) +{ + srv_send(IMSG_CTL_MTA_SHOW_HOSTSTATS, NULL, 0); + + do { + srv_recv(IMSG_CTL_MTA_SHOW_HOSTSTATS); + if (rlen) { + printf("%s\n", rdata); + srv_read(NULL, rlen); + } + srv_end(); + } while (rlen); + + return (0); +} + static int do_show_message(int argc, struct parameter *argv) { @@ -628,6 +659,23 @@ do_show_queue(int argc, struct parameter *argv) return (0); } +static int +do_show_routes(int argc, struct parameter *argv) +{ + srv_send(IMSG_CTL_MTA_SHOW_ROUTES, NULL, 0); + + do { + srv_recv(IMSG_CTL_MTA_SHOW_ROUTES); + if (rlen) { + printf("%s\n", rdata); + srv_read(NULL, rlen); + } + srv_end(); + } while (rlen); + + return (0); +} + static int do_show_stats(int argc, struct parameter *argv) { @@ -760,15 +808,18 @@ main(int argc, char **argv) cmd_install("resume envelope ", do_resume_envelope); cmd_install("resume mda", do_resume_mda); cmd_install("resume mta", do_resume_mta); + cmd_install("resume route ", do_resume_route); cmd_install("resume smtp", do_resume_smtp); cmd_install("schedule ", do_schedule); cmd_install("schedule ", do_schedule); cmd_install("schedule all", do_schedule); cmd_install("show envelope ", do_show_envelope); + cmd_install("show hoststats", do_show_hoststats); cmd_install("show message ", do_show_message); cmd_install("show message ", do_show_message); cmd_install("show queue", do_show_queue); cmd_install("show queue ", do_show_queue); + cmd_install("show routes", do_show_routes); cmd_install("show stats", do_show_stats); cmd_install("stop", do_stop); cmd_install("trace ", do_trace); diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c index 00a1a8cb0a7..71c96517381 100644 --- a/usr.sbin/smtpd/smtpd.c +++ b/usr.sbin/smtpd/smtpd.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.c,v 1.196 2013/07/19 20:37:07 eric Exp $ */ +/* $OpenBSD: smtpd.c,v 1.197 2013/07/19 21:14:52 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade @@ -1426,6 +1426,7 @@ imsg_to_str(int type) CASE(IMSG_CTL_RESUME_MDA); CASE(IMSG_CTL_RESUME_MTA); CASE(IMSG_CTL_RESUME_SMTP); + CASE(IMSG_CTL_RESUME_ROUTE); CASE(IMSG_CTL_LIST_MESSAGES); CASE(IMSG_CTL_LIST_ENVELOPES); CASE(IMSG_CTL_REMOVE); @@ -1436,6 +1437,9 @@ imsg_to_str(int type) CASE(IMSG_CTL_PROFILE); CASE(IMSG_CTL_UNPROFILE); + CASE(IMSG_CTL_MTA_SHOW_ROUTES); + CASE(IMSG_CTL_MTA_SHOW_HOSTSTATS); + CASE(IMSG_CONF_START); CASE(IMSG_CONF_SSL); CASE(IMSG_CONF_LISTENER); @@ -1485,9 +1489,7 @@ imsg_to_str(int type) CASE(IMSG_MFA_SMTP_DATA); CASE(IMSG_MFA_SMTP_RESPONSE); - CASE(IMSG_MTA_BATCH); - CASE(IMSG_MTA_BATCH_ADD); - CASE(IMSG_MTA_BATCH_END); + CASE(IMSG_MTA_TRANSFER); CASE(IMSG_MTA_SCHEDULE); CASE(IMSG_QUEUE_CREATE_MESSAGE); diff --git a/usr.sbin/smtpd/smtpd.conf.5 b/usr.sbin/smtpd/smtpd.conf.5 index 492a9e7f0a2..e8a00535cca 100644 --- a/usr.sbin/smtpd/smtpd.conf.5 +++ b/usr.sbin/smtpd/smtpd.conf.5 @@ -1,4 +1,4 @@ -.\" $OpenBSD: smtpd.conf.5,v 1.101 2013/07/19 20:37:07 eric Exp $ +.\" $OpenBSD: smtpd.conf.5,v 1.102 2013/07/19 21:14:52 eric Exp $ .\" .\" Copyright (c) 2008 Janne Johansson .\" Copyright (c) 2009 Jacek Masiulaniec @@ -312,7 +312,7 @@ If the parameter is specified, .Xr smtpd 8 will explicitly bind to an address found in the table referenced by -.Ar source +.Ar table when connecting to the relay. If the table contains more than one address, they are picked in turn each time a new connection is opened. @@ -406,7 +406,7 @@ If the parameter is specified, .Xr smtpd 8 will explicitly bind to an address found in the table referenced by -.Ar source +.Ar table when connecting to the relay. If the table contains more than one address, they are picked in turn each time a new connection is opened. @@ -451,6 +451,26 @@ expire 10h # expire after 10 hours .Ed .It Xo .Bk -words +.Ic limit mta +.Op Ic for Ic domain Ar domain +.Ar family +.Ek +.Xc +Instruct +.Xr smtpd 8 +to only use the specified address +.Ar family +for out-going connections. +Accepted values are +.Ic inet4 +and +.Ic inet6 . +If a +.Ar domain +is specified, the restriction only applies when connecting +to MXs for this domain. +.It Xo +.Bk -words .Ic listen on Ar interface .Op Ar family .Op Ic port Ar port diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h index 7ec3aece8dd..9dbe77e26e0 100644 --- a/usr.sbin/smtpd/smtpd.h +++ b/usr.sbin/smtpd/smtpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.h,v 1.420 2013/07/19 20:37:07 eric Exp $ */ +/* $OpenBSD: smtpd.h,v 1.421 2013/07/19 21:14:52 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade @@ -150,7 +150,7 @@ union lookup { * Bump IMSG_VERSION whenever a change is made to enum imsg_type. * This will ensure that we can never use a wrong version of smtpctl with smtpd. */ -#define IMSG_VERSION 3 +#define IMSG_VERSION 5 enum imsg_type { IMSG_NONE, @@ -166,6 +166,7 @@ enum imsg_type { IMSG_CTL_RESUME_MDA, IMSG_CTL_RESUME_MTA, IMSG_CTL_RESUME_SMTP, + IMSG_CTL_RESUME_ROUTE, IMSG_CTL_LIST_MESSAGES, IMSG_CTL_LIST_ENVELOPES, IMSG_CTL_REMOVE, @@ -176,6 +177,9 @@ enum imsg_type { IMSG_CTL_PROFILE, IMSG_CTL_UNPROFILE, + IMSG_CTL_MTA_SHOW_ROUTES, + IMSG_CTL_MTA_SHOW_HOSTSTATS, + IMSG_CONF_START, IMSG_CONF_SSL, IMSG_CONF_LISTENER, @@ -225,9 +229,7 @@ enum imsg_type { IMSG_MFA_SMTP_DATA, IMSG_MFA_SMTP_RESPONSE, - IMSG_MTA_BATCH, - IMSG_MTA_BATCH_ADD, - IMSG_MTA_BATCH_END, + IMSG_MTA_TRANSFER, IMSG_MTA_SCHEDULE, IMSG_QUEUE_CREATE_MESSAGE, @@ -560,6 +562,8 @@ struct smtpd { struct dict *sc_tables_dict; /* keyed lookup */ + struct dict *sc_limits_dict; + struct dict sc_filters; uint32_t filtermask; }; @@ -650,45 +654,84 @@ struct mta_source { }; struct mta_connector { - TAILQ_ENTRY(mta_connector) lst_entry; struct mta_source *source; struct mta_relay *relay; - struct mta_connectors *queue; -#define CONNECTOR_FAMILY_ERROR 0x01 -#define CONNECTOR_SOURCE_ERROR 0x02 -#define CONNECTOR_MX_ERROR 0x04 -#define CONNECTOR_ERROR 0x0f - -#define CONNECTOR_LIMIT_HOST 0x10 -#define CONNECTOR_LIMIT_ROUTE 0x20 -#define CONNECTOR_LIMIT_SOURCE 0x40 -#define CONNECTOR_LIMIT 0xf0 +#define CONNECTOR_ERROR_FAMILY 0x0001 +#define CONNECTOR_ERROR_SOURCE 0x0002 +#define CONNECTOR_ERROR_MX 0x0004 +#define CONNECTOR_ERROR_ROUTE_NET 0x0008 +#define CONNECTOR_ERROR_ROUTE_SMTP 0x0010 +#define CONNECTOR_ERROR_ROUTE 0x0018 +#define CONNECTOR_ERROR 0x00ff + +#define CONNECTOR_LIMIT_HOST 0x0100 +#define CONNECTOR_LIMIT_ROUTE 0x0200 +#define CONNECTOR_LIMIT_SOURCE 0x0400 +#define CONNECTOR_LIMIT_RELAY 0x0800 +#define CONNECTOR_LIMIT_CONN 0x1000 +#define CONNECTOR_LIMIT_DOMAIN 0x2000 +#define CONNECTOR_LIMIT 0xff00 + +#define CONNECTOR_NEW 0x10000 +#define CONNECTOR_WAIT 0x20000 int flags; int refcount; size_t nconn; time_t lastconn; - time_t nextconn; - time_t clearlimit; }; struct mta_route { SPLAY_ENTRY(mta_route) entry; + uint64_t id; struct mta_source *src; struct mta_host *dst; +#define ROUTE_NEW 0x01 +#define ROUTE_RUNQ 0x02 +#define ROUTE_KEEPALIVE 0x04 +#define ROUTE_DISABLED 0xf0 +#define ROUTE_DISABLED_NET 0x10 +#define ROUTE_DISABLED_SMTP 0x20 + int flags; + int penalty; int refcount; size_t nconn; time_t lastconn; + time_t lastdisc; + time_t lastpenalty; }; -TAILQ_HEAD(mta_connectors, mta_connector); +struct mta_limits { + size_t maxconn_per_host; + size_t maxconn_per_route; + size_t maxconn_per_source; + size_t maxconn_per_connector; + size_t maxconn_per_relay; + size_t maxconn_per_domain; + + time_t conndelay_host; + time_t conndelay_route; + time_t conndelay_source; + time_t conndelay_connector; + time_t conndelay_relay; + time_t conndelay_domain; + + time_t discdelay_route; + + size_t max_mail_per_session; + time_t sessdelay_transaction; + time_t sessdelay_keepalive; + + int family; +}; struct mta_relay { SPLAY_ENTRY(mta_relay) entry; uint64_t id; struct mta_domain *domain; + struct mta_limits *limits; int flags; char *backupname; int backuppref; @@ -699,21 +742,15 @@ struct mta_relay { char *authlabel; char *helotable; char *heloname; - char *secret; size_t ntask; TAILQ_HEAD(, mta_task) tasks; struct tree connectors; - size_t nconnector; size_t sourceloop; - - struct mta_connectors c_ready; - struct mta_connectors c_limit; - struct mta_connectors c_delay; - struct mta_connectors c_error; - struct event ev; + time_t lastsource; + time_t nextsource; int fail; char *failstr; @@ -721,15 +758,16 @@ struct mta_relay { #define RELAY_WAIT_MX 0x01 #define RELAY_WAIT_PREFERENCE 0x02 #define RELAY_WAIT_SECRET 0x04 -#define RELAY_WAIT_SOURCE 0x08 -#define RELAY_WAITMASK 0x0f +#define RELAY_WAIT_LIMITS 0x08 +#define RELAY_WAIT_SOURCE 0x10 +#define RELAY_WAIT_CONNECTOR 0x20 +#define RELAY_WAITMASK 0x3f int status; int refcount; size_t nconn; + size_t nconn_ready; time_t lastconn; - - size_t maxconn; }; struct mta_envelope { @@ -740,6 +778,7 @@ struct mta_envelope { char *dest; char *rcpt; struct mta_task *task; + int delivery; }; struct mta_task { @@ -1115,6 +1154,9 @@ void imsgproc_set_write(struct imsgproc *); void imsgproc_set_read_write(struct imsgproc *); void imsgproc_reset_callback(struct imsgproc *, void (*)(struct imsg *, void *), void *); +/* limit.c */ +void limit_mta_set_defaults(struct mta_limits *); +int limit_mta_set(struct mta_limits *, const char*, int64_t); /* lka.c */ pid_t lka(void); @@ -1193,9 +1235,12 @@ void m_get_envelope(struct msg *, struct envelope *); pid_t mta(void); void mta_route_ok(struct mta_relay *, struct mta_route *); void mta_route_error(struct mta_relay *, struct mta_route *); +void mta_route_down(struct mta_relay *, struct mta_route *); void mta_route_collect(struct mta_relay *, struct mta_route *); void mta_source_error(struct mta_relay *, struct mta_route *, const char *); -void mta_delivery(struct mta_envelope *, const char *, const char *, int, const char *); +void mta_delivery_log(struct mta_envelope *, const char *, const char *, int, const char *); +void mta_delivery_notify(struct mta_envelope *, int, const char *, uint32_t); +void mta_delivery(struct mta_envelope *, const char *, const char *, int, const char *, uint32_t); struct mta_task *mta_route_next_task(struct mta_relay *, struct mta_route *); const char *mta_host_to_text(struct mta_host *); const char *mta_relay_to_text(struct mta_relay *); @@ -1377,3 +1422,13 @@ int session_socket_error(int); /* waitq.c */ int waitq_wait(void *, void (*)(void *, void *, void *), void *); void waitq_run(void *, void *); + +/* runq.c */ +struct runq; + +int runq_init(struct runq **, void (*)(struct runq *, void *)); +int runq_schedule(struct runq *, time_t, void (*)(struct runq *, void *), void *); +int runq_delay(struct runq *, unsigned int, void (*)(struct runq *, void *), void *); +int runq_cancel(struct runq *, void (*)(struct runq *, void *), void *); +int runq_pending(struct runq *, void (*)(struct runq *, void *), void *, time_t *); +int runq_next(struct runq *, void (**)(struct runq *, void *), void **, time_t *); diff --git a/usr.sbin/smtpd/smtpd/Makefile b/usr.sbin/smtpd/smtpd/Makefile index 0d89d9c2dfc..fa024ced24e 100644 --- a/usr.sbin/smtpd/smtpd/Makefile +++ b/usr.sbin/smtpd/smtpd/Makefile @@ -1,4 +1,4 @@ -# $OpenBSD: Makefile,v 1.63 2013/07/19 20:37:08 eric Exp $ +# $OpenBSD: Makefile,v 1.64 2013/07/19 21:14:52 eric Exp $ .PATH: ${.CURDIR}/.. @@ -6,10 +6,10 @@ PROG= smtpd SRCS= aliases.c bounce.c ca.c compress_backend.c config.c \ control.c crypto.c delivery.c dict.c dns.c envelope.c \ - expand.c forward.c iobuf.c ioev.c lka.c lka_session.c \ + expand.c forward.c iobuf.c ioev.c limit.c lka.c lka_session.c \ log.c mda.c mfa.c mfa_session.c mproc.c \ mta.c mta_session.c parse.y queue.c queue_backend.c \ - ruleset.c scheduler.c scheduler_backend.c \ + ruleset.c runq.c scheduler.c scheduler_backend.c \ smtp.c smtp_session.c smtpd.c ssl.c ssl_privsep.c \ ssl_smtpd.c stat_backend.c table.c to.c tree.c util.c \ waitq.c -- cgit v1.2.3