summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd/mta.c
diff options
context:
space:
mode:
authorEric Faurot <eric@cvs.openbsd.org>2013-07-19 21:14:53 +0000
committerEric Faurot <eric@cvs.openbsd.org>2013-07-19 21:14:53 +0000
commit89ca14ddb53440286ab2ed0c2b9e4473dda6d12e (patch)
treecf8744bfc84648a1c7489c187f08afd665f2dcaf /usr.sbin/smtpd/mta.c
parent012c74efba199a734b02042792ff1c3e8f7b4546 (diff)
Many MTA improvements:
- Better transient error handling logic: failing destinations are automatically disabled for a while. When a destination is active again, ask the scheduler to retry previous envelopes immediately. - More informative error report when all routes fail for a mail. - Implement a "smtpctl show hoststats" command to get the latest stat message per MX domain. - Implement a "smtpctl show routes" command to show the state of the currently known routes to remote MXs. - Implement a "smtpctl resume route" command to re-enable a route that has been disabled. - Do not hardcode limits. - Minor code improvements.
Diffstat (limited to 'usr.sbin/smtpd/mta.c')
-rw-r--r--usr.sbin/smtpd/mta.c1169
1 files changed, 803 insertions, 366 deletions
diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c
index 7f42ad043f5..9b8a8c93e8e 100644
--- a/usr.sbin/smtpd/mta.c
+++ b/usr.sbin/smtpd/mta.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mta.c,v 1.162 2013/07/19 15:14:23 eric Exp $ */
+/* $OpenBSD: mta.c,v 1.163 2013/07/19 21:14:52 eric Exp $ */
/*
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -44,14 +44,14 @@
#define MAXERROR_PER_HOST 4
-#define MAXCONN_PER_HOST 10
-#define MAXCONN_PER_ROUTE 5
-#define MAXCONN_PER_SOURCE 50 /* XXX missing */
-#define MAXCONN_PER_CONNECTOR 20
-#define MAXCONN_PER_RELAY 100
+#define DELAY_CHECK_SOURCE 1
+#define DELAY_CHECK_SOURCE_SLOW 10
+#define DELAY_CHECK_SOURCE_FAST 0
+#define DELAY_CHECK_LIMIT 5
-#define CONNECTOR_DELAY_CONNECT 1
-#define CONNECTOR_DELAY_LIMIT 5
+#define DELAY_QUADRATIC 1
+#define DELAY_ROUTE_BASE 200
+#define DELAY_ROUTE_MAX (3600 * 4)
static void mta_imsg(struct mproc *, struct imsg *);
static void mta_shutdown(void);
@@ -62,14 +62,17 @@ static void mta_query_secret(struct mta_relay *);
static void mta_query_preference(struct mta_relay *);
static void mta_query_source(struct mta_relay *);
static void mta_on_mx(void *, void *, void *);
+static void mta_on_secret(struct mta_relay *, const char *);
+static void mta_on_preference(struct mta_relay *, int, int);
static void mta_on_source(struct mta_relay *, struct mta_source *);
+static void mta_on_timeout(struct runq *, void *);
static void mta_connect(struct mta_connector *);
-static void mta_recycle(struct mta_connector *);
+static void mta_route_enable(struct mta_route *);
+static void mta_route_disable(struct mta_route *, int, int);
static void mta_drain(struct mta_relay *);
-static void mta_relay_schedule(struct mta_relay *, unsigned int);
-static void mta_relay_timeout(int, short, void *);
static void mta_flush(struct mta_relay *, int, const char *);
-static struct mta_route *mta_find_route(struct mta_connector *);
+static struct mta_route *mta_find_route(struct mta_connector *, time_t, int*,
+ time_t*);
static void mta_log(const struct mta_envelope *, const char *, const char *,
const char *, const char *);
@@ -111,9 +114,7 @@ static const char *mta_connector_to_text(struct mta_connector *);
SPLAY_HEAD(mta_route_tree, mta_route);
static struct mta_route *mta_route(struct mta_source *, struct mta_host *);
-#if 0
static void mta_route_ref(struct mta_route *);
-#endif
static void mta_route_unref(struct mta_route *);
static const char *mta_route_to_text(struct mta_route *);
static int mta_route_cmp(const struct mta_route *, const struct mta_route *);
@@ -125,61 +126,77 @@ static struct mta_host_tree hosts;
static struct mta_source_tree sources;
static struct mta_route_tree routes;
-static struct tree batches;
-
static struct tree wait_mx;
static struct tree wait_preference;
static struct tree wait_secret;
static struct tree wait_source;
+static struct runq *runq_relay;
+static struct runq *runq_connector;
+static struct runq *runq_route;
+static struct runq *runq_hoststat;
+
+static time_t max_seen_conndelay_route;
+static time_t max_seen_discdelay_route;
+
+#define HOSTSTAT_EXPIRE_DELAY (4 * 3600)
+struct hoststat {
+ char name[SMTPD_MAXHOSTNAMELEN];
+ time_t tm;
+ char error[SMTPD_MAXLINESIZE];
+ struct tree deferred;
+};
+static struct dict hoststat;
+
+void mta_hoststat_update(const char *, const char *);
+void mta_hoststat_cache(const char *, uint64_t);
+void mta_hoststat_uncache(const char *, uint64_t);
+void mta_hoststat_reschedule(const char *);
+static void mta_hoststat_remove_entry(struct hoststat *);
+
+
void
mta_imsg(struct mproc *p, struct imsg *imsg)
{
struct mta_relay *relay;
struct mta_task *task;
- struct mta_source *source;
struct mta_domain *domain;
+ struct mta_route *route;
struct mta_mx *mx, *imx;
+ struct hoststat *hs;
struct mta_envelope *e;
struct sockaddr_storage ss;
- struct tree *batch;
struct envelope evp;
struct msg m;
const char *secret;
+ const char *hostname;
uint64_t reqid;
+ time_t t;
char buf[SMTPD_MAXLINESIZE];
int dnserror, preference, v, status;
+ void *iter;
+ uint64_t u64;
if (p->proc == PROC_QUEUE) {
switch (imsg->hdr.type) {
- case IMSG_MTA_BATCH:
- m_msg(&m, imsg);
- m_get_id(&m, &reqid);
- m_end(&m);
- batch = xmalloc(sizeof *batch, "mta_batch");
- tree_init(batch);
- tree_xset(&batches, reqid, batch);
- log_trace(TRACE_MTA,
- "mta: batch:%016" PRIx64 " created", reqid);
- return;
-
- case IMSG_MTA_BATCH_ADD:
+ case IMSG_MTA_TRANSFER:
m_msg(&m, imsg);
- m_get_id(&m, &reqid);
m_get_envelope(&m, &evp);
m_end(&m);
relay = mta_relay(&evp);
- batch = tree_xget(&batches, reqid);
- if ((task = tree_get(batch, relay->id)) == NULL) {
- log_trace(TRACE_MTA, "mta: new task for %s",
- mta_relay_to_text(relay));
+ TAILQ_FOREACH(task, &relay->tasks, entry)
+ if (task->msgid == evpid_to_msgid(evp.id))
+ break;
+
+ if (task == NULL) {
task = xmalloc(sizeof *task, "mta_task");
TAILQ_INIT(&task->envelopes);
task->relay = relay;
- tree_xset(batch, relay->id, task);
+ relay->ntask += 1;
+ TAILQ_INSERT_TAIL(&relay->tasks, task, entry);
task->msgid = evpid_to_msgid(evp.id);
if (evp.sender.user[0] || evp.sender.domain[0])
snprintf(buf, sizeof buf, "%s@%s",
@@ -187,16 +204,8 @@ mta_imsg(struct mproc *p, struct imsg *imsg)
else
buf[0] = '\0';
task->sender = xstrdup(buf, "mta_task:sender");
- } else
- mta_relay_unref(relay); /* from here */
-
- /*
- * Technically, we could handle that by adding a msg
- * level, but the batch sent by the scheduler should
- * be valid.
- */
- if (task->msgid != evpid_to_msgid(evp.id))
- errx(1, "msgid mismatch in batch");
+ stat_increment("mta.task", 1);
+ }
e = xcalloc(1, sizeof *e, "mta_envelope");
e->id = evp.id;
@@ -209,32 +218,15 @@ mta_imsg(struct mproc *p, struct imsg *imsg)
if (strcmp(buf, e->dest))
e->rcpt = xstrdup(buf, "mta_envelope:rcpt");
e->task = task;
- /* XXX honour relay->maxrcpt */
+
TAILQ_INSERT_TAIL(&task->envelopes, e, entry);
- stat_increment("mta.envelope", 1);
log_debug("debug: mta: received evp:%016" PRIx64
" for <%s>", e->id, e->dest);
- return;
- case IMSG_MTA_BATCH_END:
- m_msg(&m, imsg);
- m_get_id(&m, &reqid);
- m_end(&m);
- batch = tree_xpop(&batches, reqid);
- log_trace(TRACE_MTA, "mta: batch:%016" PRIx64 " closed",
- reqid);
- /* For all tasks, queue them on its relay */
- while (tree_poproot(batch, &reqid, (void**)&task)) {
- if (reqid != task->relay->id)
- errx(1, "relay id mismatch!");
- relay = task->relay;
- relay->ntask += 1;
- TAILQ_INSERT_TAIL(&relay->tasks, task, entry);
- stat_increment("mta.task", 1);
- mta_drain(relay);
- mta_relay_unref(relay); /* from BATCH_APPEND */
- }
- free(batch);
+ stat_increment("mta.envelope", 1);
+
+ mta_drain(relay);
+ mta_relay_unref(relay); /* from here */
return;
case IMSG_QUEUE_MESSAGE_FD:
@@ -252,40 +244,20 @@ mta_imsg(struct mproc *p, struct imsg *imsg)
m_get_string(&m, &secret);
m_end(&m);
relay = tree_xpop(&wait_secret, reqid);
- if (secret[0])
- relay->secret = strdup(secret);
- if (relay->secret == NULL) {
- log_warnx("warn: Failed to retrieve secret "
- "for %s", mta_relay_to_text(relay));
- relay->fail = IMSG_DELIVERY_TEMPFAIL;
- relay->failstr = "Could not retrieve secret";
- }
- relay->status &= ~RELAY_WAIT_SECRET;
- mta_drain(relay);
- mta_relay_unref(relay); /* from mta_query_secret() */
+ mta_on_secret(relay, secret[0] ? secret : NULL);
return;
case IMSG_LKA_SOURCE:
m_msg(&m, imsg);
m_get_id(&m, &reqid);
m_get_int(&m, &status);
-
- relay = tree_xpop(&wait_source, reqid);
- relay->status &= ~RELAY_WAIT_SOURCE;
- if (status == LKA_OK) {
+ if (status == LKA_OK)
m_get_sockaddr(&m, (struct sockaddr*)&ss);
- source = mta_source((struct sockaddr *)&ss);
- mta_on_source(relay, source);
- mta_source_unref(source);
- }
- else {
- log_warnx("warn: Failed to get source address"
- "for %s", mta_relay_to_text(relay));
- }
m_end(&m);
- mta_drain(relay);
- mta_relay_unref(relay); /* from mta_query_source() */
+ relay = tree_xpop(&wait_source, reqid);
+ mta_on_source(relay, (status == LKA_OK) ?
+ mta_source((struct sockaddr *)&ss) : NULL);
return;
case IMSG_LKA_HELO:
@@ -337,23 +309,12 @@ mta_imsg(struct mproc *p, struct imsg *imsg)
m_msg(&m, imsg);
m_get_id(&m, &reqid);
m_get_int(&m, &dnserror);
- relay = tree_xpop(&wait_preference, reqid);
- if (dnserror) {
- log_debug("debug: couldn't find backup "
- "preference for %s",
- mta_relay_to_text(relay));
- relay->backuppref = INT_MAX;
- } else {
- m_get_int(&m, &relay->backuppref);
- log_debug("debug: found backup preference %i "
- "for %s",
- relay->backuppref,
- mta_relay_to_text(relay));
- }
+ if (dnserror == 0)
+ m_get_int(&m, &preference);
m_end(&m);
- relay->status &= ~RELAY_WAIT_PREFERENCE;
- mta_drain(relay);
- mta_relay_unref(relay); /* from mta_query_preference() */
+
+ relay = tree_xpop(&wait_preference, reqid);
+ mta_on_preference(relay, dnserror, preference);
return;
case IMSG_DNS_PTR:
@@ -388,6 +349,65 @@ mta_imsg(struct mproc *p, struct imsg *imsg)
}
}
+ if (p->proc == PROC_CONTROL) {
+ switch (imsg->hdr.type) {
+
+ case IMSG_CTL_RESUME_ROUTE:
+ u64 = *((uint64_t *)imsg->data);
+ if (u64)
+ log_debug("resuming route: %llu",
+ (unsigned long long)u64);
+ else
+ log_debug("resuming all routes");
+ SPLAY_FOREACH(route, mta_route_tree, &routes) {
+ if (u64 && route->id != u64)
+ continue;
+ mta_route_enable(route);
+ if (u64)
+ break;
+ }
+ return;
+
+ case IMSG_CTL_MTA_SHOW_ROUTES:
+ SPLAY_FOREACH(route, mta_route_tree, &routes) {
+ v = runq_pending(runq_route, NULL, route, &t);
+ snprintf(buf, sizeof(buf),
+ "%llu. %s %c%c%c%c nconn=%zu penalty=%i timeout=%s",
+ (unsigned long long)route->id,
+ mta_route_to_text(route),
+ route->flags & ROUTE_NEW ? 'N' : '-',
+ route->flags & ROUTE_DISABLED ? 'D' : '-',
+ route->flags & ROUTE_RUNQ ? 'Q' : '-',
+ route->flags & ROUTE_KEEPALIVE ? 'K' : '-',
+ route->nconn,
+ route->penalty,
+ v ? duration_to_text(t - time(NULL)) : "-");
+ m_compose(p, IMSG_CTL_MTA_SHOW_ROUTES,
+ imsg->hdr.peerid, 0, -1,
+ buf, strlen(buf) + 1);
+ }
+ m_compose(p, IMSG_CTL_MTA_SHOW_ROUTES, imsg->hdr.peerid,
+ 0, -1, NULL, 0);
+ return;
+ case IMSG_CTL_MTA_SHOW_HOSTSTATS:
+ iter = NULL;
+ while (dict_iter(&hoststat, &iter, &hostname,
+ (void **)&hs)) {
+ snprintf(buf, sizeof(buf),
+ "%s|%llu|%s",
+ hostname, (unsigned long long) hs->tm,
+ hs->error);
+ m_compose(p, IMSG_CTL_MTA_SHOW_HOSTSTATS,
+ imsg->hdr.peerid, 0, -1,
+ buf, strlen(buf) + 1);
+ }
+ m_compose(p, IMSG_CTL_MTA_SHOW_HOSTSTATS,
+ imsg->hdr.peerid,
+ 0, -1, NULL, 0);
+ return;
+ }
+ }
+
errx(1, "mta_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type));
}
@@ -451,15 +471,20 @@ mta(void)
SPLAY_INIT(&sources);
SPLAY_INIT(&routes);
- tree_init(&batches);
tree_init(&wait_secret);
tree_init(&wait_mx);
tree_init(&wait_preference);
tree_init(&wait_source);
+ dict_init(&hoststat);
imsg_callback = mta_imsg;
event_init();
+ runq_init(&runq_relay, mta_on_timeout);
+ runq_init(&runq_connector, mta_on_timeout);
+ runq_init(&runq_route, mta_on_timeout);
+ runq_init(&runq_hoststat, mta_on_timeout);
+
signal_set(&ev_sigint, SIGINT, mta_sig_handler, NULL);
signal_set(&ev_sigterm, SIGTERM, mta_sig_handler, NULL);
signal_add(&ev_sigint, NULL);
@@ -487,14 +512,15 @@ void
mta_source_error(struct mta_relay *relay, struct mta_route *route, const char *e)
{
struct mta_connector *c;
+
/*
* Remember the source as broken for this connector.
*/
c = mta_connector(relay, route->src);
- if (!(c->flags & CONNECTOR_SOURCE_ERROR))
- log_info("smtp-out: Error on connector %s: %s",
- mta_connector_to_text(c), e);
- c->flags |= CONNECTOR_SOURCE_ERROR;
+ if (!(c->flags & CONNECTOR_ERROR_SOURCE))
+ log_info("smtp-out: Error on %s: %s",
+ mta_route_to_text(route), e);
+ c->flags |= CONNECTOR_ERROR_SOURCE;
}
/*
@@ -526,12 +552,22 @@ mta_route_ok(struct mta_relay *relay, struct mta_route *route)
{
struct mta_connector *c;
- log_debug("debug: mta: route ok %s", mta_route_to_text(route));
+ if (!(route->flags & ROUTE_NEW))
+ return;
+
+ log_debug("debug: mta-routing: route %s is now valid.",
+ mta_route_to_text(route));
- route->dst->nerror = 0;
+ route->flags &= ~ROUTE_NEW;
c = mta_connector(relay, route->src);
- c->flags &= ~CONNECTOR_SOURCE_ERROR;
+ mta_connect(c);
+}
+
+void
+mta_route_down(struct mta_relay *relay, struct mta_route *route)
+{
+ mta_route_disable(route, 2, ROUTE_DISABLED_SMTP);
}
void
@@ -539,26 +575,24 @@ mta_route_collect(struct mta_relay *relay, struct mta_route *route)
{
struct mta_connector *c;
- log_debug("debug: mta: route collect %s", mta_route_to_text(route));
+ log_debug("debug: mta_route_collect(%s)",
+ mta_route_to_text(route));
relay->nconn -= 1;
+ relay->domain->nconn -= 1;
route->nconn -= 1;
route->src->nconn -= 1;
route->dst->nconn -= 1;
+ route->lastdisc = time(NULL);
- c = mta_connector(relay, route->src);
- mta_route_unref(route); /* from mta_find_route() */
+ /* First connection failed */
+ if (route->flags & ROUTE_NEW)
+ mta_route_disable(route, 2, ROUTE_DISABLED_NET);
+ c = mta_connector(relay, route->src);
c->nconn -= 1;
-
- if (c->flags & CONNECTOR_LIMIT) {
- log_debug("debug: mta; resetting limit flags on connector %s",
- mta_connector_to_text(c));
- c->flags &= ~CONNECTOR_LIMIT;
- }
-
- mta_recycle(c);
- mta_drain(relay);
+ mta_connect(c);
+ mta_route_unref(route); /* from mta_find_route() */
mta_relay_unref(relay); /* from mta_connect() */
}
@@ -577,29 +611,53 @@ mta_route_next_task(struct mta_relay *relay, struct mta_route *route)
}
void
-mta_delivery(struct mta_envelope *e, const char *source, const char *relay,
+mta_delivery_log(struct mta_envelope *e, const char *source, const char *relay,
int delivery, const char *status)
{
if (delivery == IMSG_DELIVERY_OK) {
mta_log(e, "Ok", source, relay, status);
- queue_ok(e->id);
}
else if (delivery == IMSG_DELIVERY_TEMPFAIL) {
mta_log(e, "TempFail", source, relay, status);
- queue_tempfail(e->id, 0, status);
}
else if (delivery == IMSG_DELIVERY_PERMFAIL) {
mta_log(e, "PermFail", source, relay, status);
- queue_permfail(e->id, status);
}
else if (delivery == IMSG_DELIVERY_LOOP) {
mta_log(e, "PermFail", source, relay, "Loop detected");
+ }
+ else
+ errx(1, "bad delivery");
+}
+
+void
+mta_delivery_notify(struct mta_envelope *e, int delivery, const char *status,
+ uint32_t penalty)
+{
+ if (delivery == IMSG_DELIVERY_OK) {
+ queue_ok(e->id);
+ }
+ else if (delivery == IMSG_DELIVERY_TEMPFAIL) {
+ queue_tempfail(e->id, penalty, status);
+ }
+ else if (delivery == IMSG_DELIVERY_PERMFAIL) {
+ queue_permfail(e->id, status);
+ }
+ else if (delivery == IMSG_DELIVERY_LOOP) {
queue_loop(e->id);
}
else
errx(1, "bad delivery");
}
+void
+mta_delivery(struct mta_envelope *e, const char *source, const char *relay,
+ int delivery, const char *status, uint32_t penalty)
+{
+ mta_delivery_log(e, source, relay, delivery, status);
+ mta_delivery_notify(e, delivery, status, penalty);
+}
+
static void
mta_query_mx(struct mta_relay *relay)
{
@@ -608,7 +666,8 @@ mta_query_mx(struct mta_relay *relay)
if (relay->status & RELAY_WAIT_MX)
return;
- log_debug("debug: mta_query_mx(%s)", relay->domain->name);
+ log_debug("debug: mta: querying MX for %s...",
+ mta_relay_to_text(relay));
if (waitq_wait(&relay->domain->mxs, mta_on_mx, relay)) {
id = generate_uid();
@@ -624,12 +683,29 @@ mta_query_mx(struct mta_relay *relay)
}
static void
+mta_query_limits(struct mta_relay *relay)
+{
+ if (relay->status & RELAY_WAIT_LIMITS)
+ return;
+
+ relay->limits = dict_get(env->sc_limits_dict, relay->domain->name);
+ if (relay->limits == NULL)
+ relay->limits = dict_get(env->sc_limits_dict, "default");
+
+ if (max_seen_conndelay_route < relay->limits->conndelay_route)
+ max_seen_conndelay_route = relay->limits->conndelay_route;
+ if (max_seen_discdelay_route < relay->limits->discdelay_route)
+ max_seen_discdelay_route = relay->limits->discdelay_route;
+}
+
+static void
mta_query_secret(struct mta_relay *relay)
{
if (relay->status & RELAY_WAIT_SECRET)
return;
- log_debug("debug: mta_query_secret(%s)", mta_relay_to_text(relay));
+ log_debug("debug: mta: querying secret for %s...",
+ mta_relay_to_text(relay));
tree_xset(&wait_secret, relay->id, relay);
relay->status |= RELAY_WAIT_SECRET;
@@ -649,7 +725,8 @@ mta_query_preference(struct mta_relay *relay)
if (relay->status & RELAY_WAIT_PREFERENCE)
return;
- log_debug("debug: mta_query_preference(%s)", mta_relay_to_text(relay));
+ log_debug("debug: mta: querying preference for %s...",
+ mta_relay_to_text(relay));
tree_xset(&wait_preference, relay->id, relay);
relay->status |= RELAY_WAIT_PREFERENCE;
@@ -661,7 +738,20 @@ mta_query_preference(struct mta_relay *relay)
static void
mta_query_source(struct mta_relay *relay)
{
- log_debug("debug: mta_query_source(%s)", mta_relay_to_text(relay));
+ log_debug("debug: mta: querying source for %s...",
+ mta_relay_to_text(relay));
+
+ relay->sourceloop += 1;
+
+ if (relay->sourcetable == NULL) {
+ /*
+ * This is a recursive call, but it only happens once, since
+ * another source will not be queried immediatly.
+ */
+ mta_relay_ref(relay);
+ mta_on_source(relay, mta_source(NULL));
+ return;
+ }
m_create(p_lka, IMSG_LKA_SOURCE, 0, 0, -1);
m_add_id(p_lka, relay->id);
@@ -679,7 +769,7 @@ mta_on_mx(void *tag, void *arg, void *data)
struct mta_domain *domain = data;
struct mta_relay *relay = arg;
- log_debug("debug: mta_on_mx(%p, %s, %s)",
+ log_debug("debug: mta: ... got mx (%p, %s, %s)",
tag, domain->name, mta_relay_to_text(relay));
switch (domain->mxstatus) {
@@ -716,34 +806,232 @@ mta_on_mx(void *tag, void *arg, void *data)
}
static void
+mta_on_secret(struct mta_relay *relay, const char *secret)
+{
+ log_debug("debug: mta: ... got secret for %s: %s",
+ mta_relay_to_text(relay), secret);
+
+ if (secret)
+ relay->secret = strdup(secret);
+
+ if (relay->secret == NULL) {
+ log_warnx("warn: Failed to retrieve secret "
+ "for %s", mta_relay_to_text(relay));
+ relay->fail = IMSG_DELIVERY_TEMPFAIL;
+ relay->failstr = "Could not retrieve credentials";
+ }
+
+ relay->status &= ~RELAY_WAIT_SECRET;
+ mta_drain(relay);
+ mta_relay_unref(relay); /* from mta_query_secret() */
+}
+
+static void
+mta_on_preference(struct mta_relay *relay, int dnserror, int preference)
+{
+ if (dnserror) {
+ log_warnx("warn: Couldn't find backup preference for %s",
+ mta_relay_to_text(relay));
+ relay->backuppref = INT_MAX;
+ }
+ else {
+ log_debug("debug: mta: ... got preference for %s: %i, %i",
+ mta_relay_to_text(relay), dnserror, preference);
+ relay->backuppref = preference;
+ }
+
+ relay->status &= ~RELAY_WAIT_PREFERENCE;
+ mta_drain(relay);
+ mta_relay_unref(relay); /* from mta_query_preference() */
+}
+
+static void
mta_on_source(struct mta_relay *relay, struct mta_source *source)
{
- mta_connector(relay, source);
+ struct mta_connector *c;
+ void *iter;
+ int delay, errmask;
+
+ log_debug("debug: mta: ... got source for %s: %s",
+ mta_relay_to_text(relay), source ? mta_source_to_text(source) : "NULL");
+
+ relay->lastsource = time(NULL);
+ delay = DELAY_CHECK_SOURCE_SLOW;
+
+ if (source) {
+ c = mta_connector(relay, source);
+ if (c->flags & CONNECTOR_NEW) {
+ c->flags &= ~CONNECTOR_NEW;
+ delay = DELAY_CHECK_SOURCE;
+ }
+ mta_connect(c);
+ if ((c->flags & CONNECTOR_ERROR) == 0)
+ relay->sourceloop = 0;
+ else
+ delay = DELAY_CHECK_SOURCE_FAST;
+ mta_source_unref(source); /* from constructor */
+ }
+ else {
+ log_warnx("warn: Failed to get source address"
+ "for %s", mta_relay_to_text(relay));
+ }
+
+ if (tree_count(&relay->connectors) == 0) {
+ relay->fail = IMSG_DELIVERY_TEMPFAIL;
+ relay->failstr = "Could not retrieve source address";
+ }
+ if (tree_count(&relay->connectors) < relay->sourceloop) {
+ relay->fail = IMSG_DELIVERY_TEMPFAIL;
+ relay->failstr = "No valid route to remote MX";
+
+ errmask = 0;
+ iter = NULL;
+ while (tree_iter(&relay->connectors, &iter, NULL, (void **)&c))
+ errmask |= c->flags;
+
+ if (errmask & CONNECTOR_ERROR_ROUTE_SMTP)
+ relay->failstr = "Destination seem to reject all mails";
+ else if (errmask & CONNECTOR_ERROR_ROUTE_NET)
+ relay->failstr = "Network error on destination MXs";
+ else if (errmask & CONNECTOR_ERROR_MX)
+ relay->failstr = "No MX found for destination";
+ else if (errmask & CONNECTOR_ERROR_FAMILY)
+ relay->failstr = "Address family mismatch on destination MXs";
+ else
+ relay->failstr = "No valid route to destination";
+ }
+
+ relay->nextsource = relay->lastsource + delay;
+ relay->status &= ~RELAY_WAIT_SOURCE;
+ mta_drain(relay);
+ mta_relay_unref(relay); /* from mta_query_source() */
}
static void
mta_connect(struct mta_connector *c)
{
struct mta_route *route;
+ struct mta_limits *l = c->relay->limits;
+ int limits;
+ time_t nextconn, now;
+
+ again:
+
+ log_debug("debug: mta: connecting with %s", mta_connector_to_text(c));
+
+ /* Do not connect if this connector has an error. */
+ if (c->flags & CONNECTOR_ERROR) {
+ log_debug("debug: mta: connector error");
+ return;
+ }
+
+ if (c->flags & CONNECTOR_WAIT) {
+ log_debug("debug: mta: canceling connector timeout");
+ runq_cancel(runq_connector, NULL, c);
+ }
+
+ /* No job. */
+ if (c->relay->ntask == 0) {
+ log_debug("debug: mta: no task for connector");
+ return;
+ }
+
+ /* Do not create more connections than necessary */
+ if ((c->relay->nconn_ready >= c->relay->ntask) ||
+ (c->relay->nconn > 2 && c->relay->nconn >= c->relay->ntask / 2)) {
+ log_debug("debug: mta: enough connections already");
+ return;
+ }
+
+ limits = 0;
+ nextconn = now = time(NULL);
+
+ if (c->relay->domain->lastconn + l->conndelay_domain > nextconn) {
+ log_debug("debug: mta: cannot use domain %s before %llus",
+ c->relay->domain->name,
+ (unsigned long long) c->relay->domain->lastconn + l->conndelay_domain - now);
+ nextconn = c->relay->domain->lastconn + l->conndelay_domain;
+ }
+ if (c->relay->domain->nconn >= l->maxconn_per_domain) {
+ log_debug("debug: mta: hit domain limit");
+ limits |= CONNECTOR_LIMIT_DOMAIN;
+ }
+
+ if (c->source->lastconn + l->conndelay_source > nextconn) {
+ log_debug("debug: mta: cannot use source %s before %llus",
+ mta_source_to_text(c->source),
+ (unsigned long long) c->source->lastconn + l->conndelay_source - now);
+ nextconn = c->source->lastconn + l->conndelay_source;
+ }
+ if (c->source->nconn >= l->maxconn_per_source) {
+ log_debug("debug: mta: hit source limit");
+ limits |= CONNECTOR_LIMIT_SOURCE;
+ }
+
+ if (c->lastconn + l->conndelay_connector > nextconn) {
+ log_debug("debug: mta: cannot use %s before %llus",
+ mta_connector_to_text(c),
+ (unsigned long long) c->lastconn + l->conndelay_connector - now);
+ nextconn = c->lastconn + l->conndelay_connector;
+ }
+ if (c->nconn >= l->maxconn_per_connector) {
+ log_debug("debug: mta: hit connector limit");
+ limits |= CONNECTOR_LIMIT_CONN;
+ }
+
+ if (c->relay->lastconn + l->conndelay_relay > nextconn) {
+ log_debug("debug: mta: cannot use %s before %llus",
+ mta_relay_to_text(c->relay),
+ (unsigned long long) c->relay->lastconn + l->conndelay_relay - now);
+ nextconn = c->relay->lastconn + l->conndelay_relay;
+ }
+ if (c->relay->nconn >= l->maxconn_per_relay) {
+ log_debug("debug: mta: hit relay limit");
+ limits |= CONNECTOR_LIMIT_RELAY;
+ }
- log_debug("debug: mta_connect() for %s", mta_connector_to_text(c));
+ /* We can connect now, find a route */
+ if (!limits && nextconn <= now)
+ route = mta_find_route(c, now, &limits, &nextconn);
+ else
+ route = NULL;
- route = mta_find_route(c);
+ /* No route */
if (route == NULL) {
- mta_recycle(c);
- if (c->queue == &c->relay->c_limit)
- c->clearlimit = time(NULL) + CONNECTOR_DELAY_LIMIT;
- if (c->queue == &c->relay->c_ready)
- fatalx("connector with no route ended up in ready list");
+
+ if (c->flags & CONNECTOR_ERROR) {
+ /* XXX we might want to clear this flag later */
+ log_debug("debug: mta-routing: no route available for %s: errors on connector",
+ mta_connector_to_text(c));
+ return;
+ }
+ else if (limits) {
+ log_debug("debug: mta-routing: no route available for %s: limits reached",
+ mta_connector_to_text(c));
+ nextconn = now + DELAY_CHECK_LIMIT;
+ }
+ else {
+ log_debug("debug: mta-routing: no route available for %s: must wait a bit",
+ mta_connector_to_text(c));
+ }
+ log_debug("debug: mta: retrying to connect on %s in %llus...",
+ mta_connector_to_text(c),
+ (unsigned long long) nextconn - time(NULL));
+ c->flags |= CONNECTOR_WAIT;
+ runq_schedule(runq_connector, nextconn, NULL, c);
return;
}
+ log_debug("debug: mta-routing: spawning new connection on %s",
+ mta_route_to_text(route));
+
c->nconn += 1;
c->lastconn = time(NULL);
- c->nextconn = c->lastconn + CONNECTOR_DELAY_CONNECT;
c->relay->nconn += 1;
c->relay->lastconn = c->lastconn;
+ c->relay->domain->nconn += 1;
+ c->relay->domain->lastconn = c->lastconn;
route->nconn += 1;
route->lastconn = c->lastconn;
route->src->nconn += 1;
@@ -751,105 +1039,101 @@ mta_connect(struct mta_connector *c)
route->dst->nconn += 1;
route->dst->lastconn = c->lastconn;
- mta_recycle(c);
-
- mta_relay_ref(c->relay);
mta_session(c->relay, route); /* this never fails synchronously */
+ mta_relay_ref(c->relay);
+
+ goto again;
}
static void
-mta_recycle(struct mta_connector *c)
+mta_on_timeout(struct runq *runq, void *arg)
{
- TAILQ_REMOVE(c->queue, c, lst_entry);
-
- if (c->flags & CONNECTOR_ERROR) {
- log_debug("debug: mta: putting %s on error queue",
- mta_connector_to_text(c));
- c->queue = &c->relay->c_error;
+ struct mta_connector *connector = arg;
+ struct mta_relay *relay = arg;
+ struct mta_route *route = arg;
+ struct hoststat *hs = arg;
+
+ if (runq == runq_relay) {
+ log_debug("debug: mta: ... timeout for %s",
+ mta_relay_to_text(relay));
+ relay->status &= ~RELAY_WAIT_CONNECTOR;
+ mta_drain(relay);
+ mta_relay_unref(relay); /* from mta_drain() */
}
- else if (c->flags & CONNECTOR_LIMIT) {
- log_debug("debug: mta: putting %s on limit queue",
- mta_connector_to_text(c));
- c->queue = &c->relay->c_limit;
+ else if (runq == runq_connector) {
+ log_debug("debug: mta: ... timeout for %s",
+ mta_connector_to_text(connector));
+ connector->flags &= ~CONNECTOR_WAIT;
+ mta_connect(connector);
}
- else if (c->nextconn > time(NULL)) {
- log_debug("debug: mta: putting %s on delay queue",
- mta_connector_to_text(c));
- c->queue = &c->relay->c_delay;
+ else if (runq == runq_route) {
+ route->flags &= ~ROUTE_RUNQ;
+ mta_route_enable(route);
+ mta_route_unref(route);
}
- else {
- log_debug("debug: mta: putting %s on ready queue",
- mta_connector_to_text(c));
- c->queue = &c->relay->c_ready;
+ else if (runq == runq_hoststat) {
+ log_debug("debug: mta: ... timeout for hoststat %s",
+ hs->name);
+ mta_hoststat_remove_entry(hs);
+ free(hs);
}
-
- TAILQ_INSERT_TAIL(c->queue, c, lst_entry);
}
static void
-mta_relay_timeout(int fd, short ev, void *arg)
+mta_route_disable(struct mta_route *route, int penalty, int reason)
{
- struct mta_relay *r = arg;
- struct mta_connector *c;
- time_t t;
-
- log_debug("debug: mta: timeout for %s", mta_relay_to_text(r));
+ unsigned long long delay;
- t = time(NULL);
+ route->penalty += penalty;
+ route->lastpenalty = time(NULL);
+ delay = (unsigned long long)DELAY_ROUTE_BASE * route->penalty * route->penalty;
+ if (delay > DELAY_ROUTE_MAX)
+ delay = DELAY_ROUTE_MAX;
+#if 0
+ delay = 60;
+#endif
- /*
- * Clear the limit flags on all connectors.
- */
- while ((c = TAILQ_FIRST(&r->c_limit))) {
- /* This requires that the list is always sorted */
- if (c->clearlimit > t)
- break;
- log_debug("debug: mta: clearing limits on %s",
- mta_connector_to_text(c));
- c->flags &= ~CONNECTOR_LIMIT;
- mta_recycle(c);
- }
+ log_info("smtp-out: Disabling route %s for %llus",
+ mta_route_to_text(route), delay);
- while ((c = TAILQ_FIRST(&r->c_delay))) {
- /* This requires that the list is always sorted */
- if (c->nextconn > t)
- break;
- log_debug("debug: mta: delay expired for %s",
- mta_connector_to_text(c));
- mta_recycle(c);
+ if (route->flags & ROUTE_DISABLED) {
+ runq_cancel(runq_route, NULL, route);
+ mta_route_unref(route); /* from last call to here */
}
-
- mta_drain(r);
- mta_relay_unref(r); /* from mta_relay_schedule() */
+ route->flags |= reason & ROUTE_DISABLED;
+ runq_schedule(runq_route, time(NULL) + delay, NULL, route);
+ mta_route_ref(route);
}
static void
-mta_relay_schedule(struct mta_relay *r, unsigned int delay)
+mta_route_enable(struct mta_route *route)
{
- struct timeval tv;
-
- if (evtimer_pending(&r->ev, &tv))
- return;
-
- log_debug("debug: mta: adding relay timeout: %u", delay);
-
- tv.tv_sec = delay;
- tv.tv_usec = 0;
- evtimer_add(&r->ev, &tv);
- mta_relay_ref(r);
+ if (route->flags & ROUTE_DISABLED) {
+ log_info("smtp-out: Enabling route %s",
+ mta_route_to_text(route));
+ route->flags &= ~ROUTE_DISABLED;
+ route->flags |= ROUTE_NEW;
+ }
+
+ if (route->penalty) {
+#if DELAY_QUADRATIC
+ route->penalty -= 1;
+ route->lastpenalty = time(NULL);
+#else
+ route->penalty = 0;
+#endif
+ }
}
static void
mta_drain(struct mta_relay *r)
{
- struct mta_connector *c;
- struct mta_source *s;
char buf[64];
log_debug("debug: mta: draining %s "
"refcount=%i, ntask=%zu, nconnector=%zu, nconn=%zu",
mta_relay_to_text(r),
- r->refcount, r->ntask, r->nconnector, r->nconn);
+ r->refcount, r->ntask, tree_count(&r->connectors), r->nconn);
/*
* All done.
@@ -879,99 +1163,40 @@ mta_drain(struct mta_relay *r)
if (r->domain->lastmxquery == 0)
mta_query_mx(r);
+ /* Query the limits if needed. */
+ if (r->limits == NULL)
+ mta_query_limits(r);
+
/* Wait until we are ready to proceed. */
if (r->status & RELAY_WAITMASK) {
buf[0] = '\0';
if (r->status & RELAY_WAIT_MX)
- strlcat(buf, "MX ", sizeof buf);
+ strlcat(buf, " MX", sizeof buf);
if (r->status & RELAY_WAIT_PREFERENCE)
- strlcat(buf, "preference ", sizeof buf);
+ strlcat(buf, " preference", sizeof buf);
if (r->status & RELAY_WAIT_SECRET)
- strlcat(buf, "secret ", sizeof buf);
+ strlcat(buf, " secret", sizeof buf);
if (r->status & RELAY_WAIT_SOURCE)
- strlcat(buf, "source ", sizeof buf);
- log_debug("debug: mta: %s waiting for %s",
+ strlcat(buf, " source", sizeof buf);
+ if (r->status & RELAY_WAIT_CONNECTOR)
+ strlcat(buf, " connector", sizeof buf);
+ log_debug("debug: mta: %s waiting for%s",
mta_relay_to_text(r), buf);
return;
}
/*
- * Start new connections if possible.
- * XXX find a better heuristic for the good number of connections
- * depending on the number of tasks and other factors. We might
- * want to try more than the number of task, to have a chance to
- * hit a mx faster if the first ones timeout.
+ * We have pending task, and it's maybe time too try a new source.
*/
- while (r->nconn < r->ntask) {
- log_debug("debug: mta: trying to create new connection: "
- "refcount=%i, ntask=%zu, nconnector=%zu, nconn=%zu",
- r->refcount, r->ntask, r->nconnector, r->nconn);
-
- /* Check the per-relay connection limit */
- if (r->nconn >= MAXCONN_PER_RELAY) {
- log_debug("debug: mta: hit connection limit on %s",
- mta_relay_to_text(r));
- return;
- }
-
- /* Use the first connector if ready */
- c = TAILQ_FIRST(&r->c_ready);
- if (c) {
- log_debug("debug: mta: using connector %s",
- mta_connector_to_text(c));
- r->sourceloop = 0;
- mta_connect(c);
- continue;
- }
-
- /* No new connectors */
- if (r->sourceloop > r->nconnector) {
- log_debug("debug: mta: no new connector available");
-
- if (TAILQ_FIRST(&r->c_delay)) {
- mta_relay_schedule(r, 1);
- log_debug(
- "debug: mta: waiting for relay timeout");
- return;
- }
-
- if (TAILQ_FIRST(&r->c_limit)) {
- mta_relay_schedule(r, 5);
- log_debug(
- "debug: mta: waiting for relay timeout");
- return;
- }
-
- log_debug("debug: mta: failing...");
- /*
- * All sources have been tried and no connectors can
- * be used.
- */
- if (r->nconnector == 0) {
- r->fail = IMSG_DELIVERY_TEMPFAIL;
- r->failstr = "No source address";
- }
- else {
- r->fail = IMSG_DELIVERY_TEMPFAIL;
- r->failstr = "No MX could be reached";
- }
- mta_flush(r, r->fail, r->failstr);
- return;
- }
-
- r->sourceloop++;
- log_debug("debug: mta: need new connector (attempt %zu)",
- r->sourceloop);
- if (r->sourcetable) {
- log_debug("debug: mta: querying source %s",
- r->sourcetable);
- mta_query_source(r);
- return;
- }
- log_debug("debug: mta: using default source");
- s = mta_source(NULL);
- mta_on_source(r, s);
- mta_source_unref(s);
+ if (r->nextsource <= time(NULL))
+ mta_query_source(r);
+ else {
+ log_debug("debug: mta: scheduling relay %s in %llus...",
+ mta_relay_to_text(r),
+ (unsigned long long) r->nextsource - time(NULL));
+ runq_schedule(runq_relay, r->nextsource, NULL, r);
+ r->status |= RELAY_WAIT_CONNECTOR;
+ mta_relay_ref(r);
}
}
@@ -980,7 +1205,11 @@ mta_flush(struct mta_relay *relay, int fail, const char *error)
{
struct mta_envelope *e;
struct mta_task *task;
+ const char *domain;
+ void *iter;
+ struct mta_connector *c;
size_t n;
+ size_t r;
log_debug("debug: mta_flush(%s, %i, \"%s\")",
mta_relay_to_text(relay), fail, error);
@@ -993,7 +1222,26 @@ mta_flush(struct mta_relay *relay, int fail, const char *error)
TAILQ_REMOVE(&relay->tasks, task, entry);
while ((e = TAILQ_FIRST(&task->envelopes))) {
TAILQ_REMOVE(&task->envelopes, e, entry);
- mta_delivery(e, NULL, relay->domain->name, fail, error);
+ mta_delivery(e, NULL, relay->domain->name, fail, error, 0);
+
+ /*
+ * host was suspended, cache envelope id in hoststat tree
+ * so that it can be retried when a delivery succeeds for
+ * that domain.
+ */
+ domain = strchr(e->dest, '@');
+ if (fail == IMSG_DELIVERY_TEMPFAIL && domain) {
+ r = 0;
+ iter = NULL;
+ while (tree_iter(&relay->connectors, &iter,
+ NULL, (void **)&c)) {
+ if (c->flags & CONNECTOR_ERROR_ROUTE)
+ r++;
+ }
+ if (tree_count(&relay->connectors) == r)
+ mta_hoststat_cache(domain+1, e->id);
+ }
+
free(e->dest);
free(e->rcpt);
free(e);
@@ -1012,27 +1260,28 @@ mta_flush(struct mta_relay *relay, int fail, const char *error)
* Find a route to use for this connector
*/
static struct mta_route *
-mta_find_route(struct mta_connector *c)
+mta_find_route(struct mta_connector *c, time_t now, int *limits,
+ time_t *nextconn)
{
struct mta_route *route, *best;
+ struct mta_limits *l = c->relay->limits;
struct mta_mx *mx;
int level, limit_host, limit_route;
- int family_mismatch, seen;
+ int family_mismatch, seen, suspended_route;
+ time_t tm;
+ log_debug("debug: mta-routing: searching new route for %s...",
+ mta_connector_to_text(c));
+
+ tm = 0;
limit_host = 0;
limit_route = 0;
+ suspended_route = 0;
family_mismatch = 0;
level = -1;
best = NULL;
seen = 0;
- if (c->nconn >= MAXCONN_PER_CONNECTOR) {
- log_debug("debug: mta: hit limit on connector %s",
- mta_connector_to_text(c));
- c->flags |= CONNECTOR_LIMIT_SOURCE;
- return (NULL);
- }
-
TAILQ_FOREACH(mx, &c->relay->domain->mxs, entry) {
/*
* New preference level
@@ -1047,9 +1296,10 @@ mta_find_route(struct mta_connector *c)
/*
* No candidate found. There are valid MXs at this
- * preference level but they reached their limit.
+ * preference level but they reached their limit, or
+ * we can't connect yet.
*/
- if (limit_host || limit_route)
+ if (limit_host || limit_route || tm)
break;
/*
@@ -1073,27 +1323,81 @@ mta_find_route(struct mta_connector *c)
/* Found a possibly valid mx */
seen++;
- if (c->source->sa &&
- c->source->sa->sa_family != mx->host->sa->sa_family) {
+ if ((c->source->sa &&
+ c->source->sa->sa_family != mx->host->sa->sa_family) ||
+ (l->family && l->family != mx->host->sa->sa_family)) {
+ log_debug("debug: mta-routing: skipping host %s: AF mismatch",
+ mta_host_to_text(mx->host));
family_mismatch = 1;
continue;
}
- if (mx->host->nconn >= MAXCONN_PER_HOST) {
+ if (mx->host->nconn >= l->maxconn_per_host) {
+ log_debug("debug: mta-routing: skipping host %s: too many connections",
+ mta_host_to_text(mx->host));
limit_host = 1;
continue;
}
+ if (mx->host->lastconn + l->conndelay_host > now) {
+ log_debug("debug: mta-routing: skipping host %s: cannot use before %llus",
+ mta_host_to_text(mx->host),
+ (unsigned long long) mx->host->lastconn + l->conndelay_host - now);
+ if (tm == 0 || mx->host->lastconn + l->conndelay_host < tm)
+ tm = mx->host->lastconn + l->conndelay_host;
+ continue;
+ }
+
route = mta_route(c->source, mx->host);
- if (route->nconn >= MAXCONN_PER_ROUTE) {
+ if (route->flags & ROUTE_DISABLED) {
+ log_debug("debug: mta-routing: skipping route %s: suspend",
+ mta_route_to_text(route));
+ suspended_route |= route->flags & ROUTE_DISABLED;
+ mta_route_unref(route); /* from here */
+ continue;
+ }
+
+ if (route->nconn && (route->flags & ROUTE_NEW)) {
+ log_debug("debug: mta-routing: skipping route %s: not validated yet",
+ mta_route_to_text(route));
+ limit_route = 1;
+ mta_route_unref(route); /* from here */
+ continue;
+ }
+
+ if (route->nconn >= l->maxconn_per_route) {
+ log_debug("debug: mta-routing: skipping route %s: too many connections",
+ mta_route_to_text(route));
limit_route = 1;
mta_route_unref(route); /* from here */
continue;
}
+ if (route->lastconn + l->conndelay_route > now) {
+ log_debug("debug: mta-routing: skipping route %s: cannot use before %llus (delay after connect)",
+ mta_route_to_text(route),
+ (unsigned long long) route->lastconn + l->conndelay_route - now);
+ if (tm == 0 || route->lastconn + l->conndelay_route < tm)
+ tm = route->lastconn + l->conndelay_route;
+ mta_route_unref(route); /* from here */
+ continue;
+ }
+
+ if (route->lastdisc + l->discdelay_route > now) {
+ log_debug("debug: mta-routing: skipping route %s: cannot use before %llus (delay after disconnect)",
+ mta_route_to_text(route),
+ (unsigned long long) route->lastdisc + l->discdelay_route - now);
+ if (tm == 0 || route->lastdisc + l->discdelay_route < tm)
+ tm = route->lastdisc + l->discdelay_route;
+ mta_route_unref(route); /* from here */
+ continue;
+ }
+
/* Use the route with the lowest number of connections. */
if (best && route->nconn >= best->nconn) {
+ log_debug("debug: mta-routing: skipping route %s: current one is better",
+ mta_route_to_text(route));
mta_route_unref(route); /* from here */
continue;
}
@@ -1101,30 +1405,43 @@ mta_find_route(struct mta_connector *c)
if (best)
mta_route_unref(best); /* from here */
best = route;
+ log_debug("debug: mta-routing: selecting candidate route %s",
+ mta_route_to_text(route));
}
if (best)
return (best);
+ /* Order is important */
if (seen == 0) {
- log_info("smtp-out: No reachable MX for connector %s",
+ log_info("smtp-out: No MX found for %s",
mta_connector_to_text(c));
- c->flags |= CONNECTOR_MX_ERROR;
+ c->flags |= CONNECTOR_ERROR_MX;
}
else if (limit_route) {
- log_debug("debug: mta: hit route limit on connector %s",
- mta_connector_to_text(c));
- c->flags |= CONNECTOR_LIMIT_ROUTE;
+ log_debug("debug: mta: hit route limit");
+ *limits |= CONNECTOR_LIMIT_ROUTE;
}
else if (limit_host) {
- log_debug("debug: mta: hit host limit on connector %s",
- mta_connector_to_text(c));
- c->flags |= CONNECTOR_LIMIT_HOST;
+ log_debug("debug: mta: hit host limit");
+ *limits |= CONNECTOR_LIMIT_HOST;
+ }
+ else if (tm) {
+ if (tm > *nextconn)
+ *nextconn = tm;
}
else if (family_mismatch) {
- log_info("smtp-out: Address family mismatch on connector %s",
+ log_info("smtp-out: Address family mismatch on %s",
mta_connector_to_text(c));
- c->flags |= CONNECTOR_FAMILY_ERROR;
+ c->flags |= CONNECTOR_ERROR_FAMILY;
+ }
+ else if (suspended_route) {
+ log_info("smtp-out: No valid route for %s",
+ mta_connector_to_text(c));
+ if (suspended_route & ROUTE_DISABLED_NET)
+ c->flags |= CONNECTOR_ERROR_ROUTE_NET;
+ if (suspended_route & ROUTE_DISABLED_SMTP)
+ c->flags |= CONNECTOR_ERROR_ROUTE_SMTP;
}
return (NULL);
@@ -1134,33 +1451,16 @@ static void
mta_log(const struct mta_envelope *evp, const char *prefix, const char *source,
const char *relay, const char *status)
{
- char session[SMTPD_MAXLINESIZE];
- char rcpt[SMTPD_MAXLINESIZE];
- char src[SMTPD_MAXLINESIZE];
-
- session[0] = '\0';
- if (evp->session)
- snprintf(session, sizeof session, "session=%016"PRIx64", ",
- evp->session);
-
- rcpt[0] = '\0';
- if (evp->rcpt)
- snprintf(rcpt, sizeof rcpt, "rcpt=<%s>, ", evp->rcpt);
-
- src[0] = '\0';
- if (source)
- snprintf(src, sizeof src, "source=%s, ", source);
-
-
- log_info("relay: %s for %016" PRIx64 ": %sfrom=<%s>, to=<%s>, "
- "%s%srelay=%s, delay=%s, stat=%s",
+ log_info("relay: %s for %016" PRIx64 ": session=%016"PRIx64", "
+ "from=<%s>, to=<%s>, rcpt=<%s>, source=%s, "
+ "relay=%s, delay=%s, stat=%s",
prefix,
evp->id,
- session,
+ evp->session,
evp->task->sender,
evp->dest,
- src,
- rcpt,
+ evp->rcpt ? evp->rcpt : "-",
+ source ? source : "-",
relay,
duration_to_text(time(NULL) - evp->creation),
status);
@@ -1204,10 +1504,6 @@ mta_relay(struct envelope *e)
if ((r = SPLAY_FIND(mta_relay_tree, &relays, &key)) == NULL) {
r = xcalloc(1, sizeof *r, "mta_relay");
TAILQ_INIT(&r->tasks);
- TAILQ_INIT(&r->c_ready);
- TAILQ_INIT(&r->c_delay);
- TAILQ_INIT(&r->c_limit);
- TAILQ_INIT(&r->c_error);
r->id = generate_uid();
r->flags = key.flags;
r->domain = key.domain;
@@ -1227,12 +1523,9 @@ mta_relay(struct envelope *e)
r->helotable = xstrdup(key.helotable,
"mta: helotable");
SPLAY_INSERT(mta_relay_tree, &relays, r);
- evtimer_set(&r->ev, mta_relay_timeout, r);
- log_trace(TRACE_MTA, "mta: new %s", mta_relay_to_text(r));
stat_increment("mta.relay", 1);
} else {
mta_domain_unref(key.domain); /* from here */
- log_trace(TRACE_MTA, "mta: reusing %s", mta_relay_to_text(r));
}
r->refcount++;
@@ -1267,9 +1560,6 @@ mta_relay_unref(struct mta_relay *relay)
free(relay->secret);
free(relay->sourcetable);
- if (evtimer_pending(&relay->ev, NULL))
- evtimer_del(&relay->ev);
-
mta_domain_unref(relay->domain); /* from constructor */
free(relay);
stat_decrement("mta.relay", 1);
@@ -1594,14 +1884,11 @@ mta_connector(struct mta_relay *relay, struct mta_source *source)
c = xcalloc(1, sizeof(*c), "mta_connector");
c->relay = relay;
c->source = source;
+ c->flags |= CONNECTOR_NEW;
mta_source_ref(source);
- c->queue = &relay->c_ready;
- TAILQ_INSERT_HEAD(c->queue, c, lst_entry);
tree_xset(&relay->connectors, (uintptr_t)(source), c);
- relay->nconnector++;
stat_increment("mta.connector", 1);
- log_debug("debug: mta: new connector %s",
- mta_connector_to_text(c));
+ log_debug("debug: mta: new %s", mta_connector_to_text(c));
}
return (c);
@@ -1610,10 +1897,17 @@ mta_connector(struct mta_relay *relay, struct mta_source *source)
static void
mta_connector_free(struct mta_connector *c)
{
- c->relay->nconnector--;
- TAILQ_REMOVE(c->queue, c, lst_entry);
- mta_source_unref(c->source);
+ log_debug("debug: mta: freeing %s",
+ mta_connector_to_text(c));
+
+ if (c->flags & CONNECTOR_WAIT) {
+ log_debug("debug: mta: canceling timeout for %s",
+ mta_connector_to_text(c));
+ runq_cancel(runq_connector, NULL, c);
+ }
+ mta_source_unref(c->source); /* from constructor */
free(c);
+
stat_decrement("mta.connector", 1);
}
@@ -1622,9 +1916,10 @@ mta_connector_to_text(struct mta_connector *c)
{
static char buf[1024];
- snprintf(buf, sizeof buf, "%s->%s",
+ snprintf(buf, sizeof buf, "[connector:%s->%s,0x%x]",
mta_source_to_text(c->source),
- mta_relay_to_text(c->relay));
+ mta_relay_to_text(c->relay),
+ c->flags);
return (buf);
}
@@ -1632,6 +1927,7 @@ static struct mta_route *
mta_route(struct mta_source *src, struct mta_host *dst)
{
struct mta_route key, *r;
+ static uint64_t rid = 0;
key.src = src;
key.dst = dst;
@@ -1641,30 +1937,80 @@ mta_route(struct mta_source *src, struct mta_host *dst)
r = xcalloc(1, sizeof(*r), "mta_route");
r->src = src;
r->dst = dst;
+ r->flags |= ROUTE_NEW;
+ r->id = ++rid;
SPLAY_INSERT(mta_route_tree, &routes, r);
mta_source_ref(src);
mta_host_ref(dst);
stat_increment("mta.route", 1);
}
+ else if (r->flags & ROUTE_RUNQ) {
+ log_debug("debug: mta: mta_route_ref(): canceling runq for route %s",
+ mta_route_to_text(r));
+ r->flags &= ~(ROUTE_RUNQ | ROUTE_KEEPALIVE);
+ runq_cancel(runq_route, NULL, r);
+ r->refcount--; /* from mta_route_unref() */
+ }
r->refcount++;
return (r);
}
-#if 0
static void
mta_route_ref(struct mta_route *r)
{
r->refcount++;
}
-#endif
static void
mta_route_unref(struct mta_route *r)
{
+ time_t sched, now;
+ int delay;
+
if (--r->refcount)
return;
+ /*
+ * Nothing references this route, but we might want to keep it alive
+ * for a while.
+ */
+ now = time(NULL);
+ sched = 0;
+
+ if (r->penalty) {
+#if DELAY_QUADRATIC
+ delay = DELAY_ROUTE_BASE * r->penalty * r->penalty;
+#else
+ delay = 15 * 60;
+#endif
+ if (delay > DELAY_ROUTE_MAX)
+ delay = DELAY_ROUTE_MAX;
+ sched = r->lastpenalty + delay;
+ log_debug("debug: mta: mta_route_unref(): keeping route %s alive for %llus (penalty %i)",
+ mta_route_to_text(r), (unsigned long long) sched - now, r->penalty);
+ } else if (!(r->flags & ROUTE_KEEPALIVE)) {
+ if (r->lastconn + max_seen_conndelay_route > now)
+ sched = r->lastconn + max_seen_conndelay_route;
+ if (r->lastdisc + max_seen_discdelay_route > now &&
+ r->lastdisc + max_seen_discdelay_route < sched)
+ sched = r->lastdisc + max_seen_discdelay_route;
+
+ if (sched > now)
+ log_debug("debug: mta: mta_route_unref(): keeping route %s alive for %llus (imposed delay)",
+ mta_route_to_text(r), (unsigned long long) sched - now);
+ }
+
+ if (sched > now) {
+ r->flags |= ROUTE_RUNQ;
+ runq_schedule(runq_route, sched, NULL, r);
+ r->refcount++;
+ return;
+ }
+
+ log_debug("debug: mta: ma_route_unref(): really discarding route %s",
+ mta_route_to_text(r));
+
SPLAY_REMOVE(mta_route_tree, &routes, r);
mta_source_unref(r->src); /* from constructor */
mta_host_unref(r->dst); /* from constructor */
@@ -1701,3 +2047,94 @@ mta_route_cmp(const struct mta_route *a, const struct mta_route *b)
}
SPLAY_GENERATE(mta_route_tree, mta_route, entry, mta_route_cmp);
+
+
+/*
+ * Record the latest status message for a host.
+ *
+ * The host name is lowercased into a local buffer and used as the key in
+ * the `hoststat' dictionary; a missing entry is allocated on the fly.  The
+ * entry's name, error text and timestamp are refreshed, and its job on
+ * runq_hoststat is (re)scheduled at now + HOSTSTAT_EXPIRE_DELAY.
+ *
+ * hoststat errors are not critical, we do best effort: the update is
+ * silently dropped when lowercase() fails or the allocation fails, and
+ * strlcpy() truncation is ignored.
+ */
+void
+mta_hoststat_update(const char *host, const char *error)
+{
+	struct hoststat	*hs;
+	char		 buf[SMTPD_MAXHOSTNAMELEN];
+	time_t		 tm;
+
+	if (! lowercase(buf, host, sizeof buf))
+		return;
+
+	/* Read the clock once so hs->tm and the scheduled time agree. */
+	tm = time(NULL);
+	hs = dict_get(&hoststat, buf);
+	if (hs == NULL) {
+		hs = calloc(1, sizeof *hs);
+		if (hs == NULL)
+			return;
+		tree_init(&hs->deferred);
+	} else {
+		/* Known host: drop the pending job before re-arming it. */
+		runq_cancel(runq_hoststat, NULL, hs);
+	}
+	strlcpy(hs->name, buf, sizeof hs->name);
+	strlcpy(hs->error, error, sizeof hs->error);
+	hs->tm = tm;
+	dict_set(&hoststat, buf, hs);
+
+	/* Exactly one job per entry, whether it is new or refreshed. */
+	runq_schedule(runq_hoststat, tm+HOSTSTAT_EXPIRE_DELAY, NULL, hs);
+}
+
+/*
+ * Add envelope id `evpid' to the deferred tree of the hoststat entry for
+ * `host'.  Does nothing when lowercase() fails or when no entry exists
+ * for the lowercased name.
+ */
+void
+mta_hoststat_cache(const char *host, uint64_t evpid)
+{
+	char		 name[SMTPD_MAXHOSTNAMELEN];
+	struct hoststat	*entry;
+
+	if (lowercase(name, host, sizeof name) &&
+	    (entry = dict_get(&hoststat, name)) != NULL)
+		tree_set(&entry->deferred, evpid, NULL);
+}
+
+/*
+ * Remove envelope id `evpid' from the deferred tree of the hoststat entry
+ * for `host'.  Does nothing when lowercase() fails or when no entry exists
+ * for the lowercased name; the value returned by tree_pop() is discarded.
+ */
+void
+mta_hoststat_uncache(const char *host, uint64_t evpid)
+{
+	char		 name[SMTPD_MAXHOSTNAMELEN];
+	struct hoststat	*entry;
+
+	if (lowercase(name, host, sizeof name) &&
+	    (entry = dict_get(&hoststat, name)) != NULL)
+		tree_pop(&entry->deferred, evpid);
+}
+
+/*
+ * Empty the deferred tree of the hoststat entry for `host', sending one
+ * IMSG_MTA_SCHEDULE message to p_queue per envelope id popped from it.
+ * Does nothing when lowercase() fails or when no entry exists for the
+ * lowercased name.
+ */
+void
+mta_hoststat_reschedule(const char *host)
+{
+	char		 name[SMTPD_MAXHOSTNAMELEN];
+	struct hoststat	*entry;
+	uint64_t	 id;
+
+	if (! lowercase(name, host, sizeof name))
+		return;
+	if ((entry = dict_get(&hoststat, name)) == NULL)
+		return;
+
+	/* Pop ids one at a time until the tree reports it is empty. */
+	for (;;) {
+		if (! tree_poproot(&entry->deferred, &id, NULL))
+			break;
+		m_compose(p_queue, IMSG_MTA_SCHEDULE, 0, 0, -1,
+		    &id, sizeof id);
+	}
+}
+
+/*
+ * Detach a hoststat entry: drain its deferred tree, remove it from the
+ * `hoststat' dictionary under hs->name, and cancel its runq_hoststat job.
+ * The entry itself is not freed here.
+ */
+static void
+mta_hoststat_remove_entry(struct hoststat *hs)
+{
+	/* Popped keys and values are not needed, only the emptying. */
+	for (;;) {
+		if (! tree_poproot(&hs->deferred, NULL, NULL))
+			break;
+	}
+	dict_pop(&hoststat, hs->name);
+	runq_cancel(runq_hoststat, NULL, hs);
+}