summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Gilles Chehade <gilles@cvs.openbsd.org> 2012-01-28 11:33:08 +0000
committer Gilles Chehade <gilles@cvs.openbsd.org> 2012-01-28 11:33:08 +0000
commit 749c34bfd2e10250537cd775420114de8ff52afc (patch)
tree 6fbc1251f94ca26fefbdd4e4e52c01914c31bc89
parent d7597ba98292ef9da362b2fd9d7f1f4c102ff997 (diff)
- introduce the scheduler_backend API
- introduce the scheduler_ramqueue backend
- remove all occurrences of ramqueue outside of the ramqueue backend
- teach runner how to use the new API

it is now possible to write custom schedulers!

ok eric@, ok chl@
-rw-r--r-- usr.sbin/smtpd/mta.c 40
-rw-r--r-- usr.sbin/smtpd/queue.c 10
-rw-r--r-- usr.sbin/smtpd/ramqueue.c 476
-rw-r--r-- usr.sbin/smtpd/runner.c 273
-rw-r--r-- usr.sbin/smtpd/scheduler.c 50
-rw-r--r-- usr.sbin/smtpd/scheduler_ramqueue.c 674
-rw-r--r-- usr.sbin/smtpd/smtpctl.c 41
-rw-r--r-- usr.sbin/smtpd/smtpd.h 113
-rw-r--r-- usr.sbin/smtpd/smtpd/Makefile 6
9 files changed, 919 insertions, 764 deletions
diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c
index bd079ab80ff..ef871907a03 100644
--- a/usr.sbin/smtpd/mta.c
+++ b/usr.sbin/smtpd/mta.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mta.c,v 1.124 2012/01/26 12:31:53 eric Exp $ */
+/* $OpenBSD: mta.c,v 1.125 2012/01/28 11:33:06 gilles Exp $ */
/*
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -59,7 +59,7 @@ static void mta_request_datafd(struct mta_session *);
static void
mta_imsg(struct imsgev *iev, struct imsg *imsg)
{
- struct ramqueue_batch *rq_batch;
+ struct mta_batch *mta_batch;
struct mta_session *s;
struct mta_relay *relay;
struct envelope *e;
@@ -73,34 +73,34 @@ mta_imsg(struct imsgev *iev, struct imsg *imsg)
if (iev->proc == PROC_QUEUE) {
switch (imsg->hdr.type) {
case IMSG_BATCH_CREATE:
- rq_batch = imsg->data;
+ mta_batch = imsg->data;
s = calloc(1, sizeof *s);
if (s == NULL)
fatal(NULL);
- s->id = rq_batch->b_id;
+ s->id = mta_batch->id;
s->state = MTA_INIT;
/* establish host name */
- if (rq_batch->relay.hostname[0]) {
- s->host = strdup(rq_batch->relay.hostname);
+ if (mta_batch->relay.hostname[0]) {
+ s->host = strdup(mta_batch->relay.hostname);
s->flags |= MTA_FORCE_MX;
}
/* establish port */
- s->port = ntohs(rq_batch->relay.port); /* XXX */
+ s->port = ntohs(mta_batch->relay.port); /* XXX */
/* use auth? */
- if ((rq_batch->relay.flags & F_SSL) &&
- (rq_batch->relay.flags & F_AUTH)) {
+ if ((mta_batch->relay.flags & F_SSL) &&
+ (mta_batch->relay.flags & F_AUTH)) {
s->flags |= MTA_USE_AUTH;
- s->authmap = strdup(rq_batch->relay.authmap);
+ s->authmap = strdup(mta_batch->relay.authmap);
if (s->authmap == NULL)
fatalx("mta: strdup authmap");
}
/* force a particular SSL mode? */
- switch (rq_batch->relay.flags & F_SSL) {
+ switch (mta_batch->relay.flags & F_SSL) {
case F_SSL:
s->flags |= MTA_FORCE_ANYSSL;
break;
@@ -115,7 +115,7 @@ mta_imsg(struct imsgev *iev, struct imsg *imsg)
}
/* have cert? */
- cert = rq_batch->relay.cert;
+ cert = mta_batch->relay.cert;
if (cert[0] != '\0') {
s->flags |= MTA_USE_CERT;
strlcpy(key.ssl_name, cert, sizeof(key.ssl_name));
@@ -150,8 +150,8 @@ mta_imsg(struct imsgev *iev, struct imsg *imsg)
return;
case IMSG_BATCH_CLOSE:
- rq_batch = imsg->data;
- s = mta_lookup(rq_batch->b_id);
+ mta_batch = imsg->data;
+ s = mta_lookup(mta_batch->id);
if (s->flags & MTA_USE_CERT && s->ssl == NULL) {
mta_status(s, "190 certificate not found");
mta_enter_state(s, MTA_DONE, NULL);
@@ -160,10 +160,10 @@ mta_imsg(struct imsgev *iev, struct imsg *imsg)
return;
case IMSG_QUEUE_MESSAGE_FD:
- rq_batch = imsg->data;
+ mta_batch = imsg->data;
if (imsg->fd == -1)
fatalx("mta: cannot obtain msgfd");
- s = mta_lookup(rq_batch->b_id);
+ s = mta_lookup(mta_batch->id);
s->datafp = fdopen(imsg->fd, "r");
if (s->datafp == NULL)
fatal("mta: fdopen");
@@ -734,15 +734,15 @@ mta_connect_done(int fd, short event, void *p)
static void
mta_request_datafd(struct mta_session *s)
{
- struct ramqueue_batch rq_batch;
+ struct mta_batch mta_batch;
struct envelope *e;
e = TAILQ_FIRST(&s->recipients);
- rq_batch.b_id = s->id;
- rq_batch.msgid = evpid_to_msgid(e->id);
+ mta_batch.id = s->id;
+ mta_batch.msgid = evpid_to_msgid(e->id);
imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_MESSAGE_FD,
- 0, 0, -1, &rq_batch, sizeof(rq_batch));
+ 0, 0, -1, &mta_batch, sizeof(mta_batch));
}
int
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index c4ad440ff03..b91b3cbfd95 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue.c,v 1.116 2012/01/13 21:58:35 eric Exp $ */
+/* $OpenBSD: queue.c,v 1.117 2012/01/28 11:33:07 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -48,7 +48,7 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
{
struct submit_status ss;
struct envelope *e;
- struct ramqueue_batch *rq_batch;
+ struct mta_batch *mta_batch;
int fd, ret;
log_imsg(PROC_QUEUE, iev->proc, imsg);
@@ -139,10 +139,10 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg)
if (iev->proc == PROC_MTA) {
switch (imsg->hdr.type) {
case IMSG_QUEUE_MESSAGE_FD:
- rq_batch = imsg->data;
- fd = queue_message_fd_r(Q_QUEUE, rq_batch->msgid);
+ mta_batch = imsg->data;
+ fd = queue_message_fd_r(Q_QUEUE, mta_batch->msgid);
imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FD, 0, 0,
- fd, rq_batch, sizeof *rq_batch);
+ fd, mta_batch, sizeof *mta_batch);
return;
case IMSG_QUEUE_DELIVERY_OK:
diff --git a/usr.sbin/smtpd/ramqueue.c b/usr.sbin/smtpd/ramqueue.c
index 16534fb3e79..e69de29bb2d 100644
--- a/usr.sbin/smtpd/ramqueue.c
+++ b/usr.sbin/smtpd/ramqueue.c
@@ -1,476 +0,0 @@
-/* $OpenBSD: ramqueue.c,v 1.31 2012/01/12 23:17:02 gilles Exp $ */
-
-/*
- * Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org>
- *
- * Permission to use, copy, modify, and distribute this software for any
- * purpose with or without fee is hereby granted, provided that the above
- * copyright notice and this permission notice appear in all copies.
- *
- * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
- * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
- * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
- * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
- * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
- * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
- * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
- */
-
-#include <sys/types.h>
-#include <sys/queue.h>
-#include <sys/tree.h>
-#include <sys/param.h>
-#include <sys/socket.h>
-#include <sys/stat.h>
-
-#include <dirent.h>
-#include <err.h>
-#include <errno.h>
-#include <event.h>
-#include <fcntl.h>
-#include <imsg.h>
-#include <inttypes.h>
-#include <libgen.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <time.h>
-#include <unistd.h>
-
-#include "smtpd.h"
-#include "log.h"
-
-
-void ramqueue_insert(struct ramqueue *, struct envelope *, time_t);
-int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *);
-void ramqueue_put_host(struct ramqueue *, struct ramqueue_host *);
-void ramqueue_put_batch(struct ramqueue *, struct ramqueue_batch *);
-
-static int ramqueue_expire(struct envelope *, time_t);
-static time_t ramqueue_next_schedule(struct envelope *, time_t);
-static struct ramqueue_host *ramqueue_get_host(struct ramqueue *, char *);
-static struct ramqueue_batch *ramqueue_get_batch(struct ramqueue *,
- struct ramqueue_host *, struct envelope *);
-
-
-void
-ramqueue_init(struct ramqueue *rqueue)
-{
- bzero(rqueue, sizeof (*rqueue));
- TAILQ_INIT(&rqueue->queue);
- RB_INIT(&rqueue->hosttree);
- RB_INIT(&rqueue->msgtree);
-}
-
-int
-ramqueue_is_empty(struct ramqueue *rqueue)
-{
- return TAILQ_FIRST(&rqueue->queue) == NULL;
-}
-
-int
-ramqueue_message_is_empty(struct ramqueue_message *rq_message)
-{
- return RB_EMPTY(&rq_message->evptree);
-}
-
-int
-ramqueue_batch_is_empty(struct ramqueue_batch *rq_batch)
-{
- return TAILQ_FIRST(&rq_batch->envelope_queue) == NULL;
-}
-
-int
-ramqueue_host_is_empty(struct ramqueue_host *rq_host)
-{
- return TAILQ_FIRST(&rq_host->batch_queue) == NULL;
-}
-
-struct ramqueue_envelope *
-ramqueue_next_envelope(struct ramqueue *rqueue)
-{
- struct ramqueue_envelope *rq_evp = NULL;
-
- TAILQ_FOREACH(rq_evp, &rqueue->queue, queue_entry) {
- if (rq_evp->rq_batch->type == D_MDA)
- if (env->sc_opts & SMTPD_MDA_PAUSED)
- continue;
- if (rq_evp->rq_batch->type == D_MTA)
- if (env->sc_opts & SMTPD_MTA_PAUSED)
- continue;
- break;
- }
-
- return rq_evp;
-}
-
-struct ramqueue_envelope *
-ramqueue_batch_first_envelope(struct ramqueue_batch *rq_batch)
-{
- return TAILQ_FIRST(&rq_batch->envelope_queue);
-}
-
-int
-ramqueue_load(struct ramqueue *rqueue, time_t *nsched)
-{
- time_t curtm;
- struct envelope envelope;
- static struct qwalk *q = NULL;
- struct ramqueue_envelope *rq_evp;
- u_int64_t evpid;
-
- log_debug("ramqueue: queue loading in progress");
-
- if (q == NULL)
- q = qwalk_new(Q_QUEUE, 0);
-
- while (qwalk(q, &evpid)) {
- curtm = time(NULL);
- if (! queue_envelope_load(Q_QUEUE, evpid, &envelope)) {
- log_debug("ramqueue: moved envelope to /corrupt");
- queue_message_corrupt(Q_QUEUE, evpid_to_msgid(evpid));
- continue;
- }
- if (ramqueue_expire(&envelope, curtm))
- continue;
- ramqueue_insert(rqueue, &envelope, curtm);
-
- rq_evp = ramqueue_next_envelope(rqueue);
- if (rq_evp == NULL)
- continue;
-
- if (rq_evp->sched <= *nsched)
- *nsched = rq_evp->sched;
-
- if (*nsched <= curtm) {
- log_debug("ramqueue: loading interrupted");
- return (0);
- }
- }
- qwalk_close(q);
- q = NULL;
- log_debug("ramqueue: loading over");
- return (1);
-}
-
-void
-ramqueue_insert(struct ramqueue *rqueue, struct envelope *envelope, time_t curtm)
-{
- struct ramqueue_envelope *rq_evp;
- struct ramqueue_envelope *evp;
- struct ramqueue_message *rq_msg, msgkey;
-
- msgkey.msgid = evpid_to_msgid(envelope->id);
- rq_msg = RB_FIND(msgtree, &rqueue->msgtree, &msgkey);
- if (rq_msg == NULL) {
- rq_msg = calloc(1, sizeof (*rq_msg));
- if (rq_msg == NULL)
- fatal("calloc");
- rq_msg->msgid = msgkey.msgid;
- RB_INSERT(msgtree, &rqueue->msgtree, rq_msg);
- RB_INIT(&rq_msg->evptree);
- stat_increment(STATS_RAMQUEUE_MESSAGE);
- }
-
- rq_evp = calloc(1, sizeof (*rq_evp));
- if (rq_evp == NULL)
- fatal("calloc");
- rq_evp->evpid = envelope->id;
- rq_evp->sched = ramqueue_next_schedule(envelope, curtm);
- rq_evp->rq_host = ramqueue_get_host(rqueue, envelope->dest.domain);
- rq_evp->rq_batch = ramqueue_get_batch(rqueue, rq_evp->rq_host, envelope);
- RB_INSERT(evptree, &rq_msg->evptree, rq_evp);
- rq_evp->rq_msg = rq_msg;
-
- TAILQ_INSERT_TAIL(&rq_evp->rq_batch->envelope_queue, rq_evp,
- batchqueue_entry);
-
- /* sorted insert */
- TAILQ_FOREACH(evp, &rqueue->queue, queue_entry) {
- if (evp->sched >= rq_evp->sched) {
- TAILQ_INSERT_BEFORE(evp, rq_evp, queue_entry);
- break;
- }
- }
- if (evp == NULL)
- TAILQ_INSERT_TAIL(&rqueue->queue, rq_evp, queue_entry);
-
- stat_increment(STATS_RAMQUEUE_ENVELOPE);
-}
-
-static int
-ramqueue_expire(struct envelope *envelope, time_t curtm)
-{
- struct envelope bounce;
-
- if (curtm - envelope->creation >= envelope->expire) {
- envelope_set_errormsg(envelope,
- "message expired after sitting in queue for %d days",
- envelope->expire / 60 / 60 / 24);
- bounce_record_message(envelope, &bounce);
- ramqueue_insert(&env->sc_rqueue, &bounce, time(NULL));
- log_debug("#### %s: queue_envelope_delete: %016" PRIx64,
- __func__, envelope->id);
- queue_envelope_delete(Q_QUEUE, envelope);
- return 1;
- }
- return 0;
-}
-
-static time_t
-ramqueue_next_schedule(struct envelope *envelope, time_t curtm)
-{
- time_t delay;
-
- if (envelope->lasttry == 0)
- return curtm;
-
- delay = SMTPD_QUEUE_MAXINTERVAL;
-
- if (envelope->type == D_MDA ||
- envelope->type == D_BOUNCE) {
- if (envelope->retry < 5)
- return curtm;
-
- if (envelope->retry < 15)
- delay = (envelope->retry * 60) + arc4random_uniform(60);
- }
-
- if (envelope->type == D_MTA) {
- if (envelope->retry < 3)
- delay = SMTPD_QUEUE_INTERVAL;
- else if (envelope->retry <= 7) {
- delay = SMTPD_QUEUE_INTERVAL * (1 << (envelope->retry - 3));
- if (delay > SMTPD_QUEUE_MAXINTERVAL)
- delay = SMTPD_QUEUE_MAXINTERVAL;
- }
- }
-
- if (curtm >= envelope->lasttry + delay)
- return curtm;
-
- return curtm + delay;
-}
-
-static struct ramqueue_host *
-ramqueue_get_host(struct ramqueue *rqueue, char *hostname)
-{
- struct ramqueue_host *rq_host, key;
-
- strlcpy(key.hostname, hostname, sizeof(key.hostname));
- rq_host = RB_FIND(hosttree, &rqueue->hosttree, &key);
- if (rq_host == NULL) {
- rq_host = calloc(1, sizeof (*rq_host));
- if (rq_host == NULL)
- fatal("calloc");
- rq_host->h_id = generate_uid();
- strlcpy(rq_host->hostname, hostname, sizeof(rq_host->hostname));
- TAILQ_INIT(&rq_host->batch_queue);
- RB_INSERT(hosttree, &rqueue->hosttree, rq_host);
- stat_increment(STATS_RAMQUEUE_HOST);
- }
-
- return rq_host;
-}
-
-void
-ramqueue_put_host(struct ramqueue *rqueue, struct ramqueue_host *host)
-{
- TAILQ_INIT(&host->batch_queue);
- RB_INSERT(hosttree, &rqueue->hosttree, host);
-}
-
-static struct ramqueue_batch *
-ramqueue_get_batch(struct ramqueue *rqueue, struct ramqueue_host *host,
- struct envelope *envelope)
-{
- struct ramqueue_batch *rq_batch;
-
- TAILQ_FOREACH(rq_batch, &host->batch_queue, batch_entry) {
- if (rq_batch->msgid == evpid_to_msgid(envelope->id))
- return rq_batch;
- }
-
- rq_batch = calloc(1, sizeof (*rq_batch));
- if (rq_batch == NULL)
- fatal("calloc");
- rq_batch->b_id = generate_uid();
- rq_batch->relay = envelope->agent.mta.relay;
- rq_batch->type = envelope->type;
- rq_batch->msgid = evpid_to_msgid(envelope->id);
-
- TAILQ_INIT(&rq_batch->envelope_queue);
- TAILQ_INSERT_TAIL(&host->batch_queue, rq_batch, batch_entry);
-
- stat_increment(STATS_RAMQUEUE_BATCH);
-
- return rq_batch;
-}
-
-void
-ramqueue_put_batch(struct ramqueue *rqueue, struct ramqueue_batch *rq_batch)
-{
- struct ramqueue_host *rq_host;
-
- TAILQ_INIT(&rq_batch->envelope_queue);
- RB_FOREACH(rq_host, hosttree, &rqueue->hosttree) {
- if (rq_host->h_id == rq_batch->h_id) {
- TAILQ_INSERT_TAIL(&rq_host->batch_queue, rq_batch,
- batch_entry);
- return;
- }
- }
-}
-
-void
-ramqueue_remove_batch(struct ramqueue_host *rq_host, struct ramqueue_batch *rq_batch)
-{
- TAILQ_REMOVE(&rq_host->batch_queue, rq_batch, batch_entry);
- free(rq_batch);
- stat_decrement(STATS_RAMQUEUE_BATCH);
-}
-
-void
-ramqueue_remove_host(struct ramqueue *rqueue, struct ramqueue_host *rq_host)
-{
- RB_REMOVE(hosttree, &rqueue->hosttree, rq_host);
- free(rq_host);
- stat_decrement(STATS_RAMQUEUE_HOST);
-}
-
-void
-ramqueue_remove_message(struct ramqueue *rqueue, struct ramqueue_message *rq_msg)
-{
- RB_REMOVE(msgtree, &rqueue->msgtree, rq_msg);
- free(rq_msg);
- stat_decrement(STATS_RAMQUEUE_MESSAGE);
-}
-
-void
-ramqueue_schedule(struct ramqueue *rq, u_int64_t id)
-{
- struct ramqueue_message *rq_msg;
- struct ramqueue_envelope *rq_evp;
-
- /* schedule *all* */
- if (id == 0) {
- TAILQ_FOREACH(rq_evp, &rq->queue, queue_entry) {
- rq_evp->sched = 0;
- }
- }
-
- /* scheduling by evpid */
- if (id > 0xffffffffL) {
- rq_evp = ramqueue_lookup_envelope(rq, id);
- if (rq_evp == NULL)
- return;
- ramqueue_schedule_envelope(rq, rq_evp);
- return;
- }
-
- rq_msg = ramqueue_lookup_message(rq, id);
- if (rq_msg == NULL)
- return;
-
- /* scheduling by msgid */
- RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) {
- ramqueue_schedule_envelope(rq, rq_evp);
- }
-}
-
-void
-ramqueue_schedule_envelope(struct ramqueue *rq, struct ramqueue_envelope *rq_evp)
-{
- rq_evp->sched = 0;
- TAILQ_REMOVE(&rq->queue, rq_evp, queue_entry);
- TAILQ_INSERT_HEAD(&rq->queue, rq_evp, queue_entry);
-}
-
-struct ramqueue_envelope *
-ramqueue_envelope_by_id(struct ramqueue *rqueue, u_int64_t id)
-{
- struct ramqueue_envelope *rq_evp;
-
- TAILQ_FOREACH(rq_evp, &rqueue->queue, queue_entry) {
- if (rq_evp->evpid == id)
- return rq_evp;
- }
-
- return NULL;
-}
-
-int
-ramqueue_host_cmp(struct ramqueue_host *h1, struct ramqueue_host *h2)
-{
- return strcmp(h1->hostname, h2->hostname);
-}
-
-
-int
-ramqueue_msg_cmp(struct ramqueue_message *m1, struct ramqueue_message *m2)
-{
- return (m1->msgid < m2->msgid ? -1 : m1->msgid > m2->msgid);
-}
-
-int
-ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2)
-{
- return (e1->evpid < e2->evpid ? -1 : e1->evpid > e2->evpid);
-}
-
-struct ramqueue_host *
-ramqueue_lookup_host(struct ramqueue *rq, char *hostname)
-{
- struct ramqueue_host hostkey;
-
- if (strlcpy(hostkey.hostname, hostname, sizeof(hostkey.hostname))
- >= sizeof(hostkey.hostname))
- fatalx("ramqueue_lookup_host: hostname truncated");
-
- return RB_FIND(hosttree, &rq->hosttree, &hostkey);
-}
-
-struct ramqueue_message *
-ramqueue_lookup_message(struct ramqueue *rq, u_int32_t msgid)
-{
- struct ramqueue_message msgkey;
-
- msgkey.msgid = msgid;
- return RB_FIND(msgtree, &rq->msgtree, &msgkey);
-}
-
-struct ramqueue_envelope *
-ramqueue_lookup_envelope(struct ramqueue *rq, u_int64_t evpid)
-{
- struct ramqueue_envelope evpkey;
- struct ramqueue_message *rq_msg;
-
- rq_msg = ramqueue_lookup_message(rq, evpid_to_msgid(evpid));
- if (rq_msg == NULL)
- return NULL;
-
- evpkey.evpid = evpid;
- return RB_FIND(evptree, &rq_msg->evptree, &evpkey);
-}
-
-void
-ramqueue_remove_envelope(struct ramqueue *rq, struct ramqueue_envelope *rq_evp)
-{
- struct ramqueue_batch *rq_batch;
- struct ramqueue_message *rq_msg;
-
- rq_msg = rq_evp->rq_msg;
- rq_batch = rq_evp->rq_batch;
-
- RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
- TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry);
- TAILQ_REMOVE(&rq->queue, rq_evp, queue_entry);
- stat_decrement(STATS_RAMQUEUE_ENVELOPE);
- free(rq_evp);
-}
-
-
-RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
-RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp);
-RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
-
diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c
index aa32883d9ed..4b0724a0401 100644
--- a/usr.sbin/smtpd/runner.c
+++ b/usr.sbin/smtpd/runner.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: runner.c,v 1.132 2012/01/13 14:01:58 eric Exp $ */
+/* $OpenBSD: runner.c,v 1.133 2012/01/28 11:33:07 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -44,7 +44,6 @@
#include "smtpd.h"
#include "log.h"
-
static void runner_imsg(struct imsgev *, struct imsg *);
static void runner_shutdown(void);
static void runner_sig_handler(int, short, void *);
@@ -52,11 +51,14 @@ static void runner_setup_events(void);
static void runner_reset_events(void);
static void runner_disable_events(void);
static void runner_timeout(int, short, void *);
-static int runner_process_envelope(struct ramqueue_envelope *, time_t);
-static void runner_process_batch(struct ramqueue_envelope *, time_t);
+static void runner_remove(u_int64_t);
+static void runner_remove_envelope(u_int64_t);
+static int runner_process_envelope(u_int64_t);
+static int runner_process_batch(enum delivery_type, u_int64_t);
static int runner_check_loop(struct envelope *);
-static int runner_force_message_to_ramqueue(struct ramqueue *, u_int32_t);
+static int runner_message_to_scheduler(u_int32_t);
+static struct scheduler_backend *scheduler = NULL;
void
runner_imsg(struct imsgev *iev, struct imsg *imsg)
@@ -68,8 +70,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg)
switch (imsg->hdr.type) {
case IMSG_QUEUE_COMMIT_MESSAGE:
e = imsg->data;
- runner_force_message_to_ramqueue(&env->sc_rqueue,
- evpid_to_msgid(e->id));
+ runner_message_to_scheduler(evpid_to_msgid(e->id));
runner_reset_events();
return;
@@ -84,7 +85,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg)
e = imsg->data;
e->retry++;
queue_envelope_update(Q_QUEUE, e);
- ramqueue_insert(&env->sc_rqueue, e, time(NULL));
+ scheduler->insert(e);
runner_reset_events();
return;
@@ -94,8 +95,9 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg)
if (e->type != D_BOUNCE && e->sender.user[0] != '\0') {
log_debug("PERMFAIL #2: %016" PRIx64, e->id);
bounce_record_message(e, &bounce);
- ramqueue_insert(&env->sc_rqueue, &bounce, time(NULL));
+ scheduler->insert(&bounce);
runner_reset_events();
+
}
queue_envelope_delete(Q_QUEUE, e);
return;
@@ -112,7 +114,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg)
e = imsg->data;
if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) {
queue_envelope_update(Q_QUEUE, e);
- ramqueue_insert(&env->sc_rqueue, e, time(NULL));
+ scheduler->insert(e);
runner_reset_events();
return;
}
@@ -141,14 +143,12 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg)
return;
case IMSG_RUNNER_SCHEDULE:
- runner_schedule(&env->sc_rqueue,
- *(u_int64_t *)imsg->data);
+ scheduler->schedule(*(u_int64_t *)imsg->data);
runner_reset_events();
return;
case IMSG_RUNNER_REMOVE: {
- runner_remove(&env->sc_rqueue,
- *(u_int64_t *)imsg->data);
+ runner_remove(*(u_int64_t *)imsg->data);
runner_reset_events();
return;
}
@@ -244,7 +244,10 @@ runner(void)
setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
fatal("runner: cannot drop privileges");
- ramqueue_init(&env->sc_rqueue);
+ env->sc_scheduler = scheduler_backend_lookup(SCHED_RAMQUEUE);
+ scheduler = env->sc_scheduler;
+
+ scheduler->init();
imsg_callback = runner_imsg;
event_init();
@@ -275,54 +278,49 @@ runner(void)
void
runner_timeout(int fd, short event, void *p)
{
- struct ramqueue *rqueue = &env->sc_rqueue;
- struct ramqueue_envelope *rq_evp;
struct timeval tv;
static int rq_done = 0;
time_t nsched;
time_t curtm;
+ u_int64_t evpid;
nsched = 0;
again:
- rq_evp = ramqueue_next_envelope(rqueue);
- if (rq_evp)
- nsched = rq_evp->sched;
+ curtm = time(NULL);
+
+ /* set nsched to the time() of next schedulable envelope */
+ scheduler->next(NULL, &nsched);
/* load as many envelopes as possible from disk-queue to
* ram-queue until a schedulable envelope is found.
*/
if (! rq_done)
- rq_done = ramqueue_load(rqueue, &nsched);
+ rq_done = scheduler->setup(curtm, nsched);
- /* let's do the schedule dance baby ! */
- curtm = time(NULL);
- rq_evp = ramqueue_next_envelope(rqueue);
- while (rq_evp) {
- if (rq_evp->sched > curtm) {
- nsched = rq_evp->sched;
- break;
- }
- runner_process_envelope(rq_evp, curtm);
- rq_evp = ramqueue_next_envelope(rqueue);
+ if (rq_done) {
+ if (! scheduler->next(NULL, &nsched))
+ goto scheduler_sleep;
+ if (curtm < nsched)
+ goto scheduler_pause;
}
- if (rq_evp == NULL && rq_done) {
- log_debug("runner: nothing to schedule, wake me up. zZzZzZ");
- return;
- }
+ /* let's do the schedule dance baby ! */
+ while (scheduler->next(&evpid, &nsched)) {
+ if (curtm < nsched)
+ goto scheduler_pause;
- /* disk-queues not fully loaded, no time for sleeping */
- if (!rq_done)
- nsched = 0;
- else {
- nsched = nsched - curtm;
- if (nsched < 0)
- nsched = 0;
+ runner_process_envelope(evpid);
}
- if (nsched == 0)
- goto again;
+ if (rq_done)
+ goto scheduler_sleep;
+
+ goto again;
+
+
+scheduler_pause:
+ nsched = nsched - curtm;
log_debug("runner: nothing to do for the next %lld seconds, zZzZzZ",
(long long int) nsched);
@@ -330,139 +328,146 @@ again:
tv.tv_sec = nsched;
tv.tv_usec = 0;
evtimer_add(&env->sc_ev, &tv);
+ return;
+
+
+scheduler_sleep:
+ log_debug("runner: nothing to schedule, wake me up. zZzZzZ");
+ return;
}
-int
-runner_process_envelope(struct ramqueue_envelope *rq_evp, time_t curtm)
+static int
+runner_process_envelope(u_int64_t evpid)
{
- size_t mta_av, mda_av, bnc_av;
struct envelope envelope;
+ size_t mta_av, mda_av, bnc_av;
mta_av = env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE);
mda_av = env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE);
bnc_av = env->sc_maxconn - stat_get(STATS_RUNNER_BOUNCES, STAT_ACTIVE);
- if (rq_evp->rq_batch->type == D_MDA)
+ if (! queue_envelope_load(Q_QUEUE, evpid, &envelope))
+ return 0;
+
+ if (envelope.type == D_MDA)
if (mda_av == 0)
return 0;
- if (rq_evp->rq_batch->type == D_MTA)
+ if (envelope.type == D_MTA)
if (mta_av == 0)
return 0;
- if (rq_evp->rq_batch->type == D_BOUNCE)
+ if (envelope.type == D_BOUNCE)
if (bnc_av == 0)
return 0;
- if (! queue_envelope_load(Q_QUEUE, rq_evp->evpid, &envelope))
- return 0;
-
if (runner_check_loop(&envelope)) {
struct envelope bounce;
envelope_set_errormsg(&envelope, "loop has been detected");
bounce_record_message(&envelope, &bounce);
- ramqueue_insert(&env->sc_rqueue, &bounce, time(NULL));
- runner_setup_events();
- log_debug("#### %s: queue_envelope_delete: %016" PRIx64,
- __func__, envelope.id);
+ scheduler->insert(&bounce);
queue_envelope_delete(Q_QUEUE, &envelope);
+
+ runner_setup_events();
return 0;
}
- runner_process_batch(rq_evp, curtm);
-
- return 1;
+ return runner_process_batch(envelope.type, evpid);
}
-
-void
-runner_process_batch(struct ramqueue_envelope *rq_evp, time_t curtm)
+static int
+runner_process_batch(enum delivery_type type, u_int64_t evpid)
{
- struct ramqueue_batch *rq_batch;
- struct ramqueue_message *rq_msg;
- struct ramqueue_host *rq_host;
struct envelope evp;
+ void *batch;
int fd;
- rq_msg = rq_evp->rq_msg;
- rq_batch = rq_evp->rq_batch;
- rq_host = rq_evp->rq_host;
-
- switch (rq_batch->type) {
+ batch = scheduler->batch(evpid);
+ switch (type) {
case D_BOUNCE:
- while ((rq_evp = ramqueue_batch_first_envelope(rq_batch))) {
- if (! queue_envelope_load(Q_QUEUE, rq_evp->evpid,
- &evp))
- return;
- evp.lasttry = curtm;
+ while (scheduler->fetch(batch, &evpid)) {
+ if (! queue_envelope_load(Q_QUEUE, evpid, &evp))
+ goto end;
+
+ evp.lasttry = time(NULL);
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp,
sizeof evp);
- ramqueue_remove_envelope(&env->sc_rqueue, rq_evp);
+ scheduler->remove(evpid);
}
-
stat_increment(STATS_RUNNER);
stat_increment(STATS_RUNNER_BOUNCES);
-
break;
case D_MDA:
- rq_evp = ramqueue_batch_first_envelope(rq_batch);
- if (! queue_envelope_load(Q_QUEUE, rq_evp->evpid, &evp))
- return;
- evp.lasttry = curtm;
- fd = queue_message_fd_r(Q_QUEUE,
- evpid_to_msgid(rq_evp->evpid));
+ scheduler->fetch(batch, &evpid);
+ if (! queue_envelope_load(Q_QUEUE, evpid, &evp))
+ goto end;
+
+ evp.lasttry = time(NULL);
+ fd = queue_message_fd_r(Q_QUEUE, evpid_to_msgid(evpid));
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp,
sizeof evp);
- ramqueue_remove_envelope(&env->sc_rqueue, rq_evp);
+ scheduler->remove(evpid);
stat_increment(STATS_RUNNER);
stat_increment(STATS_MDA_SESSION);
-
break;
-
- case D_MTA:
+
+ case D_MTA: {
+ struct mta_batch mta_batch;
+
+ /* FIXME */
+ if (! scheduler->fetch(batch, &evpid))
+ goto end;
+ if (! queue_envelope_load(Q_QUEUE, evpid,
+ &evp))
+ goto end;
+
+ bzero(&mta_batch, sizeof mta_batch);
+ mta_batch.id = arc4random();
+ mta_batch.relay = evp.agent.mta.relay;
+
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CREATE, PROC_MTA, 0, -1, rq_batch,
- sizeof *rq_batch);
- while ((rq_evp = ramqueue_batch_first_envelope(rq_batch))) {
- if (! queue_envelope_load(Q_QUEUE, rq_evp->evpid,
+ IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch,
+ sizeof mta_batch);
+
+ while (scheduler->fetch(batch, &evpid)) {
+ if (! queue_envelope_load(Q_QUEUE, evpid,
&evp))
- return;
- evp.lasttry = curtm;
- evp.batch_id = rq_batch->b_id;
+ goto end;
+ evp.lasttry = time(NULL); /* FIXME */
+ evp.batch_id = mta_batch.id;
+
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &evp,
sizeof evp);
- ramqueue_remove_envelope(&env->sc_rqueue, rq_evp);
+
+ scheduler->remove(evpid);
stat_increment(STATS_RUNNER);
}
+
imsg_compose_event(env->sc_ievs[PROC_QUEUE],
- IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, rq_batch,
- sizeof *rq_batch);
+ IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, &mta_batch,
+ sizeof mta_batch);
stat_increment(STATS_MTA_SESSION);
break;
+ }
default:
fatalx("runner_process_batchqueue: unknown type");
}
- if (ramqueue_message_is_empty(rq_msg))
- ramqueue_remove_message(&env->sc_rqueue, rq_msg);
-
- if (ramqueue_batch_is_empty(rq_batch))
- ramqueue_remove_batch(rq_host, rq_batch);
-
- if (ramqueue_host_is_empty(rq_host))
- ramqueue_remove_host(&env->sc_rqueue, rq_host);
+end:
+ scheduler->close(batch);
+ return 1;
}
-int
-runner_force_message_to_ramqueue(struct ramqueue *rqueue, u_int32_t msgid)
+static int
+runner_message_to_scheduler(u_int32_t msgid)
{
struct qwalk *q;
u_int64_t evpid;
@@ -475,14 +480,14 @@ runner_force_message_to_ramqueue(struct ramqueue *rqueue, u_int32_t msgid)
if (! queue_envelope_load(Q_QUEUE, evpid,
&envelope))
continue;
- ramqueue_insert(rqueue, &envelope, curtm);
+ scheduler->insert(&envelope);
}
qwalk_close(q);
return 1;
}
-int
+static int
runner_check_loop(struct envelope *ep)
{
int fd;
@@ -546,48 +551,30 @@ runner_check_loop(struct envelope *ep)
return ret;
}
-void
-runner_schedule(struct ramqueue *rq, u_int64_t id)
-{
- ramqueue_schedule(rq, id);
-}
-
-
-void
-runner_remove(struct ramqueue *rq, u_int64_t id)
+static void
+runner_remove(u_int64_t id)
{
- struct ramqueue_message *rq_msg;
- struct ramqueue_envelope *rq_evp;
+ void *msg;
/* removing by evpid */
if (id > 0xffffffffL) {
- rq_evp = ramqueue_lookup_envelope(rq, id);
- if (rq_evp == NULL)
- return;
- runner_remove_envelope(rq, rq_evp);
+ runner_remove_envelope(id);
return;
}
- rq_msg = ramqueue_lookup_message(rq, id);
- if (rq_msg == NULL)
- return;
-
- /* scheduling by msgid */
- RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) {
- runner_remove_envelope(rq, rq_evp);
- }
+ /* removing by msgid */
+ msg = scheduler->message(id);
+ while (scheduler->fetch(msg, &id))
+ runner_remove_envelope(id);
+ scheduler->close(msg);
}
-void
-runner_remove_envelope(struct ramqueue *rq, struct ramqueue_envelope *rq_evp)
+static void
+runner_remove_envelope(u_int64_t evpid)
{
struct envelope evp;
- if (queue_envelope_load(Q_QUEUE, rq_evp->evpid, &evp)) {
- log_debug("#### %s: queue_envelope_delete: %016" PRIx64,
- __func__, evp.id);
- queue_envelope_delete(Q_QUEUE, &evp);
- }
-
- ramqueue_remove_envelope(rq, rq_evp);
+ evp.id = evpid;
+ queue_envelope_delete(Q_QUEUE, &evp);
+ scheduler->remove(evpid);
}
diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c
new file mode 100644
index 00000000000..ff843fa8019
--- /dev/null
+++ b/usr.sbin/smtpd/scheduler.c
@@ -0,0 +1,50 @@
+/* $OpenBSD: scheduler.c,v 1.1 2012/01/28 11:33:07 gilles Exp $ */
+
+/*
+ * Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/types.h>
+#include <sys/queue.h>
+#include <sys/tree.h>
+#include <sys/param.h>
+#include <sys/socket.h>
+
+#include <ctype.h>
+#include <err.h>
+#include <event.h>
+#include <fcntl.h>
+#include <imsg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "smtpd.h"
+#include "log.h"
+
+extern struct scheduler_backend scheduler_backend_ramqueue;
+
+struct scheduler_backend *
+scheduler_backend_lookup(enum scheduler_type type)
+{
+ switch (type) {
+ case SCHED_RAMQUEUE:
+ return &scheduler_backend_ramqueue;
+ default:
+ fatal("unsupported scheduler_backend type");
+ }
+
+ return NULL;
+}
diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c
new file mode 100644
index 00000000000..1d547356ee4
--- /dev/null
+++ b/usr.sbin/smtpd/scheduler_ramqueue.c
@@ -0,0 +1,674 @@
+/* $OpenBSD: scheduler_ramqueue.c,v 1.1 2012/01/28 11:33:07 gilles Exp $ */
+
+/*
+ * Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/types.h>
+#include <sys/queue.h>
+#include <sys/tree.h>
+#include <sys/param.h>
+#include <sys/socket.h>
+
+#include <ctype.h>
+#include <err.h>
+#include <event.h>
+#include <fcntl.h>
+#include <imsg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "smtpd.h"
+#include "log.h"
+
+
+/*
+ * A destination host: node of ramqueue.hosttree, compared by hostname
+ * (see ramqueue_host_cmp), holding the list of batches queued for it.
+ */
+struct ramqueue_host {
+ RB_ENTRY(ramqueue_host) hosttree_entry;
+ TAILQ_HEAD(,ramqueue_batch) batch_queue;
+ u_int64_t h_id;
+ char hostname[MAXHOSTNAMELEN];
+};
+/*
+ * A batch: the envelopes of one message (msgid) queued for one host.
+ * "type" is read by scheduler_ramqueue_next() for the MDA/MTA pause checks.
+ */
+struct ramqueue_batch {
+ enum delivery_type type;
+ TAILQ_ENTRY(ramqueue_batch) batch_entry;
+ TAILQ_HEAD(,ramqueue_envelope) envelope_queue;
+ u_int64_t h_id;
+ u_int64_t b_id;
+ u_int32_t msgid;
+};
+/*
+ * A scheduled envelope.  It is linked three times: in the global queue
+ * (queue_entry), in its batch (batchqueue_entry) and in the evptree of
+ * its message (evptree_entry, compared by evpid).
+ */
+struct ramqueue_envelope {
+ TAILQ_ENTRY(ramqueue_envelope) queue_entry;
+ TAILQ_ENTRY(ramqueue_envelope) batchqueue_entry;
+ RB_ENTRY(ramqueue_envelope) evptree_entry;
+ struct ramqueue_batch *rq_batch;
+ struct ramqueue_message *rq_msg;
+ struct ramqueue_host *rq_host;
+ u_int64_t evpid;
+ time_t sched;
+};
+/*
+ * A message: node of ramqueue.msgtree, compared by msgid, holding the
+ * tree of its envelopes.
+ */
+struct ramqueue_message {
+ RB_ENTRY(ramqueue_message) msgtree_entry;
+ RB_HEAD(evptree, ramqueue_envelope) evptree;
+ u_int32_t msgid;
+};
+/* Roots of the in-memory scheduler state. */
+struct ramqueue {
+ RB_HEAD(hosttree, ramqueue_host) hosttree;
+ RB_HEAD(msgtree, ramqueue_message) msgtree;
+ TAILQ_HEAD(,ramqueue_envelope) queue;
+};
+
+/*
+ * Tree prototypes.  The field argument names the RB_ENTRY member of each
+ * node type and must match the one passed to RB_GENERATE at the end of
+ * this file.
+ */
+RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
+RB_PROTOTYPE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp);
+RB_PROTOTYPE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
+
+/* Kind of object an iteration handle walks; selects the branch taken in scheduler_ramqueue_fetch(). */
+enum ramqueue_iter_type {
+ RAMQUEUE_ITER_HOST,
+ RAMQUEUE_ITER_BATCH,
+ RAMQUEUE_ITER_MESSAGE,
+ RAMQUEUE_ITER_QUEUE
+};
+
+/*
+ * Iteration handle returned (as void *) by the host/batch/message/queue
+ * backend operations and released by scheduler_ramqueue_close().
+ * The union member in use is determined by "type"; RAMQUEUE_ITER_QUEUE
+ * uses none of them.
+ */
+struct ramqueue_iter {
+ enum ramqueue_iter_type type;
+ union {
+ struct ramqueue_host *host;
+ struct ramqueue_batch *batch;
+ struct ramqueue_message *message;
+ } u;
+};
+
+
+/* red-black tree comparators and host/batch/message bookkeeping */
+static int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *);
+static int ramqueue_msg_cmp(struct ramqueue_message *, struct ramqueue_message *);
+static int ramqueue_evp_cmp(struct ramqueue_envelope *, struct ramqueue_envelope *);
+static struct ramqueue_host *ramqueue_lookup_host(char *);
+static struct ramqueue_host *ramqueue_insert_host(char *);
+static void ramqueue_remove_host(struct ramqueue_host *);
+static struct ramqueue_batch *ramqueue_lookup_batch(struct ramqueue_host *,
+ u_int32_t);
+static struct ramqueue_batch *ramqueue_insert_batch(struct ramqueue_host *,
+ u_int32_t);
+static void ramqueue_remove_batch(struct ramqueue_host *, struct ramqueue_batch *);
+static struct ramqueue_message *ramqueue_lookup_message(u_int32_t);
+static struct ramqueue_message *ramqueue_insert_message(u_int32_t);
+static void ramqueue_remove_message(struct ramqueue_message *);
+
+static struct ramqueue_envelope *ramqueue_lookup_envelope(u_int64_t);
+
+
+/*NEEDSFIX*/
+static int ramqueue_expire(struct envelope *, time_t);
+static time_t ramqueue_next_schedule(struct envelope *, time_t);
+
+/* scheduler_backend operations, listed in the order of struct scheduler_backend */
+static void scheduler_ramqueue_init(void);
+static int scheduler_ramqueue_setup(time_t, time_t);
+static int scheduler_ramqueue_next(u_int64_t *, time_t *);
+static void scheduler_ramqueue_insert(struct envelope *);
+static void scheduler_ramqueue_remove(u_int64_t);
+static void *scheduler_ramqueue_host(char *);
+static void *scheduler_ramqueue_message(u_int32_t);
+static void *scheduler_ramqueue_batch(u_int64_t);
+static void *scheduler_ramqueue_queue(void);
+static void scheduler_ramqueue_close(void *);
+static int scheduler_ramqueue_fetch(void *, u_int64_t *);
+static int scheduler_ramqueue_schedule(u_int64_t);
+
+/* ramqueue implementation of the scheduler_backend operations. */
+struct scheduler_backend scheduler_backend_ramqueue = {
+	.init		= scheduler_ramqueue_init,
+	.setup		= scheduler_ramqueue_setup,
+	.next		= scheduler_ramqueue_next,
+	.insert		= scheduler_ramqueue_insert,
+	.remove		= scheduler_ramqueue_remove,
+	.host		= scheduler_ramqueue_host,
+	.message	= scheduler_ramqueue_message,
+	.batch		= scheduler_ramqueue_batch,
+	.queue		= scheduler_ramqueue_queue,
+	.close		= scheduler_ramqueue_close,
+	.fetch		= scheduler_ramqueue_fetch,
+	.schedule	= scheduler_ramqueue_schedule
+};
+
+/* The single in-memory queue instance this backend operates on. */
+static struct ramqueue ramqueue;
+
+
+/* Reset the backend to an empty state: no hosts, no messages, no envelopes. */
+static void
+scheduler_ramqueue_init(void)
+{
+	log_debug("scheduler_ramqueue: init");
+
+	memset(&ramqueue, 0, sizeof(ramqueue));
+	RB_INIT(&ramqueue.msgtree);
+	RB_INIT(&ramqueue.hosttree);
+	TAILQ_INIT(&ramqueue.queue);
+}
+
+/*
+ * Load envelopes from the on-disk queue into the ramqueue.
+ *
+ * The walk cursor is kept in a static variable, so a call that returns 0
+ * ("interrupted") leaves the walk open and the next call resumes it.
+ *
+ * curtm:  current time, used for the expiry check and as the interrupt
+ *         threshold.
+ * nsched: earliest schedule time known to the caller; lowered locally
+ *         whenever a loaded envelope is due sooner.
+ *
+ * Returns 1 once the whole queue has been walked, 0 if loading stopped
+ * early because an envelope is already due (nsched <= curtm).
+ */
+static int
+scheduler_ramqueue_setup(time_t curtm, time_t nsched)
+{
+ struct envelope envelope;
+ /* persists across calls so an interrupted load can be resumed */
+ static struct qwalk *q = NULL;
+ u_int64_t evpid;
+ time_t sched;
+
+ log_debug("scheduler_ramqueue: load");
+
+ log_info("scheduler_ramqueue: queue loading in progress");
+ if (q == NULL)
+ q = qwalk_new(Q_QUEUE, 0);
+
+ while (qwalk(q, &evpid)) {
+ /* an envelope that cannot be loaded marks its whole message corrupt */
+ if (! queue_envelope_load(Q_QUEUE, evpid, &envelope)) {
+ log_debug("scheduler_ramqueue: evp -> /corrupt");
+ queue_message_corrupt(Q_QUEUE, evpid_to_msgid(evpid));
+ continue;
+ }
+ /* expired envelopes are handled by ramqueue_expire() and not inserted */
+ if (ramqueue_expire(&envelope, curtm))
+ continue;
+ scheduler_ramqueue_insert(&envelope);
+
+ /* evpid is reused here as scratch output; only sched is consumed */
+ if (! scheduler_ramqueue_next(&evpid, &sched))
+ continue;
+
+ if (sched <= nsched)
+ nsched = sched;
+
+ /* something is due now: stop loading, keeping q open for the next call */
+ if (nsched <= curtm) {
+ log_debug("ramqueue: loading interrupted");
+ return (0);
+ }
+ }
+ qwalk_close(q);
+ q = NULL;
+ log_debug("ramqueue: loading over");
+ return (1);
+}
+
+/*
+ * Find the first envelope of the global queue whose delivery type is not
+ * currently paused.
+ *
+ * On success, stores its id in *evpid and its schedule time in *sched
+ * (either pointer may be NULL) and returns 1.  Returns 0 when no envelope
+ * qualifies.
+ */
+static int
+scheduler_ramqueue_next(u_int64_t *evpid, time_t *sched)
+{
+	struct ramqueue_envelope	*cur;
+
+	log_debug("scheduler_ramqueue: next");
+
+	TAILQ_FOREACH(cur, &ramqueue.queue, queue_entry) {
+		enum delivery_type	 type = cur->rq_batch->type;
+
+		if (type == D_MDA && (env->sc_opts & SMTPD_MDA_PAUSED))
+			continue;
+		if (type == D_MTA && (env->sc_opts & SMTPD_MTA_PAUSED))
+			continue;
+		break;
+	}
+
+	/* the loop leaves cur NULL when it ran off the end of the queue */
+	if (cur == NULL) {
+		log_debug("scheduler_ramqueue: next: nothing schedulable");
+		return 0;
+	}
+
+	if (evpid != NULL)
+		*evpid = cur->evpid;
+	if (sched != NULL)
+		*sched = cur->sched;
+	log_debug("scheduler_ramqueue: next: found");
+	return 1;
+}
+
+/*
+ * Insert an envelope into the ramqueue.
+ *
+ * The message, host and batch nodes the envelope belongs to are looked
+ * up and created on demand.  The envelope is then linked into its
+ * message's evptree, appended to its batch, and placed in the global
+ * queue, which is kept ordered by ascending schedule time.
+ *
+ * Aborts through fatal() if memory cannot be allocated.
+ */
+static void
+scheduler_ramqueue_insert(struct envelope *envelope)
+{
+	struct ramqueue_host *rq_host;
+	struct ramqueue_message *rq_msg;
+	struct ramqueue_batch *rq_batch;
+	struct ramqueue_envelope *rq_evp, *evp;
+	u_int32_t msgid;
+	time_t curtm = time(NULL);
+
+	log_debug("scheduler_ramqueue: insert");
+	msgid = evpid_to_msgid(envelope->id);
+	rq_msg = ramqueue_lookup_message(msgid);
+	if (rq_msg == NULL)
+		rq_msg = ramqueue_insert_message(msgid);
+
+	rq_host = ramqueue_lookup_host(envelope->dest.domain);
+	if (rq_host == NULL)
+		rq_host = ramqueue_insert_host(envelope->dest.domain);
+
+	rq_batch = ramqueue_lookup_batch(rq_host, msgid);
+	if (rq_batch == NULL) {
+		rq_batch = ramqueue_insert_batch(rq_host, msgid);
+		/*
+		 * Record the delivery type on the new batch: it is what
+		 * scheduler_ramqueue_next() tests to honour the MDA/MTA
+		 * pause flags, and nothing else sets it.
+		 */
+		rq_batch->type = envelope->type;
+	}
+
+	rq_evp = calloc(1, sizeof (*rq_evp));
+	if (rq_evp == NULL)
+		fatal("calloc");
+	rq_evp->evpid = envelope->id;
+	rq_evp->sched = ramqueue_next_schedule(envelope, curtm);
+	rq_evp->rq_host = rq_host;
+	rq_evp->rq_batch = rq_batch;
+	rq_evp->rq_msg = rq_msg;
+
+	RB_INSERT(evptree, &rq_msg->evptree, rq_evp);
+
+	TAILQ_INSERT_TAIL(&rq_evp->rq_batch->envelope_queue, rq_evp,
+	    batchqueue_entry);
+
+	/* sorted insert: before the first entry due at the same time or later */
+	TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) {
+		if (evp->sched >= rq_evp->sched) {
+			TAILQ_INSERT_BEFORE(evp, rq_evp, queue_entry);
+			break;
+		}
+	}
+	/* evp is NULL when the loop ended without finding such an entry */
+	if (evp == NULL)
+		TAILQ_INSERT_TAIL(&ramqueue.queue, rq_evp, queue_entry);
+
+	stat_increment(STATS_RAMQUEUE_ENVELOPE);
+}
+
+/*
+ * Remove the envelope identified by evpid from the ramqueue and free it.
+ *
+ * The batch, message and host nodes it belonged to are released as well
+ * once they become empty.  An evpid that is not in the ramqueue is
+ * ignored: the lookup can legitimately return NULL and must not be
+ * dereferenced.
+ */
+static void
+scheduler_ramqueue_remove(u_int64_t evpid)
+{
+	struct ramqueue_batch *rq_batch;
+	struct ramqueue_message *rq_msg;
+	struct ramqueue_envelope *rq_evp;
+	struct ramqueue_host *rq_host;
+
+	log_debug("scheduler_ramqueue: remove");
+	rq_evp = ramqueue_lookup_envelope(evpid);
+	if (rq_evp == NULL) {
+		log_debug("scheduler_ramqueue: remove: unknown envelope");
+		return;
+	}
+	rq_msg = rq_evp->rq_msg;
+	rq_batch = rq_evp->rq_batch;
+	rq_host = rq_evp->rq_host;
+
+	RB_REMOVE(evptree, &rq_msg->evptree, rq_evp);
+	TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry);
+	TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
+	stat_decrement(STATS_RAMQUEUE_ENVELOPE);
+
+	/* check if we are the last of a batch */
+	if (TAILQ_FIRST(&rq_batch->envelope_queue) == NULL)
+		ramqueue_remove_batch(rq_host, rq_batch);
+
+	/* check if we are the last of a message */
+	if (RB_ROOT(&rq_msg->evptree) == NULL)
+		ramqueue_remove_message(rq_msg);
+
+	/* check if we are the last of a host (after its batch may have gone) */
+	if (TAILQ_FIRST(&rq_host->batch_queue) == NULL)
+		ramqueue_remove_host(rq_host);
+
+	free(rq_evp);
+}
+
+/*
+ * Open an iteration handle on the envelopes queued for a host.
+ *
+ * Returns NULL when the host is not known to the ramqueue, otherwise a
+ * handle to be passed to scheduler_ramqueue_fetch() and released with
+ * scheduler_ramqueue_close().  Allocation failure is reported through
+ * fatal(), like every other allocation in this file.
+ */
+static void *
+scheduler_ramqueue_host(char *host)
+{
+	struct ramqueue_iter *iter;
+	struct ramqueue_host *rq_host;
+
+	rq_host = ramqueue_lookup_host(host);
+	if (rq_host == NULL)
+		return NULL;
+
+	iter = calloc(1, sizeof *iter);
+	if (iter == NULL)
+		fatal("calloc");
+
+	iter->type = RAMQUEUE_ITER_HOST;
+	iter->u.host = rq_host;
+
+	return iter;
+}
+
+/*
+ * Open an iteration handle on the batch that contains envelope evpid.
+ *
+ * Returns NULL when the envelope is not in the ramqueue, otherwise a
+ * handle to be passed to scheduler_ramqueue_fetch() and released with
+ * scheduler_ramqueue_close().  Allocation failure is reported through
+ * fatal(), like every other allocation in this file.
+ */
+static void *
+scheduler_ramqueue_batch(u_int64_t evpid)
+{
+	struct ramqueue_iter *iter;
+	struct ramqueue_envelope *rq_evp;
+
+	rq_evp = ramqueue_lookup_envelope(evpid);
+	if (rq_evp == NULL)
+		return NULL;
+
+	iter = calloc(1, sizeof *iter);
+	if (iter == NULL)
+		fatal("calloc");
+
+	iter->type = RAMQUEUE_ITER_BATCH;
+	iter->u.batch = rq_evp->rq_batch;
+
+	return iter;
+}
+
+/*
+ * Open an iteration handle on the envelopes of message msgid.
+ *
+ * Returns NULL when the message is not in the ramqueue, otherwise a
+ * handle to be passed to scheduler_ramqueue_fetch() and released with
+ * scheduler_ramqueue_close().  Allocation failure is reported through
+ * fatal(), like every other allocation in this file.
+ */
+static void *
+scheduler_ramqueue_message(u_int32_t msgid)
+{
+	struct ramqueue_iter *iter;
+	struct ramqueue_message *rq_msg;
+
+	rq_msg = ramqueue_lookup_message(msgid);
+	if (rq_msg == NULL)
+		return NULL;
+
+	iter = calloc(1, sizeof *iter);
+	if (iter == NULL)
+		fatal("calloc");
+
+	iter->type = RAMQUEUE_ITER_MESSAGE;
+	iter->u.message = rq_msg;
+
+	return iter;
+}
+
+/*
+ * Open an iteration handle on the whole ramqueue.
+ *
+ * Always returns a handle, to be passed to scheduler_ramqueue_fetch()
+ * and released with scheduler_ramqueue_close().  Allocation failure is
+ * reported through fatal(), like every other allocation in this file.
+ */
+static void *
+scheduler_ramqueue_queue(void)
+{
+	struct ramqueue_iter *iter;
+
+	iter = calloc(1, sizeof *iter);
+	if (iter == NULL)
+		fatal("calloc");
+
+	iter->type = RAMQUEUE_ITER_QUEUE;
+
+	return iter;
+}
+
+/*
+ * Release an iteration handle obtained from the host, batch, message or
+ * queue operations.  Only the handle is freed; the queue contents are
+ * untouched.
+ */
+static void
+scheduler_ramqueue_close(void *hdl)
+{
+ free(hdl);
+}
+
+/*
+ * Fetch the id of the envelope currently at the front of whatever the
+ * handle iterates over.
+ *
+ * The handle itself is never advanced: the same envelope is reported
+ * until it is removed from the ramqueue.
+ *
+ * Returns 1 and stores the id in *evpid when an envelope is available,
+ * 0 otherwise.  Declared static to match its prototype and the other
+ * backend operations.
+ */
+static int
+scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid)
+{
+	struct ramqueue_iter *iter = hdl;
+	struct ramqueue_envelope *rq_evp = NULL;
+	struct ramqueue_batch *rq_batch;
+
+	switch (iter->type) {
+	case RAMQUEUE_ITER_HOST:
+		/* first envelope of the host's first batch */
+		rq_batch = TAILQ_FIRST(&iter->u.host->batch_queue);
+		if (rq_batch != NULL)
+			rq_evp = TAILQ_FIRST(&rq_batch->envelope_queue);
+		break;
+
+	case RAMQUEUE_ITER_BATCH:
+		rq_evp = TAILQ_FIRST(&iter->u.batch->envelope_queue);
+		break;
+
+	case RAMQUEUE_ITER_MESSAGE:
+		rq_evp = RB_ROOT(&iter->u.message->evptree);
+		break;
+
+	case RAMQUEUE_ITER_QUEUE:
+		rq_evp = TAILQ_FIRST(&ramqueue.queue);
+		break;
+	}
+
+	if (rq_evp == NULL)
+		return 0;
+
+	*evpid = rq_evp->evpid;
+	return 1;
+}
+
+/* Make one envelope due immediately and move it to the head of the global queue. */
+static void
+ramqueue_schedule_now(struct ramqueue_envelope *rq_evp)
+{
+	rq_evp->sched = 0;
+	TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry);
+	TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry);
+}
+
+/*
+ * Force immediate scheduling.
+ *
+ * id == 0           every envelope in the ramqueue
+ * id > 0xffffffff   the single envelope with that evpid
+ * otherwise         every envelope of the message with that msgid
+ *
+ * Returns the number of envelopes that were rescheduled.
+ */
+static int
+scheduler_ramqueue_schedule(u_int64_t id)
+{
+	struct ramqueue_envelope	*evp;
+	struct ramqueue_message		*msg;
+	int				 count = 0;
+
+	if (id == 0) {
+		/* schedule *all*: entries keep their relative order */
+		TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) {
+			evp->sched = 0;
+			count++;
+		}
+		return count;
+	}
+
+	if (id > 0xffffffffL) {
+		/* scheduling by evpid */
+		evp = ramqueue_lookup_envelope(id);
+		if (evp == NULL)
+			return 0;
+		ramqueue_schedule_now(evp);
+		return 1;
+	}
+
+	/* scheduling by msgid */
+	msg = ramqueue_lookup_message(id);
+	if (msg == NULL)
+		return 0;
+
+	RB_FOREACH(evp, evptree, &msg->evptree) {
+		ramqueue_schedule_now(evp);
+		count++;
+	}
+	return count;
+}
+
+/*
+ * Find the host node for a hostname, or NULL if there is none.
+ * Only the hostname member of the search key is filled in; it is the
+ * only one ramqueue_host_cmp() looks at.
+ */
+static struct ramqueue_host *
+ramqueue_lookup_host(char *host)
+{
+	struct ramqueue_host	 key;
+
+	strlcpy(key.hostname, host, sizeof(key.hostname));
+
+	return RB_FIND(hosttree, &ramqueue.hosttree, &key);
+}
+
+/*
+ * Find the message node for a msgid, or NULL if there is none.
+ * Only the msgid member of the search key is filled in; it is the only
+ * one ramqueue_msg_cmp() looks at.
+ */
+static struct ramqueue_message *
+ramqueue_lookup_message(u_int32_t msgid)
+{
+	struct ramqueue_message	 key;
+
+	key.msgid = msgid;
+
+	return RB_FIND(msgtree, &ramqueue.msgtree, &key);
+}
+
+/*
+ * Find the envelope node for an evpid, or NULL if either its message or
+ * the envelope itself is not in the ramqueue.  The message is located
+ * first, then the envelope is searched in that message's evptree.
+ */
+static struct ramqueue_envelope *
+ramqueue_lookup_envelope(u_int64_t evpid)
+{
+	struct ramqueue_envelope	 key;
+	struct ramqueue_message		*msg;
+
+	msg = ramqueue_lookup_message(evpid_to_msgid(evpid));
+	if (msg != NULL) {
+		key.evpid = evpid;
+		return RB_FIND(evptree, &msg->evptree, &key);
+	}
+
+	return NULL;
+}
+
+/*
+ * Find, among the batches of a host, the one that belongs to message
+ * msgid.  Returns NULL when the host has no such batch.
+ */
+static struct ramqueue_batch *
+ramqueue_lookup_batch(struct ramqueue_host *rq_host, u_int32_t msgid)
+{
+	struct ramqueue_batch	*b;
+
+	for (b = TAILQ_FIRST(&rq_host->batch_queue); b != NULL;
+	    b = TAILQ_NEXT(b, batch_entry)) {
+		if (b->msgid == msgid)
+			break;
+	}
+
+	/* NULL when the list was exhausted without a match */
+	return b;
+}
+
+/*
+ * Expire an envelope that has been in the queue for at least its
+ * "expire" duration as of curtm.
+ *
+ * For an expired envelope an error message is recorded, a bounce is
+ * generated and inserted into the ramqueue, and the envelope is deleted
+ * from the queue.
+ *
+ * Returns 1 if the envelope was expired, 0 if it is still valid.
+ */
+static int
+ramqueue_expire(struct envelope *envelope, time_t curtm)
+{
+	struct envelope	 bounce;
+
+	/* still within its lifetime: nothing to do */
+	if (curtm - envelope->creation < envelope->expire)
+		return 0;
+
+	envelope_set_errormsg(envelope,
+	    "message expired after sitting in queue for %d days",
+	    envelope->expire / 60 / 60 / 24);
+	bounce_record_message(envelope, &bounce);
+	scheduler_ramqueue_insert(&bounce);
+	log_debug("#### %s: queue_envelope_delete: %016" PRIx64,
+	    __func__, envelope->id);
+	queue_envelope_delete(Q_QUEUE, envelope);
+
+	return 1;
+}
+
+/*
+ * Compute the time at which an envelope should next be tried.
+ *
+ * An envelope that was never tried (lasttry == 0) is due at curtm.
+ * Otherwise a retry delay is derived from the delivery type and the
+ * retry count, defaulting to SMTPD_QUEUE_MAXINTERVAL:
+ *   - D_MDA / D_BOUNCE: due at curtm for fewer than 5 retries, then
+ *     retry * 60 seconds plus up to 59 random seconds until the 15th;
+ *   - D_MTA: SMTPD_QUEUE_INTERVAL for fewer than 3 retries, then doubled
+ *     per retry up to the 7th, capped at SMTPD_QUEUE_MAXINTERVAL.
+ *
+ * Returns curtm if the delay has already elapsed since lasttry, else
+ * curtm + delay.
+ * NOTE(review): the pending case counts the delay from curtm rather than
+ * from lasttry -- confirm this is intended.
+ */
+static time_t
+ramqueue_next_schedule(struct envelope *envelope, time_t curtm)
+{
+	time_t	 delay = SMTPD_QUEUE_MAXINTERVAL;
+
+	if (envelope->lasttry == 0)
+		return curtm;
+
+	switch (envelope->type) {
+	case D_MDA:
+	case D_BOUNCE:
+		if (envelope->retry < 5)
+			return curtm;
+		if (envelope->retry < 15)
+			delay = (envelope->retry * 60) + arc4random_uniform(60);
+		break;
+
+	case D_MTA:
+		if (envelope->retry < 3) {
+			delay = SMTPD_QUEUE_INTERVAL;
+		} else if (envelope->retry <= 7) {
+			delay = SMTPD_QUEUE_INTERVAL * (1 << (envelope->retry - 3));
+			if (delay > SMTPD_QUEUE_MAXINTERVAL)
+				delay = SMTPD_QUEUE_MAXINTERVAL;
+		}
+		break;
+
+	default:
+		break;
+	}
+
+	return (curtm >= envelope->lasttry + delay) ? curtm : curtm + delay;
+}
+
+/*
+ * Allocate a message node for msgid, with an empty envelope tree, and
+ * add it to the message tree.  Aborts through fatal() on allocation
+ * failure.
+ */
+static struct ramqueue_message *
+ramqueue_insert_message(u_int32_t msgid)
+{
+	struct ramqueue_message	*msg;
+
+	if ((msg = calloc(1, sizeof(*msg))) == NULL)
+		fatal("calloc");
+
+	msg->msgid = msgid;
+	RB_INIT(&msg->evptree);
+	RB_INSERT(msgtree, &ramqueue.msgtree, msg);
+
+	stat_increment(STATS_RAMQUEUE_MESSAGE);
+
+	return msg;
+}
+
+/*
+ * Allocate a host node for a hostname, with a fresh id and an empty
+ * batch list, and add it to the host tree.  Aborts through fatal() on
+ * allocation failure.
+ */
+static struct ramqueue_host *
+ramqueue_insert_host(char *host)
+{
+	struct ramqueue_host	*h;
+
+	if ((h = calloc(1, sizeof(*h))) == NULL)
+		fatal("calloc");
+
+	h->h_id = generate_uid();
+	strlcpy(h->hostname, host, sizeof(h->hostname));
+	TAILQ_INIT(&h->batch_queue);
+
+	/* the hostname must be set before insertion: it is the tree key */
+	RB_INSERT(hosttree, &ramqueue.hosttree, h);
+	stat_increment(STATS_RAMQUEUE_HOST);
+
+	return h;
+}
+
+/*
+ * Allocate a batch for message msgid, with a fresh id and an empty
+ * envelope list, and append it to the host's batch list.  Aborts through
+ * fatal() on allocation failure.
+ */
+static struct ramqueue_batch *
+ramqueue_insert_batch(struct ramqueue_host *rq_host, u_int32_t msgid)
+{
+	struct ramqueue_batch	*b;
+
+	if ((b = calloc(1, sizeof(*b))) == NULL)
+		fatal("calloc");
+
+	b->b_id = generate_uid();
+	b->msgid = msgid;
+	TAILQ_INIT(&b->envelope_queue);
+
+	TAILQ_INSERT_TAIL(&rq_host->batch_queue, b, batch_entry);
+	stat_increment(STATS_RAMQUEUE_BATCH);
+
+	return b;
+}
+
+/* Unlink a host node from the host tree, update the stats and free it. */
+static void
+ramqueue_remove_host(struct ramqueue_host *rq_host)
+{
+	RB_REMOVE(hosttree, &ramqueue.hosttree, rq_host);
+	stat_decrement(STATS_RAMQUEUE_HOST);
+
+	free(rq_host);
+}
+
+/* Unlink a message node from the message tree, update the stats and free it. */
+static void
+ramqueue_remove_message(struct ramqueue_message *rq_msg)
+{
+	RB_REMOVE(msgtree, &ramqueue.msgtree, rq_msg);
+	stat_decrement(STATS_RAMQUEUE_MESSAGE);
+
+	free(rq_msg);
+}
+
+
+/* Unlink a batch from its host's batch list, update the stats and free it. */
+static void
+ramqueue_remove_batch(struct ramqueue_host *rq_host,
+    struct ramqueue_batch *rq_batch)
+{
+	TAILQ_REMOVE(&rq_host->batch_queue, rq_batch, batch_entry);
+	stat_decrement(STATS_RAMQUEUE_BATCH);
+
+	free(rq_batch);
+}
+
+/*
+ * hosttree comparator: orders host nodes by hostname using strcmp(),
+ * so the comparison is case-sensitive.
+ */
+static int
+ramqueue_host_cmp(struct ramqueue_host *h1, struct ramqueue_host *h2)
+{
+ return strcmp(h1->hostname, h2->hostname);
+}
+
+
+/* msgtree comparator: orders message nodes by msgid; returns -1, 0 or 1. */
+static int
+ramqueue_msg_cmp(struct ramqueue_message *m1, struct ramqueue_message *m2)
+{
+	if (m1->msgid < m2->msgid)
+		return -1;
+	if (m1->msgid > m2->msgid)
+		return 1;
+	return 0;
+}
+
+/* evptree comparator: orders envelope nodes by evpid; returns -1, 0 or 1. */
+static int
+ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2)
+{
+	if (e1->evpid < e2->evpid)
+		return -1;
+	if (e1->evpid > e2->evpid)
+		return 1;
+	return 0;
+}
+
+/* Generate the red-black tree functions for the three trees declared with RB_HEAD above. */
+RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
+RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp);
+RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp);
diff --git a/usr.sbin/smtpd/smtpctl.c b/usr.sbin/smtpd/smtpctl.c
index e4ab391b840..5b61f8fc846 100644
--- a/usr.sbin/smtpd/smtpctl.c
+++ b/usr.sbin/smtpd/smtpctl.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpctl.c,v 1.77 2012/01/24 12:20:18 eric Exp $ */
+/* $OpenBSD: smtpctl.c,v 1.78 2012/01/28 11:33:07 gilles Exp $ */
/*
* Copyright (c) 2006 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -44,7 +44,6 @@
void usage(void);
static void setup_env(struct smtpd *);
-static void show_sizes(void);
static int show_command_output(struct imsg *);
static int show_stats_output(struct imsg *);
static void show_queue(enum queue_kind, int);
@@ -127,9 +126,6 @@ main(int argc, char *argv[])
break;
case SHOW_RUNQUEUE:
break;
- case SHOW_SIZES:
- show_sizes();
- break;
default:
goto connected;
}
@@ -302,30 +298,6 @@ show_command_output(struct imsg *imsg)
return (1);
}
-void
-show_sizes(void)
-{
- /*
- * size _does_ matter.
- *
- * small changes to ramqueue and diskqueue structures may cause
- * large changes to memory and disk usage on busy/large hosts.
- *
- * this will help developers optimize memory/disk use, and help
- * admins understand how the ramqueue.size / ramqueue.size.max
- * stats are computed (smtpctl show stats).
- *
- * -- gilles@
- *
- */
- printf("struct ramqueue: %zu\n", sizeof (struct ramqueue));
- printf("struct ramqueue_host: %zu\n", sizeof (struct ramqueue_host));
- printf("struct ramqueue_message: %zu\n", sizeof (struct ramqueue_message));
- printf("struct ramqueue_envelope: %zu\n", sizeof (struct ramqueue_envelope));
-
- printf("struct envelope: %zu\n", sizeof (struct envelope));
-}
-
static void
stat_print(int stat, int what)
{
@@ -431,17 +403,6 @@ show_stats_output(struct imsg *imsg)
stat_print(STATS_RAMQUEUE_MESSAGE, STAT_MAXACTIVE);
stat_print(STATS_RAMQUEUE_ENVELOPE, STAT_MAXACTIVE);
- printf("ramqueue.size=%zd\n",
- s[STATS_RAMQUEUE_HOST].active * sizeof(struct ramqueue_host) +
- s[STATS_RAMQUEUE_BATCH].active * sizeof(struct ramqueue_batch) +
- s[STATS_RAMQUEUE_MESSAGE].active * sizeof(struct ramqueue_message) +
- s[STATS_RAMQUEUE_ENVELOPE].active * sizeof(struct ramqueue_envelope));
- printf("ramqueue.size.max=%zd\n",
- s[STATS_RAMQUEUE_HOST].maxactive * sizeof(struct ramqueue_host) +
- s[STATS_RAMQUEUE_BATCH].maxactive * sizeof(struct ramqueue_batch) +
- s[STATS_RAMQUEUE_MESSAGE].maxactive * sizeof(struct ramqueue_message) +
- s[STATS_RAMQUEUE_ENVELOPE].maxactive * sizeof(struct ramqueue_envelope));
-
printf("smtp.errors.delays=%zd\n", stats->smtp.delays);
printf("smtp.errors.linetoolong=%zd\n", stats->smtp.linetoolong);
printf("smtp.errors.read_eof=%zd\n", stats->smtp.read_eof);
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index 31a988ceaa1..c4e13f81666 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.h,v 1.281 2012/01/24 12:20:18 eric Exp $ */
+/* $OpenBSD: smtpd.h,v 1.282 2012/01/28 11:33:07 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -580,43 +580,6 @@ struct session {
};
-/* ram-queue structures */
-struct ramqueue_host {
- RB_ENTRY(ramqueue_host) hosttree_entry;
- TAILQ_HEAD(,ramqueue_batch) batch_queue;
- u_int64_t h_id;
- char hostname[MAXHOSTNAMELEN];
-};
-struct ramqueue_batch {
- TAILQ_ENTRY(ramqueue_batch) batch_entry;
- TAILQ_HEAD(,ramqueue_envelope) envelope_queue;
- enum delivery_type type;
- u_int64_t h_id;
- u_int64_t b_id;
- u_int32_t msgid;
- struct relayhost relay;
-};
-struct ramqueue_envelope {
- TAILQ_ENTRY(ramqueue_envelope) queue_entry;
- TAILQ_ENTRY(ramqueue_envelope) batchqueue_entry;
- RB_ENTRY(ramqueue_envelope) evptree_entry;
- struct ramqueue_batch *rq_batch;
- struct ramqueue_message *rq_msg;
- struct ramqueue_host *rq_host;
- u_int64_t evpid;
- time_t sched;
-};
-struct ramqueue_message {
- RB_ENTRY(ramqueue_message) msgtree_entry;
- RB_HEAD(evptree, ramqueue_envelope) evptree;
- u_int32_t msgid;
-};
-struct ramqueue {
- RB_HEAD(hosttree, ramqueue_host) hosttree;
- RB_HEAD(msgtree, ramqueue_message) msgtree;
- TAILQ_HEAD(,ramqueue_envelope) queue;
-};
-
struct smtpd {
char sc_conffile[MAXPATHLEN];
size_t sc_maxsize;
@@ -642,8 +605,8 @@ struct smtpd {
char *sc_title[PROC_COUNT];
struct passwd *sc_pw;
char sc_hostname[MAXHOSTNAMELEN];
- struct ramqueue sc_rqueue;
struct queue_backend *sc_queue;
+ struct scheduler_backend *sc_scheduler;
TAILQ_HEAD(filterlist, filter) *sc_filters;
@@ -887,6 +850,12 @@ struct mta_session {
struct ssl *ssl;
};
+/*
+ * Batch descriptor received by mta as the payload of IMSG_BATCH_CREATE;
+ * "id" becomes the id of the mta session created for the batch.
+ */
+struct mta_batch {
+ u_int64_t id;
+ struct relayhost relay;
+
+ u_int32_t msgid;
+};
/* maps return structures */
struct map_secret {
@@ -977,6 +946,30 @@ struct delivery_backend {
};
+/* scheduler_backend */
+/* Available scheduler implementations; key for scheduler_backend_lookup(). */
+enum scheduler_type {
+ SCHED_RAMQUEUE,
+};
+
+/*
+ * Operations a scheduler backend provides.  The notes below describe
+ * what the ramqueue backend does; other backends are presumably expected
+ * to follow the same conventions -- TODO confirm.
+ */
+struct scheduler_backend {
+ /* init: reset the backend state; setup: load the queue, 1 = done, 0 = interrupted */
+ void (*init)(void);
+ int (*setup)(time_t, time_t);
+
+ /* report the id and schedule time of the next schedulable envelope; 0 if none */
+ int (*next)(u_int64_t *, time_t *);
+
+ /* add an envelope / remove an envelope by id */
+ void (*insert)(struct envelope *);
+ void (*remove)(u_int64_t);
+
+ /* open an iteration handle (by hostname, msgid, evpid, or on everything); close releases it */
+ void *(*host)(char *);
+ void *(*message)(u_int32_t);
+ void *(*batch)(u_int64_t);
+ void *(*queue)(void);
+ void (*close)(void *);
+
+ /* fetch: envelope id from a handle, 0 when none; schedule: force scheduling, returns a count */
+ int (*fetch)(void *, u_int64_t *);
+ int (*schedule)(u_int64_t);
+};
+
extern struct smtpd *env;
@@ -1123,47 +1116,13 @@ int qwalk(void *, u_int64_t *);
void qwalk_close(void *);
-/* ramqueue.c */
-void ramqueue_init(struct ramqueue *);
-int ramqueue_load(struct ramqueue *, time_t *);
-int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *);
-int ramqueue_msg_cmp(struct ramqueue_message *, struct ramqueue_message *);
-int ramqueue_evp_cmp(struct ramqueue_envelope *, struct ramqueue_envelope *);
-int ramqueue_is_empty(struct ramqueue *);
-int ramqueue_is_empty(struct ramqueue *);
-int ramqueue_batch_is_empty(struct ramqueue_batch *);
-int ramqueue_host_is_empty(struct ramqueue_host *);
-void ramqueue_remove_batch(struct ramqueue_host *, struct ramqueue_batch *);
-void ramqueue_remove_host(struct ramqueue *, struct ramqueue_host *);
-struct ramqueue_envelope *ramqueue_envelope_by_id(struct ramqueue *, u_int64_t);
-struct ramqueue_envelope *ramqueue_first_envelope(struct ramqueue *);
-struct ramqueue_envelope *ramqueue_next_envelope(struct ramqueue *);
-struct ramqueue_envelope *ramqueue_batch_first_envelope(struct ramqueue_batch *);
-void ramqueue_insert(struct ramqueue *, struct envelope *, time_t);
-int ramqueue_message_is_empty(struct ramqueue_message *);
-void ramqueue_remove_message(struct ramqueue *, struct ramqueue_message *);
-
-struct ramqueue_host *ramqueue_lookup_host(struct ramqueue *, char *);
-struct ramqueue_message *ramqueue_lookup_message(struct ramqueue *, u_int32_t);
-struct ramqueue_envelope *ramqueue_lookup_envelope(struct ramqueue *, u_int64_t);
-
-void ramqueue_schedule(struct ramqueue *, u_int64_t);
-void ramqueue_schedule_envelope(struct ramqueue *, struct ramqueue_envelope *);
-
-void ramqueue_remove_envelope(struct ramqueue *, struct ramqueue_envelope *);
-
-
-RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp);
-RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp);
-RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp);
-
-
/* runner.c */
pid_t runner(void);
void message_reset_flags(struct envelope *);
-void runner_schedule(struct ramqueue *, u_int64_t);
-void runner_remove(struct ramqueue *, u_int64_t);
-void runner_remove_envelope(struct ramqueue *, struct ramqueue_envelope *);
+
+
+/* scheduler.c */
+struct scheduler_backend *scheduler_backend_lookup(enum scheduler_type);
/* smtp.c */
diff --git a/usr.sbin/smtpd/smtpd/Makefile b/usr.sbin/smtpd/smtpd/Makefile
index e96d6f3227f..6d4f9d4d538 100644
--- a/usr.sbin/smtpd/smtpd/Makefile
+++ b/usr.sbin/smtpd/smtpd/Makefile
@@ -1,4 +1,4 @@
-# $OpenBSD: Makefile,v 1.38 2012/01/11 22:24:37 gilles Exp $
+# $OpenBSD: Makefile,v 1.39 2012/01/28 11:33:07 gilles Exp $
PROG= smtpd
SRCS= aliases.c auth.c auth_bsd.c auth_pwd.c bounce.c \
@@ -10,8 +10,8 @@ SRCS= aliases.c auth.c auth_bsd.c auth_pwd.c bounce.c \
queue.c ruleset.c runner.c smtp.c \
smtp_session.c smtpd.c ssl.c ssl_privsep.c util.c asr.c \
print.c pack.c dname.c res_random.c sockaddr.c \
- ramqueue.c queue_backend.c queue_fsqueue.c \
- user.c user_pwd.c stats.c
+ queue_backend.c queue_fsqueue.c \
+ user.c user_pwd.c stats.c scheduler.c scheduler_ramqueue.c
MAN= smtpd.8 smtpd.conf.5
BINDIR= /usr/sbin