summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Faurot <eric@cvs.openbsd.org>2013-07-19 21:14:53 +0000
committerEric Faurot <eric@cvs.openbsd.org>2013-07-19 21:14:53 +0000
commit89ca14ddb53440286ab2ed0c2b9e4473dda6d12e (patch)
treecf8744bfc84648a1c7489c187f08afd665f2dcaf
parent012c74efba199a734b02042792ff1c3e8f7b4546 (diff)
Many MTA improvements:
- Better transient error handling logic: failing destinations are automatically disabled for a while. When a destination is active again, ask the scheduler to retry previous envelopes immediatly. - More informative error report when all routes fail for a mail. - Implement a "smtpctl show hoststats" command to get the latest stat message per MX domain. - Implement a "smtpctl show routes" command to show the state the currently known routes to remote MXs. - Implement a "smtpctl resume route" command to re-enable a route that has been disabled. - Do not hardcode limits - Minor code improvements
-rw-r--r--usr.sbin/smtpd/control.c41
-rw-r--r--usr.sbin/smtpd/limit.c104
-rw-r--r--usr.sbin/smtpd/makemap/Makefile4
-rw-r--r--usr.sbin/smtpd/mta.c1169
-rw-r--r--usr.sbin/smtpd/mta_session.c249
-rw-r--r--usr.sbin/smtpd/parse.y54
-rw-r--r--usr.sbin/smtpd/queue.c21
-rw-r--r--usr.sbin/smtpd/runq.c203
-rw-r--r--usr.sbin/smtpd/scheduler.c8
-rw-r--r--usr.sbin/smtpd/smtpctl.838
-rw-r--r--usr.sbin/smtpd/smtpctl.c53
-rw-r--r--usr.sbin/smtpd/smtpd.c10
-rw-r--r--usr.sbin/smtpd/smtpd.conf.526
-rw-r--r--usr.sbin/smtpd/smtpd.h119
-rw-r--r--usr.sbin/smtpd/smtpd/Makefile6
15 files changed, 1620 insertions, 485 deletions
diff --git a/usr.sbin/smtpd/control.c b/usr.sbin/smtpd/control.c
index 67c83740fd5..5c92ae0243c 100644
--- a/usr.sbin/smtpd/control.c
+++ b/usr.sbin/smtpd/control.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: control.c,v 1.88 2013/07/19 15:14:23 eric Exp $ */
+/* $OpenBSD: control.c,v 1.89 2013/07/19 21:14:52 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
@@ -116,6 +116,22 @@ control_imsg(struct mproc *p, struct imsg *imsg)
return;
}
}
+ if (p->proc == PROC_MTA) {
+ switch (imsg->hdr.type) {
+ case IMSG_CTL_MTA_SHOW_ROUTES:
+ c = tree_get(&ctl_conns, imsg->hdr.peerid);
+ if (c == NULL)
+ return;
+ m_forward(&c->mproc, imsg);
+ return;
+ case IMSG_CTL_MTA_SHOW_HOSTSTATS:
+ c = tree_get(&ctl_conns, imsg->hdr.peerid);
+ if (c == NULL)
+ return;
+ m_forward(&c->mproc, imsg);
+ return;
+ }
+ }
switch (imsg->hdr.type) {
case IMSG_STAT_INCREMENT:
@@ -659,6 +675,15 @@ control_dispatch_ext(struct mproc *p, struct imsg *imsg)
m_compose(p, IMSG_CTL_OK, 0, 0, -1, NULL, 0);
return;
+ case IMSG_CTL_RESUME_ROUTE:
+ if (c->euid)
+ goto badcred;
+
+ log_info("info: route resumed");
+ m_forward(p_mta, imsg);
+ m_compose(p, IMSG_CTL_OK, 0, 0, -1, NULL, 0);
+ return;
+
case IMSG_CTL_LIST_MESSAGES:
if (c->euid)
goto badcred;
@@ -673,6 +698,20 @@ control_dispatch_ext(struct mproc *p, struct imsg *imsg)
imsg->data, imsg->hdr.len - sizeof(imsg->hdr));
return;
+ case IMSG_CTL_MTA_SHOW_ROUTES:
+ if (c->euid)
+ goto badcred;
+ m_compose(p_mta, IMSG_CTL_MTA_SHOW_ROUTES, c->id, 0, -1,
+ imsg->data, imsg->hdr.len - sizeof(imsg->hdr));
+ return;
+
+ case IMSG_CTL_MTA_SHOW_HOSTSTATS:
+ if (c->euid)
+ goto badcred;
+ m_compose(p_mta, IMSG_CTL_MTA_SHOW_HOSTSTATS, c->id, 0, -1,
+ imsg->data, imsg->hdr.len - sizeof(imsg->hdr));
+ return;
+
case IMSG_CTL_SCHEDULE:
if (c->euid)
goto badcred;
diff --git a/usr.sbin/smtpd/limit.c b/usr.sbin/smtpd/limit.c
new file mode 100644
index 00000000000..bdacef93d3f
--- /dev/null
+++ b/usr.sbin/smtpd/limit.c
@@ -0,0 +1,104 @@
+/* $OpenBSD: limit.c,v 1.1 2013/07/19 21:14:52 eric Exp $ */
+
+/*
+ * Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/types.h>
+#include <sys/queue.h>
+#include <sys/tree.h>
+#include <sys/socket.h>
+
+#include <ctype.h>
+#include <err.h>
+#include <event.h>
+#include <fcntl.h>
+#include <imsg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "smtpd.h"
+#include "log.h"
+
+void
+limit_mta_set_defaults(struct mta_limits *limits)
+{
+ limits->maxconn_per_host = 10;
+ limits->maxconn_per_route = 5;
+ limits->maxconn_per_source = 50;
+ limits->maxconn_per_connector = 20;
+ limits->maxconn_per_relay = 100;
+ limits->maxconn_per_domain = 100;
+
+ limits->conndelay_host = 0;
+ limits->conndelay_route = 5;
+ limits->conndelay_source = 0;
+ limits->conndelay_connector = 0;
+ limits->conndelay_relay = 2;
+ limits->conndelay_domain = 0;
+
+ limits->discdelay_route = 3;
+
+ limits->max_mail_per_session = 100;
+ limits->sessdelay_transaction = 1;
+ limits->sessdelay_keepalive = 10;
+
+ limits->family = AF_UNSPEC;
+}
+
+int
+limit_mta_set(struct mta_limits *limits, const char *key, int64_t value)
+{
+ if (!strcmp(key, "max-conn-per-host"))
+ limits->maxconn_per_host = value;
+ else if (!strcmp(key, "max-conn-per-route"))
+ limits->maxconn_per_route = value;
+ else if (!strcmp(key, "max-conn-per-source"))
+ limits->maxconn_per_source = value;
+ else if (!strcmp(key, "max-conn-per-connector"))
+ limits->maxconn_per_connector = value;
+ else if (!strcmp(key, "max-conn-per-relay"))
+ limits->maxconn_per_relay = value;
+ else if (!strcmp(key, "max-conn-per-domain"))
+ limits->maxconn_per_domain = value;
+
+ else if (!strcmp(key, "conn-delay-host"))
+ limits->conndelay_host = value;
+ else if (!strcmp(key, "conn-delay-route"))
+ limits->conndelay_route = value;
+ else if (!strcmp(key, "conn-delay-source"))
+ limits->conndelay_source = value;
+ else if (!strcmp(key, "conn-delay-connector"))
+ limits->conndelay_connector = value;
+ else if (!strcmp(key, "conn-delay-relay"))
+ limits->conndelay_relay = value;
+ else if (!strcmp(key, "conn-delay-domain"))
+ limits->conndelay_domain = value;
+
+ else if (!strcmp(key, "reconn-delay-route"))
+ limits->discdelay_route = value;
+
+ else if (!strcmp(key, "session-mail-max"))
+ limits->max_mail_per_session = value;
+ else if (!strcmp(key, "session-transaction-delay"))
+ limits->sessdelay_transaction = value;
+ else if (!strcmp(key, "session-keepalive"))
+ limits->sessdelay_keepalive = value;
+ else
+ return (0);
+
+ return (1);
+}
diff --git a/usr.sbin/smtpd/makemap/Makefile b/usr.sbin/smtpd/makemap/Makefile
index 6eec1d07e91..ce47caaefb1 100644
--- a/usr.sbin/smtpd/makemap/Makefile
+++ b/usr.sbin/smtpd/makemap/Makefile
@@ -1,4 +1,4 @@
-# $OpenBSD: Makefile,v 1.20 2013/07/19 19:53:33 eric Exp $
+# $OpenBSD: Makefile,v 1.21 2013/07/19 21:14:52 eric Exp $
.PATH: ${.CURDIR}/..
@@ -19,7 +19,7 @@ CFLAGS+= -Wshadow -Wpointer-arith -Wcast-qual
CFLAGS+= -Wsign-compare -Wbounded
CFLAGS+= -DNO_IO
-SRCS= aliases.c dict.c expand.c log.c makemap.c parse.y \
+SRCS= aliases.c dict.c expand.c limit.c log.c makemap.c parse.y \
table.c to.c tree.c util.c
SRCS+= table_static.c
diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c
index 7f42ad043f5..9b8a8c93e8e 100644
--- a/usr.sbin/smtpd/mta.c
+++ b/usr.sbin/smtpd/mta.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mta.c,v 1.162 2013/07/19 15:14:23 eric Exp $ */
+/* $OpenBSD: mta.c,v 1.163 2013/07/19 21:14:52 eric Exp $ */
/*
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -44,14 +44,14 @@
#define MAXERROR_PER_HOST 4
-#define MAXCONN_PER_HOST 10
-#define MAXCONN_PER_ROUTE 5
-#define MAXCONN_PER_SOURCE 50 /* XXX missing */
-#define MAXCONN_PER_CONNECTOR 20
-#define MAXCONN_PER_RELAY 100
+#define DELAY_CHECK_SOURCE 1
+#define DELAY_CHECK_SOURCE_SLOW 10
+#define DELAY_CHECK_SOURCE_FAST 0
+#define DELAY_CHECK_LIMIT 5
-#define CONNECTOR_DELAY_CONNECT 1
-#define CONNECTOR_DELAY_LIMIT 5
+#define DELAY_QUADRATIC 1
+#define DELAY_ROUTE_BASE 200
+#define DELAY_ROUTE_MAX (3600 * 4)
static void mta_imsg(struct mproc *, struct imsg *);
static void mta_shutdown(void);
@@ -62,14 +62,17 @@ static void mta_query_secret(struct mta_relay *);
static void mta_query_preference(struct mta_relay *);
static void mta_query_source(struct mta_relay *);
static void mta_on_mx(void *, void *, void *);
+static void mta_on_secret(struct mta_relay *, const char *);
+static void mta_on_preference(struct mta_relay *, int, int);
static void mta_on_source(struct mta_relay *, struct mta_source *);
+static void mta_on_timeout(struct runq *, void *);
static void mta_connect(struct mta_connector *);
-static void mta_recycle(struct mta_connector *);
+static void mta_route_enable(struct mta_route *);
+static void mta_route_disable(struct mta_route *, int, int);
static void mta_drain(struct mta_relay *);
-static void mta_relay_schedule(struct mta_relay *, unsigned int);
-static void mta_relay_timeout(int, short, void *);
static void mta_flush(struct mta_relay *, int, const char *);
-static struct mta_route *mta_find_route(struct mta_connector *);
+static struct mta_route *mta_find_route(struct mta_connector *, time_t, int*,
+ time_t*);
static void mta_log(const struct mta_envelope *, const char *, const char *,
const char *, const char *);
@@ -111,9 +114,7 @@ static const char *mta_connector_to_text(struct mta_connector *);
SPLAY_HEAD(mta_route_tree, mta_route);
static struct mta_route *mta_route(struct mta_source *, struct mta_host *);
-#if 0
static void mta_route_ref(struct mta_route *);
-#endif
static void mta_route_unref(struct mta_route *);
static const char *mta_route_to_text(struct mta_route *);
static int mta_route_cmp(const struct mta_route *, const struct mta_route *);
@@ -125,61 +126,77 @@ static struct mta_host_tree hosts;
static struct mta_source_tree sources;
static struct mta_route_tree routes;
-static struct tree batches;
-
static struct tree wait_mx;
static struct tree wait_preference;
static struct tree wait_secret;
static struct tree wait_source;
+static struct runq *runq_relay;
+static struct runq *runq_connector;
+static struct runq *runq_route;
+static struct runq *runq_hoststat;
+
+static time_t max_seen_conndelay_route;
+static time_t max_seen_discdelay_route;
+
+#define HOSTSTAT_EXPIRE_DELAY (4 * 3600)
+struct hoststat {
+ char name[SMTPD_MAXHOSTNAMELEN];
+ time_t tm;
+ char error[SMTPD_MAXLINESIZE];
+ struct tree deferred;
+};
+static struct dict hoststat;
+
+void mta_hoststat_update(const char *, const char *);
+void mta_hoststat_cache(const char *, uint64_t);
+void mta_hoststat_uncache(const char *, uint64_t);
+void mta_hoststat_reschedule(const char *);
+static void mta_hoststat_remove_entry(struct hoststat *);
+
+
void
mta_imsg(struct mproc *p, struct imsg *imsg)
{
struct mta_relay *relay;
struct mta_task *task;
- struct mta_source *source;
struct mta_domain *domain;
+ struct mta_route *route;
struct mta_mx *mx, *imx;
+ struct hoststat *hs;
struct mta_envelope *e;
struct sockaddr_storage ss;
- struct tree *batch;
struct envelope evp;
struct msg m;
const char *secret;
+ const char *hostname;
uint64_t reqid;
+ time_t t;
char buf[SMTPD_MAXLINESIZE];
int dnserror, preference, v, status;
+ void *iter;
+ uint64_t u64;
if (p->proc == PROC_QUEUE) {
switch (imsg->hdr.type) {
- case IMSG_MTA_BATCH:
- m_msg(&m, imsg);
- m_get_id(&m, &reqid);
- m_end(&m);
- batch = xmalloc(sizeof *batch, "mta_batch");
- tree_init(batch);
- tree_xset(&batches, reqid, batch);
- log_trace(TRACE_MTA,
- "mta: batch:%016" PRIx64 " created", reqid);
- return;
-
- case IMSG_MTA_BATCH_ADD:
+ case IMSG_MTA_TRANSFER:
m_msg(&m, imsg);
- m_get_id(&m, &reqid);
m_get_envelope(&m, &evp);
m_end(&m);
relay = mta_relay(&evp);
- batch = tree_xget(&batches, reqid);
- if ((task = tree_get(batch, relay->id)) == NULL) {
- log_trace(TRACE_MTA, "mta: new task for %s",
- mta_relay_to_text(relay));
+ TAILQ_FOREACH(task, &relay->tasks, entry)
+ if (task->msgid == evpid_to_msgid(evp.id))
+ break;
+
+ if (task == NULL) {
task = xmalloc(sizeof *task, "mta_task");
TAILQ_INIT(&task->envelopes);
task->relay = relay;
- tree_xset(batch, relay->id, task);
+ relay->ntask += 1;
+ TAILQ_INSERT_TAIL(&relay->tasks, task, entry);
task->msgid = evpid_to_msgid(evp.id);
if (evp.sender.user[0] || evp.sender.domain[0])
snprintf(buf, sizeof buf, "%s@%s",
@@ -187,16 +204,8 @@ mta_imsg(struct mproc *p, struct imsg *imsg)
else
buf[0] = '\0';
task->sender = xstrdup(buf, "mta_task:sender");
- } else
- mta_relay_unref(relay); /* from here */
-
- /*
- * Technically, we could handle that by adding a msg
- * level, but the batch sent by the scheduler should
- * be valid.
- */
- if (task->msgid != evpid_to_msgid(evp.id))
- errx(1, "msgid mismatch in batch");
+ stat_increment("mta.task", 1);
+ }
e = xcalloc(1, sizeof *e, "mta_envelope");
e->id = evp.id;
@@ -209,32 +218,15 @@ mta_imsg(struct mproc *p, struct imsg *imsg)
if (strcmp(buf, e->dest))
e->rcpt = xstrdup(buf, "mta_envelope:rcpt");
e->task = task;
- /* XXX honour relay->maxrcpt */
+
TAILQ_INSERT_TAIL(&task->envelopes, e, entry);
- stat_increment("mta.envelope", 1);
log_debug("debug: mta: received evp:%016" PRIx64
" for <%s>", e->id, e->dest);
- return;
- case IMSG_MTA_BATCH_END:
- m_msg(&m, imsg);
- m_get_id(&m, &reqid);
- m_end(&m);
- batch = tree_xpop(&batches, reqid);
- log_trace(TRACE_MTA, "mta: batch:%016" PRIx64 " closed",
- reqid);
- /* For all tasks, queue them on its relay */
- while (tree_poproot(batch, &reqid, (void**)&task)) {
- if (reqid != task->relay->id)
- errx(1, "relay id mismatch!");
- relay = task->relay;
- relay->ntask += 1;
- TAILQ_INSERT_TAIL(&relay->tasks, task, entry);
- stat_increment("mta.task", 1);
- mta_drain(relay);
- mta_relay_unref(relay); /* from BATCH_APPEND */
- }
- free(batch);
+ stat_increment("mta.envelope", 1);
+
+ mta_drain(relay);
+ mta_relay_unref(relay); /* from here */
return;
case IMSG_QUEUE_MESSAGE_FD:
@@ -252,40 +244,20 @@ mta_imsg(struct mproc *p, struct imsg *imsg)
m_get_string(&m, &secret);
m_end(&m);
relay = tree_xpop(&wait_secret, reqid);
- if (secret[0])
- relay->secret = strdup(secret);
- if (relay->secret == NULL) {
- log_warnx("warn: Failed to retrieve secret "
- "for %s", mta_relay_to_text(relay));
- relay->fail = IMSG_DELIVERY_TEMPFAIL;
- relay->failstr = "Could not retrieve secret";
- }
- relay->status &= ~RELAY_WAIT_SECRET;
- mta_drain(relay);
- mta_relay_unref(relay); /* from mta_query_secret() */
+ mta_on_secret(relay, secret[0] ? secret : NULL);
return;
case IMSG_LKA_SOURCE:
m_msg(&m, imsg);
m_get_id(&m, &reqid);
m_get_int(&m, &status);
-
- relay = tree_xpop(&wait_source, reqid);
- relay->status &= ~RELAY_WAIT_SOURCE;
- if (status == LKA_OK) {
+ if (status == LKA_OK)
m_get_sockaddr(&m, (struct sockaddr*)&ss);
- source = mta_source((struct sockaddr *)&ss);
- mta_on_source(relay, source);
- mta_source_unref(source);
- }
- else {
- log_warnx("warn: Failed to get source address"
- "for %s", mta_relay_to_text(relay));
- }
m_end(&m);
- mta_drain(relay);
- mta_relay_unref(relay); /* from mta_query_source() */
+ relay = tree_xpop(&wait_source, reqid);
+ mta_on_source(relay, (status == LKA_OK) ?
+ mta_source((struct sockaddr *)&ss) : NULL);
return;
case IMSG_LKA_HELO:
@@ -337,23 +309,12 @@ mta_imsg(struct mproc *p, struct imsg *imsg)
m_msg(&m, imsg);
m_get_id(&m, &reqid);
m_get_int(&m, &dnserror);
- relay = tree_xpop(&wait_preference, reqid);
- if (dnserror) {
- log_debug("debug: couldn't find backup "
- "preference for %s",
- mta_relay_to_text(relay));
- relay->backuppref = INT_MAX;
- } else {
- m_get_int(&m, &relay->backuppref);
- log_debug("debug: found backup preference %i "
- "for %s",
- relay->backuppref,
- mta_relay_to_text(relay));
- }
+ if (dnserror == 0)
+ m_get_int(&m, &preference);
m_end(&m);
- relay->status &= ~RELAY_WAIT_PREFERENCE;
- mta_drain(relay);
- mta_relay_unref(relay); /* from mta_query_preference() */
+
+ relay = tree_xpop(&wait_preference, reqid);
+ mta_on_preference(relay, dnserror, preference);
return;
case IMSG_DNS_PTR:
@@ -388,6 +349,65 @@ mta_imsg(struct mproc *p, struct imsg *imsg)
}
}
+ if (p->proc == PROC_CONTROL) {
+ switch (imsg->hdr.type) {
+
+ case IMSG_CTL_RESUME_ROUTE:
+ u64 = *((uint64_t *)imsg->data);
+ if (u64)
+ log_debug("resuming route: %llu",
+ (unsigned long long)u64);
+ else
+ log_debug("resuming all routes");
+ SPLAY_FOREACH(route, mta_route_tree, &routes) {
+ if (u64 && route->id != u64)
+ continue;
+ mta_route_enable(route);
+ if (u64)
+ break;
+ }
+ return;
+
+ case IMSG_CTL_MTA_SHOW_ROUTES:
+ SPLAY_FOREACH(route, mta_route_tree, &routes) {
+ v = runq_pending(runq_route, NULL, route, &t);
+ snprintf(buf, sizeof(buf),
+ "%llu. %s %c%c%c%c nconn=%zu penalty=%i timeout=%s",
+ (unsigned long long)route->id,
+ mta_route_to_text(route),
+ route->flags & ROUTE_NEW ? 'N' : '-',
+ route->flags & ROUTE_DISABLED ? 'D' : '-',
+ route->flags & ROUTE_RUNQ ? 'Q' : '-',
+ route->flags & ROUTE_KEEPALIVE ? 'K' : '-',
+ route->nconn,
+ route->penalty,
+ v ? duration_to_text(t - time(NULL)) : "-");
+ m_compose(p, IMSG_CTL_MTA_SHOW_ROUTES,
+ imsg->hdr.peerid, 0, -1,
+ buf, strlen(buf) + 1);
+ }
+ m_compose(p, IMSG_CTL_MTA_SHOW_ROUTES, imsg->hdr.peerid,
+ 0, -1, NULL, 0);
+ return;
+ case IMSG_CTL_MTA_SHOW_HOSTSTATS:
+ iter = NULL;
+ while (dict_iter(&hoststat, &iter, &hostname,
+ (void **)&hs)) {
+ snprintf(buf, sizeof(buf),
+ "%s|%llu|%s",
+ hostname, (unsigned long long) hs->tm,
+ hs->error);
+ m_compose(p, IMSG_CTL_MTA_SHOW_HOSTSTATS,
+ imsg->hdr.peerid, 0, -1,
+ buf, strlen(buf) + 1);
+ }
+ m_compose(p, IMSG_CTL_MTA_SHOW_HOSTSTATS,
+ imsg->hdr.peerid,
+ 0, -1, NULL, 0);
+ return;
+ }
+ }
+
errx(1, "mta_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
}
@@ -451,15 +471,20 @@ mta(void)
SPLAY_INIT(&sources);
SPLAY_INIT(&routes);
- tree_init(&batches);
tree_init(&wait_secret);
tree_init(&wait_mx);
tree_init(&wait_preference);
tree_init(&wait_source);
+ dict_init(&hoststat);
imsg_callback = mta_imsg;
event_init();
+ runq_init(&runq_relay, mta_on_timeout);
+ runq_init(&runq_connector, mta_on_timeout);
+ runq_init(&runq_route, mta_on_timeout);
+ runq_init(&runq_hoststat, mta_on_timeout);
+
signal_set(&ev_sigint, SIGINT, mta_sig_handler, NULL);
signal_set(&ev_sigterm, SIGTERM, mta_sig_handler, NULL);
signal_add(&ev_sigint, NULL);
@@ -487,14 +512,15 @@ void
mta_source_error(struct mta_relay *relay, struct mta_route *route, const char *e)
{
struct mta_connector *c;
+
/*
* Remember the source as broken for this connector.
*/
c = mta_connector(relay, route->src);
- if (!(c->flags & CONNECTOR_SOURCE_ERROR))
- log_info("smtp-out: Error on connector %s: %s",
- mta_connector_to_text(c), e);
- c->flags |= CONNECTOR_SOURCE_ERROR;
+ if (!(c->flags & CONNECTOR_ERROR_SOURCE))
+ log_info("smtp-out: Error on %s: %s",
+ mta_route_to_text(route), e);
+ c->flags |= CONNECTOR_ERROR_SOURCE;
}
/*
@@ -526,12 +552,22 @@ mta_route_ok(struct mta_relay *relay, struct mta_route *route)
{
struct mta_connector *c;
- log_debug("debug: mta: route ok %s", mta_route_to_text(route));
+ if (!(route->flags & ROUTE_NEW))
+ return;
+
+ log_debug("debug: mta-routing: route %s is now valid.",
+ mta_route_to_text(route));
- route->dst->nerror = 0;
+ route->flags &= ~ROUTE_NEW;
c = mta_connector(relay, route->src);
- c->flags &= ~CONNECTOR_SOURCE_ERROR;
+ mta_connect(c);
+}
+
+void
+mta_route_down(struct mta_relay *relay, struct mta_route *route)
+{
+ mta_route_disable(route, 2, ROUTE_DISABLED_SMTP);
}
void
@@ -539,26 +575,24 @@ mta_route_collect(struct mta_relay *relay, struct mta_route *route)
{
struct mta_connector *c;
- log_debug("debug: mta: route collect %s", mta_route_to_text(route));
+ log_debug("debug: mta_route_collect(%s)",
+ mta_route_to_text(route));
relay->nconn -= 1;
+ relay->domain->nconn -= 1;
route->nconn -= 1;
route->src->nconn -= 1;
route->dst->nconn -= 1;
+ route->lastdisc = time(NULL);
- c = mta_connector(relay, route->src);
- mta_route_unref(route); /* from mta_find_route() */
+ /* First connection failed */
+ if (route->flags & ROUTE_NEW)
+ mta_route_disable(route, 2, ROUTE_DISABLED_NET);
+ c = mta_connector(relay, route->src);
c->nconn -= 1;
-
- if (c->flags & CONNECTOR_LIMIT) {
- log_debug("debug: mta; resetting limit flags on connector %s",
- mta_connector_to_text(c));
- c->flags &= ~CONNECTOR_LIMIT;
- }
-
- mta_recycle(c);
- mta_drain(relay);
+ mta_connect(c);
+ mta_route_unref(route); /* from mta_find_route() */
mta_relay_unref(relay); /* from mta_connect() */
}
@@ -577,29 +611,53 @@ mta_route_next_task(struct mta_relay *relay, struct mta_route *route)
}
void
-mta_delivery(struct mta_envelope *e, const char *source, const char *relay,
+mta_delivery_log(struct mta_envelope *e, const char *source, const char *relay,
int delivery, const char *status)
{
if (delivery == IMSG_DELIVERY_OK) {
mta_log(e, "Ok", source, relay, status);
- queue_ok(e->id);
}
else if (delivery == IMSG_DELIVERY_TEMPFAIL) {
mta_log(e, "TempFail", source, relay, status);
- queue_tempfail(e->id, 0, status);
}
else if (delivery == IMSG_DELIVERY_PERMFAIL) {
mta_log(e, "PermFail", source, relay, status);
- queue_permfail(e->id, status);
}
else if (delivery == IMSG_DELIVERY_LOOP) {
mta_log(e, "PermFail", source, relay, "Loop detected");
+ }
+ else
+ errx(1, "bad delivery");
+}
+
+void
+mta_delivery_notify(struct mta_envelope *e, int delivery, const char *status,
+ uint32_t penalty)
+{
+ if (delivery == IMSG_DELIVERY_OK) {
+ queue_ok(e->id);
+ }
+ else if (delivery == IMSG_DELIVERY_TEMPFAIL) {
+ queue_tempfail(e->id, penalty, status);
+ }
+ else if (delivery == IMSG_DELIVERY_PERMFAIL) {
+ queue_permfail(e->id, status);
+ }
+ else if (delivery == IMSG_DELIVERY_LOOP) {
queue_loop(e->id);
}
else
errx(1, "bad delivery");
}
+void
+mta_delivery(struct mta_envelope *e, const char *source, const char *relay,
+ int delivery, const char *status, uint32_t penalty)
+{
+ mta_delivery_log(e, source, relay, delivery, status);
+ mta_delivery_notify(e, delivery, status, penalty);
+}
+
static void
mta_query_mx(struct mta_relay *relay)
{
@@ -608,7 +666,8 @@ mta_query_mx(struct mta_relay *relay)
if (relay->status & RELAY_WAIT_MX)
return;
- log_debug("debug: mta_query_mx(%s)", relay->domain->name);
+ log_debug("debug: mta: querying MX for %s...",
+ mta_relay_to_text(relay));
if (waitq_wait(&relay->domain->mxs, mta_on_mx, relay)) {
id = generate_uid();
@@ -624,12 +683,29 @@ mta_query_mx(struct mta_relay *relay)
}
static void
+mta_query_limits(struct mta_relay *relay)
+{
+ if (relay->status & RELAY_WAIT_LIMITS)
+ return;
+
+ relay->limits = dict_get(env->sc_limits_dict, relay->domain->name);
+ if (relay->limits == NULL)
+ relay->limits = dict_get(env->sc_limits_dict, "default");
+
+ if (max_seen_conndelay_route < relay->limits->conndelay_route)
+ max_seen_conndelay_route = relay->limits->conndelay_route;
+ if (max_seen_discdelay_route < relay->limits->discdelay_route)
+ max_seen_discdelay_route = relay->limits->discdelay_route;
+}
+
+static void
mta_query_secret(struct mta_relay *relay)
{
if (relay->status & RELAY_WAIT_SECRET)
return;
- log_debug("debug: mta_query_secret(%s)", mta_relay_to_text(relay));
+ log_debug("debug: mta: querying secret for %s...",
+ mta_relay_to_text(relay));
tree_xset(&wait_secret, relay->id, relay);
relay->status |= RELAY_WAIT_SECRET;
@@ -649,7 +725,8 @@ mta_query_preference(struct mta_relay *relay)
if (relay->status & RELAY_WAIT_PREFERENCE)
return;
- log_debug("debug: mta_query_preference(%s)", mta_relay_to_text(relay));
+ log_debug("debug: mta: querying preference for %s...",
+ mta_relay_to_text(relay));
tree_xset(&wait_preference, relay->id, relay);
relay->status |= RELAY_WAIT_PREFERENCE;
@@ -661,7 +738,20 @@ mta_query_preference(struct mta_relay *relay)
static void
mta_query_source(struct mta_relay *relay)
{
- log_debug("debug: mta_query_source(%s)", mta_relay_to_text(relay));
+ log_debug("debug: mta: querying source for %s...",
+ mta_relay_to_text(relay));
+
+ relay->sourceloop += 1;
+
+ if (relay->sourcetable == NULL) {
+ /*
+ * This is a recursive call, but it only happens once, since
+ * another source will not be queried immediatly.
+ */
+ mta_relay_ref(relay);
+ mta_on_source(relay, mta_source(NULL));
+ return;
+ }
m_create(p_lka, IMSG_LKA_SOURCE, 0, 0, -1);
m_add_id(p_lka, relay->id);
@@ -679,7 +769,7 @@ mta_on_mx(void *tag, void *arg, void *data)
struct mta_domain *domain = data;
struct mta_relay *relay = arg;
- log_debug("debug: mta_on_mx(%p, %s, %s)",
+ log_debug("debug: mta: ... got mx (%p, %s, %s)",
tag, domain->name, mta_relay_to_text(relay));
switch (domain->mxstatus) {
@@ -716,34 +806,232 @@ mta_on_mx(void *tag, void *arg, void *data)
}
static void
+mta_on_secret(struct mta_relay *relay, const char *secret)
+{
+ log_debug("debug: mta: ... got secret for %s: %s",
+ mta_relay_to_text(relay), secret);
+
+ if (secret)
+ relay->secret = strdup(secret);
+
+ if (relay->secret == NULL) {
+ log_warnx("warn: Failed to retrieve secret "
+ "for %s", mta_relay_to_text(relay));
+ relay->fail = IMSG_DELIVERY_TEMPFAIL;
+ relay->failstr = "Could not retrieve credentials";
+ }
+
+ relay->status &= ~RELAY_WAIT_SECRET;
+ mta_drain(relay);
+ mta_relay_unref(relay); /* from mta_query_secret() */
+}
+
+static void
+mta_on_preference(struct mta_relay *relay, int dnserror, int preference)
+{
+ if (dnserror) {
+ log_warnx("warn: Couldn't find backup preference for %s",
+ mta_relay_to_text(relay));
+ relay->backuppref = INT_MAX;
+ }
+ else {
+ log_debug("debug: mta: ... got preference for %s: %i, %i",
+ mta_relay_to_text(relay), dnserror, preference);
+ relay->backuppref = preference;
+ }
+
+ relay->status &= ~RELAY_WAIT_PREFERENCE;
+ mta_drain(relay);
+ mta_relay_unref(relay); /* from mta_query_preference() */
+}
+
+static void
mta_on_source(struct mta_relay *relay, struct mta_source *source)
{
- mta_connector(relay, source);
+ struct mta_connector *c;
+ void *iter;
+ int delay, errmask;
+
+ log_debug("debug: mta: ... got source for %s: %s",
+ mta_relay_to_text(relay), source ? mta_source_to_text(source) : "NULL");
+
+ relay->lastsource = time(NULL);
+ delay = DELAY_CHECK_SOURCE_SLOW;
+
+ if (source) {
+ c = mta_connector(relay, source);
+ if (c->flags & CONNECTOR_NEW) {
+ c->flags &= ~CONNECTOR_NEW;
+ delay = DELAY_CHECK_SOURCE;
+ }
+ mta_connect(c);
+ if ((c->flags & CONNECTOR_ERROR) == 0)
+ relay->sourceloop = 0;
+ else
+ delay = DELAY_CHECK_SOURCE_FAST;
+ mta_source_unref(source); /* from constructor */
+ }
+ else {
+ log_warnx("warn: Failed to get source address"
+ "for %s", mta_relay_to_text(relay));
+ }
+
+ if (tree_count(&relay->connectors) == 0) {
+ relay->fail = IMSG_DELIVERY_TEMPFAIL;
+ relay->failstr = "Could not retrieve source address";
+ }
+ if (tree_count(&relay->connectors) < relay->sourceloop) {
+ relay->fail = IMSG_DELIVERY_TEMPFAIL;
+ relay->failstr = "No valid route to remote MX";
+
+ errmask = 0;
+ iter = NULL;
+ while (tree_iter(&relay->connectors, &iter, NULL, (void **)&c))
+ errmask |= c->flags;
+
+ if (errmask & CONNECTOR_ERROR_ROUTE_SMTP)
+ relay->failstr = "Destination seem to reject all mails";
+ else if (errmask & CONNECTOR_ERROR_ROUTE_NET)
+ relay->failstr = "Network error on destination MXs";
+ else if (errmask & CONNECTOR_ERROR_MX)
+ relay->failstr = "No MX found for destination";
+ else if (errmask & CONNECTOR_ERROR_FAMILY)
+ relay->failstr = "Address family mismatch on destination MXs";
+ else
+ relay->failstr = "No valid route to destination";
+ }
+
+ relay->nextsource = relay->lastsource + delay;
+ relay->status &= ~RELAY_WAIT_SOURCE;
+ mta_drain(relay);
+ mta_relay_unref(relay); /* from mta_query_source() */
}
static void
mta_connect(struct mta_connector *c)
{
struct mta_route *route;
+ struct mta_limits *l = c->relay->limits;
+ int limits;
+ time_t nextconn, now;
+
+ again:
+
+ log_debug("debug: mta: connecting with %s", mta_connector_to_text(c));
+
+ /* Do not connect if this connector has an error. */
+ if (c->flags & CONNECTOR_ERROR) {
+ log_debug("debug: mta: connector error");
+ return;
+ }
+
+ if (c->flags & CONNECTOR_WAIT) {
+ log_debug("debug: mta: canceling connector timeout");
+ runq_cancel(runq_connector, NULL, c);
+ }
+
+ /* No job. */
+ if (c->relay->ntask == 0) {
+ log_debug("debug: mta: no task for connector");
+ return;
+ }
+
+ /* Do not create more connections than necessary */
+ if ((c->relay->nconn_ready >= c->relay->ntask) ||
+ (c->relay->nconn > 2 && c->relay->nconn >= c->relay->ntask / 2)) {
+ log_debug("debug: mta: enough connections already");
+ return;
+ }
+
+ limits = 0;
+ nextconn = now = time(NULL);
+
+ if (c->relay->domain->lastconn + l->conndelay_domain > nextconn) {
+ log_debug("debug: mta: cannot use domain %s before %llus",
+ c->relay->domain->name,
+ (unsigned long long) c->relay->domain->lastconn + l->conndelay_domain - now);
+ nextconn = c->relay->domain->lastconn + l->conndelay_domain;
+ }
+ if (c->relay->domain->nconn >= l->maxconn_per_domain) {
+ log_debug("debug: mta: hit domain limit");
+ limits |= CONNECTOR_LIMIT_DOMAIN;
+ }
+
+ if (c->source->lastconn + l->conndelay_source > nextconn) {
+ log_debug("debug: mta: cannot use source %s before %llus",
+ mta_source_to_text(c->source),
+ (unsigned long long) c->source->lastconn + l->conndelay_source - now);
+ nextconn = c->source->lastconn + l->conndelay_source;
+ }
+ if (c->source->nconn >= l->maxconn_per_source) {
+ log_debug("debug: mta: hit source limit");
+ limits |= CONNECTOR_LIMIT_SOURCE;
+ }
+
+ if (c->lastconn + l->conndelay_connector > nextconn) {
+ log_debug("debug: mta: cannot use %s before %llus",
+ mta_connector_to_text(c),
+ (unsigned long long) c->lastconn + l->conndelay_connector - now);
+ nextconn = c->lastconn + l->conndelay_connector;
+ }
+ if (c->nconn >= l->maxconn_per_connector) {
+ log_debug("debug: mta: hit connector limit");
+ limits |= CONNECTOR_LIMIT_CONN;
+ }
+
+ if (c->relay->lastconn + l->conndelay_relay > nextconn) {
+ log_debug("debug: mta: cannot use %s before %llus",
+ mta_relay_to_text(c->relay),
+ (unsigned long long) c->relay->lastconn + l->conndelay_relay - now);
+ nextconn = c->relay->lastconn + l->conndelay_relay;
+ }
+ if (c->relay->nconn >= l->maxconn_per_relay) {
+ log_debug("debug: mta: hit relay limit");
+ limits |= CONNECTOR_LIMIT_RELAY;
+ }
- log_debug("debug: mta_connect() for %s", mta_connector_to_text(c));
+ /* We can connect now, find a route */
+ if (!limits && nextconn <= now)
+ route = mta_find_route(c, now, &limits, &nextconn);
+ else
+ route = NULL;
- route = mta_find_route(c);
+ /* No route */
if (route == NULL) {
- mta_recycle(c);
- if (c->queue == &c->relay->c_limit)
- c->clearlimit = time(NULL) + CONNECTOR_DELAY_LIMIT;
- if (c->queue == &c->relay->c_ready)
- fatalx("connector with no route ended up in ready list");
+
+ if (c->flags & CONNECTOR_ERROR) {
+ /* XXX we might want to clear this flag later */
+ log_debug("debug: mta-routing: no route available for %s: errors on connector",
+ mta_connector_to_text(c));
+ return;
+ }
+ else if (limits) {
+ log_debug("debug: mta-routing: no route available for %s: limits reached",
+ mta_connector_to_text(c));
+ nextconn = now + DELAY_CHECK_LIMIT;
+ }
+ else {
+ log_debug("debug: mta-routing: no route available for %s: must wait a bit",
+ mta_connector_to_text(c));
+ }
+ log_debug("debug: mta: retrying to connect on %s in %llus...",
+ mta_connector_to_text(c),
+ (unsigned long long) nextconn - time(NULL));
+ c->flags |= CONNECTOR_WAIT;
+ runq_schedule(runq_connector, nextconn, NULL, c);
return;
}
+ log_debug("debug: mta-routing: spawning new connection on %s",
+ mta_route_to_text(route));
+
c->nconn += 1;
c->lastconn = time(NULL);
- c->nextconn = c->lastconn + CONNECTOR_DELAY_CONNECT;
c->relay->nconn += 1;
c->relay->lastconn = c->lastconn;
+ c->relay->domain->nconn += 1;
+ c->relay->domain->lastconn = c->lastconn;
route->nconn += 1;
route->lastconn = c->lastconn;
route->src->nconn += 1;
@@ -751,105 +1039,101 @@ mta_connect(struct mta_connector *c)
route->dst->nconn += 1;
route->dst->lastconn = c->lastconn;
- mta_recycle(c);
-
- mta_relay_ref(c->relay);
mta_session(c->relay, route); /* this never fails synchronously */
+ mta_relay_ref(c->relay);
+
+ goto again;
}
static void
-mta_recycle(struct mta_connector *c)
+mta_on_timeout(struct runq *runq, void *arg)
{
- TAILQ_REMOVE(c->queue, c, lst_entry);
-
- if (c->flags & CONNECTOR_ERROR) {
- log_debug("debug: mta: putting %s on error queue",
- mta_connector_to_text(c));
- c->queue = &c->relay->c_error;
+ struct mta_connector *connector = arg;
+ struct mta_relay *relay = arg;
+ struct mta_route *route = arg;
+ struct hoststat *hs = arg;
+
+ if (runq == runq_relay) {
+ log_debug("debug: mta: ... timeout for %s",
+ mta_relay_to_text(relay));
+ relay->status &= ~RELAY_WAIT_CONNECTOR;
+ mta_drain(relay);
+ mta_relay_unref(relay); /* from mta_drain() */
}
- else if (c->flags & CONNECTOR_LIMIT) {
- log_debug("debug: mta: putting %s on limit queue",
- mta_connector_to_text(c));
- c->queue = &c->relay->c_limit;
+ else if (runq == runq_connector) {
+ log_debug("debug: mta: ... timeout for %s",
+ mta_connector_to_text(connector));
+ connector->flags &= ~CONNECTOR_WAIT;
+ mta_connect(connector);
}
- else if (c->nextconn > time(NULL)) {
- log_debug("debug: mta: putting %s on delay queue",
- mta_connector_to_text(c));
- c->queue = &c->relay->c_delay;
+ else if (runq == runq_route) {
+ route->flags &= ~ROUTE_RUNQ;
+ mta_route_enable(route);
+ mta_route_unref(route);
}
- else {
- log_debug("debug: mta: putting %s on ready queue",
- mta_connector_to_text(c));
- c->queue = &c->relay->c_ready;
+ else if (runq == runq_hoststat) {
+ log_debug("debug: mta: ... timeout for hoststat %s",
+ hs->name);
+ mta_hoststat_remove_entry(hs);
+ free(hs);
}
-
- TAILQ_INSERT_TAIL(c->queue, c, lst_entry);
}
static void
-mta_relay_timeout(int fd, short ev, void *arg)
+mta_route_disable(struct mta_route *route, int penalty, int reason)
{
- struct mta_relay *r = arg;
- struct mta_connector *c;
- time_t t;
-
- log_debug("debug: mta: timeout for %s", mta_relay_to_text(r));
+ unsigned long long delay;
- t = time(NULL);
+ route->penalty += penalty;
+ route->lastpenalty = time(NULL);
+ delay = (unsigned long long)DELAY_ROUTE_BASE * route->penalty * route->penalty;
+ if (delay > DELAY_ROUTE_MAX)
+ delay = DELAY_ROUTE_MAX;
+#if 0
+ delay = 60;
+#endif
- /*
- * Clear the limit flags on all connectors.
- */
- while ((c = TAILQ_FIRST(&r->c_limit))) {
- /* This requires that the list is always sorted */
- if (c->clearlimit > t)
- break;
- log_debug("debug: mta: clearing limits on %s",
- mta_connector_to_text(c));
- c->flags &= ~CONNECTOR_LIMIT;
- mta_recycle(c);
- }
+ log_info("smtp-out: Disabling route %s for %llus",
+ mta_route_to_text(route), delay);
- while ((c = TAILQ_FIRST(&r->c_delay))) {
- /* This requires that the list is always sorted */
- if (c->nextconn > t)
- break;
- log_debug("debug: mta: delay expired for %s",
- mta_connector_to_text(c));
- mta_recycle(c);
+ if (route->flags & ROUTE_DISABLED) {
+ runq_cancel(runq_route, NULL, route);
+ mta_route_unref(route); /* from last call to here */
}
-
- mta_drain(r);
- mta_relay_unref(r); /* from mta_relay_schedule() */
+ route->flags |= reason & ROUTE_DISABLED;
+ runq_schedule(runq_route, time(NULL) + delay, NULL, route);
+ mta_route_ref(route);
}
static void
-mta_relay_schedule(struct mta_relay *r, unsigned int delay)
+mta_route_enable(struct mta_route *route)
{
- struct timeval tv;
-
- if (evtimer_pending(&r->ev, &tv))
- return;
-
- log_debug("debug: mta: adding relay timeout: %u", delay);
-
- tv.tv_sec = delay;
- tv.tv_usec = 0;
- evtimer_add(&r->ev, &tv);
- mta_relay_ref(r);
+ if (route->flags & ROUTE_DISABLED) {
+ log_info("smtp-out: Enabling route %s",
+ mta_route_to_text(route));
+ route->flags &= ~ROUTE_DISABLED;
+ route->flags |= ROUTE_NEW;
+ }
+
+ if (route->penalty) {
+#if DELAY_QUADRATIC
+ route->penalty -= 1;
+ route->lastpenalty = time(NULL);
+#else
+ route->penalty = 0;
+#endif
+ }
}
static void
mta_drain(struct mta_relay *r)
{
- struct mta_connector *c;
- struct mta_source *s;
char buf[64];
log_debug("debug: mta: draining %s "
"refcount=%i, ntask=%zu, nconnector=%zu, nconn=%zu",
mta_relay_to_text(r),
- r->refcount, r->ntask, r->nconnector, r->nconn);
+ r->refcount, r->ntask, tree_count(&r->connectors), r->nconn);
/*
* All done.
@@ -879,99 +1163,40 @@ mta_drain(struct mta_relay *r)
if (r->domain->lastmxquery == 0)
mta_query_mx(r);
+ /* Query the limits if needed. */
+ if (r->limits == NULL)
+ mta_query_limits(r);
+
/* Wait until we are ready to proceed. */
if (r->status & RELAY_WAITMASK) {
buf[0] = '\0';
if (r->status & RELAY_WAIT_MX)
- strlcat(buf, "MX ", sizeof buf);
+ strlcat(buf, " MX", sizeof buf);
if (r->status & RELAY_WAIT_PREFERENCE)
- strlcat(buf, "preference ", sizeof buf);
+ strlcat(buf, " preference", sizeof buf);
if (r->status & RELAY_WAIT_SECRET)
- strlcat(buf, "secret ", sizeof buf);
+ strlcat(buf, " secret", sizeof buf);
if (r->status & RELAY_WAIT_SOURCE)
- strlcat(buf, "source ", sizeof buf);
- log_debug("debug: mta: %s waiting for %s",
+ strlcat(buf, " source", sizeof buf);
+ if (r->status & RELAY_WAIT_CONNECTOR)
+ strlcat(buf, " connector", sizeof buf);
+ log_debug("debug: mta: %s waiting for%s",
mta_relay_to_text(r), buf);
return;
}
/*
- * Start new connections if possible.
- * XXX find a better heuristic for the good number of connections
- * depending on the number of tasks and other factors. We might
- * want to try more than the number of task, to have a chance to
- * hit a mx faster if the first ones timeout.
+ * We have pending task, and it's maybe time too try a new source.
*/
- while (r->nconn < r->ntask) {
- log_debug("debug: mta: trying to create new connection: "
- "refcount=%i, ntask=%zu, nconnector=%zu, nconn=%zu",
- r->refcount, r->ntask, r->nconnector, r->nconn);
-
- /* Check the per-relay connection limit */
- if (r->nconn >= MAXCONN_PER_RELAY) {
- log_debug("debug: mta: hit connection limit on %s",
- mta_relay_to_text(r));
- return;
- }
-
- /* Use the first connector if ready */
- c = TAILQ_FIRST(&r->c_ready);
- if (c) {
- log_debug("debug: mta: using connector %s",
- mta_connector_to_text(c));
- r->sourceloop = 0;
- mta_connect(c);
- continue;
- }
-
- /* No new connectors */
- if (r->sourceloop > r->nconnector) {
- log_debug("debug: mta: no new connector available");
-
- if (TAILQ_FIRST(&r->c_delay)) {
- mta_relay_schedule(r, 1);
- log_debug(
- "debug: mta: waiting for relay timeout");
- return;
- }
-
- if (TAILQ_FIRST(&r->c_limit)) {
- mta_relay_schedule(r, 5);
- log_debug(
- "debug: mta: waiting for relay timeout");
- return;
- }
-
- log_debug("debug: mta: failing...");
- /*
- * All sources have been tried and no connectors can
- * be used.
- */
- if (r->nconnector == 0) {
- r->fail = IMSG_DELIVERY_TEMPFAIL;
- r->failstr = "No source address";
- }
- else {
- r->fail = IMSG_DELIVERY_TEMPFAIL;
- r->failstr = "No MX could be reached";
- }
- mta_flush(r, r->fail, r->failstr);
- return;
- }
-
- r->sourceloop++;
- log_debug("debug: mta: need new connector (attempt %zu)",
- r->sourceloop);
- if (r->sourcetable) {
- log_debug("debug: mta: querying source %s",
- r->sourcetable);
- mta_query_source(r);
- return;
- }
- log_debug("debug: mta: using default source");
- s = mta_source(NULL);
- mta_on_source(r, s);
- mta_source_unref(s);
+ if (r->nextsource <= time(NULL))
+ mta_query_source(r);
+ else {
+ log_debug("debug: mta: scheduling relay %s in %llus...",
+ mta_relay_to_text(r),
+ (unsigned long long) r->nextsource - time(NULL));
+ runq_schedule(runq_relay, r->nextsource, NULL, r);
+ r->status |= RELAY_WAIT_CONNECTOR;
+ mta_relay_ref(r);
}
}
@@ -980,7 +1205,11 @@ mta_flush(struct mta_relay *relay, int fail, const char *error)
{
struct mta_envelope *e;
struct mta_task *task;
+ const char *domain;
+ void *iter;
+ struct mta_connector *c;
size_t n;
+ size_t r;
log_debug("debug: mta_flush(%s, %i, \"%s\")",
mta_relay_to_text(relay), fail, error);
@@ -993,7 +1222,26 @@ mta_flush(struct mta_relay *relay, int fail, const char *error)
TAILQ_REMOVE(&relay->tasks, task, entry);
while ((e = TAILQ_FIRST(&task->envelopes))) {
TAILQ_REMOVE(&task->envelopes, e, entry);
- mta_delivery(e, NULL, relay->domain->name, fail, error);
+ mta_delivery(e, NULL, relay->domain->name, fail, error, 0);
+
+ /*
+ * host was suspended, cache envelope id in hoststat tree
+ * so that it can be retried when a delivery succeeds for
+ * that domain.
+ */
+ domain = strchr(e->dest, '@');
+ if (fail == IMSG_DELIVERY_TEMPFAIL && domain) {
+ r = 0;
+ iter = NULL;
+ while (tree_iter(&relay->connectors, &iter,
+ NULL, (void **)&c)) {
+ if (c->flags & CONNECTOR_ERROR_ROUTE)
+ r++;
+ }
+ if (tree_count(&relay->connectors) == r)
+ mta_hoststat_cache(domain+1, e->id);
+ }
+
free(e->dest);
free(e->rcpt);
free(e);
@@ -1012,27 +1260,28 @@ mta_flush(struct mta_relay *relay, int fail, const char *error)
* Find a route to use for this connector
*/
static struct mta_route *
-mta_find_route(struct mta_connector *c)
+mta_find_route(struct mta_connector *c, time_t now, int *limits,
+ time_t *nextconn)
{
struct mta_route *route, *best;
+ struct mta_limits *l = c->relay->limits;
struct mta_mx *mx;
int level, limit_host, limit_route;
- int family_mismatch, seen;
+ int family_mismatch, seen, suspended_route;
+ time_t tm;
+ log_debug("debug: mta-routing: searching new route for %s...",
+ mta_connector_to_text(c));
+
+ tm = 0;
limit_host = 0;
limit_route = 0;
+ suspended_route = 0;
family_mismatch = 0;
level = -1;
best = NULL;
seen = 0;
- if (c->nconn >= MAXCONN_PER_CONNECTOR) {
- log_debug("debug: mta: hit limit on connector %s",
- mta_connector_to_text(c));
- c->flags |= CONNECTOR_LIMIT_SOURCE;
- return (NULL);
- }
-
TAILQ_FOREACH(mx, &c->relay->domain->mxs, entry) {
/*
* New preference level
@@ -1047,9 +1296,10 @@ mta_find_route(struct mta_connector *c)
/*
* No candidate found. There are valid MXs at this
- * preference level but they reached their limit.
+ * preference level but they reached their limit, or
+ * we can't connect yet.
*/
- if (limit_host || limit_route)
+ if (limit_host || limit_route || tm)
break;
/*
@@ -1073,27 +1323,81 @@ mta_find_route(struct mta_connector *c)
/* Found a possibly valid mx */
seen++;
- if (c->source->sa &&
- c->source->sa->sa_family != mx->host->sa->sa_family) {
+ if ((c->source->sa &&
+ c->source->sa->sa_family != mx->host->sa->sa_family) ||
+ (l->family && l->family != mx->host->sa->sa_family)) {
+ log_debug("debug: mta-routing: skipping host %s: AF mismatch",
+ mta_host_to_text(mx->host));
family_mismatch = 1;
continue;
}
- if (mx->host->nconn >= MAXCONN_PER_HOST) {
+ if (mx->host->nconn >= l->maxconn_per_host) {
+ log_debug("debug: mta-routing: skipping host %s: too many connections",
+ mta_host_to_text(mx->host));
limit_host = 1;
continue;
}
+ if (mx->host->lastconn + l->conndelay_host > now) {
+ log_debug("debug: mta-routing: skipping host %s: cannot use before %llus",
+ mta_host_to_text(mx->host),
+ (unsigned long long) mx->host->lastconn + l->conndelay_host - now);
+ if (tm == 0 || mx->host->lastconn + l->conndelay_host < tm)
+ tm = mx->host->lastconn + l->conndelay_host;
+ continue;
+ }
+
route = mta_route(c->source, mx->host);
- if (route->nconn >= MAXCONN_PER_ROUTE) {
+ if (route->flags & ROUTE_DISABLED) {
+ log_debug("debug: mta-routing: skipping route %s: suspend",
+ mta_route_to_text(route));
+ suspended_route |= route->flags & ROUTE_DISABLED;
+ mta_route_unref(route); /* from here */
+ continue;
+ }
+
+ if (route->nconn && (route->flags & ROUTE_NEW)) {
+ log_debug("debug: mta-routing: skipping route %s: not validated yet",
+ mta_route_to_text(route));
+ limit_route = 1;
+ mta_route_unref(route); /* from here */
+ continue;
+ }
+
+ if (route->nconn >= l->maxconn_per_route) {
+ log_debug("debug: mta-routing: skipping route %s: too many connections",
+ mta_route_to_text(route));
limit_route = 1;
mta_route_unref(route); /* from here */
continue;
}
+ if (route->lastconn + l->conndelay_route > now) {
+ log_debug("debug: mta-routing: skipping route %s: cannot use before %llus (delay after connect)",
+ mta_route_to_text(route),
+ (unsigned long long) route->lastconn + l->conndelay_route - now);
+ if (tm == 0 || route->lastconn + l->conndelay_route < tm)
+ tm = route->lastconn + l->conndelay_route;
+ mta_route_unref(route); /* from here */
+ continue;
+ }
+
+ if (route->lastdisc + l->discdelay_route > now) {
+ log_debug("debug: mta-routing: skipping route %s: cannot use before %llus (delay after disconnect)",
+ mta_route_to_text(route),
+ (unsigned long long) route->lastdisc + l->discdelay_route - now);
+ if (tm == 0 || route->lastdisc + l->discdelay_route < tm)
+ tm = route->lastdisc + l->discdelay_route;
+ mta_route_unref(route); /* from here */
+ continue;
+ }
+
/* Use the route with the lowest number of connections. */
if (best && route->nconn >= best->nconn) {
+ log_debug("debug: mta-routing: skipping route %s: current one is better",
+ mta_route_to_text(route));
mta_route_unref(route); /* from here */
continue;
}
@@ -1101,30 +1405,43 @@ mta_find_route(struct mta_connector *c)
if (best)
mta_route_unref(best); /* from here */
best = route;
+ log_debug("debug: mta-routing: selecting candidate route %s",
+ mta_route_to_text(route));
}
if (best)
return (best);
+ /* Order is important */
if (seen == 0) {
- log_info("smtp-out: No reachable MX for connector %s",
+ log_info("smtp-out: No MX found for %s",
mta_connector_to_text(c));
- c->flags |= CONNECTOR_MX_ERROR;
+ c->flags |= CONNECTOR_ERROR_MX;
}
else if (limit_route) {
- log_debug("debug: mta: hit route limit on connector %s",
- mta_connector_to_text(c));
- c->flags |= CONNECTOR_LIMIT_ROUTE;
+ log_debug("debug: mta: hit route limit");
+ *limits |= CONNECTOR_LIMIT_ROUTE;
}
else if (limit_host) {
- log_debug("debug: mta: hit host limit on connector %s",
- mta_connector_to_text(c));
- c->flags |= CONNECTOR_LIMIT_HOST;
+ log_debug("debug: mta: hit host limit");
+ *limits |= CONNECTOR_LIMIT_HOST;
+ }
+ else if (tm) {
+ if (tm > *nextconn)
+ *nextconn = tm;
}
else if (family_mismatch) {
- log_info("smtp-out: Address family mismatch on connector %s",
+ log_info("smtp-out: Address family mismatch on %s",
mta_connector_to_text(c));
- c->flags |= CONNECTOR_FAMILY_ERROR;
+ c->flags |= CONNECTOR_ERROR_FAMILY;
+ }
+ else if (suspended_route) {
+ log_info("smtp-out: No valid route for %s",
+ mta_connector_to_text(c));
+ if (suspended_route & ROUTE_DISABLED_NET)
+ c->flags |= CONNECTOR_ERROR_ROUTE_NET;
+ if (suspended_route & ROUTE_DISABLED_SMTP)
+ c->flags |= CONNECTOR_ERROR_ROUTE_SMTP;
}
return (NULL);
@@ -1134,33 +1451,16 @@ static void
mta_log(const struct mta_envelope *evp, const char *prefix, const char *source,
const char *relay, const char *status)
{
- char session[SMTPD_MAXLINESIZE];
- char rcpt[SMTPD_MAXLINESIZE];
- char src[SMTPD_MAXLINESIZE];
-
- session[0] = '\0';
- if (evp->session)
- snprintf(session, sizeof session, "session=%016"PRIx64", ",
- evp->session);
-
- rcpt[0] = '\0';
- if (evp->rcpt)
- snprintf(rcpt, sizeof rcpt, "rcpt=<%s>, ", evp->rcpt);
-
- src[0] = '\0';
- if (source)
- snprintf(src, sizeof src, "source=%s, ", source);
-
-
- log_info("relay: %s for %016" PRIx64 ": %sfrom=<%s>, to=<%s>, "
- "%s%srelay=%s, delay=%s, stat=%s",
+ log_info("relay: %s for %016" PRIx64 ": session=%016"PRIx64", "
+ "from=<%s>, to=<%s>, rcpt=<%s>, source=%s, "
+ "relay=%s, delay=%s, stat=%s",
prefix,
evp->id,
- session,
+ evp->session,
evp->task->sender,
evp->dest,
- src,
- rcpt,
+ evp->rcpt ? evp->rcpt : "-",
+ source ? source : "-",
relay,
duration_to_text(time(NULL) - evp->creation),
status);
@@ -1204,10 +1504,6 @@ mta_relay(struct envelope *e)
if ((r = SPLAY_FIND(mta_relay_tree, &relays, &key)) == NULL) {
r = xcalloc(1, sizeof *r, "mta_relay");
TAILQ_INIT(&r->tasks);
- TAILQ_INIT(&r->c_ready);
- TAILQ_INIT(&r->c_delay);
- TAILQ_INIT(&r->c_limit);
- TAILQ_INIT(&r->c_error);
r->id = generate_uid();
r->flags = key.flags;
r->domain = key.domain;
@@ -1227,12 +1523,9 @@ mta_relay(struct envelope *e)
r->helotable = xstrdup(key.helotable,
"mta: helotable");
SPLAY_INSERT(mta_relay_tree, &relays, r);
- evtimer_set(&r->ev, mta_relay_timeout, r);
- log_trace(TRACE_MTA, "mta: new %s", mta_relay_to_text(r));
stat_increment("mta.relay", 1);
} else {
mta_domain_unref(key.domain); /* from here */
- log_trace(TRACE_MTA, "mta: reusing %s", mta_relay_to_text(r));
}
r->refcount++;
@@ -1267,9 +1560,6 @@ mta_relay_unref(struct mta_relay *relay)
free(relay->secret);
free(relay->sourcetable);
- if (evtimer_pending(&relay->ev, NULL))
- evtimer_del(&relay->ev);
-
mta_domain_unref(relay->domain); /* from constructor */
free(relay);
stat_decrement("mta.relay", 1);
@@ -1594,14 +1884,11 @@ mta_connector(struct mta_relay *relay, struct mta_source *source)
c = xcalloc(1, sizeof(*c), "mta_connector");
c->relay = relay;
c->source = source;
+ c->flags |= CONNECTOR_NEW;
mta_source_ref(source);
- c->queue = &relay->c_ready;
- TAILQ_INSERT_HEAD(c->queue, c, lst_entry);
tree_xset(&relay->connectors, (uintptr_t)(source), c);
- relay->nconnector++;
stat_increment("mta.connector", 1);
- log_debug("debug: mta: new connector %s",
- mta_connector_to_text(c));
+ log_debug("debug: mta: new %s", mta_connector_to_text(c));
}
return (c);
@@ -1610,10 +1897,17 @@ mta_connector(struct mta_relay *relay, struct mta_source *source)
static void
mta_connector_free(struct mta_connector *c)
{
- c->relay->nconnector--;
- TAILQ_REMOVE(c->queue, c, lst_entry);
- mta_source_unref(c->source);
+ log_debug("debug: mta: freeing %s",
+ mta_connector_to_text(c));
+
+ if (c->flags & CONNECTOR_WAIT) {
+ log_debug("debug: mta: canceling timeout for %s",
+ mta_connector_to_text(c));
+ runq_cancel(runq_connector, NULL, c);
+ }
+ mta_source_unref(c->source); /* from constructor */
free(c);
+
stat_decrement("mta.connector", 1);
}
@@ -1622,9 +1916,10 @@ mta_connector_to_text(struct mta_connector *c)
{
static char buf[1024];
- snprintf(buf, sizeof buf, "%s->%s",
+ snprintf(buf, sizeof buf, "[connector:%s->%s,0x%x]",
mta_source_to_text(c->source),
- mta_relay_to_text(c->relay));
+ mta_relay_to_text(c->relay),
+ c->flags);
return (buf);
}
@@ -1632,6 +1927,7 @@ static struct mta_route *
mta_route(struct mta_source *src, struct mta_host *dst)
{
struct mta_route key, *r;
+ static uint64_t rid = 0;
key.src = src;
key.dst = dst;
@@ -1641,30 +1937,80 @@ mta_route(struct mta_source *src, struct mta_host *dst)
r = xcalloc(1, sizeof(*r), "mta_route");
r->src = src;
r->dst = dst;
+ r->flags |= ROUTE_NEW;
+ r->id = ++rid;
SPLAY_INSERT(mta_route_tree, &routes, r);
mta_source_ref(src);
mta_host_ref(dst);
stat_increment("mta.route", 1);
}
+ else if (r->flags & ROUTE_RUNQ) {
+ log_debug("debug: mta: mta_route_ref(): canceling runq for route %s",
+ mta_route_to_text(r));
+ r->flags &= ~(ROUTE_RUNQ | ROUTE_KEEPALIVE);
+ runq_cancel(runq_route, NULL, r);
+ r->refcount--; /* from mta_route_unref() */
+ }
r->refcount++;
return (r);
}
-#if 0
static void
mta_route_ref(struct mta_route *r)
{
r->refcount++;
}
-#endif
static void
mta_route_unref(struct mta_route *r)
{
+ time_t sched, now;
+ int delay;
+
if (--r->refcount)
return;
+ /*
+ * Nothing references this route, but we might want to keep it alive
+ * for a while.
+ */
+ now = time(NULL);
+ sched = 0;
+
+ if (r->penalty) {
+#if DELAY_QUADRATIC
+ delay = DELAY_ROUTE_BASE * r->penalty * r->penalty;
+#else
+ delay = 15 * 60;
+#endif
+ if (delay > DELAY_ROUTE_MAX)
+ delay = DELAY_ROUTE_MAX;
+ sched = r->lastpenalty + delay;
+ log_debug("debug: mta: mta_route_unref(): keeping route %s alive for %llus (penalty %i)",
+ mta_route_to_text(r), (unsigned long long) sched - now, r->penalty);
+ } else if (!(r->flags & ROUTE_KEEPALIVE)) {
+ if (r->lastconn + max_seen_conndelay_route > now)
+ sched = r->lastconn + max_seen_conndelay_route;
+ if (r->lastdisc + max_seen_discdelay_route > now &&
+ r->lastdisc + max_seen_discdelay_route < sched)
+ sched = r->lastdisc + max_seen_discdelay_route;
+
+ if (sched > now)
+ log_debug("debug: mta: mta_route_unref(): keeping route %s alive for %llus (imposed delay)",
+ mta_route_to_text(r), (unsigned long long) sched - now);
+ }
+
+ if (sched > now) {
+ r->flags |= ROUTE_RUNQ;
+ runq_schedule(runq_route, sched, NULL, r);
+ r->refcount++;
+ return;
+ }
+
+ log_debug("debug: mta: ma_route_unref(): really discarding route %s",
+ mta_route_to_text(r));
+
SPLAY_REMOVE(mta_route_tree, &routes, r);
mta_source_unref(r->src); /* from constructor */
mta_host_unref(r->dst); /* from constructor */
@@ -1701,3 +2047,94 @@ mta_route_cmp(const struct mta_route *a, const struct mta_route *b)
}
SPLAY_GENERATE(mta_route_tree, mta_route, entry, mta_route_cmp);
+
+
+/* hoststat errors are not critical, we do best effort */
+void
+mta_hoststat_update(const char *host, const char *error)
+{
+ struct hoststat *hs = NULL;
+ char buf[SMTPD_MAXHOSTNAMELEN];
+ time_t tm;
+
+ if (! lowercase(buf, host, sizeof buf))
+ return;
+
+ tm = time(NULL);
+ hs = dict_get(&hoststat, buf);
+ if (hs == NULL) {
+ hs = calloc(1, sizeof *hs);
+ if (hs == NULL)
+ return;
+ tree_init(&hs->deferred);
+ runq_schedule(runq_hoststat, tm+HOSTSTAT_EXPIRE_DELAY, NULL, hs);
+ }
+ strlcpy(hs->name, buf, sizeof hs->name);
+ strlcpy(hs->error, error, sizeof hs->error);
+ hs->tm = time(NULL);
+ dict_set(&hoststat, buf, hs);
+
+ runq_cancel(runq_hoststat, NULL, hs);
+ runq_schedule(runq_hoststat, tm+HOSTSTAT_EXPIRE_DELAY, NULL, hs);
+}
+
+void
+mta_hoststat_cache(const char *host, uint64_t evpid)
+{
+ struct hoststat *hs = NULL;
+ char buf[SMTPD_MAXHOSTNAMELEN];
+
+ if (! lowercase(buf, host, sizeof buf))
+ return;
+
+ hs = dict_get(&hoststat, buf);
+ if (hs == NULL)
+ return;
+
+ tree_set(&hs->deferred, evpid, NULL);
+}
+
+void
+mta_hoststat_uncache(const char *host, uint64_t evpid)
+{
+ struct hoststat *hs = NULL;
+ char buf[SMTPD_MAXHOSTNAMELEN];
+
+ if (! lowercase(buf, host, sizeof buf))
+ return;
+
+ hs = dict_get(&hoststat, buf);
+ if (hs == NULL)
+ return;
+
+ tree_pop(&hs->deferred, evpid);
+}
+
+void
+mta_hoststat_reschedule(const char *host)
+{
+ struct hoststat *hs = NULL;
+ char buf[SMTPD_MAXHOSTNAMELEN];
+ uint64_t evpid;
+
+ if (! lowercase(buf, host, sizeof buf))
+ return;
+
+ hs = dict_get(&hoststat, buf);
+ if (hs == NULL)
+ return;
+
+ while (tree_poproot(&hs->deferred, &evpid, NULL)) {
+ m_compose(p_queue, IMSG_MTA_SCHEDULE, 0, 0, -1,
+ &evpid, sizeof evpid);
+ }
+}
+
+static void
+mta_hoststat_remove_entry(struct hoststat *hs)
+{
+ while (tree_poproot(&hs->deferred, NULL, NULL))
+ ;
+ dict_pop(&hoststat, hs->name);
+ runq_cancel(runq_hoststat, NULL, hs);
+}
diff --git a/usr.sbin/smtpd/mta_session.c b/usr.sbin/smtpd/mta_session.c
index a5a00d6823c..e3c8896c96d 100644
--- a/usr.sbin/smtpd/mta_session.c
+++ b/usr.sbin/smtpd/mta_session.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mta_session.c,v 1.38 2013/07/19 07:37:29 eric Exp $ */
+/* $OpenBSD: mta_session.c,v 1.39 2013/07/19 21:14:52 eric Exp $ */
/*
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -45,9 +45,9 @@
#include "log.h"
#include "ssl.h"
-#define MAX_MAIL 100
+#define MAX_TRYBEFOREDISABLE 10
-#define MTA_HIWAT 65535
+#define MTA_HIWAT 65535
enum mta_state {
MTA_INIT,
@@ -84,11 +84,17 @@ enum mta_state {
#define MTA_FREE 0x0400
#define MTA_LMTP 0x0800
#define MTA_WAIT 0x1000
+#define MTA_HANGON 0x2000
#define MTA_EXT_STARTTLS 0x01
#define MTA_EXT_AUTH 0x02
#define MTA_EXT_PIPELINING 0x04
+struct failed_evp {
+ int delivery;
+ char error[SMTPD_MAXLINESIZE];
+ struct mta_envelope *evp;
+};
struct mta_session {
uint64_t id;
@@ -108,13 +114,19 @@ struct mta_session {
struct io io;
int ext;
- int msgcount;
- int rcptcount;
+ size_t msgtried;
+ size_t msgcount;
+ size_t rcptcount;
+ int hangon;
enum mta_state state;
struct mta_task *task;
struct mta_envelope *currevp;
FILE *datafp;
+
+#define MAX_FAILED_ENVELOPES 15
+ struct failed_evp failed[MAX_FAILED_ENVELOPES];
+ int failedcount;
};
static void mta_session_init(void);
@@ -122,9 +134,10 @@ static void mta_start(int fd, short ev, void *arg);
static void mta_io(struct io *, int);
static void mta_free(struct mta_session *);
static void mta_on_ptr(void *, void *, void *);
+static void mta_on_timeout(struct runq *, void *);
static void mta_connect(struct mta_session *);
static void mta_enter_state(struct mta_session *, int);
-static void mta_flush_task(struct mta_session *, int, const char *, size_t);
+static void mta_flush_task(struct mta_session *, int, const char *, size_t, int);
static void mta_error(struct mta_session *, const char *, ...);
static void mta_send(struct mta_session *, char *, ...);
static ssize_t mta_queue_data(struct mta_session *);
@@ -134,6 +147,11 @@ static int mta_check_loop(FILE *);
static void mta_start_tls(struct mta_session *);
static int mta_verify_certificate(struct mta_session *);
static struct mta_session *mta_tree_pop(struct tree *, uint64_t);
+static void mta_flush_failedqueue(struct mta_session *);
+void mta_hoststat_update(const char *, const char *);
+void mta_hoststat_reschedule(const char *);
+void mta_hoststat_cache(const char *, uint64_t);
+void mta_hoststat_uncache(const char *, uint64_t);
static struct tree wait_helo;
static struct tree wait_ptr;
@@ -141,6 +159,8 @@ static struct tree wait_fd;
static struct tree wait_ssl_init;
static struct tree wait_ssl_verify;
+static struct runq *hangon;
+
static void
mta_session_init(void)
{
@@ -152,6 +172,7 @@ mta_session_init(void)
tree_init(&wait_fd);
tree_init(&wait_ssl_init);
tree_init(&wait_ssl_verify);
+ runq_init(&hangon, mta_on_timeout);
init = 1;
}
}
@@ -245,7 +266,7 @@ mta_session_imsg(struct mproc *p, struct imsg *imsg)
if (imsg->fd == -1) {
log_debug("debug: mta: failed to obtain msg fd");
mta_flush_task(s, IMSG_DELIVERY_TEMPFAIL,
- "Could not get message fd", 0);
+ "Could not get message fd", 0, 0);
mta_enter_state(s, MTA_READY);
io_reload(&s->io);
return;
@@ -260,7 +281,7 @@ mta_session_imsg(struct mproc *p, struct imsg *imsg)
fclose(s->datafp);
s->datafp = NULL;
mta_flush_task(s, IMSG_DELIVERY_LOOP,
- "Loop detected", 0);
+ "Loop detected", 0, 0);
mta_enter_state(s, MTA_READY);
} else {
mta_enter_state(s, MTA_MAIL);
@@ -295,7 +316,7 @@ mta_session_imsg(struct mproc *p, struct imsg *imsg)
return;
if (resp_ca_cert->status == CA_FAIL) {
- log_info("smtp-out: Disconnecting session %016" PRIx64
+ log_info("smtp-out: Disconnecting session %016"PRIx64
": CA failure", s->id);
mta_free(s);
return;
@@ -391,6 +412,16 @@ mta_free(struct mta_session *s)
log_debug("debug: mta: %p: session done", s);
+ if (s->ready)
+ s->relay->nconn_ready -= 1;
+
+ if (s->flags & MTA_HANGON) {
+ log_debug("debug: mta: %p: cancelling hangon timer", s);
+ runq_cancel(hangon, NULL, s);
+ }
+
+ mta_flush_failedqueue(s);
+
io_clear(&s->io);
iobuf_clear(&s->iobuf);
@@ -409,6 +440,20 @@ mta_free(struct mta_session *s)
}
static void
+mta_on_timeout(struct runq *runq, void *arg)
+{
+ struct mta_session *s = arg;
+
+ log_debug("mta: timeout for session hangon");
+
+ s->flags &= ~MTA_HANGON;
+ s->hangon++;
+
+ mta_enter_state(s, MTA_READY);
+ io_reload(&s->io);
+}
+
+static void
mta_on_ptr(void *tag, void *arg, void *data)
{
struct mta_session *s = arg;
@@ -469,7 +514,8 @@ mta_connect(struct mta_session *s)
break;
}
default:
- goto fail;
+ mta_free(s);
+ return;
}
portno = s->use_smtps ? 465 : 25;
@@ -518,12 +564,6 @@ mta_connect(struct mta_session *s)
mta_error(s, "Connection failed: %s", s->io.error);
mta_free(s);
}
- return;
-
-fail:
- mta_error(s, "Could not connect");
- mta_free(s);
- return;
}
static void
@@ -597,10 +637,19 @@ mta_enter_state(struct mta_session *s, int newstate)
/* Ready to send a new mail */
if (s->ready == 0) {
s->ready = 1;
+ s->relay->nconn_ready += 1;
mta_route_ok(s->relay, s->route);
}
- if (s->msgcount >= MAX_MAIL) {
+ if (s->msgtried >= MAX_TRYBEFOREDISABLE) {
+ log_info("smtp-out: Remote host seems to reject all mails on session %016"PRIx64,
+ s->id);
+ mta_route_down(s->relay, s->route);
+ mta_enter_state(s, MTA_QUIT);
+ break;
+ }
+
+ if (s->msgcount >= s->relay->limits->max_mail_per_session) {
log_debug("debug: mta: "
"%p: cannot send more message to relay %s", s,
mta_relay_to_text(s->relay));
@@ -612,7 +661,17 @@ mta_enter_state(struct mta_session *s, int newstate)
if (s->task == NULL) {
log_debug("debug: mta: %p: no task for relay %s",
s, mta_relay_to_text(s->relay));
- mta_enter_state(s, MTA_QUIT);
+
+ if (s->relay->nconn > 1 ||
+ s->hangon >= s->relay->limits->sessdelay_keepalive) {
+ mta_enter_state(s, MTA_QUIT);
+ break;
+ }
+
+ log_debug("mta: debug: last connection: hanging on for %is",
+ s->relay->limits->sessdelay_keepalive - s->hangon);
+ s->flags |= MTA_HANGON;
+ runq_schedule(hangon, time(NULL) + 1, NULL, s);
break;
}
@@ -631,6 +690,8 @@ mta_enter_state(struct mta_session *s, int newstate)
break;
case MTA_MAIL:
+ s->hangon = 0;
+ s->msgtried++;
mta_send(s, "MAIL FROM:<%s>", s->task->sender);
break;
@@ -699,10 +760,12 @@ static void
mta_response(struct mta_session *s, char *line)
{
struct mta_envelope *e;
- char buf[SMTPD_MAXLINESIZE];
- int delivery;
+ struct failed_evp *fevp;
struct sockaddr sa;
+ const char *domain;
socklen_t sa_len;
+ char buf[SMTPD_MAXLINESIZE];
+ int delivery;
switch (s->state) {
@@ -779,7 +842,7 @@ mta_response(struct mta_session *s, char *line)
delivery = IMSG_DELIVERY_PERMFAIL;
else
delivery = IMSG_DELIVERY_TEMPFAIL;
- mta_flush_task(s, delivery, line, 0);
+ mta_flush_task(s, delivery, line, 0, 0);
mta_enter_state(s, MTA_RSET);
return;
}
@@ -788,20 +851,38 @@ mta_response(struct mta_session *s, char *line)
case MTA_RCPT:
e = s->currevp;
+
+ /* remove envelope from hosttat cache if there */
+ if ((domain = strchr(e->dest, '@')) != NULL) {
+ domain++;
+ mta_hoststat_uncache(domain, e->id);
+ }
+
s->currevp = TAILQ_NEXT(s->currevp, entry);
- if (line[0] != '2') {
+ if (line[0] == '2') {
+ mta_flush_failedqueue(s);
+ /*
+ * this host is up, reschedule envelopes that
+ * were cached for reschedule.
+ */
+ if (domain)
+ mta_hoststat_reschedule(domain);
+ }
+ else {
if (line[0] == '5')
delivery = IMSG_DELIVERY_PERMFAIL;
else
delivery = IMSG_DELIVERY_TEMPFAIL;
+ /* remove failed envelope from task list */
TAILQ_REMOVE(&s->task->envelopes, e, entry);
+ stat_decrement("mta.envelope", 1);
+
+ /* log right away */
snprintf(buf, sizeof(buf), "%s",
mta_host_to_text(s->route->dst));
- /* we're about to log, associate session to envelope */
e->session = s->id;
-
/* XXX */
/*
* getsockname() can only fail with ENOBUFS here
@@ -809,22 +890,45 @@ mta_response(struct mta_session *s, char *line)
*/
sa_len = sizeof sa;
if (getsockname(s->io.sock, &sa, &sa_len) < 0)
- mta_delivery(e, NULL, buf, delivery, line);
+ mta_delivery_log(e, NULL, buf, delivery, line);
else
- mta_delivery(e, sa_to_text(&sa),
+ mta_delivery_log(e, sa_to_text(&sa),
buf, delivery, line);
- free(e->dest);
- free(e->rcpt);
- free(e);
- stat_decrement("mta.envelope", 1);
+ /* push failed envelope to the session fail queue */
+ e->delivery = delivery;
+ fevp = &s->failed[s->failedcount];
+ fevp->delivery = delivery;
+ fevp->evp = e;
+ strlcpy(fevp->error, line, sizeof fevp->error);
+ s->failedcount++;
+
+ /*
+ * if session fail queue is full:
+ * - flush failed queue (failure w/ penalty)
+ * - flush remaining tasks with TempFail
+ * - mark route down
+ */
+ if (s->failedcount == MAX_FAILED_ENVELOPES) {
+ mta_flush_failedqueue(s);
+ mta_flush_task(s, IMSG_DELIVERY_TEMPFAIL,
+ "Host temporarily disabled", 0, 1);
+ mta_route_down(s->relay, s->route);
+ mta_enter_state(s, MTA_QUIT);
+ break;
+ }
+
+ /*
+ * if no more envelopes, flush failed queue
+ */
if (TAILQ_EMPTY(&s->task->envelopes)) {
mta_flush_task(s, IMSG_DELIVERY_OK,
- "No envelope", 0);
+ "No envelope", 0, 0);
mta_enter_state(s, MTA_RSET);
break;
}
}
+
if (s->currevp == NULL)
mta_enter_state(s, MTA_DATA);
else
@@ -832,6 +936,7 @@ mta_response(struct mta_session *s, char *line)
break;
case MTA_DATA:
+ mta_flush_failedqueue(s);
if (line[0] == '2' || line[0] == '3') {
mta_enter_state(s, MTA_BODY);
break;
@@ -840,7 +945,7 @@ mta_response(struct mta_session *s, char *line)
delivery = IMSG_DELIVERY_PERMFAIL;
else
delivery = IMSG_DELIVERY_TEMPFAIL;
- mta_flush_task(s, delivery, line, 0);
+ mta_flush_task(s, delivery, line, 0, 0);
mta_enter_state(s, MTA_RSET);
break;
@@ -848,24 +953,47 @@ mta_response(struct mta_session *s, char *line)
case MTA_EOM:
if (line[0] == '2') {
delivery = IMSG_DELIVERY_OK;
+ s->msgtried = 0;
s->msgcount++;
}
else if (line[0] == '5')
delivery = IMSG_DELIVERY_PERMFAIL;
else
delivery = IMSG_DELIVERY_TEMPFAIL;
- mta_flush_task(s, delivery, line, (s->flags & MTA_LMTP) ? 1 : 0 );
+ mta_flush_task(s, delivery, line, (s->flags & MTA_LMTP) ? 1 : 0, 0);
if (s->task) {
s->rcptcount--;
mta_enter_state(s, MTA_LMTP_EOM);
} else {
s->rcptcount = 0;
- mta_enter_state(s, MTA_READY);
+ if (s->relay->limits->sessdelay_transaction) {
+ log_debug("debug: mta: waiting for %llis before next transaction",
+ (long long int)s->relay->limits->sessdelay_transaction);
+ s->hangon = s->relay->limits->sessdelay_transaction -1;
+ s->flags |= MTA_HANGON;
+ runq_schedule(hangon, time(NULL)
+ + s->relay->limits->sessdelay_transaction,
+ NULL, s);
+ }
+ else
+ mta_enter_state(s, MTA_READY);
}
break;
case MTA_RSET:
- mta_enter_state(s, MTA_READY);
+ mta_flush_failedqueue(s);
+ s->rcptcount = 0;
+ if (s->relay->limits->sessdelay_transaction) {
+ log_debug("debug: mta: waiting for %llis after reset",
+ (long long int)s->relay->limits->sessdelay_transaction);
+ s->hangon = s->relay->limits->sessdelay_transaction -1;
+ s->flags |= MTA_HANGON;
+ runq_schedule(hangon, time(NULL)
+ + s->relay->limits->sessdelay_transaction,
+ NULL, s);
+ }
+ else
+ mta_enter_state(s, MTA_READY);
break;
default:
@@ -961,7 +1089,7 @@ mta_io(struct io *io, int evt)
if (s->state == MTA_QUIT) {
log_info("smtp-out: Closing session %016"PRIx64
- ": %i message%s sent.", s->id, s->msgcount,
+ ": %zu message%s sent.", s->id, s->msgcount,
(s->msgcount > 1) ? "s" : "");
mta_free(s);
return;
@@ -1072,7 +1200,7 @@ mta_queue_data(struct mta_session *s)
if (ferror(s->datafp)) {
mta_flush_task(s, IMSG_DELIVERY_TEMPFAIL,
- "Error reading content file", 0);
+ "Error reading content file", 0, 0);
return (-1);
}
@@ -1085,16 +1213,17 @@ mta_queue_data(struct mta_session *s)
}
static void
-mta_flush_task(struct mta_session *s, int delivery, const char *error, size_t count)
+mta_flush_task(struct mta_session *s, int delivery, const char *error, size_t count,
+ int cache)
{
struct mta_envelope *e;
char relay[SMTPD_MAXLINESIZE];
size_t n;
struct sockaddr sa;
socklen_t sa_len;
+ const char *domain;
snprintf(relay, sizeof relay, "%s", mta_host_to_text(s->route->dst));
-
n = 0;
while ((e = TAILQ_FIRST(&s->task->envelopes))) {
@@ -1115,10 +1244,17 @@ mta_flush_task(struct mta_session *s, int delivery, const char *error, size_t co
*/
sa_len = sizeof sa;
if (getsockname(s->io.sock, &sa, &sa_len) < 0)
- mta_delivery(e, NULL, relay, delivery, error);
+ mta_delivery(e, NULL, relay, delivery, error, 0);
else
mta_delivery(e, sa_to_text(&sa),
- relay, delivery, error);
+ relay, delivery, error, 0);
+
+ domain = strchr(e->dest, '@');
+ if (domain) {
+ mta_hoststat_update(domain + 1, error);
+ if (cache)
+ mta_hoststat_cache(domain + 1, e->id);
+ }
free(e->dest);
free(e->rcpt);
@@ -1141,6 +1277,32 @@ mta_flush_task(struct mta_session *s, int delivery, const char *error, size_t co
}
static void
+mta_flush_failedqueue(struct mta_session *s)
+{
+ int i;
+ struct failed_evp *fevp;
+ struct mta_envelope *e;
+ const char *domain;
+ uint32_t penalty;
+
+ penalty = s->failedcount == MAX_FAILED_ENVELOPES ? 1 : 0;
+ for (i = 0; i < s->failedcount; ++i) {
+ fevp = &s->failed[i];
+ e = fevp->evp;
+ mta_delivery_notify(e, fevp->delivery, fevp->error, penalty);
+
+ domain = strchr(e->dest, '@');
+ if (domain)
+ mta_hoststat_update(domain + 1, fevp->error);
+
+ free(e->dest);
+ free(e->rcpt);
+ free(e);
+ }
+ s->failedcount = 0;
+}
+
+static void
mta_error(struct mta_session *s, const char *fmt, ...)
{
va_list ap;
@@ -1154,7 +1316,7 @@ mta_error(struct mta_session *s, const char *fmt, ...)
if (s->msgcount)
log_info("smtp-out: Error on session %016"PRIx64
- " after %i message%s sent: %s", s->id, s->msgcount,
+ " after %zu message%s sent: %s", s->id, s->msgcount,
(s->msgcount > 1) ? "s" : "", error);
else
log_info("smtp-out: Error on session %016"PRIx64 ": %s",
@@ -1173,7 +1335,7 @@ mta_error(struct mta_session *s, const char *fmt, ...)
mta_route_error(s->relay, s->route);
if (s->task)
- mta_flush_task(s, IMSG_DELIVERY_TEMPFAIL, error, 0);
+ mta_flush_task(s, IMSG_DELIVERY_TEMPFAIL, error, 0, 0);
free(error);
}
@@ -1307,6 +1469,7 @@ mta_verify_certificate(struct mta_session *s)
return 1;
}
+
#define CASE(x) case x : return #x
static const char *
diff --git a/usr.sbin/smtpd/parse.y b/usr.sbin/smtpd/parse.y
index c31187f15c1..dedaa27c582 100644
--- a/usr.sbin/smtpd/parse.y
+++ b/usr.sbin/smtpd/parse.y
@@ -1,4 +1,4 @@
-/* $OpenBSD: parse.y,v 1.122 2013/07/19 20:37:07 eric Exp $ */
+/* $OpenBSD: parse.y,v 1.123 2013/07/19 21:14:52 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
@@ -91,6 +91,7 @@ static int errors = 0;
struct table *table = NULL;
struct rule *rule = NULL;
struct listener l;
+struct mta_limits *limits;
struct listener *host_v4(const char *, in_port_t);
struct listener *host_v6(const char *, in_port_t);
@@ -119,9 +120,9 @@ typedef struct {
%}
%token AS QUEUE COMPRESSION ENCRYPTION MAXMESSAGESIZE LISTEN ON ANY PORT EXPIRE
-%token TABLE SSL SMTPS CERTIFICATE DOMAIN BOUNCEWARN INET4 INET6
+%token TABLE SSL SMTPS CERTIFICATE DOMAIN BOUNCEWARN LIMIT INET4 INET6
%token RELAY BACKUP VIA DELIVER TO LMTP MAILDIR MBOX HOSTNAME HELO
-%token ACCEPT REJECT INCLUDE ERROR MDA FROM FOR SOURCE
+%token ACCEPT REJECT INCLUDE ERROR MDA FROM FOR SOURCE MTA
%token ARROW AUTH TLS LOCAL VIRTUAL TAG TAGGED ALIAS FILTER KEY
%token AUTH_OPTIONAL TLS_REQUIRE USERBASE SENDER
%token <v.string> STRING
@@ -339,6 +340,26 @@ listen_helo : HOSTNAME STRING { $$ = $2; }
| /* empty */ { $$ = NULL; }
;
+opt_limit : INET4 {
+ limits->family = AF_INET;
+ }
+ | INET6 {
+ limits->family = AF_INET6;
+ }
+ | STRING NUMBER {
+ if (!limit_mta_set(limits, $1, $2)) {
+ yyerror("invalid limit keyword");
+ free($1);
+ YYERROR;
+ }
+ free($1);
+ }
+ ;
+
+limits : opt_limit limits
+ | /* empty */
+ ;
+
main : BOUNCEWARN {
bzero(conf->sc_bounce_warn, sizeof conf->sc_bounce_warn);
} bouncedelays
@@ -361,6 +382,21 @@ main : BOUNCEWARN {
| MAXMESSAGESIZE size {
conf->sc_maxsize = $2;
}
+ | LIMIT MTA FOR DOMAIN STRING {
+ struct mta_limits *d;
+
+ limits = dict_get(conf->sc_limits_dict, $5);
+ if (limits == NULL) {
+ limits = xcalloc(1, sizeof(*limits), "mta_limits");
+ dict_xset(conf->sc_limits_dict, $5, limits);
+ d = dict_xget(conf->sc_limits_dict, "default");
+ memmove(limits, d, sizeof(*limits));
+ }
+ free($5);
+ } limits
+ | LIMIT MTA {
+ limits = dict_get(conf->sc_limits_dict, "default");
+ } limits
| LISTEN {
bzero(&l, sizeof l);
} ON STRING address_family port ssl certificate auth tag listen_helo {
@@ -990,6 +1026,7 @@ lookup(char *s)
{ "inet4", INET4 },
{ "inet6", INET6 },
{ "key", KEY },
+ { "limit", LIMIT },
{ "listen", LISTEN },
{ "lmtp", LMTP },
{ "local", LOCAL },
@@ -997,6 +1034,7 @@ lookup(char *s)
{ "max-message-size", MAXMESSAGESIZE },
{ "mbox", MBOX },
{ "mda", MDA },
+ { "mta", MTA },
{ "on", ON },
{ "port", PORT },
{ "queue", QUEUE },
@@ -1359,6 +1397,7 @@ parse_config(struct smtpd *x_conf, const char *filename, int opts)
conf->sc_rules = calloc(1, sizeof(*conf->sc_rules));
conf->sc_listeners = calloc(1, sizeof(*conf->sc_listeners));
conf->sc_ssl_dict = calloc(1, sizeof(*conf->sc_ssl_dict));
+ conf->sc_limits_dict = calloc(1, sizeof(*conf->sc_limits_dict));
/* Report mails delayed for more than 4 hours */
conf->sc_bounce_warn[0] = 3600 * 4;
@@ -1366,12 +1405,14 @@ parse_config(struct smtpd *x_conf, const char *filename, int opts)
if (conf->sc_tables_dict == NULL ||
conf->sc_rules == NULL ||
conf->sc_listeners == NULL ||
- conf->sc_ssl_dict == NULL) {
+ conf->sc_ssl_dict == NULL ||
+ conf->sc_limits_dict == NULL) {
log_warn("warn: cannot allocate memory");
free(conf->sc_tables_dict);
free(conf->sc_rules);
free(conf->sc_listeners);
free(conf->sc_ssl_dict);
+ free(conf->sc_limits_dict);
return (-1);
}
@@ -1385,6 +1426,11 @@ parse_config(struct smtpd *x_conf, const char *filename, int opts)
dict_init(conf->sc_ssl_dict);
dict_init(conf->sc_tables_dict);
+ dict_init(conf->sc_limits_dict);
+ limits = xcalloc(1, sizeof(*limits), "mta_limits");
+ limit_mta_set_defaults(limits);
+ dict_xset(conf->sc_limits_dict, "default", limits);
+
TAILQ_INIT(conf->sc_listeners);
TAILQ_INIT(conf->sc_rules);
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index 177be1a7c38..d328979f5bd 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue.c,v 1.152 2013/07/19 20:37:07 eric Exp $ */
+/* $OpenBSD: queue.c,v 1.153 2013/07/19 21:14:52 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
@@ -63,7 +63,6 @@ queue_imsg(struct mproc *p, struct imsg *imsg)
struct delivery_bounce bounce;
struct bounce_req_msg *req_bounce;
struct envelope evp;
- static uint64_t batch_id;
struct msg m;
const char *reason;
uint64_t reqid, evpid;
@@ -266,14 +265,7 @@ queue_imsg(struct mproc *p, struct imsg *imsg)
bounce_add(evpid);
return;
- case IMSG_MTA_BATCH:
- batch_id = generate_uid();
- m_create(p_mta, IMSG_MTA_BATCH, 0, 0, -1);
- m_add_id(p_mta, batch_id);
- m_close(p_mta);
- return;
-
- case IMSG_MTA_BATCH_ADD:
+ case IMSG_MTA_TRANSFER:
m_msg(&m, imsg);
m_get_evpid(&m, &evpid);
m_end(&m);
@@ -286,18 +278,11 @@ queue_imsg(struct mproc *p, struct imsg *imsg)
return;
}
evp.lasttry = time(NULL);
- m_create(p_mta, IMSG_MTA_BATCH_ADD, 0, 0, -1);
- m_add_id(p_mta, batch_id);
+ m_create(p_mta, IMSG_MTA_TRANSFER, 0, 0, -1);
m_add_envelope(p_mta, &evp);
m_close(p_mta);
return;
- case IMSG_MTA_BATCH_END:
- m_create(p_mta, IMSG_MTA_BATCH_END, 0, 0, -1);
- m_add_id(p_mta, batch_id);
- m_close(p_mta);
- return;
-
case IMSG_CTL_LIST_ENVELOPES:
if (imsg->hdr.len == sizeof imsg->hdr) {
m_forward(p_control, imsg);
diff --git a/usr.sbin/smtpd/runq.c b/usr.sbin/smtpd/runq.c
new file mode 100644
index 00000000000..30fd9e17f67
--- /dev/null
+++ b/usr.sbin/smtpd/runq.c
@@ -0,0 +1,203 @@
+/* $OpenBSD: runq.c,v 1.1 2013/07/19 21:14:52 eric Exp $ */
+
+/*
+ * Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/types.h>
+#include <sys/queue.h>
+#include <sys/tree.h>
+#include <sys/uio.h>
+
+#include <imsg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <time.h>
+
+#include "smtpd.h"
+
+struct job {
+ TAILQ_ENTRY(job) entry;
+ time_t when;
+ void (*cb)(struct runq *, void *);
+ void *arg;
+};
+
+struct runq {
+ TAILQ_HEAD(, job) jobs;
+ void (*cb)(struct runq *, void *);
+ struct event ev;
+};
+
+static void runq_timeout(int, short, void *);
+
+static struct runq *active;
+
+static void
+runq_reset(struct runq *runq)
+{
+ struct timeval tv;
+ struct job *job;
+ time_t now;
+
+ job = TAILQ_FIRST(&runq->jobs);
+ if (job == NULL)
+ return;
+
+ now = time(NULL);
+ if (job->when <= now)
+ tv.tv_sec = 0;
+ else
+ tv.tv_sec = job->when - now;
+ tv.tv_usec = 0;
+ evtimer_add(&runq->ev, &tv);
+}
+
+static void
+runq_timeout(int fd, short ev, void *arg)
+{
+ struct runq *runq = arg;
+ struct job *job;
+ time_t now;
+
+ active = runq;
+ now = time(NULL);
+
+ while((job = TAILQ_FIRST(&runq->jobs))) {
+ if (job->when > now)
+ break;
+ TAILQ_REMOVE(&runq->jobs, job, entry);
+ if (job->cb)
+ job->cb(runq, job->arg);
+ else
+ runq->cb(runq, job->arg);
+ free(job);
+ }
+
+ active = NULL;
+ runq_reset(runq);
+}
+
+int
+runq_init(struct runq **runqp, void (*cb)(struct runq *, void *))
+{
+ struct runq *runq;
+
+ runq = malloc(sizeof(*runq));
+ if (runq == NULL)
+ return (0);
+
+ runq->cb = cb;
+ TAILQ_INIT(&runq->jobs);
+ evtimer_set(&runq->ev, runq_timeout, runq);
+
+ *runqp = runq;
+
+ return (1);
+}
+
+int
+runq_schedule(struct runq *runq, time_t when, void (*cb)(struct runq *, void *),
+ void *arg)
+{
+ struct job *job, *tmpjob;
+
+ job = malloc(sizeof(*job));
+ if (job == NULL)
+ return (0);
+
+ job->arg = arg;
+ job->cb = cb;
+ job->when = when;
+
+ TAILQ_FOREACH(tmpjob, &runq->jobs, entry) {
+ if (tmpjob->when > job->when) {
+ TAILQ_INSERT_BEFORE(tmpjob, job, entry);
+ goto done;
+ }
+ }
+ TAILQ_INSERT_TAIL(&runq->jobs, job, entry);
+
+ done:
+ if (runq != active && job == TAILQ_FIRST(&runq->jobs)) {
+ evtimer_del(&runq->ev);
+ runq_reset(runq);
+ }
+ return (1);
+}
+
+int
+runq_delay(struct runq *runq, unsigned int delay,
+ void (*cb)(struct runq *, void *), void *arg)
+{
+ return runq_schedule(runq, time(NULL) + delay, cb, arg);
+}
+
+int
+runq_cancel(struct runq *runq, void (*cb)(struct runq *, void *), void *arg)
+{
+ struct job *job, *first;
+
+ first = TAILQ_FIRST(&runq->jobs);
+ TAILQ_FOREACH(job, &runq->jobs, entry) {
+ if (job->cb == cb && job->arg == arg) {
+ TAILQ_REMOVE(&runq->jobs, job, entry);
+ free(job);
+ if (runq != active && job == first) {
+ evtimer_del(&runq->ev);
+ runq_reset(runq);
+ }
+ return (1);
+ }
+ }
+
+ return (0);
+}
+
+int
+runq_pending(struct runq *runq, void (*cb)(struct runq *, void *), void *arg,
+ time_t *when)
+{
+ struct job *job;
+
+ TAILQ_FOREACH(job, &runq->jobs, entry) {
+ if (job->cb == cb && job->arg == arg) {
+ if (when)
+ *when = job->when;
+ return (1);
+ }
+ }
+
+ return (0);
+}
+
+int
+runq_next(struct runq *runq, void (**cb)(struct runq *, void *), void **arg,
+ time_t *when)
+{
+ struct job *job;
+
+ job = TAILQ_FIRST(&runq->jobs);
+ if (job == NULL)
+ return (0);
+ if (cb)
+ *cb = job->cb;
+ if (arg)
+ *arg = job->arg;
+ if (when)
+ *when = job->when;
+
+ return (1);
+}
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index d6a07bdeed4..fb06b6bc016 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler.c,v 1.31 2013/07/19 15:14:23 eric Exp $ */
+/* $OpenBSD: scheduler.c,v 1.32 2013/07/19 21:14:52 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
@@ -555,17 +555,13 @@ scheduler_process_mta(struct scheduler_batch *batch)
{
size_t i;
- m_compose(p_queue, IMSG_MTA_BATCH, 0, 0, -1, NULL, 0);
-
for (i = 0; i < batch->evpcount; i++) {
log_debug("debug: scheduler: evp:%016" PRIx64
" scheduled (mta)", batch->evpids[i]);
- m_create(p_queue, IMSG_MTA_BATCH_ADD, 0, 0, -1);
+ m_create(p_queue, IMSG_MTA_TRANSFER, 0, 0, -1);
m_add_evpid(p_queue, batch->evpids[i]);
m_close(p_queue);
}
- m_compose(p_queue, IMSG_MTA_BATCH_END, 0, 0, -1, NULL, 0);
-
stat_increment("scheduler.envelope.inflight", batch->evpcount);
}
diff --git a/usr.sbin/smtpd/smtpctl.8 b/usr.sbin/smtpd/smtpctl.8
index e9d7ae4cfe4..22293162fac 100644
--- a/usr.sbin/smtpd/smtpctl.8
+++ b/usr.sbin/smtpd/smtpctl.8
@@ -1,4 +1,4 @@
-.\" $OpenBSD: smtpctl.8,v 1.42 2013/07/19 15:14:23 eric Exp $
+.\" $OpenBSD: smtpctl.8,v 1.43 2013/07/19 21:14:52 eric Exp $
.\"
.\" Copyright (c) 2006 Pierre-Yves Ritschard <pyr@openbsd.org>
.\" Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
@@ -79,7 +79,7 @@ Envelopes removed by the administrator.
Generated bounces.
.El
.It Cm pause envelope Ar envelope-id | message-id
-Temporarily suspend scheduling for the envelope with the given ID,
+Temporarily suspend scheduling for the envelope with the given ID,
or all envelopes with the given message ID.
.It Cm pause mda
Temporarily stop deliveries to local users.
@@ -108,6 +108,9 @@ or all envelopes with the given message ID.
Resume deliveries to local users.
.It Cm resume mta
Resume relaying and deliveries to remote users.
+.It Cm resume route Ar route-id
+Resume routing on disabled route
+.Ar route-id .
.It Cm resume smtp
Resume accepting incoming sessions.
.It Cm schedule all
@@ -117,6 +120,19 @@ Mark a single envelope, or all envelopes with the same message ID,
as ready for immediate delivery.
.It Cm show envelope Ar envelope-id
Display envelope content for the given ID.
+.It Cm show hoststats
+Display status of last delivery for domains that have been active in the
+last 4 hours.
+It consists of the following fields, separated by a "|":
+.Pp
+.Bl -bullet -compact
+.It
+domain.
+.It
+Unix timestamp of last delivery.
+.It
+Status of last delivery.
+.El
.It Cm show message Ar envelope-id
Display message content for the given ID.
.It Cm show queue
@@ -160,6 +176,24 @@ is not running.
.It
Error string for the last failed delivery or relay attempt.
.El
+.It Cm show routes
+Display status of routes currently known by
+.Xr smtpd 8 .
+Each line consists in a route number, a source address, a destination
+address, a set of flags, the number of connections on this
+route, the current penalty level which determines the amount of time
+the route is disabled if an error occurs, and the delay before it
+gets re-activated. The following flags are defined:
+.Pp
+.Bl -tag -width xx -compact
+.It N
+The route is new. No SMTP session has been established yet.
+.It D
+The route is currently disabled.
+.It Q
+The route as a timeout registered to lower its penality level and possibly
+re-activate or discard it.
+.El
.It Cm show stats
Displays runtime statistics concerning
.Xr smtpd 8 .
diff --git a/usr.sbin/smtpd/smtpctl.c b/usr.sbin/smtpd/smtpctl.c
index 6c7dfe3d766..7d453410690 100644
--- a/usr.sbin/smtpd/smtpctl.c
+++ b/usr.sbin/smtpd/smtpctl.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpctl.c,v 1.107 2013/07/19 15:14:23 eric Exp $ */
+/* $OpenBSD: smtpctl.c,v 1.108 2013/07/19 21:14:52 eric Exp $ */
/*
* Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
@@ -495,6 +495,20 @@ do_resume_mta(int argc, struct parameter *argv)
}
static int
+do_resume_route(int argc, struct parameter *argv)
+{
+ uint64_t v;
+
+ if (argc == 0)
+ v = 0;
+ else
+ v = argv[0].u.u_routeid;
+
+ srv_send(IMSG_CTL_RESUME_ROUTE, &v, sizeof(v));
+ return srv_check_result();
+}
+
+static int
do_resume_smtp(int argc, struct parameter *argv)
{
srv_send(IMSG_CTL_RESUME_SMTP, NULL, 0);
@@ -548,6 +562,23 @@ do_show_envelope(int argc, struct parameter *argv)
}
static int
+do_show_hoststats(int argc, struct parameter *argv)
+{
+ srv_send(IMSG_CTL_MTA_SHOW_HOSTSTATS, NULL, 0);
+
+ do {
+ srv_recv(IMSG_CTL_MTA_SHOW_HOSTSTATS);
+ if (rlen) {
+ printf("%s\n", rdata);
+ srv_read(NULL, rlen);
+ }
+ srv_end();
+ } while (rlen);
+
+ return (0);
+}
+
+static int
do_show_message(int argc, struct parameter *argv)
{
char buf[SMTPD_MAXPATHLEN];
@@ -629,6 +660,23 @@ do_show_queue(int argc, struct parameter *argv)
}
static int
+do_show_routes(int argc, struct parameter *argv)
+{
+ srv_send(IMSG_CTL_MTA_SHOW_ROUTES, NULL, 0);
+
+ do {
+ srv_recv(IMSG_CTL_MTA_SHOW_ROUTES);
+ if (rlen) {
+ printf("%s\n", rdata);
+ srv_read(NULL, rlen);
+ }
+ srv_end();
+ } while (rlen);
+
+ return (0);
+}
+
+static int
do_show_stats(int argc, struct parameter *argv)
{
struct stat_kv kv;
@@ -760,15 +808,18 @@ main(int argc, char **argv)
cmd_install("resume envelope <msgid>", do_resume_envelope);
cmd_install("resume mda", do_resume_mda);
cmd_install("resume mta", do_resume_mta);
+ cmd_install("resume route <routeid>", do_resume_route);
cmd_install("resume smtp", do_resume_smtp);
cmd_install("schedule <msgid>", do_schedule);
cmd_install("schedule <evpid>", do_schedule);
cmd_install("schedule all", do_schedule);
cmd_install("show envelope <evpid>", do_show_envelope);
+ cmd_install("show hoststats", do_show_hoststats);
cmd_install("show message <msgid>", do_show_message);
cmd_install("show message <evpid>", do_show_message);
cmd_install("show queue", do_show_queue);
cmd_install("show queue <msgid>", do_show_queue);
+ cmd_install("show routes", do_show_routes);
cmd_install("show stats", do_show_stats);
cmd_install("stop", do_stop);
cmd_install("trace <str>", do_trace);
diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c
index 00a1a8cb0a7..71c96517381 100644
--- a/usr.sbin/smtpd/smtpd.c
+++ b/usr.sbin/smtpd/smtpd.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.c,v 1.196 2013/07/19 20:37:07 eric Exp $ */
+/* $OpenBSD: smtpd.c,v 1.197 2013/07/19 21:14:52 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
@@ -1426,6 +1426,7 @@ imsg_to_str(int type)
CASE(IMSG_CTL_RESUME_MDA);
CASE(IMSG_CTL_RESUME_MTA);
CASE(IMSG_CTL_RESUME_SMTP);
+ CASE(IMSG_CTL_RESUME_ROUTE);
CASE(IMSG_CTL_LIST_MESSAGES);
CASE(IMSG_CTL_LIST_ENVELOPES);
CASE(IMSG_CTL_REMOVE);
@@ -1436,6 +1437,9 @@ imsg_to_str(int type)
CASE(IMSG_CTL_PROFILE);
CASE(IMSG_CTL_UNPROFILE);
+ CASE(IMSG_CTL_MTA_SHOW_ROUTES);
+ CASE(IMSG_CTL_MTA_SHOW_HOSTSTATS);
+
CASE(IMSG_CONF_START);
CASE(IMSG_CONF_SSL);
CASE(IMSG_CONF_LISTENER);
@@ -1485,9 +1489,7 @@ imsg_to_str(int type)
CASE(IMSG_MFA_SMTP_DATA);
CASE(IMSG_MFA_SMTP_RESPONSE);
- CASE(IMSG_MTA_BATCH);
- CASE(IMSG_MTA_BATCH_ADD);
- CASE(IMSG_MTA_BATCH_END);
+ CASE(IMSG_MTA_TRANSFER);
CASE(IMSG_MTA_SCHEDULE);
CASE(IMSG_QUEUE_CREATE_MESSAGE);
diff --git a/usr.sbin/smtpd/smtpd.conf.5 b/usr.sbin/smtpd/smtpd.conf.5
index 492a9e7f0a2..e8a00535cca 100644
--- a/usr.sbin/smtpd/smtpd.conf.5
+++ b/usr.sbin/smtpd/smtpd.conf.5
@@ -1,4 +1,4 @@
-.\" $OpenBSD: smtpd.conf.5,v 1.101 2013/07/19 20:37:07 eric Exp $
+.\" $OpenBSD: smtpd.conf.5,v 1.102 2013/07/19 21:14:52 eric Exp $
.\"
.\" Copyright (c) 2008 Janne Johansson <jj@openbsd.org>
.\" Copyright (c) 2009 Jacek Masiulaniec <jacekm@dobremiasto.net>
@@ -312,7 +312,7 @@ If the
parameter is specified,
.Xr smtpd 8
will explicitly bind to an address found in the table referenced by
-.Ar source
+.Ar table
when connecting to the relay.
If the table contains more than one address, they are picked in turn each
time a new connection is opened.
@@ -406,7 +406,7 @@ If the
parameter is specified,
.Xr smtpd 8
will explicitly bind to an address found in the table referenced by
-.Ar source
+.Ar table
when connecting to the relay.
If the table contains more than one address, they are picked in turn each
time a new connection is opened.
@@ -451,6 +451,26 @@ expire 10h # expire after 10 hours
.Ed
.It Xo
.Bk -words
+.Ic limit mta
+.Op Ic for Ic domain Ar domain
+.Ar family
+.Ek
+.Xc
+Instruct
+.Xr smtpd 8
+to only use the specified address
+.Ar family
+for out-going connections.
+Accepted values are
+.Ic inet4
+and
+.Ic inet6 .
+If a
+.Ar domain
+is specified, the restriction only applies when connecting
+to MXs for this domain.
+.It Xo
+.Bk -words
.Ic listen on Ar interface
.Op Ar family
.Op Ic port Ar port
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index 7ec3aece8dd..9dbe77e26e0 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.h,v 1.420 2013/07/19 20:37:07 eric Exp $ */
+/* $OpenBSD: smtpd.h,v 1.421 2013/07/19 21:14:52 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
@@ -150,7 +150,7 @@ union lookup {
* Bump IMSG_VERSION whenever a change is made to enum imsg_type.
* This will ensure that we can never use a wrong version of smtpctl with smtpd.
*/
-#define IMSG_VERSION 3
+#define IMSG_VERSION 5
enum imsg_type {
IMSG_NONE,
@@ -166,6 +166,7 @@ enum imsg_type {
IMSG_CTL_RESUME_MDA,
IMSG_CTL_RESUME_MTA,
IMSG_CTL_RESUME_SMTP,
+ IMSG_CTL_RESUME_ROUTE,
IMSG_CTL_LIST_MESSAGES,
IMSG_CTL_LIST_ENVELOPES,
IMSG_CTL_REMOVE,
@@ -176,6 +177,9 @@ enum imsg_type {
IMSG_CTL_PROFILE,
IMSG_CTL_UNPROFILE,
+ IMSG_CTL_MTA_SHOW_ROUTES,
+ IMSG_CTL_MTA_SHOW_HOSTSTATS,
+
IMSG_CONF_START,
IMSG_CONF_SSL,
IMSG_CONF_LISTENER,
@@ -225,9 +229,7 @@ enum imsg_type {
IMSG_MFA_SMTP_DATA,
IMSG_MFA_SMTP_RESPONSE,
- IMSG_MTA_BATCH,
- IMSG_MTA_BATCH_ADD,
- IMSG_MTA_BATCH_END,
+ IMSG_MTA_TRANSFER,
IMSG_MTA_SCHEDULE,
IMSG_QUEUE_CREATE_MESSAGE,
@@ -560,6 +562,8 @@ struct smtpd {
struct dict *sc_tables_dict; /* keyed lookup */
+ struct dict *sc_limits_dict;
+
struct dict sc_filters;
uint32_t filtermask;
};
@@ -650,45 +654,84 @@ struct mta_source {
};
struct mta_connector {
- TAILQ_ENTRY(mta_connector) lst_entry;
struct mta_source *source;
struct mta_relay *relay;
- struct mta_connectors *queue;
-#define CONNECTOR_FAMILY_ERROR 0x01
-#define CONNECTOR_SOURCE_ERROR 0x02
-#define CONNECTOR_MX_ERROR 0x04
-#define CONNECTOR_ERROR 0x0f
-
-#define CONNECTOR_LIMIT_HOST 0x10
-#define CONNECTOR_LIMIT_ROUTE 0x20
-#define CONNECTOR_LIMIT_SOURCE 0x40
-#define CONNECTOR_LIMIT 0xf0
+#define CONNECTOR_ERROR_FAMILY 0x0001
+#define CONNECTOR_ERROR_SOURCE 0x0002
+#define CONNECTOR_ERROR_MX 0x0004
+#define CONNECTOR_ERROR_ROUTE_NET 0x0008
+#define CONNECTOR_ERROR_ROUTE_SMTP 0x0010
+#define CONNECTOR_ERROR_ROUTE 0x0018
+#define CONNECTOR_ERROR 0x00ff
+
+#define CONNECTOR_LIMIT_HOST 0x0100
+#define CONNECTOR_LIMIT_ROUTE 0x0200
+#define CONNECTOR_LIMIT_SOURCE 0x0400
+#define CONNECTOR_LIMIT_RELAY 0x0800
+#define CONNECTOR_LIMIT_CONN 0x1000
+#define CONNECTOR_LIMIT_DOMAIN 0x2000
+#define CONNECTOR_LIMIT 0xff00
+
+#define CONNECTOR_NEW 0x10000
+#define CONNECTOR_WAIT 0x20000
int flags;
int refcount;
size_t nconn;
time_t lastconn;
- time_t nextconn;
- time_t clearlimit;
};
struct mta_route {
SPLAY_ENTRY(mta_route) entry;
+ uint64_t id;
struct mta_source *src;
struct mta_host *dst;
+#define ROUTE_NEW 0x01
+#define ROUTE_RUNQ 0x02
+#define ROUTE_KEEPALIVE 0x04
+#define ROUTE_DISABLED 0xf0
+#define ROUTE_DISABLED_NET 0x10
+#define ROUTE_DISABLED_SMTP 0x20
+ int flags;
+ int penalty;
int refcount;
size_t nconn;
time_t lastconn;
+ time_t lastdisc;
+ time_t lastpenalty;
};
-TAILQ_HEAD(mta_connectors, mta_connector);
+struct mta_limits {
+ size_t maxconn_per_host;
+ size_t maxconn_per_route;
+ size_t maxconn_per_source;
+ size_t maxconn_per_connector;
+ size_t maxconn_per_relay;
+ size_t maxconn_per_domain;
+
+ time_t conndelay_host;
+ time_t conndelay_route;
+ time_t conndelay_source;
+ time_t conndelay_connector;
+ time_t conndelay_relay;
+ time_t conndelay_domain;
+
+ time_t discdelay_route;
+
+ size_t max_mail_per_session;
+ time_t sessdelay_transaction;
+ time_t sessdelay_keepalive;
+
+ int family;
+};
struct mta_relay {
SPLAY_ENTRY(mta_relay) entry;
uint64_t id;
struct mta_domain *domain;
+ struct mta_limits *limits;
int flags;
char *backupname;
int backuppref;
@@ -699,21 +742,15 @@ struct mta_relay {
char *authlabel;
char *helotable;
char *heloname;
-
char *secret;
size_t ntask;
TAILQ_HEAD(, mta_task) tasks;
struct tree connectors;
- size_t nconnector;
size_t sourceloop;
-
- struct mta_connectors c_ready;
- struct mta_connectors c_limit;
- struct mta_connectors c_delay;
- struct mta_connectors c_error;
- struct event ev;
+ time_t lastsource;
+ time_t nextsource;
int fail;
char *failstr;
@@ -721,15 +758,16 @@ struct mta_relay {
#define RELAY_WAIT_MX 0x01
#define RELAY_WAIT_PREFERENCE 0x02
#define RELAY_WAIT_SECRET 0x04
-#define RELAY_WAIT_SOURCE 0x08
-#define RELAY_WAITMASK 0x0f
+#define RELAY_WAIT_LIMITS 0x08
+#define RELAY_WAIT_SOURCE 0x10
+#define RELAY_WAIT_CONNECTOR 0x20
+#define RELAY_WAITMASK 0x3f
int status;
int refcount;
size_t nconn;
+ size_t nconn_ready;
time_t lastconn;
-
- size_t maxconn;
};
struct mta_envelope {
@@ -740,6 +778,7 @@ struct mta_envelope {
char *dest;
char *rcpt;
struct mta_task *task;
+ int delivery;
};
struct mta_task {
@@ -1115,6 +1154,9 @@ void imsgproc_set_write(struct imsgproc *);
void imsgproc_set_read_write(struct imsgproc *);
void imsgproc_reset_callback(struct imsgproc *, void (*)(struct imsg *, void *), void *);
+/* limit.c */
+void limit_mta_set_defaults(struct mta_limits *);
+int limit_mta_set(struct mta_limits *, const char*, int64_t);
/* lka.c */
pid_t lka(void);
@@ -1193,9 +1235,12 @@ void m_get_envelope(struct msg *, struct envelope *);
pid_t mta(void);
void mta_route_ok(struct mta_relay *, struct mta_route *);
void mta_route_error(struct mta_relay *, struct mta_route *);
+void mta_route_down(struct mta_relay *, struct mta_route *);
void mta_route_collect(struct mta_relay *, struct mta_route *);
void mta_source_error(struct mta_relay *, struct mta_route *, const char *);
-void mta_delivery(struct mta_envelope *, const char *, const char *, int, const char *);
+void mta_delivery_log(struct mta_envelope *, const char *, const char *, int, const char *);
+void mta_delivery_notify(struct mta_envelope *, int, const char *, uint32_t);
+void mta_delivery(struct mta_envelope *, const char *, const char *, int, const char *, uint32_t);
struct mta_task *mta_route_next_task(struct mta_relay *, struct mta_route *);
const char *mta_host_to_text(struct mta_host *);
const char *mta_relay_to_text(struct mta_relay *);
@@ -1377,3 +1422,13 @@ int session_socket_error(int);
/* waitq.c */
int waitq_wait(void *, void (*)(void *, void *, void *), void *);
void waitq_run(void *, void *);
+
+/* runq.c */
+struct runq;
+
+int runq_init(struct runq **, void (*)(struct runq *, void *));
+int runq_schedule(struct runq *, time_t, void (*)(struct runq *, void *), void *);
+int runq_delay(struct runq *, unsigned int, void (*)(struct runq *, void *), void *);
+int runq_cancel(struct runq *, void (*)(struct runq *, void *), void *);
+int runq_pending(struct runq *, void (*)(struct runq *, void *), void *, time_t *);
+int runq_next(struct runq *, void (**)(struct runq *, void *), void **, time_t *);
diff --git a/usr.sbin/smtpd/smtpd/Makefile b/usr.sbin/smtpd/smtpd/Makefile
index 0d89d9c2dfc..fa024ced24e 100644
--- a/usr.sbin/smtpd/smtpd/Makefile
+++ b/usr.sbin/smtpd/smtpd/Makefile
@@ -1,4 +1,4 @@
-# $OpenBSD: Makefile,v 1.63 2013/07/19 20:37:08 eric Exp $
+# $OpenBSD: Makefile,v 1.64 2013/07/19 21:14:52 eric Exp $
.PATH: ${.CURDIR}/..
@@ -6,10 +6,10 @@ PROG= smtpd
SRCS= aliases.c bounce.c ca.c compress_backend.c config.c \
control.c crypto.c delivery.c dict.c dns.c envelope.c \
- expand.c forward.c iobuf.c ioev.c lka.c lka_session.c \
+ expand.c forward.c iobuf.c ioev.c limit.c lka.c lka_session.c \
log.c mda.c mfa.c mfa_session.c mproc.c \
mta.c mta_session.c parse.y queue.c queue_backend.c \
- ruleset.c scheduler.c scheduler_backend.c \
+ ruleset.c runq.c scheduler.c scheduler_backend.c \
smtp.c smtp_session.c smtpd.c ssl.c ssl_privsep.c \
ssl_smtpd.c stat_backend.c table.c to.c tree.c util.c \
waitq.c