summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd/scheduler_ramqueue.c
diff options
context:
space:
mode:
Diffstat (limited to 'usr.sbin/smtpd/scheduler_ramqueue.c')
-rw-r--r--usr.sbin/smtpd/scheduler_ramqueue.c185
1 files changed, 125 insertions, 60 deletions
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index c622ae4c576..7c57169b4e2 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.4 2012/01/31 21:05:26 gilles Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.5 2012/03/07 22:54:49 gilles Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -39,16 +39,16 @@
struct ramqueue_host {
RB_ENTRY(ramqueue_host) hosttree_entry;
TAILQ_HEAD(,ramqueue_batch) batch_queue;
- u_int64_t h_id;
char hostname[MAXHOSTNAMELEN];
};
struct ramqueue_batch {
enum delivery_type type;
TAILQ_ENTRY(ramqueue_batch) batch_entry;
TAILQ_HEAD(,ramqueue_envelope) envelope_queue;
- u_int64_t h_id;
+ struct ramqueue_host *rq_host;
u_int64_t b_id;
u_int32_t msgid;
+ u_int32_t evpcnt;
};
struct ramqueue_envelope {
TAILQ_ENTRY(ramqueue_envelope) queue_entry;
@@ -64,16 +64,19 @@ struct ramqueue_message {
RB_ENTRY(ramqueue_message) msgtree_entry;
RB_HEAD(evptree, ramqueue_envelope) evptree;
u_int32_t msgid;
+ u_int32_t evpcnt;
};
struct ramqueue {
RB_HEAD(hosttree, ramqueue_host) hosttree;
RB_HEAD(msgtree, ramqueue_message) msgtree;
+ RB_HEAD(offloadtree, ramqueue_envelope) offloadtree;
TAILQ_HEAD(,ramqueue_envelope) queue;
};
-RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
-RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp);
-RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp);
+RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
+RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp);
+RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp);
+RB_PROTOTYPE(offloadtree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp);
enum ramqueue_iter_type {
RAMQUEUE_ITER_HOST,
@@ -108,6 +111,7 @@ static struct ramqueue_message *ramqueue_insert_message(u_int32_t);
static void ramqueue_remove_message(struct ramqueue_message *);
static struct ramqueue_envelope *ramqueue_lookup_envelope(u_int64_t);
+static struct ramqueue_envelope *ramqueue_lookup_offload(u_int64_t);
/*NEEDSFIX*/
@@ -118,14 +122,15 @@ static void scheduler_ramqueue_init(void);
static int scheduler_ramqueue_setup(time_t, time_t);
static int scheduler_ramqueue_next(u_int64_t *, time_t *);
static void scheduler_ramqueue_insert(struct envelope *);
-static void scheduler_ramqueue_remove(void *, u_int64_t);
+static void scheduler_ramqueue_schedule(u_int64_t);
+static void scheduler_ramqueue_remove(u_int64_t);
static void *scheduler_ramqueue_host(char *);
static void *scheduler_ramqueue_message(u_int32_t);
static void *scheduler_ramqueue_batch(u_int64_t);
static void *scheduler_ramqueue_queue(void);
static void scheduler_ramqueue_close(void *);
static int scheduler_ramqueue_fetch(void *, u_int64_t *);
-static int scheduler_ramqueue_schedule(u_int64_t);
+static int scheduler_ramqueue_force(u_int64_t);
static void scheduler_ramqueue_display(void);
struct scheduler_backend scheduler_backend_ramqueue = {
@@ -133,6 +138,7 @@ struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_setup,
scheduler_ramqueue_next,
scheduler_ramqueue_insert,
+ scheduler_ramqueue_schedule,
scheduler_ramqueue_remove,
scheduler_ramqueue_host,
scheduler_ramqueue_message,
@@ -140,7 +146,7 @@ struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_queue,
scheduler_ramqueue_close,
scheduler_ramqueue_fetch,
- scheduler_ramqueue_schedule,
+ scheduler_ramqueue_force,
scheduler_ramqueue_display
};
static struct ramqueue ramqueue;
@@ -184,6 +190,18 @@ scheduler_ramqueue_display_msgtree(void)
}
static void
+scheduler_ramqueue_display_offloadtree(void)
+{
+ struct ramqueue_envelope *rq_evp;
+
+ log_debug("\tscheduler_ramqueue: offloadtree display");
+ RB_FOREACH(rq_evp, offloadtree, &ramqueue.offloadtree) {
+ log_debug("\t\t\tevp: [%p] %016"PRIx64,
+ rq_evp, rq_evp->evpid);
+ }
+}
+
+static void
scheduler_ramqueue_display_queue(void)
{
struct ramqueue_envelope *rq_evp;
@@ -201,6 +219,7 @@ scheduler_ramqueue_display(void)
log_debug("scheduler_ramqueue: display");
scheduler_ramqueue_display_hosttree();
scheduler_ramqueue_display_msgtree();
+ scheduler_ramqueue_display_offloadtree();
scheduler_ramqueue_display_queue();
}
@@ -212,6 +231,7 @@ scheduler_ramqueue_init(void)
TAILQ_INIT(&ramqueue.queue);
RB_INIT(&ramqueue.hosttree);
RB_INIT(&ramqueue.msgtree);
+ RB_INIT(&ramqueue.offloadtree);
}
static int
@@ -229,6 +249,11 @@ scheduler_ramqueue_setup(time_t curtm, time_t nsched)
q = qwalk_new(Q_QUEUE, 0);
while (qwalk(q, &evpid)) {
+ /* the envelope is already in ramqueue, skip */
+ if (ramqueue_lookup_envelope(evpid) ||
+ ramqueue_lookup_offload(evpid))
+ continue;
+
if (! queue_envelope_load(Q_QUEUE, evpid, &envelope)) {
log_debug("scheduler_ramqueue: evp -> /corrupt");
queue_message_corrupt(Q_QUEUE, evpid_to_msgid(evpid));
@@ -291,31 +316,42 @@ scheduler_ramqueue_insert(struct envelope *envelope)
time_t curtm = time(NULL);
log_debug("scheduler_ramqueue: insert");
- msgid = evpid_to_msgid(envelope->id);
- rq_msg = ramqueue_lookup_message(msgid);
- if (rq_msg == NULL)
- rq_msg = ramqueue_insert_message(msgid);
+ rq_evp = ramqueue_lookup_offload(envelope->id);
+ if (rq_evp) {
+ rq_msg = rq_evp->rq_msg;
+ rq_batch = rq_evp->rq_batch;
+ rq_host = rq_evp->rq_host;
+ RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
+ }
+ else {
+ msgid = evpid_to_msgid(envelope->id);
+ rq_msg = ramqueue_lookup_message(msgid);
+ if (rq_msg == NULL)
+ rq_msg = ramqueue_insert_message(msgid);
- rq_host = ramqueue_lookup_host(envelope->dest.domain);
- if (rq_host == NULL)
- rq_host = ramqueue_insert_host(envelope->dest.domain);
+ rq_host = ramqueue_lookup_host(envelope->dest.domain);
+ if (rq_host == NULL)
+ rq_host = ramqueue_insert_host(envelope->dest.domain);
- rq_batch = ramqueue_lookup_batch(rq_host, msgid);
- if (rq_batch == NULL)
- rq_batch = ramqueue_insert_batch(rq_host, msgid);
+ rq_batch = ramqueue_lookup_batch(rq_host, msgid);
+ if (rq_batch == NULL)
+ rq_batch = ramqueue_insert_batch(rq_host, msgid);
- rq_evp = calloc(1, sizeof (*rq_evp));
- if (rq_evp == NULL)
- fatal("calloc");
- rq_evp->evpid = envelope->id;
+ rq_evp = calloc(1, sizeof (*rq_evp));
+ if (rq_evp == NULL)
+ fatal("calloc");
+ rq_evp->evpid = envelope->id;
+ rq_batch->evpcnt++;
+ rq_msg->evpcnt++;
+ }
+
rq_evp->sched = ramqueue_next_schedule(envelope, curtm);
rq_evp->rq_host = rq_host;
rq_evp->rq_batch = rq_batch;
rq_evp->rq_msg = rq_msg;
-
RB_INSERT(evptree, &rq_msg->evptree, rq_evp);
TAILQ_INSERT_TAIL(&rq_batch->envelope_queue, rq_evp,
- batchqueue_entry);
+ batchqueue_entry);
/* sorted insert */
TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) {
@@ -331,51 +367,77 @@ scheduler_ramqueue_insert(struct envelope *envelope)
}
static void
-scheduler_ramqueue_remove(void *hdl, u_int64_t evpid)
+scheduler_ramqueue_schedule(u_int64_t evpid)
{
- struct ramqueue_iter *iter = hdl;
- struct ramqueue_batch *rq_batch;
- struct ramqueue_message *rq_msg;
struct ramqueue_envelope *rq_evp;
- struct ramqueue_host *rq_host;
+ struct ramqueue_message *rq_msg;
+ struct ramqueue_batch *rq_batch;
+ struct ramqueue_host *rq_host;
+
+ log_debug("scheduler_ramqueue: schedule");
- log_debug("scheduler_ramqueue: remove");
rq_evp = ramqueue_lookup_envelope(evpid);
rq_msg = rq_evp->rq_msg;
rq_batch = rq_evp->rq_batch;
rq_host = rq_evp->rq_host;
+ /* remove from msg tree, batch queue and linear queue */
RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry);
TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
+
+ /* insert into offload tree*/
+ RB_INSERT(offloadtree, &ramqueue.offloadtree, rq_evp);
+
+ /* that's one less envelope to process in the ramqueue */
stat_decrement(STATS_RAMQUEUE_ENVELOPE);
+}
+static void
+scheduler_ramqueue_remove(u_int64_t evpid)
+{
+ struct ramqueue_batch *rq_batch;
+ struct ramqueue_message *rq_msg;
+ struct ramqueue_envelope *rq_evp;
+ struct ramqueue_host *rq_host;
- /* check if we are the last of a batch */
- if (TAILQ_FIRST(&rq_batch->envelope_queue) == NULL) {
- ramqueue_remove_batch(rq_host, rq_batch);
- if (iter != NULL && iter->type == RAMQUEUE_ITER_BATCH) {
- log_debug("scheduler_ramqueue_remove: batch removed");
- iter->u.batch = NULL;
- }
+ log_debug("scheduler_ramqueue: remove");
+
+ rq_evp = ramqueue_lookup_offload(evpid);
+ if (rq_evp) {
+ RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp);
+ rq_msg = rq_evp->rq_msg;
+ rq_batch = rq_evp->rq_batch;
+ rq_host = rq_evp->rq_host;
+ }
+ else {
+ rq_evp = ramqueue_lookup_envelope(evpid);
+ rq_msg = rq_evp->rq_msg;
+ rq_batch = rq_evp->rq_batch;
+ rq_host = rq_evp->rq_host;
+
+ RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
+ TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry);
+ TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
+ stat_decrement(STATS_RAMQUEUE_ENVELOPE);
}
+ rq_batch->evpcnt--;
+ rq_msg->evpcnt--;
+
/* check if we are the last of a message */
- if (RB_ROOT(&rq_msg->evptree) == NULL) {
+ if (rq_msg->evpcnt == 0) {
ramqueue_remove_message(rq_msg);
- if (iter != NULL && iter->type == RAMQUEUE_ITER_MESSAGE) {
- log_debug("scheduler_ramqueue_remove: message removed");
- iter->u.message = NULL;
- }
+ }
+
+ /* check if we are the last of a batch */
+ if (rq_batch->evpcnt == 0) {
+ ramqueue_remove_batch(rq_host, rq_batch);
}
/* check if we are the last of a host */
if (TAILQ_FIRST(&rq_host->batch_queue) == NULL) {
ramqueue_remove_host(rq_host);
- if (iter != NULL && iter->type == RAMQUEUE_ITER_HOST) {
- log_debug("scheduler_ramqueue_remove: host removed");
- iter->u.host = NULL;
- }
}
free(rq_evp);
@@ -470,9 +532,7 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
struct ramqueue_batch *rq_batch;
switch (iter->type) {
- case RAMQUEUE_ITER_HOST: {
- if (iter->u.host == NULL)
- return 0;
+ case RAMQUEUE_ITER_HOST:
rq_batch = TAILQ_FIRST(&iter->u.host->batch_queue);
if (rq_batch == NULL)
break;
@@ -481,11 +541,8 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
break;
*evpid = rq_evp->evpid;
return 1;
- }
case RAMQUEUE_ITER_BATCH:
- if (iter->u.batch == NULL)
- return 0;
rq_evp = TAILQ_FIRST(&iter->u.batch->envelope_queue);
if (rq_evp == NULL)
break;
@@ -493,8 +550,6 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
return 1;
case RAMQUEUE_ITER_MESSAGE:
- if (iter->u.message == NULL)
- return 0;
rq_evp = RB_ROOT(&iter->u.message->evptree);
if (rq_evp == NULL)
break;
@@ -513,7 +568,7 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
}
static int
-scheduler_ramqueue_schedule(u_int64_t id)
+scheduler_ramqueue_force(u_int64_t id)
{
struct ramqueue_envelope *rq_evp;
struct ramqueue_message *rq_msg;
@@ -575,6 +630,15 @@ ramqueue_lookup_message(u_int32_t msgid)
}
static struct ramqueue_envelope *
+ramqueue_lookup_offload(u_int64_t evpid)
+{
+ struct ramqueue_envelope evpkey;
+
+ evpkey.evpid = evpid;
+ return RB_FIND(offloadtree, &ramqueue.offloadtree, &evpkey);
+}
+
+static struct ramqueue_envelope *
ramqueue_lookup_envelope(u_int64_t evpid)
{
struct ramqueue_message *rq_msg;
@@ -679,7 +743,6 @@ ramqueue_insert_host(char *host)
rq_host = calloc(1, sizeof (*rq_host));
if (rq_host == NULL)
fatal("calloc");
- rq_host->h_id = generate_uid();
strlcpy(rq_host->hostname, host, sizeof(rq_host->hostname));
TAILQ_INIT(&rq_host->batch_queue);
RB_INSERT(hosttree, &ramqueue.hosttree, rq_host);
@@ -697,6 +760,7 @@ ramqueue_insert_batch(struct ramqueue_host *rq_host, u_int32_t msgid)
if (rq_batch == NULL)
fatal("calloc");
rq_batch->b_id = generate_uid();
+ rq_batch->rq_host = rq_host;
rq_batch->msgid = msgid;
TAILQ_INIT(&rq_batch->envelope_queue);
@@ -752,6 +816,7 @@ ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2)
return (e1->evpid < e2->evpid ? -1 : e1->evpid > e2->evpid);
}
-RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
-RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp);
-RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
+RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
+RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp);
+RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
+RB_GENERATE(offloadtree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);