summaryrefslogtreecommitdiff
path: root/usr.sbin
diff options
context:
space:
mode:
authorGilles Chehade <gilles@cvs.openbsd.org>2008-12-05 02:51:33 +0000
committerGilles Chehade <gilles@cvs.openbsd.org>2008-12-05 02:51:33 +0000
commit747f62b5dbedca879d99609ee63547c8d6d68e49 (patch)
tree2425a8308ec1b78b4e6b95c3b4a061ef0daffd87 /usr.sbin
parent695bc4c79dcf8db0af4dc04498a5753d14ca39ad (diff)
- last part of the new queue code: the runner process (unprivileged and
chrooted) is now in charge of scheduling deliveries and dispatching messages to MDA and MTA. The queue process now only does inserts/updates/removals from the queue and can no longer be so busy that it delays answers to imsg from the smtp server.
Diffstat (limited to 'usr.sbin')
-rw-r--r--usr.sbin/smtpd/control.c52
-rw-r--r--usr.sbin/smtpd/lka.c55
-rw-r--r--usr.sbin/smtpd/mda.c57
-rw-r--r--usr.sbin/smtpd/mta.c96
-rw-r--r--usr.sbin/smtpd/queue.c604
-rw-r--r--usr.sbin/smtpd/runner.c896
-rw-r--r--usr.sbin/smtpd/smtpd.c8
-rw-r--r--usr.sbin/smtpd/smtpd.h12
-rw-r--r--usr.sbin/smtpd/smtpd/Makefile4
9 files changed, 1188 insertions, 596 deletions
diff --git a/usr.sbin/smtpd/control.c b/usr.sbin/smtpd/control.c
index 6d9635c87c8..6cfcea70d70 100644
--- a/usr.sbin/smtpd/control.c
+++ b/usr.sbin/smtpd/control.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: control.c,v 1.3 2008/11/17 21:27:03 chl Exp $ */
+/* $OpenBSD: control.c,v 1.4 2008/12/05 02:51:32 gilles Exp $ */
/*
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -57,6 +57,7 @@ void control_dispatch_ext(int, short, void *);
void control_dispatch_lka(int, short, void *);
void control_dispatch_mfa(int, short, void *);
void control_dispatch_queue(int, short, void *);
+void control_dispatch_runner(int, short, void *);
struct ctl_connlist ctl_conns;
@@ -86,6 +87,7 @@ control(struct smtpd *env)
struct event ev_sigterm;
struct peer peers [] = {
{ PROC_QUEUE, control_dispatch_queue },
+ { PROC_RUNNER, control_dispatch_runner },
};
switch (pid = fork()) {
@@ -158,7 +160,7 @@ control(struct smtpd *env)
TAILQ_INIT(&ctl_conns);
- config_peers(env, peers, 1);
+ config_peers(env, peers, 2);
control_listen(env);
event_dispatch();
control_shutdown();
@@ -460,6 +462,52 @@ control_dispatch_queue(int sig, short event, void *p)
}
void
+control_dispatch_runner(int sig, short event, void *p)
+{
+ struct smtpd *env = p;
+ struct imsgbuf *ibuf;
+ struct imsg imsg;
+ ssize_t n;
+
+ ibuf = env->sc_ibufs[PROC_RUNNER];
+ switch (event) {
+ case EV_READ:
+ if ((n = imsg_read(ibuf)) == -1)
+ fatal("imsg_read_error");
+ if (n == 0) {
+ /* this pipe is dead, so remove the event handler */
+ event_del(&ibuf->ev);
+ event_loopexit(NULL);
+ return;
+ }
+ break;
+ case EV_WRITE:
+ if (msgbuf_write(&ibuf->w) == -1)
+ fatal("msgbuf_write");
+ imsg_event_add(ibuf);
+ return;
+ default:
+ fatalx("unknown event");
+ }
+
+ for (;;) {
+ if ((n = imsg_get(ibuf, &imsg)) == -1)
+ fatal("control_dispatch_runner: imsg_read error");
+ if (n == 0)
+ break;
+
+ switch (imsg.hdr.type) {
+ default:
+ log_debug("control_dispatch_runner: unexpected imsg %d",
+ imsg.hdr.type);
+ break;
+ }
+ imsg_free(&imsg);
+ }
+ imsg_event_add(ibuf);
+}
+
+void
session_socket_blockmode(int fd, enum blockmodes bm)
{
int flags;
diff --git a/usr.sbin/smtpd/lka.c b/usr.sbin/smtpd/lka.c
index 43b4daa2ce4..0f74cfe1954 100644
--- a/usr.sbin/smtpd/lka.c
+++ b/usr.sbin/smtpd/lka.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: lka.c,v 1.4 2008/11/25 20:26:40 gilles Exp $ */
+/* $OpenBSD: lka.c,v 1.5 2008/12/05 02:51:32 gilles Exp $ */
/*
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -49,6 +49,7 @@ void lka_dispatch_parent(int, short, void *);
void lka_dispatch_mfa(int, short, void *);
void lka_dispatch_smtp(int, short, void *);
void lka_dispatch_queue(int, short, void *);
+void lka_dispatch_runner(int, short, void *);
void lka_setup_events(struct smtpd *);
void lka_disable_events(struct smtpd *);
int lka_verify_mail(struct smtpd *, struct path *);
@@ -470,6 +471,52 @@ lka_dispatch_queue(int sig, short event, void *p)
break;
}
+ default:
+ log_debug("lka_dispatch_queue: unexpected imsg %d",
+ imsg.hdr.type);
+ break;
+ }
+ imsg_free(&imsg);
+ }
+ imsg_event_add(ibuf);
+}
+
+void
+lka_dispatch_runner(int sig, short event, void *p)
+{
+ struct smtpd *env = p;
+ struct imsgbuf *ibuf;
+ struct imsg imsg;
+ ssize_t n;
+
+ ibuf = env->sc_ibufs[PROC_RUNNER];
+ switch (event) {
+ case EV_READ:
+ if ((n = imsg_read(ibuf)) == -1)
+ fatal("imsg_read_error");
+ if (n == 0) {
+ /* this pipe is dead, so remove the event handler */
+ event_del(&ibuf->ev);
+ event_loopexit(NULL);
+ return;
+ }
+ break;
+ case EV_WRITE:
+ if (msgbuf_write(&ibuf->w) == -1)
+ fatal("msgbuf_write");
+ imsg_event_add(ibuf);
+ return;
+ default:
+ fatalx("unknown event");
+ }
+
+ for (;;) {
+ if ((n = imsg_get(ibuf, &imsg)) == -1)
+ fatal("lka_dispatch_runner: imsg_read error");
+ if (n == 0)
+ break;
+
+ switch (imsg.hdr.type) {
case IMSG_LKA_MX_LOOKUP: {
struct batch *batchp;
struct addrinfo hints, *res, *resp;
@@ -545,8 +592,9 @@ lka_dispatch_queue(int sig, short event, void *p)
break;
}
+
default:
- log_debug("lka_dispatch_queue: unexpected imsg %d",
+ log_debug("lka_dispatch_runner: unexpected imsg %d",
imsg.hdr.type);
break;
}
@@ -586,6 +634,7 @@ lka(struct smtpd *env)
{ PROC_MFA, lka_dispatch_mfa },
{ PROC_QUEUE, lka_dispatch_queue },
{ PROC_SMTP, lka_dispatch_smtp },
+ { PROC_RUNNER, lka_dispatch_runner },
};
switch (pid = fork()) {
@@ -620,7 +669,7 @@ lka(struct smtpd *env)
signal(SIGPIPE, SIG_IGN);
signal(SIGHUP, SIG_IGN);
- config_peers(env, peers, 4);
+ config_peers(env, peers, 5);
lka_setup_events(env);
event_dispatch();
diff --git a/usr.sbin/smtpd/mda.c b/usr.sbin/smtpd/mda.c
index ff2b62519ee..2532eb649b8 100644
--- a/usr.sbin/smtpd/mda.c
+++ b/usr.sbin/smtpd/mda.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mda.c,v 1.2 2008/11/05 12:14:45 sobrado Exp $ */
+/* $OpenBSD: mda.c,v 1.3 2008/12/05 02:51:32 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -39,6 +39,7 @@ __dead void mda_shutdown(void);
void mda_sig_handler(int, short, void *);
void mda_dispatch_parent(int, short, void *);
void mda_dispatch_queue(int, short, void *);
+void mda_dispatch_runner(int, short, void *);
void mda_setup_events(struct smtpd *);
void mda_disable_events(struct smtpd *);
void mda_timeout(int, short, void *);
@@ -217,6 +218,52 @@ mda_dispatch_queue(int sig, short event, void *p)
break;
switch (imsg.hdr.type) {
+ default:
+ log_debug("parent_dispatch_queue: unexpected imsg %d",
+ imsg.hdr.type);
+ break;
+ }
+ imsg_free(&imsg);
+ }
+ imsg_event_add(ibuf);
+}
+
+void
+mda_dispatch_runner(int sig, short event, void *p)
+{
+ struct smtpd *env = p;
+ struct imsgbuf *ibuf;
+ struct imsg imsg;
+ ssize_t n;
+
+ ibuf = env->sc_ibufs[PROC_RUNNER];
+ switch (event) {
+ case EV_READ:
+ if ((n = imsg_read(ibuf)) == -1)
+ fatal("imsg_read_error");
+ if (n == 0) {
+ /* this pipe is dead, so remove the event handler */
+ event_del(&ibuf->ev);
+ event_loopexit(NULL);
+ return;
+ }
+ break;
+ case EV_WRITE:
+ if (msgbuf_write(&ibuf->w) == -1)
+ fatal("msgbuf_write");
+ imsg_event_add(ibuf);
+ return;
+ default:
+ fatalx("unknown event");
+ }
+
+ for (;;) {
+ if ((n = imsg_get(ibuf, &imsg)) == -1)
+ fatal("parent_dispatch_runner: imsg_read error");
+ if (n == 0)
+ break;
+
+ switch (imsg.hdr.type) {
case IMSG_CREATE_BATCH: {
struct batch *batchp;
@@ -269,9 +316,8 @@ mda_dispatch_queue(int sig, short event, void *p)
break;
}
-
default:
- log_debug("parent_dispatch_queue: unexpected imsg %d",
+ log_debug("parent_dispatch_runner: unexpected imsg %d",
imsg.hdr.type);
break;
}
@@ -327,7 +373,8 @@ mda(struct smtpd *env)
struct peer peers[] = {
{ PROC_PARENT, mda_dispatch_parent },
- { PROC_QUEUE, mda_dispatch_queue }
+ { PROC_QUEUE, mda_dispatch_queue },
+ { PROC_RUNNER, mda_dispatch_runner }
};
switch (pid = fork()) {
@@ -373,7 +420,7 @@ mda(struct smtpd *env)
signal(SIGPIPE, SIG_IGN);
signal(SIGHUP, SIG_IGN);
- config_peers(env, peers, 2);
+ config_peers(env, peers, 3);
mda_setup_events(env);
event_dispatch();
diff --git a/usr.sbin/smtpd/mta.c b/usr.sbin/smtpd/mta.c
index a16a5b011a9..7e2f425b082 100644
--- a/usr.sbin/smtpd/mta.c
+++ b/usr.sbin/smtpd/mta.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: mta.c,v 1.7 2008/11/25 20:26:40 gilles Exp $ */
+/* $OpenBSD: mta.c,v 1.8 2008/12/05 02:51:32 gilles Exp $ */
/*
* Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
@@ -44,6 +44,7 @@ __dead void mta_shutdown(void);
void mta_sig_handler(int, short, void *);
void mta_dispatch_parent(int, short, void *);
void mta_dispatch_queue(int, short, void *);
+void mta_dispatch_runner(int, short, void *);
void mta_setup_events(struct smtpd *);
void mta_disable_events(struct smtpd *);
void mta_timeout(int, short, void *);
@@ -150,6 +151,72 @@ mta_dispatch_queue(int sig, short event, void *p)
break;
switch (imsg.hdr.type) {
+ case IMSG_QUEUE_MESSAGE_FD: {
+ struct batch *batchp;
+ int fd;
+
+ if ((fd = imsg_get_fd(ibuf, &imsg)) == -1) {
+ /* NEEDS_FIX - unsure yet how it must be handled */
+ errx(1, "imsg_get_fd");
+ }
+
+ batchp = (struct batch *)imsg.data;
+ batchp = batch_by_id(env, batchp->id);
+
+ if ((batchp->messagefp = fdopen(fd, "r")) == NULL)
+ err(1, "fdopen");
+
+ evbuffer_add_printf(batchp->bev->output, "DATA\r\n");
+
+ bufferevent_enable(batchp->bev, EV_WRITE|EV_READ);
+ break;
+ }
+ default:
+ log_debug("parent_dispatch_mta: unexpected imsg %d",
+ imsg.hdr.type);
+ break;
+ }
+ imsg_free(&imsg);
+ }
+ imsg_event_add(ibuf);
+}
+
+void
+mta_dispatch_runner(int sig, short event, void *p)
+{
+ struct smtpd *env = p;
+ struct imsgbuf *ibuf;
+ struct imsg imsg;
+ ssize_t n;
+
+ ibuf = env->sc_ibufs[PROC_RUNNER];
+ switch (event) {
+ case EV_READ:
+ if ((n = imsg_read(ibuf)) == -1)
+ fatal("imsg_read_error");
+ if (n == 0) {
+ /* this pipe is dead, so remove the event handler */
+ event_del(&ibuf->ev);
+ event_loopexit(NULL);
+ return;
+ }
+ break;
+ case EV_WRITE:
+ if (msgbuf_write(&ibuf->w) == -1)
+ fatal("msgbuf_write");
+ imsg_event_add(ibuf);
+ return;
+ default:
+ fatalx("unknown event");
+ }
+
+ for (;;) {
+ if ((n = imsg_get(ibuf, &imsg)) == -1)
+ fatal("mta_dispatch_runner: imsg_read error");
+ if (n == 0)
+ break;
+
+ switch (imsg.hdr.type) {
case IMSG_CREATE_BATCH: {
struct batch *batchp;
@@ -202,28 +269,8 @@ mta_dispatch_queue(int sig, short event, void *p)
}
break;
}
- case IMSG_QUEUE_MESSAGE_FD: {
- struct batch *batchp;
- int fd;
-
- if ((fd = imsg_get_fd(ibuf, &imsg)) == -1) {
- /* NEEDS_FIX - unsure yet how it must be handled */
- errx(1, "imsg_get_fd");
- }
-
- batchp = (struct batch *)imsg.data;
- batchp = batch_by_id(env, batchp->id);
-
- if ((batchp->messagefp = fdopen(fd, "r")) == NULL)
- err(1, "fdopen");
-
- evbuffer_add_printf(batchp->bev->output, "DATA\r\n");
-
- bufferevent_enable(batchp->bev, EV_WRITE|EV_READ);
- break;
- }
default:
- log_debug("parent_dispatch_mta: unexpected imsg %d",
+ log_debug("mta_dispatch_runner: unexpected imsg %d",
imsg.hdr.type);
break;
}
@@ -277,7 +324,8 @@ mta(struct smtpd *env)
struct event ev_sigterm;
struct peer peers[] = {
- { PROC_QUEUE, mta_dispatch_queue }
+ { PROC_QUEUE, mta_dispatch_queue },
+ { PROC_RUNNER, mta_dispatch_runner }
};
switch (pid = fork()) {
@@ -320,7 +368,7 @@ mta(struct smtpd *env)
signal(SIGPIPE, SIG_IGN);
signal(SIGHUP, SIG_IGN);
- config_peers(env, peers, 1);
+ config_peers(env, peers, 2);
SPLAY_INIT(&env->batch_queue);
diff --git a/usr.sbin/smtpd/queue.c b/usr.sbin/smtpd/queue.c
index 6d67033a1be..f3e4fc4422c 100644
--- a/usr.sbin/smtpd/queue.c
+++ b/usr.sbin/smtpd/queue.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: queue.c,v 1.18 2008/12/03 20:08:08 gilles Exp $ */
+/* $OpenBSD: queue.c,v 1.19 2008/12/05 02:51:32 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -51,10 +51,10 @@ void queue_dispatch_smtp(int, short, void *);
void queue_dispatch_mda(int, short, void *);
void queue_dispatch_mta(int, short, void *);
void queue_dispatch_lka(int, short, void *);
+void queue_dispatch_runner(int, short, void *);
void queue_setup_events(struct smtpd *);
void queue_disable_events(struct smtpd *);
void queue_timeout(int, short, void *);
-void queue_process_runqueue(int, short, void *);
int queue_create_incoming_layout(char *);
int queue_record_envelope(struct message *);
int queue_remove_envelope(struct message *);
@@ -81,7 +81,6 @@ void batch_send(struct smtpd *, struct batch *, time_t);
u_int32_t hash(u_int8_t *, size_t);
struct batch *queue_record_batch(struct smtpd *, struct message *);
struct batch *batch_by_id(struct smtpd *, u_int64_t);
-struct batch *batch_lookup(struct smtpd *, struct message *);
struct message *message_by_id(struct smtpd *, struct batch *, u_int64_t);
void
@@ -541,12 +540,54 @@ queue_dispatch_lka(int sig, short event, void *p)
break;
}
- case IMSG_LKA_MX_LOOKUP: {
- queue_batch_resolved(env, imsg.data);
+ default:
+ log_debug("queue_dispatch_lka: unexpected imsg %d",
+ imsg.hdr.type);
break;
}
+ imsg_free(&imsg);
+ }
+ imsg_event_add(ibuf);
+}
+
+void
+queue_dispatch_runner(int sig, short event, void *p)
+{
+ struct smtpd *env = p;
+ struct imsgbuf *ibuf;
+ struct imsg imsg;
+ ssize_t n;
+
+ ibuf = env->sc_ibufs[PROC_RUNNER];
+ switch (event) {
+ case EV_READ:
+ if ((n = imsg_read(ibuf)) == -1)
+ fatal("imsg_read_error");
+ if (n == 0) {
+ /* this pipe is dead, so remove the event handler */
+ event_del(&ibuf->ev);
+ event_loopexit(NULL);
+ return;
+ }
+ break;
+ case EV_WRITE:
+ if (msgbuf_write(&ibuf->w) == -1)
+ fatal("msgbuf_write");
+ imsg_event_add(ibuf);
+ return;
+ default:
+ fatalx("unknown event");
+ }
+
+ for (;;) {
+ if ((n = imsg_get(ibuf, &imsg)) == -1)
+ fatal("queue_dispatch_runner: imsg_read error");
+ if (n == 0)
+ break;
+
+ switch (imsg.hdr.type) {
default:
- log_debug("queue_dispatch_lka: unexpected imsg %d",
+ log_debug("queue_dispatch_runner: unexpected imsg %d",
imsg.hdr.type);
break;
}
@@ -565,57 +606,11 @@ queue_shutdown(void)
void
queue_setup_events(struct smtpd *env)
{
- struct timeval tv;
-
- evtimer_set(&env->sc_ev, queue_timeout, env);
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_ev, &tv);
-
- evtimer_set(&env->sc_rqev, queue_process_runqueue, env);
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_rqev, &tv);
}
void
queue_disable_events(struct smtpd *env)
{
- evtimer_del(&env->sc_ev);
-}
-
-void
-queue_timeout(int fd, short event, void *p)
-{
- struct smtpd *env = p;
- struct timeval tv;
- time_t curtime;
- struct batch *batchp, *nxt;
-
- queue_process(env);
-
- curtime = time(NULL);
-
- for (batchp = SPLAY_MIN(batchtree, &env->batch_queue);
- batchp != NULL;
- batchp = nxt) {
- nxt = SPLAY_NEXT(batchtree, &env->batch_queue, batchp);
- if ((batchp->type & T_MTA_BATCH) &&
- (batchp->flags & F_BATCH_RESOLVED) == 0) {
- continue;
- }
-
- batch_send(env, batchp, curtime);
-
- SPLAY_REMOVE(batchtree, &env->batch_queue, batchp);
- bzero(batchp, sizeof(struct batch));
- free(batchp);
-
- }
-
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_ev, &tv);
}
pid_t
@@ -632,7 +627,8 @@ queue(struct smtpd *env)
{ PROC_SMTP, queue_dispatch_smtp },
{ PROC_MDA, queue_dispatch_mda },
{ PROC_MTA, queue_dispatch_mta },
- { PROC_LKA, queue_dispatch_lka }
+ { PROC_LKA, queue_dispatch_lka },
+ { PROC_RUNNER, queue_dispatch_runner }
};
switch (pid = fork()) {
@@ -667,8 +663,6 @@ queue(struct smtpd *env)
fatal("queue: cannot drop privileges");
#endif
- SPLAY_INIT(&env->batch_queue);
-
event_init();
signal_set(&ev_sigint, SIGINT, queue_sig_handler, env);
@@ -678,7 +672,7 @@ queue(struct smtpd *env)
signal(SIGPIPE, SIG_IGN);
signal(SIGHUP, SIG_IGN);
- config_peers(env, peers, 5);
+ config_peers(env, peers, 6);
queue_setup_events(env);
event_dispatch();
@@ -687,257 +681,6 @@ queue(struct smtpd *env)
return (0);
}
-void
-queue_process(struct smtpd *env)
-{
- u_int16_t cbucket = 0;
- static u_int16_t lbucket = 0;
- DIR *dirp;
- struct dirent *dp;
- const char *errstr;
- static u_int8_t bucketdone = 1;
-
- if (! bucketdone) {
- bucketdone = queue_process_bucket(env, lbucket);
- if (bucketdone)
- lbucket = (lbucket + 1) % DIRHASH_BUCKETS;
- return;
- }
-
- dirp = opendir(PATH_QUEUE);
- if (dirp == NULL)
- fatal("queue_process: opendir");
-
- while ((dp = readdir(dirp)) != NULL) {
-
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
-
- cbucket = strtonum(dp->d_name, 0, DIRHASH_BUCKETS - 1, &errstr);
- if (errstr) {
- log_warn("queue_process: %s/%s is not a valid bucket",
- PATH_QUEUE, dp->d_name);
- continue;
- }
-
- if (cbucket == lbucket)
- break;
- }
- closedir(dirp);
-
- if (dp == NULL) {
- lbucket = (lbucket + 1) % DIRHASH_BUCKETS;
- return;
- }
-
- bucketdone = queue_process_bucket(env, cbucket);
- if (bucketdone)
- lbucket = (lbucket + 1) % DIRHASH_BUCKETS;
-}
-
-int
-queue_process_bucket(struct smtpd *env, u_int16_t bucket)
-{
- int spret;
- static DIR *dirp = NULL;
- struct dirent *dp;
- static char *msgid = NULL;
- char bucketpath[MAXPATHLEN];
- static u_int8_t messagedone = 1;
-
- if (! messagedone) {
- messagedone = queue_process_message(env, msgid);
- if (! messagedone)
- return 0;
- msgid = NULL;
- }
-
- spret = snprintf(bucketpath, MAXPATHLEN, "%s/%d", PATH_QUEUE, bucket);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_process_bucket: snprintf");
-
- if (dirp == NULL) {
- dirp = opendir(bucketpath);
- if (dirp == NULL)
- fatal("queue_process_bucket: opendir");
- }
-
- while ((dp = readdir(dirp)) != NULL) {
-
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
-
- break;
- }
-
- if (dp != NULL) {
- msgid = dp->d_name;
- messagedone = queue_process_message(env, msgid);
- if (! messagedone)
- return 0;
- msgid = NULL;
- }
-
- closedir(dirp);
- dirp = NULL;
- return 1;
-}
-
-int
-queue_process_message(struct smtpd *env, char *messageid)
-{
- int spret;
- static DIR *dirp = NULL;
- struct dirent *dp;
- char evppath[MAXPATHLEN];
- u_int16_t hval = 0;
-
- hval = hash(messageid, strlen(messageid)) % DIRHASH_BUCKETS;
-
- spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, hval,
- messageid, PATH_ENVELOPES);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_process_message: snprintf");
-
- if (dirp == NULL) {
- dirp = opendir(evppath);
- if (dirp == NULL)
- fatal("queue_process_message: opendir");
- }
-
- while ((dp = readdir(dirp)) != NULL) {
-
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
- break;
- }
-
- if (dp != NULL) {
- queue_process_envelope(env, messageid, dp->d_name);
- return 0;
- }
-
- closedir(dirp);
- dirp = NULL;
- return 1;
-}
-
-void
-queue_process_envelope(struct smtpd *env, char *msgid, char *evpid)
-{
- int spret;
- struct message message;
- time_t tm;
- char evppath[MAXPATHLEN];
- char rqpath[MAXPATHLEN];
- u_int16_t hval;
- struct stat sb;
-
- if (! queue_load_envelope(&message, evpid)) {
- log_debug("failed to load envelope: %s", evpid);
- return;
- }
-
- tm = time(NULL);
-
- if (! queue_message_schedule(&message, tm)) {
- if (message.flags & F_MESSAGE_EXPIRED) {
- log_debug("message has expired, mdaemon");
- queue_remove_envelope(&message);
- }
- return;
- }
-
- message.flags |= F_MESSAGE_SCHEDULED;
- queue_update_envelope(&message);
-
- log_debug("SCHEDULED: %s", evpid);
- hval = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS;
- spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s/%s", PATH_QUEUE, hval,
- msgid, PATH_ENVELOPES, evpid);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_process_envelope: snprintf");
-
- spret = snprintf(rqpath, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, evpid);
- if (spret == -1 || spret >= MAXPATHLEN)
- fatal("queue_process_envelope: snprintf");
-
- if (stat(rqpath, &sb) == -1) {
- if (errno != ENOENT)
- fatal("queue_process_envelope: stat");
-
- if (symlink(evppath, rqpath) == -1) {
- log_info("queue_process_envelope: "
- "failed to place envelope in runqueue");
- }
- }
-}
-
-void
-queue_process_runqueue(int fd, short event, void *p)
-{
- DIR *dirp;
- struct dirent *dp;
- struct message message;
- struct message *messagep;
- struct batch *batchp;
- char pathname[MAXPATHLEN];
- time_t tm;
- struct smtpd *env = p;
- struct timeval tv;
-
- tm = time(NULL);
-
- dirp = opendir(PATH_RUNQUEUE);
- if (dirp == NULL)
- fatal("queue_process_runqueue: opendir");
-
- while ((dp = readdir(dirp)) != NULL) {
- if (strcmp(dp->d_name, ".") == 0 ||
- strcmp(dp->d_name, "..") == 0)
- continue;
-
- /* XXX */
- snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE, dp->d_name);
- unlink(pathname);
-
- if (! queue_load_envelope(&message, dp->d_name)) {
- log_debug("failed to load envelope");
- continue;
- }
-
- if (message.flags & F_MESSAGE_PROCESSING)
- continue;
-
- message.lasttry = tm;
- message.flags &= ~F_MESSAGE_SCHEDULED;
- message.flags |= F_MESSAGE_PROCESSING;
- queue_update_envelope(&message);
-
- messagep = calloc(1, sizeof (struct message));
- if (messagep == NULL)
- err(1, "calloc");
- *messagep = message;
-
- batchp = batch_lookup(env, messagep);
- if (batchp != NULL)
- messagep->batch_id = batchp->id;
-
- batchp = queue_record_batch(env, messagep);
- if (messagep->batch_id == 0)
- messagep->batch_id = batchp->id;
- }
-
- closedir(dirp);
-
- tv.tv_sec = 0;
- tv.tv_usec = 10;
- evtimer_add(&env->sc_rqev, &tv);
-}
-
u_int64_t
queue_generate_id(void)
{
@@ -955,62 +698,6 @@ queue_generate_id(void)
return (id);
}
-struct batch *
-queue_record_batch(struct smtpd *env, struct message *messagep)
-{
- struct batch *batchp;
- struct path *path;
-
- batchp = NULL;
- if (messagep->batch_id != 0) {
- batchp = batch_by_id(env, messagep->batch_id);
- if (batchp == NULL)
- errx(1, "%s: internal inconsistency.", __func__);
- }
-
- if (batchp == NULL) {
- batchp = calloc(1, sizeof(struct batch));
- if (batchp == NULL)
- err(1, "%s: calloc", __func__);
-
- batchp->id = queue_generate_id();
- batchp->creation = messagep->creation;
-
- (void)strlcpy(batchp->message_id, messagep->message_id,
- sizeof(batchp->message_id));
-
- TAILQ_INIT(&batchp->messages);
- SPLAY_INSERT(batchtree, &env->batch_queue, batchp);
-
- if (messagep->type & T_DAEMON_MESSAGE) {
- batchp->type = T_DAEMON_BATCH;
- path = &messagep->sender;
- }
- else {
- path = &messagep->recipient;
- }
-
- batchp->rule = path->rule;
-
- (void)strlcpy(batchp->hostname, path->domain,
- sizeof(batchp->hostname));
-
- if (IS_MAILBOX(path->rule.r_action) ||
- IS_EXT(path->rule.r_action)) {
- batchp->type |= T_MDA_BATCH;
- }
- else {
- batchp->type |= T_MTA_BATCH;
- imsg_compose(env->sc_ibufs[PROC_LKA], IMSG_LKA_MX_LOOKUP, 0, 0, -1,
- batchp, sizeof(struct batch));
- }
- }
-
- TAILQ_INSERT_TAIL(&batchp->messages, messagep, entry);
-
- return batchp;
-}
-
int
queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct message *messagep)
{
@@ -1028,195 +715,6 @@ queue_remove_batch_message(struct smtpd *env, struct batch *batchp, struct messa
return 0;
}
-int
-queue_batch_resolved(struct smtpd *env, struct batch *lookup)
-{
- u_int32_t i;
- struct batch *batchp;
-
- batchp = batch_by_id(env, lookup->id);
- batchp->getaddrinfo_error = lookup->getaddrinfo_error;
- batchp->mx_cnt = lookup->mx_cnt;
-
-/*
- EAI_NODATA no address associated with hostname
- EAI_NONAME hostname or servname not provided, or not known
- EAI_PROTOCOL resolved protocol is unknown
- EAI_SERVICE servname not supported for ai_socktype
- EAI_SOCKTYPE ai_socktype not supported
- EAI_SYSTEM system error returned in errno
-
-
- */
-
- switch (batchp->getaddrinfo_error) {
- case EAI_ADDRFAMILY:
- case EAI_BADFLAGS:
- case EAI_BADHINTS:
- case EAI_FAIL:
- case EAI_FAMILY:
- case EAI_NODATA:
- case EAI_NONAME:
- case EAI_SERVICE:
- case EAI_SOCKTYPE:
- case EAI_SYSTEM:
- /* XXX */
- /*
- * In the case of a DNS permanent error, do not generate a
- * daemon message if the error originates from one already
- * as this would cause a loop. Remove the initial batch as
- * it will never succeed.
- *
- */
- return 0;
-
- case EAI_AGAIN:
- case EAI_MEMORY:
- /* XXX */
- /*
- * Do not generate a daemon message if this error happened
- * while processing a daemon message. Do NOT remove batch,
- * it may succeed later.
- */
- return 0;
-
- default:
- batchp->flags |= F_BATCH_RESOLVED;
- for (i = 0; i < batchp->mx_cnt; ++i)
- batchp->mxarray[i].ss = lookup->mxarray[i].ss;
- }
- return 1;
-}
-
-struct batch *
-batch_lookup(struct smtpd *env, struct message *message)
-{
- struct batch *batchp;
- struct batch lookup;
-
- /* If message->batch_id != 0, we can retrieve batch by id */
- if (message->batch_id != 0) {
- lookup.id = message->batch_id;
- return SPLAY_FIND(batchtree, &env->batch_queue, &lookup);
- }
-
- /* We do not know the batch_id yet, maybe it was created but we could not
- * be notified, or it just does not exist. Let's scan to see if we can do
- * a match based on our message_id and flags.
- */
- SPLAY_FOREACH(batchp, batchtree, &env->batch_queue) {
-
- if (batchp->type != message->type)
- continue;
-
- if (strcasecmp(batchp->message_id, message->message_id) != 0)
- continue;
-
- if (batchp->type & T_MTA_BATCH)
- if (strcasecmp(batchp->hostname, message->recipient.domain) != 0)
- continue;
-
- break;
- }
-
- return batchp;
-}
-
-int
-batch_cmp(struct batch *s1, struct batch *s2)
-{
- /*
- * do not return u_int64_t's
- */
- if (s1->id < s2->id)
- return (-1);
-
- if (s1->id > s2->id)
- return (1);
-
- return (0);
-}
-
-int
-queue_message_schedule(struct message *messagep, time_t tm)
-{
- time_t delay;
-
- /* Batch has been in the queue for too long and expired */
- if (tm - messagep->creation >= SMTPD_QUEUE_EXPIRY) {
- messagep->flags |= F_MESSAGE_EXPIRED;
- return 0;
- }
-
- if (messagep->retry == 255) {
- messagep->flags |= F_MESSAGE_EXPIRED;
- return 0;
- }
-
- if ((messagep->flags & F_MESSAGE_SCHEDULED) != 0)
- return 0;
-
- if ((messagep->flags & F_MESSAGE_PROCESSING) != 0)
- return 0;
-
- if (messagep->lasttry == 0)
- return 1;
-
- delay = SMTPD_QUEUE_MAXINTERVAL;
-
- if (messagep->type & T_MDA_MESSAGE) {
- if (messagep->retry < 5)
- return 1;
-
- if (messagep->retry < 15)
- delay = (messagep->retry * 60) + arc4random() % 60;
- }
-
- if (messagep->type & T_MTA_MESSAGE) {
- if (messagep->retry < 3)
- delay = SMTPD_QUEUE_INTERVAL;
- else if (messagep->retry <= 7) {
- delay = SMTPD_QUEUE_INTERVAL * (1 << (messagep->retry - 3));
- if (delay > SMTPD_QUEUE_MAXINTERVAL)
- delay = SMTPD_QUEUE_MAXINTERVAL;
- }
- }
-
- if (tm >= messagep->lasttry + delay)
- return 1;
-
- return 0;
-}
-
-void
-batch_send(struct smtpd *env, struct batch *batchp, time_t curtime)
-{
- u_int8_t proctype;
- struct message *messagep;
-
- if ((batchp->type & (T_MDA_BATCH|T_MTA_BATCH)) == 0)
- fatal("batch_send: unknown batch type");
-
- if (batchp->type & T_MDA_BATCH)
- proctype = PROC_MDA;
- else if (batchp->type & T_MTA_BATCH)
- proctype = PROC_MTA;
-
- imsg_compose(env->sc_ibufs[proctype], IMSG_CREATE_BATCH, 0, 0, -1,
- batchp, sizeof (struct batch));
-
- while ((messagep = TAILQ_FIRST(&batchp->messages))) {
- imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_APPEND, 0, 0,
- -1, messagep, sizeof (struct message));
- TAILQ_REMOVE(&batchp->messages, messagep, entry);
- bzero(messagep, sizeof(struct message));
- free(messagep);
- }
-
- imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_CLOSE, 0, 0, -1,
- batchp, sizeof(struct batch));
-}
-
struct batch *
batch_by_id(struct smtpd *env, u_int64_t id)
{
@@ -1763,5 +1261,3 @@ hash(u_int8_t *buf, size_t len)
return h;
}
-
-SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp);
diff --git a/usr.sbin/smtpd/runner.c b/usr.sbin/smtpd/runner.c
new file mode 100644
index 00000000000..447e6e14e93
--- /dev/null
+++ b/usr.sbin/smtpd/runner.c
@@ -0,0 +1,896 @@
+/* $OpenBSD: runner.c,v 1.1 2008/12/05 02:51:32 gilles Exp $ */
+
+/*
+ * Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
+ * Copyright (c) 2008 Pierre-Yves Ritschard <pyr@openbsd.org>
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/types.h>
+#include <sys/queue.h>
+#include <sys/tree.h>
+#include <sys/param.h>
+#include <sys/socket.h>
+#include <sys/stat.h>
+#include <sys/time.h>
+
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <dirent.h>
+#include <err.h>
+#include <errno.h>
+#include <event.h>
+#include <fcntl.h>
+#include <netdb.h>
+#include <pwd.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <time.h>
+#include <unistd.h>
+
+#include "smtpd.h"
+
+__dead void runner_shutdown(void);
+void runner_sig_handler(int, short, void *);
+void runner_dispatch_control(int, short, void *);
+void runner_dispatch_queue(int, short, void *);
+void runner_dispatch_mda(int, short, void *);
+void runner_dispatch_mta(int, short, void *);
+void runner_dispatch_lka(int, short, void *);
+void runner_setup_events(struct smtpd *);
+void runner_disable_events(struct smtpd *);
+void runner_timeout(int, short, void *);
+
+void runner_process_queue(struct smtpd *);
+void runner_process_bucket(struct smtpd *, u_int16_t);
+void runner_process_message(struct smtpd *, char *);
+void runner_process_envelope(struct smtpd *, char *, char *);
+void runner_process_runqueue(struct smtpd *);
+void runner_process_batchqueue(struct smtpd *);
+
+int runner_batch_resolved(struct smtpd *, struct batch *);
+void runner_batch_dispatch(struct smtpd *, struct batch *, time_t);
+
+int runner_message_schedule(struct message *, time_t);
+
+struct batch *batch_record(struct smtpd *, struct message *);
+struct batch *batch_lookup(struct smtpd *, struct message *);
+
+int queue_load_envelope(struct message *, char *);
+int queue_update_envelope(struct message *);
+int queue_remove_envelope(struct message *);
+
+u_int32_t hash(u_int8_t *, size_t);
+
+/*
+ * Signal callback for the runner process.  SIGINT and SIGTERM lead to
+ * runner_shutdown(); any other signal number is reported with fatalx().
+ * The event and p arguments are not used.
+ */
+void
+runner_sig_handler(int sig, short event, void *p)
+{
+	if (sig == SIGINT || sig == SIGTERM) {
+		runner_shutdown();
+		return;
+	}
+
+	fatalx("runner_sig_handler: unexpected signal");
+}
+
+/*
+ * Event callback for the imsg channel env->sc_ibufs[PROC_CONTROL].
+ *
+ * EV_WRITE flushes pending output with msgbuf_write() and returns.
+ * EV_READ reads with imsg_read(); a return of 0 is treated as a dead pipe:
+ * the event is deleted and the event loop is asked to exit.  Every imsg
+ * then extracted with imsg_get() is logged as unexpected (no message type
+ * is handled here) and freed.
+ *
+ * p is the struct smtpd environment; the first argument is not used.
+ */
+void
+runner_dispatch_control(int sig, short event, void *p)
+{
+	struct smtpd		*env = p;
+	struct imsgbuf		*ibuf;
+	struct imsg		 imsg;
+	ssize_t			 n;
+
+	ibuf = env->sc_ibufs[PROC_CONTROL];
+	switch (event) {
+	case EV_READ:
+		if ((n = imsg_read(ibuf)) == -1)
+			fatal("imsg_read_error");
+		if (n == 0) {
+			/* this pipe is dead, so remove the event handler */
+			event_del(&ibuf->ev);
+			event_loopexit(NULL);
+			return;
+		}
+		break;
+	case EV_WRITE:
+		if (msgbuf_write(&ibuf->w) == -1)
+			fatal("msgbuf_write");
+		imsg_event_add(ibuf);
+		return;
+	default:
+		fatalx("unknown event");
+	}
+
+	for (;;) {
+		if ((n = imsg_get(ibuf, &imsg)) == -1)
+			fatal("runner_dispatch_control: imsg_read error");
+		if (n == 0)
+			break;
+
+		switch (imsg.hdr.type) {
+		default:
+			/* log under this function's own name, not queue's */
+			log_debug("runner_dispatch_control: unexpected imsg %d",
+			    imsg.hdr.type);
+			break;
+		}
+		imsg_free(&imsg);
+	}
+	imsg_event_add(ibuf);
+}
+
+/*
+ * Event callback for the imsg channel env->sc_ibufs[PROC_QUEUE].
+ *
+ * A write event flushes the output buffer and returns.  A read event pulls
+ * data in with imsg_read(); 0 bytes means the pipe is dead, so the event is
+ * removed and the event loop is asked to exit.  No message type is handled
+ * here: every imsg extracted is logged as unexpected and freed.
+ */
+void
+runner_dispatch_queue(int sig, short event, void *p)
+{
+	struct smtpd		*env = p;
+	struct imsgbuf		*ibuf = env->sc_ibufs[PROC_QUEUE];
+	struct imsg		 msg;
+	ssize_t			 len;
+
+	if (event == EV_WRITE) {
+		if (msgbuf_write(&ibuf->w) == -1)
+			fatal("msgbuf_write");
+		imsg_event_add(ibuf);
+		return;
+	}
+	if (event != EV_READ)
+		fatalx("unknown event");
+
+	len = imsg_read(ibuf);
+	if (len == -1)
+		fatal("imsg_read_error");
+	if (len == 0) {
+		/* this pipe is dead, so remove the event handler */
+		event_del(&ibuf->ev);
+		event_loopexit(NULL);
+		return;
+	}
+
+	while ((len = imsg_get(ibuf, &msg)) != 0) {
+		if (len == -1)
+			fatal("runner_dispatch_queue: imsg_read error");
+
+		log_debug("runner_dispatch_queue: unexpected imsg %d",
+		    msg.hdr.type);
+		imsg_free(&msg);
+	}
+	imsg_event_add(ibuf);
+}
+
+/*
+ * Event callback for the imsg channel env->sc_ibufs[PROC_MDA].
+ *
+ * A write event flushes the output buffer and returns.  A read event pulls
+ * data in with imsg_read(); 0 bytes means the pipe is dead, so the event is
+ * removed and the event loop is asked to exit.  No message type is handled
+ * here: every imsg extracted is logged as unexpected and freed.
+ */
+void
+runner_dispatch_mda(int sig, short event, void *p)
+{
+	struct smtpd		*env = p;
+	struct imsgbuf		*ibuf = env->sc_ibufs[PROC_MDA];
+	struct imsg		 msg;
+	ssize_t			 len;
+
+	if (event == EV_WRITE) {
+		if (msgbuf_write(&ibuf->w) == -1)
+			fatal("msgbuf_write");
+		imsg_event_add(ibuf);
+		return;
+	}
+	if (event != EV_READ)
+		fatalx("unknown event");
+
+	len = imsg_read(ibuf);
+	if (len == -1)
+		fatal("imsg_read_error");
+	if (len == 0) {
+		/* this pipe is dead, so remove the event handler */
+		event_del(&ibuf->ev);
+		event_loopexit(NULL);
+		return;
+	}
+
+	while ((len = imsg_get(ibuf, &msg)) != 0) {
+		if (len == -1)
+			fatal("runner_dispatch_mda: imsg_read error");
+
+		log_debug("runner_dispatch_mda: unexpected imsg %d",
+		    msg.hdr.type);
+		imsg_free(&msg);
+	}
+	imsg_event_add(ibuf);
+}
+
+/*
+ * Event callback for the imsg channel env->sc_ibufs[PROC_MTA].
+ *
+ * A write event flushes the output buffer and returns.  A read event pulls
+ * data in with imsg_read(); 0 bytes means the pipe is dead, so the event is
+ * removed and the event loop is asked to exit.  No message type is handled
+ * here: every imsg extracted is logged as unexpected and freed.
+ */
+void
+runner_dispatch_mta(int sig, short event, void *p)
+{
+	struct smtpd		*env = p;
+	struct imsgbuf		*ibuf = env->sc_ibufs[PROC_MTA];
+	struct imsg		 msg;
+	ssize_t			 len;
+
+	if (event == EV_WRITE) {
+		if (msgbuf_write(&ibuf->w) == -1)
+			fatal("msgbuf_write");
+		imsg_event_add(ibuf);
+		return;
+	}
+	if (event != EV_READ)
+		fatalx("unknown event");
+
+	len = imsg_read(ibuf);
+	if (len == -1)
+		fatal("imsg_read_error");
+	if (len == 0) {
+		/* this pipe is dead, so remove the event handler */
+		event_del(&ibuf->ev);
+		event_loopexit(NULL);
+		return;
+	}
+
+	while ((len = imsg_get(ibuf, &msg)) != 0) {
+		if (len == -1)
+			fatal("runner_dispatch_mta: imsg_read error");
+
+		log_debug("runner_dispatch_mta: unexpected imsg %d",
+		    msg.hdr.type);
+		imsg_free(&msg);
+	}
+	imsg_event_add(ibuf);
+}
+
+/*
+ * Event callback for the imsg channel env->sc_ibufs[PROC_LKA].
+ *
+ * A write event flushes the output buffer and returns.  A read event pulls
+ * data in with imsg_read(); 0 bytes means the pipe is dead, so the event is
+ * removed and the event loop is asked to exit.  IMSG_LKA_MX_LOOKUP
+ * messages have their payload handed to runner_batch_resolved(); any other
+ * type is logged as unexpected.  Each imsg is freed after handling.
+ */
+void
+runner_dispatch_lka(int sig, short event, void *p)
+{
+	struct smtpd		*env = p;
+	struct imsgbuf		*ibuf = env->sc_ibufs[PROC_LKA];
+	struct imsg		 msg;
+	ssize_t			 len;
+
+	if (event == EV_WRITE) {
+		if (msgbuf_write(&ibuf->w) == -1)
+			fatal("msgbuf_write");
+		imsg_event_add(ibuf);
+		return;
+	}
+	if (event != EV_READ)
+		fatalx("unknown event");
+
+	len = imsg_read(ibuf);
+	if (len == -1)
+		fatal("imsg_read_error");
+	if (len == 0) {
+		/* this pipe is dead, so remove the event handler */
+		event_del(&ibuf->ev);
+		event_loopexit(NULL);
+		return;
+	}
+
+	while ((len = imsg_get(ibuf, &msg)) != 0) {
+		if (len == -1)
+			fatal("runner_dispatch_lka: imsg_read error");
+
+		if (msg.hdr.type == IMSG_LKA_MX_LOOKUP)
+			runner_batch_resolved(env, msg.data);
+		else
+			log_debug("runner_dispatch_lka: unexpected imsg %d",
+			    msg.hdr.type);
+		imsg_free(&msg);
+	}
+	imsg_event_add(ibuf);
+}
+/*
+ * Logs "runner handler" through log_info() and terminates the process
+ * with _exit(0); control does not return to the caller.
+ */
+void
+runner_shutdown(void)
+{
+ log_info("runner handler");
+ _exit(0);
+}
+
+/*
+ * Binds runner_timeout() (with env as its argument) to the env->sc_ev
+ * timer and arms it with a delay of 0 seconds and 10 microseconds.
+ */
+void
+runner_setup_events(struct smtpd *env)
+{
+	struct timeval	 delay;
+
+	delay.tv_usec = 10;
+	delay.tv_sec = 0;
+
+	evtimer_set(&env->sc_ev, runner_timeout, env);
+	evtimer_add(&env->sc_ev, &delay);
+}
+/*
+ * Removes the env->sc_ev timer (the one armed by runner_setup_events() and
+ * re-armed by runner_timeout()) with evtimer_del().
+ */
+void
+runner_disable_events(struct smtpd *env)
+{
+ evtimer_del(&env->sc_ev);
+}
+
+/*
+ * Forks the runner process.
+ *
+ * In the parent this returns the child's pid; a fork() failure is fatal.
+ * The child calls purge_config(env, PURGE_EVERYTHING) and, unless built
+ * with DEBUG, chroots to PATH_SPOOL and switches to the uid/gid found in
+ * env->sc_pw.  It then initialises the batch tree, installs the
+ * SIGINT/SIGTERM handlers, ignores SIGPIPE and SIGHUP, registers its peers
+ * with config_peers() and arms the timer with runner_setup_events() before
+ * entering event_dispatch().  Once the event loop returns,
+ * runner_shutdown() is called.
+ */
+pid_t
+runner(struct smtpd *env)
+{
+	pid_t		 pid;
+	struct passwd	*pw;
+
+	struct event	 ev_sigint;
+	struct event	 ev_sigterm;
+
+	struct peer peers[] = {
+		{ PROC_CONTROL,	runner_dispatch_control },
+		{ PROC_MDA,	runner_dispatch_mda },
+		{ PROC_MTA,	runner_dispatch_mta },
+		{ PROC_QUEUE,	runner_dispatch_queue },
+		{ PROC_LKA,	runner_dispatch_lka },
+	};
+
+	switch (pid = fork()) {
+	case -1:
+		fatal("runner: cannot fork");
+	case 0:
+		break;
+	default:
+		return (pid);
+	}
+
+	purge_config(env, PURGE_EVERYTHING);
+
+	pw = env->sc_pw;
+
+#ifndef DEBUG
+	if (chroot(PATH_SPOOL) == -1)
+		fatal("runner: chroot");
+	if (chdir("/") == -1)
+		fatal("runner: chdir(\"/\")");
+#else
+#warning disabling privilege revocation and chroot in DEBUG MODE
+#endif
+
+	setproctitle("runner");
+	smtpd_process = PROC_RUNNER;
+
+#ifndef DEBUG
+	if (setgroups(1, &pw->pw_gid) ||
+	    setresgid(pw->pw_gid, pw->pw_gid, pw->pw_gid) ||
+	    setresuid(pw->pw_uid, pw->pw_uid, pw->pw_uid))
+		fatal("runner: cannot drop privileges");
+#endif
+
+	SPLAY_INIT(&env->batch_queue);
+
+	event_init();
+
+	signal_set(&ev_sigint, SIGINT, runner_sig_handler, env);
+	signal_set(&ev_sigterm, SIGTERM, runner_sig_handler, env);
+	signal_add(&ev_sigint, NULL);
+	signal_add(&ev_sigterm, NULL);
+	signal(SIGPIPE, SIG_IGN);
+	signal(SIGHUP, SIG_IGN);
+
+	/* derive the count from the table so the two cannot drift apart */
+	config_peers(env, peers, sizeof(peers) / sizeof(peers[0]));
+
+	runner_setup_events(env);
+	event_dispatch();
+	runner_shutdown();
+
+	return (0);
+}
+
+/*
+ * Timer callback (p is the struct smtpd environment).  Runs one pass over
+ * the queue, the runqueue and the batch queue, in that order, then re-arms
+ * the env->sc_ev timer to fire again in one second.
+ */
+void
+runner_timeout(int fd, short event, void *p)
+{
+	struct smtpd	*env = p;
+	struct timeval	 next;
+
+	next.tv_usec = 0;
+	next.tv_sec = 1;
+
+	runner_process_queue(env);
+	runner_process_runqueue(env);
+	runner_process_batchqueue(env);
+
+	evtimer_add(&env->sc_ev, &next);
+}
+
+/*
+ * Walks PATH_QUEUE.  Every entry other than "." and ".." whose name parses
+ * as a bucket number in [0, DIRHASH_BUCKETS - 1] is handed to
+ * runner_process_bucket(); other names are reported with log_warn() and
+ * skipped.  Failure to open the directory is fatal.
+ */
+void
+runner_process_queue(struct smtpd *env)
+{
+	DIR		*queuedir;
+	struct dirent	*entry;
+	const char	*errstr;
+	u_int16_t	 bucket;
+
+	if ((queuedir = opendir(PATH_QUEUE)) == NULL)
+		fatal("queue_process: opendir");
+
+	for (entry = readdir(queuedir); entry != NULL;
+	    entry = readdir(queuedir)) {
+		if (strcmp(entry->d_name, ".") == 0 ||
+		    strcmp(entry->d_name, "..") == 0)
+			continue;
+
+		bucket = strtonum(entry->d_name, 0, DIRHASH_BUCKETS - 1,
+		    &errstr);
+		if (errstr == NULL)
+			runner_process_bucket(env, bucket);
+		else
+			log_warn("queue_process: %s/%s is not a valid bucket",
+			    PATH_QUEUE, entry->d_name);
+	}
+	closedir(queuedir);
+}
+
+/*
+ * Walks the directory PATH_QUEUE/<bucket> and calls
+ * runner_process_message() with the name of every entry other than "." and
+ * "..".  A path that does not fit in MAXPATHLEN or a directory that cannot
+ * be opened is fatal.
+ */
+void
+runner_process_bucket(struct smtpd *env, u_int16_t bucket)
+{
+	char		 path[MAXPATHLEN];
+	DIR		*bucketdir;
+	struct dirent	*entry;
+	int		 len;
+
+	len = snprintf(path, sizeof(path), "%s/%d", PATH_QUEUE, bucket);
+	if (len == -1 || len >= MAXPATHLEN)
+		fatal("queue_process_bucket: snprintf");
+
+	if ((bucketdir = opendir(path)) == NULL)
+		fatal("queue_process_bucket: opendir");
+
+	for (entry = readdir(bucketdir); entry != NULL;
+	    entry = readdir(bucketdir)) {
+		if (strcmp(entry->d_name, ".") != 0 &&
+		    strcmp(entry->d_name, "..") != 0)
+			runner_process_message(env, entry->d_name);
+	}
+
+	closedir(bucketdir);
+}
+
+/*
+ * Walks the envelope directory of one message,
+ * PATH_QUEUE/<hash(messageid) % DIRHASH_BUCKETS>/<messageid>PATH_ENVELOPES,
+ * and calls runner_process_envelope() for every entry other than "." and
+ * "..".  A path that does not fit in MAXPATHLEN or a directory that cannot
+ * be opened is fatal.
+ */
+void
+runner_process_message(struct smtpd *env, char *messageid)
+{
+	int		 spret;
+	DIR		*dirp = NULL;
+	struct dirent	*dp;
+	char		 evppath[MAXPATHLEN];
+	u_int16_t	 hval = 0;
+
+	hval = hash(messageid, strlen(messageid)) % DIRHASH_BUCKETS;
+
+	spret = snprintf(evppath, MAXPATHLEN, "%s/%d/%s%s", PATH_QUEUE, hval,
+	    messageid, PATH_ENVELOPES);
+	if (spret == -1 || spret >= MAXPATHLEN)
+		fatal("queue_process_message: snprintf");
+
+	dirp = opendir(evppath);
+	if (dirp == NULL)
+		fatal("queue_process_message: opendir");
+
+	while ((dp = readdir(dirp)) != NULL) {
+
+		if (strcmp(dp->d_name, ".") == 0 ||
+		    strcmp(dp->d_name, "..") == 0)
+			continue;
+
+		/*
+		 * runner_process_envelope() rebuilds the envelope path from
+		 * the message id (bucket hash and directory name), so it
+		 * must be given the id of the message being walked rather
+		 * than the envelope file name twice.
+		 */
+		runner_process_envelope(env, messageid, dp->d_name);
+	}
+
+	closedir(dirp);
+}
+
+/*
+ * Considers one envelope for delivery.
+ *
+ * The envelope is loaded with queue_load_envelope(evpid).  When
+ * runner_message_schedule() declines it, the envelope is removed if it was
+ * flagged F_MESSAGE_EXPIRED and otherwise left untouched.  When it is due,
+ * it is flagged F_MESSAGE_SCHEDULED and written back, and, unless stat()
+ * on PATH_RUNQUEUE/<evpid> succeeds, a symlink of that name pointing at
+ * the envelope's path under PATH_QUEUE is created.
+ */
+void
+runner_process_envelope(struct smtpd *env, char *msgid, char *evpid)
+{
+	struct message	 envelope;
+	struct stat	 st;
+	char		 target[MAXPATHLEN];
+	char		 linkpath[MAXPATHLEN];
+	u_int16_t	 bucket;
+	int		 len;
+
+	if (!queue_load_envelope(&envelope, evpid)) {
+		log_debug("failed to load envelope: %s", evpid);
+		return;
+	}
+
+	if (!runner_message_schedule(&envelope, time(NULL))) {
+		if (envelope.flags & F_MESSAGE_EXPIRED) {
+			log_debug("message has expired, mdaemon");
+			queue_remove_envelope(&envelope);
+		}
+		return;
+	}
+
+	envelope.flags |= F_MESSAGE_SCHEDULED;
+	queue_update_envelope(&envelope);
+
+	bucket = hash(msgid, strlen(msgid)) % DIRHASH_BUCKETS;
+	len = snprintf(target, sizeof(target), "%s/%d/%s%s/%s", PATH_QUEUE,
+	    bucket, msgid, PATH_ENVELOPES, evpid);
+	if (len == -1 || len >= MAXPATHLEN)
+		fatal("queue_process_envelope: snprintf");
+
+	len = snprintf(linkpath, sizeof(linkpath), "%s/%s", PATH_RUNQUEUE,
+	    evpid);
+	if (len == -1 || len >= MAXPATHLEN)
+		fatal("queue_process_envelope: snprintf");
+
+	/* nothing to do when the runqueue entry can already be stat()ed */
+	if (stat(linkpath, &st) != -1)
+		return;
+	if (errno != ENOENT)
+		fatal("queue_process_envelope: stat");
+
+	if (symlink(target, linkpath) == -1)
+		log_info("queue_process_envelope: "
+		    "failed to place envelope in runqueue");
+}
+
+/*
+ * Drains PATH_RUNQUEUE.
+ *
+ * For every entry other than "." and "..", the runqueue entry is unlinked
+ * and the envelope of that name is loaded.  Envelopes that fail to load or
+ * are already flagged F_MESSAGE_PROCESSING are skipped.  The others get
+ * lasttry set to the current time, lose F_MESSAGE_SCHEDULED, gain
+ * F_MESSAGE_PROCESSING, and are written back; a heap copy is then attached
+ * to a batch (an existing one found by batch_lookup(), otherwise one
+ * created by batch_record()).
+ */
+void
+runner_process_runqueue(struct smtpd *env)
+{
+	DIR		*dirp;
+	struct dirent	*dp;
+	struct message	 message;
+	struct message	*messagep;
+	struct batch	*batchp;
+	char		 pathname[MAXPATHLEN];
+	time_t		 tm;
+	int		 spret;
+
+	tm = time(NULL);
+
+	dirp = opendir(PATH_RUNQUEUE);
+	if (dirp == NULL)
+		fatal("queue_process_runqueue: opendir");
+
+	while ((dp = readdir(dirp)) != NULL) {
+		if (strcmp(dp->d_name, ".") == 0 ||
+		    strcmp(dp->d_name, "..") == 0)
+			continue;
+
+		/*
+		 * Never unlink() a truncated path: check the result the
+		 * same way the other path builders in this file do.
+		 */
+		spret = snprintf(pathname, MAXPATHLEN, "%s/%s", PATH_RUNQUEUE,
+		    dp->d_name);
+		if (spret == -1 || spret >= MAXPATHLEN)
+			fatal("queue_process_runqueue: snprintf");
+		unlink(pathname);
+
+		if (! queue_load_envelope(&message, dp->d_name)) {
+			log_debug("failed to load envelope");
+			continue;
+		}
+
+		if (message.flags & F_MESSAGE_PROCESSING)
+			continue;
+
+		message.lasttry = tm;
+		message.flags &= ~F_MESSAGE_SCHEDULED;
+		message.flags |= F_MESSAGE_PROCESSING;
+		queue_update_envelope(&message);
+
+		messagep = calloc(1, sizeof (struct message));
+		if (messagep == NULL)
+			err(1, "calloc");
+		*messagep = message;
+
+		/* reuse a matching batch when one is already queued */
+		batchp = batch_lookup(env, messagep);
+		if (batchp != NULL)
+			messagep->batch_id = batchp->id;
+
+		batchp = batch_record(env, messagep);
+		if (messagep->batch_id == 0)
+			messagep->batch_id = batchp->id;
+	}
+
+	closedir(dirp);
+}
+
+/*
+ * Walks env->batch_queue and dispatches every batch that is ready: all
+ * batches except T_MTA_BATCH ones that are not yet flagged
+ * F_BATCH_RESOLVED.  A dispatched batch is removed from the tree, zeroed
+ * and freed.  The successor is fetched before the current node can be
+ * removed.
+ */
+void
+runner_process_batchqueue(struct smtpd *env)
+{
+	struct batch	*cur, *next;
+	time_t		 now;
+
+	now = time(NULL);
+	cur = SPLAY_MIN(batchtree, &env->batch_queue);
+	while (cur != NULL) {
+		next = SPLAY_NEXT(batchtree, &env->batch_queue, cur);
+
+		if ((cur->type & T_MTA_BATCH) == 0 ||
+		    (cur->flags & F_BATCH_RESOLVED) != 0) {
+			runner_batch_dispatch(env, cur, now);
+
+			SPLAY_REMOVE(batchtree, &env->batch_queue, cur);
+			bzero(cur, sizeof(struct batch));
+			free(cur);
+		}
+
+		cur = next;
+	}
+}
+
+/*
+ * Applies the result of an MX lookup to the queued batch it was issued
+ * for.
+ *
+ * lookup is the batch structure received in reply; its id selects the
+ * local batch, which receives lookup's getaddrinfo_error and mx_cnt.  When
+ * the error code is not one of the EAI_* failures listed below, the batch
+ * is flagged F_BATCH_RESOLVED and the first mx_cnt mxarray addresses are
+ * copied over.
+ *
+ * Returns 1 when the batch was marked resolved, 0 when the lookup failed
+ * or no batch with that id is queued.
+ */
+int
+runner_batch_resolved(struct smtpd *env, struct batch *lookup)
+{
+	u_int32_t	 i;
+	struct batch	*batchp;
+
+	batchp = batch_by_id(env, lookup->id);
+	if (batchp == NULL) {
+		/* the reply refers to a batch that is not queued here */
+		log_debug("runner_batch_resolved: unknown batch");
+		return 0;
+	}
+	batchp->getaddrinfo_error = lookup->getaddrinfo_error;
+	batchp->mx_cnt = lookup->mx_cnt;
+
+/*
+   EAI_NODATA        no address associated with hostname
+   EAI_NONAME        hostname or servname not provided, or not known
+   EAI_PROTOCOL      resolved protocol is unknown
+   EAI_SERVICE       servname not supported for ai_socktype
+   EAI_SOCKTYPE      ai_socktype not supported
+   EAI_SYSTEM        system error returned in errno
+ */
+
+	switch (batchp->getaddrinfo_error) {
+	case EAI_ADDRFAMILY:
+	case EAI_BADFLAGS:
+	case EAI_BADHINTS:
+	case EAI_FAIL:
+	case EAI_FAMILY:
+	case EAI_NODATA:
+	case EAI_NONAME:
+	case EAI_SERVICE:
+	case EAI_SOCKTYPE:
+	case EAI_SYSTEM:
+		/* XXX */
+		/*
+		 * In the case of a DNS permanent error, do not generate a
+		 * daemon message if the error originates from one already
+		 * as this would cause a loop. Remove the initial batch as
+		 * it will never succeed.
+		 *
+		 */
+		return 0;
+
+	case EAI_AGAIN:
+	case EAI_MEMORY:
+		/* XXX */
+		/*
+		 * Do not generate a daemon message if this error happened
+		 * while processing a daemon message. Do NOT remove batch,
+		 * it may succeed later.
+		 */
+		return 0;
+
+	default:
+		batchp->flags |= F_BATCH_RESOLVED;
+		for (i = 0; i < batchp->mx_cnt; ++i)
+			batchp->mxarray[i].ss = lookup->mxarray[i].ss;
+	}
+	return 1;
+}
+
+/*
+ * Sends a batch to the process that will deliver it: PROC_MDA for
+ * T_MDA_BATCH, PROC_MTA for T_MTA_BATCH.  A batch with neither bit set is
+ * a fatal internal error.
+ *
+ * The batch is announced with IMSG_CREATE_BATCH, each queued message
+ * follows as IMSG_BATCH_APPEND (and is unlinked, zeroed and freed once
+ * composed), and IMSG_BATCH_CLOSE ends the sequence.  The batch structure
+ * itself is left for the caller to release.  curtime is not used.
+ */
+void
+runner_batch_dispatch(struct smtpd *env, struct batch *batchp, time_t curtime)
+{
+	u_int8_t	 proctype;
+	struct message	*messagep;
+
+	/*
+	 * This is a logic error, not a failed system call, so report it
+	 * with fatalx() (no errno text) under this function's name.
+	 */
+	if (batchp->type & T_MDA_BATCH)
+		proctype = PROC_MDA;
+	else if (batchp->type & T_MTA_BATCH)
+		proctype = PROC_MTA;
+	else
+		fatalx("runner_batch_dispatch: unknown batch type");
+
+	imsg_compose(env->sc_ibufs[proctype], IMSG_CREATE_BATCH, 0, 0, -1,
+	    batchp, sizeof (struct batch));
+
+	while ((messagep = TAILQ_FIRST(&batchp->messages))) {
+		imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_APPEND, 0, 0,
+		    -1, messagep, sizeof (struct message));
+		TAILQ_REMOVE(&batchp->messages, messagep, entry);
+		bzero(messagep, sizeof(struct message));
+		free(messagep);
+	}
+
+	imsg_compose(env->sc_ibufs[proctype], IMSG_BATCH_CLOSE, 0, 0, -1,
+	    batchp, sizeof(struct batch));
+}
+
+/*
+ * Decides whether an envelope is due for a delivery attempt at time tm.
+ *
+ * Returns 0 and sets F_MESSAGE_EXPIRED when the envelope is at least
+ * SMTPD_QUEUE_EXPIRY old or its retry counter is 255.  Returns 0 when it
+ * is already flagged scheduled or processing.  Returns 1 when it was never
+ * tried.  Otherwise returns 1 once the retry delay has elapsed since
+ * lasttry: T_MDA_MESSAGE envelopes are retried at once for the first five
+ * attempts, then after retry * 60 seconds plus up to 59 random seconds
+ * until the fifteenth; T_MTA_MESSAGE envelopes wait SMTPD_QUEUE_INTERVAL
+ * for the first three attempts, then that interval doubled per attempt up
+ * to retry 7, capped at SMTPD_QUEUE_MAXINTERVAL.  All other cases wait
+ * SMTPD_QUEUE_MAXINTERVAL.
+ */
+int
+runner_message_schedule(struct message *messagep, time_t tm)
+{
+	time_t	 wait = SMTPD_QUEUE_MAXINTERVAL;
+
+	/* too old, or out of retries: mark as expired */
+	if (tm - messagep->creation >= SMTPD_QUEUE_EXPIRY ||
+	    messagep->retry == 255) {
+		messagep->flags |= F_MESSAGE_EXPIRED;
+		return 0;
+	}
+
+	/* already picked up by an earlier pass */
+	if (messagep->flags & (F_MESSAGE_SCHEDULED | F_MESSAGE_PROCESSING))
+		return 0;
+
+	/* never attempted yet */
+	if (messagep->lasttry == 0)
+		return 1;
+
+	if (messagep->type & T_MDA_MESSAGE) {
+		if (messagep->retry < 5)
+			return 1;
+
+		if (messagep->retry < 15)
+			wait = (messagep->retry * 60) + arc4random() % 60;
+	}
+
+	if (messagep->type & T_MTA_MESSAGE) {
+		if (messagep->retry < 3) {
+			wait = SMTPD_QUEUE_INTERVAL;
+		} else if (messagep->retry <= 7) {
+			wait = SMTPD_QUEUE_INTERVAL *
+			    (1 << (messagep->retry - 3));
+			if (wait > SMTPD_QUEUE_MAXINTERVAL)
+				wait = SMTPD_QUEUE_MAXINTERVAL;
+		}
+	}
+
+	return (tm >= messagep->lasttry + wait) ? 1 : 0;
+}
+
+/*
+ * Attaches messagep to a batch and returns that batch.
+ *
+ * With a non-zero messagep->batch_id the batch must already exist;
+ * failing to find it terminates the process.  Otherwise a new batch is
+ * allocated, given a fresh id, and inserted into env->batch_queue.  It
+ * takes its rule and hostname from the sender path for T_DAEMON_MESSAGE
+ * messages and from the recipient path otherwise.  Mailbox and external
+ * actions yield a T_MDA_BATCH; anything else yields a T_MTA_BATCH, for
+ * which an IMSG_LKA_MX_LOOKUP request is composed to PROC_LKA.
+ */
+struct batch *
+batch_record(struct smtpd *env, struct message *messagep)
+{
+	struct batch	*batchp;
+	struct path	*path;
+
+	if (messagep->batch_id != 0) {
+		batchp = batch_by_id(env, messagep->batch_id);
+		if (batchp == NULL)
+			errx(1, "%s: internal inconsistency.", __func__);
+	} else {
+		batchp = calloc(1, sizeof(struct batch));
+		if (batchp == NULL)
+			err(1, "%s: calloc", __func__);
+
+		batchp->id = queue_generate_id();
+		batchp->creation = messagep->creation;
+
+		(void)strlcpy(batchp->message_id, messagep->message_id,
+		    sizeof(batchp->message_id));
+		TAILQ_INIT(&batchp->messages);
+		SPLAY_INSERT(batchtree, &env->batch_queue, batchp);
+
+		if (messagep->type & T_DAEMON_MESSAGE) {
+			batchp->type = T_DAEMON_BATCH;
+			path = &messagep->sender;
+		} else
+			path = &messagep->recipient;
+
+		batchp->rule = path->rule;
+		(void)strlcpy(batchp->hostname, path->domain,
+		    sizeof(batchp->hostname));
+
+		if (!IS_MAILBOX(path->rule.r_action) &&
+		    !IS_EXT(path->rule.r_action)) {
+			batchp->type |= T_MTA_BATCH;
+			imsg_compose(env->sc_ibufs[PROC_LKA],
+			    IMSG_LKA_MX_LOOKUP, 0, 0, -1,
+			    batchp, sizeof(struct batch));
+		} else
+			batchp->type |= T_MDA_BATCH;
+	}
+
+	TAILQ_INSERT_TAIL(&batchp->messages, messagep, entry);
+	return batchp;
+}
+
+/*
+ * Finds the queued batch a message belongs to, or returns NULL.
+ *
+ * A non-zero message->batch_id is looked up directly in env->batch_queue.
+ * Otherwise the tree is scanned for the first batch whose type equals the
+ * message type and whose message_id matches case-insensitively; a batch
+ * with T_MTA_BATCH set must also match the recipient domain.
+ */
+struct batch *
+batch_lookup(struct smtpd *env, struct message *message)
+{
+	struct batch	*cur;
+	struct batch	 key;
+
+	/* a known batch id allows a direct tree lookup */
+	if (message->batch_id != 0) {
+		key.id = message->batch_id;
+		return SPLAY_FIND(batchtree, &env->batch_queue, &key);
+	}
+
+	/*
+	 * The batch id is not known yet: either the batch was created
+	 * without us being notified, or it does not exist.  Scan for a
+	 * match on message_id and type.
+	 */
+	SPLAY_FOREACH(cur, batchtree, &env->batch_queue) {
+		if (cur->type == message->type &&
+		    strcasecmp(cur->message_id, message->message_id) == 0 &&
+		    ((cur->type & T_MTA_BATCH) == 0 ||
+		    strcasecmp(cur->hostname, message->recipient.domain) == 0))
+			return cur;
+	}
+
+	return NULL;
+}
+
+/*
+ * Ordering function for the batch tree: compares two batches by id and
+ * returns -1, 0 or 1.  The ids are compared rather than subtracted so that
+ * a 64-bit difference is never squeezed into the int return value.
+ */
+int
+batch_cmp(struct batch *s1, struct batch *s2)
+{
+	if (s1->id == s2->id)
+		return (0);
+
+	return (s1->id < s2->id) ? (-1) : (1);
+}
+
+SPLAY_GENERATE(batchtree, batch, b_nodes, batch_cmp);
diff --git a/usr.sbin/smtpd/smtpd.c b/usr.sbin/smtpd/smtpd.c
index 1dd188dd22e..2cfc9760feb 100644
--- a/usr.sbin/smtpd/smtpd.c
+++ b/usr.sbin/smtpd/smtpd.c
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.c,v 1.12 2008/12/03 17:58:00 gilles Exp $ */
+/* $OpenBSD: smtpd.c,v 1.13 2008/12/05 02:51:32 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -68,6 +68,7 @@ pid_t mda_pid = 0;
pid_t mta_pid = 0;
pid_t control_pid = 0;
pid_t smtp_pid = 0;
+pid_t runner_pid = 0;
int __b64_pton(char const *, unsigned char *, size_t);
@@ -94,7 +95,8 @@ parent_shutdown(void)
mda_pid,
mta_pid,
control_pid,
- smtp_pid
+ smtp_pid,
+ runner_pid
};
for (i = 0; i < sizeof(pids) / sizeof(pid); i++)
@@ -427,6 +429,7 @@ parent_sig_handler(int sig, short event, void *p)
{ mta_pid, "mail transfer agent" },
{ control_pid, "control process" },
{ smtp_pid, "smtp server" },
+ { runner_pid, "runner" },
{ 0, NULL },
};
@@ -571,6 +574,7 @@ main(int argc, char *argv[])
mta_pid = mta(&env);
smtp_pid = smtp(&env);
control_pid = control(&env);
+ runner_pid = runner(&env);
setproctitle("parent");
SPLAY_INIT(&env.mdaproc_queue);
diff --git a/usr.sbin/smtpd/smtpd.h b/usr.sbin/smtpd/smtpd.h
index 62710a22d7a..c3695e761ee 100644
--- a/usr.sbin/smtpd/smtpd.h
+++ b/usr.sbin/smtpd/smtpd.h
@@ -1,4 +1,4 @@
-/* $OpenBSD: smtpd.h,v 1.19 2008/12/04 17:24:13 cloder Exp $ */
+/* $OpenBSD: smtpd.h,v 1.20 2008/12/05 02:51:32 gilles Exp $ */
/*
* Copyright (c) 2008 Gilles Chehade <gilles@openbsd.org>
@@ -19,7 +19,7 @@
#define CONF_FILE "/etc/mail/smtpd.conf"
#define MAX_LISTEN 16
-#define PROC_COUNT 8
+#define PROC_COUNT 9
#define READ_BUF_SIZE 32768
#define MAX_NAME_SIZE 64
@@ -251,6 +251,7 @@ enum smtp_proc_type {
PROC_MDA,
PROC_MTA,
PROC_CONTROL,
+ PROC_RUNNER,
} smtpd_process;
struct peer {
@@ -623,7 +624,6 @@ struct smtpd {
u_int32_t sc_flags;
struct timeval sc_qintval;
struct event sc_ev;
- struct event sc_rqev;
int sc_pipes[PROC_COUNT]
[PROC_COUNT][2];
struct imsgbuf *sc_ibufs[PROC_COUNT];
@@ -717,7 +717,6 @@ int batch_cmp(struct batch *, struct batch *);
struct batch *batch_by_id(struct smtpd *, u_int64_t);
struct message *message_by_id(struct smtpd *, struct batch *, u_int64_t);
int queue_remove_batch_message(struct smtpd *, struct batch *, struct message *);
-SPLAY_PROTOTYPE(batchtree, batch, b_nodes, batch_cmp);
/* mda.c */
pid_t mda(struct smtpd *);
@@ -731,6 +730,11 @@ pid_t mta(struct smtpd *);
pid_t control(struct smtpd *);
void session_socket_blockmode(int, enum blockmodes);
+/* runner.c */
+pid_t runner(struct smtpd *);
+SPLAY_PROTOTYPE(batchtree, batch, b_nodes, batch_cmp);
+
+
/* smtp.c */
pid_t smtp(struct smtpd *);
void smtp_listener_setup(struct smtpd *, struct listener *);
diff --git a/usr.sbin/smtpd/smtpd/Makefile b/usr.sbin/smtpd/smtpd/Makefile
index 78247187961..83a10fe1500 100644
--- a/usr.sbin/smtpd/smtpd/Makefile
+++ b/usr.sbin/smtpd/smtpd/Makefile
@@ -1,11 +1,11 @@
-# $OpenBSD: Makefile,v 1.1 2008/12/04 13:36:58 todd Exp $
+# $OpenBSD: Makefile,v 1.2 2008/12/05 02:51:32 gilles Exp $
PROG= smtpd
SRCS= parse.y log.c config.c buffer.c imsg.c \
smtpd.c lka.c mfa.c queue.c mta.c mda.c control.c \
smtp.c smtp_session.c store.c \
ssl.c ssl_privsep.c dns.c aliases.c forward.c \
- map.c
+ map.c runner.c
MAN= smtpd.8 smtpd.conf.5
BINDIR= /usr/sbin