summaryrefslogtreecommitdiff
path: root/usr.sbin/smtpd
diff options
context:
space:
mode:
author: Gilles Chehade <gilles@cvs.openbsd.org>, 2012-03-07 22:54:50 +0000
committer: Gilles Chehade <gilles@cvs.openbsd.org>, 2012-03-07 22:54:50 +0000
commit: c864e968cfa81b16b0f4b8b62a85e57028862b0e (patch)
tree: f3dfbb4711d60ac143ef417acd9520304eac55d3 /usr.sbin/smtpd
parent: 233f9e36ee8d1415283602b90ac8140571855aa6 (diff)
various reliability fixes:
- prevent queue_fsqueue from calling fatal() when it hits an ENOENT, as this can happen
- change the scheduler API a bit to simplify it, and fix runner accordingly
- we can't remove a msg/batch from the ramqueue while an envelope is offloaded or it will cause a double free; instead we add a refcnt to both msg/batch and only free them when it hits 0
Diffstat (limited to 'usr.sbin/smtpd')
-rw-r--r-- usr.sbin/smtpd/delivery_filename.c 3
-rw-r--r-- usr.sbin/smtpd/delivery_maildir.c 3
-rw-r--r-- usr.sbin/smtpd/delivery_mbox.c 3
-rw-r--r-- usr.sbin/smtpd/delivery_mda.c 3
-rw-r--r-- usr.sbin/smtpd/queue_fsqueue.c 9
-rw-r--r-- usr.sbin/smtpd/runner.c 26
-rw-r--r-- usr.sbin/smtpd/scheduler_ramqueue.c 185
-rw-r--r-- usr.sbin/smtpd/smtpd.h 7
8 files changed, 153 insertions, 86 deletions
diff --git a/usr.sbin/smtpd/delivery_filename.c b/usr.sbin/smtpd/delivery_filename.c
index c916b3b1aad..e5ead5e04a8 100644
--- a/usr.sbin/smtpd/delivery_filename.c
+++ b/usr.sbin/smtpd/delivery_filename.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: delivery_filename.c,v 1.2 2012/02/04 17:11:45 chl Exp $ */
+/* $OpenBSD: delivery_filename.c,v 1.3 2012/03/07 22:54:49 gilles Exp $ */
/*
* Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org>
@@ -23,6 +23,7 @@
#include <sys/socket.h>
#include <sys/stat.h>
+#include <db.h>
#include <ctype.h>
#include <err.h>
#include <errno.h>
diff --git a/usr.sbin/smtpd/delivery_maildir.c b/usr.sbin/smtpd/delivery_maildir.c
index 54c091c09b4..669770e6ede 100644
--- a/usr.sbin/smtpd/delivery_maildir.c
+++ b/usr.sbin/smtpd/delivery_maildir.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: delivery_maildir.c,v 1.3 2012/02/04 17:11:45 chl Exp $ */
+/* $OpenBSD: delivery_maildir.c,v 1.4 2012/03/07 22:54:49 gilles Exp $ */
/*
* Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org>
@@ -23,6 +23,7 @@
#include <sys/socket.h>
#include <sys/stat.h>
+#include <db.h>
#include <ctype.h>
#include <err.h>
#include <errno.h>
diff --git a/usr.sbin/smtpd/delivery_mbox.c b/usr.sbin/smtpd/delivery_mbox.c
index 227ede9e611..86976075277 100644
--- a/usr.sbin/smtpd/delivery_mbox.c
+++ b/usr.sbin/smtpd/delivery_mbox.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: delivery_mbox.c,v 1.3 2012/02/04 17:11:45 chl Exp $ */
+/* $OpenBSD: delivery_mbox.c,v 1.4 2012/03/07 22:54:49 gilles Exp $ */
/*
* Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org>
@@ -22,6 +22,7 @@
#include <sys/param.h>
#include <sys/socket.h>
+#include <db.h>
#include <ctype.h>
#include <err.h>
#include <event.h>
diff --git a/usr.sbin/smtpd/delivery_mda.c b/usr.sbin/smtpd/delivery_mda.c
index 47a85b558c6..03107605c10 100644
--- a/usr.sbin/smtpd/delivery_mda.c
+++ b/usr.sbin/smtpd/delivery_mda.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: delivery_mda.c,v 1.2 2012/02/04 17:11:45 chl Exp $ */
+/* $OpenBSD: delivery_mda.c,v 1.3 2012/03/07 22:54:49 gilles Exp $ */
/*
* Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org>
@@ -22,6 +22,7 @@
#include <sys/param.h>
#include <sys/socket.h>
+#include <db.h>
#include <ctype.h>
#include <err.h>
#include <event.h>
diff --git a/usr.sbin/smtpd/queue_fsqueue.c b/usr.sbin/smtpd/queue_fsqueue.c
index f428965f4db..c57505009d4 100644
--- a/usr.sbin/smtpd/queue_fsqueue.c
+++ b/usr.sbin/smtpd/queue_fsqueue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue_fsqueue.c,v 1.38 2012/01/31 21:05:26 gilles Exp $ */
+/* $OpenBSD: queue_fsqueue.c,v 1.39 2012/03/07 22:54:49 gilles Exp $ */
/*
* Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org>
@@ -486,7 +486,6 @@ struct qwalk {
int (*filefn)(struct qwalk *, char *);
int bucket;
int level;
- int strict;
u_int32_t msgid;
};
@@ -508,7 +507,6 @@ fsqueue_qwalk_new(enum queue_kind kind, u_int32_t msgid)
q->kind = kind;
q->level = 0;
- q->strict = 0;
q->filefn = walk_simple;
q->msgid = msgid;
@@ -521,9 +519,6 @@ fsqueue_qwalk_new(enum queue_kind kind, u_int32_t msgid)
fatalx("walk_queue: snprintf");
}
- if (smtpd_process == PROC_QUEUE || smtpd_process == PROC_RUNNER)
- q->strict = 1;
-
if (kind == Q_QUEUE)
q->filefn = walk_queue;
if (kind == Q_INCOMING)
@@ -587,7 +582,7 @@ recurse:
q->level++;
q->dirs[q->level] = opendir(q->path);
if (q->dirs[q->level] == NULL) {
- if (errno == ENOENT && !q->strict) {
+ if (errno == ENOENT) {
q->level--;
goto again;
}
diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c
index 75eb8a45bfd..007beb42b73 100644
--- a/usr.sbin/smtpd/runner.c
+++ b/usr.sbin/smtpd/runner.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: runner.c,v 1.135 2012/01/31 21:05:26 gilles Exp $ */
+/* $OpenBSD: runner.c,v 1.136 2012/03/07 22:54:49 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -78,6 +78,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg)
stat_decrement(STATS_RUNNER);
e = imsg->data;
log_debug("queue_delivery_ok: %016"PRIx64, e->id);
+ scheduler->remove(e->id);
queue_envelope_delete(Q_QUEUE, e);
return;
@@ -101,6 +102,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg)
scheduler->insert(&bounce);
runner_reset_events();
}
+ scheduler->remove(e->id);
queue_envelope_delete(Q_QUEUE, e);
return;
@@ -152,7 +154,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg)
return;
case IMSG_RUNNER_SCHEDULE:
- scheduler->schedule(*(u_int64_t *)imsg->data);
+ scheduler->force(*(u_int64_t *)imsg->data);
runner_reset_events();
return;
@@ -253,6 +255,11 @@ runner(void)
setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
fatal("runner: cannot drop privileges");
+ /* see fdlimit()-related comment in queue.c */
+ fdlimit(1.0);
+ if ((env->sc_maxconn = availdesc() / 4) < 1)
+ fatalx("runner: fd starvation");
+
env->sc_scheduler = scheduler_backend_lookup(SCHED_RAMQUEUE);
scheduler = env->sc_scheduler;
@@ -268,11 +275,6 @@ runner(void)
signal(SIGPIPE, SIG_IGN);
signal(SIGHUP, SIG_IGN);
- /* see fdlimit()-related comment in queue.c */
- fdlimit(1.0);
- if ((env->sc_maxconn = availdesc() / 4) < 1)
- fatalx("runner: fd starvation");
-
config_pipes(peers, nitems(peers));
config_peers(peers, nitems(peers));
@@ -385,7 +387,7 @@ runner_process_envelope(u_int64_t evpid)
envelope_set_errormsg(&envelope, "loop has been detected");
bounce_record_message(&envelope, &bounce);
scheduler->insert(&bounce);
- scheduler->remove(NULL, evpid);
+ scheduler->remove(evpid);
queue_envelope_delete(Q_QUEUE, &envelope);
runner_setup_events();
@@ -414,7 +416,7 @@ runner_process_batch(enum delivery_type type, u_int64_t evpid)
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp,
sizeof evp);
- scheduler->remove(batch, evpid);
+ scheduler->schedule(evpid);
}
stat_increment(STATS_RUNNER);
stat_increment(STATS_RUNNER_BOUNCES);
@@ -430,7 +432,7 @@ runner_process_batch(enum delivery_type type, u_int64_t evpid)
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp,
sizeof evp);
- scheduler->remove(batch, evpid);
+ scheduler->schedule(evpid);
stat_increment(STATS_RUNNER);
stat_increment(STATS_MDA_SESSION);
@@ -465,7 +467,7 @@ runner_process_batch(enum delivery_type type, u_int64_t evpid)
IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &evp,
sizeof evp);
- scheduler->remove(batch, evpid);
+ scheduler->schedule(evpid);
stat_increment(STATS_RUNNER);
}
@@ -596,5 +598,5 @@ runner_remove_envelope(u_int64_t evpid)
evp.id = evpid;
queue_envelope_delete(Q_QUEUE, &evp);
- scheduler->remove(NULL, evpid);
+ scheduler->remove(evpid);
}
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
index c622ae4c576..7c57169b4e2 100644
--- a/usr.sbin/smtpd/scheduler_ramqueue.c
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: scheduler_ramqueue.c,v 1.4 2012/01/31 21:05:26 gilles Exp $ */
+/* $OpenBSD: scheduler_ramqueue.c,v 1.5 2012/03/07 22:54:49 gilles Exp $ */
/*
* Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
@@ -39,16 +39,16 @@
struct ramqueue_host {
RB_ENTRY(ramqueue_host) hosttree_entry;
TAILQ_HEAD(,ramqueue_batch) batch_queue;
- u_int64_t h_id;
char hostname[MAXHOSTNAMELEN];
};
struct ramqueue_batch {
enum delivery_type type;
TAILQ_ENTRY(ramqueue_batch) batch_entry;
TAILQ_HEAD(,ramqueue_envelope) envelope_queue;
- u_int64_t h_id;
+ struct ramqueue_host *rq_host;
u_int64_t b_id;
u_int32_t msgid;
+ u_int32_t evpcnt;
};
struct ramqueue_envelope {
TAILQ_ENTRY(ramqueue_envelope) queue_entry;
@@ -64,16 +64,19 @@ struct ramqueue_message {
RB_ENTRY(ramqueue_message) msgtree_entry;
RB_HEAD(evptree, ramqueue_envelope) evptree;
u_int32_t msgid;
+ u_int32_t evpcnt;
};
struct ramqueue {
RB_HEAD(hosttree, ramqueue_host) hosttree;
RB_HEAD(msgtree, ramqueue_message) msgtree;
+ RB_HEAD(offloadtree, ramqueue_envelope) offloadtree;
TAILQ_HEAD(,ramqueue_envelope) queue;
};
-RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
-RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp);
-RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp);
+RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
+RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp);
+RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp);
+RB_PROTOTYPE(offloadtree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp);
enum ramqueue_iter_type {
RAMQUEUE_ITER_HOST,
@@ -108,6 +111,7 @@ static struct ramqueue_message *ramqueue_insert_message(u_int32_t);
static void ramqueue_remove_message(struct ramqueue_message *);
static struct ramqueue_envelope *ramqueue_lookup_envelope(u_int64_t);
+static struct ramqueue_envelope *ramqueue_lookup_offload(u_int64_t);
/*NEEDSFIX*/
@@ -118,14 +122,15 @@ static void scheduler_ramqueue_init(void);
static int scheduler_ramqueue_setup(time_t, time_t);
static int scheduler_ramqueue_next(u_int64_t *, time_t *);
static void scheduler_ramqueue_insert(struct envelope *);
-static void scheduler_ramqueue_remove(void *, u_int64_t);
+static void scheduler_ramqueue_schedule(u_int64_t);
+static void scheduler_ramqueue_remove(u_int64_t);
static void *scheduler_ramqueue_host(char *);
static void *scheduler_ramqueue_message(u_int32_t);
static void *scheduler_ramqueue_batch(u_int64_t);
static void *scheduler_ramqueue_queue(void);
static void scheduler_ramqueue_close(void *);
static int scheduler_ramqueue_fetch(void *, u_int64_t *);
-static int scheduler_ramqueue_schedule(u_int64_t);
+static int scheduler_ramqueue_force(u_int64_t);
static void scheduler_ramqueue_display(void);
struct scheduler_backend scheduler_backend_ramqueue = {
@@ -133,6 +138,7 @@ struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_setup,
scheduler_ramqueue_next,
scheduler_ramqueue_insert,
+ scheduler_ramqueue_schedule,
scheduler_ramqueue_remove,
scheduler_ramqueue_host,
scheduler_ramqueue_message,
@@ -140,7 +146,7 @@ struct scheduler_backend scheduler_backend_ramqueue = {
scheduler_ramqueue_queue,
scheduler_ramqueue_close,
scheduler_ramqueue_fetch,
- scheduler_ramqueue_schedule,
+ scheduler_ramqueue_force,
scheduler_ramqueue_display
};
static struct ramqueue ramqueue;
@@ -184,6 +190,18 @@ scheduler_ramqueue_display_msgtree(void)
}
static void
+scheduler_ramqueue_display_offloadtree(void)
+{
+ struct ramqueue_envelope *rq_evp;
+
+ log_debug("\tscheduler_ramqueue: offloadtree display");
+ RB_FOREACH(rq_evp, offloadtree, &ramqueue.offloadtree) {
+ log_debug("\t\t\tevp: [%p] %016"PRIx64,
+ rq_evp, rq_evp->evpid);
+ }
+}
+
+static void
scheduler_ramqueue_display_queue(void)
{
struct ramqueue_envelope *rq_evp;
@@ -201,6 +219,7 @@ scheduler_ramqueue_display(void)
log_debug("scheduler_ramqueue: display");
scheduler_ramqueue_display_hosttree();
scheduler_ramqueue_display_msgtree();
+ scheduler_ramqueue_display_offloadtree();
scheduler_ramqueue_display_queue();
}
@@ -212,6 +231,7 @@ scheduler_ramqueue_init(void)
TAILQ_INIT(&ramqueue.queue);
RB_INIT(&ramqueue.hosttree);
RB_INIT(&ramqueue.msgtree);
+ RB_INIT(&ramqueue.offloadtree);
}
static int
@@ -229,6 +249,11 @@ scheduler_ramqueue_setup(time_t curtm, time_t nsched)
q = qwalk_new(Q_QUEUE, 0);
while (qwalk(q, &evpid)) {
+ /* the envelope is already in ramqueue, skip */
+ if (ramqueue_lookup_envelope(evpid) ||
+ ramqueue_lookup_offload(evpid))
+ continue;
+
if (! queue_envelope_load(Q_QUEUE, evpid, &envelope)) {
log_debug("scheduler_ramqueue: evp -> /corrupt");
queue_message_corrupt(Q_QUEUE, evpid_to_msgid(evpid));
@@ -291,31 +316,42 @@ scheduler_ramqueue_insert(struct envelope *envelope)
time_t curtm = time(NULL);
log_debug("scheduler_ramqueue: insert");
- msgid = evpid_to_msgid(envelope->id);
- rq_msg = ramqueue_lookup_message(msgid);
- if (rq_msg == NULL)
- rq_msg = ramqueue_insert_message(msgid);
+ rq_evp = ramqueue_lookup_offload(envelope->id);
+ if (rq_evp) {
+ rq_msg = rq_evp->rq_msg;
+ rq_batch = rq_evp->rq_batch;
+ rq_host = rq_evp->rq_host;
+ RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
+ }
+ else {
+ msgid = evpid_to_msgid(envelope->id);
+ rq_msg = ramqueue_lookup_message(msgid);
+ if (rq_msg == NULL)
+ rq_msg = ramqueue_insert_message(msgid);
- rq_host = ramqueue_lookup_host(envelope->dest.domain);
- if (rq_host == NULL)
- rq_host = ramqueue_insert_host(envelope->dest.domain);
+ rq_host = ramqueue_lookup_host(envelope->dest.domain);
+ if (rq_host == NULL)
+ rq_host = ramqueue_insert_host(envelope->dest.domain);
- rq_batch = ramqueue_lookup_batch(rq_host, msgid);
- if (rq_batch == NULL)
- rq_batch = ramqueue_insert_batch(rq_host, msgid);
+ rq_batch = ramqueue_lookup_batch(rq_host, msgid);
+ if (rq_batch == NULL)
+ rq_batch = ramqueue_insert_batch(rq_host, msgid);
- rq_evp = calloc(1, sizeof (*rq_evp));
- if (rq_evp == NULL)
- fatal("calloc");
- rq_evp->evpid = envelope->id;
+ rq_evp = calloc(1, sizeof (*rq_evp));
+ if (rq_evp == NULL)
+ fatal("calloc");
+ rq_evp->evpid = envelope->id;
+ rq_batch->evpcnt++;
+ rq_msg->evpcnt++;
+ }
+
rq_evp->sched = ramqueue_next_schedule(envelope, curtm);
rq_evp->rq_host = rq_host;
rq_evp->rq_batch = rq_batch;
rq_evp->rq_msg = rq_msg;
-
RB_INSERT(evptree, &rq_msg->evptree, rq_evp);
TAILQ_INSERT_TAIL(&rq_batch->envelope_queue, rq_evp,
- batchqueue_entry);
+ batchqueue_entry);
/* sorted insert */
TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) {
@@ -331,51 +367,77 @@ scheduler_ramqueue_insert(struct envelope *envelope)
}
static void
-scheduler_ramqueue_remove(void *hdl, u_int64_t evpid)
+scheduler_ramqueue_schedule(u_int64_t evpid)
{
- struct ramqueue_iter *iter = hdl;
- struct ramqueue_batch *rq_batch;
- struct ramqueue_message *rq_msg;
struct ramqueue_envelope *rq_evp;
- struct ramqueue_host *rq_host;
+ struct ramqueue_message *rq_msg;
+ struct ramqueue_batch *rq_batch;
+ struct ramqueue_host *rq_host;
+
+ log_debug("scheduler_ramqueue: schedule");
- log_debug("scheduler_ramqueue: remove");
rq_evp = ramqueue_lookup_envelope(evpid);
rq_msg = rq_evp->rq_msg;
rq_batch = rq_evp->rq_batch;
rq_host = rq_evp->rq_host;
+ /* remove from msg tree, batch queue and linear queue */
RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry);
TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
+
+ /* insert into offload tree*/
+ RB_INSERT(offloadtree, &ramqueue.offloadtree, rq_evp);
+
+ /* that's one less envelope to process in the ramqueue */
stat_decrement(STATS_RAMQUEUE_ENVELOPE);
+}
+static void
+scheduler_ramqueue_remove(u_int64_t evpid)
+{
+ struct ramqueue_batch *rq_batch;
+ struct ramqueue_message *rq_msg;
+ struct ramqueue_envelope *rq_evp;
+ struct ramqueue_host *rq_host;
- /* check if we are the last of a batch */
- if (TAILQ_FIRST(&rq_batch->envelope_queue) == NULL) {
- ramqueue_remove_batch(rq_host, rq_batch);
- if (iter != NULL && iter->type == RAMQUEUE_ITER_BATCH) {
- log_debug("scheduler_ramqueue_remove: batch removed");
- iter->u.batch = NULL;
- }
+ log_debug("scheduler_ramqueue: remove");
+
+ rq_evp = ramqueue_lookup_offload(evpid);
+ if (rq_evp) {
+ RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp);
+ rq_msg = rq_evp->rq_msg;
+ rq_batch = rq_evp->rq_batch;
+ rq_host = rq_evp->rq_host;
+ }
+ else {
+ rq_evp = ramqueue_lookup_envelope(evpid);
+ rq_msg = rq_evp->rq_msg;
+ rq_batch = rq_evp->rq_batch;
+ rq_host = rq_evp->rq_host;
+
+ RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
+ TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry);
+ TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
+ stat_decrement(STATS_RAMQUEUE_ENVELOPE);
}
+ rq_batch->evpcnt--;
+ rq_msg->evpcnt--;
+
/* check if we are the last of a message */
- if (RB_ROOT(&rq_msg->evptree) == NULL) {
+ if (rq_msg->evpcnt == 0) {
ramqueue_remove_message(rq_msg);
- if (iter != NULL && iter->type == RAMQUEUE_ITER_MESSAGE) {
- log_debug("scheduler_ramqueue_remove: message removed");
- iter->u.message = NULL;
- }
+ }
+
+ /* check if we are the last of a batch */
+ if (rq_batch->evpcnt == 0) {
+ ramqueue_remove_batch(rq_host, rq_batch);
}
/* check if we are the last of a host */
if (TAILQ_FIRST(&rq_host->batch_queue) == NULL) {
ramqueue_remove_host(rq_host);
- if (iter != NULL && iter->type == RAMQUEUE_ITER_HOST) {
- log_debug("scheduler_ramqueue_remove: host removed");
- iter->u.host = NULL;
- }
}
free(rq_evp);
@@ -470,9 +532,7 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
struct ramqueue_batch *rq_batch;
switch (iter->type) {
- case RAMQUEUE_ITER_HOST: {
- if (iter->u.host == NULL)
- return 0;
+ case RAMQUEUE_ITER_HOST:
rq_batch = TAILQ_FIRST(&iter->u.host->batch_queue);
if (rq_batch == NULL)
break;
@@ -481,11 +541,8 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
break;
*evpid = rq_evp->evpid;
return 1;
- }
case RAMQUEUE_ITER_BATCH:
- if (iter->u.batch == NULL)
- return 0;
rq_evp = TAILQ_FIRST(&iter->u.batch->envelope_queue);
if (rq_evp == NULL)
break;
@@ -493,8 +550,6 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
return 1;
case RAMQUEUE_ITER_MESSAGE:
- if (iter->u.message == NULL)
- return 0;
rq_evp = RB_ROOT(&iter->u.message->evptree);
if (rq_evp == NULL)
break;
@@ -513,7 +568,7 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
}
static int
-scheduler_ramqueue_schedule(u_int64_t id)
+scheduler_ramqueue_force(u_int64_t id)
{
struct ramqueue_envelope *rq_evp;
struct ramqueue_message *rq_msg;
@@ -575,6 +630,15 @@ ramqueue_lookup_message(u_int32_t msgid)
}
static struct ramqueue_envelope *
+ramqueue_lookup_offload(u_int64_t evpid)
+{
+ struct ramqueue_envelope evpkey;
+
+ evpkey.evpid = evpid;
+ return RB_FIND(offloadtree, &ramqueue.offloadtree, &evpkey);
+}
+
+static struct ramqueue_envelope *
ramqueue_lookup_envelope(u_int64_t evpid)
{
struct ramqueue_message *rq_msg;
@@ -679,7 +743,6 @@ ramqueue_insert_host(char *host)
rq_host = calloc(1, sizeof (*rq_host));
if (rq_host == NULL)
fatal("calloc");
- rq_host->h_id = generate_uid();
strlcpy(rq_host->hostname, host, sizeof(rq_host->hostname));
TAILQ_INIT(&rq_host->batch_queue);
RB_INSERT(hosttree, &ramqueue.hosttree, rq_host);
@@ -697,6 +760,7 @@ ramqueue_insert_batch(struct ramqueue_host *rq_host, u_int32_t msgid)
if (rq_batch == NULL)
fatal("calloc");
rq_batch->b_id = generate_uid();
+ rq_batch->rq_host = rq_host;
rq_batch->msgid = msgid;
TAILQ_INIT(&rq_batch->envelope_queue);
@@ -752,6 +816,7 @@ ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2)
return (e1->evpid < e2->evpid ? -1 : e1->evpid > e2->evpid);
}
-RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
-RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp);
-RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
+RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
+RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp);
+RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
+RB_GENERATE(offloadtree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index d7873d85ebd..f92b24b7512 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.h,v 1.286 2012/01/31 21:05:26 gilles Exp $ */
+/* $OpenBSD: smtpd.h,v 1.287 2012/03/07 22:54:49 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -978,7 +978,8 @@ struct scheduler_backend {
int (*next)(u_int64_t *, time_t *);
void (*insert)(struct envelope *);
- void (*remove)(void *, u_int64_t);
+ void (*schedule)(u_int64_t);
+ void (*remove)(u_int64_t);
void *(*host)(char *);
void *(*message)(u_int32_t);
@@ -987,7 +988,7 @@ struct scheduler_backend {
void (*close)(void *);
int (*fetch)(void *, u_int64_t *);
- int (*schedule)(u_int64_t);
+ int (*force)(u_int64_t);
void (*display)(void); /* may be NULL */
};