summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--usr.sbin/smtpd/scheduler_ramqueue.c209
1 files changed, 107 insertions, 102 deletions
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index 4efd88d132b..e26b91831c6 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,8 +1,8 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.30 2013/07/19 21:34:31 eric Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.31 2013/10/25 19:18:43 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
- * Copyright (c) 2012 Eric Faurot <eric@openbsd.org>
+ * Copyright (c) 2012-2013 Eric Faurot <eric@openbsd.org>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@@ -41,22 +41,22 @@ TAILQ_HEAD(evplist, rq_envelope);
struct rq_message {
uint32_t msgid;
struct tree envelopes;
- struct rq_message *q_next;
- struct evplist q_mta;
};
struct rq_envelope {
TAILQ_ENTRY(rq_envelope) entry;
uint64_t evpid;
- int type;
-
-#define RQ_ENVELOPE_PENDING 0x01
-#define RQ_ENVELOPE_SCHEDULED 0x02
-#define RQ_ENVELOPE_EXPIRED 0x04
-#define RQ_ENVELOPE_REMOVED 0x08
-#define RQ_ENVELOPE_INFLIGHT 0x10
-#define RQ_ENVELOPE_SUSPEND 0x20
+ enum delivery_type type;
+
+#define RQ_EVPSTATE_PENDING 0
+#define RQ_EVPSTATE_SCHEDULED 1
+#define RQ_EVPSTATE_INFLIGHT 2
+ uint8_t state;
+
+#define RQ_ENVELOPE_EXPIRED 0x01
+#define RQ_ENVELOPE_REMOVED 0x02
+#define RQ_ENVELOPE_SUSPEND 0x04
uint8_t flags;
time_t sched;
@@ -75,7 +75,7 @@ struct rq_queue {
struct evplist q_pending;
struct evplist q_inflight;
- struct rq_message *q_mtabatch;
+ struct evplist q_mta;
struct evplist q_mda;
struct evplist q_bounce;
struct evplist q_expired;
@@ -169,7 +169,6 @@ scheduler_ram_insert(struct scheduler_info *si)
if ((message = tree_get(&update->messages, msgid)) == NULL) {
message = xcalloc(1, sizeof *message, "scheduler_insert");
message->msgid = msgid;
- TAILQ_INIT(&message->q_mta);
tree_init(&message->envelopes);
tree_xset(&update->messages, msgid, message);
stat_increment("scheduler.ramqueue.message", 1);
@@ -187,7 +186,7 @@ scheduler_ram_insert(struct scheduler_info *si)
update->evpcount++;
stat_increment("scheduler.ramqueue.envelope", 1);
- envelope->flags = RQ_ENVELOPE_PENDING;
+ envelope->state = RQ_EVPSTATE_PENDING;
sorted_insert(&update->q_pending, envelope);
si->nexttry = envelope->sched;
@@ -260,15 +259,26 @@ scheduler_ram_update(struct scheduler_info *si)
evp = tree_xget(&msg->envelopes, si->evpid);
/* it *must* be in-flight */
- if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
+ if (evp->state != RQ_EVPSTATE_INFLIGHT)
errx(1, "evp:%016" PRIx64 " not in-flight", si->evpid);
+ TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
+
+ /*
+ * If the envelope was removed while inflight, schedule it for
+ * removal immediatly.
+ */
+ if (evp->flags & RQ_ENVELOPE_REMOVED) {
+ TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry);
+ evp->state = RQ_EVPSTATE_SCHEDULED;
+ evp->t_scheduled = currtime;
+ return (1);
+ }
+
while ((evp->sched = scheduler_compute_schedule(si)) <= currtime)
si->retry += 1;
- TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
- evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
- evp->flags |= RQ_ENVELOPE_PENDING;
+ evp->state = RQ_EVPSTATE_PENDING;
if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
sorted_insert(&ramqueue.q_pending, evp);
@@ -291,11 +301,11 @@ scheduler_ram_delete(uint64_t evpid)
evp = tree_xget(&msg->envelopes, evpid);
/* it *must* be in-flight */
- if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
+ if (evp->state != RQ_EVPSTATE_INFLIGHT)
errx(1, "evp:%016" PRIx64 " not in-flight", evpid);
TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
- evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
+
rq_envelope_delete(&ramqueue, evp);
return (1);
@@ -306,7 +316,6 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret)
{
struct evplist *q;
struct rq_envelope *evp;
- struct rq_message *msg;
size_t n;
currtime = time(NULL);
@@ -331,11 +340,8 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret)
q = &ramqueue.q_mda;
ret->type = SCHED_MDA;
}
- else if (typemask & SCHED_MTA && ramqueue.q_mtabatch) {
- msg = ramqueue.q_mtabatch;
- ramqueue.q_mtabatch = msg->q_next;
- msg->q_next = NULL;
- q = &msg->q_mta;
+ else if (typemask & SCHED_MTA && TAILQ_FIRST(&ramqueue.q_mta)) {
+ q = &ramqueue.q_mta;
ret->type = SCHED_MTA;
}
else if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
@@ -357,8 +363,8 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret)
TAILQ_REMOVE(q, evp, entry);
- /* consistency check */
- if (!(evp->flags & RQ_ENVELOPE_SCHEDULED))
+ /* consistency check */
+ if (evp->state != RQ_EVPSTATE_SCHEDULED)
errx(1, "evp:%016" PRIx64 " not scheduled", evp->evpid);
ret->evpids[n] = evp->evpid;
@@ -367,8 +373,7 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret)
rq_envelope_delete(&ramqueue, evp);
else {
TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
- evp->flags &= ~RQ_ENVELOPE_SCHEDULED;
- evp->flags |= RQ_ENVELOPE_INFLIGHT;
+ evp->state = RQ_EVPSTATE_INFLIGHT;
evp->t_inflight = currtime;
}
}
@@ -414,22 +419,26 @@ scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size)
if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
continue;
- dst[n].retry = 0;
dst[n].evpid = evp->evpid;
- if (evp->flags & RQ_ENVELOPE_PENDING) {
+ dst[n].flags = 0;
+ dst[n].retry = 0;
+ dst[n].time = 0;
+
+ if (evp->state == RQ_EVPSTATE_PENDING) {
dst[n].time = evp->sched;
dst[n].flags = EF_PENDING;
}
- else if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+ else if (evp->state == RQ_EVPSTATE_SCHEDULED) {
dst[n].time = evp->t_scheduled;
dst[n].flags = EF_PENDING;
}
- else if (evp->flags & RQ_ENVELOPE_INFLIGHT) {
+ else if (evp->state == RQ_EVPSTATE_INFLIGHT) {
dst[n].time = evp->t_inflight;
dst[n].flags = EF_INFLIGHT;
}
if (evp->flags & RQ_ENVELOPE_SUSPEND)
dst[n].flags |= EF_SUSPEND;
+
n++;
}
@@ -453,11 +462,10 @@ scheduler_ram_schedule(uint64_t evpid)
return (0);
if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
return (0);
- if (evp->flags & RQ_ENVELOPE_PENDING) {
- rq_envelope_schedule(&ramqueue, evp);
- return (1);
- }
- return (0);
+ if (evp->state == RQ_EVPSTATE_INFLIGHT)
+ return (0);
+ rq_envelope_schedule(&ramqueue, evp);
+ return (1);
}
else {
msgid = evpid;
@@ -465,11 +473,12 @@ scheduler_ram_schedule(uint64_t evpid)
return (0);
i = NULL;
r = 0;
- while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
- if (evp->flags & RQ_ENVELOPE_PENDING) {
- rq_envelope_schedule(&ramqueue, evp);
- r++;
- }
+ while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) {
+ if (evp->state == RQ_EVPSTATE_INFLIGHT)
+ continue;
+ rq_envelope_schedule(&ramqueue, evp);
+ r++;
+ }
return (r);
}
}
@@ -611,6 +620,7 @@ rq_queue_init(struct rq_queue *rq)
tree_init(&rq->messages);
TAILQ_INIT(&rq->q_pending);
TAILQ_INIT(&rq->q_inflight);
+ TAILQ_INIT(&rq->q_mta);
TAILQ_INIT(&rq->q_mda);
TAILQ_INIT(&rq->q_bounce);
TAILQ_INIT(&rq->q_expired);
@@ -654,16 +664,15 @@ rq_queue_schedule(struct rq_queue *rq)
if (evp->sched > currtime && evp->expire > currtime)
break;
- if (evp->flags != RQ_ENVELOPE_PENDING)
+ if (evp->state != RQ_EVPSTATE_PENDING)
errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
evp->flags);
if (evp->expire <= currtime) {
TAILQ_REMOVE(&rq->q_pending, evp, entry);
TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
- evp->flags &= ~RQ_ENVELOPE_PENDING;
+ evp->state = RQ_EVPSTATE_SCHEDULED;
evp->flags |= RQ_ENVELOPE_EXPIRED;
- evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
continue;
}
@@ -674,25 +683,28 @@ rq_queue_schedule(struct rq_queue *rq)
static struct evplist *
rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp)
{
- if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+ switch (evp->state) {
+ case RQ_EVPSTATE_PENDING:
+ return &rq->q_pending;
+
+ case RQ_EVPSTATE_SCHEDULED:
if (evp->flags & RQ_ENVELOPE_EXPIRED)
return &rq->q_expired;
if (evp->flags & RQ_ENVELOPE_REMOVED)
return &rq->q_removed;
if (evp->type == D_MTA)
- return &evp->message->q_mta;
+ return &rq->q_mta;
if (evp->type == D_MDA)
return &rq->q_mda;
if (evp->type == D_BOUNCE)
return &rq->q_bounce;
- }
-
- if (evp->flags & RQ_ENVELOPE_PENDING)
- return &rq->q_pending;
+ errx(1, "%016" PRIx64 " bad evp type %d", evp->evpid, evp->type);
- if (evp->flags & RQ_ENVELOPE_INFLIGHT)
+ case RQ_EVPSTATE_INFLIGHT:
return &rq->q_inflight;
+ }
+ errx(1, "%016" PRIx64 " bad state %d", evp->evpid, evp->state);
return (NULL);
}
@@ -701,66 +713,48 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
struct evplist *q = NULL;
- if (evp->type == D_MTA) {
- if (TAILQ_EMPTY(&evp->message->q_mta)) {
- evp->message->q_next = rq->q_mtabatch;
- rq->q_mtabatch = evp->message;
- }
- q = &evp->message->q_mta;
- }
- else if (evp->type == D_MDA)
+ switch (evp->type) {
+ case D_MTA:
+ q = &rq->q_mta;
+ break;
+ case D_MDA:
q = &rq->q_mda;
- else if (evp->type == D_BOUNCE)
+ break;
+ case D_BOUNCE:
q = &rq->q_bounce;
+ break;
+ }
+
+ if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
+ TAILQ_REMOVE(&rq->q_pending, evp, entry);
- TAILQ_REMOVE(&rq->q_pending, evp, entry);
TAILQ_INSERT_TAIL(q, evp, entry);
- evp->flags &= ~RQ_ENVELOPE_PENDING;
- evp->flags |= RQ_ENVELOPE_SCHEDULED;
+ evp->state = RQ_EVPSTATE_SCHEDULED;
evp->t_scheduled = currtime;
}
static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
- struct rq_message *m;
- struct evplist *q = NULL;
-
if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
return (0);
/*
- * For now we just ignore it, but we could mark the envelope for
- * removal and possibly send a cancellation to the agent.
+ * If envelope is inflight, mark it envelope for removal.
*/
- if (evp->flags & (RQ_ENVELOPE_INFLIGHT))
- return (0);
+ if (evp->state == RQ_EVPSTATE_INFLIGHT) {
+ evp->flags |= RQ_ENVELOPE_REMOVED;
+ return (1);
+ }
- q = rq_envelope_list(rq, evp);
+ if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
+ TAILQ_REMOVE(rq_envelope_list(rq, evp), evp, entry);
+ }
- TAILQ_REMOVE(q, evp, entry);
TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
- evp->flags &= ~RQ_ENVELOPE_PENDING;
+ evp->state = RQ_EVPSTATE_SCHEDULED;
evp->flags |= RQ_ENVELOPE_REMOVED;
- evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
- /*
- * We might need to unschedule the message if it was the only
- * scheduled envelope
- */
- if (q == &evp->message->q_mta && TAILQ_EMPTY(q)) {
- if (rq->q_mtabatch == evp->message)
- rq->q_mtabatch = evp->message->q_next;
- else {
- for (m = rq->q_mtabatch; m->q_next; m = m->q_next)
- if (m->q_next == evp->message) {
- m->q_next = evp->message->q_next;
- break;
- }
- }
- evp->message->q_next = NULL;
- }
-
return (1);
}
@@ -770,8 +764,9 @@ rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
if (evp->flags & RQ_ENVELOPE_SUSPEND)
return (0);
- if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
+ if (evp->state != RQ_EVPSTATE_INFLIGHT) {
TAILQ_REMOVE(rq_envelope_list(rq, evp), evp, entry);
+ }
evp->flags |= RQ_ENVELOPE_SUSPEND;
@@ -784,10 +779,11 @@ rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
return (0);
- if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
+ if (evp->state != RQ_EVPSTATE_INFLIGHT)
sorted_insert(rq_envelope_list(rq, evp), evp);
evp->flags &= ~RQ_ENVELOPE_SUSPEND;
+
return (1);
}
@@ -825,21 +821,30 @@ rq_envelope_to_text(struct rq_envelope *e)
duration_to_text(e->expire - currtime));
strlcat(buf, t, sizeof buf);
- if (e->flags & RQ_ENVELOPE_PENDING) {
+
+ switch (e->state) {
+ case RQ_EVPSTATE_PENDING:
snprintf(t, sizeof t, ",pending=%s",
duration_to_text(e->sched - currtime));
strlcat(buf, t, sizeof buf);
- }
- if (e->flags & RQ_ENVELOPE_SCHEDULED) {
+ break;
+
+ case RQ_EVPSTATE_SCHEDULED:
snprintf(t, sizeof t, ",scheduled=%s",
duration_to_text(currtime - e->t_scheduled));
strlcat(buf, t, sizeof buf);
- }
- if (e->flags & RQ_ENVELOPE_INFLIGHT) {
+ break;
+
+ case RQ_EVPSTATE_INFLIGHT:
snprintf(t, sizeof t, ",inflight=%s",
duration_to_text(currtime - e->t_inflight));
strlcat(buf, t, sizeof buf);
+ break;
+
+ default:
+ errx(1, "%016" PRIx64 " bad state %d", e->evpid, e->state);
}
+
if (e->flags & RQ_ENVELOPE_REMOVED)
strlcat(buf, ",removed", sizeof buf);
if (e->flags & RQ_ENVELOPE_EXPIRED)