diff options
-rw-r--r-- | usr.sbin/smtpd/mta.c | 40 | ||||
-rw-r--r-- | usr.sbin/smtpd/queue.c | 10 | ||||
-rw-r--r-- | usr.sbin/smtpd/ramqueue.c | 476 | ||||
-rw-r--r-- | usr.sbin/smtpd/runner.c | 273 | ||||
-rw-r--r-- | usr.sbin/smtpd/scheduler.c | 50 | ||||
-rw-r--r-- | usr.sbin/smtpd/scheduler_ramqueue.c | 674 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpctl.c | 41 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd.h | 113 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd/Makefile | 6 |
9 files changed, 919 insertions, 764 deletions
diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c index bd079ab80ff..ef871907a03 100644 --- a/usr.sbin/smtpd/mta.c +++ b/usr.sbin/smtpd/mta.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mta.c,v 1.124 2012/01/26 12:31:53 eric Exp $ */ +/* $OpenBSD: mta.c,v 1.125 2012/01/28 11:33:06 gilles Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> @@ -59,7 +59,7 @@ static void mta_request_datafd(struct mta_session *); static void mta_imsg(struct imsgev *iev, struct imsg *imsg) { - struct ramqueue_batch *rq_batch; + struct mta_batch *mta_batch; struct mta_session *s; struct mta_relay *relay; struct envelope *e; @@ -73,34 +73,34 @@ mta_imsg(struct imsgev *iev, struct imsg *imsg) if (iev->proc == PROC_QUEUE) { switch (imsg->hdr.type) { case IMSG_BATCH_CREATE: - rq_batch = imsg->data; + mta_batch = imsg->data; s = calloc(1, sizeof *s); if (s == NULL) fatal(NULL); - s->id = rq_batch->b_id; + s->id = mta_batch->id; s->state = MTA_INIT; /* establish host name */ - if (rq_batch->relay.hostname[0]) { - s->host = strdup(rq_batch->relay.hostname); + if (mta_batch->relay.hostname[0]) { + s->host = strdup(mta_batch->relay.hostname); s->flags |= MTA_FORCE_MX; } /* establish port */ - s->port = ntohs(rq_batch->relay.port); /* XXX */ + s->port = ntohs(mta_batch->relay.port); /* XXX */ /* use auth? */ - if ((rq_batch->relay.flags & F_SSL) && - (rq_batch->relay.flags & F_AUTH)) { + if ((mta_batch->relay.flags & F_SSL) && + (mta_batch->relay.flags & F_AUTH)) { s->flags |= MTA_USE_AUTH; - s->authmap = strdup(rq_batch->relay.authmap); + s->authmap = strdup(mta_batch->relay.authmap); if (s->authmap == NULL) fatalx("mta: strdup authmap"); } /* force a particular SSL mode? */ - switch (rq_batch->relay.flags & F_SSL) { + switch (mta_batch->relay.flags & F_SSL) { case F_SSL: s->flags |= MTA_FORCE_ANYSSL; break; @@ -115,7 +115,7 @@ mta_imsg(struct imsgev *iev, struct imsg *imsg) } /* have cert? */ - cert = rq_batch->relay.cert; + cert = mta_batch->relay.cert; if (cert[0] != '\0') { s->flags |= MTA_USE_CERT; strlcpy(key.ssl_name, cert, sizeof(key.ssl_name)); @@ -150,8 +150,8 @@ mta_imsg(struct imsgev *iev, struct imsg *imsg) return; case IMSG_BATCH_CLOSE: - rq_batch = imsg->data; - s = mta_lookup(rq_batch->b_id); + mta_batch = imsg->data; + s = mta_lookup(mta_batch->id); if (s->flags & MTA_USE_CERT && s->ssl == NULL) { mta_status(s, "190 certificate not found"); mta_enter_state(s, MTA_DONE, NULL); @@ -160,10 +160,10 @@ mta_imsg(struct imsgev *iev, struct imsg *imsg) return; case IMSG_QUEUE_MESSAGE_FD: - rq_batch = imsg->data; + mta_batch = imsg->data; if (imsg->fd == -1) fatalx("mta: cannot obtain msgfd"); - s = mta_lookup(rq_batch->b_id); + s = mta_lookup(mta_batch->id); s->datafp = fdopen(imsg->fd, "r"); if (s->datafp == NULL) fatal("mta: fdopen"); @@ -734,15 +734,15 @@ mta_connect_done(int fd, short event, void *p) static void mta_request_datafd(struct mta_session *s) { - struct ramqueue_batch rq_batch; + struct mta_batch mta_batch; struct envelope *e; e = TAILQ_FIRST(&s->recipients); - rq_batch.b_id = s->id; - rq_batch.msgid = evpid_to_msgid(e->id); + mta_batch.id = s->id; + mta_batch.msgid = evpid_to_msgid(e->id); imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_MESSAGE_FD, - 0, 0, -1, &rq_batch, sizeof(rq_batch)); + 0, 0, -1, &mta_batch, sizeof(mta_batch)); } int diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index c4ad440ff03..b91b3cbfd95 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.116 2012/01/13 21:58:35 eric Exp $ */ +/* $OpenBSD: queue.c,v 1.117 2012/01/28 11:33:07 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -48,7 +48,7 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) { struct submit_status ss; struct envelope *e; - struct ramqueue_batch *rq_batch; + struct mta_batch *mta_batch; int fd, ret; log_imsg(PROC_QUEUE, iev->proc, imsg); @@ -139,10 +139,10 @@ queue_imsg(struct imsgev *iev, struct imsg *imsg) if (iev->proc == PROC_MTA) { switch (imsg->hdr.type) { case IMSG_QUEUE_MESSAGE_FD: - rq_batch = imsg->data; - fd = queue_message_fd_r(Q_QUEUE, rq_batch->msgid); + mta_batch = imsg->data; + fd = queue_message_fd_r(Q_QUEUE, mta_batch->msgid); imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FD, 0, 0, - fd, rq_batch, sizeof *rq_batch); + fd, mta_batch, sizeof *mta_batch); return; case IMSG_QUEUE_DELIVERY_OK: diff --git a/usr.sbin/smtpd/ramqueue.c b/usr.sbin/smtpd/ramqueue.c index 16534fb3e79..e69de29bb2d 100644 --- a/usr.sbin/smtpd/ramqueue.c +++ b/usr.sbin/smtpd/ramqueue.c @@ -1,476 +0,0 @@ -/* $OpenBSD: ramqueue.c,v 1.31 2012/01/12 23:17:02 gilles Exp $ */ - -/* - * Copyright (c) 2011 Gilles Chehade <gilles@openbsd.org> - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - */ - -#include <sys/types.h> -#include <sys/queue.h> -#include <sys/tree.h> -#include <sys/param.h> -#include <sys/socket.h> -#include <sys/stat.h> - -#include <dirent.h> -#include <err.h> -#include <errno.h> -#include <event.h> -#include <fcntl.h> -#include <imsg.h> -#include <inttypes.h> -#include <libgen.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <time.h> -#include <unistd.h> - -#include "smtpd.h" -#include "log.h" - - -void ramqueue_insert(struct ramqueue *, struct envelope *, time_t); -int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *); -void ramqueue_put_host(struct ramqueue *, struct ramqueue_host *); -void ramqueue_put_batch(struct ramqueue *, struct ramqueue_batch *); - -static int ramqueue_expire(struct envelope *, time_t); -static time_t ramqueue_next_schedule(struct envelope *, time_t); -static struct ramqueue_host *ramqueue_get_host(struct ramqueue *, char *); -static struct ramqueue_batch *ramqueue_get_batch(struct ramqueue *, - struct ramqueue_host *, struct envelope *); - - -void -ramqueue_init(struct ramqueue *rqueue) -{ - bzero(rqueue, sizeof (*rqueue)); - TAILQ_INIT(&rqueue->queue); - RB_INIT(&rqueue->hosttree); - RB_INIT(&rqueue->msgtree); -} - -int -ramqueue_is_empty(struct ramqueue *rqueue) -{ - return TAILQ_FIRST(&rqueue->queue) == NULL; -} - -int -ramqueue_message_is_empty(struct ramqueue_message *rq_message) -{ - return RB_EMPTY(&rq_message->evptree); -} - -int -ramqueue_batch_is_empty(struct ramqueue_batch *rq_batch) -{ - return TAILQ_FIRST(&rq_batch->envelope_queue) == NULL; -} - -int -ramqueue_host_is_empty(struct ramqueue_host *rq_host) -{ - return TAILQ_FIRST(&rq_host->batch_queue) == NULL; -} - -struct ramqueue_envelope * -ramqueue_next_envelope(struct ramqueue *rqueue) -{ - struct ramqueue_envelope *rq_evp = NULL; - - TAILQ_FOREACH(rq_evp, &rqueue->queue, queue_entry) { - if (rq_evp->rq_batch->type == D_MDA) - if (env->sc_opts & SMTPD_MDA_PAUSED) - continue; - if (rq_evp->rq_batch->type == D_MTA) - if (env->sc_opts & SMTPD_MTA_PAUSED) - continue; - break; - } - - return rq_evp; -} - -struct ramqueue_envelope * -ramqueue_batch_first_envelope(struct ramqueue_batch *rq_batch) -{ - return TAILQ_FIRST(&rq_batch->envelope_queue); -} - -int -ramqueue_load(struct ramqueue *rqueue, time_t *nsched) -{ - time_t curtm; - struct envelope envelope; - static struct qwalk *q = NULL; - struct ramqueue_envelope *rq_evp; - u_int64_t evpid; - - log_debug("ramqueue: queue loading in progress"); - - if (q == NULL) - q = qwalk_new(Q_QUEUE, 0); - - while (qwalk(q, &evpid)) { - curtm = time(NULL); - if (! queue_envelope_load(Q_QUEUE, evpid, &envelope)) { - log_debug("ramqueue: moved envelope to /corrupt"); - queue_message_corrupt(Q_QUEUE, evpid_to_msgid(evpid)); - continue; - } - if (ramqueue_expire(&envelope, curtm)) - continue; - ramqueue_insert(rqueue, &envelope, curtm); - - rq_evp = ramqueue_next_envelope(rqueue); - if (rq_evp == NULL) - continue; - - if (rq_evp->sched <= *nsched) - *nsched = rq_evp->sched; - - if (*nsched <= curtm) { - log_debug("ramqueue: loading interrupted"); - return (0); - } - } - qwalk_close(q); - q = NULL; - log_debug("ramqueue: loading over"); - return (1); -} - -void -ramqueue_insert(struct ramqueue *rqueue, struct envelope *envelope, time_t curtm) -{ - struct ramqueue_envelope *rq_evp; - struct ramqueue_envelope *evp; - struct ramqueue_message *rq_msg, msgkey; - - msgkey.msgid = evpid_to_msgid(envelope->id); - rq_msg = RB_FIND(msgtree, &rqueue->msgtree, &msgkey); - if (rq_msg == NULL) { - rq_msg = calloc(1, sizeof (*rq_msg)); - if (rq_msg == NULL) - fatal("calloc"); - rq_msg->msgid = msgkey.msgid; - RB_INSERT(msgtree, &rqueue->msgtree, rq_msg); - RB_INIT(&rq_msg->evptree); - stat_increment(STATS_RAMQUEUE_MESSAGE); - } - - rq_evp = calloc(1, sizeof (*rq_evp)); - if (rq_evp == NULL) - fatal("calloc"); - rq_evp->evpid = envelope->id; - rq_evp->sched = ramqueue_next_schedule(envelope, curtm); - rq_evp->rq_host = ramqueue_get_host(rqueue, envelope->dest.domain); - rq_evp->rq_batch = ramqueue_get_batch(rqueue, rq_evp->rq_host, envelope); - RB_INSERT(evptree, &rq_msg->evptree, rq_evp); - rq_evp->rq_msg = rq_msg; - - TAILQ_INSERT_TAIL(&rq_evp->rq_batch->envelope_queue, rq_evp, - batchqueue_entry); - - /* sorted insert */ - TAILQ_FOREACH(evp, &rqueue->queue, queue_entry) { - if (evp->sched >= rq_evp->sched) { - TAILQ_INSERT_BEFORE(evp, rq_evp, queue_entry); - break; - } - } - if (evp == NULL) - TAILQ_INSERT_TAIL(&rqueue->queue, rq_evp, queue_entry); - - stat_increment(STATS_RAMQUEUE_ENVELOPE); -} - -static int -ramqueue_expire(struct envelope *envelope, time_t curtm) -{ - struct envelope bounce; - - if (curtm - envelope->creation >= envelope->expire) { - envelope_set_errormsg(envelope, - "message expired after sitting in queue for %d days", - envelope->expire / 60 / 60 / 24); - bounce_record_message(envelope, &bounce); - ramqueue_insert(&env->sc_rqueue, &bounce, time(NULL)); - log_debug("#### %s: queue_envelope_delete: %016" PRIx64, - __func__, envelope->id); - queue_envelope_delete(Q_QUEUE, envelope); - return 1; - } - return 0; -} - -static time_t -ramqueue_next_schedule(struct envelope *envelope, time_t curtm) -{ - time_t delay; - - if (envelope->lasttry == 0) - return curtm; - - delay = SMTPD_QUEUE_MAXINTERVAL; - - if (envelope->type == D_MDA || - envelope->type == D_BOUNCE) { - if (envelope->retry < 5) - return curtm; - - if (envelope->retry < 15) - delay = (envelope->retry * 60) + arc4random_uniform(60); - } - - if (envelope->type == D_MTA) { - if (envelope->retry < 3) - delay = SMTPD_QUEUE_INTERVAL; - else if (envelope->retry <= 7) { - delay = SMTPD_QUEUE_INTERVAL * (1 << (envelope->retry - 3)); - if (delay > SMTPD_QUEUE_MAXINTERVAL) - delay = SMTPD_QUEUE_MAXINTERVAL; - } - } - - if (curtm >= envelope->lasttry + delay) - return curtm; - - return curtm + delay; -} - -static struct ramqueue_host * -ramqueue_get_host(struct ramqueue *rqueue, char *hostname) -{ - struct ramqueue_host *rq_host, key; - - strlcpy(key.hostname, hostname, sizeof(key.hostname)); - rq_host = RB_FIND(hosttree, &rqueue->hosttree, &key); - if (rq_host == NULL) { - rq_host = calloc(1, sizeof (*rq_host)); - if (rq_host == NULL) - fatal("calloc"); - rq_host->h_id = generate_uid(); - strlcpy(rq_host->hostname, hostname, sizeof(rq_host->hostname)); - TAILQ_INIT(&rq_host->batch_queue); - RB_INSERT(hosttree, &rqueue->hosttree, rq_host); - stat_increment(STATS_RAMQUEUE_HOST); - } - - return rq_host; -} - -void -ramqueue_put_host(struct ramqueue *rqueue, struct ramqueue_host *host) -{ - TAILQ_INIT(&host->batch_queue); - RB_INSERT(hosttree, &rqueue->hosttree, host); -} - -static struct ramqueue_batch * -ramqueue_get_batch(struct ramqueue *rqueue, struct ramqueue_host *host, - struct envelope *envelope) -{ - struct ramqueue_batch *rq_batch; - - TAILQ_FOREACH(rq_batch, &host->batch_queue, batch_entry) { - if (rq_batch->msgid == evpid_to_msgid(envelope->id)) - return rq_batch; - } - - rq_batch = calloc(1, sizeof (*rq_batch)); - if (rq_batch == NULL) - fatal("calloc"); - rq_batch->b_id = generate_uid(); - rq_batch->relay = envelope->agent.mta.relay; - rq_batch->type = envelope->type; - rq_batch->msgid = evpid_to_msgid(envelope->id); - - TAILQ_INIT(&rq_batch->envelope_queue); - TAILQ_INSERT_TAIL(&host->batch_queue, rq_batch, batch_entry); - - stat_increment(STATS_RAMQUEUE_BATCH); - - return rq_batch; -} - -void -ramqueue_put_batch(struct ramqueue *rqueue, struct ramqueue_batch *rq_batch) -{ - struct ramqueue_host *rq_host; - - TAILQ_INIT(&rq_batch->envelope_queue); - RB_FOREACH(rq_host, hosttree, &rqueue->hosttree) { - if (rq_host->h_id == rq_batch->h_id) { - TAILQ_INSERT_TAIL(&rq_host->batch_queue, rq_batch, - batch_entry); - return; - } - } -} - -void -ramqueue_remove_batch(struct ramqueue_host *rq_host, struct ramqueue_batch *rq_batch) -{ - TAILQ_REMOVE(&rq_host->batch_queue, rq_batch, batch_entry); - free(rq_batch); - stat_decrement(STATS_RAMQUEUE_BATCH); -} - -void -ramqueue_remove_host(struct ramqueue *rqueue, struct ramqueue_host *rq_host) -{ - RB_REMOVE(hosttree, &rqueue->hosttree, rq_host); - free(rq_host); - stat_decrement(STATS_RAMQUEUE_HOST); -} - -void -ramqueue_remove_message(struct ramqueue *rqueue, struct ramqueue_message *rq_msg) -{ - RB_REMOVE(msgtree, &rqueue->msgtree, rq_msg); - free(rq_msg); - stat_decrement(STATS_RAMQUEUE_MESSAGE); -} - -void -ramqueue_schedule(struct ramqueue *rq, u_int64_t id) -{ - struct ramqueue_message *rq_msg; - struct ramqueue_envelope *rq_evp; - - /* schedule *all* */ - if (id == 0) { - TAILQ_FOREACH(rq_evp, &rq->queue, queue_entry) { - rq_evp->sched = 0; - } - } - - /* scheduling by evpid */ - if (id > 0xffffffffL) { - rq_evp = ramqueue_lookup_envelope(rq, id); - if (rq_evp == NULL) - return; - ramqueue_schedule_envelope(rq, rq_evp); - return; - } - - rq_msg = ramqueue_lookup_message(rq, id); - if (rq_msg == NULL) - return; - - /* scheduling by msgid */ - RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) { - ramqueue_schedule_envelope(rq, rq_evp); - } -} - -void -ramqueue_schedule_envelope(struct ramqueue *rq, struct ramqueue_envelope *rq_evp) -{ - rq_evp->sched = 0; - TAILQ_REMOVE(&rq->queue, rq_evp, queue_entry); - TAILQ_INSERT_HEAD(&rq->queue, rq_evp, queue_entry); -} - -struct ramqueue_envelope * -ramqueue_envelope_by_id(struct ramqueue *rqueue, u_int64_t id) -{ - struct ramqueue_envelope *rq_evp; - - TAILQ_FOREACH(rq_evp, &rqueue->queue, queue_entry) { - if (rq_evp->evpid == id) - return rq_evp; - } - - return NULL; -} - -int -ramqueue_host_cmp(struct ramqueue_host *h1, struct ramqueue_host *h2) -{ - return strcmp(h1->hostname, h2->hostname); -} - - -int -ramqueue_msg_cmp(struct ramqueue_message *m1, struct ramqueue_message *m2) -{ - return (m1->msgid < m2->msgid ? -1 : m1->msgid > m2->msgid); -} - -int -ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2) -{ - return (e1->evpid < e2->evpid ? -1 : e1->evpid > e2->evpid); -} - -struct ramqueue_host * -ramqueue_lookup_host(struct ramqueue *rq, char *hostname) -{ - struct ramqueue_host hostkey; - - if (strlcpy(hostkey.hostname, hostname, sizeof(hostkey.hostname)) - >= sizeof(hostkey.hostname)) - fatalx("ramqueue_lookup_host: hostname truncated"); - - return RB_FIND(hosttree, &rq->hosttree, &hostkey); -} - -struct ramqueue_message * -ramqueue_lookup_message(struct ramqueue *rq, u_int32_t msgid) -{ - struct ramqueue_message msgkey; - - msgkey.msgid = msgid; - return RB_FIND(msgtree, &rq->msgtree, &msgkey); -} - -struct ramqueue_envelope * -ramqueue_lookup_envelope(struct ramqueue *rq, u_int64_t evpid) -{ - struct ramqueue_envelope evpkey; - struct ramqueue_message *rq_msg; - - rq_msg = ramqueue_lookup_message(rq, evpid_to_msgid(evpid)); - if (rq_msg == NULL) - return NULL; - - evpkey.evpid = evpid; - return RB_FIND(evptree, &rq_msg->evptree, &evpkey); -} - -void -ramqueue_remove_envelope(struct ramqueue *rq, struct ramqueue_envelope *rq_evp) -{ - struct ramqueue_batch *rq_batch; - struct ramqueue_message *rq_msg; - - rq_msg = rq_evp->rq_msg; - rq_batch = rq_evp->rq_batch; - - RB_REMOVE(evptree, &rq_msg->evptree, rq_evp); - TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry); - TAILQ_REMOVE(&rq->queue, rq_evp, queue_entry); - stat_decrement(STATS_RAMQUEUE_ENVELOPE); - free(rq_evp); -} - - -RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); -RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp); -RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); - diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c index aa32883d9ed..4b0724a0401 100644 --- a/usr.sbin/smtpd/runner.c +++ b/usr.sbin/smtpd/runner.c @@ -1,4 +1,4 @@ -/* $OpenBSD: runner.c,v 1.132 2012/01/13 14:01:58 eric Exp $ */ +/* $OpenBSD: runner.c,v 1.133 2012/01/28 11:33:07 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -44,7 +44,6 @@ #include "smtpd.h" #include "log.h" - static void runner_imsg(struct imsgev *, struct imsg *); static void runner_shutdown(void); static void runner_sig_handler(int, short, void *); @@ -52,11 +51,14 @@ static void runner_setup_events(void); static void runner_reset_events(void); static void runner_disable_events(void); static void runner_timeout(int, short, void *); -static int runner_process_envelope(struct ramqueue_envelope *, time_t); -static void runner_process_batch(struct ramqueue_envelope *, time_t); +static void runner_remove(u_int64_t); +static void runner_remove_envelope(u_int64_t); +static int runner_process_envelope(u_int64_t); +static int runner_process_batch(enum delivery_type, u_int64_t); static int runner_check_loop(struct envelope *); -static int runner_force_message_to_ramqueue(struct ramqueue *, u_int32_t); +static int runner_message_to_scheduler(u_int32_t); +static struct scheduler_backend *scheduler = NULL; void runner_imsg(struct imsgev *iev, struct imsg *imsg) @@ -68,8 +70,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg) switch (imsg->hdr.type) { case IMSG_QUEUE_COMMIT_MESSAGE: e = imsg->data; - runner_force_message_to_ramqueue(&env->sc_rqueue, - evpid_to_msgid(e->id)); + runner_message_to_scheduler(evpid_to_msgid(e->id)); runner_reset_events(); return; @@ -84,7 +85,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg) e = imsg->data; e->retry++; queue_envelope_update(Q_QUEUE, e); - ramqueue_insert(&env->sc_rqueue, e, time(NULL)); + scheduler->insert(e); runner_reset_events(); return; @@ -94,8 +95,9 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg) if (e->type != D_BOUNCE && e->sender.user[0] != '\0') { log_debug("PERMFAIL #2: %016" PRIx64, e->id); bounce_record_message(e, &bounce); - ramqueue_insert(&env->sc_rqueue, &bounce, time(NULL)); + scheduler->insert(&bounce); runner_reset_events(); + } queue_envelope_delete(Q_QUEUE, e); return; @@ -112,7 +114,7 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg) e = imsg->data; if (imsg->fd < 0 || !bounce_session(imsg->fd, e)) { queue_envelope_update(Q_QUEUE, e); - ramqueue_insert(&env->sc_rqueue, e, time(NULL)); + scheduler->insert(e); runner_reset_events(); return; } @@ -141,14 +143,12 @@ runner_imsg(struct imsgev *iev, struct imsg *imsg) return; case IMSG_RUNNER_SCHEDULE: - runner_schedule(&env->sc_rqueue, - *(u_int64_t *)imsg->data); + scheduler->schedule(*(u_int64_t *)imsg->data); runner_reset_events(); return; case IMSG_RUNNER_REMOVE: { - runner_remove(&env->sc_rqueue, - *(u_int64_t *)imsg->data); + runner_remove(*(u_int64_t *)imsg->data); runner_reset_events(); return; } @@ -244,7 +244,10 @@ runner(void) setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) fatal("runner: cannot drop privileges"); - ramqueue_init(&env->sc_rqueue); + env->sc_scheduler = scheduler_backend_lookup(SCHED_RAMQUEUE); + scheduler = env->sc_scheduler; + + scheduler->init(); imsg_callback = runner_imsg; event_init(); @@ -275,54 +278,49 @@ runner(void) void runner_timeout(int fd, short event, void *p) { - struct ramqueue *rqueue = &env->sc_rqueue; - struct ramqueue_envelope *rq_evp; struct timeval tv; static int rq_done = 0; time_t nsched; time_t curtm; + u_int64_t evpid; nsched = 0; again: - rq_evp = ramqueue_next_envelope(rqueue); - if (rq_evp) - nsched = rq_evp->sched; + curtm = time(NULL); + + /* set nsched to the time() of next schedulable envelope */ + scheduler->next(NULL, &nsched); /* load as many envelopes as possible from disk-queue to * ram-queue until a schedulable envelope is found. */ if (! rq_done) - rq_done = ramqueue_load(rqueue, &nsched); + rq_done = scheduler->setup(curtm, nsched); - /* let's do the schedule dance baby ! */ - curtm = time(NULL); - rq_evp = ramqueue_next_envelope(rqueue); - while (rq_evp) { - if (rq_evp->sched > curtm) { - nsched = rq_evp->sched; - break; - } - runner_process_envelope(rq_evp, curtm); - rq_evp = ramqueue_next_envelope(rqueue); + if (rq_done) { + if (! scheduler->next(NULL, &nsched)) + goto scheduler_sleep; + if (curtm < nsched) + goto scheduler_pause; } - if (rq_evp == NULL && rq_done) { - log_debug("runner: nothing to schedule, wake me up. zZzZzZ"); - return; - } + /* let's do the schedule dance baby ! */ + while (scheduler->next(&evpid, &nsched)) { + if (curtm < nsched) + goto scheduler_pause; - /* disk-queues not fully loaded, no time for sleeping */ - if (!rq_done) - nsched = 0; - else { - nsched = nsched - curtm; - if (nsched < 0) - nsched = 0; + runner_process_envelope(evpid); } - if (nsched == 0) - goto again; + if (rq_done) + goto scheduler_sleep; + + goto again; + + +scheduler_pause: + nsched = nsched - curtm; log_debug("runner: nothing to do for the next %lld seconds, zZzZzZ", (long long int) nsched); @@ -330,139 +328,146 @@ again: tv.tv_sec = nsched; tv.tv_usec = 0; evtimer_add(&env->sc_ev, &tv); + return; + + +scheduler_sleep: + log_debug("runner: nothing to schedule, wake me up. zZzZzZ"); + return; } -int -runner_process_envelope(struct ramqueue_envelope *rq_evp, time_t curtm) +static int +runner_process_envelope(u_int64_t evpid) { - size_t mta_av, mda_av, bnc_av; struct envelope envelope; + size_t mta_av, mda_av, bnc_av; mta_av = env->sc_maxconn - stat_get(STATS_MTA_SESSION, STAT_ACTIVE); mda_av = env->sc_maxconn - stat_get(STATS_MDA_SESSION, STAT_ACTIVE); bnc_av = env->sc_maxconn - stat_get(STATS_RUNNER_BOUNCES, STAT_ACTIVE); - if (rq_evp->rq_batch->type == D_MDA) + if (! queue_envelope_load(Q_QUEUE, evpid, &envelope)) + return 0; + + if (envelope.type == D_MDA) if (mda_av == 0) return 0; - if (rq_evp->rq_batch->type == D_MTA) + if (envelope.type == D_MTA) if (mta_av == 0) return 0; - if (rq_evp->rq_batch->type == D_BOUNCE) + if (envelope.type == D_BOUNCE) if (bnc_av == 0) return 0; - if (! queue_envelope_load(Q_QUEUE, rq_evp->evpid, &envelope)) - return 0; - if (runner_check_loop(&envelope)) { struct envelope bounce; envelope_set_errormsg(&envelope, "loop has been detected"); bounce_record_message(&envelope, &bounce); - ramqueue_insert(&env->sc_rqueue, &bounce, time(NULL)); - runner_setup_events(); - log_debug("#### %s: queue_envelope_delete: %016" PRIx64, - __func__, envelope.id); + scheduler->insert(&bounce); queue_envelope_delete(Q_QUEUE, &envelope); + + runner_setup_events(); return 0; } - runner_process_batch(rq_evp, curtm); - - return 1; + return runner_process_batch(envelope.type, evpid); } - -void -runner_process_batch(struct ramqueue_envelope *rq_evp, time_t curtm) +static int +runner_process_batch(enum delivery_type type, u_int64_t evpid) { - struct ramqueue_batch *rq_batch; - struct ramqueue_message *rq_msg; - struct ramqueue_host *rq_host; struct envelope evp; + void *batch; int fd; - rq_msg = rq_evp->rq_msg; - rq_batch = rq_evp->rq_batch; - rq_host = rq_evp->rq_host; - - switch (rq_batch->type) { + batch = scheduler->batch(evpid); + switch (type) { case D_BOUNCE: - while ((rq_evp = ramqueue_batch_first_envelope(rq_batch))) { - if (! queue_envelope_load(Q_QUEUE, rq_evp->evpid, - &evp)) - return; - evp.lasttry = curtm; + while (scheduler->fetch(batch, &evpid)) { + if (! queue_envelope_load(Q_QUEUE, evpid, &evp)) + goto end; + + evp.lasttry = time(NULL); imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &evp, sizeof evp); - ramqueue_remove_envelope(&env->sc_rqueue, rq_evp); + scheduler->remove(evpid); } - stat_increment(STATS_RUNNER); stat_increment(STATS_RUNNER_BOUNCES); - break; case D_MDA: - rq_evp = ramqueue_batch_first_envelope(rq_batch); - if (! queue_envelope_load(Q_QUEUE, rq_evp->evpid, &evp)) - return; - evp.lasttry = curtm; - fd = queue_message_fd_r(Q_QUEUE, - evpid_to_msgid(rq_evp->evpid)); + scheduler->fetch(batch, &evpid); + if (! queue_envelope_load(Q_QUEUE, evpid, &evp)) + goto end; + + evp.lasttry = time(NULL); + fd = queue_message_fd_r(Q_QUEUE, evpid_to_msgid(evpid)); imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &evp, sizeof evp); - ramqueue_remove_envelope(&env->sc_rqueue, rq_evp); + scheduler->remove(evpid); stat_increment(STATS_RUNNER); stat_increment(STATS_MDA_SESSION); - break; - - case D_MTA: + + case D_MTA: { + struct mta_batch mta_batch; + + /* FIXME */ + if (! scheduler->fetch(batch, &evpid)) + goto end; + if (! queue_envelope_load(Q_QUEUE, evpid, + &evp)) + goto end; + + bzero(&mta_batch, sizeof mta_batch); + mta_batch.id = arc4random(); + mta_batch.relay = evp.agent.mta.relay; + imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_CREATE, PROC_MTA, 0, -1, rq_batch, - sizeof *rq_batch); - while ((rq_evp = ramqueue_batch_first_envelope(rq_batch))) { - if (! queue_envelope_load(Q_QUEUE, rq_evp->evpid, + IMSG_BATCH_CREATE, PROC_MTA, 0, -1, &mta_batch, + sizeof mta_batch); + + while (scheduler->fetch(batch, &evpid)) { + if (! queue_envelope_load(Q_QUEUE, evpid, &evp)) - return; - evp.lasttry = curtm; - evp.batch_id = rq_batch->b_id; + goto end; + evp.lasttry = time(NULL); /* FIXME */ + evp.batch_id = mta_batch.id; + imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &evp, sizeof evp); - ramqueue_remove_envelope(&env->sc_rqueue, rq_evp); + + scheduler->remove(evpid); stat_increment(STATS_RUNNER); } + imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, rq_batch, - sizeof *rq_batch); + IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, &mta_batch, + sizeof mta_batch); stat_increment(STATS_MTA_SESSION); break; + } default: fatalx("runner_process_batchqueue: unknown type"); } - if (ramqueue_message_is_empty(rq_msg)) - ramqueue_remove_message(&env->sc_rqueue, rq_msg); - - if (ramqueue_batch_is_empty(rq_batch)) - ramqueue_remove_batch(rq_host, rq_batch); - - if (ramqueue_host_is_empty(rq_host)) - ramqueue_remove_host(&env->sc_rqueue, rq_host); +end: + scheduler->close(batch); + return 1; } -int -runner_force_message_to_ramqueue(struct ramqueue *rqueue, u_int32_t msgid) +static int +runner_message_to_scheduler(u_int32_t msgid) { struct qwalk *q; u_int64_t evpid; @@ -475,14 +480,14 @@ runner_force_message_to_ramqueue(struct ramqueue *rqueue, u_int32_t msgid) if (! queue_envelope_load(Q_QUEUE, evpid, &envelope)) continue; - ramqueue_insert(rqueue, &envelope, curtm); + scheduler->insert(&envelope); } qwalk_close(q); return 1; } -int +static int runner_check_loop(struct envelope *ep) { int fd; @@ -546,48 +551,30 @@ runner_check_loop(struct envelope *ep) return ret; } -void -runner_schedule(struct ramqueue *rq, u_int64_t id) -{ - ramqueue_schedule(rq, id); -} - - -void -runner_remove(struct ramqueue *rq, u_int64_t id) +static void +runner_remove(u_int64_t id) { - struct ramqueue_message *rq_msg; - struct ramqueue_envelope *rq_evp; + void *msg; /* removing by evpid */ if (id > 0xffffffffL) { - rq_evp = ramqueue_lookup_envelope(rq, id); - if (rq_evp == NULL) - return; - runner_remove_envelope(rq, rq_evp); + runner_remove_envelope(id); return; } - rq_msg = ramqueue_lookup_message(rq, id); - if (rq_msg == NULL) - return; - - /* scheduling by msgid */ - RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) { - runner_remove_envelope(rq, rq_evp); - } + /* removing by msgid */ + msg = scheduler->message(id); + while (scheduler->fetch(msg, &id)) + runner_remove_envelope(id); + scheduler->close(msg); } -void -runner_remove_envelope(struct ramqueue *rq, struct ramqueue_envelope *rq_evp) +static void +runner_remove_envelope(u_int64_t evpid) { struct envelope evp; - if (queue_envelope_load(Q_QUEUE, rq_evp->evpid, &evp)) { - log_debug("#### %s: queue_envelope_delete: %016" PRIx64, - __func__, evp.id); - queue_envelope_delete(Q_QUEUE, &evp); - } - - ramqueue_remove_envelope(rq, rq_evp); + evp.id = evpid; + queue_envelope_delete(Q_QUEUE, &evp); + scheduler->remove(evpid); } diff --git a/usr.sbin/smtpd/scheduler.c b/usr.sbin/smtpd/scheduler.c new file mode 100644 index 00000000000..ff843fa8019 --- /dev/null +++ b/usr.sbin/smtpd/scheduler.c @@ -0,0 +1,50 @@ +/* $OpenBSD: scheduler.c,v 1.1 2012/01/28 11:33:07 gilles Exp $ */ + +/* + * Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/types.h> +#include <sys/queue.h> +#include <sys/tree.h> +#include <sys/param.h> +#include <sys/socket.h> + +#include <ctype.h> +#include <err.h> +#include <event.h> +#include <fcntl.h> +#include <imsg.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "smtpd.h" +#include "log.h" + +extern struct scheduler_backend scheduler_backend_ramqueue; + +struct scheduler_backend * +scheduler_backend_lookup(enum scheduler_type type) +{ + switch (type) { + case SCHED_RAMQUEUE: + return &scheduler_backend_ramqueue; + default: + fatal("unsupported scheduler_backend type"); + } + + return NULL; +} diff --git a/usr.sbin/smtpd/scheduler_ramqueue.c b/usr.sbin/smtpd/scheduler_ramqueue.c new file mode 100644 index 00000000000..1d547356ee4 --- /dev/null +++ b/usr.sbin/smtpd/scheduler_ramqueue.c @@ -0,0 +1,674 @@ +/* $OpenBSD: scheduler_ramqueue.c,v 1.1 2012/01/28 11:33:07 gilles Exp $ */ + +/* + * Copyright (c) 2012 Gilles Chehade <gilles@openbsd.org> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/types.h> +#include <sys/queue.h> +#include <sys/tree.h> +#include <sys/param.h> +#include <sys/socket.h> + +#include <ctype.h> +#include <err.h> +#include <event.h> +#include <fcntl.h> +#include <imsg.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include "smtpd.h" +#include "log.h" + + +struct ramqueue_host { + RB_ENTRY(ramqueue_host) hosttree_entry; + TAILQ_HEAD(,ramqueue_batch) batch_queue; + u_int64_t h_id; + char hostname[MAXHOSTNAMELEN]; +}; +struct ramqueue_batch { + enum delivery_type type; + TAILQ_ENTRY(ramqueue_batch) batch_entry; + TAILQ_HEAD(,ramqueue_envelope) envelope_queue; + u_int64_t h_id; + u_int64_t b_id; + u_int32_t msgid; +}; +struct ramqueue_envelope { + TAILQ_ENTRY(ramqueue_envelope) queue_entry; + TAILQ_ENTRY(ramqueue_envelope) batchqueue_entry; + RB_ENTRY(ramqueue_envelope) evptree_entry; + struct ramqueue_batch *rq_batch; + struct ramqueue_message *rq_msg; + struct ramqueue_host *rq_host; + u_int64_t evpid; + time_t sched; +}; +struct ramqueue_message { + RB_ENTRY(ramqueue_message) msgtree_entry; + RB_HEAD(evptree, ramqueue_envelope) evptree; + u_int32_t msgid; +}; +struct ramqueue { + RB_HEAD(hosttree, ramqueue_host) hosttree; + RB_HEAD(msgtree, ramqueue_message) msgtree; + TAILQ_HEAD(,ramqueue_envelope) queue; +}; + +RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); +RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp); +RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); + +enum ramqueue_iter_type { + RAMQUEUE_ITER_HOST, + RAMQUEUE_ITER_BATCH, + RAMQUEUE_ITER_MESSAGE, + RAMQUEUE_ITER_QUEUE +}; + +struct ramqueue_iter { + enum ramqueue_iter_type type; + union { + struct ramqueue_host *host; + struct ramqueue_batch *batch; + struct ramqueue_message *message; + } u; +}; + + +static int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *); +static int ramqueue_msg_cmp(struct ramqueue_message *, struct ramqueue_message *); +static int ramqueue_evp_cmp(struct ramqueue_envelope *, struct ramqueue_envelope *); +static struct ramqueue_host *ramqueue_lookup_host(char *); +static struct ramqueue_host *ramqueue_insert_host(char *); +static void ramqueue_remove_host(struct ramqueue_host *); +static struct ramqueue_batch *ramqueue_lookup_batch(struct ramqueue_host *, + u_int32_t); +static struct ramqueue_batch *ramqueue_insert_batch(struct ramqueue_host *, + u_int32_t); +static void ramqueue_remove_batch(struct ramqueue_host *, struct ramqueue_batch *); +static struct ramqueue_message *ramqueue_lookup_message(u_int32_t); +static struct ramqueue_message *ramqueue_insert_message(u_int32_t); +static void ramqueue_remove_message(struct ramqueue_message *); + +static struct ramqueue_envelope *ramqueue_lookup_envelope(u_int64_t); + + +/*NEEDSFIX*/ +static int ramqueue_expire(struct envelope *, time_t); +static time_t ramqueue_next_schedule(struct envelope *, time_t); + +static void scheduler_ramqueue_init(void); +static int scheduler_ramqueue_setup(time_t, time_t); +static int scheduler_ramqueue_next(u_int64_t *, time_t *); +static void scheduler_ramqueue_insert(struct envelope *); +static void scheduler_ramqueue_remove(u_int64_t); +static void *scheduler_ramqueue_host(char *); +static void *scheduler_ramqueue_message(u_int32_t); +static void *scheduler_ramqueue_batch(u_int64_t); +static void *scheduler_ramqueue_queue(void); +static void scheduler_ramqueue_close(void *); +static int scheduler_ramqueue_fetch(void *, u_int64_t *); +static int scheduler_ramqueue_schedule(u_int64_t); + +struct scheduler_backend scheduler_backend_ramqueue = { + scheduler_ramqueue_init, + scheduler_ramqueue_setup, + scheduler_ramqueue_next, + scheduler_ramqueue_insert, + scheduler_ramqueue_remove, + scheduler_ramqueue_host, + scheduler_ramqueue_message, + scheduler_ramqueue_batch, + scheduler_ramqueue_queue, + scheduler_ramqueue_close, + scheduler_ramqueue_fetch, + scheduler_ramqueue_schedule +}; +static struct ramqueue ramqueue; + + +static void +scheduler_ramqueue_init(void) +{ + log_debug("scheduler_ramqueue: init"); + bzero(&ramqueue, sizeof (ramqueue)); + TAILQ_INIT(&ramqueue.queue); + RB_INIT(&ramqueue.hosttree); + RB_INIT(&ramqueue.msgtree); +} + +static int +scheduler_ramqueue_setup(time_t curtm, time_t nsched) +{ + struct envelope envelope; + static struct qwalk *q = NULL; + u_int64_t evpid; + time_t sched; + + log_debug("scheduler_ramqueue: load"); + + log_info("scheduler_ramqueue: queue loading in progress"); + if (q == NULL) + q = qwalk_new(Q_QUEUE, 0); + + while (qwalk(q, &evpid)) { + if (! queue_envelope_load(Q_QUEUE, evpid, &envelope)) { + log_debug("scheduler_ramqueue: evp -> /corrupt"); + queue_message_corrupt(Q_QUEUE, evpid_to_msgid(evpid)); + continue; + } + if (ramqueue_expire(&envelope, curtm)) + continue; + scheduler_ramqueue_insert(&envelope); + + if (! scheduler_ramqueue_next(&evpid, &sched)) + continue; + + if (sched <= nsched) + nsched = sched; + + if (nsched <= curtm) { + log_debug("ramqueue: loading interrupted"); + return (0); + } + } + qwalk_close(q); + q = NULL; + log_debug("ramqueue: loading over"); + return (1); +} + +static int +scheduler_ramqueue_next(u_int64_t *evpid, time_t *sched) +{ + struct ramqueue_envelope *rq_evp = NULL; + + log_debug("scheduler_ramqueue: next"); + TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) { + if (rq_evp->rq_batch->type == D_MDA) + if (env->sc_opts & SMTPD_MDA_PAUSED) + continue; + if (rq_evp->rq_batch->type == D_MTA) + if (env->sc_opts & SMTPD_MTA_PAUSED) + continue; + if (evpid) + *evpid = rq_evp->evpid; + if (sched) + *sched = rq_evp->sched; + log_debug("scheduler_ramqueue: next: found"); + return 1; + } + + log_debug("scheduler_ramqueue: next: nothing schedulable"); + return 0; +} + +static void +scheduler_ramqueue_insert(struct envelope *envelope) +{ + struct ramqueue_host *rq_host; + struct ramqueue_message *rq_msg; + struct ramqueue_batch *rq_batch; + struct ramqueue_envelope *rq_evp, *evp; + u_int32_t msgid; + time_t curtm = time(NULL); + + log_debug("scheduler_ramqueue: insert"); + msgid = evpid_to_msgid(envelope->id); + rq_msg = ramqueue_lookup_message(msgid); + if (rq_msg == NULL) + rq_msg = ramqueue_insert_message(msgid); + + rq_host = ramqueue_lookup_host(envelope->dest.domain); + if (rq_host == NULL) + rq_host = ramqueue_insert_host(envelope->dest.domain); + + rq_batch = ramqueue_lookup_batch(rq_host, msgid); + if (rq_batch == NULL) + rq_batch = ramqueue_insert_batch(rq_host, msgid); + + rq_evp = calloc(1, sizeof (*rq_evp)); + if (rq_evp == NULL) + fatal("calloc"); + rq_evp->evpid = envelope->id; + rq_evp->sched = ramqueue_next_schedule(envelope, curtm); + rq_evp->rq_host = rq_host; + rq_evp->rq_batch = rq_batch; + rq_evp->rq_msg = rq_msg; + + RB_INSERT(evptree, &rq_msg->evptree, rq_evp); + rq_evp->rq_msg = rq_msg; + + TAILQ_INSERT_TAIL(&rq_evp->rq_batch->envelope_queue, rq_evp, + batchqueue_entry); + + /* sorted insert */ + TAILQ_FOREACH(evp, &ramqueue.queue, queue_entry) { + if (evp->sched >= rq_evp->sched) { + TAILQ_INSERT_BEFORE(evp, rq_evp, queue_entry); + break; + } + } + if (evp == NULL) + TAILQ_INSERT_TAIL(&ramqueue.queue, rq_evp, queue_entry); + + stat_increment(STATS_RAMQUEUE_ENVELOPE); +} + +static void +scheduler_ramqueue_remove(u_int64_t evpid) +{ + struct ramqueue_batch *rq_batch; + struct ramqueue_message *rq_msg; + struct ramqueue_envelope *rq_evp; + struct ramqueue_host *rq_host; + + log_debug("scheduler_ramqueue: remove"); + rq_evp = ramqueue_lookup_envelope(evpid); + rq_msg = rq_evp->rq_msg; + rq_batch = rq_evp->rq_batch; + rq_host = rq_evp->rq_host; + + RB_REMOVE(evptree, &rq_msg->evptree, rq_evp); + TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry); + TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); + stat_decrement(STATS_RAMQUEUE_ENVELOPE); + + /* check if we are the last of a batch */ + if (TAILQ_FIRST(&rq_batch->envelope_queue) == NULL) + ramqueue_remove_batch(rq_host, rq_batch); + + /* check if we are the last of a message */ + if (RB_ROOT(&rq_msg->evptree) == NULL) + ramqueue_remove_message(rq_msg); + + /* check if we are the last of a host */ + if (TAILQ_FIRST(&rq_host->batch_queue) == NULL) + ramqueue_remove_host(rq_host); + + free(rq_evp); +} + +static void * +scheduler_ramqueue_host(char *host) +{ + struct ramqueue_iter *iter; + struct ramqueue_host *rq_host; + + rq_host = ramqueue_lookup_host(host); + if (rq_host == NULL) + return NULL; + + iter = calloc(1, sizeof *iter); + if (iter == NULL) + err(1, "calloc"); + + iter->type = RAMQUEUE_ITER_HOST; + iter->u.host = rq_host; + + return iter; +} + +static void * +scheduler_ramqueue_batch(u_int64_t evpid) +{ + struct ramqueue_iter *iter; + struct ramqueue_envelope *rq_evp; + + rq_evp = ramqueue_lookup_envelope(evpid); + if (rq_evp == NULL) + return NULL; + + iter = calloc(1, sizeof *iter); + if (iter == NULL) + err(1, "calloc"); + + iter->type = RAMQUEUE_ITER_BATCH; + iter->u.batch = rq_evp->rq_batch; + + return iter; +} + +static void * +scheduler_ramqueue_message(u_int32_t msgid) +{ + struct ramqueue_iter *iter; + struct ramqueue_message *rq_msg; + + rq_msg = ramqueue_lookup_message(msgid); + if (rq_msg == NULL) + return NULL; + + iter = calloc(1, sizeof *iter); + if (iter == NULL) + err(1, "calloc"); + + iter->type = RAMQUEUE_ITER_MESSAGE; + iter->u.message = rq_msg; + + return iter; +} + +static void * +scheduler_ramqueue_queue(void) +{ + struct ramqueue_iter *iter; + + iter = calloc(1, sizeof *iter); + if (iter == NULL) + err(1, "calloc"); + + iter->type = RAMQUEUE_ITER_QUEUE; + + return iter; +} + +static void +scheduler_ramqueue_close(void *hdl) +{ + free(hdl); +} + +int +scheduler_ramqueue_fetch(void *hdl, u_int64_t *evpid) +{ + struct ramqueue_iter *iter = hdl; + struct ramqueue_envelope *rq_evp; + struct ramqueue_batch *rq_batch; + + switch (iter->type) { + case RAMQUEUE_ITER_HOST: { + rq_batch = TAILQ_FIRST(&iter->u.host->batch_queue); + if (rq_batch == NULL) + break; + rq_evp = TAILQ_FIRST(&rq_batch->envelope_queue); + if (rq_evp == NULL) + break; + *evpid = rq_evp->evpid; + return 1; + } + + case RAMQUEUE_ITER_BATCH: + rq_evp = TAILQ_FIRST(&iter->u.batch->envelope_queue); + if (rq_evp == NULL) + break; + *evpid = rq_evp->evpid; + return 1; + + case RAMQUEUE_ITER_MESSAGE: + rq_evp = RB_ROOT(&iter->u.message->evptree); + if (rq_evp == NULL) + break; + *evpid = rq_evp->evpid; + return 1; + + case RAMQUEUE_ITER_QUEUE: + rq_evp = TAILQ_FIRST(&ramqueue.queue); + if (rq_evp == NULL) + break; + *evpid = rq_evp->evpid; + return 1; + } + + return 0; +} + +static int +scheduler_ramqueue_schedule(u_int64_t id) +{ + struct ramqueue_envelope *rq_evp; + struct ramqueue_message *rq_msg; + int ret; + + /* schedule *all* */ + if (id == 0) { + ret = 0; + TAILQ_FOREACH(rq_evp, &ramqueue.queue, queue_entry) { + rq_evp->sched = 0; + ret++; + } + return ret; + } + + /* scheduling by evpid */ + if (id > 0xffffffffL) { + rq_evp = ramqueue_lookup_envelope(id); + if (rq_evp == NULL) + return 0; + + rq_evp->sched = 0; + TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); + TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry); + return 1; + } + + rq_msg = ramqueue_lookup_message(id); + if (rq_msg == NULL) + return 0; + + /* scheduling by msgid */ + ret = 0; + RB_FOREACH(rq_evp, evptree, &rq_msg->evptree) { + rq_evp->sched = 0; + TAILQ_REMOVE(&ramqueue.queue, rq_evp, queue_entry); + TAILQ_INSERT_HEAD(&ramqueue.queue, rq_evp, queue_entry); + ret++; + } + return ret; +} + +static struct ramqueue_host * +ramqueue_lookup_host(char *host) +{ + struct ramqueue_host hostkey; + + strlcpy(hostkey.hostname, host, sizeof(hostkey.hostname)); + return RB_FIND(hosttree, &ramqueue.hosttree, &hostkey); +} + +static struct ramqueue_message * +ramqueue_lookup_message(u_int32_t msgid) +{ + struct ramqueue_message msgkey; + + msgkey.msgid = msgid; + return RB_FIND(msgtree, &ramqueue.msgtree, &msgkey); +} + +static struct ramqueue_envelope * +ramqueue_lookup_envelope(u_int64_t evpid) +{ + struct ramqueue_message *rq_msg; + struct ramqueue_envelope evpkey; + + rq_msg = ramqueue_lookup_message(evpid_to_msgid(evpid)); + if (rq_msg == NULL) + return NULL; + + evpkey.evpid = evpid; + return RB_FIND(evptree, &rq_msg->evptree, &evpkey); +} + +static struct ramqueue_batch * +ramqueue_lookup_batch(struct ramqueue_host *rq_host, u_int32_t msgid) +{ + struct ramqueue_batch *rq_batch; + + TAILQ_FOREACH(rq_batch, &rq_host->batch_queue, batch_entry) { + if (rq_batch->msgid == msgid) + return rq_batch; + } + + return NULL; +} + +static int +ramqueue_expire(struct envelope *envelope, time_t curtm) +{ + struct envelope bounce; + + if (curtm - envelope->creation >= envelope->expire) { + envelope_set_errormsg(envelope, + "message expired after sitting in queue for %d days", + envelope->expire / 60 / 60 / 24); + bounce_record_message(envelope, &bounce); + scheduler_ramqueue_insert(&bounce); + log_debug("#### %s: queue_envelope_delete: %016" PRIx64, + __func__, envelope->id); + queue_envelope_delete(Q_QUEUE, envelope); + return 1; + } + return 0; +} + +static time_t +ramqueue_next_schedule(struct envelope *envelope, time_t curtm) +{ + time_t delay; + + if (envelope->lasttry == 0) + return curtm; + + delay = SMTPD_QUEUE_MAXINTERVAL; + + if (envelope->type == D_MDA || + envelope->type == D_BOUNCE) { + if (envelope->retry < 5) + return curtm; + + if (envelope->retry < 15) + delay = (envelope->retry * 60) + arc4random_uniform(60); + } + + if (envelope->type == D_MTA) { + if (envelope->retry < 3) + delay = SMTPD_QUEUE_INTERVAL; + else if (envelope->retry <= 7) { + delay = SMTPD_QUEUE_INTERVAL * (1 << (envelope->retry - 3)); + if (delay > SMTPD_QUEUE_MAXINTERVAL) + delay = SMTPD_QUEUE_MAXINTERVAL; + } + } + + if (curtm >= envelope->lasttry + delay) + return curtm; + + return curtm + delay; +} + +static struct ramqueue_message * +ramqueue_insert_message(u_int32_t msgid) +{ + struct ramqueue_message *rq_msg; + + rq_msg = calloc(1, sizeof (*rq_msg)); + if (rq_msg == NULL) + fatal("calloc"); + rq_msg->msgid = msgid; + RB_INSERT(msgtree, &ramqueue.msgtree, rq_msg); + RB_INIT(&rq_msg->evptree); + stat_increment(STATS_RAMQUEUE_MESSAGE); + + return rq_msg; +} + +static struct ramqueue_host * +ramqueue_insert_host(char *host) +{ + struct ramqueue_host *rq_host; + + rq_host = calloc(1, sizeof (*rq_host)); + if (rq_host == NULL) + fatal("calloc"); + rq_host->h_id = generate_uid(); + strlcpy(rq_host->hostname, host, sizeof(rq_host->hostname)); + TAILQ_INIT(&rq_host->batch_queue); + RB_INSERT(hosttree, &ramqueue.hosttree, rq_host); + stat_increment(STATS_RAMQUEUE_HOST); + + return rq_host; +} + +static struct ramqueue_batch * +ramqueue_insert_batch(struct ramqueue_host *rq_host, u_int32_t msgid) +{ + struct ramqueue_batch *rq_batch; + + rq_batch = calloc(1, sizeof (*rq_batch)); + if (rq_batch == NULL) + fatal("calloc"); + rq_batch->b_id = generate_uid(); + rq_batch->msgid = msgid; + + TAILQ_INIT(&rq_batch->envelope_queue); + TAILQ_INSERT_TAIL(&rq_host->batch_queue, rq_batch, batch_entry); + + stat_increment(STATS_RAMQUEUE_BATCH); + + return rq_batch; +} + +static void +ramqueue_remove_host(struct ramqueue_host *rq_host) +{ + RB_REMOVE(hosttree, &ramqueue.hosttree, rq_host); + free(rq_host); + stat_decrement(STATS_RAMQUEUE_HOST); +} + +static void +ramqueue_remove_message(struct ramqueue_message *rq_msg) +{ + RB_REMOVE(msgtree, &ramqueue.msgtree, rq_msg); + free(rq_msg); + stat_decrement(STATS_RAMQUEUE_MESSAGE); +} + + +static void +ramqueue_remove_batch(struct ramqueue_host *rq_host, + struct ramqueue_batch *rq_batch) +{ + TAILQ_REMOVE(&rq_host->batch_queue, rq_batch, batch_entry); + free(rq_batch); + stat_decrement(STATS_RAMQUEUE_BATCH); +} + +static int +ramqueue_host_cmp(struct ramqueue_host *h1, struct ramqueue_host *h2) +{ + return strcmp(h1->hostname, h2->hostname); +} + + +static int +ramqueue_msg_cmp(struct ramqueue_message *m1, struct ramqueue_message *m2) +{ + return (m1->msgid < m2->msgid ? -1 : m1->msgid > m2->msgid); +} + +static int +ramqueue_evp_cmp(struct ramqueue_envelope *e1, struct ramqueue_envelope *e2) +{ + return (e1->evpid < e2->evpid ? -1 : e1->evpid > e2->evpid); +} + +RB_GENERATE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); +RB_GENERATE(msgtree, ramqueue_message, msgtree_entry, ramqueue_msg_cmp); +RB_GENERATE(evptree, ramqueue_envelope, evptree_entry, ramqueue_evp_cmp); diff --git a/usr.sbin/smtpd/smtpctl.c b/usr.sbin/smtpd/smtpctl.c index e4ab391b840..5b61f8fc846 100644 --- a/usr.sbin/smtpd/smtpctl.c +++ b/usr.sbin/smtpd/smtpctl.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpctl.c,v 1.77 2012/01/24 12:20:18 eric Exp $ */ +/* $OpenBSD: smtpctl.c,v 1.78 2012/01/28 11:33:07 gilles Exp $ */ /* * Copyright (c) 2006 Pierre-Yves Ritschard <pyr@openbsd.org> @@ -44,7 +44,6 @@ void usage(void); static void setup_env(struct smtpd *); -static void show_sizes(void); static int show_command_output(struct imsg *); static int show_stats_output(struct imsg *); static void show_queue(enum queue_kind, int); @@ -127,9 +126,6 @@ main(int argc, char *argv[]) break; case SHOW_RUNQUEUE: break; - case SHOW_SIZES: - show_sizes(); - break; default: goto connected; } @@ -302,30 +298,6 @@ show_command_output(struct imsg *imsg) return (1); } -void -show_sizes(void) -{ - /* - * size _does_ matter. - * - * small changes to ramqueue and diskqueue structures may cause - * large changes to memory and disk usage on busy/large hosts. - * - * this will help developers optimize memory/disk use, and help - * admins understand how the ramqueue.size / ramqueue.size.max - * stats are computed (smtpctl show stats). - * - * -- gilles@ - * - */ - printf("struct ramqueue: %zu\n", sizeof (struct ramqueue)); - printf("struct ramqueue_host: %zu\n", sizeof (struct ramqueue_host)); - printf("struct ramqueue_message: %zu\n", sizeof (struct ramqueue_message)); - printf("struct ramqueue_envelope: %zu\n", sizeof (struct ramqueue_envelope)); - - printf("struct envelope: %zu\n", sizeof (struct envelope)); -} - static void stat_print(int stat, int what) { @@ -431,17 +403,6 @@ show_stats_output(struct imsg *imsg) stat_print(STATS_RAMQUEUE_MESSAGE, STAT_MAXACTIVE); stat_print(STATS_RAMQUEUE_ENVELOPE, STAT_MAXACTIVE); - printf("ramqueue.size=%zd\n", - s[STATS_RAMQUEUE_HOST].active * sizeof(struct ramqueue_host) + - s[STATS_RAMQUEUE_BATCH].active * sizeof(struct ramqueue_batch) + - s[STATS_RAMQUEUE_MESSAGE].active * sizeof(struct ramqueue_message) + - s[STATS_RAMQUEUE_ENVELOPE].active * sizeof(struct ramqueue_envelope)); - printf("ramqueue.size.max=%zd\n", - s[STATS_RAMQUEUE_HOST].maxactive * sizeof(struct ramqueue_host) + - s[STATS_RAMQUEUE_BATCH].maxactive * sizeof(struct ramqueue_batch) + - s[STATS_RAMQUEUE_MESSAGE].maxactive * sizeof(struct ramqueue_message) + - s[STATS_RAMQUEUE_ENVELOPE].maxactive * sizeof(struct ramqueue_envelope)); - printf("smtp.errors.delays=%zd\n", stats->smtp.delays); printf("smtp.errors.linetoolong=%zd\n", stats->smtp.linetoolong); printf("smtp.errors.read_eof=%zd\n", stats->smtp.read_eof); diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h index 31a988ceaa1..c4e13f81666 100644 --- a/usr.sbin/smtpd/smtpd.h +++ b/usr.sbin/smtpd/smtpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.h,v 1.281 2012/01/24 12:20:18 eric Exp $ */ +/* $OpenBSD: smtpd.h,v 1.282 2012/01/28 11:33:07 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -580,43 +580,6 @@ struct session { }; -/* ram-queue structures */ -struct ramqueue_host { - RB_ENTRY(ramqueue_host) hosttree_entry; - TAILQ_HEAD(,ramqueue_batch) batch_queue; - u_int64_t h_id; - char hostname[MAXHOSTNAMELEN]; -}; -struct ramqueue_batch { - TAILQ_ENTRY(ramqueue_batch) batch_entry; - TAILQ_HEAD(,ramqueue_envelope) envelope_queue; - enum delivery_type type; - u_int64_t h_id; - u_int64_t b_id; - u_int32_t msgid; - struct relayhost relay; -}; -struct ramqueue_envelope { - TAILQ_ENTRY(ramqueue_envelope) queue_entry; - TAILQ_ENTRY(ramqueue_envelope) batchqueue_entry; - RB_ENTRY(ramqueue_envelope) evptree_entry; - struct ramqueue_batch *rq_batch; - struct ramqueue_message *rq_msg; - struct ramqueue_host *rq_host; - u_int64_t evpid; - time_t sched; -}; -struct ramqueue_message { - RB_ENTRY(ramqueue_message) msgtree_entry; - RB_HEAD(evptree, ramqueue_envelope) evptree; - u_int32_t msgid; -}; -struct ramqueue { - RB_HEAD(hosttree, ramqueue_host) hosttree; - RB_HEAD(msgtree, ramqueue_message) msgtree; - TAILQ_HEAD(,ramqueue_envelope) queue; -}; - struct smtpd { char sc_conffile[MAXPATHLEN]; size_t sc_maxsize; @@ -642,8 +605,8 @@ struct smtpd { char *sc_title[PROC_COUNT]; struct passwd *sc_pw; char sc_hostname[MAXHOSTNAMELEN]; - struct ramqueue sc_rqueue; struct queue_backend *sc_queue; + struct scheduler_backend *sc_scheduler; TAILQ_HEAD(filterlist, filter) *sc_filters; @@ -887,6 +850,12 @@ struct mta_session { struct ssl *ssl; }; +struct mta_batch { + u_int64_t id; + struct relayhost relay; + + u_int32_t msgid; +}; /* maps return structures */ struct map_secret { @@ -977,6 +946,30 @@ struct delivery_backend { }; +/* scheduler_backend */ +enum scheduler_type { + SCHED_RAMQUEUE, +}; + +struct scheduler_backend { + void (*init)(void); + int (*setup)(time_t, time_t); + + int (*next)(u_int64_t *, time_t *); + + void (*insert)(struct envelope *); + void (*remove)(u_int64_t); + + void *(*host)(char *); + void *(*message)(u_int32_t); + void *(*batch)(u_int64_t); + void *(*queue)(void); + void (*close)(void *); + + int (*fetch)(void *, u_int64_t *); + int (*schedule)(u_int64_t); +}; + extern struct smtpd *env; @@ -1123,47 +1116,13 @@ int qwalk(void *, u_int64_t *); void qwalk_close(void *); -/* ramqueue.c */ -void ramqueue_init(struct ramqueue *); -int ramqueue_load(struct ramqueue *, time_t *); -int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *); -int ramqueue_msg_cmp(struct ramqueue_message *, struct ramqueue_message *); -int ramqueue_evp_cmp(struct ramqueue_envelope *, struct ramqueue_envelope *); -int ramqueue_is_empty(struct ramqueue *); -int ramqueue_is_empty(struct ramqueue *); -int ramqueue_batch_is_empty(struct ramqueue_batch *); -int ramqueue_host_is_empty(struct ramqueue_host *); -void ramqueue_remove_batch(struct ramqueue_host *, struct ramqueue_batch *); -void ramqueue_remove_host(struct ramqueue *, struct ramqueue_host *); -struct ramqueue_envelope *ramqueue_envelope_by_id(struct ramqueue *, u_int64_t); -struct ramqueue_envelope *ramqueue_first_envelope(struct ramqueue *); -struct ramqueue_envelope *ramqueue_next_envelope(struct ramqueue *); -struct ramqueue_envelope *ramqueue_batch_first_envelope(struct ramqueue_batch *); -void ramqueue_insert(struct ramqueue *, struct envelope *, time_t); -int ramqueue_message_is_empty(struct ramqueue_message *); -void ramqueue_remove_message(struct ramqueue *, struct ramqueue_message *); - -struct ramqueue_host *ramqueue_lookup_host(struct ramqueue *, char *); -struct ramqueue_message *ramqueue_lookup_message(struct ramqueue *, u_int32_t); -struct ramqueue_envelope *ramqueue_lookup_envelope(struct ramqueue *, u_int64_t); - -void ramqueue_schedule(struct ramqueue *, u_int64_t); -void ramqueue_schedule_envelope(struct ramqueue *, struct ramqueue_envelope *); - -void ramqueue_remove_envelope(struct ramqueue *, struct ramqueue_envelope *); - - -RB_PROTOTYPE(hosttree, ramqueue_host, hosttree_entry, ramqueue_host_cmp); -RB_PROTOTYPE(msgtree, ramqueue_message, msg_entry, ramqueue_msg_cmp); -RB_PROTOTYPE(evptree, ramqueue_envelope, evp_entry, ramqueue_evp_cmp); - - /* runner.c */ pid_t runner(void); void message_reset_flags(struct envelope *); -void runner_schedule(struct ramqueue *, u_int64_t); -void runner_remove(struct ramqueue *, u_int64_t); -void runner_remove_envelope(struct ramqueue *, struct ramqueue_envelope *); + + +/* scheduler.c */ +struct scheduler_backend *scheduler_backend_lookup(enum scheduler_type); /* smtp.c */ diff --git a/usr.sbin/smtpd/smtpd/Makefile b/usr.sbin/smtpd/smtpd/Makefile index e96d6f3227f..6d4f9d4d538 100644 --- a/usr.sbin/smtpd/smtpd/Makefile +++ b/usr.sbin/smtpd/smtpd/Makefile @@ -1,4 +1,4 @@ -# $OpenBSD: Makefile,v 1.38 2012/01/11 22:24:37 gilles Exp $ +# $OpenBSD: Makefile,v 1.39 2012/01/28 11:33:07 gilles Exp $ PROG= smtpd SRCS= aliases.c auth.c auth_bsd.c auth_pwd.c bounce.c \ @@ -10,8 +10,8 @@ SRCS= aliases.c auth.c auth_bsd.c auth_pwd.c bounce.c \ queue.c ruleset.c runner.c smtp.c \ smtp_session.c smtpd.c ssl.c ssl_privsep.c util.c asr.c \ print.c pack.c dname.c res_random.c sockaddr.c \ - ramqueue.c queue_backend.c queue_fsqueue.c \ - user.c user_pwd.c stats.c + queue_backend.c queue_fsqueue.c \ + user.c user_pwd.c stats.c scheduler.c scheduler_ramqueue.c MAN= smtpd.8 smtpd.conf.5 BINDIR= /usr/sbin |