Diffstat (limited to 'usr.sbin/smtpd/scheduler_ramqueue.c')
-rw-r--r--	usr.sbin/smtpd/scheduler_ramqueue.c	259
1 file changed, 179 insertions, 80 deletions
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index 055d2c8b4d2..e073c4b35d1 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,4 +1,4 @@
-/*	$OpenBSD: scheduler_ramqueue.c,v 1.24 2012/11/12 14:58:53 eric Exp $	*/
+/*	$OpenBSD: scheduler_ramqueue.c,v 1.25 2012/11/20 09:47:45 eric Exp $	*/
 
 /*
  * Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -42,8 +42,8 @@ TAILQ_HEAD(evplist, rq_envelope);
 struct rq_message {
 	uint32_t		 msgid;
 	struct tree		 envelopes;
-	struct rq_message	*sched_next;
-	struct rq_envelope	*sched_mta;
+	struct rq_message	*q_next;
+	struct evplist		 q_mta;
 };
 
 struct rq_envelope {
@@ -64,7 +64,6 @@ struct rq_envelope {
 
 	struct rq_message	*message;
 
-	struct rq_envelope	*sched_next;
 	time_t			 t_inflight;
 	time_t			 t_scheduled;
 };
@@ -73,14 +72,14 @@ struct rq_queue {
 	size_t			 evpcount;
 	struct tree		 messages;
 
-	struct evplist		 pending;
-
-	struct rq_message	*sched_mta;
-	struct rq_envelope	*sched_mda;
-	struct rq_envelope	*sched_bounce;
-	struct rq_envelope	*sched_expired;
-	struct rq_envelope	*sched_removed;
+	struct evplist		 q_pending;
+	struct evplist		 q_inflight;
+	struct rq_message	*q_mtabatch;
+	struct evplist		 q_mda;
+	struct evplist		 q_bounce;
+	struct evplist		 q_expired;
+	struct evplist		 q_removed;
 };
 
 static void scheduler_ramqueue_init(void);
@@ -90,6 +89,8 @@ static size_t scheduler_ramqueue_rollback(uint32_t);
 static void scheduler_ramqueue_update(struct scheduler_info *);
 static void scheduler_ramqueue_delete(uint64_t);
 static void scheduler_ramqueue_batch(int, struct scheduler_batch *);
+static size_t scheduler_ramqueue_messages(uint32_t, uint32_t *, size_t);
+static size_t scheduler_ramqueue_envelopes(uint64_t, struct evpstate *, size_t);
 static void scheduler_ramqueue_schedule(uint64_t);
 static void scheduler_ramqueue_remove(uint64_t);
 
@@ -117,6 +118,8 @@ struct scheduler_backend scheduler_backend_ramqueue = {
 
 	scheduler_ramqueue_batch,
 
+	scheduler_ramqueue_messages,
+	scheduler_ramqueue_envelopes,
 	scheduler_ramqueue_schedule,
 	scheduler_ramqueue_remove,
 };
@@ -124,7 +127,7 @@ struct scheduler_backend scheduler_backend_ramqueue = {
 
 static struct rq_queue	ramqueue;
 static struct tree	updates;
 
-static time_t		currtime;
+static time_t		 currtime;
 
 extern int verbose;
@@ -159,6 +162,7 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
 	if ((message = tree_get(&update->messages, msgid)) == NULL) {
 		message = xcalloc(1, sizeof *message, "scheduler_insert");
 		message->msgid = msgid;
+		TAILQ_INIT(&message->q_mta);
 		tree_init(&message->envelopes);
 		tree_xset(&update->messages, msgid, message);
 		stat_increment("scheduler.ramqueue.message", 1);
@@ -177,7 +181,7 @@ scheduler_ramqueue_insert(struct scheduler_info *si)
 	stat_increment("scheduler.ramqueue.envelope", 1);
 
 	envelope->flags = RQ_ENVELOPE_PENDING;
-	sorted_insert(&update->pending, envelope);
+	sorted_insert(&update->q_pending, envelope);
 }
 
 static size_t
@@ -195,6 +199,10 @@ scheduler_ramqueue_commit(uint32_t msgid)
 		rq_queue_dump(update, "update to commit");
 
 	rq_queue_merge(&ramqueue, update);
+
+	if (verbose & TRACE_SCHEDULER)
+		rq_queue_dump(&ramqueue, "resulting queue");
+
 	rq_queue_schedule(&ramqueue);
 
 	free(update);
@@ -216,8 +224,8 @@ scheduler_ramqueue_rollback(uint32_t msgid)
 		return (0);
 
 	r = update->evpcount;
-	while ((evp = TAILQ_FIRST(&update->pending))) {
-		TAILQ_REMOVE(&update->pending, evp, entry);
+	while ((evp = TAILQ_FIRST(&update->q_pending))) {
+		TAILQ_REMOVE(&update->q_pending, evp, entry);
 		rq_envelope_delete(update, evp);
 	}
 
@@ -247,9 +255,10 @@ scheduler_ramqueue_update(struct scheduler_info *si)
 	while ((evp->sched = scheduler_compute_schedule(si)) <= currtime)
 		si->retry += 1;
 
+	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
+	sorted_insert(&ramqueue.q_pending, evp);
 	evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
 	evp->flags |= RQ_ENVELOPE_PENDING;
-	sorted_insert(&ramqueue.pending, evp);
 }
 
 static void
@@ -269,6 +278,7 @@ scheduler_ramqueue_delete(uint64_t evpid)
 	if (!(evp->flags & RQ_ENVELOPE_INFLIGHT))
 		errx(1, "evp:%016" PRIx64 " not in-flight", evpid);
 
+	TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry);
 	evp->flags &= ~RQ_ENVELOPE_INFLIGHT;
 	rq_envelope_delete(&ramqueue, evp);
 }
@@ -276,7 +286,9 @@ scheduler_ramqueue_delete(uint64_t evpid)
 static void
 scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
 {
-	struct rq_envelope	*evp, *tmp, **batch;
+	struct evplist		*q;
+	struct rq_envelope	*evp;
+	struct rq_message	*msg;
 	struct id_list		*item;
 
 	currtime = time(NULL);
@@ -285,28 +297,30 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
 	if (verbose & TRACE_SCHEDULER)
 		rq_queue_dump(&ramqueue, "scheduler_ramqueue_batch()");
 
-	if (typemask & SCHED_REMOVE && ramqueue.sched_removed) {
-		batch = &ramqueue.sched_removed;
+	if (typemask & SCHED_REMOVE && TAILQ_FIRST(&ramqueue.q_removed)) {
+		q = &ramqueue.q_removed;
 		ret->type = SCHED_REMOVE;
 	}
-	else if (typemask & SCHED_EXPIRE && ramqueue.sched_expired) {
-		batch = &ramqueue.sched_expired;
+	else if (typemask & SCHED_EXPIRE && TAILQ_FIRST(&ramqueue.q_expired)) {
+		q = &ramqueue.q_expired;
 		ret->type = SCHED_EXPIRE;
 	}
-	else if (typemask & SCHED_BOUNCE && ramqueue.sched_bounce) {
-		batch = &ramqueue.sched_bounce;
+	else if (typemask & SCHED_BOUNCE && TAILQ_FIRST(&ramqueue.q_bounce)) {
+		q = &ramqueue.q_bounce;
 		ret->type = SCHED_BOUNCE;
 	}
-	else if (typemask & SCHED_MDA && ramqueue.sched_mda) {
-		batch = &ramqueue.sched_mda;
+	else if (typemask & SCHED_MDA && TAILQ_FIRST(&ramqueue.q_mda)) {
+		q = &ramqueue.q_mda;
 		ret->type = SCHED_MDA;
 	}
-	else if (typemask & SCHED_MTA && ramqueue.sched_mta) {
-		batch = &ramqueue.sched_mta->sched_mta;
-		ramqueue.sched_mta = ramqueue.sched_mta->sched_next;
+	else if (typemask & SCHED_MTA && ramqueue.q_mtabatch) {
+		msg = ramqueue.q_mtabatch;
+		ramqueue.q_mtabatch = msg->q_next;
+		msg->q_next = NULL;
+		q = &msg->q_mta;
 		ret->type = SCHED_MTA;
 	}
-	else if ((evp = TAILQ_FIRST(&ramqueue.pending))) {
+	else if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) {
 		ret->type = SCHED_DELAY;
 		if (evp->sched < evp->expire)
 			ret->delay = evp->sched - currtime;
@@ -321,8 +335,10 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
 	ret->evpids = NULL;
 	ret->evpcount = 0;
 
-	for(evp = *batch; evp; evp = tmp) {
-		tmp = evp->sched_next;
+
+	while ((evp = TAILQ_FIRST(q))) {
+
+		TAILQ_REMOVE(q, evp, entry);
 
 		/* consistency check */
 		if (!(evp->flags & RQ_ENVELOPE_SCHEDULED))
@@ -332,18 +348,17 @@ scheduler_ramqueue_batch(int typemask, struct scheduler_batch *ret)
 		item->id = evp->evpid;
 		item->next = ret->evpids;
 		ret->evpids = item;
-		evp->sched_next = NULL;
+
 		if (ret->type == SCHED_REMOVE || ret->type == SCHED_EXPIRE)
 			rq_envelope_delete(&ramqueue, evp);
 		else {
+			TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry);
 			evp->flags &= ~RQ_ENVELOPE_SCHEDULED;
 			evp->flags |= RQ_ENVELOPE_INFLIGHT;
 			evp->t_inflight = currtime;
 		}
 		ret->evpcount++;
 	}
-
-	*batch = NULL;
 }
 
 static void
@@ -352,26 +367,18 @@ scheduler_ramqueue_schedule(uint64_t evpid)
 	struct rq_message	*msg;
 	struct rq_envelope	*evp;
 	uint32_t		 msgid;
-	void			*i, *j;
+	void			*i;
 
 	currtime = time(NULL);
 
-	if (evpid == 0) {
-		j = NULL;
-		while (tree_iter(&ramqueue.messages, &j, NULL, (void*)(&msg))) {
-			i = NULL;
-			while (tree_iter(&msg->envelopes, &i, NULL,
-			    (void*)(&evp)))
-				rq_envelope_schedule(&ramqueue, evp);
-		}
-	}
-	else if (evpid > 0xffffffff) {
+	if (evpid > 0xffffffff) {
 		msgid = evpid_to_msgid(evpid);
 		if ((msg = tree_get(&ramqueue.messages, msgid)) == NULL)
 			return;
 		if ((evp = tree_get(&msg->envelopes, evpid)) == NULL)
 			return;
-		rq_envelope_schedule(&ramqueue, evp);
+		if (evp->flags & RQ_ENVELOPE_PENDING)
+			rq_envelope_schedule(&ramqueue, evp);
 	}
 	else {
 		msgid = evpid;
@@ -379,7 +386,8 @@ scheduler_ramqueue_schedule(uint64_t evpid)
 			return;
 		i = NULL;
 		while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp)))
-			rq_envelope_schedule(&ramqueue, evp);
+			if (evp->flags & RQ_ENVELOPE_PENDING)
+				rq_envelope_schedule(&ramqueue, evp);
 	}
 }
 
@@ -411,6 +419,62 @@ scheduler_ramqueue_remove(uint64_t evpid)
 	}
 }
 
+static size_t
+scheduler_ramqueue_messages(uint32_t from, uint32_t *dst, size_t size)
+{
+	uint64_t	id;
+	size_t		n;
+	void		*i;
+
+	for (n = 0, i = NULL; n < size; n++) {
+		if (tree_iterfrom(&ramqueue.messages, &i, from, &id, NULL) == 0)
+			break;
+		dst[n] = id;
+	}
+
+	return (n);
+}
+
+static size_t
+scheduler_ramqueue_envelopes(uint64_t from, struct evpstate *dst, size_t size)
+{
+	struct rq_message	*msg;
+	struct rq_envelope	*evp;
+	void			*i;
+	size_t			 n;
+
+	if ((msg = tree_get(&ramqueue.messages, evpid_to_msgid(from))) == NULL)
+		return (0);
+
+	for (n = 0, i = NULL; n < size; ) {
+
+		if (tree_iterfrom(&msg->envelopes, &i, from, NULL,
+		    (void**)&evp) == 0)
+			break;
+
+		if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
+			continue;
+
+		dst[n].retry = 0;
+		dst[n].evpid = evp->evpid;
+		if (evp->flags & RQ_ENVELOPE_PENDING) {
+			dst[n].time = evp->sched;
+			dst[n].flags = DF_PENDING;
+		}
+		else if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+			dst[n].time = evp->t_scheduled;
+			dst[n].flags = DF_PENDING;
+		}
+		else if (evp->flags & RQ_ENVELOPE_INFLIGHT) {
+			dst[n].time = evp->t_inflight;
+			dst[n].flags = DF_INFLIGHT;
+		}
+		n++;
+	}
+
+	return (n);
+}
+
 static void
 sorted_insert(struct evplist *list, struct rq_envelope *evp)
 {
@@ -444,7 +508,12 @@ rq_queue_init(struct rq_queue *rq)
 {
 	bzero(rq, sizeof *rq);
 	tree_init(&rq->messages);
-	TAILQ_INIT(&rq->pending);
+	TAILQ_INIT(&rq->q_pending);
+	TAILQ_INIT(&rq->q_inflight);
+	TAILQ_INIT(&rq->q_mda);
+	TAILQ_INIT(&rq->q_bounce);
+	TAILQ_INIT(&rq->q_expired);
+	TAILQ_INIT(&rq->q_removed);
 }
 
 static void
@@ -463,7 +532,7 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
 		}
 		/* need to re-link all envelopes before merging them */
 		i = NULL;
-		while((tree_iter(&message->envelopes, &i, &id,
+		while ((tree_iter(&message->envelopes, &i, &id,
 		    (void*)&envelope)))
 			envelope->message = tomessage;
 		tree_merge(&tomessage->envelopes, &message->envelopes);
@@ -471,7 +540,8 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
 		stat_decrement("scheduler.ramqueue.message", 1);
 	}
 
-	sorted_merge(&rq->pending, &update->pending);
+	sorted_merge(&rq->q_pending, &update->q_pending);
+	rq->evpcount += update->evpcount;
 }
 
 static void
@@ -479,23 +549,21 @@ rq_queue_schedule(struct rq_queue *rq)
 {
 	struct rq_envelope	*evp;
 
-	while ((evp = TAILQ_FIRST(&rq->pending))) {
+	while ((evp = TAILQ_FIRST(&rq->q_pending))) {
 		if (evp->sched > currtime && evp->expire > currtime)
 			break;
-
 		/* it *must* be pending */
 		if (evp->flags != RQ_ENVELOPE_PENDING)
 			errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid,
 			    evp->flags);
 		if (evp->expire <= currtime) {
-			TAILQ_REMOVE(&rq->pending, evp, entry);
+			TAILQ_REMOVE(&rq->q_pending, evp, entry);
+			TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
 			evp->flags &= ~RQ_ENVELOPE_PENDING;
 			evp->flags |= RQ_ENVELOPE_EXPIRED;
 			evp->flags |= RQ_ENVELOPE_SCHEDULED;
 			evp->t_scheduled = currtime;
-			evp->sched_next = rq->sched_expired;
-			rq->sched_expired = evp;
 			continue;
 		}
 		rq_envelope_schedule(rq, evp);
@@ -505,28 +573,22 @@ rq_queue_schedule(struct rq_queue *rq)
 static void
 rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
 {
-	if (evp->flags & (RQ_ENVELOPE_SCHEDULED | RQ_ENVELOPE_INFLIGHT))
-		return;
-
-	if (evp->flags & RQ_ENVELOPE_PENDING)
-		TAILQ_REMOVE(&rq->pending, evp, entry);
+	struct evplist	*q = NULL;
 
 	if (evp->type == D_MTA) {
-		if (evp->message->sched_mta == NULL) {
-			evp->message->sched_next = rq->sched_mta;
-			rq->sched_mta = evp->message;
+		if (TAILQ_EMPTY(&evp->message->q_mta)) {
+			evp->message->q_next = rq->q_mtabatch;
+			rq->q_mtabatch = evp->message;
 		}
-		evp->sched_next = evp->message->sched_mta;
-		evp->message->sched_mta = evp;
-	}
-	else if (evp->type == D_MDA) {
-		evp->sched_next = rq->sched_mda;
-		rq->sched_mda = evp;
-	}
-	else if (evp->type == D_BOUNCE) {
-		evp->sched_next = rq->sched_bounce;
-		rq->sched_bounce = evp;
+		q = &evp->message->q_mta;
 	}
+	else if (evp->type == D_MDA)
+		q = &rq->q_mda;
+	else if (evp->type == D_BOUNCE)
+		q = &rq->q_bounce;
+
+	TAILQ_REMOVE(&rq->q_pending, evp, entry);
+	TAILQ_INSERT_TAIL(q, evp, entry);
 	evp->flags &= ~RQ_ENVELOPE_PENDING;
 	evp->flags |= RQ_ENVELOPE_SCHEDULED;
 	evp->t_scheduled = currtime;
@@ -535,16 +597,51 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
 static void
 rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
 {
-	if (!(evp->flags & (RQ_ENVELOPE_PENDING)))
+	struct rq_message	*m;
+	struct evplist		*q = NULL;
+
+	if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
+		return;
+	/*
+	 * For now we just ignore it, but we could mark the envelope for
+	 * removal and possibly send a cancellation to the agent.
+	 */
+	if (evp->flags & (RQ_ENVELOPE_INFLIGHT))
 		return;
 
-	TAILQ_REMOVE(&rq->pending, evp, entry);
-	evp->sched_next = rq->sched_removed;
-	rq->sched_removed = evp;
+	if (evp->flags & RQ_ENVELOPE_SCHEDULED) {
+		if (evp->type == D_MTA)
+			q = &evp->message->q_mta;
+		else if (evp->type == D_MDA)
+			q = &rq->q_mda;
+		else if (evp->type == D_BOUNCE)
+			q = &rq->q_bounce;
+	} else
+		q = &rq->q_pending;
+
+	TAILQ_REMOVE(q, evp, entry);
+	TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
 	evp->flags &= ~RQ_ENVELOPE_PENDING;
 	evp->flags |= RQ_ENVELOPE_REMOVED;
 	evp->flags |= RQ_ENVELOPE_SCHEDULED;
 	evp->t_scheduled = currtime;
+
+	/*
+	 * We might need to unschedule the message if it was the only
+	 * scheduled envelope
+	 */
+	if (q == &evp->message->q_mta && TAILQ_EMPTY(q)) {
+		if (rq->q_mtabatch == evp->message)
+			rq->q_mtabatch = evp->message->q_next;
+		else {
+			for (m = rq->q_mtabatch; m->q_next; m = m->q_next)
+				if (m->q_next == evp->message) {
+					m->q_next = evp->message->q_next;
+					break;
+				}
+		}
+		evp->message->q_next = NULL;
+	}
 }
 
 static void
@@ -558,6 +655,7 @@ rq_envelope_delete(struct rq_queue *rq, struct rq_envelope *evp)
 	}
 
 	free(evp);
+	rq->evpcount--;
 
 	stat_decrement("scheduler.ramqueue.envelope", 1);
 }
@@ -576,7 +674,8 @@ rq_envelope_to_text(struct rq_envelope *e)
 	else if (e->type == D_MTA)
 		strlcat(buf, "mta", sizeof buf);
 
-	snprintf(t, sizeof t, ",expire=%s", duration_to_text(e->expire - currtime));
+	snprintf(t, sizeof t, ",expire=%s",
+	    duration_to_text(e->expire - currtime));
 	strlcat(buf, t, sizeof buf);
 
 	if (e->flags & RQ_ENVELOPE_PENDING) {
@@ -615,10 +714,10 @@ rq_queue_dump(struct rq_queue *rq, const char * name)
 	log_debug("debug: /--- ramqueue: %s", name);
 
 	i = NULL;
-	while((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
+	while ((tree_iter(&rq->messages, &i, &id, (void*)&message))) {
 		log_debug("debug: | msg:%08" PRIx32, message->msgid);
 		j = NULL;
-		while((tree_iter(&message->envelopes, &j, &id,
+		while ((tree_iter(&message->envelopes, &j, &id,
 		    (void*)&envelope)))
 			log_debug("debug: |   %s",
 			    rq_envelope_to_text(envelope));
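The bulk of the change is mechanical: the hand-rolled sched_next singly linked lists are replaced by <sys/queue.h> tail queues (the evplist TAILQ and the new q_* heads), so an envelope can be unlinked from whichever queue it currently sits on and appended to another in constant time, and in-flight envelopes are now tracked on a queue of their own. Below is a minimal standalone sketch of that TAILQ pattern; it is not part of the commit, and the names (entry, entrylist, pending, inflight) are illustrative only, not smtpd identifiers.

/*
 * Sketch: moving items between two tail queues, as the scheduler does
 * when it takes envelopes from q_pending and puts them in flight.
 */
#include <sys/queue.h>
#include <err.h>
#include <stdio.h>
#include <stdlib.h>

struct entry {
	int			 id;
	TAILQ_ENTRY(entry)	 link;	/* list linkage, like rq_envelope's "entry" */
};
TAILQ_HEAD(entrylist, entry);		/* like TAILQ_HEAD(evplist, rq_envelope) */

int
main(void)
{
	struct entrylist	 pending, inflight;
	struct entry		*e;
	int			 i;

	TAILQ_INIT(&pending);
	TAILQ_INIT(&inflight);

	for (i = 0; i < 3; i++) {
		if ((e = calloc(1, sizeof *e)) == NULL)
			err(1, "calloc");
		e->id = i;
		TAILQ_INSERT_TAIL(&pending, e, link);
	}

	/* drain "pending" into "inflight": unlink head, append elsewhere */
	while ((e = TAILQ_FIRST(&pending))) {
		TAILQ_REMOVE(&pending, e, link);
		TAILQ_INSERT_TAIL(&inflight, e, link);
	}

	TAILQ_FOREACH(e, &inflight, link)
		printf("in-flight: %d\n", e->id);

	return (0);
}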