diff options
Diffstat (limited to 'usr.sbin/smtpd/scheduler_ramqueue.c')
-rw-r--r-- | usr.sbin/smtpd/scheduler_ramqueue.c | 185 |
1 files changed, 125 insertions, 60 deletions
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c index c622ae4c576..7c57169b4e2 100644 --- a/usr.sbin/smtpd/scheduler_ramqueue.c +++ b/usr.sbin/smtpd/scheduler_ramqueue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_ramqueue.c,v 1.4 2012/01/31 21:05:26 gilles Exp $ */ +/* $OpenBSD: scheduler_ramqueue.c,v 1.5 2012/03/07 22:54:49 gilles Exp $ */ /* * Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org> @@ -39,16 +39,16 @@ struct ramqueue_host { RB_ENTRY(ramqueue_host) hosttree_entry; TAILQ_HEAD(,ramqueue_batch) batch_queue; - u_int64_t h_id; char hostname[MAXHOSTNAMELEN]; }; struct ramqueue_batch { enum delivery_type type; TAILQ_ENTRY(ramqueue_batch) batch_entry; TAILQ_HEAD(,ramqueue_envelope) envelope_queue; - u_int64_t h_id; + struct ramqueue_host *rq_host; u_int64_t b_id; u_int32_t msgid; + u_int32_t evpcnt; }; struct ramqueue_envelope { TAILQ_ENTRY(ramqueue_envelope) queue_entry; @@ -64,16 +64,19 @@ struct ramqueue_message { RB_ENTRY(ramqueue_message) msgtree_entry; RB_HEAD(evptree, ramqueue_envelope) evptree; u_int32_t msgid; + u_int32_t evpcnt; }; struct ramqueue { RB_HEAD(hosttree, ramqueue_host) hosttree; RB_HEAD(msgtree, ramqueue_message) msgtree; + RB_HEAD(offloadtree, ramqueue_envelope) offloadtree; TAILQ_HEAD(,ramqueue_envelope) queue; }; -RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); -RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp); -RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); +RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); +RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp); +RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); +RB_PROTOTYPE(offloadtree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); enum ramqueue_iter_type { RAMQUEUE_ITER_HOST, @@ -108,6 +111,7 @@ static struct ramqueue_message *ramqueue_insert_message(u_int32_t); static void ramqueue_remove_message(struct ramqueue_message *); static struct ramqueue_envelope *ramqueue_lookup_envelope(u_int64_t); +static struct ramqueue_envelope *ramqueue_lookup_offload(u_int64_t); /*NEEDSFIX*/ @@ -118,14 +122,15 @@ static void scheduler_ramqueue_init(void); static int scheduler_ramqueue_setup(time_t, time_t); static int scheduler_ramqueue_next(u_int64_t *, time_t *); static void scheduler_ramqueue_insert(struct envelope *); -static void scheduler_ramqueue_remove(void *, u_int64_t); +static void scheduler_ramqueue_schedule(u_int64_t); +static void scheduler_ramqueue_remove(u_int64_t); static void *scheduler_ramqueue_host(char *); static void *scheduler_ramqueue_message(u_int32_t); static void *scheduler_ramqueue_batch(u_int64_t); static void *scheduler_ramqueue_queue(void); static void scheduler_ramqueue_close(void *); static int scheduler_ramqueue_fetch(void *, u_int64_t *); -static int scheduler_ramqueue_schedule(u_int64_t); +static int scheduler_ramqueue_force(u_int64_t); static void scheduler_ramqueue_display(void); struct scheduler_backend scheduler_backend_ramqueue = { @@ -133,6 +138,7 @@ struct scheduler_backend scheduler_backend_ramqueue = { scheduler_ramqueue_setup, scheduler_ramqueue_next, scheduler_ramqueue_insert, + scheduler_ramqueue_schedule, scheduler_ramqueue_remove, scheduler_ramqueue_host, scheduler_ramqueue_message, @@ -140,7 +146,7 @@ struct scheduler_backend scheduler_backend_ramqueue = { scheduler_ramqueue_queue, scheduler_ramqueue_close, scheduler_ramqueue_fetch, - scheduler_ramqueue_schedule, + scheduler_ramqueue_force, scheduler_ramqueue_display }; static struct ramqueue ramqueue; @@ -184,6 +190,18 @@ scheduler_ramqueue_display_msgtree(void) } static void +scheduler_ramqueue_display_offloadtree(void) +{ + struct ramqueue_envelope *rq_evp; + + log_debug("\tscheduler_ramqueue: offloadtree display"); + RB_FOREACH(rq_evp, offloadtree, &ramqueue.offloadtree) { + log_debug("\t\t\tevp: [%p] %016"PRIx64, + rq_evp, rq_evp->evpid); + } +} + +static void scheduler_ramqueue_display_queue(void) { struct ramqueue_envelope *rq_evp; @@ -201,6 +219,7 @@ scheduler_ramqueue_display(void) log_debug("scheduler_ramqueue: display"); scheduler_ramqueue_display_hosttree(); scheduler_ramqueue_display_msgtree(); + scheduler_ramqueue_display_offloadtree(); scheduler_ramqueue_display_queue(); } @@ -212,6 +231,7 @@ scheduler_ramqueue_init(void) TAILQ_INIT(&ramqueue.queue); RB_INIT(&ramqueue.hosttree); RB_INIT(&ramqueue.msgtree); + RB_INIT(&ramqueue.offloadtree); } static int @@ -229,6 +249,11 @@ scheduler_ramqueue_setup(time_t curtm, time_t nsched) q = qwalk_new(Q_QUEUE, 0); while (qwalk(q, &evpid)) { + /* the envelope is already in ramqueue, skip */ + if (ramqueue_lookup_envelope(evpid) || + ramqueue_lookup_offload(evpid)) + continue; + if (! queue_envelope_load(Q_QUEUE, evpid, &envelope)) { log_debug("scheduler_ramqueue: evp -> /corrupt"); queue_message_corrupt(Q_QUEUE, evpid_to_msgid(evpid)); @@ -291,31 +316,42 @@ scheduler_ramqueue_insert(struct envelope *envelope) time_t curtm = time(NULL); log_debug("scheduler_ramqueue: insert"); - msgid = evpid_to_msgid(envelope->id); - rq_msg = ramqueue_lookup_message(msgid); - if (rq_msg == NULL) - rq_msg = ramqueue_insert_message(msgid); + rq_evp = ramqueue_lookup_offload(envelope->id); + if (rq_evp) { + rq_msg = rq_evp->rq_msg; + rq_batch = rq_evp->rq_batch; + rq_host = rq_evp->rq_host; + RB_REMOVE(evptree, &rq_msg->evptree, rq_evp); + } + else { + msgid = evpid_to_msgid(envelope->id); + rq_msg = ramqueue_lookup_message(msgid); + if (rq_msg == NULL) + rq_msg = ramqueue_insert_message(msgid); - rq_host = ramqueue_lookup_host(envelope->dest.domain); - if (rq_host == NULL) - rq_host = ramqueue_insert_host(envelope->dest.domain); + rq_host = ramqueue_lookup_host(envelope->dest.domain); + if (rq_host == NULL) + rq_host = ramqueue_insert_host(envelope->dest.domain); - rq_batch = ramqueue_lookup_batch(rq_host, msgid); - if (rq_batch == NULL) - rq_batch = ramqueue_insert_batch(rq_host, msgid); + rq_batch = ramqueue_lookup_batch(rq_host, msgid); + if (rq_batch == NULL) + rq_batch = ramqueue_insert_batch(rq_host, msgid); - rq_evp = calloc(1, sizeof (*rq_evp)); - if (rq_evp == NULL) - fatal("calloc"); - rq_evp->evpid = envelope->id; + rq_evp = calloc(1, sizeof (*rq_evp)); + if (rq_evp == NULL) + fatal("calloc"); + rq_evp->evpid = envelope->id; + rq_batch->evpcnt++; + rq_msg->evpcnt++; + } + rq_evp->sched = ramqueue_next_schedule(envelope, curtm); rq_evp->rq_host = rq_host; rq_evp->rq_batch = rq_batch; rq_evp->rq_msg = rq_msg; - RB_INSERT(evptree, &rq_msg->evptree, rq_evp); TAILQ_INSERT_TAIL(&rq_batch->envelope_queue, rq_evp, - batchqueue_entry); + batchqueue_entry); /* sorted insert */ TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) { @@ -331,51 +367,77 @@ scheduler_ramqueue_insert(struct envelope *envelope) } static void -scheduler_ramqueue_remove(void *hdl, u_int64_t evpid) +scheduler_ramqueue_schedule(u_int64_t evpid) { - struct ramqueue_iter *iter = hdl; - struct ramqueue_batch *rq_batch; - struct ramqueue_message *rq_msg; struct ramqueue_envelope *rq_evp; - struct ramqueue_host *rq_host; + struct ramqueue_message *rq_msg; + struct ramqueue_batch *rq_batch; + struct ramqueue_host *rq_host; + + log_debug("scheduler_ramqueue: schedule"); - log_debug("scheduler_ramqueue: remove"); rq_evp = ramqueue_lookup_envelope(evpid); rq_msg = rq_evp->rq_msg; rq_batch = rq_evp->rq_batch; rq_host = rq_evp->rq_host; + /* remove from msg tree, batch queue and linear queue */ RB_REMOVE(evptree, &rq_msg->evptree, rq_evp); TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry); TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); + + /* insert into offload tree*/ + RB_INSERT(offloadtree, &ramqueue.offloadtree, rq_evp); + + /* that's one less envelope to process in the ramqueue */ stat_decrement(STATS_RAMQUEUE_ENVELOPE); +} +static void +scheduler_ramqueue_remove(u_int64_t evpid) +{ + struct ramqueue_batch *rq_batch; + struct ramqueue_message *rq_msg; + struct ramqueue_envelope *rq_evp; + struct ramqueue_host *rq_host; - /* check if we are the last of a batch */ - if (TAILQ_FIRST(&rq_batch->envelope_queue) == NULL) { - ramqueue_remove_batch(rq_host, rq_batch); - if (iter != NULL && iter->type == RAMQUEUE_ITER_BATCH) { - log_debug("scheduler_ramqueue_remove: batch removed"); - iter->u.batch = NULL; - } + log_debug("scheduler_ramqueue: remove"); + + rq_evp = ramqueue_lookup_offload(evpid); + if (rq_evp) { + RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp); + rq_msg = rq_evp->rq_msg; + rq_batch = rq_evp->rq_batch; + rq_host = rq_evp->rq_host; + } + else { + rq_evp = ramqueue_lookup_envelope(evpid); + rq_msg = rq_evp->rq_msg; + rq_batch = rq_evp->rq_batch; + rq_host = rq_evp->rq_host; + + RB_REMOVE(evptree, &rq_msg->evptree, rq_evp); + TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry); + TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); + stat_decrement(STATS_RAMQUEUE_ENVELOPE); } + rq_batch->evpcnt--; + rq_msg->evpcnt--; + /* check if we are the last of a message */ - if (RB_ROOT(&rq_msg->evptree) == NULL) { + if (rq_msg->evpcnt == 0) { ramqueue_remove_message(rq_msg); - if (iter != NULL && iter->type == RAMQUEUE_ITER_MESSAGE) { - log_debug("scheduler_ramqueue_remove: message removed"); - iter->u.message = NULL; - } + } + + /* check if we are the last of a batch */ + if (rq_batch->evpcnt == 0) { + ramqueue_remove_batch(rq_host, rq_batch); } /* check if we are the last of a host */ if (TAILQ_FIRST(&rq_host->batch_queue) == NULL) { ramqueue_remove_host(rq_host); - if (iter != NULL && iter->type == RAMQUEUE_ITER_HOST) { - log_debug("scheduler_ramqueue_remove: host removed"); - iter->u.host = NULL; - } } free(rq_evp); @@ -470,9 +532,7 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid) struct ramqueue_batch *rq_batch; switch (iter->type) { - case RAMQUEUE_ITER_HOST: { - if (iter->u.host == NULL) - return 0; + case RAMQUEUE_ITER_HOST: rq_batch = TAILQ_FIRST(&iter->u.host->batch_queue); if (rq_batch == NULL) break; @@ -481,11 +541,8 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid) break; *evpid = rq_evp->evpid; return 1; - } case RAMQUEUE_ITER_BATCH: - if (iter->u.batch == NULL) - return 0; rq_evp = TAILQ_FIRST(&iter->u.batch->envelope_queue); if (rq_evp == NULL) break; @@ -493,8 +550,6 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid) return 1; case RAMQUEUE_ITER_MESSAGE: - if (iter->u.message == NULL) - return 0; rq_evp = RB_ROOT(&iter->u.message->evptree); if (rq_evp == NULL) break; @@ -513,7 +568,7 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid) } static int -scheduler_ramqueue_schedule(u_int64_t id) +scheduler_ramqueue_force(u_int64_t id) { struct ramqueue_envelope *rq_evp; struct ramqueue_message *rq_msg; @@ -575,6 +630,15 @@ ramqueue_lookup_message(u_int32_t msgid) } static struct ramqueue_envelope * +ramqueue_lookup_offload(u_int64_t evpid) +{ + struct ramqueue_envelope evpkey; + + evpkey.evpid = evpid; + return RB_FIND(offloadtree, &ramqueue.offloadtree, &evpkey); +} + +static struct ramqueue_envelope * ramqueue_lookup_envelope(u_int64_t evpid) { struct ramqueue_message *rq_msg; @@ -679,7 +743,6 @@ ramqueue_insert_host(char *host) rq_host = calloc(1, sizeof (*rq_host)); if (rq_host == NULL) fatal("calloc"); - rq_host->h_id = generate_uid(); strlcpy(rq_host->hostname, host, sizeof(rq_host->hostname)); TAILQ_INIT(&rq_host->batch_queue); RB_INSERT(hosttree, &ramqueue.hosttree, rq_host); @@ -697,6 +760,7 @@ ramqueue_insert_batch(struct ramqueue_host *rq_host, u_int32_t msgid) if (rq_batch == NULL) fatal("calloc"); rq_batch->b_id = generate_uid(); + rq_batch->rq_host = rq_host; rq_batch->msgid = msgid; TAILQ_INIT(&rq_batch->envelope_queue); @@ -752,6 +816,7 @@ ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2) return (e1->evpid < e2->evpid ? -1 : e1->evpid > e2->evpid); } -RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); -RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp); -RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); +RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); +RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp); +RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); +RB_GENERATE(offloadtree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); |