diff options
author | Eric Faurot <eric@cvs.openbsd.org> | 2012-10-25 09:51:09 +0000 |
---|---|---|
committer | Eric Faurot <eric@cvs.openbsd.org> | 2012-10-25 09:51:09 +0000 |
commit | c0411ea8d985cedd8f13107e4d7064f1a06a6eea (patch) | |
tree | fdee9f810a479df8fb8f942fec9115bbd543bb8e /usr.sbin/smtpd | |
parent | ab20b8f4d1b3d70a7add8b28a95c34ccc60ab3a8 (diff) |
Make the mda request the message fd from the queue when needed, instead of
pushing the fd with the envelope. This allows the mda to handle session
limits itself. Envelopes are sent at full rate to the mda, which buffers
them on per-user queues, or sends them back for rescheduling if it already
has too many pending envelopes. Delivery sessions are created (within
per-user and global limits) to drain the queues.
This makes the server handle envelope bursts more efficiently.
ok gilles@
Diffstat (limited to 'usr.sbin/smtpd')
-rw-r--r-- | usr.sbin/smtpd/mda.c | 259 | ||||
-rw-r--r-- | usr.sbin/smtpd/queue.c | 23 |
2 files changed, 174 insertions, 108 deletions
diff --git a/usr.sbin/smtpd/mda.c b/usr.sbin/smtpd/mda.c index 6de3c4c5e96..7630df78599 100644 --- a/usr.sbin/smtpd/mda.c +++ b/usr.sbin/smtpd/mda.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mda.c,v 1.81 2012/10/17 17:14:11 eric Exp $ */ +/* $OpenBSD: mda.c,v 1.82 2012/10/25 09:51:08 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -41,42 +41,55 @@ #include "smtpd.h" #include "log.h" -#define MDA_MAXPERUSER 7 - -struct mda_session { - struct envelope evp; - struct msgbuf w; - struct event ev; - FILE *datafp; -}; +#define MDA_MAXEVP 5000 +#define MDA_MAXEVPUSER 500 +#define MDA_MAXSESS 50 +#define MDA_MAXSESSUSER 7 struct mda_user { TAILQ_ENTRY(mda_user) entry; + TAILQ_ENTRY(mda_user) entry_runnable; char name[MAXLOGNAME]; + size_t evpcount; + TAILQ_HEAD(, envelope) envelopes; + int runnable; size_t running; }; +struct mda_session { + uint32_t id; + struct mda_user *user; + struct envelope *evp; + struct msgbuf w; + struct event ev; + FILE *datafp; +}; + static void mda_imsg(struct imsgev *, struct imsg *); static void mda_shutdown(void); static void mda_sig_handler(int, short, void *); static void mda_store(struct mda_session *); static void mda_store_event(int, short, void *); static int mda_check_loop(FILE *, struct envelope *); -static int mda_user_increment(const char *); -static void mda_user_decrement(const char *); +static void mda_done(struct mda_session *, int); +static void mda_drain(void); +size_t evpcount; static struct tree sessions; static uint32_t mda_id = 0; static TAILQ_HEAD(, mda_user) users; +static TAILQ_HEAD(, mda_user) runnable; +size_t running; static void mda_imsg(struct imsgev *iev, struct imsg *imsg) { - char output[128], *error, *parent_error; + char output[128], *error, *parent_error, *name; char stat[MAX_LINE_SIZE]; struct deliver deliver; struct mda_session *s; + struct mda_user *u; struct delivery_mda *d_mda; struct envelope *ep; FILE *fp; @@ -85,42 +98,73 @@ mda_imsg(struct imsgev *iev, struct imsg *imsg) if 
(iev->proc == PROC_QUEUE) { switch (imsg->hdr.type) { + case IMSG_MDA_SESS_NEW: - ep = (struct envelope *)imsg->data; - fp = fdopen(imsg->fd, "r"); - if (fp == NULL) - fatalx("mda: fdopen"); + ep = xmemdup(imsg->data, sizeof *ep, "mda_imsg"); - if (mda_check_loop(fp, ep)) { - log_debug("mda: loop detected"); - envelope_set_errormsg(ep, "646 loop detected"); + if (evpcount >= MDA_MAXEVP) { + log_debug("mda: too many envelopes"); imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1, ep, - sizeof *ep); - fclose(fp); + IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, + ep, sizeof *ep); + free(ep); return; } - if (mda_user_increment(ep->agent.mda.user) == -1) { - envelope_set_errormsg(ep, "mda limit reached"); + name = ep->agent.mda.user; + TAILQ_FOREACH(u, &users, entry) + if (!strcmp(name, u->name)) + break; + if (u && u->evpcount >= MDA_MAXEVPUSER) { + log_debug("mda: too many envelopes for \"%s\"", + u->name); imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, ep, - sizeof *ep); - fclose(fp); + IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, + ep, sizeof *ep); + free(ep); return; } + if (u == NULL) { + u = xcalloc(1, sizeof *u, "mda_user"); + TAILQ_INIT(&u->envelopes); + strlcpy(u->name, name, sizeof u->name); + TAILQ_INSERT_TAIL(&users, u, entry); + } + if (u->runnable == 0 && u->running < MDA_MAXSESSUSER) { + log_debug("mda: \"%s\" immediatly runnable", + u->name); + TAILQ_INSERT_TAIL(&runnable, u, entry_runnable); + u->runnable = 1; + } + + stat_increment("mda.pending", 1); + + evpcount += 1; + u->evpcount += 1; + TAILQ_INSERT_TAIL(&u->envelopes, ep, entry); + mda_drain(); + return; + + case IMSG_QUEUE_MESSAGE_FD: + id = *(uint32_t*)(imsg->data); + + s = tree_xget(&sessions, id); - /* make new session based on provided args */ - s = xcalloc(1, sizeof *s, "mda_imsg"); - msgbuf_init(&s->w); - s->evp = *ep; - s->datafp = fp; - id = mda_id++; - tree_xset(&sessions, id, s); + s->datafp = fdopen(imsg->fd, "r"); + if 
(s->datafp == NULL) + fatalx("mda: fdopen"); + + if (mda_check_loop(s->datafp, s->evp)) { + log_debug("mda: loop detected"); + envelope_set_errormsg(s->evp, + "646 loop detected"); + mda_done(s, IMSG_QUEUE_DELIVERY_LOOP); + return; + } /* request parent to fork a helper process */ - ep = &s->evp; - d_mda = &s->evp.agent.mda; + ep = s->evp; + d_mda = &s->evp->agent.mda; switch (d_mda->method) { case A_MDA: deliver.mode = A_MDA; @@ -180,7 +224,7 @@ mda_imsg(struct imsgev *iev, struct imsg *imsg) return; case IMSG_MDA_DONE: - s = tree_xpop(&sessions, imsg->hdr.peerid); + s = tree_xget(&sessions, imsg->hdr.peerid); /* * Grab last line of mda stdout/stderr if available. */ @@ -237,24 +281,11 @@ mda_imsg(struct imsgev *iev, struct imsg *imsg) msg = IMSG_QUEUE_DELIVERY_OK; if (error) { msg = IMSG_QUEUE_DELIVERY_TEMPFAIL; - envelope_set_errormsg(&s->evp, "%s", error); + envelope_set_errormsg(s->evp, "%s", error); snprintf(stat, sizeof stat, "Error (%s)", error); } - imsg_compose_event(env->sc_ievs[PROC_QUEUE], msg, - 0, 0, -1, &s->evp, sizeof s->evp); - - log_envelope(&s->evp, NULL, error ? stat : "Delivered"); - - mda_user_decrement(s->evp.agent.mda.user); - - /* destroy session */ - if (s->w.fd != -1) - close(s->w.fd); - if (s->datafp) - fclose(s->datafp); - msgbuf_clear(&s->w); - event_del(&s->ev); - free(s); + log_envelope(s->evp, NULL, error ? 
stat : "Delivered"); + mda_done(s, msg); return; case IMSG_CTL_VERBOSE: @@ -329,6 +360,8 @@ mda(void) tree_init(&sessions); TAILQ_INIT(&users); + TAILQ_INIT(&runnable); + running = 0; imsg_callback = mda_imsg; event_init(); @@ -357,16 +390,16 @@ mda_store(struct mda_session *s) struct ibuf *buf; int len; - if (s->evp.sender.user[0] && s->evp.sender.domain[0]) + if (s->evp->sender.user[0] && s->evp->sender.domain[0]) /* XXX: remove user provided Return-Path, if any */ len = asprintf(&p, "Return-Path: %s@%s\nDelivered-To: %s@%s\n", - s->evp.sender.user, s->evp.sender.domain, - s->evp.rcpt.user, - s->evp.rcpt.domain); + s->evp->sender.user, s->evp->sender.domain, + s->evp->rcpt.user, + s->evp->rcpt.domain); else len = asprintf(&p, "Delivered-To: %s@%s\n", - s->evp.rcpt.user, - s->evp.rcpt.domain); + s->evp->rcpt.user, + s->evp->rcpt.domain); if (len == -1) fatal("mda_store: asprintf"); @@ -463,40 +496,92 @@ mda_check_loop(FILE *fp, struct envelope *ep) return (ret); } -static int -mda_user_increment(const char *name) +static void +mda_drain(void) { - struct mda_user *user; - - TAILQ_FOREACH(user, &users, entry) - if (!strcmp(name, user->name)) { - if (user->running >= MDA_MAXPERUSER) { - log_debug("mda: too many mda proc for user %s", - name); - return (-1); - } - user->running += 1; - return (0); - } + struct mda_session *s; + struct mda_user *user; - user = xmalloc(sizeof *user, "mda_user"); - strlcpy(user->name, name, sizeof user->name); - user->running = 1; - TAILQ_INSERT_TAIL(&users, user, entry); + while ((user = (TAILQ_FIRST(&runnable)))) { - return (0); + if (running >= MDA_MAXSESS) { + log_debug("mda: maximum number of session reached"); + return; + } + + log_debug("mda: new session for user \"%s\"", user->name); + + s = xcalloc(1, sizeof *s, "mda_drain"); + s->user = user; + s->evp = TAILQ_FIRST(&user->envelopes); + TAILQ_REMOVE(&user->envelopes, s->evp, entry); + s->id = mda_id++; + msgbuf_init(&s->w); + tree_xset(&sessions, s->id, s); + 
imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_QUEUE_MESSAGE_FD, evpid_to_msgid(s->evp->id), 0, -1, + &s->id, sizeof(s->id)); + + stat_decrement("mda.pending", 1); + + user->evpcount--; + evpcount--; + + stat_increment("mda.running", 1); + + user->running++; + running++; + + /* + * The user is still runnable if there are pending envelopes + * and the session limit is not reached. We put it at the tail + * so that everyone gets a fair share. + */ + TAILQ_REMOVE(&runnable, user, entry_runnable); + user->runnable = 0; + if (TAILQ_FIRST(&user->envelopes) && + user->running < MDA_MAXSESSUSER) { + TAILQ_INSERT_TAIL(&runnable, user, entry_runnable); + user->runnable = 1; + log_debug("mda: user \"%s\" still runnable", user->name); + } + } } static void -mda_user_decrement(const char *name) +mda_done(struct mda_session *s, int msg) { - struct mda_user *user; + tree_xpop(&sessions, s->id); + + imsg_compose_event(env->sc_ievs[PROC_QUEUE], msg, 0, 0, -1, + s->evp, sizeof *s->evp); + + running--; + s->user->running--; + + stat_decrement("mda.running", 1); + + if (TAILQ_FIRST(&s->user->envelopes) == NULL && s->user->running == 0) { + log_debug("mda: all done for user \"%s\"", s->user->name); + TAILQ_REMOVE(&users, s->user, entry); + free(s->user); + } else if (s->user->runnable == 0 && + TAILQ_FIRST(&s->user->envelopes) && + s->user->running < MDA_MAXSESSUSER) { + log_debug("mda: user \"%s\" becomes runnable", + s->user->name); + TAILQ_INSERT_TAIL(&runnable, s->user, entry_runnable); + s->user->runnable = 1; + } - TAILQ_FOREACH(user, &users, entry) - if (!strcmp(name, user->name)) - if (--user->running == 0) { - TAILQ_REMOVE(&users, user, entry); - free(user); - break; - } + if (s->datafp) + fclose(s->datafp); + if (s->w.fd != -1) + close(s->w.fd); + event_del(&s->ev); + msgbuf_clear(&s->w); + free(s->evp); + free(s); + + mda_drain(); } diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index f552d1950b1..bc6635827c6 100644 --- a/usr.sbin/smtpd/queue.c +++ 
b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.139 2012/10/14 18:45:34 eric Exp $ */ +/* $OpenBSD: queue.c,v 1.140 2012/10/25 09:51:08 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -48,12 +48,9 @@ static void queue_pass_to_scheduler(struct imsgev *, struct imsg *); static void queue_shutdown(void); static void queue_sig_handler(int, short, void *); -#define MDA_RUNMAX 50 - static void queue_imsg(struct imsgev *iev, struct imsg *imsg) { - static size_t mda_running; static uint64_t batch_id; struct submit_status ss; struct envelope *e, evp; @@ -167,17 +164,9 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) id = *(uint64_t*)(imsg->data); if (queue_envelope_load(id, &evp) == 0) errx(1, "cannot load evp:%016" PRIx64, id); - if (mda_running >= MDA_RUNMAX) { - imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], - IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, - &evp, sizeof evp); - return; - } evp.lasttry = time(NULL); - fd = queue_message_fd_r(evpid_to_msgid(id)); imsg_compose_event(env->sc_ievs[PROC_MDA], - IMSG_MDA_SESS_NEW, 0, 0, fd, &evp, sizeof evp); - mda_running += 1; + IMSG_MDA_SESS_NEW, 0, 0, -1, &evp, sizeof evp); return; case IMSG_SMTP_ENQUEUE: @@ -224,8 +213,6 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], IMSG_QUEUE_DELIVERY_OK, 0, 0, -1, &e->id, sizeof e->id); - if (iev->proc == PROC_MDA) - mda_running--; return; case IMSG_QUEUE_DELIVERY_TEMPFAIL: @@ -235,8 +222,6 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], IMSG_QUEUE_DELIVERY_TEMPFAIL, 0, 0, -1, e, sizeof *e); - if (iev->proc == PROC_MDA) - mda_running--; return; case IMSG_QUEUE_DELIVERY_PERMFAIL: @@ -246,8 +231,6 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], IMSG_QUEUE_DELIVERY_PERMFAIL, 0, 0, -1, &e->id, sizeof e->id); - if (iev->proc == PROC_MDA) - mda_running--; return; case 
IMSG_QUEUE_DELIVERY_LOOP: @@ -257,8 +240,6 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) imsg_compose_event(env->sc_ievs[PROC_SCHEDULER], IMSG_QUEUE_DELIVERY_LOOP, 0, 0, -1, &e->id, sizeof e->id); - if (iev->proc == PROC_MDA) - mda_running--; return; } } |