diff options
| field | value | date |
|---|---|---|
| author | Eric Faurot <eric@cvs.openbsd.org> | 2013-12-05 09:26:48 +0000 |
| committer | Eric Faurot <eric@cvs.openbsd.org> | 2013-12-05 09:26:48 +0000 |
| commit | ae6b2d92af1e0d9037d876f8d8ff4b0365fb8f97 (patch) | |
| tree | 0b9f5ec12b9747b1abe8bbf70ffbf3ea45074b14 | |
| parent | 707a5179d1c6c2ad1beff3d9e93fc8c42da1f3c0 (diff) | |
When a relay fails, let the scheduler update all envelopes in the
holdq as if they tempfailed.
| mode | file | lines changed |
|---|---|---|
| -rw-r--r-- | usr.sbin/smtpd/mta.c | 4 |
| -rw-r--r-- | usr.sbin/smtpd/scheduler.c | 24 |
| -rw-r--r-- | usr.sbin/smtpd/scheduler_ramqueue.c | 41 |
| -rw-r--r-- | usr.sbin/smtpd/smtpd-api.h | 9 |
4 files changed, 68 insertions, 10 deletions
diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c index 4c7861b05f0..9f9c5e6d2d7 100644 --- a/usr.sbin/smtpd/mta.c +++ b/usr.sbin/smtpd/mta.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mta.c,v 1.175 2013/12/03 09:06:26 eric Exp $ */ +/* $OpenBSD: mta.c,v 1.176 2013/12/05 09:26:47 eric Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> @@ -1354,7 +1354,7 @@ mta_flush(struct mta_relay *relay, int fail, const char *error) if (relay->state & RELAY_HOLDQ) { m_create(p_queue, IMSG_DELIVERY_RELEASE, 0, 0, -1); m_add_id(p_queue, relay->id); - m_add_int(p_queue, 0); + m_add_int(p_queue, -1); m_close(p_queue); } } diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c index cdff9970fec..1a5a5a6740d 100644 --- a/usr.sbin/smtpd/scheduler.c +++ b/usr.sbin/smtpd/scheduler.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler.c,v 1.38 2013/11/30 10:11:57 eric Exp $ */ +/* $OpenBSD: scheduler.c,v 1.39 2013/12/05 09:26:47 eric Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@poolp.org> @@ -51,6 +51,7 @@ static void scheduler_reset_events(void); static void scheduler_timeout(int, short, void *); static void scheduler_process_remove(struct scheduler_batch *); static void scheduler_process_expire(struct scheduler_batch *); +static void scheduler_process_update(struct scheduler_batch *); static void scheduler_process_bounce(struct scheduler_batch *); static void scheduler_process_mda(struct scheduler_batch *); static void scheduler_process_mta(struct scheduler_batch *); @@ -468,7 +469,7 @@ scheduler_timeout(int fd, short event, void *p) tv.tv_sec = 0; tv.tv_usec = 0; - typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_BOUNCE; + typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_UPDATE | SCHED_BOUNCE; if (ninflight < env->sc_scheduler_max_inflight && !(env->sc_flags & SMTPD_MDA_PAUSED)) typemask |= SCHED_MDA; @@ -504,6 +505,12 @@ scheduler_timeout(int fd, short event, void *p) scheduler_process_expire(&batch); break; + case SCHED_UPDATE: + log_trace(TRACE_SCHEDULER, 
"scheduler: SCHED_UPDATE %zu", + batch.evpcount); + scheduler_process_update(&batch); + break; + case SCHED_BOUNCE: log_trace(TRACE_SCHEDULER, "scheduler: SCHED_BOUNCE %zu", batch.evpcount); @@ -563,6 +570,19 @@ scheduler_process_expire(struct scheduler_batch *batch) } static void +scheduler_process_update(struct scheduler_batch *batch) +{ + size_t i; + + for (i = 0; i < batch->evpcount; i++) { + log_debug("debug: scheduler: evp:%016" PRIx64 + " scheduled (update)", batch->evpids[i]); + } + + stat_increment("scheduler.envelope.update", batch->evpcount); +} + +static void scheduler_process_bounce(struct scheduler_batch *batch) { size_t i; diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c index 2578606810e..7703c0fea66 100644 --- a/usr.sbin/smtpd/scheduler_ramqueue.c +++ b/usr.sbin/smtpd/scheduler_ramqueue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_ramqueue.c,v 1.34 2013/12/03 10:38:40 eric Exp $ */ +/* $OpenBSD: scheduler_ramqueue.c,v 1.35 2013/12/05 09:26:47 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org> @@ -60,8 +60,10 @@ struct rq_envelope { #define RQ_ENVELOPE_EXPIRED 0x01 #define RQ_ENVELOPE_REMOVED 0x02 #define RQ_ENVELOPE_SUSPEND 0x04 +#define RQ_ENVELOPE_UPDATE 0x08 uint8_t flags; + time_t ctime; time_t sched; time_t expire; @@ -86,6 +88,7 @@ struct rq_queue { struct evplist q_mta; struct evplist q_mda; struct evplist q_bounce; + struct evplist q_update; struct evplist q_expired; struct evplist q_removed; }; @@ -197,6 +200,7 @@ scheduler_ram_insert(struct scheduler_info *si) envelope->evpid = si->evpid; envelope->type = si->type; envelope->message = message; + envelope->ctime = si->creation; envelope->expire = si->creation + si->expire; envelope->sched = scheduler_compute_schedule(si); tree_xset(&message->envelopes, envelope->evpid, envelope); @@ -381,7 +385,7 @@ scheduler_ram_release(int type, uint64_t holdq, int n) { struct rq_holdq *hq; struct rq_envelope *evp; - int i; + int i, update; currtime 
= time(NULL); @@ -389,6 +393,13 @@ scheduler_ram_release(int type, uint64_t holdq, int n) if (hq == NULL) return (0); + if (n == -1) { + n = 0; + update = 1; + } + else + update = 0; + for (i = 0; n == 0 || i < n; i++) { evp = TAILQ_FIRST(&hq->q); if (evp == NULL) @@ -402,6 +413,8 @@ scheduler_ram_release(int type, uint64_t holdq, int n) * we could just schedule them directly. */ evp->state = RQ_EVPSTATE_PENDING; + if (update) + evp->flags |= RQ_ENVELOPE_UPDATE; sorted_insert(&ramqueue, evp); } @@ -420,6 +433,7 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret) struct evplist *q; struct rq_envelope *evp; size_t n; + int retry; currtime = time(NULL); @@ -435,6 +449,10 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret) q = &ramqueue.q_expired; ret->type = SCHED_EXPIRE; } + else if (typemask & SCHED_UPDATE && TAILQ_FIRST(&ramqueue.q_update)) { + q = &ramqueue.q_update; + ret->type = SCHED_UPDATE; + } else if (typemask & SCHED_BOUNCE && TAILQ_FIRST(&ramqueue.q_bounce)) { q = &ramqueue.q_bounce; ret->type = SCHED_BOUNCE; @@ -474,6 +492,19 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret) if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE) rq_envelope_delete(&ramqueue, evp); + else if (ret->type == SCHED_UPDATE) { + + evp->flags &= ~RQ_ENVELOPE_UPDATE; + + /* XXX we can't really use scheduler_compute_schedule */ + retry = 0; + while ((evp->sched = evp->ctime + 800 * retry * retry / 2) <= currtime) + retry += 1; + + evp->state = RQ_EVPSTATE_PENDING; + if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) + sorted_insert(&ramqueue, evp); + } else { TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry); evp->state = RQ_EVPSTATE_INFLIGHT; @@ -717,6 +748,7 @@ rq_queue_init(struct rq_queue *rq) TAILQ_INIT(&rq->q_mta); TAILQ_INIT(&rq->q_mda); TAILQ_INIT(&rq->q_bounce); + TAILQ_INIT(&rq->q_update); TAILQ_INIT(&rq->q_expired); TAILQ_INIT(&rq->q_removed); SPLAY_INIT(&rq->q_priotree); @@ -793,6 +825,8 @@ rq_envelope_list(struct rq_queue *rq, 
struct rq_envelope *evp) return &rq->q_expired; if (evp->flags & RQ_ENVELOPE_REMOVED) return &rq->q_removed; + if (evp->flags & RQ_ENVELOPE_UPDATE) + return &rq->q_update; if (evp->type == D_MTA) return &rq->q_mta; if (evp->type == D_MDA) @@ -830,6 +864,9 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) break; } + if (evp->flags & RQ_ENVELOPE_UPDATE) + q = &rq->q_update; + if (evp->state == RQ_EVPSTATE_HELD) { hq = tree_xget(&holdqs[evp->type], evp->holdq); TAILQ_REMOVE(&hq->q, evp, entry); diff --git a/usr.sbin/smtpd/smtpd-api.h b/usr.sbin/smtpd/smtpd-api.h index 9bb13dcfdc0..d2bc18b1ae1 100644 --- a/usr.sbin/smtpd/smtpd-api.h +++ b/usr.sbin/smtpd/smtpd-api.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd-api.h,v 1.12 2013/11/20 09:22:42 eric Exp $ */ +/* $OpenBSD: smtpd-api.h,v 1.13 2013/12/05 09:26:47 eric Exp $ */ /* * Copyright (c) 2013 Eric Faurot <eric@openbsd.org> @@ -171,9 +171,10 @@ struct scheduler_info { #define SCHED_DELAY 0x01 #define SCHED_REMOVE 0x02 #define SCHED_EXPIRE 0x04 -#define SCHED_BOUNCE 0x08 -#define SCHED_MDA 0x10 -#define SCHED_MTA 0x20 +#define SCHED_UPDATE 0x08 +#define SCHED_BOUNCE 0x10 +#define SCHED_MDA 0x20 +#define SCHED_MTA 0x40 struct scheduler_batch { int type; |