summary refs log tree commit diff
diff options
context:
space:
mode:
author Eric Faurot <eric@cvs.openbsd.org> 2013-12-03 10:38:41 +0000
committer Eric Faurot <eric@cvs.openbsd.org> 2013-12-03 10:38:41 +0000
commit d492bb3c6b94fba0edc78ef602fefeeb54c5a4b8 (patch)
tree 92642eda3068504830af90adf7c60f36682fb945
parent d12ba81fc3076d387aa92befafc8fad2515b1cfb (diff)
schedule in O(log n)
-rw-r--r-- usr.sbin/smtpd/scheduler_ramqueue.c 105
1 files changed, 68 insertions, 37 deletions
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index adc392bc098..2578606810e 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.33 2013/11/20 09:22:42 eric Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.34 2013/12/03 10:38:40 eric Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@poolp.org>
@@ -45,6 +45,7 @@ struct rq_message {
struct rq_envelope {
TAILQ_ENTRY(rq_envelope) entry;
+ SPLAY_ENTRY(rq_envelope) t_entry;
uint64_t evpid;
uint64_t holdq;
@@ -77,6 +78,7 @@ struct rq_holdq {
struct rq_queue {
size_t evpcount;
struct tree messages;
+ SPLAY_HEAD(prioqtree, rq_envelope) q_priotree;
struct evplist q_pending;
struct evplist q_inflight;
@@ -88,6 +90,9 @@ struct rq_queue {
struct evplist q_removed;
};
+static int rq_envelope_cmp(struct rq_envelope *, struct rq_envelope *);
+
+SPLAY_PROTOTYPE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);
static int scheduler_ram_init(void);
static int scheduler_ram_insert(struct scheduler_info *);
static size_t scheduler_ram_commit(uint32_t);
@@ -104,8 +109,7 @@ static int scheduler_ram_remove(uint64_t);
static int scheduler_ram_suspend(uint64_t);
static int scheduler_ram_resume(uint64_t);
-static void sorted_insert(struct evplist *, struct rq_envelope *);
-static void sorted_merge(struct evplist *, struct evplist *);
+static void sorted_insert(struct rq_queue *, struct rq_envelope *);
static void rq_queue_init(struct rq_queue *);
static void rq_queue_merge(struct rq_queue *, struct rq_queue *);
@@ -201,7 +205,7 @@ scheduler_ram_insert(struct scheduler_info *si)
stat_increment("scheduler.ramqueue.envelope", 1);
envelope->state = RQ_EVPSTATE_PENDING;
- sorted_insert(&update->q_pending, envelope);
+ TAILQ_INSERT_TAIL(&update->q_pending, envelope, entry);
si->nexttry = envelope->sched;
@@ -294,7 +298,7 @@ scheduler_ram_update(struct scheduler_info *si)
evp->state = RQ_EVPSTATE_PENDING;
if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
- sorted_insert(&ramqueue.q_pending, evp);
+ sorted_insert(&ramqueue, evp);
si->nexttry = evp->sched;
@@ -398,7 +402,7 @@ scheduler_ram_release(int type, uint64_t holdq, int n)
* we could just schedule them directly.
*/
evp->state = RQ_EVPSTATE_PENDING;
- sorted_insert(&ramqueue.q_pending, evp);
+ sorted_insert(&ramqueue, evp);
}
if (TAILQ_EMPTY(&hq->q)) {
@@ -691,31 +695,16 @@ scheduler_ram_resume(uint64_t evpid)
}
static void
-sorted_insert(struct evplist *list, struct rq_envelope *evp)
-{
- struct rq_envelope *item;
- time_t ref;
-
- TAILQ_FOREACH(item, list, entry) {
- ref = (evp->sched < evp->expire) ? evp->sched : evp->expire;
- if (ref <= item->expire && ref <= item->sched) {
- TAILQ_INSERT_BEFORE(item, evp, entry);
- return;
- }
- }
- TAILQ_INSERT_TAIL(list, evp, entry);
-}
-
-static void
-sorted_merge(struct evplist *list, struct evplist *from)
+sorted_insert(struct rq_queue *rq, struct rq_envelope *evp)
{
- struct rq_envelope *e;
-
- /* XXX this is O(not good enough) */
- while ((e = TAILQ_LAST(from, evplist))) {
- TAILQ_REMOVE(from, e, entry);
- sorted_insert(list, e);
- }
+ struct rq_envelope *evp2;
+
+ SPLAY_INSERT(prioqtree, &rq->q_priotree, evp);
+ evp2 = SPLAY_NEXT(prioqtree, &rq->q_priotree, evp);
+ if (evp2)
+ TAILQ_INSERT_BEFORE(evp2, evp, entry);
+ else
+ TAILQ_INSERT_TAIL(&rq->q_pending, evp, entry);
}
static void
@@ -730,6 +719,7 @@ rq_queue_init(struct rq_queue *rq)
TAILQ_INIT(&rq->q_bounce);
TAILQ_INIT(&rq->q_expired);
TAILQ_INIT(&rq->q_removed);
+ SPLAY_INIT(&rq->q_priotree);
}
static void
@@ -756,7 +746,12 @@ rq_queue_merge(struct rq_queue *rq, struct rq_queue *update)
stat_decrement("scheduler.ramqueue.message", 1);
}
- sorted_merge(&rq->q_pending, &update->q_pending);
+ /* Sorted insert in the pending queue */
+ while ((envelope = TAILQ_FIRST(&update->q_pending))) {
+ TAILQ_REMOVE(&update->q_pending, envelope, entry);
+ sorted_insert(rq, envelope);
+ }
+
rq->evpcount += update->evpcount;
}
@@ -775,6 +770,7 @@ rq_queue_schedule(struct rq_queue *rq)
if (evp->expire <= currtime) {
TAILQ_REMOVE(&rq->q_pending, evp, entry);
+ SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
TAILQ_INSERT_TAIL(&rq->q_expired, evp, entry);
evp->state = RQ_EVPSTATE_SCHEDULED;
evp->flags |= RQ_ENVELOPE_EXPIRED;
@@ -844,8 +840,10 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
evp->holdq = 0;
stat_decrement("scheduler.ramqueue.hold", 1);
}
- else if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
+ else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
TAILQ_REMOVE(&rq->q_pending, evp, entry);
+ SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
+ }
TAILQ_INSERT_TAIL(q, evp, entry);
evp->state = RQ_EVPSTATE_SCHEDULED;
@@ -855,7 +853,8 @@ rq_envelope_schedule(struct rq_queue *rq, struct rq_envelope *evp)
static int
rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
{
- struct rq_holdq *hq;
+ struct rq_holdq *hq;
+ struct evplist *evl;
if (evp->flags & (RQ_ENVELOPE_REMOVED | RQ_ENVELOPE_EXPIRED))
return (0);
@@ -878,7 +877,10 @@ rq_envelope_remove(struct rq_queue *rq, struct rq_envelope *evp)
stat_decrement("scheduler.ramqueue.hold", 1);
}
else if (!(evp->flags & RQ_ENVELOPE_SUSPEND)) {
- TAILQ_REMOVE(rq_envelope_list(rq, evp), evp, entry);
+ evl = rq_envelope_list(rq, evp);
+ TAILQ_REMOVE(evl, evp, entry);
+ if (evl == &rq->q_pending)
+ SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
}
TAILQ_INSERT_TAIL(&rq->q_removed, evp, entry);
@@ -893,6 +895,7 @@ static int
rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
{
struct rq_holdq *hq;
+ struct evplist *evl;
if (evp->flags & RQ_ENVELOPE_SUSPEND)
return (0);
@@ -909,7 +912,10 @@ rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
stat_decrement("scheduler.ramqueue.hold", 1);
}
else if (evp->state != RQ_EVPSTATE_INFLIGHT) {
- TAILQ_REMOVE(rq_envelope_list(rq, evp), evp, entry);
+ evl = rq_envelope_list(rq, evp);
+ TAILQ_REMOVE(evl, evp, entry);
+ if (evl == &rq->q_pending)
+ SPLAY_REMOVE(prioqtree, &rq->q_priotree, evp);
}
evp->flags |= RQ_ENVELOPE_SUSPEND;
@@ -920,11 +926,18 @@ rq_envelope_suspend(struct rq_queue *rq, struct rq_envelope *evp)
static int
rq_envelope_resume(struct rq_queue *rq, struct rq_envelope *evp)
{
+ struct evplist *evl;
+
if (!(evp->flags & RQ_ENVELOPE_SUSPEND))
return (0);
- if (evp->state != RQ_EVPSTATE_INFLIGHT)
- sorted_insert(rq_envelope_list(rq, evp), evp);
+ if (evp->state != RQ_EVPSTATE_INFLIGHT) {
+ evl = rq_envelope_list(rq, evp);
+ if (evl == &rq->q_pending)
+ sorted_insert(rq, evp);
+ else
+ TAILQ_INSERT_TAIL(evl, evp, entry);
+ }
evp->flags &= ~RQ_ENVELOPE_SUSPEND;
@@ -1027,3 +1040,21 @@ rq_queue_dump(struct rq_queue *rq, const char * name)
}
log_debug("debug: \\---");
}
+
+static int
+rq_envelope_cmp(struct rq_envelope *e1, struct rq_envelope *e2) /* prioqtree comparator: order by min(sched, expire), then by evpid */
+{
+ time_t ref1, ref2; /* sort key of e1 / e2: the earlier of its sched and expire times */
+
+ ref1 = (e1->sched < e1->expire) ? e1->sched : e1->expire;
+ ref2 = (e2->sched < e2->expire) ? e2->sched : e2->expire;
+ if (ref1 != ref2)
+ return (ref1 < ref2) ? -1 : 1; /* the envelope with the earlier key sorts first */
+
+ if (e1->evpid != e2->evpid) /* equal keys: evpid breaks the tie, so 0 is returned only for matching evpids */
+ return (e1->evpid < e2->evpid) ? -1 : 1;
+
+ return 0; /* same key and same evpid */
+}
+
+SPLAY_GENERATE(prioqtree, rq_envelope, t_entry, rq_envelope_cmp);