diff options
author | Eric Faurot <eric@cvs.openbsd.org> | 2013-07-19 21:14:53 +0000 |
---|---|---|
committer | Eric Faurot <eric@cvs.openbsd.org> | 2013-07-19 21:14:53 +0000 |
commit | 89ca14ddb53440286ab2ed0c2b9e4473dda6d12e (patch) | |
tree | cf8744bfc84648a1c7489c187f08afd665f2dcaf /usr.sbin/smtpd/mta.c | |
parent | 012c74efba199a734b02042792ff1c3e8f7b4546 (diff) |
Many MTA improvements:
- Better transient error handling logic: failing destinations are
automatically disabled for a while. When a destination is active
again, ask the scheduler to retry previous envelopes immediately.
- More informative error report when all routes fail for a mail.
- Implement a "smtpctl show hoststats" command to get the latest stat
message per MX domain.
- Implement a "smtpctl show routes" command to show the state of the
currently known routes to remote MXs.
- Implement a "smtpctl resume route" command to re-enable a route that
has been disabled.
- Do not hardcode limits
- Minor code improvements
Diffstat (limited to 'usr.sbin/smtpd/mta.c')
-rw-r--r-- | usr.sbin/smtpd/mta.c | 1169 |
1 files changed, 803 insertions, 366 deletions
diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c index 7f42ad043f5..9b8a8c93e8e 100644 --- a/usr.sbin/smtpd/mta.c +++ b/usr.sbin/smtpd/mta.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mta.c,v 1.162 2013/07/19 15:14:23 eric Exp $ */ +/* $OpenBSD: mta.c,v 1.163 2013/07/19 21:14:52 eric Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> @@ -44,14 +44,14 @@ #define MAXERROR_PER_HOST 4 -#define MAXCONN_PER_HOST 10 -#define MAXCONN_PER_ROUTE 5 -#define MAXCONN_PER_SOURCE 50 /* XXX missing */ -#define MAXCONN_PER_CONNECTOR 20 -#define MAXCONN_PER_RELAY 100 +#define DELAY_CHECK_SOURCE 1 +#define DELAY_CHECK_SOURCE_SLOW 10 +#define DELAY_CHECK_SOURCE_FAST 0 +#define DELAY_CHECK_LIMIT 5 -#define CONNECTOR_DELAY_CONNECT 1 -#define CONNECTOR_DELAY_LIMIT 5 +#define DELAY_QUADRATIC 1 +#define DELAY_ROUTE_BASE 200 +#define DELAY_ROUTE_MAX (3600 * 4) static void mta_imsg(struct mproc *, struct imsg *); static void mta_shutdown(void); @@ -62,14 +62,17 @@ static void mta_query_secret(struct mta_relay *); static void mta_query_preference(struct mta_relay *); static void mta_query_source(struct mta_relay *); static void mta_on_mx(void *, void *, void *); +static void mta_on_secret(struct mta_relay *, const char *); +static void mta_on_preference(struct mta_relay *, int, int); static void mta_on_source(struct mta_relay *, struct mta_source *); +static void mta_on_timeout(struct runq *, void *); static void mta_connect(struct mta_connector *); -static void mta_recycle(struct mta_connector *); +static void mta_route_enable(struct mta_route *); +static void mta_route_disable(struct mta_route *, int, int); static void mta_drain(struct mta_relay *); -static void mta_relay_schedule(struct mta_relay *, unsigned int); -static void mta_relay_timeout(int, short, void *); static void mta_flush(struct mta_relay *, int, const char *); -static struct mta_route *mta_find_route(struct mta_connector *); +static struct mta_route *mta_find_route(struct mta_connector *, time_t, int*, + 
time_t*); static void mta_log(const struct mta_envelope *, const char *, const char *, const char *, const char *); @@ -111,9 +114,7 @@ static const char *mta_connector_to_text(struct mta_connector *); SPLAY_HEAD(mta_route_tree, mta_route); static struct mta_route *mta_route(struct mta_source *, struct mta_host *); -#if 0 static void mta_route_ref(struct mta_route *); -#endif static void mta_route_unref(struct mta_route *); static const char *mta_route_to_text(struct mta_route *); static int mta_route_cmp(const struct mta_route *, const struct mta_route *); @@ -125,61 +126,77 @@ static struct mta_host_tree hosts; static struct mta_source_tree sources; static struct mta_route_tree routes; -static struct tree batches; - static struct tree wait_mx; static struct tree wait_preference; static struct tree wait_secret; static struct tree wait_source; +static struct runq *runq_relay; +static struct runq *runq_connector; +static struct runq *runq_route; +static struct runq *runq_hoststat; + +static time_t max_seen_conndelay_route; +static time_t max_seen_discdelay_route; + +#define HOSTSTAT_EXPIRE_DELAY (4 * 3600) +struct hoststat { + char name[SMTPD_MAXHOSTNAMELEN]; + time_t tm; + char error[SMTPD_MAXLINESIZE]; + struct tree deferred; +}; +static struct dict hoststat; + +void mta_hoststat_update(const char *, const char *); +void mta_hoststat_cache(const char *, uint64_t); +void mta_hoststat_uncache(const char *, uint64_t); +void mta_hoststat_reschedule(const char *); +static void mta_hoststat_remove_entry(struct hoststat *); + + void mta_imsg(struct mproc *p, struct imsg *imsg) { struct mta_relay *relay; struct mta_task *task; - struct mta_source *source; struct mta_domain *domain; + struct mta_route *route; struct mta_mx *mx, *imx; + struct hoststat *hs; struct mta_envelope *e; struct sockaddr_storage ss; - struct tree *batch; struct envelope evp; struct msg m; const char *secret; + const char *hostname; uint64_t reqid; + time_t t; char buf[SMTPD_MAXLINESIZE]; int 
dnserror, preference, v, status; + void *iter; + uint64_t u64; if (p->proc == PROC_QUEUE) { switch (imsg->hdr.type) { - case IMSG_MTA_BATCH: - m_msg(&m, imsg); - m_get_id(&m, &reqid); - m_end(&m); - batch = xmalloc(sizeof *batch, "mta_batch"); - tree_init(batch); - tree_xset(&batches, reqid, batch); - log_trace(TRACE_MTA, - "mta: batch:%016" PRIx64 " created", reqid); - return; - - case IMSG_MTA_BATCH_ADD: + case IMSG_MTA_TRANSFER: m_msg(&m, imsg); - m_get_id(&m, &reqid); m_get_envelope(&m, &evp); m_end(&m); relay = mta_relay(&evp); - batch = tree_xget(&batches, reqid); - if ((task = tree_get(batch, relay->id)) == NULL) { - log_trace(TRACE_MTA, "mta: new task for %s", - mta_relay_to_text(relay)); + TAILQ_FOREACH(task, &relay->tasks, entry) + if (task->msgid == evpid_to_msgid(evp.id)) + break; + + if (task == NULL) { task = xmalloc(sizeof *task, "mta_task"); TAILQ_INIT(&task->envelopes); task->relay = relay; - tree_xset(batch, relay->id, task); + relay->ntask += 1; + TAILQ_INSERT_TAIL(&relay->tasks, task, entry); task->msgid = evpid_to_msgid(evp.id); if (evp.sender.user[0] || evp.sender.domain[0]) snprintf(buf, sizeof buf, "%s@%s", @@ -187,16 +204,8 @@ mta_imsg(struct mproc *p, struct imsg *imsg) else buf[0] = '\0'; task->sender = xstrdup(buf, "mta_task:sender"); - } else - mta_relay_unref(relay); /* from here */ - - /* - * Technically, we could handle that by adding a msg - * level, but the batch sent by the scheduler should - * be valid. 
- */ - if (task->msgid != evpid_to_msgid(evp.id)) - errx(1, "msgid mismatch in batch"); + stat_increment("mta.task", 1); + } e = xcalloc(1, sizeof *e, "mta_envelope"); e->id = evp.id; @@ -209,32 +218,15 @@ mta_imsg(struct mproc *p, struct imsg *imsg) if (strcmp(buf, e->dest)) e->rcpt = xstrdup(buf, "mta_envelope:rcpt"); e->task = task; - /* XXX honour relay->maxrcpt */ + TAILQ_INSERT_TAIL(&task->envelopes, e, entry); - stat_increment("mta.envelope", 1); log_debug("debug: mta: received evp:%016" PRIx64 " for <%s>", e->id, e->dest); - return; - case IMSG_MTA_BATCH_END: - m_msg(&m, imsg); - m_get_id(&m, &reqid); - m_end(&m); - batch = tree_xpop(&batches, reqid); - log_trace(TRACE_MTA, "mta: batch:%016" PRIx64 " closed", - reqid); - /* For all tasks, queue them on its relay */ - while (tree_poproot(batch, &reqid, (void**)&task)) { - if (reqid != task->relay->id) - errx(1, "relay id mismatch!"); - relay = task->relay; - relay->ntask += 1; - TAILQ_INSERT_TAIL(&relay->tasks, task, entry); - stat_increment("mta.task", 1); - mta_drain(relay); - mta_relay_unref(relay); /* from BATCH_APPEND */ - } - free(batch); + stat_increment("mta.envelope", 1); + + mta_drain(relay); + mta_relay_unref(relay); /* from here */ return; case IMSG_QUEUE_MESSAGE_FD: @@ -252,40 +244,20 @@ mta_imsg(struct mproc *p, struct imsg *imsg) m_get_string(&m, &secret); m_end(&m); relay = tree_xpop(&wait_secret, reqid); - if (secret[0]) - relay->secret = strdup(secret); - if (relay->secret == NULL) { - log_warnx("warn: Failed to retrieve secret " - "for %s", mta_relay_to_text(relay)); - relay->fail = IMSG_DELIVERY_TEMPFAIL; - relay->failstr = "Could not retrieve secret"; - } - relay->status &= ~RELAY_WAIT_SECRET; - mta_drain(relay); - mta_relay_unref(relay); /* from mta_query_secret() */ + mta_on_secret(relay, secret[0] ? 
secret : NULL); return; case IMSG_LKA_SOURCE: m_msg(&m, imsg); m_get_id(&m, &reqid); m_get_int(&m, &status); - - relay = tree_xpop(&wait_source, reqid); - relay->status &= ~RELAY_WAIT_SOURCE; - if (status == LKA_OK) { + if (status == LKA_OK) m_get_sockaddr(&m, (struct sockaddr*)&ss); - source = mta_source((struct sockaddr *)&ss); - mta_on_source(relay, source); - mta_source_unref(source); - } - else { - log_warnx("warn: Failed to get source address" - "for %s", mta_relay_to_text(relay)); - } m_end(&m); - mta_drain(relay); - mta_relay_unref(relay); /* from mta_query_source() */ + relay = tree_xpop(&wait_source, reqid); + mta_on_source(relay, (status == LKA_OK) ? + mta_source((struct sockaddr *)&ss) : NULL); return; case IMSG_LKA_HELO: @@ -337,23 +309,12 @@ mta_imsg(struct mproc *p, struct imsg *imsg) m_msg(&m, imsg); m_get_id(&m, &reqid); m_get_int(&m, &dnserror); - relay = tree_xpop(&wait_preference, reqid); - if (dnserror) { - log_debug("debug: couldn't find backup " - "preference for %s", - mta_relay_to_text(relay)); - relay->backuppref = INT_MAX; - } else { - m_get_int(&m, &relay->backuppref); - log_debug("debug: found backup preference %i " - "for %s", - relay->backuppref, - mta_relay_to_text(relay)); - } + if (dnserror == 0) + m_get_int(&m, &preference); m_end(&m); - relay->status &= ~RELAY_WAIT_PREFERENCE; - mta_drain(relay); - mta_relay_unref(relay); /* from mta_query_preference() */ + + relay = tree_xpop(&wait_preference, reqid); + mta_on_preference(relay, dnserror, preference); return; case IMSG_DNS_PTR: @@ -388,6 +349,65 @@ mta_imsg(struct mproc *p, struct imsg *imsg) } } + if (p->proc == PROC_CONTROL) { + switch (imsg->hdr.type) { + + case IMSG_CTL_RESUME_ROUTE: + u64 = *((uint64_t *)imsg->data); + if (u64) + log_debug("resuming route: %llu", + (unsigned long long)u64); + else + log_debug("resuming all routes"); + SPLAY_FOREACH(route, mta_route_tree, &routes) { + if (u64 && route->id != u64) + continue; + mta_route_enable(route); + if (u64) + break; + } 
+ return; + + case IMSG_CTL_MTA_SHOW_ROUTES: + SPLAY_FOREACH(route, mta_route_tree, &routes) { + v = runq_pending(runq_route, NULL, route, &t); + snprintf(buf, sizeof(buf), + "%llu. %s %c%c%c%c nconn=%zu penalty=%i timeout=%s", + (unsigned long long)route->id, + mta_route_to_text(route), + route->flags & ROUTE_NEW ? 'N' : '-', + route->flags & ROUTE_DISABLED ? 'D' : '-', + route->flags & ROUTE_RUNQ ? 'Q' : '-', + route->flags & ROUTE_KEEPALIVE ? 'K' : '-', + route->nconn, + route->penalty, + v ? duration_to_text(t - time(NULL)) : "-"); + m_compose(p, IMSG_CTL_MTA_SHOW_ROUTES, + imsg->hdr.peerid, 0, -1, + buf, strlen(buf) + 1); + } + m_compose(p, IMSG_CTL_MTA_SHOW_ROUTES, imsg->hdr.peerid, + 0, -1, NULL, 0); + return; + case IMSG_CTL_MTA_SHOW_HOSTSTATS: + iter = NULL; + while (dict_iter(&hoststat, &iter, &hostname, + (void **)&hs)) { + snprintf(buf, sizeof(buf), + "%s|%llu|%s", + hostname, (unsigned long long) hs->tm, + hs->error); + m_compose(p, IMSG_CTL_MTA_SHOW_HOSTSTATS, + imsg->hdr.peerid, 0, -1, + buf, strlen(buf) + 1); + } + m_compose(p, IMSG_CTL_MTA_SHOW_HOSTSTATS, + imsg->hdr.peerid, + 0, -1, NULL, 0); + return; + } + } + errx(1, "mta_imsg: unexpected %s imsg", imsg_to_str(imsg->hdr.type)); } @@ -451,15 +471,20 @@ mta(void) SPLAY_INIT(&sources); SPLAY_INIT(&routes); - tree_init(&batches); tree_init(&wait_secret); tree_init(&wait_mx); tree_init(&wait_preference); tree_init(&wait_source); + dict_init(&hoststat); imsg_callback = mta_imsg; event_init(); + runq_init(&runq_relay, mta_on_timeout); + runq_init(&runq_connector, mta_on_timeout); + runq_init(&runq_route, mta_on_timeout); + runq_init(&runq_hoststat, mta_on_timeout); + signal_set(&ev_sigint, SIGINT, mta_sig_handler, NULL); signal_set(&ev_sigterm, SIGTERM, mta_sig_handler, NULL); signal_add(&ev_sigint, NULL); @@ -487,14 +512,15 @@ void mta_source_error(struct mta_relay *relay, struct mta_route *route, const char *e) { struct mta_connector *c; + /* * Remember the source as broken for this connector. 
*/ c = mta_connector(relay, route->src); - if (!(c->flags & CONNECTOR_SOURCE_ERROR)) - log_info("smtp-out: Error on connector %s: %s", - mta_connector_to_text(c), e); - c->flags |= CONNECTOR_SOURCE_ERROR; + if (!(c->flags & CONNECTOR_ERROR_SOURCE)) + log_info("smtp-out: Error on %s: %s", + mta_route_to_text(route), e); + c->flags |= CONNECTOR_ERROR_SOURCE; } /* @@ -526,12 +552,22 @@ mta_route_ok(struct mta_relay *relay, struct mta_route *route) { struct mta_connector *c; - log_debug("debug: mta: route ok %s", mta_route_to_text(route)); + if (!(route->flags & ROUTE_NEW)) + return; + + log_debug("debug: mta-routing: route %s is now valid.", + mta_route_to_text(route)); - route->dst->nerror = 0; + route->flags &= ~ROUTE_NEW; c = mta_connector(relay, route->src); - c->flags &= ~CONNECTOR_SOURCE_ERROR; + mta_connect(c); +} + +void +mta_route_down(struct mta_relay *relay, struct mta_route *route) +{ + mta_route_disable(route, 2, ROUTE_DISABLED_SMTP); } void @@ -539,26 +575,24 @@ mta_route_collect(struct mta_relay *relay, struct mta_route *route) { struct mta_connector *c; - log_debug("debug: mta: route collect %s", mta_route_to_text(route)); + log_debug("debug: mta_route_collect(%s)", + mta_route_to_text(route)); relay->nconn -= 1; + relay->domain->nconn -= 1; route->nconn -= 1; route->src->nconn -= 1; route->dst->nconn -= 1; + route->lastdisc = time(NULL); - c = mta_connector(relay, route->src); - mta_route_unref(route); /* from mta_find_route() */ + /* First connection failed */ + if (route->flags & ROUTE_NEW) + mta_route_disable(route, 2, ROUTE_DISABLED_NET); + c = mta_connector(relay, route->src); c->nconn -= 1; - - if (c->flags & CONNECTOR_LIMIT) { - log_debug("debug: mta; resetting limit flags on connector %s", - mta_connector_to_text(c)); - c->flags &= ~CONNECTOR_LIMIT; - } - - mta_recycle(c); - mta_drain(relay); + mta_connect(c); + mta_route_unref(route); /* from mta_find_route() */ mta_relay_unref(relay); /* from mta_connect() */ } @@ -577,29 +611,53 @@ 
mta_route_next_task(struct mta_relay *relay, struct mta_route *route) } void -mta_delivery(struct mta_envelope *e, const char *source, const char *relay, +mta_delivery_log(struct mta_envelope *e, const char *source, const char *relay, int delivery, const char *status) { if (delivery == IMSG_DELIVERY_OK) { mta_log(e, "Ok", source, relay, status); - queue_ok(e->id); } else if (delivery == IMSG_DELIVERY_TEMPFAIL) { mta_log(e, "TempFail", source, relay, status); - queue_tempfail(e->id, 0, status); } else if (delivery == IMSG_DELIVERY_PERMFAIL) { mta_log(e, "PermFail", source, relay, status); - queue_permfail(e->id, status); } else if (delivery == IMSG_DELIVERY_LOOP) { mta_log(e, "PermFail", source, relay, "Loop detected"); + } + else + errx(1, "bad delivery"); +} + +void +mta_delivery_notify(struct mta_envelope *e, int delivery, const char *status, + uint32_t penalty) +{ + if (delivery == IMSG_DELIVERY_OK) { + queue_ok(e->id); + } + else if (delivery == IMSG_DELIVERY_TEMPFAIL) { + queue_tempfail(e->id, penalty, status); + } + else if (delivery == IMSG_DELIVERY_PERMFAIL) { + queue_permfail(e->id, status); + } + else if (delivery == IMSG_DELIVERY_LOOP) { queue_loop(e->id); } else errx(1, "bad delivery"); } +void +mta_delivery(struct mta_envelope *e, const char *source, const char *relay, + int delivery, const char *status, uint32_t penalty) +{ + mta_delivery_log(e, source, relay, delivery, status); + mta_delivery_notify(e, delivery, status, penalty); +} + static void mta_query_mx(struct mta_relay *relay) { @@ -608,7 +666,8 @@ mta_query_mx(struct mta_relay *relay) if (relay->status & RELAY_WAIT_MX) return; - log_debug("debug: mta_query_mx(%s)", relay->domain->name); + log_debug("debug: mta: querying MX for %s...", + mta_relay_to_text(relay)); if (waitq_wait(&relay->domain->mxs, mta_on_mx, relay)) { id = generate_uid(); @@ -624,12 +683,29 @@ mta_query_mx(struct mta_relay *relay) } static void +mta_query_limits(struct mta_relay *relay) +{ + if (relay->status & 
RELAY_WAIT_LIMITS) + return; + + relay->limits = dict_get(env->sc_limits_dict, relay->domain->name); + if (relay->limits == NULL) + relay->limits = dict_get(env->sc_limits_dict, "default"); + + if (max_seen_conndelay_route < relay->limits->conndelay_route) + max_seen_conndelay_route = relay->limits->conndelay_route; + if (max_seen_discdelay_route < relay->limits->discdelay_route) + max_seen_discdelay_route = relay->limits->discdelay_route; +} + +static void mta_query_secret(struct mta_relay *relay) { if (relay->status & RELAY_WAIT_SECRET) return; - log_debug("debug: mta_query_secret(%s)", mta_relay_to_text(relay)); + log_debug("debug: mta: querying secret for %s...", + mta_relay_to_text(relay)); tree_xset(&wait_secret, relay->id, relay); relay->status |= RELAY_WAIT_SECRET; @@ -649,7 +725,8 @@ mta_query_preference(struct mta_relay *relay) if (relay->status & RELAY_WAIT_PREFERENCE) return; - log_debug("debug: mta_query_preference(%s)", mta_relay_to_text(relay)); + log_debug("debug: mta: querying preference for %s...", + mta_relay_to_text(relay)); tree_xset(&wait_preference, relay->id, relay); relay->status |= RELAY_WAIT_PREFERENCE; @@ -661,7 +738,20 @@ mta_query_preference(struct mta_relay *relay) static void mta_query_source(struct mta_relay *relay) { - log_debug("debug: mta_query_source(%s)", mta_relay_to_text(relay)); + log_debug("debug: mta: querying source for %s...", + mta_relay_to_text(relay)); + + relay->sourceloop += 1; + + if (relay->sourcetable == NULL) { + /* + * This is a recursive call, but it only happens once, since + * another source will not be queried immediatly. + */ + mta_relay_ref(relay); + mta_on_source(relay, mta_source(NULL)); + return; + } m_create(p_lka, IMSG_LKA_SOURCE, 0, 0, -1); m_add_id(p_lka, relay->id); @@ -679,7 +769,7 @@ mta_on_mx(void *tag, void *arg, void *data) struct mta_domain *domain = data; struct mta_relay *relay = arg; - log_debug("debug: mta_on_mx(%p, %s, %s)", + log_debug("debug: mta: ... 
got mx (%p, %s, %s)", tag, domain->name, mta_relay_to_text(relay)); switch (domain->mxstatus) { @@ -716,34 +806,232 @@ mta_on_mx(void *tag, void *arg, void *data) } static void +mta_on_secret(struct mta_relay *relay, const char *secret) +{ + log_debug("debug: mta: ... got secret for %s: %s", + mta_relay_to_text(relay), secret); + + if (secret) + relay->secret = strdup(secret); + + if (relay->secret == NULL) { + log_warnx("warn: Failed to retrieve secret " + "for %s", mta_relay_to_text(relay)); + relay->fail = IMSG_DELIVERY_TEMPFAIL; + relay->failstr = "Could not retrieve credentials"; + } + + relay->status &= ~RELAY_WAIT_SECRET; + mta_drain(relay); + mta_relay_unref(relay); /* from mta_query_secret() */ +} + +static void +mta_on_preference(struct mta_relay *relay, int dnserror, int preference) +{ + if (dnserror) { + log_warnx("warn: Couldn't find backup preference for %s", + mta_relay_to_text(relay)); + relay->backuppref = INT_MAX; + } + else { + log_debug("debug: mta: ... got preference for %s: %i, %i", + mta_relay_to_text(relay), dnserror, preference); + relay->backuppref = preference; + } + + relay->status &= ~RELAY_WAIT_PREFERENCE; + mta_drain(relay); + mta_relay_unref(relay); /* from mta_query_preference() */ +} + +static void mta_on_source(struct mta_relay *relay, struct mta_source *source) { - mta_connector(relay, source); + struct mta_connector *c; + void *iter; + int delay, errmask; + + log_debug("debug: mta: ... got source for %s: %s", + mta_relay_to_text(relay), source ? 
mta_source_to_text(source) : "NULL"); + + relay->lastsource = time(NULL); + delay = DELAY_CHECK_SOURCE_SLOW; + + if (source) { + c = mta_connector(relay, source); + if (c->flags & CONNECTOR_NEW) { + c->flags &= ~CONNECTOR_NEW; + delay = DELAY_CHECK_SOURCE; + } + mta_connect(c); + if ((c->flags & CONNECTOR_ERROR) == 0) + relay->sourceloop = 0; + else + delay = DELAY_CHECK_SOURCE_FAST; + mta_source_unref(source); /* from constructor */ + } + else { + log_warnx("warn: Failed to get source address" + "for %s", mta_relay_to_text(relay)); + } + + if (tree_count(&relay->connectors) == 0) { + relay->fail = IMSG_DELIVERY_TEMPFAIL; + relay->failstr = "Could not retrieve source address"; + } + if (tree_count(&relay->connectors) < relay->sourceloop) { + relay->fail = IMSG_DELIVERY_TEMPFAIL; + relay->failstr = "No valid route to remote MX"; + + errmask = 0; + iter = NULL; + while (tree_iter(&relay->connectors, &iter, NULL, (void **)&c)) + errmask |= c->flags; + + if (errmask & CONNECTOR_ERROR_ROUTE_SMTP) + relay->failstr = "Destination seem to reject all mails"; + else if (errmask & CONNECTOR_ERROR_ROUTE_NET) + relay->failstr = "Network error on destination MXs"; + else if (errmask & CONNECTOR_ERROR_MX) + relay->failstr = "No MX found for destination"; + else if (errmask & CONNECTOR_ERROR_FAMILY) + relay->failstr = "Address family mismatch on destination MXs"; + else + relay->failstr = "No valid route to destination"; + } + + relay->nextsource = relay->lastsource + delay; + relay->status &= ~RELAY_WAIT_SOURCE; + mta_drain(relay); + mta_relay_unref(relay); /* from mta_query_source() */ } static void mta_connect(struct mta_connector *c) { struct mta_route *route; + struct mta_limits *l = c->relay->limits; + int limits; + time_t nextconn, now; + + again: + + log_debug("debug: mta: connecting with %s", mta_connector_to_text(c)); + + /* Do not connect if this connector has an error. 
*/ + if (c->flags & CONNECTOR_ERROR) { + log_debug("debug: mta: connector error"); + return; + } + + if (c->flags & CONNECTOR_WAIT) { + log_debug("debug: mta: canceling connector timeout"); + runq_cancel(runq_connector, NULL, c); + } + + /* No job. */ + if (c->relay->ntask == 0) { + log_debug("debug: mta: no task for connector"); + return; + } + + /* Do not create more connections than necessary */ + if ((c->relay->nconn_ready >= c->relay->ntask) || + (c->relay->nconn > 2 && c->relay->nconn >= c->relay->ntask / 2)) { + log_debug("debug: mta: enough connections already"); + return; + } + + limits = 0; + nextconn = now = time(NULL); + + if (c->relay->domain->lastconn + l->conndelay_domain > nextconn) { + log_debug("debug: mta: cannot use domain %s before %llus", + c->relay->domain->name, + (unsigned long long) c->relay->domain->lastconn + l->conndelay_domain - now); + nextconn = c->relay->domain->lastconn + l->conndelay_domain; + } + if (c->relay->domain->nconn >= l->maxconn_per_domain) { + log_debug("debug: mta: hit domain limit"); + limits |= CONNECTOR_LIMIT_DOMAIN; + } + + if (c->source->lastconn + l->conndelay_source > nextconn) { + log_debug("debug: mta: cannot use source %s before %llus", + mta_source_to_text(c->source), + (unsigned long long) c->source->lastconn + l->conndelay_source - now); + nextconn = c->source->lastconn + l->conndelay_source; + } + if (c->source->nconn >= l->maxconn_per_source) { + log_debug("debug: mta: hit source limit"); + limits |= CONNECTOR_LIMIT_SOURCE; + } + + if (c->lastconn + l->conndelay_connector > nextconn) { + log_debug("debug: mta: cannot use %s before %llus", + mta_connector_to_text(c), + (unsigned long long) c->lastconn + l->conndelay_connector - now); + nextconn = c->lastconn + l->conndelay_connector; + } + if (c->nconn >= l->maxconn_per_connector) { + log_debug("debug: mta: hit connector limit"); + limits |= CONNECTOR_LIMIT_CONN; + } + + if (c->relay->lastconn + l->conndelay_relay > nextconn) { + log_debug("debug: mta: 
cannot use %s before %llus", + mta_relay_to_text(c->relay), + (unsigned long long) c->relay->lastconn + l->conndelay_relay - now); + nextconn = c->relay->lastconn + l->conndelay_relay; + } + if (c->relay->nconn >= l->maxconn_per_relay) { + log_debug("debug: mta: hit relay limit"); + limits |= CONNECTOR_LIMIT_RELAY; + } - log_debug("debug: mta_connect() for %s", mta_connector_to_text(c)); + /* We can connect now, find a route */ + if (!limits && nextconn <= now) + route = mta_find_route(c, now, &limits, &nextconn); + else + route = NULL; - route = mta_find_route(c); + /* No route */ if (route == NULL) { - mta_recycle(c); - if (c->queue == &c->relay->c_limit) - c->clearlimit = time(NULL) + CONNECTOR_DELAY_LIMIT; - if (c->queue == &c->relay->c_ready) - fatalx("connector with no route ended up in ready list"); + + if (c->flags & CONNECTOR_ERROR) { + /* XXX we might want to clear this flag later */ + log_debug("debug: mta-routing: no route available for %s: errors on connector", + mta_connector_to_text(c)); + return; + } + else if (limits) { + log_debug("debug: mta-routing: no route available for %s: limits reached", + mta_connector_to_text(c)); + nextconn = now + DELAY_CHECK_LIMIT; + } + else { + log_debug("debug: mta-routing: no route available for %s: must wait a bit", + mta_connector_to_text(c)); + } + log_debug("debug: mta: retrying to connect on %s in %llus...", + mta_connector_to_text(c), + (unsigned long long) nextconn - time(NULL)); + c->flags |= CONNECTOR_WAIT; + runq_schedule(runq_connector, nextconn, NULL, c); return; } + log_debug("debug: mta-routing: spawning new connection on %s", + mta_route_to_text(route)); + c->nconn += 1; c->lastconn = time(NULL); - c->nextconn = c->lastconn + CONNECTOR_DELAY_CONNECT; c->relay->nconn += 1; c->relay->lastconn = c->lastconn; + c->relay->domain->nconn += 1; + c->relay->domain->lastconn = c->lastconn; route->nconn += 1; route->lastconn = c->lastconn; route->src->nconn += 1; @@ -751,105 +1039,101 @@ mta_connect(struct 
mta_connector *c) route->dst->nconn += 1; route->dst->lastconn = c->lastconn; - mta_recycle(c); - - mta_relay_ref(c->relay); mta_session(c->relay, route); /* this never fails synchronously */ + mta_relay_ref(c->relay); + + goto again; } static void -mta_recycle(struct mta_connector *c) +mta_on_timeout(struct runq *runq, void *arg) { - TAILQ_REMOVE(c->queue, c, lst_entry); - - if (c->flags & CONNECTOR_ERROR) { - log_debug("debug: mta: putting %s on error queue", - mta_connector_to_text(c)); - c->queue = &c->relay->c_error; + struct mta_connector *connector = arg; + struct mta_relay *relay = arg; + struct mta_route *route = arg; + struct hoststat *hs = arg; + + if (runq == runq_relay) { + log_debug("debug: mta: ... timeout for %s", + mta_relay_to_text(relay)); + relay->status &= ~RELAY_WAIT_CONNECTOR; + mta_drain(relay); + mta_relay_unref(relay); /* from mta_drain() */ } - else if (c->flags & CONNECTOR_LIMIT) { - log_debug("debug: mta: putting %s on limit queue", - mta_connector_to_text(c)); - c->queue = &c->relay->c_limit; + else if (runq == runq_connector) { + log_debug("debug: mta: ... timeout for %s", + mta_connector_to_text(connector)); + connector->flags &= ~CONNECTOR_WAIT; + mta_connect(connector); } - else if (c->nextconn > time(NULL)) { - log_debug("debug: mta: putting %s on delay queue", - mta_connector_to_text(c)); - c->queue = &c->relay->c_delay; + else if (runq == runq_route) { + route->flags &= ~ROUTE_RUNQ; + mta_route_enable(route); + mta_route_unref(route); } - else { - log_debug("debug: mta: putting %s on ready queue", - mta_connector_to_text(c)); - c->queue = &c->relay->c_ready; + else if (runq == runq_hoststat) { + log_debug("debug: mta: ... 
timeout for hoststat %s", + hs->name); + mta_hoststat_remove_entry(hs); + free(hs); } - - TAILQ_INSERT_TAIL(c->queue, c, lst_entry); } static void -mta_relay_timeout(int fd, short ev, void *arg) +mta_route_disable(struct mta_route *route, int penalty, int reason) { - struct mta_relay *r = arg; - struct mta_connector *c; - time_t t; - - log_debug("debug: mta: timeout for %s", mta_relay_to_text(r)); + unsigned long long delay; - t = time(NULL); + route->penalty += penalty; + route->lastpenalty = time(NULL); + delay = (unsigned long long)DELAY_ROUTE_BASE * route->penalty * route->penalty; + if (delay > DELAY_ROUTE_MAX) + delay = DELAY_ROUTE_MAX; +#if 0 + delay = 60; +#endif - /* - * Clear the limit flags on all connectors. - */ - while ((c = TAILQ_FIRST(&r->c_limit))) { - /* This requires that the list is always sorted */ - if (c->clearlimit > t) - break; - log_debug("debug: mta: clearing limits on %s", - mta_connector_to_text(c)); - c->flags &= ~CONNECTOR_LIMIT; - mta_recycle(c); - } + log_info("smtp-out: Disabling route %s for %llus", + mta_route_to_text(route), delay); - while ((c = TAILQ_FIRST(&r->c_delay))) { - /* This requires that the list is always sorted */ - if (c->nextconn > t) - break; - log_debug("debug: mta: delay expired for %s", - mta_connector_to_text(c)); - mta_recycle(c); + if (route->flags & ROUTE_DISABLED) { + runq_cancel(runq_route, NULL, route); + mta_route_unref(route); /* from last call to here */ } - - mta_drain(r); - mta_relay_unref(r); /* from mta_relay_schedule() */ + route->flags |= reason & ROUTE_DISABLED; + runq_schedule(runq_route, time(NULL) + delay, NULL, route); + mta_route_ref(route); } static void -mta_relay_schedule(struct mta_relay *r, unsigned int delay) +mta_route_enable(struct mta_route *route) { - struct timeval tv; - - if (evtimer_pending(&r->ev, &tv)) - return; - - log_debug("debug: mta: adding relay timeout: %u", delay); - - tv.tv_sec = delay; - tv.tv_usec = 0; - evtimer_add(&r->ev, &tv); - mta_relay_ref(r); + if 
(route->flags & ROUTE_DISABLED) { + log_info("smtp-out: Enabling route %s", + mta_route_to_text(route)); + route->flags &= ~ROUTE_DISABLED; + route->flags |= ROUTE_NEW; + } + + if (route->penalty) { +#if DELAY_QUADRATIC + route->penalty -= 1; + route->lastpenalty = time(NULL); +#else + route->penalty = 0; +#endif + } } static void mta_drain(struct mta_relay *r) { - struct mta_connector *c; - struct mta_source *s; char buf[64]; log_debug("debug: mta: draining %s " "refcount=%i, ntask=%zu, nconnector=%zu, nconn=%zu", mta_relay_to_text(r), - r->refcount, r->ntask, r->nconnector, r->nconn); + r->refcount, r->ntask, tree_count(&r->connectors), r->nconn); /* * All done. @@ -879,99 +1163,40 @@ mta_drain(struct mta_relay *r) if (r->domain->lastmxquery == 0) mta_query_mx(r); + /* Query the limits if needed. */ + if (r->limits == NULL) + mta_query_limits(r); + /* Wait until we are ready to proceed. */ if (r->status & RELAY_WAITMASK) { buf[0] = '\0'; if (r->status & RELAY_WAIT_MX) - strlcat(buf, "MX ", sizeof buf); + strlcat(buf, " MX", sizeof buf); if (r->status & RELAY_WAIT_PREFERENCE) - strlcat(buf, "preference ", sizeof buf); + strlcat(buf, " preference", sizeof buf); if (r->status & RELAY_WAIT_SECRET) - strlcat(buf, "secret ", sizeof buf); + strlcat(buf, " secret", sizeof buf); if (r->status & RELAY_WAIT_SOURCE) - strlcat(buf, "source ", sizeof buf); - log_debug("debug: mta: %s waiting for %s", + strlcat(buf, " source", sizeof buf); + if (r->status & RELAY_WAIT_CONNECTOR) + strlcat(buf, " connector", sizeof buf); + log_debug("debug: mta: %s waiting for%s", mta_relay_to_text(r), buf); return; } /* - * Start new connections if possible. - * XXX find a better heuristic for the good number of connections - * depending on the number of tasks and other factors. We might - * want to try more than the number of task, to have a chance to - * hit a mx faster if the first ones timeout. + * We have pending task, and it's maybe time too try a new source. 
*/ - while (r->nconn < r->ntask) { - log_debug("debug: mta: trying to create new connection: " - "refcount=%i, ntask=%zu, nconnector=%zu, nconn=%zu", - r->refcount, r->ntask, r->nconnector, r->nconn); - - /* Check the per-relay connection limit */ - if (r->nconn >= MAXCONN_PER_RELAY) { - log_debug("debug: mta: hit connection limit on %s", - mta_relay_to_text(r)); - return; - } - - /* Use the first connector if ready */ - c = TAILQ_FIRST(&r->c_ready); - if (c) { - log_debug("debug: mta: using connector %s", - mta_connector_to_text(c)); - r->sourceloop = 0; - mta_connect(c); - continue; - } - - /* No new connectors */ - if (r->sourceloop > r->nconnector) { - log_debug("debug: mta: no new connector available"); - - if (TAILQ_FIRST(&r->c_delay)) { - mta_relay_schedule(r, 1); - log_debug( - "debug: mta: waiting for relay timeout"); - return; - } - - if (TAILQ_FIRST(&r->c_limit)) { - mta_relay_schedule(r, 5); - log_debug( - "debug: mta: waiting for relay timeout"); - return; - } - - log_debug("debug: mta: failing..."); - /* - * All sources have been tried and no connectors can - * be used. 
- */ - if (r->nconnector == 0) { - r->fail = IMSG_DELIVERY_TEMPFAIL; - r->failstr = "No source address"; - } - else { - r->fail = IMSG_DELIVERY_TEMPFAIL; - r->failstr = "No MX could be reached"; - } - mta_flush(r, r->fail, r->failstr); - return; - } - - r->sourceloop++; - log_debug("debug: mta: need new connector (attempt %zu)", - r->sourceloop); - if (r->sourcetable) { - log_debug("debug: mta: querying source %s", - r->sourcetable); - mta_query_source(r); - return; - } - log_debug("debug: mta: using default source"); - s = mta_source(NULL); - mta_on_source(r, s); - mta_source_unref(s); + if (r->nextsource <= time(NULL)) + mta_query_source(r); + else { + log_debug("debug: mta: scheduling relay %s in %llus...", + mta_relay_to_text(r), + (unsigned long long) r->nextsource - time(NULL)); + runq_schedule(runq_relay, r->nextsource, NULL, r); + r->status |= RELAY_WAIT_CONNECTOR; + mta_relay_ref(r); } } @@ -980,7 +1205,11 @@ mta_flush(struct mta_relay *relay, int fail, const char *error) { struct mta_envelope *e; struct mta_task *task; + const char *domain; + void *iter; + struct mta_connector *c; size_t n; + size_t r; log_debug("debug: mta_flush(%s, %i, \"%s\")", mta_relay_to_text(relay), fail, error); @@ -993,7 +1222,26 @@ mta_flush(struct mta_relay *relay, int fail, const char *error) TAILQ_REMOVE(&relay->tasks, task, entry); while ((e = TAILQ_FIRST(&task->envelopes))) { TAILQ_REMOVE(&task->envelopes, e, entry); - mta_delivery(e, NULL, relay->domain->name, fail, error); + mta_delivery(e, NULL, relay->domain->name, fail, error, 0); + + /* + * host was suspended, cache envelope id in hoststat tree + * so that it can be retried when a delivery succeeds for + * that domain. 
+ */ + domain = strchr(e->dest, '@'); + if (fail == IMSG_DELIVERY_TEMPFAIL && domain) { + r = 0; + iter = NULL; + while (tree_iter(&relay->connectors, &iter, + NULL, (void **)&c)) { + if (c->flags & CONNECTOR_ERROR_ROUTE) + r++; + } + if (tree_count(&relay->connectors) == r) + mta_hoststat_cache(domain+1, e->id); + } + free(e->dest); free(e->rcpt); free(e); @@ -1012,27 +1260,28 @@ mta_flush(struct mta_relay *relay, int fail, const char *error) * Find a route to use for this connector */ static struct mta_route * -mta_find_route(struct mta_connector *c) +mta_find_route(struct mta_connector *c, time_t now, int *limits, + time_t *nextconn) { struct mta_route *route, *best; + struct mta_limits *l = c->relay->limits; struct mta_mx *mx; int level, limit_host, limit_route; - int family_mismatch, seen; + int family_mismatch, seen, suspended_route; + time_t tm; + log_debug("debug: mta-routing: searching new route for %s...", + mta_connector_to_text(c)); + + tm = 0; limit_host = 0; limit_route = 0; + suspended_route = 0; family_mismatch = 0; level = -1; best = NULL; seen = 0; - if (c->nconn >= MAXCONN_PER_CONNECTOR) { - log_debug("debug: mta: hit limit on connector %s", - mta_connector_to_text(c)); - c->flags |= CONNECTOR_LIMIT_SOURCE; - return (NULL); - } - TAILQ_FOREACH(mx, &c->relay->domain->mxs, entry) { /* * New preference level @@ -1047,9 +1296,10 @@ mta_find_route(struct mta_connector *c) /* * No candidate found. There are valid MXs at this - * preference level but they reached their limit. + * preference level but they reached their limit, or + * we can't connect yet. 
*/ - if (limit_host || limit_route) + if (limit_host || limit_route || tm) break; /* @@ -1073,27 +1323,81 @@ mta_find_route(struct mta_connector *c) /* Found a possibly valid mx */ seen++; - if (c->source->sa && - c->source->sa->sa_family != mx->host->sa->sa_family) { + if ((c->source->sa && + c->source->sa->sa_family != mx->host->sa->sa_family) || + (l->family && l->family != mx->host->sa->sa_family)) { + log_debug("debug: mta-routing: skipping host %s: AF mismatch", + mta_host_to_text(mx->host)); family_mismatch = 1; continue; } - if (mx->host->nconn >= MAXCONN_PER_HOST) { + if (mx->host->nconn >= l->maxconn_per_host) { + log_debug("debug: mta-routing: skipping host %s: too many connections", + mta_host_to_text(mx->host)); limit_host = 1; continue; } + if (mx->host->lastconn + l->conndelay_host > now) { + log_debug("debug: mta-routing: skipping host %s: cannot use before %llus", + mta_host_to_text(mx->host), + (unsigned long long) mx->host->lastconn + l->conndelay_host - now); + if (tm == 0 || mx->host->lastconn + l->conndelay_host < tm) + tm = mx->host->lastconn + l->conndelay_host; + continue; + } + route = mta_route(c->source, mx->host); - if (route->nconn >= MAXCONN_PER_ROUTE) { + if (route->flags & ROUTE_DISABLED) { + log_debug("debug: mta-routing: skipping route %s: suspend", + mta_route_to_text(route)); + suspended_route |= route->flags & ROUTE_DISABLED; + mta_route_unref(route); /* from here */ + continue; + } + + if (route->nconn && (route->flags & ROUTE_NEW)) { + log_debug("debug: mta-routing: skipping route %s: not validated yet", + mta_route_to_text(route)); + limit_route = 1; + mta_route_unref(route); /* from here */ + continue; + } + + if (route->nconn >= l->maxconn_per_route) { + log_debug("debug: mta-routing: skipping route %s: too many connections", + mta_route_to_text(route)); limit_route = 1; mta_route_unref(route); /* from here */ continue; } + if (route->lastconn + l->conndelay_route > now) { + log_debug("debug: mta-routing: skipping route 
%s: cannot use before %llus (delay after connect)", + mta_route_to_text(route), + (unsigned long long) route->lastconn + l->conndelay_route - now); + if (tm == 0 || route->lastconn + l->conndelay_route < tm) + tm = route->lastconn + l->conndelay_route; + mta_route_unref(route); /* from here */ + continue; + } + + if (route->lastdisc + l->discdelay_route > now) { + log_debug("debug: mta-routing: skipping route %s: cannot use before %llus (delay after disconnect)", + mta_route_to_text(route), + (unsigned long long) route->lastdisc + l->discdelay_route - now); + if (tm == 0 || route->lastdisc + l->discdelay_route < tm) + tm = route->lastdisc + l->discdelay_route; + mta_route_unref(route); /* from here */ + continue; + } + /* Use the route with the lowest number of connections. */ if (best && route->nconn >= best->nconn) { + log_debug("debug: mta-routing: skipping route %s: current one is better", + mta_route_to_text(route)); mta_route_unref(route); /* from here */ continue; } @@ -1101,30 +1405,43 @@ mta_find_route(struct mta_connector *c) if (best) mta_route_unref(best); /* from here */ best = route; + log_debug("debug: mta-routing: selecting candidate route %s", + mta_route_to_text(route)); } if (best) return (best); + /* Order is important */ if (seen == 0) { - log_info("smtp-out: No reachable MX for connector %s", + log_info("smtp-out: No MX found for %s", mta_connector_to_text(c)); - c->flags |= CONNECTOR_MX_ERROR; + c->flags |= CONNECTOR_ERROR_MX; } else if (limit_route) { - log_debug("debug: mta: hit route limit on connector %s", - mta_connector_to_text(c)); - c->flags |= CONNECTOR_LIMIT_ROUTE; + log_debug("debug: mta: hit route limit"); + *limits |= CONNECTOR_LIMIT_ROUTE; } else if (limit_host) { - log_debug("debug: mta: hit host limit on connector %s", - mta_connector_to_text(c)); - c->flags |= CONNECTOR_LIMIT_HOST; + log_debug("debug: mta: hit host limit"); + *limits |= CONNECTOR_LIMIT_HOST; + } + else if (tm) { + if (tm > *nextconn) + *nextconn = tm; } else 
if (family_mismatch) { - log_info("smtp-out: Address family mismatch on connector %s", + log_info("smtp-out: Address family mismatch on %s", mta_connector_to_text(c)); - c->flags |= CONNECTOR_FAMILY_ERROR; + c->flags |= CONNECTOR_ERROR_FAMILY; + } + else if (suspended_route) { + log_info("smtp-out: No valid route for %s", + mta_connector_to_text(c)); + if (suspended_route & ROUTE_DISABLED_NET) + c->flags |= CONNECTOR_ERROR_ROUTE_NET; + if (suspended_route & ROUTE_DISABLED_SMTP) + c->flags |= CONNECTOR_ERROR_ROUTE_SMTP; } return (NULL); @@ -1134,33 +1451,16 @@ static void mta_log(const struct mta_envelope *evp, const char *prefix, const char *source, const char *relay, const char *status) { - char session[SMTPD_MAXLINESIZE]; - char rcpt[SMTPD_MAXLINESIZE]; - char src[SMTPD_MAXLINESIZE]; - - session[0] = '\0'; - if (evp->session) - snprintf(session, sizeof session, "session=%016"PRIx64", ", - evp->session); - - rcpt[0] = '\0'; - if (evp->rcpt) - snprintf(rcpt, sizeof rcpt, "rcpt=<%s>, ", evp->rcpt); - - src[0] = '\0'; - if (source) - snprintf(src, sizeof src, "source=%s, ", source); - - - log_info("relay: %s for %016" PRIx64 ": %sfrom=<%s>, to=<%s>, " - "%s%srelay=%s, delay=%s, stat=%s", + log_info("relay: %s for %016" PRIx64 ": session=%016"PRIx64", " + "from=<%s>, to=<%s>, rcpt=<%s>, source=%s, " + "relay=%s, delay=%s, stat=%s", prefix, evp->id, - session, + evp->session, evp->task->sender, evp->dest, - src, - rcpt, + evp->rcpt ? evp->rcpt : "-", + source ? 
source : "-", relay, duration_to_text(time(NULL) - evp->creation), status); @@ -1204,10 +1504,6 @@ mta_relay(struct envelope *e) if ((r = SPLAY_FIND(mta_relay_tree, &relays, &key)) == NULL) { r = xcalloc(1, sizeof *r, "mta_relay"); TAILQ_INIT(&r->tasks); - TAILQ_INIT(&r->c_ready); - TAILQ_INIT(&r->c_delay); - TAILQ_INIT(&r->c_limit); - TAILQ_INIT(&r->c_error); r->id = generate_uid(); r->flags = key.flags; r->domain = key.domain; @@ -1227,12 +1523,9 @@ mta_relay(struct envelope *e) r->helotable = xstrdup(key.helotable, "mta: helotable"); SPLAY_INSERT(mta_relay_tree, &relays, r); - evtimer_set(&r->ev, mta_relay_timeout, r); - log_trace(TRACE_MTA, "mta: new %s", mta_relay_to_text(r)); stat_increment("mta.relay", 1); } else { mta_domain_unref(key.domain); /* from here */ - log_trace(TRACE_MTA, "mta: reusing %s", mta_relay_to_text(r)); } r->refcount++; @@ -1267,9 +1560,6 @@ mta_relay_unref(struct mta_relay *relay) free(relay->secret); free(relay->sourcetable); - if (evtimer_pending(&relay->ev, NULL)) - evtimer_del(&relay->ev); - mta_domain_unref(relay->domain); /* from constructor */ free(relay); stat_decrement("mta.relay", 1); @@ -1594,14 +1884,11 @@ mta_connector(struct mta_relay *relay, struct mta_source *source) c = xcalloc(1, sizeof(*c), "mta_connector"); c->relay = relay; c->source = source; + c->flags |= CONNECTOR_NEW; mta_source_ref(source); - c->queue = &relay->c_ready; - TAILQ_INSERT_HEAD(c->queue, c, lst_entry); tree_xset(&relay->connectors, (uintptr_t)(source), c); - relay->nconnector++; stat_increment("mta.connector", 1); - log_debug("debug: mta: new connector %s", - mta_connector_to_text(c)); + log_debug("debug: mta: new %s", mta_connector_to_text(c)); } return (c); @@ -1610,10 +1897,17 @@ mta_connector(struct mta_relay *relay, struct mta_source *source) static void mta_connector_free(struct mta_connector *c) { - c->relay->nconnector--; - TAILQ_REMOVE(c->queue, c, lst_entry); - mta_source_unref(c->source); + log_debug("debug: mta: freeing %s", + 
mta_connector_to_text(c)); + + if (c->flags & CONNECTOR_WAIT) { + log_debug("debug: mta: canceling timeout for %s", + mta_connector_to_text(c)); + runq_cancel(runq_connector, NULL, c); + } + mta_source_unref(c->source); /* from constructor */ free(c); + stat_decrement("mta.connector", 1); } @@ -1622,9 +1916,10 @@ mta_connector_to_text(struct mta_connector *c) { static char buf[1024]; - snprintf(buf, sizeof buf, "%s->%s", + snprintf(buf, sizeof buf, "[connector:%s->%s,0x%x]", mta_source_to_text(c->source), - mta_relay_to_text(c->relay)); + mta_relay_to_text(c->relay), + c->flags); return (buf); } @@ -1632,6 +1927,7 @@ static struct mta_route * mta_route(struct mta_source *src, struct mta_host *dst) { struct mta_route key, *r; + static uint64_t rid = 0; key.src = src; key.dst = dst; @@ -1641,30 +1937,80 @@ mta_route(struct mta_source *src, struct mta_host *dst) r = xcalloc(1, sizeof(*r), "mta_route"); r->src = src; r->dst = dst; + r->flags |= ROUTE_NEW; + r->id = ++rid; SPLAY_INSERT(mta_route_tree, &routes, r); mta_source_ref(src); mta_host_ref(dst); stat_increment("mta.route", 1); } + else if (r->flags & ROUTE_RUNQ) { + log_debug("debug: mta: mta_route_ref(): canceling runq for route %s", + mta_route_to_text(r)); + r->flags &= ~(ROUTE_RUNQ | ROUTE_KEEPALIVE); + runq_cancel(runq_route, NULL, r); + r->refcount--; /* from mta_route_unref() */ + } r->refcount++; return (r); } -#if 0 static void mta_route_ref(struct mta_route *r) { r->refcount++; } -#endif static void mta_route_unref(struct mta_route *r) { + time_t sched, now; + int delay; + if (--r->refcount) return; + /* + * Nothing references this route, but we might want to keep it alive + * for a while. 
+ */ + now = time(NULL); + sched = 0; + + if (r->penalty) { +#if DELAY_QUADRATIC + delay = DELAY_ROUTE_BASE * r->penalty * r->penalty; +#else + delay = 15 * 60; +#endif + if (delay > DELAY_ROUTE_MAX) + delay = DELAY_ROUTE_MAX; + sched = r->lastpenalty + delay; + log_debug("debug: mta: mta_route_unref(): keeping route %s alive for %llus (penalty %i)", + mta_route_to_text(r), (unsigned long long) sched - now, r->penalty); + } else if (!(r->flags & ROUTE_KEEPALIVE)) { + if (r->lastconn + max_seen_conndelay_route > now) + sched = r->lastconn + max_seen_conndelay_route; + if (r->lastdisc + max_seen_discdelay_route > now && + r->lastdisc + max_seen_discdelay_route < sched) + sched = r->lastdisc + max_seen_discdelay_route; + + if (sched > now) + log_debug("debug: mta: mta_route_unref(): keeping route %s alive for %llus (imposed delay)", + mta_route_to_text(r), (unsigned long long) sched - now); + } + + if (sched > now) { + r->flags |= ROUTE_RUNQ; + runq_schedule(runq_route, sched, NULL, r); + r->refcount++; + return; + } + + log_debug("debug: mta: ma_route_unref(): really discarding route %s", + mta_route_to_text(r)); + SPLAY_REMOVE(mta_route_tree, &routes, r); mta_source_unref(r->src); /* from constructor */ mta_host_unref(r->dst); /* from constructor */ @@ -1701,3 +2047,94 @@ mta_route_cmp(const struct mta_route *a, const struct mta_route *b) } SPLAY_GENERATE(mta_route_tree, mta_route, entry, mta_route_cmp); + + +/* hoststat errors are not critical, we do best effort */ +void +mta_hoststat_update(const char *host, const char *error) +{ + struct hoststat *hs = NULL; + char buf[SMTPD_MAXHOSTNAMELEN]; + time_t tm; + + if (! 
lowercase(buf, host, sizeof buf)) + return; + + tm = time(NULL); + hs = dict_get(&hoststat, buf); + if (hs == NULL) { + hs = calloc(1, sizeof *hs); + if (hs == NULL) + return; + tree_init(&hs->deferred); + runq_schedule(runq_hoststat, tm+HOSTSTAT_EXPIRE_DELAY, NULL, hs); + } + strlcpy(hs->name, buf, sizeof hs->name); + strlcpy(hs->error, error, sizeof hs->error); + hs->tm = time(NULL); + dict_set(&hoststat, buf, hs); + + runq_cancel(runq_hoststat, NULL, hs); + runq_schedule(runq_hoststat, tm+HOSTSTAT_EXPIRE_DELAY, NULL, hs); +} + +void +mta_hoststat_cache(const char *host, uint64_t evpid) +{ + struct hoststat *hs = NULL; + char buf[SMTPD_MAXHOSTNAMELEN]; + + if (! lowercase(buf, host, sizeof buf)) + return; + + hs = dict_get(&hoststat, buf); + if (hs == NULL) + return; + + tree_set(&hs->deferred, evpid, NULL); +} + +void +mta_hoststat_uncache(const char *host, uint64_t evpid) +{ + struct hoststat *hs = NULL; + char buf[SMTPD_MAXHOSTNAMELEN]; + + if (! lowercase(buf, host, sizeof buf)) + return; + + hs = dict_get(&hoststat, buf); + if (hs == NULL) + return; + + tree_pop(&hs->deferred, evpid); +} + +void +mta_hoststat_reschedule(const char *host) +{ + struct hoststat *hs = NULL; + char buf[SMTPD_MAXHOSTNAMELEN]; + uint64_t evpid; + + if (! lowercase(buf, host, sizeof buf)) + return; + + hs = dict_get(&hoststat, buf); + if (hs == NULL) + return; + + while (tree_poproot(&hs->deferred, &evpid, NULL)) { + m_compose(p_queue, IMSG_MTA_SCHEDULE, 0, 0, -1, + &evpid, sizeof evpid); + } +} + +static void +mta_hoststat_remove_entry(struct hoststat *hs) +{ + while (tree_poproot(&hs->deferred, NULL, NULL)) + ; + dict_pop(&hoststat, hs->name); + runq_cancel(runq_hoststat, NULL, hs); +} |