diff options
author | Gilles Chehade <gilles@cvs.openbsd.org> | 2008-12-05 02:51:33 +0000 |
---|---|---|
committer | Gilles Chehade <gilles@cvs.openbsd.org> | 2008-12-05 02:51:33 +0000 |
commit | 747f62b5dbedca879d99609ee63547c8d6d68e49 (patch) | |
tree | 2425a8308ec1b78b4e6b95c3b4a061ef0daffd87 /usr.sbin | |
parent | 695bc4c79dcf8db0af4dc04498a5753d14ca39ad (diff) |
- Last part of the new queue code: the runner process (unprivileged and
chrooted) is now in charge of scheduling deliveries and dispatching
messages to the MDA and MTA. The queue process now only performs
inserts/updates/removals on the queue, so it can no longer become so
busy that it delays answers to imsgs from the smtp server.
Diffstat (limited to 'usr.sbin')
-rw-r--r-- | usr.sbin/smtpd/control.c | 52 | ||||
-rw-r--r-- | usr.sbin/smtpd/lka.c | 55 | ||||
-rw-r--r-- | usr.sbin/smtpd/mda.c | 57 | ||||
-rw-r--r-- | usr.sbin/smtpd/mta.c | 96 | ||||
-rw-r--r-- | usr.sbin/smtpd/queue.c | 604 | ||||
-rw-r--r-- | usr.sbin/smtpd/runner.c | 896 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd.c | 8 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd.h | 12 | ||||
-rw-r--r-- | usr.sbin/smtpd/smtpd/Makefile | 4 |
9 files changed, 1188 insertions, 596 deletions
diff --git a/usr.sbin/smtpd/control.c b/usr.sbin/smtpd/control.c index 6d9635c87c8..6cfcea70d70 100644 --- a/usr.sbin/smtpd/control.c +++ b/usr.sbin/smtpd/control.c @@ -1,4 +1,4 @@ -/* $OpenBSD: control.c,v 1.3 2008/11/17 21:27:03 chl Exp $ */ +/* $OpenBSD: control.c,v 1.4 2008/12/05 02:51:32 gilles Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> @@ -57,6 +57,7 @@ void control_dispatch_ext(int, short, void *); void control_dispatch_lka(int, short, void *); void control_dispatch_mfa(int, short, void *); void control_dispatch_queue(int, short, void *); +void control_dispatch_runner(int, short, void *); struct ctl_connlist ctl_conns; @@ -86,6 +87,7 @@ control(struct smtpd *env) struct event ev_sigterm; struct peer peers [] = { { PROC_QUEUE, control_dispatch_queue }, + { PROC_RUNNER, control_dispatch_runner }, }; switch (pid = fork()) { @@ -158,7 +160,7 @@ control(struct smtpd *env) TAILQ_INIT(&ctl_conns); - config_peers(env, peers, 1); + config_peers(env, peers, 2); control_listen(env); event_dispatch(); control_shutdown(); @@ -460,6 +462,52 @@ control_dispatch_queue(int sig, short event, void *p) } void +control_dispatch_runner(int sig, short event, void *p) +{ + struct smtpd *env = p; + struct imsgbuf *ibuf; + struct imsg imsg; + ssize_t n; + + ibuf = env->sc_ibufs[PROC_RUNNER]; + switch (event) { + case EV_READ: + if ((n = imsg_read(ibuf)) == -1) + fatal("imsg_read_error"); + if (n == 0) { + /* this pipe is dead, so remove the event handler */ + event_del(&ibuf->ev); + event_loopexit(NULL); + return; + } + break; + case EV_WRITE: + if (msgbuf_write(&ibuf->w) == -1) + fatal("msgbuf_write"); + imsg_event_add(ibuf); + return; + default: + fatalx("unknown event"); + } + + for (;;) { + if ((n = imsg_get(ibuf, &imsg)) == -1) + fatal("control_dispatch_runner: imsg_read error"); + if (n == 0) + break; + + switch (imsg.hdr.type) { + default: + log_debug("control_dispatch_runner: unexpected imsg %d", + imsg.hdr.type); + break; + } + imsg_free(&imsg); 
+ } + imsg_event_add(ibuf); +} + +void session_socket_blockmode(int fd, enum blockmodes bm) { int flags; diff --git a/usr.sbin/smtpd/lka.c b/usr.sbin/smtpd/lka.c index 43b4daa2ce4..0f74cfe1954 100644 --- a/usr.sbin/smtpd/lka.c +++ b/usr.sbin/smtpd/lka.c @@ -1,4 +1,4 @@ -/* $OpenBSD: lka.c,v 1.4 2008/11/25 20:26:40 gilles Exp $ */ +/* $OpenBSD: lka.c,v 1.5 2008/12/05 02:51:32 gilles Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> @@ -49,6 +49,7 @@ void lka_dispatch_parent(int, short, void *); void lka_dispatch_mfa(int, short, void *); void lka_dispatch_smtp(int, short, void *); void lka_dispatch_queue(int, short, void *); +void lka_dispatch_runner(int, short, void *); void lka_setup_events(struct smtpd *); void lka_disable_events(struct smtpd *); int lka_verify_mail(struct smtpd *, struct path *); @@ -470,6 +471,52 @@ lka_dispatch_queue(int sig, short event, void *p) break; } + default: + log_debug("lka_dispatch_queue: unexpected imsg %d", + imsg.hdr.type); + break; + } + imsg_free(&imsg); + } + imsg_event_add(ibuf); +} + +void +lka_dispatch_runner(int sig, short event, void *p) +{ + struct smtpd *env = p; + struct imsgbuf *ibuf; + struct imsg imsg; + ssize_t n; + + ibuf = env->sc_ibufs[PROC_RUNNER]; + switch (event) { + case EV_READ: + if ((n = imsg_read(ibuf)) == -1) + fatal("imsg_read_error"); + if (n == 0) { + /* this pipe is dead, so remove the event handler */ + event_del(&ibuf->ev); + event_loopexit(NULL); + return; + } + break; + case EV_WRITE: + if (msgbuf_write(&ibuf->w) == -1) + fatal("msgbuf_write"); + imsg_event_add(ibuf); + return; + default: + fatalx("unknown event"); + } + + for (;;) { + if ((n = imsg_get(ibuf, &imsg)) == -1) + fatal("lka_dispatch_runner: imsg_read error"); + if (n == 0) + break; + + switch (imsg.hdr.type) { case IMSG_LKA_MX_LOOKUP: { struct batch *batchp; struct addrinfo hints, *res, *resp; @@ -545,8 +592,9 @@ lka_dispatch_queue(int sig, short event, void *p) break; } + default: - 
log_debug("lka_dispatch_queue: unexpected imsg %d", + log_debug("lka_dispatch_runner: unexpected imsg %d", imsg.hdr.type); break; } @@ -586,6 +634,7 @@ lka(struct smtpd *env) { PROC_MFA, lka_dispatch_mfa }, { PROC_QUEUE, lka_dispatch_queue }, { PROC_SMTP, lka_dispatch_smtp }, + { PROC_RUNNER, lka_dispatch_runner }, }; switch (pid = fork()) { @@ -620,7 +669,7 @@ lka(struct smtpd *env) signal(SIGPIPE, SIG_IGN); signal(SIGHUP, SIG_IGN); - config_peers(env, peers, 4); + config_peers(env, peers, 5); lka_setup_events(env); event_dispatch(); diff --git a/usr.sbin/smtpd/mda.c b/usr.sbin/smtpd/mda.c index ff2b62519ee..2532eb649b8 100644 --- a/usr.sbin/smtpd/mda.c +++ b/usr.sbin/smtpd/mda.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mda.c,v 1.2 2008/11/05 12:14:45 sobrado Exp $ */ +/* $OpenBSD: mda.c,v 1.3 2008/12/05 02:51:32 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -39,6 +39,7 @@ __dead void mda_shutdown(void); void mda_sig_handler(int, short, void *); void mda_dispatch_parent(int, short, void *); void mda_dispatch_queue(int, short, void *); +void mda_dispatch_runner(int, short, void *); void mda_setup_events(struct smtpd *); void mda_disable_events(struct smtpd *); void mda_timeout(int, short, void *); @@ -217,6 +218,52 @@ mda_dispatch_queue(int sig, short event, void *p) break; switch (imsg.hdr.type) { + default: + log_debug("parent_dispatch_queue: unexpected imsg %d", + imsg.hdr.type); + break; + } + imsg_free(&imsg); + } + imsg_event_add(ibuf); +} + +void +mda_dispatch_runner(int sig, short event, void *p) +{ + struct smtpd *env = p; + struct imsgbuf *ibuf; + struct imsg imsg; + ssize_t n; + + ibuf = env->sc_ibufs[PROC_RUNNER]; + switch (event) { + case EV_READ: + if ((n = imsg_read(ibuf)) == -1) + fatal("imsg_read_error"); + if (n == 0) { + /* this pipe is dead, so remove the event handler */ + event_del(&ibuf->ev); + event_loopexit(NULL); + return; + } + break; + case EV_WRITE: + if (msgbuf_write(&ibuf->w) == -1) + fatal("msgbuf_write"); + 
imsg_event_add(ibuf); + return; + default: + fatalx("unknown event"); + } + + for (;;) { + if ((n = imsg_get(ibuf, &imsg)) == -1) + fatal("parent_dispatch_runner: imsg_read error"); + if (n == 0) + break; + + switch (imsg.hdr.type) { case IMSG_CREATE_BATCH: { struct batch *batchp; @@ -269,9 +316,8 @@ mda_dispatch_queue(int sig, short event, void *p) break; } - default: - log_debug("parent_dispatch_queue: unexpected imsg %d", + log_debug("parent_dispatch_runner: unexpected imsg %d", imsg.hdr.type); break; } @@ -327,7 +373,8 @@ mda(struct smtpd *env) struct peer peers[] = { { PROC_PARENT, mda_dispatch_parent }, - { PROC_QUEUE, mda_dispatch_queue } + { PROC_QUEUE, mda_dispatch_queue }, + { PROC_RUNNER, mda_dispatch_runner } }; switch (pid = fork()) { @@ -373,7 +420,7 @@ mda(struct smtpd *env) signal(SIGPIPE, SIG_IGN); signal(SIGHUP, SIG_IGN); - config_peers(env, peers, 2); + config_peers(env, peers, 3); mda_setup_events(env); event_dispatch(); diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c index a16a5b011a9..7e2f425b082 100644 --- a/usr.sbin/smtpd/mta.c +++ b/usr.sbin/smtpd/mta.c @@ -1,4 +1,4 @@ -/* $OpenBSD: mta.c,v 1.7 2008/11/25 20:26:40 gilles Exp $ */ +/* $OpenBSD: mta.c,v 1.8 2008/12/05 02:51:32 gilles Exp $ */ /* * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> @@ -44,6 +44,7 @@ __dead void mta_shutdown(void); void mta_sig_handler(int, short, void *); void mta_dispatch_parent(int, short, void *); void mta_dispatch_queue(int, short, void *); +void mta_dispatch_runner(int, short, void *); void mta_setup_events(struct smtpd *); void mta_disable_events(struct smtpd *); void mta_timeout(int, short, void *); @@ -150,6 +151,72 @@ mta_dispatch_queue(int sig, short event, void *p) break; switch (imsg.hdr.type) { + case IMSG_QUEUE_MESSAGE_FD: { + struct batch *batchp; + int fd; + + if ((fd = imsg_get_fd(ibuf, &imsg)) == -1) { + /* NEEDS_FIX - unsure yet how it must be handled */ + errx(1, "imsg_get_fd"); + } + + batchp = (struct batch *)imsg.data; + 
batchp = batch_by_id(env, batchp->id); + + if ((batchp->messagefp = fdopen(fd, "r")) == NULL) + err(1, "fdopen"); + + evbuffer_add_printf(batchp->bev->output, "DATA\r\n"); + + bufferevent_enable(batchp->bev, EV_WRITE|EV_READ); + break; + } + default: + log_debug("parent_dispatch_mta: unexpected imsg %d", + imsg.hdr.type); + break; + } + imsg_free(&imsg); + } + imsg_event_add(ibuf); +} + +void +mta_dispatch_runner(int sig, short event, void *p) +{ + struct smtpd *env = p; + struct imsgbuf *ibuf; + struct imsg imsg; + ssize_t n; + + ibuf = env->sc_ibufs[PROC_RUNNER]; + switch (event) { + case EV_READ: + if ((n = imsg_read(ibuf)) == -1) + fatal("imsg_read_error"); + if (n == 0) { + /* this pipe is dead, so remove the event handler */ + event_del(&ibuf->ev); + event_loopexit(NULL); + return; + } + break; + case EV_WRITE: + if (msgbuf_write(&ibuf->w) == -1) + fatal("msgbuf_write"); + imsg_event_add(ibuf); + return; + default: + fatalx("unknown event"); + } + + for (;;) { + if ((n = imsg_get(ibuf, &imsg)) == -1) + fatal("mta_dispatch_runner: imsg_read error"); + if (n == 0) + break; + + switch (imsg.hdr.type) { case IMSG_CREATE_BATCH: { struct batch *batchp; @@ -202,28 +269,8 @@ mta_dispatch_queue(int sig, short event, void *p) } break; } - case IMSG_QUEUE_MESSAGE_FD: { - struct batch *batchp; - int fd; - - if ((fd = imsg_get_fd(ibuf, &imsg)) == -1) { - /* NEEDS_FIX - unsure yet how it must be handled */ - errx(1, "imsg_get_fd"); - } - - batchp = (struct batch *)imsg.data; - batchp = batch_by_id(env, batchp->id); - - if ((batchp->messagefp = fdopen(fd, "r")) == NULL) - err(1, "fdopen"); - - evbuffer_add_printf(batchp->bev->output, "DATA\r\n"); - - bufferevent_enable(batchp->bev, EV_WRITE|EV_READ); - break; - } default: - log_debug("parent_dispatch_mta: unexpected imsg %d", + log_debug("mta_dispatch_runner: unexpected imsg %d", imsg.hdr.type); break; } @@ -277,7 +324,8 @@ mta(struct smtpd *env) struct event ev_sigterm; struct peer peers[] = { - { PROC_QUEUE, 
mta_dispatch_queue } + { PROC_QUEUE, mta_dispatch_queue }, + { PROC_RUNNER, mta_dispatch_runner } }; switch (pid = fork()) { @@ -320,7 +368,7 @@ mta(struct smtpd *env) signal(SIGPIPE, SIG_IGN); signal(SIGHUP, SIG_IGN); - config_peers(env, peers, 1); + config_peers(env, peers, 2); SPLAY_INIT(&env->batch_queue); diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c index 6d67033a1be..f3e4fc4422c 100644 --- a/usr.sbin/smtpd/queue.c +++ b/usr.sbin/smtpd/queue.c @@ -1,4 +1,4 @@ -/* $OpenBSD: queue.c,v 1.18 2008/12/03 20:08:08 gilles Exp $ */ +/* $OpenBSD: queue.c,v 1.19 2008/12/05 02:51:32 gilles Exp $ */ /* * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> @@ -51,10 +51,10 @@ void queue_dispatch_smtp(int, short, void *); void queue_dispatch_mda(int, short, void *); void queue_dispatch_mta(int, short, void *); void queue_dispatch_lka(int, short, void *); +void queue_dispatch_runner(int, short, void *); void queue_setup_events(struct smtpd *); void queue_disable_events(struct smtpd *); void queue_timeout(int, short, void *); -void queue_process_runqueue(int, short, void *); int queue_create_incoming_layout(char *); int queue_record_envelope(struct message *); int queue_remove_envelope(struct message *); @@ -81,7 +81,6 @@ void batch_send(struct smtpd *, struct batch *, time_t); u_int32_t hash(u_int8_t *, size_t); struct batch *queue_record_batch(struct smtpd *, struct message *); struct batch *batch_by_id(struct smtpd *, u_int64_t); -struct batch *batch_lookup(struct smtpd *, struct message *); struct message *message_by_id(struct smtpd *, struct batch *, u_int64_t); void @@ -541,12 +540,54 @@ queue_dispatch_lka(int sig, short event, void *p) break; } - case IMSG_LKA_MX_LOOKUP: { - queue_batch_resolved(env, imsg.data); + default: + log_debug("queue_dispatch_lka: unexpected imsg %d", + imsg.hdr.type); break; } + imsg_free(&imsg); + } + imsg_event_add(ibuf); +} + +void +queue_dispatch_runner(int sig, short event, void *p) +{ + struct smtpd *env = p; + struct 
imsgbuf *ibuf; + struct imsg imsg; + ssize_t n; + + ibuf = env->sc_ibufs[PROC_RUNNER]; + switch (event) { + case EV_READ: + if ((n = imsg_read(ibuf)) == -1) + fatal("imsg_read_error"); + if (n == 0) { + /* this pipe is dead, so remove the event handler */ + event_del(&ibuf->ev); + event_loopexit(NULL); + return; + } + break; + case EV_WRITE: + if (msgbuf_write(&ibuf->w) == -1) + fatal("msgbuf_write"); + imsg_event_add(ibuf); + return; + default: + fatalx("unknown event"); + } + + for (;;) { + if ((n = imsg_get(ibuf, &imsg)) == -1) + fatal("queue_dispatch_runner: imsg_read error"); + if (n == 0) + break; + + switch (imsg.hdr.type) { default: - log_debug("queue_dispatch_lka: unexpected imsg %d", + log_debug("queue_dispatch_runner: unexpected imsg %d", imsg.hdr.type); break; } @@ -565,57 +606,11 @@ queue_shutdown(void) void queue_setup_events(struct smtpd *env) { - struct timeval tv; - - evtimer_set(&env->sc_ev, queue_timeout, env); - tv.tv_sec = 0; - tv.tv_usec = 10; - evtimer_add(&env->sc_ev, &tv); - - evtimer_set(&env->sc_rqev, queue_process_runqueue, env); - tv.tv_sec = 0; - tv.tv_usec = 10; - evtimer_add(&env->sc_rqev, &tv); } void queue_disable_events(struct smtpd *env) { - evtimer_del(&env->sc_ev); -} - -void -queue_timeout(int fd, short event, void *p) -{ - struct smtpd *env = p; - struct timeval tv; - time_t curtime; - struct batch *batchp, *nxt; - - queue_process(env); - - curtime = time(NULL); - - for (batchp = SPLAY_MIN(batchtree, &env->batch_queue); - batchp != NULL; - batchp = nxt) { - nxt = SPLAY_NEXT(batchtree, &env->batch_queue, batchp); - if ((batchp->type & T_MTA_BATCH) && - (batchp->flags & F_BATCH_RESOLVED) == 0) { - continue; - } - - batch_send(env, batchp, curtime); - - SPLAY_REMOVE(batchtree, &env->batch_queue, batchp); - bzero(batchp, sizeof(struct batch)); - free(batchp); - - } - - tv.tv_sec = 0; - tv.tv_usec = 10; - evtimer_add(&env->sc_ev, &tv); } pid_t @@ -632,7 +627,8 @@ queue(struct smtpd *env) { PROC_SMTP, queue_dispatch_smtp }, { 
PROC_MDA, queue_dispatch_mda }, { PROC_MTA, queue_dispatch_mta }, - { PROC_LKA, queue_dispatch_lka } + { PROC_LKA, queue_dispatch_lka }, + { PROC_RUNNER, queue_dispatch_runner } }; switch (pid = fork()) { @@ -667,8 +663,6 @@ queue(struct smtpd *env) fatal("queue: cannot drop privileges"); #endif - SPLAY_INIT(&env->batch_queue); - event_init(); signal_set(&ev_sigint, SIGINT, queue_sig_handler, env); @@ -678,7 +672,7 @@ queue(struct smtpd *env) signal(SIGPIPE, SIG_IGN); signal(SIGHUP, SIG_IGN); - config_peers(env, peers, 5); + config_peers(env, peers, 6); queue_setup_events(env); event_dispatch(); @@ -687,257 +681,6 @@ queue(struct smtpd *env) return (0); } -void -queue_process(struct smtpd *env) -{ - u_int16_t cbucket = 0; - static u_int16_t lbucket = 0; - DIR *dirp; - struct dirent *dp; - const char *errstr; - static u_int8_t bucketdone = 1; - - if (! bucketdone) { - bucketdone = queue_process_bucket(env, lbucket); - if (bucketdone) - lbucket = (lbucket + 1) % DIRHASH_BUCKETS; - return; - } - - dirp = opendir(PATH_QUEUE); - if (dirp == NULL) - fatal("queue_process: opendir"); - - while ((dp = readdir(dirp)) != NULL) { - - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - - cbucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr); - if (errstr) { - log_warn("queue_process: %s/%s is not a valid bucket", - PATH_QUEUE, dp->d_name); - continue; - } - - if (cbucket == lbucket) - break; - } - closedir(dirp); - - if (dp == NULL) { - lbucket = (lbucket + 1) % DIRHASH_BUCKETS; - return; - } - - bucketdone = queue_process_bucket(env, cbucket); - if (bucketdone) - lbucket = (lbucket + 1) % DIRHASH_BUCKETS; -} - -int -queue_process_bucket(struct smtpd *env, u_int16_t bucket) -{ - int spret; - static DIR *dirp = NULL; - struct dirent *dp; - static char *msgid = NULL; - char bucketpath[MAXPATHLEN]; - static u_int8_t messagedone = 1; - - if (! messagedone) { - messagedone = queue_process_message(env, msgid); - if (! 
messagedone) - return 0; - msgid = NULL; - } - - spret = snprintf(bucketpath, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_process_bucket: snprintf"); - - if (dirp == NULL) { - dirp = opendir(bucketpath); - if (dirp == NULL) - fatal("queue_process_bucket: opendir"); - } - - while ((dp = readdir(dirp)) != NULL) { - - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - - break; - } - - if (dp != NULL) { - msgid = dp->d_name; - messagedone = queue_process_message(env, msgid); - if (! messagedone) - return 0; - msgid = NULL; - } - - closedir(dirp); - dirp = NULL; - return 1; -} - -int -queue_process_message(struct smtpd *env, char *messageid) -{ - int spret; - static DIR *dirp = NULL; - struct dirent *dp; - char evppath[MAXPATHLEN]; - u_int16_t hval = 0; - - hval = hash(messageid, strlen(messageid)) % DIRHASH_BUCKETS; - - spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, hval, - messageid, PATH_ENVELOPES); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_process_message: snprintf"); - - if (dirp == NULL) { - dirp = opendir(evppath); - if (dirp == NULL) - fatal("queue_process_message: opendir"); - } - - while ((dp = readdir(dirp)) != NULL) { - - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - break; - } - - if (dp != NULL) { - queue_process_envelope(env, messageid, dp->d_name); - return 0; - } - - closedir(dirp); - dirp = NULL; - return 1; -} - -void -queue_process_envelope(struct smtpd *env, char *msgid, char *evpid) -{ - int spret; - struct message message; - time_t tm; - char evppath[MAXPATHLEN]; - char rqpath[MAXPATHLEN]; - u_int16_t hval; - struct stat sb; - - if (! queue_load_envelope(&message, evpid)) { - log_debug("failed to load envelope: %s", evpid); - return; - } - - tm = time(NULL); - - if (! 
queue_message_schedule(&message, tm)) { - if (message.flags & F_MESSAGE_EXPIRED) { - log_debug("message has expired, mdaemon"); - queue_remove_envelope(&message); - } - return; - } - - message.flags |= F_MESSAGE_SCHEDULED; - queue_update_envelope(&message); - - log_debug("SCHEDULED: %s", evpid); - hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS; - spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, hval, - msgid, PATH_ENVELOPES, evpid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_process_envelope: snprintf"); - - spret = snprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, evpid); - if (spret == -1 || spret >= MAXPATHLEN) - fatal("queue_process_envelope: snprintf"); - - if (stat(rqpath, &sb) == -1) { - if (errno != ENOENT) - fatal("queue_process_envelope: stat"); - - if (symlink(evppath, rqpath) == -1) { - log_info("queue_process_envelope: " - "failed to place envelope in runqueue"); - } - } -} - -void -queue_process_runqueue(int fd, short event, void *p) -{ - DIR *dirp; - struct dirent *dp; - struct message message; - struct message *messagep; - struct batch *batchp; - char pathname[MAXPATHLEN]; - time_t tm; - struct smtpd *env = p; - struct timeval tv; - - tm = time(NULL); - - dirp = opendir(PATH_RUNQUEUE); - if (dirp == NULL) - fatal("queue_process_runqueue: opendir"); - - while ((dp = readdir(dirp)) != NULL) { - if (strcmp(dp->d_name, ".") == 0 || - strcmp(dp->d_name, "..") == 0) - continue; - - /* XXX */ - snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, dp->d_name); - unlink(pathname); - - if (! 
queue_load_envelope(&message, dp->d_name)) { - log_debug("failed to load envelope"); - continue; - } - - if (message.flags & F_MESSAGE_PROCESSING) - continue; - - message.lasttry = tm; - message.flags &= ~F_MESSAGE_SCHEDULED; - message.flags |= F_MESSAGE_PROCESSING; - queue_update_envelope(&message); - - messagep = calloc(1, sizeof (struct message)); - if (messagep == NULL) - err(1, "calloc"); - *messagep = message; - - batchp = batch_lookup(env, messagep); - if (batchp != NULL) - messagep->batch_id = batchp->id; - - batchp = queue_record_batch(env, messagep); - if (messagep->batch_id == 0) - messagep->batch_id = batchp->id; - } - - closedir(dirp); - - tv.tv_sec = 0; - tv.tv_usec = 10; - evtimer_add(&env->sc_rqev, &tv); -} - u_int64_t queue_generate_id(void) { @@ -955,62 +698,6 @@ queue_generate_id(void) return (id); } -struct batch * -queue_record_batch(struct smtpd *env, struct message *messagep) -{ - struct batch *batchp; - struct path *path; - - batchp = NULL; - if (messagep->batch_id != 0) { - batchp = batch_by_id(env, messagep->batch_id); - if (batchp == NULL) - errx(1, "%s: internal inconsistency.", __func__); - } - - if (batchp == NULL) { - batchp = calloc(1, sizeof(struct batch)); - if (batchp == NULL) - err(1, "%s: calloc", __func__); - - batchp->id = queue_generate_id(); - batchp->creation = messagep->creation; - - (void)strlcpy(batchp->message_id, messagep->message_id, - sizeof(batchp->message_id)); - - TAILQ_INIT(&batchp->messages); - SPLAY_INSERT(batchtree, &env->batch_queue, batchp); - - if (messagep->type & T_DAEMON_MESSAGE) { - batchp->type = T_DAEMON_BATCH; - path = &messagep->sender; - } - else { - path = &messagep->recipient; - } - - batchp->rule = path->rule; - - (void)strlcpy(batchp->hostname, path->domain, - sizeof(batchp->hostname)); - - if (IS_MAILBOX(path->rule.r_action) || - IS_EXT(path->rule.r_action)) { - batchp->type |= T_MDA_BATCH; - } - else { - batchp->type |= T_MTA_BATCH; - imsg_compose(env->sc_ibufs[PROC_LKA], IMSG_LKA_MX_LOOKUP, 
0, 0, -1, - batchp, sizeof(struct batch)); - } - } - - TAILQ_INSERT_TAIL(&batchp->messages, messagep, entry); - - return batchp; -} - int queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct message *messagep) { @@ -1028,195 +715,6 @@ queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct messa return 0; } -int -queue_batch_resolved(struct smtpd *env, struct batch *lookup) -{ - u_int32_t i; - struct batch *batchp; - - batchp = batch_by_id(env, lookup->id); - batchp->getaddrinfo_error = lookup->getaddrinfo_error; - batchp->mx_cnt = lookup->mx_cnt; - -/* - EAI_NODATA no address associated with hostname - EAI_NONAME hostname or servname not provided, or not known - EAI_PROTOCOL resolved protocol is unknown - EAI_SERVICE servname not supported for ai_socktype - EAI_SOCKTYPE ai_socktype not supported - EAI_SYSTEM system error returned in errno - - - */ - - switch (batchp->getaddrinfo_error) { - case EAI_ADDRFAMILY: - case EAI_BADFLAGS: - case EAI_BADHINTS: - case EAI_FAIL: - case EAI_FAMILY: - case EAI_NODATA: - case EAI_NONAME: - case EAI_SERVICE: - case EAI_SOCKTYPE: - case EAI_SYSTEM: - /* XXX */ - /* - * In the case of a DNS permanent error, do not generate a - * daemon message if the error originates from one already - * as this would cause a loop. Remove the initial batch as - * it will never succeed. - * - */ - return 0; - - case EAI_AGAIN: - case EAI_MEMORY: - /* XXX */ - /* - * Do not generate a daemon message if this error happened - * while processing a daemon message. Do NOT remove batch, - * it may succeed later. 
- */ - return 0; - - default: - batchp->flags |= F_BATCH_RESOLVED; - for (i = 0; i < batchp->mx_cnt; ++i) - batchp->mxarray[i].ss = lookup->mxarray[i].ss; - } - return 1; -} - -struct batch * -batch_lookup(struct smtpd *env, struct message *message) -{ - struct batch *batchp; - struct batch lookup; - - /* If message->batch_id != 0, we can retrieve batch by id */ - if (message->batch_id != 0) { - lookup.id = message->batch_id; - return SPLAY_FIND(batchtree, &env->batch_queue, &lookup); - } - - /* We do not know the batch_id yet, maybe it was created but we could not - * be notified, or it just does not exist. Let's scan to see if we can do - * a match based on our message_id and flags. - */ - SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) { - - if (batchp->type != message->type) - continue; - - if (strcasecmp(batchp->message_id, message->message_id) != 0) - continue; - - if (batchp->type & T_MTA_BATCH) - if (strcasecmp(batchp->hostname, message->recipient.domain) != 0) - continue; - - break; - } - - return batchp; -} - -int -batch_cmp(struct batch *s1, struct batch *s2) -{ - /* - * do not return u_int64_t's - */ - if (s1->id < s2->id) - return (-1); - - if (s1->id > s2->id) - return (1); - - return (0); -} - -int -queue_message_schedule(struct message *messagep, time_t tm) -{ - time_t delay; - - /* Batch has been in the queue for too long and expired */ - if (tm - messagep->creation >= SMTPD_QUEUE_EXPIRY) { - messagep->flags |= F_MESSAGE_EXPIRED; - return 0; - } - - if (messagep->retry == 255) { - messagep->flags |= F_MESSAGE_EXPIRED; - return 0; - } - - if ((messagep->flags & F_MESSAGE_SCHEDULED) != 0) - return 0; - - if ((messagep->flags & F_MESSAGE_PROCESSING) != 0) - return 0; - - if (messagep->lasttry == 0) - return 1; - - delay = SMTPD_QUEUE_MAXINTERVAL; - - if (messagep->type & T_MDA_MESSAGE) { - if (messagep->retry < 5) - return 1; - - if (messagep->retry < 15) - delay = (messagep->retry * 60) + arc4random() % 60; - } - - if (messagep->type & 
T_MTA_MESSAGE) { - if (messagep->retry < 3) - delay = SMTPD_QUEUE_INTERVAL; - else if (messagep->retry <= 7) { - delay = SMTPD_QUEUE_INTERVAL * (1 << (messagep->retry - 3)); - if (delay > SMTPD_QUEUE_MAXINTERVAL) - delay = SMTPD_QUEUE_MAXINTERVAL; - } - } - - if (tm >= messagep->lasttry + delay) - return 1; - - return 0; -} - -void -batch_send(struct smtpd *env, struct batch *batchp, time_t curtime) -{ - u_int8_t proctype; - struct message *messagep; - - if ((batchp->type & (T_MDA_BATCH|T_MTA_BATCH)) == 0) - fatal("batch_send: unknown batch type"); - - if (batchp->type & T_MDA_BATCH) - proctype = PROC_MDA; - else if (batchp->type & T_MTA_BATCH) - proctype = PROC_MTA; - - imsg_compose(env->sc_ibufs[proctype], IMSG_CREATE_BATCH, 0, 0, -1, - batchp, sizeof (struct batch)); - - while ((messagep = TAILQ_FIRST(&batchp->messages))) { - imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_APPEND, 0, 0, - -1, messagep, sizeof (struct message)); - TAILQ_REMOVE(&batchp->messages, messagep, entry); - bzero(messagep, sizeof(struct message)); - free(messagep); - } - - imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_CLOSE, 0, 0, -1, - batchp, sizeof(struct batch)); -} - struct batch * batch_by_id(struct smtpd *env, u_int64_t id) { @@ -1763,5 +1261,3 @@ hash(u_int8_t *buf, size_t len) return h; } - -SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp); diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c new file mode 100644 index 00000000000..447e6e14e93 --- /dev/null +++ b/usr.sbin/smtpd/runner.c @@ -0,0 +1,896 @@ +/* $OpenBSD: runner.c,v 1.1 2008/12/05 02:51:32 gilles Exp $ */ + +/* + * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org> + * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org> + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. 
+ * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#include <sys/types.h> +#include <sys/queue.h> +#include <sys/tree.h> +#include <sys/param.h> +#include <sys/socket.h> +#include <sys/stat.h> +#include <sys/time.h> + +#include <netinet/in.h> +#include <arpa/inet.h> + +#include <dirent.h> +#include <err.h> +#include <errno.h> +#include <event.h> +#include <fcntl.h> +#include <netdb.h> +#include <pwd.h> +#include <signal.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <time.h> +#include <unistd.h> + +#include "smtpd.h" + +__dead void runner_shutdown(void); +void runner_sig_handler(int, short, void *); +void runner_dispatch_control(int, short, void *); +void runner_dispatch_queue(int, short, void *); +void runner_dispatch_mda(int, short, void *); +void runner_dispatch_mta(int, short, void *); +void runner_dispatch_lka(int, short, void *); +void runner_setup_events(struct smtpd *); +void runner_disable_events(struct smtpd *); +void runner_timeout(int, short, void *); + +void runner_process_queue(struct smtpd *); +void runner_process_bucket(struct smtpd *, u_int16_t); +void runner_process_message(struct smtpd *, char *); +void runner_process_envelope(struct smtpd *, char *, char *); +void runner_process_runqueue(struct smtpd *); +void runner_process_batchqueue(struct smtpd *); + +int runner_batch_resolved(struct smtpd *, struct batch *); +void runner_batch_dispatch(struct smtpd *, struct batch *, time_t); + +int runner_message_schedule(struct message *, time_t); + +struct 
batch *batch_record(struct smtpd *, struct message *);
+struct batch *batch_lookup(struct smtpd *, struct message *);
+
+int queue_load_envelope(struct message *, char *);
+int queue_update_envelope(struct message *);
+int queue_remove_envelope(struct message *);
+
+u_int32_t hash(u_int8_t *, size_t);
+
+/*
+ * Signal callback: SIGINT and SIGTERM shut the runner down, any other
+ * signal delivered here is treated as fatal.
+ */
+void
+runner_sig_handler(int sig, short event, void *p)
+{
+	switch (sig) {
+	case SIGINT:
+	case SIGTERM:
+		runner_shutdown();
+		break;
+	default:
+		fatalx("runner_sig_handler: unexpected signal");
+	}
+}
+
+/*
+ * Event callback for the imsg channel to the control process.
+ * EV_READ reads pending data (EOF removes the event and leaves the event
+ * loop), EV_WRITE flushes queued output.  No imsg type is handled yet;
+ * every message received is logged as unexpected.
+ */
+void
+runner_dispatch_control(int sig, short event, void *p)
+{
+	struct smtpd *env = p;
+	struct imsgbuf *ibuf;
+	struct imsg imsg;
+	ssize_t n;
+
+	ibuf = env->sc_ibufs[PROC_CONTROL];
+	switch (event) {
+	case EV_READ:
+		if ((n = imsg_read(ibuf)) == -1)
+			fatal("imsg_read_error");
+		if (n == 0) {
+			/* this pipe is dead, so remove the event handler */
+			event_del(&ibuf->ev);
+			event_loopexit(NULL);
+			return;
+		}
+		break;
+	case EV_WRITE:
+		if (msgbuf_write(&ibuf->w) == -1)
+			fatal("msgbuf_write");
+		imsg_event_add(ibuf);
+		return;
+	default:
+		fatalx("unknown event");
+	}
+
+	for (;;) {
+		if ((n = imsg_get(ibuf, &imsg)) == -1)
+			fatal("runner_dispatch_control: imsg_read error");
+		if (n == 0)
+			break;
+
+		switch (imsg.hdr.type) {
+		default:
+			/* log under our own name, not the queue process's */
+			log_debug("runner_dispatch_control: unexpected imsg %d",
+			    imsg.hdr.type);
+			break;
+		}
+		imsg_free(&imsg);
+	}
+	imsg_event_add(ibuf);
+}
+
+/*
+ * Event callback for the imsg channel to the queue process.
+ * Same read/write handling as the other dispatchers; no imsg type is
+ * handled yet, every message received is logged as unexpected.
+ */
+void
+runner_dispatch_queue(int sig, short event, void *p)
+{
+	struct smtpd *env = p;
+	struct imsgbuf *ibuf;
+	struct imsg imsg;
+	ssize_t n;
+
+	ibuf = env->sc_ibufs[PROC_QUEUE];
+	switch (event) {
+	case EV_READ:
+		if ((n = imsg_read(ibuf)) == -1)
+			fatal("imsg_read_error");
+		if (n == 0) {
+			/* this pipe is dead, so remove the event handler */
+			event_del(&ibuf->ev);
+			event_loopexit(NULL);
+			return;
+		}
+		break;
+	case EV_WRITE:
+		if (msgbuf_write(&ibuf->w) == -1)
+			fatal("msgbuf_write");
+		imsg_event_add(ibuf);
+		return;
+	default:
+		fatalx("unknown event");
+	}
+
+	for (;;) {
+		if ((n = imsg_get(ibuf, &imsg)) == -1)
+			fatal("runner_dispatch_queue: imsg_read error");
+		if (n == 0)
+			break;
+
+		switch (imsg.hdr.type) {
+		default:
+			log_debug("runner_dispatch_queue: unexpected imsg %d",
+			    imsg.hdr.type);
+			break;
+		}
+		imsg_free(&imsg);
+	}
+	imsg_event_add(ibuf);
+}
+
+/*
+ * Event callback for the imsg channel to the MDA process.
+ * Same read/write handling as the other dispatchers; no imsg type is
+ * handled yet, every message received is logged as unexpected.
+ */
+void
+runner_dispatch_mda(int sig, short event, void *p)
+{
+	struct smtpd *env = p;
+	struct imsgbuf *ibuf;
+	struct imsg imsg;
+	ssize_t n;
+
+	ibuf = env->sc_ibufs[PROC_MDA];
+	switch (event) {
+	case EV_READ:
+		if ((n = imsg_read(ibuf)) == -1)
+			fatal("imsg_read_error");
+		if (n == 0) {
+			/* this pipe is dead, so remove the event handler */
+			event_del(&ibuf->ev);
+			event_loopexit(NULL);
+			return;
+		}
+		break;
+	case EV_WRITE:
+		if (msgbuf_write(&ibuf->w) == -1)
+			fatal("msgbuf_write");
+		imsg_event_add(ibuf);
+		return;
+	default:
+		fatalx("unknown event");
+	}
+
+	for (;;) {
+		if ((n = imsg_get(ibuf, &imsg)) == -1)
+			fatal("runner_dispatch_mda: imsg_read error");
+		if (n == 0)
+			break;
+
+		switch (imsg.hdr.type) {
+		default:
+			log_debug("runner_dispatch_mda: unexpected imsg %d",
+			    imsg.hdr.type);
+			break;
+		}
+		imsg_free(&imsg);
+	}
+	imsg_event_add(ibuf);
+}
+
+/*
+ * Event callback for the imsg channel to the MTA process.
+ * Same read/write handling as the other dispatchers; no imsg type is
+ * handled yet, every message received is logged as unexpected.
+ */
+void
+runner_dispatch_mta(int sig, short event, void *p)
+{
+	struct smtpd *env = p;
+	struct imsgbuf *ibuf;
+	struct imsg imsg;
+	ssize_t n;
+
+	ibuf = env->sc_ibufs[PROC_MTA];
+	switch (event) {
+	case EV_READ:
+		if ((n = imsg_read(ibuf)) == -1)
+			fatal("imsg_read_error");
+		if (n == 0) {
+			/* this pipe is dead, so remove the event handler */
+			event_del(&ibuf->ev);
+			event_loopexit(NULL);
+			return;
+		}
+		break;
+	case EV_WRITE:
+		if (msgbuf_write(&ibuf->w) == -1)
+			fatal("msgbuf_write");
+		imsg_event_add(ibuf);
+		return;
+	default:
+		fatalx("unknown event");
+	}
+
+	for (;;) {
+		if ((n = imsg_get(ibuf, &imsg)) == -1)
+			fatal("runner_dispatch_mta: imsg_read error");
+		if (n == 0)
+			break;
+
+		switch (imsg.hdr.type) {
+
+		default:
+			log_debug("runner_dispatch_mta: unexpected imsg %d",
+			    imsg.hdr.type);
+			break;
+		}
+		imsg_free(&imsg);
+	}
+	imsg_event_add(ibuf);
+}
+
+/*
+ * Event callback for the imsg channel to the lookup agent (LKA).
+ * IMSG_LKA_MX_LOOKUP messages are handed to runner_batch_resolved();
+ * any other type is logged as unexpected.
+ */
+void
+runner_dispatch_lka(int sig, short event, void *p)
+{
+	struct smtpd *env = p;
+	struct imsgbuf *ibuf;
+	struct imsg imsg;
+	ssize_t n;
+
+	ibuf = env->sc_ibufs[PROC_LKA];
+	switch (event) {
+	case EV_READ:
+		if ((n = imsg_read(ibuf)) == -1)
+			fatal("imsg_read_error");
+		if (n == 0) {
+			/* this pipe is dead, so remove the event handler */
+			event_del(&ibuf->ev);
+			event_loopexit(NULL);
+			return;
+		}
+		break;
+	case EV_WRITE:
+		if (msgbuf_write(&ibuf->w) == -1)
+			fatal("msgbuf_write");
+		imsg_event_add(ibuf);
+		return;
+	default:
+		fatalx("unknown event");
+	}
+
+	for (;;) {
+		if ((n = imsg_get(ibuf, &imsg)) == -1)
+			fatal("runner_dispatch_lka: imsg_read error");
+		if (n == 0)
+			break;
+
+		switch (imsg.hdr.type) {
+		case IMSG_LKA_MX_LOOKUP: {
+			runner_batch_resolved(env, imsg.data);
+			break;
+		}
+		default:
+			log_debug("runner_dispatch_lka: unexpected imsg %d",
+			    imsg.hdr.type);
+			break;
+		}
+		imsg_free(&imsg);
+	}
+	imsg_event_add(ibuf);
+}
+
+/* Log and terminate the runner process immediately. */
+void
+runner_shutdown(void)
+{
+	log_info("runner handler");
+	_exit(0);
+}
+
+/* Arm the scheduling timer so runner_timeout() first fires after 10 usec. */
+void
+runner_setup_events(struct smtpd *env)
+{
+
+	struct timeval tv;
+
+	evtimer_set(&env->sc_ev, runner_timeout, env);
+	tv.tv_sec = 0;
+	tv.tv_usec = 10;
+	evtimer_add(&env->sc_ev, &tv);
+}
+
+/* Cancel the scheduling timer. */
+void
+runner_disable_events(struct smtpd *env)
+{
+	evtimer_del(&env->sc_ev);
+}
+
+/*
+ * Fork the runner process.  The parent gets the child's pid back.
+ * The child purges its configuration, chroots to PATH_SPOOL and drops to
+ * the uid/gid of env->sc_pw (both skipped when built with DEBUG), installs
+ * its signal handlers and imsg peers, arms the scheduling timer and enters
+ * the event loop; it shuts down when the loop returns.
+ */
+pid_t
+runner(struct smtpd *env)
+{
+	pid_t pid;
+	struct passwd *pw;
+
+	struct event ev_sigint;
+	struct event ev_sigterm;
+
+	struct peer peers[] = {
+		{ PROC_CONTROL, runner_dispatch_control },
+		{ PROC_MDA, runner_dispatch_mda },
+		{ PROC_MTA, runner_dispatch_mta },
+		{ PROC_QUEUE, runner_dispatch_queue },
+		{ PROC_LKA, runner_dispatch_lka },
+	};
+
+	switch (pid = fork()) {
+	case -1:
+		fatal("runner: cannot fork");
+	case 0:
+		break;
+	default:
+		return (pid);
+	}
+
+	purge_config(env, PURGE_EVERYTHING);
+
+	pw = env->sc_pw;
+
+#ifndef DEBUG
+	if (chroot(PATH_SPOOL) == -1)
+		fatal("runner: chroot");
+	if (chdir("/") == -1)
+		fatal("runner: chdir(\"/\")");
+#else
+#warning disabling privilege revocation and chroot in DEBUG MODE
+#endif
+
+	setproctitle("runner");
+	smtpd_process = PROC_RUNNER;
+
+#ifndef DEBUG
+	if (setgroups(1, &pw->pw_gid) ||
+	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
+	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
+		fatal("runner: cannot drop privileges");
+#endif
+
+	SPLAY_INIT(&env->batch_queue);
+
+	event_init();
+
+	signal_set(&ev_sigint, SIGINT, runner_sig_handler, env);
+	signal_set(&ev_sigterm, SIGTERM, runner_sig_handler, env);
+	signal_add(&ev_sigint, NULL);
+	signal_add(&ev_sigterm, NULL);
+	signal(SIGPIPE, SIG_IGN);
+	signal(SIGHUP, SIG_IGN);
+
+	/* derive the count from the table so the two cannot drift apart */
+	config_peers(env, peers, sizeof(peers) / sizeof(peers[0]));
+
+	runner_setup_events(env);
+	event_dispatch();
+	runner_shutdown();
+
+	return (0);
+}
+
+/*
+ * Timer callback: scan the queue, then the runqueue, then dispatch ready
+ * batches, and re-arm the timer to fire again in one second.
+ */
+void
+runner_timeout(int fd, short event, void *p)
+{
+	struct smtpd *env = p;
+	struct timeval tv;
+
+	runner_process_queue(env);
+	runner_process_runqueue(env);
+	runner_process_batchqueue(env);
+
+	tv.tv_sec = 1;
+	tv.tv_usec = 0;
+	evtimer_add(&env->sc_ev, &tv);
+}
+
+/*
+ * Walk PATH_QUEUE and process every entry whose name is a valid bucket
+ * number (0 .. DIRHASH_BUCKETS - 1); other entries are logged and skipped.
+ */
+void
+runner_process_queue(struct smtpd *env)
+{
+	DIR *dirp;
+	struct dirent *dp;
+	const char *errstr;
+	u_int16_t bucket;
+
+	dirp = opendir(PATH_QUEUE);
+	if (dirp == NULL)
+		fatal("queue_process: opendir");
+
+	while ((dp = readdir(dirp)) != NULL) {
+
+		if (strcmp(dp->d_name, ".") == 0 ||
+		    strcmp(dp->d_name, "..") == 0)
+			continue;
+
+		bucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr);
+		if (errstr) {
+			log_warn("queue_process: %s/%s is not a valid bucket",
+			    PATH_QUEUE, dp->d_name);
+			continue;
+		}
+
+		runner_process_bucket(env, bucket);
+	}
+	closedir(dirp);
+}
+
+/*
+ * Walk one bucket directory (PATH_QUEUE/<bucket>) and process every
+ * entry in it as a message id.
+ */
+void
+runner_process_bucket(struct smtpd *env, u_int16_t bucket)
+{
+	int spret;
+	DIR *dirp = NULL;
+	struct dirent *dp;
+	char bucketpath[MAXPATHLEN];
+
+	spret = snprintf(bucketpath, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket);
+	if (spret == -1 || spret >= MAXPATHLEN)
+		fatal("queue_process_bucket: snprintf");
+
+	dirp = opendir(bucketpath);
+	if (dirp == NULL)
+		fatal("queue_process_bucket: opendir");
+
+	while ((dp = readdir(dirp)) != NULL) {
+
+		if (strcmp(dp->d_name, ".") == 0 ||
+		    strcmp(dp->d_name, "..") == 0)
+			continue;
+
+		runner_process_message(env, dp->d_name);
+	}
+
+	closedir(dirp);
+}
+
+/*
+ * Walk the envelope directory of one message
+ * (PATH_QUEUE/<hash(messageid)>/<messageid>PATH_ENVELOPES) and process
+ * every envelope found there.
+ */
+void
+runner_process_message(struct smtpd *env, char *messageid)
+{
+	int spret;
+	DIR *dirp = NULL;
+	struct dirent *dp;
+	char evppath[MAXPATHLEN];
+	u_int16_t hval = 0;
+
+	hval = hash(messageid, strlen(messageid)) % DIRHASH_BUCKETS;
+
+	spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, hval,
+	    messageid, PATH_ENVELOPES);
+	if (spret == -1 || spret >= MAXPATHLEN)
+		fatal("queue_process_message: snprintf");
+
+	dirp = opendir(evppath);
+	if (dirp == NULL)
+		fatal("queue_process_message: opendir");
+
+	while ((dp = readdir(dirp)) != NULL) {
+
+		if (strcmp(dp->d_name, ".") == 0 ||
+		    strcmp(dp->d_name, "..") == 0)
+			continue;
+
+		/*
+		 * The message id is the directory we are scanning, the
+		 * envelope id is the entry name; the callee hashes the
+		 * message id to rebuild the envelope path.
+		 */
+		runner_process_envelope(env, messageid, dp->d_name);
+	}
+
+	closedir(dirp);
+}
+
+/*
+ * Load envelope evpid and decide whether it is due for delivery.
+ * Expired envelopes are removed.  Due envelopes are flagged
+ * F_MESSAGE_SCHEDULED, written back, and a symlink named
+ * PATH_RUNQUEUE/<evpid> pointing at the envelope is created unless an
+ * entry with that name already exists.
+ */
+void
+runner_process_envelope(struct smtpd *env, char *msgid, char *evpid)
+{
+	int spret;
+	struct message message;
+	time_t tm;
+	char evppath[MAXPATHLEN];
+	char rqpath[MAXPATHLEN];
+	u_int16_t hval;
+	struct stat sb;
+
+	if (! queue_load_envelope(&message, evpid)) {
+		log_debug("failed to load envelope: %s", evpid);
+		return;
+	}
+
+	tm = time(NULL);
+
+	if (! runner_message_schedule(&message, tm)) {
+		if (message.flags & F_MESSAGE_EXPIRED) {
+			log_debug("message has expired, mdaemon");
+			queue_remove_envelope(&message);
+		}
+		return;
+	}
+
+	message.flags |= F_MESSAGE_SCHEDULED;
+	queue_update_envelope(&message);
+
+	hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS;
+	spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, hval,
+	    msgid, PATH_ENVELOPES, evpid);
+	if (spret == -1 || spret >= MAXPATHLEN)
+		fatal("queue_process_envelope: snprintf");
+
+	spret = snprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, evpid);
+	if (spret == -1 || spret >= MAXPATHLEN)
+		fatal("queue_process_envelope: snprintf");
+
+	if (stat(rqpath, &sb) == -1) {
+		if (errno != ENOENT)
+			fatal("queue_process_envelope: stat");
+
+		if (symlink(evppath, rqpath) == -1) {
+			log_info("queue_process_envelope: "
+			    "failed to place envelope in runqueue");
+		}
+	}
+}
+
+/*
+ * Drain PATH_RUNQUEUE.  Each entry is unlinked, its envelope loaded and,
+ * unless it is already flagged F_MESSAGE_PROCESSING, stamped with the
+ * current time, switched from SCHEDULED to PROCESSING, written back, and
+ * a heap copy is attached to a batch (an existing one found by
+ * batch_lookup(), otherwise a new one created by batch_record()).
+ */
+void
+runner_process_runqueue(struct smtpd *env)
+{
+	int spret;
+	DIR *dirp;
+	struct dirent *dp;
+	struct message message;
+	struct message *messagep;
+	struct batch *batchp;
+	char pathname[MAXPATHLEN];
+	time_t tm;
+
+	tm = time(NULL);
+
+	dirp = opendir(PATH_RUNQUEUE);
+	if (dirp == NULL)
+		fatal("queue_process_runqueue: opendir");
+
+	while ((dp = readdir(dirp)) != NULL) {
+		if (strcmp(dp->d_name, ".") == 0 ||
+		    strcmp(dp->d_name, "..") == 0)
+			continue;
+
+		/* never unlink a truncated path */
+		spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE,
+		    dp->d_name);
+		if (spret == -1 || spret >= MAXPATHLEN)
+			fatal("runner_process_runqueue: snprintf");
+		unlink(pathname);
+
+		if (! queue_load_envelope(&message, dp->d_name)) {
+			log_debug("failed to load envelope");
+			continue;
+		}
+
+		if (message.flags & F_MESSAGE_PROCESSING)
+			continue;
+
+		message.lasttry = tm;
+		message.flags &= ~F_MESSAGE_SCHEDULED;
+		message.flags |= F_MESSAGE_PROCESSING;
+		queue_update_envelope(&message);
+
+		messagep = calloc(1, sizeof (struct message));
+		if (messagep == NULL)
+			err(1, "calloc");
+		*messagep = message;
+
+		batchp = batch_lookup(env, messagep);
+		if (batchp != NULL)
+			messagep->batch_id = batchp->id;
+
+		batchp = batch_record(env, messagep);
+		if (messagep->batch_id == 0)
+			messagep->batch_id = batchp->id;
+	}
+
+	closedir(dirp);
+}
+
+/*
+ * Dispatch every batch in env->batch_queue, then remove and free it.
+ * MTA batches that are not yet flagged F_BATCH_RESOLVED are left in the
+ * tree.  The successor is fetched before the current node is removed.
+ */
+void
+runner_process_batchqueue(struct smtpd *env)
+{
+	time_t curtime;
+	struct batch *batchp, *nxt;
+
+	curtime = time(NULL);
+	for (batchp = SPLAY_MIN(batchtree, &env->batch_queue);
+	    batchp != NULL;
+	    batchp = nxt) {
+		nxt = SPLAY_NEXT(batchtree, &env->batch_queue, batchp);
+		if ((batchp->type & T_MTA_BATCH) &&
+		    (batchp->flags & F_BATCH_RESOLVED) == 0) {
+			continue;
+		}
+
+		runner_batch_dispatch(env, batchp, curtime);
+
+		SPLAY_REMOVE(batchtree, &env->batch_queue, batchp);
+		bzero(batchp, sizeof(struct batch));
+		free(batchp);
+	}
+}
+
+/*
+ * Record the result of an MX lookup on the batch it was issued for.
+ * Returns 1 and flags the batch F_BATCH_RESOLVED after copying the MX
+ * addresses when the lookup succeeded; returns 0 when the batch is
+ * unknown or the lookup reported an error.
+ */
+int
+runner_batch_resolved(struct smtpd *env, struct batch *lookup)
+{
+	u_int32_t i;
+	struct batch *batchp;
+
+	batchp = batch_by_id(env, lookup->id);
+	if (batchp == NULL) {
+		/* the batch is not in our tree: nothing to update */
+		log_debug("runner_batch_resolved: unknown batch");
+		return 0;
+	}
+	batchp->getaddrinfo_error = lookup->getaddrinfo_error;
+	batchp->mx_cnt = lookup->mx_cnt;
+
+/*
+           EAI_NODATA        no address associated with hostname
+           EAI_NONAME        hostname or servname not provided, or not known
+           EAI_PROTOCOL      resolved protocol is unknown
+           EAI_SERVICE       servname not supported for ai_socktype
+           EAI_SOCKTYPE      ai_socktype not supported
+           EAI_SYSTEM        system error returned in errno
+
+
+ */
+
+	switch (batchp->getaddrinfo_error) {
+	case EAI_ADDRFAMILY:
+	case EAI_BADFLAGS:
+	case EAI_BADHINTS:
+	case EAI_FAIL:
+	case EAI_FAMILY:
+	case EAI_NODATA:
+	case EAI_NONAME:
+	case EAI_SERVICE:
+	case EAI_SOCKTYPE:
+	case EAI_SYSTEM:
+		/* XXX */
+		/*
+		 * In the case of a DNS permanent error, do not generate a
+		 * daemon message if the error originates from one already
+		 * as this would cause a loop. Remove the initial batch as
+		 * it will never succeed.
+		 *
+		 */
+		return 0;
+
+	case EAI_AGAIN:
+	case EAI_MEMORY:
+		/* XXX */
+		/*
+		 * Do not generate a daemon message if this error happened
+		 * while processing a daemon message. Do NOT remove batch,
+		 * it may succeed later.
+		 */
+		return 0;
+
+	default:
+		batchp->flags |= F_BATCH_RESOLVED;
+		for (i = 0; i < batchp->mx_cnt; ++i)
+			batchp->mxarray[i].ss = lookup->mxarray[i].ss;
+	}
+	return 1;
+}
+
+/*
+ * Send one batch to its delivery process (MDA or MTA, chosen from the
+ * batch type): IMSG_CREATE_BATCH, one IMSG_BATCH_APPEND per message, then
+ * IMSG_BATCH_CLOSE.  Each message is unlinked from the batch and freed
+ * once queued for sending.
+ */
+void
+runner_batch_dispatch(struct smtpd *env, struct batch *batchp, time_t curtime)
+{
+	u_int8_t proctype;
+	struct message *messagep;
+
+	/* a bad type is a logic error, there is no errno to report */
+	if (batchp->type & T_MDA_BATCH)
+		proctype = PROC_MDA;
+	else if (batchp->type & T_MTA_BATCH)
+		proctype = PROC_MTA;
+	else
+		fatalx("batch_send: unknown batch type");
+
+	imsg_compose(env->sc_ibufs[proctype], IMSG_CREATE_BATCH, 0, 0, -1,
+	    batchp, sizeof (struct batch));
+
+	while ((messagep = TAILQ_FIRST(&batchp->messages))) {
+		imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_APPEND, 0, 0,
+		    -1, messagep, sizeof (struct message));
+		TAILQ_REMOVE(&batchp->messages, messagep, entry);
+		bzero(messagep, sizeof(struct message));
+		free(messagep);
+	}
+
+	imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_CLOSE, 0, 0, -1,
+	    batchp, sizeof(struct batch));
+}
+
+/*
+ * Decide whether a message is due for a delivery attempt at time tm.
+ * Returns 1 when it is, 0 otherwise.  Sets F_MESSAGE_EXPIRED (and returns
+ * 0) when the message is older than SMTPD_QUEUE_EXPIRY or its retry
+ * counter reached 255.  Messages already scheduled or being processed are
+ * never due; messages never tried are always due.  Otherwise the retry
+ * delay depends on the message type and retry count.
+ */
+int
+runner_message_schedule(struct message *messagep, time_t tm)
+{
+	time_t delay;
+
+	/* Batch has been in the queue for too long and expired */
+	if (tm - messagep->creation >= SMTPD_QUEUE_EXPIRY) {
+		messagep->flags |= F_MESSAGE_EXPIRED;
+		return 0;
+	}
+
+	if (messagep->retry == 255) {
+		messagep->flags |= F_MESSAGE_EXPIRED;
+		return 0;
+	}
+
+	if ((messagep->flags & F_MESSAGE_SCHEDULED) != 0)
+		return 0;
+
+	if ((messagep->flags & F_MESSAGE_PROCESSING) != 0)
+		return 0;
+
+	if (messagep->lasttry == 0)
+		return 1;
+
+	delay = SMTPD_QUEUE_MAXINTERVAL;
+
+	if (messagep->type & T_MDA_MESSAGE) {
+		if (messagep->retry < 5)
+			return 1;
+
+		if (messagep->retry < 15)
+			delay = (messagep->retry * 60) + arc4random() % 60;
+	}
+
+	if (messagep->type & T_MTA_MESSAGE) {
+		if (messagep->retry < 3)
+			delay = SMTPD_QUEUE_INTERVAL;
+		else if (messagep->retry <= 7) {
+			delay = SMTPD_QUEUE_INTERVAL * (1 << (messagep->retry - 3));
+			if (delay > SMTPD_QUEUE_MAXINTERVAL)
+				delay = SMTPD_QUEUE_MAXINTERVAL;
+		}
+	}
+
+	if (tm >= messagep->lasttry + delay)
+		return 1;
+
+	return 0;
+}
+
+/*
+ * Attach a message to its batch and return the batch.
+ * When the message carries a batch id the batch must exist.  Otherwise a
+ * new batch is allocated, inserted in env->batch_queue and typed MDA or
+ * MTA from the rule action; for MTA batches an MX lookup request is sent
+ * to the lookup agent.
+ */
+struct batch *
+batch_record(struct smtpd *env, struct message *messagep)
+{
+	struct batch *batchp;
+	struct path *path;
+
+	batchp = NULL;
+	if (messagep->batch_id != 0) {
+		batchp = batch_by_id(env, messagep->batch_id);
+		if (batchp == NULL)
+			errx(1, "%s: internal inconsistency.", __func__);
+	}
+	if (batchp == NULL) {
+		batchp = calloc(1, sizeof(struct batch));
+		if (batchp == NULL)
+			err(1, "%s: calloc", __func__);
+
+		batchp->id = queue_generate_id();
+		batchp->creation = messagep->creation;
+
+		(void)strlcpy(batchp->message_id, messagep->message_id,
+		    sizeof(batchp->message_id));
+		TAILQ_INIT(&batchp->messages);
+		SPLAY_INSERT(batchtree, &env->batch_queue, batchp);
+
+		if (messagep->type & T_DAEMON_MESSAGE) {
+			batchp->type = T_DAEMON_BATCH;
+			path = &messagep->sender;
+		}
+		else {
+			path = &messagep->recipient;
+		}
+		batchp->rule = path->rule;
+
+		(void)strlcpy(batchp->hostname, path->domain,
+		    sizeof(batchp->hostname));
+
+		if (IS_MAILBOX(path->rule.r_action) ||
+		    IS_EXT(path->rule.r_action)) {
+			batchp->type |= T_MDA_BATCH;
+		}
+		else {
+			batchp->type |= T_MTA_BATCH;
+			imsg_compose(env->sc_ibufs[PROC_LKA], IMSG_LKA_MX_LOOKUP, 0, 0, -1,
+			    batchp, sizeof(struct batch));
+		}
+	}
+
+	TAILQ_INSERT_TAIL(&batchp->messages, messagep, entry);
+	return batchp;
+}
+
+/*
+ * Find the batch a message belongs to, or NULL.
+ * With a non-zero batch_id the tree is searched by id.  Otherwise the
+ * tree is scanned for a batch with the same type and message_id and, for
+ * MTA batches, the same destination hostname.
+ */
+struct batch *
+batch_lookup(struct smtpd *env, struct message *message)
+{
+	struct batch *batchp;
+	struct batch lookup;
+
+	/* If message->batch_id != 0, we can retrieve batch by id */
+	if (message->batch_id != 0) {
+		lookup.id = message->batch_id;
+		return SPLAY_FIND(batchtree, &env->batch_queue, &lookup);
+	}
+
+	/* We do not know the batch_id yet, maybe it was created but we could not
+	 * be notified, or it just does not exist. Let's scan to see if we can do
+	 * a match based on our message_id and flags.
+	 */
+	SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) {
+
+		if (batchp->type != message->type)
+			continue;
+
+		if (strcasecmp(batchp->message_id, message->message_id) != 0)
+			continue;
+
+		if (batchp->type & T_MTA_BATCH)
+			if (strcasecmp(batchp->hostname, message->recipient.domain) != 0)
+				continue;
+
+		break;
+	}
+
+	return batchp;
+}
+
+/* Order batches by id for the splay tree; returns -1, 0 or 1. */
+int
+batch_cmp(struct batch *s1, struct batch *s2)
+{
+	/*
+	 * do not return u_int64_t's
+	 */
+	if (s1->id < s2->id)
+		return (-1);
+
+	if (s1->id > s2->id)
+		return (1);
+
+	return (0);
+}
+
+SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp);
diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c
index 1dd188dd22e..2cfc9760feb 100644
--- a/usr.sbin/smtpd/smtpd.c
+++ b/usr.sbin/smtpd/smtpd.c
@@ -1,4 +1,4 @@
-/*	$OpenBSD: smtpd.c,v 1.12 2008/12/03 17:58:00 gilles Exp $	*/
+/*	$OpenBSD: smtpd.c,v 1.13 2008/12/05 02:51:32 gilles Exp $	*/
 
 /*
  * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -68,6 +68,7 @@ pid_t mda_pid = 0;
 pid_t	mta_pid = 0;
 pid_t	control_pid = 0;
 pid_t	smtp_pid = 0;
+pid_t	runner_pid = 0;
 
 int __b64_pton(char const *, unsigned char *, size_t);
 
@@ -94,7 +95,8 @@ parent_shutdown(void)
 		mda_pid,
 		mta_pid,
 		control_pid,
-		smtp_pid
+		smtp_pid,
+		runner_pid
 	};
 
 	for (i = 0; i < sizeof(pids) / sizeof(pid); i++)
@@ -427,6 +429,7 @@ parent_sig_handler(int sig, short event, void *p)
 		{ mta_pid,	"mail transfer agent" },
 		{ control_pid,	"control process" },
 		{ smtp_pid,	"smtp server" },
+		{ runner_pid,	"runner" },
 		{ 0,		NULL },
 	};
 
@@ -571,6 +574,7 @@ main(int argc, char *argv[])
 	mta_pid = mta(&env);
 	smtp_pid = smtp(&env);
 	control_pid = control(&env);
+	runner_pid = runner(&env);
 
 	setproctitle("parent");
 	SPLAY_INIT(&env.mdaproc_queue);
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index 62710a22d7a..c3695e761ee 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,4 +1,4 @@
-/*	$OpenBSD: smtpd.h,v 1.19 2008/12/04 17:24:13 cloder Exp $	*/
+/*	$OpenBSD: smtpd.h,v 1.20 2008/12/05 02:51:32 gilles Exp $	*/
 
 /*
  * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -19,7 +19,7 @@
 
 #define CONF_FILE		 "/etc/mail/smtpd.conf"
 #define MAX_LISTEN		 16
-#define PROC_COUNT		 8
+#define PROC_COUNT		 9
 #define READ_BUF_SIZE		 32768
 #define MAX_NAME_SIZE		 64
 
@@ -251,6 +251,7 @@ enum smtp_proc_type {
 	PROC_MDA,
 	PROC_MTA,
 	PROC_CONTROL,
+	PROC_RUNNER,
 } smtpd_process;
 
 struct peer {
@@ -623,7 +624,6 @@ struct smtpd {
 	u_int32_t		 sc_flags;
 	struct timeval		 sc_qintval;
 	struct event		 sc_ev;
-	struct event		 sc_rqev;
 	int			 sc_pipes[PROC_COUNT]
 				    [PROC_COUNT][2];
 	struct imsgbuf		*sc_ibufs[PROC_COUNT];
@@ -717,7 +717,6 @@ int batch_cmp(struct batch *, struct batch *);
 struct batch *batch_by_id(struct smtpd *, u_int64_t);
 struct message *message_by_id(struct smtpd *, struct batch *, u_int64_t);
 int queue_remove_batch_message(struct smtpd *, struct batch *, struct message *);
-SPLAY_PROTOTYPE(batchtree, batch, b_nodes, batch_cmp);
 
 /* mda.c */
 pid_t mda(struct smtpd *);
