summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd/scheduler_ramqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'usr.sbin/smtpd/scheduler_ramqueue.c')
-rw-r--r--usr.sbin/smtpd/scheduler_ramqueue.c259
1 files changed, 179 insertions, 80 deletions
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index 055d2c8b4d2..e073c4b35d1 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.24 2012/11/12 14:58:53 eric Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.25 2012/11/20 09:47:45 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -42,8 +42,8 @@ TAILQ_HEAD(evplist, rq_envelope);
struct rq_message {
uint32_t msgid;
struct tree envelopes;
- struct rq_message *sched_next;
- struct rq_envelope *sched_mta;
+ struct rq_message *q_next;
+ struct evplist q_mta;
};
struct rq_envelope {
@@ -64,7 +64,6 @@ struct rq_envelope {
struct rq_message *message;
- struct rq_envelope *sched_next;
time_t t_inflight;
time_t t_scheduled;
};
@@ -73,14 +72,14 @@ struct rq_queue {
size_t evpcount;
struct tree messages;
- struct evplist pending;
-
- struct rq_message *sched_mta;
- struct rq_envelope *sched_mda;
- struct rq_envelope *sched_bounce;
- struct rq_envelope *sched_expired;
- struct rq_envelope *sched_removed;
+ struct evplist q_pending;
+ struct evplist q_inflight;
+ struct rq_message *q_mtabatch;
+ struct evplist q_mda;
+ struct evplist q_bounce;
+ struct evplist q_expired;
+ struct evplist q_removed;
};
static void scheduler_ramqueue_init(void);
@@ -90,6 +89,8 @@ static size_t scheduler_ramqueue_rollback(uint32_t);
static void scheduler_ramqueue_update(struct scheduler_info *);
static void scheduler_ramqueue_delete(uint64_t);
static void scheduler_ramqueue_batch(int, struct scheduler_batch *);
+static size_t scheduler_ramqueue_messages(uint32_t, uint32_t *, size_t);
+static size_t scheduler_ramqueue_envelopes(uint64_t, struct evpstate *, size_t);
static void scheduler_ramqueue_schedule(uint64_t);
static void scheduler_ramqueue_remove(uint64_t);
@@ -117,6 +118,8 @@ struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_batch,
+ scheduler_ramqueue_messages,
+ scheduler_ramqueue_envelopes,
scheduler_ramqueue_schedule,
scheduler_ramqueue_remove,
};
@@ -124,7 +127,7 @@ struct scheduler_backend scheduler_backend_ramqueue = {
static struct rq_queue ramqueue;
static struct tree updates;
-static time_t currtime;
+static time_t currtime;
extern int verbose;
@@ -159,6 +162,7 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
if ((message = tree_get(&update->messages, msgid)) == NULL) {
message = xcalloc(1, sizeof *message, "scheduler_insert");
message->msgid = msgid;
+ TAILQ_INIT(&message->q_mta);
tree_init(&message->envelopes);
tree_xset(&update->messages, msgid, message);
stat_increment("scheduler.ramqueue.message", 1);
@@ -177,7 +181,7 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
stat_increment("scheduler.ramqueue.envelope", 1);
envelope->flags = RQ_ENVELOPE_PENDING;
- sorted_insert(&update->pending, envelope);
+ sorted_insert(&update->q_pending, envelope);
}
static size_t
@@ -195,6 +199,10 @@ scheduler_ramqueue_commit(uint32_t msgid)
rq_queue_dump(update, "update to commit");
rq_queue_merge(&ramqueue, update);
+
+ if (verbose & TRACE_SCHEDULER)
+ rq_queue_dump(&ramqueue, "resulting queue");
+
rq_queue_schedule(&ramqueue);
free(update);
@@ -216,8 +224,8 @@ scheduler_ramqueue_rollback(uint32_t msgid)
return (0);
r = update->evpcount;
- while ((evp = TAILQ_FIRST(&update->pending))) {
- TAILQ_REMOVE(&update->pending, evp, entry);
+ while ((evp = TAILQ_FIRST(&update->q_pending))) {
+ TAILQ_REMOVE(&update->q_pending, evp, entry);
rq_envelope_delete(update, evp);
}
@@ -247,9 +255,10 @@ scheduler_ramqueue_update(struct scheduler_info *si)
while ((evp->sched = scheduler_compute_schedule(si)) <= currtime)
si->retry += 1;
+ TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
+ sorted_insert(&ramqueue.q_pending, evp);
evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
evp->flags |= RQ_ENVELOPE_PENDING;
- sorted_insert(&ramqueue.pending, evp);
}
static void
@@ -269,6 +278,7 @@ scheduler_ramqueue_delete(uint64_t evpid)
if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
errx(1, "evp:%016" PRIx64 " not in-flight", evpid);
+ TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
rq_envelope_delete(&ramqueue, evp);
}
@@ -276,7 +286,9 @@ scheduler_ramqueue_delete(uint64_t evpid)
static void
scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
{
- struct rq_envelope *evp, *tmp, **batch;
+ struct evplist *q;
+ struct rq_envelope *evp;
+ struct rq_message *msg;
struct id_list *item;
currtime = time(NULL);
@@ -285,28 +297,30 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
if (verbose & TRACE_SCHEDULER)
rq_queue_dump(&ramqueue, "scheduler_ramqueue_batch()");
- if (typemask & SCHED_REMOVE && ramqueue.sched_removed) {
- batch = &ramqueue.sched_removed;
+ if (typemask & SCHED_REMOVE && TAILQ_FIRST(&ramqueue.q_removed)) {
+ q = &ramqueue.q_removed;
ret->type = SCHED_REMOVE;
}
- else if (typemask & SCHED_EXPIRE && ramqueue.sched_expired) {
- batch = &ramqueue.sched_expired;
+ else if (typemask & SCHED_EXPIRE && TAILQ_FIRST(&ramqueue.q_expired)) {
+ q = &ramqueue.q_expired;
ret->type = SCHED_EXPIRE;
}
- else if (typemask & SCHED_BOUNCE && ramqueue.sched_bounce) {
- batch = &ramqueue.sched_bounce;
+ else if (typemask & SCHED_BOUNCE && TAILQ_FIRST(&ramqueue.q_bounce)) {
+ q = &ramqueue.q_bounce;
ret->type = SCHED_BOUNCE;
}
- else if (typemask & SCHED_MDA && ramqueue.sched_mda) {
- batch = &ramqueue.sched_mda;
+ else if (typemask & SCHED_MDA && TAILQ_FIRST(&ramqueue.q_mda)) {
+ q = &ramqueue.q_mda;
ret->type = SCHED_MDA;
}
- else if (typemask & SCHED_MTA && ramqueue.sched_mta) {
- batch = &ramqueue.sched_mta->sched_mta;
- ramqueue.sched_mta = ramqueue.sched_mta->sched_next;
+ else if (typemask & SCHED_MTA && ramqueue.q_mtabatch) {
+ msg = ramqueue.q_mtabatch;
+ ramqueue.q_mtabatch = msg->q_next;
+ msg->q_next = NULL;
+ q = &msg->q_mta;
ret->type = SCHED_MTA;
}
- else if ((evp = TAILQ_FIRST(&ramqueue.pending))) {
+ else if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
ret->type = SCHED_DELAY;
if (evp->sched < evp->expire)
ret->delay = evp->sched - currtime;
@@ -321,8 +335,10 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
ret->evpids = NULL;
ret->evpcount = 0;
- for(evp = *batch; evp; evp = tmp) {
- tmp = evp->sched_next;
+
+ while ((evp = TAILQ_FIRST(q))) {
+
+ TAILQ_REMOVE(q, evp, entry);
/* consistency check */
if (!(evp->flags & RQ_ENVELOPE_SCHEDULED))
@@ -332,18 +348,17 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
item->id = evp->evpid;
item->next = ret->evpids;
ret->evpids = item;
- evp->sched_next = NULL;
+
if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE)
rq_envelope_delete(&ramqueue, evp);
else {
+ TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
evp->flags &= ~RQ_ENVELOPE_SCHEDULED;
evp->flags |= RQ_ENVELOPE_INFLIGHT;
evp->t_inflight = currtime;
}
ret->evpcount++;
}
-
- *batch = NULL;
}
static void
@@ -352,26 +367,18 @@ scheduler_ramqueue_schedule(uint64_t evpid)
struct rq_message *msg;
struct rq_envelope *evp;
uint32_t msgid;
- void *i, *j;
+ void *i;
currtime = time(NULL);
- if (evpid == 0) {
- j = NULL;
- while (tree_iter(&ramqueue.messages, &j, NULL, (void*)(&msg))) {
- i = NULL;
- while (tree_iter(&msg->envelopes, &i, NULL,
- (void*)(&evp)))
- rq_envelope_schedule(&ramqueue, evp);
- }
- }
- else if (evpid > 0xffffffff) {
+ if (evpid > 0xffffffff) {
msgid = evpid_to_msgid(evpid);
if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
return;
if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
return;
- rq_envelope_schedule(&ramqueue, evp);
+ if (evp->flags & RQ_ENVELOPE_PENDING)
+ rq_envelope_schedule(&ramqueue, evp);
}
else {
msgid = evpid;
@@ -379,7 +386,8 @@ scheduler_ramqueue_schedule(uint64_t evpid)
return;
i = NULL;
while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
- rq_envelope_schedule(&ramqueue, evp);
+ if (evp->flags & RQ_ENVELOPE_PENDING)
+ rq_envelope_schedule(&ramqueue, evp);
}
}
@@ -411,6 +419,62 @@ scheduler_ramqueue_remove(uint64_t evpid)
}
}
+static size_t
+scheduler_ramqueue_messages(uint32_t from, uint32_t *dst, size_t size)
+{
+ uint64_t id;
+ size_t n;
+ void *i;
+
+ for (n = 0, i = NULL; n < size; n++) {
+ if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
+ break;
+ dst[n] = id;
+ }
+
+ return (n);
+}
+
+static size_t
+scheduler_ramqueue_envelopes(uint64_t from, struct evpstate *dst, size_t size)
+{
+ struct rq_message *msg;
+ struct rq_envelope *evp;
+ void *i;
+ size_t n;
+
+ if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
+ return (0);
+
+ for (n = 0, i = NULL; n < size; ) {
+
+ if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
+ (void**)&evp) == 0)
+ break;
+
+ if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
+ continue;
+
+ dst[n].retry = 0;
+ dst[n].evpid = evp->evpid;
+ if (evp->flags & RQ_ENVELOPE_PENDING) {
+ dst[n].time = evp->sched;
+ dst[n].flags = DF_PENDING;
+ }
+ else if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+ dst[n].time = evp->t_scheduled;
+ dst[n].flags = DF_PENDING;
+ }
+ else if (evp->flags & RQ_ENVELOPE_INFLIGHT) {
+ dst[n].time = evp->t_inflight;
+ dst[n].flags = DF_INFLIGHT;
+ }
+ n++;
+ }
+
+ return (n);
+}
+
static void
sorted_insert(struct evplist *list, struct rq_envelope *evp)
{
@@ -444,7 +508,12 @@ rq_queue_init(struct rq_queue *rq)
{
bzero(rq, sizeof *rq);
tree_init(&rq->messages);
- TAILQ_INIT(&rq->pending);
+ TAILQ_INIT(&rq->q_pending);
+ TAILQ_INIT(&rq->q_inflight);
+ TAILQ_INIT(&rq->q_mda);
+ TAILQ_INIT(&rq->q_bounce);
+ TAILQ_INIT(&rq->q_expired);
+ TAILQ_INIT(&rq->q_removed);
}
static void
@@ -463,7 +532,7 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
}
/* need to re-link all envelopes before merging them */
i = NULL;
- while((tree_iter(&message->envelopes, &i, &id,
+ while ((tree_iter(&message->envelopes, &i, &id,
(void*)&envelope)))
envelope->message = tomessage;
tree_merge(&tomessage->envelopes, &message->envelopes);
@@ -471,7 +540,8 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
stat_decrement("scheduler.ramqueue.message", 1);
}
- sorted_merge(&rq->pending, &update->pending);
+ sorted_merge(&rq->q_pending, &update->q_pending);
+ rq->evpcount += update->evpcount;
}
static void
@@ -479,23 +549,21 @@ rq_queue_schedule(struct rq_queue *rq)
{
struct rq_envelope *evp;
- while ((evp = TAILQ_FIRST(&rq->pending))) {
+ while ((evp = TAILQ_FIRST(&rq->q_pending))) {
if (evp->sched > currtime && evp->expire > currtime)
break;
- /* it *must* be pending */
if (evp->flags != RQ_ENVELOPE_PENDING)
errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
evp->flags);
if (evp->expire <= currtime) {
- TAILQ_REMOVE(&rq->pending, evp, entry);
+ TAILQ_REMOVE(&rq->q_pending, evp, entry);
+ TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_EXPIRED;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
- evp->sched_next = rq->sched_expired;
- rq->sched_expired = evp;
continue;
}
rq_envelope_schedule(rq, evp);
@@ -505,28 +573,22 @@ rq_queue_schedule(struct rq_queue *rq)
static void
rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
{
- if (evp->flags & (RQ_ENVELOPE_SCHEDULED | RQ_ENVELOPE_INFLIGHT))
- return;
-
- if (evp->flags & RQ_ENVELOPE_PENDING)
- TAILQ_REMOVE(&rq->pending, evp, entry);
+ struct evplist *q = NULL;
if (evp->type == D_MTA) {
- if (evp->message->sched_mta == NULL) {
- evp->message->sched_next = rq->sched_mta;
- rq->sched_mta = evp->message;
+ if (TAILQ_EMPTY(&evp->message->q_mta)) {
+ evp->message->q_next = rq->q_mtabatch;
+ rq->q_mtabatch = evp->message;
}
- evp->sched_next = evp->message->sched_mta;
- evp->message->sched_mta = evp;
- }
- else if (evp->type == D_MDA) {
- evp->sched_next = rq->sched_mda;
- rq->sched_mda = evp;
- }
- else if (evp->type == D_BOUNCE) {
- evp->sched_next = rq->sched_bounce;
- rq->sched_bounce = evp;
+ q = &evp->message->q_mta;
}
+ else if (evp->type == D_MDA)
+ q = &rq->q_mda;
+ else if (evp->type == D_BOUNCE)
+ q = &rq->q_bounce;
+
+ TAILQ_REMOVE(&rq->q_pending, evp, entry);
+ TAILQ_INSERT_TAIL(q, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
@@ -535,16 +597,51 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
static void
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
- if (!(evp->flags & (RQ_ENVELOPE_PENDING)))
+ struct rq_message *m;
+ struct evplist *q = NULL;
+
+ if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
+ return;
+ /*
+ * For now we just ignore it, but we could mark the envelope for
+ * removal and possibly send a cancellation to the agent.
+ */
+ if (evp->flags & (RQ_ENVELOPE_INFLIGHT))
return;
- TAILQ_REMOVE(&rq->pending, evp, entry);
- evp->sched_next = rq->sched_removed;
- rq->sched_removed = evp;
+ if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+ if (evp->type == D_MTA)
+ q = &evp->message->q_mta;
+ else if (evp->type == D_MDA)
+ q = &rq->q_mda;
+ else if (evp->type == D_BOUNCE)
+ q = &rq->q_bounce;
+ } else
+ q = &rq->q_pending;
+
+ TAILQ_REMOVE(q, evp, entry);
+ TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
evp->flags &= ~RQ_ENVELOPE_PENDING;
evp->flags |= RQ_ENVELOPE_REMOVED;
evp->flags |= RQ_ENVELOPE_SCHEDULED;
evp->t_scheduled = currtime;
+
+ /*
+ * We might need to unschedule the message if it was the only
+ * scheduled envelope
+ */
+ if (q == &evp->message->q_mta && TAILQ_EMPTY(q)) {
+ if (rq->q_mtabatch == evp->message)
+ rq->q_mtabatch = evp->message->q_next;
+ else {
+ for (m = rq->q_mtabatch; m->q_next; m = m->q_next)
+ if (m->q_next == evp->message) {
+ m->q_next = evp->message->q_next;
+ break;
+ }
+ }
+ evp->message->q_next = NULL;
+ }
}
static void
@@ -558,6 +655,7 @@ rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
}
free(evp);
+ rq->evpcount--;
stat_decrement("scheduler.ramqueue.envelope", 1);
}
@@ -576,7 +674,8 @@ rq_envelope_to_text(struct rq_envelope *e)
else if (e->type == D_MTA)
strlcat(buf, "mta", sizeof buf);
- snprintf(t, sizeof t, ",expire=%s", duration_to_text(e->expire - currtime));
+ snprintf(t, sizeof t, ",expire=%s",
+ duration_to_text(e->expire - currtime));
strlcat(buf, t, sizeof buf);
if (e->flags & RQ_ENVELOPE_PENDING) {
@@ -615,10 +714,10 @@ rq_queue_dump(struct rq_queue *rq, const char * name)
log_debug("debug: /--- ramqueue: %s", name);
i = NULL;
- while((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
+ while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
log_debug("debug: | msg:%08" PRIx32, message->msgid);
j = NULL;
- while((tree_iter(&message->envelopes, &j, &id,
+ while ((tree_iter(&message->envelopes, &j, &id,
(void*)&envelope)))
log_debug("debug: | %s",
rq_envelope_to_text(envelope));