diff options
-rw-r--r-- | usr.sbin/smtpd/scheduler_ramqueue.c | 209 |
1 files changed, 107 insertions, 102 deletions
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c index 4efd88d132b..e26b91831c6 100644 --- a/usr.sbin/smtpd/scheduler_ramqueue.c +++ b/usr.sbin/smtpd/scheduler_ramqueue.c @@ -1,8 +1,8 @@ -/* $OpenBSD: scheduler_ramqueue.c,v 1.30 2013/07/19 21:34:31 eric Exp $ */ +/* $OpenBSD: scheduler_ramqueue.c,v 1.31 2013/10/25 19:18:43 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade <gilles@poolp.org> - * Copyright (c) 2012 Eric Faurot <eric@openbsd.org> + * Copyright (c) 2012-2013 Eric Faurot <eric@openbsd.org> * * Permission to use, copy, modify, and distribute this software for any * purpose with or without fee is hereby granted, provided that the above @@ -41,22 +41,22 @@ TAILQ_HEAD(evplist, rq_envelope); struct rq_message { uint32_t msgid; struct tree envelopes; - struct rq_message *q_next; - struct evplist q_mta; }; struct rq_envelope { TAILQ_ENTRY(rq_envelope) entry; uint64_t evpid; - int type; - -#define RQ_ENVELOPE_PENDING 0x01 -#define RQ_ENVELOPE_SCHEDULED 0x02 -#define RQ_ENVELOPE_EXPIRED 0x04 -#define RQ_ENVELOPE_REMOVED 0x08 -#define RQ_ENVELOPE_INFLIGHT 0x10 -#define RQ_ENVELOPE_SUSPEND 0x20 + enum delivery_type type; + +#define RQ_EVPSTATE_PENDING 0 +#define RQ_EVPSTATE_SCHEDULED 1 +#define RQ_EVPSTATE_INFLIGHT 2 + uint8_t state; + +#define RQ_ENVELOPE_EXPIRED 0x01 +#define RQ_ENVELOPE_REMOVED 0x02 +#define RQ_ENVELOPE_SUSPEND 0x04 uint8_t flags; time_t sched; @@ -75,7 +75,7 @@ struct rq_queue { struct evplist q_pending; struct evplist q_inflight; - struct rq_message *q_mtabatch; + struct evplist q_mta; struct evplist q_mda; struct evplist q_bounce; struct evplist q_expired; @@ -169,7 +169,6 @@ scheduler_ram_insert(struct scheduler_info *si) if ((message = tree_get(&update->messages, msgid)) == NULL) { message = xcalloc(1, sizeof *message, "scheduler_insert"); message->msgid = msgid; - TAILQ_INIT(&message->q_mta); tree_init(&message->envelopes); tree_xset(&update->messages, msgid, message); stat_increment("scheduler.ramqueue.message", 1); @@ -187,7 +186,7 @@ scheduler_ram_insert(struct scheduler_info *si) update->evpcount++; stat_increment("scheduler.ramqueue.envelope", 1); - envelope->flags = RQ_ENVELOPE_PENDING; + envelope->state = RQ_EVPSTATE_PENDING; sorted_insert(&update->q_pending, envelope); si->nexttry = envelope->sched; @@ -260,15 +259,26 @@ scheduler_ram_update(struct scheduler_info *si) evp = tree_xget(&msg->envelopes, si->evpid); /* it *must* be in-flight */ - if (!(evp->flags & RQ_ENVELOPE_INFLIGHT)) + if (evp->state != RQ_EVPSTATE_INFLIGHT) errx(1, "evp:%016" PRIx64 " not in-flight", si->evpid); + TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry); + + /* + * If the envelope was removed while inflight, schedule it for + * removal immediatly. + */ + if (evp->flags & RQ_ENVELOPE_REMOVED) { + TAILQ_INSERT_TAIL(&ramqueue.q_removed, evp, entry); + evp->state = RQ_EVPSTATE_SCHEDULED; + evp->t_scheduled = currtime; + return (1); + } + while ((evp->sched = scheduler_compute_schedule(si)) <= currtime) si->retry += 1; - TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry); - evp->flags &= ~RQ_ENVELOPE_INFLIGHT; - evp->flags |= RQ_ENVELOPE_PENDING; + evp->state = RQ_EVPSTATE_PENDING; if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) sorted_insert(&ramqueue.q_pending, evp); @@ -291,11 +301,11 @@ scheduler_ram_delete(uint64_t evpid) evp = tree_xget(&msg->envelopes, evpid); /* it *must* be in-flight */ - if (!(evp->flags & RQ_ENVELOPE_INFLIGHT)) + if (evp->state != RQ_EVPSTATE_INFLIGHT) errx(1, "evp:%016" PRIx64 " not in-flight", evpid); TAILQ_REMOVE(&ramqueue.q_inflight, evp, entry); - evp->flags &= ~RQ_ENVELOPE_INFLIGHT; + rq_envelope_delete(&ramqueue, evp); return (1); @@ -306,7 +316,6 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret) { struct evplist *q; struct rq_envelope *evp; - struct rq_message *msg; size_t n; currtime = time(NULL); @@ -331,11 +340,8 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret) q = &ramqueue.q_mda; ret->type = SCHED_MDA; } - else if (typemask & SCHED_MTA && ramqueue.q_mtabatch) { - msg = ramqueue.q_mtabatch; - ramqueue.q_mtabatch = msg->q_next; - msg->q_next = NULL; - q = &msg->q_mta; + else if (typemask & SCHED_MTA && TAILQ_FIRST(&ramqueue.q_mta)) { + q = &ramqueue.q_mta; ret->type = SCHED_MTA; } else if ((evp = TAILQ_FIRST(&ramqueue.q_pending))) { @@ -357,8 +363,8 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret) TAILQ_REMOVE(q, evp, entry); - /* consistency check */ - if (!(evp->flags & RQ_ENVELOPE_SCHEDULED)) + /* consistency check */ + if (evp->state != RQ_EVPSTATE_SCHEDULED) errx(1, "evp:%016" PRIx64 " not scheduled", evp->evpid); ret->evpids[n] = evp->evpid; @@ -367,8 +373,7 @@ scheduler_ram_batch(int typemask, struct scheduler_batch *ret) rq_envelope_delete(&ramqueue, evp); else { TAILQ_INSERT_TAIL(&ramqueue.q_inflight, evp, entry); - evp->flags &= ~RQ_ENVELOPE_SCHEDULED; - evp->flags |= RQ_ENVELOPE_INFLIGHT; + evp->state = RQ_EVPSTATE_INFLIGHT; evp->t_inflight = currtime; } } @@ -414,22 +419,26 @@ scheduler_ram_envelopes(uint64_t from, struct evpstate *dst, size_t size) if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED)) continue; - dst[n].retry = 0; dst[n].evpid = evp->evpid; - if (evp->flags & RQ_ENVELOPE_PENDING) { + dst[n].flags = 0; + dst[n].retry = 0; + dst[n].time = 0; + + if (evp->state == RQ_EVPSTATE_PENDING) { dst[n].time = evp->sched; dst[n].flags = EF_PENDING; } - else if (evp->flags & RQ_ENVELOPE_SCHEDULED) { + else if (evp->state == RQ_EVPSTATE_SCHEDULED) { dst[n].time = evp->t_scheduled; dst[n].flags = EF_PENDING; } - else if (evp->flags & RQ_ENVELOPE_INFLIGHT) { + else if (evp->state == RQ_EVPSTATE_INFLIGHT) { dst[n].time = evp->t_inflight; dst[n].flags = EF_INFLIGHT; } if (evp->flags & RQ_ENVELOPE_SUSPEND) dst[n].flags |= EF_SUSPEND; + n++; } @@ -453,11 +462,10 @@ scheduler_ram_schedule(uint64_t evpid) return (0); if ((evp = tree_get(&msg->envelopes, evpid)) == NULL) return (0); - if (evp->flags & RQ_ENVELOPE_PENDING) { - rq_envelope_schedule(&ramqueue, evp); - return (1); - } - return (0); + if (evp->state == RQ_EVPSTATE_INFLIGHT) + return (0); + rq_envelope_schedule(&ramqueue, evp); + return (1); } else { msgid = evpid; @@ -465,11 +473,12 @@ scheduler_ram_schedule(uint64_t evpid) return (0); i = NULL; r = 0; - while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) - if (evp->flags & RQ_ENVELOPE_PENDING) { - rq_envelope_schedule(&ramqueue, evp); - r++; - } + while (tree_iter(&msg->envelopes, &i, NULL, (void*)(&evp))) { + if (evp->state == RQ_EVPSTATE_INFLIGHT) + continue; + rq_envelope_schedule(&ramqueue, evp); + r++; + } return (r); } } @@ -611,6 +620,7 @@ rq_queue_init(struct rq_queue *rq) tree_init(&rq->messages); TAILQ_INIT(&rq->q_pending); TAILQ_INIT(&rq->q_inflight); + TAILQ_INIT(&rq->q_mta); TAILQ_INIT(&rq->q_mda); TAILQ_INIT(&rq->q_bounce); TAILQ_INIT(&rq->q_expired); @@ -654,16 +664,15 @@ rq_queue_schedule(struct rq_queue *rq) if (evp->sched > currtime && evp->expire > currtime) break; - if (evp->flags != RQ_ENVELOPE_PENDING) + if (evp->state != RQ_EVPSTATE_PENDING) errx(1, "evp:%016" PRIx64 " flags=0x%x", evp->evpid, evp->flags); if (evp->expire <= currtime) { TAILQ_REMOVE(&rq->q_pending, evp, entry); TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry); - evp->flags &= ~RQ_ENVELOPE_PENDING; + evp->state = RQ_EVPSTATE_SCHEDULED; evp->flags |= RQ_ENVELOPE_EXPIRED; - evp->flags |= RQ_ENVELOPE_SCHEDULED; evp->t_scheduled = currtime; continue; } @@ -674,25 +683,28 @@ rq_queue_schedule(struct rq_queue *rq) static struct evplist * rq_envelope_list(struct rq_queue *rq, struct rq_envelope *evp) { - if (evp->flags & RQ_ENVELOPE_SCHEDULED) { + switch (evp->state) { + case RQ_EVPSTATE_PENDING: + return &rq->q_pending; + + case RQ_EVPSTATE_SCHEDULED: if (evp->flags & RQ_ENVELOPE_EXPIRED) return &rq->q_expired; if (evp->flags & RQ_ENVELOPE_REMOVED) return &rq->q_removed; if (evp->type == D_MTA) - return &evp->message->q_mta; + return &rq->q_mta; if (evp->type == D_MDA) return &rq->q_mda; if (evp->type == D_BOUNCE) return &rq->q_bounce; - } - - if (evp->flags & RQ_ENVELOPE_PENDING) - return &rq->q_pending; + errx(1, "%016" PRIx64 " bad evp type %d", evp->evpid, evp->type); - if (evp->flags & RQ_ENVELOPE_INFLIGHT) + case RQ_EVPSTATE_INFLIGHT: return &rq->q_inflight; + } + errx(1, "%016" PRIx64 " bad state %d", evp->evpid, evp->state); return (NULL); } @@ -701,66 +713,48 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) { struct evplist *q = NULL; - if (evp->type == D_MTA) { - if (TAILQ_EMPTY(&evp->message->q_mta)) { - evp->message->q_next = rq->q_mtabatch; - rq->q_mtabatch = evp->message; - } - q = &evp->message->q_mta; - } - else if (evp->type == D_MDA) + switch (evp->type) { + case D_MTA: + q = &rq->q_mta; + break; + case D_MDA: q = &rq->q_mda; - else if (evp->type == D_BOUNCE) + break; + case D_BOUNCE: q = &rq->q_bounce; + break; + } + + if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) + TAILQ_REMOVE(&rq->q_pending, evp, entry); - TAILQ_REMOVE(&rq->q_pending, evp, entry); TAILQ_INSERT_TAIL(q, evp, entry); - evp->flags &= ~RQ_ENVELOPE_PENDING; - evp->flags |= RQ_ENVELOPE_SCHEDULED; + evp->state = RQ_EVPSTATE_SCHEDULED; evp->t_scheduled = currtime; } static int rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp) { - struct rq_message *m; - struct evplist *q = NULL; - if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED)) return (0); /* - * For now we just ignore it, but we could mark the envelope for - * removal and possibly send a cancellation to the agent. + * If envelope is inflight, mark it envelope for removal. */ - if (evp->flags & (RQ_ENVELOPE_INFLIGHT)) - return (0); + if (evp->state == RQ_EVPSTATE_INFLIGHT) { + evp->flags |= RQ_ENVELOPE_REMOVED; + return (1); + } - q = rq_envelope_list(rq, evp); + if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) { + TAILQ_REMOVE(rq_envelope_list(rq, evp), evp, entry); + } - TAILQ_REMOVE(q, evp, entry); TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry); - evp->flags &= ~RQ_ENVELOPE_PENDING; + evp->state = RQ_EVPSTATE_SCHEDULED; evp->flags |= RQ_ENVELOPE_REMOVED; - evp->flags |= RQ_ENVELOPE_SCHEDULED; evp->t_scheduled = currtime; - /* - * We might need to unschedule the message if it was the only - * scheduled envelope - */ - if (q == &evp->message->q_mta && TAILQ_EMPTY(q)) { - if (rq->q_mtabatch == evp->message) - rq->q_mtabatch = evp->message->q_next; - else { - for (m = rq->q_mtabatch; m->q_next; m = m->q_next) - if (m->q_next == evp->message) { - m->q_next = evp->message->q_next; - break; - } - } - evp->message->q_next = NULL; - } - return (1); } @@ -770,8 +764,9 @@ rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp) if (evp->flags & RQ_ENVELOPE_SUSPEND) return (0); - if (!(evp->flags & RQ_ENVELOPE_INFLIGHT)) + if (evp->state != RQ_EVPSTATE_INFLIGHT) { TAILQ_REMOVE(rq_envelope_list(rq, evp), evp, entry); + } evp->flags |= RQ_ENVELOPE_SUSPEND; @@ -784,10 +779,11 @@ rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp) if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) return (0); - if (!(evp->flags & RQ_ENVELOPE_INFLIGHT)) + if (evp->state != RQ_EVPSTATE_INFLIGHT) sorted_insert(rq_envelope_list(rq, evp), evp); evp->flags &= ~RQ_ENVELOPE_SUSPEND; + return (1); } @@ -825,21 +821,30 @@ rq_envelope_to_text(struct rq_envelope *e) duration_to_text(e->expire - currtime)); strlcat(buf, t, sizeof buf); - if (e->flags & RQ_ENVELOPE_PENDING) { + + switch (e->state) { + case RQ_EVPSTATE_PENDING: snprintf(t, sizeof t, ",pending=%s", duration_to_text(e->sched - currtime)); strlcat(buf, t, sizeof buf); - } - if (e->flags & RQ_ENVELOPE_SCHEDULED) { + break; + + case RQ_EVPSTATE_SCHEDULED: snprintf(t, sizeof t, ",scheduled=%s", duration_to_text(currtime - e->t_scheduled)); strlcat(buf, t, sizeof buf); - } - if (e->flags & RQ_ENVELOPE_INFLIGHT) { + break; + + case RQ_EVPSTATE_INFLIGHT: snprintf(t, sizeof t, ",inflight=%s", duration_to_text(currtime - e->t_inflight)); strlcat(buf, t, sizeof buf); + break; + + default: + errx(1, "%016" PRIx64 " bad state %d", e->evpid, e->state); } + if (e->flags & RQ_ENVELOPE_REMOVED) strlcat(buf, ",removed", sizeof buf); if (e->flags & RQ_ENVELOPE_EXPIRED) |