@@ -731,6 +730,11 @@ pid_t mta(struct smtpd *);
 pid_t control(struct smtpd *);
 void session_socket_blockmode(int, enum blockmodes);
 
+/* runner.c */
+pid_t runner(struct smtpd *);
+SPLAY_PROTOTYPE(batchtree, batch, b_nodes, batch_cmp);
+
+
 /* smtp.c */
 pid_t smtp(struct smtpd *);
 void smtp_listener_setup(struct smtpd *, struct listener *);
diff --git a/usr.sbin/smtpd/smtpd/Makefile b/usr.sbin/smtpd/smtpd/Makefile
index 78247187961..83a10fe1500 100644
--- a/usr.sbin/smtpd/smtpd/Makefile
+++ b/usr.sbin/smtpd/smtpd/Makefile
@@ -1,11 +1,11 @@
-#	$OpenBSD: Makefile,v 1.1 2008/12/04 13:36:58 todd Exp $
+#	$OpenBSD: Makefile,v 1.2 2008/12/05 02:51:32 gilles Exp $
 
 PROG=		smtpd
 SRCS=		parse.y log.c config.c buffer.c imsg.c \
 		smtpd.c lka.c mfa.c queue.c mta.c mda.c control.c \
 		smtp.c smtp_session.c store.c \
 		ssl.c ssl_privsep.c dns.c aliases.c forward.c \
-		map.c
+		map.c runner.c
 MAN=		smtpd.8 smtpd.conf.5
 
 BINDIR=		/usr/sbin |