summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Faurot <eric@cvs.openbsd.org>2013-12-05 09:26:48 +0000
committerEric Faurot <eric@cvs.openbsd.org>2013-12-05 09:26:48 +0000
commitae6b2d92af1e0d9037d876f8d8ff4b0365fb8f97 (patch)
tree0b9f5ec12b9747b1abe8bbf70ffbf3ea45074b14
parent707a5179d1c6c2ad1beff3d9e93fc8c42da1f3c0 (diff)
When a relay fails, let the scheduler update all envelopes in the
holdq as if they tempfailed.
-rw-r--r--usr.sbin/smtpd/mta.c4
-rw-r--r--usr.sbin/smtpd/scheduler.c24
-rw-r--r--usr.sbin/smtpd/scheduler_ramqueue.c41
-rw-r--r--usr.sbin/smtpd/smtpd-api.h9
4 files changed, 68 insertions, 10 deletions
diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c
index 4c7861b05f0..9f9c5e6d2d7 100644
--- a/usr.sbin/smtpd/mta.c
+++ b/usr.sbin/smtpd/mta.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mta.c,v 1.175 2013/12/03 09:06:26 eric Exp $ */
+/* $OpenBSD: mta.c,v 1.176 2013/12/05 09:26:47 eric Exp $ */
/*
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -1354,7 +1354,7 @@ mta_flush(struct mta_relay *relay, int fail, const char *error)
if (relay->state & RELAY_HOLDQ) {
m_create(p_queue, IMSG_DELIVERY_RELEASE, 0, 0, -1);
m_add_id(p_queue, relay->id);
- m_add_int(p_queue, 0);
+ m_add_int(p_queue, -1);
m_close(p_queue);
}
}
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
index cdff9970fec..1a5a5a6740d 100644
--- a/usr.sbin/smtpd/scheduler.c
+++ b/usr.sbin/smtpd/scheduler.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler.c,v 1.38 2013/11/30 10:11:57 eric Exp $ */
+/* $OpenBSD: scheduler.c,v 1.39 2013/12/05 09:26:47 eric Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@poolp.org>
@@ -51,6 +51,7 @@ static void scheduler_reset_events(void);
static void scheduler_timeout(int, short, void *);
static void scheduler_process_remove(struct scheduler_batch *);
static void scheduler_process_expire(struct scheduler_batch *);
+static void scheduler_process_update(struct scheduler_batch *);
static void scheduler_process_bounce(struct scheduler_batch *);
static void scheduler_process_mda(struct scheduler_batch *);
static void scheduler_process_mta(struct scheduler_batch *);
@@ -468,7 +469,7 @@ scheduler_timeout(int fd, short event, void *p)
tv.tv_sec = 0;
tv.tv_usec = 0;
- typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_BOUNCE;
+ typemask = SCHED_REMOVE | SCHED_EXPIRE | SCHED_UPDATE | SCHED_BOUNCE;
if (ninflight < env->sc_scheduler_max_inflight &&
!(env->sc_flags & SMTPD_MDA_PAUSED))
typemask |= SCHED_MDA;
@@ -504,6 +505,12 @@ scheduler_timeout(int fd, short event, void *p)
scheduler_process_expire(&batch);
break;
+ case SCHED_UPDATE:
+ log_trace(TRACE_SCHEDULER, "scheduler: SCHED_UPDATE %zu",
+ batch.evpcount);
+ scheduler_process_update(&batch);
+ break;
+
case SCHED_BOUNCE:
log_trace(TRACE_SCHEDULER, "scheduler: SCHED_BOUNCE %zu",
batch.evpcount);
@@ -563,6 +570,19 @@ scheduler_process_expire(struct scheduler_batch *batch)
}
static void
+scheduler_process_update(struct scheduler_batch *batch)
+{
+ size_t i;
+
+ for (i = 0; i < batch->evpcount; i++) {
+ log_debug("debug: scheduler: evp:%016" PRIx64
+ " scheduled (update)", batch->evpids[i]);
+ }
+
+ stat_increment("scheduler.envelope.update", batch->evpcount);
+}
+
+static void
scheduler_process_bounce(struct scheduler_batch *batch)
{
size_t i;
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index 2578606810e..7703c0fea66 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.34 2013/12/03 10:38:40 eric Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.35 2013/12/05 09:26:47 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
@@ -60,8 +60,10 @@ struct rq_envelope {
#define RQ_ENVELOPE_EXPIRED 0x01
#define RQ_ENVELOPE_REMOVED 0x02
#define RQ_ENVELOPE_SUSPEND 0x04
+#define RQ_ENVELOPE_UPDATE 0x08
uint8_t flags;
+ time_t ctime;
time_t sched;
time_t expire;
@@ -86,6 +88,7 @@ struct rq_queue {
struct evplist q_mta;
struct evplist q_mda;
struct evplist q_bounce;
+ struct evplist q_update;
struct evplist q_expired;
struct evplist q_removed;
};
@@ -197,6 +200,7 @@ scheduler_ram_insert(struct scheduler_info *si)
envelope->evpid = si->evpid;
envelope->type = si->type;
envelope->message = message;
+ envelope->ctime = si->creation;
envelope->expire = si->creation + si->expire;
envelope->sched = scheduler_compute_schedule(si);
tree_xset(&message->envelopes, envelope->evpid, envelope);
@@ -381,7 +385,7 @@ scheduler_ram_release(int type, uint64_t holdq, int n)
{
struct rq_holdq *hq;
struct rq_envelope *evp;
- int i;
+ int i, update;
currtime = time(NULL);
@@ -389,6 +393,13 @@ scheduler_ram_release(int type, uint64_t holdq, int n)
if (hq == NULL)
return (0);
+ if (n == -1) {
+ n = 0;
+ update = 1;
+ }
+ else
+ update = 0;
+
for (i = 0; n == 0 || i < n; i++) {
evp = TAILQ_FIRST(&hq->q);
if (evp == NULL)
@@ -402,6 +413,8 @@ scheduler_ram_release(int type, uint64_t holdq, int n)
* we could just schedule them directly.
*/
evp->state = RQ_EVPSTATE_PENDING;
+ if (update)
+ evp->flags |= RQ_ENVELOPE_UPDATE;
sorted_insert(&ramqueue, evp);
}
@@ -420,6 +433,7 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret)
struct evplist *q;
struct rq_envelope *evp;
size_t n;
+ int retry;
currtime = time(NULL);
@@ -435,6 +449,10 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret)
q = &ramqueue.q_expired;
ret->type = SCHED_EXPIRE;
}
+ else if (typemask & SCHED_UPDATE && TAILQ_FIRST(&ramqueue.q_update)) {
+ q = &ramqueue.q_update;
+ ret->type = SCHED_UPDATE;
+ }
else if (typemask & SCHED_BOUNCE && TAILQ_FIRST(&ramqueue.q_bounce)) {
q = &ramqueue.q_bounce;
ret->type = SCHED_BOUNCE;
@@ -474,6 +492,19 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret)
if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE)
rq_envelope_delete(&ramqueue, evp);
+ else if (ret->type == SCHED_UPDATE) {
+
+ evp->flags &= ~RQ_ENVELOPE_UPDATE;
+
+ /* XXX we can't really use scheduler_compute_schedule */
+ retry = 0;
+ while ((evp->sched = evp->ctime + 800 * retry * retry / 2) <= currtime)
+ retry += 1;
+
+ evp->state = RQ_EVPSTATE_PENDING;
+ if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
+ sorted_insert(&ramqueue, evp);
+ }
else {
TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
evp->state = RQ_EVPSTATE_INFLIGHT;
@@ -717,6 +748,7 @@ rq_queue_init(struct rq_queue *rq)
TAILQ_INIT(&rq->q_mta);
TAILQ_INIT(&rq->q_mda);
TAILQ_INIT(&rq->q_bounce);
+ TAILQ_INIT(&rq->q_update);
TAILQ_INIT(&rq->q_expired);
TAILQ_INIT(&rq->q_removed);
SPLAY_INIT(&rq->q_priotree);
@@ -793,6 +825,8 @@ rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
return &rq->q_expired;
if (evp->flags & RQ_ENVELOPE_REMOVED)
return &rq->q_removed;
+ if (evp->flags & RQ_ENVELOPE_UPDATE)
+ return &rq->q_update;
if (evp->type == D_MTA)
return &rq->q_mta;
if (evp->type == D_MDA)
@@ -830,6 +864,9 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
break;
}
+ if (evp->flags & RQ_ENVELOPE_UPDATE)
+ q = &rq->q_update;
+
if (evp->state == RQ_EVPSTATE_HELD) {
hq = tree_xget(&holdqs[evp->type], evp->holdq);
TAILQ_REMOVE(&hq->q, evp, entry);
diff --git a/usr.sbin/smtpd/smtpd-api.h b/usr.sbin/smtpd/smtpd-api.h
index 9bb13dcfdc0..d2bc18b1ae1 100644
--- a/usr.sbin/smtpd/smtpd-api.h
+++ b/usr.sbin/smtpd/smtpd-api.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd-api.h,v 1.12 2013/11/20 09:22:42 eric Exp $ */
+/* $OpenBSD: smtpd-api.h,v 1.13 2013/12/05 09:26:47 eric Exp $ */
/*
* Copyright (c) 2013 Eric Faurot <eric@openbsd.org>
@@ -171,9 +171,10 @@ struct scheduler_info {
#define SCHED_DELAY 0x01
#define SCHED_REMOVE 0x02
#define SCHED_EXPIRE 0x04
-#define SCHED_BOUNCE 0x08
-#define SCHED_MDA 0x10
-#define SCHED_MTA 0x20
+#define SCHED_UPDATE 0x08
+#define SCHED_BOUNCE 0x10
+#define SCHED_MDA 0x20
+#define SCHED_MTA 0x40
struct scheduler_batch {
int type;