From d492bb3c6b94fba0edc78ef602fefeeb54c5a4b8 Mon Sep 17 00:00:00 2001 From: Eric Faurot Date: Tue, 3 Dec 2013 10:38:41 +0000 Subject: schedule in O(log n) --- usr.sbin/smtpd/scheduler_ramqueue.c | 105 +++++++++++++++++++++++------------- 1 file changed, 68 insertions(+), 37 deletions(-) diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c index adc392bc098..2578606810e 100644 --- a/usr.sbin/smtpd/scheduler_ramqueue.c +++ b/usr.sbin/smtpd/scheduler_ramqueue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_ramqueue.c,v 1.33 2013/11/20 09:22:42 eric Exp $ */ +/* $OpenBSD: scheduler_ramqueue.c,v 1.34 2013/12/03 10:38:40 eric Exp $ */ /* * Copyright (c) 2012 Gilles Chehade @@ -45,6 +45,7 @@ struct rq_message { struct rq_envelope { TAILQ_ENTRY(rq_envelope) entry; + SPLAY_ENTRY(rq_envelope) t_entry; uint64_t evpid; uint64_t holdq; @@ -77,6 +78,7 @@ struct rq_holdq { struct rq_queue { size_t evpcount; struct tree messages; + SPLAY_HEAD(prioqtree, rq_envelope) q_priotree; struct evplist q_pending; struct evplist q_inflight; @@ -88,6 +90,9 @@ struct rq_queue { struct evplist q_removed; }; +static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *); + +SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp); static int scheduler_ram_init(void); static int scheduler_ram_insert(struct scheduler_info *); static size_t scheduler_ram_commit(uint32_t); @@ -104,8 +109,7 @@ static int scheduler_ram_remove(uint64_t); static int scheduler_ram_suspend(uint64_t); static int scheduler_ram_resume(uint64_t); -static void sorted_insert(struct evplist *, struct rq_envelope *); -static void sorted_merge(struct evplist *, struct evplist *); +static void sorted_insert(struct rq_queue *, struct rq_envelope *); static void rq_queue_init(struct rq_queue *); static void rq_queue_merge(struct rq_queue *, struct rq_queue *); @@ -201,7 +205,7 @@ scheduler_ram_insert(struct scheduler_info *si) stat_increment("scheduler.ramqueue.envelope", 1); envelope->state = RQ_EVPSTATE_PENDING; - sorted_insert(&update->q_pending, envelope); + TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry); si->nexttry = envelope->sched; @@ -294,7 +298,7 @@ scheduler_ram_update(struct scheduler_info *si) evp->state = RQ_EVPSTATE_PENDING; if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) - sorted_insert(&ramqueue.q_pending, evp); + sorted_insert(&ramqueue, evp); si->nexttry = evp->sched; @@ -398,7 +402,7 @@ scheduler_ram_release(int type, uint64_t holdq, int n) * we could just schedule them directly. */ evp->state = RQ_EVPSTATE_PENDING; - sorted_insert(&ramqueue.q_pending, evp); + sorted_insert(&ramqueue, evp); } if (TAILQ_EMPTY(&hq->q)) { @@ -691,31 +695,16 @@ scheduler_ram_resume(uint64_t evpid) } static void -sorted_insert(struct evplist *list, struct rq_envelope *evp) -{ - struct rq_envelope *item; - time_t ref; - - TAILQ_FOREACH(item, list, entry) { - ref = (evp->sched < evp->expire) ? evp->sched : evp->expire; - if (ref <= item->expire && ref <= item->sched) { - TAILQ_INSERT_BEFORE(item, evp, entry); - return; - } - } - TAILQ_INSERT_TAIL(list, evp, entry); -} - -static void -sorted_merge(struct evplist *list, struct evplist *from) +sorted_insert(struct rq_queue *rq, struct rq_envelope *evp) { - struct rq_envelope *e; - - /* XXX this is O(not good enough) */ - while ((e = TAILQ_LAST(from, evplist))) { - TAILQ_REMOVE(from, e, entry); - sorted_insert(list, e); - } + struct rq_envelope *evp2; + + SPLAY_INSERT(prioqtree, &rq->q_priotree, evp); + evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp); + if (evp2) + TAILQ_INSERT_BEFORE(evp2, evp, entry); + else + TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry); } static void @@ -730,6 +719,7 @@ rq_queue_init(struct rq_queue *rq) TAILQ_INIT(&rq->q_bounce); TAILQ_INIT(&rq->q_expired); TAILQ_INIT(&rq->q_removed); + SPLAY_INIT(&rq->q_priotree); } static void @@ -756,7 +746,12 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update) stat_decrement("scheduler.ramqueue.message", 1); } - sorted_merge(&rq->q_pending, &update->q_pending); + /* Sorted insert in the pending queue */ + while ((envelope = TAILQ_FIRST(&update->q_pending))) { + TAILQ_REMOVE(&update->q_pending, envelope, entry); + sorted_insert(rq, envelope); + } + rq->evpcount += update->evpcount; } @@ -775,6 +770,7 @@ rq_queue_schedule(struct rq_queue *rq) if (evp->expire <= currtime) { TAILQ_REMOVE(&rq->q_pending, evp, entry); + SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp); TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry); evp->state = RQ_EVPSTATE_SCHEDULED; evp->flags |= RQ_ENVELOPE_EXPIRED; @@ -844,8 +840,10 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) evp->holdq = 0; stat_decrement("scheduler.ramqueue.hold", 1); } - else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) + else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) { TAILQ_REMOVE(&rq->q_pending, evp, entry); + SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp); + } TAILQ_INSERT_TAIL(q, evp, entry); evp->state = RQ_EVPSTATE_SCHEDULED; @@ -855,7 +853,8 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp) static int rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp) { - struct rq_holdq *hq; + struct rq_holdq *hq; + struct evplist *evl; if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED)) return (0); @@ -878,7 +877,10 @@ rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp) stat_decrement("scheduler.ramqueue.hold", 1); } else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) { - TAILQ_REMOVE(rq_envelope_list(rq, evp), evp, entry); + evl = rq_envelope_list(rq, evp); + TAILQ_REMOVE(evl, evp, entry); + if (evl == &rq->q_pending) + SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp); } TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry); @@ -893,6 +895,7 @@ static int rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp) { struct rq_holdq *hq; + struct evplist *evl; if (evp->flags & RQ_ENVELOPE_SUSPEND) return (0); @@ -909,7 +912,10 @@ rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp) stat_decrement("scheduler.ramqueue.hold", 1); } else if (evp->state != RQ_EVPSTATE_INFLIGHT) { - TAILQ_REMOVE(rq_envelope_list(rq, evp), evp, entry); + evl = rq_envelope_list(rq, evp); + TAILQ_REMOVE(evl, evp, entry); + if (evl == &rq->q_pending) + SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp); } evp->flags |= RQ_ENVELOPE_SUSPEND; @@ -920,11 +926,18 @@ rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp) static int rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp) { + struct evplist *evl; + if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) return (0); - if (evp->state != RQ_EVPSTATE_INFLIGHT) - sorted_insert(rq_envelope_list(rq, evp), evp); + if (evp->state != RQ_EVPSTATE_INFLIGHT) { + evl = rq_envelope_list(rq, evp); + if (evl == &rq->q_pending) + sorted_insert(rq, evp); + else + TAILQ_INSERT_TAIL(evl, evp, entry); + } evp->flags &= ~RQ_ENVELOPE_SUSPEND; @@ -1027,3 +1040,21 @@ rq_queue_dump(struct rq_queue *rq, const char * name) } log_debug("debug: \\---"); } + +static int +rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2) +{ + time_t ref1, ref2; + + ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire; + ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire; + if (ref1 != ref2) + return (ref1 < ref2) ? -1 : 1; + + if (e1->evpid != e2->evpid) + return (e1->evpid < e2->evpid) ? -1 : 1; + + return 0; +} + +SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp); -- cgit v1.2.3