diff options
Diffstat (limited to 'usr.sbin/smtpd/mda.c')
-rw-r--r-- | usr.sbin/smtpd/mda.c | 389 |
1 files changed, 239 insertions, 150 deletions
diff --git a/usr.sbin/smtpd/mda.c b/usr.sbin/smtpd/mda.c index 19fe77d5095..0e3d6c26145 100644 --- a/usr.sbin/smtpd/mda.c +++ b/usr.sbin/smtpd/mda.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mda.c,v 1.86 2013/01/28 16:32:10 eric Exp $ */ +/* $OpenBSD: mda.c,v 1.87 2013/01/31 18:18:40 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -45,27 +45,42 @@ #define MDA_HIWAT 65536 -#define MDA_MAXEVP 5000 -#define MDA_MAXEVPUSER 500 +#define MDA_MAXEVP 200000 +#define MDA_MAXEVPUSER 15000 #define MDA_MAXSESS 50 #define MDA_MAXSESSUSER 7 +struct mda_envelope { + TAILQ_ENTRY(mda_envelope) entry; + uint64_t id; + time_t creation; + char *sender; + char *dest; + char *rcpt; + enum action_type method; + char *user; + char *buffer; +}; + +#define FLAG_USER_WAITINFO 0x01 +#define FLAG_USER_RUNNABLE 0x02 + struct mda_user { - TAILQ_ENTRY(mda_user) entry; - TAILQ_ENTRY(mda_user) entry_runnable; - char name[MAXLOGNAME]; - char usertable[MAXPATHLEN]; - size_t evpcount; - TAILQ_HEAD(, envelope) envelopes; - int runnable; - size_t running; - struct userinfo userinfo; + TAILQ_ENTRY(mda_user) entry; + TAILQ_ENTRY(mda_user) entry_runnable; + char name[MAXLOGNAME]; + char usertable[MAXPATHLEN]; + size_t evpcount; + TAILQ_HEAD(, mda_envelope) envelopes; + int flags; + size_t running; + struct userinfo userinfo; }; struct mda_session { uint64_t id; struct mda_user *user; - struct envelope *evp; + struct mda_envelope *evp; struct io io; struct iobuf iobuf; FILE *datafp; @@ -75,12 +90,12 @@ static void mda_imsg(struct mproc *, struct imsg *); static void mda_io(struct io *, int); static void mda_shutdown(void); static void mda_sig_handler(int, short, void *); -static int mda_check_loop(FILE *, struct envelope *); +static int mda_check_loop(FILE *, struct mda_envelope *); static int mda_getlastline(int, char *, size_t); static void mda_done(struct mda_session *); static void mda_fail(struct mda_user *, int, const char *); static void mda_drain(void); -static void mda_log(const struct envelope *, const char *, const char *); +static void mda_log(const struct mda_envelope *, const char *, const char *); size_t evpcount; static struct tree sessions; @@ -92,10 +107,10 @@ size_t running; static void mda_imsg(struct mproc *p, struct imsg *imsg) { - struct delivery_mda *d_mda; struct mda_session *s; struct mda_user *u; - struct envelope *e; + struct mda_envelope *e; + struct envelope evp; struct userinfo *userinfo; struct deliver deliver; struct msg m; @@ -104,8 +119,9 @@ mda_imsg(struct mproc *p, struct imsg *imsg) const char *username, *usertable; uint64_t reqid; size_t sz; - char out[256], stat[MAX_LINE_SIZE]; - int n, v, status; + char out[256], buf[MAX_LINE_SIZE]; + int n, v; + enum lka_resp_status status; if (p->proc == PROC_LKA) { switch (imsg->hdr.type) { @@ -113,7 +129,7 @@ mda_imsg(struct mproc *p, struct imsg *imsg) m_msg(&m, imsg); m_get_string(&m, &usertable); m_get_string(&m, &username); - m_get_int(&m, &status); + m_get_int(&m, (int *)&status); if (status == LKA_OK) m_get_data(&m, &data, &sz); m_end(&m); @@ -133,7 +149,8 @@ mda_imsg(struct mproc *p, struct imsg *imsg) "Permanent failure in user lookup"); else { memmove(&u->userinfo, data, sz); - u->runnable = 1; + u->flags &= ~FLAG_USER_WAITINFO; + u->flags |= FLAG_USER_RUNNABLE; TAILQ_INSERT_TAIL(&runnable, u, entry_runnable); mda_drain(); } @@ -145,57 +162,97 @@ mda_imsg(struct mproc *p, struct imsg *imsg) switch (imsg->hdr.type) { case IMSG_MDA_DELIVER: - e = xmalloc(sizeof(*e), "mda:envelope"); m_msg(&m, imsg); - m_get_envelope(&m, e); + m_get_envelope(&m, &evp); m_end(&m); + e = xcalloc(1, sizeof *e, "mda_envelope"); + e->id = evp.id; + e->creation = evp.creation; + buf[0] = '\0'; + if (evp.sender.user[0] && evp.sender.domain[0]) + snprintf(buf, sizeof buf, "%s@%s", + evp.sender.user, evp.sender.domain); + e->sender = xstrdup(buf, "mda_envelope:sender"); + snprintf(buf, sizeof buf, "%s@%s", + evp.dest.user, evp.dest.domain); + e->dest = xstrdup(buf, "mda_envelope:dest"); + snprintf(buf, sizeof buf, "%s@%s", + evp.rcpt.user, evp.rcpt.domain); + if (strcmp(buf, e->dest)) + e->rcpt = xstrdup(buf, "mda_envelope:rcpt"); + e->method = evp.agent.mda.method; + e->buffer = xstrdup(evp.agent.mda.buffer, + "mda_envelope:buffer"); + e->user = xstrdup(evp.agent.mda.username, + "mda_envelope:user"); + if (evpcount >= MDA_MAXEVP) { log_debug("debug: mda: too many envelopes"); queue_tempfail(e->id, - "Global envelope limit reached"); + "Too many envelopes in the delivery agent: " + "will try again later"); mda_log(e, "TempFail", - "Global envelope limit reached"); + "Too many envelopes in the delivery agent: " + "will try again later"); + free(e->sender); + free(e->dest); + free(e->rcpt); + free(e->user); + free(e->buffer); free(e); return; } - username = e->agent.mda.username; - usertable = e->agent.mda.usertable; + username = evp.agent.mda.username; + usertable = evp.agent.mda.usertable; TAILQ_FOREACH(u, &users, entry) if (!strcmp(username, u->name) && !strcmp(usertable, u->usertable)) break; - if (u && u->evpcount >= MDA_MAXEVPUSER) { - log_debug("debug: mda: too many envelopes for " - "\"%s\"", u->name); - queue_tempfail(e->id, - "User envelope limit reached"); - mda_log(e, "TempFail", - "User envelope limit reached"); - free(e); - return; - } - if (u == NULL) { u = xcalloc(1, sizeof *u, "mda_user"); + TAILQ_INSERT_TAIL(&users, u, entry); TAILQ_INIT(&u->envelopes); strlcpy(u->name, username, sizeof u->name); strlcpy(u->usertable, usertable, sizeof u->usertable); - TAILQ_INSERT_TAIL(&users, u, entry); + u->flags |= FLAG_USER_WAITINFO; m_create(p_lka, IMSG_LKA_USERINFO, 0, 0, -1, 32 + strlen(usertable) + strlen(username)); m_add_string(p_lka, usertable); m_add_string(p_lka, username); m_close(p_lka); + stat_increment("mda.user", 1); + } else if (u->evpcount >= MDA_MAXEVPUSER) { + log_debug("debug: mda: too many envelopes for " + "\"%s\"", u->name); + queue_tempfail(e->id, + "Too many envelopes for this user in the " + "delivery agent: will try again later"); + mda_log(e, "TempFail", + "Too many envelopes for this user in the " + "delivery agent: will try again later"); + free(e->sender); + free(e->dest); + free(e->rcpt); + free(e->user); + free(e->buffer); + free(e); + return; + } else if (!(u->flags & FLAG_USER_RUNNABLE) && + !(u->flags & FLAG_USER_WAITINFO)) { + u->flags |= FLAG_USER_RUNNABLE; + TAILQ_INSERT_TAIL(&runnable, u, entry_runnable); } + stat_increment("mda.envelope", 1); stat_increment("mda.pending", 1); evpcount += 1; u->evpcount += 1; TAILQ_INSERT_TAIL(&u->envelopes, e, entry); + mda_drain(); return; @@ -209,63 +266,62 @@ mda_imsg(struct mproc *p, struct imsg *imsg) if (imsg->fd == -1) { log_debug("debug: mda: cannot get message fd"); - queue_tempfail(s->evp->id, - "Cannot get message fd"); - mda_log(e, "TempFail", - "Cannot get messafe fd"); + queue_tempfail(e->id, "Cannot get message fd"); + mda_log(e, "TempFail", "Cannot get message fd"); mda_done(s); return; } + log_debug("debug: mda: got message fd %i " + "for session %016"PRIx64 " evpid %016"PRIx64, + imsg->fd, s->id, e->id); + if ((s->datafp = fdopen(imsg->fd, "r")) == NULL) { log_warn("warn: mda: fdopen"); - queue_tempfail(s->evp->id, "fdopen failed"); + queue_tempfail(e->id, "fdopen failed"); mda_log(e, "TempFail", "fdopen failed"); mda_done(s); + close(imsg->fd); return; } /* check delivery loop */ - if (mda_check_loop(s->datafp, s->evp)) { + if (mda_check_loop(s->datafp, e)) { log_debug("debug: mda: loop detected"); - queue_loop(s->evp->id); + queue_loop(e->id); mda_log(e, "PermFail", "Loop detected"); mda_done(s); return; } /* start queueing delivery headers */ - if (s->evp->sender.user[0] && s->evp->sender.domain[0]) + if (e->sender) /* XXX: remove exising Return-Path, if any */ n = iobuf_fqueue(&s->iobuf, - "Return-Path: %s@%s\nDelivered-To: %s@%s\n", - s->evp->sender.user, s->evp->sender.domain, - s->evp->rcpt.user, - s->evp->rcpt.domain); + "Return-Path: %s\nDelivered-To: %s\n", + e->sender, e->rcpt ? e->rcpt : e->dest); else n = iobuf_fqueue(&s->iobuf, - "Delivered-To: %s@%s\n", - s->evp->rcpt.user, - s->evp->rcpt.domain); + "Delivered-To: %s\n", + e->rcpt ? e->rcpt : e->dest); if (n == -1) { log_warn("warn: mda: " "fail to write delivery info"); - queue_tempfail(s->evp->id, "Out of memory"); - mda_log(s->evp, "TempFail", "Out of memory"); + queue_tempfail(e->id, "Out of memory"); + mda_log(e, "TempFail", "Out of memory"); mda_done(s); return; } /* request parent to fork a helper process */ - d_mda = &s->evp->agent.mda; userinfo = &s->user->userinfo; - switch (d_mda->method) { + switch (e->method) { case A_MDA: deliver.mode = A_MDA; deliver.userinfo = *userinfo; strlcpy(deliver.user, userinfo->username, sizeof(deliver.user)); - strlcpy(deliver.to, d_mda->buffer, + strlcpy(deliver.to, e->buffer, sizeof(deliver.to)); break; @@ -280,10 +336,8 @@ mda_imsg(struct mproc *p, struct imsg *imsg) sizeof(deliver.user)); strlcpy(deliver.to, userinfo->username, sizeof(deliver.to)); - snprintf(deliver.from, sizeof(deliver.from), - "%s@%s", - s->evp->sender.user, - s->evp->sender.domain); + strlcpy(deliver.from, e->sender, + sizeof(deliver.from)); break; case A_MAILDIR: @@ -291,7 +345,7 @@ mda_imsg(struct mproc *p, struct imsg *imsg) deliver.userinfo = *userinfo; strlcpy(deliver.user, userinfo->username, sizeof(deliver.user)); - strlcpy(deliver.to, d_mda->buffer, + strlcpy(deliver.to, e->buffer, sizeof(deliver.to)); break; @@ -300,15 +354,19 @@ mda_imsg(struct mproc *p, struct imsg *imsg) deliver.userinfo = *userinfo; strlcpy(deliver.user, userinfo->username, sizeof deliver.user); - strlcpy(deliver.to, d_mda->buffer, + strlcpy(deliver.to, e->buffer, sizeof deliver.to); break; default: errx(1, "mda: unknown delivery method: %d", - d_mda->method); + e->method); } + log_debug("debug: mda: querying mda fd " + "for session %016"PRIx64 " evpid %016"PRIx64, + s->id, s->evp->id); + m_create(p_parent, IMSG_PARENT_FORK_MDA, 0, 0, -1, 32 + sizeof(deliver)); m_add_id(p_parent, reqid); @@ -329,12 +387,16 @@ mda_imsg(struct mproc *p, struct imsg *imsg) e = s->evp; if (imsg->fd == -1) { log_warn("warn: mda: fail to retreive mda fd"); - queue_tempfail(s->evp->id, "Cannot get mda fd"); + queue_tempfail(e->id, "Cannot get mda fd"); mda_log(e, "TempFail", "Cannot get mda fd"); mda_done(s); return; } + log_debug("debug: mda: got mda fd %i " + "for session %016"PRIx64 " evpid %016"PRIx64, + imsg->fd, s->id, s->evp->id); + io_set_blocking(imsg->fd, 0); io_init(&s->io, imsg->fd, s, mda_io, &s->iobuf); io_set_write(&s->io); @@ -347,6 +409,7 @@ mda_imsg(struct mproc *p, struct imsg *imsg) m_end(&m); s = tree_xget(&sessions, reqid); + e = s->evp; /* * Grab last line of mda stdout/stderr if available. */ @@ -367,14 +430,13 @@ mda_imsg(struct mproc *p, struct imsg *imsg) /* update queue entry */ if (error) { - queue_tempfail(s->evp->id, error); - snprintf(stat, sizeof stat, "Error (%s)", - error); - mda_log(s->evp, "TempFail", stat); + queue_tempfail(e->id, error); + snprintf(buf, sizeof buf, "Error (%s)", error); + mda_log(e, "TempFail", buf); } else { - queue_ok(s->evp->id); - mda_log(s->evp, "Ok", "Delivered"); + queue_ok(e->id); + mda_log(e, "Ok", "Delivered"); } mda_done(s); return; @@ -455,6 +517,7 @@ mda(void) tree_init(&sessions); TAILQ_INIT(&users); TAILQ_INIT(&runnable); + evpcount = 0; running = 0; imsg_callback = mda_imsg; @@ -495,6 +558,9 @@ mda_io(struct io *io, int evt) /* done */ if (s->datafp == NULL) { + log_debug("debug: mda: all data sent for session" + " %016"PRIx64 " evpid %016"PRIx64, + s->id, s->evp->id); io_clear(io); return; } @@ -511,10 +577,16 @@ mda_io(struct io *io, int evt) io_pause(io, IO_PAUSE_OUT); return; } +#if 0 + log_debug("debug: mda: %zu bytes queued " + "for session %016"PRIx64 " evpid %016"PRIx64, + iobuf_queued(&s->iobuf), s->id, s->evp->id); +#endif } if (ferror(s->datafp)) { - log_debug("debug: mda_io: %p: ferror", s); + log_debug("debug: mda: ferror on session %016"PRIx64, + s->id); m_create(p_parent, IMSG_PARENT_KILL_MDA, 0, 0, -1, 128); m_add_id(p_parent, s->id); m_add_string(p_parent, "Error reading body"); @@ -524,39 +596,44 @@ mda_io(struct io *io, int evt) } if (feof(s->datafp)) { + log_debug("debug: mda: end-of-file for session" + " %016"PRIx64 " evpid %016"PRIx64, + s->id, s->evp->id); fclose(s->datafp); s->datafp = NULL; } return; case IO_TIMEOUT: - log_debug("debug: mda_io: timeout"); + log_debug("debug: mda: timeout on session %016"PRIx64, s->id); io_pause(io, IO_PAUSE_OUT); return; case IO_ERROR: - log_debug("debug: mda_io: io error: %s", io->error); + log_debug("debug: mda: io error on session %016"PRIx64": %s", + s->id, io->error); io_pause(io, IO_PAUSE_OUT); return; case IO_DISCONNECTED: - log_debug("debug: mda_io: disconnected"); + log_debug("debug: mda: io disconnected on session %016"PRIx64, + s->id); io_pause(io, IO_PAUSE_OUT); return; default: - log_debug("debug: mda_io: unexpected io event: %i", evt); + log_debug("debug: mda: unexpected event on session %016"PRIx64, + s->id); io_pause(io, IO_PAUSE_OUT); return; } } static int -mda_check_loop(FILE *fp, struct envelope *ep) +mda_check_loop(FILE *fp, struct mda_envelope *e) { char *buf, *lbuf; size_t len; - struct mailaddr maddr; int ret = 0; lbuf = NULL; @@ -575,12 +652,7 @@ mda_check_loop(FILE *fp, struct envelope *ep) break; if (strncasecmp("Delivered-To: ", buf, 14) == 0) { - - bzero(&maddr, sizeof maddr); - if (! text_to_mailaddr(&maddr, buf + 14)) - continue; - if (strcasecmp(maddr.user, ep->dest.user) == 0 && - strcasecmp(maddr.domain, ep->dest.domain) == 0) { + if (strcasecmp(buf + 14, e->dest) == 0) { ret = 1; break; } @@ -641,7 +713,7 @@ mda_getlastline(int fd, char *dst, size_t dstsz) static void mda_fail(struct mda_user *user, int permfail, const char *error) { - struct envelope *e; + struct mda_envelope *e; while ((e = TAILQ_FIRST(&user->envelopes))) { TAILQ_REMOVE(&user->envelopes, e, entry); @@ -653,143 +725,160 @@ mda_fail(struct mda_user *user, int permfail, const char *error) mda_log(e, "TempFail", error); queue_tempfail(e->id, error); } + free(e->sender); + free(e->dest); + free(e->rcpt); + free(e->user); + free(e->buffer); free(e); + stat_decrement("mda.envelope", 1); } TAILQ_REMOVE(&users, user, entry); free(user); + stat_decrement("mda.user", 1); } static void mda_drain(void) { struct mda_session *s; - struct mda_user *user; + struct mda_user *u; - while ((user = (TAILQ_FIRST(&runnable)))) { + while ((u = (TAILQ_FIRST(&runnable)))) { + TAILQ_REMOVE(&runnable, u, entry_runnable); + + if (u->evpcount == 0 && u->running == 0) { + log_debug("debug: mda: all done for user \"%s\"", + u->name); + TAILQ_REMOVE(&users, u, entry); + free(u); + stat_decrement("mda.user", 1); + continue; + } + + if (u->evpcount == 0) { + log_debug("debug: mda: no more envelope for \"%s\"", + u->name); + u->flags &= ~FLAG_USER_RUNNABLE; + continue; + } + + if (u->running >= MDA_MAXSESSUSER) { + log_debug("debug: mda: " + "maximum number of session reached for user \"%s\"", + u->name); + u->flags &= ~FLAG_USER_RUNNABLE; + continue; + } if (running >= MDA_MAXSESS) { log_debug("debug: mda: " "maximum number of session reached"); + TAILQ_INSERT_HEAD(&runnable, u, entry_runnable); return; } - log_debug("debug: mda: new session for user \"%s\"", - user->name); - s = xcalloc(1, sizeof *s, "mda_drain"); - s->user = user; - s->evp = TAILQ_FIRST(&user->envelopes); - TAILQ_REMOVE(&user->envelopes, s->evp, entry); + s->user = u; + s->evp = TAILQ_FIRST(&u->envelopes); + TAILQ_REMOVE(&u->envelopes, s->evp, entry); s->id = generate_uid(); if (iobuf_init(&s->iobuf, 0, 0) == -1) fatal("mda_drain"); s->io.sock = -1; tree_xset(&sessions, s->id, s); + log_debug("debug: mda: new session %016" PRIx64 + " for user \"%s\" evpid %016" PRIx64, s->id, u->name, + s->evp->id); + m_create(p_queue, IMSG_QUEUE_MESSAGE_FD, 0, 0, -1, 18); m_add_id(p_queue, s->id); m_add_msgid(p_queue, evpid_to_msgid(s->evp->id)); m_close(p_queue); - stat_decrement("mda.pending", 1); - - user->evpcount--; evpcount--; + u->evpcount--; + stat_decrement("mda.pending", 1); - stat_increment("mda.running", 1); - - user->running++; running++; + u->running++; + stat_increment("mda.running", 1); - /* - * The user is still runnable if there are pending envelopes - * and the session limit is not reached. We put it at the tail - * so that everyone gets a fair share. - */ - TAILQ_REMOVE(&runnable, user, entry_runnable); - user->runnable = 0; - if (TAILQ_FIRST(&user->envelopes) && - user->running < MDA_MAXSESSUSER) { - TAILQ_INSERT_TAIL(&runnable, user, entry_runnable); - user->runnable = 1; - log_debug("debug: mda: user \"%s\" still runnable", - user->name); - } + /* Re-add the user at the tail of the queue */ + TAILQ_INSERT_TAIL(&runnable, u, entry_runnable); } } static void mda_done(struct mda_session *s) { + struct mda_user *u; + tree_xpop(&sessions, s->id); - running--; - s->user->running--; + log_debug("debug: mda: session %016" PRIx64 " done", s->id); - stat_decrement("mda.running", 1); + u = s->user; - if (TAILQ_FIRST(&s->user->envelopes) == NULL && s->user->running == 0) { - log_debug("debug: mda: " - "all done for user \"%s\"", s->user->name); - TAILQ_REMOVE(&users, s->user, entry); - free(s->user); - } else if (s->user->runnable == 0 && - TAILQ_FIRST(&s->user->envelopes) && - s->user->running < MDA_MAXSESSUSER) { - log_debug("debug: mda: user \"%s\" becomes runnable", - s->user->name); - TAILQ_INSERT_TAIL(&runnable, s->user, entry_runnable); - s->user->runnable = 1; - } + running--; + u->running--; + stat_decrement("mda.running", 1); if (s->datafp) fclose(s->datafp); io_clear(&s->io); iobuf_clear(&s->iobuf); + + free(s->evp->sender); + free(s->evp->dest); + free(s->evp->rcpt); + free(s->evp->user); + free(s->evp->buffer); free(s->evp); free(s); + stat_decrement("mda.envelope", 1); + + if (!(u->flags & FLAG_USER_RUNNABLE)) { + log_debug("debug: mda: user \"%s\" becomes runnable", u->name); + TAILQ_INSERT_TAIL(&runnable, u, entry_runnable); + u->flags |= FLAG_USER_RUNNABLE; + } mda_drain(); } static void -mda_log(const struct envelope *evp, const char *prefix, const char *status) +mda_log(const struct mda_envelope *evp, const char *prefix, const char *status) { char rcpt[MAX_LINE_SIZE]; - char sender[MAX_LINE_SIZE]; const char *method; rcpt[0] = '\0'; - if (strcmp(evp->rcpt.user, evp->dest.user) || - strcmp(evp->rcpt.domain, evp->dest.domain)) - snprintf(rcpt, sizeof rcpt, "rcpt=<%s@%s>, ", - evp->rcpt.user, evp->rcpt.domain); - - sender[0] = '\0'; - if (evp->sender.user[0] || evp->sender.domain[0]) - snprintf(sender, sizeof(sender), "%s@%s", - evp->sender.user, evp->sender.domain); + if (evp->rcpt) + snprintf(rcpt, sizeof rcpt, "rcpt=<%s>, ", evp->rcpt); - if (evp->agent.mda.method == A_MAILDIR) + if (evp->method == A_MAILDIR) method = "maildir"; - else if (evp->agent.mda.method == A_MBOX) + else if (evp->method == A_MBOX) method = "mbox"; - else if (evp->agent.mda.method == A_FILENAME) + else if (evp->method == A_FILENAME) method = "file"; - else if (evp->agent.mda.method == A_MDA) + else if (evp->method == A_MDA) method = "mda"; else method = "???"; - log_info("delivery: %s for %016" PRIx64 ": from=<%s>, to=<%s@%s>, " + log_info("delivery: %s for %016" PRIx64 ": from=<%s>, to=<%s>, " "%suser=%s, method=%s, delay=%s, stat=%s", prefix, evp->id, - sender, - evp->dest.user, evp->dest.domain, + evp->sender ? evp->sender : "", + evp->dest, rcpt, - evp->agent.mda.username, method, + evp->user, + method, duration_to_text(time(NULL) - evp->creation), status); } |