diff options
author | Gilles Chehade <gilles@cvs.openbsd.org> | 2012-03-07 22:54:50 +0000 |
---|---|---|
committer | Gilles Chehade <gilles@cvs.openbsd.org> | 2012-03-07 22:54:50 +0000 |
commit | c864e968cfa81b16b0f4b8b62a85e57028862b0e (patch) | |
tree | f3dfbb4711d60ac143ef417acd9520304eac55d3 /usr.sbin/smtpd | |
parent | 233f9e36ee8d1415283602b90ac8140571855aa6 (diff) |
various reliability fixes:
- prevent queue_fsqueue from calling fatal() when it hits ENOENT, which can legitimately happen
- change the scheduler API a bit to simplify it, and fix runner accordingly
- we can't remove msg/batch from the ramqueue while an envelope is offloaded, or
it will cause a double; instead we add a refcnt to both msg and batch and
only free them when it hits 0
Diffstat (limited to 'usr.sbin/smtpd')
-rw-r--r-- | usr.sbin/smtpd/delivery_filename.c | 3 | ||||
-rw-r--r-- | usr.sbin/smtpd/delivery_maildir.c | 3 | ||||
-rw-r--r-- | usr.sbin/smtpd/delivery_mbox.c | 3 | ||||
-rw-r--r-- | usr.sbin/smtpd/delivery_mda.c | 3 | ||||
-rw-r--r-- | usr.sbin/smtpd/queue_fsqueue.c | 9 | ||||
-rw-r--r-- | usr.sbin/smtpd/runner.c | 26 | ||||
-rw-r--r-- | usr.sbin/smtpd/scheduler_ramqueue.c | 185 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd.h | 7 |
8 files changed, 153 insertions, 86 deletions
diff --git a/usr.sbin/smtpd/delivery_filename.c b/usr.sbin/smtpd/delivery_filename.c index c916b3b1aad..e5ead5e04a8 100644 --- a/usr.sbin/smtpd/delivery_filename.c +++ b/usr.sbin/smtpd/delivery_filename.c @@ -1,4 +1,4 @@ -/* $OpenBSD: delivery_filename.c,v 1.2 2012/02/04 17:11:45 chl Exp $ */ +/* $OpenBSD: delivery_filename.c,v 1.3 2012/03/07 22:54:49 gilles Exp $ */ /* * Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org> @@ -23,6 +23,7 @@ #include <sys/socket.h> #include <sys/stat.h> +#include <db.h> #include <ctype.h> #include <err.h> #include <errno.h> diff --git a/usr.sbin/smtpd/delivery_maildir.c b/usr.sbin/smtpd/delivery_maildir.c index 54c091c09b4..669770e6ede 100644 --- a/usr.sbin/smtpd/delivery_maildir.c +++ b/usr.sbin/smtpd/delivery_maildir.c @@ -1,4 +1,4 @@ -/* $OpenBSD: delivery_maildir.c,v 1.3 2012/02/04 17:11:45 chl Exp $ */ +/* $OpenBSD: delivery_maildir.c,v 1.4 2012/03/07 22:54:49 gilles Exp $ */ /* * Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org> @@ -23,6 +23,7 @@ #include <sys/socket.h> #include <sys/stat.h> +#include <db.h> #include <ctype.h> #include <err.h> #include <errno.h> diff --git a/usr.sbin/smtpd/delivery_mbox.c b/usr.sbin/smtpd/delivery_mbox.c index 227ede9e611..86976075277 100644 --- a/usr.sbin/smtpd/delivery_mbox.c +++ b/usr.sbin/smtpd/delivery_mbox.c @@ -1,4 +1,4 @@ -/* $OpenBSD: delivery_mbox.c,v 1.3 2012/02/04 17:11:45 chl Exp $ */ +/* $OpenBSD: delivery_mbox.c,v 1.4 2012/03/07 22:54:49 gilles Exp $ */ /* * Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org> @@ -22,6 +22,7 @@ #include <sys/param.h> #include <sys/socket.h> +#include <db.h> #include <ctype.h> #include <err.h> #include <event.h> diff --git a/usr.sbin/smtpd/delivery_mda.c b/usr.sbin/smtpd/delivery_mda.c index 47a85b558c6..03107605c10 100644 --- a/usr.sbin/smtpd/delivery_mda.c +++ b/usr.sbin/smtpd/delivery_mda.c @@ -1,4 +1,4 @@ -/* $OpenBSD: delivery_mda.c,v 1.2 2012/02/04 17:11:45 chl Exp $ */ +/* $OpenBSD: delivery_mda.c,v 1.3 2012/03/07 22:54:49 gilles 
Exp $ */ /* * Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org> @@ -22,6 +22,7 @@ #include <sys/param.h> #include <sys/socket.h> +#include <db.h> #include <ctype.h> #include <err.h> #include <event.h> diff --git a/usr.sbin/smtpd/queue_fsqueue.c b/usr.sbin/smtpd/queue_fsqueue.c index f428965f4db..c57505009d4 100644 --- a/usr.sbin/smtpd/queue_fsqueue.c +++ b/usr.sbin/smtpd/queue_fsqueue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue_fsqueue.c,v 1.38 2012/01/31 21:05:26 gilles Exp $ */ +/* $OpenBSD: queue_fsqueue.c,v 1.39 2012/03/07 22:54:49 gilles Exp $ */ /* * Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org> @@ -486,7 +486,6 @@ struct qwalk { int (*filefn)(struct qwalk *, char *); int bucket; int level; - int strict; u_int32_t msgid; }; @@ -508,7 +507,6 @@ fsqueue_qwalk_new(enum queue_kind kind, u_int32_t msgid) q->kind = kind; q->level = 0; - q->strict = 0; q->filefn = walk_simple; q->msgid = msgid; @@ -521,9 +519,6 @@ fsqueue_qwalk_new(enum queue_kind kind, u_int32_t msgid) fatalx("walk_queue: snprintf"); } - if (smtpd_process == PROC_QUEUE || smtpd_process == PROC_RUNNER) - q->strict = 1; - if (kind == Q_QUEUE) q->filefn = walk_queue; if (kind == Q_INCOMING) @@ -587,7 +582,7 @@ recurse: q->level++; q->dirs[q->level] = opendir(q->path); if (q->dirs[q->level] == NULL) { - if (errno == ENOENT && !q->strict) { + if (errno == ENOENT) { q->level--; goto again; } diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c index 75eb8a45bfd..007beb42b73 100644 --- a/usr.sbin/smtpd/runner.c +++ b/usr.sbin/smtpd/runner.c @@ -1,4 +1,4 @@ -/* $OpenBSD: runner.c,v 1.135 2012/01/31 21:05:26 gilles Exp $ */ +/* $OpenBSD: runner.c,v 1.136 2012/03/07 22:54:49 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -78,6 +78,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg) stat_decrement(STATS_RUNNER); e = imsg->data; log_debug("queue_delivery_ok: %016"PRIx64, e->id); + scheduler->remove(e->id); queue_envelope_delete(Q_QUEUE, e); return; @@ 
-101,6 +102,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg) scheduler->insert(&bounce); runner_reset_events(); } + scheduler->remove(e->id); queue_envelope_delete(Q_QUEUE, e); return; @@ -152,7 +154,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg) return; case IMSG_RUNNER_SCHEDULE: - scheduler->schedule(*(u_int64_t *)imsg->data); + scheduler->force(*(u_int64_t *)imsg->data); runner_reset_events(); return; @@ -253,6 +255,11 @@ runner(void) setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) fatal("runner: cannot drop privileges"); + /* see fdlimit()-related comment in queue.c */ + fdlimit(1.0); + if ((env->sc_maxconn = availdesc() / 4) < 1) + fatalx("runner: fd starvation"); + env->sc_scheduler = scheduler_backend_lookup(SCHED_RAMQUEUE); scheduler = env->sc_scheduler; @@ -268,11 +275,6 @@ runner(void) signal(SIGPIPE, SIG_IGN); signal(SIGHUP, SIG_IGN); - /* see fdlimit()-related comment in queue.c */ - fdlimit(1.0); - if ((env->sc_maxconn = availdesc() / 4) < 1) - fatalx("runner: fd starvation"); - config_pipes(peers, nitems(peers)); config_peers(peers, nitems(peers)); @@ -385,7 +387,7 @@ runner_process_envelope(u_int64_t evpid) envelope_set_errormsg(&envelope, "loop has been detected"); bounce_record_message(&envelope, &bounce); scheduler->insert(&bounce); - scheduler->remove(NULL, evpid); + scheduler->remove(evpid); queue_envelope_delete(Q_QUEUE, &envelope); runner_setup_events(); @@ -414,7 +416,7 @@ runner_process_batch(enum delivery_type type, u_int64_t evpid) imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp, sizeof evp); - scheduler->remove(batch, evpid); + scheduler->schedule(evpid); } stat_increment(STATS_RUNNER); stat_increment(STATS_RUNNER_BOUNCES); @@ -430,7 +432,7 @@ runner_process_batch(enum delivery_type type, u_int64_t evpid) imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp, sizeof evp); - scheduler->remove(batch, evpid); + scheduler->schedule(evpid); 
stat_increment(STATS_RUNNER); stat_increment(STATS_MDA_SESSION); @@ -465,7 +467,7 @@ runner_process_batch(enum delivery_type type, u_int64_t evpid) IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &evp, sizeof evp); - scheduler->remove(batch, evpid); + scheduler->schedule(evpid); stat_increment(STATS_RUNNER); } @@ -596,5 +598,5 @@ runner_remove_envelope(u_int64_t evpid) evp.id = evpid; queue_envelope_delete(Q_QUEUE, &evp); - scheduler->remove(NULL, evpid); + scheduler->remove(evpid); } diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c index c622ae4c576..7c57169b4e2 100644 --- a/usr.sbin/smtpd/scheduler_ramqueue.c +++ b/usr.sbin/smtpd/scheduler_ramqueue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: scheduler_ramqueue.c,v 1.4 2012/01/31 21:05:26 gilles Exp $ */ +/* $OpenBSD: scheduler_ramqueue.c,v 1.5 2012/03/07 22:54:49 gilles Exp $ */ /* * Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org> @@ -39,16 +39,16 @@ struct ramqueue_host { RB_ENTRY(ramqueue_host) hosttree_entry; TAILQ_HEAD(,ramqueue_batch) batch_queue; - u_int64_t h_id; char hostname[MAXHOSTNAMELEN]; }; struct ramqueue_batch { enum delivery_type type; TAILQ_ENTRY(ramqueue_batch) batch_entry; TAILQ_HEAD(,ramqueue_envelope) envelope_queue; - u_int64_t h_id; + struct ramqueue_host *rq_host; u_int64_t b_id; u_int32_t msgid; + u_int32_t evpcnt; }; struct ramqueue_envelope { TAILQ_ENTRY(ramqueue_envelope) queue_entry; @@ -64,16 +64,19 @@ struct ramqueue_message { RB_ENTRY(ramqueue_message) msgtree_entry; RB_HEAD(evptree, ramqueue_envelope) evptree; u_int32_t msgid; + u_int32_t evpcnt; }; struct ramqueue { RB_HEAD(hosttree, ramqueue_host) hosttree; RB_HEAD(msgtree, ramqueue_message) msgtree; + RB_HEAD(offloadtree, ramqueue_envelope) offloadtree; TAILQ_HEAD(,ramqueue_envelope) queue; }; -RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); -RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp); -RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); 
+RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); +RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp); +RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); +RB_PROTOTYPE(offloadtree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); enum ramqueue_iter_type { RAMQUEUE_ITER_HOST, @@ -108,6 +111,7 @@ static struct ramqueue_message *ramqueue_insert_message(u_int32_t); static void ramqueue_remove_message(struct ramqueue_message *); static struct ramqueue_envelope *ramqueue_lookup_envelope(u_int64_t); +static struct ramqueue_envelope *ramqueue_lookup_offload(u_int64_t); /*NEEDSFIX*/ @@ -118,14 +122,15 @@ static void scheduler_ramqueue_init(void); static int scheduler_ramqueue_setup(time_t, time_t); static int scheduler_ramqueue_next(u_int64_t *, time_t *); static void scheduler_ramqueue_insert(struct envelope *); -static void scheduler_ramqueue_remove(void *, u_int64_t); +static void scheduler_ramqueue_schedule(u_int64_t); +static void scheduler_ramqueue_remove(u_int64_t); static void *scheduler_ramqueue_host(char *); static void *scheduler_ramqueue_message(u_int32_t); static void *scheduler_ramqueue_batch(u_int64_t); static void *scheduler_ramqueue_queue(void); static void scheduler_ramqueue_close(void *); static int scheduler_ramqueue_fetch(void *, u_int64_t *); -static int scheduler_ramqueue_schedule(u_int64_t); +static int scheduler_ramqueue_force(u_int64_t); static void scheduler_ramqueue_display(void); struct scheduler_backend scheduler_backend_ramqueue = { @@ -133,6 +138,7 @@ struct scheduler_backend scheduler_backend_ramqueue = { scheduler_ramqueue_setup, scheduler_ramqueue_next, scheduler_ramqueue_insert, + scheduler_ramqueue_schedule, scheduler_ramqueue_remove, scheduler_ramqueue_host, scheduler_ramqueue_message, @@ -140,7 +146,7 @@ struct scheduler_backend scheduler_backend_ramqueue = { scheduler_ramqueue_queue, scheduler_ramqueue_close, scheduler_ramqueue_fetch, - scheduler_ramqueue_schedule, + 
scheduler_ramqueue_force, scheduler_ramqueue_display }; static struct ramqueue ramqueue; @@ -184,6 +190,18 @@ scheduler_ramqueue_display_msgtree(void) } static void +scheduler_ramqueue_display_offloadtree(void) +{ + struct ramqueue_envelope *rq_evp; + + log_debug("\tscheduler_ramqueue: offloadtree display"); + RB_FOREACH(rq_evp, offloadtree, &ramqueue.offloadtree) { + log_debug("\t\t\tevp: [%p] %016"PRIx64, + rq_evp, rq_evp->evpid); + } +} + +static void scheduler_ramqueue_display_queue(void) { struct ramqueue_envelope *rq_evp; @@ -201,6 +219,7 @@ scheduler_ramqueue_display(void) log_debug("scheduler_ramqueue: display"); scheduler_ramqueue_display_hosttree(); scheduler_ramqueue_display_msgtree(); + scheduler_ramqueue_display_offloadtree(); scheduler_ramqueue_display_queue(); } @@ -212,6 +231,7 @@ scheduler_ramqueue_init(void) TAILQ_INIT(&ramqueue.queue); RB_INIT(&ramqueue.hosttree); RB_INIT(&ramqueue.msgtree); + RB_INIT(&ramqueue.offloadtree); } static int @@ -229,6 +249,11 @@ scheduler_ramqueue_setup(time_t curtm, time_t nsched) q = qwalk_new(Q_QUEUE, 0); while (qwalk(q, &evpid)) { + /* the envelope is already in ramqueue, skip */ + if (ramqueue_lookup_envelope(evpid) || + ramqueue_lookup_offload(evpid)) + continue; + if (! 
queue_envelope_load(Q_QUEUE, evpid, &envelope)) { log_debug("scheduler_ramqueue: evp -> /corrupt"); queue_message_corrupt(Q_QUEUE, evpid_to_msgid(evpid)); @@ -291,31 +316,42 @@ scheduler_ramqueue_insert(struct envelope *envelope) time_t curtm = time(NULL); log_debug("scheduler_ramqueue: insert"); - msgid = evpid_to_msgid(envelope->id); - rq_msg = ramqueue_lookup_message(msgid); - if (rq_msg == NULL) - rq_msg = ramqueue_insert_message(msgid); + rq_evp = ramqueue_lookup_offload(envelope->id); + if (rq_evp) { + rq_msg = rq_evp->rq_msg; + rq_batch = rq_evp->rq_batch; + rq_host = rq_evp->rq_host; + RB_REMOVE(evptree, &rq_msg->evptree, rq_evp); + } + else { + msgid = evpid_to_msgid(envelope->id); + rq_msg = ramqueue_lookup_message(msgid); + if (rq_msg == NULL) + rq_msg = ramqueue_insert_message(msgid); - rq_host = ramqueue_lookup_host(envelope->dest.domain); - if (rq_host == NULL) - rq_host = ramqueue_insert_host(envelope->dest.domain); + rq_host = ramqueue_lookup_host(envelope->dest.domain); + if (rq_host == NULL) + rq_host = ramqueue_insert_host(envelope->dest.domain); - rq_batch = ramqueue_lookup_batch(rq_host, msgid); - if (rq_batch == NULL) - rq_batch = ramqueue_insert_batch(rq_host, msgid); + rq_batch = ramqueue_lookup_batch(rq_host, msgid); + if (rq_batch == NULL) + rq_batch = ramqueue_insert_batch(rq_host, msgid); - rq_evp = calloc(1, sizeof (*rq_evp)); - if (rq_evp == NULL) - fatal("calloc"); - rq_evp->evpid = envelope->id; + rq_evp = calloc(1, sizeof (*rq_evp)); + if (rq_evp == NULL) + fatal("calloc"); + rq_evp->evpid = envelope->id; + rq_batch->evpcnt++; + rq_msg->evpcnt++; + } + rq_evp->sched = ramqueue_next_schedule(envelope, curtm); rq_evp->rq_host = rq_host; rq_evp->rq_batch = rq_batch; rq_evp->rq_msg = rq_msg; - RB_INSERT(evptree, &rq_msg->evptree, rq_evp); TAILQ_INSERT_TAIL(&rq_batch->envelope_queue, rq_evp, - batchqueue_entry); + batchqueue_entry); /* sorted insert */ TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) { @@ -331,51 +367,77 @@ 
scheduler_ramqueue_insert(struct envelope *envelope) } static void -scheduler_ramqueue_remove(void *hdl, u_int64_t evpid) +scheduler_ramqueue_schedule(u_int64_t evpid) { - struct ramqueue_iter *iter = hdl; - struct ramqueue_batch *rq_batch; - struct ramqueue_message *rq_msg; struct ramqueue_envelope *rq_evp; - struct ramqueue_host *rq_host; + struct ramqueue_message *rq_msg; + struct ramqueue_batch *rq_batch; + struct ramqueue_host *rq_host; + + log_debug("scheduler_ramqueue: schedule"); - log_debug("scheduler_ramqueue: remove"); rq_evp = ramqueue_lookup_envelope(evpid); rq_msg = rq_evp->rq_msg; rq_batch = rq_evp->rq_batch; rq_host = rq_evp->rq_host; + /* remove from msg tree, batch queue and linear queue */ RB_REMOVE(evptree, &rq_msg->evptree, rq_evp); TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry); TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); + + /* insert into offload tree*/ + RB_INSERT(offloadtree, &ramqueue.offloadtree, rq_evp); + + /* that's one less envelope to process in the ramqueue */ stat_decrement(STATS_RAMQUEUE_ENVELOPE); +} +static void +scheduler_ramqueue_remove(u_int64_t evpid) +{ + struct ramqueue_batch *rq_batch; + struct ramqueue_message *rq_msg; + struct ramqueue_envelope *rq_evp; + struct ramqueue_host *rq_host; - /* check if we are the last of a batch */ - if (TAILQ_FIRST(&rq_batch->envelope_queue) == NULL) { - ramqueue_remove_batch(rq_host, rq_batch); - if (iter != NULL && iter->type == RAMQUEUE_ITER_BATCH) { - log_debug("scheduler_ramqueue_remove: batch removed"); - iter->u.batch = NULL; - } + log_debug("scheduler_ramqueue: remove"); + + rq_evp = ramqueue_lookup_offload(evpid); + if (rq_evp) { + RB_REMOVE(offloadtree, &ramqueue.offloadtree, rq_evp); + rq_msg = rq_evp->rq_msg; + rq_batch = rq_evp->rq_batch; + rq_host = rq_evp->rq_host; + } + else { + rq_evp = ramqueue_lookup_envelope(evpid); + rq_msg = rq_evp->rq_msg; + rq_batch = rq_evp->rq_batch; + rq_host = rq_evp->rq_host; + + RB_REMOVE(evptree, &rq_msg->evptree, 
rq_evp); + TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry); + TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); + stat_decrement(STATS_RAMQUEUE_ENVELOPE); } + rq_batch->evpcnt--; + rq_msg->evpcnt--; + /* check if we are the last of a message */ - if (RB_ROOT(&rq_msg->evptree) == NULL) { + if (rq_msg->evpcnt == 0) { ramqueue_remove_message(rq_msg); - if (iter != NULL && iter->type == RAMQUEUE_ITER_MESSAGE) { - log_debug("scheduler_ramqueue_remove: message removed"); - iter->u.message = NULL; - } + } + + /* check if we are the last of a batch */ + if (rq_batch->evpcnt == 0) { + ramqueue_remove_batch(rq_host, rq_batch); } /* check if we are the last of a host */ if (TAILQ_FIRST(&rq_host->batch_queue) == NULL) { ramqueue_remove_host(rq_host); - if (iter != NULL && iter->type == RAMQUEUE_ITER_HOST) { - log_debug("scheduler_ramqueue_remove: host removed"); - iter->u.host = NULL; - } } free(rq_evp); @@ -470,9 +532,7 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid) struct ramqueue_batch *rq_batch; switch (iter->type) { - case RAMQUEUE_ITER_HOST: { - if (iter->u.host == NULL) - return 0; + case RAMQUEUE_ITER_HOST: rq_batch = TAILQ_FIRST(&iter->u.host->batch_queue); if (rq_batch == NULL) break; @@ -481,11 +541,8 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid) break; *evpid = rq_evp->evpid; return 1; - } case RAMQUEUE_ITER_BATCH: - if (iter->u.batch == NULL) - return 0; rq_evp = TAILQ_FIRST(&iter->u.batch->envelope_queue); if (rq_evp == NULL) break; @@ -493,8 +550,6 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid) return 1; case RAMQUEUE_ITER_MESSAGE: - if (iter->u.message == NULL) - return 0; rq_evp = RB_ROOT(&iter->u.message->evptree); if (rq_evp == NULL) break; @@ -513,7 +568,7 @@ scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid) } static int -scheduler_ramqueue_schedule(u_int64_t id) +scheduler_ramqueue_force(u_int64_t id) { struct ramqueue_envelope *rq_evp; struct ramqueue_message *rq_msg; @@ -575,6 +630,15 @@ 
ramqueue_lookup_message(u_int32_t msgid) } static struct ramqueue_envelope * +ramqueue_lookup_offload(u_int64_t evpid) +{ + struct ramqueue_envelope evpkey; + + evpkey.evpid = evpid; + return RB_FIND(offloadtree, &ramqueue.offloadtree, &evpkey); +} + +static struct ramqueue_envelope * ramqueue_lookup_envelope(u_int64_t evpid) { struct ramqueue_message *rq_msg; @@ -679,7 +743,6 @@ ramqueue_insert_host(char *host) rq_host = calloc(1, sizeof (*rq_host)); if (rq_host == NULL) fatal("calloc"); - rq_host->h_id = generate_uid(); strlcpy(rq_host->hostname, host, sizeof(rq_host->hostname)); TAILQ_INIT(&rq_host->batch_queue); RB_INSERT(hosttree, &ramqueue.hosttree, rq_host); @@ -697,6 +760,7 @@ ramqueue_insert_batch(struct ramqueue_host *rq_host, u_int32_t msgid) if (rq_batch == NULL) fatal("calloc"); rq_batch->b_id = generate_uid(); + rq_batch->rq_host = rq_host; rq_batch->msgid = msgid; TAILQ_INIT(&rq_batch->envelope_queue); @@ -752,6 +816,7 @@ ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2) return (e1->evpid < e2->evpid ? 
-1 : e1->evpid > e2->evpid); } -RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); -RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp); -RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); +RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); +RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp); +RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); +RB_GENERATE(offloadtree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h index d7873d85ebd..f92b24b7512 100644 --- a/usr.sbin/smtpd/smtpd.h +++ b/usr.sbin/smtpd/smtpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.h,v 1.286 2012/01/31 21:05:26 gilles Exp $ */ +/* $OpenBSD: smtpd.h,v 1.287 2012/03/07 22:54:49 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -978,7 +978,8 @@ struct scheduler_backend { int (*next)(u_int64_t *, time_t *); void (*insert)(struct envelope *); - void (*remove)(void *, u_int64_t); + void (*schedule)(u_int64_t); + void (*remove)(u_int64_t); void *(*host)(char *); void *(*message)(u_int32_t); @@ -987,7 +988,7 @@ struct scheduler_backend { void (*close)(void *); int (*fetch)(void *, u_int64_t *); - int (*schedule)(u_int64_t); + int (*force)(u_int64_t); void (*display)(void); /* may be NULL */ }; |