From d6f127de12f4279d152b2b751e1081343eb1feb9 Mon Sep 17 00:00:00 2001 From: Gilles Chehade Date: Wed, 13 Apr 2011 20:53:19 +0000 Subject: following an idea from jacekm@, smtpd now uses a ram-queue instead of doing a continuous walk on the disk-queue. the implementation differs from what jacekm@ commited (and I backed out) a while ago in that it uses a queue and a host tree required for upcoming features. code will be improved in tree, it requires changes to be done in queue and bounce API, I just wanted to commit a working version first ... tested by todd@ and I --- usr.sbin/smtpd/control.c | 90 +----- usr.sbin/smtpd/mta.c | 54 ++-- usr.sbin/smtpd/parser.c | 16 +- usr.sbin/smtpd/parser.h | 4 +- usr.sbin/smtpd/queue.c | 24 +- usr.sbin/smtpd/queue_shared.c | 24 +- usr.sbin/smtpd/ramqueue.c | 350 +++++++++++++++++++++ usr.sbin/smtpd/runner.c | 713 ++++++++++++++---------------------------- usr.sbin/smtpd/smtpctl.c | 41 +-- usr.sbin/smtpd/smtpd.c | 5 +- usr.sbin/smtpd/smtpd.h | 106 ++++--- usr.sbin/smtpd/smtpd/Makefile | 4 +- 12 files changed, 722 insertions(+), 709 deletions(-) create mode 100644 usr.sbin/smtpd/ramqueue.c diff --git a/usr.sbin/smtpd/control.c b/usr.sbin/smtpd/control.c index fe716d6b40b..8134aa0ce00 100644 --- a/usr.sbin/smtpd/control.c +++ b/usr.sbin/smtpd/control.c @@ -1,4 +1,4 @@ -/* $OpenBSD: control.c,v 1.56 2010/11/28 13:56:43 gilles Exp $ */ +/* $OpenBSD: control.c,v 1.57 2011/04/13 20:53:18 gilles Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard @@ -65,8 +65,6 @@ control_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) { struct ctl_conn *c; struct reload *reload; - struct remove *rem; - struct sched *sched; if (iev->proc == PROC_SMTP) { switch (imsg->hdr.type) { @@ -80,30 +78,6 @@ control_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) } } - if (iev->proc == PROC_QUEUE) { - switch (imsg->hdr.type) { - case IMSG_QUEUE_SCHEDULE: - sched = imsg->data; - c = control_connbyfd(sched->fd); - if (c == NULL) - return; - imsg_compose_event(&c->iev, - sched->ret ? IMSG_CTL_OK : IMSG_CTL_FAIL, 0, 0, -1, - NULL, 0); - return; - - case IMSG_QUEUE_REMOVE: - rem = imsg->data; - c = control_connbyfd(rem->fd); - if (c == NULL) - return; - imsg_compose_event(&c->iev, - rem->ret ? IMSG_CTL_OK : IMSG_CTL_FAIL, 0, 0, - -1, NULL, 0); - return; - } - } - if (iev->proc == PROC_PARENT) { switch (imsg->hdr.type) { case IMSG_CONF_RELOAD: @@ -400,68 +374,6 @@ control_dispatch_ext(int fd, short event, void *arg) imsg_compose_event(&c->iev, IMSG_STATS, 0, 0, -1, env->stats, sizeof(struct stats)); break; - case IMSG_QUEUE_SCHEDULE: { - struct sched *s = imsg.data; - - if (euid) - goto badcred; - - if (IMSG_DATA_SIZE(&imsg) != sizeof(*s)) - goto badcred; - - s->fd = fd; - - if (! valid_message_id(s->mid) && ! valid_message_uid(s->mid)) { - imsg_compose_event(&c->iev, IMSG_CTL_FAIL, 0, 0, -1, - NULL, 0); - break; - } - - imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_SCHEDULE, 0, 0, -1, s, sizeof(*s)); - break; - } - - case IMSG_QUEUE_REMOVE: { - struct remove *s = imsg.data; - - if (euid) - goto badcred; - - if (IMSG_DATA_SIZE(&imsg) != sizeof(*s)) - goto badcred; - - s->fd = fd; - - if (! valid_message_id(s->mid) && ! valid_message_uid(s->mid)) { - imsg_compose_event(&c->iev, IMSG_CTL_FAIL, 0, 0, -1, - NULL, 0); - break; - } - - imsg_compose_event(env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_REMOVE, 0, 0, -1, s, sizeof(*s)); - break; - } -/* - case IMSG_CONF_RELOAD: { - struct reload r; - - log_debug("received reload request"); - - if (euid) - goto badcred; - - if (env->sc_flags & SMTPD_CONFIGURING) { - imsg_compose_event(&c->iev, IMSG_CTL_FAIL, 0, 0, -1, - NULL, 0); - break; - } - env->sc_flags |= SMTPD_CONFIGURING; - - r.fd = fd; - imsg_compose_event(env->sc_ievs[PROC_PARENT], IMSG_CONF_RELOAD, 0, 0, -1, &r, sizeof(r)); - break; - } -*/ case IMSG_CTL_SHUTDOWN: /* NEEDS_FIX */ log_debug("received shutdown request"); diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c index 114200d1080..d37c3938cdd 100644 --- a/usr.sbin/smtpd/mta.c +++ b/usr.sbin/smtpd/mta.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mta.c,v 1.100 2011/03/26 17:43:01 gilles Exp $ */ +/* $OpenBSD: mta.c,v 1.101 2011/04/13 20:53:18 gilles Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard @@ -58,40 +58,40 @@ void mta_request_datafd(struct mta_session *); void mta_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) { + struct ramqueue_batch *rq_batch; struct mta_session *s; struct mta_relay *relay; struct message *m; struct secret *secret; - struct batch *b; struct dns *dns; struct ssl *ssl; if (iev->proc == PROC_QUEUE) { switch (imsg->hdr.type) { case IMSG_BATCH_CREATE: - b = imsg->data; + rq_batch = imsg->data; + s = calloc(1, sizeof *s); if (s == NULL) fatal(NULL); - s->id = b->id; + s->id = rq_batch->b_id; s->state = MTA_INIT; s->env = env; + s->batch = rq_batch; /* establish host name */ - if (b->rule.r_action == A_RELAYVIA) { - s->host = strdup(b->rule.r_value.relayhost.hostname); + if (rq_batch->rule.r_action == A_RELAYVIA) { + s->host = strdup(rq_batch->rule.r_value.relayhost.hostname); s->flags |= MTA_FORCE_MX; } else - s->host = strdup(b->hostname); - if (s->host == NULL) - fatal(NULL); + s->host = NULL; /* establish port */ - s->port = ntohs(b->rule.r_value.relayhost.port); /* XXX */ + s->port = ntohs(rq_batch->rule.r_value.relayhost.port); /* XXX */ /* have cert? */ - s->cert = strdup(b->rule.r_value.relayhost.cert); + s->cert = strdup(rq_batch->rule.r_value.relayhost.cert); if (s->cert == NULL) fatal(NULL); else if (s->cert[0] == '\0') { @@ -100,14 +100,14 @@ mta_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) } /* use auth? */ - if ((b->rule.r_value.relayhost.flags & F_SSL) && - (b->rule.r_value.relayhost.flags & F_AUTH)) { + if ((rq_batch->rule.r_value.relayhost.flags & F_SSL) && + (rq_batch->rule.r_value.relayhost.flags & F_AUTH)) { s->flags |= MTA_USE_AUTH; - s->secmapid = b->rule.r_value.relayhost.secmapid; + s->secmapid = rq_batch->rule.r_value.relayhost.secmapid; } /* force a particular SSL mode? */ - switch (b->rule.r_value.relayhost.flags & F_SSL) { + switch (rq_batch->rule.r_value.relayhost.flags & F_SSL) { case F_SSL: s->flags |= MTA_FORCE_ANYSSL; break; @@ -128,6 +128,7 @@ mta_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) SPLAY_INSERT(mtatree, &env->mta_sessions, s); return; + case IMSG_BATCH_APPEND: m = imsg->data; s = mta_lookup(env, m->batch_id); @@ -137,17 +138,23 @@ mta_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) *m = *(struct message *)imsg->data; strlcpy(m->session_errorline, "000 init", sizeof(m->session_errorline)); + + if (s->host == NULL) { + s->host = strdup(m->recipient.domain); + if (s->host == NULL) + fatal("strdup"); + } TAILQ_INSERT_TAIL(&s->recipients, m, entry); return; case IMSG_BATCH_CLOSE: - b = imsg->data; - mta_pickup(mta_lookup(env, b->id), NULL); + rq_batch = imsg->data; + mta_pickup(mta_lookup(env, rq_batch->b_id), NULL); return; case IMSG_QUEUE_MESSAGE_FD: - b = imsg->data; - mta_pickup(mta_lookup(env, b->id), &imsg->fd); + rq_batch = imsg->data; + mta_pickup(mta_lookup(env, rq_batch->b_id), &imsg->fd); return; } } @@ -307,6 +314,7 @@ mta(struct smtpd *env) config_pipes(env, peers, nitems(peers)); config_peers(env, peers, nitems(peers)); + ramqueue_init(env, &env->sc_rqueue); SPLAY_INIT(&env->mta_sessions); if (event_dispatch() < 0) @@ -738,14 +746,14 @@ mta_connect_done(int fd, short event, void *p) void mta_request_datafd(struct mta_session *s) { - struct batch b; + struct ramqueue_batch rq_batch; struct message *m; - b.id = s->id; + rq_batch.b_id = s->id; m = TAILQ_FIRST(&s->recipients); - strlcpy(b.message_id, m->message_id, sizeof(b.message_id)); + strlcpy(rq_batch.m_id, m->message_id, sizeof(rq_batch.m_id)); imsg_compose_event(s->env->sc_ievs[PROC_QUEUE], IMSG_QUEUE_MESSAGE_FD, - 0, 0, -1, &b, sizeof(b)); + 0, 0, -1, &rq_batch, sizeof(rq_batch)); } SPLAY_GENERATE(mtatree, mta_session, entry, mta_session_cmp); diff --git a/usr.sbin/smtpd/parser.c b/usr.sbin/smtpd/parser.c index 26adc172f86..bc9e0fac9b1 100644 --- a/usr.sbin/smtpd/parser.c +++ b/usr.sbin/smtpd/parser.c @@ -1,4 +1,4 @@ -/* $OpenBSD: parser.c,v 1.18 2010/11/28 14:35:58 gilles Exp $ */ +/* $OpenBSD: parser.c,v 1.19 2011/04/13 20:53:18 gilles Exp $ */ /* * Copyright (c) 2006 Pierre-Yves Ritschard @@ -51,8 +51,6 @@ static const struct token t_main[]; static const struct token t_show[]; static const struct token t_pause[]; static const struct token t_resume[]; -static const struct token t_schedule[]; -static const struct token t_remove[]; static const struct token t_log[]; static const struct token t_main[] = { @@ -62,8 +60,6 @@ static const struct token t_main[] = { /* {KEYWORD, "reload", RELOAD, NULL},*/ {KEYWORD, "resume", NONE, t_resume}, {KEYWORD, "stop", SHUTDOWN, NULL}, - {KEYWORD, "schedule", SCHEDULE, t_schedule}, - {KEYWORD, "remove", REMOVE, t_remove}, {KEYWORD, "log", NONE, t_log}, {ENDTOKEN, "", NONE, NULL} }; @@ -89,16 +85,6 @@ static const struct token t_resume[] = { {ENDTOKEN, "", NONE, NULL} }; -static const struct token t_schedule[] = { - {VARIABLE, "message id/uid", SCHEDULE, NULL}, - {ENDTOKEN, "", NONE, NULL} -}; - -static const struct token t_remove[] = { - {VARIABLE, "message id/uid", REMOVE, NULL}, - {ENDTOKEN, "", NONE, NULL} -}; - static const struct token t_log[] = { {KEYWORD, "verbose", LOG_VERBOSE, NULL}, {KEYWORD, "brief", LOG_BRIEF, NULL}, diff --git a/usr.sbin/smtpd/parser.h b/usr.sbin/smtpd/parser.h index d9fefe838ad..06229ccb929 100644 --- a/usr.sbin/smtpd/parser.h +++ b/usr.sbin/smtpd/parser.h @@ -1,4 +1,4 @@ -/* $OpenBSD: parser.h,v 1.15 2010/10/09 22:05:35 gilles Exp $ */ +/* $OpenBSD: parser.h,v 1.16 2011/04/13 20:53:18 gilles Exp $ */ /* * Copyright (c) 2006 Pierre-Yves Ritschard @@ -21,8 +21,6 @@ enum actions { SHUTDOWN, RELOAD, MONITOR, - SCHEDULE, - REMOVE, LOG_VERBOSE, LOG_BRIEF, SHOW_QUEUE, diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index 7d6c59e90cf..c9515bc6bc6 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.94 2010/11/28 15:32:00 gilles Exp $ */ +/* $OpenBSD: queue.c,v 1.95 2011/04/13 20:53:18 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade @@ -54,7 +54,7 @@ queue_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) { struct submit_status ss; struct message *m; - struct batch *b; + struct ramqueue_batch *rq_batch; int fd, ret; if (iev->proc == PROC_SMTP) { @@ -97,6 +97,10 @@ queue_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) } imsg_compose_event(iev, IMSG_QUEUE_COMMIT_MESSAGE, 0, 0, -1, &ss, sizeof ss); + + if (ss.code != 421) + queue_pass_to_runner(env, iev, imsg); + return; case IMSG_QUEUE_MESSAGE_FILE: @@ -165,10 +169,10 @@ queue_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) if (iev->proc == PROC_MTA) { switch (imsg->hdr.type) { case IMSG_QUEUE_MESSAGE_FD: - b = imsg->data; - fd = queue_open_message_file(b->message_id); + rq_batch = imsg->data; + fd = queue_open_message_file(rq_batch->m_id); imsg_compose_event(iev, IMSG_QUEUE_MESSAGE_FD, 0, 0, - fd, b, sizeof *b); + fd, rq_batch, sizeof *rq_batch); return; case IMSG_QUEUE_MESSAGE_UPDATE: @@ -321,16 +325,6 @@ queue(struct smtpd *env) return (0); } -struct batch * -batch_by_id(struct smtpd *env, u_int64_t id) -{ - struct batch lookup; - - lookup.id = id; - return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); -} - - void queue_purge(char *queuepath) { diff --git a/usr.sbin/smtpd/queue_shared.c b/usr.sbin/smtpd/queue_shared.c index 37a42702e43..18549901553 100644 --- a/usr.sbin/smtpd/queue_shared.c +++ b/usr.sbin/smtpd/queue_shared.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue_shared.c,v 1.36 2011/03/21 09:21:57 gilles Exp $ */ +/* $OpenBSD: queue_shared.c,v 1.37 2011/04/13 20:53:18 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade @@ -325,28 +325,27 @@ bounce_commit_message(struct message *message) } int -bounce_record_message(struct message *messagep) +bounce_record_message(struct message *messagep, struct message *mbounce) { char msgid[MAX_ID_SIZE]; - struct message mbounce; if (messagep->type == T_BOUNCE_MESSAGE) { log_debug("mailer daemons loop detected !"); return 0; } - mbounce = *messagep; - mbounce.type = T_BOUNCE_MESSAGE; - mbounce.status &= ~S_MESSAGE_PERMFAILURE; + *mbounce = *messagep; + mbounce->type = T_BOUNCE_MESSAGE; + mbounce->status &= ~S_MESSAGE_PERMFAILURE; if (! bounce_create_layout(msgid, messagep)) return 0; - strlcpy(mbounce.message_id, msgid, sizeof(mbounce.message_id)); - if (! bounce_record_envelope(&mbounce)) + strlcpy(mbounce->message_id, msgid, sizeof(mbounce->message_id)); + if (! bounce_record_envelope(mbounce)) return 0; - return bounce_commit_message(&mbounce); + return bounce_commit_message(mbounce); } int @@ -457,8 +456,11 @@ queue_message_update(struct message *messagep) if (messagep->status & S_MESSAGE_PERMFAILURE) { if (messagep->type != T_BOUNCE_MESSAGE && - messagep->sender.user[0] != '\0') - bounce_record_message(messagep); + messagep->sender.user[0] != '\0') { + struct message bounce; + + bounce_record_message(messagep, &bounce); + } queue_remove_envelope(messagep); return; } diff --git a/usr.sbin/smtpd/ramqueue.c b/usr.sbin/smtpd/ramqueue.c new file mode 100644 index 00000000000..17d10f7002b --- /dev/null +++ b/usr.sbin/smtpd/ramqueue.c @@ -0,0 +1,350 @@ +/* $OpenBSD: ramqueue.c,v 1.1 2011/04/13 20:53:18 gilles Exp $ */ + +/* + * Copyright (c) 2011 Gilles Chehade + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "smtpd.h" +#include "log.h" + +int ramqueue_expire(struct smtpd *, struct message *, time_t); +void ramqueue_insert(struct ramqueue *, struct message *, time_t); +time_t ramqueue_next_schedule(struct message *, time_t); +int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *); +struct ramqueue_host *ramqueue_get_host(struct ramqueue *, char *); +struct ramqueue_batch *ramqueue_get_batch(struct ramqueue *, + struct ramqueue_host *, struct message *); +void ramqueue_put_host(struct ramqueue *, struct ramqueue_host *); +void ramqueue_put_batch(struct ramqueue *, struct ramqueue_batch *); +int ramqueue_load_offline(struct ramqueue *); + + +void +ramqueue_init(struct smtpd *env, struct ramqueue *rqueue) +{ + bzero(rqueue, sizeof (*rqueue)); + TAILQ_INIT(&rqueue->queue); + rqueue->env = env; + rqueue->current_evp = NULL; +} + +int +ramqueue_is_empty(struct ramqueue *rqueue) +{ + return TAILQ_FIRST(&rqueue->queue) == NULL; +} + +int +ramqueue_batch_is_empty(struct ramqueue_batch *rq_batch) +{ + return TAILQ_FIRST(&rq_batch->envelope_queue) == NULL; +} + +int +ramqueue_host_is_empty(struct ramqueue_host *rq_host) +{ + return TAILQ_FIRST(&rq_host->batch_queue) == NULL; +} + +struct ramqueue_envelope * +ramqueue_first_envelope(struct ramqueue *rqueue) +{ + return TAILQ_FIRST(&rqueue->queue); +} + +struct ramqueue_envelope * +ramqueue_next_envelope(struct ramqueue *rqueue) +{ + if (rqueue->current_evp == NULL) + rqueue->current_evp = TAILQ_FIRST(&rqueue->queue); + else + rqueue->current_evp = TAILQ_NEXT(rqueue->current_evp, queue_entry); + return rqueue->current_evp; +} + +struct ramqueue_envelope * +ramqueue_batch_first_envelope(struct ramqueue_batch *rq_batch) +{ + return TAILQ_FIRST(&rq_batch->envelope_queue); +} + +int +ramqueue_load_offline(struct ramqueue *rqueue) +{ + char path[MAXPATHLEN]; + struct smtpd *env = rqueue->env; + static struct qwalk *q = NULL; + + log_debug("ramqueue: offline queue loading in progress"); + if (q == NULL) + q = qwalk_new(PATH_OFFLINE); + while (qwalk(q, path)) { + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_PARENT_ENQUEUE_OFFLINE, PROC_PARENT, 0, -1, path, + strlen(path) + 1); + log_debug("ramqueue: offline queue loading interrupted"); + return 0; + } + qwalk_close(q); + q = NULL; + log_debug("ramqueue: offline queue loading over"); + return 1; +} + +int +ramqueue_load(struct ramqueue *rqueue, time_t *nsched) +{ + char path[MAXPATHLEN]; + time_t curtm; + struct message envelope; + static struct qwalk *q = NULL; + struct ramqueue_envelope *rq_evp; + + + + log_debug("ramqueue: queue loading in progress"); + + if (q == NULL) + q = qwalk_new(PATH_QUEUE); + while (qwalk(q, path)) { + curtm = time(NULL); + + if (! queue_load_envelope(&envelope, basename(path))) + continue; + if (ramqueue_expire(rqueue->env, &envelope, curtm)) + continue; + ramqueue_insert(rqueue, &envelope, curtm); + + rq_evp = TAILQ_FIRST(&rqueue->queue); + *nsched = rq_evp->sched; + + if (rq_evp->sched <= *nsched) { + log_debug("ramqueue: loading interrupted"); + return (0); + } + } + qwalk_close(q); + q = NULL; + log_debug("ramqueue: loading over"); + return (1); +} + +void +ramqueue_insert(struct ramqueue *rqueue, struct message *envelope, time_t curtm) +{ + struct ramqueue_envelope *rq_evp; + struct ramqueue_envelope *evp; + + rq_evp = calloc(1, sizeof (*rq_evp)); + if (rq_evp == NULL) + fatal("calloc"); + strlcpy(rq_evp->id, envelope->message_uid, sizeof(rq_evp->id)); + rq_evp->sched = ramqueue_next_schedule(envelope, curtm); + rq_evp->host = ramqueue_get_host(rqueue, envelope->recipient.domain); + rq_evp->batch = ramqueue_get_batch(rqueue, rq_evp->host, envelope); + + TAILQ_INSERT_TAIL(&rq_evp->batch->envelope_queue, rq_evp, + batchqueue_entry); + + /* sorted insert */ + TAILQ_FOREACH(evp, &rqueue->queue, queue_entry) { + if (evp->sched >= rq_evp->sched) { + TAILQ_INSERT_BEFORE(evp, rq_evp, queue_entry); + break; + } + } + if (evp == NULL) + TAILQ_INSERT_TAIL(&rqueue->queue, rq_evp, queue_entry); + + rqueue->env->stats->ramqueue.envelopes++; + SET_IF_GREATER(rqueue->env->stats->ramqueue.envelopes, + rqueue->env->stats->ramqueue.envelopes_max); +} + +void +ramqueue_remove(struct ramqueue *rqueue, struct ramqueue_envelope *rq_evp) +{ + struct ramqueue_batch *rq_batch = rq_evp->batch; + + if (rq_evp == rqueue->current_evp) + rqueue->current_evp = TAILQ_NEXT(rqueue->current_evp, queue_entry); + + TAILQ_REMOVE(&rq_batch->envelope_queue, rq_evp, batchqueue_entry); + TAILQ_REMOVE(&rqueue->queue, rq_evp, queue_entry); + rqueue->env->stats->ramqueue.envelopes--; +} + +int +ramqueue_expire(struct smtpd *env, struct message *envelope, time_t curtm) +{ + struct message bounce; + + if (curtm - envelope->creation >= envelope->expire) { + message_set_errormsg(envelope, + "message expired after sitting in queue for %d days", + envelope->expire / 60 / 60 / 24); + bounce_record_message(envelope, &bounce); + ramqueue_insert(&env->sc_rqueue, &bounce, time(NULL)); + queue_remove_envelope(envelope); + return 1; + } + return 0; +} + +time_t +ramqueue_next_schedule(struct message *envelope, time_t curtm) +{ + time_t delay; + + if (envelope->lasttry == 0) + return curtm; + + delay = SMTPD_QUEUE_MAXINTERVAL; + + if (envelope->type == T_MDA_MESSAGE || + envelope->type == T_BOUNCE_MESSAGE) { + if (envelope->retry < 5) + return curtm; + + if (envelope->retry < 15) + delay = (envelope->retry * 60) + arc4random_uniform(60); + } + + if (envelope->type == T_MTA_MESSAGE) { + if (envelope->retry < 3) + delay = SMTPD_QUEUE_INTERVAL; + else if (envelope->retry <= 7) { + delay = SMTPD_QUEUE_INTERVAL * (1 << (envelope->retry - 3)); + if (delay > SMTPD_QUEUE_MAXINTERVAL) + delay = SMTPD_QUEUE_MAXINTERVAL; + } + } + + if (curtm >= envelope->lasttry + delay) + return curtm; + + return curtm + delay; +} + +struct ramqueue_host * +ramqueue_get_host(struct ramqueue *rqueue, char *hostname) +{ + struct ramqueue_host *rq_host, key; + + strlcpy(key.hostname, hostname, sizeof(key.hostname)); + rq_host = RB_FIND(hosttree, &rqueue->hosttree, &key); + if (rq_host == NULL) { + rq_host = calloc(1, sizeof (*rq_host)); + if (rq_host == NULL) + fatal("calloc"); + rq_host->h_id = generate_uid(); + strlcpy(rq_host->hostname, hostname, sizeof(rq_host->hostname)); + TAILQ_INIT(&rq_host->batch_queue); + RB_INSERT(hosttree, &rqueue->hosttree, rq_host); + rqueue->env->stats->ramqueue.hosts++; + SET_IF_GREATER(rqueue->env->stats->ramqueue.hosts, + rqueue->env->stats->ramqueue.hosts_max); + } + + return rq_host; +} + +void +ramqueue_put_host(struct ramqueue *rqueue, struct ramqueue_host *host) +{ + TAILQ_INIT(&host->batch_queue); + RB_INSERT(hosttree, &rqueue->hosttree, host); +} + +struct ramqueue_batch * +ramqueue_get_batch(struct ramqueue *rqueue, struct ramqueue_host *host, + struct message *envelope) +{ + struct ramqueue_batch *rq_batch; + + TAILQ_FOREACH(rq_batch, &host->batch_queue, batch_entry) { + if (strcmp(rq_batch->m_id, envelope->message_id) == 0) + return rq_batch; + } + + rq_batch = calloc(1, sizeof (*rq_batch)); + if (rq_batch == NULL) + fatal("calloc"); + rq_batch->b_id = generate_uid(); + rq_batch->type = envelope->type; + rq_batch->rule = envelope->recipient.rule; + strlcpy(rq_batch->m_id, envelope->message_id, sizeof(rq_batch->m_id)); + TAILQ_INIT(&rq_batch->envelope_queue); + TAILQ_INSERT_TAIL(&host->batch_queue, rq_batch, batch_entry); + + rqueue->env->stats->ramqueue.batches++; + SET_IF_GREATER(rqueue->env->stats->ramqueue.batches, + rqueue->env->stats->ramqueue.batches_max); + return rq_batch; +} + +void +ramqueue_put_batch(struct ramqueue *rqueue, struct ramqueue_batch *rq_batch) +{ + struct ramqueue_host *rq_host; + + TAILQ_INIT(&rq_batch->envelope_queue); + RB_FOREACH(rq_host, hosttree, &rqueue->hosttree) { + if (rq_host->h_id == rq_batch->h_id) { + TAILQ_INSERT_TAIL(&rq_host->batch_queue, rq_batch, + batch_entry); + return; + } + } +} + +void +ramqueue_remove_batch(struct ramqueue_host *rq_host, struct ramqueue_batch *rq_batch) +{ + TAILQ_REMOVE(&rq_host->batch_queue, rq_batch, batch_entry); +} + +void +ramqueue_remove_host(struct ramqueue *rqueue, struct ramqueue_host *rq_host) +{ + RB_REMOVE(hosttree, &rqueue->hosttree, rq_host); +} + +int +ramqueue_host_cmp(struct ramqueue_host *h1, struct ramqueue_host *h2) +{ + return strcmp(h1->hostname, h2->hostname); +} + +RB_GENERATE(hosttree, ramqueue_host, host_entry, ramqueue_host_cmp); diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c index a91099d42dd..da71b28eb53 100644 --- a/usr.sbin/smtpd/runner.c +++ b/usr.sbin/smtpd/runner.c @@ -1,4 +1,4 @@ -/* $OpenBSD: runner.c,v 1.95 2010/11/28 14:35:58 gilles Exp $ */ +/* $OpenBSD: runner.c,v 1.96 2011/04/13 20:53:18 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade @@ -48,42 +48,66 @@ void runner_sig_handler(int, short, void *); void runner_setup_events(struct smtpd *); void runner_disable_events(struct smtpd *); -void runner_reset_flags(void); -void runner_process_offline(struct smtpd *); - void runner_timeout(int, short, void *); -void runner_process_queue(struct smtpd *); -void runner_process_runqueue(struct smtpd *); -void runner_process_batchqueue(struct smtpd *); - -int runner_message_schedule(struct message *, time_t); +int runner_process_envelope(struct smtpd *, struct ramqueue_envelope *, time_t); +void runner_process_batch(struct smtpd *, struct ramqueue_envelope *, time_t); void runner_purge_run(void); void runner_purge_message(char *); int runner_check_loop(struct message *); -struct batch *batch_record(struct smtpd *, struct message *); -struct batch *batch_lookup(struct smtpd *, struct message *); +int runner_force_message_to_ramqueue(struct ramqueue *, char *); -int runner_force_envelope_schedule(char *); -int runner_force_message_schedule(char *); - -int runner_force_envelope_remove(char *); -int runner_force_message_remove(char *); +void ramqueue_insert(struct ramqueue *, struct message *, time_t); void runner_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) { struct message *m; - struct remove *rem; - struct sched *sched; switch (imsg->hdr.type) { + case IMSG_QUEUE_COMMIT_MESSAGE: + m = imsg->data; + runner_force_message_to_ramqueue(&env->sc_rqueue, m->message_id); + runner_setup_events(env); + return; + case IMSG_QUEUE_MESSAGE_UPDATE: + m = imsg->data; + m->retry++; env->stats->runner.active--; - queue_message_update(imsg->data); + + /* temporary failure, message remains in queue, + * gets reinserted in ramqueue + */ + if (m->status & S_MESSAGE_TEMPFAILURE) { + m->status &= ~S_MESSAGE_TEMPFAILURE; + queue_update_envelope(m); + ramqueue_insert(&env->sc_rqueue, m, time(NULL)); + runner_setup_events(env); + return; + } + + /* permanent failure, eventually generate a + * bounce (and insert bounce in ramqueue). + */ + if (m->status & S_MESSAGE_PERMFAILURE) { + struct message bounce; + + if (m->type != T_BOUNCE_MESSAGE && + m->sender.user[0] != '\0') { + bounce_record_message(m, &bounce); + ramqueue_insert(&env->sc_rqueue, &bounce, time(NULL)); + runner_setup_events(env); + } + } + + /* successful delivery or permanent failure, + * remove envelope from queue. + */ + queue_remove_envelope(m); return; case IMSG_MDA_SESS_NEW: @@ -94,37 +118,18 @@ runner_imsg(struct smtpd *env, struct imsgev *iev, struct imsg *imsg) env->stats->mta.sessions_active--; return; - case IMSG_QUEUE_SCHEDULE: - sched = imsg->data; - sched->ret = 0; - if (valid_message_uid(sched->mid)) - sched->ret = runner_force_envelope_schedule(sched->mid); - else if (valid_message_id(sched->mid)) - sched->ret = runner_force_message_schedule(sched->mid); - imsg_compose_event(iev, IMSG_QUEUE_SCHEDULE, imsg->hdr.peerid, - 0, -1, sched, sizeof *sched); - return; - - case IMSG_QUEUE_REMOVE: - rem = imsg->data; - rem->ret = 0; - if (valid_message_uid(rem->mid)) - rem->ret = runner_force_envelope_remove(rem->mid); - else if (valid_message_id(rem->mid)) - rem->ret = runner_force_message_remove(rem->mid); - imsg_compose_event(iev, IMSG_QUEUE_REMOVE, imsg->hdr.peerid, 0, - -1, rem, sizeof *rem); - return; - case IMSG_PARENT_ENQUEUE_OFFLINE: - runner_process_offline(env); + /* runner_process_offline(env);*/ return; case IMSG_SMTP_ENQUEUE: m = imsg->data; if (imsg->fd < 0 || !bounce_session(env, imsg->fd, m)) { - m->status = S_MESSAGE_TEMPFAILURE; - queue_message_update(m); + m->status = 0; + queue_update_envelope(m); + ramqueue_insert(&env->sc_rqueue, m, time(NULL)); + runner_setup_events(env); + return; } return; @@ -228,7 +233,7 @@ runner(struct smtpd *env) setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid)) fatal("runner: cannot drop privileges"); - SPLAY_INIT(&env->batch_queue); + ramqueue_init(env, &env->sc_rqueue); imsg_callback = runner_imsg; event_init(); @@ -249,8 +254,6 @@ runner(struct smtpd *env) config_peers(env, peers, nitems(peers)); unlink(PATH_QUEUE "/envelope.tmp"); - runner_reset_flags(); - runner_process_offline(env); runner_setup_events(env); event_dispatch(); @@ -259,382 +262,250 @@ runner(struct smtpd *env) return (0); } -void -runner_process_offline(struct smtpd *env) -{ - char path[MAXPATHLEN]; - struct qwalk *q; - - q = qwalk_new(PATH_OFFLINE); - - if (qwalk(q, path)) - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_PARENT_ENQUEUE_OFFLINE, PROC_PARENT, 0, -1, path, - strlen(path) + 1); - - qwalk_close(q); -} - -void -runner_reset_flags(void) -{ - char path[MAXPATHLEN]; - struct message message; - struct qwalk *q; - - q = qwalk_new(PATH_QUEUE); - - while (qwalk(q, path)) { - while (! queue_load_envelope(&message, basename(path))) - sleep(1); - message_reset_flags(&message); - } - - qwalk_close(q); -} - void runner_timeout(int fd, short event, void *p) { struct smtpd *env = p; + struct ramqueue *rqueue = &env->sc_rqueue; + struct ramqueue_envelope *rq_evp; struct timeval tv; + static int rq_done = 0; + static int rq_off_done = 0; + time_t nsched; + time_t curtm; runner_purge_run(); - runner_process_queue(env); - runner_process_runqueue(env); - runner_process_batchqueue(env); - - tv.tv_sec = 1; - tv.tv_usec = 0; - evtimer_add(&env->sc_ev, &tv); -} - -void -runner_process_queue(struct smtpd *env) -{ - char path[MAXPATHLEN]; - char rqpath[MAXPATHLEN]; - struct message message; - time_t now; - size_t mta_av, mda_av, bnc_av; - struct qwalk *q; - - mta_av = env->sc_maxconn - env->stats->mta.sessions_active; - mda_av = env->sc_maxconn - env->stats->mda.sessions_active; - bnc_av = env->sc_maxconn - env->stats->runner.bounces_active; - - now = time(NULL); - q = qwalk_new(PATH_QUEUE); - - while (qwalk(q, path)) { - if (! queue_load_envelope(&message, basename(path))) - continue; - - if (message.type & T_MDA_MESSAGE) { - if (env->sc_opts & SMTPD_MDA_PAUSED) - continue; - if (mda_av == 0) - continue; - } - - if (message.type & T_MTA_MESSAGE) { - if (env->sc_opts & SMTPD_MTA_PAUSED) - continue; - if (mta_av == 0) - continue; - } + nsched = 0; + rq_evp = ramqueue_first_envelope(rqueue); + if (rq_evp) + nsched = rq_evp->sched; - if (message.type & T_BOUNCE_MESSAGE) { - if (env->sc_opts & (SMTPD_MDA_PAUSED|SMTPD_MTA_PAUSED)) - continue; - if (bnc_av == 0) - continue; - } - if (! runner_message_schedule(&message, now)) - continue; - - if (runner_check_loop(&message)) { - message_set_errormsg(&message, "loop has been detected"); - bounce_record_message(&message); - queue_remove_envelope(&message); - continue; - } + /* fetch one offline message at a time to prevent a huge + * offline queue from hogging the deliveries of incoming + * messages. + */ + if (! rq_off_done) + rq_off_done = ramqueue_load_offline(rqueue); - message.flags |= F_MESSAGE_SCHEDULED; - message.flags &= ~F_MESSAGE_FORCESCHEDULE; - queue_update_envelope(&message); - if (! bsnprintf(rqpath, sizeof(rqpath), "%s/%s", PATH_RUNQUEUE, - basename(path))) - fatalx("runner_process_queue: snprintf"); + /* load as many envelopes as possible from disk-queue to + * ram-queue until a schedulable envelope is found. + */ + if (! rq_done) + rq_done = ramqueue_load(rqueue, &nsched); - if (symlink(path, rqpath) == -1) { - if (errno == EEXIST) - continue; - if (errno == ENOSPC) - break; - fatal("runner_process_queue: symlink"); - } - if (message.type & T_MDA_MESSAGE) - mda_av--; - if (message.type & T_MTA_MESSAGE) - mta_av--; - if (message.type & T_BOUNCE_MESSAGE) - bnc_av--; + /* let's do the schedule dance baby ! */ + curtm = time(NULL); + rq_evp = ramqueue_next_envelope(rqueue); + while (rq_evp) { + if (rq_evp->sched > curtm) + break; + runner_process_envelope(env, rq_evp, curtm); + rq_evp = ramqueue_next_envelope(rqueue); } - - qwalk_close(q); -} -void -runner_process_runqueue(struct smtpd *env) -{ - char path[MAXPATHLEN]; - struct message message; - time_t tm; - struct batch *batchp; - struct message *messagep; - struct qwalk *q; - - tm = time(NULL); - - q = qwalk_new(PATH_RUNQUEUE); - - while (qwalk(q, path)) { - unlink(path); - - if (! queue_load_envelope(&message, basename(path))) - continue; - - if (message.flags & F_MESSAGE_PROCESSING) - continue; - - message.lasttry = tm; - message.flags &= ~F_MESSAGE_SCHEDULED; - message.flags |= F_MESSAGE_PROCESSING; - - if (! queue_update_envelope(&message)) - continue; - - messagep = calloc(1, sizeof (struct message)); - if (messagep == NULL) - fatal("runner_process_runqueue: calloc"); - *messagep = message; - - messagep->batch_id = 0; - batchp = batch_lookup(env, messagep); - if (batchp != NULL) - messagep->batch_id = batchp->id; - - batchp = batch_record(env, messagep); - if (messagep->batch_id == 0) - messagep->batch_id = batchp->id; + if (rq_done && rq_off_done && ramqueue_is_empty(rqueue)) { + log_debug("runner: ramqueue is empty, wake me up. zZzZzZ"); + return; } - qwalk_close(q); -} - -void -runner_process_batchqueue(struct smtpd *env) -{ - struct batch *batchp; - struct message *m; - int fd; - - while ((batchp = SPLAY_MIN(batchtree, &env->batch_queue)) != NULL) { - switch (batchp->type) { - case T_BOUNCE_BATCH: - while ((m = TAILQ_FIRST(&batchp->messages))) { - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, m, - sizeof *m); - TAILQ_REMOVE(&batchp->messages, m, entry); - free(m); - } - env->stats->runner.bounces_active++; - env->stats->runner.bounces++; - SET_IF_GREATER(env->stats->runner.bounces_active, - env->stats->runner.bounces_maxactive); - break; - - case T_MDA_BATCH: - m = TAILQ_FIRST(&batchp->messages); - fd = queue_open_message_file(m->message_id); - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, m, - sizeof *m); - TAILQ_REMOVE(&batchp->messages, m, entry); - free(m); - env->stats->mda.sessions_active++; - env->stats->mda.sessions++; - SET_IF_GREATER(env->stats->mda.sessions_active, - env->stats->mda.sessions_maxactive); - break; - - case T_MTA_BATCH: - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_CREATE, PROC_MTA, 0, -1, batchp, - sizeof *batchp); - while ((m = TAILQ_FIRST(&batchp->messages))) { - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_APPEND, PROC_MTA, 0, -1, m, - sizeof *m); - TAILQ_REMOVE(&batchp->messages, m, entry); - free(m); - } - imsg_compose_event(env->sc_ievs[PROC_QUEUE], - IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, batchp, - sizeof *batchp); - env->stats->mta.sessions_active++; - env->stats->mta.sessions++; - SET_IF_GREATER(env->stats->mta.sessions_active, - env->stats->mta.sessions_maxactive); - break; + /* disk-queues not fully loaded, no time for sleeping */ + if (!rq_done || !rq_off_done) + nsched = 0; + else { + nsched = nsched - curtm; + if (nsched < 0) + nsched = 0; + } - default: - fatalx("runner_process_batchqueue: unknown type"); - } + log_debug("runner: nothing to do for the next %d seconds, zZzZzZ", + nsched); - SPLAY_REMOVE(batchtree, &env->batch_queue, batchp); - free(batchp); - } + tv.tv_sec = nsched; + tv.tv_usec = 0; + evtimer_add(&env->sc_ev, &tv); } int -runner_message_schedule(struct message *messagep, time_t tm) +runner_process_envelope(struct smtpd *env, struct ramqueue_envelope *rq_evp, time_t curtm) { - time_t delay; + size_t mta_av, mda_av, bnc_av; + struct message envelope; - if (messagep->flags & (F_MESSAGE_SCHEDULED|F_MESSAGE_PROCESSING)) + mta_av = env->sc_maxconn - env->stats->mta.sessions_active; + mda_av = env->sc_maxconn - env->stats->mda.sessions_active; + bnc_av = env->sc_maxconn - env->stats->runner.bounces_active; + + if (! queue_load_envelope(&envelope, rq_evp->id)) return 0; - if (messagep->flags & F_MESSAGE_FORCESCHEDULE) - return 1; - - /* Batch has been in the queue for too long and expired */ - if (tm - messagep->creation >= messagep->expire) { - message_set_errormsg(messagep, - "message expired after sitting in queue for %d days", - messagep->expire / 60 / 60 / 24); - bounce_record_message(messagep); - queue_remove_envelope(messagep); - return 0; + if (envelope.type & T_MDA_MESSAGE) { + if (env->sc_opts & SMTPD_MDA_PAUSED) + return 0; + if (mda_av == 0) + return 0; } - if (messagep->lasttry == 0) - return 1; - - delay = SMTPD_QUEUE_MAXINTERVAL; - - // recompute path - - if (messagep->type == T_MDA_MESSAGE || - messagep->type == T_BOUNCE_MESSAGE) { - if (messagep->retry < 5) - return 1; - - if (messagep->retry < 15) - delay = (messagep->retry * 60) + arc4random_uniform(60); + if (envelope.type & T_MTA_MESSAGE) { + if (env->sc_opts & SMTPD_MTA_PAUSED) + return 0; + if (mta_av == 0) + return 0; } - if (messagep->type == T_MTA_MESSAGE) { - if (messagep->retry < 3) - delay = SMTPD_QUEUE_INTERVAL; - else if (messagep->retry <= 7) { - delay = SMTPD_QUEUE_INTERVAL * (1 << (messagep->retry - 3)); - if (delay > SMTPD_QUEUE_MAXINTERVAL) - delay = SMTPD_QUEUE_MAXINTERVAL; - } + if (envelope.type & T_BOUNCE_MESSAGE) { + if (env->sc_opts & (SMTPD_MDA_PAUSED|SMTPD_MTA_PAUSED)) + return 0; + if (bnc_av == 0) + return 0; } - if (tm >= messagep->lasttry + delay) - return 1; - - return 0; -} - -int -runner_force_envelope_schedule(char *mid) -{ - struct message message; - - if (! queue_load_envelope(&message, mid)) - return 0; + if (runner_check_loop(&envelope)) { + struct message bounce; - if (message.flags & (F_MESSAGE_PROCESSING|F_MESSAGE_SCHEDULED)) - return 1; - - message.flags |= F_MESSAGE_FORCESCHEDULE; - - if (! queue_update_envelope(&message)) + message_set_errormsg(&envelope, "loop has been detected"); + bounce_record_message(&envelope, &bounce); + ramqueue_insert(&env->sc_rqueue, &bounce, time(NULL)); + runner_setup_events(env); + queue_remove_envelope(&envelope); return 0; - - return 1; -} - -int -runner_force_message_schedule(char *mid) -{ - char path[MAXPATHLEN]; - DIR *dirp; - struct dirent *dp; - - if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s/envelopes", - PATH_QUEUE, queue_hash(mid), mid)) - return 0; - - dirp = opendir(path); - if (dirp == NULL) - return 0; - - while ((dp = readdir(dirp)) != NULL) { - if (valid_message_uid(dp->d_name)) - runner_force_envelope_schedule(dp->d_name); } - closedir(dirp); + + log_debug("dispatching host: %p, batch: %p, envelope: %p, %s", + rq_evp->host, + rq_evp->batch, + rq_evp, rq_evp->id); + runner_process_batch(env, rq_evp, curtm); return 1; } -int -runner_force_envelope_remove(char *mid) +void +runner_process_batch(struct smtpd *env, struct ramqueue_envelope *rq_evp, time_t curtm) { - struct message message; - - if (! queue_load_envelope(&message, mid)) - return 0; + struct ramqueue_host *host = rq_evp->host; + struct ramqueue_batch *batch = rq_evp->batch; + struct message envelope; + int fd; - if (message.flags & (F_MESSAGE_PROCESSING|F_MESSAGE_SCHEDULED)) - return 0; + switch (batch->type) { + case T_BOUNCE_MESSAGE: + while ((rq_evp = ramqueue_batch_first_envelope(batch))) { + if (! queue_load_envelope(&envelope, rq_evp->id)) + return; + envelope.lasttry = curtm; + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_SMTP_ENQUEUE, PROC_SMTP, 0, -1, &envelope, + sizeof envelope); + ramqueue_remove(&env->sc_rqueue, rq_evp); + free(rq_evp); + } + env->stats->runner.bounces_active++; + env->stats->runner.bounces++; + SET_IF_GREATER(env->stats->runner.bounces_active, + env->stats->runner.bounces_maxactive); + env->stats->runner.active++; + SET_IF_GREATER(env->stats->runner.active, + env->stats->runner.maxactive); + break; + + case T_MDA_MESSAGE: + + rq_evp = ramqueue_batch_first_envelope(batch); + if (! queue_load_envelope(&envelope, rq_evp->id)) + return; + envelope.lasttry = curtm; + fd = queue_open_message_file(envelope.message_id); + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_MDA_SESS_NEW, PROC_MDA, 0, fd, &envelope, + sizeof envelope); + ramqueue_remove(&env->sc_rqueue, rq_evp); + free(rq_evp); + + env->stats->mda.sessions_active++; + env->stats->mda.sessions++; + SET_IF_GREATER(env->stats->mda.sessions_active, + env->stats->mda.sessions_maxactive); + env->stats->runner.active++; + SET_IF_GREATER(env->stats->runner.active, + env->stats->runner.maxactive); + break; + + case T_MTA_MESSAGE: + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_BATCH_CREATE, PROC_MTA, 0, -1, batch, + sizeof *batch); + while ((rq_evp = ramqueue_batch_first_envelope(batch))) { + if (! queue_load_envelope(&envelope, rq_evp->id)) + return; + envelope.lasttry = curtm; + envelope.batch_id = batch->b_id; + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_BATCH_APPEND, PROC_MTA, 0, -1, &envelope, + sizeof envelope); + ramqueue_remove(&env->sc_rqueue, rq_evp); + free(rq_evp); + env->stats->runner.active++; + SET_IF_GREATER(env->stats->runner.active, + env->stats->runner.maxactive); + } + imsg_compose_event(env->sc_ievs[PROC_QUEUE], + IMSG_BATCH_CLOSE, PROC_MTA, 0, -1, batch, + sizeof *batch); + env->stats->mta.sessions_active++; + env->stats->mta.sessions++; + SET_IF_GREATER(env->stats->mta.sessions_active, + env->stats->mta.sessions_maxactive); + break; + + default: + fatalx("runner_process_batchqueue: unknown type"); + } - if (! queue_remove_envelope(&message)) - return 0; + if (ramqueue_batch_is_empty(batch)) { + ramqueue_remove_batch(host, batch); + free(batch); + env->stats->ramqueue.batches--; + + } - return 1; + if (ramqueue_host_is_empty(host)) { + ramqueue_remove_host(&env->sc_rqueue, host); + free(host); + env->stats->ramqueue.hosts--; + } } +/* XXX - temporary solution */ int -runner_force_message_remove(char *mid) +runner_force_message_to_ramqueue(struct ramqueue *rqueue, char *mid) { char path[MAXPATHLEN]; DIR *dirp; struct dirent *dp; + struct message envelope; + time_t curtm; if (! bsnprintf(path, MAXPATHLEN, "%s/%d/%s/envelopes", PATH_QUEUE, queue_hash(mid), mid)) return 0; + dirp = opendir(path); if (dirp == NULL) return 0; + curtm = time(NULL); while ((dp = readdir(dirp)) != NULL) { - if (valid_message_uid(dp->d_name)) - runner_force_envelope_remove(dp->d_name); + if (valid_message_uid(dp->d_name)) { + if (! queue_load_envelope(&envelope, dp->d_name)) + continue; + ramqueue_insert(rqueue, &envelope, curtm); + } } closedir(dirp); @@ -709,114 +580,6 @@ delroot: fatal("runner_purge_message: rmdir"); } -struct batch * -batch_record(struct smtpd *env, struct message *messagep) -{ - struct batch *batchp; - struct path *path; - - batchp = NULL; - if (messagep->batch_id != 0) { - batchp = batch_by_id(env, messagep->batch_id); - if (batchp == NULL) - fatalx("batch_record: internal inconsistency."); - } - if (batchp == NULL) { - batchp = calloc(1, sizeof(struct batch)); - if (batchp == NULL) - fatal("batch_record: calloc"); - - batchp->id = generate_uid(); - - (void)strlcpy(batchp->message_id, messagep->message_id, - sizeof(batchp->message_id)); - TAILQ_INIT(&batchp->messages); - SPLAY_INSERT(batchtree, &env->batch_queue, batchp); - - if (messagep->type & T_BOUNCE_MESSAGE) { - batchp->type = T_BOUNCE_BATCH; - path = &messagep->sender; - } - else { - path = &messagep->recipient; - } - batchp->rule = path->rule; - - (void)strlcpy(batchp->hostname, path->domain, - sizeof(batchp->hostname)); - - if (batchp->type != T_BOUNCE_BATCH) { - if (IS_MAILBOX(*path) || IS_EXT(*path)) { - batchp->type = T_MDA_BATCH; - } - else { - batchp->type = T_MTA_BATCH; - } - } - } - - TAILQ_INSERT_TAIL(&batchp->messages, messagep, entry); - env->stats->runner.active++; - SET_IF_GREATER(env->stats->runner.active, - env->stats->runner.maxactive); - return batchp; -} - -struct batch * -batch_lookup(struct smtpd *env, struct message *message) -{ - struct batch *batchp; - struct batch lookup; - - /* We only support delivery of one message at a time, in MDA - * and bounces messages. - */ - if (message->type == T_BOUNCE_MESSAGE || message->type == T_MDA_MESSAGE) - return NULL; - - /* If message->batch_id != 0, we can retrieve batch by id */ - if (message->batch_id != 0) { - lookup.id = message->batch_id; - return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); - } - - /* We do not know the batch_id yet, maybe it was created but we could not - * be notified, or it just does not exist. Let's scan to see if we can do - * a match based on our message_id and flags. - */ - SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) { - - if (batchp->type != message->type) - continue; - - if (strcasecmp(batchp->message_id, message->message_id) != 0) - continue; - - if (batchp->type & T_MTA_BATCH) - if (strcasecmp(batchp->hostname, message->recipient.domain) != 0) - continue; - - break; - } - - return batchp; -} - -int -batch_cmp(struct batch *s1, struct batch *s2) -{ - /* - * do not return u_int64_t's - */ - if (s1->id < s2->id) - return (-1); - - if (s1->id > s2->id) - return (1); - - return (0); -} - int runner_check_loop(struct message *messagep) { @@ -879,15 +642,3 @@ runner_check_loop(struct message *messagep) fclose(fp); return ret; } - -void -message_reset_flags(struct message *m) -{ - m->flags &= ~F_MESSAGE_SCHEDULED; - m->flags &= ~F_MESSAGE_PROCESSING; - - while (! queue_update_envelope(m)) - sleep(1); -} - -SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp); diff --git a/usr.sbin/smtpd/smtpctl.c b/usr.sbin/smtpd/smtpctl.c index cfe4ff83519..4c0be4cc7fd 100644 --- a/usr.sbin/smtpd/smtpctl.c +++ b/usr.sbin/smtpd/smtpctl.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpctl.c,v 1.57 2011/04/02 16:40:19 eric Exp $ */ +/* $OpenBSD: smtpctl.c,v 1.58 2011/04/13 20:53:18 gilles Exp $ */ /* * Copyright (c) 2006 Pierre-Yves Ritschard @@ -92,7 +92,6 @@ main(int argc, char *argv[]) show_queue(PATH_QUEUE, 0); break; case SHOW_RUNQUEUE: - show_queue(PATH_RUNQUEUE, 0); break; default: goto connected; @@ -130,11 +129,6 @@ connected: case SHUTDOWN: imsg_compose(ibuf, IMSG_CTL_SHUTDOWN, 0, 0, -1, NULL, 0); break; -/* - case RELOAD: - imsg_compose(ibuf, IMSG_CONF_RELOAD, 0, 0, -1, NULL, 0); - break; - */ case PAUSE_MDA: imsg_compose(ibuf, IMSG_QUEUE_PAUSE_LOCAL, 0, 0, -1, NULL, 0); break; @@ -156,24 +150,6 @@ connected: case SHOW_STATS: imsg_compose(ibuf, IMSG_STATS, 0, 0, -1, NULL, 0); break; - case SCHEDULE: { - struct sched s; - - s.fd = -1; - bzero(s.mid, sizeof (s.mid)); - strlcpy(s.mid, res->data, sizeof (s.mid)); - imsg_compose(ibuf, IMSG_QUEUE_SCHEDULE, 0, 0, -1, &s, sizeof (s)); - break; - } - case REMOVE: { - struct remove s; - - s.fd = -1; - bzero(s.mid, sizeof (s.mid)); - strlcpy(s.mid, res->data, sizeof (s.mid)); - imsg_compose(ibuf, IMSG_QUEUE_REMOVE, 0, 0, -1, &s, sizeof (s)); - break; - } case MONITOR: /* XXX */ break; @@ -206,10 +182,8 @@ connected: if (n == 0) break; switch(res->action) { -/* case RELOAD:*/ + /* case RELOAD: */ case SHUTDOWN: - case SCHEDULE: - case REMOVE: case PAUSE_MDA: case PAUSE_MTA: case PAUSE_SMTP: @@ -304,6 +278,17 @@ show_stats_output(struct imsg *imsg) printf("runner.bounces.maxactive=%zd\n", stats->runner.bounces_maxactive); + printf("ramqueue.hosts=%zd\n", stats->ramqueue.hosts); + printf("ramqueue.batches=%zd\n", stats->ramqueue.batches); + printf("ramqueue.envelopes=%zd\n", stats->ramqueue.envelopes); + printf("ramqueue.hosts.max=%zd\n", stats->ramqueue.hosts_max); + printf("ramqueue.batches.max=%zd\n", stats->ramqueue.batches_max); + printf("ramqueue.envelopes.max=%zd\n", stats->ramqueue.envelopes_max); + printf("ramqueue.size=%zd\n", + stats->ramqueue.hosts * sizeof(struct ramqueue_host) + + stats->ramqueue.batches * sizeof(struct ramqueue_batch) + + stats->ramqueue.envelopes * sizeof(struct ramqueue_envelope)); + printf("smtp.errors.delays=%zd\n", stats->smtp.delays); printf("smtp.errors.linetoolong=%zd\n", stats->smtp.linetoolong); printf("smtp.errors.read_eof=%zd\n", stats->smtp.read_eof); diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c index bd3a27dafaf..894a0e5068d 100644 --- a/usr.sbin/smtpd/smtpd.c +++ b/usr.sbin/smtpd/smtpd.c @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.c,v 1.116 2011/03/15 19:24:55 gilles Exp $ */ +/* $OpenBSD: smtpd.c,v 1.117 2011/04/13 20:53:18 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade @@ -638,8 +638,7 @@ setup_spool(uid_t uid, gid_t gid) { unsigned int n; char *paths[] = { PATH_INCOMING, PATH_ENQUEUE, PATH_QUEUE, - PATH_RUNQUEUE, PATH_PURGE, - PATH_OFFLINE, PATH_BOUNCE }; + PATH_PURGE, PATH_OFFLINE, PATH_BOUNCE }; char pathname[MAXPATHLEN]; struct stat sb; int ret; diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h index 4b6b25448d0..1f112c94af9 100644 --- a/usr.sbin/smtpd/smtpd.h +++ b/usr.sbin/smtpd/smtpd.h @@ -1,4 +1,4 @@ -/* $OpenBSD: smtpd.h,v 1.207 2011/04/02 16:40:19 eric Exp $ */ +/* $OpenBSD: smtpd.h,v 1.208 2011/04/13 20:53:18 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade @@ -71,8 +71,6 @@ #define PATH_MESSAGE "/message" #define PATH_ENVELOPES "/envelopes" -#define PATH_RUNQUEUE "/runqueue" - #define PATH_OFFLINE "/offline" #define PATH_BOUNCE "/bounce" @@ -463,23 +461,6 @@ struct message { enum message_status status; }; -enum batch_type { - T_MDA_BATCH = 0x1, - T_MTA_BATCH = 0x2, - T_BOUNCE_BATCH = 0x4 -}; - -struct batch { - SPLAY_ENTRY(batch) b_nodes; - u_int64_t id; - enum batch_type type; - struct rule rule; - struct smtpd *env; - char message_id[MAX_ID_SIZE]; - char hostname[MAXHOSTNAMELEN]; - TAILQ_HEAD(, message) messages; -}; - enum child_type { CHILD_INVALID, CHILD_DAEMON, @@ -587,13 +568,46 @@ struct session { long s_datalen; struct auth s_auth; - struct batch *batch; FILE *datafp; int mboxfd; int messagefd; }; + +/* ram-queue structures */ +struct ramqueue_host { + RB_ENTRY(ramqueue_host) host_entry; + TAILQ_HEAD(,ramqueue_batch) batch_queue; + u_int64_t h_id; + char hostname[MAXHOSTNAMELEN]; +}; +struct ramqueue_batch { + TAILQ_ENTRY(ramqueue_batch) batch_entry; + TAILQ_HEAD(,ramqueue_envelope) envelope_queue; + enum message_type type; + u_int64_t h_id; + u_int64_t b_id; + char m_id[MAX_ID_SIZE]; + struct rule rule; +}; +struct ramqueue_envelope { + TAILQ_ENTRY(ramqueue_envelope) queue_entry; + TAILQ_ENTRY(ramqueue_envelope) batchqueue_entry; + struct ramqueue_host *host; + struct ramqueue_batch *batch; + char id[MAX_ID_SIZE]; + time_t sched; +}; + +struct ramqueue { + struct smtpd *env; + struct ramqueue_envelope *current_evp; + RB_HEAD(hosttree, ramqueue_host) hosttree; + TAILQ_HEAD(,ramqueue_envelope) queue; +}; + + struct smtpd { char sc_conffile[MAXPATHLEN]; size_t sc_maxsize; @@ -619,14 +633,14 @@ struct smtpd { char *sc_title[PROC_COUNT]; struct passwd *sc_pw; char sc_hostname[MAXHOSTNAMELEN]; + struct ramqueue sc_rqueue; + TAILQ_HEAD(listenerlist, listener) *sc_listeners; TAILQ_HEAD(maplist, map) *sc_maps, *sc_maps_reload; TAILQ_HEAD(rulelist, rule) *sc_rules, *sc_rules_reload; SPLAY_HEAD(sessiontree, session) sc_sessions; SPLAY_HEAD(msgtree, message) sc_messages; SPLAY_HEAD(ssltree, ssl) *sc_ssl; - - SPLAY_HEAD(batchtree, batch) batch_queue; SPLAY_HEAD(childtree, child) children; SPLAY_HEAD(lkatree, lkasession) lka_sessions; SPLAY_HEAD(dnstree, dnssession) dns_sessions; @@ -702,6 +716,15 @@ struct s_lka { size_t queries_failure; }; +struct s_ramqueue { + size_t hosts; + size_t batches; + size_t envelopes; + size_t hosts_max; + size_t batches_max; + size_t envelopes_max; +}; + struct stats { struct s_parent parent; struct s_queue queue; @@ -711,18 +734,7 @@ struct stats { struct s_session smtp; struct s_control control; struct s_lka lka; -}; - -struct sched { - int fd; - char mid[MAX_ID_SIZE]; - int ret; -}; - -struct remove { - int fd; - char mid[MAX_ID_SIZE]; - int ret; + struct s_ramqueue ramqueue; }; struct reload { @@ -868,6 +880,7 @@ struct mta_session { struct event ev; char *cert; void *pcb; + struct ramqueue_batch *batch; }; @@ -1001,8 +1014,6 @@ int queue_update_envelope(struct message *); int queue_remove_envelope(struct message *); void queue_submit_envelope(struct smtpd *, struct message *); void queue_commit_envelopes(struct smtpd *, struct message*); -int batch_cmp(struct batch *, struct batch *); -struct batch *batch_by_id(struct smtpd *, u_int64_t); u_int16_t queue_hash(char *); @@ -1024,7 +1035,7 @@ void bounce_delete_message(char *); int bounce_record_envelope(struct message *); int bounce_remove_envelope(struct message *); int bounce_commit_message(struct message *); -int bounce_record_message(struct message *); +int bounce_record_message(struct message *, struct message *); int queue_create_incoming_layout(char *); void queue_delete_incoming_message(char *); int queue_record_incoming_envelope(struct message *); @@ -1041,10 +1052,27 @@ void show_queue(char *, int); u_int16_t queue_hash(char *); +/* ramqueue.c */ +void ramqueue_init(struct smtpd *, struct ramqueue *); +int ramqueue_load(struct ramqueue *, time_t *); +int ramqueue_load_offline(struct ramqueue *); +int ramqueue_host_cmp(struct ramqueue_host *, struct ramqueue_host *); +void ramqueue_remove(struct ramqueue *, struct ramqueue_envelope *); +int ramqueue_is_empty(struct ramqueue *); +int ramqueue_is_empty(struct ramqueue *); +int ramqueue_batch_is_empty(struct ramqueue_batch *); +int ramqueue_host_is_empty(struct ramqueue_host *); +void ramqueue_remove_batch(struct ramqueue_host *, struct ramqueue_batch *); +void ramqueue_remove_host(struct ramqueue *, struct ramqueue_host *); +struct ramqueue_envelope *ramqueue_first_envelope(struct ramqueue *); +struct ramqueue_envelope *ramqueue_next_envelope(struct ramqueue *); +struct ramqueue_envelope *ramqueue_batch_first_envelope(struct ramqueue_batch *); +RB_PROTOTYPE(hosttree, ramqueue_host, host_entry, ramqueue_host_cmp); + + /* runner.c */ pid_t runner(struct smtpd *); void message_reset_flags(struct message *); -SPLAY_PROTOTYPE(batchtree, batch, b_nodes, batch_cmp); /* smtp.c */ diff --git a/usr.sbin/smtpd/smtpd/Makefile b/usr.sbin/smtpd/smtpd/Makefile index e78cef4315d..b244726c86d 100644 --- a/usr.sbin/smtpd/smtpd/Makefile +++ b/usr.sbin/smtpd/smtpd/Makefile @@ -1,4 +1,4 @@ -# $OpenBSD: Makefile,v 1.22 2010/11/29 15:25:56 gilles Exp $ +# $OpenBSD: Makefile,v 1.23 2011/04/13 20:53:18 gilles Exp $ PROG= smtpd SRCS= aliases.c authenticate.c bounce.c client.c \ @@ -7,7 +7,7 @@ SRCS= aliases.c authenticate.c bounce.c client.c \ mfa.c mta.c parse.y queue.c queue_shared.c ruleset.c \ runner.c smtp.c smtp_session.c smtpd.c ssl.c \ ssl_privsep.c util.c asr.c print.c pack.c dname.c \ - res_random.c sockaddr.c + res_random.c sockaddr.c ramqueue.c MAN= smtpd.8 smtpd.conf.5 BINDIR= /usr/sbin -- cgit v1.2.3