diff options
author | Eric Faurot <eric@cvs.openbsd.org> | 2012-08-18 15:45:13 +0000 |
---|---|---|
committer | Eric Faurot <eric@cvs.openbsd.org> | 2012-08-18 15:45:13 +0000 |
commit | 563dc19788cd910fe9692adf60daca13c73d0f89 (patch) | |
tree | db349179debae813a018529d1f03ebf9d73e694a /usr.sbin/smtpd/mta.c | |
parent | e08c9c77e19684c71074b898b63a0427216bc57b (diff) |
Major update of the mta internals.
Add a mta_route structure which describes a route through which
outgoing mails are to be sent. This structure holds connection
parameters and limits. When an envelope is received in a batch,
the route for it is looked up, and the envelope is added to
a list of envelopes to be sent for this message on that route: a
task. When the batch is closed, each task is added to the list
of tasks for their respective route.
The routes are drained when new work can happen. The route will
create new mta sessions if necessary. When a session is up and
ready, it picks the first pending task on the route, if any.
Otherwise, it just closes the connection.
Errors on the connection are reported to the route, so that the
route could be flagged as broken. Currently, three errors on
an attempt to open a route are reported as a failure for all
pending tasks.
ok gilles@
Diffstat (limited to 'usr.sbin/smtpd/mta.c')
-rw-r--r-- | usr.sbin/smtpd/mta.c | 375 |
1 files changed, 364 insertions, 11 deletions
diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c index 500c77a5255..a28b572fbc4 100644 --- a/usr.sbin/smtpd/mta.c +++ b/usr.sbin/smtpd/mta.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mta.c,v 1.133 2012/07/29 13:56:24 eric Exp $ */ +/* $OpenBSD: mta.c,v 1.134 2012/08/18 15:45:12 eric Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> @@ -43,22 +43,106 @@ #include "smtpd.h" #include "log.h" +#define MTA_MAXCONN 5 /* connections per route */ +#define MTA_MAXMAIL 100 /* mails per session */ +#define MTA_MAXRCPT 1000 /* rcpt per mail */ + +struct mta_batch2 { + uint64_t id; + struct tree tasks; /* map route to task */ +}; + +SPLAY_HEAD(mta_route_tree, mta_route); + static void mta_imsg(struct imsgev *, struct imsg *); static void mta_shutdown(void); static void mta_sig_handler(int, short, void *); -static void +static struct mta_route *mta_route_for(struct envelope *); +static void mta_route_drain(struct mta_route *); +static void mta_route_free(struct mta_route *); +static void mta_envelope_done(struct mta_task *, struct envelope *, const char *); +static int mta_route_cmp(struct mta_route *, struct mta_route *); + +SPLAY_PROTOTYPE(mta_route_tree, mta_route, entry, mta_route_cmp); + +static struct mta_route_tree routes = SPLAY_INITIALIZER(&routes); +static struct tree batches = SPLAY_INITIALIZER(&batches); + +void mta_imsg(struct imsgev *iev, struct imsg *imsg) { - struct ssl *ssl; + struct mta_route *route; + struct mta_batch2 *batch; + struct mta_task *task; + struct envelope *e; + struct ssl *ssl; + uint64_t id; log_imsg(PROC_MTA, iev->proc, imsg); if (iev->proc == PROC_QUEUE) { switch (imsg->hdr.type) { + case IMSG_BATCH_CREATE: + id = *(uint64_t*)(imsg->data); + batch = xmalloc(sizeof *batch, "mta_batch"); + batch->id = id; + tree_init(&batch->tasks); + tree_xset(&batches, batch->id, batch); + log_trace(TRACE_MTA, + "mta: batch:%016" PRIx64 " created", batch->id); + return; + case IMSG_BATCH_APPEND: + e = xmalloc(sizeof *e, "mta:envelope"); + 
memmove(e, imsg->data, sizeof *e); + + route = mta_route_for(e); + batch = tree_xget(&batches, e->batch_id); + + if ((task = tree_get(&batch->tasks, route->id)) == NULL) { + log_trace(TRACE_MTA, "mta: new task for %s", + mta_route_to_text(route)); + task = xmalloc(sizeof *task, "mta_task"); + TAILQ_INIT(&task->envelopes); + task->route = route; + tree_xset(&batch->tasks, route->id, task); + task->msgid = evpid_to_msgid(e->id); + task->sender = e->sender; + route->refcount += 1; + } + + /* Technically, we could handle that by adding a msg + * level, but the batch sent by the scheduler should + * be valid. + */ + if (task->msgid != evpid_to_msgid(e->id)) + errx(1, "msgid mismatch in batch"); + + /* XXX honour route->maxrcpt */ + TAILQ_INSERT_TAIL(&task->envelopes, e, entry); + log_debug("mta: received evp:%016" PRIx64 " for <%s@%s>", + e->id, e->dest.user, e->dest.domain); + return; + case IMSG_BATCH_CLOSE: + id = *(uint64_t*)(imsg->data); + batch = tree_xpop(&batches, id); + log_trace(TRACE_MTA, "mta: batch:%016" PRIx64 " closed", + batch->id); + /* for all tasks, queue them on there route */ + while (tree_poproot(&batch->tasks, &id, (void**)&task)) { + if (id != task->route->id) + errx(1, "route id mismatch!"); + task->route->refcount -= 1; + task->route->ntask += 1; + TAILQ_INSERT_TAIL(&task->route->tasks, task, entry); + mta_route_drain(task->route); + } + free(batch); + return; + case IMSG_QUEUE_MESSAGE_FD: mta_session_imsg(iev, imsg); return; @@ -94,14 +178,10 @@ mta_imsg(struct imsgev *iev, struct imsg *imsg) if (ssl == NULL) fatal(NULL); *ssl = *(struct ssl *)imsg->data; - ssl->ssl_cert = strdup((char *)imsg->data + - sizeof *ssl); - if (ssl->ssl_cert == NULL) - fatal(NULL); - ssl->ssl_key = strdup((char *)imsg->data + - sizeof *ssl + ssl->ssl_cert_len); - if (ssl->ssl_key == NULL) - fatal(NULL); + ssl->ssl_cert = xstrdup((char*)imsg->data + sizeof *ssl, + "mta:ssl_cert"); + ssl->ssl_key = xstrdup((char*)imsg->data + + sizeof *ssl + ssl->ssl_cert_len, 
"mta_ssl_key"); SPLAY_INSERT(ssltree, env->sc_ssl, ssl); return; @@ -200,3 +280,276 @@ mta(void) return (0); } + +const char * +mta_response_status(const char *r) +{ + switch (r[0]) { + case '2': + return "Sent"; + case '4': + case '5': + return "RemoteError"; + default: + return "LocalError"; + } +} + +int +mta_response_delivery(const char *r) +{ + switch (r[0]) { + case '2': + return IMSG_QUEUE_DELIVERY_OK; + case '5': + case '6': + return IMSG_QUEUE_DELIVERY_PERMFAIL; + default: + return IMSG_QUEUE_DELIVERY_TEMPFAIL; + } +} + +const char * +mta_response_text(const char *r) +{ + return (r + 4); +} + +void +mta_route_error(struct mta_route *route, const char *error) +{ + route->nfail += 1; + strlcpy(route->errorline, error, sizeof route->errorline); + log_warnx("mta: %s error: %s", mta_route_to_text(route), error); +} + +void +mta_route_ok(struct mta_route *route) +{ + log_debug("mta: %s ready", mta_route_to_text(route)); + route->nfail = 0; +} + +void +mta_route_collect(struct mta_route *route) +{ + route->nsession -= 1; + + mta_route_drain(route); +} + +const char * +mta_route_to_text(struct mta_route *route) +{ + static char buf[1024]; + const char *coma = ""; + + buf[0] = '\0'; + + snprintf(buf, sizeof buf, "route:%s[", route->hostname); + + if (route->flags & ROUTE_STARTTLS) { + coma = ","; + strlcat(buf, "starttls", sizeof buf); + } + + if (route->flags & ROUTE_SMTPS) { + strlcat(buf, coma, sizeof buf); + coma = ","; + strlcat(buf, "smtps", sizeof buf); + } + + if (route->flags & ROUTE_AUTH) { + strlcat(buf, coma, sizeof buf); + coma = ","; + strlcat(buf, "auth=", sizeof buf); + strlcat(buf, route->auth, sizeof buf); + } + + if (route->cert) { + strlcat(buf, coma, sizeof buf); + coma = ","; + strlcat(buf, "cert=", sizeof buf); + strlcat(buf, route->cert, sizeof buf); + } + + if (route->flags & ROUTE_MX) { + strlcat(buf, coma, sizeof buf); + coma = ","; + strlcat(buf, "mx", sizeof buf); + } + strlcat(buf, "]", sizeof buf); + + return (buf); +} + +static 
struct mta_route * +mta_route_for(struct envelope *e) +{ + struct ssl ssl; + struct mta_route key, *route; + + bzero(&key, sizeof key); + + key.flags = e->agent.mta.relay.flags; + if (e->agent.mta.relay.hostname[0]) { + key.hostname = e->agent.mta.relay.hostname; + key.flags |= ROUTE_MX; + } else + key.hostname = e->dest.domain; + key.port = e->agent.mta.relay.port; + key.cert = e->agent.mta.relay.cert; + if (!key.cert[0]) + key.cert = NULL; + key.auth = e->agent.mta.relay.authmap; + if (!key.auth[0]) + key.auth = NULL; + + if ((route = SPLAY_FIND(mta_route_tree, &routes, &key)) == NULL) { + route = xcalloc(1, sizeof *route, "mta_route"); + TAILQ_INIT(&route->tasks); + route->id = generate_uid(); + route->flags = key.flags; + route->hostname = xstrdup(key.hostname, "mta: hostname"); + route->port = key.port; + route->cert = key.cert ? xstrdup(key.cert, "mta: cert") : NULL; + route->auth = key.auth ? xstrdup(key.auth, "mta: auth") : NULL; + if (route->cert) { + strlcpy(ssl.ssl_name, route->cert, sizeof(ssl.ssl_name)); + route->ssl = SPLAY_FIND(ssltree, env->sc_ssl, &ssl); + } + SPLAY_INSERT(mta_route_tree, &routes, route); + + route->maxconn = MTA_MAXCONN; + route->maxmail = MTA_MAXMAIL; + route->maxrcpt = MTA_MAXRCPT; + + log_trace(TRACE_MTA, "mta: new %s", mta_route_to_text(route)); + } else { + log_trace(TRACE_MTA, "mta: reusing %s", mta_route_to_text(route)); + } + + return (route); +} + +static void +mta_route_free(struct mta_route *route) +{ + log_trace(TRACE_MTA, "mta: freeing %s", mta_route_to_text(route)); + SPLAY_REMOVE(mta_route_tree, &routes, route); + free(route->hostname); + if (route->cert) + free(route->cert); + if (route->auth) + free(route->auth); + free(route); +} + +static void +mta_route_drain(struct mta_route *route) +{ + struct mta_task *task; + struct envelope *e; + + log_debug("mta: draining %s (tasks=%i, refs=%i, sessions=%i)", + mta_route_to_text(route), + route->ntask, route->refcount, route->nsession); + + if (route->ntask == 0 && 
route->refcount == 0 && route->nsession == 0) { + mta_route_free(route); + return; + } + + if (route->ntask == 0) { + log_debug("mta: no task for %s", mta_route_to_text(route)); + return; + } + + if (route->nfail > 3) { + /* Three connection errors in a row: consider that the route + * has a problem. + */ + log_debug("mta: too many failures on %s", + mta_route_to_text(route)); + + while ((task = TAILQ_FIRST(&route->tasks))) { + TAILQ_REMOVE(&route->tasks, task, entry); + route->ntask -= 1; + while((e = TAILQ_FIRST(&task->envelopes))) + mta_envelope_done(task, e, route->errorline); + route->refcount -= 1; + free(task); + } + route->nfail = 0; + /* XXX maybe close the route for while */ + return; + } + + /* make sure there are one session for each task */ + while (route->nsession < route->ntask) { + /* if we have reached the max number of session, wait */ + if (route->nsession >= route->maxconn) { + log_debug("mta: max conn reached for %s", + mta_route_to_text(route)); + return; + } + route->nsession += 1; + mta_session(route); + } +} + +static void +mta_envelope_done(struct mta_task *task, struct envelope *e, const char *status) +{ + envelope_set_errormsg(e, "%s", status); + + log_info("%016" PRIx64 ": to=<%s@%s>, delay=%s, relay=%s, stat=%s (%s)", + e->id, e->dest.user, + e->dest.domain, + duration_to_text(time(NULL) - e->creation), + task->route->hostname, + mta_response_status(e->errorline), + mta_response_text(e->errorline)); + + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + mta_response_delivery(e->errorline), 0, 0, -1, e, sizeof(*e)); + TAILQ_REMOVE(&task->envelopes, e, entry); + free(e); +} + +static int +mta_route_cmp(struct mta_route *a, struct mta_route *b) +{ + int r; + + if (a->flags < b->flags) + return (-1); + if (a->flags > b->flags) + return (1); + + if (a->port < b->port) + return (-1); + if (a->port > b->port) + return (1); + + if (a->auth == NULL && b->auth) + return (-1); + if (a->auth && b->auth == NULL) + return (1); + if (a->auth && ((r = 
strcmp(a->auth, b->auth)))) + return (r); + + if (a->cert == NULL && b->cert) + return (-1); + if (a->cert && b->cert == NULL) + return (1); + if (a->cert && ((r = strcmp(a->cert, b->cert)))) + return (r); + + if ((r = strcmp(a->hostname, b->hostname))) + return (r); + + return (0); +} + +SPLAY_GENERATE(mta_route_tree, mta_route, entry, mta_route_cmp); |