diff options
author | Eric Faurot <eric@cvs.openbsd.org> | 2013-11-20 09:22:43 +0000 |
---|---|---|
committer | Eric Faurot <eric@cvs.openbsd.org> | 2013-11-20 09:22:43 +0000 |
commit | 8b99e878a37f7c221d3846f92a0392a8c1eccf7c (patch) | |
tree | 2b25c614e2116b49af93c48ed781f95ec7f027b0 /usr.sbin | |
parent | af572325dc2535179e84c6bd340fe689e35ce89f (diff) |
Rework the mda and scheduler to use the holdq mechanism instead of
tempfail for limiting the number of pending deliveries to the same
user. This allows to reach optimal delivery time even in case of
burst, while keeping the number of inflight envelopes low.
Diffstat (limited to 'usr.sbin')
-rw-r--r-- | usr.sbin/smtpd/lka.c | 6 | ||||
-rw-r--r-- | usr.sbin/smtpd/mda.c | 385 | ||||
-rw-r--r-- | usr.sbin/smtpd/parse.y | 38 | ||||
-rw-r--r-- | usr.sbin/smtpd/queue.c | 20 | ||||
-rw-r--r-- | usr.sbin/smtpd/scheduler.c | 10 | ||||
-rw-r--r-- | usr.sbin/smtpd/scheduler_api.c | 11 | ||||
-rw-r--r-- | usr.sbin/smtpd/scheduler_null.c | 6 | ||||
-rw-r--r-- | usr.sbin/smtpd/scheduler_proc.c | 6 | ||||
-rw-r--r-- | usr.sbin/smtpd/scheduler_ramqueue.c | 32 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd-api.h | 4 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd.h | 11 |
11 files changed, 319 insertions, 210 deletions
diff --git a/usr.sbin/smtpd/lka.c b/usr.sbin/smtpd/lka.c index 471220960a3..a4b9b953fb1 100644 --- a/usr.sbin/smtpd/lka.c +++ b/usr.sbin/smtpd/lka.c @@ -1,4 +1,4 @@ -/* $OpenBSD: lka.c,v 1.160 2013/11/18 12:24:26 eric Exp $ */ +/* $OpenBSD: lka.c,v 1.161 2013/11/20 09:22:42 eric Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> @@ -228,6 +228,7 @@ lka_imsg(struct mproc *p, struct imsg *imsg) switch (imsg->hdr.type) { case IMSG_LKA_USERINFO: m_msg(&m, imsg); + m_get_id(&m, &reqid); m_get_string(&m, &tablename); m_get_string(&m, &username); m_end(&m); @@ -235,8 +236,7 @@ lka_imsg(struct mproc *p, struct imsg *imsg) ret = lka_userinfo(tablename, username, &userinfo); m_create(p, IMSG_LKA_USERINFO, 0, 0, -1); - m_add_string(p, tablename); - m_add_string(p, username); + m_add_id(p, reqid); m_add_int(p, ret); if (ret == LKA_OK) m_add_data(p, &userinfo, sizeof(userinfo)); diff --git a/usr.sbin/smtpd/mda.c b/usr.sbin/smtpd/mda.c index f7a7b495946..cdd4692e3c7 100644 --- a/usr.sbin/smtpd/mda.c +++ b/usr.sbin/smtpd/mda.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mda.c,v 1.97 2013/10/28 09:14:58 eric Exp $ */ +/* $OpenBSD: mda.c,v 1.98 2013/11/20 09:22:42 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -44,11 +44,6 @@ #define MDA_HIWAT 65536 -#define MDA_MAXEVP 200000 -#define MDA_MAXEVPUSER 15000 -#define MDA_MAXSESS 50 -#define MDA_MAXSESSUSER 7 - struct mda_envelope { TAILQ_ENTRY(mda_envelope) entry; uint64_t id; @@ -61,10 +56,13 @@ struct mda_envelope { char *buffer; }; -#define FLAG_USER_WAITINFO 0x01 -#define FLAG_USER_RUNNABLE 0x02 +#define USER_WAITINFO 0x01 +#define USER_RUNNABLE 0x02 +#define USER_ONHOLD 0x04 +#define USER_HOLDQ 0x08 struct mda_user { + uint64_t id; TAILQ_ENTRY(mda_user) entry; TAILQ_ENTRY(mda_user) entry_runnable; char name[SMTPD_MAXLOGNAME]; @@ -95,13 +93,17 @@ static void mda_done(struct mda_session *); static void mda_fail(struct mda_user *, int, const char *); static void mda_drain(void); static void mda_log(const struct mda_envelope *, const char *, const char *); +static struct mda_user *mda_user(const struct envelope *); +static void mda_user_free(struct mda_user *); +static const char *mda_user_to_text(const struct mda_user *); +static struct mda_envelope *mda_envelope(const struct envelope *); +static void mda_envelope_free(struct mda_envelope *); +static struct mda_session * mda_session(struct mda_user *); -size_t evpcount; static struct tree sessions; +static struct tree users; -static TAILQ_HEAD(, mda_user) users; static TAILQ_HEAD(, mda_user) runnable; -size_t running; static void mda_imsg(struct mproc *p, struct imsg *imsg) @@ -115,7 +117,6 @@ mda_imsg(struct mproc *p, struct imsg *imsg) struct msg m; const void *data; const char *error, *parent_error; - const char *username, *usertable; uint64_t reqid; size_t sz; char out[256], buf[SMTPD_MAXLINESIZE]; @@ -126,19 +127,13 @@ mda_imsg(struct mproc *p, struct imsg *imsg) switch (imsg->hdr.type) { case IMSG_LKA_USERINFO: m_msg(&m, imsg); - m_get_string(&m, &usertable); - m_get_string(&m, &username); + m_get_id(&m, &reqid); m_get_int(&m, (int *)&status); if (status == LKA_OK) m_get_data(&m, &data, &sz); m_end(&m); - TAILQ_FOREACH(u, &users, entry) - if (!strcmp(username, u->name) && - !strcmp(usertable, u->usertable)) - break; - if (u == NULL) - return; + u = tree_xget(&users, reqid); if (status == LKA_TEMPFAIL) mda_fail(u, 0, @@ -148,8 +143,8 @@ mda_imsg(struct mproc *p, struct imsg *imsg) "Permanent failure in user lookup"); else { memmove(&u->userinfo, data, sz); - u->flags &= ~FLAG_USER_WAITINFO; - u->flags |= FLAG_USER_RUNNABLE; + u->flags &= ~USER_WAITINFO; + u->flags |= USER_RUNNABLE; TAILQ_INSERT_TAIL(&runnable, u, entry_runnable); mda_drain(); } @@ -165,91 +160,36 @@ mda_imsg(struct mproc *p, struct imsg *imsg) m_get_envelope(&m, &evp); m_end(&m); - e = xcalloc(1, sizeof *e, "mda_envelope"); - e->id = evp.id; - e->creation = evp.creation; - buf[0] = '\0'; - if (evp.sender.user[0] && evp.sender.domain[0]) - snprintf(buf, sizeof buf, "%s@%s", - evp.sender.user, evp.sender.domain); - e->sender = xstrdup(buf, "mda_envelope:sender"); - snprintf(buf, sizeof buf, "%s@%s", - evp.dest.user, evp.dest.domain); - e->dest = xstrdup(buf, "mda_envelope:dest"); - snprintf(buf, sizeof buf, "%s@%s", - evp.rcpt.user, evp.rcpt.domain); - if (strcmp(buf, e->dest)) - e->rcpt = xstrdup(buf, "mda_envelope:rcpt"); - e->method = evp.agent.mda.method; - e->buffer = xstrdup(evp.agent.mda.buffer, - "mda_envelope:buffer"); - e->user = xstrdup(evp.agent.mda.username, - "mda_envelope:user"); - - if (evpcount >= MDA_MAXEVP) { - log_debug("debug: mda: too many envelopes"); - queue_tempfail(e->id, 0, - "Too many envelopes in the delivery agent: " - "will try again later"); - mda_log(e, "TempFail", - "Too many envelopes in the delivery agent: " - "will try again later"); - free(e->sender); - free(e->dest); - free(e->rcpt); - free(e->user); - free(e->buffer); - free(e); - return; + u = mda_user(&evp); + + if (u->evpcount >= env->sc_mda_task_hiwat) { + if (!(u->flags & USER_ONHOLD)) { + log_debug("debug: mda: hiwat reached for " + "user \"%s\": holding envelopes", + mda_user_to_text(u)); + u->flags |= USER_ONHOLD; + } } - username = evp.agent.mda.username; - usertable = evp.agent.mda.usertable; - TAILQ_FOREACH(u, &users, entry) - if (!strcmp(username, u->name) && - !strcmp(usertable, u->usertable)) - break; - - if (u == NULL) { - u = xcalloc(1, sizeof *u, "mda_user"); - TAILQ_INSERT_TAIL(&users, u, entry); - TAILQ_INIT(&u->envelopes); - strlcpy(u->name, username, sizeof u->name); - strlcpy(u->usertable, usertable, sizeof u->usertable); - u->flags |= FLAG_USER_WAITINFO; - m_create(p_lka, IMSG_LKA_USERINFO, 0, 0, -1); - m_add_string(p_lka, usertable); - m_add_string(p_lka, username); - m_close(p_lka); - stat_increment("mda.user", 1); - } else if (u->evpcount >= MDA_MAXEVPUSER) { - log_debug("debug: mda: too many envelopes for " - "\"%s\"", u->name); - queue_tempfail(e->id, 0, - "Too many envelopes for this user in the " - "delivery agent: will try again later"); - mda_log(e, "TempFail", - "Too many envelopes for this user in the " - "delivery agent: will try again later"); - free(e->sender); - free(e->dest); - free(e->rcpt); - free(e->user); - free(e->buffer); - free(e); + if (u->flags & USER_ONHOLD) { + u->flags |= USER_HOLDQ; + m_create(p_queue, IMSG_DELIVERY_HOLD, 0, 0, -1); + m_add_evpid(p_queue, evp.id); + m_add_id(p_queue, u->id); + m_close(p_queue); return; - } else if (!(u->flags & FLAG_USER_RUNNABLE) && - !(u->flags & FLAG_USER_WAITINFO)) { - u->flags |= FLAG_USER_RUNNABLE; - TAILQ_INSERT_TAIL(&runnable, u, entry_runnable); } - stat_increment("mda.envelope", 1); + e = mda_envelope(&evp); + TAILQ_INSERT_TAIL(&u->envelopes, e, entry); + u->evpcount += 1; stat_increment("mda.pending", 1); - evpcount += 1; - u->evpcount += 1; - TAILQ_INSERT_TAIL(&u->envelopes, e, entry); + if (!(u->flags & USER_RUNNABLE) && + !(u->flags & USER_WAITINFO)) { + u->flags |= USER_RUNNABLE; + TAILQ_INSERT_TAIL(&runnable, u, entry_runnable); + } mda_drain(); return; @@ -276,10 +216,10 @@ mda_imsg(struct mproc *p, struct imsg *imsg) if ((s->datafp = fdopen(imsg->fd, "r")) == NULL) { log_warn("warn: mda: fdopen"); + close(imsg->fd); queue_tempfail(e->id, 0, "fdopen failed"); mda_log(e, "TempFail", "fdopen failed"); mda_done(s); - close(imsg->fd); return; } @@ -525,10 +465,8 @@ mda(void) fatal("mda: cannot drop privileges"); tree_init(&sessions); - TAILQ_INIT(&users); + tree_init(&users); TAILQ_INIT(&runnable); - evpcount = 0; - running = 0; imsg_callback = mda_imsg; event_init(); @@ -738,88 +676,67 @@ mda_fail(struct mda_user *user, int permfail, const char *error) mda_log(e, "TempFail", error); queue_tempfail(e->id, 0, error); } - free(e->sender); - free(e->dest); - free(e->rcpt); - free(e->user); - free(e->buffer); - free(e); - stat_decrement("mda.envelope", 1); + mda_envelope_free(e); } - TAILQ_REMOVE(&users, user, entry); - free(user); - stat_decrement("mda.user", 1); + mda_user_free(user); } static void mda_drain(void) { - struct mda_session *s; struct mda_user *u; while ((u = (TAILQ_FIRST(&runnable)))) { + TAILQ_REMOVE(&runnable, u, entry_runnable); if (u->evpcount == 0 && u->running == 0) { log_debug("debug: mda: all done for user \"%s\"", - u->name); - TAILQ_REMOVE(&users, u, entry); - free(u); - stat_decrement("mda.user", 1); + mda_user_to_text(u)); + mda_user_free(u); continue; } if (u->evpcount == 0) { log_debug("debug: mda: no more envelope for \"%s\"", - u->name); - u->flags &= ~FLAG_USER_RUNNABLE; + mda_user_to_text(u)); + u->flags &= ~USER_RUNNABLE; continue; } - if (u->running >= MDA_MAXSESSUSER) { + if (u->running >= env->sc_mda_max_user_session) { log_debug("debug: mda: " "maximum number of session reached for user \"%s\"", - u->name); - u->flags &= ~FLAG_USER_RUNNABLE; + mda_user_to_text(u)); + u->flags &= ~USER_RUNNABLE; continue; } - if (running >= MDA_MAXSESS) { + if (tree_count(&sessions) >= env->sc_mda_max_session) { log_debug("debug: mda: " "maximum number of session reached"); TAILQ_INSERT_HEAD(&runnable, u, entry_runnable); return; } - s = xcalloc(1, sizeof *s, "mda_drain"); - s->user = u; - s->evp = TAILQ_FIRST(&u->envelopes); - TAILQ_REMOVE(&u->envelopes, s->evp, entry); - s->id = generate_uid(); - if (iobuf_init(&s->iobuf, 0, 0) == -1) - fatal("mda_drain"); - s->io.sock = -1; - tree_xset(&sessions, s->id, s); - - log_debug("debug: mda: new session %016" PRIx64 - " for user \"%s\" evpid %016" PRIx64, s->id, u->name, - s->evp->id); - - m_create(p_queue, IMSG_QUEUE_MESSAGE_FD, 0, 0, -1); - m_add_id(p_queue, s->id); - m_add_msgid(p_queue, evpid_to_msgid(s->evp->id)); - m_close(p_queue); + mda_session(u); - evpcount--; - u->evpcount--; - stat_decrement("mda.pending", 1); - - running++; - u->running++; - stat_increment("mda.running", 1); + if (u->evpcount == env->sc_mda_task_lowat) { + if (u->flags & USER_ONHOLD) { + log_debug("debug: mda: down to lowat for user \"%s\": releasing", + mda_user_to_text(u)); + u->flags &= ~USER_ONHOLD; + } + if (u->flags & USER_HOLDQ) { + m_create(p_queue, IMSG_DELIVERY_RELEASE, 0, 0, -1); + m_add_id(p_queue, u->id); + m_add_int(p_queue, env->sc_mda_task_release); + m_close(p_queue); + } + } - /* Re-add the user at the tail of the queue */ + /* re-add the user at the tail of the queue */ TAILQ_INSERT_TAIL(&runnable, u, entry_runnable); } } @@ -827,37 +744,28 @@ mda_drain(void) static void mda_done(struct mda_session *s) { - struct mda_user *u; + log_debug("debug: mda: session %016" PRIx64 " done", s->id); tree_xpop(&sessions, s->id); - log_debug("debug: mda: session %016" PRIx64 " done", s->id); - - u = s->user; + mda_envelope_free(s->evp); - running--; - u->running--; - stat_decrement("mda.running", 1); + s->user->running--; + if (!(s->user->flags & USER_RUNNABLE)) { + log_debug("debug: mda: user \"%s\" becomes runnable", + s->user->name); + TAILQ_INSERT_TAIL(&runnable, s->user, entry_runnable); + s->user->flags |= USER_RUNNABLE; + } if (s->datafp) fclose(s->datafp); io_clear(&s->io); iobuf_clear(&s->iobuf); - free(s->evp->sender); - free(s->evp->dest); - free(s->evp->rcpt); - free(s->evp->user); - free(s->evp->buffer); - free(s->evp); free(s); - stat_decrement("mda.envelope", 1); - if (!(u->flags & FLAG_USER_RUNNABLE)) { - log_debug("debug: mda: user \"%s\" becomes runnable", u->name); - TAILQ_INSERT_TAIL(&runnable, u, entry_runnable); - u->flags |= FLAG_USER_RUNNABLE; - } + stat_decrement("mda.running", 1); mda_drain(); } @@ -897,3 +805,140 @@ mda_log(const struct mda_envelope *evp, const char *prefix, const char *status) duration_to_text(time(NULL) - evp->creation), status); } + +static struct mda_user * +mda_user(const struct envelope *evp) +{ + struct mda_user *u; + void *i; + + i = NULL; + while (tree_iter(&users, &i, NULL, (void**)(&u))) { + if (!strcmp(evp->agent.mda.username, u->name) && + !strcmp(evp->agent.mda.usertable, u->usertable)) + return (u); + } + + u = xcalloc(1, sizeof *u, "mda_user"); + u->id = generate_uid(); + TAILQ_INIT(&u->envelopes); + strlcpy(u->name, evp->agent.mda.username, sizeof(u->name)); + strlcpy(u->usertable, evp->agent.mda.usertable, + sizeof(u->usertable)); + + tree_xset(&users, u->id, u); + + m_create(p_lka, IMSG_LKA_USERINFO, 0, 0, -1); + m_add_id(p_lka, u->id); + m_add_string(p_lka, evp->agent.mda.usertable); + m_add_string(p_lka, evp->agent.mda.username); + m_close(p_lka); + u->flags |= USER_WAITINFO; + + stat_increment("mda.user", 1); + + log_debug("mda: new user %llx for \"%s\"", u->id, mda_user_to_text(u)); + + return (u); +} + +static void +mda_user_free(struct mda_user *u) +{ + tree_xpop(&users, u->id); + + if (u->flags & USER_HOLDQ) { + m_create(p_queue, IMSG_DELIVERY_RELEASE, 0, 0, -1); + m_add_id(p_queue, u->id); + m_add_int(p_queue, 0); + m_close(p_queue); + } + + free(u); + stat_decrement("mda.user", 1); +} + +static const char * +mda_user_to_text(const struct mda_user *u) +{ + static char buf[1024]; + + snprintf(buf, sizeof(buf), "%s:%s", u->usertable, u->name); + + return (buf); +} + +static struct mda_envelope * +mda_envelope(const struct envelope *evp) +{ + struct mda_envelope *e; + char buf[SMTPD_MAXLINESIZE]; + + e = xcalloc(1, sizeof *e, "mda_envelope"); + e->id = evp->id; + e->creation = evp->creation; + buf[0] = '\0'; + if (evp->sender.user[0] && evp->sender.domain[0]) + snprintf(buf, sizeof buf, "%s@%s", + evp->sender.user, evp->sender.domain); + e->sender = xstrdup(buf, "mda_envelope:sender"); + snprintf(buf, sizeof buf, "%s@%s", evp->dest.user, evp->dest.domain); + e->dest = xstrdup(buf, "mda_envelope:dest"); + snprintf(buf, sizeof buf, "%s@%s", evp->rcpt.user, evp->rcpt.domain); + if (strcmp(buf, e->dest)) + e->rcpt = xstrdup(buf, "mda_envelope:rcpt"); + e->method = evp->agent.mda.method; + e->buffer = xstrdup(evp->agent.mda.buffer, "mda_envelope:buffer"); + e->user = xstrdup(evp->agent.mda.username, "mda_envelope:user"); + + stat_increment("mda.envelope", 1); + + return (e); +} + +static void +mda_envelope_free(struct mda_envelope *e) +{ + free(e->sender); + free(e->dest); + free(e->rcpt); + free(e->user); + free(e->buffer); + free(e); + + stat_decrement("mda.envelope", 1); +} + +static struct mda_session * +mda_session(struct mda_user * u) +{ + struct mda_session *s; + + s = xcalloc(1, sizeof *s, "mda_session"); + s->id = generate_uid(); + s->user = u; + s->io.sock = -1; + if (iobuf_init(&s->iobuf, 0, 0) == -1) + fatal("mda_session"); + + tree_xset(&sessions, s->id, s); + + s->evp = TAILQ_FIRST(&u->envelopes); + TAILQ_REMOVE(&u->envelopes, s->evp, entry); + u->evpcount--; + u->running++; + + stat_decrement("mda.pending", 1); + stat_increment("mda.running", 1); + + log_debug("debug: mda: new session %016" PRIx64 + " for user \"%s\" evpid %016" PRIx64, s->id, + mda_user_to_text(u), s->evp->id); + + m_create(p_queue, IMSG_QUEUE_MESSAGE_FD, 0, 0, -1); + m_add_id(p_queue, s->id); + m_add_msgid(p_queue, evpid_to_msgid(s->evp->id)); + m_close(p_queue); + + return (s); +} diff --git a/usr.sbin/smtpd/parse.y b/usr.sbin/smtpd/parse.y index 722f92aeef2..52f47b5c3f9 100644 --- a/usr.sbin/smtpd/parse.y +++ b/usr.sbin/smtpd/parse.y @@ -1,4 +1,4 @@ -/* $OpenBSD: parse.y,v 1.128 2013/11/19 10:01:20 eric Exp $ */ +/* $OpenBSD: parse.y,v 1.129 2013/11/20 09:22:42 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -256,6 +256,35 @@ bouncedelays : bouncedelays ',' bouncedelay | /* EMPTY */ ; +opt_limit_mda : STRING NUMBER { + if (!strcmp($1, "max-session")) { + conf->sc_mda_max_session = $2; + } + else if (!strcmp($1, "max-session-per-user")) { + conf->sc_mda_max_user_session = $2; + } + else if (!strcmp($1, "task-lowat")) { + conf->sc_mda_task_lowat = $2; + } + else if (!strcmp($1, "task-hiwat")) { + conf->sc_mda_task_hiwat = $2; + } + else if (!strcmp($1, "task-release")) { + conf->sc_mda_task_release = $2; + } + else { + yyerror("invalid scheduler limit keyword: %s", $1); + free($1); + YYERROR; + } + free($1); + } + ; + +limits_mda : opt_limit_mda limits_mda + | /* empty */ + ; + opt_limit_mta : INET4 { limits->family = AF_INET; } @@ -548,6 +577,7 @@ main : BOUNCEWARN { | MAXMESSAGESIZE size { conf->sc_maxsize = $2; } + | LIMIT MDA limits_mda | LIMIT MTA FOR DOMAIN STRING { struct mta_limits *d; @@ -1527,6 +1557,12 @@ parse_config(struct smtpd *x_conf, const char *filename, int opts) conf->sc_scheduler_max_inflight = 5000; + conf->sc_mda_max_session = 50; + conf->sc_mda_max_user_session = 7; + conf->sc_mda_task_hiwat = 50; + conf->sc_mda_task_lowat = 30; + conf->sc_mda_task_release = 10; + if ((file = pushfile(filename, 0)) == NULL) { purge_config(PURGE_EVERYTHING); return (-1); diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index bba07281584..31a8027c55a 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.155 2013/10/27 17:47:53 eric Exp $ */ +/* $OpenBSD: queue.c,v 1.156 2013/11/20 09:22:42 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -65,7 +65,7 @@ queue_imsg(struct mproc *p, struct imsg *imsg) struct envelope evp; struct msg m; const char *reason; - uint64_t reqid, evpid; + uint64_t reqid, evpid, holdq; uint32_t msgid; uint32_t penalty; time_t nexttry; @@ -415,10 +415,24 @@ queue_imsg(struct mproc *p, struct imsg *imsg) return; case IMSG_DELIVERY_HOLD: - case IMSG_DELIVERY_RELEASE: case IMSG_MTA_SCHEDULE: m_forward(p_scheduler, imsg); return; + case IMSG_DELIVERY_RELEASE: + m_msg(&m, imsg); + m_get_id(&m, &holdq); + m_get_int(&m, &v); + m_end(&m); + + m_create(p_scheduler, IMSG_DELIVERY_RELEASE, 0, 0, -1); + if (p->proc == PROC_MTA) + m_add_int(p_scheduler, D_MTA); + else + m_add_int(p_scheduler, D_MDA); + m_add_id(p_scheduler, holdq); + m_add_int(p_scheduler, v); + m_close(p_scheduler); + return; } } diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c index efe2cbf7622..feeb275995d 100644 --- a/usr.sbin/smtpd/scheduler.c +++ b/usr.sbin/smtpd/scheduler.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler.c,v 1.36 2013/11/19 10:01:20 eric Exp $ */ +/* $OpenBSD: scheduler.c,v 1.37 2013/11/20 09:22:42 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -80,7 +80,7 @@ scheduler_imsg(struct mproc *p, struct imsg *imsg) uint32_t penalty; size_t n, i; time_t timestamp; - int v, r; + int v, r, type; switch (imsg->hdr.type) { @@ -228,12 +228,14 @@ scheduler_imsg(struct mproc *p, struct imsg *imsg) case IMSG_DELIVERY_RELEASE: m_msg(&m, imsg); + m_get_int(&m, &type); m_get_id(&m, &holdq); m_get_int(&m, &r); m_end(&m); log_trace(TRACE_SCHEDULER, - "scheduler: releasing %d on holdq %016" PRIx64, r, holdq); - backend->release(holdq, r); + "scheduler: releasing %d on holdq (%i, %016" PRIx64 ")", + r, type, holdq); + backend->release(type, holdq, r); scheduler_reset_events(); return; diff --git a/usr.sbin/smtpd/scheduler_api.c b/usr.sbin/smtpd/scheduler_api.c index de382075947..e2c22110e67 100644 --- a/usr.sbin/smtpd/scheduler_api.c +++ b/usr.sbin/smtpd/scheduler_api.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_api.c,v 1.3 2013/10/27 17:47:53 eric Exp $ */ +/* $OpenBSD: scheduler_api.c,v 1.4 2013/11/20 09:22:42 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -36,7 +36,7 @@ static size_t (*handler_rollback)(uint32_t); static int (*handler_update)(struct scheduler_info *); static int (*handler_delete)(uint64_t); static int (*handler_hold)(uint64_t, uint64_t); -static int (*handler_release)(uint64_t, int); +static int (*handler_release)(int, uint64_t, int); static int (*handler_batch)(int, struct scheduler_batch *); static size_t (*handler_messages)(uint32_t, uint32_t *, size_t); static size_t (*handler_envelopes)(uint64_t, struct evpstate *, size_t); @@ -114,7 +114,7 @@ scheduler_msg_dispatch(void) uint32_t msgids[MAX_BATCH_SIZE], version, msgid; struct scheduler_info info; struct scheduler_batch batch; - int typemask, r; + int typemask, r, type; switch (imsg.hdr.type) { case PROC_SCHEDULER_INIT: @@ -198,11 +198,12 @@ scheduler_msg_dispatch(void) case PROC_SCHEDULER_RELEASE: log_debug("scheduler-api: PROC_SCHEDULER_RELEASE"); + scheduler_msg_get(&type, sizeof(type)); scheduler_msg_get(&u64, sizeof(u64)); scheduler_msg_get(&r, sizeof(r)); scheduler_msg_end(); - r = handler_release(u64, r); + r = handler_release(type, u64, r); imsg_compose(&ibuf, PROC_SCHEDULER_OK, 0, 0, -1, &r, sizeof(r)); break; @@ -385,7 +386,7 @@ scheduler_api_on_hold(int(*cb)(uint64_t, uint64_t)) } void -scheduler_api_on_release(int(*cb)(uint64_t, int)) +scheduler_api_on_release(int(*cb)(int, uint64_t, int)) { handler_release = cb; } diff --git a/usr.sbin/smtpd/scheduler_null.c b/usr.sbin/smtpd/scheduler_null.c index 8a4da95c765..157dd4c82a2 100644 --- a/usr.sbin/smtpd/scheduler_null.c +++ b/usr.sbin/smtpd/scheduler_null.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_null.c,v 1.5 2013/10/27 17:47:53 eric Exp $ */ +/* $OpenBSD: scheduler_null.c,v 1.6 2013/11/20 09:22:42 eric Exp $ */ /* * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> @@ -37,7 +37,7 @@ static size_t scheduler_null_rollback(uint32_t); static int scheduler_null_update(struct scheduler_info *); static int scheduler_null_delete(uint64_t); static int scheduler_null_hold(uint64_t, uint64_t); -static int scheduler_null_release(uint64_t, int); +static int scheduler_null_release(int, uint64_t, int); static int scheduler_null_batch(int, struct scheduler_batch *); static size_t scheduler_null_messages(uint32_t, uint32_t *, size_t); static size_t scheduler_null_envelopes(uint64_t, struct evpstate *, size_t); @@ -111,7 +111,7 @@ scheduler_null_hold(uint64_t evpid, uint64_t holdq) } static int -scheduler_null_release(uint64_t holdq, int n) +scheduler_null_release(int type, uint64_t holdq, int n) { return (0); } diff --git a/usr.sbin/smtpd/scheduler_proc.c b/usr.sbin/smtpd/scheduler_proc.c index 0e927a6aca9..bf82ccdc0cc 100644 --- a/usr.sbin/smtpd/scheduler_proc.c +++ b/usr.sbin/smtpd/scheduler_proc.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_proc.c,v 1.2 2013/10/27 17:47:53 eric Exp $ */ +/* $OpenBSD: scheduler_proc.c,v 1.3 2013/11/20 09:22:42 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -273,7 +273,7 @@ scheduler_proc_hold(uint64_t evpid, uint64_t holdq) } static int -scheduler_proc_release(uint64_t holdq, int n) +scheduler_proc_release(int type, uint64_t holdq, int n) { struct ibuf *buf; int r; @@ -284,6 +284,8 @@ scheduler_proc_release(uint64_t holdq, int n) sizeof(holdq) + sizeof(n)); if (buf == NULL) return (-1); + if (imsg_add(buf, &type, sizeof(type)) == -1) + return (-1); if (imsg_add(buf, &holdq, sizeof(holdq)) == -1) return (-1); if (imsg_add(buf, &n, sizeof(n)) == -1) diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c index f28471700b1..adc392bc098 100644 --- a/usr.sbin/smtpd/scheduler_ramqueue.c +++ b/usr.sbin/smtpd/scheduler_ramqueue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_ramqueue.c,v 1.32 2013/10/27 17:47:53 eric Exp $ */ +/* $OpenBSD: scheduler_ramqueue.c,v 1.33 2013/11/20 09:22:42 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org> @@ -95,7 +95,7 @@ static size_t scheduler_ram_rollback(uint32_t); static int scheduler_ram_update(struct scheduler_info *); static int scheduler_ram_delete(uint64_t); static int scheduler_ram_hold(uint64_t, uint64_t); -static int scheduler_ram_release(uint64_t, int); +static int scheduler_ram_release(int, uint64_t, int); static int scheduler_ram_batch(int, struct scheduler_batch *); static size_t scheduler_ram_messages(uint32_t, uint32_t *, size_t); static size_t scheduler_ram_envelopes(uint64_t, struct evpstate *, size_t); @@ -143,7 +143,7 @@ struct scheduler_backend scheduler_backend_ramqueue = { static struct rq_queue ramqueue; static struct tree updates; -static struct tree holdqs; +static struct tree holdqs[3]; /* delivery type */ static time_t currtime; @@ -152,7 +152,9 @@ scheduler_ram_init(void) { rq_queue_init(&ramqueue); tree_init(&updates); - tree_init(&holdqs); + tree_init(&holdqs[D_MDA]); + tree_init(&holdqs[D_MTA]); + tree_init(&holdqs[D_BOUNCE]); return (1); } @@ -349,11 +351,11 @@ scheduler_ram_hold(uint64_t evpid, uint64_t holdq) return (0); } - hq = tree_get(&holdqs, holdq); + hq = tree_get(&holdqs[evp->type], holdq); if (hq == NULL) { hq = xcalloc(1, sizeof(*hq), "scheduler_hold"); TAILQ_INIT(&hq->q); - tree_xset(&holdqs, holdq, hq); + tree_xset(&holdqs[evp->type], holdq, hq); } evp->state = RQ_EVPSTATE_HELD; @@ -371,7 +373,7 @@ scheduler_ram_hold(uint64_t evpid, uint64_t holdq) } static int -scheduler_ram_release(uint64_t holdq, int n) +scheduler_ram_release(int type, uint64_t holdq, int n) { struct rq_holdq *hq; struct rq_envelope *evp; @@ -379,7 +381,7 @@ scheduler_ram_release(uint64_t holdq, int n) currtime = time(NULL); - hq = tree_get(&holdqs, holdq); + hq = tree_get(&holdqs[type], holdq); if (hq == NULL) return (0); @@ -400,7 +402,7 @@ scheduler_ram_release(uint64_t holdq, int n) } if (TAILQ_EMPTY(&hq->q)) { - tree_xpop(&holdqs, holdq); + tree_xpop(&holdqs[type], holdq); free(hq); } stat_decrement("scheduler.ramqueue.hold", i); @@ -833,10 +835,10 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) } if (evp->state == RQ_EVPSTATE_HELD) { - hq = tree_xget(&holdqs, evp->holdq); + hq = tree_xget(&holdqs[evp->type], evp->holdq); TAILQ_REMOVE(&hq->q, evp, entry); if (TAILQ_EMPTY(&hq->q)) { - tree_xpop(&holdqs, evp->holdq); + tree_xpop(&holdqs[evp->type], evp->holdq); free(hq); } evp->holdq = 0; @@ -866,10 +868,10 @@ rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp) } if (evp->state == RQ_EVPSTATE_HELD) { - hq = tree_xget(&holdqs, evp->holdq); + hq = tree_xget(&holdqs[evp->type], evp->holdq); TAILQ_REMOVE(&hq->q, evp, entry); if (TAILQ_EMPTY(&hq->q)) { - tree_xpop(&holdqs, evp->holdq); + tree_xpop(&holdqs[evp->type], evp->holdq); free(hq); } evp->holdq = 0; @@ -896,10 +898,10 @@ rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp) return (0); if (evp->state == RQ_EVPSTATE_HELD) { - hq = tree_xget(&holdqs, evp->holdq); + hq = tree_xget(&holdqs[evp->type], evp->holdq); TAILQ_REMOVE(&hq->q, evp, entry); if (TAILQ_EMPTY(&hq->q)) { - tree_xpop(&holdqs, evp->holdq); + tree_xpop(&holdqs[evp->type], evp->holdq); free(hq); } evp->holdq = 0; diff --git a/usr.sbin/smtpd/smtpd-api.h b/usr.sbin/smtpd/smtpd-api.h index 0c3224f59a6..9bb13dcfdc0 100644 --- a/usr.sbin/smtpd/smtpd-api.h +++ b/usr.sbin/smtpd/smtpd-api.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd-api.h,v 1.11 2013/11/18 11:47:16 eric Exp $ */ +/* $OpenBSD: smtpd-api.h,v 1.12 2013/11/20 09:22:42 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -281,7 +281,7 @@ void scheduler_api_on_rollback(size_t(*)(uint32_t)); void scheduler_api_on_update(int(*)(struct scheduler_info *)); void scheduler_api_on_delete(int(*)(uint64_t)); void scheduler_api_on_hold(int(*)(uint64_t, uint64_t)); -void scheduler_api_on_release(int(*)(uint64_t, int)); +void scheduler_api_on_release(int(*)(int, uint64_t, int)); void scheduler_api_on_batch(int(*)(int, struct scheduler_batch *)); void scheduler_api_on_messages(size_t(*)(uint32_t, uint32_t *, size_t)); void scheduler_api_on_envelopes(size_t(*)(uint64_t, struct evpstate *, size_t)); diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h index 38d5d1bf19a..a2278d417ae 100644 --- a/usr.sbin/smtpd/smtpd.h +++ b/usr.sbin/smtpd/smtpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.h,v 1.435 2013/11/19 10:01:20 eric Exp $ */ +/* $OpenBSD: smtpd.h,v 1.436 2013/11/20 09:22:42 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -505,7 +505,14 @@ struct smtpd { char *sc_queue_key; size_t sc_queue_evpcache_size; + size_t sc_mda_max_session; + size_t sc_mda_max_user_session; + size_t sc_mda_task_hiwat; + size_t sc_mda_task_lowat; + size_t sc_mda_task_release; + size_t sc_scheduler_max_inflight; + int sc_qexpire; #define MAX_BOUNCE_WARN 4 time_t sc_bounce_warn[MAX_BOUNCE_WARN]; @@ -798,7 +805,7 @@ struct scheduler_backend { int (*update)(struct scheduler_info *); int (*delete)(uint64_t); int (*hold)(uint64_t, uint64_t); - int (*release)(uint64_t, int); + int (*release)(int, uint64_t, int); int (*batch)(int, struct scheduler_batch *); |